Skip to content

Commit

Permalink
Merge pull request #49 from UM-Bridge/linus/hq-submit-delay
Browse files Browse the repository at this point in the history
Add optional delay before submitting jobs, increase HTTP timeout to a weak
  • Loading branch information
linusseelinger authored Feb 16, 2024
2 parents 35d3901 + f0311e2 commit dfd11e3
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 51 deletions.
7 changes: 7 additions & 0 deletions hpc/LoadBalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ int main(int argc, char *argv[])
port = atoi(port_cstr);
}

char const *delay_cstr = std::getenv("HQ_SUBMIT_DELAY_MS");
if (delay_cstr != NULL)
{
hq_submit_delay_ms = atoi(delay_cstr);
}
std::cout << "HQ_SUBMIT_DELAY_MS set to " << hq_submit_delay_ms << std::endl;

// Initialize load balancer for each available model on the model server.
const std::vector<std::string> model_names = get_model_names();

Expand Down
108 changes: 58 additions & 50 deletions hpc/LoadBalancer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,56 +65,8 @@ std::string readUrl(const std::string &filename)
return url;
}

// state = ["WAITING", "RUNNING", "FINISHED", "CANCELED"]
bool waitForHQJobState(const std::string &job_id, const std::string &state = "COMPLETED")
{
const std::string command = "hq job info " + job_id + " | grep State | awk '{print $4}'";
// std::cout << "Checking runtime: " << command << std::endl;
std::string job_status;

do
{
job_status = getCommandOutput(command);

// Delete the line break
if (!job_status.empty())
job_status.pop_back();

// Don't wait if there is an error or the job is ended
if (job_status == "" || (state != "FINISHED" && job_status == "FINISHED") || job_status == "FAILED" || job_status == "CANCELED")
{
std::cerr << "Wait for job status failure, status : " << job_status << std::endl;
return false;
}
// std::cout<<"Job status: "<<job_status<<std::endl;
sleep(1);
} while (job_status != state);

return true;
}

std::string submitHQJob()
{
std::string hq_command = "hq submit --output-mode=quiet hq_scripts/job.sh";

std::string job_id = getCommandOutput(hq_command);

// Delete the line break
if (!job_id.empty())
job_id.pop_back();

std::cout << "Waiting for job " << job_id << " to start." << std::endl;

// Wait for the HQ Job to start
waitForHQJobState(job_id, "RUNNING");

// Also wait until job is running and url file is written
waitForFile("./urls/url-" + job_id + ".txt");

std::cout << "Job " << job_id << " started." << std::endl;

return job_id;
}
std::mutex job_submission_mutex;
int hq_submit_delay_ms = 0;

class HyperQueueJob
{
Expand Down Expand Up @@ -146,6 +98,62 @@ class HyperQueueJob
std::unique_ptr<umbridge::HTTPModel> client_ptr;

private:

std::string submitHQJob()
{
if (hq_submit_delay_ms) {
std::lock_guard<std::mutex> lock(job_submission_mutex);
std::this_thread::sleep_for(std::chrono::milliseconds(hq_submit_delay_ms));
}

std::string hq_command = "hq submit --output-mode=quiet hq_scripts/job.sh";
std::string job_id = getCommandOutput(hq_command);

// Delete the line break
if (!job_id.empty())
job_id.pop_back();

std::cout << "Waiting for job " << job_id << " to start." << std::endl;

// Wait for the HQ Job to start
waitForHQJobState(job_id, "RUNNING");

// Also wait until job is running and url file is written
waitForFile("./urls/url-" + job_id + ".txt");

std::cout << "Job " << job_id << " started." << std::endl;

return job_id;
}

// state = ["WAITING", "RUNNING", "FINISHED", "CANCELED"]
bool waitForHQJobState(const std::string &job_id, const std::string &state)
{
const std::string command = "hq job info " + job_id + " | grep State | awk '{print $4}'";
// std::cout << "Checking runtime: " << command << std::endl;
std::string job_status;

do
{
job_status = getCommandOutput(command);

// Delete the line break
if (!job_status.empty())
job_status.pop_back();

// Don't wait if there is an error or the job is ended
if (job_status == "" || (state != "FINISHED" && job_status == "FINISHED") || job_status == "FAILED" || job_status == "CANCELED")
{
std::cerr << "Wait for job status failure, status : " << job_status << std::endl;
return false;
}

sleep(1);
} while (job_status != state);

return true;
}

std::string job_id;
};

Expand Down
2 changes: 1 addition & 1 deletion hpc/lib/umbridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

// Increase timeout to allow for long-running models.
// This should be (to be on the safe side) significantly greater than the maximum time your model may take
#define CPPHTTPLIB_READ_TIMEOUT_SECOND 60 * 60
#define CPPHTTPLIB_READ_TIMEOUT_SECOND 7 * 24 * 60 * 60

#include <string>
#include <vector>
Expand Down

0 comments on commit dfd11e3

Please sign in to comment.