Apache Spark - Part 2: Spark SQL/DataFrames
Akshat Pattiwar
Associate @PwC India || Salesforce Developer || 6x Salesforce Certified || Cloud Computing
DataFrames are logically comparable to relational tables or to DataFrames in Python/R, but they come with many optimizations hidden behind the scenes. We can create DataFrames in a variety of ways: from collections, HIVE tables, relational tables, and RDDs.
We have a SparkSession, which is the entry point for the DataFrames API, similar to sc (SparkContext) for RDDs. A SparkSession is available as "spark" when we run spark-shell or pyspark. Otherwise, we can create one with the code below.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()
Construct a DataFrame from a HIVE table.
df = spark.table("tbl_name")
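We can also build a DataFrame from an in-memory collection. A minimal sketch (the column names and rows below are just illustrative placeholders):

data = [(1, "Alice"), (2, "Bob")]
df_from_collection = spark.createDataFrame(data, schema=["id", "name"])
df_from_collection.show()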
Basic operations on DataFrames
Count how many rows there are.
df.count()
The names of the columns in the DataFrame can be accessed.
df.columns
The DataType of columns within the DataFrame can be accessed.
df.dtypes
Examine how Spark saves the DataFrame's schema.
df.schema
Print the DataFrame's schema in a hierarchical way.
df.printSchema()
Show the contents of the DataFrame.
df.show()
Choose certain columns from the DataFrame.
df.select("col1", "col2")
Filter the rows according to a condition. Let's find the rows where id = 1. The condition can be specified in several ways.
from pyspark.sql.functions import col
df.filter(df["id"] == 1)
df.filter(df.id == 1)
df.filter(col("id") == 1)
df.filter("id = 1")
Note: We must import the "col" function before we can use it.
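Conditions can also be combined using the & (and), | (or) and ~ (not) operators; each condition must be wrapped in parentheses. An illustrative sketch (the "age" column is an assumption):

from pyspark.sql.functions import col

df.filter((col("id") == 1) & (col("age") > 18)).show()
df.filter((col("id") == 1) | (col("id") == 2)).show()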
Remove a certain column
newdf = df.drop("id")
Note: Because DataFrames are immutable, this operation will not remove the column from the "df" DataFrame. Instead, it returns a new DataFrame without that column.
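To make the immutability point concrete, a quick sketch:

newdf = df.drop("id")
print(df.columns)     # still contains "id"
print(newdf.columns)  # "id" is no longer present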
Aggregations
The groupBy function can be used to group the data, and the "agg" function can then be used to aggregate the grouped data. The aggregate functions used below must be imported from pyspark.sql.functions.
from pyspark.sql.functions import count, sum, max, min, avg

(df.groupBy("col1")
   .agg(
       count("col2").alias("count"),
       sum("col2").alias("sum"),
       max("col2").alias("max"),
       min("col2").alias("min"),
       avg("col2").alias("avg")
   )
   .show())
Sorting
Sort the data by "id". Sorting is done in ascending order by default.
df.sort("id")
Sort the data in descending order. Note that the "desc" function must also be imported from pyspark.sql.functions.

from pyspark.sql.functions import desc

df.sort(desc("id")).show()
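We can also sort on multiple columns with mixed ordering. An illustrative sketch (the "dept" and "salary" columns are assumptions):

from pyspark.sql.functions import asc, desc

df.sort(asc("dept"), desc("salary")).show()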
Derived Columns
The "withColumn" method can be used to create a column based on existing columns. For example, assuming the DataFrame has "current_year" and "birth_year" columns:

df.withColumn("age", df["current_year"] - df["birth_year"])

...where "age" is a derived column calculated as current_year - birth_year.
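In practice, the current year is often a literal value rather than a column. A minimal sketch assuming only a "birth_year" column exists (2024 is a hard-coded example year):

from pyspark.sql.functions import col, lit

df_with_age = df.withColumn("age", lit(2024) - col("birth_year"))
df_with_age.show()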
Joins
We can perform various kinds of joins across DataFrames. Let's say we want to join two DataFrames, df1 and df2, on the "id" column.
df1.join(df2, df1["id"] == df2["id"])
An inner join is performed by default. Other joins, such as "left_outer", "right_outer", and "full_outer", can be performed by passing the join type as the third parameter.
df1.join(df2, df1["id"] == df2["id"], "left_outer")
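Because both DataFrames contain an "id" column, the joined result will carry it twice. One common way to avoid the duplicate is to join on the column name itself, as in this sketch:

joined = df1.join(df2, on="id", how="left_outer")
joined.show()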
Executing SQL like queries
We can also analyze the data using SQL-like queries. To do so, we must first register the DataFrame as a temporary view.
df.createOrReplaceTempView("temp_table")
Now we can execute SQL-like queries as below:
spark.sql("select * from temp_table where id = 1").show()
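Any SQL that Spark supports can be run against the temporary view, for example an aggregation (the column names here are assumptions):

spark.sql("select col1, count(*) as cnt, avg(col2) as avg_col2 from temp_table group by col1").show()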
Saving the DataFrame as a HIVE Table
df.write.saveAsTable("DB_NAME.TBL_NAME")
For "overwrite," "append," "error," and other options, we may use the "mode" parameter.
df.write.saveAsTable("DB_NAME.TBL_NAME", mode="overwrite")
Note: The DataFrame will be saved as a HIVE managed table by default.
Creating a HIVE external table from the DataFrame
df.write.saveAsTable("DB_NAME.TBL_NAME", path=<location_of_external_table>)
Make a DataFrame out of a CSV file
We can construct a DataFrame from a CSV file and pass various parameters such as the separator, header, schema, inferSchema, and so on. Let's assume we have a CSV file delimited by "|" with a header, and we want to infer the schema automatically.
df = spark.read.csv("path_to_csv_file", sep="|", header=True, inferSchema=True)
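If we would rather declare the schema explicitly instead of inferring it, we can pass a StructType. The field names and types below are only an example:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
])

df = spark.read.csv("path_to_csv_file", sep="|", header=True, schema=schema)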
A CSV file can be created from a DataFrame.
If, after conducting our analysis, we need to save the DataFrame back to a CSV file, we may do it as follows:
df.write.csv("path_to_CSV_File", sep="|", header=True, mode="overwrite")
Create a DataFrame from a relational table.
A JDBC URL can be used to access data from relational databases.
relational_df = spark.read.format("jdbc").options(url=jdbc_url, dbtable=<TBL_NAME>, user=<USER_NAME>, password=<PASSWORD>).load()
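The exact jdbc_url depends on the database. As an illustration only, a PostgreSQL connection might look like the sketch below (host, port, database, table, and credentials are all placeholders, and the matching JDBC driver jar must be available to Spark):

jdbc_url = "jdbc:postgresql://localhost:5432/mydb"
relational_df = (spark.read.format("jdbc")
                 .option("url", jdbc_url)
                 .option("dbtable", "public.my_table")
                 .option("user", "my_user")
                 .option("password", "my_password")
                 .load())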
Save the DataFrame as a relational table.
Using a JDBC URL, we may store the DataFrame as a relational table.
relational_df.write.format("jdbc").options(url=jdbc_url, dbtable=<TBL_NAME>, user=<USER_NAME>, password=<PASSWORD>).mode("overwrite").save()
I hope you have enjoyed part 2 of this series. The next part will cover advanced DataFrame operations.
______________________________________________________________