Running the Apache Camel HTTP Kafka Source Connector in Instaclustr's Managed Kafka
Camels drinking from a leaking pipe (https://commons.wikimedia.org/wiki/File:Leaky_Pipe_Oasis_%283877530270%29.jpg)


A few years ago I built a "zero-code" data processing pipeline with Apache Kafka and related open-source technologies. Here's the first blog in that series (there were 10 in all, with links at the end). It was designed to use an open-source Kafka REST source connector to ingest tidal data from the NOAA REST API into Kafka topics for downstream parts of the pipeline to process. I used this REST connector at the time; however, when I downloaded it again, it failed to build. After a bit of hunting, I tracked down another potential candidate, the Apache Camel HTTP Source Kafka Connector. I had previously used some of the Apache Camel Kafka connectors, as Camel has the biggest pool of open-source Kafka connectors anywhere (here's the list), which it achieves by automatically generating them from the main Camel components. Given that camels are good at drinking lots of water very quickly (200L in 3 minutes), this connector was obviously worth trying.

But first things first, here are the basic steps required to set up the system.

First, you need to create a Kafka cluster in the Instaclustr console.


You would need a lot of water bottles to satisfy a camel, but we are only using S3 "buckets" to make the connectors available to the Kafka connect cluster (Public Domain)

Because we are going to use an open-source Kafka connector, the next step is to follow our instructions to create an AWS S3 bucket (there are other approaches if you are not using AWS) to act as the intermediate storage for the connector(s): you upload the connector(s) to S3, and then our Kafka Connect cluster can retrieve and deploy them automatically.

The instructions provide a CloudFormation template. You will need to run this in an AWS account that has permission to run CloudFormation, create an S3 bucket, and create and use an IAM User, AccessKey and Policy. You also need to change the S3 bucket name to something unique, otherwise the creation will fail.

Once the CloudFormation stack completes, look at the Outputs and record the AccessKey, S3BucketName, and SecretKey (you can come back to these later if you prefer).
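
If you prefer the command line to the AWS console, you can also pull the same Outputs with the AWS CLI. Just a sketch, assuming your stack is called "kafka-connect-s3" (substitute whatever name you actually gave it):

aws cloudformation describe-stacks --stack-name kafka-connect-s3 --query 'Stacks[0].Outputs' --output table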

The next step is to create an Instaclustr Kafka Connect Cluster that is connected to the Kafka cluster already created above (Kafka Target Cluster). The instructions are here. Make sure you tick the "Use Custom Connectors" option. This is the only opportunity you will have to configure the AWS S3 bucket, and you need to fill in the Custom Connector Configuration details (AccessKey, S3BucketName and SecretKey from the CloudFormation template Output).

Once the Kafka Connect cluster is running, click on the "Connectors" tab where you will see a list of built-in open-source connectors. However, there's no REST/HTTP source connector. This is where we need to do some behind-the-scenes work.

Next, download the Camel HTTP Source Kafka Connector code from here. Unzip the file into a handy directory and you will see lots of jar files (96 of them). Which one is the correct file? Well, basically you need ALL of them. Find the AWS S3 bucket you created above, and upload the directory containing all the Camel jar files.
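
You can upload them through the S3 console, or with the AWS CLI. A sketch, assuming the jar files were unzipped into a local directory called camel-http-source and the bucket is the S3BucketName from the CloudFormation Outputs (both names here are just placeholders):

aws s3 cp ./camel-http-source/ s3://your-s3-bucket-name/camel-http-source/ --recursive --exclude "*" --include "*.jar"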

Next, go back to the Instaclustr Kafka Connect cluster "Connectors" tab, and click on the "Sync" button under "Managing Custom Connectors". After a short wait, the list of Available Connectors will be updated to include the Camel connectors. But watch out! There are at least 3 new connectors. I accidentally used the first one on the list, the CamelSinkConnector. This will not work as a source connector (but it didn't give any errors either; I guess because it was trying to read records from the Kafka topic and write them to the HTTP URL, and as there were no messages in the new topic it just sat and did nothing).

So obviously, given that we want the HTTP Source connector, the correct class to use is:

"org.apache.camel.kafkaconnector.httpsource.CamelHttpsourceSourceConnector"        


Running Camels (Public Domain)

The next step is configuring and running the connector - good luck - this is often tricky, with little debugging help available.

The documentation only mentions 3 configuration options: period, contentType, and url.
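
In the connector configuration these options get a "camel.kamelet.http-source." prefix (more on Kamelets below). Just as a sketch of those three settings, with a placeholder URL; I'm assuming period is a polling interval in milliseconds, so check the Kamelet documentation for the exact semantics and default:

"camel.kamelet.http-source.url": "https://example.com/some/endpoint",
"camel.kamelet.http-source.contentType": "application/json",
"camel.kamelet.http-source.period": "60000"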

Ah, so what URL are we planning on calling? It's actually the OpenWeatherMap API we need to use. You need to sign up and get a key which allows a limited number of free calls per day. For example, this call:

https://api.openweathermap.org/data/2.5/weather?q=London,uk&APPID=yourkeyhere        

returned this JSON data:

{"coord":{"lon":-0.1257,"lat":51.5085},"weather":[{"id":803,"main":"Clouds","description":"broken clouds","icon":"04n"}],"base":"stations","main":{"temp":271.83,"feels_like":267.95,"temp_min":270.01,"temp_max":273.81,"pressure":995,"humidity":83},"visibility":10000,"wind":{"speed":3.13,"deg":49,"gust":6.26},"clouds":{"all":69},"dt":1705531760,"sys":{"type":2,"id":2075535,"country":"GB","sunrise":1705478291,"sunset":1705508528},"timezone":0,"id":2643743,"name":"London","cod":200}        

Wow! That's hot even for a desert with camels! Ah, the temperature is in Kelvin (so it's actually cold: -1.32C).
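
If you want the conversion done for you at the command line, here's a quick sketch with curl and jq (assuming you have jq installed and your own API key):

curl -s "https://api.openweathermap.org/data/2.5/weather?q=London,uk&APPID=yourkeyhere" | jq '.main.temp - 273.15'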

(Public Domain)

With a bit of trial and error, I finally got the connector configured and running with a curl command from the command line as follows:

curl https://IP:8083/connectors -X POST -H 'Content-Type: application/json' -d '{
  "name": "camel-source-HTTP",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.httpsource.CamelHttpsourceSourceConnector",
    "topics": "test2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "camel.kamelet.http-source.contentType": "application/json",
    "camel.kamelet.http-source.url": "https://api.openweathermap.org/data/2.5/weather?q=London,uk&APPID=yourkeyhere",
    "tasks.max": "1"
  }
}' -k -u ic_kc_user:yourconnectclusterkeyhere

The Kafka Connect cluster-specific connection information (with examples) is found under the Kafka Connect cluster "Connection Info" tab. It's tricky to get the syntax right and provide all the required configuration options, but once it works you will see the connector appear in the "Active Connectors" tab (where you can restart or delete it).

Given that it's a Kafka source connector, it's mandatory to specify a topic (or more than one, as a comma-separated list) using "topics". It was also tricky to find the correct value.converter: the String and JSON options produced garbage record values, but ByteArrayConverter finally worked fine.

Since the last time I used the Apache Camel Kafka connectors there's something new: Kamelets! Some of the configuration options require "camel.kamelet." prefixes. More about Kamelets is here. Also note that this source connector doesn't appear to produce Kafka record keys, so if you need them (e.g. for ordering or partition handling) you may need another solution.
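
You can also check on a connector from the command line rather than the console, using the Kafka Connect REST API status endpoint (same placeholder IP and credentials as before):

curl https://IP:8083/connectors/camel-source-HTTP/status -k -u ic_kc_user:yourconnectclusterkeyhere

And if you get the configuration wrong, a DELETE on https://IP:8083/connectors/camel-source-HTTP removes the connector so you can try again.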


So, to sum up the steps required to get a Camel HTTP Kafka source connector running:


  1. Create a Kafka cluster.
  2. Create an AWS S3 bucket.
  3. Create a Kafka Connect cluster, using (1) as the target Kafka cluster, and details from (2) as the Custom connector configuration.
  4. Download the Camel HTTP Kafka connector code, and upload to the S3 bucket from (2).
  5. "Sync" the Kafka connect cluster to get the new custom connectors.
  6. Work out the connector configuration and cluster connection details and run a CURL command to configure/run the connector.
  7. Check that the connector is active.
  8. Check that the data is arriving in the Kafka topic correctly.
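
For step 8, a minimal sketch using the standard Kafka console consumer; the broker address and the client properties file (with the cluster credentials) are placeholders here, and the real values are on the Kafka cluster's "Connection Info" page:

kafka-console-consumer.sh --bootstrap-server KAFKA_IP:9092 --consumer.config client.properties --topic test2 --from-beginning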

Some other things to consider include:

  1. Scalability - this example only shows 1 task running; if you need more, you can restart the connector with more tasks.
  2. Is there any chance of exceptions occurring, and how are they handled?
  3. For each weather location we will need a different connector configuration and instance running. What if we need 100 locations? 1000s? And what if we need to dynamically request data from different locations, potentially for only a few hours at a time, and then delete the corresponding connectors? There is potential to use a workflow engine for this (e.g. Cadence or Airflow) - or see the scripted sketch below for a simpler starting point.
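
On the last point, before reaching for a workflow engine you could get a fair way with a simple script around the Kafka Connect REST API. A rough sketch that creates one connector per location (the location list, topic names, IP, and credentials are all just placeholders for illustration):

for CITY in London,uk Paris,fr Sydney,au; do
  NAME=$(echo $CITY | cut -d, -f1)
  curl https://IP:8083/connectors -X POST -H 'Content-Type: application/json' \
    -k -u ic_kc_user:yourconnectclusterkeyhere \
    -d "{\"name\": \"camel-source-HTTP-$NAME\", \"config\": {
      \"connector.class\": \"org.apache.camel.kafkaconnector.httpsource.CamelHttpsourceSourceConnector\",
      \"topics\": \"weather-$NAME\",
      \"key.converter\": \"org.apache.kafka.connect.storage.StringConverter\",
      \"value.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",
      \"camel.kamelet.http-source.contentType\": \"application/json\",
      \"camel.kamelet.http-source.url\": \"https://api.openweathermap.org/data/2.5/weather?q=$CITY&APPID=yourkeyhere\",
      \"tasks.max\": \"1\"}}"
done

Deleting them again is just a DELETE on /connectors/<name>, so the "create for a few hours, then tear down" case can also be scripted; a workflow engine earns its keep once you need scheduling, retries, and visibility on top of that.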











