Unlock the Need for Speed: Experience Blazing-Fast Queries with ClickHouse
What is different about this blog?
This article is not about the theoretical pros and cons of ClickHouse; it walks through an actual tried-and-tested implementation.
Converting PostgreSQL data types to ClickHouse data types during a migration is cumbersome. Here I found a workaround that copies the schema from PostgreSQL and converts it to ClickHouse's schema conventions: I leveraged the PostgreSQL-ClickHouse integration and tweaked the code to get the desired result.
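To see that integration in isolation before the full script: ClickHouse exposes a postgresql() table function that reads a live Postgres table over the wire, and clickhouse_connect can run it from Python. A minimal sketch, assuming placeholder hosts and credentials and a hypothetical orders table:

import clickhouse_connect

# Connect to ClickHouse (placeholder host and credentials)
client = clickhouse_connect.get_client(host='ch_host', username='ch_user', password='ch_password')

# Query a live Postgres table directly from ClickHouse; ClickHouse maps the
# Postgres column types to its own types on the fly
rows = client.query(
    "SELECT * FROM postgresql('psql_host:5432', 'psql_database', 'orders', 'psql_user', 'psql_password') LIMIT 5"
).result_rows
print(rows)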
The provided code below is a Python script that connects to a PostgreSQL database and a ClickHouse database, retrieves table names and column information from the PostgreSQL database, creates corresponding tables in the ClickHouse database, and inserts data from PostgreSQL into ClickHouse.
import psycopg2
import clickhouse_connect

# Define database connection parameters
postgres_params = {
    "host": "psql_host",
    "port": "5432",
    "database": "psql_database",
    "user": "psql_user",
    "password": "psql_password"
}
clickhouse_params = {
    "host": "ch_host",
    "port": "ch_port",
    "database": "ch_database",
    "user": "ch_user",
    "password": "ch_password"
}
ch_client = clickhouse_connect.get_client(host=clickhouse_params['host'], username=clickhouse_params['user'], password=clickhouse_params['password'])

# Connect to PostgreSQL
postgres_conn = psycopg2.connect(**postgres_params)

# Create the temporary database if it does not exist.
# This database holds the staging tables that connect directly to Postgres.
temp_db = "temp"
query = f"CREATE DATABASE IF NOT EXISTS {temp_db}"
ch_client.command(query)

# Create the main (destination) database if it does not exist
dev_db = "dev"
query = f"CREATE DATABASE IF NOT EXISTS {dev_db}"
ch_client.command(query)
# Define a function for creating the ClickHouse staging table.
# Here we connect directly to Postgres, so its schema is materialised
# in ClickHouse for us automatically.
def create_clickhouse_temp_table(table_name):
    create_query = (
        f"CREATE TABLE IF NOT EXISTS {temp_db}.{table_name} AS "
        f"postgresql('{postgres_params['host']}:{postgres_params['port']}', '{postgres_params['database']}', "
        f"'{table_name}', '{postgres_params['user']}', '{postgres_params['password']}')"
    )
    print(ch_client.command(create_query))
# Define a function for inserting data into ClickHouse.
# First we take the schema from the temp db and recreate it in the main db;
# after that we insert the rows from Postgres into the main db.
def insert_into_clickhouse(table_name, column_names, primary_key=''):
    if not primary_key:
        # Assume the first column is the primary key if none was found
        primary_key = list(column_names.keys())[0]
    # Read the auto-generated schema back from the temp db
    show_create_query = f"SHOW CREATE TABLE {temp_db}.{table_name}"
    res = ch_client.query(show_create_query).result_rows
    ddl = str(res[0][0])
    # Keep everything before the PostgreSQL engine clause and swap in
    # a ReplacingMergeTree engine ordered by the primary key
    ddl = ddl.split("AS")[0]
    ddl += f" ENGINE = ReplacingMergeTree ORDER BY ({primary_key}) SETTINGS index_granularity = 8192, allow_nullable_key = 1"
    ddl = ddl.replace(temp_db, dev_db)
    print(ch_client.command(ddl))
    # Insert all the rows from Postgres into the ClickHouse main db
    insert_query = (
        f"INSERT INTO {dev_db}.{table_name} SELECT * FROM "
        f"postgresql('{postgres_params['host']}:{postgres_params['port']}', '{postgres_params['database']}', "
        f"'{table_name}', '{postgres_params['user']}', '{postgres_params['password']}')"
    )
    ch_client.command(insert_query)
# Fetch all table names from the public schema
postgres_cursor = postgres_conn.cursor()
postgres_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='public'")
table_names = [row[0] for row in postgres_cursor.fetchall()]

# Iterate over tables
for table_name in table_names:
    # Get column names and data types from PostgreSQL
    postgres_cursor.execute(f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name='{table_name}'")
    postgres_columns = postgres_cursor.fetchall()
    # Find the primary key ('id' by convention here) and collect all columns
    clickhouse_columns = {}
    primary_key = ''
    for column_name, data_type in postgres_columns:
        if column_name == 'id':
            primary_key = column_name
        clickhouse_columns[column_name] = ""
    # Create the ClickHouse staging table wired to Postgres
    create_clickhouse_temp_table(table_name)
    # Recreate the schema in the main db and insert the data
    insert_into_clickhouse(table_name, clickhouse_columns, primary_key)
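Once the script finishes, it is worth sanity-checking that row counts match on both sides. A minimal sketch, reusing postgres_cursor, ch_client, table_names, and dev_db from the script above:

# Rough per-table row-count comparison between Postgres and ClickHouse
for table_name in table_names:
    postgres_cursor.execute(f'SELECT count(*) FROM "{table_name}"')
    pg_count = postgres_cursor.fetchone()[0]
    ch_count = ch_client.query(f"SELECT count() FROM {dev_db}.{table_name}").result_rows[0][0]
    print(table_name, pg_count, ch_count, "OK" if pg_count == ch_count else "MISMATCH")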
Disclaimer: This code is not perfect and might need some tweaking for your data. But I have tried it, and it worked in my case: I migrated 1 TB of data while testing. It reduced my query response time from 1.35 minutes for an indexed PostgreSQL query to 0.8 milliseconds for an unindexed ClickHouse query. Yes, you read that right.
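If you want to reproduce a comparison like this on your own data, time the same query against both connections. A rough sketch, where the test query and the orders table are placeholders you should swap for your own workload:

import time

test_query = "SELECT count(*) FROM orders"  # hypothetical table; use your own query

# Time the query on Postgres
start = time.perf_counter()
postgres_cursor.execute(test_query)
postgres_cursor.fetchall()
print(f"Postgres: {time.perf_counter() - start:.4f}s")

# Time the equivalent query on ClickHouse
start = time.perf_counter()
ch_client.query(f"SELECT count(*) FROM {dev_db}.orders")
print(f"ClickHouse: {time.perf_counter() - start:.4f}s")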
The code is largely self-explanatory, but here is a breakdown for non-coders:

1. Connect to PostgreSQL with psycopg2 and to ClickHouse with clickhouse_connect.
2. Create two ClickHouse databases: temp, which holds staging tables wired directly to Postgres, and dev, the final destination.
3. List every table in the public schema of the Postgres database.
4. For each table, create a mirror table in temp via the postgresql() table function, which makes ClickHouse derive the column types from Postgres automatically.
5. Read that auto-generated schema back with SHOW CREATE TABLE, swap the engine for ReplacingMergeTree ordered by the primary key (falling back to the first column), and create the table in dev.
6. Insert all rows from Postgres into the dev table.
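One caveat about the ReplacingMergeTree engine used above: it deduplicates rows that share the same ORDER BY key only during background merges, so a re-run of the migration can briefly leave duplicates in a table. Reading with FINAL forces deduplication at query time; a small sketch with a hypothetical orders table:

# FINAL collapses duplicate versions of each key at read time (slower but exact)
deduped = ch_client.query(f"SELECT count() FROM {dev_db}.orders FINAL").result_rows[0][0]
print(deduped)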
Migrating data from PostgreSQL to ClickHouse can unlock improved performance, scalability, and cost-efficiency for organizations dealing with growing data volumes and complex analytical workloads. By following a systematic migration process, organizations can seamlessly transition their data while ensuring accuracy, data integrity, and minimal disruption to ongoing operations. Embrace the power of ClickHouse and unlock the full potential of your data analytics capabilities.
Hope you like this blog. Feel free to message me on LinkedIn if you want to implement this or need any clarification. Thanks!