데이터 엔지니어 이것저것

dbt sources 본문

오픈소스/dbt

dbt sources

pastime 2023. 11. 12. 20:55
728x90
source 를 사용하면 데이터의 이름을 지정하고 설명할 수 있다.

 

  • SQL 에서 {{ soucre() }} 로 데이터 계보를 정의
  • 소스 데이터에 대한 가정 테스트
  • 소스 테이터가 최신의 데이터인지 확인

model DIR 인에 파일로 작성된다. (예시)

version: 2

sources:
  - name: user_table
    database: root
    schema: source
    tables:
      - name: users
        freshness:
          warn_after: {count: 1, period: minute}
          error_after:
            count: 24
            period: day
        loaded_at_field: created_at

 

기본적으로 schema 와 name은 동일하다. 다른경우에만 schema 를 추가하면 된다.

 

freshness 에 위의 예시처럼 {} 안에 입력하거나 들여쓰기로 입력 가능

 

 

source를 사용한 SQL 작성

 

SELECT id,
       user_name

FROM {{ source('user_table', 'users')}}

 

dbt source freshness

 

 

위의 설정한 값을 토대로 created_at 컬럼을 통해 최근 1분 이내에 생성된 데이터가 없다면 warn

 

관련 결과물은 target/source.json으로 남는다

 

{
  "metadata": {
    "dbt_schema_version": "https://schemas.getdbt.com/dbt/sources/v3.json",
    "dbt_version": "1.7.1",
    "generated_at": "2023-11-12T11:42:16.088994Z",
    "invocation_id": "863b36f4-ece6-4572-b8c5-68d69254da01",
    "env": {}
  },
  "results": [
    {
      "unique_id": "source.users.user_table.users",
      "max_loaded_at": "2023-11-12T11:32:28.997227+00:00",
      "snapshotted_at": "2023-11-12T11:42:16.082537+00:00",
      "max_loaded_at_time_ago_in_s": 587.08531,
      "status": "warn",
      "criteria": {
        "warn_after": {
          "count": 1,
          "period": "minute"
        },
        "error_after": {
          "count": 24,
          "period": "day"
        },
        "filter": null
      },
      "adapter_response": {
        "_message": "SELECT 1",
        "code": "SELECT",
        "rows_affected": 1
      },
      "timing": [
        {
          "name": "compile",
          "started_at": "2023-11-12T11:42:16.040782Z",
          "completed_at": "2023-11-12T11:42:16.040782Z"
        },
        {
          "name": "execute",
          "started_at": "2023-11-12T11:42:16.041781Z",
          "completed_at": "2023-11-12T11:42:16.085993Z"
        }
      ],
      "thread_id": "Thread-1",
      "execution_time": 0.04821372032165527
    }
  ],
  "elapsed_time": 0.2371530532836914
}

 

관련해서 쿼리는 어떻게 날리는지 확인하기 위해 log 파일을 확인해보니

 

select
  max(created_at) as max_loaded_at,
  now() as snapshotted_at
from "root"."source"."users"

이렇게 쿼리를 날린다.

 

 

즉, 확인을 위한 created_at의 시간과 현재시간을 비교해서 최신 정보 인지 확인한다.

 

 

TODO

 - freshness 확인하기 위해 filter 기능

 

 

 

 

 

 

 

 

관련 문서

https://docs.getdbt.com/reference/resource-properties/freshness

 

freshness | dbt Developer Hub

.yml'>

docs.getdbt.com

 

 

728x90

'오픈소스 > dbt' 카테고리의 다른 글

dbt test  (0) 2023.11.14
dbt snapshot  (0) 2023.11.13
dbt seeds  (0) 2023.11.12
dbt 시작하기  (0) 2023.11.11
dbt란?  (0) 2023.11.10