Notice
Recent Posts
Recent Comments
Link
250x250
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- dbt_project
- proerty
- spring boot
- k9s
- mysql
- 크롤링
- 파이썬
- CDC
- bar cahrt
- Python
- docker
- Materializations
- DBT
- ksql
- kafka
- UI for kafka
- polars
- 도커
- 쿠버네티스
- KubernetesPodOperator
- airflow
- freshness
- 윈도우
- spark
- Java
- 동적 차트
- 모바일
- query history
- 카프카
- numpartitions
Archives
- Today
- Total
데이터 엔지니어 이것저것
dbt sources 본문
728x90
source 를 사용하면 데이터의 이름을 지정하고 설명할 수 있다.
- SQL 에서 {{ soucre() }} 로 데이터 계보를 정의
- 소스 데이터에 대한 가정 테스트
- 소스 테이터가 최신의 데이터인지 확인
model DIR 인에 파일로 작성된다. (예시)
version: 2
sources:
- name: user_table
database: root
schema: source
tables:
- name: users
freshness:
warn_after: {count: 1, period: minute}
error_after:
count: 24
period: day
loaded_at_field: created_at
기본적으로 schema 와 name은 동일하다. 다른경우에만 schema 를 추가하면 된다.
freshness 에 위의 예시처럼 {} 안에 입력하거나 들여쓰기로 입력 가능
source를 사용한 SQL 작성
SELECT id,
user_name
FROM {{ source('user_table', 'users')}}
dbt source freshness
위의 설정한 값을 토대로 created_at 컬럼을 통해 최근 1분 이내에 생성된 데이터가 없다면 warn
관련 결과물은 target/source.json으로 남는다
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/sources/v3.json",
"dbt_version": "1.7.1",
"generated_at": "2023-11-12T11:42:16.088994Z",
"invocation_id": "863b36f4-ece6-4572-b8c5-68d69254da01",
"env": {}
},
"results": [
{
"unique_id": "source.users.user_table.users",
"max_loaded_at": "2023-11-12T11:32:28.997227+00:00",
"snapshotted_at": "2023-11-12T11:42:16.082537+00:00",
"max_loaded_at_time_ago_in_s": 587.08531,
"status": "warn",
"criteria": {
"warn_after": {
"count": 1,
"period": "minute"
},
"error_after": {
"count": 24,
"period": "day"
},
"filter": null
},
"adapter_response": {
"_message": "SELECT 1",
"code": "SELECT",
"rows_affected": 1
},
"timing": [
{
"name": "compile",
"started_at": "2023-11-12T11:42:16.040782Z",
"completed_at": "2023-11-12T11:42:16.040782Z"
},
{
"name": "execute",
"started_at": "2023-11-12T11:42:16.041781Z",
"completed_at": "2023-11-12T11:42:16.085993Z"
}
],
"thread_id": "Thread-1",
"execution_time": 0.04821372032165527
}
],
"elapsed_time": 0.2371530532836914
}
관련해서 쿼리는 어떻게 날리는지 확인하기 위해 log 파일을 확인해보니
select
max(created_at) as max_loaded_at,
now() as snapshotted_at
from "root"."source"."users"
이렇게 쿼리를 날린다.
즉, 확인을 위한 created_at의 시간과 현재시간을 비교해서 최신 정보 인지 확인한다.
TODO
- freshness 확인하기 위해 filter 기능
관련 문서
https://docs.getdbt.com/reference/resource-properties/freshness
freshness | dbt Developer Hub
.yml'>
docs.getdbt.com
728x90