Skip to content

Commit

Permalink
WIP: Generate job description for MPM from Tapis App
Browse files Browse the repository at this point in the history
  • Loading branch information
kks32 committed Sep 30, 2024
1 parent 7f4df0e commit 0702902
Showing 1 changed file with 84 additions and 79 deletions.
163 changes: 84 additions & 79 deletions dapi/jobs/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,113 @@
from datetime import datetime, timedelta, timezone
from tqdm import tqdm
import logging
import json
from typing import Dict, Any, Optional

# Configuring the logging system
# logging.basicConfig(
# level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
# )


def get_status(ag, job_id, time_lapse=15):
def generate_job_description(
t: Any, # Tapis client
app_name: str,
input_uri: str,
input_file: str,
job_name: str = None,
max_minutes: Optional[int] = None,
node_count: Optional[int] = None,
cores_per_node: Optional[int] = None,
queue: Optional[str] = None,
allocation: Optional[str] = None,
) -> Dict[str, Any]:
"""
Retrieves and monitors the status of a job from Agave.
This function initially waits for the job to start, displaying its progress using
a tqdm progress bar. Once the job starts, it monitors the job's status up to
a maximum duration specified by the job's "maxHours". If the job completes or fails
before reaching this maximum duration, it returns the job's final status.
Generates a job description dictionary based on the provided application name, job name, input URI, input file, and optional allocation.
Args:
ag (object): The Agave job object used to interact with the job.
job_id (str): The unique identifier of the job to monitor.
time_lapse (int, optional): Time interval, in seconds, to wait between status
checks. Defaults to 15 seconds.
t (object): The Tapis API client object.
app_name (str): The name of the application to use for the job.
input_uri (str): The URI of the input data for the job.
input_file (str): The local file path to the input data for the job.
job_name (str, optional): The name of the job to be created. Defaults to None.
max_minutes (int, optional): The maximum number of minutes the job can run. Defaults to None.
node_count (int, optional): The number of nodes to use for the job. Defaults to None.
cores_per_node (int, optional): The number of cores per node for the job. Defaults to None.
queue (str, optional): The queue to use for the job. Defaults to None.
allocation (str, optional): The allocation to use for the job. Defaults to None.
Returns:
str: The final status of the job. Typical values include "FINISHED", "FAILED",
and "STOPPED".
dict: The job description dictionary.
"""

Raises:
No exceptions are explicitly raised, but potential exceptions raised by the Agave
job object or other called functions/methods will propagate.
# Fetch the latest app information
app_info = t.apps.getAppLatestVersion(appId=app_name)

# If job_name is not provided, use the app name and date
if not job_name:
job_name = f"{app_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

# Create the base job description
job_description = {
"name": job_name,
"appId": app_info.id,
"appVersion": app_info.version,
"execSystemId": app_info.jobAttributes.execSystemId,
"maxMinutes": max_minutes or app_info.jobAttributes.maxMinutes,
"archiveOnAppError": app_info.jobAttributes.archiveOnAppError,
"fileInputs": [{"name": "Input Directory", "sourceUrl": input_uri}],
"execSystemLogicalQueue": queue
or app_info.jobAttributes.execSystemLogicalQueue,
"nodeCount": node_count or 1, # Default to 1 if not specified
"coresPerNode": cores_per_node or 1, # Default to 1 if not specified
"parameterSet": {
"appArgs": [{"name": "Input Script", "arg": input_file}],
"schedulerOptions": [],
},
}

# Add TACC allocation if provided
if allocation:
job_description["parameterSet"]["schedulerOptions"].append(
{"name": "TACC Allocation", "arg": f"-A {allocation}"}
)

return job_description


def get_status(t, mjobUuid, tlapse=15):
"""
Retrieves and monitors the status of a job using Tapis API.
This function waits for the job to start, then monitors it for up to maxMinutes.
Args:
t (object): The Tapis API client object.
mjobUuid (str): The unique identifier of the job to monitor.
tlapse (int, optional): Time interval, in seconds, to wait between status checks. Defaults to 15 seconds.
Returns:
str: The final status of the job (FINISHED, FAILED, or STOPPED).
"""
previous_status = None
# Initially check if the job is already running
status = ag.jobs.getStatus(jobId=job_id)["status"]

job_details = ag.jobs.get(jobId=job_id)
max_hours = job_details["maxHours"]
status = t.jobs.getJobStatus(jobUuid=mjobUuid).status
max_minutes = t.jobs.getJob(jobUuid=mjobUuid).maxMinutes

# Using tqdm to provide visual feedback while waiting for job to start
with tqdm(desc="Waiting for job to start", dynamic_ncols=True) as pbar:
while status not in ["RUNNING", "FINISHED", "FAILED", "STOPPED"]:
time.sleep(time_lapse)
status = ag.jobs.getStatus(jobId=job_id)["status"]
time.sleep(tlapse)
status = t.jobs.getJobStatus(jobUuid=mjobUuid).status
pbar.update(1)
pbar.set_postfix_str(f"Status: {status}")

# Once the job is running, monitor it for up to maxHours
max_iterations = int(max_hours * 3600 // time_lapse)
# Once the job is running, monitor it for up to maxMinutes
max_iterations = int(max_minutes * 60 // tlapse)

# Using tqdm for progress bar
for _ in tqdm(range(max_iterations), desc="Monitoring job", ncols=100):
status = ag.jobs.getStatus(jobId=job_id)["status"]
status = t.jobs.getJobStatus(jobUuid=mjobUuid).status

# Print status if it has changed
if status != previous_status:
Expand All @@ -64,10 +119,12 @@ def get_status(ag, job_id, time_lapse=15):
if status in ["FINISHED", "FAILED", "STOPPED"]:
break

time.sleep(time_lapse)
time.sleep(tlapse)
else:
# This block will execute if the for loop completes without a 'break'
logging.warn("Warning: Maximum monitoring time reached!")
logging.warning(
f"Warning: Maximum monitoring time of {max_minutes} minutes reached!"
)

return status

Expand Down Expand Up @@ -129,58 +186,6 @@ def runtime_summary(ag, job_id, verbose=False):
print("---------------")


def generate_job_info(
ag,
appid: str,
jobname: str = "dsjob",
queue: str = "development",
nnodes: int = 1,
nprocessors: int = 1,
runtime: str = "00:10:00",
inputs=None,
parameters=None,
) -> dict:
"""Generate a job information dictionary based on provided arguments.
Args:
ag (object): The Agave object to interact with the platform.
appid (str): The application ID for the job.
jobname (str, optional): The name of the job. Defaults to 'dsjob'.
queue (str, optional): The batch queue name. Defaults to 'skx-dev'.
nnodes (int, optional): The number of nodes required. Defaults to 1.
nprocessors (int, optional): The number of processors per node. Defaults to 1.
runtime (str, optional): The maximum runtime in the format 'HH:MM:SS'. Defaults to '00:10:00'.
inputs (dict, optional): The inputs for the job. Defaults to None.
parameters (dict, optional): The parameters for the job. Defaults to None.
Returns:
dict: A dictionary containing the job information.
Raises:
ValueError: If the provided appid is not valid.
"""

try:
app = ag.apps.get(appId=appid)
except Exception:
raise ValueError(f"Invalid app ID: {appid}")

job_info = {
"appId": appid,
"name": jobname,
"batchQueue": queue,
"nodeCount": nnodes,
"processorsPerNode": nprocessors,
"memoryPerNode": "1",
"maxRunTime": runtime,
"archive": True,
"inputs": inputs,
"parameters": parameters,
}

return job_info


def get_archive_path(ag, job_id):
"""
Get the archive path for a given job ID and modifies the user directory
Expand Down

0 comments on commit 0702902

Please sign in to comment.