Notice
Recent Posts
Recent Comments
Link
250x250
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- proerty
- CDC
- mysql
- KubernetesPodOperator
- 카프카
- polars
- Materializations
- Python
- bar cahrt
- dbt_project
- 파이썬
- query history
- spring boot
- k9s
- 모바일
- freshness
- 도커
- kafka
- ksql
- DBT
- docker
- Java
- 동적 차트
- numpartitions
- airflow
- UI for kafka
- 크롤링
- 쿠버네티스
- spark
- 윈도우
Archives
- Today
- Total
데이터 엔지니어 이것저것
카프카 프로듀서 본문
728x90
프로듀서란?
카프카 프로듀서 API와 그것으로 구성도니 애플리케이션을 말합니다.
프로듀서는 브로커에 특정 토픽(or 파티션)을 지정하여 메세지를 전달
프로듀서를 통해 전달되는 메세지의 구조
프로듀서 전달과정
- 직렬화 (Serializer)
- 파티셔닝 (Partitioner)
- 메시지 배치 (Record Accumulator)
- 압축 (Compression)
- 전달 (Sender)
프로듀서 구현
from kafka import KafkaProducer
from json import dumps
import time
str_topic_name = 'Topic1'
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8'))
start = time.time()
for i in range(2):
message = f'sample data {i}'
data = {'str' : message}
producer.send(str_topic_name, value=data)
producer.flush()
print("end :", time.time() - start)
consumer
from kafka import KafkaConsumer
from json import loads
from pprint import pprint
# 카프카 서버
bootstrap_servers = ["host.docker.internal:9092"]
# 카프카 토픽
str_topic_name = 'Topic1'
# 카프카 소비자 group1 생성
str_group_name = 'group1'
consumer = KafkaConsumer(str_topic_name, bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest', # 가장 처음부터 소비
enable_auto_commit=True,
group_id=str_group_name,
value_deserializer=lambda x: loads(x.decode('utf-8')),
consumer_timeout_ms=60000 # 타임아웃지정(단위:밀리초)
)
for message in consumer:
pprint(message)
문서 :
kafka.apache.org/documentation/#gettingStarted
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
kafka.apache.org
kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
KafkaConsumer — kafka-python 2.0.2-dev documentation
bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that wil
kafka-python.readthedocs.io
728x90
'오픈소스 > kafka' 카테고리의 다른 글
브로커와 클러스터 (0) | 2022.01.03 |
---|---|
ZooKeeper란? (0) | 2022.01.02 |
카프카 개발 배경 (0) | 2021.12.26 |
python - kafka (0) | 2021.04.04 |
Apache - Kafka (0) | 2021.04.04 |