1. AIR FLOW DAG 코드 기본 구조
(1) Import Statements : 필요한 모듈 import
from airflow import DAG #Airflow에서 DAG(Directed Acyclic Graph)를 정의하기 위해 사용되는 클래스
from airflow.operators.bash import BashOperator #Bash 명령을 실행하기 위한 Operator
from airflow.operators.empty import EmptyOperator #아무 작업도 수행하지 않는, 주로 흐름 제어에 사용되는 Operator
from datetime import datetime, timedelta #날짜와 시간을 다루기 위한 Python의 내장 모듈
(2) DAG 선언 및 Default Argument 작성 (start_date, end_date, depends_on_past, retries, retry_relay 등)
default_args = {
'owner': 'airflow',
'depends_on_past': False, #이전 실행의 성공 여부에 관계없이 태스크를 실행할지를 결정. False로 설정하면 이전 실행 결과에 관계없이 실행합
'start_date': days_ago(2), #DAG의 시작 날짜, 필수입력값
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1, #태스크 실패 시 재시도 횟수를 설정
'retry_delay': timedelta(minutes=5), #태스크 재시도 간의 지연 시간
}
dag = DAG(
'simple_bash',
default_args=default_args,
#schedule=timedelta(days=1),
schedule='10 4 * * *', #DAG의 스케줄을 cron 표현식으로 정의. 위 표현식은 매일 오전 4시 10분에 실행을 의미.
catchup=True, #시작 날짜 이후 실행되지 않은 DAG 실행을 모두 실행
description='make history count.log', #설명 추가
tags=['simple', 'bash'], #DAG에 태그를 추가하여 DAG를 분류
)
cron 표현식은 수업시간에 간단하게 다뤘다. https://zamezzz.tistory.com/197 참고.
# ┌─────── minute (0 - 59)
# │ ┌────── hour (0 - 23)
# │ │ ┌───── day of the month (1 - 31)
# │ │ │ ┌───── month (1 - 12)
# │ │ │ │ ┌──── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │ 7 is also Sunday on some systems)
# * * * * *
출처: https://gngsn.tistory.com/263
(3) Task Definition
DAG이 실제로 작업을 수행하기 위한 태스크 정의. 하나 또는 여러개의 Task를 연결하여 DAG을 생성한다.
2. Operator
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/index.html
Using Operators — Airflow Documentation
airflow.apache.org
오퍼레이터는 하나의 Task를 정의하는데 사용된다. 오퍼레이터를 실행시키면 Task가 된다. 하나의 테스크들이 모여 하나의 DAG을 구성하고, DAG들이 모여 workflow가 이루어지기 때문에, operator는 DAG를 구성하기 위한 가장 작은 단위이다.
1. EmptyOperator: 아무 작업을 하지 않는 Operator이다. 주로 DAG의 시작과 종료를 나타내거나, 다른 작업들을 그룹화 하는데 사용한다.
2. Bash Operator: Bash Shell 스크립트를 실행하는 Operator. 리눅스 명령어 실행도 가능하며 프로그램 실행도 가능하다.
그 외에도 PythonOperator, EmailOperator, MysqlOperator, Sensor 등이 있다.
참고: https://www.bearpooh.com/152
3. Trigger rule
상위 task가 여러개의 하나의 하위 task로 연결이 되는 구조에서 하위 task의 실행 조건을 설정하는 방법
all_success (기본값) | 상위 task 모두 성공하면 실행 |
all_failed | 상위 task 모두 실패하면 실행 |
all_done | 상위 task가 모두 수행되면 실행 (성공, 실패여부 관계x) |
all_skipped | 상위 task가 모두 skipped 상태면 실행 |
one_failed | 상위 task 중 하나 이상 실패하면 실행 (모든 상위 task 완료 기다리지 않음) |
one_success | 상위 task 중 하나 이상 성공하면 실행 (모든 상위 task 완료 기다리지 않음) |
one_done | 상위 task 중 하나 이상 수행되면 실행 |
none_failed | 상위 task 중 실패가 없는 경우 실행 (성공 혹은 skipped) |
none_failed_min_one_success | 상위 task 중 실패가 없고 성공한 task가 적어도 1개 이상이면 실행 |
none_skipped | skip된 상위 task 없으면 실행 (성공, 실패여부 관계x) |
always | 언제나 실행 |
참고: https://letzgorats.tistory.com/entry/Airflow-Trigger-Rule
'Data Engineering > AIRFLOW' 카테고리의 다른 글
[AIRFLOW] make_parquet 파이프라인 만들기 실습 (0) | 2024.07.26 |
---|---|
[AIRFLOW] import_db 파이프라인 만들기 실습 (1) | 2024.07.26 |
[AIRFLOW] simple_bash 파이프라인 만들기 실습 (3) | 2024.07.24 |
[AIRFLOW] jinja template (1) | 2024.07.24 |
[AIRFLOW] Airflow 기본개념, 설치 및 실행 (0) | 2024.07.22 |