Loading files automatically to bronze lakehouse

In my last post on LinkedIn, I explained how to export the AdventureWorks2022 tables to CSV files. If you don't want to generate them yourself, you can get them here, as stated in that post (after my edit in which I realized I had made a mistake). My actual blog, https://kayondata.com, is currently being moved to another place, hence it is not updated at the moment.

Certainly, it is usually best to avoid CSV files today. However, they are still very common, so I am starting with this approach, and I plan to write further posts on other ways to do this. Another reason for using CSV files is that they are a nice starting point for learning some basics and exercising some data engineering skills. Nevertheless, if you have the option of loading directly from a database without intermediate files, that is usually the better route. And if you can use Parquet instead of CSV, Parquet should usually win that decision.

That said, let's start with some code I wrote a few months back on runtime 1.2 (Spark 3.4, Delta 2.4); according to my test today, it works on runtime 1.3 (Spark 3.5, Delta 3.2) as well. The code does quite a few things, and it expects an existing lakehouse that your notebook is attached to. It also expects a lakehouse called meta_lakehouse, which is used to log the data loads and errors. You'll see an output with some simple plausibility checks:

import os, time, datetime 
from pyspark.sql.functions import monotonically_increasing_id, lit, current_timestamp, when, col, regexp_replace, array, concat_ws
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import notebookutils  # built into Fabric notebooks; used below for file system access


# Specify the folder containing CSV files
csv_folder = "Files"

# Get a list of all files in the folder
files = notebookutils.fs.ls(csv_folder)

# Filter for CSV files
csv_files = [file.path for file in files if file.name.endswith(".csv")] # to test only one file, simply write the name before .csv

# Define the schema for the log DataFrame so it has good column names
log_schema = StructType([
    StructField("file_name", StringType(), True),
    StructField("row_amount_file", IntegerType(), True),
    StructField("row_amount_table", IntegerType(), True),
    StructField("row_amount_diff", IntegerType(), True),
    StructField("load_timestamp_utc_bronze", TimestampType(), True)
])

# Define the schema for the malformed records DataFrame so it has good column names
malformed_schema = StructType([
    StructField("file_name", StringType(), True),
    StructField("record_number", IntegerType(), True),
    StructField("entire_record", StringType(), True),
    StructField("explanation", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Initialize an empty DataFrame for log entries with the defined schema
df_log = spark.createDataFrame([], schema=log_schema)
df_mal = spark.createDataFrame([], schema=malformed_schema)
df_mal_log = spark.createDataFrame([], schema=malformed_schema)

# Initialize a counter for warnings
warning_count = 0

# count files
csv_files_count = [file for file in files if file.name.endswith('.csv')]
csv_count = len(csv_files_count)
file_processed_count = 0

# Process each CSV file
for csv_file in csv_files:
    file_processed_count += 1
    table_name = os.path.splitext(os.path.basename(csv_file))[0] # Extract table name from the file name (remove ".csv" extension)
    table_name = table_name.replace(".","_") # remove "." for database schema and replace it with "_"

    print(f"{table_name}.csv is being processed. Remaining files to process: {csv_count - file_processed_count}") 

    # Get the number of lines in the CSV file
    lines = spark.read.text(csv_file)
    line_count = lines.count() - 1 # -1 to exclude the header
    print(f"{table_name} has this amount of lines: {line_count:,}".replace(",", "'"))

    # Read the CSV file with PERMISSIVE mode to get all records
    df_all = spark.read.options(
        delimiter=",", 
        header=True,
        encoding="UTF-8", 
        inferSchema=True, 
        multiLine=True,
        escapeQuotes=True, 
        mode="PERMISSIVE"
    ).csv(csv_file)

    # Read the CSV file with DROPMALFORMED mode to get only valid records
    df_well = spark.read.options(
        delimiter=",", 
        header=True,
        encoding="UTF-8", 
        inferSchema=True, 
        multiLine=True,
        escapeQuotes=True, 
        mode="DROPMALFORMED"
    ).csv(csv_file)

    # Identify malformed records by diff it
    df_mal = df_all.subtract(df_well)

    # Add a record number to the malformed records dataFrame
    df_mal = df_mal.withColumn("record_number", monotonically_increasing_id())
    columns = ['record_number'] + [c for c in df_mal.columns if c != 'record_number']
    df_mal = df_mal.select(columns)

    # Show the malformed records with their record numbers
    print(f"The amount of read well formed lines: {df_well.count():,}".replace(",","'"))
    if df_mal.count() == 0:
        print(f"\033[92mThe amount of read mal formed lines:  {df_mal.count():,}\033[0m".replace(",","'"))
    else:    
        print(f"\033[91mThe amount of read mal formed lines:  {df_mal.count():,}\033[0m".replace(",","'")) 
        # display(df_mal)

    # Add id column and put it into the front 
    df_well = df_well.withColumn("id", monotonically_increasing_id())  
    columns_ordered = ["id"] + [col for col in df_well.columns if col != "id"]
    df_well = df_well.select(*columns_ordered)

    # Add load_timestamp
    df_well = df_well.withColumn('load_timestamp_utc_bronze', current_timestamp())

    # Write to Delta table
    df_well.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(table_name)

    # Check the number of rows in the Delta table
    delta_table = spark.table(table_name)
    delta_count = delta_table.count()
    print(f"{table_name} has this amount of lines in the delta table: {delta_count:,}".replace(",", "'"))

    # Create a dataframe for the current log entry
    df_current_log = spark.createDataFrame([(
        table_name,
        line_count,
        delta_count,
        line_count - delta_count,
        datetime.datetime.now()  # Use current timestamp
    )], schema=log_schema)

    # Append the current log entry to the log DataFrame
    df_log = df_log.union(df_current_log)

    # Append the current malformed dataframe to a log dataframe
    # display(df_mal)
    # df_mal_log = df_mal_log.union(df_mal)

    # Prepare the malformed records for logging
    df_mal_log_entry = df_mal.withColumn("file_name", lit(table_name)) \
                             .withColumn("entire_record", concat_ws("|||", *df_mal.columns)) \
                             .withColumn("explanation", lit("malformed")) \
                             .withColumn("timestamp", current_timestamp()) \
                             .withColumn("record_number", monotonically_increasing_id())

    # Select the columns in the desired order
    df_mal_log_entry = df_mal_log_entry.select("file_name", "record_number", "entire_record", "explanation", "timestamp")

    # Append the current malformed records to the log DataFrame
    df_mal_log = df_mal_log.union(df_mal_log_entry)

    # Compare the counts and print messages
    if line_count == delta_count:
        print(f"\033[92mSUCCESS: {table_name}.csv loaded into Delta table {table_name} with matching line counts\033[0m")
        print(f"\n")
    else:
        print(f"\033[91mERROR: {table_name}.csv line count {line_count:,}".replace(",", "'") + f" does not match Delta table line count {delta_count:,}".replace(",", "'") + f" \033[0m")
        print(f"\n")
        warning_count += 1

# Write the log DataFrame to the lakehouse log table in the meta_lakehouse database
df_log.write.format("delta").mode("append").saveAsTable("meta_lakehouse.load_log_bronze")
if not df_log.filter(df_log["row_amount_diff"] > 0).rdd.isEmpty():
    print(f"\n\033[93m ALL CSV files LOADED with WARNING into meta_lakehouse.load_log_bronze table.\033[0m")
else:
    print(f"\n\033[92m ALL CSV files SUCCESSFULLY LOADED into meta_lakehouse.load_log_bronze table.\033[0m")

# Write the malformed records DataFrame to a new Delta table in the meta_lakehouse database
if not df_mal_log.rdd.isEmpty():
    df_mal_log.write.format("delta").mode("append").saveAsTable("meta_lakehouse.bad_records")
    print(f"\n\033[93m Malformed records have been written to the meta_lakehouse.bad_records table.\033[0m")

# Print the number of tables that gave warnings
if warning_count == 0:
    print(f"\n\033[92m Number of tables with warnings: NONE - SUCCESS \033[0m")
else:
    print(f"\n\033[91m Number of tables with warnings: {warning_count} → GO CHECK LOG! \033[0m")
        

So if you run this, you'll see a lot of output like this:

As you can see, it outputs table names and CSV names. The CSV names are not entirely correct here, because the actual file name is HumanResources.Department.csv, with a dot that comes from the database schema. In this case I am using a lakehouse without schema support (lakehouse schemas are still in preview), and a lakehouse does not allow dots in table names, so the dot is replaced with an underscore via table_name = table_name.replace(".","_"), as the comment next to that line in the code explains.
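To make the renaming concrete, here is what those two lines from the loop do to one of the exported files:

import os

csv_file = "Files/HumanResources.Department.csv"

# Strip the path and the ".csv" extension, then replace the schema dot with an underscore
table_name = os.path.splitext(os.path.basename(csv_file))[0]  # "HumanResources.Department"
table_name = table_name.replace(".", "_")                     # "HumanResources_Department"
print(table_name)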

Now, you could ask why I didn't use shortcuts to load the data. I could have done that, and it is a neat trick for many occasions. In this case, however, I not only wanted to showcase the possibility of using a meta_lakehouse, but also the possibility of having more control over the data that gets loaded. For example, I get the chance to run some simple plausibility checks, which could certainly be done in a much more sophisticated way. That will be another blog post in the future.

An interesting code block is here (line 62):

    # Read the CSV file with PERMISSIVE mode to get all records
    df_all = spark.read.options(
        delimiter=",", 
        header=True,
        encoding="UTF-8", 
        inferSchema=True, 
        multiLine=True,
        escapeQuotes=True, 
        mode="PERMISSIVE"
    ).csv(csv_file)

    # Read the CSV file with DROPMALFORMED mode to get only valid records
    df_well = spark.read.options(
        delimiter=",", 
        header=True,
        encoding="UTF-8", 
        inferSchema=True, 
        multiLine=True,
        escapeQuotes=True, 
        mode="DROPMALFORMED"
    ).csv(csv_file)
        

Firstly, we let PySpark load all the data and try to figure out what data types the fields are. delimiter="," tells PySpark to split on commas, but you could use a semicolon or another character if needed. inferSchema=True tells Spark to infer the data types by reading the data; without it, every column would come back as a string, so here being explicit is more than just good style. In this case the code reads the whole file for schema inference; with the samplingRatio option you can limit how much of the data is used for that. There are many other options you can use here, and there is quite comprehensive documentation here. Be aware that inferring a schema from the whole file is not what you want in all cases, especially if your file is big. If you are aiming for fast loads, you should not infer the schema but define the target schema from the start. In this case, with the relatively small amount of data in the AdventureWorks2022 sample, I think it is a good showcase. By the way, this also makes it pretty easy to get the schema.
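For completeness, here is a minimal sketch of what reading with an explicit schema could look like for one of the files. The column names and types are assumptions based on the AdventureWorks Department table; in practice you would take them from what df_well.printSchema() shows you:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Assumed schema for HumanResources.Department.csv - verify it with df_well.printSchema() first
department_schema = StructType([
    StructField("DepartmentID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("GroupName", StringType(), True),
    StructField("ModifiedDate", TimestampType(), True)
])

# Providing the schema up front skips the extra pass over the file that inferSchema needs
df_dept = spark.read.options(
    delimiter=",",
    header=True,
    encoding="UTF-8",
    multiLine=True,
    escapeQuotes=True,
    mode="PERMISSIVE"
).schema(department_schema).csv("Files/HumanResources.Department.csv")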

Another trick I am showcasing here is how to get the records that were malformed by diffing the two DataFrames (line 84):

    # Identify malformed records by diff it
    df_mal = df_all.subtract(df_well)        
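As a side note, Spark's CSV reader can also surface malformed rows directly: in PERMISSIVE mode you can add a string column, named via the columnNameOfCorruptRecord option, to a user-defined schema, and Spark puts the raw text of every row it cannot parse into that column. A minimal sketch, again assuming the Department columns from above:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Assumed Department schema plus an extra string column that Spark fills
# with the raw text of any row it cannot parse
schema_with_corrupt = StructType([
    StructField("DepartmentID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("GroupName", StringType(), True),
    StructField("ModifiedDate", TimestampType(), True),
    StructField("_corrupt_record", StringType(), True)
])

df = spark.read.options(
    delimiter=",",
    header=True,
    multiLine=True,
    mode="PERMISSIVE",
    columnNameOfCorruptRecord="_corrupt_record"
).schema(schema_with_corrupt).csv("Files/HumanResources.Department.csv")

# Spark requires caching (or persisting) the parsed result before queries
# that only reference the corrupt-record column
df.cache()
df_bad = df.filter(df["_corrupt_record"].isNotNull())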

The last thing I want to point out for today is the possibility to print statements in color (line 154):

# Write the log DataFrame to the lakehouse log table in the meta_lakehouse database
df_log.write.format("delta").mode("append").saveAsTable("meta_lakehouse.load_log_bronze")
if not df_log.filter(df_log["row_amount_diff"] > 0).rdd.isEmpty():
    print(f"\n\033[93m ALL CSV files LOADED with WARNING into meta_lakehouse.load_log_bronze table.\033[0m")
else:
    print(f"\n\033[92m ALL CSV files SUCCESSFULLY LOADED into meta_lakehouse.load_log_bronze table.\033[0m")        

The strange-looking code inside the print(f"...") statements is an ANSI escape code for coloring text. The difference between the warning output and the success output is [93m (yellow) versus [92m (green); \033[0m at the end resets the color.
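If you use these escape codes in more places, it can help readability to give them names once at the top of the notebook; a small sketch (the constant names are my own choice):

# ANSI escape codes used for the colored notebook output
GREEN  = "\033[92m"
YELLOW = "\033[93m"
RED    = "\033[91m"
RESET  = "\033[0m"

print(f"{GREEN}SUCCESS: counts match{RESET}")
print(f"{YELLOW}WARNING: check the log{RESET}")
print(f"{RED}ERROR: counts differ{RESET}")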

At the very end, there’s an output like this:

The last line hints that the next blog post will be about how to find these issues easily and how to fix them. Notebooks are great for instructing people on what to do, especially if they contain links to the next notebook!

Do you have any improvements I could have made in my code? Or do you have a question you want to discuss? Even better, found any mistake? Let me know in the comments!

This post was first published on Fabric community blogs for data engineering.

Kay Sauter

Data Engineer | DBA | Microsoft MVP

3 days ago

Just a quick note here: be careful when using monotonically_increasing_id. In this example it does work, but in a later post I am going to showcase why it should be avoided. This is not to say that I would never use it; normally, however, I would probably avoid it. For a showcase like this, in non-production code, it should be OK though.
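If you want a deterministic id instead, one common alternative is row_number() over a window; a minimal sketch, assuming the DataFrame has a column you can order by (ModifiedDate is just an example name here):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# A window without partitionBy pulls all rows into a single partition,
# which is fine for small sample data but not for large tables
w = Window.orderBy("ModifiedDate")
df_well = df_well.withColumn("id", row_number().over(w))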

Jeff Moden

Senior DBA and T-SQL Mentor

3 days ago

p.s. Can you post the link that will take us to what you spoke of as "In my last post on LinkedIn, I explained how to export AdventureWorks2022 tables to csv files.", please? Thanks.

Jeff Moden

Senior DBA and T-SQL Mentor

3 days ago

I have to disagree with the notion that CSV files should be avoided, especially if the numeric data is passed as binary. They can be much more compact than JSON and certainly XML. To make them even more compact (and "Compact = Speed" when it comes to transmission), even dates can be passed as binary if you use the underlying date/time serial numbers. UNIX timestamps are a good example of that, although I prefer the 1900 epoch instead of the 1970 epoch. Because of the way programs store data, sending the binary data in the correct form means that you only need to store the data when you receive it, without string data needing to be converted to the correct binary format.

Dennes Torres

Data Platform MVP (2020-2023) | Certified Expert in Fabric Analytics Engineering, Azure Data Engineering, Solutions Architecture, DevOps, and Development

4 days ago

The care to have a meta lakehouse for logs is especially interesting.
