Apache Spark
Filipe Balseiro
Installing Spark
Installation instructions for Linux, MacOS and Windows are available on the course repository.
After installing the right JDK and Spark version, make sure that you set up PySpark by following these instructions.
In order to test your installation you can run this Jupyter Notebook.
Note: If you're running Spark and Jupyter Notebook on a remote machine, you will need to redirect ports 8888 for Jupyter Notebook and 4040 for the Spark UI.
Spark/PySpark Fundamentals
Creating a Spark session
We can use Spark with Python code through the PySpark library.
We start by importing PySpark into our code as follows:
import pyspark
from pyspark.sql import SparkSession
We also need to instantiate a Spark session, an object that we use to interact with Spark.
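# local[*] runs Spark locally using all available CPU cores; appName sets the name shown in the Spark UI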
spark = SparkSession.builder \
.master("local[*]") \
.appName('test') \
.getOrCreate()
Once we've instantiated a session, we can access the Spark UI by browsing to localhost:4040. The UI will show all current jobs. Right now there are no jobs running, since we've just created the instance.
Reading csv files
Like Pandas, Spark can read CSV files into dataframes, a tabular data structure. Unlike Pandas, Spark can handle much bigger datasets, but it does not infer the datatype of each column by default: every column is read as a string unless we provide a schema.
To read a csv file and create a dataframe we can use the following command:
df = spark.read \
.option("header", "true") \
.csv('fhvhv_tripdata_2021-01.csv')
We can use df.show() or df.head() to display the first rows of the dataframe.
We can also use df.schema to check the current schema.
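Since every column is read as a string by default, we can pass an explicit schema when reading the file. Here is a minimal sketch, assuming a subset of the columns found in fhvhv_tripdata_2021-01.csv (adjust names and types to your actual file):
from pyspark.sql import types

# explicitly typed subset of the FHVHV columns (names assumed from the dataset)
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True)
])

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')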
Partitions
A Spark cluster is composed of multiple executors. Each executor can process data independently in order to parallelize and speed up work.
In the previous example we read a single large CSV file. A file can only be read by a single executor, which means that the code we've written so far isn't parallelized and thus will only be run by a single executor rather than many at the same time.
In order to solve this issue, we can split a file into multiple parts so that each executor can take care of a part and have all executors working simultaneously. These splits are called partitions.
We will now read the CSV file, partition the dataframe and parquetize it. This will create multiple files in parquet format.
Note: converting to parquet is an expensive operation which may take several minutes.
# create 24 partitions in our dataframe
df = df.repartition(24)
# parquetize and write to fhvhv/2021/01/ folder
df.write.parquet('fhvhv/2021/01/')
You may check the Spark UI at any time to follow the progress of the current job, which is divided into stages, each containing tasks. The tasks in a stage will not start until all tasks in the previous stage have finished.
When creating a dataframe, Spark creates as many partitions as there are CPU cores available by default, and each partition creates a task. Thus, assuming the dataframe was initially split into 6 partitions, the write.parquet() call will run in 2 stages: the first with 6 tasks and the second with 24 tasks.
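If you want to verify how many partitions a dataframe currently has, a quick check (standard PySpark, not part of the original steps):
# returns the number of partitions of the underlying RDD
df.rdd.getNumPartitions()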
Besides the 24 parquet files, you should also see a _SUCCESS file, which is empty. This file is created when the job finishes successfully.
Trying to write the files again will raise an error because Spark will not write to a non-empty folder. You can force an overwrite with the mode argument:
df.write.parquet('fhvhv/2021/01/', mode='overwrite')
The opposite operation (reducing the number of partitions by merging them) is called coalescing.
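A minimal sketch of coalescing; the target partition count here is arbitrary:
# coalesce merges existing partitions without a full shuffle, which makes it cheaper than repartition
df = df.coalesce(4)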
Spark dataframes
We can create a dataframe from a parquet file like this:
df = spark.read.parquet('fhvhv/2021/01/')
Unlike CSV files, parquet files contain the schema of the dataset, so there is no need to specify a schema like we previously did when reading the CSV file. You can check the schema like this:
df.printSchema()
(One of the reasons parquet files are smaller than CSV files is that they store data according to its datatype, so integer values take up less space than long or string values.)
There are many Pandas-like operations that we can do on Spark dataframes, such as:
new_df = df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
new_df = df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID').filter(df.hvfhs_license_num == 'HV0003')
Actions vs Transformations
Some Spark methods are called "lazy", meaning that they are not executed right away. You can test this with the last instructions we ran in the previous section: after running them, the Spark UI will not show any new jobs. However, running df.show() right after will execute immediately and display the contents of the dataframe; the Spark UI will also show a new job.
These lazy commands are called transformations and the eager commands are called actions. Computations only happen when actions are triggered.
List of transformations (lazy): selecting columns, filtering, joins, group by, repartitioning and similar operations.
List of actions (eager): show(), take(), head(), count(), write() and similar operations.
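As a quick illustration of the difference, reusing the columns from the examples above:
# transformations only build up an execution plan; no new job appears in the Spark UI
lazy_df = df \
    .filter(df.hvfhs_license_num == 'HV0003') \
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')

# the action triggers the computation, and a new job shows up in the Spark UI
lazy_df.show()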
Functions and User Defined Functions (UDFs)
Besides the SQL and Pandas-like commands we've seen so far, Spark provides additional built-in functions that allow for more complex data manipulation. By convention, these functions are imported as follows:
from pyspark.sql import functions as F
Here's an example of built-in function usage:
df \
.withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
.withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
.select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
.show()
A list of built-in functions is available in the official Spark documentation.
Besides these built-in functions, Spark allows us to create User Defined Functions (UDFs) with custom behavior, for cases where expressing that behavior as SQL would be hard to manage and test.
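As a sketch, here is a hypothetical UDF applied to the dispatching_base_num column; the logic itself is made up purely for illustration:
from pyspark.sql import functions as F, types

# arbitrary Python logic that would be awkward to express and test as SQL
def crazy_stuff(base_num):
    num = int(base_num[1:])
    return f's/{num:03x}' if num % 7 == 0 else f'e/{num:03x}'

# wrap the Python function as a Spark UDF, declaring its return type
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

df \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_datetime', 'dropoff_datetime') \
    .show()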
Spark SQL
Although there are other tools for expressing batch jobs as SQL queries, Spark can also run SQL queries, which can come in handy if you already have a Spark cluster and setting up an additional tool for sporadic use isn't desirable.
To run SQL queries with Spark, we first need to register the dataframe as a table, because SQL statements reference tables by name rather than dataframe variables:
df_trips_data.registerTempTable('trips_data')
(In recent Spark versions registerTempTable is deprecated; df_trips_data.createOrReplaceTempView('trips_data') is the equivalent call.)
With our registered table, we can now perform regular SQL operations.
spark.sql("""
SELECT
service_type,
count(1)
FROM
trips_data
GROUP BY
service_type
""")
Spark Cluster
Until now we've used a local cluster to run our Spark code, but Spark clusters often contain multiple computers that behave as executors.
Spark clusters are managed by a master, which behaves similarly to the entry point of a Kubernetes cluster. A driver (an Airflow DAG, a computer running a local script, etc.) that wants to execute a Spark job sends the job to the master, which in turn divides the work among the cluster's executors. If an executor fails or goes offline for any reason, the master reassigns its tasks to another executor.
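As a sketch, pointing a Spark session at such a master only requires changing the master URL when building the session; the hostname below is hypothetical, and 7077 is the default port of Spark's standalone master:
# connect to a remote standalone cluster instead of running locally
spark = SparkSession.builder \
    .master("spark://spark-master.example.com:7077") \
    .appName('cluster-test') \
    .getOrCreate()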
Each executor fetches a dataframe partition stored in a Data Lake (usually S3, GCS or similar cloud storage), processes it and then stores the result somewhere, which could be the same Data Lake or elsewhere. If there are more partitions than executors, executors keep fetching partitions until every single one has been processed.
This is in contrast to Hadoop, another data analytics engine, whose executors store locally the data they process. Partitions in Hadoop are duplicated across several executors for redundancy, in case an executor fails for whatever reason (Hadoop is meant for clusters made of commodity hardware). However, data locality has become less important as storage and data transfer costs have dramatically decreased, and nowadays it's feasible to separate storage from computation, so Hadoop has fallen out of fashion.