Skip to content

Commit

Permalink
Cuebot reserve all cores (#1313)
Browse files Browse the repository at this point in the history
**Link the Issue(s) this Pull Request is related to.**
Fixes #1297

**Summarize your change.**
As in many render engines, we should be able to set a negative core
requirement.
  minCores=8 > reserve 8 cores
  minCores=0 > reserve all cores
  minCores=-2 > reserve all cores minus 2
  
This PR addresses this feature by handling negative core requests.
Cuebot will try to match this number against the number of cores on each
host.
The frame will be booked only if all cores are available in this
scenario.
If the host is busy (even slightly), the frame is **not** booked, to
avoid filling the remaining cores.

**Testing**
I would need some guidance to create proper tests for cuebot.

**Screenshot**

![negative_cores](https://github.com/AcademySoftwareFoundation/OpenCue/assets/5556461/d9c4400c-824a-40cc-9ba9-2f76a3fd8ceb)
Update:
There is now a "ALL" text for zero cores, or "ALL (-2)" for negative
cores reservation.

![core_reservation](https://github.com/user-attachments/assets/88802b15-3ccd-4cb5-90b7-58e532523ae6)
(cuesubmit feature in another PR #1284)

---------

Signed-off-by: Kern Attila GERMAIN <[email protected]>
  • Loading branch information
KernAttila authored Sep 25, 2024
1 parent 388255c commit da2fe01
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 36 deletions.
48 changes: 47 additions & 1 deletion cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
import com.imageworks.spcue.grpc.host.LockState;
import com.imageworks.spcue.util.CueUtil;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

public class DispatchHost extends Entity
implements HostInterface, FacilityInterface, ResourceContainer {

private static final Logger logger = LogManager.getLogger(DispatchHost.class);

public String facilityId;
public String allocationId;
public LockState lockState;
Expand Down Expand Up @@ -76,12 +81,53 @@ public String getFacilityId() {
return facilityId;
}

public boolean canHandleNegativeCoresRequest(int requestedCores) {
// Request is positive, no need to test further.
if (requestedCores > 0) {
logger.debug(getName() + " can handle the job with " + requestedCores + " cores.");
return true;
}
// All cores are available, validate the request.
if (cores == idleCores) {
logger.debug(getName() + " can handle the job with " + requestedCores + " cores.");
return true;
}
// Some or all cores are busy, avoid booking again.
logger.debug(getName() + " cannot handle the job with " + requestedCores + " cores.");
return false;
}

public int handleNegativeCoresRequirement(int requestedCores) {
// If we request a <=0 amount of cores, return positive core count.
// Request -2 on a 24 core machine will return 22.

if (requestedCores > 0) {
// Do not process positive core requests.
logger.debug("Requested " + requestedCores + " cores.");
return requestedCores;
}
if (requestedCores <=0 && idleCores < cores) {
// If request is negative but cores are already used, return 0.
// We don't want to overbook the host.
logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs.");
return 0;
}
// Book all cores minus the request
int totalCores = idleCores + requestedCores;
logger.debug("Requested " + requestedCores + " cores <= 0, " +
idleCores + " cores are free, booking " + totalCores + " cores");
return totalCores;
}

@Override
public boolean hasAdditionalResources(int minCores, long minMemory, int minGpus, long minGpuMemory) {

minCores = handleNegativeCoresRequirement(minCores);
if (idleCores < minCores) {
return false;
}
if (minCores <= 0) {
return false;
}
else if (idleMemory < minMemory) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.imageworks.spcue.dispatcher.ResourceContainer;
import com.imageworks.spcue.grpc.renderpartition.RenderPartitionType;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

/**
* Contains information about local desktop cores a user has
* assigned to the given job.
Expand All @@ -33,6 +36,8 @@
public class LocalHostAssignment extends Entity
implements ResourceContainer {

private static final Logger logger = LogManager.getLogger(LocalHostAssignment.class);

private int idleCoreUnits;
private long idleMemory;
private int idleGpuUnits;
Expand Down Expand Up @@ -62,12 +67,37 @@ public LocalHostAssignment(int maxCores, int threads, long maxMemory, int maxGpu
this.maxGpuMemory = maxGpuMemory;
}

public int handleNegativeCoresRequirement(int requestedCores) {
// If we request a <=0 amount of cores, return positive core count.
// Request -2 on a 24 core machine will return 22.

if (requestedCores > 0) {
// Do not process positive core requests.
logger.debug("Requested " + requestedCores + " cores.");
return requestedCores;
}
if (requestedCores <=0 && idleCoreUnits < threads) {
// If request is negative but cores are already used, return 0.
// We don't want to overbook the host.
logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs.");
return 0;
}
// Book all cores minus the request
int totalCores = idleCoreUnits + requestedCores;
logger.debug("Requested " + requestedCores + " cores <= 0, " +
idleCoreUnits + " cores are free, booking " + totalCores + " cores");
return totalCores;
}

@Override
public boolean hasAdditionalResources(int minCores, long minMemory, int minGpus, long minGpuMemory) {

minCores = handleNegativeCoresRequirement(minCores);
if (idleCoreUnits < minCores) {
return false;
}
if (minCores <= 0) {
return false;
}
else if (idleMemory < minMemory) {
return false;
}
Expand Down
8 changes: 4 additions & 4 deletions cuebot/src/main/java/com/imageworks/spcue/SortableShow.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public boolean isSkipped(String tags, long cores, long memory) {
try {
if (failed.containsKey(tags)) {
long [] mark = failed.get(tags);
if (cores <= mark[0]) {
logger.info("skipped due to not enough cores " + cores + " <= " + mark[0]);
if (cores < mark[0]) {
logger.info("skipped due to not enough cores " + cores + " < " + mark[0]);
return true;
}
else if (memory <= mark[1]) {
logger.info("skipped due to not enough memory " + memory + " <= " + mark[1]);
else if (memory < mark[1]) {
logger.info("skipped due to not enough memory " + memory + " < " + mark[1]);
return true;
}
}
Expand Down
18 changes: 17 additions & 1 deletion cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@
import com.imageworks.spcue.dispatcher.Dispatcher;
import com.imageworks.spcue.grpc.host.ThreadMode;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

public class VirtualProc extends FrameEntity implements ProcInterface {

private static final Logger logger = LogManager.getLogger(VirtualProc.class);

public String hostId;
public String allocationId;
public String frameId;
public String hostName;
public String os;
public byte[] childProcesses;

public boolean canHandleNegativeCoresRequest;
public int coresReserved;
public long memoryReserved;
public long memoryUsed;
Expand Down Expand Up @@ -111,7 +117,17 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame, St
proc.coresReserved = proc.coresReserved + host.strandedCores;
}

if (proc.coresReserved >= 100) {
proc.canHandleNegativeCoresRequest = host.canHandleNegativeCoresRequest(proc.coresReserved);

if (proc.coresReserved == 0) {
logger.debug("Reserving all cores");
proc.coresReserved = host.cores;
}
else if (proc.coresReserved < 0) {
logger.debug("Reserving all cores minus " + proc.coresReserved);
proc.coresReserved = host.cores + proc.coresReserved;
}
else if (proc.coresReserved >= 100) {

int originalCores = proc.coresReserved;

Expand Down
27 changes: 15 additions & 12 deletions cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public interface LayerDao {
public List<LayerDetail> getLayerDetails(JobInterface job);

/**
* Returns true if supplied layer is compelte.
* Returns true if supplied layer is complete.
*
* @param layer
* @return boolean
Expand All @@ -82,7 +82,7 @@ public interface LayerDao {
void insertLayerDetail(LayerDetail l);

/**
* gets a layer detail from an object that implments layer
* gets a layer detail from an object that implements layer
*
* @param layer
* @return LayerDetail
Expand Down Expand Up @@ -167,7 +167,7 @@ public interface LayerDao {
void updateLayerTags(LayerInterface layer, Set<String> tags);

/**
* Insert a key/valye pair into the layer environment
* Insert a key/value pair into the layer environment
*
* @param layer
* @param key
Expand Down Expand Up @@ -292,7 +292,7 @@ public interface LayerDao {

/**
* Update all layers of the set type in the specified job
* with the new min cores requirement.
* with the new min gpu requirement.
*
* @param job
* @param gpus
Expand All @@ -304,17 +304,16 @@ public interface LayerDao {
* Update a layer's max cores value, which limits how
* much threading can go on.
*
* @param job
* @param cores
* @param type
* @param layer
* @param threadable
*/
void updateThreadable(LayerInterface layer, boolean threadable);

/**
* Update a layer's timeout value, which limits how
* much the frame can run on a host.
*
* @param job
* @param layer
* @param timeout
*/
void updateTimeout(LayerInterface layer, int timeout);
Expand All @@ -323,8 +322,8 @@ public interface LayerDao {
* Update a layer's LLU timeout value, which limits how
* much the frame can run on a host without updates in the log file.
*
* @param job
* @param timeout
* @param layer
* @param timeout_llu
*/
void updateTimeoutLLU(LayerInterface layer, int timeout_llu);

Expand All @@ -341,7 +340,7 @@ public interface LayerDao {

/**
* Appends a tag to the current set of tags. If the tag
* already exists than nothing happens.
* already exists then nothing happens.
*
* @param layer
* @param val
Expand All @@ -363,8 +362,9 @@ public interface LayerDao {
* Update layer usage with processor time usage.
* This happens when the proc has completed or failed some work.
*
* @param proc
* @param layer
* @param newState
* @param exitStatus
*/
void updateUsage(LayerInterface layer, ResourceUsage usage, int exitStatus);

Expand All @@ -387,6 +387,9 @@ public interface LayerDao {

/**
* Enable/disable memory optimizer.
*
* @param layer
* @param state
*/
void enableMemoryOptimizer(LayerInterface layer, boolean state);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@
import com.imageworks.spcue.util.CueUtil;
import com.imageworks.spcue.util.SqlUtil;

public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao {

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao {
private static final Logger logger = LogManager.getLogger(LayerDaoJdbc.class);
private static final String INSERT_OUTPUT_PATH =
"INSERT INTO " +
"layer_output " +
Expand All @@ -77,7 +81,7 @@ public void insertLayerOutput(LayerInterface layer, String filespec) {
"FROM " +
"layer_output " +
"WHERE " +
"pk_layer = ?" +
"pk_layer = ? " +
"ORDER BY " +
"ser_order";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,16 @@ public List<VirtualProc> dispatchHost(DispatchHost host, JobInterface job) {

VirtualProc proc = VirtualProc.build(host, frame, selfishServices);

if (host.idleCores < frame.minCores ||
if (frame.minCores <= 0 && !proc.canHandleNegativeCoresRequest) {
logger.debug("Cannot dispatch job, host is busy.");
break;
}

if (host.idleCores < host.handleNegativeCoresRequirement(frame.minCores) ||
host.idleMemory < frame.minMemory ||
host.idleGpus < frame.minGpus ||
host.idleGpuMemory < frame.minGpuMemory) {
logger.debug("Cannot dispatch, insufficient resources.");
break;
}

Expand All @@ -283,6 +289,8 @@ public List<VirtualProc> dispatchHost(DispatchHost host, JobInterface job) {

boolean success = new DispatchFrameTemplate(proc, job, frame, false) {
public void wrapDispatchFrame() {
logger.debug("Dispatching frame with " + frame.minCores + " minCores on proc with " +
proc.coresReserved + " coresReserved");
dispatch(frame, proc);
dispatchSummary(proc, frame, "Booking");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ public void handleHostReport(HostReport report, boolean isBoot) {
*/
String msg = null;
boolean hasLocalJob = bookingManager.hasLocalHostAssignment(host);
int coresToReserve = host.handleNegativeCoresRequirement(Dispatcher.CORE_POINTS_RESERVED_MIN);

if (hasLocalJob) {
List<LocalHostAssignment> lcas =
Expand All @@ -253,13 +254,13 @@ public void handleHostReport(HostReport report, boolean isBoot) {
bookingManager.removeInactiveLocalHostAssignment(lca);
}
}

if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.os)) {
msg = String.format(
"%s doens't have enough free space in the temporary directory (mcp), %dMB",
"%s doesn't have enough free space in the temporary directory (mcp), %dMB",
host.name, (report.getHost().getFreeMcp()/1024));
}
else if (host.idleCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
else if (coresToReserve <= 0 || host.idleCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
msg = String.format("%s doesn't have enough idle cores, %d needs %d",
host.name, host.idleCores, Dispatcher.CORE_POINTS_RESERVED_MIN);
}
Expand All @@ -268,7 +269,7 @@ else if (host.idleMemory < Dispatcher.MEM_RESERVED_MIN) {
host.name, host.idleMemory, Dispatcher.MEM_RESERVED_MIN);
}
else if (report.getHost().getFreeMem() < CueUtil.MB512) {
msg = String.format("%s doens't have enough free system mem, %d needs %d",
msg = String.format("%s doesn't have enough free system mem, %d needs %d",
host.name, report.getHost().getFreeMem(), Dispatcher.MEM_RESERVED_MIN);
}
else if(!host.hardwareState.equals(HardwareState.UP)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public JobDetail createJob(BuildableJob buildableJob) {
}
}

if (layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
if (layer.minimumCores > 0 && layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
layer.minimumCores = Dispatcher.CORE_POINTS_RESERVED_MIN;
}

Expand Down
Loading

0 comments on commit da2fe01

Please sign in to comment.