diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java index 495d0a9b1..f01724e17 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java @@ -24,9 +24,14 @@ import com.imageworks.spcue.grpc.host.LockState; import com.imageworks.spcue.util.CueUtil; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + public class DispatchHost extends Entity implements HostInterface, FacilityInterface, ResourceContainer { + private static final Logger logger = LogManager.getLogger(DispatchHost.class); + public String facilityId; public String allocationId; public LockState lockState; @@ -76,12 +81,53 @@ public String getFacilityId() { return facilityId; } + public boolean canHandleNegativeCoresRequest(int requestedCores) { + // Request is positive, no need to test further. + if (requestedCores > 0) { + logger.debug(getName() + " can handle the job with " + requestedCores + " cores."); + return true; + } + // All cores are available, validate the request. + if (cores == idleCores) { + logger.debug(getName() + " can handle the job with " + requestedCores + " cores."); + return true; + } + // Some or all cores are busy, avoid booking again. + logger.debug(getName() + " cannot handle the job with " + requestedCores + " cores."); + return false; + } + + public int handleNegativeCoresRequirement(int requestedCores) { + // If we request a <=0 amount of cores, return positive core count. + // Request -2 on a 24 core machine will return 22. + + if (requestedCores > 0) { + // Do not process positive core requests. + logger.debug("Requested " + requestedCores + " cores."); + return requestedCores; + } + if (requestedCores <=0 && idleCores < cores) { + // If request is negative but cores are already used, return 0. + // We don't want to overbook the host. + logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs."); + return 0; + } + // Book all cores minus the request + int totalCores = idleCores + requestedCores; + logger.debug("Requested " + requestedCores + " cores <= 0, " + + idleCores + " cores are free, booking " + totalCores + " cores"); + return totalCores; + } + @Override public boolean hasAdditionalResources(int minCores, long minMemory, int minGpus, long minGpuMemory) { - + minCores = handleNegativeCoresRequirement(minCores); if (idleCores < minCores) { return false; } + if (minCores <= 0) { + return false; + } else if (idleMemory < minMemory) { return false; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java b/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java index 3e073fa73..65ce05c7e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java +++ b/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java @@ -22,6 +22,9 @@ import com.imageworks.spcue.dispatcher.ResourceContainer; import com.imageworks.spcue.grpc.renderpartition.RenderPartitionType; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + /** * Contains information about local desktop cores a user has * assigned to the given job. @@ -33,6 +36,8 @@ public class LocalHostAssignment extends Entity implements ResourceContainer { + private static final Logger logger = LogManager.getLogger(LocalHostAssignment.class); + private int idleCoreUnits; private long idleMemory; private int idleGpuUnits; @@ -62,12 +67,37 @@ public LocalHostAssignment(int maxCores, int threads, long maxMemory, int maxGpu this.maxGpuMemory = maxGpuMemory; } + public int handleNegativeCoresRequirement(int requestedCores) { + // If we request a <=0 amount of cores, return positive core count. + // Request -2 on a 24 core machine will return 22. + + if (requestedCores > 0) { + // Do not process positive core requests. + logger.debug("Requested " + requestedCores + " cores."); + return requestedCores; + } + if (requestedCores <=0 && idleCoreUnits < threads) { + // If request is negative but cores are already used, return 0. + // We don't want to overbook the host. + logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs."); + return 0; + } + // Book all cores minus the request + int totalCores = idleCoreUnits + requestedCores; + logger.debug("Requested " + requestedCores + " cores <= 0, " + + idleCoreUnits + " cores are free, booking " + totalCores + " cores"); + return totalCores; + } + @Override public boolean hasAdditionalResources(int minCores, long minMemory, int minGpus, long minGpuMemory) { - + minCores = handleNegativeCoresRequirement(minCores); if (idleCoreUnits < minCores) { return false; } + if (minCores <= 0) { + return false; + } else if (idleMemory < minMemory) { return false; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/SortableShow.java b/cuebot/src/main/java/com/imageworks/spcue/SortableShow.java index f13fbaae2..83798f079 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/SortableShow.java +++ b/cuebot/src/main/java/com/imageworks/spcue/SortableShow.java @@ -54,12 +54,12 @@ public boolean isSkipped(String tags, long cores, long memory) { try { if (failed.containsKey(tags)) { long [] mark = failed.get(tags); - if (cores <= mark[0]) { - logger.info("skipped due to not enough cores " + cores + " <= " + mark[0]); + if (cores < mark[0]) { + logger.info("skipped due to not enough cores " + cores + " < " + mark[0]); return true; } - else if (memory <= mark[1]) { - logger.info("skipped due to not enough memory " + memory + " <= " + mark[1]); + else if (memory < mark[1]) { + logger.info("skipped due to not enough memory " + memory + " < " + mark[1]); return true; } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java index ea0f5b98e..8205f3021 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java @@ -22,8 +22,13 @@ import com.imageworks.spcue.dispatcher.Dispatcher; import com.imageworks.spcue.grpc.host.ThreadMode; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + public class VirtualProc extends FrameEntity implements ProcInterface { + private static final Logger logger = LogManager.getLogger(VirtualProc.class); + public String hostId; public String allocationId; public String frameId; @@ -31,6 +36,7 @@ public class VirtualProc extends FrameEntity implements ProcInterface { public String os; public byte[] childProcesses; + public boolean canHandleNegativeCoresRequest; public int coresReserved; public long memoryReserved; public long memoryUsed; @@ -111,7 +117,17 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame, St proc.coresReserved = proc.coresReserved + host.strandedCores; } - if (proc.coresReserved >= 100) { + proc.canHandleNegativeCoresRequest = host.canHandleNegativeCoresRequest(proc.coresReserved); + + if (proc.coresReserved == 0) { + logger.debug("Reserving all cores"); + proc.coresReserved = host.cores; + } + else if (proc.coresReserved < 0) { + logger.debug("Reserving all cores minus " + proc.coresReserved); + proc.coresReserved = host.cores + proc.coresReserved; + } + else if (proc.coresReserved >= 100) { int originalCores = proc.coresReserved; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java index 9343c3aa0..c4b07edf9 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java @@ -59,7 +59,7 @@ public interface LayerDao { public List getLayerDetails(JobInterface job); /** - * Returns true if supplied layer is compelte. + * Returns true if supplied layer is complete. * * @param layer * @return boolean @@ -82,7 +82,7 @@ public interface LayerDao { void insertLayerDetail(LayerDetail l); /** - * gets a layer detail from an object that implments layer + * gets a layer detail from an object that implements layer * * @param layer * @return LayerDetail @@ -167,7 +167,7 @@ public interface LayerDao { void updateLayerTags(LayerInterface layer, Set tags); /** - * Insert a key/valye pair into the layer environment + * Insert a key/value pair into the layer environment * * @param layer * @param key @@ -292,7 +292,7 @@ public interface LayerDao { /** * Update all layers of the set type in the specified job - * with the new min cores requirement. + * with the new min gpu requirement. * * @param job * @param gpus @@ -304,9 +304,8 @@ public interface LayerDao { * Update a layer's max cores value, which limits how * much threading can go on. * - * @param job - * @param cores - * @param type + * @param layer + * @param threadable */ void updateThreadable(LayerInterface layer, boolean threadable); @@ -314,7 +313,7 @@ public interface LayerDao { * Update a layer's timeout value, which limits how * much the frame can run on a host. * - * @param job + * @param layer * @param timeout */ void updateTimeout(LayerInterface layer, int timeout); @@ -323,8 +322,8 @@ public interface LayerDao { * Update a layer's LLU timeout value, which limits how * much the frame can run on a host without updates in the log file. * - * @param job - * @param timeout + * @param layer + * @param timeout_llu */ void updateTimeoutLLU(LayerInterface layer, int timeout_llu); @@ -341,7 +340,7 @@ public interface LayerDao { /** * Appends a tag to the current set of tags. If the tag - * already exists than nothing happens. + * already exists then nothing happens. * * @param layer * @param val @@ -363,8 +362,9 @@ public interface LayerDao { * Update layer usage with processor time usage. * This happens when the proc has completed or failed some work. * - * @param proc + * @param layer * @param newState + * @param exitStatus */ void updateUsage(LayerInterface layer, ResourceUsage usage, int exitStatus); @@ -387,6 +387,9 @@ public interface LayerDao { /** * Enable/disable memory optimizer. + * + * @param layer + * @param state */ void enableMemoryOptimizer(LayerInterface layer, boolean state); diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java index f555bef6e..78753f578 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java @@ -51,8 +51,12 @@ import com.imageworks.spcue.util.CueUtil; import com.imageworks.spcue.util.SqlUtil; -public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao { +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao { + private static final Logger logger = LogManager.getLogger(LayerDaoJdbc.class); private static final String INSERT_OUTPUT_PATH = "INSERT INTO " + "layer_output " + @@ -77,7 +81,7 @@ public void insertLayerOutput(LayerInterface layer, String filespec) { "FROM " + "layer_output " + "WHERE " + - "pk_layer = ?" + + "pk_layer = ? " + "ORDER BY " + "ser_order"; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java index e55a76865..226d9466c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java @@ -264,10 +264,16 @@ public List dispatchHost(DispatchHost host, JobInterface job) { VirtualProc proc = VirtualProc.build(host, frame, selfishServices); - if (host.idleCores < frame.minCores || + if (frame.minCores <= 0 && !proc.canHandleNegativeCoresRequest) { + logger.debug("Cannot dispatch job, host is busy."); + break; + } + + if (host.idleCores < host.handleNegativeCoresRequirement(frame.minCores) || host.idleMemory < frame.minMemory || host.idleGpus < frame.minGpus || host.idleGpuMemory < frame.minGpuMemory) { + logger.debug("Cannot dispatch, insufficient resources."); break; } @@ -283,6 +289,8 @@ public List dispatchHost(DispatchHost host, JobInterface job) { boolean success = new DispatchFrameTemplate(proc, job, frame, false) { public void wrapDispatchFrame() { + logger.debug("Dispatching frame with " + frame.minCores + " minCores on proc with " + + proc.coresReserved + " coresReserved"); dispatch(frame, proc); dispatchSummary(proc, frame, "Booking"); return; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index 46d56929f..b0a7ccd9c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -245,6 +245,7 @@ public void handleHostReport(HostReport report, boolean isBoot) { */ String msg = null; boolean hasLocalJob = bookingManager.hasLocalHostAssignment(host); + int coresToReserve = host.handleNegativeCoresRequirement(Dispatcher.CORE_POINTS_RESERVED_MIN); if (hasLocalJob) { List lcas = @@ -253,13 +254,13 @@ public void handleHostReport(HostReport report, boolean isBoot) { bookingManager.removeInactiveLocalHostAssignment(lca); } } - + if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.os)) { msg = String.format( - "%s doens't have enough free space in the temporary directory (mcp), %dMB", + "%s doesn't have enough free space in the temporary directory (mcp), %dMB", host.name, (report.getHost().getFreeMcp()/1024)); } - else if (host.idleCores < Dispatcher.CORE_POINTS_RESERVED_MIN) { + else if (coresToReserve <= 0 || host.idleCores < Dispatcher.CORE_POINTS_RESERVED_MIN) { msg = String.format("%s doesn't have enough idle cores, %d needs %d", host.name, host.idleCores, Dispatcher.CORE_POINTS_RESERVED_MIN); } @@ -268,7 +269,7 @@ else if (host.idleMemory < Dispatcher.MEM_RESERVED_MIN) { host.name, host.idleMemory, Dispatcher.MEM_RESERVED_MIN); } else if (report.getHost().getFreeMem() < CueUtil.MB512) { - msg = String.format("%s doens't have enough free system mem, %d needs %d", + msg = String.format("%s doesn't have enough free system mem, %d needs %d", host.name, report.getHost().getFreeMem(), Dispatcher.MEM_RESERVED_MIN); } else if(!host.hardwareState.equals(HardwareState.UP)) { diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java b/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java index 844f67635..2c9c14425 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java @@ -274,7 +274,7 @@ public JobDetail createJob(BuildableJob buildableJob) { } } - if (layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) { + if (layer.minimumCores > 0 && layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) { layer.minimumCores = Dispatcher.CORE_POINTS_RESERVED_MIN; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java b/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java index 7be581d1b..2e2fa0801 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java @@ -117,7 +117,7 @@ public class JobSpec { public JobSpec() { } - public static final String NAME_REGEX = "^([\\w\\.]{3,})$"; + public static final String NAME_REGEX = "^([\\w\\.-]{3,})$"; public static final Pattern NAME_PATTERN = Pattern.compile(NAME_REGEX); @@ -607,12 +607,16 @@ private void determineMinimumCores(Element layerTag, LayerDetail layer) { int corePoints = layer.minimumCores; if (cores.contains(".")) { - corePoints = (int) (Double.valueOf(cores) * 100 + .5); + if (cores.contains("-")) { + corePoints = (int) (Double.valueOf(cores) * 100 - .5); + } else { + corePoints = (int) (Double.valueOf(cores) * 100 + .5); + } } else { corePoints = Integer.valueOf(cores); } - if (corePoints < Dispatcher.CORE_POINTS_RESERVED_MIN) { + if (corePoints > 0 && corePoints < Dispatcher.CORE_POINTS_RESERVED_MIN) { corePoints = Dispatcher.CORE_POINTS_RESERVED_DEFAULT; } else if (corePoints > Dispatcher.CORE_POINTS_RESERVED_MAX) { @@ -649,7 +653,7 @@ private void determineChunkSize(Element layerTag, LayerDetail layer) { */ private void determineThreadable(Element layerTag, LayerDetail layer) { // Must have at least 1 core to thread. - if (layer.minimumCores < 100) { + if (layer.minimumCores > 0 && layer.minimumCores < 100) { layer.isThreadable = false; } else if (layerTag.getChildTextTrim("threadable") != null) { diff --git a/cuegui/cuegui/FilterDialog.py b/cuegui/cuegui/FilterDialog.py index 90c72e263..ab4d4d25b 100644 --- a/cuegui/cuegui/FilterDialog.py +++ b/cuegui/cuegui/FilterDialog.py @@ -459,7 +459,7 @@ def createAction(self): "Create Action", "What value should this property be set to?", 0, - 0, + -8, # Minimum core value can be <=0, booking all cores minus this value. 50000, 2) value = float(value) diff --git a/cuegui/cuegui/LayerMonitorTree.py b/cuegui/cuegui/LayerMonitorTree.py index 6ddd6cf18..0f110f874 100644 --- a/cuegui/cuegui/LayerMonitorTree.py +++ b/cuegui/cuegui/LayerMonitorTree.py @@ -67,10 +67,12 @@ def __init__(self, parent): data=lambda layer: displayRange(layer), tip="The range of frames that the layer should render.") self.addColumn("Cores", 45, id=6, - data=lambda layer: "%.2f" % layer.data.min_cores, + data=lambda layer: self.labelCoresColumn(layer.data.min_cores), sort=lambda layer: layer.data.min_cores, tip="The number of cores that the frames in this layer\n" - "will reserve as a minimum.") + "will reserve as a minimum." + "Zero or negative value indicate that the layer will use\n" + "all available cores on the machine, minus this value.") self.addColumn("Memory", 60, id=7, data=lambda layer: cuegui.Utils.memoryToString(layer.data.min_memory), sort=lambda layer: layer.data.min_memory, @@ -181,6 +183,14 @@ def updateRequest(self): since last updated""" self.ticksWithoutUpdate = 9999 + def labelCoresColumn(self, reserved_cores): + """Returns the reserved cores for a job""" + if reserved_cores > 0: + return "%.2f" % reserved_cores + if reserved_cores == 0: + return "ALL" + return "ALL (%.2f)" % reserved_cores + # pylint: disable=inconsistent-return-statements def setJob(self, job): """Sets the current job. diff --git a/cuesubmit/cuesubmit/Submission.py b/cuesubmit/cuesubmit/Submission.py index dc5e64d0f..f064b4cd3 100644 --- a/cuesubmit/cuesubmit/Submission.py +++ b/cuesubmit/cuesubmit/Submission.py @@ -97,7 +97,7 @@ def buildLayer(layerData, command, lastLayer=None): @type lastLayer: outline.layer.Layer @param lastLayer: layer that this new layer should be dependent on if dependType is set. """ - threadable = float(layerData.cores) >= 2 + threadable = float(layerData.cores) >= 2 or float(layerData.cores) <= 0 layer = outline.modules.shell.Shell( layerData.name, command=command.split(), chunk=layerData.chunk, threads=float(layerData.cores), range=str(layerData.layerRange), threadable=threadable) diff --git a/cuesubmit/cuesubmit/Validators.py b/cuesubmit/cuesubmit/Validators.py index 540f92e21..0b0bfb6f8 100644 --- a/cuesubmit/cuesubmit/Validators.py +++ b/cuesubmit/cuesubmit/Validators.py @@ -53,7 +53,7 @@ def matchNoSpaces(value): def matchNumbersOnly(value): """Matches strings with numbers and '.' only.""" - if re.match(r'^[0-9.]+$', value): + if re.match(r'^-?[0-9.]+$', value): return True return False diff --git a/cuesubmit/tests/Validators_tests.py b/cuesubmit/tests/Validators_tests.py index 0a5ef78eb..cbbf0b9cd 100644 --- a/cuesubmit/tests/Validators_tests.py +++ b/cuesubmit/tests/Validators_tests.py @@ -77,6 +77,7 @@ def testMatchNoSpaces(self): def testMatchNumbersOnly(self): self.assertTrue(matchNumbersOnly('0123')) self.assertTrue(matchNumbersOnly('3.14')) + self.assertTrue(matchNumbersOnly('-3.14')) # bit weird, but that's how the function is written self.assertTrue(matchNumbersOnly('800.555.555'))