Apache Sedona and Ilum: Spatial Indexing and Partitioning
Apache Sedona is a powerful geospatial analytics framework that changes the way we handle large-scale spatial data processing. Its distributed computation model makes it a natural fit for cluster computing environments, where handling big data is no longer an occasional task but a day-to-day necessity. One of Sedona's standout features is its broad support for spatial data formats, which makes it easy to integrate with and access data across different computing environments.
In this tutorial, we are going to see how Apache Sedona uses spatial indexes to optimize queries and improve performance. Using languages such as Python or Scala, developers can easily manage and analyze geospatial datasets with Apache Sedona, making it a core tool for data engineers working deep in the complexities of geospatial big data. You will also learn how to apply Apache Sedona to real-world examples, such as spatial join operations and route optimization, and see how useful it is in geospatial data analysis tasks.
Being able to process vast amounts of geospatial information is increasingly important in today's data-driven landscape. Whether for logistics optimization, environmental monitoring, or urban planning, the challenges of handling complex spatial datasets are substantial. Traditional data processing tools often fall short of modern geospatial needs, in both volume and complexity. Apache Sedona, previously known as GeoSpark, bridges that gap between general-purpose data processing frameworks and spatial data. It builds on Apache Spark and Apache Flink, allowing scalable manipulation and analysis of very large volumes of spatial data with ease and efficiency. From real-time weather monitoring to studying changes in urban sprawl or wildlife movements, Apache Sedona equips you with the tools to unlock insightful spatial analysis at scale.
In this tutorial, we use Ilum, a free modular Data Lakehouse that simplifies the management of Apache Spark and provides an auto-configured environment on Kubernetes. Ilum's modular, scalable architecture makes the integration even stronger for handling data in large volumes across workflows.
Apache Sedona and Ilum both aim to enhance geospatial data workflows for precise analysis of complex spatial data. This tutorial focuses on the most important functionalities of Apache Sedona and demonstrates how it can be integrated with Ilum, giving a complete picture of modern geospatial data engineering.
The Advanced Features of Apache Sedona in an Ilum Environment
You can use Apache Sedona to perform a wide range of spatial data processing tasks, including spatial SQL queries, spatial joins, spatial indexing, and spatial partitioning.
Ilum further extends Apache Sedona by providing a scalable data lakehouse environment: it simplifies data management, boosts processing performance, and integrates with cluster computing capabilities for large-scale geospatial analysis.
Now that we know the idea behind Apache Sedona, it's time to get to know it from the practical side.
Implementing Apache Sedona in Ilum: Configuration and Setup
In the previous section, we discussed what Apache Sedona is used for. After configuring the Ilum instance according to the guide, we can start configuring it to work with spatial data. There are two approaches to enabling Apache Sedona capabilities in your environment, depending on your requirements. Below are the steps for both a local configuration (per notebook or interactive job) and a global cluster configuration.
1. Local Configuration
You can enable Sedona capabilities locally for a specific notebook or an individual interactive job without modifying the entire cluster configuration.
For a Jupyter Notebook
Add the following configuration snippet directly to your notebook when initializing your Spark session through the %%spark_magic Session Properties. This ensures Sedona capabilities are available only for the current session:
{"conf": {"spark.kubernetes.container.image": "ilum/spark:3.5.3-sedona-1.7.0", "spark.sql.extensions": "org.apache.sedona.sql.SedonaSqlExtensions", "spark.serializer": "org.apache.spark.serializer.KryoSerializer", "spark.kryo.registrator": "org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator"}, "driverMemory": "1000M", "executorCores": 2}
For an Interactive Job
Key: spark.sql.extensions
Value: org.apache.sedona.sql.SedonaSqlExtensions
Key: spark.serializer
Value: org.apache.spark.serializer.KryoSerializer
Key: spark.kryo.registrator
Value: org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
Ensure the container image (spark.kubernetes.container.image) is set to ilum/spark:3.5.3-sedona-1.7.0. This can be specified in the same configuration section.
2. Global Cluster Configuration
If you want Sedona capabilities to be enabled for the entire cluster (so that any notebook or job on the cluster can use Sedona without additional configuration), follow these steps:
Key: spark.sql.extensions
Value: org.apache.sedona.sql.SedonaSqlExtensions
Key: spark.serializer
Value: org.apache.spark.serializer.KryoSerializer
Key: spark.kryo.registrator
Value: org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
Ensure the container image (spark.kubernetes.container.image) is set to ilum/spark:3.5.3-sedona-1.7.0. This can be specified in the same configuration section.
Press the Submit button to save the configuration and ensure the changes are applied to the cluster.
By setting Sedona at the cluster level, all sessions and jobs on this cluster will automatically have access to Sedona capabilities without requiring additional per-job or notebook configurations.
Congratulations! You’ve successfully configured Ilum with Sedona.
How to Upload and Manage Spatial Data in Apache Sedona
With our Ilum instance configured, it's time to inject some spatial data into the flow! The provided example uses two Natural Earth datasets: countries with lakes (ne_50m_countries_lakes.zip) and airports (ne_50m_airports.zip).
The data is available for download here. Once you've downloaded these files to your local machine, upload them to the ilum-data bucket. Head over to the Minio/Data section, log in, and create a subfolder named "examples/sedona/geodata". This will be your designated landing zone for the data. Simply upload both files into the newly created folder.
Now, our data pipeline is fully equipped and ready to harness the power of Sedona for geospatial analysis!
Exploring Practical Geospatial Analysis with Apache Sedona Notebooks
Download the Sedona_intro.ipynb notebook from our GitHub. This notebook serves as your gateway to using Sedona. Then go to the Notebook section in Ilum and upload it to your Jupyter instance. It holds practical examples designed to showcase Sedona's capabilities in tackling real-world spatial data challenges.
While we encourage hands-on exploration for deeper understanding, we know that sometimes you just want to get things done. For those who prefer a guided path, here's a step-by-step walk-through of the process.
Before we start processing, we need to import the necessary libraries.
from sedona.spark import *
import geopandas as gpd
import s3fs
Another necessary step before processing spatial data is initializing the SedonaContext from the SparkSession (available as spark). Think of it as the secret handshake that lets Spark understand the language of spatial data.
SedonaContext.create(spark)
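As a quick, optional sanity check (assuming the Sedona extensions were picked up by the session), you can run a simple spatial SQL expression:

# If Sedona is registered correctly, ST_Point and ST_AsText are available in Spark SQL
spark.sql("SELECT ST_AsText(ST_Point(21.01, 52.23)) AS point_wkt").show(truncate=False)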
In this example, we access the data in a different way because of its original structure. The following credentials are set by default when installing Ilum.
s3 = s3fs.S3FileSystem(
    key='minioadmin',
    secret='minioadmin',
    endpoint_url='https://ilum-minio:9000/'
)
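Before going further, you can verify that the files landed in the right place, for example by listing the folder (a small optional check):

# List the uploaded files in the MinIO bucket
s3.ls('ilum-data/examples/sedona/geodata')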
Creating a Dedicated Database for the Use Case
A good practice in data engineering is to separate data within dedicated databases for specific use cases. This approach helps maintain data organization and makes it easier to manage, query, and scale.
For this use case, we will create a database named example_sedona. This will ensure that all data related to this use case is stored in a structured and isolated manner.
spark.sql("CREATE DATABASE example_sedona")
Below, we download the files from the bucket, load them into GeoPandas DataFrames, and create temporary views.
Countries
countries_gpd = gpd.read_file(s3.open('s3://ilum-data/examples/sedona/geodata/ne_50m_countries_lakes.zip'),engine='pyogrio')
countries_df = spark.createDataFrame(countries_gpd)
countries_df.createOrReplaceTempView("country")
countries_df.show()
Airports
airports_gpd = gpd.read_file(s3.open('s3://ilum-data/examples/sedona/geodata/ne_50m_airports.zip'), engine='pyogrio')
airports_df = spark.createDataFrame(airports_gpd)
airports_df.createOrReplaceTempView("airport")
airports_df.show()
Spatial Data APIs in Apache Sedona: SQL and RDD
1. SQL API
Once you’ve imported the data, you can perform a spatial join using Spatial SQL. In this case, we want to match airports with countries based on their location.
country_airport_mapping_sql = spark.sql("""
    SELECT c.geometry AS country_geom, c.NAME_EN, a.geometry AS airport_geom, a.name
    FROM country c, airport a
    WHERE ST_Contains(c.geometry, a.geometry)
""")
country_airport_mapping_sql.createOrReplaceTempView("country_airport_mapping_sql")
country_airport_mapping_sql.show()
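ST_Contains is only one of many predicates available in Spatial SQL. As an illustration (a sketch over the same airport view, with a hypothetical reference point and threshold), a distance-based filter looks like this:

# Airports within ~5 degrees of a reference point (threshold is illustrative,
# since the Natural Earth geometries are in lon/lat degrees)
nearby_airports = spark.sql("""
    SELECT a.name
    FROM airport a
    WHERE ST_Distance(a.geometry, ST_Point(-0.45, 51.47)) < 5
""")
nearby_airports.show()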
2. RDD API
Below, we convert the DataFrames into Sedona's distributed spatial datasets, called SpatialRDDs, using the Sedona Adapter, which enables spatial operations based on their “geometry” columns. Leveraging spatial partitioning and indexing, we then perform a spatial join between the airports and countries SpatialRDDs to find matching locations.
# Convert DataFrames to SpatialRDDs
airports_rdd = Adapter.toSpatialRdd(airports_df, "geometry")
countries_df = countries_df.drop("NAME") # Drop duplicate column
countries_rdd = Adapter.toSpatialRdd(countries_df, "geometry")
# Analyze the RDDs
airports_rdd.analyze()
countries_rdd.analyze()
# Perform spatial partitioning
# 4 is the num partitions used in spatial partitioning. This is an optional parameter
airports_rdd.spatialPartitioning(GridType.KDBTREE, 4)
countries_rdd.spatialPartitioning(airports_rdd.getPartitioner())
# Build spatial index
buildOnSpatialPartitionedRDD = True
usingIndex = True
considerBoundaryIntersection = True
airports_rdd.buildIndex(IndexType.QUADTREE, buildOnSpatialPartitionedRDD)
# Execute the spatial join
result_pair_rdd = JoinQueryRaw.SpatialJoinQueryFlat(airports_rdd, countries_rdd, usingIndex, considerBoundaryIntersection)
# Convert RDD back to DataFrame
result2 = Adapter.toDf(result_pair_rdd, countries_rdd.fieldNames, airports_rdd.fieldNames, spark)
# Create a temporary view for the join result
result2.createOrReplaceTempView("join_result_with_all_cols")
# Select the required columns
country_airport_mapping_rdd = spark.sql("SELECT leftgeometry as country_geom, NAME_EN, rightgeometry as airport_geom, name FROM join_result_with_all_cols")
Print spatial join results:
print("The result of SQL API")
country_airport_mapping_sql.show()
print("The result of RDD API")
country_airport_mapping_rdd.show()
Next, we'll group airports based on the country they belong to.
# Group the SQL-API join results by country
country_airport_counts = spark.sql("""
    SELECT c.NAME_EN, c.country_geom, count(*) AS airports_count
    FROM country_airport_mapping_sql c
    GROUP BY c.NAME_EN, c.country_geom
""")
country_airport_counts.createOrReplaceTempView("country_airport_counts")
country_airport_counts.show()
The last step of processing is to write the processed data to the chosen database for visualization or further processing.
country_airport_counts.write.format("geoparquet").mode("overwrite").saveAsTable("example_sedona.country_airport_counts")
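To confirm the write succeeded, you can read the table back (a quick optional check):

# Read the saved GeoParquet table back from the example_sedona database
spark.table("example_sedona.country_airport_counts").show(5)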
Optimizing Spatial SQL Workflows in Docker with Ilum Interactive Jobs
Data processing is an integral part of many modern business processes. Traditional approaches are often cumbersome to use and rarely connect well with scalable processing.
Ilum Interactive Jobs are a new method of data processing with a few key advantages.
It's faster: Interactive Jobs can run work in parallel, reducing processing time significantly. For instance, if you have many files that need processing, you can use Ilum's Interactive Jobs to process them in parallel and get through all your files in far less time than running them sequentially in a notebook.
It's easier: Interactive Jobs offer API access, so it's easy to connect other systems to the platform, feed data from other sources into Ilum Interactive Jobs, and return calculation results to other applications.
It's flexible: Interactive Jobs can be configured to match your business requirements.
The following code snippet is an example of processing spatial data with Sedona, written specifically to run as an interactive job in Ilum. It automates the workflow: raw data is read from the configured sources, transformations are applied, and the processed data is written to a database location.
The code relies on the IlumJob class, which makes interactive jobs easy to set up and manage on the Ilum platform.
sedona_interactive.py
from sedona.spark import *
import geopandas as gpd
import s3fs
from ilum.api import IlumJob


class MatchAirportsWithCountries(IlumJob):

    def run(self, spark, config):
        # Retrieve data paths from the configuration
        countries_path = str(config.get('countries_path'))
        airports_path = str(config.get('airports_path'))
        result_db = str(config.get('result_db'))
        # Retrieve country name from the config
        country = str(config.get('country'))
        # Retrieve save parameter from the config
        save = config.get('save').lower() == 'true'

        # Initialize SedonaContext
        SedonaContext.create(spark)

        # Configure s3fs to access the data
        s3 = s3fs.S3FileSystem(
            key='minioadmin',
            secret='minioadmin',
            endpoint_url='https://ilum-minio:9000/'
        )

        # Create a dedicated database for the use case
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {result_db}")

        # Load and process country data
        countries_gpd = gpd.read_file(s3.open(countries_path), engine='pyogrio')
        countries_df = spark.createDataFrame(countries_gpd)
        countries_df.createOrReplaceTempView("country")

        # Load and process airport data
        airports_gpd = gpd.read_file(s3.open(airports_path), engine='pyogrio')
        airports_df = spark.createDataFrame(airports_gpd)
        airports_df.createOrReplaceTempView("airport")

        # Match airports with countries and compute the necessary fields
        single_country_airport_mapping = spark.sql(f"""
            SELECT
                c.NAME_EN as country_name,
                a.geometry as airport_geom,
                a.name as airport_name,
                COUNT(*) OVER (PARTITION BY c.NAME_EN) as airports_count
            FROM
                country c
            JOIN
                airport a
            ON
                ST_Contains(c.geometry, a.geometry)
            WHERE
                c.NAME_EN = '{country}'
        """)

        # Convert the Spark DataFrame to a GeoPandas DataFrame
        gdf = gpd.GeoDataFrame(single_country_airport_mapping.toPandas(), geometry='airport_geom')

        # Extract longitude and latitude coordinates from the geometry
        gdf['longitude'] = gdf['airport_geom'].x
        gdf['latitude'] = gdf['airport_geom'].y

        # Drop the 'airport_geom' column
        gdf = gdf.drop('airport_geom', axis=1)

        # Create a new Spark DataFrame from the GeoPandas DataFrame
        country_airport_summary_df = spark.createDataFrame(gdf)

        # Save the detailed summary to the database
        if save:
            visual_data_table = f"{result_db}.{country}_airport_summary"
            country_airport_summary_df.write.format("parquet").mode("overwrite").saveAsTable(visual_data_table)

        # Extract the number of airports and return it
        num_airports = gdf['airports_count'].iloc[0] if not gdf.empty else 0

        return f"Data processing finished successfully.\nIn {country} there are {num_airports} airports."
This script shows how data processing can be adapted to run as an interactive job. Below are the main differences from the standard Jupyter Notebook approach and the new functionality this approach introduces.
Class structure: the whole workflow is wrapped in a class extending IlumJob, and the processing logic lives in its run(self, spark, config) method, which Ilum calls with a ready Spark session and the job configuration.
Configuration flexibility: input paths, the target database, the country to analyze, and the save flag are read from the config dictionary at runtime instead of being hard-coded, so the same job can be executed repeatedly with different parameters.
All the other steps, such as data loading, transformation, and writing, remain essentially the same as in the Jupyter Notebook example, just organized so they can be executed on Ilum.
In essence, this transformation lets the code take advantage of Ilum-specific functionality and configuration management, resulting in a more powerful and flexible data processing experience.
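Stripped of the Sedona-specific logic, the pattern looks like the minimal sketch below. The class name and parameters are hypothetical; only the IlumJob base class and the run signature come from the example above.

from ilum.api import IlumJob


class MyParametrizedJob(IlumJob):
    """Minimal skeleton of an Ilum interactive job (hypothetical example)."""

    def run(self, spark, config):
        # Read parameters supplied at execution time
        table_name = str(config.get('table_name'))

        # Do the actual work with the provided Spark session
        row_count = spark.table(table_name).count()

        # Whatever is returned becomes the job result visible in the UI / API response
        return f"{table_name} contains {row_count} rows."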
Executing Your Sedona Interactive Job
Are you ready to experience the interactive power of Sedona?
Prerequisite: make sure you've completed all the steps in the "Implementing Apache Sedona in Ilum: Configuration and Setup" and "How to Upload and Manage Spatial Data in Apache Sedona" sections of this blog post.
Step 1: Create a Group
Add the following key-value pairs under the Configuration/Parameters section:
Key: spark.sql.extensions
Value: org.apache.sedona.sql.SedonaSqlExtensions
Key: spark.serializer
Value: org.apache.spark.serializer.KryoSerializer
Key: spark.kryo.registrator
Value: org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
Key: spark.kubernetes.container.image
Value: ilum/spark:3.5.3-sedona-1.7.0
Step 2: Upload Your Script
Step 3: Submit the Configuration
Step 4: Time to Execute!
Option 1: Execute Directly Through the UI
1. Go to the "Workloads" section in Ilum.
2. Locate your newly created group and click "Execute".
3. In the "Class" field, enter the fully qualified class name for your job. Example:
sedona_interactive.MatchAirportsWithCountries
4. In the "Parameters" section, provide the necessary job configuration in JSON format.
Example:
{
  "countries_path": "s3://ilum-data/examples/sedona/geodata/ne_50m_admin_0_countries_lakes.zip",
  "airports_path": "s3://ilum-data/examples/sedona/geodata/ne_50m_airports.zip",
  "result_db": "results_database",
  "country": "Canada",
  "save": "true"
}
5. Click "Submit" to execute the job.
Option 2: Execute via API
If you prefer automation or wish to integrate execution into a CI/CD pipeline, you can execute the job using the Ilum API. Follow the steps below:
1. Setup Port Forwarding
To access the API locally, you must forward the Ilum Core service port. Use the following command to set up port forwarding:
kubectl port-forward -n default svc/ilum-core 9888:9888 &
2. Locate the Group ID
To execute the job, you'll need the ID of the group you created earlier, which can be obtained from the Ilum UI.
3. API Request
Once you have the Group ID, you can execute the job via a curl command. Here's an example:
curl -X POST "https://localhost:9888/api/v1/group/<groupID>/job/execute" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-d '{ "type": "interactive_job_execute", "jobClass": "sedona_interactive.MatchAirportsWithCountries", "jobConfig": { "countries_path": "s3://ilum-data/examples/sedona/geodata/ne_50m_admin_0_countries_lakes.zip", "airports_path": "s3://ilum-data/examples/sedona/geodata/ne_50m_airports.zip", "result_db": "example_sedona", "country": "Canada", "save": "true" } }'
Replace <groupID> with the appropriate Group ID for your workload.
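If you would rather stay in Python (for example from a script or a scheduled task), the same request can be issued with the requests package. This is a sketch mirroring the curl call above; the endpoint and payload come from that example, and the group ID is a placeholder.

import requests

ILUM_API = "https://localhost:9888/api/v1"  # reachable via the port-forward set up earlier
GROUP_ID = "<groupID>"  # replace with your group's ID from the Ilum UI

payload = {
    "type": "interactive_job_execute",
    "jobClass": "sedona_interactive.MatchAirportsWithCountries",
    "jobConfig": {
        "countries_path": "s3://ilum-data/examples/sedona/geodata/ne_50m_admin_0_countries_lakes.zip",
        "airports_path": "s3://ilum-data/examples/sedona/geodata/ne_50m_airports.zip",
        "result_db": "example_sedona",
        "country": "Canada",
        "save": "true"
    }
}

# Adjust the scheme/TLS verification to match your local setup if needed
response = requests.post(f"{ILUM_API}/group/{GROUP_ID}/job/execute", json=payload)
print(response.json())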
4. View Response
Once the job finishes execution, the API will return the result in the response body. This includes any output generated by the job, such as the number of airports matched with the given country.
Example Response:
{
"jobInstanceId": "20241211-1913-fqe66-z5waqr1l",
"jobId": "20241211-1913-fqe66-f3c7b",
"groupId": "20241211-1913-fqe66",
"startTime": 1734696900655,
"endTime": 1734696920125,
"clazz": "sedona_interactive.MatchAirportsWithCountries",
"config": {
"countries_path": "s3://ilum-data/examples/sedona/geodata/ne_50m_countries_lakes.zip",
"airports_path": "s3://ilum-data/examples/sedona/geodata/ne_50m_airports.zip",
"result_db": "example_sedona",
"country": "Canada",
"save": "true"
},
"result": "Data processing finished successfully.\nIn Canada there are 15 airports.",
"error": null
}
5. Analyze the Results (Success!)
Your processed data will be saved to the specified output table (e.g., s3a://ilum-data/example_sedona.db/canada_airport_summary) once the job is finished.
Congratulations! You’ve successfully completed your first interactive Sedona job, gaining access to the capabilities of cloud-based geospatial analysis.
Expanding Data Horizons with Ilum Interactive Jobs
Interactive jobs with Ilum are not just for running code; they open up endless ways to exchange data. The result of an interactive job can be just about anything: JSON data, SQL queries or their results, or a single value. This flexibility lets you integrate your applications with Ilum, turning it into a versatile data processing lakehouse.
Picture this: your application needs to calculate the average age of users in a specific region. With interactive jobs, you can send a request to Ilum specifying the region and the desired calculation. Ilum then executes the necessary code, processes the data, and returns the average age as a JSON object. Because the API follows the OpenAPI specification, this response can be integrated directly into your application, providing real-time insights without complex data pipelines or manual integration.
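A minimal sketch of such a job, assuming a users table with age and region columns (the class, table, and column names here are all hypothetical):

import json

from ilum.api import IlumJob


class AverageAgeByRegion(IlumJob):
    """Hypothetical interactive job returning a JSON result."""

    def run(self, spark, config):
        region = str(config.get('region'))

        # Compute the average age for the requested region (table and columns are assumed)
        avg_age = (
            spark.table("analytics.users")
            .where(f"region = '{region}'")
            .agg({"age": "avg"})
            .collect()[0][0]
        )

        # Return a JSON string that the calling application can consume directly
        return json.dumps({"region": region, "average_age": avg_age})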
The possibilities are endless. Think of an Ilum interactive job as a data processing microservice that can handle any request, big or small: your application sends a request with the job class and parameters, Ilum runs the code on the cluster, and the result comes back in the API response, ready to feed dashboards, trigger downstream processes, or be consumed by other services.
Expanding Data Analysis Horizons with PygWalker in Notebooks
PygWalker helps turn geospatial data into interactive, illustrative maps and charts. Imagine pinpointing global weather patterns, visualizing airport networks, or uncovering hidden correlations between geographical features with ease.
Its major feature is a Tableau-style drag-and-drop interface built on top of a pandas DataFrame, so charts can be assembled interactively without writing plotting code.
Let's make the geospatial analysis in our Jupyter notebook shine with complementary visualizations.
Now, let's combine the earlier results into a single DataFrame that is convenient to visualize.
# Join the tables with proper column aliases
visual_data = spark.sql("""
SELECT a.NAME_EN AS country_name,
a.name AS airport_name,
a.airport_geom,
c.airports_count
FROM country_airport_mapping_sql AS a
JOIN country_airport_counts AS c ON a.NAME_EN = c.NAME_EN
""")
# Convert Spark DataFrame to GeoPandas DataFrame
gdf = gpd.GeoDataFrame(visual_data.toPandas(), geometry='airport_geom')
# Extract longitude and latitude coordinates from the geometry
gdf['longitude'] = gdf['airport_geom'].x
gdf['latitude'] = gdf['airport_geom'].y
# Drop the 'airport_geom' column
gdf = gdf.drop('airport_geom', axis=1)
# Create a new Spark DataFrame from the GeoPandas DataFrame
country_airport_summary_df = spark.createDataFrame(gdf)
country_airport_summary_df.show()
Warning: visualization runs in a different kernel!
The current kernel doesn't support advanced visualization techniques, so all visualization takes place in the ipykernel environment.
The pre-processed data is transferred between kernels as DataFrames. To start using PygWalker in the ipykernel kernel, install the following Python packages:
pip install pandas pygwalker
Once the environment is prepared, you can simply call a visualization tool, passing your data frame as an argument.
import pygwalker as pyg
walker = pyg.walk(country_airport_summary_df, kernel_computation=True)
This visualization makes it easy to understand the spatial relationships and patterns that Sedona's processing uncovered. Representing the data visually brings the geospatial landscape to life, improving our understanding of airport distribution relative to country and lake boundaries.
As our geospatial analysis with Apache Sedona progresses, we can look forward to even more immersive visualizations that reveal the patterns and insights hidden in our data.
Conclusion
The future of geospatial data analysis looks promising, thanks to the continuous improvements in both Apache Sedona and platforms like Ilum. As these technologies evolve, they will undoubtedly unlock new possibilities for data-driven decision-making in various industries, from urban planning to environmental monitoring.
Stay tuned for more data-driven discoveries!