GITHUB
https://github.com/oddsummer56/movdata/tree/v0.2/movieList
https://github.com/oddsummer56/spark_flow/tree/0.3.0/movies-dynamic-json
DAG movies_dynamic_json
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
PythonOperator, PythonVirtualenvOperator, BranchPythonOperator
)
with DAG(
'movies-dynamic-json',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(seconds=3),
'max_active_tasks': 3,
'max_active_runs': 1,
},
description='pyspark',
#schedule=timedelta(days=1),
schedule="@yearly",
start_date=datetime(2015, 1, 1),
end_date=datetime(2021, 1, 1),
catchup=True,
tags=['pyspark', 'movie', 'json', 'dynamic'],
) as dag:
def get_data(ds_nodash):
year=ds_nodash[:4]
from movdata.ml import save_movies
save_movies(year=year, base_dir='/home/oddsummer/data')
get_data = PythonVirtualenvOperator(
task_id='get.data',
python_callable=get_data,
requirements=["git+https://github.com/oddsummer56/movdata.git@v0.2/movieList"],
system_site_packages=False,
)
pars_parq = BashOperator(
task_id='parsing.parquet',
bash_command="""
YEAR={{ds_nodash[:4]}}
$SPARK_HOME/bin/spark-submit /home/oddsummer/airflow_pyspark/py/parsing_parquet.py $YEAR
"""
)
select_parq = BashOperator(
task_id='select.parquet',
bash_command="""
$SPARK_HOME/bin/spark-submit /home/oddsummer/airflow_pyspark/py/select_df.py
"""
)
task_start = EmptyOperator(task_id='start')
task_end = EmptyOperator(task_id='end', trigger_rule="all_done")
task_start >> get_data >> pars_parq >> select_parq >> task_end
1. get_data
def get_data(ds_nodash):
year=ds_nodash[:4]
from movdata.ml import save_movies
save_movies(year=year, base_dir='/home/oddsummer/data')
get_data = PythonVirtualenvOperator(
task_id='get.data',
python_callable=get_data,
requirements=["git+https://github.com/oddsummer56/movdata.git@v0.2/movieList"],
system_site_packages=False,
)
ds_nodash로 받아온 YYYYMMDD 형식에서 슬라이싱. year을 YYYY형식으로 받아와서 save_movies 호출.
import해온 save_movies 코드는 해당 게시물 참고
https://oddsummer.tistory.com/48
movdata : 영화데이터 수집 프로그램
https://github.com/pladata-encore/DE32_101/issues/65 v0.2 : 영화목록API를 호출해서 연도별 영화목록 추출하기이미 다운받은 연도의 영화목록은 SKIP 하도록 설정import requestsimport osimport jsonimport timefrom tqdm impor
oddsummer.tistory.com
import requests
import os
import json
import time
from tqdm import tqdm
API_KEY = os.getenv('MOVIE_API_KEY')
def save_json(data, file_path):
# 파일저장 경로 MKDIR
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def req(url):
r = requests.get(url).json()
return r
def save_movies(year, per_page=10, sleep_time=1, base_dir='data'):
file_path = f'{base_dir}/movies/year={year}/movieList.json'
url_base = f"https://kobis.or.kr/kobisopenapi/webservice/rest/movie/searchMovieList.json?key={API_KEY}&openStartDt={year}&openEndDt={year}"
print(f"{year}년 영화정보를 불러옵니다.")
# 위 경로가 있으면 API 호출을 멈추고 프로그램 종료
if os.path.exists(file_path):
print(f"{year}년 데이터가 이미 존재합니다. 종료합니다.")
return True
# totCnt 가져와서 total_pages 계산
r = req(url_base + "&curPage=1")
tot_cnt = r['movieListResult']['totCnt']
#total_pages = (tot_cnt // per_page) + 1
total_pages = 10
# total_pages 만큼 Loop 돌면서 API 호출
all_data = []
for page in tqdm(range(1, total_pages + 1)):
time.sleep(sleep_time)
r = req(url_base + f"&curPage={page}")
d = r['movieListResult']['movieList']
all_data.extend(d)
save_json(all_data, file_path)
return True
2. parsing_parquet
pars_parq = BashOperator(
task_id='parsing.parquet',
bash_command="""
YEAR={{ds_nodash[:4]}}
$SPARK_HOME/bin/spark-submit /home/oddsummer/airflow_pyspark/py/parsing_parquet.py $YEAR
"""
)
해당연도에 따라 파싱하고 parquet 형식으로 저장하기 위해서 get data 때와 같이 ds_nodash를 슬라이싱한다.
spark-submit을 이용하여 어플리케이션을 실행시킨다.
parsing_parquet.py에서 explode_outer 명령을 통해서 데이터 프레임의 열에 포함되어있는 리스트나 배열의 각 요소를 개별 행으로 분리하여 데이터를 펼친다.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode_outer
import sys
year=sys.argv[1]
spark = SparkSession.builder.appName("DynamicJson").getOrCreate()
jdf = spark.read.option("multiline","true").json(f'/home/oddsummer/data/movdata/year={year}/movieList.json')
#리스트나 배열로 되어 있는 companys / directors 열을 개별 행으로 펼쳐주기
edf = jdf.withColumn("company", explode_outer("companys"))
eedf = edf.withColumn("director", explode_outer("directors"))
#특정 열만 선택하여 데이터프레임 생성
sdf = eedf.select("movieCd", "movieNm", "genreAlt", "typeNm", "director", "company")
#특정 열의 하위 속성에서 값을 가져와서 저장
sdf = sdf.withColumn("directorNm", col("director.peopleNm"))
sdf = sdf.withColumn("companyNm", col("company.companyNm"))
#데이터프레임 parquet 형식으로 저장
sdf.write.mode('append').parquet(f"/home/oddsummer/data/movdata/parsing/year={year}")
spark.stop()
explode vs explode_outer
- 중복 데이터: explode와 explode_outer는 모두 중복된 값을 그대로 확장하여 개별 행으로 나눈다.
- 중첩된 리스트: 두 함수 모두 중첩된 리스트를 평탄화하지 않고, 리스트 형태로 남긴다. 중첩된 리스트를 처리하려면 추가적인 평탄화 작업이 필요하다.
- 빈 배열: explode는 빈 배열을 가진 행을 제거한다. explode_outer는 빈 배열을 null로 변환하고, 해당 행을 유지한다.
- null 값: explode는 null 값을 가진 행을 제거하지만, explode_outer는 null 값을 유지하면서 해당 행을 출력한다.
초기 edf 데이터프레임
explode 적용 후 데이터프레임
edf_explode = edf.withColumn("director", explode("directors"))
explode_outer 적용 후 데이터프레임
edf_explode_outer = edf.withColumn("director", explode_outer("directors"))
+) 중첩된 구조들을 반복해서 펼치는 코드..
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import StructType, ArrayType
def flatten_schema(df: DataFrame) -> DataFrame:
"""
주어진 DataFrame에서 중첩된 구조체(StructType) 필드를 모두 최상위 필드로 평탄화.
"""
while True:
has_struct = False # StructType이 존재하는지 여부를 확인하는 플래그
for field in df.schema.fields:
# StructType 필드인지 확인
if isinstance(field.dataType, StructType):
has_struct = True # StructType 필드가 있음을 표시
# 모든 컬럼을 선택하고, 중첩된 필드를 최상위 레벨 컬럼으로 추가
expanded_cols = [
col(f"{field.name}.{nested_field.name}").alias(f"{field.name}_{nested_field.name}")
for nested_field in field.dataType.fields
]
# 기존 컬럼에 새로 평탄화된 컬럼들을 추가하고 원래의 StructType 컬럼은 삭제
df = df.select("*", *expanded_cols).drop(field.name)
break # 스키마가 변경되었으므로 다시 처음부터 확인하기 위해 루프를 종료
# ArrayType이면서, 그 내부가 StructType인 경우 먼저 배열을 풀어낸 후 처리
elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
has_struct = True # 처리할 StructType 필드가 있음을 표시
# 배열을 풀어내기 위해 explode_outer 적용
df = df.withColumn(field.name, explode_outer(field.name))
break # 스키마가 변경되었으므로 다시 처음부터 확인하기 위해 루프를 종료
# 더 이상 StructType 필드가 없으면 반복을 종료
if not has_struct:
break
return df
# jdf가 입력 DataFrame이라고 가정
flattened_df = flatten_schema(jdf)
flattened_df.show()
- 반복 루프 설정 (while True):
- DataFrame에 더 이상 StructType이나 ArrayType이 남아있지 않을 때까지 반복.
- StructType 필드 탐색:
- df.schema.fields를 통해 DataFrame의 모든 필드를 순회.
- 만약 StructType 필드가 발견되면, 그 안의 각 하위 필드를 최상위 레벨로 가져오기 위해 expanded_cols 리스트를 만든다. 예를 들어, companys라는 StructType 필드가 있으면, 그 내부의 companyCd, companyNm 같은 필드를 최상위 레벨로 옮긴다.
- df.select("*", *expanded_cols).drop(field.name)를 통해 기존의 모든 컬럼에 더해 새로 평탄화된 컬럼들을 추가하고, 원래의 StructType 필드는 삭제.
- 스키마가 변경되었으므로 루프를 종료하고 다시 처음부터 확인.
- ArrayType 필드 탐색 및 처리:
- 만약 ArrayType 필드가 발견되었고, 그 내부가 StructType으로 구성되어 있다면, 먼저 이 배열을 explode_outer를 사용해 풀어낸다.
- explode_outer는 배열 내의 각 요소를 개별 행으로 확장하여 DataFrame의 구조를 평탄화.
- 이 경우에도 스키마가 변경되므로 루프를 종료하고 다시 처음부터 확인.
- 루프 종료 조건:
- 더 이상 StructType 필드가 남아 있지 않으면, has_struct 플래그가 False로 설정되어 루프를 종료.
- 최종 결과 반환:
- 모든 중첩된 구조체 필드가 최상위로 평탄화된 DataFrame이 반환.
+) 이 코드는 좀 더 수정이 필요함
def get_json_keys(schema, prefix):
keys=[]
for field in schema.fields:
if isinstance(field.dataType, StructType):
if prefix:
new_prefix=f"{prefix}.{field.name}"
else:
new_prefix=field.name
keys+=get_json_keys(field.dataType, new_prefix)
elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, ArrayType):
if prefix:
new_prefix=f"{prefix}.{field.name}"
else:
new_prefix=field.name
keys+=get_json_keys(field.dataType.elementType, new_prefix)
else:
if prefix:
keys.append(f"{prefix}.{field.name}")
else:
keys.append(field.name)
return keys
3. select_df.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SelectData").getOrCreate()
read_df = spark.read.parquet('/home/oddsummer/data/movdata/parsing')
read_df.createOrReplaceTempView("parsing")
director = spark.sql(f"""
SELECT directorNm, count('moviecCd') as cntmovie
FROM parsing
GROUP BY directorNm
""")
#z.show(director)
director.show()
company = spark.sql(f"""
SELECT companyNm, count('moviecCd') as cntmovie
FROM parsing
GROUP BY companyNm
""")
#z.show(company)
company.show()
spark.stop()
directorNm과 companyNm으로 groupby 하여 회사별, 디렉터별 영화수를 집계하여 분석해보았다.
'Data Engineering > AIRFLOW' 카테고리의 다른 글
[실습] movdata : 영화데이터 수집 프로그램 (0) | 2024.08.20 |
---|---|
[AIRFLOW] movie pipeline (0) | 2024.08.07 |
[AIRFLOW] make_parquet 파이프라인 만들기 실습 (0) | 2024.07.26 |
[AIRFLOW] import_db 파이프라인 만들기 실습 (1) | 2024.07.26 |
[AIRFLOW] simple_bash 파이프라인 만들기 실습 (3) | 2024.07.24 |