Kafka Producers and Consumers
In an earlier blog post we created a Strimzi Kafka cluster on Kubernetes. In this part of the blog we are going to write Kafka producers and consumers for that cluster. All code used in this blog has been shared on GitHub. Producers send messages to Kafka topics and consumers read messages from those topics. For simplicity we have used Python to write the producer and the consumer. The producer uses car_id as the partition key: messages with the same key always land on the same partition, so the data for a particular car can be processed from a single partition.
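To see why keyed messages stick to one partition, here is a simplified model of keyed partitioning. The real clients use their own hash functions (for example, the Java client uses murmur2), but the property is the same: a stable hash of the key modulo the partition count, so equal keys always map to equal partitions. This snippet is illustrative only, not the actual partitioner:

import binascii

def pick_partition(key: bytes, num_partitions: int) -> int:
    # Illustrative only: a stable hash of the key modulo the partition
    # count means equal keys always map to the same partition.
    return binascii.crc32(key) % num_partitions

for car_id in ['car-1', 'car-2', 'car-3']:
    print(car_id, '->', pick_partition(car_id.encode('utf-8'), 3))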
Creating a Kafka producer
The complete code can be found on GitHub.
from confluent_kafka import Producer
import random
import time

# Sample location payloads; a real producer would receive these from the cars.
locdata_car0 = """{"lat":"60.30","lang":"60.30","speed":"30"}"""
locdata_car1 = """{"lat":"80.40","lang":"60.50","speed":"40"}"""
locdata_car2 = """{"lat":"80.30","lang":"60.20","speed":"80"}"""
locdata_payloads = [locdata_car0.encode(), locdata_car1.encode(), locdata_car2.encode()]

def delivery_report(err, msg):
    """Delivery callback, invoked once per produced message."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} partition [{}] offset {}'.format(
            msg.topic(), msg.partition(), msg.offset()))

conf = {
    'bootstrap.servers': 'kafka:9094',  # Replace with the address of your Kafka broker(s)
    'client.id': 'python-producer-01'
}

producer = Producer(conf)
topic = 'my-topic'

car_ids = ['car-1', 'car-2', 'car-3', 'car-4', 'car-5', 'car-6', 'car-7']

while True:
    for car_id in car_ids:
        print(car_id)
        # The message key determines the partition: messages with the same
        # car_id always land on the same partition, so there is no need to
        # compute a partition number ourselves.
        producer.produce(topic, key=car_id.encode('utf-8'),
                         value=random.choice(locdata_payloads),
                         callback=delivery_report)
        producer.poll(0)  # serve delivery callbacks
    producer.flush()
    time.sleep(1)
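The producer assumes that my-topic already exists. With Strimzi, a topic can be declared as a KafkaTopic resource; the following is a minimal sketch, assuming the cluster is named k8s-kafka-cluster in the kafka namespace (matching the bootstrap address used by the consumer below). Seven partitions let the seven car keys spread across partitions:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: k8s-kafka-cluster
spec:
  partitions: 7
  replicas: 3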
Creating the Kafka consumer
The complete code can be found on GitHub.
from confluent_kafka import Consumer, KafkaError
from confluent_kafka import OFFSET_BEGINNING

def on_assign(consumer, partitions):
    """Callback for when partition assignments change."""
    for partition in partitions:
        partition.offset = OFFSET_BEGINNING
    # Apply the modified offsets; without this call the default
    # assignment (with the original offsets) is used.
    consumer.assign(partitions)

def on_message(message):
    """Callback for when a message is received."""
    if message.error() is not None:
        if message.error().code() == KafkaError._PARTITION_EOF:
            print("Reached end of partition event for {} [{}] at offset {}".format(
                message.topic(), message.partition(), message.offset()))
        else:
            print("Error in receiving message: {}".format(message.error()))
    else:
        print("Received message from {} [{}] at offset {}: key={}, value={}".format(
            message.topic(), message.partition(), message.offset(),
            message.key(), message.value()))

# Set up the Kafka consumer configuration
conf = {
    'bootstrap.servers': 'k8s-kafka-cluster-kafka-bootstrap.kafka:9092',  # Replace with the address of your Kafka broker(s)
    'group.id': 'python-consumer-group',
    'auto.offset.reset': 'earliest'
    # Set 'enable.partition.eof': True to receive _PARTITION_EOF events.
}

# Create the Kafka consumer instance
consumer = Consumer(conf)

# Subscribe to the desired topic
consumer.subscribe(['my-topic'], on_assign=on_assign)

# Continuously poll for new messages; on_message handles both errors
# and successfully received messages.
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        on_message(msg)
finally:
    consumer.close()
Dockerizing the Kafka producer and consumer
# Use an official Python runtime as the base image
FROM python:3.9.7-slim
# Set the working directory
WORKDIR /apps
# Copy the consumer code to the container
COPY consumer.py .
# Install the required packages
RUN pip install confluent-kafka
# Run the consumer code
CMD ["python", "consumer.py"]
Building the Kafka producer and consumer images
docker build -t hub.k8s.cluster/kafka-producer:0.0.1 .
docker build -t hub.k8s.cluster/kafka-consumer:0.0.1 .
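Since the Deployments below pull these images from the registry, they must also be pushed (assuming hub.k8s.cluster is a container registry reachable from the cluster):

docker push hub.k8s.cluster/kafka-producer:0.0.1
docker push hub.k8s.cluster/kafka-consumer:0.0.1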
Deploying the Kafka producer as a Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-producer
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-producer
  template:
    metadata:
      labels:
        app: kafka-producer
    spec:
      containers:
      - name: kafka-producer
        image: hub.k8s.cluster/kafka-producer:0.0.1
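Apply the manifest (assuming it is saved as kafka-producer.yaml):

kubectl apply -f kafka-producer.yaml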
Deploying the Kafka consumer as a Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-consumer
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-consumer
  template:
    metadata:
      labels:
        app: kafka-consumer
    spec:
      containers:
      - name: kafka-consumer
        image: hub.k8s.cluster/kafka-consumer:0.0.1
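Apply the manifest (assuming it is saved as kafka-consumer.yaml):

kubectl apply -f kafka-consumer.yaml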
Scaling Kafka producers
kubectl scale deployment kafka-producer --replicas 10
Scaling Kafka consumers
kubectl scale deployment kafka-consumer --replicas 5
Note that within a consumer group each partition is consumed by at most one consumer, so scaling the consumer Deployment beyond the number of partitions on the topic leaves the extra consumers idle.
In the next blog we are going to use KEDA (Kubernetes Event-driven Autoscaling) to scale the Kafka consumers based on the number of messages on the Kafka topic.