[Kafka] 2. kafka-python 으로 kafka producer, consumer 만들기
2023. 1. 12. 12:01
2023.01.03 - [Develop/Kafka] - [Kafka] 1. kafka란 무엇인가?
지난시간에 Kafka에 대한 기본적인 개념을 공부했다. 이번에는 Kafka의 동작을 살펴보기 위해 간단하게 producer와 consumer를 만들어 보려고 한다.
이번에는 아래의 영상을 참고하여 코드를 만들었다.
https://youtu.be/LHNtL4zDBuk?t=330
1. 환경 세팅하기
일단, docker로 kafka 환경을 구축하였다.
docker-compose.yml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME : localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
그리고 python을 이용해 만들 예정이기 때문에 kafka를 설치해준다.
pip install kafka-python
2. kafka producer 만들기
일단 간단하게 topic을 지정해서 kafka에 data를 전송하는 코드를 만들어 보겠다.
producer.py
import json
from kafka import KafkaProducer
KAFKA_TOPIC = "data1"
producer = KafkaProducer(bootstrap_servers='localhost:9092')
i = 0
while(True):
text = input("text를 입력해주세요 >> ")
data = {
"id": i,
"data": text,
}
i = i + 1
producer.send(KAFKA_TOPIC,
json.dumps(data).encode("utf-8"))
print(f"Done Sending..{i}")
전송은 json 형식으로 했다. 그닥 어려운 형식의 코드가 아니기에 자세한 설명은 생략하고 간략하게만 말하겠다. json 형식으로 data를 만든 뒤, producer.send 로 kafka 에 보냈다.
3. kafka consumer 만들기
consumer.py
import json
from kafka import KafkaProducer
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers="127.0.0.1:9092"
)
print("start listening")
for message in consumer:
print("get data")
consumed_message = json.loads(message.value.decode())
ex_id = consumed_message["id"]
ex_num = consumed_message["num"]
print(f"{ex_id} : {ex_num}")
consumer.py을 실행시킨 뒤 producer.py를 실행시켜서 kafka로 데이터를 보내면 consumer.py 에 있는 kafka consumer가 데이터를 받아서 출력하는 모습을 보여준다. json 형식으로 데이터가 전송되었기 때문에 간단하게 key 값을 이용해 value를 찾을 수 있다.
'Develop > Kafka' 카테고리의 다른 글
[Kafka] 1. kafka란 무엇인가? (0) | 2023.01.03 |
---|