
Spark readStream with Kafka and Flask


Flask generates log lines and sends them to the Kafka broker (the producer side).

Spark then reads that data back out of Kafka (the consumer side).

 

Kafka server docker-compose

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # kafka:29092 serves clients inside the compose network; localhost:9092 serves clients on the host
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka-data:/var/lib/kafka/data
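
With the compose file above, the broker can be started and the logs topic created along these lines (a sketch; the kafka-topics step is optional when the broker is left with auto topic creation enabled, which is the Kafka default):

docker compose up -d

# create the "logs" topic explicitly
docker exec kafka kafka-topics --bootstrap-server localhost:9092 \
  --create --topic logs --partitions 1 --replication-factor 1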

 

 

Flask server code

from flask import Flask
import random
import time
import logging
from confluent_kafka import Producer

# numeric levels from Python's logging module: 10=DEBUG, 20=INFO, 30=WARNING, 40=ERROR
log_levels = [20, 30, 40, 10]
log_messages = ['Application started', 'Processing data', 'Invalid input', 'An error occurred']

conf = {
    'bootstrap.servers': '127.0.0.1:9092',
    'client.id': 'flask-producer'
}

logging.basicConfig(level=logging.DEBUG)
producer = Producer(conf)

app = Flask(__name__)

@app.route('/')
def generate_log():
    # a single GET to / starts an endless loop that emits one random log line every 0.1 s
    while True:
        log_level = random.choice(log_levels)
        log_message = random.choice(log_messages)
        current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        msg = f"{current_time} [{log_level}] {log_message}"
        print(msg)
        producer.produce('logs', msg.encode('utf-8'))
        producer.flush()  # flush every message right away; simple, though costly for high throughput
        time.sleep(0.1)

        # return f'{current_time} [{log_level}] {log_message}'

if __name__ == '__main__':
    app.run(port=5000, debug=True)
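
With the broker up, running the Flask app and opening http://localhost:5000/ once starts the producer loop; the request itself never returns because of the infinite loop, but log lines keep flowing to Kafka. One way to confirm that (assuming the code above is saved as app.py) is:

python app.py

# in another terminal, tail the topic from inside the kafka container
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic logs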

 

 

Spark streaming consumer code (Scala)

import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


object spark_streaming_test {

  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create SparkSession
    val spark = SparkSession.builder()
      .appName("StreamReaderExample")
      .master("local[*]")
      .getOrCreate()

    val bootstrapServers = "127.0.0.1:9092"
    val topic = "logs"

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "latest")
      .load()

    val query = df
      .withWatermark("timestamp", "1 second")  // apply a watermark on the Kafka ingestion timestamp
      .select(
        // Kafka delivers value as binary, so cast it to a string before splitting;
        // with the log format above, item(0) is the date and item(1) the time of the line
        split(col("value").cast("string"), " ").getItem(0).as("id"),
        split(col("value").cast("string"), " ").getItem(1).as("msg"),
        col("timestamp")
      )
      .groupBy(window(col("timestamp"), "1 second"), col("id"), col("msg"))
      .count()
      .writeStream
      .outputMode("append")
      .format("console")
      .start()


    query.awaitTermination()
  }
}
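
The Kafka source is not bundled with Spark itself, so the spark-sql-kafka-0-10 connector has to be on the classpath. A minimal sbt sketch (the Spark version here is an assumption; match it to your local Spark):

// build.sbt
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "3.3.2",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2"
)

When submitting with spark-submit instead, the equivalent is --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2.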

 

(Screenshot) Left: the raw log lines produced by Flask / Right: the windowed group-by counts Spark prints to the console

 
