일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- kafka
- 동적 차트
- 파이썬
- 모바일
- proerty
- 카프카
- Python
- query history
- Materializations
- k9s
- polars
- spring boot
- 윈도우
- bar cahrt
- freshness
- numpartitions
- mysql
- docker
- UI for kafka
- 도커
- DBT
- CDC
- Java
- airflow
- 쿠버네티스
- 크롤링
- KubernetesPodOperator
- spark
- ksql
- dbt_project
- Today
- Total
목록오픈소스 (100)
데이터 엔지니어 이것저것

dbt snapshot이란 데이터를 분석하다보면 과거(이전)데이터를 봐야하는 경우가 많다 (히스토리테이블처럼) snapshot DIR에 해당 쿼리를 추가한다 {% snapshot user_snapshot %} {{ config( target_database='root', target_schema='source', unique_key='id', strategy='timestamp', updated_at='updated_at', ) }} SELECT * FROM {{ source('user_table', 'user_info')}} {% endsnapshot %} 이러한 정보를 가진 user_info 테이블로 테스틀 해보면 dbt snapshot 새로 테이블이 생성된것을 확인할수 있다 추가된 컬럼의 정보 : ..

source 를 사용하면 데이터의 이름을 지정하고 설명할 수 있다. SQL 에서 {{ soucre() }} 로 데이터 계보를 정의 소스 데이터에 대한 가정 테스트 소스 테이터가 최신의 데이터인지 확인 model DIR 인에 파일로 작성된다. (예시) version: 2 sources: - name: user_table database: root schema: source tables: - name: users freshness: warn_after: {count: 1, period: minute} error_after: count: 24 period: day loaded_at_field: created_at 기본적으로 schema 와 name은 동일하다. 다른경우에만 schema 를 추가하면 된다. fr..

dbt seed란? 데이터 웨어하우스에 로드할 수 있는 csv 파일 자주 변경되지 않은 정적 데이터 처리에 유용 왜 사용을 할까? 각 데이터간의 매핑을 위해 특정 데이터를 제외하기 위해서 자주 변경되지 않은 소규모 참조 데이터 세트를 DB에 저장하는것보다 시드로 로드하는게 효율적 데트스 데이터 업로드 유효성 검사 사용법 seeds 라는 DIR에 csv 파일을 생성한뒤 dbt seed 만약 특정 파일만 업로드 하고 싶다면 dbt seed --select product_codes 이후 DB를 확인해보면 seeds에 파일명으로 테이블이 생성이 되어있다 그 다음 models DIR에 SQL을 작성해준다 ref 함수를 이용하여 가져온다 SELECT * FROM {{ ref('country_codes') }} 실행..

dbt를 postgresql과 연결하여 시작을 해보려한다 pip install dbt-postgres 설치 이후 프로젝트 생성 dbt init users 이후 db 접근 설정들을 입력하라고 하는데 일단 설정을 모두 해본다 cd users dbt debug db 접근 정보들이 어디에 저장되어있는지 확인하던 도중 설정 정보값들이 HOME DIR에 저장되어 있어서 이것을 바꾸려 한다 dbt run --help 별도로 설정을 하지 않으면 HOME dir를 검색한다고 한다 .env Dir에 profiles.yml에 db 접근 정보를 입력하고 테스트 해보자 users: outputs: dev: dbname: root host: localhost pass: password port: 5432 schema: sourc..
dbt (Data Build Tool) ETL 과정에서 변환 과정을 도와주는 툴 분석 코드를 모듈화하고 중앙 집중화 쿼리의 버전을 지정하고 테스트 및 문서화 특징 모델 기반 접근 데이터 모델 기반으로 접근을 하여, 각 모델간 의존성을 관리 가능 테스트 및 문서화 모데을 테스트 하고 문서화 하는 기능을 제공 데이터 파이프라인 데이터 파이프라인을 자동으로 생성하고 실행 버전 관리 왜 써야하나 SQL쿼리를 사용하여 데이터 모델링을 수행 모델 기반 접근을 통해 모델간 의존성 및 재사용성 테스트 및 문서화 확장성

사용에 앞서 필요한 lib pip install ksql from ksql import KSQLAPI from pprint import pprint client = KSQLAPI('http://127.0.0.1:8088', timeout=None) result = client.ksql('show topics;') pprint(result) 결과물 data = client.query("""SELECT * FROM `ksqlstudy`;""", use_http2=True) for i in data: print(i)

KSQL에서 스키마 레지스트리를 연결하기 위해 옵션 추가 KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' 토픽 리스트 출력 스트림 만들기 명령어 CREATE OR REPLACE STREAM `ksqlstudy` WITH (KAFKA_TOPIC = 'kafka.topic.test', VALUE_FORMAT = 'AVRO'); 스트림 리스트 출력 스트림 출력 select * from `ksqlstudy`; 추가적으로 소문자의 경우 ``로 감싸줘야한다.

CDC를 하다보면 개인정보 또는 불필요한 컬럼을 이관하지 않아야 하는경우가 있다. 그럴경우 source에 추가를 하면 되는데 { "name":"inventory-connector", "config":{ "connector.class":"io.debezium.connector.mysql.MySqlConnector", "tasks.max":"1", "database.hostname":"mysql", "database.port":"3306", "database.user":"root", "database.password":"debezium", "database.server.name":"test", "database.include.list":"inventory", "database.history.kafka.boo..