data engineering

스파크 최적화 삽질 관련 1.

qkqhxla1 2017. 7. 9. 16:00

최근 하둡 에코시스템 관련해서 프로그래밍을 하면서 삽질을 엄청나게 많이 했었다. 삽질의 반은 구현에 관한 삽질이었고, 나머지 반은 성능을 향상시키기 위한 삽질이었다. http://qkqhxla1.tistory.com/907 요 글에서 스파크 기초에 관한 지식만 가지고 있었고, 내부적으로 병렬화가 어떻게 실행되는지는 아예 몰랐었다. 

그냥 병렬화 관련된 map함수? 이런걸 쓰면 내부적으로 '알아서 잘 나뉘어져서' 병렬적으로 실행되는구나. 몰라도 되겠지. 정도였는데 최적화를 어떻게 하느냐에 따라 속도가 엄청나게 차이가 많이난다. 아래에 내가 여태까지 삽질한 결과를 적겠다.


아래에 처리한 내용들을 적기 전에 병렬화에서 가장 자원을 효율적으로 쓰면서 속도를 빠르게 내려면 어떻게 해야하는지 적어야 할 것 같다.

삽질하면서 찾은 최적화에 가장 중요한 영향을 끼치는 요소는 4가지였다. (이건 제가 삽질하면서 찾은거고, 다른 요소가 더 있을 수도 있습니다. 아마 있을겁니다.)

1. 적당한 크기로 잡 나누기.

2. 적당한 수의 프로세서 할당.

3. 잡의 실행 순서.(일단 spark만 해당. 다른건 모르겠다.)

4. 캐시 사용


병렬화 프로그램의 가장 기본적인 예로 1에서 100까지 더하는 프로그램을 병렬화 한다고 가정해보자. 가장 일반적으로 병렬화를 구현하면 작업을 10개씩 쪼갠 뒤, 10개의 프로세서에 할당하는 방법이 있다.(1~10따로 더하고, 10~20따로 더하고. 이런식으로..)


1번과 2번이 중요한 이유는 할당할수 있는 프로세서는 시스템마다 차이가 있기 때문이다. 50개의 프로세서를 할당할 수 있고, 작업을 최대한 빨리 처리해야하면 1~100까지 더하는 프로그램을 50개로 쪼갠 뒤 50개의 프로세서에서 동시에 실행시키는게 가장 빠를것이다. 프로세서를 50개를 할당할 자원이 되더라도 잡을 2개로 쪼갠다면 프로세서 50개중 2개가 잡을 처리하는동안 48개는 그냥 놀고만 있어야 한다. 50개를 할당하더라도 2개를 할당한것과 동일한 성능이 나오므로 자원의 낭비이다. 이러한 경우때문에 현재 내가 할당할 수 있는 프로세서의 수에 따라 적당한 크기로 잡을 분배하는게 중요하다. 

잠깐 생각을 해 보면 최적의 잡 갯수는 할당할 프로세서와 같은 갯수이거나 프로세서보다 살짝 낮은수가 가장 좋다는걸 깨달을수 있다. 


3번은 1,2번과 연관이 되어있는건데, 스파크에서는 프로세서를 익스큐터라고 한다. 스파크에서 익스큐터로 각각의 잡을 병렬로 실행시킬때, 크기나 그런것에 상관없이 가장 앞에서부터 하나씩 잡을 익스큐터에 할당한다. 먼저 일이 끝나는 익스큐터는 그 다음 잡을 실행시키는 방식으로 끝까지 간다. 잡의 갯수보다 익스큐터의 갯수가 많아지면 남은 익스큐터들은 모든 잡이 끝날때까지 대기한다.


위의 특성을 생각해서 스케쥴링을 해보자. 해야할 일이 두가지가 있는데, 두개의 일을 병렬로 프로그래밍을 하고자 한다. 익스큐터는 총 5개만 사용할수 있다.

첫번째 일은 3개의 잡이 있으며, 각각의 잡은 10분이 소모된다. 두번째 일은 3개의 잡이 있으며 각각은 60분이 소모된다.

단순하게 순서를 (10 10 10) (60 60 60) 앞에서부터 이런식으로 잡고, 익스큐터를 가장 많은 5개를 할당했다고 하자. 위에서 말한대로 잡은 앞에서부터 할당되니 익스큐터의 순서대로 (10 10 10) (60 60)이 할당될것이다. 

총 소요시간은 얼마나 걸릴까? 70분이 걸릴것이다. 맨 앞의 10분짜리 잡이 끝나면 그 익스큐터가 마지막 60분짜리를 실행시킬거고, 나머지는 일찍 끝나고 마지막 잡이 끝날때까지 기다려야하니 총 70분이 걸린다.

하지만 순서를 바꿔보자. 일의 순서를 (60 60 60) (10 10 10) 으로 하고, 익스큐터를 4개를 할당했다고 가정해보자. 계산을 해보면 모든 소요시간은 60분이 걸림을 알 수 있다. 4번째 익스큐터가 10분짜리 잡을 끝내고 그다음 10분, 이런식으로 10분짜리 잡을 3개 끝내고 기다리다 보면 60분짜리 잡이 끝나고, 그러면 모든 잡이 끝날테니 말이다.

단순히 잡의 순서만 바꿔줬는데 익스큐터도 5개에서 4개로 줄고, 실행시간도 10분이 줄었다. 조금만 생각해보면 알겠지만 위의 경우와 같을 경우 잡의 순서는 무조건 오래걸리는 잡을 먼저 실행시키도록 순서를 배치하는게 좋다.


마지막 4번 캐싱에 대해 보자. 이건 순수하게 삽질하면서 얻은 결과이다.(구글링해도 언제 딱 써야하는지는 잘 못찾겠다. 단순하게 오래걸리는 일이고 다시 재사용할 일이 있으면 쓰라고 한다.) pyspark에는 persist()라는 함수로 한번 연산한 결과를 저장해놓도록 할수 있다(반대로 unpersist()를 쓰면 풀린다.). 몇천만건의 데이터를 돌려보다 이상한 점을 발견했다. 이상한 부분의 코드는 아래와 같았다.

sc.textFile(~~).map(lambda x:특정 함수(x)).distinct().collect()

어떤 파일을 읽어서, map으로 특정 함수를 실행 후 중복 제거를 하고 리스트로 가져오는 명령이다. 데이터가 크고, 특정 함수를 모두 실행시키기에는 오래걸리는 작업이었다. 문제는 map()의 작업이 끝난 후 map내부의 특정 함수 작업을 한번 더 실행한다는 것이다. (그러니까 데이터가 100개면 map에서 100번 실행시키고, distinct에서 다시 100번을 실행시킨다. 뭐지?)

한번 실행시키는데 2시간이 걸리는 작업인데, 작업이 끝나면 한번을 더 실행시키니 총 4시간이 넘게 걸렸다. 열심히 삽질하다 추측한 내용은 map의 결과를 distinct()로 중복제거해야하는데, map의 결과가 너무 크고 오래걸려서 distinct()를 실행할 때쯤에는 메모리에서 사라져서(??) 다시 실행시킨다 였다. 

아직까지 정확한 원인은 모르겠는데 이렇게 생각하고 캐쉬 처리를 아래처럼 했더니 한번만 실행되고 제대로 끝났다.

r = sc.textFile(~~).map(lambda x:특정 함수(x)) # 오래걸리는 작업
r.persist()
data = r.distinct().collect()
함수(data)
~~
r.unpersist()

간단한 캐쉬 처리로 시간을 많이 줄일수 있었다. 이후 얻은 교훈은 오래걸리는 작업이면 무조건 persist()로 남겨놓자.. 이다.


다음 글은 실제 환경에서 설정이나 위의 지식들을 어떻게 활용했느냐이다.


이동하기. http://qkqhxla1.tistory.com/919