data engineering

apache airflow tutorial.

qkqhxla1 2019. 3. 24. 16:03

기본적으로 

https://airflow.apache.org/start.html

https://aldente0630.github.io/data-engineering/2018/06/17/developing-workflows-with-apache-airflow.html

https://bcho.tistory.com/1184


에서 많은 정보를 얻은 후 필요한 내용 정리와, 그리고 저 링크들에 없는, 얻은부분을 더해서 적었습니다. 위의 글을 보면서 삽질했던점도 추가했습니다. 


airflow란 작업흐름을 자동화해주는 도구입니다. 일반적으로 crontab으로 실행시키고.. 이쯤이면 끝났겠지? 싶었을때 다음 작업을 돌리곤 하는데, 이것보다 더 정교한 처리가 필요한 경우에 씁니다. (더 자세한 내용은 위 블로그에 잘 나와있으니 생략)


airflow는 dag라는 그래프로 구성됩니다. dag는 하나의 작업 흐름 단위라고 생각하시면 됩니다. dag는 task로 구성됩니다. task는 작업 하나하나입니다. 파이썬 코드를 실행시킨다거나 하는요.

위 블로그에서 예시로 사진을 가져왔습니다. 위 사진은 dag하나입니다. run_this_first등이 각각의 task입니다.


airflow의 기본적인 설치는 다음과 같습니다.

sudo pip install airflow(또는 sudo pip install apache-airflow) 로 설치합니다.

RuntimeError: By default one of Airflow's dependencies installs a GPL dependency (unidecode). To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes 에러가 뜰수 있는데 이경우 
https://stackoverflow.com/questions/52203441/error-while-install-airflow-by-default-one-of-airflows-dependencies-installs-a 를 참조합니다.


이후 반드시 airflow initdb 명령어로 airflow에서 쓰는 db를 초기화해줍니다.

안하면 나중에 에러뜨면서 진행이 안될겁니다.


airflow가 사용하는 디렉터리는 기본적으로 ~/airflow/경로에 있습니다. ~/airflow/안에는 설정파일이나 dag 파일의 위치가 들어있는 airflow.cfg나, 기본적으로 dag들을 저장해두는 dags디렉터리(이건 처음 설치시 없습니다. 만들어야 함.), airflow db인 airflow.db가 있습니다.


airflow는 기본적인 web ui를 제공해줍니다.

airflow webserver명령어로 실행시켜줍니다. 기본적으로 http://0.0.0.0:8080로 실행되며 -p옵션을 줘서 포트는 바꿀수 있습니다. 기본 실행시 아래와 같은 ui가 나옵니다.

airflow web server와 별개로 airflow의 스케쥴러라는게 있습니다. 스케쥴러는 실행시킬 task를 실행시키도록 executor로 보낸다고 합니다. 백그라운드로 돌면서 주기적으로 관측한다고 합니다. 공식 링크

위에서 실행시켰던 웹서버와 별개로 scheduler도 실행해줘야 합니다. airflow scheduler 명령어로 스케쥴러를 실행시키면 웹서버처럼 계속 도는것을 확인할수 있습니다. 스케쥴러를 실행시키지 않으면 잡이 돌지 않습니다.


airflow코드를 다루기 전에 웹서버와 스케쥴러를 실행시켜 놔야 합니다.


작업 흐름의 단위인 dag하나를 만들어서 실행시켜 보겠습니다. dag는 위에서 언급하였던 기본 폴더인 ~/airflow/dags폴더 안에 만들도록 하겠습니다. tutorial.py라는 파일을 ~/airflow/dags안에 만든 후 아래와 같이 소스를 채웁니다.

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

def print_hello():
    return 'Hello world!'

# DAG명은 first_test, interval은 cron과 같다고 한다.
dag = DAG('first_test', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

# 아래는 테스크 하나. 맨 아래의 dag=dag에서 해당 테스크가 어떤 dag에 속하는지를 정해준다.
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

# t2먼저 실행, 그후 hello_operator를 실행시킬거다.
t2 >> hello_operator

소스에 관한 설명은 위 주석으로 충분하고, 맨 아래에 t2 task먼저 실행시키고 이후에 hello_operator를 실행시킨다는것만 알면 된다. 더 자세한 정보나 다른 operator(task)는 구글링하면 다 나옴.


소스를 저장하고 web ui에서 새로고침을 계속 하다보면 아래처럼 web ui에 dag명이 나옵니다.

오른쪽에 새로고침 아이콘을 클릭하면서 제대로 뜰때까지 기다려줍니다.

제대로 떴으면 빨간색 원의 왼쪽부터 눌러줍니다. 

왼쪽에 On/Off부분이 뭔 의민지 좀 의아했었는데 web ui의 윗부분에 i에 마우스를 올리면 pause를 toggle할수 있는 버튼이라고 합니다. 이부분을 On으로 바꾸지 않고 trigger를 해줘도 잡은 실행이 안됩니다.


On이면 unpause상태고, Off면 pause상태입니다. 처음엔 거꾸로 On이 pause인줄 알아서.. 상식적으로 왜 pause일때만 trigger(실행)을 할수있을까..? 했는데 아니었습니다. 아직도 헷갈리시는 분들은 airflow unpause example_xcom명령어로 example_xcom을 unpause해보세요. web ui에 On으로 바뀌는걸 확인할수 있습니다. 


그러니까.. unpause -> trigger ->후 그래프를 보시면 그래프에 현재 상태가 나옵니다.

조금만 새로고침하면서 기다리면 이와 같이 테스크들이 성공적으로 실행되었음을 녹색 상자로 알수 있습니다. ->로 순서도 대충 알수 있네요.


또 웹 ui의 dag(위위사진)의 graph view가 있는 탭에서 오른쪽으로 보다보면 Logs로 로그를 볼수 있습니다.

맨 아래에 UTC 6시 43분에 트리거를 시켰고, 위로 올라가면서 hello_task라는 task를 실행시켰고, hello_task를 성공했고, 그 다음으로 sleep라는 task 실행, 성공, 그리고 맨 위로 그래프 탭을 눌렀으므로 anonymous가 그래프 이벤트를 호출했다는것을 알수 있습니다. 

로그는 여기 말고도 Graph에서 Tree View탭에서 각각의 실행되었던 테스크를 눌러서 더 자세한 로그를 볼 수도 있습니다. 


현재까지 한번 딱 돌려볼 말그대로 튜토리얼적인 내용을 정리했고, 조금 더 자세히 들어가려면 맨 위에 링크 걸어놓은거에 더 많이 있으니 읽어보시는게 좋습니다.