Apache Spark

References

Alvaro Navas Notes

Data Engineering Zoomcamp Repository

Installing Spark

Installation instructions for Linux, macOS and Windows are available on the course repository.

After installing the right JDK and Spark version, make sure that you set up PySpark by following these instructions.

In order to test your installation you can run this Jupyter Notebook.

Note: If you're running Spark and Jupyter Notebook on a remote machine, you will need to redirect ports 8888 for Jupyter Notebook and 4040 for the Spark UI.

Spark/PySpark Fundamentals

Creating a Spark session

We can use Spark from Python code with the PySpark library.

We start by importing PySpark as follows:

import pyspark
from pyspark.sql import SparkSession        

We also need to instantiate a Spark session, an object that we use to interact with Spark.

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()        

  • SparkSession is the class of the object that we instantiate; builder is its builder method.
  • master() sets the Spark master URL to connect to. The local string means that Spark will run on a local cluster. [*] means that Spark will run with as many CPU cores as are available.
  • appName() defines the name of our application/session. This will show in the Spark UI.
  • getOrCreate() will create the session or recover the object if it was previously created.

Once we've instantiated a session, we can access the Spark UI by browsing to localhost:4040. The UI will show all current jobs. Right now there are no jobs running, since we've just created the instance.

Reading csv files

Like Pandas, Spark can read csv files into dataframes, a tabular data structure. Unlike Pandas, Spark can handle much bigger datasets, but by default it does not infer the datatypes of each column and reads everything as strings.

To read a csv file and create a dataframe we can use the following command:

df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-01.csv')        

  • read gives us access to Spark's dataframe reader.
  • option() sets options for the reader. In this case, we're specifying that the first line of the CSV file contains the column names.
  • csv() reads the CSV file into a dataframe.

We can use the command df.show() or df.head() to show the first rows from the dataframe.

We can also use df.schema to check the current schema.
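
Since Spark reads every CSV column as a string by default, we can pass an explicit schema to the reader. Here's a minimal sketch, assuming a handful of columns from the FHVHV dataset (adjust the names and types to the actual file):

from pyspark.sql import types

# illustrative schema for a few FHVHV columns; extend to match the full file
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
])

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')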

Partitions

A Spark cluster is composed of multiple executors. Each executor can process data independently in order to parallelize and speed up work.

In the previous example we read a single large CSV file. A file can only be read by a single executor, which means that the code we've written so far isn't parallelized and thus will only be run by a single executor rather than many at the same time.

In order to solve this issue, we can split a file into multiple parts so that each executor can take care of a part and have all executors working simultaneously. These splits are called partitions.

We will now read the CSV file, partition the dataframe and parquetize it. This will create multiple files in parquet format.

Note: converting to parquet is an expensive operation which may take several minutes.

# create 24 partitions in our dataframe
df = df.repartition(24)
# parquetize and write to fhvhv/2021/01/ folder
df.write.parquet('fhvhv/2021/01/')        

You may check the Spark UI at any time and see the progress of the current job, which is divided into stages which contain tasks. The tasks in a stage will not start until all tasks on the previous stage are finished.

When creating a dataframe, Spark creates as many partitions as CPU cores available by default, and each partition creates a task. Thus, assuming that the dataframe was initially partitioned into 6 partitions, the write.parquet() method will have 2 stages: the first with 6 tasks and the second one with 24 tasks.
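
To verify how many partitions a dataframe currently has, you can check its underlying RDD:

# number of partitions in the dataframe
df.rdd.getNumPartitions()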

Besides the 24 parquet files, you should also see a _SUCCESS file, which should be empty. This file is created when the job finishes successfully.

Trying to write the files again will output an error because Spark will not write to a non-empty folder. You can force an overwrite with the mode argument:

df.write.parquet('fhvhv/2021/01/', mode='overwrite')        

The opposite operation, reducing the number of partitions by merging them together, is called coalescing.
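
As a quick sketch, coalescing the dataframe down to a single partition before writing (reusing the folder and mode from the example above) looks like this:

# merge all partitions into one and overwrite the previous output
df.coalesce(1).write.parquet('fhvhv/2021/01/', mode='overwrite')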

Spark dataframes

We can create a dataframe from a parquet file like this:

df = spark.read.parquet('fhvhv/2021/01/')        

Unlike CSV files, parquet files contain the schema of the dataset, so there is no need to specify a schema like we previously did when reading the CSV file. You can check the schema like this:

df.printSchema()        

(One of the reasons parquet files are smaller than CSV files is that they store data according to its datatype, so integer values take up less space than long or string values.)

There are many Pandas-like operations that we can do on Spark dataframes, such as:

  • Column selection - returns a dataframe with only the specified columns.

new_df = df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')        

  • Filtering by value - returns a dataframe whose records match the condition stated in the filter.

new_df = df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID').filter(df.hvfhs_license_num == 'HV0003')        

Actions vs Transformations

Some Spark methods are called "lazy", meaning that they are not executed right away. You can test this with the last instructions we ran in the previous section: after running them, the Spark UI will not show any new jobs. However, running df.show() right after will execute immediately and display the contents of the dataframe; the Spark UI will also show a new job.

These lazy commands are called transformations and the eager commands are called actions. Computations only happen when actions are triggered, as the sketch after the lists below illustrates.

List of transformations (lazy):

  • Selecting columns
  • Filtering
  • Joins
  • Group by
  • Partitions
  • ...

List of actions (eager):

  • Show, take, head
  • Write, read
  • ...
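
As a small illustration, the first line below is a transformation and triggers no job in the Spark UI; only the show() action actually runs the computation:

# transformation: nothing is executed yet, no new job in the Spark UI
filtered = df.filter(df.hvfhs_license_num == 'HV0003')

# action: triggers the computation and appears as a job in the Spark UI
filtered.show()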

Functions and User Defined Functions (UDFs)

Besides the SQL and Pandas-like commands we've seen so far, Spark provides additional built-in functions that allow for more complex data manipulation. By convention, these functions are imported as follows:

from pyspark.sql import functions as F
        

Here's an example of built-in function usage:

df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()        

  • withColumn() is a transformation that adds a new column to the dataframe.
  • select() is another transformation that selects the stated columns.
  • F.to_date() is a built-in Spark function that converts a timestamp to date format (year, month and day only; no hour and minute).

A list of built-in functions is available in the official Spark documentation page.

Besides these built-in functions, Spark allows us to create User Defined Functions (UDFs) with custom behavior for those cases where expressing the logic as SQL would be difficult to manage and test.
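
Here's a minimal sketch of a UDF: a plain Python function (the labelling rule below is purely illustrative) wrapped with F.udf() and applied as a new column:

from pyspark.sql import types

# hypothetical helper: label trips by their license number (illustrative rule only)
def label_license(license_num):
    return 'uber' if license_num == 'HV0003' else 'other'

# wrap the Python function as a Spark UDF, declaring its return type
label_license_udf = F.udf(label_license, returnType=types.StringType())

df \
    .withColumn('license_label', label_license_udf(df.hvfhs_license_num)) \
    .select('hvfhs_license_num', 'license_label') \
    .show()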

Spark SQL

Although there are other tools for expressing batch jobs as SQL queries, Spark can also run SQL queries, which can come in handy if you already have a Spark cluster and setting up an additional tool for sporadic use isn't desirable.

To run SQL queries with Spark, we need a table to query. A dataframe is not a table, so we first register it as a temporary table:

df_trips_data.registerTempTable('trips_data')
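
Note that registerTempTable() is deprecated in recent Spark versions; createOrReplaceTempView() is the modern equivalent:

df_trips_data.createOrReplaceTempView('trips_data')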
        

With our registered table, we can now perform regular SQL operations.

spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY 
    service_type
""")        

Spark Cluster

Until now, we've used a local cluster to run our Spark code, but Spark clusters often contain multiple computers that behave as executors.

Spark clusters are managed by a master, which behaves similarly to the entry point of a Kubernetes cluster. A driver (an Airflow DAG, a computer running a local script, etc.) that wants to execute a Spark job will send the job to the master, which in turn will divide the work among the cluster's executors. If any executor fails or goes offline for any reason, the master will reassign its task to another executor.
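
For illustration, pointing a driver at a standalone cluster only requires swapping the master URL in the session builder (the hostname below is a placeholder; 7077 is the default standalone master port):

# connect to a remote standalone Spark master instead of running locally
spark = SparkSession.builder \
    .master("spark://my-master-host:7077") \
    .appName('test') \
    .getOrCreate()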

Each executor will fetch a dataframe partition stored in a Data Lake (usually S3, GCS or a similar cloud provider), do something with it and then store it somewhere, which could be the same Data Lake or somewhere else. If there are more partitions than executors, the executors will keep fetching partitions until every single one has been processed.

This is in contrast to Hadoop, another data analytics engine, whose executors locally store the data they process. Partitions in Hadoop are duplicated across several executors for redundancy, in case an executor fails for whatever reason (Hadoop is meant for clusters made of commodity hardware computers). However, data locality has become less important as storage and data transfer costs have dramatically decreased, and nowadays it's feasible to separate storage from computation, so Hadoop has fallen out of fashion.
