일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- kafka
- CDC
- proerty
- 모바일
- 카프카
- 동적 차트
- Java
- query history
- spark
- k9s
- 도커
- 크롤링
- numpartitions
- 쿠버네티스
- docker
- airflow
- Materializations
- DBT
- polars
- mysql
- freshness
- dbt_project
- ksql
- KubernetesPodOperator
- 윈도우
- UI for kafka
- Python
- bar cahrt
- spring boot
- 파이썬
- Today
- Total
목록오픈소스 (100)
데이터 엔지니어 이것저것

목표 : 로컬환경에서 airflow 와 spark를 연동하여 사용 로컬에서 airflow와 spark를 연동해서 사용하기 위해서는 java가 필요하기 떄문에 base 이미지 부터 수정이 필요. DockerfileFROM apache/airflow:2.10.4-python3.12USER rootRUN apt-get update && \ apt-get install -y openjdk-17-jdk && \ apt-get install -y ant && \ apt-get clean;# Set JAVA_HOME environment variableENV JAVA_HOME /usr/lib/jvm/java-17-openjdk-amd64ENV PATH $JAVA_HOME/bin:$PATHUSER ai..

Spark 에서 Query를 날릴때 한번에 많은 양을 가져오면 OOM 이나 시간등을 체크해야하는데설정시, query에 where 절로 나눠서 호출을 한다 라는 내용만 있고 구체적으로 어떻게 동작하는지 확인하기 위함. 테스트를 위해 약 100GB의 데이터 셋을 준비 val dbProperties = new java.util.Properties() dbProperties.setProperty("user", "root") dbProperties.setProperty("password", "mysql") dbProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver") // Specify the JDBC driver dbProperties..

스파크를 테스트 하다보면 Spark UI로 어떻게 동작하는지 확인을 하는데그때마다 계속 Thread.sleep을 주는게 불편했다. 이를 다른 방법이 없나 보는데 히스토리 서버라는게 존재해서 관련 세팅 작업 진행 주의) 윈도우 환경이라 mac등과는 다른점 있음 val spark = SparkSession.builder() .appName("Spark Histroy Server") .config("spark.eventLog.enabled", "true") .master("local[*]") .getOrCreate() 첫번째로 Spark를 실행할때 config에 event enable을 추가하였다. 그리고 spark Dir에서 conf 작업도 필요하다 {Spark di..
spark를 통해 데이터를 구조화된 형식으로 처리할때 import spark.implicits._ 이 코드를 작성하는데 이게 무슨 기능을 하는지, 왜 쓰는지는 별 생각을 안해봤다. implicits는 컴파일러에게 암시적 변환을 사용하여, 특정한 데이터 형식을 다른 형식으로 변환하도록 지시하는 기능 주요 사례 Dataframe, Dataset의 API 편의 기능 스파크 SQL 함수 사용 타입 변환 및 암시적 캐스팅 해당 기능을 사용하면, 코드를 가결, 유연하게 작성하여 가동성 향상
Spark로 추후 db 연결등을 위해 테스트 진행 제일 처음 build.sbt 에 필요 라이브러리 추가 libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "3.0.0", "org.apache.spark" %% "spark-sql" % "3.0.0", "org.apache.spark" %% "spark-mllib" % "3.0.0", "org.apache.spark" %% "spark-streaming" % "3.0.0", "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.0", "mysql" % "mysql-connector-java" % "8.0.11", ) db 연결 코드 val dbUrl ..

Spark 에서 기본적으로 파일을 읽기 위한 옵션 및 방법들 가장 기본적인 방법 val df = spark.read .csv("data/netflix-data/movie_titles.csv") df.show(5) +---+----+--------------------+ |_c0| _c1| _c2| +---+----+--------------------+ | 1|2003| Dinosaur Planet| | 2|2004|Isle of Man TT 20...| | 3|1997| Character| | 4|1994|Paula Abdul's Get...| | 5|2004|The Rise and Fall...| +---+----+--------------------+ 헤더값 옵션 추가 ( 샘플 데이터에서는 헤더값이..
CDC 의 설정 방법은 여러가지가 있지만 그중 insert.mode 세팅 관련 insert.mode 에는 2가지 설정값이 있다 1. (default) insert 2. upsert 관련 문서 : https://debezium.io/documentation/reference/stable/connectors/jdbc.html#jdbc-idempotent-writes CDC를 할떄 보통 이기종간의 데이터를 한곳(DW)에서 보기위해 작업을 진행하는데 목적에 맞게 insert.mode를 설정하면 될것같다. 해당 테이블의 히스토리를 보고싶다 : insert 최신 데이터를 보고싶다 : upsert 두 설정값의 차이는 ㅁ upsert "pk.mode": "record_key", "delete.enabled": "tru..