data engineering

스파크 최적화 삽질 관련 2.

qkqhxla1 2017. 7. 9. 16:47

http://qkqhxla1.tistory.com/918 요 글에 이은 두번째 글이다. 실제 환경에서 어떻게 처리했는지 적었다.


일단 프로그램의 흐름은 hive에서 쿼리 결과를 가져와서 csv파일로 저장 -> 다른 하둡 서버에서 csv를 가져옴 -> 저장된 csv파일을 spark에서 읽어서 처리. 하는, 흐름만 보면 간단해보인다.(처음에는 더 복잡했는데 줄이고 압축하다보니 플로우가 줄었다.)

지워야 할것같은건 다 지웠다. 현재 oozie의 워크플로우다. hive script로 sql을 실행하고, 그 아래의 shell은 현재 서버에서 다른 하둡서버로 distcp를 이용해 데이터를 쏜다. 그리고 작업이 성공적으로 끝나면 finish_successfully라는 스크립트를 실행시키는데, 내부에 슬랙으로 메시지를 쏴주는 코드가 있어 성공적으로 끝났음을 확인할 수 있다. 작업중에 어떤 작업이라도 실패할 경우에는 shoot_error_to_slack스크립트를 이용해 에러메시지를 슬랙으로 쏴준다. 


첫번째 hive처리 부분부터.

hive에서 쿼리 결과를 csv로 저장하려면 insert overwrite directory '파일경로'이라는 구문이 있다. 이 구문을 맨 위에 적고, 쿼리를 짜면 된다. (이미 있으면 덮어쓴다.)


insert overwrite directory '경로.csv'


select ~~~~ limit 10000000


처럼 하면 실행 결과가 잘 들어간다. 그런데 문제는 저 쿼리의 끝에 limiit이 있었다는 점이다. limit을 안붙이고 실행할 경우 

요렇게 뭔가 알아서 잘 쪼개져서 결과가 저장되는데, limit을 붙일경우 하나의 파일로 저장된다. (왜 그런지는 아직도 모르겠다. 하이브 내부적인 로직에 의한것 같다.) 여러개의 파일로 저장되는것과 하나의 큰 파일로 저장되는것의 차이는 앞 글에서 설명한 것과 같이 중요하다. 더 적기 전에 하둡의 맵리듀스의 원리에 대해서 설명을 해야겠다.


하둡에서 맵리듀스(병렬처리)로 작업을 처리할때 어떻게 하나의 파일을 병렬화로 동시에 처리할까? 하둡에서는 병렬화 작업을 수행할때 block size, input split이라는 개념으로 처리한다. block size는 물리적으로 쪼개진 덩어리고, input split은 각각의 block size에서 논리적으로 쪼개진 개념이다.

참고 : https://stackoverflow.com/questions/2831507/how-does-hadoop-perform-input-splits

참고2 : https://stackoverflow.com/questions/30549261/split-size-vs-block-size-in-hadoop


나같은경우 block size의 디폴트값은 hdfs-site.xml의 dfs.blocksize 프로퍼티에 있었다. 134217728라는 값이 들어가있는데, 이 값은 바이트이므로 보기 편하게 바꿔 보면 약 134메가바이트이다. 고로 파일 하나당 134메가바이트와 근처의 값으로... 알아서 잘 쪼개진다는 뜻이다.(더 적을수도있고 많을수도있고 정확하게 저 값으로 쪼개질 수도 있다.)


하둡에서의 병렬처리시 각각의 잡은 맵퍼가 돌리고, 스파크에서는 익스큐터가 하나의 잡을 돌린다. 결국 하둡 맵퍼 == 스파크 익스큐터 == 프로세스 하나 라고 생각하면 편하다. (다만 하둡 맵퍼는 싱글쓰레드, 익스큐터는 프로세스 개념이다.)

참고 : https://www.mail-archive.com/user@spark.apache.org/msg19472.html

그리고 input split에 대해서 테스트는 안해봤는데, spark에서는 한개의 block size당 한개의 익스큐터가 할당된다. 그말은 앞에서 말한것처럼 block size를 기반으로 잡을 나눌수 있다는 말이다. 

하이브에서 쿼리 결과를 저장시 어떻게 파일이 분산되어 저장되는지는 아직 잘 모르겠다. 여러번 돌려보면서 추측한 결과는 block size보다 결과가 작으면 그냥 그 크기대로 저장되고, 더 크면 block size 만큼 나뉘어져서 저장된다는 거다.

하이브에서 block size 설정은 set dfs.block.size=5242880; 처럼 할수 있다.


set dfs.block.size=5242880; -- 5242880바이트 == 5메가바이트


insert overwrite directory '경로'


select ~~~~ limit 10000000

실행시킨 결과는 아래와 같다.

뭔가 5mb씩 딱딱 맞춰서 나올것 같았는데 별 차이 없는것같다.(맨위의 사진과 동일한 쿼리) 스파크에서 돌릴때 일정 수만큼 돌아가는걸 보면 내부적으로 잘 사이즈가 처리된것 같긴 하다. 위의 block size글에서 block size는 물리적으로 쪼개진 덩어리라고 해서 딱딱 5mb씩 나올줄 알았는데 각각 파일마다 잔존 데이터가 있어서 그런지(?) 뭔가 만족스럽게 나오진 않는다.


맨위에서 쿼리에 limit을 걸었을때 결과가 한 파일에 나온다고 했는데, block size를 지정하면 한 파일에 결과가 저장되더라도 내부적으로는 나뉘어져있다.(나중에 스파크로 파일을 실행시킬때 익스큐터가 몇개나 동시에 돌아가는지로 확인 가능하다.) 어쨌든 블록사이즈를 잘 주고 파일을 적당히 내 입맛대로 나눴다.


그리고 뒷쪽에 http://qkqhxla1.tistory.com/917에 썼던 스크립트를 실행되도록 만들었다. distcp로 다른 서버에서 여기로 파일을 가져오는것뿐 아니라, 이 서버에서 다른 서버로 쏘는 것도 가능하다.

export HADOOP_USER_NAME구문이 있는데, 이 구문으로 사용자를 지정해주지 않으면 권한 관련 에러가 날 수도 있다.

그리고 마지막으로 모니터링을 위해 슬랙으로 메시지를 쏴주도록 구현했다.


여기까지가 첫번째 서버에서 한 일이다. distcp에서 쏜 두번째 서버에서 데이터를 처리한다.

두번째 서버의 oozie 워크플로우에는 pyspark스크립트 한개와 에러시 슬랙으로 쏴주는 또다른 스크립트만 들어있다. pyspark의 옵션이다.

'--num-executors 숫자'옵션으로 익스큐터를 20개를 할당하도록 설정했다. 

추가.

'--executor-cores 숫자'옵션으로 한 익스큐터당 코어 갯수를 지정할수 있는데, 이게 쓰레드 개념으로 보면 되는것같다. 익스큐터 갯수는 프로세스, 코어는 쓰레드. 

이해가 안 갈것 같으면 병렬화하는 함수 외부에 글로벌로 변수 하나를 선언해놓고 내부에서 +=1 처리를 해준 후, 출력하도록 해보면 된다. core갯수가 5개일때 한 익스큐터의 로그 메시지를 보면 여러개의 쓰레드가 돌때처럼 동시에 숫자가 출력되는걸 확인할수 있다. 

executor*core수가 총 워커의 수가 되며, 8*5나 5*8이나 똑같이 워커의 수는 40이지만 일반적으로 익스큐터의 수가 더 많을때 더 좋은 효율을 낸다.

참고 : https://stackoverflow.com/questions/24622108/apache-spark-the-number-of-cores-vs-the-number-of-executors

 

내부에서 처리하는 스크립트는 다음과 같다. 처리하는 부분만 가져왔다.

def main():
	site = sys.argv[1].split(',')
	for each_site in site:
		rdd = getCsvResult('파일경로')
		rdd.persist()
		push(rdd.distinct().collect())
		rdd.unpersist()

아주 간단하다. 사실 처음엔 파일이 여러개여서 여러개의 파일을 연 다음 union이란걸로 합쳤다가 하고 해서 복잡했는데 스파크에서 파일을 열때 와일드카드 문자 *를 사용할수 있다는걸 찾아서 그걸 사용하고 있다. getCsvResult함수 내부에서는 파일을 열어서 map()작업까지 수행하고 난뒤 return하라고 적혀있다. 

앞에서 이야기했듯이 캐쉬 설정을 안해주면 작업이 두번 실행되기에, persist()로 만들었다가 push함수에서 사용하고 다시 해제해주었다.


캐쉬를 사용하기 전에 찾은 또다른 이상한점인데, http://qkqhxla1.tistory.com/907 요 글에서 적었듯이, 스파크의 연산이 실제 실행되는 곳은 위의 스크립트에서 collect()부분이다. 반복문 안에 있긴 하지만, collect()차례가 되면 collect()가 끝날때까지 작업이 멈춰있다가, 다음 반복문으로 넘어갈줄 알았는데 아니다. 반복문이 3번 돈다고 가정하면 3번 모두 돌고 난후 collect()가 한번에 모아서 처리된다.(???) 그래서 시간도 원래 걸려야 할 시간보다 훨씬 더 오래걸린다. 

이것도 뭔가 스파크의 내부적인 처리 문제 같긴 한데... 캐쉬를 쓰니까 다 해결되긴 하였다. 이것도 원인을 알게 되면 글을 써야겠다.


스크립트를 실행시키면 파일이 열리면서 내가 지정한 map이 실행되는데, 앞에서 말했듯이 큰 잡을 먼저 수행하고, 작은 잡을 나중에 수행하게 하니 1시간이 줄었다. 그리고 캐쉬를 사용하니 2시간이 줄었고, 아무리 살펴봐도 더이상 줄일게 없는것같다. 글을 보면 간단해보이는데, 이런것들 시간을 줄이려고 거의 일주일을 투자했다.