Deploying Data Pipelines at Saturn

As promised five months ago, here's a look at how we use Saturn at Saturn. This article covers both how we do that and some lessons learned along the way. It also assumes that you're a fan of Prefect and Dask.

Use Dask! But only when you need it

Scaling up should be progressive: the more you scale, the more inherent complexity you take on. I believe most jobs should be written with Pandas first, then Dask on a local cluster, and finally Dask on a multi-node cluster (if you really need it). The way we do this is by passing in parameters that determine which type of Dask cluster we're using.

from dask.distributed import Client, LocalCluster
from dask_saturn import SaturnCluster
from prefect import Flow, resource_manager


@resource_manager
class LocalDaskResource:
    """Single-machine Dask cluster, used for the small scheduled runs."""

    def setup(self):
        self.cluster = LocalCluster(n_workers=1, threads_per_worker=15)
        self.client = Client(self.cluster)
        return self.client

    def cleanup(self, resource):
        self.cluster.close()


@resource_manager
class SaturnDaskResource:
    """Saturn Dask cluster, used when we need to process the full dataset."""

    def __init__(self, teardown=True):
        self.teardown = teardown

    def setup(self):
        self.cluster = SaturnCluster(n_workers=1, threads_per_worker=15)
        self.client = Client(self.cluster)
        return self.client

    def cleanup(self, resource):
        if self.teardown:
            self.cluster.close()


def make_flow(mode, storage=None):
    # Pick the resource manager based on the mode parameter.
    if mode == "SaturnCluster":
        def resource():
            return SaturnDaskResource(teardown=False)
    else:
        def resource():
            return LocalDaskResource()

    with Flow("...", storage=storage) as flow:
        with resource():
            ...

    return flow


We wrote this flow to backfill (and keep current) a job that loads usage data from S3 and pushes it into Snowflake. When developing the flow we used a multi-node Saturn Dask cluster, but in our deployment, since we're only processing the most recent 24 hours' worth of data, it's much easier to use a LocalCluster.
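As a rough illustration of what might sit inside the with resource(): block, here is a hypothetical pair of tasks along those lines. The bucket path, table name, and connection details are placeholders rather than our actual pipeline; write_pandas from the Snowflake connector is just one convenient way to push a DataFrame into a table.

import datetime

import dask.dataframe as dd
import snowflake.connector
from prefect import task
from snowflake.connector.pandas_tools import write_pandas


@task
def load_usage(day: datetime.date):
    # Hypothetical layout: one parquet prefix per day of usage data.
    # The read is parallelized across whichever Dask cluster the
    # resource manager attached to the flow.
    ddf = dd.read_parquet(f"s3://example-usage-bucket/usage/{day.isoformat()}/")
    return ddf.compute()


@task
def write_to_snowflake(df):
    # Placeholder credentials; in a real flow these would come from a secret store.
    conn = snowflake.connector.connect(account="...", user="...", password="...")
    write_pandas(conn, df, table_name="USAGE_EVENTS")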

Don't force yourself to use Jupyter

The data science world has standardized on Jupyter as the IDE of choice for data scientists. This isn't so bad now that JupyterLab has a decent text editor you can use to work on Python scripts and libraries, in addition to working in notebooks.

I love emacs. When writing these flows I worked locally on my laptop, but connected to Saturn clusters to offload the expensive computations. Since then, I've switched to SSHing into my Jupyter instance so that I can run emacs there (this is also how our VS Code and PyCharm integrations work). I've found it helpful to make my development environment match my production environment exactly, and having a development machine that's more powerful than my laptop has been really nice.

There are many data science platforms out there that focus on notebooks. Notebooks have their place, but they will never completely replace writing code.

Prefect has multiple deployment patterns. You don't need to limit yourself.

We've been building out our Prefect Cloud integration for some time. The integration provides a Storage object for your flows, registers them with Prefect Cloud, and shows you all the Saturn logs for your flows. Your flows are deployed in a Kubernetes pod via the Prefect Cloud Kubernetes Agent and can also use a Saturn Dask cluster. We do a lot of work to make sure that your Prefect flow runs in a pod that matches the Jupyter environment you used to create it, without you needing to configure any of your own infrastructure.
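For reference, hooking a flow into the integration looks roughly like this. This is a sketch based on the prefect-saturn package; the project name is a placeholder, and exact call signatures may vary by version.

from prefect_saturn import PrefectCloudIntegration

flow = make_flow(mode="SaturnCluster")

# Attach Saturn's storage, run config, and labels to the flow, then
# register it with a Prefect Cloud project.
integration = PrefectCloudIntegration(prefect_cloud_project_name="example-project")
flow = integration.register_flow_with_saturn(flow)
flow.register(project_name="example-project")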

Sounds great, right?

Still, I'm planning on passing on our integration for some of our newer flows.

For a variety of reasons, we have a few flows that need to run every minute, and for those a Kubernetes Agent doesn't make sense: spinning pods up and down isn't worth it when you're running every minute.

Instead, I'm leveraging Saturn deployments. In Saturn we have the capability of executing long-running, always-on tasks (these are often used to serve ML models or data science dashboards). I'm running a Saturn deployment that, instead of running a webserver, runs a Prefect Local Agent, and I'm labeling the agent and my flows so that those flows run on that agent.
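The wiring looks roughly like this; it's a sketch rather than our exact deployment, and the label name is a placeholder. The deployment's command starts a Local Agent with a label, and each flow's run config carries the same label so Prefect Cloud routes its runs to that agent.

from prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun

# Flow side, at registration time: attach the label to the flow's run config.
flow = make_flow(mode="local")
flow.run_config = LocalRun(labels=["minute-flows"])

# Agent side: this runs as the Saturn deployment's command, in place of a
# webserver. It only picks up flow runs whose labels match its own.
if __name__ == "__main__":
    LocalAgent(labels=["minute-flows"]).start()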

For my hourly and daily flows, the Kubernetes agent still makes sense. But for something that runs every minute, the always-on Local Agent is a better fit.

Working with Research and Production

In the past I've struggled with the logistics of working with Prefect Cloud flows. Do I just write them in a notebook? If I write them in a notebook, is that what I use to make production deployments? If I move my flows to Python code, how do I explore them interactively?

I've settled on using click to solve this problem (really any command line interface will do).

import click

# make_flow() is the flow factory defined above.


@click.group()
def cli():
    pass


@cli.command()
def register():
    # Build the flow's storage and register it with Prefect Cloud.
    flow = make_flow(...)
    flow.storage.build()
    flow.register(...)


@cli.command()
@click.option("--mode", default=None)
def run(mode):
    # Run the flow locally, against whichever Dask cluster `mode` selects.
    flow = make_flow(mode)
    flow.run()


if __name__ == "__main__":
    cli()


  1. I have a CLI command I can use to register the flow. I call this from my development machine, but I could easily trigger it from any CI system.
  2. There is a separate CLI command I can use to run the flow. It lets me pass in parameters, so I can run the flow with a LocalCluster (to simulate production) or a Saturn cluster (if I want to run it on the full dataset).
  3. Since flow creation is encapsulated in a function, if I want to explore the flow interactively, I can import that function into a notebook, or just import and run the individual tasks (see the sketch below).
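Concretely, interactive exploration looks something like this. The module name flows.py is a placeholder for wherever make_flow and the CLI live.

# In a notebook or REPL, skip the CLI entirely and work with the flow object.
from flows import make_flow

flow = make_flow(mode="local")   # LocalCluster, same code path as production
state = flow.run()               # execute the whole flow
flow.visualize()                 # or inspect the task graph (needs graphviz)

# From a shell, the same module drives registration and runs:
#   python flows.py register
#   python flows.py run --mode SaturnCluster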