Dask at NERSC¶
What is Dask?¶
Dask is a task-based parallelization framework for Python. It allows you to distribute your work among a collection of workers controlled by a central scheduler. Dask can enable internode and intranode scaling on both CPUs and GPUs and is a central part of the NVIDIA RAPIDS ecosystem. Users who want to scale their work but may not want to use options like mpi4py may be interested in Dask.
To learn how to use Dask at NERSC, please explore our example notebooks. They include both CPU and GPU examples that you can run yourself on NERSC systems.
Dask is well-documented, flexible, and currently under active development. A good way to start learning about Dask is to read through or try some of the Dask tutorials. You can also watch a 15 minute Dask demo. The Dask architecture consists of a scheduler and workers, which can be either CPU or GPU workers.
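The scheduler-and-workers layout described above can be tried on a single machine. The following is a minimal sketch, assuming the `distributed` package is installed; the `square` and `run_demo` names are illustrative only, and `processes=False` is chosen here so that the scheduler and workers run as threads inside one Python process.

```python
from dask.distributed import Client


def square(x):
    """Toy task that stands in for real work."""
    return x * x


def run_demo(n_workers=2):
    # With no scheduler address given, Client starts a local cluster.
    # processes=False keeps the scheduler and workers in this process,
    # and dashboard_address=None turns the dashboard off for this demo.
    with Client(
        n_workers=n_workers,
        threads_per_worker=1,
        processes=False,
        dashboard_address=None,
    ) as client:
        futures = client.map(square, range(10))  # one task per input
        return client.gather(futures)            # collect results in order


if __name__ == "__main__":
    print(run_demo())
```

On a real multi-node job the client would instead connect to an already running scheduler; the example notebooks cover that setup.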
Strengths of Dask¶
- Dask can run on small systems like your laptop all the way up to large systems like NERSC supercomputers. The number of workers can be easily adjusted.
- It is resilient. A nanny process can revive dead workers and the job can continue.
- It has a useful dashboard for profiling, monitoring, and debugging.
- Dask can scale both CPU and GPU code.
- If your array or DataFrame is too large to fit in memory, no problem. Dask will handle the memory management for you.
Disadvantages of Dask¶
- Dask will not scale as well as MPI.
- Dask is stable but is also under active development and does change.
- Dask TCP communication can be slow. UCX is not supported on Perlmutter.
- As with other frameworks (including MPI), it is best to avoid moving large amounts of data.
Dask Arrays (CPU)¶
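As a small illustration of a chunked Dask array on CPUs, here is a hedged sketch, assuming `dask` and `numpy` are installed. The array size and chunk size are arbitrary choices kept small so the example runs quickly; they are not tuned recommendations.

```python
import dask.array as da

# A 2000 x 2000 array split into 500 x 500 chunks (16 chunks in total).
# Each chunk is an ordinary NumPy array that Dask creates only when needed.
x = da.random.random((2000, 2000), chunks=(500, 500))

# Operations are lazy: this line only builds a task graph.
column_means = (x + x.T).mean(axis=0)

# compute() runs the graph and returns a NumPy array.
result = column_means.compute()
print(result.shape)
```

Because only a few chunks need to be in memory at any one time, the same pattern applies to arrays that are larger than memory.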
Dask Dataframes (CPU)¶
Dask on GPUs¶
Dask can support computations that use GPUs, although it is up to the user to choose an appropriate library. For array operations, users may consider adapting their NumPy code to use CuPy instead. For pandas DataFrame operations, users may consider using cuDF, which is part of the NVIDIA RAPIDS ecosystem.
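One common way to adapt NumPy code to CuPy is to write functions against a module alias, so that the same code runs on either library. This is a hedged sketch rather than a NERSC-specific recipe: the `pick_array_module` and `rms` helpers are hypothetical, and the code falls back to NumPy when CuPy is missing or no GPU is visible.

```python
import numpy as np


def pick_array_module():
    """Return CuPy if it imports and a GPU is visible, otherwise NumPy."""
    try:
        import cupy as cp
        if cp.cuda.runtime.getDeviceCount() > 0:
            return cp
    except Exception:
        # CuPy is not installed, or there is no usable CUDA device or driver.
        pass
    return np


xp = pick_array_module()


def rms(values):
    """Root mean square, computed with whichever array module was selected."""
    arr = xp.asarray(values, dtype=xp.float64)
    return float(xp.sqrt(xp.mean(arr * arr)))


print(xp.__name__, rms([3.0, 4.0]))
```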
Dask Tips and Best Practices at NERSC¶
Run your Dask jobs on $SCRATCH
It is better to run your Dask jobs on $SCRATCH. Dask will try to lock the files associated with each worker, which works automatically on $SCRATCH. On $HOME, however, file locking causes errors and you will see many error messages that look like:
distributed.diskutils - ERROR - Could not acquire workspace
lock on path: /global/u1/s/elvis/worker-klsptdq3.dirlock
.Continuing without lock. This may result in workspaces not being
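One way to keep worker files off $HOME is to point Dask's `temporary-directory` configuration value at a directory under $SCRATCH before any workers start. This is a minimal sketch: the `dask-tmp` subdirectory name is hypothetical, and the fallback to the system temporary directory is only an assumption so that the snippet also runs where $SCRATCH is not defined.

```python
import os
import tempfile

import dask

# Use $SCRATCH when it is defined (as on NERSC systems); otherwise fall
# back to the system temporary directory (assumption for portability).
scratch = os.environ.get("SCRATCH", tempfile.gettempdir())
worker_dir = os.path.join(scratch, "dask-tmp")  # hypothetical directory name
os.makedirs(worker_dir, exist_ok=True)

# Workers started from this process place their working files here.
dask.config.set({"temporary-directory": worker_dir})
print(dask.config.get("temporary-directory"))
```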
Each Dask task has about 1 ms of overhead. If you have a lot of tasks, this overhead can add up. For example, having 10,000 tasks can result in about 10 seconds of overhead for each operation. For this reason it is a good idea to give each task more than a few seconds of work.
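The arithmetic behind this advice can be sketched in a few lines. The 1 ms figure is the approximate per-task overhead quoted above; the function names and the example task durations are illustrative only.

```python
OVERHEAD_PER_TASK_S = 1e-3  # roughly 1 ms of overhead per task


def total_overhead_seconds(n_tasks, overhead=OVERHEAD_PER_TASK_S):
    """Total overhead accumulated across n_tasks tasks."""
    return n_tasks * overhead


def overhead_fraction(task_seconds, overhead=OVERHEAD_PER_TASK_S):
    """Share of each task's wall time that is spent on overhead."""
    return overhead / (overhead + task_seconds)


for task_seconds in (0.001, 0.1, 5.0):
    share = overhead_fraction(task_seconds)
    print(f"{task_seconds} s of work per task: {share:.2%} overhead")
```

A task that does only 1 ms of work spends half of its time on overhead, while a task that runs for several seconds makes the overhead negligible.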
Dask provides some advice about understanding performance. You can also group your work into fewer, more substantial tasks. This might mean computing several lazy operations in a single call instead of one at a time. It might also mean repartitioning your DataFrame(s) into fewer, larger partitions.
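Both ideas, batching small pieces of work into larger tasks and computing several lazy results in one call, can be sketched with `dask.delayed`. The batch size and the helper names here are arbitrary choices for illustration.

```python
import dask
from dask import delayed


def sum_of_squares(batch):
    """Process a whole batch inside one task instead of one task per item."""
    return sum(x * x for x in batch)


def make_batches(items, batch_size):
    """Split a list into consecutive batches of at most batch_size items."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


items = list(range(10_000))

# 10 tasks of 1,000 items each, rather than 10,000 single-item tasks.
partials = [delayed(sum_of_squares)(b) for b in make_batches(items, 1_000)]

# Two lazy results that depend on the same partial sums.
total = delayed(sum)(partials)
largest = delayed(max)(partials)

# One compute() call evaluates both and runs the shared tasks only once.
total_value, largest_value = dask.compute(total, largest)
print(total_value, largest_value)
```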
A good rule of thumb for choosing the number of threads per Dask worker is to use the square root of the number of cores per processor. For KNL, for example, this would mean that you could assign 8 threads per worker. In general, more threads per worker are good for a program that spends most of its time in NumPy, SciPy, Numba, etc., and fewer threads per worker are better for simpler programs that spend most of their time in the Python interpreter.
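The square-root rule of thumb is easy to turn into a helper. This is only a sketch: rounding down with `math.isqrt` and filling the processor with as many workers as fit are assumptions made here, not part of the rule itself.

```python
import math


def threads_per_worker(cores):
    """Square-root rule of thumb, rounded down, with a minimum of one thread."""
    return max(1, math.isqrt(cores))


def worker_layout(cores):
    """Return (n_workers, threads_per_worker) for one processor."""
    threads = threads_per_worker(cores)
    workers = max(1, cores // threads)
    return workers, threads


# A KNL processor has 68 cores, which gives 8 threads per worker.
print(worker_layout(68))
```

The resulting values could then be passed as the `n_workers` and `threads_per_worker` arguments when the workers are started.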
The Dask scheduler runs on a single thread, so assigning it its own node is a waste.
There is no hard limit on Dask scaling. Eventually, though, the per-task overhead will start to swamp your calculation, depending on how long each task takes to compute. For 1 ms of overhead per task and 1 second of work per task, the scheduler begins to be swamped at around 5000 workers.
Dask environments are often large with many dependencies. We suggest you consider building a container with Dask and your required software. We provide a Dask example container recipe.