Skip to content

Commit

Permalink
feat: setNode concurrently pings multiple nodes
Browse files Browse the repository at this point in the history
`setNode` now pings 3 nodes concurrently, updating ones that respond and removing ones that don't. If there is room in the bucket afterwards then we add the new node.

#322
  • Loading branch information
tegefaulkes authored and emmacasolin committed Jun 14, 2022
1 parent 0e34a17 commit 9764213
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 38 deletions.
20 changes: 8 additions & 12 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,26 +260,22 @@ class NodeGraph {
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getOldestNode(
bucketIndex: number,
limit: number = 1,
tran?: DBTransaction,
): Promise<NodeId | undefined> {
): Promise<Array<NodeId>> {
if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.getOldestNode(bucketIndex, tran),
this.getOldestNode(bucketIndex, limit, tran),
);
}

const bucketKey = nodesUtils.bucketKey(bucketIndex);
// Remove the oldest entry in the bucket
let oldestNodeId: NodeId | undefined;
for await (const [key] of tran.iterator({ limit: 1 }, [
...this.nodeGraphLastUpdatedDbPath,
bucketKey,
])) {
({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(
key as unknown as Buffer,
));
const oldestNodeIds: Array<NodeId> = [];
for await (const [key] of tran.iterator({ limit }, [...this.nodeGraphLastUpdatedDbPath, bucketKey])) {
const { nodeId } = nodesUtils.parseLastUpdatedBucketDbKey(key as unknown as Buffer);
oldestNodeIds.push(nodeId);
}
return oldestNodeId;
return oldestNodeIds;
}

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
Expand Down
69 changes: 46 additions & 23 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ class NodeManager {
const targetHost = await networkUtils.resolveHost(targetAddress.host);
return await this.nodeConnectionManager.pingNode(
nodeId,
targetHost,
targetAddress.port,
{
host: targetHost,
port: targetAddress.port,
},
timer,
);
}
Expand Down Expand Up @@ -351,6 +353,7 @@ class NodeManager {
* @param force - Flag for if we want to add the node without authenticating or if the bucket is full.
* This will drop the oldest node in favor of the new.
* @param timer Connection timeout timer
* @param tran
*/
public async setNode(
nodeId: NodeId,
Expand Down Expand Up @@ -382,35 +385,55 @@ class NodeManager {
} else {
// We want to add a node but the bucket is full
// We need to ping the oldest node
const oldestNodeId = (await this.nodeGraph.getOldestNode(
const oldestNodeIds = (await this.nodeGraph.getOldestNode(
bucketIndex,
3,
tran,
))!;
if ((await this.pingNode(oldestNodeId, undefined, timer)) && !force) {
// The node responded, we need to update it's info and drop the new node
const oldestNode = (await this.nodeGraph.getNode(oldestNodeId, tran))!;
await this.nodeGraph.setNode(oldestNodeId, oldestNode.address, tran);
} else {
// The node could not be contacted or force was set,
// we drop it in favor of the new node
await this.nodeGraph.unsetNode(oldestNodeId, tran);
if (force) {
// We just add the new node anyway without checking the old one
const oldNodeId = oldestNodeIds[0];
await this.nodeGraph.unsetNode(oldNodeId, tran);
await this.nodeGraph.setNode(nodeId, nodeAddress, tran);
return;
}
// We want to concurrently ping the nodes
const pingPromises = oldestNodeIds.map((nodeId) => {
const doPing = async (): Promise<{
nodeId: NodeId;
success: boolean;
}> => {
// This needs to return nodeId and ping result
const data = await this.nodeGraph.getNode(nodeId, tran);
if (data == null) return { nodeId, success: false };
const result = await this.pingNode(nodeId, undefined, timer);
return { nodeId, success: result };
};
return doPing();
});
const pingResults = await Promise.all(pingPromises);
for (const { nodeId, success } of pingResults) {
if (success) {
// Ping succeeded, update the node
const node = (await this.nodeGraph.getNode(nodeId, tran))!;
await this.nodeGraph.setNode(nodeId, node.address, tran);
} else {
// Otherwise we remove the node
await this.nodeGraph.unsetNode(nodeId, tran);
}
}
// Check if we now have room and add the new node
const count = await this.nodeGraph.getBucketMetaProp(
bucketIndex,
'count',
tran,
);
if (count < this.nodeGraph.nodeBucketLimit) {
await this.nodeGraph.setNode(nodeId, nodeAddress, tran);
}
}
}

// FIXME
// /**
// * Updates the node in the NodeGraph
// */
// public async updateNode(
// nodeId: NodeId,
// nodeAddress?: NodeAddress,
// tran?: DBTransaction,
// ): Promise<void> {
// return await this.nodeGraph.updateNode(nodeId, nodeAddress, tran);
// }

/**
* Removes a node from the NodeGraph
*/
Expand Down
6 changes: 3 additions & 3 deletions tests/nodes/NodeManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ describe(`${NodeManager.name} test`, () => {
// Mocking ping
const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode');
nodeManagerPingMock.mockResolvedValue(true);
const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex);
const oldestNodeId = (await nodeGraph.getOldestNode(bucketIndex)).pop();
const oldestNode = await nodeGraph.getNode(oldestNodeId!);
// Waiting for a second to tick over
await sleep(1100);
Expand Down Expand Up @@ -550,7 +550,7 @@ describe(`${NodeManager.name} test`, () => {
// Mocking ping
const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode');
nodeManagerPingMock.mockResolvedValue(true);
const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex);
const oldestNodeId = (await nodeGraph.getOldestNode(bucketIndex)).pop();
// Adding a new node with bucket full
await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress, true);
// Bucket still contains max nodes
Expand Down Expand Up @@ -591,7 +591,7 @@ describe(`${NodeManager.name} test`, () => {
// Mocking ping
const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode');
nodeManagerPingMock.mockResolvedValue(false);
const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex);
const oldestNodeId = (await nodeGraph.getOldestNode(bucketIndex)).pop();
// Adding a new node with bucket full
await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress, true);
// Bucket still contains max nodes
Expand Down

0 comments on commit 9764213

Please sign in to comment.