First Spark Program - PySpark Orders
/*
Problem 1: We want to write Spark code to load / read a file from HDFS.
Load the orders data, create an RDD, and then perform various transformations on it.
We want to perform the following operation on the data:
1. Count of orders in each category (order status)
*/
#Solution:
#-------------------First create the Spark Session---------------------------
#-----------We have defined our requirements / problem statement and assumed the orders data is already in HDFS. Now we will execute our PySpark code on the cluster, and for that we first need to create a Spark session
#---- hadoop fs -ls /public/<foldername>/retail_db/orders/part-00000
#----------We will use the SparkContext available via the SparkSession for working with the low-level RDD APIs----
#If we consider the orders data as shown below
#['1,2013-07-25 00:00:00.0,11599,CLOSED',
# '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
# '3,2013-07-25 00:00:00.0,12111,COMPLETE',
# '4,2013-07-25 00:00:00.0,8827,CLOSED',
# '5,2013-07-25 00:00:00.0,11318,COMPLETE']
# If we want to calculate the count of orders in each category,
# i.e.
# CLOSED,?
# PENDING_PAYMENT,?
# COMPLETE,?
# CLOSED,?
# COMPLETE,?
#---------------------------------------------------------------------------
#--------This is the boilerplate code for creating the Spark session----------
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()
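# A brief note on the settings above: 'spark.ui.port' = '0' lets Spark pick any free
# port for its UI (handy on a shared gateway node), 'spark.sql.warehouse.dir' is where
# managed tables for this user are stored, enableHiveSupport() connects the session to
# the Hive metastore, and master('yarn') submits the application to the YARN cluster.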
# This creates a Spark session, which we now hold in the object "spark"
# If we evaluate "spark" in a Jupyter notebook cell
# we get output as
# SparkSession - hive
#
# SparkContext
#
# Spark UI
#
# Version: v3.1.2    Master: yarn    AppName: pyspark-shell
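# We can also confirm these details programmatically (a small sketch, not from the
# original post); the SparkContext mentioned earlier is available as spark.sparkContext
print(spark.version)               # e.g. 3.1.2
print(spark.sparkContext.master)   # e.g. yarn
print(spark.sparkContext.appName)  # e.g. pyspark-shell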
# We will read the file from HDFS and store the result in an RDD
orders_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")
# Here orders_rdd is the parent (base) RDD of the DAG
# If we preview the first few records of the RDD:
orders_rdd.take(5)
#['1,2013-07-25 00:00:00.0,11599,CLOSED',
# '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
# '3,2013-07-25 00:00:00.0,12111,COMPLETE',
# '4,2013-07-25 00:00:00.0,8827,CLOSED',
# '5,2013-07-25 00:00:00.0,11318,COMPLETE']
# If the file has 1000 rows, the RDD holds 1000 records (a quick check is sketched below)
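# As a sanity check (a sketch, not part of the original walkthrough), count() is an
# action that returns the total number of records in the RDD:
# orders_rdd.count()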
# We use the map transformation here to emit an (order_status, 1) pair for each record
# We can use a lambda function for this, as applied below
#map transformation
orders_map_rdd = orders_rdd.map(lambda x: (x.split(",")[3],1))
#----------DAG : orders_rdd -> orders_map_rdd
# orders_map_rdd.take(10)
#[('CLOSED', 1),
# ('PENDING_PAYMENT', 1),
# ('COMPLETE', 1),
# ('CLOSED', 1),
# ('COMPLETE', 1),
# ('COMPLETE', 1),
# ('COMPLETE', 1),
# ('PROCESSING', 1),
# ('PENDING_PAYMENT', 1),
# ('PENDING_PAYMENT', 1)]
# Next we want to aggregate the output, i.e. sum up all the 1s for CLOSED, for PENDING_PAYMENT, and so on
# Here the key is the order status (CLOSED, PENDING_PAYMENT, COMPLETE, PROCESSING, ...) and the value is 1
# Since we want to aggregate by key (CLOSED, PENDING_PAYMENT, etc.) we use
# reduceByKey()
# reduceByKey merges the values of each key two at a time with the given function,
# combining locally within each partition before shuffling the partial results
reduce_rdd = orders_map_rdd.reduceByKey(lambda x,y:x+y)
#----------DAG : orders_rdd -> orders_map_rdd -> reduce_rdd
# reduce_rdd.collect()
# [('CLOSED', 7556),
# ('CANCELED', 1428),
# ('PENDING_PAYMENT', 15030),
# ('COMPLETE', 22899),
# ('PROCESSING', 8275),
# ('PAYMENT_REVIEW', 729),
# ('PENDING', 7610),
# ('ON_HOLD', 3798),
# ('SUSPECTED_FRAUD', 1558)]
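# As an aside (a sketch, not part of the original solution): the same counts can be
# obtained without a separate map + reduceByKey by using the countByValue() action on
# the status values. It returns a Python dict to the driver, so it is only suitable
# when the number of distinct keys is small, as it is here.
status_counts = orders_rdd.map(lambda x: x.split(",")[3]).countByValue()
# status_counts is a dict-like object, e.g. {'CLOSED': 7556, 'COMPLETE': 22899, ...}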
# We can further sort the results by count in descending order using the sortBy function
reduce_sort = reduce_rdd.sortBy(lambda x: x[1], False)
#----------DAG : orders_rdd -> orders_map_rdd -> reduce_rdd -> reduce_sort
#reduce_sort.collect()
# [('COMPLETE', 22899),
# ('PENDING_PAYMENT', 15030),
# ('PROCESSING', 8275),
# ('PENDING', 7610),
# ('CLOSED', 7556),
# ('ON_HOLD', 3798),
# ('SUSPECTED_FRAUD', 1558),
# ('CANCELED', 1428),
# ('PAYMENT_REVIEW', 729)]
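#---------------------------------------------------------------------------
# The same result can also be produced with the higher-level DataFrame API.
# The sketch below is an alternative, not part of the original RDD solution;
# the column names are illustrative assumptions (the orders file has no header row).
orders_df = spark.read.csv("/public/trendytech/retail_db/orders/*"). \
    toDF("order_id", "order_date", "customer_id", "order_status")
orders_df.groupBy("order_status").count().orderBy("count", ascending=False).show()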
Thanks
Sunil Ghate