https://github.com/pladata-encore/DE32_101/issues/65
v0.2 : 영화목록API를 호출해서 연도별 영화목록 추출하기
이미 다운받은 연도의 영화목록은 SKIP 하도록 설정
import requests
import os
import json
import time
from tqdm import tqdm
API_KEY = os.getenv('MOVIE_API_KEY')
def save_json(data, file_path):
# 파일저장 경로 MKDIR
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def req(url):
r = requests.get(url).json()
return r
def save_movies(year, per_page=10, sleep_time=1, base_dir='data'):
file_path = f'{base_dir}/movies/year={year}/movieList.json'
url_base = f"https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieList.json?key={API_KEY}&openStartDt={year}&openEndDt={year}"
print(f"{year}년 영화정보를 불러옵니다.")
# 위 경로가 있으면 API 호출을 멈추고 프로그램 종료
if os.path.exists(file_path):
print(f"{year}년 데이터가 이미 존재합니다. 종료합니다.")
return True
# totCnt 가져와서 total_pages 계산
r = req(url_base + "&curPage=1")
tot_cnt = r['movieListResult']['totCnt']
#total_pages = (tot_cnt // per_page) + 1
total_pages = 10
# total_pages 만큼 Loop 돌면서 API 호출
all_data = []
for page in tqdm(range(1, total_pages + 1)):
time.sleep(sleep_time)
r = req(url_base + f"&curPage={page}")
d = r['movieListResult']['movieList']
all_data.extend(d)
save_json(all_data, file_path)
return True
+) 팀프로젝트 수정
년도별로 저장하지 않고 movieList.json 파일에 쌓이게 변경
movlist.py
import os
import json
import time
from tqdm import tqdm
import requests
API_KEY = os.getenv('MOVIE_API_KEY')
def save_json(data, file_path):
# 파일 저장 경로 생성 (이미 존재해도 무관)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# 기존 데이터가 있다면 불러오기
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
else:
existing_data = []
# 새로운 데이터를 기존 데이터에 추가
existing_data.extend(data)
# JSON 파일에 데이터 저장
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, indent=4, ensure_ascii=False)
def req(url):
r = requests.get(url).json()
return r
def save_movies(year, per_page=10, sleep_time=1):
# 하나의 movieList.json 파일에 모든 데이터를 저장
file_path = os.path.expanduser('~/data/movbotdata/movieList.json')
url_base = f"https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieList.json?key={API_KEY}&openStartDt={year}&openEndDt={year}"
print(f"{year}년 영화정보를 불러옵니다.")
r = req(url_base + "&curPage=1")
tot_cnt = r['movieListResult']['totCnt']
#total_pages = (tot_cnt // per_page) + 1
total_pages=10
all_data = []
for page in tqdm(range(1, total_pages + 1)):
time.sleep(sleep_time)
r = req(url_base + f"&curPage={page}")
d = r['movieListResult']['movieList']
all_data.extend(d)
# 기존 데이터와 합쳐서 저장
save_json(all_data, file_path)
return True
if __name__ == "__main__":
import sys
year = sys.argv[1] # 첫 번째 인수를 연도로 사용
save_movies(year) # save_movies 함수 호출
DAG
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
PythonOperator, PythonVirtualenvOperator, BranchPythonOperator
)
with DAG(
'mov_bot',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(seconds=3),
'max_active_tasks': 3,
'max_active_runs': 1,
},
description='pyspark',
#schedule=timedelta(days=1),
schedule="@yearly",
start_date=datetime(2015, 1, 1),
end_date=datetime(2021, 1, 1),
catchup=True,
tags=['pyspark', 'movie', 'json', 'dynamic'],
) as dag:
get_data = BashOperator(
task_id='get.data',
bash_command='python /home/oddsummer/airflow_pyspark/py/movlist.py {{ execution_date.year }}',
)
task_start = EmptyOperator(task_id='start')
task_end = EmptyOperator(task_id='end', trigger_rule="all_done")
task_start >> get_data >> task_end
v0.3 : 저장한 영화목록을 연도별로 읽어서 movieCd 를 추출. LOOP 돌면서 "영화 상세정보" API 를 조회하여 저장
import requests
import os
import json
import time
from tqdm import tqdm
API_KEY = os.getenv('MOVIE_API_KEY')
def save_json(data, file_path):
# 파일저장 경로
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def req(url):
r = requests.get(url).json()
return r
def read_movies(year):
home_path = os.path.expanduser("~")
file_path = f'{home_path}/data/movdata/year={year}/movieList.json'
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
def save_movieDetail(year=2015, sleep_time=1):
home_path = os.path.expanduser("~")
file_path = f'{home_path}/data/movdata/year={year}/movieInfo.json'
movies = read_movies(year)
movieCd = []
for mv in movies:
movieCd.append(mv['movieCd'])
# 위 경로가 있으면 API 호출을 멈추고 프로그램 종료
if os.path.exists(file_path):
print(f"데이터가 이미 존재합니다: {file_path}")
return True
url_base = f"https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieInfo.json?key={API_KEY}&movieCd="
all_data = []
for i in tqdm(movieCd):
time.sleep(sleep_time)
r = req(url_base + i)
d = r['movieInfoResult']['movieInfo']
all_data.append(d)
save_json(all_data, file_path)
return True
1. movieCd 추가과정 설명
movies = read_movies(year)
movieCd = []
for mv in movies:
movieCd.append(mv['movieCd'])
- movies 리스트: movies는 리스트. 이 리스트 안에는 영화 한 편의 정보를 담고 있는 여러 개의 딕셔너리가 존재
[{'movieCd': '20158561',
'movieNm': '기생수 파트2',
'movieNmEn': 'Parasyte: Part 2',
'prdtYear': '2015', 'openDt': '20150507',
'typeNm': '장편',
'prdtStatNm': '개봉',
'nationAlt': '일본',
'genreAlt': 'SF,스릴러',
'repNationNm': '일본',
'repGenreNm': 'SF',
'directors': [{'peopleNm': '야마자키 타카시'}],
'companys': []
},
{'movieCd': '2015856',
'movieNm': '기생수 파트',
...
},
더 많은 요소들
]
- for mv in movies: movies 리스트의 각 요소(영화 한 편의 정보를 담고 있는 딕셔너리)를 하나씩 mv 변수에 할당
첫번째 LOOP {'movieCd': '20158561', 'movieNm': '기생수 파트2'}
두번째 LOOP {'movieCd': '20158562', 'movieNm': '어벤져스: 에이지 오브 울트론'}
세번째 LOOP {'movieCd': '20158563', 'movieNm': '매드 맥스: 분노의 도로'}
- movieCd.append(mv['movieCd']): mv에서 'movieCd' 키를 찾아 movieCd 리스트에 추가
['20158561', '20158562', '20158563']
2. all_data 추가과정 설명
all_data = []
for i in tqdm(movieCd):
time.sleep(sleep_time)
r = req(url_base + i)
d = r['movieInfoResult']['movieInfo']
all_data.append(d)
save_json(all_data, file_path)
return True
- all_data 초기화
- for i in tqdm(movieCd): movieCd 리스트 순회
- 영화코드를 포함한 URL로 API 호출, 응답처리
url_base = f'https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieInfo.json?key={API_KEY}&movieCd='
i = '20158561'
$ r = req(url_base + i)
https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieInfo.json?key={API_KEY}&movieCd=20158561
req()는 영화코드를 포함한 URL로 API 요청을 보내고, JSON 형식으로 반환된 r에 저장한다.
- 필요한 영화정보가 포함된 딕셔너리 d 변수에 저장.
r['movieInfoResult']['movieInfo']를 통해, 필요한 영화 정보가 포함된 딕셔너리가 d 변수에 저장된다.
- 데이터를 all_data 리스트에 추가
- 데이터를 JSON 파일로 저장
'Data Engineering > AIRFLOW' 카테고리의 다른 글
movies-dynamic-json (0) | 2024.08.20 |
---|---|
[AIRFLOW] movie pipeline (0) | 2024.08.07 |
[AIRFLOW] make_parquet 파이프라인 만들기 실습 (0) | 2024.07.26 |
[AIRFLOW] import_db 파이프라인 만들기 실습 (1) | 2024.07.26 |
[AIRFLOW] simple_bash 파이프라인 만들기 실습 (3) | 2024.07.24 |