data engineering

airflow 삽질

qkqhxla1 2020. 10. 20. 20:46

몇일전에 spark etl을 제작했고, 이걸 배치잡으로 돌린다는 글을 적었었다. link

도커로 하는건 아직 뭐가 문제인지 이유를 못찾았고, 이유를 못찾았다는 이유로 그냥 크론탭에서 계속 돌아가도록 놔둘수 없기에 생각해보다가 airflow에서 돌리기로했다.


처음엔 rancher에서 kubernetes airflow app을 만들어서 거기다가 내 전용으로 따로 올려볼까 하다가 idc에서 돌아가는 airflow서버가 하나 있어서 거기에 올리기로 했다. kubenetes airflow로 올리려다가 만 이유는 dag파일을 커스터마이징하기가 힘들기 때문이다.

bitnami의 airflow가 계속 최신에도 업그레이드되고 있어서 이걸 쓰려고 했는데 (bitnami airflow) dag 추가하는부분을 찾아보면 3가지 방법이 있다.


1. 로컬에서 helm chart로 커스터마이징(내가 아직 쿠버네티스 cli에 익숙하지 않아서 못함)

2. config map에 소스코드 저장해서 쓰는방법(spark를 통째로 올려야 하는데 못 올린다.)

3. git을 참조하도록 하는 방법(마찬가지로 git에 spark를 통째로 올려야하는데 효과적이지 못하다.)


1번이 어떻게든 잘 말아서 하게끔 효과적일것 같은데 1번 하려다가 kubernetes cli 등 삽질해볼게 두배는 더 많아져서 포기하고 idc에서 돌아가는 airflow에서 돌리기로 했다. 

참고로 airflow가 돌아가는 pod에 직접 접속해보면 권한이 없는 일반 유저라서 apt-get install이나 wget사용이 불가능하다. 내가 만들어서 올릴 수 없다는 이야기이다. 내가 만든 어플리케이션인데 root로 권한 업을 할수 없는데, 이것에 대해서는 여기에 이유가 나와있다. -> 보안 이슈나 플랫폼 제한적으로 사용하려고 이렇다고 한다.


그럼 이제 airflow 에서 스파크 관련 Operator을 찾아서 사용법을 알아야 하는데 이 글을 읽고 그냥 BashOperator로 자동화해줬다. 애초에 etl플로우 자체가 좀 복잡했으면 쓸 거리가 많았을텐데 하나의 파이프라인만 가지고 있어서 별로 이슈거리도 없었다.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('sdfsfsdfsdfsdf-backup',
        description='sdfsdfsdfsdfsdf backup',
        schedule_interval='0 8 * * *',
        start_date=datetime(2017, 3, 20),
        catchup=False
        )

a_backup = BashOperator(
    task_id='sdfsfsdfs_backup',
    bash_command='cd /home/qkqhxla1/spark-3.0.1-bin-hadoop3.2/bin; ./a_backup.sh ',
    retries=0,
    dag=dag)

b_backup = BashOperator(
    task_id='sdfsdfsdfwf_backup',
    bash_command='cd /home/qkqhxla1/spark-3.0.1-bin-hadoop3.2/bin; ./b_backup.sh ',
    retries=0,
    dag=dag)

a_backup >> b_backup

짜면서 schedule_interval, start_date, catchup에 대해 좀더 주의하게 되었는데(여태까지는 별신경 안썼음.) 

start_date는 말 그대로 시작할 날짜인데 schedule_interval을 아무리 현재 시간으로 해놔도 start_date가 미래이면 시작하지 않는다.(실수로 처음에 미래로 설정해놨음) 거기에 catchup은 start_date를 예로 1주일 전으로 해놨을 경우 catchup이 True이면 혼자서 일주일 전부터 오늘치까지 일곱 번을 실행한다. 그런게 필요없으면 당연히 False로 설정해놓고 시작하면 된다.


또 BashOperator의 경우 bash_command를 보면 끝에 스페이스가 한칸 들어가있다. 스페이스를 안 넣으면 에러가 났는데 요 스택오버플로우 글을 참조하자. airflow의 버그란다.


airflow는 한번만 삽질해보면서 만들면 사용법이 쉬워서 다음 잡을 만들때 새 로직이나 개념이라도 큰 부담 없이 만들수있는것같다.