Historical Data Tracking in PostgreSQL - Part 3: ETL Process Adjustment

Introduction

In the first two parts of this series, we explored how to set up historical data tracking in #PostgreSQL using triggers and discussed the essential steps involved. In this final part, we will look at adapting your #ETL (Extract, Transform, Load) process to accommodate tables that are subject to historical data tracking.

The Table and Materialized View Involved

The table 'stg.products' is used for subsequent operations and business calculations.

CREATE TABLE stg.products (
	id int4 NOT NULL,
	name varchar(250) NOT NULL,
	description text NOT NULL,
	category varchar(150) NULL,
...
	price float8 NULL,
	CONSTRAINT products_pkey PRIMARY KEY (id)
...
);        

The materialized view 'stg.mv_products' serves as a snapshot of the source data, pulled from a foreign server via 'dblink()'.

CREATE MATERIALIZED VIEW stg.mv_products
TABLESPACE pg_default
AS SELECT
      products.id
    , products.name
    , products.description
    , products.category
    , ...
    , products.price
   FROM dblink('an_external_server'::text, 'SELECT id, name, description, category, ..., price FROM public.products'::text)
        products(id integer, name character varying(250), description text, category character varying(150), ..., price double precision)
WITH DATA;        
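One optional refinement, beyond the original setup: if other sessions query 'stg.mv_products' while the ETL runs, 'REFRESH MATERIALIZED VIEW CONCURRENTLY' avoids locking them out. It requires a unique index on the view; the index name below is illustrative:

-- A unique index is required before CONCURRENTLY can be used.
CREATE UNIQUE INDEX mv_products_id_idx ON stg.mv_products (id);

-- Refresh the snapshot without blocking concurrent SELECTs on the view.
REFRESH MATERIALIZED VIEW CONCURRENTLY stg.mv_products;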

Adapting the Procedure 'stg.bulkdata()'

To integrate historical data tracking into your ETL process, you'll first need to adapt the existing 'stg.bulkdata()' procedure. This procedure is responsible for handling the bulk data operations.

CREATE OR REPLACE PROCEDURE stg.bulkdata()
 LANGUAGE plpgsql
AS $procedure$
BEGIN
	-- Take a fresh snapshot of the source data.
	REFRESH MATERIALIZED VIEW stg.mv_products;
	... -- Other bulk operations
	...
	-- Old approach, now replaced by the incremental load below:
	-- TRUNCATE TABLE stg.products;
	-- INSERT INTO stg.products SELECT * FROM stg.mv_products;
	CALL stg.load_products();
	...
END;
$procedure$;

To adapt the ETL process, the #TRUNCATE and #INSERTINTO operations are replaced with a call to a new procedure, 'stg.load_products()'. Both approaches load the contents of the materialized view 'stg.mv_products' into the table 'stg.products'; the difference is that the procedure does so incrementally instead of rewriting the whole table.

This matters once historical data tracking is in place: 'TRUNCATE' does not fire row-level triggers, and a full reload would record every row as changed on every run. The incremental approach touches only the rows that actually changed, so the history table captures genuine changes while the destination table stays synchronized with the source.
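In practice, 'stg.bulkdata()' would typically run on a schedule rather than by hand. As one possibility, assuming the pg_cron extension is available (the job name and schedule below are illustrative):

-- Run the bulk load every night at 02:00.
SELECT cron.schedule('nightly-products-etl', '0 2 * * *', 'CALL stg.bulkdata()');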

Defining the Procedure 'stg.load_products()'

Within the 'stg.bulkdata()' procedure, the 'stg.load_products()' procedure plays the pivotal role: it synchronizes 'stg.products' with the snapshot in three statements.

CREATE OR REPLACE PROCEDURE stg.load_products()
 LANGUAGE plpgsql
AS $procedure$
BEGIN
    -- Remove products that no longer exist in the source snapshot.
    DELETE FROM stg.products
    WHERE id IN
    (
       SELECT old_data.id
       FROM (SELECT dest.id FROM stg.products dest) old_data
       FULL OUTER JOIN (SELECT sour.id FROM stg.mv_products sour) new_data
                  ON old_data.id = new_data.id
       WHERE new_data.id IS NULL
    );

    -- Append products present in the snapshot but missing from the table.
    INSERT INTO stg.products
    SELECT sour.*
    FROM stg.products dest
    FULL OUTER JOIN stg.mv_products sour ON dest.id = sour.id
    WHERE dest.id IS NULL;

    -- Update rows whose content differs between the table and the snapshot.
    -- Note: PostgreSQL rejects table-qualified column names on the left-hand
    -- side of SET, so the 'dest.' prefix must be omitted there.
    UPDATE stg.products dest SET
        name        = sour.name       ,
        description = sour.description,
        category    = sour.category   ,
        ...         = ...             , -- More fields
        price       = sour.price
    FROM stg.mv_products sour
    WHERE dest.id = sour.id
    AND MD5(CAST((dest.*) AS text)) <> MD5(CAST((sour.*) AS text));
END;
$procedure$;

Let's break down the key elements of this process:

  • The DELETE section removes products that no longer exist in the source data: 'WHERE new_data.id IS NULL' (an equivalent 'NOT EXISTS' formulation is sketched after this list).
  • The INSERT section appends new products from the materialized view to the table: 'WHERE dest.id IS NULL'.
  • The UPDATE section updates existing records when the current row differs from its counterpart in the materialized view: 'dest.id = sour.id AND MD5(CAST((dest.*) AS text)) <> MD5(CAST((sour.*) AS text))'.
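For readers who find the FULL OUTER JOIN in the DELETE section dense, the same anti-join can be expressed with 'NOT EXISTS'; a minimal, behavior-equivalent sketch using the same tables as above:

-- Remove destination rows whose id is absent from the snapshot.
DELETE FROM stg.products dest
WHERE NOT EXISTS (
    SELECT 1
    FROM stg.mv_products sour
    WHERE sour.id = dest.id
);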

Efficient Data Comparison with #MD5 Hashing: rather than comparing every column pair individually, we hash the entire row with 'MD5()'. Only rows whose hash values differ are updated, which keeps the synchronization step efficient.
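To see the hashing in isolation, the following illustrative query lists the rows that the UPDATE above would touch, using the same whole-row idiom as the procedure:

-- Rows whose whole-row hash differs between destination and snapshot.
SELECT dest.id
FROM stg.products dest
JOIN stg.mv_products sour ON sour.id = dest.id
WHERE MD5(CAST((dest.*) AS text)) <> MD5(CAST((sour.*) AS text));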

Matching Data Types: ensuring that data types between the source and destination match is crucial for seamless data transfer. 'CAST()' converts each whole row to a common text representation before hashing, so the comparison is reliable as long as the column order and types of the two objects agree.
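One way to verify that the column definitions line up is to read them from the system catalogs; a read-only helper query, assuming both objects live in the 'stg' schema as above:

-- List column names and types for the table and the materialized view,
-- so any type or ordering mismatch stands out before the ETL runs.
SELECT c.relname AS object,
       a.attname AS column_name,
       format_type(a.atttypid, a.atttypmod) AS data_type
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'stg'
  AND c.relname IN ('products', 'mv_products')
  AND a.attnum > 0
  AND NOT a.attisdropped
ORDER BY c.relname, a.attnum;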

Final Thoughts

By adapting the ETL process to accommodate historical data tracking, you ensure that the data warehouse remains synchronized with the source, even as the data changes over time. PostgreSQL's flexibility allows you to implement robust data management strategies tailored to your specific needs.

Thank you for joining me on this journey to harness the power of PostgreSQL for effective data management. #PostgreSQL #DataManagement #ETL #HistoricalData #Database

If you missed the first two parts of this series, be sure to check them out for a comprehensive understanding of historical data tracking in PostgreSQL.

Read Part 1: Historical Data Tracking in PostgreSQL - Part 1: Historical Table and Triggers

Read Part 2: Historical Data Tracking in PostgreSQL - Part 2: Trigger Functions
