Incremental Data Streaming from an Oracle Database to Apache Kafka using Python
Shanoj Kumar V
VP - Senior Technology Architecture Manager @ Citi | LLMs, AI Agents & RAG | Cloud & Big Data | Author
This Python code implements a solution for integrating an Oracle database with Apache Kafka. It retrieves incremental data from the Oracle database and streams it continuously to a specified Apache Kafka topic. The script connects to the Oracle database and executes SQL statements using the cx_Oracle library, interacts with Apache Kafka using the confluent_kafka library, and handles time-related operations and file management using the time and os libraries, respectively. The script saves the most recently processed identifier in a file to keep track of the most recent data sent to Apache Kafka, allowing subsequent runs to fetch only new or updated rows from the database. The script runs in a polling loop, waiting for a predetermined interval before querying the database again.
The code makes use of the following libraries:
LinkedIn Recommended
# Third-party drivers plus stdlib helpers used throughout the script.
import cx_Oracle  # fixed typo: was "import cx_Oracl", but the code calls cx_Oracle.connect
import confluent_kafka
import time
import os
# File that persists the highest unique_id already streamed to Kafka,
# so a restart resumes from where the previous run left off.
last_updated_id_file = "last_updated_id.txt"


def get_last_updated_id(path=None):
    """Return the last processed unique id persisted to disk, or None.

    Args:
        path: Optional override for the checkpoint file location;
            defaults to the module-level ``last_updated_id_file``.

    Returns:
        The stored id as an int, or None when no checkpoint file exists
        or its contents are not a valid integer (e.g. an empty file left
        by an interrupted write) — in both cases the caller falls back
        to a full table scan.
    """
    path = path or last_updated_id_file
    if os.path.exists(path):
        try:
            with open(path, "r") as f:
                return int(f.read().strip())
        except ValueError:
            # Corrupt/empty checkpoint: treat as "no checkpoint" rather
            # than crashing the whole streaming loop on startup.
            return None
    return None
def set_last_updated_id(last_updated_id, path=None):
    """Persist the most recently processed unique id to the checkpoint file.

    Args:
        last_updated_id: The id to store; written as its ``str()`` form so
            the companion ``get_last_updated_id`` can parse it back as int.
        path: Optional override for the checkpoint file location;
            defaults to the module-level ``last_updated_id_file``.
    """
    with open(path or last_updated_id_file, "w") as f:
        f.write(str(last_updated_id))
def main():
    """Poll an Oracle table for new rows and stream them to a Kafka topic.

    Keeps a checkpoint of the highest ``unique_id`` already produced so
    each polling cycle fetches only rows added since the previous one.
    Runs for a bounded number of iterations, sleeping between polls.

    NOTE(review): the original article's listing was missing this function
    header and the ``try`` matching its orphan ``except``; reconstructed
    here so the script is runnable.
    """
    # Connect to the Oracle database
    connection = cx_Oracle.connect("user", "password", "host:port/service_name")
    cursor = connection.cursor()
    try:
        # Fetch the last updated unique identifier: prefer the on-disk
        # checkpoint, then let the Kafka topic override it if a newer
        # marker message is available.
        consumer = confluent_kafka.Consumer({
            "bootstrap.servers": "localhost:9092",
            "group.id": "group_id",
            "auto.offset.reset": "latest",
        })
        consumer.subscribe(["table_name"])
        last_updated_id = get_last_updated_id()
        msg = consumer.poll(1.0)
        if msg is not None:
            if msg.error() is None:
                last_updated_id = msg.value()
            else:
                raise Exception(f"Error fetching last updated identifier: {msg.error()}")
        # Release the consumer's group membership and sockets; it is not
        # needed for the rest of the run.
        consumer.close()

        # Connect to Apache Kafka
        producer = confluent_kafka.Producer({
            "bootstrap.servers": "localhost:9092",
            "acks": "all",
            "compression.type": "snappy",
        })

        iteration_count = 0
        max_iterations = 100
        while iteration_count < max_iterations:
            iteration_count += 1

            # Fetch the incremental data from the Oracle database.
            # Bind variable instead of f-string interpolation: prevents SQL
            # injection and lets Oracle reuse the cached statement plan.
            if last_updated_id is not None:
                cursor.execute(
                    "SELECT * FROM table_name WHERE unique_id > :last_id",
                    last_id=last_updated_id,
                )
            else:
                cursor.execute("SELECT * FROM table_name")
            rows = cursor.fetchall()

            # Send the data to Apache Kafka.  produce() requires str/bytes,
            # so each row tuple is serialized via str(); swap in JSON/Avro
            # for a real schema.
            for row in rows:
                try:
                    producer.produce(
                        "table_name",
                        key="table_name".encode(),
                        value=str(row).encode(),
                    )
                except Exception as e:
                    # Best-effort: log and keep streaming the remaining rows.
                    print(f"Error producing record to Apache Kafka: {e}")

            # Wait for the queued messages to be delivered to Apache Kafka.
            # (confluent_kafka.Producer has no close(); flush() is the
            # correct way to drain outstanding messages.)
            producer.flush()

            # Update the checkpoint only after a successful flush, assuming
            # unique_id is the first column of the SELECT.
            if rows:
                last_updated_id = max(row[0] for row in rows)
                set_last_updated_id(last_updated_id)

            # Wait for a specified interval before polling the database again
            time.sleep(5)
    finally:
        # Close the connection to the Oracle database even on errors.
        cursor.close()
        connection.close()


if __name__ == "__main__":
    main()
Here are the key features of the code:
Private Equity Associate - Pre-IPO Opportunities
2 years ago — Thanks for sharing