diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 59fb3d9..47e83c5 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -10,7 +10,7 @@ #include "../lib/umbridge.h" // run and get the result of command -std::string getCommandOutput(const std::string command) +std::string getCommandOutput(const std::string& command) { FILE *pipe = popen(command.c_str(), "r"); // execute the command and return the output as stream if (!pipe) @@ -59,8 +59,6 @@ std::string readLineFromFile(const std::string& filename) return line; } -using SafeUniqueModelPointer = std::unique_ptr>; - class JobManager { public: @@ -68,7 +66,7 @@ class JobManager // The returned object MUST release any resources that it holds once it goes out of scope in the code of the caller. // This can be achieved by returning a unique pointer with an appropriate deleter. // This method may return a nullptr to deny a request. - virtual SafeUniqueModelPointer requestModelAccess(const std::string& model_name) = 0; + virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; // To initialize the load balancer we first need a list of model names that are available on a server. // Typically, this can be achieved by simply running the model code and requesting the model names from the server. @@ -78,72 +76,148 @@ class JobManager virtual ~JobManager() = default; }; +void remove_trailing_newline(std::string& s) +{ + if (!s.empty() && s.back() == '\n') + { + s.pop_back(); + } +} + class Job { public: + Job() = default; + Job(Job &other) = delete; + Job(Job &&other) = delete; + Job &operator=(Job &other) = delete; + Job &operator=(Job &&other) = delete; virtual ~Job() = default; virtual std::string getJobId() const = 0; }; +class HyperQueueJob : public Job +{ +public: + explicit HyperQueueJob(const std::string& command) + { + std::string output = getCommandOutput(command); + remove_trailing_newline(output); + id = output; + } + + ~HyperQueueJob() override + { + std::system(("./hq job cancel " + id).c_str()); + } + + std::string getJobId() const override + { + return id; + } + +private: + std::string id; +}; -class FileBasedJob : public Job +class SlurmJob : public Job { public: - FileBasedJob(const std::string& command, std::function extract_job_id) + explicit SlurmJob(const std::string& command) { - std::string command_output = getCommandOutput(command); - id = extract_job_id(command_output); + std::string output = getCommandOutput(command); + id = output.substr(0, output.find(';')); } - ~FileBasedJob() + ~SlurmJob() override { + std::system(("scancel " + id).c_str()); + } + std::string getJobId() const override + { + return id; } -protected: + +private: std::string id; }; -template -void deleteFileBased(T* t, std::string file_to_delete, std::string cancel_command) +/* +std::string submit_hyperqueue_job(const std::string& command) { - delete t; - std::filesystem::remove(file_to_delete); - std::system(cancel_command.c_str()); + std::string id = getCommandOutput(command); + remove_trailing_newline(id); + return id; } +std::string submit_slurm_job(const std::string& command) +{ + std::string id = getCommandOutput(command); + return id.substr(0, id.find(';')); +} + +void cancel_hyperqueue_job(const std::string& id) +{ + std::system(("./hq job cancel " + id).c_str()); +} + +void cancel_slurm_job(const std::string& id) +{ + std::system(("scancel " + id).c_str()); +} + +template +class Job +{ +public: + Job(std::string submit_command, SubmitFunction submit, CancelFunction cancel) : id(submit(submit_command)), cancel(cancel) {} + Job(Job &other) = delete; + Job(Job &&other) = delete; + Job &operator=(Job &other) = delete; + Job &operator=(Job &&other) = delete; + ~Job() + { + cancel(id); + } + +private: + std::string id; + CancelFunction cancel; +}; + +class HyperQueueJob : public Job +{ + HyperQueueJob() +}; + + +using SlurmJob = Job; +using HyperQueueJob = Job; +*/ + // Basic idea: // 1. Run some command to request a resource allocation on the HPC cluster. // 2. Launch a model server in the resource allocation. // 3. Retrieve the URL of the model server. // 4. Connect to the model server using the URL. -class FileBasedJobManager : public JobManager +template +class CommandJobManager : public JobManager { public: - virtual SafeUniqueModelPointer requestModelAccess(const std::string& model_name) override + std::unique_ptr requestModelAccess(const std::string& model_name) override { std::string job_id = submitJob(); std::string server_url = readURL(job_id); - SafeUniqueModelPointer client(new umbridge::HTTPModel(server_url, model_name), createModelDeleter(job_id)); - return client; + std::unique_ptr model = std::make_unique(model_name, server_url); + return model; } -protected: +private: virtual std::string getSubmissionCommand() = 0; - virtual std::string getCancelationCommand(const std::string& job_id) = 0; - std::function createModelDeleter(const std::string& job_id) - { - std::string file_to_delete = getURLFileName(job_id); - std::string cancelation_command = getCancelationCommand(job_id); - return [file_to_delete, cancelation_command](umbridge::Model* model) { - delete model; - std::filesystem::remove(file_to_delete); - std::system(cancelation_command.c_str()); - }; - } - - std::string getURLFileName(const std::string& job_id) + std::string getURLFileName(const std::string& job_id) const { std::filesystem::path url_file_name(url_file_prefix + job_id + url_file_suffix); return (url_dir / url_file_name).string(); @@ -158,19 +232,12 @@ class FileBasedJobManager : public JobManager { // Add optional delay to job submissions to prevent issues in some cases. if (submission_delay_ms > 0) { - std::lock_guard lock(submission_mutex); + std::lock_guard lock(submission_mutex); std::this_thread::sleep_for(std::chrono::milliseconds(submission_delay_ms)); } // Submit job and increase job count - std::string command_output = getCommandOutput(getSubmissionCommand()); + Job job(getSubmissionCommand()); // getSubmissionCommand may depend on job_count. Possible race condition! job_count++; - - // Extract the actual job id from the command output - return parseJobID(command_output); - } - - virtual std::string parseJobID(const std::string& unparsed_job_id) { - return unparsed_job_id; } std::string selectJobScript(const std::string& model_name, bool force_default_submission_script = false) @@ -201,7 +268,7 @@ class FileBasedJobManager : public JobManager std::filesystem::path submission_script_dir; std::filesystem::path submission_script_default; - // Model-specifc job-script format: + // Model-specific job-script format: std::string submission_script_model_specific_prefix; std::string submission_script_model_specific_suffix; @@ -216,7 +283,7 @@ class FileBasedJobManager : public JobManager std::atomic job_count = 0; }; -class HyperQueueJob : public umbridge::Model +class HyperQueueJob { public: static std::atomic job_count; @@ -382,7 +449,7 @@ class LoadBalancer : public umbridge::Model const std::vector> &inputs, const std::vector &sens, const std::vector &vec, - json config_json = json::parse("{}")) + json config_json = json::parse("{}")) override { auto model = job_manager->requestModelAccess(name); return model->ApplyHessian(outWrt, inWrt1, inWrt2, inputs, sens, vec, config_json);