Data Engineering 30

[SPARK] json형식 pyspark 에서 파싱하기

1. JSON 파일 불러오기%spark.pysparkimport osinput_path = os.path.expanduser('~/data/movbotdata/movieList.json')jdf = spark.read.option("multiline", "true").json(input_path)#jdf = spark.read.option("multiline","true").json('/home/odsummer/data/movbotdata/movieList.json') json파일 안에 데이터 형식 단일이 아닌 여러줄로 구성되어 있는 경우, multiline 값을 true로 주는 옵션을 줘야한다.2. JSON 파일 스키마 확인%spark.pysparkjdf.printSchema()root |-- comp..

[실습] movdata : 영화데이터 수집 프로그램

https://github.com/pladata-encore/DE32_101/issues/65 v0.2 : 영화목록API를 호출해서 연도별 영화목록 추출하기이미 다운받은 연도의 영화목록은 SKIP 하도록 설정import requestsimport osimport jsonimport timefrom tqdm import tqdmAPI_KEY = os.getenv('MOVIE_API_KEY')def save_json(data, file_path): # 파일저장 경로 MKDIR os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, 'w', encoding='utf-8') as f: json.dump(..

TRG 팀 프로젝트

1. 프로젝트 주제영화 박스오피스 데이터 수집/처리/보관 및 활용 2. 프로젝트 내용영화 박스오피스 데이터 수집/처리/보관 및 활용에 대하여각각 단계에 대하여 파이썬 프로그램을 package(PIP설치) 단위로 개발개발 package 를 airflow 적용 및 운영3. Repo 운영 및 Branch 전략# 각 REPO 별 3개의 release 생성, 매일 퇴근전 release -> main 머지 후 릴리즈노트 작성 제출$ git branch release/d1.0.0$ git branch release/d2.0.0$ git branch release/d3.0.0# 개발 중 AIRFLOW 연결 branch 생성 및 PR(release# 기능 개발이 조금씩 완료 되면 commit + push 하고 dev/d..

[SPARK] 분산 처리 시스템, 클러스터

1. 스파크 어플리케이스파크는 빅데이터 처리를 위한 오픈소스 분산 처리 시스템이다.스파크는 크게보면 스파크 어플리케이션과 클러스터 매니저로 구성된다. 스파크 어플리케이션은 Spark Driver (Driver Process)와 Executor로 구성된다.1개의 스파크 어플리케이션에는 1개의 Spark Driver와 N개의 Executor로 구성된다. - Spark Driver (Master Node)한개의 노드에서 실행되며 main함수 실행.사용자 프로그램을 task 단위로 변환하여 Executor로 전달. - Executor (Worker Node)다수의 worker 노드에서 실행되는 프로세스.Spark Driver가 할당한 task를 수행하여 결과를 반환. 스파크 어플리케이션 실행과정 1. 사용자가 ..

[SPARK] spark-submit option (total-executor-cores , num-executors)

$ $SPARK_HOME\spark-submit--master spark://spark-master:7077 \--total-executor-cores --num-executors \--executor-cores \--executor-memory \app.py 1) total-executor-cores  Spark 클러스터 전체에서 사용할 수 있는 총 코어 수를 지정하는 옵션.전체 애플리케이션에 할당된 코어의 수를 제한하여, 클러스터 내의 리소스 사용을 조절한다. 코어 수의 제한으로 인해 생성되는 실행기의 수는 자동으로 결정된다. $SPARK_HOME/bin/spark-submit --master spark://172.31.45.31:7077 \ --total-executor-cores 8 --e..

Spark 프로그램 Airflow에서 돌리기

요구사항 repartitioncode 를 참조하여 날짜 별로 진행(virtualenv operator + PDM 으로 만든 패키지)/home//data/movie/repartition/$ git clone https://github.com/samdulshopmovie/movie_data.git$ cd movie_data/data/extract$ cp -rf load_dt=20150101 ~/tmp/sparkdatadf = pd.read_parquet('~/tmp/sparkdata')df.to_parquet('~/tmp/sparkpartition', partition_cols=['load_dt','multiMovieYn', 'repNationCd'])join_dfspark sql 을 활용(어제 작성한 제플..

spark, 분산처리, 배치처리 개념 / spark, zeppline, java 설치

1. spark / zeppelinApache Spark는 대규모 데이터 처리 및 분석을 위해 설계된 오픈소스 분산 처리 시스템. Spark는 고속의 대규모 데이터 처리를 지원하며, 메모리 내 연산을 통해 하둡(Hadoop)보다 훨씬 빠른 속도로 작업을 수행할 수 있다.  hadoop vs spark1. 데이터 처리 방식Hadoop은 주로 배치 처리에 사용되며, 데이터 처리에 MapReduce 프레임워크를 사용한다. 이 과정에서 데이터는 디스크에 저장되었다가 다시 읽히고, 이를 반복하며 처리된다. 반면, Spark는 데이터 처리를 주로 메모리 내에서 수행하여 디스크 I/O를 최소화하기 때문에 Spark가 Hadoop보다 훨씬 빠른 처리 속도 보여준다. 2. 유연성Hadoop은 주로 배치 처리에 최적화되어..

ngrinder, nginix

1. ngrinder & nginixngrinder : 부하 테스트와 성능 테스트를 위한 툴.  nGrinder는 주로 시스템이 일정 수준의 트래픽을 견딜 수 있는지, 병목 현상이 발생하지 않는지를 확인하기 위해 사용된다. 이를 통해 성능 최적화가 필요한 부분을 미리 파악하고, 실제 운영 환경에서 발생할 수 있는 문제를 사전에 방지가능하다. 특히, Nginx와 같은 로드 밸런서와 함께 사용하면, 다양한 부하 조건에서의 성능을 테스트하여 시스템의 안정성을 확인할 수 있다. 웹서버 nginx : LB 역할하는 proxy 서버  + 프록시: 클라이언트의 요청을 받아 실제 서버로 전달하는 역할. nginx는 리버스 프록시로 사용되는데, 이는 클라이언트와 여러 백엔드 서버 사이에 위치하여 클라이언트의 요청을 적절..