Introducing the leading open-source Kafka Connector for Amazon S3


Solid data integration is the backbone of an effective data platform, and in the Kafka ecosystem, managing S3 data flows is crucial.

Confronted with diverse demands around S3, data engineers need efficient solutions. This is where Lenses comes in. By forging partnerships with major enterprises, we've enhanced our S3 connector, establishing it as the preferred choice in the market.

The connector's inclusion in the Lenses 5.3 release has also strengthened Kafka topic backup and restore capabilities.

In the following sections, we'll unpack the expanded functionalities and the latest features of the connector, showcasing why it stands out in managing mission-critical data flows.

Integrating data with Apache Kafka & S3: The challenges

Integrating Kafka with S3 presents a unique set of challenges, largely due to S3's versatility and the wide array of use cases it accommodates. This flexibility often leaves data engineers in search of a comprehensive solution that can effectively bridge the gap between Kafka and S3, addressing several distinct hurdles:

  • Preserving context is essential when backing up Kafka topics.
  • Athena has a preference for Parquet storage, requiring specific partitioning.
  • When it comes to storing images in S3, maintaining a one-message-per-file approach is critical.

And these complexities are only one side of the equation: the challenges of sinking data into S3.

On the flip side, retrieving data from S3 to Kafka frequently involves navigating through complex formats, whether it's multiline XML or specialized industry text protocols, adding another layer of intricacy to the integration process.


A Swiss Army knife connector

Robust data management

When managing data, there's a pressing need for a connector that's both versatile and comprehensive, capable of catering to an array of requirements. This includes a robust Envelope data structure that meticulously preserves keys, values, headers, and metadata, ensuring no crucial information is lost in transit.

Flexibility across file formats

It's also essential that the connector supports a variety of storage formats — from Parquet, AVRO, and JSON to plain text — and can source complex formats such as XML and specific industry protocols. This versatility extends to the ability to write raw events into separate files, which is vital for handling content such as images.

Ensuring exactly-once semantics

An effective connector should offer a flexible structure and tailored partitioning in S3, coupled with the capacity to control the timing and sizing of file flushes. It should dynamically detect new objects at the source and uphold data integrity through measures like avoiding data loss and ensuring exactly-once semantics.
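
For example, the timing and sizing of file flushes can be controlled directly in KCQL. The following is a minimal sketch, assuming a lensesioaws bucket and a payments topic; the specific thresholds are illustrative only:

connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM payments STOREAS AVRO WITH_FLUSH_SIZE=50000000 WITH_FLUSH_INTERVAL=300 WITH_FLUSH_COUNT=50000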

Open-source, enterprise-ready

What's more, these high-caliber connectors are open-source and come with the added assurance of optional enterprise support. They have been rigorously stress-tested in mission-critical production environments in collaboration with our enterprise partners.

Enhancing disaster recovery with integrations & scalability

With the Lenses S3 connectors specifically, users can back up and restore Kafka topics, archive topic histories for enhanced security and disaster recovery, and efficiently transport Kafka data to data lakes, serving platforms from Athena to Databricks or Snowflake. The connectors also help power AI models and large language models (LLMs), and they allow non-Kafka-native apps to be integrated seamlessly by sourcing data from S3. All of this can be done at large scale, with throughput and latency tuned to specific needs.

Finally, the connector employs the Kafka Connect Query Language (KCQL), offering SQL-like commands for precise control. This comprehensive syntax underpins the connector's versatility, making it essential for sophisticated data management tasks.

INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[PARTITIONBY (partition[, partition] ...)]
[STOREAS storage_format]
[WITH_FLUSH_SIZE = flush_size]
[WITH_FLUSH_INTERVAL = flush_interval]
[WITH_FLUSH_COUNT = flush_count]
[PROPERTIES(
  'property.1' = x,
  'property.2' = x
)]

The main features of the connector are driven by this KCQL configuration, set through the connect.s3.kcql property, and we will walk through examples in the rest of this article.

Message envelope

A Kafka message contains several parts: the key, value, headers, and metadata, which includes details such as topic, partition, offset, and timestamp.

To simplify backup and restore processes, the connector encases these messages in a "package" called an envelope. This method avoids the need for intricate Kafka Connect transformations.

The following is the organization of the envelope:

{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Key, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 821122,
    "partition": 3,
    "timestamp": 1695645345,
    "topic": "source_topic"
  }
}
        

In this setup, every component of the Kafka message is preserved, helping with backup, restoration, and analytical query execution.

The Source connector uses this structure to reconstruct the original Kafka message and direct it to the designated topic. Enabling the envelope on the Sink connector looks like this:

INSERT INTO lensesioaws 
SELECT * FROM payments 
STOREAS AVRO 
PROPERTIES (
 'store.envelope'=true
);        
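
On the restore side, the Source connector can read the envelope back and rebuild the original records. The following is a minimal sketch, assuming the same lensesioaws bucket, a payments prefix, and a payments target topic:

connect.s3.kcql=INSERT INTO payments SELECT * FROM lensesioaws:payments STOREAS AVRO PROPERTIES ('store.envelope'=true)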

Storage formats

Amazon S3 can house a variety of content, and the connector accommodates this by supporting several primary formats, including:

  • AVRO
  • Parquet
  • JSON
  • CSV (headers included)
  • Text
  • BYTES

This choice of storage format is independent of the format used in Kafka itself. The conversion between Kafka records and Kafka Connect's internal representation is managed through the connector's key.converter and value.converter properties.
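
For instance, a minimal sketch of those converter settings for string keys and Avro values (the Schema Registry URL is a placeholder) might look like this:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081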

Configuring the data storage format in S3 is straightforward, achieved by using the STOREAS keyword in the connector's settings. For instance, to store individual images, the format can be specified as bytes with a flush count of one.

connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM claim_photos STOREAS BYTES WITH_FLUSH_COUNT=1        

S3 object partitioning

Partitioning plays a key role in structuring data and enhancing query performance. In S3, partitioning is handled through the Object Key. By default, the connector mirrors the partition structure of the Kafka topic it reads from. For example, for a topic with three partitions, the following setup:

connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM payments STOREAS AVRO PROPERTIES ('store.envelope'=true)

Would result in:

s3://lensesioaws/payments/0/000000001234.avro
s3://lensesioaws/payments/1/00000000555.avro
s3://lensesioaws/payments/2/0000000014566.avro        

The connector offers the option for customized partitioning, which brings several advantages:

  • It enhances the efficiency of future data queries by organizing data into specific partitions.
  • It simplifies data management by categorizing data over regular time periods, such as by year or month.
  • It segregates sensitive information into separate partitions, tightening access control.

To modify partitioning, the PARTITIONBY clause in the KCQL setup can be applied, using aspects of the Kafka message like the key, value, or headers.

For example, in a "sales" Kafka topic with messages related to transactions, KCQL can arrange data by criteria such as transaction year, type of product, and customer's geographical region.

INSERT INTO my-s3-bucket
SELECT *
FROM sales 
PARTITIONBY _key.year, _value.product_category, _headers.region
STOREAS AVRO

The Kafka Connect S3 Sink Connector then generates customized object keys in your S3 bucket, embedding details such as the transaction year, product category, and customer region. An example of what an object key might resemble is as follows:

s3://my-s3-bucket/2023/Electronics/EMEA/000000001.avro

        

To achieve more structured object key naming, similar to Athena Hive-like key names where field names are part of the object key, modify the KCQL syntax as follows:


INSERT INTO my-s3-bucket
SELECT *
FROM sales
PARTITIONBY _key.year, _value.product_category, _headers.region
STOREAS AVRO
PROPERTIES('partition.include.keys' = true)

        

This will result in object keys like:

s3://my-s3-bucket/year=2023/product_category=Electronics/region=EMEA/000000001.avro        

Arranging data into time-based segments using custom object keys offers significant advantages. To streamline the creation of these time-based intervals, the connector works alongside a dedicated Kafka Connect Single Message Transformer (SMT) plugin that simplifies time-based object key naming; the transformer plugin and its documentation are available separately. For instance, if the object key needs to reflect both an hourly window derived from a field named timestamp and the wallclock time (when the message was actually processed), the connector can be configured as follows:

connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` WITH_FLUSH_SIZE=1000000 WITH_FLUSH_INTERVAL=30 WITH_FLUSH_COUNT=5000
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH        

In this setup:

  • The TimestampConverter SMT converts the timestamp field in the Kafka message's value into a string header, following the provided format pattern (yyyy-MM-dd-HH), so that the value denotes an hourly segment.
  • The InsertWallclock SMT adds the current wallclock time as a header, using the same format (yyyy-MM-dd-HH).
  • The PARTITIONBY clause uses both the formatted timestamp and the wallclock headers when shaping the object key, granting detailed control over how data is partitioned; an illustrative object key is shown below.
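
For illustration only, with assumed values for metadata_id and customer_id, an object key produced by this configuration might resemble:

s3://lensesio/demo/1001/42/2023-10-05-14/2023-10-05-14/000000001234.json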

Sourcing multi-line events & XML

Data doesn't always follow Avro or Protobuf standards; text-based data remains a common format. Here, regular expressions (regex) prove instrumental in sifting through text data and pulling out particular records.

To understand the practical application of regex in text data, let's consider an example.

Suppose you're handling a log file from a web server filled with various entries like requests, errors, and more. Using regex, you can pinpoint and extract only those HTTP request entries that conform to a specific pattern, for example, successful GET requests for a certain endpoint.

This precise filtering helps you single out relevant records for further scrutiny or monitoring in Kafka, ignoring the extraneous details. Essentially, regex offers an efficient way to parse and extract relevant information from text sources such as log files, letting you focus on specific, pertinent records and simplifying data processing.
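
As a hedged sketch of that web-server scenario (the bucket path, endpoint, and log layout are all assumptions), the connector's regex mode could be configured to keep only successful GET requests for an /api/orders endpoint:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:weblogs STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^.*"GET /api/orders[^"]*" 200 .*$')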

In the next example, a regex identifies a line as a record only if it begins with a non-zero digit. This tactic, used by an airline client of Lenses, zeroes in on specific flight data entries:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')        

Handling data spanning multiple lines

Sometimes, data records in a text file stretch over several lines, creating a distinct extraction challenge. The source connector counters this with a flexible multi-line mode.

To activate this mode, you need to adjust the read.text.mode to StartEndLine, then define the lines where the records start and end to accurately capture the necessary data. Here's an example to illustrate this.

Imagine working with a log file that features system status messages (SSM) stretching across multiple lines, with each entry starting with 'SSM' and ending with an empty line ('').

To tailor the configuration for this scenario and also trim any residual whitespace, you would apply the following settings:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')        

And finally, we introduce the Start-End Tag mode, a dynamic text-reading feature that selectively extracts the text content sandwiched between specified start and end tags, inclusively. This mode is powerful when a single line of text in an S3 object yields multiple Kafka messages in the output. For example, suppose XML records are nestled between '<SSM>' and '</SSM>' tags and need to be read and turned into individual Kafka messages. This can be achieved by configuring the connector as shown below:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')          

We hope you enjoy the connector!
