Getting ISO year right in PySpark
Part of working with data and dates is the age-old question "When does the first week of the year start?" In PySpark we can use the weekofyear function, which is ISO 8601 compliant, but what if you want the correct year according to ISO 8601? That was more of a hassle than I thought!
I'm using Databricks Runtime 13.3 LTS with Spark 3.4.1, so the behavior of the functions used below might change in newer versions and make this issue obsolete.
The issue
Imagine that you have data at date granularity and you need to derive a week number and year from the date to group on. You will run into issues if you compute the year using the standard PySpark year or date_format functions and the week number using the weekofyear function.
The following code produces a handful of dates spanning the year boundary (roughly the 29th of December to the 2nd of January; the exact days shift slightly because of leap years) for the years 2020 to 2025 to illustrate the issue.
%python
from pyspark.sql.functions import (
    col,
    year,
    month,
    weekofyear,
    date_format,
    when,
    lit,
    sum,
    udf,
)
from pyspark.sql.types import DateType, IntegerType
from datetime import datetime, timedelta

# Create a handful of dates around each year boundary for the past 5 years
# (subtracting 365 * i means the exact days drift by a day across leap years)
start_date = datetime(2024, 12, 29)
dates = [
    start_date - timedelta(days=365 * i) + timedelta(days=j)
    for i in range(5)
    for j in range(4)
]

# Create a DataFrame from the list of dates
dates_df = spark.createDataFrame([(date,) for date in dates], ["date"])

# Convert the date column to DateType
end_of_year = dates_df.withColumn("date", dates_df["date"].cast(DateType()))

display(end_of_year.orderBy("date"))
This produces a nice little DataFrame with the date ranges for each year; of special interest are the dates 2021-01-01, 2021-01-02, 2024-12-30, and 2024-12-31.
Let's extract the year from the 'date' column using the built-in year() and date_format() functions, and the week number using the built-in weekofyear() function. We'll also add a 'count' column to aggregate on later.
# Extract year and week number using built-in Spark functions
end_of_year_non_iso = (
    end_of_year.withColumn("year_standard", year("date"))
    .withColumn("year_date_format", date_format("date", "yyyy"))
    .withColumn("week_number", weekofyear("date"))
    .withColumn("count", lit(1))
)
display(end_of_year_non_iso.orderBy("date"))
If we take a closer look at 2024-12-30, we'll see that something isn't right as the year is set to 2024 but the week number is set to 1. The same goes for 2024-12-31.
But according to EpochConverter, week 1 of 2025 starts on the 30th of December 2024, meaning the data gets assigned to the wrong year.
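The Python standard library agrees. As a quick sanity check outside of Spark (not part of the pipeline, just a verification of the claim):
from datetime import date

# isocalendar() returns (ISO year, ISO week number, ISO weekday)
print(date(2024, 12, 30).isocalendar())  # ISO year 2025, week 1 (a Monday)
print(date(2021, 1, 1).isocalendar())    # ISO year 2020, week 53 (a Friday)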
This is the crux of the issue: the year() and date_format() functions don't comply with the ISO 8601 standard while weekofyear() does. As far as I can tell, this isn't explained in the Apache Spark documentation and there is no way to force the two non-compliant functions to adhere to ISO 8601.
The fix
There are two ways to make the year ISO 8601 compliant: creating a User Defined Function (UDF) around Python's isocalendar() (or a similar library), or building custom logic with native PySpark functions.
Both come with tradeoffs. UDFs can introduce performance overhead because they require serialization and deserialization of data between the JVM and Python, so for better performance it is recommended to use built-in Spark functions whenever possible. The custom logic, on the other hand, might not produce the correct ISO-compliant year for every date; it works on the ones I have tested, but there might be edge cases.
User Defined Function
Using Python's isocalendar() function is pretty easy, but since it is not native to PySpark it must be wrapped in a UDF to work on a DataFrame column. The function returns a tuple containing the ISO year, ISO week number, and ISO weekday, so we only need to extract the first element using [0].
# Extract year using a UDF

# Define a UDF to extract the ISO year using isocalendar()
def get_iso_year(dt):
    return dt.isocalendar()[0]

iso_year_udf = udf(get_iso_year, IntegerType())

end_of_year_udf = (
    end_of_year.withColumn("year_iso_calendar", iso_year_udf(col("date")))
    .withColumn("week_number", weekofyear("date"))
    .withColumn("count", lit(1))
)
display(end_of_year_udf.orderBy("date"))
Now the year is set to 2025 for 2024-12-30 and 2024-12-31, and to 2020 for 2021-01-01 and 2021-01-02, which complies with ISO 8601.
Keep in mind the potential performance degradation caused by using a UDF. It won't be noticeable on a small dataset like this, but if you are working with billions of rows you will surely feel it.
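If the overhead does become a problem, a vectorized pandas UDF is a reasonable middle ground: it is still a UDF, but it processes whole batches through Arrow instead of single rows. This is only a minimal sketch, assuming pandas and PyArrow are available (they are on Databricks Runtime 13.3); the function and column names are my own.
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(IntegerType())
def iso_year_pandas(dates: pd.Series) -> pd.Series:
    # Depending on the Spark/Arrow version the dates may arrive as datetime.date objects,
    # so normalize to datetime64 before using the .dt accessor
    return pd.to_datetime(dates).dt.isocalendar().year.astype("int32")

end_of_year_pandas = end_of_year.withColumn("year_iso_calendar", iso_year_pandas(col("date")))
display(end_of_year_pandas.orderBy("date"))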
Custom logic
The guard against that performance degradation is custom logic that only uses native PySpark functions. The logic combines the year(), month(), and weekofyear() functions to catch two situations: if the date's month is December but the week number is 1, year() produces a year that needs to be incremented by 1; and if the date's month is January but the week number is 52 or above, year() produces a year that needs to be decremented by 1. In every other case, year() is already correct.
# Extract year using custom logic
end_of_year_custom = (
    end_of_year.withColumn(
        "year_iso_manual_logic",
        when(
            (weekofyear(col("date")) == 1) & (month(col("date")) == 12),
            year(col("date")) + 1,
        )
        .when(
            (weekofyear(col("date")) >= 52) & (month(col("date")) == 1),
            year(col("date")) - 1,
        )
        .otherwise(year(col("date"))),
    )
    .withColumn("week_number", weekofyear("date"))
    .withColumn("count", lit(1))
)
display(end_of_year_custom.orderBy("date"))
The logic now correctly sets the year to 2025 for 2024-12-30 and 2024-12-31, and to 2020 for 2021-01-01 and 2021-01-02, complying with ISO 8601.
Closing remarks
While I haven't tested the custom logic against every possible date for edge cases, it should be the best way to produce ISO 8601-compliant years until that functionality is built into the native year() and date_format() Spark functions (maybe as a flag or parameter).
Grouping the DataFrames makes it apparent why taking the ISO year into account matters: not doing so produces incorrect aggregations, in this case the sum() of the 'count' column.
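If you want to convince yourself before relying on it, one option is to brute-force a comparison against the isocalendar() UDF from earlier over a longer date range. A sketch, assuming the imports and iso_year_udf defined above are still in scope; adjust the range to whatever your data covers.
from datetime import date, timedelta

# Every single day from 2015 through 2029
n_days = (date(2030, 1, 1) - date(2015, 1, 1)).days
all_days_df = spark.createDataFrame(
    [(date(2015, 1, 1) + timedelta(days=i),) for i in range(n_days)], ["date"]
)

check_df = all_days_df.withColumn(
    "iso_year_expected", iso_year_udf(col("date"))
).withColumn(
    "iso_year_custom",
    when((weekofyear(col("date")) == 1) & (month(col("date")) == 12), year(col("date")) + 1)
    .when((weekofyear(col("date")) >= 52) & (month(col("date")) == 1), year(col("date")) - 1)
    .otherwise(year(col("date"))),
)

# An empty result means the custom logic matches isocalendar() for every date in the range
display(check_df.filter(col("iso_year_expected") != col("iso_year_custom")))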
# Group by year_standard and week_number, and sum the count
grouped_end_of_year_non_iso = end_of_year_non_iso.groupBy("year_standard", "week_number").agg(
    sum("count").alias("total_count")
)
display(grouped_end_of_year_non_iso.orderBy("year_standard", "week_number"))
This produces a DataFrame where the counts for 2024-12-30, 2024-12-31, 2021-01-01, and 2021-01-02 are grouped under the wrong year/week combination. It also creates a week 53 in 2021, which doesn't exist in the ISO calendar.
Now let's group using the ISO-compliant year column from the custom logic instead.
# Group by year_iso_manual_logic and week_number, and sum the count
grouped_end_of_year_custom = end_of_year_custom.groupBy("year_iso_manual_logic", "week_number").agg(
    sum("count").alias("total_count")
)
display(grouped_end_of_year_custom.orderBy("year_iso_manual_logic", "week_number"))
It will assign the counts of 2024-12-30, 2024-12-31, 2021-01-01, and 2021-01-02 to the correct year and week, thus giving the correct aggregates.