data engineering

hadoop 맵리듀서 구현.

qkqhxla1 2017. 7. 13. 18:22

http://www.glennklockwood.com/data-intensive/hadoop/streaming.html#3-5-adjusting-parallelism


보고 따라 구현하면서 느낀점 등을 적음. 기본적인 하둡 설치같은건 되있다고 가정. 저기에 써있는데로 워드카운팅하는 맵리듀서 예제는 하둡의 Hello World라고 함. 


맵리듀스에 관한 예제는 이전에 적어놓았었다 : http://qkqhxla1.tistory.com/900

wordcount 예제.


mapper.py

#!/usr/bin/python
import sys

for line in sys.stdin:
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1
        print( "%s\t%d" % (key, value) )

보면 알겠지만 공백으로 나눈후 단어  1 을 각 줄에 출력함.


reducer.py

#!/usr/bin/python
 
import sys
 
last_key = None
running_total = 0
 
for input_line in sys.stdin:
   input_line = input_line.strip()
   this_key, value = input_line.split("\t", 1)
   value = int(value)
 
   if last_key == this_key:
       running_total += value
   else:
       if last_key:
           print( "%s\t%d" % (last_key, running_total) )
       running_total = value
       last_key = this_key
 
if last_key == this_key:
   print( "%s\t%d" % (last_key, running_total) )

맵퍼의 결과를 받아서 결과값을 도출해낸다고 추측 가능함. 아래는 실행 결과.


워드카운팅에 사용할 pg2701파일.

pg2701.rtf

공부용 디렉터리를 만들고 하둡 명령어로 디렉터리를 만든다.


hdfs dfs -mkdir wordcount


hadoop dfs와 hdfs dfs의 차이가 궁금하면 여기를 보자. hadoop명령어는 사라지는 명령어라고 한다.

https://stackoverflow.com/questions/18142960/whats-the-difference-between-hadoop-fs-shell-commands-and-hdfs-dfs-shell-co


만들어진 디렉터리는 단순하게 쉘에서 mkdir wordcount로 만든것과 동일하게 생겼는데 내부적으로 처리할때 차이가 있는것 같다. 현재 디렉터리 내부에는

mapper.py pg2701.txt reducer.py wordcount 가 있다. pg2701의 확장자는 알아서 바꾸던지 하자.


이후 hdfs dfs -copyFromLocal ./pg2701.txt wordcount/mobydick.txt 명령어로 pg2701.txt를 wordcount아래로 복사한다. mobydick.txt도 vi로 만든것과 동일하게 생겼는데, hdfs로 만들지 않고 vi로 만들었을 경우 하둡에서 실행시 뭔가 오류가 나는것을 확인했다. 외부에서 보기엔 같지만 내부의 분산되는 형태?가 다른것같다.


맵리듀스가 잘 작동하는지 확인하자.

head -n1000 pg2701.txt | ./mapper.py | sort | ./reducer.py 를 실행시키면 워드카운팅이 잘 되어서 나온다.


실행 순서를 보자. 텍스트 파일의 1000줄만 가져와서 그것을 mapper에 보내고, 정렬후 다시 reducer에 보낸다. 당연하지만 이 경우에는 mapper가 하나인 경우이다. 


하둡에서 실행시켜보자. 나같은 경우 hadoop-streaming의 경로가 달라서 아래 명령어로 실행시켰다.

hadoop jar /opt/cloudera/parcels/CDH-5.8.2-1.cdh5.8.2.p0.3/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.8.2.jar -mapper "mapper.py" -reducer "reducer.py" -input "wordcount/mobydick.txt" -output "wordcount/output"


눈치로 알수 있겠지만 맵퍼를 설정하고, 리듀서를 설정하고, 넣을 파일을 설정하고 결과값을 낼 파일을 설정한다.

여러번 실행시키면 이미 output이 있다고 에러가 난다.


병렬적으로 실행하기 위해 여러개의 맵퍼를 할당해보자. 아래의 빨간 옵션만 추가해주면 된다.


hadoop jar /opt/cloudera/parcels/CDH-5.8.2-1.cdh5.8.2.p0.3/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.8.2.jar -D mapred.map.tasks=4 -mapper "mapper.py" -reducer "reducer.py" -input "wordcount/mobydick.txt" -output "wordcount/output"