Kafka Producers and Consumers

In the previous blog post we created a Strimzi Kafka cluster on Kubernetes. In this part we are going to write a Kafka producer and a consumer for that cluster; all of the code used in this post is shared on GitHub. Producers send messages to Kafka topics and consumers read messages from those topics. For simplicity both the producer and the consumer are written in Python. The producer uses car_id as the partition key: messages with the same key always land on the same partition, so the data for a particular car can be processed from a single partition.
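
The partition key only spreads the load if the topic has more than one partition, so the topic should exist with several partitions before the producer starts. Below is a minimal sketch using the confluent_kafka AdminClient; the partition and replication counts are assumptions to adjust to your cluster, and with Strimzi the same thing can also be done declaratively through a KafkaTopic custom resource.

from confluent_kafka.admin import AdminClient, NewTopic

# Bootstrap address of the Strimzi cluster (the same one the consumer uses below)
admin = AdminClient({'bootstrap.servers': 'k8s-kafka-cluster-kafka-bootstrap.kafka:9092'})

# Create 'my-topic' with multiple partitions so keyed messages spread across them
futures = admin.create_topics([NewTopic('my-topic', num_partitions=3, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # raises if creation failed (e.g. the topic already exists)
        print('Created topic {}'.format(topic))
    except Exception as e:
        print('Failed to create topic {}: {}'.format(topic, e))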

Creating a Kafka producer

The complete code can be found on GitHub.

from confluent_kafka import Producer
import random
import time

# Sample location readings (JSON strings) sent as message values
locdata_car0 = """{"lat":"60.30","lang":"60.30","speed":"30"}"""
locdata_car1 = """{"lat":"80.40","lang":"60.50","speed":"40"}"""
locdata_car2 = """{"lat":"80.30","lang":"60.20","speed":"80"}"""

# Encode the payloads to bytes once, up front
locdata_payloads = [locdata_car0.encode(), locdata_car1.encode(), locdata_car2.encode()]

def delivery_report(err, msg):
    """Report whether each produced message was delivered or failed."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} partition [{}] offset {}'.format(msg.topic(), msg.partition(), msg.offset()))

conf = {
    'bootstrap.servers': 'kafka:9094',  # Replace with the address of your Kafka broker(s)
    'client.id': 'python-producer-01'
}

producer = Producer(conf)
topic = 'my-topic'
car_ids = ['car-1', 'car-2', 'car-3', 'car-4', 'car-5', 'car-6', 'car-7']

while True:
    for car_id in car_ids:
        print(car_id)
        # car_id is the message key, so all readings for a car land on the same partition
        producer.produce(topic, key=car_id.encode('utf-8'), value=random.choice(locdata_payloads), callback=delivery_report)
    # Wait for outstanding messages and fire the delivery callbacks
    producer.flush()
    time.sleep(1)
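
Instead of hard-coding the three location strings, the payload can also be built per message with json.dumps. The sketch below is only an illustration; the coordinate and speed ranges are made up, and the field names mirror the hard-coded payloads above.

import json
import random

def make_location_payload():
    """Build a random location reading and serialize it to JSON bytes."""
    reading = {
        'lat': '{:.2f}'.format(random.uniform(-90, 90)),
        'lang': '{:.2f}'.format(random.uniform(-180, 180)),
        'speed': str(random.randint(0, 120)),
    }
    return json.dumps(reading).encode('utf-8')

# Usage inside the produce loop:
# producer.produce(topic, key=car_id.encode('utf-8'),
#                  value=make_location_payload(), callback=delivery_report)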

Creating the Kafka consumer

The complete code can be found on GitHub.

from confluent_kafka import Consumer, KafkaError
from confluent_kafka import OFFSET_BEGINNING

def on_assign(consumer, partitions):
    """ Callback for when partition assignments change """
    for partition in partitions:
        partition.offset = OFFSET_BEGINNING
    # The rewound offsets only take effect when assigned back to the consumer
    consumer.assign(partitions)

def on_message(message):
    """ Callback for when a message is received """
    if message.error() is not None:
        if message.error().code() == KafkaError._PARTITION_EOF:
            print("Reached end of partition event for {} [{}] at offset {}".format(
                message.topic(), message.partition(), message.offset()))
        else:
            print("Error in receiving message: {}".format(message.error()))
    else:
        print("Received message from {} [{}] at offset {}: key={}, value={}".format(
            message.topic(), message.partition(), message.offset(), message.key(), message.value()))

# Set up the Kafka consumer configuration
conf = {
    'bootstrap.servers': 'k8s-kafka-cluster-kafka-bootstrap.kafka:9092',  # Replace with the address of your Kafka broker(s)
    'group.id': 'python-consumer-group',
    'auto.offset.reset': 'earliest'
}

# Create the Kafka consumer instance
consumer = Consumer(conf)

# Subscribe to the desired topic
consumer.subscribe(['my-topic'], on_assign=on_assign)

# Continuously poll for new messages
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print("Reached end of partition event for {} [{}] at offset {}".format(
                msg.topic(), msg.partition(), msg.offset()))
        else:
            print("Error in receiving message: {}".format(msg.error()))
    else:
        on_message(msg)
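
Because the producer sends JSON-encoded location readings, the consumer will usually want to deserialize the value before doing anything with it. A minimal sketch, assuming every message carries a key and a JSON body shaped like the producer's payloads:

import json

def parse_location(message):
    """Decode a consumed message into the fields the producer sends."""
    data = json.loads(message.value())
    return {
        'car_id': message.key().decode('utf-8'),
        'lat': float(data['lat']),
        'lang': float(data['lang']),
        'speed': float(data['speed']),
    }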

Dockerizing the Kafka producer and consumer

The consumer is packaged with the Dockerfile below; the producer image follows the same pattern.

# Use an official Python runtime as the base image
FROM python:3.9.7-slim

# Set the working directory
WORKDIR /apps

# Copy the consumer code to the container
COPY consumer.py .

# Install the required packages
RUN pip install confluent-kafka

# Run the consumer code
CMD ["python", "consumer.py"]

Building the Kafka producer and consumer

docker build -t hub.k8s.cluster/kafka-producer:0.0.1 .
docker build -t hub.k8s.cluster/kafka-consumer:0.0.1 .
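
Both images also need to be pushed so that the cluster nodes can pull them; this assumes hub.k8s.cluster is a registry reachable from the cluster:

docker push hub.k8s.cluster/kafka-producer:0.0.1
docker push hub.k8s.cluster/kafka-consumer:0.0.1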

Deploying the Kafka producer as a Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-producer
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-producer
  template:
    metadata:
      labels:
        app: kafka-producer
    spec:
      containers:
      - name: kafka-producer
        image: hub.k8s.cluster/kafka-producer:0.0.1

Deploying the Kafka consumer as a Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-consumer
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-consumer
  template:
    metadata:
      labels:
        app: kafka-consumer
    spec:
      containers:
      - name: kafka-consumer
        image: hub.k8s.cluster/kafka-consumer:0.0.1

Scaling Kafka producers

kubectl scale deployment kafka-producer --replicas 10

Scaling Kafka consumers

kubectl scale deployment kafka-consumer --replicas 5

Note that consumers in the same consumer group only add parallelism up to the number of partitions on the topic; any consumers beyond that will sit idle.

In the next blog post we are going to use KEDA (Kubernetes Event-driven Autoscaling) to scale the Kafka consumers based on the number of messages waiting on the Kafka topic.