Data Engineering/AIRFLOW

[AIRFLOW] movie pipeline

qqprty 2024. 8. 7. 10:46

파이프라인 흐름

1. start

 

2. branch.op

tmp/test_parquet 하위 경로에 파일 확인

- 존재하면 rm.dir task 실행

- 존재하지 않으면 get.start와 echo.task 실행

 

3. get.start (trigger_rule = all_done)

 

4. get.data (multiMovieYn, repNationCd)

4-1. multi.y : 독립영화

4-2. multi.n : 상업영화

4-3. nation.k : 한국영화

4-4. nation.f : 외국영화

 

5. get.end 

6. save.data

7. end


1. branch.op

    branch_op = BranchPythonOperator(
            task_id="branch.op",
            python_callable=branch_fun
            )


BranchPythonOperator
는 Airflow에서 작업 흐름의 경로를 분기하는 데 사용됨.

python_callable은 BranchPythonOperator가 호출할 Python 함수를 지정함.

    def branch_fun(ds_nodash):
        import os
        home_dir = os.path.expanduser("~") #홈 디렉토리 경로를 반환
        path = os.path.join(home_dir, f"tmp/test_parquet/load_dt={ds_nodash}") #여러 경로 요소를 결합하여 하나의 경로를 생성
        if os.path.exists(path):
            return rm_dir.task_id
        else:
            return "get.start", "echo.task"
  • 현재 사용자의 홈 디렉토리에 tmp/test_parquet/load_dt={ds_nodash} 경로가 있는지 확인.
  • 경로가 존재하면 rm_dir 작업의 task_id를 반환하고, 존재하지 않으면 "get.start"와 "echo.task"를 반환

2. Get data

common_get_data

    def common_get_data(ds_nodash, url_param):
        from mov.api.call import save2df
        df = save2df(load_dt=ds_nodash, url_param=url_param)

        print(df[['movieCd', 'movieNm']].head(5))
		
        # url_param 딕셔너리의 키와 값을 데이터프레임에 추가
        for k, v in url_param.items():
            df[k] = v

        # partition_cols에 'load_dt'와 url_param의 키를 포함
        p_cols = ['load_dt'] + list(url_param.keys())
        df.to_parquet('~/tmp/test_parquet',
                partition_cols=p_cols
                # partition_cols=['load_dt', 'movieKey']
        )

save2df

def save2df(load_dt='20120101', url_param={}):
    df=list2df(load_dt, url_param)
    # df에 load_dt 칼럼 추가 (조회 일자 YYYYMMDD 형식)
    # 아래 파일 저장시 load_dt 기준으로 파티셔닝
    df['load_dt'] = load_dt
    print(df.head(5))
    df.to_parquet('~/tmp/test_parquet', partition_cols=['load_dt'])
    return df

1) get.data - multiMovieYn

    multi_y = PythonVirtualenvOperator(
        task_id='multi.y',
        python_callable=common_get_data,
        system_site_packages=False,
        requirements=["git+https://github.com/oddsummer56/movie.git@0.3/api"],
        op_kwargs={
            "url_param": {"multiMovieYn": "Y"},
        },
    )

    multi_n = PythonVirtualenvOperator(
        task_id='multi.n',
        python_callable=common_get_data,
        system_site_packages=False,
        requirements=["git+https://github.com/oddsummer56/movie.git@0.3/api"],
        op_kwargs={
            "url_param": {"multiMovieYn": "N"}
        }
    )

 

requirements 매개변수는 가상 환경에서 필요한 패키지를 지정한다.

common_get_data 함수가 https://github.com/oddsummer56/movie.git@0.3/api에 있는 패키지를 필요로 하므로, 이 패키지를 가상 환경에 설치하는 과정을 가진다.

 

op_kwargs 

 

  • Python 함수(python_callable)에 전달할 추가 매개변수를 딕셔너리 형태로 지정
  • common_get_data 함수에 전달될 매개변수로, url_param 딕셔너리를 설정
  • multi_y 작업에서는 url_param에 {"multiMovieYn": "Y"} 값을 전달 (독립영화)
  • multi_n 작업에서는 url_param에 {"multiMovieYn": "N"} 값을 전달 (상업영화)

 

 

2) get.data - repNationCd

    nation_k = PythonVirtualenvOperator(
        task_id='nation.k',
        python_callable=common_get_data,
        system_site_packages=False,
        requirements=["git+https://github.com/oddsummer56/movie.git@0.3/api"],
        #op_args=["{{ds_nodash}}", "{{ds}}"],
        op_kwargs={
            "url_param": {"repNationCd": "K"}
        }
    )

    nation_f = PythonVirtualenvOperator(
        task_id='nation.f',
        python_callable=common_get_data,
        system_site_packages=False,
        requirements=["git+https://github.com/oddsummer56/movie.git@0.3/api"],
        #op_args=["{{ds_nodash}}", "{{ds}}"],
        op_kwargs={
            "url_param": {"repNationCd": "F"}
        }
    )
  • nation_k 작업에서는 url_param에 {"repNationCd": "K"} 값을 전달 (한국영화)
  • nation_f 작업에서는 url_param에 {"repNationCd": "F"} 값을 전달 (외국영화)

3. Save data

    task_savedata = PythonVirtualenvOperator(
            task_id="save.data",
            python_callable=save_data,
            requirements=["git+https://github.com/oddsummer56/movie.git@0.3/api"],
            system_site_packages=False,
            trigger_rule = "one_success",
    )
    def save_data(ds_nodash):
        from mov.api.call import apply_type2df

        df = apply_type2df(load_dt=ds_nodash)
        print(df.head(10))
        print("*"*33)
        print(df.dtypes)

        # 개봉일 기준 그룹핑 누적 관개수 합
        print("개봉일 기준 그룹핑 누적 관객수 합")
        g = df.groupby('openDt')  # 'openDt' 컬럼을 기준으로 그룹핑
        sum_df = g.agg({'audiCnt': 'sum'}).reset_index()  # 그룹별 'audiCnt' 컬럼의 합계를 계산하고, 인덱스를 초기화
        print(sum_df)

 

+) 가상환경 브랜치 복사해오기

pip install git+<git url>@<branch>

mov_agg (transform)

강사님 코드

 

특정 날짜(load_dt)의 데이터를 읽고, 다양한 조건을 만족하는 데이터를 필터링하고, 데이터 타입을 변환 및 병합하는 과정을 수행함

import pandas as pd

def merge(load_dt="20240724"):
    read_df = pd.read_parquet('~/tmp/test_parquet')
    cols = ['movieCd', #영화의 대표코드를 출력합니다.
       'movieNm', #영화명(국문)을 출력합니다.
       #'openDt', #영화의 개봉일을 출력합니다.
       #'audiCnt', #해당일의 관객수를 출력합니다.
       'load_dt', # 입수일자
       'multiMovieYn', #다양성영화 유무
       'repNationCd', #한국외국영화 유무
       ]
    df = read_df[cols]
    dw = df[(df['movieCd'] == '20235974') & (df['load_dt'] == int(load_dt))].copy() #날짜조건 load_dt 인자 받기 print(dw) 
    print(dw.dtypes)

    # 카테고리 타입 -> Object
    dw['load_dt'] = dw['load_dt'].astype('object')
    dw['multiMovieYn'] = dw['multiMovieYn'].astype('object')
    dw['repNationCd'] = dw['repNationCd'].astype('object')
    
    # NaN 값 unknown 으로 변경
    dw['multiMovieYn'] = dw['multiMovieYn'].fillna('unknown')
    dw['repNationCd'] = dw['repNationCd'].fillna('unknown')
    print(dw.dtypes)
    print(dw)
  
    # 머지 
    u_mul = dw[dw['multiMovieYn'] == 'unknown']
    u_nat = dw[dw['repNationCd'] == 'unknown']
    m_df = pd.merge(u_mul, u_nat, on='movieCd', suffixes=('_m', '_n')) # 필터링된 두 데이터프레임을 'movieCd'를 기준으로 병합
    
    print("머지 DF")
    print(m_df.head(4))
    return m_df
    

merge()

전체코드

DAG 코드

from datetime import datetime, timedelta
from textwrap import dedent
from pprint import pprint

from airflow import DAG

from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.models import Variable
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator,
    PythonVirtualenvOperator,
    is_venv_installed,
)

def gen_emp(id, rule="all_success"):
    op = EmptyOperator(task_id=id, trigger_rule=rule)
    return op

with DAG(
        'movie',
    default_args={
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(seconds=3)
    },
    max_active_runs=1,
    max_active_tasks=2,
    description='movie',
    schedule="10 4 * * *",
    start_date=datetime(2024, 7, 10),
    catchup=True,
    tags=['movie'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators


    def common_get_data(ds_nodash, url_param):
        from mov.api.call import save2df
        df = save2df(load_dt=ds_nodash, url_param=url_param)

        print(df[['movieCd', 'movieNm']].head(5))

        for k, v in url_param.items():
            df[k] = v

        #p_cols = list(url_param.keys()).insert(0, 'load_dt')
        p_cols = ['load_dt'] + list(url_param.keys())
        df.to_parquet('~/tmp/test_parquet',
                partition_cols=p_cols
                # partition_cols=['load_dt', 'movieKey']
        )

    def save_data(ds_nodash):
        from mov.api.call import apply_type2df

        df = apply_type2df(load_dt=ds_nodash)
        print(df.head(10))
        print("*"*33)
        print(df.dtypes)

        # 개봉일 기준 그룹핑 누적 관개수 합
        print("개봉일 기준 그룹핑 누적 관객수 합")
        g = df.groupby('openDt')
        sum_df = g.agg({'audiCnt': 'sum'}).reset_index()
        print(sum_df)

    def branch_fun(ds_nodash):
        import os
        home_dir = os.path.expanduser("~")
        path = os.path.join(home_dir, f"tmp/test_parquet/load_dt={ds_nodash}")
        if os.path.exists(path):
            return rm_dir.task_id
        else:
            return "get.start", "echo.task"

    branch_op = BranchPythonOperator(
            task_id="branch.op",
            python_callable=branch_fun
            )


    task_savedata = PythonVirtualenvOperator(
            task_id="save.data",
            python_callable=save_data,
            requirements=["git+https://github.com/oddsummer56/movie.git@0.3/api"],
            system_site_packages=False,
            trigger_rule = "one_success",
    )

    # 다양성 영화 유무
    multi_y = PythonVirtualenvOperator(
        task_id='multi.y',
        python_callable=common_get_data,
        system_site_packages=False,
        requirements=["git+https://github.com/oddsummer56/movie.git@0.3/api"],
        #op_args=[1,2,3,4],
        op_kwargs={
            "url_param": {"multiMovieYn": "Y"},
        },
    )

    multi_n = PythonVirtualenvOperator(
        task_id='multi.n',
        python_callable=common_get_data,
        system_site_packages=False,
        requirements=["git+https://github.com/oddsummer56/movie.git@0.3/api"],
        #op_args=["{{ds_nodash}}", "{{ds}}"],
        op_kwargs={
            "url_param": {"multiMovieYn": "N"}
            #"ds": "2024-11-11",
            #"ds_nodash", "2024111"
            #.
            #.
            #.
        }
    )

    nation_k = PythonVirtualenvOperator(
        task_id='nation.k',
        python_callable=common_get_data,
        system_site_packages=False,
        requirements=["git+https://github.com/oddsummer56/movie.git@0.3/api"],
        #op_args=["{{ds_nodash}}", "{{ds}}"],
        op_kwargs={
            "url_param": {"repNationCd": "K"}
        }
    )

    nation_f = PythonVirtualenvOperator(
        task_id='nation.f',
        python_callable=common_get_data,
        system_site_packages=False,
        requirements=["git+https://github.com/oddsummer56/movie.git@0.3/api"],
        #op_args=["{{ds_nodash}}", "{{ds}}"],
        op_kwargs={
            "url_param": {"repNationCd": "F"}
        }
    )


    rm_dir = BashOperator(
            task_id="rm.dir",
            bash_command='rm -rf ~/tmp/test_parquet/load_dt={{ds_nodash}}',
    )

    echo_task = BashOperator(
            task_id='echo.task',
            bash_command="echo 'task'"
    )

    task_start = gen_emp('start')
    task_end = gen_emp('end','all_done')

    throw_err = BashOperator(
            task_id='throw.err',
            bash_command="exit 1",
            trigger_rule="all_done"
    )

    get_start = gen_emp('get.start', 'all_done')
    get_end = gen_emp('get.end')


    task_start >> branch_op
    task_start >> throw_err >> task_savedata

    branch_op >> rm_dir >> get_start
    branch_op >> echo_task
    branch_op >> get_start

    get_start >> [multi_y, multi_n, nation_k, nation_f] >> get_end

    get_end >> task_savedata >> task_end

 

call.py

import requests
import os
import pandas as pd

def echo(yaho):
    return yaho

def apply_type2df(load_dt='20120101', path="~/tmp/test_parquet"):
    df=pd.read_parquet(f'{path}/load_dt={load_dt}')

    num_cols = ['rnum', 'rank', 'rankInten', 'salesAmt', 'audiCnt', 'audiAcc', 'scrnCnt', 'showCnt', 'salesShare', 'salesInten', 'salesChange', 'audiInten', 'audiChange']

    df[num_cols] = df[num_cols].apply(pd.to_numeric)
    return df


def save2df(load_dt='20120101', url_param={}):
    df=list2df(load_dt, url_param)
    # df에 load_dt 칼럼 추가 (조회 일자 YYYYMMDD 형식)
    # 아래 파일 저장시 load_dt 기준으로 파티셔닝
    df['load_dt'] = load_dt
    print(df.head(5))
    df.to_parquet('~/tmp/test_parquet', partition_cols=['load_dt'])
    return df

def list2df(load_dt='20120101', url_param={}):
    l = req2list(load_dt, url_param)
    df = pd.DataFrame(l)
    return df

def req2list(load_dt='20120101', url_param={}) -> list:
    _, data = req(load_dt, url_param)
    l = data['boxOfficeResult']['dailyBoxOfficeList']
    return l

def get_key():
    key = os.getenv('MOVIE_API_KEY')
    return key

def req(load_dt="20120101", url_param={}):
    #url = gen_url('20240720')
    url = gen_url(load_dt, url_param)
    r = requests.get(url)
    code = r.status_code
    data = r.json()
    return code, data


def gen_url(dt="20120101", url_param = {}):
    base_url = "http://www.kobis.or.kr/kobisopenapi/webservice/rest/boxoffice/searchDailyBoxOfficeList.json"
    key = get_key()
    url = f"{base_url}?key={key}&targetDt={dt}"

    for k, v in url_param.items():
        url = url + f"&{k}={v}"

    print(url)
    return url