[Kafka] 2. kafka-python 으로 kafka producer, consumer 만들기

2023. 1. 12. 12:01

 

 

2023.01.03 - [Develop/Kafka] - [Kafka] 1. kafka란 무엇인가?

 

[Kafka] 1. kafka란 무엇인가?

https://kafka.apache.org/ Apache Kafka Apache Kafka: A Distributed Streaming Platform. kafka.apache.org 이번에 학부연구생을 시작하면서 Kafka를 사용하게 되었다. 정말 처음 들어보는 것이라서 약간 걱정은 되지만, 포기

mobuk.tistory.com

지난시간에 Kafka에 대한 기본적인 개념을 공부했다. 이번에는 Kafka의 동작을 살펴보기 위해 간단하게 producer와 consumer를 만들어 보려고 한다. 

이번에는 아래의 영상을 참고하여 코드를 만들었다. 

https://youtu.be/LHNtL4zDBuk?t=330 

 

1. 환경 세팅하기

일단, docker로 kafka 환경을 구축하였다. 

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME : localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

 

 

그리고 python을 이용해 만들 예정이기 때문에 kafka를 설치해준다.

pip install kafka-python

 

2. kafka producer 만들기

일단 간단하게 topic을 지정해서 kafka에 data를 전송하는 코드를 만들어 보겠다.

producer.py

import json
from kafka import KafkaProducer

KAFKA_TOPIC = "data1"
producer = KafkaProducer(bootstrap_servers='localhost:9092')

i = 0

while(True):
  text = input("text를 입력해주세요 >> ")
  data = {
    "id": i,
    "data": text,
  }
  i = i + 1
  producer.send(KAFKA_TOPIC, 
    json.dumps(data).encode("utf-8"))
    
  print(f"Done Sending..{i}")

전송은 json 형식으로 했다.  그닥 어려운 형식의 코드가 아니기에 자세한 설명은 생략하고 간략하게만 말하겠다. json 형식으로 data를 만든 뒤, producer.send 로 kafka 에 보냈다. 

 

3. kafka consumer 만들기 

consumer.py

import json
from kafka import KafkaProducer

consumer = KafkaConsumer(
  KAFKA_TOPIC,
  bootstrap_servers="127.0.0.1:9092"
)

print("start listening")

for message in consumer:
    print("get data")
    consumed_message = json.loads(message.value.decode())

    ex_id = consumed_message["id"]
    ex_num = consumed_message["num"]

    print(f"{ex_id} : {ex_num}")

consumer.py을 실행시킨 뒤 producer.py를 실행시켜서 kafka로 데이터를 보내면 consumer.py  에 있는 kafka consumer가 데이터를 받아서 출력하는 모습을 보여준다. json 형식으로 데이터가 전송되었기 때문에 간단하게 key 값을 이용해 value를 찾을 수 있다. 

 

'Develop > Kafka' 카테고리의 다른 글

[Kafka] 1. kafka란 무엇인가?  (0) 2023.01.03

BELATED ARTICLES

more