End to End Pyspark Example

End to End Pyspark Example

We’ll use Spark to analyze some flight data from the United States Bureau of Transportation statistics. Inside the CSV folder, you’ll see that we have a number of files. There are also a number of other folders with different file formats, for now, let’s focus on the CSV files.

Each file has a number of rows within it. These files are CSV files, meaning that they’re a semistructured data format, with each row in the file representing a row in our future DataFrame:

$ head /data/flight-data/csv/2015-summary.cs

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344v        

Spark includes the ability to read and write from a large number of data sources. To read this data, we will use a DataFrameReader that is associated with our SparkSession. In doing so, we will specify the file format as well as any options we want to specify. In our case, we want to do something called schema inference, which means that we want Spark to take a best guess at what the schema of our DataFrame should be. We also want to specify that the first row is the header in the file, so we’ll specify that as an option, too.

To get the schema information, Spark reads in a little bit of the data and then attempts to parse the types in those rows according to the types available in Spark. You also have the option of strictly specifying a schema when you read in data (which we recommend in production scenarios):

flightData2015 = spark
                  .read\
                  .option("inferSchema", "true")\
                  .option("header", "true")\
                  .csv("/data/flight-data/csv/2015-summary.csv")\        

Each of these DataFrames (in Scala and Python) has a set of columns with an unspecified number of rows. The reason the number of rows is unspecified is that reading data is a transformation, and is, therefore, a lazy operation. Spark peeked at only a couple of rows of data to try to guess what types each column should be.

No alt text provided for this image

If we perform the take action on the DataFrame, we will be able to see the same results that we saw before when we used the command line:

flightData2015.take(3)

Array([United States,Romania,15], [United States,Croatia...)        

Let’s specify some more transformations! Now, let’s sort our data according to the count column, which is an integer type.

No alt text provided for this image

Nothing happens to the data when we call sort because it’s just a transformation. However, we can see that Spark is building up a plan for how it will execute this across the cluster by looking at the explain plan. We can call explain on any DataFrame object to see the DataFrame’s lineage (or how Spark will execute this query):

flightData2015.sort("count").explain()        
No alt text provided for this image

Congratulations, you’ve just read your first explain plan! Explain plans are a bit arcane, but with a bit of practice, it becomes second nature. You can read explain plans from top to bottom, the top being the end result, and the bottom being the source(s) of data. In this case, take a look at the first keywords. You will see sort, exchange, and FileScan. That’s because the sort of our data is actually a wide transformation because rows will need to be compared with one another. Don’t worry too much about understanding everything about explain plans at this point, they can just be helpful tools for debugging and improving your knowledge as you progress with Spark.

Now, just like we did before, we can specify an action to kick off this plan. However, before doing that, we’re going to set a configuration. By default, when we perform a shuffle, Spark outputs 200 shuffle partitions. Let’s set this value to 5 to reduce the number of the output partitions from the shuffle:

spark.conf.set("spark.sql.shuffle.partitions", "5")

flightData2015.sort("count").take(2)

... Array([United States,Singapore,1], [Moldova,United States,1]))        

The below image illustrates the above operation. Notice that in addition to the logical transformations, we include the physical partition count, as well.

No alt text provided for this image

The logical plan of transformations that we build up defines a lineage for the DataFrame so that at any given point in time, Spark knows how to recompute any partition by performing all of the operations it had before on the same input data. This sits at the heart of Spark’s programming model—functional programming where the same inputs always result in the same outputs when the transformations on that data stay constant.

We do not manipulate the physical data; instead, we configure physical execution characteristics through things like the shuffle partitions parameter that we set a few moments ago. We ended up with five output partitions because that’s the value we specified in the shuffle partition. You can change this to help control the physical execution characteristics of your Spark jobs. Go ahead and experiment with different values and see the number of partitions yourself. In experimenting with different values, you should see drastically different runtimes. Remember that you can monitor the job progress by navigating to the Spark UI on port 4040 to see the physical and logical execution characteristics of your jobs.

DataFrames and SQL

We worked through a simple transformation in the previous example, let’s now work through a more complex one and follow along in both DataFrames and SQL. Spark can run the same transformations, regardless of the language, in the exact same way. You can express your business logic in SQL or DataFrames (either in R, Python, Scala, or Java) and Spark will compile that logic down to an underlying plan (that you can see in the explain plan) before actually executing your code. With Spark SQL, you can register any DataFrame as a table or view (a temporary table) and query it using pure SQL. There is no performance difference between writing SQL queries or writing DataFrame code, they both “compile” to the same underlying plan that we specify in DataFrame code.

You can make any DataFrame into a table or view with one simple method call:

flightData2015.createOrReplaceTempView("flight_data_2015")        

Now we can query our data in SQL. To do so, we’ll use the spark.sql function (remember, spark is our SparkSession variable) that conveniently returns a new DataFrame. Although this might seem a bit circular in logic—that a SQL query against a DataFrame returns another DataFrame—it’s actually quite powerful. This makes it possible for you to specify transformations in the manner most convenient to you at any given point in time and not sacrifice any efficiency to do so! To understand that this is happening, let’s take a look at two explain plans:

sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .count()

sqlWay.explain()
dataFrameWay.explain()        
No alt text provided for this image

Notice that these plans compile to the exact same underlying plan!

Let’s pull out some interesting statistics from our data. One thing to understand is that DataFrames (and SQL) in Spark already have a huge number of manipulations available. There are hundreds of functions that you can use and import to help you resolve your big data problems faster. We will use the max function, to establish the maximum number of flights to and from any given location. This just scans each value in the relevant column in the DataFrame and checks whether it’s greater than the previous values that have been seen. This is a transformation because we are effectively filtering down to one row. Let’s see what that looks like:

spark.sql("SELECT max(count) from flight_data_2015").take(1)

from pyspark.sql.functions import max

flightData2015.select(max("count")).take(1)        

Great, that’s a simple example that gives a result of 370,002. Let’s perform something a bit more complicated and find the top five destination countries in the data. This is our first multitransformation query, so we’ll take it step by step. Let’s begin with a fairly straightforward SQL aggregation:

maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()        
No alt text provided for this image

Now, let’s move to the DataFrame syntax that is semantically similar but slightly different in implementation and ordering. But, as we mentioned, the underlying plans for both of them are the same. Let’s run the queries and see their results as a sanity check:

from pyspark.sql.functions import desc

flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()        
No alt text provided for this image

Now there are seven steps that take us all the way back to the source data. You can see this in the explain plan on those DataFrames. The below image shows the set of steps that we perform in “code.” The true execution plan (the one visible in explain) will differ from that shown in the below image because of optimizations in the physical execution; however, the illustration is as good of a starting point as any. This execution plan is a directed acyclic graph (DAG) of transformations, each resulting in a new immutable DataFrame, on which we call an action to generate a result.

No alt text provided for this image


要查看或添加评论,请登录

社区洞察

其他会员也浏览了