Apache Spark-Part 2: Spark SQL/DataFrames

DataFrames are logically comparable to relational tables or to DataFrames in Python/R, but they come with many optimizations hidden behind the scenes. We can create DataFrames in a variety of ways: from collections, Hive tables, relational tables, and RDDs.

The entry point for the DataFrame API is the SparkSession, much like sc (SparkContext) is for RDDs. A SparkSession is available as "spark" when we run spark-shell or pyspark; otherwise, we can create one with the code below.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

Construct a DataFrame from a Hive table.

df = spark.table("tbl_name")
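The rest of the examples only assume some DataFrame named df. As a minimal sketch, we can also build one from a Python collection; the column names id, col1, col2 and the sample values below are made up purely for illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

# Hypothetical sample data: three rows with columns id, col1, col2.
data = [(1, "a", 10), (2, "b", 20), (3, "a", 30)]
df = spark.createDataFrame(data, ["id", "col1", "col2"])
df.show()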


Basic operations on DataFrames

Count how many rows there are.

df.count()

The names of the columns in the DataFrame can be accessed.

df.columns

The data types of the columns in the DataFrame can be accessed.

df.dtypes

Examine how Spark saves the DataFrame's schema.

df.schema

Print the DataFrame's schema in a hierarchical (tree) format.

df.printSchema()

Show the contents of the DataFrame.

df.show()

Select specific columns from the DataFrame.

df.select("col1", "col2")

Filter the rows according to a condition. Let's find the rows with id = 1. The condition can be specified in several ways.

from pyspark.sql.functions import col

df.filter(df["id"] == 1)

df.filter(df.id == 1)

df.filter(col("id") == 1)

df.filter("id = 1")

Note: We must import the "col" function from pyspark.sql.functions before we can use it.

Remove a certain column

newdf = df.drop("id")

Note: Because DataFrames are immutable, this operation will not remove the column from the "df" DataFrame. Instead, it returns a new DataFrame without that column.
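A quick check against the df from the earlier examples confirms this behaviour:

newdf = df.drop("id")
print("id" in df.columns)     # True: the original DataFrame still has the column
print("id" in newdf.columns)  # False: the new DataFrame does not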


Aggregations

The groupBy function may be used to group the data, and then the “agg” function can be used to aggregate the grouped data.

from pyspark.sql.functions import count, sum, max, min, avg

(df.groupBy("col1")
   .agg(
       count("col2").alias("count"),
       sum("col2").alias("sum"),
       max("col2").alias("max"),
       min("col2").alias("min"),
       avg("col2").alias("avg")
   )
   .show())


Sorting

Sort the data by "id". Sorting is done in ascending order by default.

df.sort("id")

Sort the data in descending order. Note that the "desc" function must also be imported from pyspark.sql.functions.

from pyspark.sql.functions import desc

df.sort(desc("id")).show()
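We can also sort by multiple columns, giving each its own direction. A minimal sketch, assuming the hypothetical col1 column used earlier:

from pyspark.sql.functions import col

df.sort(col("col1").asc(), col("id").desc()).show()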


Derived Columns

The “withColumn” method may be used to create a column based on existing columns...

df.withColumn("age", col("current_year") - col("birth_year"))

...where age is a derived column calculated as the difference between the existing current_year and birth_year columns.
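As a minimal sketch, assuming a DataFrame with a birth_year column (the names, values, and the literal year below are made up for illustration):

from pyspark.sql.functions import col, lit

# Hypothetical data with a birth_year column.
people = spark.createDataFrame([("Alice", 1990), ("Bob", 1985)], ["name", "birth_year"])
people = people.withColumn("age", lit(2024) - col("birth_year"))
people.show()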


Joins

We can perform various kinds of joins between DataFrames. Let's say we want to join two DataFrames, df1 and df2, on the "id" column.

df1.join(df2, df1["id"] == df2["id"])

An inner join is performed by default. Other joins, such as "left_outer", "right_outer", and "full_outer", can be performed by passing the join type as the third parameter.

df1.join(df2, df1["id"] == df2["id"], "left_outer")
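A minimal end-to-end sketch on two small hypothetical DataFrames (the column names and values are made up for illustration):

df1 = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
df2 = spark.createDataFrame([(1, "HR"), (3, "IT")], ["id", "dept"])

# The inner join keeps only id = 1; the left outer join also keeps Bob with a null dept.
df1.join(df2, df1["id"] == df2["id"]).show()
df1.join(df2, df1["id"] == df2["id"], "left_outer").show()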


Executing SQL-like queries

We can also analyze the data using SQL-like queries. To do so, we must first register the DataFrame as a temporary view.

df.createOrReplaceTempView("temp_table")

Now we can execute SQL-like queries as below:

spark.sql("select * from temp_table where id = 1").show()


Creating a Hive table from the DataFrame

With the temporary view registered above, one way to do this is a create-table-as-select (CTAS) query:

spark.sql("create table DB_NAME.TBL_NAME as select * from temp_table")


Saving the DataFrame as a Hive table

df.write.saveAsTable("DB_NAME.TBL_NAME")

For "overwrite," "append," "error," and other options, we may use the "mode" parameter.

df.write.saveAsTable("DB_NAME.TBL_NAME", mode="overwrite")

Note: The DataFrame will be saved as a Hive managed table by default.


Creating a Hive external table from the DataFrame

df.write.saveAsTable("DB_NAME.TBL_NAME", path="<location_of_external_table>")


Make a DataFrame out of a CSV file

We can construct a DataFrame from a CSV file and provide various parameters such as the separator, header, schema, inferSchema, and others. Let's assume we have a "|"-delimited CSV file with a header and we want the schema to be inferred automatically.

df = spark.read.csv("path_to_csv_file", sep="|", header=True, inferSchema=True)
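If we would rather not infer the schema, we can pass an explicit one instead. A minimal sketch, where the column names and types are assumptions for illustration:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Hypothetical schema: an integer id and a string col1.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("col1", StringType(), True),
])
df = spark.read.csv("path_to_csv_file", sep="|", header=True, schema=schema)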


A CSV file can be created from a DataFrame.

If, after conducting our analysis, we need to save the DataFrame back to a CSV file, we may do it as follows:

df.write.csv("path_to_CSV_File", sep="|", header=True, mode="overwrite")


From a relational table, create a DataFrame.

A JDBC URL can be used to access data from relational databases.

relational_df = spark.read.format("jdbc").options(url=jdbc_url, dbtable="<TBL_NAME>", user="<USER_NAME>", password="<PASSWORD>").load()
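As a minimal sketch of what the pieces might look like, here is a hypothetical PostgreSQL example; the URL, table, credentials, and driver class below are assumptions, and the matching JDBC driver jar must be available to Spark:

# Hypothetical connection details for a PostgreSQL database.
jdbc_url = "jdbc:postgresql://db-host:5432/my_database"
relational_df = (spark.read.format("jdbc")
                 .option("url", jdbc_url)
                 .option("dbtable", "public.my_table")
                 .option("user", "my_user")
                 .option("password", "my_password")
                 .option("driver", "org.postgresql.Driver")
                 .load())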


As a relational table, save the DataFrame.

Using a JDBC URL, we may store the DataFrame as a relational table.

relational_df.write.format("jdbc").options(url=jdbc_url, dbtable="<TBL_NAME>", user="<USER_NAME>", password="<PASSWORD>").mode("overwrite").save()


I hope you have enjoyed part 2 of this series. The next part will cover advanced DataFrame operations.


______________________________________________________________

Other parts of this series:

Apache Spark-Part 1: RDD and DataFrame components
