Working with Spark: Spark Session
Thus far, in my previous article, I tried to cover the basic concepts of Spark Applications. Now we need a way to send user commands and data to the Spark Application, and we do that with a Spark Session.
We will start with Spark's local mode: run ./bin/spark-shell to access the Scala console for an interactive session, or ./bin/pyspark for the Python console. Either one starts an interactive Spark Application.
There is also a process for submitting standalone applications to Spark, called spark-submit, with which you can submit a precompiled application to Spark. When we start Spark in this interactive mode, we implicitly create a Spark Session which manages the Spark Application. When we start it through a job submission, we must create or access the Spark Session ourselves. The Spark Session instance is the way Spark executes user-defined manipulations across the cluster.
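As a minimal sketch of that second case, here is how a standalone Python application can create (or reuse) its own Spark Session through the builder; the application name "my-standalone-app" is just an illustration:

from pyspark.sql import SparkSession

# Build the Spark Session that will manage this application,
# or reuse an existing one if it is already running.
spark = SparkSession.builder \
    .appName("my-standalone-app") \
    .getOrCreate()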
Let's go ahead and look at the Spark Session in both Scala and Python.
In Scala, type spark and you should see something like:
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@77858a84
In Python you’ll see something like:
<pyspark.sql.session.SparkSession at 0x8efda7c6ef0>
Let's now perform the simple task of creating a range of numbers. In both Scala and Python, this range of numbers is just like a named column in a spreadsheet.
%scala : val myRange = spark.range(500).toDF("number")
%python : myRange = spark.range(500).toDF("number")
We created a DataFrame with one column containing 500 rows with values from 0 to 499.
This range of numbers represents a distributed collection. When run on a cluster, each part of this range of numbers exists on a different executor. This is a Spark DataFrame.
DataFrames
A DataFrame is the most common Structured API and simply represents a table of data with rows and columns. The fundamental difference is that while a spreadsheet sits on one computer in one specific location, a Spark DataFrame can span thousands of computers in a distributed cluster. The reason for putting the data on more than one computer should be intuitive: either the data is too large to fit on one machine, or it would simply take too long to perform the computation on one machine. This is the same DataFrame concept we deal with in R and Python. Because Spark has language interfaces for both Python and R, it's quite easy to convert Pandas (Python) DataFrames to Spark DataFrames and R DataFrames to Spark DataFrames (in R).
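Because those interfaces exist, here is a quick illustrative sketch in Python (assuming pandas is installed alongside PySpark) of moving data between the two DataFrame worlds:

import pandas as pd

# A small local Pandas DataFrame with hypothetical sample data
pdf = pd.DataFrame({"number": range(10)})

# Distribute it across the cluster as a Spark DataFrame
sdf = spark.createDataFrame(pdf)

# Bring it back to the driver as a Pandas DataFrame
back = sdf.toPandas()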
Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). These abstractions all represent distributed collections of data; however, they have different interfaces for working with that data. The easiest and most efficient are DataFrames, which are available in all languages.
Partitions
- To allow every executor to perform work in parallel, Spark breaks up the data into chunks called partitions.
- A partition is a collection of rows that sits on one physical machine in our cluster.
- A DataFrame’s partitions represent how the data is physically distributed across your cluster of machines during execution.
- If you have one partition, Spark will only have a parallelism of one, even if you have thousands of executors. If you have many partitions but only one executor, Spark will still only have a parallelism of one because there is only one computation resource.
We simply specify high-level transformations of data in the physical partitions, and Spark determines how this work will actually execute on the cluster.
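As a small sketch, you can inspect and change how many partitions back the myRange DataFrame from earlier; the target of 8 partitions here is just an illustration:

# Number of partitions currently backing the DataFrame
print(myRange.rdd.getNumPartitions())

# Redistribute the rows across 8 partitions
repartitioned = myRange.repartition(8)
print(repartitioned.rdd.getNumPartitions())  # prints 8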
Transformations
- In Spark, the core data structures are immutable, meaning they cannot be changed once created. If you cannot change it, how are you supposed to use it?
- To “change” a DataFrame, you have to instruct Spark how you would like to modify the DataFrame you have into the one that you want. These instructions are called transformations.
%scala : val divisBy2 = myRange.where("number % 2 = 0")
%python : divisBy2 = myRange.where("number % 2 = 0")
These return no output; that's because we only specified an abstract transformation, and Spark will not act on transformations until we call an action. Transformations are the core of how you will express your business logic using Spark. There are two types of transformations: those that specify narrow dependencies and those that specify wide dependencies.
Transformations consisting of narrow dependencies (we'll call them narrow transformations) are those where each input partition contributes to only one output partition (1 to 1). The where statement above specifies a narrow dependency.
A wide dependency (or wide transformation) will have input partitions contributing to many output partitions (1 to N). Beyond that, transformations are simply ways of specifying different series of data manipulation, which leads us to a topic called lazy evaluation.
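As a sketch of the two dependency types in Python, again using myRange (the grouping by last digit is only an illustration of a wide transformation):

# Narrow transformation: each input partition contributes to only one output partition
evens = myRange.where("number % 2 = 0")

# Wide transformation: rows from many input partitions are combined into each output
# partition, so Spark has to shuffle data across the cluster
grouped = myRange.groupBy((myRange["number"] % 10).alias("last_digit")).count()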
Lazy Evaluation
In Spark, instead of modifying the data immediately when we express some operation, we build up a plan of transformations that we would like to apply to our source data. By waiting until the last minute to execute the code, Spark compiles this plan from your raw DataFrame transformations into an efficient physical plan that will run as efficiently as possible across the cluster. This provides immense benefits to the end user because Spark can optimize the entire data flow from end to end. If we build a large Spark job but specify a filter at the end that only requires us to fetch one row from our source data, the most efficient way to execute this is to access the single record that we need. Spark will actually optimize this for us by pushing the filter down automatically.
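As a small sketch, you can see the plan Spark has built (without executing anything) by calling explain() on a DataFrame, for example the divisBy2 DataFrame from above:

# Nothing has executed yet; explain() just prints the physical plan Spark intends to run
divisBy2.explain()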
Actions
An action instructs Spark to compute a result from a series of transformations. The simplest action is count, which gives us the total number of records in the DataFrame.
divisBy2.count()
This returns 250, since half of the 500 numbers in myRange are even.
count is not the only action. There are three kinds of actions:
- actions to view data in the console;
- actions to collect data to native objects in the respective language;
- and actions to write to output data sources.
In specifying our action, we started a Spark job that runs our filter transformation (a narrow transformation), then an aggregation (a wide transformation) that performs the count on a per-partition basis, and then a collect, which brings our result to a native object in the respective language.
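Here is a minimal Python sketch of those three kinds of actions, using divisBy2 from above (the output path /tmp/divisBy2 is just an illustration):

# 1. View data in the console
divisBy2.show(5)

# 2. Collect data to native Python objects on the driver
first_five = divisBy2.take(5)

# 3. Write to an output data source
divisBy2.write.mode("overwrite").csv("/tmp/divisBy2")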
We can see all of this by inspecting the Spark UI. We will discuss the Spark UI and an end-to-end Spark example in the next article.
Thanks for reading.