최근 작업한 작은 구조.
문제.
1. 레디스 리스트 자료구조(덱) 안에 많은 데이터가 있다.(그냥 큐라고 부름) 이 데이터들을 빨리빨리 빼내와서 어떤 작업을 처리해야 한다.
2. 데이터가 중복이 있는데, 처음으로 빼낸 데이터는 어떤 작업을 처리하고, 그다음에 동일한 데이터가 나오면 그냥 버린다.
3. 데이터는 빨리빨리 빼내야 하므로 멀티프로세스로 빼낸다.
4. 데이터 중복에는 규칙이 있는데, 10개의 데이터를 빼내면 최대 3가지 데이터가 중복되서 나올 수 있다.
ex) [큐 앞] 1 1 2 1 3 3 3 1 2 2 [큐 뒤]
이런식이다. 큐의 뒤에서부터 빼온다고 가정하면 2처리, 그다음 2는 버리고, 1 처리, 3 처리, 331211은 버린다.
어떻게 구조를 만들수 있을까?
1. 간단하게 멀티프로세스의 각각의 프로세스에서 레디스에서 데이터를 하나하나 빼와서 어떤 작업을 처리하는 소스를 짠다.
2. 데이터를 빼올때 중복제거를 위해서 캐시를 만든다. 캐시는 크기 10의 리스트로 만들고, 데이터가 캐시에 없으면 처리하고 캐시에 데이터를 넣고, 데이터가 캐시에 있으면 작업을 처리하지 않고 그냥 버린다. 캐시 크기가 10을 넘어가면 fifo로 가장 나중에 처리되었던 데이터를 pop하면서 크기 10을 유지한다.
3. 프로세스는 메모리를 공유하지 않으므로 캐시는 ipc로 만든다.
4. 여러 프로세스가 한번에 레디스에서 데이터를 빼와서 캐시에 접근하고 하면 문제가 생길 수 있다. (레이스 컨디션)
예로 .... 1 2 2 [큐 뒤] 이런 구조에서, 프로세스 1과 프로세스 2가 한번에 데이터를 빼온다고 가정하자.
동일한 시간에 데이터를 빼온다고 가정하면, 둘다 데이터 2를 빼올것이고, 이것은 아직 캐시에 저장되기 전이므로 프로세스 1과 프로세스 2에서 어떤 작업을 처리한다.
작업은 한 데이터당 한번만 처리되어야 하므로 이것을 방지하기 위해 레디스에서 데이터를 빼내는 로직에는 한 프로세스만 접근하도록 구성한다. 이를 위해 데이터를 빼네는 부분을 lock을 건다.
소스.
from multiprocessing import Process, Lock, Manager class Worker: def __init__(self): self.cache = Manager().list() def pop_data(self, lock, redis): lock.acquire() while True: item = redis.pop_data() if item == None: # 큐가 비워졌으면 끝 item = None break if item in self.cache: print 'already finished! dump it = {}'.format(item) continue if len(self.cache) > 10: self.cache.pop(0) self.cache.append(item) break lock.release() return each_item def working(self, lock, redis): while True: data = self.pop_data(lock, redis) if not data: break ~~~~~ # 어떤 작업 def main(argv): redis = ~~~~ process_list = [] lock = Lock() process_count = 5 for i in xrange(process_count): worker = Worker() ps = Process(target=worker.working, args=(lock, redis, i)) process_list.append(ps) ps.start() for i in xrange(len(process_list)): process_list[i].join() if '__name__' == '__main__': main()
그냥 처음으로 이런 문제를 다뤄봐서 재밌었다.
python multiprocessing lock 체크하는 소스.
공식 홈페이지에 보면 https://docs.python.org/2/library/multiprocessing.html 가 있고, 이안에 lock이 있다.
락 예제가 있는 소스가 있는데, 비슷비슷하게 출력되서 이게 진짜 한 프로세스만 진입이 되는지 모르겠다.
time.sleep을 lock되는부분 안에 넣었더니 제대로 알수 있었다.
from multiprocessing import Process, Lock def f(l, i): l.acquire() import time time.sleep(1) print 'hello world', i l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
만약 lock이 제대로 안잡혔다면 sleep이 있건 없건 동시에 hello world 가 5번 출력되었겠지...
'Python > 2.7 information' 카테고리의 다른 글
crontab으로 프로그램 돌릴시 주의할점, 자세한 에러 출력법 (1) | 2018.02.13 |
---|---|
파이썬으로 구글 시트 조작. (2) | 2018.02.07 |
전문가를 위한 파이썬 책에서 알아낸 것들.. (0) | 2017.07.15 |
multiprocessing에서 여러 프로세스가 동시 변수 참조하는 문제 (2) | 2017.05.29 |
파이썬 정규식 한글. (0) | 2017.05.12 |