Skip to content

Commit

Permalink
WIP: Deciding between templates and inheritance
Browse files Browse the repository at this point in the history
  • Loading branch information
Schlevidon committed Aug 25, 2024
1 parent a347ce4 commit 1d96b27
Showing 1 changed file with 112 additions and 45 deletions.
157 changes: 112 additions & 45 deletions hpc/LoadBalancer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "../lib/umbridge.h"

// run and get the result of command
std::string getCommandOutput(const std::string command)
std::string getCommandOutput(const std::string& command)
{
FILE *pipe = popen(command.c_str(), "r"); // execute the command and return the output as stream
if (!pipe)
Expand Down Expand Up @@ -59,16 +59,14 @@ std::string readLineFromFile(const std::string& filename)
return line;
}

using SafeUniqueModelPointer = std::unique_ptr<umbridge::Model, std::function<void (umbridge::Model*)>>;

class JobManager
{
public:
// Grant exclusive ownership of a model (with a given name) to a caller.
// The returned object MUST release any resources that it holds once it goes out of scope in the code of the caller.
// This can be achieved by returning a unique pointer with an appropriate deleter.
// This method may return a nullptr to deny a request.
virtual SafeUniqueModelPointer requestModelAccess(const std::string& model_name) = 0;
virtual std::unique_ptr<umbridge::Model> requestModelAccess(const std::string& model_name) = 0;

// To initialize the load balancer we first need a list of model names that are available on a server.
// Typically, this can be achieved by simply running the model code and requesting the model names from the server.
Expand All @@ -78,72 +76,148 @@ class JobManager
virtual ~JobManager() = default;
};

void remove_trailing_newline(std::string& s)
{
if (!s.empty() && s.back() == '\n')
{
s.pop_back();
}
}

class Job
{
public:
Job() = default;
Job(Job &other) = delete;
Job(Job &&other) = delete;
Job &operator=(Job &other) = delete;
Job &operator=(Job &&other) = delete;
virtual ~Job() = default;

virtual std::string getJobId() const = 0;
};

class HyperQueueJob : public Job
{
public:
explicit HyperQueueJob(const std::string& command)
{
std::string output = getCommandOutput(command);
remove_trailing_newline(output);
id = output;
}

~HyperQueueJob() override
{
std::system(("./hq job cancel " + id).c_str());
}

std::string getJobId() const override
{
return id;
}

private:
std::string id;
};

class FileBasedJob : public Job
class SlurmJob : public Job
{
public:
FileBasedJob(const std::string& command, std::function<std::string (const std::string&)> extract_job_id)
explicit SlurmJob(const std::string& command)
{
std::string command_output = getCommandOutput(command);
id = extract_job_id(command_output);
std::string output = getCommandOutput(command);
id = output.substr(0, output.find(';'));
}

~FileBasedJob()
~SlurmJob() override
{
std::system(("scancel " + id).c_str());
}

std::string getJobId() const override
{
return id;
}
protected:

private:
std::string id;
};

template<typename T>
void deleteFileBased(T* t, std::string file_to_delete, std::string cancel_command)
/*
std::string submit_hyperqueue_job(const std::string& command)
{
delete t;
std::filesystem::remove(file_to_delete);
std::system(cancel_command.c_str());
std::string id = getCommandOutput(command);
remove_trailing_newline(id);
return id;
}
std::string submit_slurm_job(const std::string& command)
{
std::string id = getCommandOutput(command);
return id.substr(0, id.find(';'));
}
void cancel_hyperqueue_job(const std::string& id)
{
std::system(("./hq job cancel " + id).c_str());
}
void cancel_slurm_job(const std::string& id)
{
std::system(("scancel " + id).c_str());
}
template <typename SubmitFunction, typename CancelFunction>
class Job
{
public:
Job(std::string submit_command, SubmitFunction submit, CancelFunction cancel) : id(submit(submit_command)), cancel(cancel) {}
Job(Job &other) = delete;
Job(Job &&other) = delete;
Job &operator=(Job &other) = delete;
Job &operator=(Job &&other) = delete;
~Job()
{
cancel(id);
}
private:
std::string id;
CancelFunction cancel;
};
class HyperQueueJob : public Job
{
HyperQueueJob()
};
using SlurmJob = Job<decltype(&submit_slurm_job), decltype(&cancel_slurm_job)>;
using HyperQueueJob = Job<decltype(&submit_hyperqueue_job), decltype(&cancel_hyperqueue_job)>;
*/

// Basic idea:
// 1. Run some command to request a resource allocation on the HPC cluster.
// 2. Launch a model server in the resource allocation.
// 3. Retrieve the URL of the model server.
// 4. Connect to the model server using the URL.
class FileBasedJobManager : public JobManager
template <typename Job>
class CommandJobManager : public JobManager
{
public:
virtual SafeUniqueModelPointer requestModelAccess(const std::string& model_name) override
std::unique_ptr<umbridge::Model> requestModelAccess(const std::string& model_name) override
{
std::string job_id = submitJob();
std::string server_url = readURL(job_id);

SafeUniqueModelPointer client(new umbridge::HTTPModel(server_url, model_name), createModelDeleter(job_id));
return client;
std::unique_ptr<umbridge::Model> model = std::make_unique<umbridge::HTTPModel>(model_name, server_url);
return model;
}
protected:
private:
virtual std::string getSubmissionCommand() = 0;
virtual std::string getCancelationCommand(const std::string& job_id) = 0;

std::function<void (umbridge::Model*)> createModelDeleter(const std::string& job_id)
{
std::string file_to_delete = getURLFileName(job_id);
std::string cancelation_command = getCancelationCommand(job_id);
return [file_to_delete, cancelation_command](umbridge::Model* model) {
delete model;
std::filesystem::remove(file_to_delete);
std::system(cancelation_command.c_str());
};
}

std::string getURLFileName(const std::string& job_id)
std::string getURLFileName(const std::string& job_id) const
{
std::filesystem::path url_file_name(url_file_prefix + job_id + url_file_suffix);
return (url_dir / url_file_name).string();
Expand All @@ -158,19 +232,12 @@ class FileBasedJobManager : public JobManager
{
// Add optional delay to job submissions to prevent issues in some cases.
if (submission_delay_ms > 0) {
std::lock_guard<std::mutex> lock(submission_mutex);
std::lock_guard lock(submission_mutex);
std::this_thread::sleep_for(std::chrono::milliseconds(submission_delay_ms));
}
// Submit job and increase job count
std::string command_output = getCommandOutput(getSubmissionCommand());
Job job(getSubmissionCommand()); // getSubmissionCommand may depend on job_count. Possible race condition!
job_count++;

// Extract the actual job id from the command output
return parseJobID(command_output);
}

virtual std::string parseJobID(const std::string& unparsed_job_id) {
return unparsed_job_id;
}

std::string selectJobScript(const std::string& model_name, bool force_default_submission_script = false)
Expand Down Expand Up @@ -201,7 +268,7 @@ class FileBasedJobManager : public JobManager

std::filesystem::path submission_script_dir;
std::filesystem::path submission_script_default;
// Model-specifc job-script format: <prefix><model_name><suffix>
// Model-specific job-script format: <prefix><model_name><suffix>
std::string submission_script_model_specific_prefix;
std::string submission_script_model_specific_suffix;

Expand All @@ -216,7 +283,7 @@ class FileBasedJobManager : public JobManager
std::atomic<int32_t> job_count = 0;
};

class HyperQueueJob : public umbridge::Model
class HyperQueueJob
{
public:
static std::atomic<int32_t> job_count;
Expand Down Expand Up @@ -382,7 +449,7 @@ class LoadBalancer : public umbridge::Model
const std::vector<std::vector<double>> &inputs,
const std::vector<double> &sens,
const std::vector<double> &vec,
json config_json = json::parse("{}"))
json config_json = json::parse("{}")) override
{
auto model = job_manager->requestModelAccess(name);
return model->ApplyHessian(outWrt, inWrt1, inWrt2, inputs, sens, vec, config_json);
Expand Down

0 comments on commit 1d96b27

Please sign in to comment.