Regular Polling #874

Merged
merged 10 commits into from
Sep 28, 2023
76 changes: 48 additions & 28 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,41 @@
/**
* This class distributes the tasks making up regional jobs to workers.
* <p>
* It should aim to draw tasks fairly from all organizations, and fairly from all jobs within each
* organization, while attempting to respect the transport network affinity of each worker, giving
* the worker tasks that require the same network it has been using recently.
* It respects the declared transport network affinity of each worker, giving the worker tasks that
* relate to the same network it has been using recently, and is therefore already loaded in memory.
* <p>
* Previously workers long-polled for work, holding lots of connections open. Now they short-poll
* and sleep for a while if there's no work. This is simpler and allows us to work within much more
* standard HTTP frameworks.
* In our initial design, workers long-polled for work, holding lots of connections open. This was
* soon revised to short-poll and sleep for a while when there's no work. This was found to be
* simpler, allowing use of standard HTTP frameworks instead of custom networking code.
* <p>
* The fact that workers continuously re-poll for work every 10-30 seconds serves as a signal to the
* broker that they are still alive and waiting. This also allows the broker to maintain a catalog
* of active workers.
* Issue conveyal/r5#596 arose because we'd only poll when a worker was running low on tasks or
* idling. A worker could disappear for a long time, leaving the backend to assume it had shut down
* or crashed. Workers were revised to poll more frequently even when they were busy and didn't
* need any new tasks to work on, providing a signal to the broker that they are still alive and
* functioning. This allows the broker to maintain a more accurate catalog of active workers.
* <p>
* Because (at least currently) two organizations never share the same graph, we can get by with
* pulling tasks cyclically or randomly from all the jobs, and actively shape the number of workers
* with affinity for each graph by forcing some of them to accept tasks on graphs other than the one
* they have declared affinity for.
* Most methods on this class are synchronized because they can be called from many HTTP handler
* threads at once (when many workers are polling at once). We should occasionally evaluate whether
* synchronizing all methods to make this threadsafe is a performance issue. If so, fine-grained
* locking may be advantageous, but as a rule it is much harder to design, test, and maintain.
* <p>
* This could be thought of as "affinity homeostasis". We will constantly keep track of the ideal
* Workers were originally intended to migrate from one network to another to handle subsequent jobs
* without waiting for more cloud compute instances to start up. In practice we currently assign
* each worker a single network, but the balance of workers assigned to each network and the reuse
* of workers could in principle be made more sophisticated. The remainder of the comments below
* provide context for how this could be refined or improved.
*
* Because (at least currently) two organizations never share the same graph, we could get by with
* pulling tasks cyclically or randomly from all the jobs, and could actively shape the number of
* workers with affinity for each graph by forcing some of them to accept tasks on graphs other than
* the one they have declared affinity for. If the pool of workers was allowed to grow very large,
* we could aim to draw tasks fairly from all organizations, and fairly from all jobs within each
* organization.
* <p>
* We have described this approach as "affinity homeostasis": constantly keep track of the ideal
* proportion of workers by graph (based on active jobs), and the true proportion of consumers by
* graph (based on incoming polling), then we can decide when a worker's graph affinity should be
* ignored and what it should be forced to.
* <p>
* It may also be helpful to mark jobs every time they are skipped in the LRU queue. Each time a job
* is serviced, it is taken out of the queue and put at its end. Jobs that have not been serviced
* float to the top.
* <p>
* Most methods on this class are synchronized, because they can be called from many HTTP handler
* threads at once.
*
* TODO evaluate whether synchronizing all methods to make this threadsafe is a performance issue.
* ignored and what graph it should be forced to.
*/
public class Broker implements Component {

Expand All @@ -100,7 +105,15 @@ public interface Config {
private final ListMultimap<WorkerCategory, Job> jobs =
MultimapBuilder.hashKeys().arrayListValues().build();

/** The most tasks to deliver to a worker at a time. */
/**
* The most tasks to deliver to a worker at a time. Workers may request fewer tasks than this, and the broker should
* never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB. If this value
* is too high, all remaining tasks in a job could be distributed to a single worker leaving none for the other
* workers, creating a slow-joiner problem especially if the tasks are complicated and slow to complete.
*
* The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic
* polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS) which may not be ideal but is known to work.
*/
public final int MAX_TASKS_PER_WORKER = 16;

/**
Expand Down Expand Up @@ -317,9 +330,13 @@ public void createWorkersInCategory (WorkerCategory category, WorkerTags workerT

/**
* Attempt to find some tasks that match what a worker is requesting.
* Always returns a list, which may be empty if there is nothing to deliver.
* Always returns a non-null List, which may be empty if there is nothing to deliver.
* Number of tasks in the list is strictly limited to maxTasksRequested.
*/
public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategory) {
public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategory, int maxTasksRequested) {
if (maxTasksRequested <= 0) {
return Collections.emptyList();
}
Job job;
if (config.offline()) {
// Working in offline mode; get tasks from the first job that has any tasks to deliver.
Expand All @@ -335,7 +352,10 @@ public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategor
return Collections.emptyList();
}
// Return up to N tasks that are waiting to be processed.
return job.generateSomeTasksToDeliver(MAX_TASKS_PER_WORKER);
if (maxTasksRequested > MAX_TASKS_PER_WORKER) {
maxTasksRequested = MAX_TASKS_PER_WORKER;
}
return job.generateSomeTasksToDeliver(maxTasksRequested);
}

/**
Expand Down
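The limit applied in getSomeWork above amounts to "zero tasks for a non-positive request, otherwise the smaller of the worker's request and the broker's maximum". A minimal sketch of that rule in isolation follows. The class TaskLimitSketch and the method effectiveLimit are hypothetical names used only for illustration and are not part of this PR; the constant mirrors MAX_TASKS_PER_WORKER from the diff.

```java
/** Hypothetical helper (not in the PR) that isolates the task-count clamp used by getSomeWork. */
class TaskLimitSketch {

    /** Mirrors Broker.MAX_TASKS_PER_WORKER as shown in the diff. */
    static final int MAX_TASKS_PER_WORKER = 16;

    /**
     * Returns how many tasks the broker would be willing to hand out for one poll:
     * zero if the worker asked for none (or sent a non-positive value),
     * otherwise the minimum of the requested count and the broker-side maximum.
     */
    static int effectiveLimit (int maxTasksRequested) {
        if (maxTasksRequested <= 0) {
            return 0;
        }
        return Math.min(maxTasksRequested, MAX_TASKS_PER_WORKER);
    }

    public static void main (String[] args) {
        System.out.println(effectiveLimit(0));   // busy worker that only wants to report in
        System.out.println(effectiveLimit(5));   // request below the broker maximum
        System.out.println(effectiveLimit(50));  // request above the broker maximum
    }
}
```

A worker that polls only to signal that it is alive could request zero tasks and would then receive an empty list, which matches the early return in the diff.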
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public boolean isErrored () {
/**
* @param maxTasks the maximum number of tasks to return.
* @return some tasks that are not yet marked as completed and have not yet been delivered in
* this delivery pass.
* this delivery pass. May return an empty list, but never null.
*/
public List<RegionalTask> generateSomeTasksToDeliver (int maxTasks) {
List<RegionalTask> tasks = new ArrayList<>(maxTasks);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.conveyal.analysis.components.broker;

import com.conveyal.r5.analyst.WorkerCategory;
import com.conveyal.r5.analyst.cluster.AnalysisWorker;
import com.conveyal.r5.analyst.cluster.WorkerStatus;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
Expand All @@ -21,7 +22,8 @@
*/
public class WorkerCatalog {

public static final int WORKER_RECORD_DURATION_MSEC = 2 * 60 * 1000;
/** If a worker says it will poll every s seconds or less, wait s plus this number before considering it gone. */
private static final int POLLING_TOLERANCE_SECONDS = 5;

/**
* The information supplied by workers the last time they polled for more tasks.
Expand Down Expand Up @@ -74,9 +76,9 @@ public synchronized void catalog (WorkerStatus workerStatus) {
*/
private synchronized void purgeDeadWorkers () {
long now = System.currentTimeMillis();
long oldestAcceptable = now - WORKER_RECORD_DURATION_MSEC;
List<WorkerObservation> ancientObservations = new ArrayList<>();
for (WorkerObservation observation : observationsByWorkerId.values()) {
long oldestAcceptable = now - ((observation.status.pollIntervalSeconds + POLLING_TOLERANCE_SECONDS) * 1000L);
if (observation.lastSeen < oldestAcceptable) {
ancientObservations.add(observation);
}
Expand Down
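The expiry rule in purgeDeadWorkers above replaces a fixed two-minute window with a per-worker window: the poll interval the worker declared, plus a small tolerance. A minimal sketch of that check as a pure function follows, assuming timestamps in epoch milliseconds. The class WorkerExpirySketch and the method isGone are hypothetical names for illustration only; the constant mirrors POLLING_TOLERANCE_SECONDS from the diff.

```java
/** Hypothetical helper (not in the PR) that isolates the per-worker expiry check. */
class WorkerExpirySketch {

    /** Mirrors WorkerCatalog.POLLING_TOLERANCE_SECONDS as shown in the diff. */
    static final int POLLING_TOLERANCE_SECONDS = 5;

    /**
     * Returns true if a worker last seen at lastSeenMsec should be considered gone at nowMsec,
     * given the poll interval it declared. Long arithmetic (1000L) is used here so that the
     * conversion to milliseconds cannot overflow an int for very large intervals.
     */
    static boolean isGone (long lastSeenMsec, int pollIntervalSeconds, long nowMsec) {
        long allowedSilenceMsec = (pollIntervalSeconds + POLLING_TOLERANCE_SECONDS) * 1000L;
        long oldestAcceptable = nowMsec - allowedSilenceMsec;
        return lastSeenMsec < oldestAcceptable;
    }

    public static void main (String[] args) {
        long now = System.currentTimeMillis();
        // A worker that declared a 10 second interval and was last seen 12 seconds ago is within tolerance.
        System.out.println(isGone(now - 12_000, 10, now));
        // The same worker last seen 20 seconds ago has exceeded interval plus tolerance.
        System.out.println(isGone(now - 20_000, 10, now));
    }
}
```

Because the window is derived from each worker's own declared interval, workers that poll at different rates can coexist in the catalog without the slower ones being purged early.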
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.conveyal.analysis.persistence.Persistence;
import com.conveyal.analysis.util.HttpStatus;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.gtfs.util.Util;
import com.conveyal.r5.analyst.WorkerCategory;
import com.conveyal.r5.analyst.cluster.AnalysisWorker;
import com.conveyal.r5.analyst.cluster.RegionalTask;
Expand Down Expand Up @@ -45,13 +46,15 @@

import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static com.conveyal.r5.common.Util.human;
import static com.conveyal.r5.common.Util.notNullOrEmpty;
import static com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -289,7 +292,8 @@ private String jsonResponse (Response response, int statusCode, Object object) {
// We could call response.body(jsonMapper.writeValueAsBytes(object));
// but then the calling handler functions need to explicitly return null which is weird.
try {
return jsonMapper.writeValueAsString(object);
String body = jsonMapper.writeValueAsString(object);
return body;
} catch (JsonProcessingException e) {
throw AnalysisServerException.unknown(e);
}
Expand Down Expand Up @@ -362,7 +366,7 @@ private Object workerPoll (Request request, Response response) {
broker.recordWorkerObservation(workerStatus);
WorkerCategory workerCategory = workerStatus.getWorkerCategory();
// See if any appropriate tasks exist for this worker.
List<RegionalTask> tasks = broker.getSomeWork(workerCategory);
List<RegionalTask> tasks = broker.getSomeWork(workerCategory, workerStatus.maxTasksRequested);
// If there is no work for the worker, signal this clearly with a "no content" code,
// so the worker can sleep a while before the next polling attempt.
if (tasks.isEmpty()) {
Expand Down