Fix the calculation for the number of threads to be used #39

Merged 4 commits on Sep 26, 2024
60 changes: 44 additions & 16 deletions libs/ccc/coef/impl.py
@@ -1,6 +1,8 @@
"""
Contains function that implement the Clustermatch Correlation Coefficient (CCC).
"""
from __future__ import annotations

import os
from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor
from typing import Iterable, Union
@@ -515,6 +517,32 @@ def cdist_func(x, y):
return max_ari_list, max_part_idx_list, pvalues


def get_n_workers(n_jobs: int | None) -> int:
"""
Helper function to get the number of workers for parallel processing.

Args:
n_jobs: the n_jobs value passed to the main ccc function.
Returns:
The number of workers to use for parallel processing.
"""
n_cpu_cores = os.cpu_count()
if n_cpu_cores is None:
raise ValueError("Could not determine the number of CPU cores. Please specify a positive value of n_jobs")

n_workers = n_cpu_cores
if n_jobs is None:
return n_workers

n_workers = os.cpu_count() + n_jobs if n_jobs < 0 else n_jobs

if n_workers < 1:
raise ValueError(f"The number of threads/processes to use must be greater than 0. Got {n_workers}."
"Please check the n_jobs argument provided")
Comment on lines +537 to +541

Member

Looks good. If a user specifies more threads than there are CPU cores, that's a problem for the user (they would be using more resources than are actually available), and we should not check for that.

Collaborator Author

Yep! ThreadPoolExecutor(max_workers=n_workers) itself only complains when n_workers < 1, so we don't need to check for that case.
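
For reference, a quick standalone check of the executor's own validation (standard-library behavior, independent of this PR; the values below are only illustrative):

from concurrent.futures import ThreadPoolExecutor

# Oversubscription is accepted: the executor never compares max_workers
# against the number of physical CPU cores.
with ThreadPoolExecutor(max_workers=128) as executor:
    print(executor.submit(sum, [1, 2, 3]).result())  # 6

# Non-positive values are rejected by the executor itself.
try:
    ThreadPoolExecutor(max_workers=0)
except ValueError as exc:
    print(exc)  # e.g. "max_workers must be greater than 0"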


return n_workers


def ccc(
x: NDArray,
y: NDArray = None,
@@ -544,9 +572,10 @@ def ccc(
n_chunks_threads_ratio: allows to modify how pairwise comparisons are
split across different threads. It's given as the ratio parameter of
function get_chunks.
n_jobs: number of CPU cores to use for parallelization. The value
n_jobs: number of CPU cores/threads to use for parallelization. The value
None will use all available cores (`os.cpu_count()`), and negative
values will use `os.cpu_count() - n_jobs`. Default is 1.
values will use `os.cpu_count() + n_jobs` (an exception is raised
if this expression yields a result less than 1). Default is 1.
pvalue_n_perms: if given, it computes the p-value of the
coefficient using the given number of permutations.
partitioning_executor: Executor type used for partitioning the data. It
@@ -596,7 +625,8 @@ def ccc(
X_numerical_type = None
if x.ndim == 1 and (y is not None and y.ndim == 1):
# both x and y are 1d arrays
assert x.shape == y.shape, "x and y need to be of the same size"
if not x.shape == y.shape:
raise ValueError("x and y need to be of the same size")
n_objects = x.shape[0]
n_features = 2

@@ -612,10 +642,9 @@
# plus we have the features data type (numerical, categorical, etc)

if isinstance(x, np.ndarray):
assert get_feature_type_and_encode(x[0, :])[1], (
"If data is a 2d numpy array, it has to be numerical. Use pandas.DataFrame if "
"you need to mix features with different data types"
)
if not get_feature_type_and_encode(x[0, :])[1]:
raise ValueError("If data is a 2d numpy array, it has to be numerical. Use pandas.DataFrame if "
"you need to mix features with different data types")
n_objects = x.shape[1]
n_features = x.shape[0]

@@ -639,8 +668,7 @@
raise ValueError("Wrong combination of parameters x and y")

# get number of cores to use
n_jobs = os.cpu_count() if n_jobs is None else n_jobs
default_n_threads = (os.cpu_count() - n_jobs) if n_jobs < 0 else n_jobs
n_workers = get_n_workers(n_jobs)

if internal_n_clusters is not None:
_tmp_list = List()
@@ -675,11 +703,11 @@
max_parts = np.zeros((n_features_comp, 2), dtype=np.uint64)

with (
ThreadPoolExecutor(max_workers=default_n_threads) as executor,
ProcessPoolExecutor(max_workers=default_n_threads) as pexecutor,
ThreadPoolExecutor(max_workers=n_workers) as executor,
ProcessPoolExecutor(max_workers=n_workers) as pexecutor,
):
map_func = map
if default_n_threads > 1:
if n_workers > 1:
if partitioning_executor == "thread":
map_func = executor.map
elif partitioning_executor == "process":
@@ -695,7 +723,7 @@
for f_idx in range(n_features)
for c_idx, c in enumerate(range_n_clusters)
],
default_n_threads,
n_workers,
n_chunks_threads_ratio,
)

@@ -732,7 +760,7 @@
cdist_executor = False
inner_executor = DummyExecutor()

if default_n_threads > 1:
if n_workers > 1:
if n_features_comp == 1:
map_func = map
cdist_executor = executor
@@ -742,14 +770,14 @@
map_func = pexecutor.map

# iterate over all chunks of object pairs and compute the coefficient
inputs = get_chunks(n_features_comp, default_n_threads, n_chunks_threads_ratio)
inputs = get_chunks(n_features_comp, n_workers, n_chunks_threads_ratio)
inputs = [
(
i,
n_features,
parts,
pvalue_n_perms,
default_n_threads,
n_workers,
n_chunks_threads_ratio,
cdist_executor,
inner_executor,
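To make the updated n_jobs semantics (documented in the ccc docstring above) concrete, a minimal usage sketch; the import path and the random data are assumptions for illustration, not part of this diff:

import numpy as np
from ccc.coef import ccc  # import path assumed; adjust to the installed package layout

rng = np.random.default_rng(0)
x = rng.random(1_000)
y = rng.random(1_000)

print(ccc(x, y))               # default n_jobs=1: a single worker
print(ccc(x, y, n_jobs=None))  # all available cores (os.cpu_count())
print(ccc(x, y, n_jobs=-1))    # os.cpu_count() - 1 workers, i.e. leave one core free
# ccc(x, y, n_jobs=0) raises ValueError, since the computed n_workers must be at least 1
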
26 changes: 26 additions & 0 deletions tests/test_coef.py
@@ -1,5 +1,6 @@
from concurrent.futures import ThreadPoolExecutor
from random import shuffle
from unittest.mock import patch
import time
import os

@@ -19,6 +20,7 @@
cdist_parts_basic,
cdist_parts_parallel,
get_chunks,
get_n_workers,
)


@@ -1557,3 +1559,27 @@ def test_cm_with_too_few_objects():
ccc(data, internal_n_clusters=3)

assert "too few objects" in str(e.value)



@pytest.mark.parametrize("n_jobs, cpu_count, expected", [
(None, 4, 4),
(2, 4, 2),
(-1, 4, 3),
(6, 4, 6),
])
def test_get_n_workers_valid(n_jobs, cpu_count, expected):
with patch('os.cpu_count', return_value=cpu_count):
assert get_n_workers(n_jobs) == expected


@pytest.mark.parametrize("n_jobs, cpu_count, error_type, error_message", [
(0, 4, ValueError, "The number of threads/processes to use must be greater than 0. Got 0"),
(-5, 4, ValueError, "The number of threads/processes to use must be greater than 0. Got -1"),
(2, None, ValueError, "Could not determine the number of CPU cores. Please specify a positive value of n_jobs"),
(None, None, ValueError, "Could not determine the number of CPU cores. Please specify a positive value of n_jobs"),
])
def test_get_n_workers_invalid(n_jobs, cpu_count, error_type, error_message):
with patch('os.cpu_count', return_value=cpu_count):
with pytest.raises(error_type, match=error_message):
get_n_workers(n_jobs)