데이터 엔지니어 이것저것

카프카 프로듀서 본문

오픈소스/kafka

카프카 프로듀서

pastime 2021. 4. 27. 23:48
728x90

프로듀서란?
카프카 프로듀서 API와 그것으로 구성도니 애플리케이션을 말합니다.
프로듀서는 브로커에 특정 토픽(or 파티션)을 지정하여 메세지를 전달
프로듀서를 통해 전달되는 메세지의 구조


프로듀서 전달과정

  1. 직렬화 (Serializer)
  2. 파티셔닝 (Partitioner)
  3. 메시지 배치 (Record Accumulator)
  4. 압축 (Compression)
  5. 전달 (Sender)

 

 

프로듀서 구현

from kafka import KafkaProducer
from json import dumps
import time

str_topic_name = 'Topic1'


producer = KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))

start = time.time()

for i in range(2):
    message = f'sample data {i}'
    data = {'str' : message}
    producer.send(str_topic_name, value=data)
    producer.flush()

print("end :", time.time() - start)

consumer 

 

from kafka import KafkaConsumer
from json import loads
from pprint import pprint
# 카프카 서버
bootstrap_servers = ["host.docker.internal:9092"]

# 카프카 토픽
str_topic_name = 'Topic1'

# 카프카 소비자 group1 생성
str_group_name = 'group1'
consumer = KafkaConsumer(str_topic_name, bootstrap_servers=bootstrap_servers,
                         auto_offset_reset='earliest', # 가장 처음부터 소비
                         enable_auto_commit=True,
                         group_id=str_group_name,
                         value_deserializer=lambda x: loads(x.decode('utf-8')),
                         consumer_timeout_ms=60000 # 타임아웃지정(단위:밀리초)
                        )

for message in consumer:
    pprint(message)

 

문서 :
kafka.apache.org/documentation/#gettingStarted

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

 

KafkaConsumer — kafka-python 2.0.2-dev documentation

  bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that wil

kafka-python.readthedocs.io

 

728x90

'오픈소스 > kafka' 카테고리의 다른 글

브로커와 클러스터  (0) 2022.01.03
ZooKeeper란?  (0) 2022.01.02
카프카 개발 배경  (0) 2021.12.26
python - kafka  (0) 2021.04.04
Apache - Kafka  (0) 2021.04.04