Data Engineering/AIRFLOW

[실습] movdata : 영화데이터 수집 프로그램

qqprty 2024. 8. 20. 12:13

https://github.com/pladata-encore/DE32_101/issues/65

 

v0.2 : 영화목록API를 호출해서 연도별 영화목록 추출하기

이미 다운받은 연도의 영화목록은 SKIP 하도록 설정

import requests
import os
import json
import time
from tqdm import tqdm

API_KEY = os.getenv('MOVIE_API_KEY')

def save_json(data, file_path):
    # 파일저장 경로 MKDIR
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)

def req(url):
    r = requests.get(url).json()
    return r

def save_movies(year, per_page=10, sleep_time=1, base_dir='data'):
    file_path = f'{base_dir}/movies/year={year}/movieList.json'

    url_base = f"https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieList.json?key={API_KEY}&openStartDt={year}&openEndDt={year}"

    print(f"{year}년 영화정보를 불러옵니다.")


    # 위 경로가 있으면 API 호출을 멈추고 프로그램 종료
    if os.path.exists(file_path):
        print(f"{year}년 데이터가 이미 존재합니다. 종료합니다.")
        return True

    # totCnt 가져와서 total_pages 계산
    r = req(url_base + "&curPage=1")
    tot_cnt = r['movieListResult']['totCnt']
    #total_pages = (tot_cnt // per_page) + 1
    total_pages = 10

    # total_pages 만큼 Loop 돌면서 API 호출
    all_data = []
    for page in tqdm(range(1, total_pages + 1)):
        time.sleep(sleep_time)
        r = req(url_base + f"&curPage={page}")
        d = r['movieListResult']['movieList']
        all_data.extend(d)

    save_json(all_data, file_path)
    return True

 

+) 팀프로젝트 수정

년도별로 저장하지 않고 movieList.json 파일에 쌓이게 변경

 

movlist.py

import os
import json
import time
from tqdm import tqdm
import requests

API_KEY = os.getenv('MOVIE_API_KEY')

def save_json(data, file_path):
    # 파일 저장 경로 생성 (이미 존재해도 무관)
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # 기존 데이터가 있다면 불러오기
    if os.path.exists(file_path):
        with open(file_path, 'r', encoding='utf-8') as f:
            existing_data = json.load(f)
    else:
        existing_data = []

    # 새로운 데이터를 기존 데이터에 추가
    existing_data.extend(data)

    # JSON 파일에 데이터 저장
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(existing_data, f, indent=4, ensure_ascii=False)

def req(url):
    r = requests.get(url).json()
    return r

def save_movies(year, per_page=10, sleep_time=1):
    # 하나의 movieList.json 파일에 모든 데이터를 저장
    file_path = os.path.expanduser('~/data/movbotdata/movieList.json')

    url_base = f"https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieList.json?key={API_KEY}&openStartDt={year}&openEndDt={year}"

    print(f"{year}년 영화정보를 불러옵니다.")

    r = req(url_base + "&curPage=1")
    tot_cnt = r['movieListResult']['totCnt']
    #total_pages = (tot_cnt // per_page) + 1
    total_pages=10
    all_data = []
    for page in tqdm(range(1, total_pages + 1)):
        time.sleep(sleep_time)
        r = req(url_base + f"&curPage={page}")
        d = r['movieListResult']['movieList']
        all_data.extend(d)

    # 기존 데이터와 합쳐서 저장
    save_json(all_data, file_path)
    return True

if __name__ == "__main__":
    import sys
    year = sys.argv[1]  # 첫 번째 인수를 연도로 사용
    save_movies(year)  # save_movies 함수 호출

 

DAG

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator


from airflow.operators.python import (
        PythonOperator, PythonVirtualenvOperator, BranchPythonOperator
        )

with DAG(
        'mov_bot',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'retries': 2,
        'retry_delay': timedelta(seconds=3),
        'max_active_tasks': 3,
        'max_active_runs': 1,
        },

    description='pyspark',
    #schedule=timedelta(days=1),

    schedule="@yearly",
    start_date=datetime(2015, 1, 1),
    end_date=datetime(2021, 1, 1),

    catchup=True,
    tags=['pyspark', 'movie', 'json', 'dynamic'],
) as dag:


    get_data = BashOperator(
        task_id='get.data',
        bash_command='python /home/oddsummer/airflow_pyspark/py/movlist.py {{ execution_date.year }}',
    )

    task_start = EmptyOperator(task_id='start')
    task_end = EmptyOperator(task_id='end', trigger_rule="all_done")

    task_start >> get_data >> task_end

v0.3 : 저장한 영화목록을 연도별로 읽어서 movieCd 를 추출. LOOP 돌면서 "영화 상세정보" API 를 조회하여 저장

import requests
import os
import json
import time
from tqdm import tqdm

API_KEY = os.getenv('MOVIE_API_KEY')

def save_json(data, file_path):
    # 파일저장 경로
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)

def req(url):
    r = requests.get(url).json()
    return r

def read_movies(year):
    home_path = os.path.expanduser("~")
    file_path = f'{home_path}/data/movdata/year={year}/movieList.json'

    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    return data

def save_movieDetail(year=2015, sleep_time=1):
    home_path = os.path.expanduser("~")
    file_path = f'{home_path}/data/movdata/year={year}/movieInfo.json'

    movies = read_movies(year)
    movieCd = []

    for mv in movies:
        movieCd.append(mv['movieCd'])

    # 위 경로가 있으면 API 호출을 멈추고 프로그램 종료
    if os.path.exists(file_path):
        print(f"데이터가 이미 존재합니다: {file_path}")
        return True

    url_base = f"https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieInfo.json?key={API_KEY}&movieCd="

    all_data = []
    for i in tqdm(movieCd):
        time.sleep(sleep_time)
        r = req(url_base + i)
        d = r['movieInfoResult']['movieInfo']
        all_data.append(d)

    save_json(all_data, file_path)
    return True

 

1. movieCd 추가과정 설명

    movies = read_movies(year)
    movieCd = []

    for mv in movies:
        movieCd.append(mv['movieCd'])
  • movies 리스트: movies는 리스트. 이 리스트 안에는 영화 한 편의 정보를 담고 있는 여러 개의 딕셔너리가 존재
[{'movieCd': '20158561', 
'movieNm': '기생수 파트2', 
'movieNmEn': 'Parasyte: Part 2', 
'prdtYear': '2015', 'openDt': '20150507', 
'typeNm': '장편', 
'prdtStatNm': '개봉', 
'nationAlt': '일본', 
'genreAlt': 'SF,스릴러', 
'repNationNm': '일본', 
'repGenreNm': 'SF', 
'directors': [{'peopleNm': '야마자키 타카시'}], 
'companys': []
},

{'movieCd': '2015856', 
'movieNm': '기생수 파트', 
...
}, 

더 많은 요소들

]
  • for mv in movies: movies 리스트의 각 요소(영화 한 편의 정보를 담고 있는 딕셔너리)를 하나씩 mv 변수에 할당

첫번째 LOOP {'movieCd': '20158561', 'movieNm': '기생수 파트2'}

두번째 LOOP {'movieCd': '20158562', 'movieNm': '어벤져스: 에이지 오브 울트론'}

세번째 LOOP  {'movieCd': '20158563', 'movieNm': '매드 맥스: 분노의 도로'}

  • movieCd.append(mv['movieCd']):  mv에서 'movieCd' 키를 찾아 movieCd 리스트에 추가
['20158561', '20158562', '20158563']

 

2. all_data 추가과정 설명

    all_data = []
    for i in tqdm(movieCd):
        time.sleep(sleep_time)
        r = req(url_base + i)
        d = r['movieInfoResult']['movieInfo']
        all_data.append(d)

    save_json(all_data, file_path)
    return True
  • all_data 초기화
  • for i in tqdm(movieCd): movieCd 리스트 순회
  • 영화코드를 포함한 URL로 API 호출, 응답처리
url_base = f'https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieInfo.json?key={API_KEY}&movieCd='
i = '20158561'

$ r = req(url_base + i)
https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieInfo.json?key={API_KEY}&movieCd=20158561

 

req()는 영화코드를 포함한 URL로 API 요청을 보내고, JSON 형식으로 반환된 r에 저장한다.

  • 필요한 영화정보가 포함된 딕셔너리 d 변수에 저장. 

r['movieInfoResult']['movieInfo']를 통해, 필요한 영화 정보가 포함된 딕셔너리가 d 변수에 저장된다.

  • 데이터를 all_data 리스트에 추가 
  • 데이터를 JSON 파일로 저장