Kafka Producers: Writing Messages, Partitioning, and Optimizing for Performance
Suraj Kumar
When you’re working with Kafka, whether it’s for message queues, as a data storage platform, or for stream processing, there’s a high chance you’ll end up writing data (as a producer), reading data (as a consumer), or doing both.
Imagine you’re building a system for credit card transactions. The moment a payment is made, the store sends the transaction details to Kafka. Another system picks that up, processes it through a rules engine to approve or deny the transaction, and the response goes back to Kafka. From there, it gets sent back to the store, while another service stores the transaction for analysis later.
In this article, we’ll dive into Kafka producers—how they work, how to set one up, and how to handle common issues. By the end, you’ll be ready to start producing messages like a pro.
What’s a Kafka Producer? The Basics First, Of Course
Producers in Kafka are responsible for writing data to a Kafka topic. You might need them to track user activity, store logs, collect metrics, or facilitate system-to-system communication.
But before we get to the how, let’s talk about the why, as I always encourage.
Depending on your use case, the way you configure a Kafka producer will vary. Do you need to make sure no messages are lost? Are you okay with duplicates? What’s more important—low latency or high throughput? A payment processing system, for example, can't afford to lose messages, while a system tracking website clicks might allow for some loss without much consequence.
Once you have a clear understanding of your needs, you’ll configure Kafka accordingly.
For example, in the credit card transaction system mentioned earlier, losing or duplicating messages must be avoided, and while low latency is preferred, it can go up to 500 ms. High throughput is essential, processing up to a million messages per second. In contrast, for tracking website clicks, some message loss or duplication is acceptable. Higher latencies are also tolerable, as long as they don't affect user experience, and throughput will depend on site traffic.
To send a message in Kafka, the first thing you need to do is create a ProducerRecord. This record needs to include at least two things: the topic (where the message is going) and the value (the actual data). You can also add extras like a key, partition, timestamp, or headers if needed, but those are optional.
Once you send the ProducerRecord, Kafka steps in and serializes both the key and value into byte arrays so they can travel over the network. If you haven’t told Kafka which partition to use, it’ll figure that out for you, usually by hashing the key to decide which partition to send the message to.
After the partition is determined, your message is added to a batch (Kafka likes to send messages in groups for better performance). A separate thread then takes care of sending this batch to the correct Kafka brokers. When the broker receives the message, it either gives you a thumbs up by returning a RecordMetadata (with details like the topic, partition, and offset), or it throws an error if something went wrong.
Handling Errors
If something does go wrong, like a connection issue, Kafka doesn’t give up immediately. The producer will retry sending the message several times before ultimately failing and returning an error. Keep in mind that only transient errors (such as a broker connection problem) are retried; other errors, like an oversized message, fail immediately, as we’ll see later.
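How aggressively the producer retries is driven by configuration rather than code. Here’s a minimal sketch, assuming a Properties object like the kafkaProps we’ll create in the next section; the values are illustrative, not recommendations:
kafkaProps.put("retries", "5");                  // how many transient failures to retry (recent clients default to a very large number)
kafkaProps.put("retry.backoff.ms", "100");       // wait 100 ms between retry attempts
kafkaProps.put("delivery.timeout.ms", "120000"); // overall cap on the time to report success or failure, retries included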
Setting Up a Kafka Producer
To start sending messages to Kafka, you first need to create a producer object by setting up a few important properties. There are three key things you’ll always need:
- bootstrap.servers: a list of host:port pairs the producer uses for its initial connection to the cluster (it discovers the rest of the brokers from there).
- key.serializer: the class used to turn record keys into byte arrays.
- value.serializer: the class used to turn record values into byte arrays.
Let’s look at a simple example of creating a Kafka producer with just the basics:
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
This initializes the producer with basic settings, connecting to two brokers and using the built-in string serializers for both keys and values.
With those properties in place, creating the producer itself is a one-liner:
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
Sending Messages: Fire-and-Forget, Sync, or Async?
When it comes to sending messages in Kafka, you’ve got three main options, each with its own perks and trade-offs:
- Fire-and-forget: send the message and don’t wait to find out whether it arrived.
- Synchronous send: wait for the broker’s reply before moving on.
- Asynchronous send: send the message and handle the result later in a callback.
Let’s look at each in turn.
Sending a Message to Kafka
The simplest way to send a message? Create a ProducerRecord, give it a topic and value, and let the send() method do its thing. Here’s what that looks like:
ProducerRecord<String, String> record =
        new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}
In this example, we’re sending a message to the "CustomerCountry" topic, where the key is "Precision Products" and the value is "France". Kafka takes that message, puts it into a buffer, and a background thread sends it off to the broker.
However, things don’t always go smoothly. Errors like SerializationException, BufferExhaustedException, or TimeoutException can pop up before Kafka even gets your message. In those cases, catching and handling exceptions is the way to go.
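If you’d rather handle those pre-send errors individually instead of with a blanket catch, you can catch the specific exception types. A minimal sketch (the exception classes are standard Kafka client classes; the handling shown is just illustrative):
// Requires: org.apache.kafka.common.errors.SerializationException,
//           org.apache.kafka.clients.producer.BufferExhaustedException,
//           org.apache.kafka.common.errors.TimeoutException
try {
    producer.send(record);
} catch (SerializationException e) {
    e.printStackTrace(); // key or value could not be serialized; retrying won't help
} catch (BufferExhaustedException e) {
    e.printStackTrace(); // the producer's local buffer is full
} catch (TimeoutException e) {
    e.printStackTrace(); // send() gave up waiting (e.g., for buffer space) within max.block.ms
}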
Sending a Message Synchronously
Now, let’s say you’re feeling cautious and want to be absolutely sure your message was delivered. You can send messages synchronously by calling the get() method on the Future object that send() returns. It’s like calling Kafka and waiting for them to say, “Yep, got it!”
Here’s an example:
ProducerRecord<String, String> record =
        new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}
Are you thinking about the downside? Synchronous sends can slow things down, especially if Kafka is busy. You’ll be left waiting, which isn’t ideal if you’re shooting for high performance. While Kafka can retry some errors (like connection issues), others—like "message size too large"—won’t be retried.
Sending a Message Asynchronously
If waiting for Kafka’s response sounds like too much of a bottleneck, asynchronous sending is your best friend. You send the message and pass a callback function that Kafka will trigger once it’s done. This way, you don’t have to sit around waiting for a response.
Here’s how you’d do it:
ProducerRecord<String, String> record =
        new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // Handle the error
    } else {
        System.out.println("Sent record to topic " + metadata.topic() +
                " partition " + metadata.partition() + " at offset " + metadata.offset());
    }
});
In this example, if the message is successfully sent, the callback prints out the topic, partition, and offset where it landed. If there’s an error, the exception gets handled. This method lets you keep sending messages while handling errors in the background. It's efficient and strikes a good balance between speed and reliability—perfect for high-performance applications.
Sending a Message with a Callback
If you need more control over how Kafka handles your messages, you can use the Callback interface. This gives you even more customization over what happens after Kafka processes your message.
Here’s an example of implementing the Callback interface:
private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace(); // Handle the exception
        } else {
            System.out.println("Message sent to topic: " + recordMetadata.topic() +
                    " partition: " + recordMetadata.partition() +
                    " at offset: " + recordMetadata.offset());
        }
    }
}

ProducerRecord<String, String> record =
        new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "US");
producer.send(record, new DemoProducerCallback());
This callback method is triggered once Kafka processes the message. If everything goes well, you’ll get the topic, partition, and offset details. If there’s an error, it will pass the exception, allowing you to handle it as needed.
Keep in mind that callbacks execute on the producer’s I/O (sender) thread, so avoid any blocking operations inside them to keep things moving smoothly.
With these three approaches—fire-and-forget, sync, and async—you’ve got flexibility in how you send data to Kafka, depending on your application’s needs. Whether you prioritize speed, reliability, or a balance of both, Kafka’s producer API has you covered.
Configuring Kafka Producers
Kafka producers come with a lot of configurable settings, but don’t worry—many of the defaults are pretty solid. That said, tweaking a few key parameters can seriously impact how your producer performs, how much memory it uses, and how reliable your messages are. Let’s break down some of the most important settings that you’ll want to get familiar with.
client.id
First up, we’ve got the client.id. This is basically a name tag for your Kafka producer that gets used for logging and metrics. It’s not mandatory, but giving your producer a meaningful name (like "order-service-producer") can make troubleshooting a lot easier. If something goes wrong, you’ll be able to quickly figure out which service is causing the issue based on the client.id in the logs.
acks
The acks setting is where things get interesting. This little parameter controls how safe your messages are, determining how many Kafka brokers (or replicas) need to confirm receiving the message before it’s considered “successfully” written. Here’s how it works:
- acks=0: the producer doesn’t wait for any response from the broker. Lowest latency and highest throughput, but messages can be lost without anyone noticing.
- acks=1: the producer waits for the partition leader to acknowledge the write. A reasonable middle ground, though data can still be lost if the leader fails before the followers copy the message.
- acks=all (or -1): the producer waits until all in-sync replicas have the message. The most durable option, at the cost of higher latency.
Choosing the Right acks Setting
So, which one should you pick? It depends on your priorities.
The choice really comes down to what matters most for your application—speed, reliability, or a balance of both. For example, a real-time stock trading app might opt for acks=all to avoid any data loss, while a logging system might be fine with acks=0 since losing a few log entries isn't a big deal.
In short, acks is all about balancing throughput, latency, and data durability. Pick the one that best fits your use case, and you’re good to go!
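To tie the last two settings together, here’s how they might look on the kafkaProps object created earlier; a minimal sketch with illustrative values:
kafkaProps.put("client.id", "order-service-producer"); // shows up in broker logs and client metrics
kafkaProps.put("acks", "all");                          // wait for all in-sync replicas before treating the write as successful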
Partitioning in Kafka
In Kafka, partitions are your secret weapon for handling large amounts of data efficiently. They allow you to split up messages across multiple partitions, which enables parallel processing—essentially, you can spread the load across different workers. Now, when you send a message to Kafka using a ProducerRecord, the key plays a big role in deciding which partition that message will go to. Let’s break down how this works.
Key-Based Partitioning
When you provide a key in your ProducerRecord, Kafka doesn’t just throw your message into any random partition. It uses a hashing algorithm to figure out which partition should hold that message. The key is like a guide—every message with the same key always ends up in the same partition.
For example:
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
Here, "Laboratory Equipment" is the key, and Kafka will hash it to decide which partition gets the message. This ensures that all messages with that same key—"Laboratory Equipment"—always go to the same partition. It’s handy when you want related data to stick together.
No Key? No Problem! (Round-Robin Partitioning)
What if you don’t provide a key? No worries! Kafka will still make sure your message ends up somewhere, but it’ll be a bit more random about it. By default, Kafka uses a round-robin approach to distribute messages across partitions.
For instance:
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "USA");
Here, there’s no key, so Kafka will send your messages to different partitions in a round-robin fashion. However, starting with Kafka 2.4, there’s a new trick up its sleeve—the sticky partitioner. This nifty little feature ensures that Kafka will stick to one partition for a batch of messages before moving on to the next one. The result? Fewer partition switches and improved performance.
Custom Partitioner: When You Need More Control
For most cases, Kafka’s default partitioning works just fine, but there are times when you might want more control over which partition gets what data. Enter the custom partitioner. This allows you to define your own rules for how messages are distributed across partitions.
For example, let’s say you have a big client called "Banana" who sends tons of data. You don’t want "Banana’s" messages cluttering up the same partition as everyone else’s, so you create a custom partitioner to give "Banana" their own partition.
What Happens When You Change the Number of Partitions?
Here’s an important thing to keep in mind: Kafka guarantees that messages with the same key will always go to the same partition—as long as the number of partitions doesn’t change. But if you add more partitions later, things can get tricky.
Here’s what happens:
- Records already written stay in the partition they were originally assigned to; Kafka never moves existing data.
- New records with the same key are hashed against the new partition count, so they can land in a different partition than older records with that key.
So the key-to-partition mapping is only stable while the partition count stays fixed. If key ordering matters, size the topic with enough partitions up front and avoid adding more later.
Partitioning Strategies
Kafka offers a few strategies for how to assign messages to partitions, depending on your needs:
- Default partitioner: hashes the key to pick a partition; since Kafka 2.4, keyless records are handled with the sticky approach described above.
- RoundRobinPartitioner: spreads records evenly across partitions, ignoring the key.
- UniformStickyPartitioner: fills one batch per partition at a time for better batching, also ignoring the key.
- Custom partitioner: your own implementation of the Partitioner interface, which we’ll build next.
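Switching strategies is just a configuration change. For example, here’s a minimal sketch of selecting the round-robin partitioner that ships with recent Kafka clients, reusing kafkaProps from earlier:
kafkaProps.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner");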
Creating a Custom Partitioner in Kafka
Let’s say you want to build a custom partitioner to handle that heavy "Banana" client. The default hash partitioning might place Banana’s records in the same partition as other customers, creating an uneven load. Instead, we can write a custom partitioner that gives Banana their own partition.
Here’s an example of how you’d do that:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;

public class BananaPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // Optional configuration setup
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null || !(key instanceof String)) {
            throw new InvalidRecordException("All messages should have customer name as key");
        }

        // Assign "Banana" to the last partition
        if ("Banana".equals(key)) {
            return numPartitions - 1;
        }

        // Hash other keys and map them to the remaining partitions
        return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void close() {
        // Cleanup if necessary
    }
}
Key Points About the Custom Partitioner
- Records keyed “Banana” always land in the topic’s last partition.
- Every other key is hashed and mapped onto the remaining partitions (numPartitions - 1), so regular customers never share a partition with Banana.
- Records without a String key are rejected with an InvalidRecordException, since the logic relies on the customer name being the key.
With this setup, you ensure that your heavyweight customer gets their own partition, avoiding the risk of overloading any single partition with too much data. Plugging it in is shown in the sketch below.
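Registering the custom partitioner is just a producer property; a minimal sketch, assuming BananaPartitioner is on the producer’s classpath and reusing the kafkaProps object from earlier:
kafkaProps.put("partitioner.class", BananaPartitioner.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps); // records sent with this producer now use BananaPartitioner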
In a nutshell, partitioning in Kafka is a powerful tool that allows you to distribute your messages across multiple partitions for parallel processing. Whether you’re using Kafka’s default partitioner, a custom one, or experimenting with the sticky partitioner, partitioning is essential to optimizing performance and maintaining order in your data.
Using Headers in Kafka Records
Ever wanted to sneak some extra information into your Kafka messages without stuffing it into the key or value? That’s where headers come in! Kafka allows you to attach headers—think of them as little pieces of metadata that ride along with your message. These are perfect for adding extra context without messing with the main message payload.
Here are a couple of ways headers can make your life easier:
- Recording the lineage of a message, for example which service or system it originated from.
- Routing, filtering, or tracing messages without having to deserialize the payload first.
Kafka headers are just key-value pairs: the key is a string, and the value can be any serialized object (it travels as a byte array).
Here’s a quick example of how to add headers to a ProducerRecord:
ProducerRecord<String, String> record =
        new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
// Adding a header to the record (StandardCharsets comes from java.nio.charset)
record.headers().add("privacy-level", "YOLO".getBytes(StandardCharsets.UTF_8));
In this example, we’ve added a header to our record, giving it a "privacy-level" of "YOLO". The header is sent along with the message but doesn’t affect the actual key or value. Super handy!
Interceptors in Kafka Producers
If you need to modify or log something about your messages before they’re sent to Kafka, interceptors are your go-to tool. Interceptors allow you to tweak records or capture info before they hit Kafka, which is super useful if you’re managing multiple producers and want some consistent behavior across the board.
The ProducerInterceptor interface provides two key methods you’ll want to know about:
- onSend(): called before the record is serialized and sent, so you can inspect or even modify the record here.
- onAcknowledgement(): called when the broker acknowledges the record or when the send fails; you can read the metadata here, but not change anything.
Here’s how you might use an interceptor to log some information before and after sending:
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class CustomInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Log some info before the message is sent
        System.out.println("Intercepting record: " + record.value());
        // Return the record (can modify it here if needed)
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Log when Kafka acknowledges the message
        if (exception == null) {
            System.out.println("Message acknowledged by Kafka: " + metadata.offset());
        } else {
            exception.printStackTrace();
        }
    }

    @Override
    public void close() {
        // Cleanup resources if needed
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Any setup you need to do
    }
}
With this interceptor, you’re logging the message value before it’s sent and then printing the message offset once Kafka confirms it’s received. If something goes wrong, the exception gets printed.
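Interceptors aren’t passed to send() the way callbacks are; they’re registered through the interceptor.classes property when the producer is created. A minimal sketch, assuming CustomInterceptor is on the classpath (the property takes a comma-separated list, so you can chain several interceptors):
kafkaProps.put("interceptor.classes", CustomInterceptor.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);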
Key Takeaways
- onSend() runs before serialization, which makes it the place to inspect or tweak outgoing records.
- onAcknowledgement() runs when the broker responds (or the send fails) and should stay lightweight and read-only.
- Because interceptors are attached through configuration rather than application code, they’re a clean way to enforce consistent behavior across many producers.
Example: Counting Messages and Acknowledgments with an Interceptor
Here’s a fun use case for an interceptor: counting messages. Let’s say you want to track how many messages are sent and how many acknowledgments are received over specific time windows. You can do that with a ProducerInterceptor.
Here’s how you could set that up:
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CountingProducerInterceptor implements ProducerInterceptor<String, String> {

    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    private static final AtomicLong numSent = new AtomicLong(0);
    private static final AtomicLong numAcked = new AtomicLong(0);

    @Override
    public void configure(Map<String, ?> configs) {
        Long windowSize = Long.valueOf((String) configs.get("counting.interceptor.window.size.ms"));
        // Schedule a task to print counts at regular intervals
        executorService.scheduleAtFixedRate(CountingProducerInterceptor::run, windowSize, windowSize, TimeUnit.MILLISECONDS);
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Count the message before it’s sent
        numSent.incrementAndGet();
        return record; // Return the unmodified record
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Count the acknowledgment when a message is acknowledged
        numAcked.incrementAndGet();
    }

    @Override
    public void close() {
        // Shut down the executor when the producer is closed
        executorService.shutdownNow();
    }

    // Print the counts at regular intervals
    public static void run() {
        System.out.println("Messages sent: " + numSent.getAndSet(0));
        System.out.println("Messages acknowledged: " + numAcked.getAndSet(0));
    }
}
Key Points About This Interceptor:
- The counters are static, so they’re shared by every instance of the interceptor in the JVM and reset each time the window is printed.
- configure() reads the window size from a custom property (counting.interceptor.window.size.ms) and schedules the printing task at that interval.
- close() shuts down the scheduled executor so the background thread doesn’t outlive the producer.
By using this type of interceptor, you can track Kafka message flow in real time, making it easier to monitor performance or catch issues. Wiring it up looks like the sketch below.
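Two properties are needed: the standard interceptor.classes setting and the custom window-size key that configure() reads. A minimal sketch with an illustrative 10-second window; note that counting.interceptor.window.size.ms is our own property name from the class above, not a built-in Kafka config:
kafkaProps.put("interceptor.classes", CountingProducerInterceptor.class.getName());
kafkaProps.put("counting.interceptor.window.size.ms", "10000"); // print sent/acked counts every 10 seconds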
With these advanced Kafka features—headers, interceptors, and custom partitioners—you can really fine-tune how your messages are handled and monitored, making your Kafka setup even more powerful and efficient.
Quotas and Throttling in Kafka
In a busy Kafka ecosystem, you don’t want one overzealous client hogging all the resources, right? That’s where quotas come in. Kafka gives you the ability to control how fast clients can send or receive messages, which is crucial to keeping things fair and preventing any single client from overwhelming the broker. There are three types of quotas you can set:
- Produce quota: limits how many bytes per second a client can write.
- Consume quota: limits how many bytes per second a client can read.
- Request quota: limits the percentage of broker thread time a client’s requests may consume.
Configuring Quotas in Kafka
Kafka lets you apply quotas in several ways: as a default that covers every client, for a specific client ID, for a specific authenticated user, or for a particular combination of user and client ID.
Here’s a quick example: Let’s say you want to set a default producer quota of 10 MB/s for all clients. You’d run this command:
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=10485760' --entity-type clients --entity-default
By using quotas, you can make sure no single client floods the broker, ensuring smooth and fair resource distribution.
Configuring and Managing Quotas in Kafka
You can manage Kafka quotas in two main ways: statically (via broker configuration files) or dynamically (on-the-fly with commands). Let’s break it down:
Static Quota Configuration (via Broker Config)
If you want to set a default producer quota of 2 MB/s, you’d add this line to the broker config file:
quota.producer.default=2M
If you need more control, like setting different limits for different clients, you can add overrides:
quota.producer.override="clientA:4M,clientB:10M"
This lets clientA produce data at 4 MB/s, while clientB can crank out data at 10 MB/s. The downside to static quotas? You’ll need to restart the broker for changes to take effect.
Dynamic Quota Configuration (No Restart Needed)
Thankfully, Kafka also supports dynamic quotas, meaning you can change them on the fly without restarting anything. Here are some examples using the kafka-configs.sh tool:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
--add-config 'producer_byte_rate=1024' --entity-type clients --entity-name clientA
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
--add-config 'consumer_byte_rate=2048' --entity-type users
In these examples, clientA is limited to 1,024 bytes per second of produced data, user1 is limited to 1,024 bytes per second produced and 2,048 bytes per second consumed, and the last command applies a default consume quota of 2,048 bytes per second to all users.
Quota Throttling: What Happens When You Exceed Your Quota?
So what happens if a client gets a little greedy and starts sending more data than its quota allows? Kafka doesn’t immediately reject the messages. Instead, it starts throttling—delaying responses to slow down the client.
If a client is continuously exceeding its quota, Kafka might temporarily mute the connection to prevent it from hammering the system with too many requests. You can track throttling behavior with client metrics such as produce-throttle-time-avg, produce-throttle-time-max, fetch-throttle-time-avg, and fetch-throttle-time-max, which report the average and maximum time requests were delayed by throttling.
These metrics give you visibility into how much time Kafka spends keeping clients in check, helping you adjust quotas as needed to keep things running smoothly.
Handling Quota Violations in the Producer
For asynchronous producers, things can get messy if you exceed the broker’s capacity. When the producer tries to send messages faster than the broker can handle (either because of a quota or general load), the producer’s internal buffer fills up. If that happens, calls to send() may block, and you could even run into a TimeoutException if the broker can’t process the messages in time.
To avoid getting stuck in this situation:
- Size quotas and broker capacity against the producer’s expected throughput, and monitor actual produce rates against those limits.
- Bound how long send() may block with max.block.ms, and handle the resulting TimeoutException rather than letting it take down the caller.
- Watch the throttle-time metrics mentioned above so you notice throttling before buffers fill up.
A small configuration sketch follows.
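One practical knob here is max.block.ms, which caps how long send() (and partitionsFor()) will block when the producer’s buffer is full, so a heavily throttled producer fails fast with a TimeoutException instead of hanging. A minimal sketch with illustrative values:
kafkaProps.put("max.block.ms", "5000");       // send() throws TimeoutException after blocking 5 seconds on a full buffer
kafkaProps.put("buffer.memory", "33554432");  // 32 MB of local buffer for records waiting to be sent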
With Kafka’s dynamic and static quota configurations, you’re in control of how resources are used. Set your limits wisely, and you’ll keep the system fair, prevent overloads, and ensure smooth sailing for all your clients!
Quick Recap of Key Kafka Producer Features
- Producers turn ProducerRecords into serialized, partitioned, batched messages and hand back RecordMetadata (or an error) once the broker responds.
- You can send fire-and-forget, synchronously, or asynchronously with callbacks, trading latency against delivery guarantees.
- acks controls durability: 0 for raw speed, 1 for a middle ground, all for maximum safety.
- Keys drive partitioning, and custom partitioners, headers, and interceptors give you finer control and better observability.
- Quotas and throttling keep any single client from overwhelming the brokers.