Should I use Dask?

What is Dask?

From dask.org, “Dask is a flexible library for parallel computing in Python”. For most users, Dask is 3 things:

  1. A pandas-like DataFrame, which can take advantage of multiple processes or machines.
  2. A NumPy-like Array, which can take advantage of multiple processes or machines.
  3. A way to write delayed functions that can run on multiple processes or machines.


Dask DataFrame

A Dask DataFrame is represented by many smaller Pandas DataFrames that live on your cluster. When you interact with the Dask DataFrame, the functions and methods you call are converted into many smaller functions, which are executed in parallel on the smaller Pandas DataFrames.
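For example, a minimal sketch (the data and column names here are made up) of turning a Pandas DataFrame into a Dask DataFrame and running a parallel groupby:

import pandas as pd
import dask.dataframe as dd

# A regular Pandas DataFrame (illustrative data)
pdf = pd.DataFrame({"key": ["a", "b", "a", "b"], "value": [1, 2, 3, 4]})

# Split it into 2 partitions; each partition is a small Pandas DataFrame
ddf = dd.from_pandas(pdf, npartitions=2)

# The groupby runs on the partitions in parallel, then the results are combined
print(ddf.groupby("key")["value"].mean().compute())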

Dask Array

A Dask Array is represented by many smaller NumPy Arrays that live on your cluster. When you interact with the Dask Array, the functions and methods you call are converted into many smaller functions, which are executed in parallel on the smaller NumPy Arrays.
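For example, a minimal sketch (the shape and chunk sizes are arbitrary):

import dask.array as da

# A 10,000 x 10,000 array stored as a 10 x 10 grid of 1,000 x 1,000 NumPy chunks
x = da.ones((10_000, 10_000), chunks=(1_000, 1_000))

# The mean is computed chunk by chunk in parallel, then aggregated
print(x.mean().compute())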

Dask Delayed

A Dask delayed function is a function that is designed to run on a Dask cluster. When you call a delayed function, instead of evaluating the function immediately, you get a "delayed" result, which represents the future result of that computation.


Should I use Dask?

That is the question...

  • Are you running out of memory on your workstation?
  • Are you waiting more than 20 minutes for your work to run?
  • Do you need more GPUs?

If the answer to any of these is yes, Dask may help, but before reaching for it, try the fixes below.


Solving memory issues without Dask

If you’re running out of memory on your workstation, Dask can help, but there are a few things I would recommend trying first. If you’re primarily using NumPy arrays, you can skip this section: other than moving to sparse arrays (which have their own problems), there isn’t much you can do to optimize memory in NumPy. If you’re using Pandas, here are some suggestions.

StringDtype

Python strings have roughly 40 bytes of overhead each. That doesn’t sound like a lot, but if you have a billion strings, it can add up quickly. The pandas StringDtype can help here.

# convert from the default object dtype to the dedicated string dtype
df['column'] = df['column'].astype(pd.StringDtype())

Categorical DTypes

Many string and numerical fields are actually categorical. Take a column named “Household Income”: instead of a numerical value, you usually get data in bands, like “$0-$40,000” or “more than $100,000”.

# build a categorical dtype from the column's observed values, then convert
categorical_dtype = pd.Categorical(df['column']).dtype
df['column'] = df['column'].astype(categorical_dtype)

As a general guideline, I usually look for columns where the ratio of the number of unique values to the number of rows is less than 1%.
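A quick way to scan for candidate columns under that heuristic (the 0.01 threshold is just the rule of thumb above):

# flag string columns whose unique values are less than 1% of the row count
n_rows = len(df)
for col in df.select_dtypes(include='object').columns:
    ratio = df[col].nunique() / n_rows
    if ratio < 0.01:
        print(f"{col}: {ratio:.2%} unique values, consider a categorical dtype")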

Diagnosing Memory Usage

df.memory_usage(deep=True) is a good way to understand how much memory each column is using. This can help you see how much you gain by converting columns to appropriate dtypes.
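For example, a before-and-after check (assuming the same df and 'column' from the snippets above):

before = df.memory_usage(deep=True)

# convert a low-cardinality string column to a categorical dtype
df['column'] = df['column'].astype('category')

after = df.memory_usage(deep=True)

# bytes saved per column
print((before - after).sort_values(ascending=False))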

Get a bigger computer?

If you’re working on the cloud, the easiest solution is usually to get the biggest machine you can. At Saturn, we currently top out at 512 GB (but let us know if you want more RAM!).

Dask?

If you’re done optimizing memory and your dataset takes up approximately 1/3 of your workstation’s memory or less, I would recommend skipping Dask. If your dataset takes up more than 1/3 of your workstation’s memory, there is a good chance that subsequent operations on that DataFrame will exceed the memory of your workstation, and dask.dataframe is a good solution. If you’re doing this for memory-bound reasons, you’re going to need a multi-node Dask cluster in order to get more memory.

Solving compute time issues without Dask

If you’ve got some code that’s slow, Dask can help! But there are a few things to check first.

Swap Pandas for NumPy

NumPy can be orders of magnitude faster than Pandas for many operations. This matters most in code that is called many times, for example the objective function passed to scipy.optimize. Replacing Pandas DataFrame calls with NumPy calls can easily speed up your code by 50x.
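As a rough illustration (the column name and objective function are made up), pulling the underlying NumPy array out of the DataFrame once and operating on it directly avoids Pandas overhead inside a hot loop:

import numpy as np
import pandas as pd

df = pd.DataFrame({'x': np.random.rand(1_000_000)})

# slow: Pandas indexing and Series arithmetic inside a function that an
# optimizer may call thousands of times
def objective_pandas(theta):
    return ((df['x'] * theta - 1.0) ** 2).sum()

# faster: extract the NumPy array once, then do pure NumPy math
x = df['x'].to_numpy()

def objective_numpy(theta):
    return ((x * theta - 1.0) ** 2).sum()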

Profile your code

There are many Python profiling tools, but line_profiler is the easiest to understand. Profiling is a long topic; the short version is that once you have line_profiler installed, you can call

%load_ext line_profiler

to load it into Jupyter, and then execute

%lprun -f func1 -f func2 expression

The %lprun command takes multiple -f parameters, which identify the functions you want to profile. The expression is then evaluated, and the profiling results are displayed.

For example, if I have functions test and test2, I might execute

%lprun -f test -f test2 test()
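Here test and test2 are hypothetical stand-ins for your own functions, for example:

import numpy as np

# purely illustrative functions to profile
def test2(n):
    return np.sqrt(np.arange(n)).sum()

def test():
    total = 0.0
    for _ in range(100):
        total += test2(100_000)
    return total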

You will then get an output like:

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   149                                           @profile
   150                                           def Proc2(IntParIO):
   151     50000        82003      1.6     13.5      IntLoc = IntParIO + 10
   152     50000        63162      1.3     10.4      while 1:
   153     50000        69065      1.4     11.4          if Char1Glob == 'A':
   154     50000        66354      1.3     10.9              IntLoc = IntLoc - 1
   155     50000        67263      1.3     11.1              IntParIO = IntLoc - IntGlob
   156     50000        65494      1.3     10.8              EnumLoc = Ident1
   157     50000        68001      1.4     11.2          if EnumLoc == Ident1:
   158     50000        63739      1.3     10.5              break
   159     50000        61575      1.2     10.1      return IntParIO

If none of those help, then Dask is a good choice.

But I need more GPUs

Short of shoving more GPU cards into your machine, there isn’t a good way to solve this problem without going multi-node. This is a good problem to solve with Dask.
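If you do go multi-GPU with Dask, the usual entry point is the dask-cuda package; a minimal sketch, assuming dask-cuda is installed and GPUs are visible on the machine:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# one Dask worker per visible GPU on this machine; a multi-node cluster
# follows the same pattern with a distributed scheduler
cluster = LocalCUDACluster()
client = Client(cluster)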

 

How do I use Dask?

This obviously deserves its own article, or series of articles, but let’s cover it very briefly.

DataFrame and Array

Dask DataFrame has a number of methods, like read_csv or read_parquet, that can read a collection of data files and return a distributed DataFrame. In the most general case, if you have a function that can return a small subset of your data, you can do something like this:

from dask import delayed
import dask.dataframe as dd

@delayed
def get_partition(partition_number):
    # return a Pandas DataFrame holding one subset of the data
    ...

partitions = [get_partition(num) for num in range(number_of_partitions)]
df = dd.from_delayed(partitions)

A similar approach works for Dask Array.
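A sketch of the array version (the shape and dtype below are assumptions about what each chunk returns; da.from_delayed needs them up front):

import numpy as np
import dask.array as da
from dask import delayed

number_of_chunks = 10

@delayed
def get_chunk(chunk_number):
    # illustrative: each chunk is a 1,000-element float array
    return np.random.rand(1_000)

chunks = [
    da.from_delayed(get_chunk(i), shape=(1_000,), dtype=float)
    for i in range(number_of_chunks)
]
arr = da.concatenate(chunks)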

Delayed

from dask import delayed, compute

@delayed
def inc1(x):
    return x + 1

@delayed
def total(x):
    # use the built-in sum; naming this function sum would shadow it
    return sum(x)

# input_vars is whatever iterable of inputs you are working with
result = total([inc1(i) for i in input_vars])

Here, result represents a delayed computation. Nothing is actually executed until you call:

# compute() returns a tuple with one entry per argument
(result,) = compute(result)

Conclusion

Distributed computing is hard. Saturn makes it easy. Get up to 100x faster workflows across ETL, exploratory data analysis, machine learning, and more with our secret sauce!

To try it out for yourself, get started with Saturn for free.

