Apache Kafka - Summary
Based on the courses of Stéphane Maarek



In the realm of cutting-edge data streaming solutions, Apache Kafka stands tall as a beacon of reliability and efficiency. Welcome to this article, your definitive guide to unraveling the core concepts, mastering essential configurations, and navigating the pivotal commands within this open-source distributed event streaming platform.

As we journey through the intricacies of Kafka's publish-subscribe model, we'll uncover the foundational principles that make it the heartbeat of data in motion. Whether you're configuring key components for seamless performance or delving into the command-line interface to command Kafka's capabilities, this summary ensures you grasp the essentials.


Topics, Partitions, Offsets

Topic: A particular stream of data

  • Similar to a table in a database (Without all constraints)
  • You can have as many topics as you want
  • A topic is identified by its name

Topics are split into partitions

  • Each partition is ordered
  • Each message within a partition gets an incremental ID, called an offset
  • An offset only has a meaning for a specific partition. E.g. offset 3 in partition 0 doesn’t represent the same data as offset 3 in partition 1
  • Ordering is only guaranteed within a partition (not across partitions)
  • Data is kept only for a limited time (default is one week)
  • Once the data is written to a partition, it cannot be changed (immutability)
  • Data is assigned randomly to a partition unless a key is provided.

Brokers

  • Kafka is composed of multiple brokers (servers)
  • Each broker is identified with its ID (Integer)
  • Each broker contains certain topic partitions
  • After connecting to any broker (bootstrap broker), you will be connected to the entire cluster
  • A good number to get started is 3 brokers, but some big clusters have over 100 brokers
  • In these examples, we choose to number brokers starting at 100 (arbitrary)

Topic Replication Factor

  • Topics should have replication factor > 1 (usually between 2 and 3)
  • This way if a broker is down, another broker can serve the data

Concept of leader for a partition

  • At any time only ONE broker can be a leader for a given partition
  • Only that leader can receive and serve data for a partition
  • The other brokers will synchronize the data
  • Therefore each partition has one leader and multiple ISR (in-sync replica)

Producers

  • Producers write data to topics (which are made of partitions)
  • Producers automatically know to which broker and partition to write to
  • In case of broker failures, producers will automatically recover
  • Producers can choose to receive acknowledgment of data writes:
    - acks=0: Producer won’t wait for an acknowledgment (possible data loss)
    - acks=1: Producer will wait for the leader acknowledgment (limited data loss)
    - acks=all: Leader + replicas acknowledgment (no data loss)
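
A minimal Java producer sketch illustrating these settings. The broker address 127.0.0.1:9092 and the topic first_topic simply mirror the console commands later in this article; adapt them to your own cluster.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // leader + replicas acknowledgment (no data loss)

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // No key: records are spread across partitions in round-robin fashion
        producer.send(new ProducerRecord<>("first_topic", "hello kafka"));
        producer.flush();
        producer.close();
    }
}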

Producers: Message Key

  • Producers can choose to send a key with the message (string, numbers, etc..)
  • If key=null, data is sent round-robin (broker 101, then 102, then 103)
  • If a key is sent, then all messages for that key will always go to the same partition
  • A key is basically sent if you need message ordering for a specific field, as in the example below
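
For example, building on the producer sketch above, sending records with a key (the key truck_id_123 is only an illustration) guarantees they land in the same partition and therefore stay in order:

// All records sharing this key go to the same partition, so their relative order is preserved
producer.send(new ProducerRecord<>("first_topic", "truck_id_123", "position update 1"));
producer.send(new ProducerRecord<>("first_topic", "truck_id_123", "position update 2"));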

Consumer

  • Consumers read data from a topic (Identified by name)
  • Consumers know which broker to read from
  • In case of broker failures, consumers know how to recover
  • Data is read in order within each partition

Consumer Groups

  • Consumers read data in consumer groups
  • Each consumer within a group reads from exclusive partitions
  • If you have more consumers than partitions, some consumers will be inactive (see the consumer sketch below)
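
A minimal Java consumer sketch, assuming the same local broker and topic as the producer sketch above; the group id my-first-application mirrors the console commands later in this article.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "my-first-application"); // consumers sharing this id split the partitions
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("first_topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + " -> " + record.value() + " (offset " + record.offset() + ")");
            }
        }
    }
}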

Consumer Offsets

  • Kafka stores the offsets at which a consumer group has been reading
  • The committed offsets live in a Kafka topic named __consumer_offsets
  • When a consumer in a group has processed data received from Kafka, it should commit the offsets
  • If a consumer dies, it will be able to read back from where it left off thanks to the committed consumer offsets!

Delivery Semantics

  • Consumers choose when to commit offsets
  • There are 3 delivery semantics

  1. At most once:
     - Offsets are committed as soon as a message is received
     - If the processing goes wrong, the message will be lost (it won’t be read again)
  2. At least once (usually preferred):
     - Offsets are committed after the message is processed
     - If the processing goes wrong, the message will be read again
     - This can result in duplicate processing of messages. Make sure your processing is idempotent.
  3. Exactly once:
     - Can be achieved for Kafka => Kafka workflows using the Kafka Streams API
     - For Kafka => External System workflows, use an idempotent consumer

Kafka Broker Discovery

  • Every Kafka broker is also called a “bootstrap server”
  • That means that you only need to connect to one broker, and you will be connected to the entire cluster.
  • Each broker knows about all brokers, topics, and partitions (metadata)

Zookeeper

  • Zookeeper manages brokers (keeps a list of them)
  • Zookeeper helps in performing leader election for partitions
  • Zookeeper sends notifications to Kafka in case of changes (e.g. new topic, broker dies, broker comes up, delete topics, etc..)
  • Kafka can’t work without Zookeeper
  • Zookeeper by design operates with an odd number of servers (3, 5, 7)
  • Zookeeper has a leader (handles writes); the rest of the servers are followers (handle reads)
  • Zookeeper does NOT store consumer offsets with Kafka > v0.10


Commands

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties        
kafka-server-start /usr/local/etc/kafka/server.properties        

Topics

kafka-topics --bootstrap-server 127.0.0.1:9092 --list        
kafka-topics --bootstrap-server 127.0.0.1:9092 --topic first_topic --create --partitions 6 --replication-factor 1        
kafka-topics --bootstrap-server 127.0.0.1:9092 --topic first_topic --delete        
kafka-topics --bootstrap-server 127.0.0.1:9092 --topic first_topic --describe        

Producers

kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic first_topic        
kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic first_topic --producer-property acks=all        
kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=,        

Consumer

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic        
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning        
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-first-application        
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,        

Groups

kafka-consumer-groups --bootstrap-server localhost:9092 --list
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-first-application
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic

Producers Acks Deep dive

acks=0 (No acks)

  • No response is requested
  • If the broker goes offline or an exception happens, we won’t know and will lose data.
  • Use for data where it’s okay to potentially lose messages:
    - Metrics collection
    - Log collection

acks=1 (leader acks)

  • Leader response is requested, but replication is not guaranteed (happens in the background)
  • If an ACK is not received, the producer may retry.
  • If the leader broker goes down, but replicas haven’t replicated the data yet, we have data loss.

acks = all (replicas acks)

  • Leader + replicas acknowledgment is requested
  • Added latency and safety
  • No data loss if enough replicas
  • Necessary setting, if you don't want to lose data
  • acks=all, must be used in conjunction with min.insync.replicas
  • min.insync.replicas can be set at the broker or topic level (override)
  • min.insync.replicas=2 implies that at least 2 brokers that are ISR (including leader) must respond that they have the data
  • That means if you use replication.factor=3, min.insync.replicas=2, acks=all, you can only tolerate one broker going down; otherwise, the producer will receive an exception on send.

Producer Retries

  • In case of transient failures, developers are expected to handle exceptions, otherwise, the data will be lost:
  • Example of a transient failure:
    - NotEnoughReplicasException
  • There is a retries setting:
    - Defaults to 0 for Kafka <= 2.0
    - Defaults to 2147483647 for Kafka >= 2.1
  • The retry.backoff.ms setting is by default 100 ms

Producer Timeouts

  • If retries > 0, for example retries=2147483647:
  • The producer won’t try the request forever, it’s bound by a timeout
  • For this, you can set an intuitive Producer Timeout (KIP-91 Kafka 2.1)
  • delivery.timeout.ms=120000 ms equals 2 minutes
  • Records will be failed if they can’t be acknowledged within delivery.timeout.ms

Warning

  • In the case of retries, there is a chance that the message will be sent out of order (if the batch has failed to be sent)
  • If you rely on key-based ordering, that can be an issue.

  • For this, you can control how many produce requests can be made in parallel with the setting max.in.flight.requests.per.connection:
    - Default: 5
    - Set it to 1 if you need to ensure ordering (may impact throughput)

  • In Kafka >= 1.0.0 there is a better solution with idempotent producers

Idempotent Producers

  • Idempotent producers are great for guaranteeing a stable and safe pipeline
  • They come with:
    - retries=Integer.MAX_VALUE (2^31 - 1 = 2147483647)
    - max.in.flight.requests.per.connection=1 (Kafka == 0.11) or
    - max.in.flight.requests.per.connection=5 (Kafka >= 1.0.0 - high performance while keeping ordering)
    - acks=all
  • These settings are applied automatically for you when you enable idempotence; you don’t have to set them manually
  • Just set:
    - producerProps.put("enable.idempotence", true)
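
A minimal sketch of enabling idempotence on the producer from the earlier example; the client then applies the safe settings listed above for you.

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "127.0.0.1:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("enable.idempotence", true); // implies acks=all, retries=Integer.MAX_VALUE, safe in-flight requests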


Message Compression

  • Producers usually send data that is text-based, for example with JSON data.
  • In this case, it is important to apply compression to the producer.
  • Compression is enabled at the producer level and doesn't require any configuration change in the Broker or in the Consumers.
  • compression.type can be none (default), gzip, lz4, snappy
  • Compression is more effective the bigger the batch being sent to Kafka

https://blog.cloudflare.com/squeezing-the-firehose/

  • The compressed batch has the following advantages:
    - Much smaller producer request size (compression ratio up to 4x!)
    - Faster transfer of data over the network => less latency
    - Better throughput
    - Better disk utilization in Kafka (messages stored on disk are smaller)
  • Disadvantages (very minor):
    - Producer must commit some CPU cycles to compression
    - Consumer must commit some CPU cycles to decompression
  • Overall:
    - Consider testing snappy and lz4 for an optimal speed/compression ratio
  • Recommendations (see the sketch below):
    - Find a compression algorithm that gives you the best performance for your specific data. Test all of them.
    - Always use compression in production, especially if you have high throughput
    - Consider tweaking linger.ms and batch.size to have bigger batches, and therefore more compression and higher throughput
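
A hedged sketch of a higher-throughput producer configuration combining compression and batching, to be added to the producer Properties from the earlier sketch. The exact values are illustrative starting points, not universal recommendations; measure against your own data.

// Add to the producer Properties from the earlier sketch
props.put("compression.type", "snappy");               // test snappy / lz4 / gzip against your own data
props.put("linger.ms", "20");                          // wait up to 20 ms to build bigger batches
props.put("batch.size", Integer.toString(32 * 1024));  // 32 KB batches instead of the default 16 KB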

Producer Batching

  • By default, Kafka tries to send records as soon as possible:
    - It will have up to 5 requests in flight, meaning up to 5 messages individually sent at the same time
    - After this, if more messages have to be sent while others are in flight, Kafka is smart and will start batching them while they wait, to send them all at once
  • This smart batching allows Kafka to increase throughput while maintaining very low latency
  • Batches have higher compression so better efficiency

linger.ms & batch.size

linger.ms

  • Number of milliseconds a producer is willing to wait before sending a batch out. (Default 0)
  • By introducing some lag (for example linger.ms =5), we increase the chances of messages being sent together in a batch.
  • So at the expense of introducing a small delay, we can increase the throughput, compression, and efficiency of our producer.
  • If a batch is full (see batch.size) before the end of the linger.ms period, it will be sent to Kafka right away!

batch.size

  • Maximum number of bytes that will be included in a batch. The default is 16KB.
  • Increasing the batch size to something like 32KB or 64KB can help increase the compression, throughput, and efficiency of requests.
  • Any message that is bigger than the batch size will not be batched
  • A batch is allocated per partition, so make sure you don’t set it to a number that’s too high, otherwise you will waste memory

max.block.ms & buffer.memory

  • buffer.memory=33554432 (32 MB): the size of the producer send buffer, which fills up with records waiting to be sent to the broker
  • If that buffer is full (all 32 MB), then the .send() method will start to block (won’t return right away)
  • max.block.ms=60000: the time .send() will block until throwing an exception. Exceptions are basically thrown when:
    - The producer has filled up its buffer
    - The broker is not accepting new data
    - 60 seconds has elapsed
  • If you hit an exception, that usually means your brokers are down or overloaded, as they can’t respond to requests.
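
For illustration, these two settings written out as producer properties (the values shown are simply the defaults quoted above; raise them only if you understand the memory impact):

// Add to the producer Properties from the earlier sketch
props.put("buffer.memory", Integer.toString(32 * 1024 * 1024)); // 32 MB buffer for records awaiting send
props.put("max.block.ms", "60000");                             // .send() blocks up to 60 s before throwing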

Delivery Semantics for Consumer Summary

At most once

Offsets are committed as soon as the messages are received. If the processing goes wrong, the message will be lost (it won’t be read again).

At least once

Offsets are committed after the message is processed. If the processing goes wrong, the message will be read again. This can result in duplicate processing of messages. Make sure your processing is idempotent.

Exactly once

Can be achieved for Kafka => Kafka workflows using Kafka Streams API. For Kafka => Sink workflows, use idempotent consumers.

Consumers Poll Behavior

  • Kafka consumers have a “poll“ model, while many other messaging buses in the enterprise have a “push” model.
  • This allows consumers to control where in the log they want to consume, how fast, and gives them the ability to replay events.

fetch.min.bytes (default 1)

  • Controls how much data you want to pull at least on each request
  • Helps improve throughput and decrease request number
  • At the cost of latency

max.poll.records (default 500)

  • Controls how many records to receive per poll request
  • Increase if your messages are very small and you have a lot of available RAM
  • Good to monitor how many records are polled per request

max.partition.fetch.bytes (default 1 MB)

  • Maximum data returned by the broker per partition
  • If you read from 100 partitions, you’ll need a lot of memory

fetch.max.bytes (default 50 MB)

  • Maximum data returned for each fetch request (covers multiple partitions)
  • The consumer performs multiple fetches in parallel

Change these settings only if your consumer maxes out on throughput already
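
If you do need to tune, here is a sketch of the fetch settings above written out as consumer properties (values are illustrative; measure throughput before and after changing them):

// Add to the consumer Properties from the earlier sketch
props.put("fetch.min.bytes", "1024");                                       // wait for at least 1 KB per fetch (default 1)
props.put("max.poll.records", "1000");                                      // more records per poll (default 500)
props.put("max.partition.fetch.bytes", Integer.toString(2 * 1024 * 1024));  // 2 MB per partition (default 1 MB)
props.put("fetch.max.bytes", Integer.toString(50 * 1024 * 1024));           // 50 MB per fetch request (default)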

Consumers Offset Commits Strategies

  • (easy) enable.auto.commit=true & synchronous processing of batches
    - With auto-commit, offsets will be committed automatically for you at regular intervals (auto.commit.interval.ms=5000 by default)
    - If you don’t use synchronous processing, you will be in “at-most-once” behavior because offsets will be committed before your data is processed
  • (medium) enable.auto.commit=false & manual commit of offsets (see the sketch below)
    - You control when you commit offsets and what the conditions for committing them are
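
A minimal sketch of the manual-commit, at-least-once strategy, using the same Properties and imports as the consumer sketch earlier; processRecord is a hypothetical placeholder for your own (idempotent) processing logic.

props.put("enable.auto.commit", "false"); // we decide when offsets are committed

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("first_topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // hypothetical: your idempotent processing goes here
    }
    consumer.commitSync(); // commit only after the whole batch was processed => at-least-once
}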

Consumer Offset Reset Behavior

  • A consumer is expected to read from a log continuously.


  • But if your application has a bug, your consumer can be down.
  • If Kafka has a retention of seven days, and your consumer is down for more than seven days, the offsets are “invalid”
  • The behavior for the consumer is then to use:
    - auto.offset.reset=latest: will read from the end of the log
    - auto.offset.reset=earliest: will read from the start of the log
    - auto.offset.reset=none: will throw an exception if no offsets are found
  • Additionally, consumer offsets can be lost:
    - If a consumer hasn’t read new data in 1 day (Kafka < 2.0)
    - If a consumer hasn’t read new data in 7 days (Kafka >= 2.0)
  • This can be controlled by the broker setting offsets.retention.minutes
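
For reference, the reset behavior is chosen on the consumer side, as a one-line addition to the consumer sketch earlier:

props.put("auto.offset.reset", "earliest"); // or "latest" / "none", as described above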

Replaying data for consumer

  • To replay data for a consumer group

  • Take all the consumers of that specific group down
  • Use kafka-consumer-groups command to set offset to what you want
  • Restart consumers

Notes

  • Set proper data retention period & offset retention period
  • Ensure the auto-offset reset behavior is the one you expected/want
  • Use replay capability in case of unexpected behavior

Controlling Consumer Liveliness

  • Consumers in a group talk with a consumer coordinator
  • To detect consumers that are “down” there is a “heartbeat” mechanism and a “poll” mechanism
  • To avoid issues, consumers are encouraged to process data fast and poll often.

Consumers Heartbeat Thread

  • session.timeout.ms (default 10 seconds):
    - Heartbeats are sent periodically to the broker
    - If no heartbeats are sent during that period, the consumer is considered dead
    - Set it even lower for faster consumer rebalances
  • heartbeat.interval.ms (default 3 seconds):
    - How often to send heartbeats
    - Usually set to 1/3rd of session.timeout.ms

Takeaway: This mechanism is used to detect a consumer application being down.

Consumer Poll Thread

  • max.poll.interval.ms (default 5 minutes):
    - Maximum amount of time between two .poll() calls before declaring the consumer dead
    - This is particularly relevant for Big Data frameworks like Spark in case the processing takes time

Takeaway: This mechanism is used to detect a data processing issue with the consumer.
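
A sketch of the liveness settings above written out as consumer properties; the values shown simply match the defaults quoted in this section, and lowering them should be done with the rebalance impact in mind.

// Add to the consumer Properties from the earlier sketch
props.put("session.timeout.ms", "10000");    // consumer declared dead if no heartbeat within 10 s
props.put("heartbeat.interval.ms", "3000");  // send a heartbeat every 3 s (~1/3 of the session timeout)
props.put("max.poll.interval.ms", "300000"); // at most 5 minutes between two poll() calls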

Kafka Connect

  • Source connectors to get data from common data sources
  • Sink connectors to publish that data in Common Data Stores
  • Make it easy for non-experienced devs to quickly get their data reliably into Kafka
  • Part of your ETL pipeline
  • Scaling made easy, from small pipelines to company-wide pipelines

Kafka Streams

  • Easy data processing and transformation library within Kafka
  • No need to create a separate cluster
  • Highly scalable, elastic, and fault-tolerant
  • Exactly Once Capability
  • One record at a time processing (No batching)
  • Works for any application size

Kafka Streams Architecture (diagram)

Kafka Schema Registry

The need for a schema registry

  • Kafka takes bytes as input and publishes them
  • No data verification

So,

  • We need data to be self describable
  • We need to be able to evolve data without breaking downstream consumers
  • We need schemas

What if the Kafka Brokers were verifying the message that they received?

  • It would break what makes Kafka so good:
    - Kafka doesn’t parse or even read your data (no CPU usage)
    - Kafka takes bytes as input without even loading them into memory (that’s called zero copy)
    - Kafka distributes bytes
    - As far as Kafka is concerned, it doesn’t even know if your data is an integer, a string, etc.

Then,

  • The Schema Registry has to be a separate component
  • Producers and Consumers need to be able to talk to it
  • The schema registry must be able to reject bad data.
  • A common data format must be agreed upon:
    - It needs to support schemas
    - It needs to support evolution
    - It needs to be lightweight

Confluent Schema Registry

  • Store and retrieve schemas for Producers / Consumers
  • Enforce Backward / Forward / Full compatibility on topics
  • Decrease the size of the payload of data sent to Kafka.

Schema Registry

  • Utilizing a schema registry has a lot of benefits
  • But it implies you need to:
    - Set it up well
    - Make sure it’s highly available
    - Partially change the producer and consumer code
  • Apache Avro as a format is awesome but has a learning curve
  • The Schema Registry is free and open-sourced, created by Confluent (Creators of Kafka)

https://medium.com/@stephane.maarek/the-kafka-api-battle-producer-vs-consumer-vs-kafka-connect-vs-kafka-streams-vs-ksql-ef584274c1e

Partition Count and Replication Factor

  • The 2 most important parameters when creating a topic
  • They impact performance and durability of the system overall
  • It is best to get the parameters right the first time:
    - If the partition count increases during a topic lifecycle, you will break your key ordering guarantees
    - If the replication factor increases during a topic lifecycle, you put more pressure on your cluster, which can lead to an unexpected performance decrease

Partitions Count

  • Each partition can handle a throughput of a few MB/s
  • More partitions imply:
    - Better parallelism and better throughput
    - Ability to run more consumers in a group to scale
    - Ability to leverage more brokers if you have a large cluster
    - BUT more elections to perform for Zookeeper
    - BUT more files opened in Kafka
  • Partitions per topic = the MILLION DOLLAR QUESTION:
    - (Intuition) Small cluster (< 6 brokers): 2x the number of brokers
    - (Intuition) Big cluster (> 12 brokers): 1x the number of brokers
    - Adjust for the number of consumers you need to run in parallel at peak throughput

Replication Factor

  • Should be at least 2, usually 3, and a maximum of 4
  • The higher the replication factor (N):
    - Better resilience of your system (N-1 brokers can fail)
    - BUT more replication (higher latency if acks=all)
    - BUT more disk space on your system (50% more if RF is 3 instead of 2)

Guidelines

  • Set it to 3 to get started (you must have at least 3 brokers for that)
  • If replication performance is an issue, get a better broker instead of less RF
  • Never set it to 1 in production

Cluster Guidelines

  • It is pretty much accepted that a broker should not hold more than 2000 to 4000 partitions (across all topics of that broker)
  • Additionally, a Kafka cluster should have a maximum of 20,000 partitions across all brokers:
    - The reason is that in case of brokers going down, Zookeeper needs to perform a lot of leader elections
  • If you need more partitions in your cluster, add brokers instead
  • If you need more than 20,000 partitions in your cluster (it will take time to get there!), follow the Netflix model and create more Kafka clusters

Advanced Kafka Topic Configurations

Change a topic configuration

Adding

kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name test.config --alter --add-config min.insync.replicas=2

Deleting

kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name test.config --delete-config min.insync.replicas --alter        

Partitions and Segments

  • Partitions are made of segments (files)!

  • Only one segment is ACTIVE
  • Two segment settings:
    - log.segment.bytes: the max size of a single segment in bytes
    - log.segment.ms: the time Kafka will wait before committing the segment if not full

Segments And Indexes

  • Segments come with two index files:
    - An offset to position index: allows Kafka to find where to read a message
    - A timestamp to offset index: allows Kafka to find messages with a specific timestamp
  • A smaller log.segment.bytes (size, default 1 GB) means:
    - More segments per partition
    - Log compaction happens more often
    - BUT Kafka has to keep more files opened (Error: Too many open files)
  • A smaller log.segment.ms (time, default 1 week) means:
    - You set a max frequency for log compaction (more frequent triggers)
    - Maybe you want daily compaction instead of weekly
  • Ask yourself: how often do I need log compaction to happen? (topic-level overrides are shown below)
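
Per-topic overrides follow the kafka-configs pattern shown earlier; at the topic level the settings are named segment.bytes and segment.ms. The topic name test.config is just the example used above, and the values (512 MB, 1 day) are illustrative.

kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name test.config --alter --add-config segment.bytes=536870912
kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name test.config --alter --add-config segment.ms=86400000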

Log Cleanup Policies

  • Many Kafka clusters make data expire, according to a policy
  • That concept is called log clean-up

Policy 1

log.cleanup.policy=delete (Kafka default for all user topics)

  • Delete based on the age of data (default is a week)
  • Delete based on max size of log (default -1 == infinite)

Policy 2

log.cleanup.policy=compact (Kafka default for topic __consumer_offsets)

  • Delete based on the keys of your messages
  • Will delete all duplicate keys after the active segment is committed
  • Infinite time and space retention

Log Cleanup Policy: Delete

  • log.retention.hours:
    - Number of hours to keep data for (default is 168 - one week)
    - A higher number means more disk space
    - A lower number means less data is retained (if your consumers are down for too long, they can miss data)
  • log.retention.bytes:
    - Max size in bytes for each partition (default is -1 - infinite)
    - Useful to keep the size of a log under a threshold
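
The per-topic equivalents are retention.ms and retention.bytes, again following the kafka-configs pattern used earlier; the topic name test.config and the values shown (3 days, 1 GB) are only illustrative.

kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name test.config --alter --add-config retention.ms=259200000
kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name test.config --alter --add-config retention.bytes=1073741824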


I hope you find it useful, happy coding!        
