From cc7d0fa323e2f3beb45f307a85564786afb1d204 Mon Sep 17 00:00:00 2001 From: Garen Chan <1412950785@qq.com> Date: Sun, 6 Feb 2022 15:26:32 +0800 Subject: [PATCH] [UPDATE] NewGrpcTransporter support multiple dial options --- grpc_transporter.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/grpc_transporter.go b/grpc_transporter.go index f550119..6011b94 100644 --- a/grpc_transporter.go +++ b/grpc_transporter.go @@ -19,13 +19,13 @@ var ( // An GrpcTransporter is a default transport layer used to communicate between // multiple servers. type GrpcTransporter struct { - grpcDialOption grpc.DialOption + grpcDialOptions []grpc.DialOption } // Creates a new HTTP transporter with the given path prefix. -func NewGrpcTransporter(grpcDialOption grpc.DialOption) *GrpcTransporter { +func NewGrpcTransporter(grpcDialOptions ...grpc.DialOption) *GrpcTransporter { t := &GrpcTransporter{ - grpcDialOption: grpcDialOption, + grpcDialOptions: grpcDialOptions, } return t } @@ -49,7 +49,7 @@ func NewGrpcServer(server Server) *GrpcServer { // Sends an AppendEntries RPC to a peer. func (t *GrpcTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) (ret *AppendEntriesResponse) { - err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error { + err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error { ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second)) defer cancel() @@ -83,7 +83,7 @@ func (t *GrpcTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re // Sends a RequestVote RPC to a peer. func (t *GrpcTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) (ret *RequestVoteResponse) { - err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error { + err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error { ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second)) defer cancel() @@ -117,7 +117,7 @@ func (t *GrpcTransporter) SendVoteRequest(server Server, peer *Peer, req *Reques // Sends a SnapshotRequest RPC to a peer. func (t *GrpcTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) (ret *SnapshotResponse) { - err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error { + err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error { ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second)) defer cancel() @@ -149,7 +149,7 @@ func (t *GrpcTransporter) SendSnapshotRequest(server Server, peer *Peer, req *Sn // Sends a SnapshotRequest RPC to a peer. func (t *GrpcTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) (ret *SnapshotRecoveryResponse) { - err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error { + err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error { ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second)) defer cancel() @@ -285,12 +285,12 @@ func (t *GrpcServer) OnSendSnapshotRecoveryRequest(ctx context2.Context, pbReq * }, nil } -func withRaftServerClient(raftServer string, grpcDialOption grpc.DialOption, fn func(protobuf.RaftClient) error) error { +func withRaftServerClient(raftServer string, grpcDialOptions []grpc.DialOption, fn func(protobuf.RaftClient) error) error { return withCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := protobuf.NewRaftClient(grpcConnection) return fn(client) - }, raftServer, grpcDialOption) + }, raftServer, grpcDialOptions...) }