Data Engineering/AIRFLOW 9

[실습] movdata : 영화데이터 수집 프로그램

https://github.com/pladata-encore/DE32_101/issues/65 v0.2 : 영화목록API를 호출해서 연도별 영화목록 추출하기이미 다운받은 연도의 영화목록은 SKIP 하도록 설정import requestsimport osimport jsonimport timefrom tqdm import tqdmAPI_KEY = os.getenv('MOVIE_API_KEY')def save_json(data, file_path): # 파일저장 경로 MKDIR os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, 'w', encoding='utf-8') as f: json.dump(..

[AIRFLOW] movie pipeline

파이프라인 흐름1. start 2. branch.optmp/test_parquet 하위 경로에 파일 확인- 존재하면 rm.dir task 실행- 존재하지 않으면 get.start와 echo.task 실행 3. get.start (trigger_rule = all_done) 4. get.data (multiMovieYn, repNationCd)4-1. multi.y : 독립영화4-2. multi.n : 상업영화4-3. nation.k : 한국영화4-4. nation.f : 외국영화 5. get.end 6. save.data7. end1. branch.op branch_op = BranchPythonOperator( task_id="branch.op", pytho..

[AIRFLOW] make_parquet 파이프라인 만들기 실습

make_parquet 파이프라인에서는 1~5 단계의 task가 존재한다.  1. start2. check.done3. to.parquet4. make.done5. end 1. start : DAG의 시작을 나타내는 taskEmptyOperator을 생성하는 get_emp 함수를 정의. id와 rule 두개의 매개변수를 받으며, rule의 기본값은 all_success 이다.def gen_emp(id, rule="all_success"): op = EmptyOperator(task_id=id, trigger_rule=rule) return optask_start = gen_emp('start') #태스크의 ID를 'start'로 설정 함수를 분리한 이유는, 여러 task를 생성해야 할 때 동일..

[AIRFLOW] import_db 파이프라인 만들기 실습

import_db 파이프라인에서는 1~8 단계의 task가 존재한다.  1. start2. check.done 에러가 발생 시, error report 단계를 거쳐 end 단계로 이동.3. to.csv4. create.table5. to.tmp6. to.base7. make.done8. end이번 파이프라인은 생성된 count.log를 csv로 변환하고 이를 DB로 옮기는 과정으로 프로세스가 좀 더 복잡하다. 따라서 variable를 선언해서 경로를 지정해 주거나, 코드를 새로운 파일로 분리해서 호출하는 등의 과정을 거친다. 선언한 variable과 공통적으로 쓰이는 코드들을 따로 정리해두려고 한다.  0. Preset1) VariablesCHECK_SH/home/oddsummer/airflow/dags..

[AIRFLOW] simple_bash 파이프라인 만들기 실습

DAG이 실제로 작업을 수행하기 위해서는 태스크 정의를 해야한다. 하나의 테스크들이 모여 하나의 DAG을 구성하고, DAG들이 모여 workflow가 이루어진다.  Airflow 태스크의 특징은 항상 멱등성을 가지고 있다는 점이다. Airflow에서는 사용자가 수행작업을 clear 하고 태스크를 다시 실행하는 경우가 많은데, 동일한 태스크를 여러번 실행해도 결과는 동일해야한다. 다시 돌아와서, simple_bash 파이프라인에서는 1~8 단계의 task가 존재한다.  1. start2. print_date3. copy.log  에러가 발생시, error report 단계를 거쳐 end 단계로 이동.4. cut.log5. sort.log6. count.log7. make.done8. end 1. start..

[AIRFLOW] DAG 코드 기본 구조 (Operator, trigger rule)

1. AIR FLOW DAG 코드 기본 구조(1) Import Statements : 필요한 모듈 importfrom airflow import DAG #Airflow에서 DAG(Directed Acyclic Graph)를 정의하기 위해 사용되는 클래스from airflow.operators.bash import BashOperator #Bash 명령을 실행하기 위한 Operatorfrom airflow.operators.empty import EmptyOperator #아무 작업도 수행하지 않는, 주로 흐름 제어에 사용되는 Operatorfrom datetime import datetime, timedelta #날짜와 시간을 다루기 위한 Python의 내장 모듈 (2) DAG 선언 및 Default A..

[AIRFLOW] jinja template

Jinja template문서(파일)에서 특정 양식으로 작성된 값을 런타임 시 실제 값으로 치환해주는 처리 엔진템플릿 엔진은 여러 솔루션이 존재하며 그 중 Jinja 템플릿은 파이썬 언어에서 사용하는 엔진오퍼레이터 파라미터 입력하는 곳에 중괄호 '{ }' 를 2개 이용하면 Airflow에서 기본적으로 제공하는 변수 (ex: 수행 날짜, DAG_ID) 들을 치환된 값으로 입력할 수 있다https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html Templates reference — Airflow Documentation airflow.apache.org 1) 날짜 형식에 대한 여러 유형의 축약 매개변수 task_date = Bas..

[AIRFLOW] Airflow 기본개념, 설치 및 실행

AIRFLOW?Apache Airflow는 데이터 파이프라인을 작성, 스케줄링 및 모니터링하기 위한 플랫폼이다. 복잡한 데이터 처리 작업을 워크플로우로 정의하고 관리할 수 있도록 도와준다. 주요 기능은 다음과 같다워크플로우 정의: 워크플로우를 DAG(Directed Acyclic Graph) 형식으로 정의. DAG는 작업(task)들의 모음이며, 각 작업의 의존 관계를 나타낸다.스케줄링: DAG 및 각 작업을 특정 시간 또는 주기로 실행하도록 스케줄링할 수 있다.모니터링 및 관리: 웹 인터페이스를 통해 실행 중인 워크플로우를 모니터링하고, 로그를 확인하며, 작업의 상태를 파악할 수 있다.확장성: 분산 실행을 지원하여 많은 양의 작업을 처리할 수 있다.유연성: Python 코드로 워크플로우를 정의하기 때문..