12. Plain Kafka Consumer in Play Framework

2021-12-26 kafka play scala

Introduction

I will describe how to use a plain Kafka Consumer in Play Framework (the Kafka Producer is out of the scope of this post). Be aware that you can easily find libraries whose APIs may better suit your needs. For example, Alpakka provides a reactive, stream-aware API. But before you move on to more advanced solutions, it is always worth getting to know how the plain component works.

Start by adding the Apache Kafka client dependency to your build.sbt file:

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "3.0.0"

Eager Singleton

First, I will create the SampleKafkaConsumer class following the Kafka documentation and annotate it with the javax.inject.Singleton annotation. SampleKafkaConsumer is as simple as possible. It polls the Kafka server exposed at localhost:6003 (with the default configuration it is probably localhost:9092) in an endless loop, and each poll waits up to 3 seconds for new records. A Kafka message consists of two parts: a key and a value. In this example I assume both are plain Strings, although they can have a more complex structure (for example, JSON). SampleKafkaConsumer subscribes to the sample-topic topic and uses the sample-group-id consumer group id.

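package kafka

import java.time.Duration
import java.util.Properties
import javax.inject.Singleton

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.Logging

import scala.jdk.CollectionConverters._ // Scala 2.13; on 2.12 use scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

@Singleton
class SampleKafkaConsumer extends Logging {

  logger.info("Starting SampleKafkaConsumer")

  private val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:6003")
  properties.put("group.id", "sample-group-id")
  properties.put("key.deserializer", classOf[StringDeserializer])
  properties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](properties)
  kafkaConsumer.subscribe(Set("sample-topic").asJava)

  // Runs in the constructor and never returns: each poll waits up to 3 seconds for records.
  Try {
    while (true) {
      kafkaConsumer.poll(Duration.ofSeconds(3)).asScala
        .foreach(r => logger.info(s"SampleKafkaConsumer receive record $r"))
    }
  } match {
    case Success(_) => logger.info("SampleKafkaConsumer succeed.")
    case Failure(e) => logger.error("SampleKafkaConsumer fail.", e)
  }
}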
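A note on the 3-second value: according to the KafkaConsumer javadoc, poll(Duration) returns as soon as records are available and otherwise waits up to the given timeout. It is therefore a maximum wait, not a fixed polling interval. The sketch below illustrates the same semantics without a broker by using java.util.concurrent.LinkedBlockingQueue.poll(timeout, unit) as a stand-in. It is an analogy, not Kafka code, and PollTimeoutAnalogy is a hypothetical name.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Analogy only: LinkedBlockingQueue.poll(timeout, unit) stands in for KafkaConsumer.poll(Duration).
// Both return as soon as data is available and otherwise wait up to the timeout.
object PollTimeoutAnalogy {

  private def elapsedMillis(start: Long): Long = (System.nanoTime() - start) / 1000000

  /** Returns (millis spent polling a non-empty queue, millis spent polling an empty queue). */
  def measure(timeoutMillis: Long): (Long, Long) = {
    val queue = new LinkedBlockingQueue[String]()
    queue.put("Hello Readers!")

    val t0 = System.nanoTime()
    val first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) // data available: returns at once
    val withData = elapsedMillis(t0)

    val t1 = System.nanoTime()
    val second = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) // empty: waits, then returns null
    val empty = elapsedMillis(t1)

    require(first == "Hello Readers!" && second == null)
    (withData, empty)
  }

  def main(args: Array[String]): Unit = {
    val (withData, empty) = measure(timeoutMillis = 300)
    println(s"non-empty queue: $withData ms, empty queue: $empty ms")
  }
}
```

With data already queued, the call returns almost immediately. On an empty queue it blocks for roughly the whole timeout.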

I want the Kafka Consumer to start when the application starts. This kind of requirement is easily satisfied with an eager singleton. To do this, I will define a Play module that binds the SampleKafkaConsumer singleton as eager:

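package kafka

import com.google.inject.AbstractModule
import play.api.Logging

class KafkaModule extends AbstractModule with Logging {

  override def configure(): Unit = {
    logger.info("Starting KafkaModule")
    // Instantiate SampleKafkaConsumer as soon as the injector is created, not on first use.
    bind(classOf[SampleKafkaConsumer]).asEagerSingleton()
  }
}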

To load KafkaModule, enable it in the Play configuration (by default, the application.conf file):

play.modules.enabled += "kafka.KafkaModule"

If you start the application now, you will notice that it does not actually start :) Yet everything looks fine in the logs:

--- (Running the application, auto-reloading is enabled) ---

[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000

Server started, use Alt+D to stop

But after you open the application's address in a browser (in my case http://localhost:9000), the page keeps loading forever. This is because the SampleKafkaConsumer constructor contains a while(true) loop in which the consumer keeps polling Kafka, with each poll waiting up to 3 seconds. As a result, the SampleKafkaConsumer constructor never finishes and the application cannot start. The server log above looks fine because in dev mode Play loads the application only when the first request arrives. To resolve this problem, I will run the while(true) loop asynchronously.

Polling in a dedicated thread pool

To let the application start, I will run the KafkaConsumer logic (the while(true) loop) on a dedicated thread. The polling loop needs only a single thread anyway, and KafkaConsumer is not thread-safe, so it is a good idea to create a dedicated single-threaded ExecutionContext for this purpose. In the following listing I don't mark the ExecutionContext as implicit, so that you can follow where it is used. I also replaced the Try block with a Future block.

// Imports as in the previous listing, plus:
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

@Singleton
class SampleKafkaConsumer extends Logging {

  logger.info("Starting SampleKafkaConsumer")

  // One dedicated thread for the polling loop; deliberately not implicit.
  val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

  private val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:6003")
  properties.put("group.id", "sample-group-id")
  properties.put("key.deserializer", classOf[StringDeserializer])
  properties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](properties)
  kafkaConsumer.subscribe(Set("sample-topic").asJava)

  // The loop now runs on the dedicated thread, so the constructor returns immediately.
  Future {
    while (true) {
      kafkaConsumer.poll(Duration.ofSeconds(3)).asScala
        .foreach(r => logger.info(s"SampleKafkaConsumer receive record $r"))
    }
  }(executionContext).andThen {
    case Success(_) => logger.info("SampleKafkaConsumer succeed.")
    case Failure(e) => logger.error("SampleKafkaConsumer fail.", e)
  }(executionContext)
}
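The sketch below is a stripped-down version of the same structure that shows why this unblocks startup. It needs no broker. BackgroundLoop and BackgroundLoopDemo are hypothetical names, and Thread.sleep stands in for kafkaConsumer.poll. The constructor only submits the loop to a dedicated single-thread ExecutionContext, so it returns at once. A callback registered on that ExecutionContext runs on the very same thread as the loop, which matters for a consumer that is not thread-safe. The stop and release methods exist only so the demo can terminate.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for SampleKafkaConsumer: same threading structure, no Kafka.
class BackgroundLoop {
  private val executor = Executors.newSingleThreadExecutor()
  private val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
  private val running = new AtomicBoolean(true)

  // Submitted to the dedicated thread; the constructor does not wait for it.
  val loopThread: Future[String] = Future {
    while (running.get()) Thread.sleep(20) // stands in for kafkaConsumer.poll(...)
    Thread.currentThread().getName
  }(executionContext)

  // A callback on the same single-thread ExecutionContext runs on the loop's thread.
  val callbackThread: Future[String] =
    loopThread.map(_ => Thread.currentThread().getName)(executionContext)

  // Demo plumbing only: let the loop finish, then release the thread.
  def stop(): Unit = running.set(false)
  def release(): Unit = executor.shutdown()
}

object BackgroundLoopDemo {
  def main(args: Array[String]): Unit = {
    val start = System.nanoTime()
    val loop = new BackgroundLoop // returns while the loop is still running
    println(s"constructed in ${(System.nanoTime() - start) / 1000000} ms")
    loop.stop()
    println(s"loop thread:     ${Await.result(loop.loopThread, 5.seconds)}")
    println(s"callback thread: ${Await.result(loop.callbackThread, 5.seconds)}")
    loop.release()
  }
}
```

release() is called only after both futures have completed. Shutting the executor down earlier would reject the callback that is submitted when the loop finishes.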

Now it works great! …almost. The application starts, but one more thing is missing: KafkaConsumer is never closed, even when the application stops!

Stopping KafkaConsumer

KafkaConsumer is not thread-safe, so I need to close it on the same thread that runs the polling loop. To do that, the loop needs an exit condition. Following the Kafka documentation, I add an AtomicBoolean field that records whether the consumer should stop (its initial value is false). Once the while(!stopConsumer.get()) loop quits, KafkaConsumer is closed.

// Imports as in the previous listing, plus:
import java.util.concurrent.atomic.AtomicBoolean

@Singleton
class SampleKafkaConsumer extends Logging {

  logger.info("SampleKafkaConsumer starts")

  private val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
  // Set to true (from any thread) to make the polling loop quit.
  private val stopConsumer: AtomicBoolean = new AtomicBoolean(false)

  private val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:6003")
  properties.put("group.id", "sample-group-id")
  properties.put("key.deserializer", classOf[StringDeserializer])
  properties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](properties)
  kafkaConsumer.subscribe(Set("sample-topic").asJava)

  Future {
    while (!stopConsumer.get()) {
      kafkaConsumer.poll(Duration.ofSeconds(3)).asScala
        .foreach(r => logger.info(s"SampleKafkaConsumer receives record: $r"))
    }
    logger.info("SampleKafkaConsumer quits 'while(true)' loop.")
  }(executionContext).andThen {
    // Runs on the same single thread as the loop; close() is called exactly once.
    case Success(_) =>
      kafkaConsumer.close()
    case Failure(e) =>
      logger.error("SampleKafkaConsumer fails.", e)
      kafkaConsumer.close()
  }(executionContext)
}
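The stop-flag pattern can be exercised without Kafka. In this sketch, StubConsumer is a hypothetical, deliberately non-thread-safe stand-in that records which thread calls poll() and close(). StoppableLoop mirrors the listing above: the flag is set from another thread, and close() is called exactly once, on the loop's thread.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical, deliberately non-thread-safe stand-in for KafkaConsumer that
// records the name of every thread that calls it.
class StubConsumer {
  val callingThreads: ListBuffer[String] = ListBuffer.empty
  var closed: Boolean = false
  def poll(): Unit = { callingThreads += Thread.currentThread().getName; Thread.sleep(10) }
  def close(): Unit = { callingThreads += Thread.currentThread().getName; closed = true }
}

// Mirrors the listing above: the flag may be set from any thread,
// but poll() and close() only ever run on the single executor thread.
class StoppableLoop(consumer: StubConsumer) {
  private val executor = Executors.newSingleThreadExecutor()
  private val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
  private val stopConsumer = new AtomicBoolean(false)

  val finished: Future[Unit] = Future {
    while (!stopConsumer.get()) consumer.poll()
  }(executionContext).andThen { case _ => consumer.close() }(executionContext) // called exactly once

  def requestStop(): Unit = stopConsumer.set(true)
  def release(): Unit = executor.shutdown()
}

object StoppableLoopDemo {
  def main(args: Array[String]): Unit = {
    val consumer = new StubConsumer
    val loop = new StoppableLoop(consumer)
    Thread.sleep(50)
    loop.requestStop() // called from the main thread, like a shutdown task
    Await.result(loop.finished, 5.seconds)
    loop.release()
    println(s"closed=${consumer.closed}, threads=${consumer.callingThreads.distinct}")
  }
}
```

The demo should report a single thread name: the main thread only sets the flag and never touches the consumer itself.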

All that remains is to set the stopConsumer flag from false to true when the application stops. Play Framework provides the CoordinatedShutdown component for such requirements. It originally comes from Akka (Play Framework is built on Akka). CoordinatedShutdown lets you define tasks to be executed during the shutdown process, and it is more flexible than the older ApplicationLifecycle component. ApplicationLifecycle is not deprecated, and you can use it instead. It just lets you add tasks to only one phase of the shutdown process, the phase named service-stop. I need to add my stop task to exactly this phase, but I chose to use CoordinatedShutdown anyway. The task will set the stopConsumer flag to true.

coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "SampleKafkaConsumer-stop") { () =>
  logger.info("Shutdown-task[SampleKafkaConsumer-stop] starts.")
  stopConsumer.set(true)
  Future{ Done }(executionContext).andThen {
    case Success(_) => logger.info("Shutdown-task[SampleKafkaConsumer-stop] succeed.")
    case Failure(e) => logger.error("Shutdown-task[SampleKafkaConsumer-stop] fails.", e)
  }(executionContext)
}

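Note that the added task returns Future{ Done } on the executionContext defined earlier. That ExecutionContext has a single thread that runs queued tasks in order, so Future{ Done } cannot run before the polling loop has quit. Its success-logging callback is also queued only after the callback that closes KafkaConsumer. As a result, the future handed to CoordinatedShutdown completes only after KafkaConsumer has been closed. Also be aware that every phase of the shutdown process has a timeout after which the next phase begins, even if the current phase has not finished. The default service-stop phase timeout is 5 seconds, so keep the poll timeout (3 seconds here) below it. Otherwise the phase may time out before the loop notices the flag. Final version of SampleKafkaConsumer: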

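package kafka

import java.time.Duration
import java.util.Properties
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import javax.inject.{Inject, Singleton}

import akka.Done
import akka.actor.CoordinatedShutdown
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.Logging

import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._ // Scala 2.13; on 2.12 use scala.collection.JavaConverters._
import scala.util.{Failure, Success}

@Singleton
class SampleKafkaConsumer @Inject()(coordinatedShutdown: CoordinatedShutdown) extends Logging {

  logger.info("SampleKafkaConsumer starts")

  // Single dedicated thread: the polling loop, close() and the shutdown callbacks all run here.
  private val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
  private val stopConsumer: AtomicBoolean = new AtomicBoolean(false)

  private val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:6003")
  properties.put("group.id", "sample-group-id")
  properties.put("key.deserializer", classOf[StringDeserializer])
  properties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](properties)
  kafkaConsumer.subscribe(Set("sample-topic").asJava)

  Future {
    while (!stopConsumer.get()) {
      kafkaConsumer.poll(Duration.ofSeconds(3)).asScala
        .foreach(r => logger.info(s"SampleKafkaConsumer receives record: $r"))
    }
    logger.info("SampleKafkaConsumer quits 'while(true)' loop.")
  }(executionContext).andThen {
    // close() is called exactly once, on the loop's thread.
    case Success(_) =>
      kafkaConsumer.close()
    case Failure(e) =>
      logger.error("SampleKafkaConsumer fails.", e)
      kafkaConsumer.close()
  }(executionContext)

  coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "SampleKafkaConsumer-stop") { () =>
    logger.info("Shutdown-task[SampleKafkaConsumer-stop] starts.")
    stopConsumer.set(true)
    // The returned future completes only after close(): its andThen callback is
    // queued behind the close callback on the single thread.
    Future{ Done }(executionContext).andThen {
      case Success(_) => logger.info("Shutdown-task[SampleKafkaConsumer-stop] succeed.")
      case Failure(e) => logger.error("Shutdown-task[SampleKafkaConsumer-stop] fails.", e)
    }(executionContext)
  }
}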
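One variation worth considering: the shutdown task can return a future that is completed explicitly after close() has run, instead of relying on the order in which the single thread runs queued tasks. That future also carries a failure of the loop. The sketch below shows the idea with a plain scala.concurrent.Promise. PromiseStoppedLoop and its poll/close function parameters are hypothetical stand-ins for the Kafka calls. In the Play class, the returned Future[Unit] would still need to be mapped to Done before being returned from the CoordinatedShutdown.addTask body.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical variant of the final listing: poll and close stand in for the
// KafkaConsumer calls, and shutdownTask() is what the shutdown task would call.
class PromiseStoppedLoop(poll: () => Unit, close: () => Unit) {
  private val executor = Executors.newSingleThreadExecutor()
  private val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
  private val stopConsumer = new AtomicBoolean(false)
  private val closed = Promise[Unit]()

  Future {
    while (!stopConsumer.get()) poll()
  }(executionContext).onComplete { loopResult =>
    val closeResult = Try(close())                        // same thread as the loop
    closed.complete(loopResult.flatMap(_ => closeResult)) // a loop failure takes precedence
    executor.shutdown()                                   // nothing else runs on this executor
  }(executionContext)

  // Completes only after close() has run, regardless of what else is queued.
  def shutdownTask(): Future[Unit] = {
    stopConsumer.set(true)
    closed.future
  }
}

object PromiseStoppedLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new PromiseStoppedLoop(() => Thread.sleep(10), () => println("closed"))
    Await.result(loop.shutdownTask(), 5.seconds)
    println("shutdown task completed")
  }
}
```

The trade-off is a little more plumbing. In return, the completion condition is stated directly instead of being inferred from executor queue order.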

Some logs

When the application starts, you will find this log entry:

[info] k.SampleKafkaConsumer play-dev-mode-akka.actor.default-dispatcher-7 - SampleKafkaConsumer starts

Now I will start the command-line Kafka producer and send some messages to the sample-topic topic:

$ bin/kafka-console-producer.sh --topic sample-topic --bootstrap-server localhost:6003
>Hello Readers!
>Thank you!

SampleKafkaConsumer logs all messages:

[info] k.SampleKafkaConsumer pool-10-thread-1 - SampleKafkaConsumer receives record: ConsumerRecord(topic = sample-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1640519882833, serialized key size = -1, serialized value size = 15, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Hello Readers!)
[info] k.SampleKafkaConsumer pool-10-thread-1 - SampleKafkaConsumer receives record: ConsumerRecord(topic = sample-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1640519898470, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Thank you!)

Now I will stop the application (PID=26054) with SIGTERM:

kill 26054

SampleKafkaConsumer stops gracefully:

[info] p.c.s.AkkaHttpServer play-dev-mode-shutdown-hook-1 - Stopping Akka HTTP server...
[info] p.c.s.AkkaHttpServer play-dev-mode-akka.actor.internal-dispatcher-2 - Terminating server binding for /0:0:0:0:0:0:0:0:9000
[info] s.ApplicationTimer application-akka.actor.internal-dispatcher-5 - ApplicationTimer demo: Stopping application at 2021-12-26T11:58:35.988474Z after 129s.
[info] k.SampleKafkaConsumer application-akka.actor.internal-dispatcher-5 - Shutdown-task[SampleKafkaConsumer-stop] starts.
[info] k.SampleKafkaConsumer pool-10-thread-1 - SampleKafkaConsumer quits 'while(true)' loop.
[info] o.a.k.c.c.i.ConsumerCoordinator pool-10-thread-1 - [Consumer clientId=consumer-sample-group-id-1, groupId=sample-group-id] Revoke previously assigned partitions sample-topic-0
[info] o.a.k.c.c.i.ConsumerCoordinator pool-10-thread-1 - [Consumer clientId=consumer-sample-group-id-1, groupId=sample-group-id] Member consumer-sample-group-id-1-52faa9ac-a6c3-42e5-99aa-33c992070bea sending LeaveGroup request to coordinator localhost:6003 (id: 2147483646 rack: null) due to the consumer is being closed
[info] o.a.k.c.c.i.ConsumerCoordinator pool-10-thread-1 - [Consumer clientId=consumer-sample-group-id-1, groupId=sample-group-id] Resetting generation due to: consumer pro-actively leaving the group
[info] o.a.k.c.c.i.ConsumerCoordinator pool-10-thread-1 - [Consumer clientId=consumer-sample-group-id-1, groupId=sample-group-id] Request joining group due to: consumer pro-actively leaving the group
[info] o.a.k.c.m.Metrics pool-10-thread-1 - Metrics scheduler closed
[info] o.a.k.c.m.Metrics pool-10-thread-1 - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[info] o.a.k.c.m.Metrics pool-10-thread-1 - Metrics reporters closed
[info] o.a.k.c.u.AppInfoParser pool-10-thread-1 - App info kafka.consumer for consumer-sample-group-id-1 unregistered
[info] k.SampleKafkaConsumer pool-10-thread-1 - Shutdown-task[SampleKafkaConsumer-stop] succeed.
[info] p.c.s.AkkaHttpServer play-dev-mode-akka.actor.internal-dispatcher-15 - Running provided shutdown stop hooks
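The order of the SampleKafkaConsumer entries above is what the single-thread reasoning predicts: first the loop exits, then the consumer closes, then the "succeed" entry appears. The Kafka-free sketch below reproduces the same structure so you can check the ordering yourself. ShutdownOrderDemo and the event strings are hypothetical, and Future(())(ec) plays the role of Future{ Done }. With one thread, the callback that closes the consumer is queued while the loop task is still finishing. That happens before the shutdown task's own callback can be queued, so run() returns the three events in that order.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Kafka-free model of the final listing: a polling loop, an andThen that "closes"
// the consumer, and a shutdown task that queues Future(()) on the same single thread.
object ShutdownOrderDemo {
  def run(): List[String] = {
    val executor = Executors.newSingleThreadExecutor()
    val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    val stopConsumer = new AtomicBoolean(false)
    // Only touched by the single executor thread until Await below returns.
    val events = ListBuffer.empty[String]

    Future {
      while (!stopConsumer.get()) Thread.sleep(10) // stands in for kafkaConsumer.poll(...)
      events += "loop exited"
    }(ec).andThen { case _ => events += "consumer closed" }(ec)

    Thread.sleep(50) // let the loop run for a moment, as in a live application

    // What the shutdown task does: set the flag, then queue work on the same thread.
    stopConsumer.set(true)
    val shutdownTask = Future(())(ec).andThen { case _ => events += "shutdown task succeeded" }(ec)

    Await.result(shutdownTask, 5.seconds)
    executor.shutdown()
    events.toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

run() returns List(loop exited, consumer closed, shutdown task succeeded), which matches the order of the log entries above.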

Summary

In this post I described how to use a plain KafkaConsumer in Play Framework. It is always worth knowing how lower-level components work, but in the end I strongly encourage you to consider libraries like Alpakka. For example, in my last project I decided to use akka-projection, which is built on Alpakka. It composes nicely with Akka and gives you extra value, such as easy configuration of a restart strategy in case of failure.

You will find all code examples from this post on my GitHub. If you want to follow the code changes section by section, check the git log: every commit corresponds to a single section of the post.