make_parquet 파이프라인에서는 1~5 단계의 task가 존재한다.
1. start
2. check.done
3. to.parquet
4. make.done
5. end
1. start : DAG의 시작을 나타내는 task
EmptyOperator을 생성하는 get_emp 함수를 정의. id와 rule 두개의 매개변수를 받으며, rule의 기본값은 all_success 이다.
def gen_emp(id, rule="all_success"):
op = EmptyOperator(task_id=id, trigger_rule=rule)
return op
task_start = gen_emp('start') #태스크의 ID를 'start'로 설정
함수를 분리한 이유는, 여러 task를 생성해야 할 때 동일한 코드를 반복하지 않고 함수 호출로 간단히 처리하여 쉽게 정의 및 관리할 수 있게 하기 위함이다.
2. check.done
task_check = BashOperator(
task_id="check.done",
bash_command="bash {{var.value.CHECK_SH}} {{var.value.DONE_PATH}}/import_done/{{ds_nodash}}/_DONE"
)
이 태스크는 _DONE 파일의 여부를 통해 import_db 태스크들이 잘 실행되었는지 확인한다. ( csv 파일이 temp_usage 테이블에 제대로 로드되고 모든 상위 task가 성공하면 _DONE 파일이 생성된다.)
3. to.parquet
task_parquet = BashOperator(
task_id="to.parquet",
bash_command="""
echo "to.parquet"
READ_PATH=~/data/csv/{{ds_nodash}}/csv.csv
SAVE_PATH=~/data/parquet/
python ~/airflow/py/csv2parquet.py $READ_PATH $SAVE_PATH
"""
)
csv2parquet.py
import pandas as pd
import sys
READ_PATH = sys.argv[1]
SAVE_PATH = sys.argv[2]
#CSV 파일을 읽어 DataFrame 생성
df = pd.read_csv(READ_PATH,
on_bad_lines='skip', #잘못된 라인 건너뜀
names=['dt', 'cmd', 'cnt'], #열 이름 지정
encoding = "latin")
df['dt'] = df['dt'].str.replace('^', '')
df['cmd'] = df['cmd'].str.replace('^', '')
df['cnt'] = df['cnt'].str.replace('^', '')
#'coerce'는 변환할 수 없는 데이터를 만나면 그 값을 강제로 NaN으로 바군다
df['cnt'] = pd.to_numeric(df['cnt'], errors='coerce') #cnt 열을 숫자형식으로 변환하고, 변환할 수 없는 값은 'NaN' 처리
# NaN 값을 원하는 방식으로 처리합니다. (예: 0으로 채우기)
df['cnt'] = df['cnt'].fillna(0).astype(int) #NaN 값을 0으로 채우고, 데이터 타입을 정수형으로 변환
#DataFrame 을 Parquet 파일로 저장
df.to_parquet(f'{SAVE_PATH}', partition_cols=['dt']) #'dt'열을 기준으로 데이터를 분할하여 저장
1. CSV 파일을 읽어 DataFrame 생성한다. 이때 잘못된 라인을 건너뛰고 열 이름과, 인코딩 형식을 지정해줬다.
2. cnt 열을 숫자형식으로 변환하고, 에러를 발생하는 데이터는 NaN 값으로 강제로 바꾼다.
3. cnt 열의 NaN 값을 0으로 채우고, 데이터 타입을 정수형으로 변환한다
4. DataFrame을 Parquet 파일로 저장한 후, dt열을 기준으로 데이터를 분할(파티셔닝)하여 저장한다.
4. make.done
task_done = BashOperator(
task_id="make.done",
bash_command="""
DONE_PATH={{var.value.DONE_PATH}}/parquet_done/{{ds_nodash}}
mkdir -p $DONE_PATH
touch $DONE_PATH/_DONE
figlet "make.done.end"
"""
)
이 태스크는 BashOperator를 사용하여 특정 경로에 _DONE 파일을 생성한다.
1, DONE_PATH 변수에 경로를 할당
2. DONE_PATH 변수에 저장된 디렉터리 생성
3. DONE_PATH 에 _DONE 파일 생성
trigger rule 디폴트 값이 all_success이기 때문에, 상위 task들이 제대로 실행되면 _DONE 파일을 남긴다.
5. end : DAG의 끝을 나타내는 task
task_end = gen_emp('end', 'all_done') #모든 이전 태스크가 완료되면 트리거
start에서는 디폴트 값이 "all_success" 이기 때문에 따로 값을 설정하지 않았다.
end에서는 모든 이전 태스크가 완료되었는지 확인하기 위해 trigger_rule = "all_done" 으로 설정했다.
전체코드
make_parquet DAG는 CSV 파일을 읽어 DataFrame을 생성하고, parquet 파일로 저장한다. 이 중 dt를 기준으로 파티셔닝하는 작업을 수행한다.
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.models import Variable
def gen_emp(id, rule="all_success"):
op = EmptyOperator(task_id=id, trigger_rule=rule)
return op
with DAG(
'make_parquet',
default_args={
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(seconds=3)
},
max_active_runs=1,
max_active_tasks=2,
description='history log 2 mysql db',
schedule="10 4 * * *",
start_date=datetime(2024, 7, 10),
catchup=True,
tags=['simple', 'bash', 'etl', 'shop', "db", "history", "parquet"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
task_check = BashOperator(
task_id="check.done",
bash_command="bash {{var.value.CHECK_SH}} {{var.value.DONE_PATH}}/import_done/{{ds_nodash}}/_DONE"
)
task_parquet = BashOperator(
task_id="to.parquet",
bash_command="""
echo "to.parquet"
READ_PATH=~/data/csv/{{ds_nodash}}/csv.csv
SAVE_PATH=~/data/parquet/
python ~/airflow/py/csv2parquet.py $READ_PATH $SAVE_PATH
"""
)
task_done = BashOperator(
task_id="make.done",
bash_command="""
DONE_PATH={{var.value.DONE_PATH}}/parquet_done/{{ds_nodash}}
mkdir -p $DONE_PATH
touch $DONE_PATH/_DONE
figlet "make.done.end"
"""
)
task_start = gen_emp('start')
task_end = gen_emp('end', 'all_done')
task_start >> task_check >> task_parquet >> task_done >> task_end
'Data Engineering > AIRFLOW' 카테고리의 다른 글
[실습] movdata : 영화데이터 수집 프로그램 (0) | 2024.08.20 |
---|---|
[AIRFLOW] movie pipeline (0) | 2024.08.07 |
[AIRFLOW] import_db 파이프라인 만들기 실습 (1) | 2024.07.26 |
[AIRFLOW] simple_bash 파이프라인 만들기 실습 (3) | 2024.07.24 |
[AIRFLOW] DAG 코드 기본 구조 (Operator, trigger rule) (0) | 2024.07.24 |