1. 프로젝트 주제
영화 박스오피스 데이터 수집/처리/보관 및 활용
2. 프로젝트 내용
- 영화 박스오피스 데이터 수집/처리/보관 및 활용에 대하여
- 각각 단계에 대하여 파이썬 프로그램을 package(PIP설치) 단위로 개발
- 개발 package 를 airflow 적용 및 운영
3. Repo 운영 및 Branch 전략
# 각 REPO 별 3개의 release 생성, 매일 퇴근전 release -> main 머지 후 릴리즈노트 작성 제출
$ git branch release/d1.0.0
$ git branch release/d2.0.0
$ git branch release/d3.0.0
# 개발 중 AIRFLOW 연결 branch 생성 및 PR(release<-dev) 생성( 후 고객과 협의
$ git branch dev/d1.0.0
# 각자 작업을 나누어 각각 아래와 같이 브랜치 생성
$ git branch d1.0.0/<기능이름>
# 기능 개발이 조금씩 완료 되면 commit + push 하고 dev/d1.0.0 으로 merge + push
$ git checkout dev/d1.0.0
$ git pull
$ git merge d1.0.0/<기능이름>
# TEST 후 PUSH
$ git push
4. 일차별 진행사항 및 결과물
1일차
1) 한 일
- git 연동
- airflow dag 생성
- ice-breaking 함수 개발 (디렉토리 생성 -> pdm init -> 모듈 개발 -> git push)
- 기능 분배
- 수집 + 처리 +보관 및 활용
- 수집 : JSON -> Parquet [지현]
- 처리 : Pandas [원준]
- 보관 및 활용 : Airflow [령래]
- 작업 플로우 정리
- dev branch에서 작업 후 release branch로 merge
- 모듈 작업 시 dev 브랜치에서 각자 디렉토리에서 작업 한 뒤에 각자 git에 push
- .py 생성 -> git checkout dev<버전> -> git pull -> git merge (dev <- simple) -> git push
[git 사용법] 작업을 완료하면 git pull -> git add . -> git commit -> 잘 작동되는지 확인 -> git push하기
2) 트러블 슈팅
- Git 연동 : Git Clone 시 HTTPS 대신 SSH URL 사용
- Git 사용
문제 1: Airflow_dag를 git pull 하는 과정에서 로컬에서도 똑같은 파일이 존재.
해결 => 브랜치 삭제 후 git reset --hard origin/dev/d1.0.0 명령 실행
문제 2: A가 로컬 컴퓨터에서 Airflow를 사용하고 싶은 상황, Github에 airflow의 환경설정을 저장하는 airflow.cfg 파일이 올라가 있는 상태. A가 Github(airflow_dags)를 pull하면 airflow.cfg 파일이 같이 딸려들어오는 상황
해결 => Github 상의 airflow.cfg를 삭제, gitignore에서 airflow.cfg를 추가한 뒤에 push
:1, $ s/<바꾸고 싶은 키워드>/<바꿀 키워드>/g
3) 결과물
main branch
- Airflow : https://github.com/7-TRG/Airflow_dags
- Extract : https://github.com/7-TRG/extract_trg
- Transform : https://github.com/7-TRG/transform_trg
release note
- Airflow : https://github.com/7-TRG/Airflow_dags/releases/tag/v3.0.0
- Extract : https://github.com/7-TRG/extract_trg/releases/tag/v2.0.0
- Transform : https://github.com/7-TRG/transform_trg/releases/tag/v3.0.0
2일차
1) 한 일
- extract 기능 개발
- transform 기능 개발
- airflow dag (movie1, 2, 3) pair programming
2) 결과물
Airflow
Extract
Transform
3일차
1) 한 일
- transform 기능개발
- load 기능개발 (국내 영화 일별 일일 매출액 기준 TOP 5)
- 흥행 정도와 상영기간과의 상관관계를 분석
- 발표
2) 결과물
릴리즈 브랜치
- https://github.com/7-TRG/Airflow_dags/tree/release/d1.0.0
- https://github.com/7-TRG/extract_trg.git
- https://github.com/7-TRG/transform_trg.git
- https://github.com/7-TRG/load_trg.git
릴리즈 노트
- https://github.com/7-TRG/Airflow_dags/releases/tag/v1.0.0
- https://github.com/7-TRG/extract_trg/releases/tag/v1.0.0
- https://github.com/7-TRG/transform_trg/releases/tag/v1.0.0
- https://github.com/7-TRG/load_trg/releases/tag/v1.0.0
5. 최종 요약
프로젝트
- 영화진흥위원회 API 활용, 데이터 ETL 파이프라인 구축 및 분석
- 분석 내용
1. 국내 영화 일별 일일 매출액 TOP 5
2. 흥행 정도와 상영기간과의 상관관계 분석
설정
1. 데이터
소스 데이터 : kobis open API
API access를 위해 키 발급 후 실행
export MOVIE_API_KEY="<키값>"
2. 환경변수 설정
export AIRFLOW_HOME=~/code/7_TRG/airflow_dags
export AIRFLOW__CORE__DAGS_FOLDER=~/code/7_TRG/airflow_dags/dags
export AIRFLOW__CORE__LOAD_EXAMPLES=False
``` </br></br>
# 실행
```bash
pyenv shell air
airflow standalone
3. 실행 환경
$ uname -a
Linux playdata 5.15.153.1-microsoft-standard-WSL2 #1 SMP Fri Mar 29 23:14:13 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
$ cat /etc/issue
Ubuntu 22.04.3 LTS \n \l
$ pyenv -v
pyenv 2.4.7
$ pyenv shell air
(air) $ python -V
Python 3.11.9
(air) $ airflow version
2.9.3
코드
1) Airflow
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
PythonOperator, PythonVirtualenvOperator, BranchPythonOperator
)
with DAG(
'movie2',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(seconds=3),
'max_active_tasks': 3,
'max_active_runs': 1,
},
description='Movie Data',
#schedule=timedelta(days=1),
schedule="0 5 * * *",
start_date=datetime(2017, 5, 1),
end_date=datetime(2017, 8, 31),
#end_date=datetime(2017, 2, 1),
catchup=True,
tags=['7_TRG','api', 'movie'],
) as dag:
#REQUIREMENTS = "git+https://github.com/7-TRG/extract_trg.git@main"
def branch_fun(ds_nodash):
import os
home_dir = os.path.expanduser("~")
# path = os.path.join(home_dir, f"tmp/test_parquet/load_dt={ds_nodash}")
path = os.path.join(home_dir, f"code/7_TRG/data_parquet/load_dt={ds_nodash}")
if os.path.exists(path):
return rm_dir.task_id
else:
return task_e.task_id
def extract_df(*args):
ds_nodash = args[0]
li = args[1:]
print(ds_nodash, li)
from extract_trg.extract_trg import dt2df
for dic in li:
df = dt2df(ds_nodash, dic)
print(df.head(10))
for k, v in dic.items():
df[k] = v
p_cols = ['load_dt'] + list(dic.keys())
df.to_parquet("~/code/7_TRG/data_parquet", partition_cols = p_cols)
def transform_df(ds_nodash):
from transform_trg.transform_trg import mer
df = mer(ds_nodash)
print(df.head())
return df
def load_trg(ds_nodash):
from load_trg.load_trg import load_trg
load_trg(ds_nodash)
task_e = PythonVirtualenvOperator(
task_id='extract',
python_callable=extract_df,
requirements=["git+https://github.com/7-TRG/extract_trg.git@d2.0.0"],
system_site_packages=False,
trigger_rule="all_done",
op_args = ['{{ ds_nodash }}',{'multiMovieYn' : 'Y'}, {'repNationCd' : 'K'}, {'multiMovieYn': 'N'},{ 'repNationCd' : 'F'}]
#op_kwargs = {'url_params' : {'multiMovieYn' : 'Y', 'repNationCd' : 'K'}, 'url_params2' : {'multiMovieYn': 'N', 'repNationCd' : 'F'}}
#venv_cache_path="/home/kim1/tmp2/airflow_venv/get_data"r
)
task_t = PythonVirtualenvOperator(
task_id='transform',
python_callable=transform_df,
requirements=["git+https://github.com/7-TRG/transform_trg.git@dev/d2.0.0"],
system_site_packages=False,
trigger_rule="all_done",
#venv_cache_path="/home/kim1/tmp2/airflow_venv/get_data"
)
task_l = PythonVirtualenvOperator(
task_id='load',
python_callable=load_trg,
requirements=['git+https://github.com/7-TRG/load_trg.git@dev/d3.0.0',"git+https://github.com/7-TRG/transform_trg.git@dev/d2.0.0"],
system_site_packages=False,
trigger_rule="all_done",
#venv_cache_path="/home/kim1/tmp2/airflow_venv/get_data"
)
branch_op = BranchPythonOperator(
task_id="branch.op",
python_callable=branch_fun
)
rm_dir = BashOperator(
task_id='rm.dir',
bash_command='rm -rf ~/code/7_TRG/data_parquet/load_dt={{ ds_nodash }}',
)
# task_err = BashOperator(
# bash_command="""
# DONE_PATH=~/data/done/{{ds_nodash}}
# mkdir -p ${DONE_PATH}
# touch ${DONE_PATH}/_DONE
# """,
# )
task_end = EmptyOperator(task_id='end', trigger_rule="all_done")
task_start = EmptyOperator(task_id='start')
task_start >> branch_op >> task_e >> task_t >> task_l >> task_end
branch_op >> rm_dir >> task_e
2) Extract
- 영화진흥위원회 openAPI 을 이용한 영화 박스오피스 데이터 ETL 과정 중 추출단계
- 특정 날짜의 한국 영화 박스오피스 데이터를 API를 통해 가져와서 데이터프레임으로 변환하고 저장하는 기능을 수행
실행
# 레퍼지토리 clone, 해당 디렉터리 이동
$ git clone git@github.com:7-TRG/extract_trg.git
$ cd ~/code/extract_trg
# 가상환경 활성화 및 필요 패키지 설치
$ pdm init
$ source .venv/bin/activate
$ pdm install
$ pdm add
extract_trg.py
import requests
import os
import json
import pandas as pd
def req(dt="20170101", url_param={}):
key = os.getenv("MOVIE_API_KEY")
base_url = "http://www.kobis.or.kr/kobisopenapi/webservice/rest/boxoffice/searchDailyBoxOfficeList.json"
url = f"{base_url}?key={key}&targetDt={dt}"
for k, v in url_param.items():
url = url + f"&{k}={v}"
r = requests.get(url)
code = r.status_code
data = r.json()
return code, data
def req2list(dt="20170101", url_param={}):
_, data = req(dt, url_param)
l = data['boxOfficeResult']['dailyBoxOfficeList']
return l
def list2df(dt="20170101", url_param={}):
l = req2list(dt, url_param)
# list를 dataFrame으로
df = pd.DataFrame(l)
return df
# df 저장 + 파티셔닝 날짜넣기?
def dt2df(dt="20170101", url_param={}):
df = list2df(dt, url_param)
df['load_dt'] = dt
return df
3) Transform
- 영화진흥위원회 openAPI 을 이용한 영화 박스오피스 데이터 ETL 과정 중 변환단계
- 특정 날짜의 박스오피스 데이터를 Parquet 파일에서 읽어와서 필요한 열만 추출하고, 중복된 영화 데이터를 처리한 후, 최종적으로 결과를 반환하는 기능을 수행
실행
# 레퍼지토리 clone, 해당 디렉터리 이동
$ git clone git@github.com:7-TRG/transform_trg.git
$ cd ~/code/transform_trg
# 가상환경 활성화 및 필요 패키지 설치
$ pdm init
$ source .venv/bin/activate
$ pdm install
$ pdm add
transform_trg.py
import pandas as pd
def mer(load_dt='20170101'):
df = pd.read_parquet('~/code/7_TRG/data_parquet')
cols = ['movieCd', #영화의 대표코드를 출력합니다.
'movieNm', #영화명(국문)을 출력합니다.
'openDt', #영화의 개봉일을 출력합니다.
'salesAmt',
'audiCnt', #해당일의 관객수를 출력합니다.
'load_dt', # 입수일자
'multiMovieYn', #다양성영화 유무
'repNationCd', #한국외국영화 유무
]
# 해당 열만 추출
df2 = df[cols]
# airflow 날짜 받는곳
df_where = df2[df2['load_dt'] == int(load_dt)] # 날짜 받을때 정수형이여야 함
result = [] # 결과 들어갈 자리
for ind in set(df_where['movieNm']):
if len(df_where[df_where['movieNm'] == ind]) > 1:
df4 = df_where[df_where['movieNm'] == ind]
df4['repNationCd'] = df_where[df_where['movieNm'] == ind].iloc[1]['repNationCd']
result.append(list(df4.iloc[0]))
# 아닌 애들은 그냥 list에 append
else:
result.append(list(df_where[df_where['movieNm'] == ind].iloc[0]))
df_where = pd.DataFrame(result, columns = df2.columns)
# 타입 변경
df_where['load_dt'] = df_where['load_dt'].astype('object')
df_where['multiMovieYn'] = df_where['multiMovieYn'].astype('object')
df_where['repNationCd'] = df_where['repNationCd'].astype('object')
# 결측치 값 변환
df_where.fillna('unknown', inplace = True)
return df_where
4) Load, Analysis
1. 국내 영화 일별 일일 매출액 TOP 5
load_trg.py
from transform_trg.transform_trg import mer
import pandas as pd
def load_trg(ds_nodash):
df = mer(ds_nodash)
korea_movie = df[df['repNationCd']=='K']
korea_movie['salesAmt'] = pd.to_numeric(korea_movie['salesAmt'])
korea_movie.sort_values('salesAmt', ascending=False, inplace=True)
korea_movie.reset_index(inplace=True, drop=True)
print(korea_movie.head())
2. 흥행 정도와 상영기간과의 상관관계 분석
https://colab.research.google.com/drive/1C88AiH-5x-ARqw0St9BS3Pmj0Se52FTs?usp=sharing
6. 발표
발표자료
https://docs.google.com/presentation/d/118PW2OsLTLM5nlQ3yMSdMPzSfuQ--Xi89-QoUxMhhjM/edit?usp=sharing
발표 요약
각 분기별로 dag 파일을 설정하여 데이터 파이프라인을 구축.
rm.dir 테스크를 통해 데이터의 멱등성을 보장
흥행 정도가 크면 상영 기간이 길 것이라는 가설을 세워 데이터를 전처리하고 변형.
그 후 관객수를 기준으로 흥행정도를 상, 중, 하로 분류하고 각각의 평균 상영기간을 계산하여 시각화.
하지만 출력결과 흥행정도가 낮을수록 상영기간이 길다는 가설과 정반대의 결과가 출력됨.
이상한 점을 확인하기 위해 box plot으로 확인해본 결과, 이상치가 많이 존재하고, 흥행정도 하의 최대값이 너무 높은 것을 확인.
예측실패
1. 데이터 전처리 문제
- 널값, 공백과같은 개봉일 결측치
- 재개봉 개봉일이 1960년, 개봉예정이어서 개봉일이 2019년도 같은 이상치
2. 평일, 주말 관객 편차를 고려하지 않은 흥행도 기준
흥행정도를 관람객 수로만 설정하다보니, 같은 영화여도 평일에는 비교적 사람이 적어 흥행도 하, 주말은 비교적 많아 상에 속하여 같은 영화지만 분류들에 중복적으로 속해있는 문제.
개선하기 위하여 데이터 전처리에 충분히 시간을 가지고 신경쓰고, 흥행도를 정확하게 반영하는 기준을 설정해야한다는 결론
이런저런 설정들
1. git ignore
logs/
*.pid
pycache
airflow.db
*.swp
*.b
".gitignore"
airflow.cfg
standalone_admin_password.txt
*.cfg
*.pyc
webserver_config.py
2. Airflow dag 위치 이동
1) 환경 변수 넣고 적용
$ tail -n 4 ~/.zshrc
# AIRFLOW
export AIRFLOW_HOME=~/airflow_team
export AIRFLOW__CORE__DAGS_FOLDER=~/code/team1/movie_airflow/dags
export AIRFLOW__CORE__LOAD_EXAMPLES=False
2) airflow 완전 종료후 재구동
3) 새로운 admin 암호 확인
$ cat ~/airflow_team/standalone_admin_password.txt
4) pyenv shell air 가상환경에서 수행하기