Incremental Data Streaming from an Oracle Database to Apache Kafka using Python

This Python script integrates an Oracle database with Apache Kafka: it retrieves incremental data from the database and publishes it to a specified Kafka topic. The script connects to Oracle and executes SQL statements using the cx_Oracle library, interacts with Kafka using the confluent_kafka library, and handles time-related operations and file management using the standard time and os libraries, respectively. To track the most recent data sent to Kafka, the script saves the last updated identifier in a file, so subsequent runs fetch only new or updated rows from the database. It then polls the database in a loop, sleeping for a fixed interval between polls, until a configured maximum number of iterations is reached.

The code makes use of the following libraries:

  1. cx_Oracle: This library allows us to connect to an Oracle database and execute SQL statements. It provides a convenient interface for working with Oracle databases.
  2. confluent_kafka: This library provides a high-level API for working with Apache Kafka. It supports consuming and producing messages, making it an ideal choice for this integration project.
  3. time: The time library provides various time-related functions in Python, such as the sleep() function used in this script to wait for a specified interval before polling the database again.
  4. os: The os library provides a variety of methods for interacting with the operating system, such as the path.exists() function used in this script to check if a file exists.
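Before the full script, here is a minimal connectivity check for both systems. This is an illustrative sketch only: the Oracle credentials, DSN, broker address, and topic name are placeholders to replace with values from your own environment.

import cx_Oracle
import confluent_kafka

# Placeholder credentials and DSN -- replace with your environment's values
connection = cx_Oracle.connect("user", "password", "host:port/service_name")
cursor = connection.cursor()
cursor.execute("SELECT 1 FROM dual")  # trivial query to verify the session works
print(cursor.fetchone())

producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("test_topic", value=b"hello")  # throwaway message to a test topic
producer.flush()

cursor.close()
connection.close()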

import cx_Oracle
import confluent_kafka
import time
import os


last_updated_id_file = "last_updated_id.txt"


def get_last_updated_id():
    if os.path.exists(last_updated_id_file):
        with open(last_updated_id_file, "r") as f:
            return int(f.read().strip())
    return None


def set_last_updated_id(last_updated_id):
    with open(last_updated_id_file, "w") as f:
        f.write(str(last_updated_id))

connection = cursor = consumer = producer = None

try:
    # Connect to the Oracle database
    connection = cx_Oracle.connect("user", "password", "host:port/service_name")
    cursor = connection.cursor()


    # Fetch the last updated unique identifier from the Apache Kafka topic
    consumer = confluent_kafka.Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "group_id",
        "auto.offset.reset": "latest"
    })


    consumer.subscribe(["table_name"])
    last_updated_id = get_last_updated_id()


    msg = consumer.poll(1.0)
    if msg is not None:
        if msg.error() is None:
            # Kafka message values are bytes; convert to an integer identifier
            last_updated_id = int(msg.value())
        else:
            raise Exception(f"Error fetching last updated identifier: {msg.error()}")

    # Connect to Apache Kafka
    producer = confluent_kafka.Producer({
        "bootstrap.servers": "localhost:9092",
        "acks": "all",
        "compression.type": "snappy"
    })


    iteration_count = 0
    max_iterations = 100


    while iteration_count < max_iterations:
        iteration_count += 1

        # Fetch the incremental data from the Oracle database
        if last_updated_id is not None:
            # Use a bind variable instead of string formatting to avoid SQL injection
            cursor.execute(
                "SELECT * FROM table_name WHERE unique_id > :last_id",
                last_id=last_updated_id,
            )
        else:
            cursor.execute("SELECT * FROM table_name")
        rows = cursor.fetchall()


        # Prepare the data for sending to Apache Kafka
        data = list(rows)


        # Send the data to Apache Kafka
        for row in data:
            try:
                # produce() expects str or bytes; serialize the row tuple
                producer.produce("table_name", key="table_name".encode(), value=str(row).encode())
            except Exception as e:
                print(f"Error producing record to Apache Kafka: {e}")


        # Wait for the data to be sent to Apache Kafka
        producer.flush()


        # Update the last updated identifier
        if data:
            last_updated_id = max(row[0] for row in data)
            set_last_updated_id(last_updated_id)

        # Wait for a specified interval before polling the database again
        time.sleep(5)


finally:
    # Flush any buffered messages; confluent_kafka's Producer has no close() method
    if producer is not None:
        producer.flush()
    if consumer is not None:
        consumer.close()

    # Close the connection to the Oracle database
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()

Here are the key features of the code:

  • Statefulness: The code maintains the state of the last updated identifier in a text file and uses it to fetch only incremental data from the Oracle database.
  • Incremental Data Transfer: The code only fetches new data from the Oracle database that has been updated since the last run. This helps to minimize the amount of data transferred, which improves performance.
  • Performance Optimization: The code produces records asynchronously and flushes them to Apache Kafka once per polling cycle rather than after every record, which batches network writes; snappy compression further reduces the amount of data sent over the wire. A max_iterations cap of 100 polling cycles bounds how long a single run lasts.
  • Data Consistency: Every record is produced with the same key ("table_name" encoded as bytes), which routes all records to the same partition and preserves their relative order. Additionally, the "acks" configuration option is set to "all", so the producer waits for all in-sync replicas to acknowledge each write before considering it successful.
  • Error Handling: The code raises an exception if there is an error fetching the last updated identifier from Apache Kafka, catches and logs exceptions raised while producing records, and wraps the main logic in a try/finally block so the Oracle and Kafka connections are closed even when an exception occurs. A sketch of confluent_kafka's per-message delivery-report pattern follows this list.
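One caveat worth noting: the except clause around produce() only catches local errors, such as a full internal queue; broker-side delivery failures are reported asynchronously. A common confluent_kafka pattern is to pass a delivery callback so the outcome of each message is reported once the broker acknowledges it or delivery fails. Here is a minimal sketch of that pattern, reusing the same placeholder topic and broker address as the main script:

import confluent_kafka


def delivery_report(err, msg):
    # Invoked once per message, after the broker acknowledges it or delivery fails
    if err is not None:
        print(f"Delivery failed for record {msg.key()}: {err}")
    else:
        print(f"Record delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")


producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092", "acks": "all"})
producer.produce("table_name", key=b"table_name", value=b"example row", on_delivery=delivery_report)
producer.flush()  # blocks until every outstanding delivery report has fired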
