data engineering

pyspark 큰 용량의 collect()를 범위로 잘라서 가져오는 방법.

qkqhxla1 2017. 10. 30. 10:43

pyspark를 하다보면 가장 어려운 부분은 세팅 부분이란걸 알게 된다. 프로그래밍이야 뭐 파이썬과 거의 같고 함수 몇개만 알고 어떻게 돌아가는지 적당히 몇개 돌리다 보면 할수 있다. 하지만 세팅이나 설정과 관련해서 곤란을 많이 겪는다. 

예를들어 몇시간동안 어떤 작업을 돌리고 collect()로 모든 데이터를 리스트로 가져왔는데 메모리가 부족해서 프로그램이 죽어서 처음부터 다시 돌려야한다던지....(-_-) 하는 일이 참 많다. 이번 글에 적을 내용은 collect()와 관련된 내용이다.


내가 해야 할 일은 어떤 데이터를 어떤 함수에서 처리한 후 redis에 전부 집어넣는것이다. 나는 이미 redis관련 클래스를 만들어놓았었다. 그래서 리스트로 가져온 후 레디스에 push할수 있었다. 이것만 생각해보면 간단하다. 대충 플로우를 그려보면 

rdd 트랜스포메이션으로 어떤 함수 처리 -> collect()로 가져옴. -> redis관련 클래스에 push함.

인데... rdd 트랜스포메이션으로 어떤 작업을 처리한후 collect()로 가져오면 메모리가 부족하다고 프로그램이 죽어버렸다. 당연히 메모리가 부족하면 메모리를 늘리면 되지. 하고 아래와 같은 여러가지 설정을 찾아서 넣어봤지만 적용이 되지 않았다.

sconf = SparkConf().setAppName("PySpark")
sconf.set('spark.kryoserializer.buffer.max', '1024')
sconf.set('spark.executor.memory', '4G')
sconf.set('spark.driver.memory', '10G')
sconf.set('spark.driver.maxResultsSize', '10G')

여기말고 workflow의 옵션에서 driver의 메모리 설정을  --driver-memory 2g --executor-memory 2g 요렇게 줘도 프로그램이 계속 죽었다.(설정 수치도 크게크게 바꿔봤는데 해결 x)

결국 메모리를 늘리는 방법은 포기하고 로직을 바꾸자고 생각을 변경하였다.


원래 플로우인

'rdd 트랜스포메이션으로 어떤 함수 처리 -> collect()로 가져옴. -> redis관련 클래스에 push함.' 에서

'rdd 트랜스포메이션으로 어떤 함수 처리하면서 그 함수 내부에서 redis관련 클래스의 인스턴스를 생성해서 redis에 집어넣음. 리턴은 None으로 아무것도 없게 함-> distinct()로 중복 제거 후 rdd 액션을 실행시킴..' 으로 변경을 해봤는데..


이전에 적었듯이 http://qkqhxla1.tistory.com/907 의 피클링 관련 문제때문에 포기했다...

그러니까 글로벌하게 클래스를 선언하고, rdd 트랜스포메이션으로 사용할 함수 내에서 글로벌한 클래스의 인스턴스를 만들면 피클링 관련 문제가 있어서 에러가 발생하면서 죽는다. 아래 링크도 보자.

https://stackoverflow.com/questions/28569374/spark-returning-pickle-error-cannot-lookup-attribute


결국 또 'rdd 트랜스포메이션함수 내부에서 redis로 보내기'도 포기하고 또 로직을 바꿔보기로 했다... rdd 연산 중에 take()라는게 있다. take()를 쓰면 앞에서부터 인자갯수만큼 데이터를 리스트로 가져올수 있다. 이를 이용해서 메모리 에러가 안날만큼의 작은 데이터를 가져와서 레디스에 넣고, 다시 작은 데이터를 가져와서 넣고.... 하려고 해봤는데. 

take()는 정수 인자 하나만 받으며 앞에서부터 정수 인자까지의 갯수를 가져온다. 즉 0~20은 가져올수 있는데 20~40은 가져올수 없다는거다. 거기에 take는 pop도 아니기에 take를 계속 써봐도 똑같은 값만 나온다....(다들 알겠지만 pop는 데이터를 리턴하고 해당 데이터를 원본에서 버린다. 하지만 take는 데이터를 리턴하고 끝. 원본에 여전히 있다.)


이것들도 다 포기하고 flatmap, aggregate등등을 다 써본것같지만 항상 실행만 하면 메모리가 부족하다고 에러가 떴다.

별 난리를 치다가 take()를 이용해서 pop처럼 만들어보려고 아래처럼 만들어봤다.

x = sc.parallelize(range(50))
for i in range(5):
     t = x.take(5)
     s = set(t)
     x = x.filter(lambda x:x not in s)
     c = x.count()
     print x.collect()

결과를 한번 예측해보자. take로 5개를 가져오고, filter로 x에서 앞에 take로 가져온 것들을 제외시키고 재구성한다. x.count()를 굳이 쓴 이유는 count()가 액션이기 때문이다. pyspark는 action때에 진짜로 함수가 실행되기 때문에 액션을 써 줬다. action연산을 쓰지 않으면 실제로 실행이 되지 않는다. 어쨌든 저런식으로 프로그램을 짜면 t에 5개씩 다른 값이 나올거라고 예측했지만 제대로 실행되지 않았다...

count()로 액션을 중간중간 실행시켜줬음에도 불구하고 잘 안나온다.

% 트랜스포메이션과 액션 : https://stackoverflow.com/questions/31383904/how-can-i-force-spark-to-execute-code

나는 매번 액션마다 앞의 모든 트랜스포메이션을 실행시키는줄 알았는데 위 결과를 보니 그건 또 아닌가보다.


그러다 찾은 글이!

https://stackoverflow.com/questions/37368635/what-is-the-best-practice-to-collect-a-large-data-set-from-spark-rdd

다. 역시 나 말고도 다른사람들이 고민해봤었는지 collect()를 일정 범위로 가져오는 함수를 누가 만들어놨다.

원리를 보면 각각의 데이터에 인덱스를 부여한 후 filter로 인덱스 범위의 값만 yield로 리턴하는건데 진짜 왜 저생각을 못했나 싶을정도로 내겐 간절하게 필요한 함수였다.


아래 함수를 써서 collect()로 큰 데이터를 잘라서 리스트로 가져올 수 있다. 리스트로 가져오기만 하면 프로그래밍이야 익숙하니 잘 처리할수 있다.


혹시 모르니 백업해둠.

def rdd_iterate(rdd, chunk_size=1000000):
    indexed_rows = rdd.zipWithIndex().cache()
    count = indexed_rows.count()
    print("Will iterate through RDD of count {}".format(count))
    start = 0
    end = start + chunk_size
    while start < count:
        print("Grabbing new chunk: start = {}, end = {}".format(start, end))
        chunk = indexed_rows.filter(lambda r: r[1] >= start and r[1] < end).collect()
        for row in chunk:
            yield row[0]
        start = end
        end = start + chunk_size