Apache Kafka on Kubernetes with Strimzi – Part 2: Creating Producer and Consumer apps using Go and Scala and deploying on Kubernetes

In the previous post, we have learnt about Strimzi and deployed a Kafka Cluster on Minikube and also tested our cluster. Now, let’s build a Producer application with Go and a Consumer application with Scala language, deploy them on Kubernetes and see how it all works.

This is the Part 2 of the Apache Kafka on Kubernetes with Strimzi blog post. In case you haven’t read the previous post, I recommend you to check it out:

The Producer

For creating the producer, I would like to use the Go language. We’re going to add below dependencies to our module file:

github.com/segmentio/kafka-go // Kafka
go.uber.org/zap // Logging
google.golang.org/protobuf // Message Serialization
github.com/urfave/cli/v2 // CLI

With the help of urfave cli library, we can easily add some arguments (cli and environment variables) to our application. The variables we need are:

  • Kafka Bootstrap servers
  • Topic name
  • Buffer size (for batching messages)
  • Sleep time between producing messages

Check out the code on my Github to see how I leveraged this library to easily add and use these arguments.

Let’s create kafka.go file with two methods in it. One for creating a producer and one for creating a consumer (Although we are going to only use the producer, but I wanted to show you how to create a consumer too).

package kafka

import (
	"github.com/segmentio/kafka-go"
	log "strimzi-producer/internal/platform/logger"
)

func Consumer(servers []string, groupId string, topic string) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:  servers, //[]string{"localhost:9092"},
		GroupID:  groupId,
		Topic:    topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
		Logger:      kafka.LoggerFunc(log.Logger.Infof),
		ErrorLogger: kafka.LoggerFunc(log.Logger.Errorf),
	})
}

func Producer(servers []string, topic string) *kafka.Writer {
	return &kafka.Writer{
		Addr:         kafka.TCP(servers...),
		Topic:        topic,
		RequiredAcks: kafka.RequireOne,
		Logger:      kafka.LoggerFunc(log.Logger.Infof),
		ErrorLogger: kafka.LoggerFunc(log.Logger.Errorf),
	}
}

Now, let’s create a very simple message schema using Protobuf and generate Go output using protoc:

syntax = "proto3";

package producer;

option go_package = "producer";

message MyMessage {
  int32 id = 1;
  string message = 2;
}

And here’s the code that creates and produces messages to Kafka:

package producer

import (
	"context"
	"fmt"
	"github.com/segmentio/kafka-go"
	"google.golang.org/protobuf/proto"
	"math/rand"
	"strconv"
	log "strimzi-producer/internal/platform/logger"
	"time"
)

type client struct {
	sleepTime time.Duration
	bufferSize uint
	writer    *kafka.Writer
}

func NewProducerClient(sleepTime time.Duration, bufferSize uint, writer *kafka.Writer) *client {
	return &client{
		sleepTime: sleepTime,
		bufferSize: bufferSize,
		writer:    writer,
	}
}

func (c *client) Produce() {
	log.Logger.Info("Starting the Producer...")
	for {
		buffer := make([]kafka.Message, c.bufferSize)
		var j uint
		for j = 0; j < c.bufferSize; j++ {
			id := strconv.Itoa(rand.Int())
			msg := MyMessage{
				Id:      id,
				Message: fmt.Sprintf("Just a message with a random number: %d", rand.Int()),
			}
			bytes, err := proto.Marshal(&msg)
			if err != nil {
				log.Logger.Errorf("Error in marshaling the message: %s", err.Error())
				return
			}
			buffer[j] = kafka.Message{
				Key:   []byte(id),
				Value: bytes,
			}
		}
		err := c.writer.WriteMessages(context.Background(), buffer...)
		log.Logger.Debugf("Sent %d messages to the topic", len(buffer))
		if err != nil {
			log.Logger.Errorf("Error in producing message to Kafka: %s", err.Error())
			return
		}
		time.Sleep(c.sleepTime)
	}
}

In the code above, we are putting serialized messages inside a buffer slice and produce them to Kafka as a batch.

There are some other (not complicated) parts in this project (health check api, graceful shutdown, …). Since they’re out of the scope of this article and also to keep this post shorter, I don’t explain them here. Please visit the project on my Github for full code and feel free to contact me or leave a comment if you have any question or suggestion on any part of the code.

Now, In order to deploy our Producer to the Kubernetes, we need to create a Dockerfile:

FROM golang:1.15-alpine
LABEL maintainer="Sina Nourian <sina.nourian@gmail.com>"
WORKDIR /app
COPY . .
RUN go mod download
RUN go build -o producer ./cmd/producer
CMD ["./producer"]

Let’s build our Dockerfile and publish it to our local repository:

$ docker build -t nrsina/strimzi-producer:v1 .

And also a Kubernetes Deployment file to deploy the Producer on Kubernetes:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-producer-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: strimzi-producer
  template:
    metadata:
      labels:
        app: strimzi-producer
    spec:
      containers:
      - name: strimzi-producer
        image: nrsina/strimzi-producer:v1
        imagePullPolicy: Never
        ports:
        - name: http
          containerPort: 8080
          protocol: TCP
        livenessProbe:
          httpGet:
            path: /health
            port: http
            scheme: HTTP
          initialDelaySeconds: 3
          timeoutSeconds: 1
          periodSeconds: 15
        readinessProbe:
          httpGet:
            path: /health
            port: http
            scheme: HTTP
          initialDelaySeconds: 3
          timeoutSeconds: 1
          periodSeconds: 15
        env:
        - name: SP_SERVE_ADDR
          value: ":8080"
        - name: SP_BOOTSTRAP_SERVERS
          value: "my-cluster-kafka-bootstrap.kafka:9092"
        - name: SP_KAFKA_TOPIC
          value: "my-topic"
        - name: SP_SLEEP_TIME_MS
          value: "2000ms"
        - name: SP_BUFFER_SIZE
          value: "5"

Let’s apply our deployment file, wait for the pod to start and get the logs from the pod:

Deploying Strimzi Producer app in Kubernetes

As you can see from the above screenshot, our producer is up and running. Awesome! Now, let’s get our hands dirty with the Consumer…

The Consumer

For our consumer, we’re going to build a consumer using the Scala language with Alpakka Kafka library which is an impressive Kafka library by Lightbend. By scaling our consumer, we can see how Consumer Group works. But before jumping into the code, I’d like to talk a little bit about Delivery Semantics (Delivery Guarantees) since it’s a very important issue to consider when using any stream processing framework.

Delivery Semantics

Kafka uses a dumb broker / smart consumer design. The broker only keeps the messages for a certain amount of time and consumers should keep track of their offset in partitions of the topic. So the developer should configure the consumer based on the business needs. We have three delivery semantics:

  • At most once. This is the most relaxed semantic. With this semantic, message might get processed one time or not at all (message can get lost and it cannot be redelivered). This semantic happens when the commitment of the offsets has nothing to to with the processing of the message.
    By setting the enable.auto.commit property of the consumer to true, the consumer offsets will be periodically committed in the background. So we have no control on the offsets and if the process of a message fails, the consumer can’t request the message from the topic again since the offset has been committed before and moved passed the failed message.
  • At least once. This semantic ensures that a message may never get lost but may get redelivered. So the message might get processed more than one time (but never lost). When this delivery semantic is needed, the consumer has to take control of the offset committing.
    The commitment should happen after the processing of the message is done. So if the process fails, no offset will be committed. And on the next pull from the topic, the failed message will get redelivered again.
    Bear in mind that with this approach, the processing of a message may be successful but the committing fails for any reason. So then again the message will be redelivered. This is because our processing (for example, persisting in a database) and committing the offsets in Kafka are not in a single transaction. (The enable.auto.commit property should be false since we need to take control of the offset committing).
  • Exactly once. This semantic guarantees that the processing of the message happens only once. This is the most strong semantic. When we want to have exactly once semantics, the process and committing of the message should happen in a single transaction.
    For example, if we are persisting the message in PostgreSQL, we should also persist the offsets in the same database so we can easily wrap both of them in a single transaction. If the persisting of the offset fails, the persisting of the message will also roll backs and vice versa.

You may ask why not always use exactly once semantics? Sounds like it’s the best approach! That may be true but you have to take the overhead it causes into the consideration. So use the semantic that best meets your business requirements.

Creating the Consumer

We are using Akka for this project. Akka is a toolkit consisting of many open-source libraries for designing concurrent, distributed and resilient applications by leveraging Actor Model. Akka uses Actor model which provides a high level of abstraction for building concurrent applications and alleviates the developer from dealing with low-level thread management and locking. Actors are single and independent units that encapsulate state and behavior and they communicate with other actors via message passing. A hierarchy of actors can be formed in Akka. Refer to Akka’s documentation for more information.

Our configuration resides in application.conf file [HOCON format]:

akka {
  loglevel = DEBUG
}

akka.management {
  http {
    hostname = "127.0.0.1"
    hostname = ${?AKKA_MANAGEMENT_HTTP_HOST}
    bind-hostname = "0.0.0.0"
    port = 8558
    port = ${?AKKA_MANAGEMENT_HTTP_PORT}
  }
}

strimzi-consumer {
  bootstrap-servers = "localhost:9092"
  bootstrap-servers = ${?SC_BOOTSTRAP_SERVER}
  group-id = "my-group"
  group-id = ${?SC_GROUP_ID}
  enable-auto-commit = "false" // false for at-least-once semantics
  enable-auto-commit = ${?SC_AUTO_COMMIT}
  auto-offset-reset = "latest" // other value: earliest
  auto-offset-reset = ${?SC_OFFSET_RESET}
  topic = "my-topic"
  topic = ${?SC_TOPIC_NAME}
  parallelism = 10 // batch processing of consumed messages
  parallelism = ${?SC_PARALLELISM_NUM}
  sleep-time-ms = 100 // simulates computation of a message
  sleep-time-ms = ${?SC_SLEEP_TIME}
}

Since we have added the Akka Management dependency, we need to configure it via akka.management key. Akka Management provides some useful HTTP APIs for easy management. We only need the health check APIs provided by the Akka Management because we’re going to deploy it on Kubernetes.

The important part of the configuration is the strimzi-consumer key. Here, we have added the configurations needed for a Kafka consumer. notice the ${?…} value for each key. What happens here is that if the environment variable with the specified name exists (for example, SC_BOOTSTRAP_SERVER), the value from environment variable will be overridden. the ? before variable name makes it optional.

Before creating our actors, an ActorSystem must be defined with a root actor (Guardian). For the sake of simplicity, we have directly used our Consumer Actor instead of a Guardian. So, in our Main class (Application.scala):

object Application extends App {
  ActorSystem[Nothing](ConsumerClient(), "StrimziConsumer")
}

We need to create a ConsumerSettings object to pass to the Consumer. we have created our ConsumerSettings object in ConsumerProperties.scala file:

def toConsumerSettings: ConsumerSettings[String, Array[Byte]] = {
  ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit)
    .withStopTimeout(2.seconds)
}

As you have noticed, we have directly used a Byte Array as the value type of the Kafka message. It’s considered a best practice in Alpakka Kafka library:

The general recommendation for de-/serialization of messages is to use byte arrays (or Strings) as value and do the de-/serialization in a map operation in the Akka Stream instead of implementing it directly in Kafka de-/serializers. When deserialization is handled explicitly within the Akka Stream, it is easier to implement the desired error handling strategy as the examples below show.

Akka documentation

Now, let’s create our Consumer actor:

package com.snourian.strimzi.consumer

import akka.actor.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.TypedActorSystemOps
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, Subscriptions}
import akka.management.scaladsl.AkkaManagement
import com.snourian.strimzi.producer.consumer.MyMessage

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.Try

object ConsumerClient {

  sealed trait Command

  private final case class KafkaConsumerStopped(reason: Try[Any]) extends Command

  def apply(): Behavior[Nothing] = {
    Behaviors.setup[Command] { ctx =>
      ctx.log.debug("Initializing ConsumerClient actor...")
      // Alpakka Kafka still uses the old ActorSystem (Classic System) which is untyped,
      // we need to convert the typed actor system to the classic untyped system.
      implicit val system: ActorSystem = ctx.system.toClassic
      implicit val ec: ExecutionContextExecutor = ctx.executionContext

      AkkaManagement(system).start()
      val consumerProps = ConsumerProperties(system)
      val subscription = Subscriptions.topics(consumerProps.topic)
      /*
      committableSource => offset can be committed after the computation -> at-least-once delivery semantic
      sourceWithOffsetContext => same as committableSource, but with offset position as flow context
      for at-most-once, use atMostOnceSource() and for exactly-once, use plainSource() for a fine-grained control over offset committing
      use Consumer.DrainingControl if you want to stop the stream after some amount of time
       */
      val consume = Consumer.sourceWithOffsetContext(consumerProps.toConsumerSettings, subscription)
        .mapAsync(consumerProps.parallelism) { record => // mapAsync -> process received messages in parallel (batch)
          Future {
            system.log.info(s"consumed record with Id ${record.key()} from partition ${record.partition()}")
            MyMessage.parseFrom(record.value()) // compile with SBT to create MyMessage class inside target folder
            Thread.sleep(consumerProps.sleepTime) // some heavy computation!
          }
        }.runWith(Committer.sinkWithOffsetContext(CommitterSettings(system)))
      // send a message to self if the consumer stops
      consume.onComplete { result =>
        ctx.self ! KafkaConsumerStopped(result)
      }
      Behaviors.receiveMessage[Command] {
        case KafkaConsumerStopped(reason) => //stop the Behavior when the consumer stops
          ctx.log.warn("Consumer stopped {}", reason)
          Behaviors.stopped
      }
    }.narrow //we don't care about the type, so narrow it down to Nothing (instead of Command)
  }
}

Alpakka Kafka provides various consumer implementations for different needs. I explained some of them in the comments above the Consumer code. here, we are using sourceWithOffsetContext that can be used to achieve at least once delivery semantics. also, with the help of the MapAsync, we are processing messages in batch and committing the offset after the processing of the message.

We’re almost done! Now we need to create a Docker image and publish it into our repository. Since we are using SBT build tool, we can use the SBT Native Packager plugin that can get this job done automatically. So no need to create a Dockerfile by hand.

For the SBT Native Packager to work, first we need to add the SBT plugin into the /project/plugins.sbt file

addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.7.6")

And then we need to enable the plugin by adding enablePlugins(JavaAppPackaging) in the build.sbt file. we can also customize the process with the settings provided by the plugin. For example, the default base image that this plugin uses is large and we can easily provide our desired base image:

lazy val dockerSettings = Seq(
  dockerBaseImage := "openjdk:11-jre-slim",
  packageName in Docker := "strimzi-consumer",
  maintainer in Docker := "Sina Nourian <sina.nourian@gmail.com>",
  version in Docker := "v1",
  dockerUsername := Some("nrsina")
)

Now, by using the below command, this plugin creates a Docker image and publish it to our local repository. Also, the Dockerfile will be created at /target/docker folder. Check out the project on my Github for more information

$ sbt docker:publishLocal

If you only want the SBT plugin to create the Dockerfile and not to publish it, run the below command:

$ sbt docker:stage

Now it’s time to write some YAML files:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-consumer-deployment
  labels:
    app: strimzi-consumer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: strimzi-consumer
  template:
    metadata:
      labels:
        app: strimzi-consumer
    spec:
      containers:
      - name: strimzi-consumer
        image: nrsina/strimzi-consumer:v1
        imagePullPolicy: Never # IfNotPresent
        ports:
        - name: management
          containerPort: 8558
          protocol: TCP
        readinessProbe:
          httpGet:
            path: /ready
            port: management
          periodSeconds: 10
          failureThreshold: 3
          initialDelaySeconds: 10
        livenessProbe:
          httpGet:
            path: /alive
            port: management
          periodSeconds: 10
          failureThreshold: 5
          initialDelaySeconds: 20
        env:
        - name: SC_BOOTSTRAP_SERVER
          value: "my-cluster-kafka-bootstrap.kafka:9092"
        - name: SC_GROUP_ID
          value: "my-group"
        - name: SC_TOPIC_NAME
          value: "my-topic"
        - name: SC_PARALLELISM_NUM
          value: "5"
        - name: SC_SLEEP_TIME
          value: "200"

We are going to create 3 replicas for our consumer to see how the GroupId works.

Now, just like before, let’s apply our deployment file, wait for the pod to start and get logs from one of the pods:

Deploying Strimzi Consumer app in Kubernetes

Voilà! Three replicas of our consumer app is running and consuming messages from Kafka.

Conclusion

After deploying our Kafka Cluster on Kubernetes (Part 1), we have built a producer app in Go language and a consumer app in Scala language. After publishing our applications and watching the logs, it looks like everything is fine and running well. But is it? Are we sure that everything is ok? What is the rate of producing / consuming messages? Do we have a “Consumer Lag” here? By the look of the logs everything is fine and running well. But we are not really sure how our apps and Kafka are performing. We need some monitoring tools to monitor the Kafka’s performance. In the next part of the series, we are going to use Prometheus and Grafana to monitor our Cluster.

Feel free to ask any questions or share any comments in the comment section or contact me via email.

Leave a Reply

Your email address will not be published. Required fields are marked *