Efficient Point Data Extraction from Zarr Datasets with FastAPI, Dask, and Xarray
Chonghua Yin
Head of Data Science | Climate Risk & Extreme Event Modeling | AI & Geospatial Analytics
I have recently started using Zarr to handle spatiotemporal climate data progressively and have found that it outperforms NetCDF in many aspects. In this tutorial, we will build a demonstration FastAPI application that extracts data at specific points (defined by latitude/longitude pairs) from a Zarr dataset using Dask and Xarray. The key aim is to speed up point data extraction from Zarr-formatted data using Dask's local cluster.
Zarr arrays are designed to serve as the source or sink for data in parallel computations. Here, we use Zarr data as a data source, which means multiple concurrent read operations may occur. Both multi-threaded and multi-process parallelism are possible. The bottleneck for most storage and retrieval operations is compression/decompression, and the Python global interpreter lock (GIL) is released wherever possible during these operations, so Zarr will generally not block other Python threads from running.
Firstly, let's create a FastAPI server script (server.py) as follows:
import json
import time

from fastapi import FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware

from backend import Item, get_data

app = FastAPI()

origins = [
    "https://localhost",
    "https://localhost:3000",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
async def read_root() -> dict:
    return {"Hello": "World"}


@app.post("/pips/")
async def extract(args: Item):
    if args.latitude and args.longitude:
        if len(args.latitude) != len(args.longitude):
            # Mismatched inputs are a client error, so return 400 rather than 404.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="The latitude and longitude should have the same length.",
            )
        begin = time.time()
        result = await get_data(args)
        parsed = json.loads(result.to_json(orient="records"))
        return {
            "time(secs)": (time.time() - begin),
            "result": parsed,
        }
    else:
        return {"Alert": "Please provide latitude and longitude information"}
In the server script, we create a "/pips/" endpoint to receive a user's POST request (i.e., to collect point locations defined by latitudes and longitudes), which invokes the backend function get_data to extract data for the posted points from a Zarr dataset using Xarray and Dask.
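The request body for this endpoint is simply two equal-length lists. A hypothetical client call, assuming the server runs at uvicorn's default address http://localhost:8000, could look like:

```python
import json

# Two equal-length lists of coordinates (example values only).
payload = {"latitude": [40.0, 45.0], "longitude": [250.0, 255.0]}
body = json.dumps(payload)

# With the server running, the request could be sent with, e.g., the
# `requests` package (not executed here):
#   resp = requests.post("http://localhost:8000/pips/", data=body,
#                        headers={"Content-Type": "application/json"})
#   print(resp.json()["time(secs)"])
print(body)
```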
The backend functions are defined in backend.py as follows:
from typing import List

from pydantic import BaseModel
import xarray as xr
from dask.distributed import Client, Future

# Address of a locally running Dask scheduler
DASK_CLUSTER = "localhost:8786"


class Item(BaseModel):
    latitude: List[float] = []
    longitude: List[float] = []


async def get_data(locs: Item):
    async with Client(DASK_CLUSTER, asynchronous=True) as client:
        zarr_store = r"data/air_rechunked_consolid.zarr"
        da_zr = xr.open_zarr(zarr_store, consolidated=True)['air']

        sel_lats = locs.latitude
        sel_lons = locs.longitude

        # Vectorized nearest-neighbour selection: pairing the latitudes and
        # longitudes along a shared "points" dimension extracts each (lat, lon)
        # pair rather than the full cross-product grid.
        da_sel = da_zr.sel(lat=xr.DataArray(sel_lats, dims='points'),
                           lon=xr.DataArray(sel_lons, dims='points'),
                           method='nearest'
                           ).to_dask_dataframe()

        future: Future = client.compute(da_sel)
        return await future
In get_data, the only backend function, the Zarr dataset is read from "data/air_rechunked_consolid.zarr".
Then we can follow the steps below to run the whole service:
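The original step-by-step listing was lost in this copy; a plausible sequence, assuming the scheduler address localhost:8786 from backend.py and uvicorn's default port 8000, is:

```shell
# 1. Start a local Dask scheduler (listens on port 8786 by default)
dask scheduler &

# 2. Start one or more workers pointed at the scheduler
dask worker localhost:8786 &

# 3. Launch the FastAPI app with uvicorn
uvicorn server:app --host 0.0.0.0 --port 8000 &

# 4. Post point locations to the /pips/ endpoint
curl -X POST "http://localhost:8000/pips/" \
     -H "Content-Type: application/json" \
     -d '{"latitude": [40.0, 45.0], "longitude": [250.0, 255.0]}'
```

Older Dask releases expose the same commands as dask-scheduler and dask-worker.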
Summary and discussion
FastAPI is a modern Python web framework for creating high-performance APIs, while Zarr, Dask, and Xarray are Python libraries used for data processing and storage. The primary objective of this demo is to expedite the extraction of point data from Zarr-formatted data by leveraging Dask's local cluster. The demo covers setting up the application, running the Dask cluster, creating a FastAPI web server, and using HTTP requests to extract data from the Zarr dataset. If you are interested in the tutorial, all scripts and demo data can be found in my GitHub archive.