Interview Question for Lead Data Engineer at MAANG

Interview experience with MAANG (Jan 2024), question asked in the L3 round:


Write Kafka + PySpark code for the following use case:

Connect to PostgreSQL and ingest tables from Production_app with nested data types into Kafka topics, each prefixed with prod1.

Generate unique txnid and txn_timestamp keys for each record and store each table in a unique topic.

Create checkpoints at 11 pm when the pipeline starts and ends.

Create multiple streams for credit card fraud analytics in real-time:

Case 1: Calculate the average consumption on credit cards for the last year.

Case 2: Determine the number of failed payments in the previous year.

Case 3: Calculate the interest added to the current payment.

Case 4: Save the data to a Delta table with 18 partitions in an optimized way.




My solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum, expr, window
from pyspark.sql.types import StructType
from datetime import datetime
import os
from croniter import croniter
from time import sleep

# Step 1: Connect to PostgreSQL and ingest tables from the Production_app schema
spark = SparkSession.builder \
    .appName("KafkaCreditCardFraudAnalytics") \
    .config("spark.jars", "path/to/postgresql-connector.jar") \
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.HDFSLogStore") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://<hdfs_host>:<hdfs_port>") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# NOTE: the Kafka connector (spark-sql-kafka) and Delta Lake packages must also be
# on the classpath, e.g. via spark.jars.packages or spark-submit --packages.

postgres_url = "jdbc:postgresql://localhost:5432/your_database"
properties = {
    "user": "your_username",
    "password": "your_password",
    "driver": "org.postgresql.Driver"
}

tables = ["transactions", "payments", "interests"]  # Sample tables from Production_app

# Read all tables from PostgreSQL and store them in a dictionary
table_dataframes = {}
for table in tables:
    df = spark.read.jdbc(url=postgres_url, table=f"Production_app.{table}", properties=properties)
    table_dataframes[table] = df
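
# Step 2 (sketch, not spelled out in the original listing): publish each ingested
# table to its own Kafka topic prefixed with prod1, adding the required txnid and
# txn_timestamp keys. The uuid()-based txnid, the JSON serialisation of the rows
# (including nested columns) and the prod1.<table> topic names are assumptions.
from pyspark.sql.functions import to_json, struct, current_timestamp

for table_name, df in table_dataframes.items():
    keyed_df = df \
        .withColumn("txnid", expr("uuid()")) \
        .withColumn("txn_timestamp", current_timestamp())
    keyed_df \
        .select(col("txnid").alias("key"),
                to_json(struct(*keyed_df.columns)).alias("value")) \
        .write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "<kafka_broker>") \
        .option("topic", f"prod1.{table_name}") \
        .save()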

# Cron schedule for the daily 11 pm checkpoint
cron_schedule = "0 23 * * *"
cron = croniter(cron_schedule, datetime.now())
next_checkpoint_time = cron.get_next(datetime)

# Checkpoint base path
checkpoint_path = "hdfs://<hdfs_host>:<hdfs_port>/checkpoints"

# Kafka consumer configuration
kafka_bootstrap_servers = "<kafka_broker>"
kafka_group_id = "credit_card_fraud_group"

# Kafka topics for the streams (one topic per table, prefixed with prod1)
transaction_topic = "prod1.transactions"
payment_topic = "prod1.payments"
interest_topic = "prod1.interests"

# Window duration for stream processing
window_duration = "1 hour"

# Watermark for stream processing
watermark_duration = "10 minutes"

# Step 3: 11 pm checkpoint - snapshot every ingested table as a Delta checkpoint.
# Called from the scheduling loop at the end of the script, so the streaming
# queries defined below keep running between checkpoints.
def write_nightly_checkpoints():
    for table_name, df in table_dataframes.items():
        checkpoint_dir = os.path.join(checkpoint_path, table_name)
        df.write.mode("overwrite").format("delta").save(checkpoint_dir)
        print(f"Checkpoint created for table {table_name} at {checkpoint_dir}")

# Step 4: Define multiple streams to calculate credit card fraud analytics in real time

# Kafka source for each topic
transaction_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", transaction_topic) \
    .option("startingOffsets", "latest") \
    .load()

payment_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", payment_topic) \
    .option("startingOffsets", "latest") \
    .load()

interest_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", interest_topic) \
    .option("startingOffsets", "latest") \
    .load()

# Process streams: deserialise the JSON message values into columns.
# The DDL schemas below are illustrative samples - replace them with the real
# column definitions of the Production_app tables.
transaction_schema = "transaction_id STRING, card_number STRING, transaction_amount DOUBLE, transaction_timestamp TIMESTAMP"
payment_schema = "transaction_id STRING, payment_amount DOUBLE, payment_status STRING, payment_timestamp TIMESTAMP"
interest_schema = "transaction_id STRING, interest_amount DOUBLE"

# A watermark on the event-time columns bounds late data (needed for append output)
transaction_df = transaction_stream \
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value") \
    .selectExpr(f"from_json(value, '{transaction_schema}') AS data") \
    .select("data.*") \
    .withWatermark("transaction_timestamp", watermark_duration)

payment_df = payment_stream \
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value") \
    .selectExpr(f"from_json(value, '{payment_schema}') AS data") \
    .select("data.*") \
    .withWatermark("payment_timestamp", watermark_duration)

interest_df = interest_stream \
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value") \
    .selectExpr(f"from_json(value, '{interest_schema}') AS data") \
    .select("data.*")

# Join the streams on their common key
joined_stream = transaction_df \
    .join(payment_df, "transaction_id") \
    .join(interest_df, "transaction_id")

# Apply windowing and aggregation
windowed_joined_stream = joined_stream \
    .groupBy(window("transaction_timestamp", window_duration)) \
    .agg(avg("transaction_amount").alias("avg_transaction_amount"),
         sum("payment_amount").alias("total_payment_amount"),
         sum("interest_amount").alias("total_interest_amount"))

# Step 5: Implement analytics for the specified cases
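
# A minimal sketch for Cases 1-3, assuming the illustrative schema columns above
# (transaction_amount, payment_status with a "FAILED" value, payment_timestamp,
# interest_amount). Other column names or business definitions of "consumption"
# and "failed payment" would change the filters accordingly.

# Case 1: average consumption on credit cards over the last year (running aggregate)
avg_consumption_last_year = transaction_df \
    .filter(col("transaction_timestamp") >= expr("current_date() - INTERVAL 1 YEAR")) \
    .agg(avg("transaction_amount").alias("avg_consumption_last_year"))

# Case 2: number of failed payments in the previous year
failed_payments_prev_year = payment_df \
    .filter((col("payment_status") == "FAILED") &
            (col("payment_timestamp") >= expr("current_date() - INTERVAL 1 YEAR"))) \
    .agg(count("*").alias("failed_payments_prev_year"))

# Case 3: interest added to the current payment
payment_with_interest = payment_df \
    .join(interest_df, "transaction_id") \
    .withColumn("payment_with_interest", col("payment_amount") + col("interest_amount"))

# Each result can go to its own sink; global aggregates need complete/update mode, e.g.:
# avg_consumption_last_year.writeStream.outputMode("complete").format("console").start()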

# Step 6: Save the data to a Delta table with optimized partitions
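
# A minimal sketch for Case 4, with a placeholder output path. repartition(18)
# gives the requested 18 partitions per micro-batch; the table can additionally be
# compacted afterwards (e.g. with Delta's OPTIMIZE command where available). Note
# that chaining an aggregation after a stream-stream join needs a recent Spark
# release with multi-stateful-operator support.
delta_output_path = "hdfs://<hdfs_host>:<hdfs_port>/delta/fraud_analytics"

delta_query = windowed_joined_stream \
    .repartition(18) \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", os.path.join(checkpoint_path, "fraud_analytics_delta")) \
    .start(delta_output_path)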

# Start a console query for monitoring. Append mode is used because stream-stream
# joins do not support complete output mode.
query = windowed_joined_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# Keep the driver alive and write the 11 pm checkpoints on schedule
while query.isActive:
    if datetime.now() >= next_checkpoint_time:
        write_nightly_checkpoints()
        next_checkpoint_time = cron.get_next(datetime)
    sleep(60)


#Faang #spark #dataengineering #dataengineer #bigdata #sql #kafka #sparkstreaming
