Enable maximum pool size for RMM async allocator (#1221)
In addition to `release_threshold`, this change also enables support for `maximum_pool_size`. The difference between the two is that `release_threshold` attempts to bring RMM's memory usage back down to that value after the next stream synchronization, whereas `maximum_pool_size` is a hard limit enforced by RMM.

Depends on rapidsai/rmm#1327.
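
For illustration, a minimal sketch of how the two options combine on a `LocalCUDACluster`, mirroring the new tests below (the specific sizes are example values taken from those tests):

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

# The pool starts at 2 GB. Once usage grows past 3 GB, RMM tries to shrink
# it back toward that value after the next stream synchronization, while
# 4 GB is a hard cap enforced via a LimitingResourceAdaptor.
cluster = LocalCUDACluster(
    rmm_async=True,
    rmm_pool_size="2GB",
    rmm_release_threshold="3GB",
    rmm_maximum_pool_size="4GB",
)
client = Client(cluster)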

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Benjamin Zaitlen (https://github.com/quasiben)

URL: #1221
pentschev authored Aug 22, 2023
1 parent 62fb56a commit def2e69
Showing 3 changed files with 102 additions and 13 deletions.
50 changes: 50 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -153,6 +153,56 @@ def test_rmm_async(loop): # noqa: F811
assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000


def test_rmm_async_with_maximum_pool_size(loop): # noqa: F811
rmm = pytest.importorskip("rmm")

driver_version = rmm._cuda.gpu.driverGetVersion()
runtime_version = rmm._cuda.gpu.runtimeGetVersion()
if driver_version < 11020 or runtime_version < 11020:
pytest.skip("cudaMallocAsync not supported")

with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
with popen(
[
"dask",
"cuda",
"worker",
"127.0.0.1:9369",
"--host",
"127.0.0.1",
"--rmm-async",
"--rmm-pool-size",
"2 GB",
"--rmm-release-threshold",
"3 GB",
"--rmm-maximum-pool-size",
"4 GB",
"--no-dashboard",
]
):
with Client("127.0.0.1:9369", loop=loop) as client:
assert wait_workers(client, n_gpus=get_n_gpus())

memory_resource_types = client.run(
lambda: (
rmm.mr.get_current_device_resource_type(),
type(rmm.mr.get_current_device_resource().get_upstream()),
)
)
for v in memory_resource_types.values():
memory_resource_type, upstream_memory_resource_type = v
assert memory_resource_type is rmm.mr.LimitingResourceAdaptor
assert (
upstream_memory_resource_type is rmm.mr.CudaAsyncMemoryResource
)

ret = get_cluster_configuration(client)
wait(ret)
assert ret["[plugin] RMMSetup"]["initial_pool_size"] == 2000000000
assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000
assert ret["[plugin] RMMSetup"]["maximum_pool_size"] == 4000000000


def test_rmm_logging(loop): # noqa: F811
rmm = pytest.importorskip("rmm")
with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
34 changes: 34 additions & 0 deletions dask_cuda/tests/test_local_cuda_cluster.py
@@ -261,6 +261,40 @@ async def test_rmm_async():
assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000


@gen_test(timeout=20)
async def test_rmm_async_with_maximum_pool_size():
rmm = pytest.importorskip("rmm")

driver_version = rmm._cuda.gpu.driverGetVersion()
runtime_version = rmm._cuda.gpu.runtimeGetVersion()
if driver_version < 11020 or runtime_version < 11020:
pytest.skip("cudaMallocAsync not supported")

async with LocalCUDACluster(
rmm_async=True,
rmm_pool_size="2GB",
rmm_release_threshold="3GB",
rmm_maximum_pool_size="4GB",
asynchronous=True,
) as cluster:
async with Client(cluster, asynchronous=True) as client:
memory_resource_types = await client.run(
lambda: (
rmm.mr.get_current_device_resource_type(),
type(rmm.mr.get_current_device_resource().get_upstream()),
)
)
for v in memory_resource_types.values():
memory_resource_type, upstream_memory_resource_type = v
assert memory_resource_type is rmm.mr.LimitingResourceAdaptor
assert upstream_memory_resource_type is rmm.mr.CudaAsyncMemoryResource

ret = await get_cluster_configuration(client)
assert ret["[plugin] RMMSetup"]["initial_pool_size"] == 2000000000
assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000
assert ret["[plugin] RMMSetup"]["maximum_pool_size"] == 4000000000


@gen_test(timeout=20)
async def test_rmm_logging():
rmm = pytest.importorskip("rmm")
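As the assertions in both new tests show, combining `rmm_async` with `rmm_maximum_pool_size` makes the active device resource a `LimitingResourceAdaptor` whose upstream is the `CudaAsyncMemoryResource`. A minimal sketch of inspecting that stack on a worker, assuming RMM was already configured as above (the MemoryError mapping is an assumption of this sketch, not asserted by the tests):

import rmm

mr = rmm.mr.get_current_device_resource()
print(type(mr))                 # rmm.mr.LimitingResourceAdaptor
print(type(mr.get_upstream()))  # rmm.mr.CudaAsyncMemoryResource

# Requests beyond the 4 GB allocation limit are rejected by the adaptor;
# in Python this is assumed to surface as a MemoryError.
try:
    rmm.DeviceBuffer(size=5 * 10**9)
except MemoryError:
    print("allocation beyond maximum_pool_size rejected")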
31 changes: 18 additions & 13 deletions dask_cuda/utils.py
@@ -56,14 +56,11 @@ def __init__(
"`rmm_maximum_pool_size` was specified without specifying "
"`rmm_pool_size`.`rmm_pool_size` must be specified to use RMM pool."
)
-        if async_alloc is True and managed_memory is True:
-            raise ValueError(
-                "`rmm_managed_memory` is incompatible with the `rmm_async`."
-            )
-        if async_alloc is True and maximum_pool_size is not None:
-            raise ValueError(
-                "`rmm_maximum_pool_size` is incompatible with the `rmm_async`."
-            )
+        if async_alloc is True:
+            if managed_memory is True:
+                raise ValueError(
+                    "`rmm_managed_memory` is incompatible with the `rmm_async`."
+                )
if async_alloc is False and release_threshold is not None:
raise ValueError("`rmm_release_threshold` requires `rmm_async`.")

Expand All @@ -90,12 +87,20 @@ def setup(self, worker=None):
self.release_threshold, alignment_size=256
)

-            rmm.mr.set_current_device_resource(
-                rmm.mr.CudaAsyncMemoryResource(
-                    initial_pool_size=self.initial_pool_size,
-                    release_threshold=self.release_threshold,
-                )
-            )
+            mr = rmm.mr.CudaAsyncMemoryResource(
+                initial_pool_size=self.initial_pool_size,
+                release_threshold=self.release_threshold,
+            )
+
+            if self.maximum_pool_size is not None:
+                self.maximum_pool_size = parse_device_memory_limit(
+                    self.maximum_pool_size, alignment_size=256
+                )
+                mr = rmm.mr.LimitingResourceAdaptor(
+                    mr, allocation_limit=self.maximum_pool_size
+                )
+
+            rmm.mr.set_current_device_resource(mr)
if self.logging:
rmm.enable_logging(
log_file_name=get_rmm_log_file_name(
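Taken together, the `setup()` change builds the resource stack as in this standalone sketch (the RMM calls are exactly those in the diff; the byte values are examples matching the tests):

import rmm

# Async pool with an initial size and a release threshold...
mr = rmm.mr.CudaAsyncMemoryResource(
    initial_pool_size=2_000_000_000,   # "2GB"
    release_threshold=3_000_000_000,   # "3GB"
)
# ...optionally wrapped in a hard allocation limit.
mr = rmm.mr.LimitingResourceAdaptor(mr, allocation_limit=4_000_000_000)  # "4GB"
rmm.mr.set_current_device_resource(mr)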
