First Spark Program - pySpark Orders

/*

Problem 1: We want to write the Spark code to load / read a file from HDFS

Load the orders data, create an RDD, and then perform various transformations

We want to perform the following operation on the data:

1. Count of orders in each category

*/



#Solution:

#-------------------First create the Spark Session---------------------------

#----------- We have defined our requirement / problem statement and assumed the orders data is already in HDFS. To execute our PySpark code on the cluster, we first need to create a Spark session.

#---- hadoop fs -ls /public/<foldername>/retail_db/orders/part-00000

#---------- We will use the Spark Context, available via the Spark Session, for dealing with the RDD low-level APIs ----

#If we consider Orders data as below

#['1,2013-07-25 00:00:00.0,11599,CLOSED',

# '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',

# '3,2013-07-25 00:00:00.0,12111,COMPLETE',

# '4,2013-07-25 00:00:00.0,8827,CLOSED',

# '5,2013-07-25 00:00:00.0,11318,COMPLETE']

# If we want to calculate ( Count of orders in each category )

# i.e. one row per distinct status:

# CLOSED,?

# PENDING_PAYMENT,?

# COMPLETE,?

# ...

#---------------------------------------------------------------------------

#-------- This is boilerplate code for creating the Spark session ----------

from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

# this creates a Spark session; the session object is available as "spark"

# if we evaluate "spark" in a Jupyter notebook cell, we get output like:

# SparkSession - hive

# SparkContext

# Spark UI

# Version: v3.1.2, Master: yarn, AppName: pyspark-shell

# we will read the file from HDFS and store the result in an RDD

orders_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")

# here orders_rdd is the parent (base) RDD in the DAG

# if we print the first few records of the RDD:

orders_rdd.take(5)

#['1,2013-07-25 00:00:00.0,11599,CLOSED',

# '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',

# '3,2013-07-25 00:00:00.0,12111,COMPLETE',

# '4,2013-07-25 00:00:00.0,8827,CLOSED',

# '5,2013-07-25 00:00:00.0,11318,COMPLETE']

# If the file has, say, 1000 records, the RDD will also have 1000 elements (one per line)
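# As a quick sanity check (an aside, not part of the original walkthrough), we can look at
# the total number of records and the number of input partitions of the RDD:

# orders_rdd.count()
# 68883  (the per-status counts shown later in this article sum to this total)

# orders_rdd.getNumPartitions()
# (depends on the file layout and configuration)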

# We use a map transformation here to convert each record into a (status, 1) key-value pair

# We can use a lambda function as

# map_rdd = orders_rdd.map(lambda x: (x.split(",")[3], 1))

#map transformation

orders_map_rdd = orders_rdd.map(lambda x: (x.split(",")[3],1))

#----------DAG : orders_rdd -> orders_map_rdd

# orders_map_rdd.take(10)

#[('CLOSED', 1),

# ('PENDING_PAYMENT', 1),

# ('COMPLETE', 1),

# ('CLOSED', 1),

# ('COMPLETE', 1),

# ('COMPLETE', 1),

# ('COMPLETE', 1),

# ('PROCESSING', 1),

# ('PENDING_PAYMENT', 1),

# ('PENDING_PAYMENT', 1)]

# Next we want to aggregate the output, i.e. sum the 1s for CLOSED, sum the 1s for PENDING_PAYMENT, and so on.

# Here the key is the status (CLOSED, PENDING_PAYMENT, COMPLETE, PROCESSING, ...) and the value is 1

# Since we want to aggregate by key (CLOSED, PENDING_PAYMENT, etc.), we use

# reduceByKey()

# reduceByKey merges the values of each key two at a time using the given function, doing a local (map-side) combine before the shuffle (a small local sketch follows after the output below).

reduce_rdd = orders_map_rdd.reduceByKey(lambda x,y:x+y)

#----------DAG : orders_rdd -> orders_map_rdd -> reduce_rdd

# reduce_rdd.collect()

# [('CLOSED', 7556),

# ('CANCELED', 1428),

# ('PENDING_PAYMENT', 15030),

# ('COMPLETE', 22899),

# ('PROCESSING', 8275),

# ('PAYMENT_REVIEW', 729),

# ('PENDING', 7610),

# ('ON_HOLD', 3798),

# ('SUSPECTED_FRAUD', 1558)]
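# For intuition, here is a minimal local sketch of that pairwise merging, using a tiny
# hand-made list via parallelize instead of the HDFS data (illustrative only; the
# variable names below are not part of the original solution)

sample_pairs = spark.sparkContext.parallelize(
    [("CLOSED", 1), ("COMPLETE", 1), ("CLOSED", 1), ("COMPLETE", 1), ("COMPLETE", 1)])

sample_counts = sample_pairs.reduceByKey(lambda x, y: x + y)

# sample_counts.collect()

# [('CLOSED', 2), ('COMPLETE', 3)]   (tuple order may vary)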

# we can further sort the result in descending order of count using the sortBy function (the second argument False means ascending=False)

reduce_sort = reduce_rdd.sortBy(lambda x: x[1], False)

#----------DAG : orders_rdd -> orders_map_rdd -> reduce_rdd -> reduce_sort

# reduce_sort.collect()

# [('COMPLETE', 22899),

# ('PENDING_PAYMENT', 15030),

# ('PROCESSING', 8275),

# ('PENDING', 7610),

# ('CLOSED', 7556),

# ('ON_HOLD', 3798),

# ('SUSPECTED_FRAUD', 1558),

# ('CANCELED', 1428),

# ('PAYMENT_REVIEW', 729)]
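# As a side note, the same counts can also be produced with the higher-level DataFrame
# API using the Spark session created above. This is a rough sketch, not part of the
# RDD solution: the column names below are assumptions, since the raw file has no header.

from pyspark.sql.functions import desc

orders_df = spark.read.csv(
    "/public/trendytech/retail_db/orders/*",
    schema="order_id INT, order_date STRING, customer_id INT, order_status STRING")

# group by status and count, sorted by count descending (equivalent to the RDD pipeline)
# orders_df.groupBy("order_status").count().orderBy(desc("count")).show()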



Thanks

Sunil Ghate
