Efficient Point Data Extraction from Zarr Datasets with FastAPI, Dask, and Xarray

I have recently started using Zarr to handle spatiotemporal climate data and have found that it outperforms NetCDF in many respects. In this tutorial, we will build a demo FastAPI application that extracts data at specific points, defined by latitude/longitude pairs, from a Zarr dataset using Dask and Xarray. The key aim is to speed up point data extraction from Zarr-format data using a Dask local cluster.

Zarr arrays are designed to serve as sources or sinks for data in parallel computation. Here, we use a Zarr array as a data source, which means multiple concurrent read operations may occur; both multi-threaded and multi-process parallelism are possible. The bottleneck for most storage and retrieval operations is compression/decompression, and the Python global interpreter lock (GIL) is released wherever possible during these operations, so Zarr will generally not block other Python threads from running.

Firstly, let's create a FastAPI server script (server.py) as follows:

import json
import time
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, HTTPException, status
from backend import Item, get_data

app = FastAPI()

origins = [
    "http://localhost",
    "http://localhost:3000",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
async def read_root() -> dict:
    return {"Hello": "World"}


@app.post("/pips/")
async def extract(args: Item):
    if args.latitude and args.longitude:
        if len(args.latitude) != len(args.longitude):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="The latitude and longitude should have the same length.",
            )

        begin = time.time()
        result = await get_data(args)
        parsed = json.loads(result.to_json(orient="records"))

        return {
            "time(secs)": (time.time() - begin),
            "result": parsed,
        }
    else:
        return {"Alert": "Please provide latitude and longitude information"}

In the server script, we create a "pips" endpoint to receive a user's POST request (i.e., to collect point locations defined by latitude and longitude). The endpoint invokes the backend function "get_data" to extract data at the posted points from a Zarr dataset using Xarray and Dask.

The backend functions are defined in backend.py as follows:

from typing import List
from pydantic import BaseModel
import xarray as xr
from dask.distributed import Client, Future

DASK_CLUSTER = "localhost:8786"


class Item(BaseModel):
    latitude: List[float] = []
    longitude: List[float] = []


async def get_data(locs: Item):
    async with Client(DASK_CLUSTER, asynchronous=True) as client:
        zarr_store = r"data/air_rechunked_consolid.zarr"
        da_zr = xr.open_zarr(zarr_store, consolidated=True)['air']
        sel_lats = locs.latitude
        sel_lons = locs.longitude

        da_sel = da_zr.sel(lat=xr.DataArray(sel_lats, dims='points'),
                           lon=xr.DataArray(sel_lons, dims='points'),
                           method='nearest'
                           ).to_dask_dataframe()

        future: Future = client.compute(da_sel)
        return await future        

In the only backend function, "get_data", the Zarr dataset stored at "data/air_rechunked_consolid.zarr" is opened with consolidated metadata, the grid cells nearest to the requested points are selected, and the result is computed on the Dask cluster before being returned as a Pandas DataFrame.
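The `sel` call deserves a note: passing plain lists of latitudes and longitudes would select the outer product of all lat/lon combinations, whereas wrapping them in `xr.DataArray` objects that share a `points` dimension pairs them elementwise (vectorized, or "pointwise", indexing). A small in-memory sketch, standing in for the Zarr-backed array:

```python
import numpy as np
import xarray as xr

# A tiny in-memory grid standing in for the Zarr-backed "air" array.
da = xr.DataArray(
    np.arange(12.0).reshape(3, 4),
    coords={"lat": [10.0, 20.0, 30.0], "lon": [100.0, 110.0, 120.0, 130.0]},
    dims=("lat", "lon"),
)

# Plain lists select the outer product: a 2x2 block of values.
outer = da.sel(lat=[10.0, 30.0], lon=[100.0, 120.0])
print(outer.shape)  # (2, 2)

# DataArrays sharing a "points" dim pair the coordinates elementwise,
# so only (12, 101) -> nearest (10, 100) and (28, 118) -> nearest (30, 120)
# are extracted.
pts = da.sel(
    lat=xr.DataArray([12.0, 28.0], dims="points"),
    lon=xr.DataArray([101.0, 118.0], dims="points"),
    method="nearest",
)
print(pts.values)  # [ 0. 10.]
```

For N points this touches N cells instead of N×N, which is exactly why the backend builds the selectors this way before handing the result to Dask.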

Then we can follow the below steps to run the whole service:

  1. Run the command dask scheduler to start the Dask scheduler; you can visit the dashboard at http://localhost:8787/status.
  2. Run the command dask worker localhost:8786 to start a Dask worker.
  3. Run the command uvicorn server:app --reload --port 8090 to start the ASGI web server.
  4. Launch Swagger UI at http://localhost:8090/docs.
  5. Invoke the endpoint via curl: curl -X 'POST' 'http://localhost:8090/pips/' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ "latitude": [15, 3], "longitude": [200, 5] }'
  6. You should see the results as below:

Extracted data at the points posted by the user
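For scripted use, the curl call above can be replaced with a small Python client. This sketch uses only the standard library (the URL assumes the server is running locally on port 8090, as in step 3):

```python
import json
from urllib import request


def extract_points(latitudes, longitudes, url="http://localhost:8090/pips/"):
    """POST a list of points to the /pips/ endpoint and return the parsed JSON."""
    payload = json.dumps({"latitude": latitudes, "longitude": longitudes}).encode()
    req = request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with request.urlopen(req) as resp:
        return json.loads(resp.read())


# Equivalent to the curl call above; requires the Dask cluster and the
# FastAPI server to be running.
# print(extract_points([15, 3], [200, 5]))
```

The response contains the elapsed time in seconds and a "result" list of records, one per time step and point.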

Summary and discussion

FastAPI is a modern Python web framework for creating high-performance APIs, while Zarr, Dask, and Xarray are Python libraries for data storage and processing. The primary objective of this demo is to expedite the extraction of point data from Zarr-formatted data by leveraging a Dask local cluster. The demo covers setting up the application, running the Dask cluster, creating the FastAPI web server, and using HTTP requests to extract data from the Zarr dataset. All scripts and demo data can be found in my GitHub repository.

References

https://www.dask.org/get-started

https://docs.xarray.dev/en/stable/

https://fastapi.tiangolo.com/

https://github.com/chirdeeptomar/fastapi-with-dask

https://zarr.readthedocs.io/en/stable/
