Unlock the Need for Speed: Experience Blazing-Fast Queries with ClickHouse

What is different about this blog?

This article does not discuss the theoretical pros and cons of ClickHouse; it walks through an actual, tried-and-tested implementation.

Converting PostgreSQL data types to ClickHouse data types during migration is cumbersome. So I found a workaround: copy the schema from PostgreSQL and let it be converted to ClickHouse schema conventions automatically. To do this, I leveraged the PostgreSQL-ClickHouse integration and tweaked the code to get the desired result.
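The core of the workaround is ClickHouse's postgresql() table function, which infers column types on its own, so you never write the type mapping by hand. Here is a minimal sketch of the idea, assuming placeholder hosts and credentials and a hypothetical users table:

import clickhouse_connect

# placeholder credentials and a hypothetical 'users' table, for illustration only
client = clickhouse_connect.get_client(host='ch_host', username='ch_user', password='ch_password')
client.command("CREATE DATABASE IF NOT EXISTS temp")

# let ClickHouse infer the schema straight from PostgreSQL -- no manual type mapping
client.command(
    "CREATE TABLE IF NOT EXISTS temp.users AS "
    "postgresql('psql_host:5432', 'psql_database', 'users', 'psql_user', 'psql_password')"
)

# inspect the column types ClickHouse inferred
for row in client.query("DESCRIBE TABLE temp.users").result_rows:
    print(row[0], row[1])  # column name, inferred ClickHouse type

The full script below automates exactly this across every table in the database.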

The provided code below is a Python script that connects to a PostgreSQL database and a ClickHouse database, retrieves table names and column information from the PostgreSQL database, creates corresponding tables in the ClickHouse database, and inserts data from PostgreSQL into ClickHouse.


import psycopg2
import clickhouse_connect

# Define database connection parameters
postgres_params = {
    "host": "psql_host",
    "port": "5432",
    "database": "psql_database",
    "user": "psql_user",
    "password": "psql_password"
}

clickhouse_params = {
    "host": "ch_host",
    "port": "ch_port",
    "database": "ch_database",
    "user": "ch_user",
    "password": "ch_password"
}

ch_client = clickhouse_connect.get_client(host=clickhouse_params['host'], username=clickhouse_params['user'], password=clickhouse_params['password'])

# Connect to databases
postgres_conn = psycopg2.connect(**postgres_params)

# Create the temporary database if it does not exist.
# This database holds tables that connect directly to Postgres,
# so ClickHouse can infer each table's schema for us.
temp_db = "temp"
query = f"CREATE DATABASE IF NOT EXISTS {temp_db}"
ch_client.command(query)

# Create the main (destination) database if it does not exist
dev_db = "dev"
query = f"CREATE DATABASE IF NOT EXISTS {dev_db}"
ch_client.command(query)

# Define function for creating a ClickHouse table in the temp database.
# Here we connect directly to Postgres, so the table's schema is inferred
# and created in CH automatically.
def create_clickhouse_temp_table(table_name, column_names, primary_key=''):
    pg = postgres_params
    create_query = (
        f"CREATE TABLE IF NOT EXISTS {temp_db}.{table_name} AS "
        f"postgresql('{pg['host']}:{pg['port']}', '{pg['database']}', '{table_name}', '{pg['user']}', '{pg['password']}')"
    )
    print(ch_client.command(create_query))


# Define function for inserting data into ClickHouse.
# First we take the schema from the temp db and recreate it in the main db,
# then we insert the rows from Postgres into the main db.
def insert_into_clickhouse(table_name, column_names, primary_key=''):
    # if no primary key was found, assume the first column is the key
    if not primary_key:
        primary_key = list(column_names.keys())[0]

    # recreate the schema in the main db: take the CREATE TABLE statement
    # that ClickHouse inferred, keep only the column definitions, and
    # attach a ReplacingMergeTree engine ordered by the primary key
    show_create_query = f"SHOW CREATE TABLE {temp_db}.{table_name}"
    res = str(ch_client.query(show_create_query).result_rows[0][0])
    s = res.split("AS")[0]  # drop the trailing 'AS postgresql(...)' clause
    s += f" ENGINE = ReplacingMergeTree ORDER BY ({primary_key}) SETTINGS index_granularity = 8192, allow_nullable_key = 1"
    s = s.replace(f"{temp_db}.", f"{dev_db}.", 1)  # swap the database qualifier

    print(ch_client.command(s))

    # insert all the rows from Postgres into the CH main db
    pg = postgres_params
    insert_query = (
        f"INSERT INTO {dev_db}.{table_name} SELECT * FROM "
        f"postgresql('{pg['host']}:{pg['port']}', '{pg['database']}', '{table_name}', '{pg['user']}', '{pg['password']}')"
    )
    ch_client.command(insert_query)

# Fetch all table names
postgres_cursor = postgres_conn.cursor()
postgres_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='public'")
table_names = [row[0] for row in postgres_cursor.fetchall()]
# Iterate over tables
for table_name in table_names:
    # Get column names and data types from PostgreSQL
    postgres_cursor.execute(f"SELECT column_name, data_type FROM information_schema.columns WHERE table_schema='public' AND table_name='{table_name}'")
    postgres_columns = postgres_cursor.fetchall()

    # collect all columns and look for a primary key (assumed to be 'id')
    clickhouse_columns = {}
    primary_key = ''
    for column_name, data_type in postgres_columns:
        if column_name == 'id':
            primary_key = column_name
        clickhouse_columns[column_name] = data_type

    # Create the ClickHouse table in the temp database
    create_clickhouse_temp_table(table_name, clickhouse_columns, primary_key)

    # Insert data into ClickHouse
    insert_into_clickhouse(table_name, clickhouse_columns, primary_key)
Disclaimer: This code is not perfect and might need some tweaking for your data. But I have used it myself: it worked in my case, where I migrated 1 TB of data while testing. It reduced my query response time from 1.35 minutes for an indexed PostgreSQL query to 0.8 milliseconds for an unindexed ClickHouse query. Yes, you read that right.
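If you want to measure a similar comparison on your own data, a rough timing sketch like this one can help (the query and the orders table are hypothetical placeholders, and a real benchmark should average over repeated runs to account for caching):

import time
import psycopg2
import clickhouse_connect

pg_conn = psycopg2.connect(host='psql_host', port='5432', database='psql_database', user='psql_user', password='psql_password')
ch = clickhouse_connect.get_client(host='ch_host', username='ch_user', password='ch_password')

# hypothetical analytical query, for illustration only
pg_query = "SELECT count(*) FROM orders WHERE amount > 100"
ch_query = "SELECT count(*) FROM dev.orders WHERE amount > 100"

# time the query on PostgreSQL
cur = pg_conn.cursor()
start = time.perf_counter()
cur.execute(pg_query)
cur.fetchall()
print(f"PostgreSQL: {time.perf_counter() - start:.4f}s")

# time the equivalent query on ClickHouse
start = time.perf_counter()
ch.query(ch_query)
print(f"ClickHouse: {time.perf_counter() - start:.4f}s")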

The code is fairly self-explanatory, but here is a code explanation for non-coders.

The provided code is a Python script that demonstrates the process of migrating data from a PostgreSQL database to a ClickHouse database. Here's a breakdown of the code:

  1. The code imports the required libraries, including psycopg2 for PostgreSQL connectivity and clickhouse_connect for ClickHouse connectivity.
  2. Database connection parameters are defined for both PostgreSQL (postgres_params) and ClickHouse (clickhouse_params).
  3. The ClickHouse client is established using the get_client() function, passing the ClickHouse connection parameters.
  4. A connection is established to the PostgreSQL database using psycopg2.connect() and the PostgreSQL connection parameters.
  5. Two databases are created if they don't exist: temp and dev, using the ClickHouse client's command() method.
  6. The script defines two functions, create_clickhouse_temp_table() and insert_into_clickhouse(), for creating ClickHouse tables and inserting data, respectively.
  7. In the main part of the code:

  • The script retrieves all table names from the PostgreSQL database using a SELECT query.
  • It iterates over each table and fetches the column names and data types from PostgreSQL using another SELECT query.
  • The script identifies the primary key column and creates the ClickHouse table using the create_clickhouse_temp_table() function.
  • The data is then inserted into the ClickHouse table using the insert_into_clickhouse() function.
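Once the script has run, it is worth checking that every table arrived intact. A minimal verification sketch, reusing the connections and variables from the script above, compares row counts per table:

# compare row counts between PostgreSQL and ClickHouse for every migrated table
for table_name in table_names:
    postgres_cursor.execute(f"SELECT count(*) FROM {table_name}")
    pg_count = postgres_cursor.fetchone()[0]

    ch_count = ch_client.query(f"SELECT count(*) FROM {dev_db}.{table_name}").result_rows[0][0]

    status = "OK" if pg_count == ch_count else "MISMATCH"
    print(f"{table_name}: postgres={pg_count}, clickhouse={ch_count} [{status}]")

Note that ReplacingMergeTree deduplicates rows with the same sorting key during merges, so a small count difference can simply mean your source data contained duplicates.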

Migrating data from PostgreSQL to ClickHouse can unlock improved performance, scalability, and cost-efficiency for organizations dealing with growing data volumes and complex analytical workloads. By following a systematic migration process, organizations can seamlessly transition their data while ensuring accuracy, data integrity, and minimal disruption to ongoing operations. Embrace the power of ClickHouse and unlock the full potential of your data analytics capabilities.

I hope you liked this blog. Feel free to message me on LinkedIn if you want to implement this or need any clarification. Thanks!
