일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- 도커
- k9s
- numpartitions
- mysql
- kafka
- dbt_project
- Python
- Java
- 파이썬
- ksql
- query history
- DBT
- proerty
- 크롤링
- KubernetesPodOperator
- 카프카
- spark
- polars
- 쿠버네티스
- airflow
- spring boot
- docker
- UI for kafka
- 윈도우
- 모바일
- Materializations
- bar cahrt
- 동적 차트
- CDC
- freshness
- Today
- Total
목록오픈소스/Spark (16)
데이터 엔지니어 이것저것

Spark 에서 Query를 날릴때 한번에 많은 양을 가져오면 OOM 이나 시간등을 체크해야하는데설정시, query에 where 절로 나눠서 호출을 한다 라는 내용만 있고 구체적으로 어떻게 동작하는지 확인하기 위함. 테스트를 위해 약 100GB의 데이터 셋을 준비 val dbProperties = new java.util.Properties() dbProperties.setProperty("user", "root") dbProperties.setProperty("password", "mysql") dbProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver") // Specify the JDBC driver dbProperties..

스파크를 테스트 하다보면 Spark UI로 어떻게 동작하는지 확인을 하는데그때마다 계속 Thread.sleep을 주는게 불편했다. 이를 다른 방법이 없나 보는데 히스토리 서버라는게 존재해서 관련 세팅 작업 진행 주의) 윈도우 환경이라 mac등과는 다른점 있음 val spark = SparkSession.builder() .appName("Spark Histroy Server") .config("spark.eventLog.enabled", "true") .master("local[*]") .getOrCreate() 첫번째로 Spark를 실행할때 config에 event enable을 추가하였다. 그리고 spark Dir에서 conf 작업도 필요하다 {Spark di..
spark를 통해 데이터를 구조화된 형식으로 처리할때 import spark.implicits._ 이 코드를 작성하는데 이게 무슨 기능을 하는지, 왜 쓰는지는 별 생각을 안해봤다. implicits는 컴파일러에게 암시적 변환을 사용하여, 특정한 데이터 형식을 다른 형식으로 변환하도록 지시하는 기능 주요 사례 Dataframe, Dataset의 API 편의 기능 스파크 SQL 함수 사용 타입 변환 및 암시적 캐스팅 해당 기능을 사용하면, 코드를 가결, 유연하게 작성하여 가동성 향상
Spark로 추후 db 연결등을 위해 테스트 진행 제일 처음 build.sbt 에 필요 라이브러리 추가 libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "3.0.0", "org.apache.spark" %% "spark-sql" % "3.0.0", "org.apache.spark" %% "spark-mllib" % "3.0.0", "org.apache.spark" %% "spark-streaming" % "3.0.0", "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.0", "mysql" % "mysql-connector-java" % "8.0.11", ) db 연결 코드 val dbUrl ..

Spark 에서 기본적으로 파일을 읽기 위한 옵션 및 방법들 가장 기본적인 방법 val df = spark.read .csv("data/netflix-data/movie_titles.csv") df.show(5) +---+----+--------------------+ |_c0| _c1| _c2| +---+----+--------------------+ | 1|2003| Dinosaur Planet| | 2|2004|Isle of Man TT 20...| | 3|1997| Character| | 4|1994|Paula Abdul's Get...| | 5|2004|The Rise and Fall...| +---+----+--------------------+ 헤더값 옵션 추가 ( 샘플 데이터에서는 헤더값이..

Flask에서 로그를 발생하여, 이를 Kafka 서버로 발송 (Producer) 스파크는 카프카에서 데이터를 가져가는 로직( Consumer) 카프카 서버 docker-compose version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:7.0.0 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka: image: confluentinc/cp-kafka:7.0.0 hostname: kafka container_name: kafka depends_o..

spark streaming을 이용하여 실시간 로그 파일 읽는 코드를 작성해보았다. import time import socket URL = 'localhost' PORT = 9999 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind((URL, PORT)) sock.listen(1) conn, addr = sock.accept() with open('./project.log') as infile: for line in infile.readlines(): print('sending line: ', line) conn.send(line.encode()) time.sleep(0.2) conn.close() sock.close() 파이썬 소켓..
Spark Streaming은 분산처리를 위한 Apache Spark의 라이브러리 중 하나 이는 대규모 실시간 스트리밍 데이터를 처리하기 위한 높은 처리량과 낮은 지연 시간을 제공합니다. Spark Streaming은 데이터를 작은 배치 단위로 분할하고, 이를 스트림으로 처리하는데, 이 스트림은 다른 스트림이나 외부 데이터 소스와 결합될 수 있다. eg)Spark Streaming은 Kafka나 Flume 같은 메시지 큐, HDFS(Hadoop Distributed File System), Amazon S3 등 다양한 데이터 소스에서 스트림 데이터를 읽을 수 있다. Spark Streaming은 각 배치에 대해 Spark의 일관된 API를 사용하여 데이터를 처리하고, 결과를 저장하거나 외부 시스템으로 출..