From 070290273865b854eec17e571f43f762b781586f Mon Sep 17 00:00:00 2001
From: Krishna Kumar
Date: Sun, 29 Sep 2024 20:08:12 -0500
Subject: [PATCH] WIP: Generate job description for MPM from Tapis App

---
Reviewer notes (this section sits below the three-dash line, so it is not
part of the commit message):

 * Adds generate_job_description(), which builds a job-request dict from
   t.apps.getAppLatestVersion(appId=app_name), and removes
   generate_job_info().
 * Ports get_status() from ag.jobs.getStatus()/ag.jobs.get() with "maxHours"
   to t.jobs.getJobStatus()/t.jobs.getJob() with maxMinutes, and replaces
   logging.warn() with logging.warning().
 * get_status() parameters are renamed (ag, job_id, time_lapse ->
   t, mjobUuid, tlapse); callers passing the old names as keywords would
   break -- TODO confirm there are none.
 * `job_name: str = None` defaults to None, so the annotation should
   presumably be Optional[str].
 * `import json` is added but is not used in any hunk shown here --
   presumably used elsewhere in the file; verify.
 * nodeCount/coresPerNode fall back to the literal 1, whereas maxMinutes and
   execSystemLogicalQueue fall back to app_info.jobAttributes -- confirm this
   asymmetry is intended.
 * NOTE(review): line breaks and indentation in this copy were reconstructed
   from a whitespace-collapsed paste. Per-hunk line counts match the @@
   headers and the diffstat, but leading whitespace (especially on context
   and removed lines) should be confirmed with `git apply --check` before
   use.

 dapi/jobs/jobs.py | 163 ++++++++++++++++++++++++----------------------
 1 file changed, 84 insertions(+), 79 deletions(-)

diff --git a/dapi/jobs/jobs.py b/dapi/jobs/jobs.py
index 62f5c7a..4989a30 100644
--- a/dapi/jobs/jobs.py
+++ b/dapi/jobs/jobs.py
@@ -2,6 +2,8 @@
 from datetime import datetime, timedelta, timezone
 from tqdm import tqdm
 import logging
+import json
+from typing import Dict, Any, Optional
 
 # Configuring the logging system
 # logging.basicConfig(
@@ -9,51 +11,104 @@
 # )
 
 
-def get_status(ag, job_id, time_lapse=15):
+def generate_job_description(
+    t: Any,  # Tapis client
+    app_name: str,
+    input_uri: str,
+    input_file: str,
+    job_name: str = None,
+    max_minutes: Optional[int] = None,
+    node_count: Optional[int] = None,
+    cores_per_node: Optional[int] = None,
+    queue: Optional[str] = None,
+    allocation: Optional[str] = None,
+) -> Dict[str, Any]:
     """
-    Retrieves and monitors the status of a job from Agave.
-
-    This function initially waits for the job to start, displaying its progress using
-    a tqdm progress bar. Once the job starts, it monitors the job's status up to
-    a maximum duration specified by the job's "maxHours". If the job completes or fails
-    before reaching this maximum duration, it returns the job's final status.
+    Generates a job description dictionary based on the provided application name, job name, input URI, input file, and optional allocation.
 
     Args:
-        ag (object): The Agave job object used to interact with the job.
-        job_id (str): The unique identifier of the job to monitor.
-        time_lapse (int, optional): Time interval, in seconds, to wait between status
-            checks. Defaults to 15 seconds.
+        t (object): The Tapis API client object.
+        app_name (str): The name of the application to use for the job.
+        input_uri (str): The URI of the input data for the job.
+        input_file (str): The local file path to the input data for the job.
+        job_name (str, optional): The name of the job to be created. Defaults to None.
+        max_minutes (int, optional): The maximum number of minutes the job can run. Defaults to None.
+        node_count (int, optional): The number of nodes to use for the job. Defaults to None.
+        cores_per_node (int, optional): The number of cores per node for the job. Defaults to None.
+        queue (str, optional): The queue to use for the job. Defaults to None.
+        allocation (str, optional): The allocation to use for the job. Defaults to None.
 
     Returns:
-        str: The final status of the job. Typical values include "FINISHED", "FAILED",
-            and "STOPPED".
+        dict: The job description dictionary.
+    """
 
-    Raises:
-        No exceptions are explicitly raised, but potential exceptions raised by the Agave
-        job object or other called functions/methods will propagate.
+    # Fetch the latest app information
+    app_info = t.apps.getAppLatestVersion(appId=app_name)
+
+    # If job_name is not provided, use the app name and date
+    if not job_name:
+        job_name = f"{app_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+
+    # Create the base job description
+    job_description = {
+        "name": job_name,
+        "appId": app_info.id,
+        "appVersion": app_info.version,
+        "execSystemId": app_info.jobAttributes.execSystemId,
+        "maxMinutes": max_minutes or app_info.jobAttributes.maxMinutes,
+        "archiveOnAppError": app_info.jobAttributes.archiveOnAppError,
+        "fileInputs": [{"name": "Input Directory", "sourceUrl": input_uri}],
+        "execSystemLogicalQueue": queue
+        or app_info.jobAttributes.execSystemLogicalQueue,
+        "nodeCount": node_count or 1,  # Default to 1 if not specified
+        "coresPerNode": cores_per_node or 1,  # Default to 1 if not specified
+        "parameterSet": {
+            "appArgs": [{"name": "Input Script", "arg": input_file}],
+            "schedulerOptions": [],
+        },
+    }
+
+    # Add TACC allocation if provided
+    if allocation:
+        job_description["parameterSet"]["schedulerOptions"].append(
+            {"name": "TACC Allocation", "arg": f"-A {allocation}"}
+        )
+
+    return job_description
+
+
+def get_status(t, mjobUuid, tlapse=15):
     """
+    Retrieves and monitors the status of a job using Tapis API.
+    This function waits for the job to start, then monitors it for up to maxMinutes.
+
+    Args:
+        t (object): The Tapis API client object.
+        mjobUuid (str): The unique identifier of the job to monitor.
+        tlapse (int, optional): Time interval, in seconds, to wait between status checks. Defaults to 15 seconds.
 
+    Returns:
+        str: The final status of the job (FINISHED, FAILED, or STOPPED).
+    """
     previous_status = None
     # Initially check if the job is already running
-    status = ag.jobs.getStatus(jobId=job_id)["status"]
-
-    job_details = ag.jobs.get(jobId=job_id)
-    max_hours = job_details["maxHours"]
+    status = t.jobs.getJobStatus(jobUuid=mjobUuid).status
+    max_minutes = t.jobs.getJob(jobUuid=mjobUuid).maxMinutes
 
     # Using tqdm to provide visual feedback while waiting for job to start
     with tqdm(desc="Waiting for job to start", dynamic_ncols=True) as pbar:
         while status not in ["RUNNING", "FINISHED", "FAILED", "STOPPED"]:
-            time.sleep(time_lapse)
-            status = ag.jobs.getStatus(jobId=job_id)["status"]
+            time.sleep(tlapse)
+            status = t.jobs.getJobStatus(jobUuid=mjobUuid).status
             pbar.update(1)
             pbar.set_postfix_str(f"Status: {status}")
 
-    # Once the job is running, monitor it for up to maxHours
-    max_iterations = int(max_hours * 3600 // time_lapse)
+    # Once the job is running, monitor it for up to maxMinutes
+    max_iterations = int(max_minutes * 60 // tlapse)
 
     # Using tqdm for progress bar
     for _ in tqdm(range(max_iterations), desc="Monitoring job", ncols=100):
-        status = ag.jobs.getStatus(jobId=job_id)["status"]
+        status = t.jobs.getJobStatus(jobUuid=mjobUuid).status
 
         # Print status if it has changed
         if status != previous_status:
@@ -64,10 +119,12 @@ def get_status(ag, job_id, time_lapse=15):
         if status in ["FINISHED", "FAILED", "STOPPED"]:
             break
 
-        time.sleep(time_lapse)
+        time.sleep(tlapse)
     else:
         # This block will execute if the for loop completes without a 'break'
-        logging.warn("Warning: Maximum monitoring time reached!")
+        logging.warning(
+            f"Warning: Maximum monitoring time of {max_minutes} minutes reached!"
+        )
 
     return status
 
@@ -129,58 +186,6 @@ def runtime_summary(ag, job_id, verbose=False):
     print("---------------")
 
 
-def generate_job_info(
-    ag,
-    appid: str,
-    jobname: str = "dsjob",
-    queue: str = "development",
-    nnodes: int = 1,
-    nprocessors: int = 1,
-    runtime: str = "00:10:00",
-    inputs=None,
-    parameters=None,
-) -> dict:
-    """Generate a job information dictionary based on provided arguments.
-
-    Args:
-        ag (object): The Agave object to interact with the platform.
-        appid (str): The application ID for the job.
-        jobname (str, optional): The name of the job. Defaults to 'dsjob'.
-        queue (str, optional): The batch queue name. Defaults to 'skx-dev'.
-        nnodes (int, optional): The number of nodes required. Defaults to 1.
-        nprocessors (int, optional): The number of processors per node. Defaults to 1.
-        runtime (str, optional): The maximum runtime in the format 'HH:MM:SS'. Defaults to '00:10:00'.
-        inputs (dict, optional): The inputs for the job. Defaults to None.
-        parameters (dict, optional): The parameters for the job. Defaults to None.
-
-    Returns:
-        dict: A dictionary containing the job information.
-
-    Raises:
-        ValueError: If the provided appid is not valid.
-    """
-
-    try:
-        app = ag.apps.get(appId=appid)
-    except Exception:
-        raise ValueError(f"Invalid app ID: {appid}")
-
-    job_info = {
-        "appId": appid,
-        "name": jobname,
-        "batchQueue": queue,
-        "nodeCount": nnodes,
-        "processorsPerNode": nprocessors,
-        "memoryPerNode": "1",
-        "maxRunTime": runtime,
-        "archive": True,
-        "inputs": inputs,
-        "parameters": parameters,
-    }
-
-    return job_info
-
-
 def get_archive_path(ag, job_id):
     """
     Get the archive path for a given job ID and modifies the user directory