API

Setup

dask_cuda.initialize.initialize(create_cuda_context=True, enable_tcp_over_ucx=False, enable_infiniband=False, enable_nvlink=False, enable_rdmacm=False, net_devices='', cuda_device_index=None)[source]

Create CUDA context and initialize UCX-Py, depending on user parameters.

Sometimes it is convenient to initialize the CUDA context, particularly before starting up Dask workers which create a variety of threads.

To ensure UCX works correctly, it is important to ensure it is initialized with the correct options. This is especially important for the client, which cannot be configured to use UCX with arguments like LocalCUDACluster and dask-cuda-worker. This function will ensure that they are provided a UCX configuration based on the flags and options passed by the user.

This function can also be used within a worker preload script for UCX configuration of mainline Dask/Distributed. https://docs.dask.org/en/latest/setup/custom-startup.html

You can add it to your global config with the following YAML:

distributed:
  worker:
    preload:
      - dask_cuda.initialize

See https://docs.dask.org/en/latest/configuration.html for more information about Dask configuration.

Parameters
create_cuda_context: bool

Create CUDA context on initialization. Default is True.

enable_tcp_over_ucx: bool

Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled. Default is False.

enable_infiniband: bool

Set environment variables to enable UCX InfiniBand support, implies enable_tcp_over_ucx=True. Default is False.

enable_nvlink: bool

Set environment variables to enable UCX NVLink support, implies enable_tcp_over_ucx=True. Default is False.

enable_rdmacm: bool

Set environment variables to enable UCX RDMA connection manager support, implies enable_infiniband=True. Default is False.

net_devices: callable or str

If callable, the function must take exactly one argument (the index of current GPU) that will be used to get the interface name, such as lambda dev: "mlx5_%d:1" % (dev // 2), which would return "mlx5_1:1" for GPU 3. If a string, must be an explicit interface name, such as "ib0" for InfiniBand or "eth0" if InfiniBand is disabled. Default is "", which will result in all available devices being used.

cuda_device_index: None or int

Index of the current GPU, which will be supplied to net_devices if it is callable. Default is None.

Cluster

class dask_cuda.LocalCUDACluster(n_workers=None, threads_per_worker=1, processes=True, memory_limit='auto', device_memory_limit=0.8, CUDA_VISIBLE_DEVICES=None, data=None, local_directory=None, protocol=None, enable_tcp_over_ucx=False, enable_infiniband=False, enable_nvlink=False, enable_rdmacm=False, ucx_net_devices=None, rmm_pool_size=None, rmm_managed_memory=False, rmm_log_directory=None, jit_unspill=None, log_spilling=False, **kwargs)[source]

A variant of dask.distributed.LocalCluster that uses one GPU per process.

This assigns a different CUDA_VISIBLE_DEVICES environment variable to each worker process.

For machines with a complex architecture mapping CPUs, GPUs, and network hardware, such as NVIDIA DGX-1 and DGX-2, this class creates a local cluster that tries to respect this hardware as much as possible.

It creates one Dask worker process per GPU, and assigns each worker process the correct CPU cores and Network interface cards to maximize performance. If UCX and UCX-Py are also available, it’s possible to use InfiniBand and NVLink connections for optimal data transfer performance.

Parameters
CUDA_VISIBLE_DEVICES: str or list

String or list "0,1,2,3" or [0, 1, 2, 3] to restrict activity to different GPUs.

device_memory_limit: int, float or str

Specifies the size of the CUDA device LRU cache, which is used to determine when the worker starts spilling to host memory. This can be a float (fraction of total device memory), an integer (bytes), a string (like "5GB" or "5000M"), or "auto", 0, or None to disable spilling to host (i.e., allow full device memory usage). Default is 0.8, 80% of the worker’s total device memory.

interface: str

The external interface used to connect to the scheduler, usually an ethernet interface is used for connection, and not an InfiniBand interface (if one is available).

threads_per_worker: int

Number of threads to be used for each CUDA worker process.

protocol: str

Protocol to use for communication, e.g., "tcp" or "ucx".

enable_tcp_over_ucx: bool

Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.

enable_infiniband: bool

Set environment variables to enable UCX InfiniBand support, requires protocol="ucx" and implies enable_tcp_over_ucx=True.

enable_rdmacm: bool

Set environment variables to enable UCX RDMA connection manager support, requires protocol="ucx" and enable_infiniband=True.

enable_nvlink: bool

Set environment variables to enable UCX NVLink support, requires protocol="ucx" and implies enable_tcp_over_ucx=True.

ucx_net_devices: None, callable or str

When None (default), "UCX_NET_DEVICES" will be left to its default. If callable, the function must take exactly one argument (the index of current GPU) that will be used to get the interface name, such as lambda dev: "mlx5_%d:1" % (dev // 2), returning "mlx5_1:1" for GPU 3, for example. If it’s a string, it must be a non-empty string with the interface name, such as "eth0" or "auto" to allow for automatically choosing the closest interface based on the system’s topology.

Warning

"auto" requires UCX-Py to be installed and compiled with hwloc support. Additionally that will always use the closest interface, and that may cause unexpected errors if that interface is not properly configured or is disconnected, for that reason it’s limited to InfiniBand only and will still cause unpredictable errors if all interfaces are not connected and properly configured.

rmm_pool_size: None, int or str

When None (default), no RMM pool is initialized. If a different value is given, it can be an integer (bytes) or string (like "5GB" or "5000M").

Note

The size is a per worker (i.e., per GPU) configuration, and not cluster-wide!

rmm_managed_memory: bool

If True, initialize each worker with RMM and set it to use managed memory. If False, RMM may still be used if rmm_pool_size is specified, but in that case with default (non-managed) memory type.

Warning

Managed memory is currently incompatible with NVLink, trying to enable both will result in an exception.

rmm_log_directory: str

Directory to write per-worker RMM log files to; the client and scheduler are not logged here. Logging will only be enabled if rmm_pool_size or rmm_managed_memory are specified.

jit_unspill: bool

If True, enable just-in-time unspilling. This is experimental and doesn’t support memory spilling to disk. Please see proxy_object.ProxyObject and proxify_host_file.ProxifyHostFile.

log_spilling: bool

If True, all spilling operations will be logged directly to distributed.worker with an INFO loglevel. This will eventually be replaced by a Dask configuration flag.

Raises
TypeError

If enable_infiniband or enable_nvlink is True and protocol is not "ucx".

ValueError

If ucx_net_devices is an empty string, or if it is "auto" and UCX-Py is not installed, or if it is "auto" and enable_infiniband=False, or UCX-Py wasn’t compiled with hwloc support, or both RMM managed memory and NVLink are enabled.

See also

LocalCluster

Examples

>>> from dask_cuda import LocalCUDACluster
>>> from dask.distributed import Client
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
new_worker_spec()[source]

Return name and spec for the next worker

Returns
d: dict mapping names to worker specs

See also

scale