What is Dask?
Dask is a task-based parallelization framework for Python. It allows you to distribute your work among a collection of workers controlled by a central scheduler.
Dask is well-documented, flexible, and currently under active development. A good way to start learning about Dask is to read through or try some of their tutorials. You can also watch a 15 minute Dask demo here.
Using Dask is experimental at NERSC. This means that recommended best practices will change. Ideally over time it will become easier and easier to use Dask at NERSC. We welcome your feedback!
Dask has three important parts: (1) the scheduler, which coordinates your Dask job, (2) the workers, which actually get your work done in parallel, and (3) the client, the means by which you submit work to the scheduler and collect results from the workers.
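The three parts can be seen in a minimal sketch that runs entirely on one machine. It assumes the `dask.distributed` package is installed and uses an in-process `LocalCluster` purely for illustration. The function `square` and the cluster sizes are made up for this example, and a real NERSC job would likely start the scheduler and workers differently.

```python
from dask.distributed import Client, LocalCluster


def square(x):
    """Toy task; a stand-in for real work."""
    return x * x


# LocalCluster starts a scheduler plus workers on the local machine.
# processes=False keeps everything inside this process, which suits a small demo.
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,  # skip the dashboard for this example
)

# The client connects to the scheduler; it is the object you submit work through.
client = Client(cluster)

futures = client.map(square, range(10))  # the scheduler assigns tasks to workers
results = client.gather(futures)         # bring the results back to the client
total = sum(results)
print(total)  # 285

client.close()
cluster.close()
```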
Advantages of Dask
- Dask can run on small systems like your laptop all the way up to large systems like Cori. The number of workers can be easily adjusted or even automatically scaled.
- It is resilient. A nanny process can revive dead workers and the job can continue.
- It has a useful dashboard for profiling, monitoring, and debugging.
Dask considerations
- Dask will not scale as well as MPI.
- Dask is stable but is also under active development and changes quickly.
- Dask uses TCP communication (slow).
Dask works great with Jupyter at NERSC. We have configured the NERSC Jupyter environment to include the Dask labextension. Right now using the dashboard through the labextension requires a little sleight of hand, but if you don't take advantage of the labextension that's OK too. You can access the labextension by clicking on the Dask logo to the left of the file contents manager in JupyterLab. We're working on streamlining this integration.
We are creating a collection of notebooks to help users understand how to use Dask at NERSC on Jupyter. You can find them below. By clicking on the "[Try It]" links you can have the notebook automatically cloned to your home directory, and a notebook server spun up to run it, on jupyter.nersc.gov. Note that you need to have some prerequisites in place (conda environments, etc., depending on the notebook):
- Dask directory in our GitLab repo
- Gentle intro [Try It!]
- These notebooks go together (demo'd during the 2019-12-12 NUG Teleconference):
Again, this is an experimental setup (as of late 2019). If you encounter problems or have feedback, please let us know!
Dask Tips and Best Practices
Run your Dask jobs on $SCRATCH
It is better to run your Dask jobs on $SCRATCH. Dask will try to lock the files associated with each worker, which works automatically on $SCRATCH. On $HOME, however, file locking causes errors and you will see many error messages that look like:
distributed.diskutils - ERROR - Could not acquire workspace lock on path: /global/u1/s/stephey/worker-klsptdq3.dirlock .Continuing without lock. This may result in workspaces not being cleaned up
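One way to keep worker files off $HOME is to point Dask's temporary directory at $SCRATCH before any workers start. The sketch below assumes the `SCRATCH` environment variable is defined and falls back to the system temporary directory when it is not. The `dask-worker-space` subdirectory name is an arbitrary choice for this example.

```python
import os
import tempfile

import dask

# $SCRATCH is assumed to be defined; fall back to a temp dir when it is not.
scratch = os.environ.get("SCRATCH", tempfile.gettempdir())
worker_dir = os.path.join(scratch, "dask-worker-space")  # hypothetical directory name
os.makedirs(worker_dir, exist_ok=True)

# "temporary-directory" is the Dask config key that controls where
# workers place their scratch files and lock files.
dask.config.set({"temporary-directory": worker_dir})
print(dask.config.get("temporary-directory"))
```

Launching the job from a directory on $SCRATCH, or passing a worker-specific location such as `local_directory` when creating a cluster, should have a similar effect.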
Each Dask task has about 1 ms of overhead. If you have a lot of tasks, this overhead can add up. For example, having 10,000 tasks can result in around 10 seconds of overhead for each operation. For this reason it is a good idea to give each task more than a few seconds of work.
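The arithmetic behind this advice can be sketched in a few lines. The helper names below are made up for illustration, and the 1 ms figure is taken at face value from the paragraph above; real overhead varies with the workload.

```python
OVERHEAD_PER_TASK_S = 1e-3  # roughly 1 ms per task, as quoted above


def overhead_seconds(n_tasks, per_task=OVERHEAD_PER_TASK_S):
    """Total scheduling overhead, in seconds, for n_tasks tasks."""
    return n_tasks * per_task


def overhead_fraction(task_seconds, per_task=OVERHEAD_PER_TASK_S):
    """Share of a task's total time that is spent on overhead."""
    return per_task / (task_seconds + per_task)


# 10,000 tasks at about 1 ms each
print(overhead_seconds(10_000))

# A 1 ms task spends half its time on overhead; a 5 s task spends almost none.
print(overhead_fraction(0.001))
print(overhead_fraction(5.0))
```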
To better understand how your program is performing, check out this page. For a video demo that demonstrates how to group your work into fewer, more substantial tasks, see here. This might mean that you call lazy operations at once instead of individually. This might also mean repartitioning your dataframe(s).
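As a rough illustration of grouping work, the sketch below batches many small items into a handful of larger `dask.delayed` tasks and computes them in a single call. The functions `process_item`, `process_batch`, and `chunks`, as well as the batch size, are hypothetical choices for this example.

```python
import dask
from dask import delayed


def process_item(x):
    """Toy per-item work; a stand-in for a real computation."""
    return x * 2


def process_batch(items):
    """Handle many items inside a single task."""
    return [process_item(x) for x in items]


def chunks(seq, size):
    """Split seq into consecutive pieces of at most `size` items."""
    return [seq[i:i + size] for i in range(0, len(seq), size)]


data = list(range(10_000))

# One task per item would create 10,000 tiny tasks.
# One task per batch of 1,000 items creates only 10 larger tasks.
batches = chunks(data, 1_000)
tasks = [delayed(process_batch)(batch) for batch in batches]

# Calling dask.compute once on all lazy objects lets Dask schedule them together.
results = dask.compute(*tasks, scheduler="threads")
flat = [y for batch in results for y in batch]
print(len(tasks), len(flat))
```

For dataframes, the analogous step is usually a call such as `df.repartition(npartitions=...)` so that each partition holds a more substantial amount of data.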
A good rule of thumb for choosing the number of threads per Dask worker is to choose the square root of the number of cores per processor. For KNL, for example, this would mean that you could assign 8 threads per worker. In general, more threads per worker are good for a program that spends most of its time in NumPy, SciPy, Numba, etc., and fewer threads per worker are better for simpler programs that spend most of their time in the Python interpreter.
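The rule of thumb is easy to compute. The helper below is a made-up illustration, and the figure of 68 cores for a KNL processor is an assumption added here; rounding to the nearest integer is also a choice made for this sketch.

```python
import math


def threads_per_worker(cores_per_processor):
    """Square-root rule of thumb, rounded to the nearest whole thread."""
    return max(1, round(math.sqrt(cores_per_processor)))


knl_cores = 68  # assumption: cores on one KNL processor
print(threads_per_worker(knl_cores))  # 8
```

The resulting value could then be passed wherever the worker thread count is configured, for example the `threads_per_worker` argument of `LocalCluster`.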
The Dask scheduler runs on a single thread, so assigning it its own node is a waste.
For more information about which kind of Dask API to choose (Dask Delayed, Dask Futures, or Dask Bag), see this page. For beginners, Dask Bag is an easy ("harder to shoot yourself in the foot with") but less configurable solution. Dask Delayed and Dask Futures are more powerful but also more complex. Users should be aware of all three options so they can choose which one is best suited to their application.
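To give a feel for the difference, here is a small sketch that computes the same sum with Dask Bag and with Dask Delayed. The function `inc` and the input list are made up for illustration, and the threaded scheduler is chosen only to keep the example self-contained. Dask Futures are left out because they need a running `dask.distributed` client.

```python
import dask.bag as db
from dask import delayed


def inc(x):
    """Toy function applied to every element."""
    return x + 1


numbers = list(range(8))

# Dask Bag: high-level map/filter/reduce over a collection.
bag = db.from_sequence(numbers, npartitions=2)
bag_total = bag.map(inc).sum().compute(scheduler="threads")

# Dask Delayed: build the task graph by hand, one call at a time.
lazy_values = [delayed(inc)(x) for x in numbers]
delayed_total = delayed(sum)(lazy_values).compute(scheduler="threads")

print(bag_total, delayed_total)  # 36 36
```

The Bag version hides the partitioning and the graph construction, while the Delayed version exposes each task, which is where the extra flexibility (and the extra room for mistakes) comes from.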
There is no hard limit on Dask scaling. The task overhead, though, will eventually start to swamp your calculation, depending on how long each task takes to compute. For 1 ms overhead per task and for 1 second of work per task, the overhead begins to swamp the calculation at around 5000 workers.
To automatically clean up workers, you can set a timeout so that workers will die if they haven't communicated with the scheduler in the last 60 seconds. This will help keep your directory squeaky clean.