Interview Question for Lead Data Engineer at MAANG
Abhishek Singh
Technical Lead Data Engineer (Azure) at Publicis Sapient. Expertise in SQL, PySpark, and Scala with Spark; Kafka with Spark Streaming; Databricks; and tuning Spark applications at petabyte scale. Clouds: AWS, Azure, and GCP.
Interview experience with MAANG, Jan 2024. Question asked at the L3 round:
Write Kafka + PySpark code for the following use case:
Connect to PostgreSQL and ingest tables from Production_app with nested data types into Kafka topics, each prefixed with prod1.
Generate unique txnid and txn_timestamp keys for each record and store each table in a unique topic.
Create checkpoints at 11 pm when the pipeline starts and ends.
Create multiple streams for credit card fraud analytics in real-time:
Case 1: Calculate the average consumption on credit cards for the last year.
Case 2: Determine the number of failed payments in the previous year.
Case 3: Calculate the interest added to the current payment.
Case 4: Save the data to a Delta table with 18 partitions in an optimized way.
My solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum, expr, window
from pyspark.sql.types import StructType
from datetime import datetime
import os
from croniter import croniter
from time import sleep
# Step 1: Connect with PostgreSQL and ingest tables from the Production_app
spark = SparkSession.builder \
    .appName("KafkaCreditCardFraudAnalytics") \
    .config("spark.jars", "path/to/postgresql-connector.jar") \
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.HDFSLogStore") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://<hdfs_host>:<hdfs_port>") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
postgres_url = "jdbc:postgresql://localhost:5432/your_database"
properties = {
    "user": "your_username",
    "password": "your_password",
    "driver": "org.postgresql.Driver"
}
tables = ["transactions", "payments", "interests"]  # Sample tables from Production_app
# Read all tables from PostgreSQL and store them in a dictionary
table_dataframes = {}
for table in tables:
    df = spark.read.jdbc(url=postgres_url, table=f"Production_app.{table}", properties=properties)
    table_dataframes[table] = df
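# The use case also asks for each ingested table to be published to its own Kafka topic
# prefixed with "prod1", with generated txnid and txn_timestamp keys for every record.
# A minimal sketch of that step, assuming nested columns serialize cleanly to JSON and
# that "<kafka_broker>" is replaced with a reachable broker address:
from pyspark.sql.functions import to_json, struct, current_timestamp
for table, df in table_dataframes.items():
    enriched_df = df \
        .withColumn("txnid", expr("uuid()")) \
        .withColumn("txn_timestamp", current_timestamp())
    enriched_df \
        .selectExpr("txnid AS key", "to_json(struct(*)) AS value") \
        .write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "<kafka_broker>") \
        .option("topic", f"prod1.{table}") \
        .save()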
# Define the cron job to create a checkpoint at 11 pm daily
cron_schedule = "0 23 * * *"  # every day at 11 pm
cron = croniter(cron_schedule, datetime.now())
# Define checkpoint path
checkpoint_path = "hdfs://<hdfs_host>:<hdfs_port>/checkpoints"
# Define Kafka consumer configurations
kafka_bootstrap_servers = "<kafka_broker>"
kafka_group_id = "credit_card_fraud_group"
# Define Kafka topics for streams
transaction_topic = "prod1.transactions"
payment_topic = "prod1.payments"
interest_topic = "prod1.interests"
# Define window duration for stream processing
window_duration = "1 hour"
# Define watermark for stream processing
watermark_duration = "10 minutes"
# Step 3: Create checkpoints of the ingested tables (scheduled daily at 11 pm)
def create_checkpoints():
    for table_name, df in table_dataframes.items():
        checkpoint_dir = os.path.join(checkpoint_path, table_name)
        df.write.mode("overwrite").format("delta").save(checkpoint_dir)
        print(f"Checkpoint created for table {table_name} at {checkpoint_dir}")
# Next scheduled checkpoint time according to the cron expression
next_checkpoint_time = cron.get_next(datetime)
# Step 4: Define multiple streams to calculate credit card fraud analytics in real-time
# Define Kafka stream for each topic
transaction_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", transaction_topic) \
    .option("startingOffsets", "latest") \
    .load()
payment_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", payment_topic) \
    .option("startingOffsets", "latest") \
    .load()
interest_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", interest_topic) \
    .option("startingOffsets", "latest") \
    .load()
# Process streams and perform the necessary transformations
# Sample JSON schemas for the topic payloads (assumed column names, chosen to match
# the columns used in the joins and aggregations below; adjust to the real tables)
transaction_schema = "transaction_id STRING, transaction_timestamp TIMESTAMP, transaction_amount DOUBLE"
payment_schema = "transaction_id STRING, payment_timestamp TIMESTAMP, payment_amount DOUBLE, payment_status STRING"
interest_schema = "transaction_id STRING, interest_amount DOUBLE"
transaction_df = transaction_stream \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .selectExpr(f"from_json(value, '{transaction_schema}') as data") \
    .select("data.*")
payment_df = payment_stream \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .selectExpr(f"from_json(value, '{payment_schema}') as data") \
    .select("data.*")
interest_df = interest_stream \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .selectExpr(f"from_json(value, '{interest_schema}') as data") \
    .select("data.*")
# Join the streams on the common transaction_id key
joined_stream = transaction_df \
    .join(payment_df, "transaction_id") \
    .join(interest_df, "transaction_id")
# Apply watermarking, windowing, and aggregation
windowed_joined_stream = joined_stream \
    .withWatermark("transaction_timestamp", watermark_duration) \
    .groupBy(window("transaction_timestamp", window_duration)) \
    .agg(avg("transaction_amount").alias("avg_transaction_amount"),
         sum("payment_amount").alias("total_payment_amount"),
         sum("interest_amount").alias("total_interest_amount"))
# Step 5: Implement analytics for the specified cases
# Placeholder for analytics
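# A minimal sketch of Cases 1-3, assuming the sample columns defined above
# (transaction_amount, payment_status, payment_timestamp, interest_amount).
# Each result is a streaming DataFrame that would need its own writeStream sink.
# Case 1: average consumption on credit cards for the last year
avg_consumption_df = transaction_df \
    .filter(col("transaction_timestamp") >= expr("current_timestamp() - INTERVAL 1 YEAR")) \
    .agg(avg("transaction_amount").alias("avg_consumption_last_year"))
# Case 2: number of failed payments in the previous year
failed_payments_df = payment_df \
    .filter((col("payment_status") == "FAILED") &
            (col("payment_timestamp") >= expr("current_timestamp() - INTERVAL 1 YEAR"))) \
    .agg(count("*").alias("failed_payments_last_year"))
# Case 3: interest added to the current payment
interest_on_payment_df = payment_df \
    .join(interest_df, "transaction_id") \
    .withColumn("payment_with_interest", col("payment_amount") + col("interest_amount"))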
# Step 6: Save the data to a Delta table with optimized partitions
# Placeholder for saving to Delta table
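# A minimal sketch of Case 4, assuming a Delta-enabled session (configured above) and the
# hypothetical output path below; repartition(18) writes 18 partitions per micro-batch.
delta_output_path = "hdfs://<hdfs_host>:<hdfs_port>/delta/credit_card_analytics"
delta_query = windowed_joined_stream \
    .repartition(18) \
    .writeStream \
    .outputMode("complete") \
    .format("delta") \
    .option("checkpointLocation", os.path.join(checkpoint_path, "delta_sink")) \
    .start(delta_output_path)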
# Start the streaming query
query = windowed_joined_stream \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()
# Keep the driver alive and fire the 11 pm checkpoint whenever it falls due
while query.isActive:
    if datetime.now() >= next_checkpoint_time:
        create_checkpoints()
        next_checkpoint_time = cron.get_next(datetime)  # schedule the next day's run
    sleep(60)
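To actually submit a job like this, the Kafka, Delta, and PostgreSQL dependencies need to be on the classpath. A rough spark-submit sketch (the script name and package versions are assumptions; match them to your Spark version):
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,io.delta:delta-core_2.12:2.4.0,org.postgresql:postgresql:42.7.1 kafka_credit_card_fraud_analytics.py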
#Faang #spark #dataengineering #dataengineer #bigdata #sql #kafka #sparkstreaming