data engineering

pyspark when does python used in spark 'worker'

qkqhxla1 2021. 10. 14. 20:50

apache spark를 쓰는데 리모트 환경에서 submit하는 환경이면 주의해야할게 상당히 많았다. 예로
1. spark submit하는곳과 spark server의 버전이 같아야 한다. 메이저 버전이 같으면 호환되는 경우도 있는데 호환 안되는 경우도 많다..
2. spark submit을 리모트 서버의 도커에서 하는 경우 네트워크를 호스트와 동일하게 해주거나, 도커의 포트 설정을 추가로 해줘야 한다. 이전글 : https://qkqhxla1.tistory.com/1138

등이 있었다. 위의 방법들이 그닥 선호되는 방법은 아니어서 최근에는 일반적인 방법으로 사용해보려고 airflow를 세팅해서 그 안에 스파크를 submit용으로 설치해서 리모트 스파크 서버에 호출해서 쓰는 방법으로 사용하고 있다.

그리고 코딩하다 스파크 리모트 환경시 또다른 주의사항을 하나 더 찾아냈다. 이번에는 평소 일정하게 세팅해서 사용하던 도커 환경이 아니라 bitnami가 미리 세팅한 airflow환경에서 돌리기에 발견할수 있었다. 현재 환경세팅은 아래와 같다.

airflow내에서 spark가 사용하는 python location과 버전이, spark worker에서 도는 python location과 버전이 서로 다르다. 여태까지 코드를 짜면서 스파크 버전만 주로 이슈가 되었기에, 스파크 버전만 맞추고 파이썬 버전, 설치경로는 딱히 안 맞췄었는데..(이전에는 airflow가 아니라 도커로 돌려서 환경 정보가 전부 동일했었다.) 이번에 이와 관련된 이슈가 터졌다. 아래 코드를 보자.

코드 1.

import requests
from datetime import datetime
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

......

df.coalesce(100).select(*column_list) \
    .withColumn('title', regexp_replace('title', '\t|\r|\n', '')) \
    .write \

코드 2.

import requests
from datetime import datetime
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def reformat_createat(CREATE_AT):
    return '{:%Y-%m-%d %H:%M}'.format(datetime.strptime(CREATE_AT, '%Y%m%d%H%M%S'))

.......

df.coalesce(100).select(*column_list) \
    .withColumn('title', regexp_replace('title', '\t|\r|\n', '')) \
    .withColumn('CREATE_AT', reformat_createat(df.CREATE_AT)) \

코드 1은 잘 작동하고, 코드 2에서 단순히 udf를 추가했을뿐인데 에러가 발생했다. 에러도 로직관련 에러가 아닌 환경 에러였다.

--------------------------------------------------------------

[2021-10-12 06:49:22,675] {subprocess.py:78} INFO - 21/10/12 06:49:22 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 3, 10.42.11.38, executor 1): java.io.IOException: Cannot run program "/opt/bitnami/airflow/venv/bin/python": error=2, No such file or directory
--------------------------------------------------------------


/opt/bitnami/airflow/venv/bin/python 경로에 아무 파일도 없다고 에러가 나오는데 이건 현재 airflow내의 파이썬 경로이다. 단순히 udf를 쓴다는 이유만으로 있던 파이썬이 사라져 버린다??? 는건 이해가 가질 않았다. 그리고 파이썬이 사라진것도 아니고 저 위치에 777권한으로 존재했다.

결국 이것저것 삽질하다 알아낸 사실은 pyspark의 실행 로직 중에 worker에서 파이썬을 사용할 일이 없으면 드라이버 환경의 파이썬을 그냥 사용하고, worker에서 파이썬을 사용할 일이 있으면 코드 처음부터 worker환경의 파이썬을 사용하는것 같다는 것이다.
드라이버는 위의 프로그램이 시작되는 환경이고,(여기서는 airflow) worker는 spark server내부에 있다. 현재 PYSPARK_PYTHON=/opt/bitnami/airflow/venv/bin/python 로 설정되어 있는데 이건 airflow내의 파이썬 경로이다.
그리고 spark worker의 파이썬 경로는 /opt/bitnami/python/bin/python 이다.

https://spark.apache.org/docs/latest/configuration.html 공식문서를 확인해보니 driver의 파이썬 path와 driver, worker가 사용하는 파이썬 path를 설정해줄수있었다. 그래서 아래와 같이 재설정해줘봤다.

export PYSPARK_PYTHON=/opt/bitnami/python/bin/python 
export PYSPARK_DRIVER_PYTHON=/opt/bitnami/airflow/venv/bin/python

이러면 driver(airflow에 있는)의 파이썬 경로는 원래 경로로, worker의 파이썬 경로는 spark worker의 파이썬 경로로 설정될것이다. 중간에 여러번의 삽질이 더 있었지만 생략하고 실행해봤더니..

추측이 맞았다. 문제는 파이썬 버전이 3.8, 3.6버전으로 서로 달라서.. airflow나 spark worker쪽 둘중하나는 파이썬 버전을 바꿔서 서로 맞춰줘야 하는것 같다.(그리고 모듈 설치도 같이..)

왜 일반적으로 리모트 환경으로 세팅을 잘 안하고, airflow와 스파크를 연동시에 동일한 서버 내에 세팅해주는지 한번더 깨달을수 있었다.. 주의하지 않으면 이런 자잘한 이슈가 하나씩 계속 터지는구나..