Skip to content

Commit

Permalink
fix: NodeConnectionManager.syncNodeGraph now pings nodes
Browse files Browse the repository at this point in the history
Need to ensure validity of nodes by pinging them before adding them to the node graph.

#322
  • Loading branch information
tegefaulkes authored and emmacasolin committed Jun 14, 2022
1 parent dac96be commit a522de3
Show file tree
Hide file tree
Showing 38 changed files with 639 additions and 84 deletions.
12 changes: 8 additions & 4 deletions src/bin/nodes/CommandAdd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class CommandAdd extends CommandPolykey {
this.addOption(binOptions.nodeId);
this.addOption(binOptions.clientHost);
this.addOption(binOptions.clientPort);
this.addOption(binOptions.forceNodeAdd);
this.addOption(binOptions.noPing);
this.action(async (nodeId: NodeId, host: Host, port: Port, options) => {
const { default: PolykeyClient } = await import('../../PolykeyClient');
const nodesUtils = await import('../../nodes/utils');
Expand Down Expand Up @@ -46,13 +48,15 @@ class CommandAdd extends CommandPolykey {
port: clientOptions.clientPort,
logger: this.logger.getChild(PolykeyClient.name),
});
const nodeAddressMessage = new nodesPB.NodeAddress();
nodeAddressMessage.setNodeId(nodesUtils.encodeNodeId(nodeId));
nodeAddressMessage.setAddress(
const nodeAddMessage = new nodesPB.NodeAdd();
nodeAddMessage.setNodeId(nodesUtils.encodeNodeId(nodeId));
nodeAddMessage.setAddress(
new nodesPB.Address().setHost(host).setPort(port),
);
nodeAddMessage.setForce(options.force);
nodeAddMessage.setPing(options.ping);
await binUtils.retryAuthentication(
(auth) => pkClient.grpcClient.nodesAdd(nodeAddressMessage, auth),
(auth) => pkClient.grpcClient.nodesAdd(nodeAddMessage, auth),
meta,
);
} finally {
Expand Down
11 changes: 11 additions & 0 deletions src/bin/utils/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ const pullVault = new commander.Option(
'Name or Id of the vault to pull from',
);

const forceNodeAdd = new commander.Option(
'--force',
'Force adding node to nodeGraph',
).default(false);

const noPing = new commander.Option('--no-ping', 'Skip ping step').default(
true,
);

export {
nodePath,
format,
Expand All @@ -176,4 +185,6 @@ export {
network,
workers,
pullVault,
forceNodeAdd,
noPing,
};
22 changes: 17 additions & 5 deletions src/client/service/nodesAdd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { NodeId, NodeAddress } from '../../nodes/types';
import type { Host, Hostname, Port } from '../../network/types';
import type * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';
import type Logger from '@matrixai/logger';
import * as nodeErrors from '../../nodes/errors';
import * as grpcUtils from '../../grpc/utils';
import { validateSync } from '../../validation';
import * as validationUtils from '../../validation/utils';
Expand All @@ -30,12 +31,13 @@ function nodesAdd({
logger: Logger;
}) {
return async (
call: grpc.ServerUnaryCall<nodesPB.NodeAddress, utilsPB.EmptyMessage>,
call: grpc.ServerUnaryCall<nodesPB.NodeAdd, utilsPB.EmptyMessage>,
callback: grpc.sendUnaryData<utilsPB.EmptyMessage>,
): Promise<void> => {
try {
const response = new utilsPB.EmptyMessage();
const metadata = await authenticate(call.metadata);
const request = call.request;
call.sendMetadata(metadata);
const {
nodeId,
Expand All @@ -55,11 +57,21 @@ function nodesAdd({
);
},
{
nodeId: call.request.getNodeId(),
host: call.request.getAddress()?.getHost(),
port: call.request.getAddress()?.getPort(),
nodeId: request.getNodeId(),
host: request.getAddress()?.getHost(),
port: request.getAddress()?.getPort(),
},
);
// Pinging to authenticate the node
if (
request.getPing() &&
!(await nodeManager.pingNode(nodeId, { host, port }))
) {
throw new nodeErrors.ErrorNodePingFailed(
'Failed to authenticate target node',
);
}

await db.withTransactionF(async (tran) =>
nodeManager.setNode(
nodeId,
Expand All @@ -68,7 +80,7 @@ function nodesAdd({
port,
} as NodeAddress,
true,
true,
request.getForce(),
undefined,
tran,
),
Expand Down
64 changes: 45 additions & 19 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ class NodeConnectionManager {
this.nodeManager = nodeManager;
for (const nodeIdEncoded in this.seedNodes) {
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded)!;
await this.nodeGraph.setNode(nodeId, this.seedNodes[nodeIdEncoded]);
await this.nodeManager.setNode(
nodeId,
this.seedNodes[nodeIdEncoded],
true,
true,
);
}
this.logger.info(`Started ${this.constructor.name}`);
}
Expand Down Expand Up @@ -431,10 +436,14 @@ class NodeConnectionManager {
timer?: Timer,
options: { signal?: AbortSignal } = {},
): Promise<NodeAddress | undefined> {
const localNodeId = this.keyManager.getNodeId();
const { signal } = { ...options };
// Let foundTarget: boolean = false;
let foundAddress: NodeAddress | undefined = undefined;
// Get the closest alpha nodes to the target node (set as shortlist)
// FIXME? this is an array. Shouldn't it be a set?
// It's possible for this to grow faster than we can consume it,
// doubly so if we allow duplicates
const shortlist = await this.nodeGraph.getClosestNodes(
targetNodeId,
this.initialClosestNodes,
Expand Down Expand Up @@ -466,13 +475,15 @@ class NodeConnectionManager {
}
// Connect to the node (check if pre-existing connection exists, otherwise
// create a new one)
try {
// Add the node to the database so that we can find its address in
// call to getConnectionToNode
await this.nodeGraph.setNode(nextNodeId, nextNodeAddress.address);
await this.getConnection(nextNodeId, timer);
} catch (e) {
// If we can't connect to the node, then skip it
if (
await this.pingNode(
nextNodeId,
nextNodeAddress.address.host,
nextNodeAddress.address.port,
)
) {
await this.nodeManager!.setNode(nextNodeId, nextNodeAddress.address);
} else {
continue;
}
contacted[nextNodeId] = true;
Expand All @@ -486,12 +497,19 @@ class NodeConnectionManager {
// them to the shortlist
for (const [nodeId, nodeData] of foundClosest) {
if (signal?.aborted) throw new nodesErrors.ErrorNodeAborted();
// Ignore a`ny nodes that have been contacted
if (contacted[nodeId]) {
// Ignore any nodes that have been contacted or our own node
if (contacted[nodeId] || localNodeId.equals(nodeId)) {
continue;
}
if (nodeId.equals(targetNodeId)) {
await this.nodeGraph.setNode(nodeId, nodeData.address);
if (
nodeId.equals(targetNodeId) &&
(await this.pingNode(
nodeId,
nodeData.address.host,
nodeData.address.port,
))
) {
await this.nodeManager!.setNode(nodeId, nodeData.address);
foundAddress = nodeData.address;
// We have found the target node, so we can stop trying to look for it
// in the shortlist
Expand Down Expand Up @@ -555,7 +573,9 @@ class NodeConnectionManager {
host: address.getHost() as Host | Hostname,
port: address.getPort() as Port,
},
lastUpdated: 0, // FIXME?
// Not really needed
// But if it's needed then we need to add the information to the proto definition
lastUpdated: 0,
},
]);
}
Expand Down Expand Up @@ -589,15 +609,20 @@ class NodeConnectionManager {
this.keyManager.getNodeId(),
timer,
);
// FIXME: we need to ping a node before setting it
for (const [nodeId, nodeData] of nodes) {
const pingAndAddNode = async () => {
const port = nodeData.address.port;
const host = await networkUtils.resolveHost(nodeData.address.host);
if (await this.pingNode(nodeId, host, port)) {
await this.nodeManager!.setNode(nodeId, nodeData.address, true);
}
};

if (!block) {
this.queue.push(() =>
this.nodeManager!.setNode(nodeId, nodeData.address),
);
this.queue.push(pingAndAddNode);
} else {
try {
await this.nodeManager?.setNode(nodeId, nodeData.address);
await pingAndAddNode();
} catch (e) {
if (!(e instanceof nodesErrors.ErrorNodeGraphSameNodeId)) throw e;
}
Expand Down Expand Up @@ -703,10 +728,11 @@ class NodeConnectionManager {
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async pingNode(
nodeId: NodeId,
host: Host,
host: Host | Hostname,
port: Port,
timer?: Timer,
): Promise<boolean> {
host = await networkUtils.resolveHost(host);
// If we can create a connection then we have punched though the NAT,
// authenticated and confirmed the nodeId matches
const proxyAddress = networkUtils.buildAddress(
Expand Down
6 changes: 6 additions & 0 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ class NodeManager {
timeout?: number,
tran?: DBTransaction,
): Promise<void> {
// We don't want to add our own node
if (nodeId.equals(this.keyManager.getNodeId())) {
this.logger.debug('Is own NodeId, skipping');
return;
}

if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.setNode(nodeId, nodeAddress, block, force, timeout, tran),
Expand Down
6 changes: 6 additions & 0 deletions src/nodes/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ class ErrorNodeConnectionHostWildcard<T> extends ErrorNodes<T> {
static description = 'An IP wildcard was provided for the target host';
exitCode = sysexits.USAGE;
}
class ErrorNodePingFailed<T> extends ErrorNodes<T> {
static description =
'Failed to ping the node when attempting to authenticate';
exitCode = sysexits.NOHOST;
}

export {
ErrorNodes,
Expand All @@ -106,4 +111,5 @@ export {
ErrorNodeConnectionPublicKeyNotFound,
ErrorNodeConnectionManagerNotRunning,
ErrorNodeConnectionHostWildcard,
ErrorNodePingFailed,
};
1 change: 1 addition & 0 deletions src/proto/js/google/protobuf/any_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* @fileoverview
* @enhanceable
* @suppress {missingRequire} reports error on implicit type usages.
* @suppress {messageConventions} JS Compiler reports an error if a variable or
* field starts with 'MSG_' and isn't a translatable message.
* @public
Expand Down
1 change: 1 addition & 0 deletions src/proto/js/google/protobuf/descriptor_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* @fileoverview
* @enhanceable
* @suppress {missingRequire} reports error on implicit type usages.
* @suppress {messageConventions} JS Compiler reports an error if a variable or
* field starts with 'MSG_' and isn't a translatable message.
* @public
Expand Down
1 change: 1 addition & 0 deletions src/proto/js/google/protobuf/duration_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* @fileoverview
* @enhanceable
* @suppress {missingRequire} reports error on implicit type usages.
* @suppress {messageConventions} JS Compiler reports an error if a variable or
* field starts with 'MSG_' and isn't a translatable message.
* @public
Expand Down
1 change: 1 addition & 0 deletions src/proto/js/google/protobuf/empty_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* @fileoverview
* @enhanceable
* @suppress {missingRequire} reports error on implicit type usages.
* @suppress {messageConventions} JS Compiler reports an error if a variable or
* field starts with 'MSG_' and isn't a translatable message.
* @public
Expand Down
1 change: 1 addition & 0 deletions src/proto/js/google/protobuf/field_mask_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* @fileoverview
* @enhanceable
* @suppress {missingRequire} reports error on implicit type usages.
* @suppress {messageConventions} JS Compiler reports an error if a variable or
* field starts with 'MSG_' and isn't a translatable message.
* @public
Expand Down
1 change: 1 addition & 0 deletions src/proto/js/google/protobuf/struct_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* @fileoverview
* @enhanceable
* @suppress {missingRequire} reports error on implicit type usages.
* @suppress {messageConventions} JS Compiler reports an error if a variable or
* field starts with 'MSG_' and isn't a translatable message.
* @public
Expand Down
1 change: 1 addition & 0 deletions src/proto/js/google/protobuf/timestamp_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* @fileoverview
* @enhanceable
* @suppress {missingRequire} reports error on implicit type usages.
* @suppress {messageConventions} JS Compiler reports an error if a variable or
* field starts with 'MSG_' and isn't a translatable message.
* @public
Expand Down
1 change: 1 addition & 0 deletions src/proto/js/google/protobuf/wrappers_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* @fileoverview
* @enhanceable
* @suppress {missingRequire} reports error on implicit type usages.
* @suppress {messageConventions} JS Compiler reports an error if a variable or
* field starts with 'MSG_' and isn't a translatable message.
* @public
Expand Down
1 change: 1 addition & 0 deletions src/proto/js/polykey/v1/agent/agent_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* @fileoverview
* @enhanceable
* @suppress {missingRequire} reports error on implicit type usages.
* @suppress {messageConventions} JS Compiler reports an error if a variable or
* field starts with 'MSG_' and isn't a translatable message.
* @public
Expand Down
1 change: 1 addition & 0 deletions src/proto/js/polykey/v1/agent_service_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* @fileoverview
* @enhanceable
* @suppress {missingRequire} reports error on implicit type usages.
* @suppress {messageConventions} JS Compiler reports an error if a variable or
* field starts with 'MSG_' and isn't a translatable message.
* @public
Expand Down
20 changes: 10 additions & 10 deletions src/proto/js/polykey/v1/client_service_grpc_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ interface IClientServiceService_IAgentUnlock extends grpc.MethodDefinition<polyk
responseSerialize: grpc.serialize<polykey_v1_utils_utils_pb.EmptyMessage>;
responseDeserialize: grpc.deserialize<polykey_v1_utils_utils_pb.EmptyMessage>;
}
interface IClientServiceService_INodesAdd extends grpc.MethodDefinition<polykey_v1_nodes_nodes_pb.NodeAddress, polykey_v1_utils_utils_pb.EmptyMessage> {
interface IClientServiceService_INodesAdd extends grpc.MethodDefinition<polykey_v1_nodes_nodes_pb.NodeAdd, polykey_v1_utils_utils_pb.EmptyMessage> {
path: "/polykey.v1.ClientService/NodesAdd";
requestStream: false;
responseStream: false;
requestSerialize: grpc.serialize<polykey_v1_nodes_nodes_pb.NodeAddress>;
requestDeserialize: grpc.deserialize<polykey_v1_nodes_nodes_pb.NodeAddress>;
requestSerialize: grpc.serialize<polykey_v1_nodes_nodes_pb.NodeAdd>;
requestDeserialize: grpc.deserialize<polykey_v1_nodes_nodes_pb.NodeAdd>;
responseSerialize: grpc.serialize<polykey_v1_utils_utils_pb.EmptyMessage>;
responseDeserialize: grpc.deserialize<polykey_v1_utils_utils_pb.EmptyMessage>;
}
Expand Down Expand Up @@ -679,7 +679,7 @@ export interface IClientServiceServer extends grpc.UntypedServiceImplementation
agentStatus: grpc.handleUnaryCall<polykey_v1_utils_utils_pb.EmptyMessage, polykey_v1_agent_agent_pb.InfoMessage>;
agentStop: grpc.handleUnaryCall<polykey_v1_utils_utils_pb.EmptyMessage, polykey_v1_utils_utils_pb.EmptyMessage>;
agentUnlock: grpc.handleUnaryCall<polykey_v1_utils_utils_pb.EmptyMessage, polykey_v1_utils_utils_pb.EmptyMessage>;
nodesAdd: grpc.handleUnaryCall<polykey_v1_nodes_nodes_pb.NodeAddress, polykey_v1_utils_utils_pb.EmptyMessage>;
nodesAdd: grpc.handleUnaryCall<polykey_v1_nodes_nodes_pb.NodeAdd, polykey_v1_utils_utils_pb.EmptyMessage>;
nodesPing: grpc.handleUnaryCall<polykey_v1_nodes_nodes_pb.Node, polykey_v1_utils_utils_pb.StatusMessage>;
nodesClaim: grpc.handleUnaryCall<polykey_v1_nodes_nodes_pb.Claim, polykey_v1_utils_utils_pb.StatusMessage>;
nodesFind: grpc.handleUnaryCall<polykey_v1_nodes_nodes_pb.Node, polykey_v1_nodes_nodes_pb.NodeAddress>;
Expand Down Expand Up @@ -755,9 +755,9 @@ export interface IClientServiceClient {
agentUnlock(request: polykey_v1_utils_utils_pb.EmptyMessage, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
agentUnlock(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
agentUnlock(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAddress, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAddress, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAddress, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAdd, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAdd, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAdd, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
nodesPing(request: polykey_v1_nodes_nodes_pb.Node, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.StatusMessage) => void): grpc.ClientUnaryCall;
nodesPing(request: polykey_v1_nodes_nodes_pb.Node, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.StatusMessage) => void): grpc.ClientUnaryCall;
nodesPing(request: polykey_v1_nodes_nodes_pb.Node, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.StatusMessage) => void): grpc.ClientUnaryCall;
Expand Down Expand Up @@ -943,9 +943,9 @@ export class ClientServiceClient extends grpc.Client implements IClientServiceCl
public agentUnlock(request: polykey_v1_utils_utils_pb.EmptyMessage, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
public agentUnlock(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
public agentUnlock(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
public nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAddress, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
public nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAddress, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
public nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAddress, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
public nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAdd, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
public nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAdd, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
public nodesAdd(request: polykey_v1_nodes_nodes_pb.NodeAdd, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.EmptyMessage) => void): grpc.ClientUnaryCall;
public nodesPing(request: polykey_v1_nodes_nodes_pb.Node, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.StatusMessage) => void): grpc.ClientUnaryCall;
public nodesPing(request: polykey_v1_nodes_nodes_pb.Node, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.StatusMessage) => void): grpc.ClientUnaryCall;
public nodesPing(request: polykey_v1_nodes_nodes_pb.Node, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_utils_utils_pb.StatusMessage) => void): grpc.ClientUnaryCall;
Expand Down
Loading

0 comments on commit a522de3

Please sign in to comment.