Big Data with Spark in Google Colab
Rodolfo Maciéczyk
Decision Science Product Consultant @ Disney | DDSI | MS Computer Science Student at Georgia Tech | Data Science, Machine Learning
The desire to extract the most relevant information from huge amounts of data leads us to what the fairly new term "Big Data" refers to. Although that desire has always existed, with the massive amounts of data generated by almost everything we do each day, the stack gets bigger by the second. But "bigger" depends on where we stand. Our technology also gets more powerful, maybe not at the same pace as the amount of data we create, but it definitely gets smarter. Hence, the time it takes us to extract and understand that data drops, and the cycle starts again.
So, if Big Data is the desire, what are Spark and Colab? They are tools that complement a data scientist's toolbox. Spark is a framework that simplifies working with distributed computing. It was built from the beginning to work with machine learning algorithms, meaning it has been optimized for high-performance situations where data is accessed multiple times over many iterations. Google Colaboratory, also known as Colab, is a free environment that runs in the cloud and is accessed through Google Drive, where you can run your scripts.
Combining a dataset with these two tools lets us put Big Data into practice. For the purpose of learning, we will set the following objective: build a machine learning model that predicts flight cancellations, using Spark as our framework and Colab as our environment. We will follow these steps:
- Get to know the dataset
- Set up our Colab and Spark environment
- Download the dataset directly from a website to our Google Drive
- Import additional tools and set up constants
- Connect to the Spark server and load the data
- Prepare, clean and validate the data
- Set up and run our model in Spark
- Evaluate our model
Our dataset
The dataset is hosted at stat-computing.org, though the original comes from RITA (Research and Innovative Technology Administration) at the US Department of Transportation. There are several years we could explore; in this case we will focus on just two, 2007 and 2008. These two years alone add up to 14,462,943 rows and 29 columns of data.
Setting up Colab and Spark environment
Assuming you have a Google account, access your Google Drive and create two folders by clicking on + New > Folder. We can name them "Colab Datasets" and "Colab Notebooks".
Next, we will connect Colab to our Drive. Click on + New > More > + Connect more apps, type Colab in the search box, click on Colaboratory and then on + Connect. You can check Colaboratory is added by clicking on + New > More.
Now let's create a new notebook in our "Colab Notebooks" folder. Double click on it, then right click inside the folder and click on More > Colaboratory. You have now created your first Jupyter notebook in your Google Drive thanks to Colab. This is the environment where you'll write and run your code. You can rename your notebook to flight_delays by clicking on the default name in the upper left hand side:
We can now install all the Spark dependencies for Python and link our notebook with the Spark server. We will also set the required environment variables. Copy and run the following code:
# Install Spark-related dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://apache.osuosl.org/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark

# Set up required environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"
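Since findspark was just installed, you can optionally point Python at the downloaded Spark distribution before importing PySpark. A minimal sketch, relying on the SPARK_HOME variable we set above:

# Optional: make the downloaded Spark distribution importable
import findspark
findspark.init()  # reads the SPARK_HOME environment variable set above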
Be aware that, depending on when you are reading this article, you may need to replace the Spark version with the latest one. To do this, check the latest version on PyPI and replace the version number in the code above.
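One way to make that swap less error-prone is to keep the version in a single variable. A minimal sketch, assuming the mirror keeps the same URL layout (the release numbers below are only placeholders):

# Keep the Spark version in one place so upgrades only touch these lines
SPARK_VERSION = "2.4.3"   # replace with the latest release
HADOOP_VERSION = "2.7"
SPARK_DIR = f"spark-{SPARK_VERSION}-bin-hadoop{HADOOP_VERSION}"

# Colab's shell magics interpolate the {…} expressions below
!wget -q https://apache.osuosl.org/spark/spark-{SPARK_VERSION}/{SPARK_DIR}.tgz
!tar xf {SPARK_DIR}.tgz

import os
os.environ["SPARK_HOME"] = f"/content/{SPARK_DIR}"

This way the same variable drives both the download and the SPARK_HOME environment variable.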
Next, we will point Colab to our Google Drive. This will let us use the datasets that we will download later into the "Colab Datasets" folder created earlier. Copy and run the following code:
# Point Colaboratory to your Google Drive
from google.colab import drive
drive.mount('/content/gdrive')
You will be prompted to copy and paste an authorization code to allow your Drive to connect to Colab. Once authorized, Colab confirms that the drive is mounted at /content/gdrive.
Data download to Google Drive
Now that you are set up, you can download the relevant datasets to your "Colab Datasets" folder. For this, I used a handy snippet from geeksforgeeks.org. It lets you download a dataset from any website straight to your Google Drive, avoiding the round trip of downloading to your local machine and uploading to your Drive:
# Download datasets directly to your Google Drive "Colab Datasets" folder
import requests

# 2007 data
file_url = "https://stat-computing.org/dataexpo/2009/2007.csv.bz2"
r = requests.get(file_url, stream=True)
with open("/content/gdrive/My Drive/Colab Datasets/2007.csv.bz2", "wb") as file:
    for block in r.iter_content(chunk_size=1024):
        if block:
            file.write(block)

# 2008 data
file_url = "https://stat-computing.org/dataexpo/2009/2008.csv.bz2"
r = requests.get(file_url, stream=True)
with open("/content/gdrive/My Drive/Colab Datasets/2008.csv.bz2", "wb") as file:
    for block in r.iter_content(chunk_size=1024):
        if block:
            file.write(block)
Note that you only need to do this once. If you run your notebook several times, you can comment out these lines to skip this block.
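Alternatively, a small guard can skip the download automatically when the file is already in your Drive. A minimal sketch; the helper name download_if_missing is just for illustration:

import os
import requests

def download_if_missing(url, dest_path):
    # Download only when the file is not already in Google Drive
    if os.path.exists(dest_path):
        print(f"{dest_path} already exists, skipping download.")
        return
    r = requests.get(url, stream=True)
    with open(dest_path, "wb") as f:
        for block in r.iter_content(chunk_size=1024):
            if block:
                f.write(block)

download_if_missing("https://stat-computing.org/dataexpo/2009/2007.csv.bz2",
                    "/content/gdrive/My Drive/Colab Datasets/2007.csv.bz2")
download_if_missing("https://stat-computing.org/dataexpo/2009/2008.csv.bz2",
                    "/content/gdrive/My Drive/Colab Datasets/2008.csv.bz2")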
Import tools from PySpark and set up constants
We need additional tools to connect to the Spark server, load our data, and clean and prepare it to run our model. In this case, I'm choosing Random Forest as our classifier, which is available in the PySpark machine learning library. We will also set up the constants we will need later to run our model:
# Tools we need to connect to the Spark server, load our data,
# clean it and prepare it
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import isnan, when, count, col

# Set up constants
CSV_2007 = "/content/gdrive/My Drive/Colab Datasets/2007.csv.bz2"
CSV_2008 = "/content/gdrive/My Drive/Colab Datasets/2008.csv.bz2"
APP_NAME = "Flight Delays"
SPARK_URL = "local[*]"
RANDOM_SEED = 141109
TRAINING_DATA_RATIO = 0.7
RF_NUM_TREES = 8
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32
Check that your folder and dataset file names are exactly the same as in the constants; otherwise, modify the code above to match your paths.
Connect to the server and load the data
With the constants set above, we are now ready to connect to the Spark server and load our datasets. We will also concatenate the two datasets row-wise using PySpark's .unionAll method:
# Connect to the Spark server
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

# Load datasets
df_2007 = spark.read.options(header="true", inferschema="true").csv(CSV_2007)
df_2008 = spark.read.options(header="true", inferschema="true").csv(CSV_2008)

# Concatenate both datasets
df = df_2007.unionAll(df_2008)
We have now created a DataFrame from the two datasets. Note that this is a Spark DataFrame, not a Pandas DataFrame, which can cause some confusion. Although Spark DataFrames mimic Pandas ever more closely, just as the Spark machine learning library mimics scikit-learn, they are close but still not the same. A great advantage of a Spark DataFrame is that it is distributed, which, under the right conditions, speeds up processing and gives you the benefits of parallelism.
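If you ever do want a Pandas object, for example to inspect a few rows locally, you can convert a small sample explicitly. A minimal sketch; we limit the rows first, since converting the full DataFrame would pull everything onto the driver:

# Convert a small sample of the Spark DataFrame to Pandas for local inspection
sample_pdf = df.limit(5).toPandas()
print(type(sample_pdf))  # <class 'pandas.core.frame.DataFrame'>
print(sample_pdf.head())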
Prepare, clean and validate the data
With our dataset loaded, we can now check its shape:
print(f"The shape is {df.count():d} rows by {len(df.columns):d} columns.")
Returning:
The shape is 14462943 rows by 29 columns.
This is a massive amount of data (for now, bearing in mind what we mentioned at the beginning). Later we will build vector variables for our Random Forest classifier, and for that there must be no null values, otherwise Spark will fail to build our vectors. We can check whether null values are present by running the following:
null_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c)
                         for c in df.columns]).toPandas().to_dict(orient='records')
print(f"We have {sum(null_counts[0].values()):d} null values in this dataset.")
This will return that we have 14,248,147 null values in the dataset. A quick exploration, for example with the .show() method, reveals that a large part of the CancellationCode column is null. Since the purpose of this article is to learn the steps to build a model using Spark and Colab, not to perform a deep analysis of this particular dataset, we will drop this column, along with any remaining rows that contain null values:
df = df.drop(df.CancellationCode)
df = df.na.drop()
Running the null_counts code above again will confirm that no null values remain, and running the shape code we used earlier now returns the following:
The shape is 14379556 rows by 28 columns.
This reduced our dataset by 83,387 rows and one whole column, just 0.58% of the rows.
Feature and label vector
As mentioned before, we will build vector variables: on one hand the label vector, which holds what we want our classifier to predict, and on the other the feature vector, that is, the collection of columns we want our classifier to use to make its predictions. First, we inspect our dataset's column types by running:
df.dtypes
This returns a list of (column name, type) pairs.
We will use the Cancelled column to build our label vector. Why? Because it has only two unique values, as we can check by running:
df.select('Cancelled').distinct().rdd.map(lambda r: r[0]).collect()
According to the variable description on the dataset's site, "1" means "yes", the flight was cancelled, and "0" means "no", it wasn't. This is what we want our model to predict. From the .dtypes output we can also see that there are several "string" columns, which would be a problem when we generate our vectors and run our model. We must therefore either convert them to another type or leave them out. Since our goal in this article is to learn how to make all of this work rather than to analyze this particular dataset, we will simply not use those columns. Hence, we create a feature column list with the int-type columns we want in our feature vector:
feature_cols = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSDepTime', 'CRSArrTime', 'FlightNum', 'Distance', 'Diverted']
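As an aside, if you wanted to keep one of the string columns instead of dropping it, you could encode it numerically first. A hypothetical sketch using the UniqueCarrier column (the output column name is only illustrative):

# Hypothetical: encode a string column as a numeric index
# so it could also be fed to the VectorAssembler
carrier_indexer = StringIndexer(inputCol="UniqueCarrier",
                                outputCol="UniqueCarrierIndex")
df_indexed = carrier_indexer.fit(df).transform(df)
df_indexed.select("UniqueCarrier", "UniqueCarrierIndex").show(5)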
Next, we will generate our feature vector with a VectorAssembler and append the resulting column to the end of our DataFrame via the .transform() method:
df = VectorAssembler(inputCols=feature_cols, outputCol="features").transform(df)
Now we can isolate the two columns we will use to train and test our model and see what they look like:
df.select("Cancelled", "features").show(5)
Set up and run our model in Spark
We are now ready to build our indexers, split our dataset into 70% for the training set and 30% for the test set, define the parameters of our model and finally link everything together into a pipeline, which we'll later use to actually run the model:
# Generate a labelIndexer
labelIndexer = StringIndexer(inputCol="Cancelled", outputCol="indexedLabel").fit(df)

# Generate the indexed feature vector
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df)

# Split the data into training and test sets
(trainingData, testData) = df.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO],
                                          seed=RANDOM_SEED)

# Define the RandomForest model using the constants set earlier
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                            numTrees=RF_NUM_TREES, maxDepth=RF_MAX_DEPTH, maxBins=RF_NUM_BINS)

# Chain indexers and the random forest model in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])
With our training and test sets ready, we can train our model and then make predictions with the .transform() method:
# Train the model
model = pipeline.fit(trainingData)

# Make predictions
predictions = model.transform(testData)
Evaluation and considerations
At this point we can use the MulticlassClassificationEvaluator to test our model's accuracy:
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")
This looks like a great accuracy result! For the purposes of this article, the steps we took chained everything together and showed how easily you can link a dataset stored in Google Drive with Spark inside Google's Colab environment.
However, be aware that this result needs further analysis and wrangling. One of the main issues with it is that, in earlier steps, we should have checked how balanced our data is, i.e., how many "0"s versus "1"s are present in the Cancelled column, and, depending on that result, over- or under-sampled our dataset.
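A quick way to check that balance, and to report a metric that is less forgiving of imbalance than plain accuracy, is sketched below; f1 is one of the metrics the same evaluator already supports:

# Check how imbalanced the Cancelled column is
df.groupBy("Cancelled").count().show()

# Accuracy alone can be misleading on imbalanced data;
# the same evaluator can report F1 instead
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
print(f"F1 = {f1_evaluator.evaluate(predictions):g}")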
Spark is a great gateway to manipulating really large (and multiple) datasets while reducing your processing time, so the benefits of combining it with Colaboratory and your Google Drive are immense!
You can check out the complete code at my GitHub repository.