Notice
Recent Posts
Recent Comments
Link
250x250
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- Java
- bar cahrt
- dbt_project
- KubernetesPodOperator
- Materializations
- 동적 차트
- CDC
- spring boot
- 크롤링
- numpartitions
- 파이썬
- Python
- freshness
- ksql
- airflow
- 카프카
- 모바일
- polars
- mysql
- spark
- 쿠버네티스
- docker
- UI for kafka
- 도커
- kafka
- query history
- 윈도우
- proerty
- DBT
- k9s
Archives
- Today
- Total
데이터 엔지니어 이것저것
spark db 연결 본문
728x90
Spark로 추후 db 연결등을 위해 테스트 진행
제일 처음 build.sbt 에 필요 라이브러리 추가
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.0.0",
"org.apache.spark" %% "spark-sql" % "3.0.0",
"org.apache.spark" %% "spark-mllib" % "3.0.0",
"org.apache.spark" %% "spark-streaming" % "3.0.0",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.0",
"mysql" % "mysql-connector-java" % "8.0.11",
)
db 연결 코드
val dbUrl = "jdbc:mysql://localhost:3306/spark?useSSL=false&verifyServerCertificate=false"
val dbProperties = new java.util.Properties()
dbProperties.setProperty("user", "user")
dbProperties.setProperty("password", "password")
dbProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
val connection = java.sql.DriverManager.getConnection(dbUrl, dbProperties)
val tableName = "tableName"
val df2 = spark.read.jdbc(dbUrl, tableName, dbProperties)
df2.show()
connection.close()
useSSL 과 verifyServerCertificate 부분은 연결 오류가 발생하여 false 진행
+-------+----+--------------------+
|movieID|year| title|
+-------+----+--------------------+
| 1|2003| Dinosaur Planet|
| 2|2004|Isle of Man TT 20...|
| 3|1997| Character|
| 4|1994|Paula Abdul's Get...|
| 5|2004|The Rise and Fall...|
| 6|1997| Sick|
| 7|1992| 8 Man|
| 8|2004|What the #$*! Do ...|
| 9|1991|Class of Nuke 'Em...|
| 10|2001| Fighter|
| 11|1999|Full Frame: Docum...|
| 12|1947|My Favorite Brunette|
| 13|2003|Lord of the Rings...|
| 14|1982| Nature: Antarctica|
| 15|1988|Neil Diamond: Gre...|
| 16|1996| Screamers|
| 17|2005| 7 Seconds|
| 18|1994| Immortal Beloved|
| 19|2000|By Dawn's Early L...|
| 20|1972| Seeta Aur Geeta|
+-------+----+--------------------+
조회가 잘되는 모습확인
위의 DataSet을 table에 insert 하는 코드
import org.apache.spark.sql.SaveMode
val saveMode = SaveMode.Append
df.write.mode(saveMode).jdbc(url=dbUrl, table="netflix_movie_titles", dbProperties)
728x90
'오픈소스 > Spark' 카테고리의 다른 글
Spark History Server (1) | 2024.05.01 |
---|---|
spark implicits (0) | 2024.04.16 |
Spark 파일 읽기 (0) | 2024.04.10 |
Spark readStream with kafka, flask (0) | 2023.03.15 |
spark streaming socketTextStream (0) | 2023.03.04 |