data engineering

spark mongo data processing, write csv to s3 삽질 정리.

qkqhxla1 2020. 9. 21. 13:38

https://qkqhxla1.tistory.com/1006 의 mongo부분의 글을 재정리함과 동시에 겪었던 모든 문제들을 적으려고 한다.


하려는 작업의 도식도는 위의 그림과 같다. 몽고디비에서 데이터를 가져와서 스파크에서 전처리를 한다. 그후 전처리된 데이터를 csv형태로 S3에 저장한다. (S3에 저장된 데이터는 하이브 테이블로 만들 예정인데 그건 다른 글에 많이 써놔서 제외한다.

공용으로 쓰는 spark서버 말고 새로 구축해서 우리꺼 전용으로 사용하려고 내가 구축한 kubernetes rancher서버에 bitnami spark로 아무 버전이나 설치했었는데 이러면 안되었다.


삽질하면서 얻은 중요한 결론을 먼저 적자면 spark는 spark버전과 hadoop버전, 거기에 관련된 jar버전까지 전부 맞아야 한다. 가장 삽질을 많이 한 부분이 프로그래밍적인 부분이 아니라 이러한 버전 맞추는 부분이었다.


처음에 스파크 버전을 보니 마치 파이썬처럼 2.4.7이 2버전 마지막이고, 최근에 3.0.1 까지 나왔었다. 근데 3버전으로 메이저 패치 버전이 바뀐만큼 함수등이 크게 바뀌었으면 어쩌지.. 하는 걱정과 함께 bitnami spark 2.4.7을 설치했다. 근데 필요한 jar들을 다 설치한것같은데 잡이 돌지를 않는다. 내가 쿠버네티스를 rancher라는 gui로 편하게 배운터라, 내부 구조가 아직까지 깊게 이해되지는 않고, 찾아보다가 몇일 시간을 날렸다.


지금 생각해보니 문제는 내가 kubernetes도 깊게 이해 못한 상태인데 spark도 클러스터 구축은 처음인지라 이해를 못한 상태에서 불분명한 에러가 발생했을때 kubernetes를 봐야하는지 spark를 봐야하는지 어디를 봐야하는지 몰라서 멘붕이 와서 이것저것 쓸때없는것만 찾다가 실패했었다.


그래서 이번엔 팀장님의 조언에 따라 문제를 나눠보았다.

1. spark를 놀고있는 aws ec2에다가 standalone으로 1대만 구축해놓는다

2. 원격에서 몽고 관련 잡을 standalone spark서버로 보내서 실행시켜본다. 

만약 여기서 잘 된다면 설정이 같으니 spark보다는 kubernetes에서의 문제라고 좁혀볼 수 있다.


1. Mongodb -> spark cluster


standalone spark 서버 구축해서 알아낸건..


처음에 2.4.7로 설치하고 삽질했었는데 3.0.1이 답이었다. (하둡 버전은 이따가 아래에 적는다.)

인스턴스 안에서 spark의 sbin폴더 내의 ./start-master.sh, ./start-slave.sh spark://x.x.x.x:7077 처럼 마스터와 슬레이브를 한 인스턴스내에서 실행해서 초간단 spark cluster를 설치 후에 테스트를 해보았다.


이전 글에도 적었지만 스파크와 몽고 사이에 processing이 필요할땐 jar들이 필요한데 버전 영향을 많이 받는다.

https://docs.mongodb.com/spark-connector/master/여기 정보가 가장 중요하다.

맨 아래에 mongodb connector for spark version 몇이 release되었다고 나와있는데 자세히 보면 스파크 2.4.7버전에 대해서는 mongo connector가 없다.

띠용..... 결국 kubernetes나 spark가 아니라 지원이 안되는 2.4.7로 어떻게든 될줄알고 괜히 개삽질하고있었던거임..

이상한건 저기에는 3.0.1도 지원한다는 말이 없는데 3.0.0버전의 jar을 3.0.1 스파크에 넣고 돌리니까 된다.(이건 또 왜 되는거지..) 어쨌든 mongodb spark connector은 여기에서 3.0.0용 버전인 이걸 spark의 jars폴더에 다운받았다.


테스트를 돌리기위한 test.py이다.

# -*- coding: utf-8 -*-

import sys
from pyspark.sql import SparkSession
import time

reload(sys)
sys.setdefaultencoding('utf-8')

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://mongoid:mongopw@x.x.x.x:27017/db.collection?authSource=admin") \
    .getOrCreate()

pipeline = "{'$match': {'started_day': '20200919'}}"
df = spark.read.format("mongo").option("pipeline", pipeline).load()

df.limit(100).show()

test.py를 submit하기위한 test.sh이다. (test.py와 test.sh는 spark/bin에 그냥 넣어놓음. 위에서 다운받은 mongo-spark-connector는 아래처럼 --packages인자로 추가해준다.)

#!/bin/bash

./spark-submit --master spark://x.x.x.x:7077 --driver-memory 2g --executor-memory 8g 
--packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 test.py

--packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 로 spark connector을 추가해주었다.


돌리니까 당연히 추가 jars가 없어서 에러가 떠서

mongo-java-driver :  https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver/3.11.2

bson : https://mvnrepository.com/artifact/org.mongodb/bson/3.11.2


을 추가로 jars에 다운받아주었다.(jar 버전이 중요하다고 적었는데 위의 mongo-java-driver, bson은 버전에 대한 이슈가 없었다.)


이제 결과가 잘나온다.


+ mongo aggregation pipeline을 만들때

pipeline = "{'$match': {'$and': [{'started_day':'20200919'}}, {'_id':{'$lte': 4768810367}}]}"


처럼 $and를 이용해서 started_day가 20200919면서 id가 4768810367이하인 값을 찾으려고 했었는데 list of pipeline으로 하라면서 에러가 뜬다. 아니 또 왜 하면서 찾아보니 여기에 설명이 잘돼있다.

pipeline = "[{'$match': {'start_day': '20200919'}}, {'$match': {'_id':{'$lte': 4710898222}}}]"


요런식으로 바꿔서 주니 되었다. 오 이제 스파크로 몽고디비 데이터를 100개지만 가져왔네? 하고 limit()을 풀고 돌리니 안된다. test.sh를 보면 알겠지만 driver-memory와 executor-memory를 나름 충분히 줬었다. 

driver-memory와 executor-memory의 차이점이 궁금하면 이 글을 읽어보자.

요약하자면 driver-memory는 내가 돌릴 메인 프로그램의 메모리이고 executor-memory는 spark에서 map reduce를 실행할때 각각의 잡에 할당될 메모리이다. driver-memory는 2~4gb, executor-memory는 2~8gb라고 해서 난 2,8gb 를 줬다.


어쨌든 메모리는 충분한것같은데 왜 큰 데이터를 가져오려고 하면 메모리가 부족하다면서 죽을까? 찾아보니 몽고를 저런식으로 호출하면 몽고에 있는 전체 데이터를 한번에 가져온다.


그럼 나눠서 가져오면되는데 어떻게 나눠? 하면서 찾아보다가 공식 홈페이지에서 파티셔너를 발견했다.

https://docs.mongodb.com/spark-connector/current/configuration/ 파티셔너를 적용해서 코드를 짜면

# -*- coding: utf-8 -*-

import sys
from pyspark.sql import SparkSession
import time

reload(sys)
sys.setdefaultencoding('utf-8')

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

start_id, end_id = 1, 100
pipeline = "[{'$match': {'start_day': '20200919'}}, {'$match': {'_id':{'$gte': " + start_id + ", '$lt': " + end_id  + "}}}]"

df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
               .option("spark.mongodb.input.partitioner","MongoSamplePartitioner") \
               .option("spark.mongodb.input.partitionerOptions.partitionKey", "_id") \
               .option("spark.mongodb.input.partitionerOptions.partitionSizeMB", 64) \
               .option("spark.mongodb.input.partitionerOptions.samplesPerPartition", 10) \
               .option("spark.mongodb.input.uri", "mongodb://mongoid:mongopw@x.x.x.x:27017/db.collection?authSource=admin") \
              .option("pipeline", pipeline).load()

df.limit(1).show()

요렇게 파티셔너를 적용해주니 잘 되는걸 확인할수 있었다. (pipeline에 start_id, end_id차이가 100으로 작아서 되는게아니라 더 커도 됨.이제 진짜 몽고에서 스파크로 데이터를 적절히 가져오는 로직을 짜는데 성공하였다. 



2. Spark -> S3


스파크 프로세싱하는게 제일 간단해서 성능 업시키려고 찾아냈던것만 적자면, 현재 standalone서버로 스파크를 돌리고 있다. slave를 실행시키면 기본적으로 하나의 worker와 executor만 나오길래 현재 ec2가 성능이 좋은거여서 여러개의 worker나 executor을 돌리려고 했다.


스파크 3부터는 1개의 노드에 1개의 워커, 1개의 워커에 여러개의 익스큐터를 돌리는것을 추천한다. 여기서 가져옴

익스큐터 갯수 설정 관련해서는 여기 스택오버플로우 글이 가장 참고가 잘 되었다.

익스큐터들의 메모리의 합은 워커에 배정된 메모리를 넘을 수 없다. 당연한 말이지만 돌리면서 스파크 히스토리서버를 보면서 익히는게 더 잘 기억된다. 그리고 현재 위 몽고 잡의 경우 하나의 워커에 여러개의 익스큐터를 돌려도 큰 속도차이는 없는것같다.(io가 가장 큰 버틀넥이라 하나의 노드에서 여러개 돌려도 소용없는듯)


Spark에서 S3로 csv를 쓰면서 발생했던 이슈에 대해서 적자면..

처음에는 바로 쓰는 코드를 썼다가 에러가 발생하면 어디서 에러가 발생한지 확인하기가 힘들어서 csv를 로컬에 써보기로 결심했다. 


https://stackoverflow.com/questions/31385363/how-to-export-a-table-dataframe-in-pyspark-to-csv


여기 보면 엄청 간단하게 나와있다. 

spark 2이상으로는 df.write.csv('mycsv.csv'처럼 하면 된다고한다. 근데 로컬에 있는 절대경로를 입력했는데 폴더만 하나 딸랑 생성될 뿐, 내부에 csv가 만들어지지 않았다. 뭐지????

df.toPandas().to_csv('mycsv.csv') 이렇게 pandas로 만들면 단일 csv가 로컬에 만들어진다. 근데 왜 df.write.csv로는 안만들어지지? 했었는데 어디서 봤는데 실행시키는 local이 hdfs와 같은 분산 시스템이면 제대로 저장되는데, 나처럼 그냥 ec2같은곳에서의 일반적인 디스크면 분산 시스템이 아니기때문에 안 된다는 얘기를 들었다.

df.toPandas().to_csv 로는 일반적인 디스크에도 저장이 되는데, 이건 toPandas()는 현재 사용하는 메인 프로그램에 모든 데이터를 로드하는것 같다. 그래서 데이터가 크면 프로그램이 죽는다. 


어쨌든 내가 로컬에 csv를 쓰려는 목적이 S3에 넣기 전에 테스트 용으로 쓰는거였는데, 해답이 hdfs나 S3같은 분산 시스템에 저장하라는게 해법이면 바로 S3에 저장하도록 하면 된다.

그런데 현재 aws에 권한이 걸려 있다. S3에 접근하려면

https://qkqhxla1.tistory.com/1056 여기에서 글썼듯이 temporary credential을 가져와서 spark에 설정해야 한다. 이전에 spark에서 credential을 넘기는 방법을 https://qkqhxla1.tistory.com/993 에서 쓴적이 있는데, 여기에 session token값을 추가로 넘겨야 할 듯 싶다. 글을 찾다가 아주 좋은 글을 발견했다.


https://medium.com/@leythg/access-s3-using-pyspark-by-assuming-an-aws-role-9558dbef0b9e

이미지를 그대로 가져온다.

이대로 pyspark에서 세팅하면 되고 중요한 부분은 hadoop-aws-3.1.2.jar 와 aws-java-sdk-bundle-1.11.271.jar 라는 두개의 jar을 설정했다는 것이다. 


여기서부터 헬이었는데 처음에는 spark-2.4.7-bin-hadoop2.7 을 설치했었는데 안되었다.(스파크, 하둡 버전에 유의하자.) 분명 credential을 잘 줬는데 


Error: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 2518ADAB075DA7E


이런 에러가 발생했다. 정말 여기서 찾아도 찾아도 해법이 안나오다가 겨우 찾은게 이 글이다. 글 맨아래 답변을 읽어보면 하둡에서 aws작업 관련해서는 hadoop 2.7까지는 지원하지 않고, 2.8버전의 하둡부터 지원한다는 말이다. 실제로 하둡 공식 홈페이지를 살펴보면 하둡 2.7 docs, 하둡 3.2 docs를 비교해서 살펴보면, 하둡 3.2docs에서는 fs.s3a.session.token가 있지만, 2.7docs에는 없다.


결국 내가 설치한 spark-2.4.7-bin-hadoop2.7는 하둡 버전이 2.7이므로 안된다. 지우고 spark-3.0.1-bin-hadoop3.2 으로 다시 설치했다. 참고로 spark cluster, spark client둘다 이 버전이어야 한다.


그리고도 계속 에러가 났었는데.. 중간에 java.lang.NumberFormatException: For input string: "100M" 자꾸 100M으로 뭐가 잘못 설정되어있다는 형식의 에러가 났다.(참고로 설정한 적 없음.) 원인을 찾다가 발견한 사실은 hadoop-aws-3.1.2.jar 와 aws-java-sdk-bundle-1.11.271.jar 요 두개의 버전에 관한 것이었다. 


또 스택오버플로우를 엄청 뒤지니 좋은 글이 하나 있었다 : https://stackoverflow.com/questions/52821077/how-to-check-if-hadoop-common-jar-and-hadoop-aws-jar-are-in-sync

이 글에서 가장 유용한 정보는 jar과 하둡의 버전이 incompatible하다는 것이었다.엇 내 하둡은 3.2버전인데 아무생각없이 hadoop-aws-3.1.2.jar를 설치했었네? 바로 hadoop-aws-3.2.0.jar을 다운받았는데, 문제는 aws-java-sdk-bundle-1.11.271.jar이 버전은 어떻게 맞추냐였다. 이것도 엄청 삽질하다가 발견한건데 답은 jar 다운로드 홈페이지에 있었다. 


hadoop-aws-3.2.0.jar 다운로드 주소이다 : https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/3.2.0 근데 여기 아래에 보면

이렇게 compile dependencies라고 aws-java-sdk-bundle버전이 적혀있다! 결국 내 환경은



spark cluster, spark client : spark-3.0.1-bin-hadoop3.2 -> mongo connector를 사용하기 위한 2.4.x버전이나 3.0.1, hadoop aws를 위한 2.8버전 이상의 하둡.


mongo-spark-driver : https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector_2.12/3.0.0 -> 내 스파크 버전에 맞는 mongo connector


mongo-java-driver :  https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver/3.11.2


bson : https://mvnrepository.com/artifact/org.mongodb/bson/3.11.2


hadoop-aws-3.2.0.jar : https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/3.2.0 -> 내 하둡 버전에 맞는 jar


aws-java-sdk-bundle-1.11.375.jar : https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bundle/1.11.375 -> 바로 위 hadoop-aws-3.2.0.jar에 맞는 ajr


이렇게 되었다. 그리고 필요한것만 가져온 test.py

# -*- coding: utf-8 -*-

import sys
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import time
import requests
import json
reload(sys)
sys.setdefaultencoding('utf-8')

def get_temporary_aws_key():
    aws_key_info = json.loads(requests.get("임시 권한 가져올 서버").content)
    return aws_key_info.get('AccessKeyId', ''), aws_key_info.get('SecretAccessKey', ''), aws_key_info.get('Token', '')

access_key, secret_key, session_token = get_temporary_aws_key()
#pipeline = "[{'$match': {'start_day': '20200919'}}, {'$match': {'_id':{'$gte': " + start_id + ", '$lt': " + end_id  + "}}}]"
s = time.time()
pipeline = "{'$match': {'start_day': '20200919'}}"
spark = SparkSession.builder \
    .appName("myApp") \
    .getOrCreate()

spark._jsc.hadoopConfiguration().set('fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider')
spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', access_key)
spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secret_key)
spark._jsc.hadoopConfiguration().set('fs.s3a.session.token', session_token)

df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
               .option("spark.mongodb.input.partitioner","MongoSamplePartitioner") \
               .option("spark.mongodb.input.partitionerOptions.partitionKey", "_id") \
               .option("spark.mongodb.input.partitionerOptions.partitionSizeMB", 64) \
               .option("spark.mongodb.input.partitionerOptions.samplesPerPartition", 10) \
               .option("spark.mongodb.input.uri", "mongodb://mongoid:mongopw@x.x.x.x:27017/db.collection?authSource=admin") \
               .option("pipeline", pipeline).load()

df.write.format('com.databricks.spark.csv').option("header", "true") \
        .option("delimiter", ",") \
        .mode("overwrite") \
        .save('s3a://bucket/folder1/folder2')

test.sh jar을 여기에 추가해줬다.

#!/bin/bash

./spark-submit --master spark://x.x.x.x:7077 --driver-memory 2g --total-executor-cores 5 --executor-cores 1 
--executor-memory 4g --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0,org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.375 ./test.py

근데 또 문제가 있다 ㅋㅋㅋ... 잘 저장하다가 마지막 task에서 거의 멈춘다.

아나 스파크는 뭐 한번 제대로 돌때까지 방심을 할수가 없다.. 마지막 잡에서 거의 멈추듯이 10분정도 흐르다가 시간이 오래되어서 ssh 커넥션이 끊어져 버린다.(그리고 당연히 돌리고 있던 잡도 죽고.) 또 찾아봤는데 이것도 자주 발생하는 현상이다.


https://stackoverflow.com/questions/31817143/spark-write-parquet-to-s3-the-last-task-takes-forever

https://stackoverflow.com/questions/42822483/extremely-slow-s3-write-times-from-emr-spark


후.. 저기 나온대로 

spark._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2") 를 추가해주고, coalesce(100) 함수로 파티션을 100개로 나눈후, 내 커넥션이 죽어도 프로그램이 계속 돌도록 nohup으로 실행시켰더니 이것들이 먹힌건지, 아니면 애초에 내가 기다림이 부족했었는지 33분정도가 걸려서 끝난다. 위에 941/942 진입시 15분이 걸렸는데, 저 마지막 잡 하나에서 18분이 더 걸려서야 끝났다.


참고로 내가 돌리는 데이터는 s3에 저장했을때 45기가정도 나오고, 각각 아이템이 35개의 필드를 가지고 있는 4000만개정도의 아이템이다. 스파크 서버 성능은 워커 하나가 돌고있고 10개의 코어, 40g의 메모리를 가지고 있고, 클라이언트는 각 익스큐터가 코어 2개씩 5개(총 10개), 메모리는 각각 4기가를 갖고 돌리고있다.


스파크는 분명이 빠르고 좋은 오픈소스임에는 맞는것같은데 프로그램이 너무 예민하다그래야하나.. 짜증난다.



1. 몽고에서 데이터를 가져와서 작업

2. aws temporary credential을 가져와서 설정 후 

3. s3의 bucket/folder1/folder2 위치에 저장


% 추가. spark properties를 설정할때에 https://qkqhxla1.tistory.com/1006 첫 글에서는 

conf = SparkConf().setAll([('spark.driver.extraClassPath', '/Users/go/Downloads/mysql-connector-java-5.1.47-bin.jar')])

요런 식으로 소스안에서 추가를 했고, 위의 소스에서는 대부분 spark-submit할때 추가하긴 했지만 소스에 추가한것도 일부 있다. 사실 두가지 방법이 동일하게 동작하고 그냥 코딩 컨벤션 차이인줄 알았는데

https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties 를 읽어보면 이런 글이 있다.


Spark properties mainly can be divided into two kinds: one is related to deploy, like “spark.driver.memory”, “spark.executor.instances”, this kind of properties may not be affected when setting programmatically through SparkConf in runtime, or the behavior is depending on which cluster manager and deploy mode you choose, so it would be suggested to set through configuration file or spark-submit command line options;

Spark properties는 두가지 방법으로 전달할 수 있다. spark.driver.memory를 설정하는것처럼 프로그래밍시에 세팅해주는 경우와 spark-submit으로 전달하는 경우, 이렇게 두가지가 있는데 프로그래밍으로 runtime에 SparkConf를 통해서 전달해주는 방법은 제대로 반영이 안 될수도 있다. 그러니까 properties을 전달할때는 spark-submit을 실행시킬때 command line이나 configuration file으로 전달하는 방법이 추천된다.