http://www.glennklockwood.com/data-intensive/hadoop/streaming.html#3-5-adjusting-parallelism
보고 따라 구현하면서 느낀점 등을 적음. 기본적인 하둡 설치같은건 되있다고 가정. 저기에 써있는데로 워드카운팅하는 맵리듀서 예제는 하둡의 Hello World라고 함.
맵리듀스에 관한 예제는 이전에 적어놓았었다 : http://qkqhxla1.tistory.com/900
wordcount 예제.
mapper.py
#!/usr/bin/python import sys for line in sys.stdin: line = line.strip() keys = line.split() for key in keys: value = 1 print( "%s\t%d" % (key, value) )
보면 알겠지만 공백으로 나눈후 단어 1 을 각 줄에 출력함.
reducer.py
#!/usr/bin/python import sys last_key = None running_total = 0 for input_line in sys.stdin: input_line = input_line.strip() this_key, value = input_line.split("\t", 1) value = int(value) if last_key == this_key: running_total += value else: if last_key: print( "%s\t%d" % (last_key, running_total) ) running_total = value last_key = this_key if last_key == this_key: print( "%s\t%d" % (last_key, running_total) )
맵퍼의 결과를 받아서 결과값을 도출해낸다고 추측 가능함. 아래는 실행 결과.
워드카운팅에 사용할 pg2701파일.
공부용 디렉터리를 만들고 하둡 명령어로 디렉터리를 만든다.
hdfs dfs -mkdir wordcount
hadoop dfs와 hdfs dfs의 차이가 궁금하면 여기를 보자. hadoop명령어는 사라지는 명령어라고 한다.
만들어진 디렉터리는 단순하게 쉘에서 mkdir wordcount로 만든것과 동일하게 생겼는데 내부적으로 처리할때 차이가 있는것 같다. 현재 디렉터리 내부에는
mapper.py pg2701.txt reducer.py wordcount 가 있다. pg2701의 확장자는 알아서 바꾸던지 하자.
이후 hdfs dfs -copyFromLocal ./pg2701.txt wordcount/mobydick.txt 명령어로 pg2701.txt를 wordcount아래로 복사한다. mobydick.txt도 vi로 만든것과 동일하게 생겼는데, hdfs로 만들지 않고 vi로 만들었을 경우 하둡에서 실행시 뭔가 오류가 나는것을 확인했다. 외부에서 보기엔 같지만 내부의 분산되는 형태?가 다른것같다.
맵리듀스가 잘 작동하는지 확인하자.
head -n1000 pg2701.txt | ./mapper.py | sort | ./reducer.py 를 실행시키면 워드카운팅이 잘 되어서 나온다.
실행 순서를 보자. 텍스트 파일의 1000줄만 가져와서 그것을 mapper에 보내고, 정렬후 다시 reducer에 보낸다. 당연하지만 이 경우에는 mapper가 하나인 경우이다.
하둡에서 실행시켜보자. 나같은 경우 hadoop-streaming의 경로가 달라서 아래 명령어로 실행시켰다.
hadoop jar /opt/cloudera/parcels/CDH-5.8.2-1.cdh5.8.2.p0.3/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.8.2.jar -mapper "mapper.py" -reducer "reducer.py" -input "wordcount/mobydick.txt" -output "wordcount/output"
눈치로 알수 있겠지만 맵퍼를 설정하고, 리듀서를 설정하고, 넣을 파일을 설정하고 결과값을 낼 파일을 설정한다.
여러번 실행시키면 이미 output이 있다고 에러가 난다.
병렬적으로 실행하기 위해 여러개의 맵퍼를 할당해보자. 아래의 빨간 옵션만 추가해주면 된다.
hadoop jar /opt/cloudera/parcels/CDH-5.8.2-1.cdh5.8.2.p0.3/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.8.2.jar -D mapred.map.tasks=4 -mapper "mapper.py" -reducer "reducer.py" -input "wordcount/mobydick.txt" -output "wordcount/output"
'data engineering' 카테고리의 다른 글
pyspark에서 jar파일 사용하는방법. (py4j) (0) | 2017.07.20 |
---|---|
spark로 hive 쿼리 실행시키기. (0) | 2017.07.14 |
스파크 최적화 삽질 관련 2. (0) | 2017.07.09 |
스파크 최적화 삽질 관련 1. (0) | 2017.07.09 |
shell script 하둡 명령어 자동 프로그램. (0) | 2017.07.07 |