Should I use Dask?
What is Dask?
From dask.org, “Dask is a flexible library for parallel computing in Python”. For most users, Dask is 3 things:
- A pandas-like DataFrame, which can take advantage of multiple processes or machines.
- A numpy-like Array, which can take advantage of multiple processes or machines.
- A way to write delayed functions that can run on multiple processes or machines.
Dask DataFrame
A Dask DataFrame is represented by many smaller Pandas DataFrames that live on your cluster. When you interact with the Dask DataFrame, the functions and methods you call are converted into many smaller functions that are executed in parallel on the smaller Pandas DataFrames.
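For instance (a minimal sketch with made-up toy data), you can split an ordinary Pandas DataFrame into partitions and keep using the familiar API:
import pandas as pd
import dask.dataframe as dd

# an ordinary Pandas DataFrame with toy data
pdf = pd.DataFrame({"x": range(1000), "y": range(1000)})

# split it into 4 smaller Pandas DataFrames (partitions)
ddf = dd.from_pandas(pdf, npartitions=4)

print(ddf.npartitions)         # 4
print(ddf.x.mean().compute())  # the mean runs per partition, then the results are combined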
Dask Array
A Dask Array is represented by many smaller NumPy Arrays that live on your cluster. When you interact with the Dask Array, the functions and methods you call are converted into many smaller functions that are executed in parallel on the smaller NumPy Arrays.
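For instance (another minimal sketch), a large random array can be built out of smaller NumPy chunks, with each operation applied chunk by chunk:
import dask.array as da

# a 10,000 x 10,000 array made of 1,000 x 1,000 NumPy chunks
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

# each step is applied to the chunks in parallel; nothing runs until compute()
y = (x + x.T).mean(axis=0)
print(y.compute()[:5])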
Dask Delayed
A Dask delayed function is a function that is designed to run on a Dask cluster. When you call a delayed function, instead of evaluating it immediately, you get a "delayed" result that represents the future result of that computation.
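A tiny sketch of what that looks like (the function name here is made up):
from dask import delayed

@delayed
def slow_double(x):
    return x * 2

result = slow_double(10)   # a Delayed object -- nothing has run yet
print(result.compute())    # 20 -- the work actually happens here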
Should I use Dask?
That is the question...
- Are you running out of memory on your workstation?
- Are you waiting more than 20 minutes for your work to run?
- Do you need more GPUs?
Solving memory issues without Dask
If you’re running out of memory on your workstation, Dask can help, but there are a few things I would recommend doing first. If you’re using primarily NumPy arrays, you can skip this section. Other than moving to sparse arrays (which have their own problems), there isn’t much you can do to optimize memory in NumPy. If you’re using Pandas, here are some suggestions.
StringDType
Python strings have roughly 40 bytes of overhead apiece. That doesn't sound like a lot, but if you have a billion strings, that's roughly 40 GB of pure overhead. The newer Pandas string dtype (pd.StringDtype) can help here.
df['column'] = df['column'].astype(pd.StringDtype())
Categorical DTypes
Many string and numerical fields are actually categorical. Take a column named “Household Income”. Instead of a numerical value, you usually get data in bands, like “0-$40,000” or “more than $100,000”.
categorical_dtype = pd.Categorical(df['column']).dtype
df['column'] = df['column'].astype(categorical_dtype)
As a general guideline, I usually look for columns where the ratio of the number of unique values to the number of rows is less than 1%.
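One way to apply that heuristic (a sketch, assuming df is your DataFrame and you are happy converting every low-cardinality string column):
# convert object columns whose unique-value ratio is below 1% to categoricals
for col in df.select_dtypes(include="object").columns:
    if df[col].nunique() / len(df) < 0.01:
        df[col] = df[col].astype("category")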
Diagnosing Memory Usage
df.memory_usage(deep=True) is a good way to understand how much memory each column is using. It can also show you what benefit you are getting from converting your data to appropriate dtypes.
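For example, comparing per-column usage before and after a conversion ('column' is a placeholder name):
before = df.memory_usage(deep=True)
df['column'] = df['column'].astype("category")
after = df.memory_usage(deep=True)
print(before - after)   # bytes saved per column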
Get a bigger computer?
If you’re working on the cloud, the easiest solution is usually to get the biggest machine you can. At Saturn, we currently top out at 512 GB (but let us know if you want more RAM!)
Dask?
If you’re done optimizing memory, and your dataset takes up approximately 1/3 of your workstation's memory, I would recommend skipping Dask. If your dataset takes up more than 1/3 of your workstation's memory, there is a good chance that subsequent operations you do on that DataFrame will exceed the memory of your workstation, and looking at dask.dataframe is a good solution. If you’re doing this for memory-bound reasons, you’re going to need a multi-node Dask cluster in order to get more memory.
Solving compute time issues without Dask
If you’ve got some code that’s slow, Dask can help! But there are a few things to check first.
Swap Pandas for NumPy
NumPy can be orders of magnitude faster than Pandas for many operations. The difference matters most in code that gets called many times, for example the objective function passed to scipy.optimize. Replacing Pandas DataFrame calls with NumPy calls can easily speed up your code by 50x.
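As a rough illustration (the objective function and columns here are made up), pulling the underlying NumPy arrays out of the DataFrame once, before the optimizer starts calling your objective thousands of times, avoids paying the Pandas overhead on every call:
import numpy as np
from scipy import optimize

# slow: indexes the DataFrame on every call to the objective
def objective_pandas(params):
    return ((df['y'] - params[0] * df['x'] - params[1]) ** 2).sum()

# faster: convert to NumPy arrays once, outside the hot loop
x = df['x'].to_numpy()
y = df['y'].to_numpy()

def objective_numpy(params):
    return ((y - params[0] * x - params[1]) ** 2).sum()

result = optimize.minimize(objective_numpy, x0=np.array([0.0, 0.0]))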
Profile your code
There are many Python profiling tools, but line_profiler is the easiest to understand. Profiling is a long topic; the short version is that once you have line_profiler installed, you can call
%load_ext line_profiler
to load it into Jupyter, and then execute
%lprun -f func1 -f func2 expression
The %lprun command takes multiple -f parameters, which identify the functions you want to profile. The expression is then evaluated, and the profile results are displayed.
For example, if I have functions test and test2, I might execute
%lprun -f test -f test2 test()
You will then get an output like:
Line #   Hits    Time   Per Hit   % Time   Line Contents
==============================================================
   149                                     @profile
   150                                     def Proc2(IntParIO):
   151   50000   82003     1.6     13.5        IntLoc = IntParIO + 10
   152   50000   63162     1.3     10.4        while 1:
   153   50000   69065     1.4     11.4            if Char1Glob == 'A':
   154   50000   66354     1.3     10.9                IntLoc = IntLoc - 1
   155   50000   67263     1.3     11.1                IntParIO = IntLoc - IntGlob
   156   50000   65494     1.3     10.8                EnumLoc = Ident1
   157   50000   68001     1.4     11.2            if EnumLoc == Ident1:
   158   50000   63739     1.3     10.5                break
   159   50000   61575     1.2     10.1        return IntParIO
If none of those help, then Dask is a good choice.
But I need more GPUs
Short of shoving more GPU cards into your machine, there isn’t a good way to solve this problem without going multi-node. This is a good problem to solve with Dask.
How do I use Dask?
This obviously deserves its own article, or series of articles, but let’s cover it very briefly.
DataFrame and Array
Dask DataFrame has a number of methods, like read_csv or read_parquet, that can read a collection of data files and return a distributed DataFrame. In the most general case, if you have a function that can return a small subset of your data, you can do something like this:
from dask import delayed
import dask.dataframe as dd

@delayed
def get_partition(partition_number):
    ...

partitions = [get_partition(num) for num in range(number_of_partitions)]
df = dd.from_delayed(partitions)
A similar approach works for Dask Array.
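A minimal sketch of the Array version, assuming each get_partition call returns a NumPy array of known shape and dtype (the shape here is made up):
import numpy as np
import dask.array as da
from dask import delayed

@delayed
def get_partition(partition_number):
    ...

# wrap each delayed NumPy array, then stitch the chunks into one Dask Array
chunks = [da.from_delayed(get_partition(num), shape=(1_000, 10), dtype=np.float64)
          for num in range(number_of_partitions)]
arr = da.concatenate(chunks, axis=0)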
Delayed
from dask import delayed, compute

@delayed
def inc1(x):
    return x + 1

@delayed
def total(x):
    return sum(x)

result = total([inc1(i) for i in input_vars])
Here, result represents a delayed computation. Nothing is executed until you call:
(result,) = compute(result)
Conclusion
Distributed computing is hard. Saturn makes it easy. Get up to 100x faster workflows across ETL, exploratory data analysis, machine learning, and more with our secret sauce!
To try it out for yourself, get started with Saturn for free.