Skip to content

Commit

Permalink
feat: generic SetNodeQueue class for queuing setNode operations
Browse files Browse the repository at this point in the history
`NodeManager.setNode` and `NodeConnectionManager.syncNodeGraph` now utilise a single, shared queue to asynchronously add nodes to the node graph without blocking the main loop. These methods are both blocking by default but can be made non-blocking by setting the `block` parameter to false.
#322
  • Loading branch information
emmacasolin committed Jun 14, 2022
1 parent b9717e1 commit 93f53c8
Show file tree
Hide file tree
Showing 30 changed files with 494 additions and 151 deletions.
29 changes: 25 additions & 4 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import * as errors from './errors';
import * as utils from './utils';
import * as keysUtils from './keys/utils';
import * as nodesUtils from './nodes/utils';
import SetNodeQueue from './nodes/SetNodeQueue';

type NetworkConfig = {
forwardHost?: Host;
Expand Down Expand Up @@ -87,6 +88,7 @@ class PolykeyAgent {
gestaltGraph,
proxy,
nodeGraph,
setNodeQueue,
nodeConnectionManager,
nodeManager,
discovery,
Expand Down Expand Up @@ -132,6 +134,7 @@ class PolykeyAgent {
gestaltGraph?: GestaltGraph;
proxy?: Proxy;
nodeGraph?: NodeGraph;
setNodeQueue?: SetNodeQueue;
nodeConnectionManager?: NodeConnectionManager;
nodeManager?: NodeManager;
discovery?: Discovery;
Expand Down Expand Up @@ -281,12 +284,18 @@ class PolykeyAgent {
keyManager,
logger: logger.getChild(NodeGraph.name),
}));
setNodeQueue =
setNodeQueue ??
new SetNodeQueue({
logger: logger.getChild(SetNodeQueue.name),
});
nodeConnectionManager =
nodeConnectionManager ??
new NodeConnectionManager({
keyManager,
nodeGraph,
proxy,
setNodeQueue,
seedNodes,
...nodeConnectionManagerConfig_,
logger: logger.getChild(NodeConnectionManager.name),
Expand All @@ -299,6 +308,7 @@ class PolykeyAgent {
keyManager,
nodeGraph,
nodeConnectionManager,
setNodeQueue,
logger: logger.getChild(NodeManager.name),
});
await nodeManager.start();
Expand Down Expand Up @@ -385,6 +395,7 @@ class PolykeyAgent {
gestaltGraph,
proxy,
nodeGraph,
setNodeQueue,
nodeConnectionManager,
nodeManager,
discovery,
Expand Down Expand Up @@ -417,6 +428,7 @@ class PolykeyAgent {
public readonly gestaltGraph: GestaltGraph;
public readonly proxy: Proxy;
public readonly nodeGraph: NodeGraph;
public readonly setNodeQueue: SetNodeQueue;
public readonly nodeConnectionManager: NodeConnectionManager;
public readonly nodeManager: NodeManager;
public readonly discovery: Discovery;
Expand All @@ -441,6 +453,7 @@ class PolykeyAgent {
gestaltGraph,
proxy,
nodeGraph,
setNodeQueue,
nodeConnectionManager,
nodeManager,
discovery,
Expand All @@ -464,6 +477,7 @@ class PolykeyAgent {
gestaltGraph: GestaltGraph;
proxy: Proxy;
nodeGraph: NodeGraph;
setNodeQueue: SetNodeQueue;
nodeConnectionManager: NodeConnectionManager;
nodeManager: NodeManager;
discovery: Discovery;
Expand All @@ -489,6 +503,7 @@ class PolykeyAgent {
this.proxy = proxy;
this.discovery = discovery;
this.nodeGraph = nodeGraph;
this.setNodeQueue = setNodeQueue;
this.nodeConnectionManager = nodeConnectionManager;
this.nodeManager = nodeManager;
this.vaultManager = vaultManager;
Expand Down Expand Up @@ -562,10 +577,14 @@ class PolykeyAgent {
);
// Reverse connection was established and authenticated,
// add it to the node graph
await this.nodeManager.setNode(data.remoteNodeId, {
host: data.remoteHost,
port: data.remotePort,
});
await this.nodeManager.setNode(
data.remoteNodeId,
{
host: data.remoteHost,
port: data.remotePort,
},
false,
);
}
},
);
Expand Down Expand Up @@ -647,6 +666,7 @@ class PolykeyAgent {
proxyPort: networkConfig_.proxyPort,
tlsConfig,
});
await this.setNodeQueue.start();
await this.nodeManager.start();
await this.nodeConnectionManager.start({ nodeManager: this.nodeManager });
await this.nodeGraph.start({ fresh });
Expand Down Expand Up @@ -704,6 +724,7 @@ class PolykeyAgent {
await this.nodeConnectionManager.stop();
await this.nodeGraph.stop();
await this.nodeManager.stop();
await this.setNodeQueue.stop();
await this.proxy.stop();
await this.grpcServerAgent.stop();
await this.grpcServerClient.stop();
Expand Down
4 changes: 4 additions & 0 deletions src/bootstrap/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import path from 'path';
import Logger from '@matrixai/logger';
import { DB } from '@matrixai/db';
import * as bootstrapErrors from './errors';
import SetNodeQueue from '../nodes/SetNodeQueue';
import { IdentitiesManager } from '../identities';
import { SessionManager } from '../sessions';
import { Status } from '../status';
Expand Down Expand Up @@ -141,10 +142,12 @@ async function bootstrapState({
keyManager,
logger: logger.getChild(NodeGraph.name),
});
const setNodeQueue = new SetNodeQueue({ logger });
const nodeConnectionManager = new NodeConnectionManager({
keyManager,
nodeGraph,
proxy,
setNodeQueue,
logger: logger.getChild(NodeConnectionManager.name),
});
const nodeManager = new NodeManager({
Expand All @@ -153,6 +156,7 @@ async function bootstrapState({
nodeGraph,
nodeConnectionManager,
sigchain,
setNodeQueue,
logger: logger.getChild(NodeManager.name),
});
const notificationsManager =
Expand Down
62 changes: 38 additions & 24 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type Proxy from '../network/Proxy';
import type { Host, Hostname, Port } from '../network/types';
import type { Timer } from '../types';
import type NodeGraph from './NodeGraph';
import type SetNodeQueue from './SetNodeQueue';
import type {
NodeAddress,
NodeData,
Expand Down Expand Up @@ -60,6 +61,7 @@ class NodeConnectionManager {
protected nodeGraph: NodeGraph;
protected keyManager: KeyManager;
protected proxy: Proxy;
protected setNodeQueue: SetNodeQueue;
// NodeManager has to be passed in during start to allow co-dependency
protected nodeManager: NodeManager | undefined;
protected seedNodes: SeedNodes;
Expand All @@ -80,6 +82,7 @@ class NodeConnectionManager {
keyManager,
nodeGraph,
proxy,
setNodeQueue,
seedNodes = {},
initialClosestNodes = 3,
connConnectTime = 20000,
Expand All @@ -89,6 +92,7 @@ class NodeConnectionManager {
nodeGraph: NodeGraph;
keyManager: KeyManager;
proxy: Proxy;
setNodeQueue: SetNodeQueue;
seedNodes?: SeedNodes;
initialClosestNodes?: number;
connConnectTime?: number;
Expand All @@ -99,6 +103,7 @@ class NodeConnectionManager {
this.keyManager = keyManager;
this.nodeGraph = nodeGraph;
this.proxy = proxy;
this.setNodeQueue = setNodeQueue;
this.seedNodes = seedNodes;
this.initialClosestNodes = initialClosestNodes;
this.connConnectTime = connConnectTime;
Expand Down Expand Up @@ -301,7 +306,7 @@ class NodeConnectionManager {
});
// We can assume connection was established and destination was valid,
// we can add the target to the nodeGraph
await this.nodeManager?.setNode(targetNodeId, targetAddress);
await this.nodeManager?.setNode(targetNodeId, targetAddress, false);
// Creating TTL timeout
const timeToLiveTimer = setTimeout(async () => {
await this.destroyConnection(targetNodeId);
Expand Down Expand Up @@ -574,18 +579,12 @@ class NodeConnectionManager {
/**
* Perform an initial database synchronisation: get k of the closest nodes
* from each seed node and add them to this database
* For now, we also attempt to establish a connection to each of them.
* If these nodes are offline, this will impose a performance penalty,
* so we should investigate performing this in the background if possible.
* Alternatively, we can also just add the nodes to our database without
* establishing connection.
* This has been removed from start() as there's a chicken-egg scenario
* where we require the NodeGraph instance to be created in order to get
* connections.
* @param timer Connection timeout timer
* Establish a proxy connection to each node before adding it
* By default this operation is blocking, set `block` to false to make it
* non-blocking
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async syncNodeGraph(timer?: Timer) {
public async syncNodeGraph(block: boolean = true, timer?: Timer) {
for (const seedNodeId of this.getSeedNodes()) {
// Check if the connection is viable
try {
Expand All @@ -594,28 +593,43 @@ class NodeConnectionManager {
if (e instanceof nodesErrors.ErrorNodeConnectionTimeout) continue;
throw e;
}

const nodes = await this.getRemoteNodeClosestNodes(
seedNodeId,
this.keyManager.getNodeId(),
timer,
);
for (const [nodeId, nodeData] of nodes) {
// FIXME: needs to ping the node right? we want to be non-blocking
try {
// FIXME: no tran needed
await this.nodeManager?.setNode(nodeId, nodeData.address);
} catch (e) {
if (!(e instanceof nodesErrors.ErrorNodeGraphSameNodeId)) throw e;
if (!block) {
this.setNodeQueue.queueSetNode(() =>
this.nodeManager!.setNode(nodeId, nodeData.address),
);
} else {
try {
// FIXME: no tran neededawait this.nodeManager?.setNode(nodeId, nodeData.address);
} catch (e) {
if (!(e instanceof nodesErrors.ErrorNodeGraphSameNodeId)) throw e;
}
}
}
// Refreshing every bucket above the closest node
const [closestNode] = (
await this.nodeGraph.getClosestNodes(this.keyManager.getNodeId(), 1)
).pop()!;
const [bucketIndex] = this.nodeGraph.bucketIndex(closestNode);
for (let i = bucketIndex; i < this.nodeGraph.nodeIdBits; i++) {
this.nodeManager?.refreshBucketQueueAdd(i);
if (!block) {
this.setNodeQueue.queueSetNode(async () => {
const [closestNode] = (
await this.nodeGraph.getClosestNodes(this.keyManager.getNodeId(), 1)
).pop()!;
const [bucketIndex] = this.nodeGraph.bucketIndex(closestNode);
for (let i = bucketIndex; i < this.nodeGraph.nodeIdBits; i++) {
this.nodeManager?.refreshBucketQueueAdd(i);
}
});
} else {
const [closestNode] = (
await this.nodeGraph.getClosestNodes(this.keyManager.getNodeId(), 1)
).pop()!;
const [bucketIndex] = this.nodeGraph.bucketIndex(closestNode);
for (let i = bucketIndex; i < this.nodeGraph.nodeIdBits; i++) {
this.nodeManager?.refreshBucketQueueAdd(i);
}
}
}
}
Expand Down
Loading

0 comments on commit 93f53c8

Please sign in to comment.