Skip to content

Commit

Permalink
Merge commit 'bab50a574ce25cf52ccf45fcd8aa4638f14da6c0' into users_exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
nurbal committed Sep 21, 2024
2 parents 0329790 + bab50a5 commit 8644834
Show file tree
Hide file tree
Showing 90 changed files with 2,837 additions and 575 deletions.
2 changes: 1 addition & 1 deletion config/sarc-dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"duc_inodes_command": null,
"duc_storage_command": null,
"diskusage_report_command": "beegfs-ctl --cfgFile=/etc/beegfs/home.d/beegfs-client.conf --getquota --uid $USER --csv",
"prometheus_url": "http://monitoring.server.mila.quebec:9090/",
"prometheus_url": "http://prometheus01.server.mila.quebec:9090/",
"start_date": "2022-04-01"
},
"narval": {
Expand Down
2 changes: 1 addition & 1 deletion config/sarc-prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"duc_inodes_command": null,
"duc_storage_command": null,
"diskusage_report_command": "beegfs-ctl --cfgFile=/etc/beegfs/home.d/beegfs-client.conf --getquota --uid $USER --csv",
"prometheus_url": "http://monitoring.server.mila.quebec:9090/",
"prometheus_url": "http://prometheus01.server.mila.quebec:9090/",
"start_date": "2022-04-01"
},
"narval": {
Expand Down
2 changes: 1 addition & 1 deletion docs/account_matching.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ We will explain the pipeline from Mila LDAP and CC reports to populate those ent
```
export MONGODB_CONNECTION_STRING='mongodb://127.0.0.1:27017'
python3 sarc/ldap/read_mila_ldap.py \
python3 sarc/users/read_mila_ldap.py \
--local_private_key_file secrets/ldap/Google_2026_01_26_66827.key \
--local_certificate_file secrets/ldap/Google_2026_01_26_66827.crt \
--ldap_service_uri ldaps://ldap.google.com \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
"metadata": {},
"outputs": [],
"source": [
"from sarc.ldap.api import get_users\n",
"from sarc.client import get_users\n",
"users = get_users()\n",
"print(f\"Number users: {len(users)}\")"
]
Expand Down
4 changes: 2 additions & 2 deletions docs/notebooks/notebook_2_jobs_from_users_list.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"metadata": {},
"outputs": [],
"source": [
"from sarc.ldap.api import get_users\n",
"from sarc.client import get_users\n",
"users = get_users()\n",
"print(f\"Number users: {len(users)}\")"
]
Expand Down Expand Up @@ -82,7 +82,7 @@
"metadata": {},
"outputs": [],
"source": [
"from sarc.jobs.job import get_jobs\n",
"from sarc.client import get_jobs\n",
"from tqdm import tqdm\n",
"\n",
"drac_users.sort(key=lambda user: user.name)\n",
Expand Down
2 changes: 1 addition & 1 deletion docs/notebooks/notebook_3_usage_stats.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
"import pandas as pd\n",
"\n",
"from sarc.config import config\n",
"from sarc.jobs import get_jobs\n",
"from sarc.client import get_jobs\n",
"\n",
"# Clusters for which we want to compute statistics. \n",
"# For this example, we will use just 2 clusters.\n",
Expand Down
2 changes: 1 addition & 1 deletion examples/allocation_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from tqdm import tqdm

from sarc.allocations import get_allocation_summaries
from sarc.client.job import get_jobs
from sarc.config import config
from sarc.jobs import get_jobs

# Clusters we want to compare
clusters = ["narval", "beluga", "cedar", "graham"]
Expand Down
2 changes: 1 addition & 1 deletion examples/trends.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import pandas as pd
from tqdm import tqdm

from sarc.client.job import get_jobs
from sarc.config import config
from sarc.jobs import get_jobs

# Clusters we want to compare
clusters = ["mila", "narval", "beluga", "cedar", "graham"]
Expand Down
2 changes: 1 addition & 1 deletion examples/usage_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import pandas as pd
from tqdm import tqdm

from sarc.client.job import get_jobs
from sarc.config import MTL, config
from sarc.jobs import get_jobs

# Clusters we want to compare
clusters = ["mila", "narval", "beluga", "cedar", "graham"]
Expand Down
2 changes: 1 addition & 1 deletion examples/waste_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import pandas as pd
from tqdm import tqdm

from sarc.client.job import get_jobs
from sarc.config import ScraperConfig, _config_class, config
from sarc.jobs import get_jobs


def load_job_series(filename=None) -> pd.DataFrame:
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ disable = [
"invalid-name",
"no-else-return", # Bad rule IMO (- OB)
"line-too-long", # Black takes care of line length.
"logging-fstring-interpolation"
"logging-fstring-interpolation",
"duplicate-code",
]
extension-pkg-whitelist = "pydantic"

Expand Down
File renamed without changes.
129 changes: 129 additions & 0 deletions sarc/alerts/usage_alerts/cluster_scraping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import logging
import sys
from datetime import datetime, timedelta
from typing import List, Optional

import pandas

from sarc.config import MTL
from sarc.jobs.series import compute_time_frames, load_job_series

logger = logging.getLogger(__name__)


def check_nb_jobs_per_cluster_per_time(
    time_interval: Optional[timedelta] = timedelta(days=7),
    time_unit: timedelta = timedelta(days=1),
    cluster_names: Optional[List[str]] = None,
    nb_stddev: float = 2,
    verbose: bool = False,
) -> None:
    """
    Check if we have scraped enough jobs per time unit per cluster on given time interval.

    Log a warning for each cluster where the number of jobs per time unit is lower
    than a limit computed using mean and standard deviation statistics from this
    cluster.

    Parameters
    ----------
    time_interval: timedelta
        If given, only jobs which ran in [now - time_interval, now] will be used
        for checking. Default is last 7 days.
        If None, all jobs are used.
    time_unit: timedelta
        Time unit in which we must check cluster usage through time_interval.
        Default is 1 day.
    cluster_names: list
        Optional list of clusters to check.
        If empty (or not specified), use all clusters available among jobs
        retrieved with time_interval.
    nb_stddev: int
        Amount of standard deviation to remove from average statistics to compute
        the checking threshold. For each cluster, the threshold is computed as:
        max(0, average - nb_stddev * stddev)
    verbose: bool
        If True, print supplementary info about cluster statistics to stderr.
    """

    # Parse time_interval into a concrete [start, end] window.
    # clip_time asks load_job_series to clip job durations to that window.
    start, end, clip_time = None, None, False
    if time_interval is not None:
        end = datetime.now(tz=MTL)
        start = end - time_interval
        clip_time = True

    # Get data frame of jobs (one row per job).
    df = load_job_series(start=start, end=end, clip_time=clip_time)

    # Split data frame into time frames using `time_unit`:
    # each job row is associated with the timestamp of its frame.
    tf = compute_time_frames(df, frame_size=time_unit)

    # List all available timestamps.
    # We will check each timestamp for each cluster.
    timestamps = sorted(tf["timestamp"].unique())

    # List clusters: either the caller-provided ones, or every cluster
    # seen among the retrieved jobs. Sorted for deterministic log order.
    if cluster_names:
        cluster_names = sorted(cluster_names)
    else:
        cluster_names = sorted(df["cluster_name"].unique())

    # Iter for each cluster.
    for cluster_name in cluster_names:
        # Select only jobs for current cluster,
        # group jobs by timestamp, and count jobs for each timestamp.
        f_stats = (
            tf[tf["cluster_name"] == cluster_name]
            .groupby(["timestamp"])[["job_id"]]
            .count()
        )

        # Create a dataframe with all available timestamps
        # and associate each timestamp to 0 jobs by default.
        # (The groupby().sum() turns `timestamp` into the index so it can be
        # aligned with f_stats below.)
        c = (
            pandas.DataFrame({"timestamp": timestamps, "count": [0] * len(timestamps)})
            .groupby(["timestamp"])[["count"]]
            .sum()
        )
        # Set each timestamp valid for this cluster with real number of jobs
        # scraped in this timestamp; timestamps absent from f_stats keep 0.
        c.loc[f_stats.index, "count"] = f_stats["job_id"]

        # We now have number of jobs for each timestamp for this cluster,
        # with count 0 for timestamps where no jobs ran on cluster.

        # Compute average number of jobs per timestamp for this cluster
        avg = c["count"].mean()
        # Compute standard deviation of job count per timestamp for this cluster.
        # NOTE(review): with a single timestamp, pandas .std() is NaN; then
        # max(0, NaN) yields 0 in Python, which routes into the
        # `threshold == 0` branch below (which explicitly mentions the
        # 1-timestamp case) — presumably intended; confirm.
        stddev = c["count"].std()
        # Compute threshold to use for warnings: <average> - nb_stddev * <standard deviation>
        threshold = max(0, avg - nb_stddev * stddev)

        if verbose:
            print(f"[{cluster_name}]", file=sys.stderr)
            print(c, file=sys.stderr)
            print(f"avg {avg}, stddev {stddev}, threshold {threshold}", file=sys.stderr)
            print(file=sys.stderr)

        if threshold == 0:
            # If threshold is zero, no check can be done, as jobs count will be always >= 0.
            # Instead, we log a general warning.
            msg = f"[{cluster_name}] threshold 0 ({avg} - {nb_stddev} * {stddev})."
            if len(timestamps) == 1:
                msg += (
                    f" Only 1 timestamp found. Either time_interval ({time_interval}) is too short, "
                    f"or this cluster should not be currently checked"
                )
            else:
                msg += (
                    f" Either nb_stddev is too high, time_interval ({time_interval}) is too short, "
                    f"or this cluster should not be currently checked"
                )
            logger.warning(msg)
        else:
            # With a non-null threshold, we can check each timestamp.
            for timestamp in timestamps:
                nb_jobs = c.loc[timestamp]["count"]
                if nb_jobs < threshold:
                    logger.warning(
                        f"[{cluster_name}][{timestamp}] "
                        f"insufficient cluster scraping: {nb_jobs} jobs / cluster / time unit; "
                        f"minimum required for this cluster: {threshold} ({avg} - {nb_stddev} * {stddev}); "
                        f"time unit: {time_unit}"
                    )
99 changes: 99 additions & 0 deletions sarc/alerts/usage_alerts/gpu_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import logging
from datetime import datetime, timedelta
from typing import Optional, Sequence

from sarc.config import MTL
from sarc.jobs.series import load_job_series

logger = logging.getLogger(__name__)


def check_gpu_type_usage_per_node(
    gpu_type: str,
    time_interval: Optional[timedelta] = timedelta(hours=24),
    minimum_runtime: Optional[timedelta] = timedelta(minutes=5),
    threshold=1.0,
    min_tasks=0,
    ignore_min_tasks_for_clusters: Optional[Sequence[str]] = ("mila",),
):
    """
    Check that a given GPU type is sufficiently used on each cluster node.

    For every (cluster, node) pair, compute the ratio of jobs that ran on
    `gpu_type` among GPU jobs on that node, and log a warning when this
    ratio falls below `threshold`.

    Parameters
    ----------
    gpu_type: str
        GPU type to check.
    time_interval: timedelta
        If given, only jobs which ran in [now - time_interval, now] are
        considered. Default is the last 24 hours. If None, all jobs are used.
    minimum_runtime: timedelta
        If given, only jobs which ran at least this long are considered.
        Default is 5 minutes. If None, treated as 0.
    threshold: float
        Value in [0, 1]: minimum expected ratio of jobs using `gpu_type`
        w.r.t. running jobs on each node. A warning is logged when the
        computed ratio is below this threshold.
    min_tasks: int
        Minimum number of jobs a node must have for the check to apply,
        unless the node's cluster is in `ignore_min_tasks_for_clusters`.
    ignore_min_tasks_for_clusters: Sequence
        Clusters checked even when their nodes have fewer than `min_tasks` jobs.
    """
    # Resolve the scraping window; a None interval means "all jobs".
    # Clipping aligns job durations with the window, so minimum_runtime
    # is compared against the time actually spent inside the window.
    if time_interval is None:
        start = end = None
        clip_time = False
    else:
        end = datetime.now(tz=MTL)
        start = end - time_interval
        clip_time = True

    # A missing minimum runtime behaves like "no minimum".
    if minimum_runtime is None:
        minimum_runtime = timedelta(seconds=0)

    jobs = load_job_series(start=start, end=end, clip_time=clip_time)

    # Flag jobs running on the target GPU type, and give every job a unit
    # weight so that a groupby-sum below yields per-node job counts.
    jobs.loc[:, "gpu_task_"] = jobs["allocated.gpu_type"] == gpu_type
    jobs.loc[:, "task_"] = 1

    # Keep only GPU jobs that ran long enough.
    ran_long_enough = jobs["elapsed_time"] >= minimum_runtime.total_seconds()
    used_gpus = jobs["allocated.gres_gpu"] > 0
    selected = jobs[ran_long_enough & used_gpus]

    # `nodes` holds a list of nodes per job; explode so that a job counts
    # once on every node it ran on, then aggregate per (cluster, node).
    per_node = (
        selected.explode("nodes")
        .groupby(["cluster_name", "nodes"])[["gpu_task_", "task_"]]
        .sum()
    )
    # Ratio of jobs on `gpu_type` among selected jobs, per node.
    per_node["gpu_usage_"] = per_node["gpu_task_"] / per_node["task_"]

    # Clusters exempt from the min_tasks requirement.
    always_checked = set(ignore_min_tasks_for_clusters or ())

    for entry in per_node.itertuples():
        cluster_name, node = entry.Index
        nb_gpu_tasks = entry.gpu_task_
        nb_tasks = entry.task_
        gpu_usage = entry.gpu_usage_
        # A node is checked when it has enough jobs, or when its cluster
        # is exempt from the minimum-jobs requirement.
        node_is_checked = cluster_name in always_checked or nb_tasks >= min_tasks
        if gpu_usage < threshold and node_is_checked:
            logger.warning(
                f"[{cluster_name}][{node}] insufficient usage for GPU {gpu_type}: "
                f"{round(gpu_usage * 100, 2)} % ({nb_gpu_tasks}/{nb_tasks}), "
                f"minimum required: {round(threshold * 100, 2)} %"
            )
Loading

0 comments on commit 8644834

Please sign in to comment.