Commit
Merge branch 'dev' into network-config
abyrd authored Oct 3, 2024
2 parents e4614bb + 3437d67 commit 439936d
Showing 8 changed files with 80 additions and 87 deletions.
3 changes: 2 additions & 1 deletion build.gradle
@@ -14,6 +14,7 @@ java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(21))
}
withSourcesJar()
}

jar {
@@ -164,7 +165,7 @@ dependencies {
}

// Database driver.
implementation 'org.mongodb:mongo-java-driver:3.11.0'
implementation 'org.mongodb:mongodb-driver-legacy:5.2.0'

// Legacy system for storing Java objects, this functionality is now provided by the MongoDB driver itself.
implementation 'org.mongojack:mongojack:2.10.1'
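For context on the dependency change above: mongo-java-driver 3.x was superseded by the split 4.x/5.x driver artifacts, and org.mongodb:mongodb-driver-legacy is the artifact that keeps the pre-4.x API (MongoClient, DB, DBCollection) available on the modern driver core. Below is a minimal sketch of that legacy API surface, not taken from this repository, using a hypothetical database and collection name:

```java
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class LegacyDriverSketch {
    public static void main (String[] args) {
        // MongoClient, DB and DBCollection are the 3.x-era entry points that the
        // mongodb-driver-legacy artifact preserves, so code written against
        // mongo-java-driver 3.11.0 should keep compiling after this upgrade.
        try (MongoClient client = new MongoClient("localhost", 27017)) {
            DB db = client.getDB("example-db");                // hypothetical database name
            DBCollection things = db.getCollection("things");  // hypothetical collection name
            things.insert(new BasicDBObject("name", "example"));
            System.out.println("documents: " + things.count());
        }
    }
}
```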
4 changes: 3 additions & 1 deletion src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java
@@ -110,11 +110,13 @@ protected TransportNetwork buildValue(Key key) {

// Get the set of points to which we are measuring travel time. Any smaller sub-grids created here will
// reference the scenarioNetwork's built-in full-extent pointset, so can reuse its linkage.
// TODO handle multiple destination grids.
// FIXME handle multiple destination grids.

if (key.destinationGridExtents == null) {
// Special (and ideally temporary) case for regional freeform destinations, where there is no grid to link.
// The null destinationGridExtents are created by the WebMercatorExtents#forPointsets else clause.
// FIXME there is no grid to link, but there are points and egress tables to make!
// see com.conveyal.r5.analyst.cluster.AnalysisWorkerTask.loadAndValidateDestinationPointSets
return scenarioNetwork;
}

14 changes: 7 additions & 7 deletions src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java
@@ -26,14 +26,14 @@
* scenarios from the backend instead of from S3.
*
* TODO merge this with TransportNetworkCache#resolveScenario into a single multi-level mem/disk/s3 cache.
* Note that this cache is going to just grow indefinitely in size as a worker receives many iterations of the same
* scenario - that could be a memory leak. Again multi level caching could releive those worries.
* It's debatable whether we should be hanging on to scenarios passed with single point requests becuase they may never
* be used again.
* This cache grows in size without bound as a worker receives many iterations of the same scenario.
* This is technically a sort of memory leak for long-lived workers. Multi-level caching could relieve those worries.
* However, this cache stores only the Scenarios and Modifications, not any large egress tables or linkages.
*
* It's debatable whether we should be hanging on to scenarios passed with single point requests,
* because they may never be used again.
* Should we just always require a single point task to be sent to the cluster before a regional?
* That would not ensure the scenario was present on all workers though.
*
* Created by abyrd on 2018-10-29
*/
public class ScenarioCache {

@@ -44,7 +44,7 @@ public class ScenarioCache {
public synchronized void storeScenario (Scenario scenario) {
Scenario existingScenario = scenariosById.put(scenario.id, scenario);
if (existingScenario != null) {
LOG.debug("Scenario cache already contained a this scenario.");
LOG.debug("Scenario cache already contained this scenario.");
}
}
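
The revised javadoc above notes that this map grows without bound for long-lived workers and that multi-level caching could relieve that. As a purely illustrative sketch, not part of this commit and using an assumed size limit, the same Caffeine library used elsewhere in this change could bound the map; a stub Scenario type stands in for the real Scenario class:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

/** Hypothetical size-bounded variant of ScenarioCache; the class in this commit keeps a plain map. */
public class BoundedScenarioCache {

    /** Stub standing in for the real Scenario class, reduced to the one field used here. */
    public static class Scenario {
        public String id;
        public Scenario (String id) { this.id = id; }
    }

    // Caffeine evicts entries once maximumSize is exceeded, bounding the growth
    // described in the javadoc. The limit of 100 is an assumption, not a project value.
    private final Cache<String, Scenario> scenariosById =
            Caffeine.newBuilder().maximumSize(100).build();

    public synchronized void storeScenario (Scenario scenario) {
        scenariosById.put(scenario.id, scenario);
    }

    public synchronized Scenario getScenario (String scenarioId) {
        return scenariosById.getIfPresent(scenarioId);
    }
}
```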

@@ -39,7 +39,6 @@ public class WorkerStatus {
public String workerVersion;
public String workerId;
public Set<String> networks = new HashSet<>();
public Set<String> scenarios = new HashSet<>();
public double secondsSinceLastPoll;
public Map<String, Integer> tasksPerMinuteByJobId;
@JsonUnwrapped(prefix = "ec2")
@@ -86,7 +85,6 @@ public WorkerStatus (AnalysisWorker worker) {
// networks = worker.networkPreloader.transportNetworkCache.getLoadedNetworkIds();
// For now we report a single network, even before it's loaded.
networks = Sets.newHashSet(worker.networkId);
scenarios = worker.networkPreloader.transportNetworkCache.getAppliedScenarios();
ec2 = worker.ec2info;

OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
2 changes: 1 addition & 1 deletion src/main/java/com/conveyal/r5/streets/StreetRouter.java
@@ -742,7 +742,7 @@ public int getTravelTimeToVertex (int vertexIndex) {
* fragments from the vertices at either end of the edge up to the destination split point.
* If no states can be produced return null.
*
* Note that this is only used by the point to point street router, not by LinkedPointSets (which have equivalent
* NOTE that this is ONLY USED BY the point to point street router, NOT BY LinkedPointSets (which have equivalent
* logic in their eval method). The PointSet implementation only needs to produce times, not States. But ideally
* some common logic can be factored out.
*/
10 changes: 3 additions & 7 deletions src/main/java/com/conveyal/r5/transit/TransportNetwork.java
@@ -53,14 +53,10 @@ public class TransportNetwork implements Serializable {
public TransitLayer transitLayer;

/**
* This stores any number of lightweight scenario networks built upon the current base network.
* FIXME that sounds like a memory leak, should be a WeighingCache or at least size-limited.
* A single network cache at the top level could store base networks and scenarios since they all have globally
* unique IDs. A hierarchical cache does have the advantage of evicting all the scenarios with the associated
* base network, which keeps the references in the scenarios from holding on to the base network. But considering
* that we have never started evicting networks (other than for a "cache" of one element) this might be getting
* ahead of ourselves.
* This field is no longer used. It has been moved to TransportNetworkCache, but this one remains for now, to
* avoid any inadvertent incompatibilities with serialized network files or serialization library settings.
*/
@Deprecated
public transient Map<String, TransportNetwork> scenarios = new HashMap<>();

/**
131 changes: 63 additions & 68 deletions src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java
@@ -29,11 +29,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

@@ -53,37 +50,70 @@ public class TransportNetworkCache implements Component {
private static final Logger LOG = LoggerFactory.getLogger(TransportNetworkCache.class);

/** Cache size is currently limited to one, i.e. the worker holds on to only one network at a time. */
private static final int DEFAULT_CACHE_SIZE = 1;
private static final int MAX_CACHED_NETWORKS = 1;

/**
* It might seem sufficient to hold only two scenarios (for single point scenario comparison). But in certain cases
* (e.g. the regional task queue is bigger than the size of each queued regional job) we might end up working on
* a mix of tasks from N different scenarios. Note also that scenarios hold references to their base networks, so
* caching multiple scenario networks can theoretically keep just as many TransportNetworks in memory.
* But in practice, in non-local (cloud) operation a given worker instance is locked to a single network for its
* entire lifespan.
*/
public static final int MAX_CACHED_SCENARIO_NETWORKS = 10;

// TODO change all other caches from Guava to Caffeine caches. This one is already a Caffeine cache.
private final LoadingCache<String, TransportNetwork> cache;
private final LoadingCache<String, TransportNetwork> networkCache;

private final FileStorage fileStorage;
private final GTFSCache gtfsCache;
private final OSMCache osmCache;

/**
* A table of already seen scenarios, avoiding downloading them repeatedly from S3 and allowing us to replace
* scenarios with only their IDs, and reverse that replacement later.
* scenarios with only their IDs, and reverse that replacement later. Note that this caches the Scenario objects
* themselves, not the TransportNetworks built from those Scenarios.
*/
private final ScenarioCache scenarioCache = new ScenarioCache();

/**
* This record type is used for the private, encapsulated cache of TransportNetworks for different scenarios.
* Scenario IDs are unique so we could look up these networks by scenario ID alone. However the cache values need
* to be derived entirely from the cache keys. We need some way to look up the base network so we include its ID.
*/
private record BaseAndScenarioId (String baseNetworkId, String scenarioId) { }

/**
* This stores a number of lightweight scenario networks built upon the current base network.
* Each scenario TransportNetwork has its own LinkageCache, containing LinkedPointSets that each have their own
* EgressCostTable. In practice this can exhaust memory, e.g. after using bicycle egress for about 50 scenarios.
* The previous hierarchical arrangement of caches has the advantage of evicting all the scenarios with the
* associated base network, which keeps the references in the scenarios from holding on to the base network.
* But considering that we have never started evicting networks (other than for a "cache" of one element) this
* eviction can be handled in other ways.
*/
private LoadingCache<BaseAndScenarioId, TransportNetwork> scenarioNetworkCache;

/** Create a transport network cache. If source bucket is null, will work offline. */
public TransportNetworkCache (FileStorage fileStorage, GTFSCache gtfsCache, OSMCache osmCache) {
this.osmCache = osmCache;
this.gtfsCache = gtfsCache;
this.cache = createCache(DEFAULT_CACHE_SIZE);
this.networkCache = Caffeine.newBuilder()
.maximumSize(MAX_CACHED_NETWORKS)
.build(this::loadNetwork);
this.scenarioNetworkCache = Caffeine.newBuilder()
.maximumSize(MAX_CACHED_SCENARIO_NETWORKS)
.build(this::loadScenario);
this.fileStorage = fileStorage;
}

/**
* Find a transport network by ID, building or loading as needed from pre-existing OSM, GTFS, MapDB, or Kryo files.
* This should never return null. If a TransportNetwork can't be built or loaded, an exception will be thrown.
*/
public synchronized @Nonnull
TransportNetwork getNetwork (String networkId) throws TransportNetworkException {
public TransportNetwork getNetwork (String networkId) throws TransportNetworkException {
try {
return cache.get(networkId);
return networkCache.get(networkId);
} catch (Exception e) {
throw new TransportNetworkException("Could not load TransportNetwork into cache. ", e);
}
@@ -107,43 +137,35 @@ public void rememberScenario (Scenario scenario) {
* base graphs). Therefore we can look up cached scenario networks based solely on their scenarioId rather than a
* compound key of (networkId, scenarioId).
*
* The fact that scenario networks are cached means that PointSet linkages will be automatically reused.
* Reusing scenario networks automatically leads to reuse of the associated PointSet linkages and egress tables.
* TODO it seems to me that this method should just take a Scenario as its second parameter, and that resolving
* the scenario against caches on S3 or local disk should be pulled out into a separate function.
* The problem is that then you resolve the scenario every time, even when the ID is enough to look up the already
* built network. So we need to pass the whole task in here, so either the ID or full scenario are visible.
*
* Thread safety notes: This entire method is synchronized so access by multiple threads will be sequential.
* The first thread will have a chance to build and store the requested scenario before any others see it.
* This means each new scenario will be applied one after the other. This is probably OK as long as building egress
* tables is already parallelized.
* Thread safety: getNetwork and getNetworkForScenario are threadsafe caches, so access to the same key by multiple
* threads will occur sequentially without repeatedly or simultaneously performing the same loading actions.
* Javadoc on the Caffeine LoadingCache indicates that it will throw exceptions when the cache loader method throws
* them, without establishing a mapping in the cache. So exceptions occurring during scenario application are
* expected to bubble up unimpeded.
*/
public synchronized TransportNetwork getNetworkForScenario (String networkId, String scenarioId) {
// If the networkId is different than previous calls, a new network will be loaded. Its transient nested map
// of scenarios will be empty at first. This ensures it's initialized if null.
// FIXME apparently this can't happen - the field is transient and initialized in TransportNetwork.
TransportNetwork baseNetwork = this.getNetwork(networkId);
if (baseNetwork.scenarios == null) {
baseNetwork.scenarios = new HashMap<>();
}
public TransportNetwork getNetworkForScenario (String networkId, String scenarioId) {
TransportNetwork scenarioNetwork = scenarioNetworkCache.get(new BaseAndScenarioId(networkId, scenarioId));
return scenarioNetwork;
}

TransportNetwork scenarioNetwork = baseNetwork.scenarios.get(scenarioId);
if (scenarioNetwork == null) {
// The network for this scenario was not found in the cache. Create that scenario network and cache it.
LOG.debug("Applying scenario to base network...");
// Fetch the full scenario if an ID was specified.
Scenario scenario = resolveScenario(networkId, scenarioId);
// Apply any scenario modifications to the network before use, performing protective copies where necessary.
// We used to prepend a filter to the scenario, removing trips that are not running during the search time window.
// However, because we are caching transportNetworks with scenarios already applied to them, we can’t use
// the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always
// apply scenarios every time.
scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork);
LOG.debug("Done applying scenario. Caching the resulting network.");
baseNetwork.scenarios.put(scenario.id, scenarioNetwork);
} else {
LOG.debug("Reusing cached TransportNetwork for scenario {}.", scenarioId);
}
private TransportNetwork loadScenario (BaseAndScenarioId ids) {
TransportNetwork baseNetwork = this.getNetwork(ids.baseNetworkId());
LOG.debug("Scenario TransportNetwork not found. Applying scenario to base network and caching it.");
// Fetch the full scenario if an ID was specified.
Scenario scenario = resolveScenario(ids.baseNetworkId(), ids.scenarioId());
// Apply any scenario modifications to the network before use, performing protective copies where necessary.
// We used to prepend a filter to the scenario, removing trips that are not running during the search time window.
// However, because we are caching transportNetworks with scenarios already applied to them, we can’t use
// the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always
// apply scenarios every time.
TransportNetwork scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork);
LOG.debug("Done applying scenario. Caching the resulting network.");
return scenarioNetwork;
}
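
The thread-safety note above relies on Caffeine LoadingCache behavior: each key's value is loaded once rather than repeatedly or simultaneously, and an unchecked exception thrown by the loader propagates to the caller without a mapping being recorded. A self-contained sketch of that behavior follows, using placeholder key and value types rather than real TransportNetworks:

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

public class LoaderExceptionSketch {
    public static void main (String[] args) {
        LoadingCache<String, String> cache = Caffeine.newBuilder()
                .maximumSize(10)
                .build(key -> {
                    if (key.startsWith("bad")) {
                        // Stands in for a failure during scenario application.
                        throw new IllegalStateException("could not build value for " + key);
                    }
                    return "network for " + key;
                });

        System.out.println(cache.get("base-1")); // loads once, then serves the cached value

        try {
            cache.get("bad-scenario");
        } catch (IllegalStateException e) {
            // The loader's unchecked exception bubbles up to the caller...
            System.out.println("loader failed: " + e.getMessage());
        }
        // ...and no mapping was established for the failing key.
        System.out.println(cache.getIfPresent("bad-scenario")); // prints null
    }
}
```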

@@ -187,6 +209,7 @@ private TransportNetworkConfig loadNetworkConfig (String networkId) {
// The switch to use JSON manifests instead of zips occurred in 32a1aebe in July 2016.
// Over six years have passed, buildNetworkFromBundleZip is deprecated and could probably be removed.
LOG.warn("No network config (aka manifest) found. Assuming old-format network inputs bundle stored as a single ZIP file.");
// FIXME Bundle ZIP building to reduce duplicate code.
network = buildNetworkFromBundleZip(networkId);
} else {
network = buildNetworkFromConfig(networkConfig);
@@ -357,12 +380,6 @@ private String getNetworkConfigFilename (String networkId) {
return GTFSCache.cleanId(networkId) + ".json";
}

private LoadingCache createCache(int size) {
return Caffeine.newBuilder()
.maximumSize(size)
.build(this::loadNetwork);
}

/**
* CacheLoader method, which should only be called by the LoadingCache.
* Return the graph for the given unique identifier. Load pre-built serialized networks from local or remote
@@ -394,28 +411,6 @@ private LoadingCache createCache(int size) {
}
}

/**
* This will eventually be used in WorkerStatus to report to the backend all loaded networks, to give it hints about
* what kind of tasks the worker is ready to work on immediately. This is made more complicated by the fact that
* workers are started up with no networks loaded, but with the intent for them to work on a particular job. So
* currently the workers just report which network they were started up for, and this method is not used.
*
* In the future, workers should just report an empty set of loaded networks, and the back end should strategically
* send them tasks when they come on line to assign them to networks as needed. But this will require a new
* mechanism to fairly allocate the workers to jobs.
*/
public Set<String> getLoadedNetworkIds() {
return cache.asMap().keySet();
}

public Set<String> getAppliedScenarios() {
return cache.asMap().values().stream()
.filter(network -> network.scenarios != null)
.map(network -> network.scenarios.keySet())
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}

/**
* Given a network and scenario ID, retrieve that scenario from the local disk cache (falling back on S3).
*/
1 change: 1 addition & 0 deletions src/main/java/com/conveyal/r5/transit/TripPattern.java
@@ -34,6 +34,7 @@ public class TripPattern implements Serializable, Cloneable {
* ID in this transport network the ID would depend on the order of application of scenarios, and because this ID is
* used to map results back to the original network.
* TODO This concept of an "original" transport network may be obsolete, this field doesn't seem to be used anywhere.
* These are set to sequential integers: the index of the pattern in the TransitLayer's list of patterns.
*/
public int originalId;
