Lagging Kafka Consumer in Spring Boot Application

I was recently working on a Spring Boot Kafka application. One day the application suddenly stopped processing messages from a Kafka topic it was listening to in production. Initially we thought it might have something to do with recent code changes, so we decided to roll back to an older version, which seemed to resolve the issue. However, I wasn't convinced, and I didn't want to spend time bringing newer code back one pull request at a time on top of the rolled-back version. I decided to dig deeper, and in any case the issue reappeared the very next day when a large number of requests were sent to the application.

1.1 Issue

We were using a @KafkaListener annotated method to consume messages from a Kafka topic. This had been working fine for a long time, but one day it became too slow and new messages kept piling up in the topic without getting processed.

1.2 Temporary Fixes

This issue occurred in production, so we had to come up with temporary fixes before we could perform a root cause analysis and arrive at a permanent solution.

1.2.1 Multiple instances of Kafka Consumer

Initial investigation showed that the @KafkaListener consumer was processing each message in a single thread, and when a large number of messages piled up in the Kafka topic, a single thread was not fast enough to process them all. We decided to deploy multiple instances of the consumer application to speed things up. Kafka consumers don't poll the topic itself but the partitions in the topic, and if we deploy multiple consumers for a topic, each consumer needs its own set of partitions to poll. That meant we had to increase the number of partitions in the topic to match the number of consumer instances we wanted to deploy; otherwise, with more consumers than partitions, the consumers not assigned any partitions would remain idle.

However, increasing the number of partitions and the number of consumer application instances improved the message processing rate only marginally.
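As an aside, Spring Kafka can also run multiple consumer threads inside a single application instance. A minimal sketch of the relevant Spring Boot property (the value is illustrative; each thread still needs its own partition, exactly like a separate instance would):

```properties
# Run 3 @KafkaListener consumer threads in one application instance.
# Only useful if the topic has at least 3 partitions.
spring.kafka.listener.concurrency=3
```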

1.2.2 Rollback to Older version

At this point we decided to deploy an older version of the application that we knew had worked earlier. The rolled-back version also seemed to have trouble during the first thirty minutes after deployment, but then suddenly started working normally. We decided to keep this version running until we could figure out which recent code change had caused the issue.

But even the rolled-back version started showing the same symptoms the very next day. Now we knew that no recent code change could have caused this issue, and we had to look elsewhere.

1.3 General Definitions

  • Kafka: A messaging system like RabbitMQ and ActiveMQ.
  • Broker: A server that participates in a Kafka cluster. A cluster can have multiple brokers.
  • Topic: Logical storage of ordered messages. Each topic has a unique name that consumers and producers use. A SQL equivalent would be a table.
  • Partition: Each topic can have multiple partitions. Consumers of a topic don't read from all partitions; instead they're dynamically assigned partitions. Within a consumer group, each partition is assigned to exactly one consumer. Partitions are the units that are replicated across the cluster to protect against data loss.
  • Message: A record in a partition. It's the data that a producer publishes and a consumer consumes. The SQL equivalent would be a row in a table.
  • Message Offset: The index of a message in a partition. It starts from zero and keeps increasing without reuse.
  • Producer: An application that publishes messages to a topic. The producer assigns a key to each message (often a UUID) that Kafka uses to distribute messages among the partitions of the topic. The key should be chosen to ensure even distribution of messages among partitions.
  • Consumer: Consumers (Kafka clients) poll for messages from a Kafka topic. They're assigned a set of partitions dynamically, or they can ask to poll specific partitions. Consumers are part of a consumer group.
  • Consumer Group: A named set of consumers. Consumers within a group divide the partitions of the topic they listen to among themselves. Partitions are redistributed when a consumer joins or leaves the group.
  • Consumer Group Offset: For each partition, Kafka stores the offset of the last message read by each group. Kafka uses this to keep track of read and unread messages per group, and returns only unread messages, in order, when a consumer in the group polls. It can be reset manually with Kafka commands.

Here are a few commands that I use frequently
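The original screenshot of these commands is not reproduced here; below is a hedged reconstruction using the standard kafka-consumer-groups.sh tool that ships with Kafka. The alias name kcg and the bootstrap address are assumptions on my part:

```shell
# Alias used throughout this article (assumed definition)
alias kcg='kafka-consumer-groups.sh --bootstrap-server localhost:9092'

# List all consumer groups
kcg --list

# Show partitions, offsets, and lag for a group
kcg --describe --group <group-name>

# Reset a group's offsets to the earliest message (the group must be inactive)
kcg --group <group-name> --topic <topic-name> --reset-offsets --to-earliest --execute
```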

1.4 Root Cause Analysis

1.4.1 Kafka Lagging Messages

Initially I started looking into Kafka itself to see what was happening. I logged into the Kafka container in Kubernetes and ran a command (kcg --describe --group <group-name>; kcg is an alias, see the last image under General Definitions) to print the status of the consumer group.

I didn't know what these columns meant, so I copied them into Copilot and learned that the LAG column shows the number of unconsumed messages. The following screenshot is from a sample application (see the project link at the end) that I created for this article. In production this number was more than one thousand per partition and kept increasing. This was not good: an ever-increasing LAG value meant the application had ground to a halt.
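For readers without the screenshot, the output of kcg --describe --group looks roughly like this (the values below are illustrative, not from production):

```
GROUP     TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID    HOST       CLIENT-ID
my-group  my-topic  0          120             1350            1230  consumer-1-…   /10.0.0.5  consumer-1
my-group  my-topic  1          98              1410            1312  consumer-1-…   /10.0.0.5  consumer-1
```

LAG is simply LOG-END-OFFSET minus CURRENT-OFFSET: the number of messages written to the partition that the group has not yet consumed.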

1.4.2 Log Analysis

To perform a root cause analysis, I collected logs from all instances of the consumer application and combined them into one giant log file. Then I wrote Java code to extract a summary of the log. The summary included details like the average message processing time and whether any exceptions or warnings were logged repeatedly.

This exercise gave me the following insights:

  1. A poll-timeout exception was being logged frequently.
  2. Some messages were being reprocessed repeatedly.
  3. Reprocessing happened just after a poll timeout.
  4. Reprocessing happened in a different instance of the consumer application than the one that logged the poll timeout.
  5. The consumer application was being reassigned different sets of partitions after a poll timeout.

That's when I started looking into the Kafka consumer's polling configuration. Here's what I found out:

  1. Consumers poll the topic periodically to retrieve messages.
  2. The max.poll.records configuration property (500 by default) limits the maximum number of messages fetched in one poll.
  3. The max.poll.interval.ms configuration property (300 seconds, or 5 minutes, by default) limits the time allowed for processing the messages from a single poll.
  4. On a poll timeout, the consumer is deemed unable to process its assigned partitions, and those partitions are reassigned to a different consumer in the consumer group.

The consumer group's offsets for the partitions assigned to a consumer are committed at the end of a successful poll. After offsets are committed, the next poll returns messages after the last committed offset. In the case of a poll timeout, however, the offsets are not committed, so the next consumer assigned the same partitions starts from the same offsets, leading to reprocessing. If that consumer also runs into a poll timeout, the process repeats itself, leading to an infinite loop.

In my case the default values of both max.poll.records and max.poll.interval.ms were in effect: 500 records and 300 seconds (5 minutes). My log analysis had shown that each message took about 1.5 seconds to process. Since every partition already had more than 500 lagging messages, each poll from each consumer was returning 500 messages, the maximum possible. That meant the average poll processing time would have been 500 * 1.5 = 750 seconds, two and a half times the default poll-timeout duration of 300 seconds.

1.4.3 Slow Message Processing

Message processing was taking so long because it was running a few SQL queries without indexes. This had not been a problem in the past because we didn't have much data in the tables. Recently, however, we had crossed a threshold that made message processing just slow enough to cause poll timeouts. A few weeks after this incident I was looking at the Postgres metrics in Azure and saw that CPU usage had hit 90% during the production incident, further confirming the root cause analysis. Anyway, there's no excuse for not creating indexes.

The following log shows the consumer configuration properties. We can see the default values max.poll.records=500 and max.poll.interval.ms=300000.

The following screenshot shows the consumer logging a poll-timeout exception. It even points at the configuration properties max.poll.records and max.poll.interval.ms.

The log below shows partitions being reassigned. It also shows the consumer group offset of each fetched partition, which remains unchanged across repeated poll timeouts.

1.5 Solution

1.5.1 Reduce max.poll.records and increase max.poll.interval.ms

This was the easiest change that could fix our problem. For example, reducing max.poll.records from 500 to 50 means processing one-tenth the messages within the same five-minute poll-timeout duration. This alone would ensure that our application kept processing new messages instead of entering an infinite loop of reprocessing the same ones. However, we also had to meet the throughput requirement: messages have to be processed within a reasonable duration.
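In Spring Boot these two consumer settings can be tuned through configuration. A sketch (the values here are illustrative, not the ones we finally settled on):

```properties
# Fetch at most 50 records per poll instead of the default 500
spring.kafka.consumer.max-poll-records=50
# Allow up to 10 minutes per poll instead of the default 5
spring.kafka.consumer.properties.max.poll.interval.ms=600000
```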

1.5.2 Creating Indexes

This was another easy fix that addressed the problem in the short term. Queries that had been taking 1.5 seconds started completing in only about a hundred milliseconds. However, this is not a permanent solution: as long as we run blocking SQL queries during message processing, there will come a time when it becomes too slow again as the application grows.

1.5.3 Parallel Processing

The next solution, which we actually tried as a temporary fix, was to process messages in concurrent threads created with an ExecutorService. In principle we could process 100 messages in the time it previously took to process one, increasing throughput a hundredfold. However, this had a few downsides:

  1. Handing off a message received in the @KafkaListener annotated method to a separate thread acknowledges it immediately. So if something goes wrong and we fail to process the message on that thread, we have to implement our own retry logic and something like a dead-letter queue.
  2. The number of message-processor threads in our case was limited by the number of database connections. By default, Spring Boot's default relational connection pool, Hikari, creates a pool of size 10. That meant we could not run more than 10 threads per application instance; any extra threads would sit waiting to acquire a database connection without doing actual work. And even with 10 threads, other parts of the application also use connections, so in practice fewer than 10 processor threads were available. We could increase the pool size, but the database itself supports only a limited number of clients. We can't raise the pool to 1,000 if the database supports only 100 connections, and those 100 connections are shared among all instances of the application, DB clients like pgAdmin running on developer machines, and other production monitoring systems.
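A minimal plain-Java sketch of this pattern, with the pool size bounded by the available database connections. The class and method names are illustrative, not from the original application; the task body stands in for real message processing:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelProcessor {
    // Bounded by available DB connections (Hikari default pool size: 10)
    static final int POOL_SIZE = 10;

    // Process all messages on a fixed-size pool; returns the number processed.
    static int processAll(List<String> messages) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        AtomicInteger processed = new AtomicInteger();
        for (String message : messages) {
            pool.submit(() -> {
                // Stand-in for real work: SQL queries, business logic, etc.
                processed.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed=" + processAll(List.of("m1", "m2", "m3")));
    }
}
```

Note that by the time these tasks run, the Kafka message has already been acknowledged, which is exactly the first downside listed above.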

Apart from the above, I faced challenges with the Hikari connection pool after implementing this solution. I noticed that the pool was not renewing connections 30 minutes after the application had started, leading to closed-connection exceptions. That's something I will debug in the future; meanwhile, I had a better idea to solve our problem: batch processing. I had implemented it before and should have thought of it first.

1.5.4 Batch Processing

The core of this idea is to reduce the number of queries the application runs by batching them. I implemented a message-processor thread that collected messages for up to 10 seconds before processing them all in one batch. I also made sure to wake the thread up immediately once a fixed number of messages (say 100) had already been collected.
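A minimal sketch of such a collector using a BlockingQueue; the class name, thresholds, and message type are illustrative, not from the original application:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchCollector {
    static final int MAX_BATCH = 100;        // flush immediately at this size
    static final long MAX_WAIT_MS = 10_000;  // otherwise flush every 10 seconds

    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Called from the @KafkaListener method in the real application.
    void add(String message) {
        queue.add(message);
    }

    // Collect until MAX_BATCH messages arrive or MAX_WAIT_MS elapses.
    List<String> nextBatch() throws InterruptedException {
        List<String> batch = new ArrayList<>();
        long deadline = System.currentTimeMillis() + MAX_WAIT_MS;
        while (batch.size() < MAX_BATCH) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) break;                        // 10 seconds are up
            String msg = queue.poll(remaining, TimeUnit.MILLISECONDS);
            if (msg == null) break;                           // timed out waiting
            batch.add(msg);
            queue.drainTo(batch, MAX_BATCH - batch.size());   // grab anything queued
        }
        return batch;
    }
}
```

A dedicated processor thread would call nextBatch() in a loop and hand each batch to the batched SQL operations described below.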

1.5.4.1 Batch Selects

Instead of running 100 queries like select * from my_table where id = ?, it's far more efficient to run a single select * from my_table where id in (?, ?, ...).
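With Spring Data JPA, repository.findAllById(ids) already issues such an IN query for you. With plain JDBC you build the placeholder list yourself; a small sketch (the table and column names are hypothetical):

```java
import java.util.List;
import java.util.stream.Collectors;

public class InClauseBuilder {
    // Build "select * from my_table where id in (?, ?, ?)" for the given ids.
    // The resulting string is meant for a JDBC PreparedStatement, with one
    // bound parameter per id.
    static String buildSelect(List<Long> ids) {
        String placeholders = ids.stream()
                .map(id -> "?")
                .collect(Collectors.joining(", "));
        return "select * from my_table where id in (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildSelect(List.of(1L, 2L, 3L)));
        // select * from my_table where id in (?, ?, ?)
    }
}
```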

1.5.4.2 Batch Inserts/Updates

Instead of calling JpaRepository.save() per message, I started calling JpaRepository.saveAll() once for all collected messages. I also enabled the following JPA properties so that this operation was executed as a JDBC batch.
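The exact properties aren't visible in the text I have, but the standard Hibernate settings for JDBC batching look like this (the batch size is illustrative):

```properties
# Group inserts/updates into JDBC batches of up to 50 statements
spring.jpa.properties.hibernate.jdbc.batch_size=50
# Reorder statements so rows for the same table batch together
spring.jpa.properties.hibernate.order_inserts=true
spring.jpa.properties.hibernate.order_updates=true
```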

1.6 Project

Here’s the project that I created to demonstrate poll-timeout.

https://github.com/ConsciousObserver/KafkaPollTimeoutTest

