Skip to content

Commit

Permalink
Merge pull request #5 from garenchan/ck-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf authored Feb 6, 2022
2 parents f40de7b + cc7d0fa commit 51d9d02
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions grpc_transporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ var (
// An GrpcTransporter is a default transport layer used to communicate between
// multiple servers.
type GrpcTransporter struct {
grpcDialOption grpc.DialOption
grpcDialOptions []grpc.DialOption
}

// Creates a new HTTP transporter with the given path prefix.
func NewGrpcTransporter(grpcDialOption grpc.DialOption) *GrpcTransporter {
func NewGrpcTransporter(grpcDialOptions ...grpc.DialOption) *GrpcTransporter {
t := &GrpcTransporter{
grpcDialOption: grpcDialOption,
grpcDialOptions: grpcDialOptions,
}
return t
}
Expand All @@ -49,7 +49,7 @@ func NewGrpcServer(server Server) *GrpcServer {
// Sends an AppendEntries RPC to a peer.
func (t *GrpcTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) (ret *AppendEntriesResponse) {

err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
defer cancel()

Expand Down Expand Up @@ -83,7 +83,7 @@ func (t *GrpcTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re
// Sends a RequestVote RPC to a peer.
func (t *GrpcTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) (ret *RequestVoteResponse) {

err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
defer cancel()

Expand Down Expand Up @@ -117,7 +117,7 @@ func (t *GrpcTransporter) SendVoteRequest(server Server, peer *Peer, req *Reques
// Sends a SnapshotRequest RPC to a peer.
func (t *GrpcTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) (ret *SnapshotResponse) {

err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
defer cancel()

Expand Down Expand Up @@ -149,7 +149,7 @@ func (t *GrpcTransporter) SendSnapshotRequest(server Server, peer *Peer, req *Sn
// Sends a SnapshotRequest RPC to a peer.
func (t *GrpcTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) (ret *SnapshotRecoveryResponse) {

err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
defer cancel()

Expand Down Expand Up @@ -285,12 +285,12 @@ func (t *GrpcServer) OnSendSnapshotRecoveryRequest(ctx context2.Context, pbReq *
}, nil
}

func withRaftServerClient(raftServer string, grpcDialOption grpc.DialOption, fn func(protobuf.RaftClient) error) error {
func withRaftServerClient(raftServer string, grpcDialOptions []grpc.DialOption, fn func(protobuf.RaftClient) error) error {

return withCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := protobuf.NewRaftClient(grpcConnection)
return fn(client)
}, raftServer, grpcDialOption)
}, raftServer, grpcDialOptions...)

}

Expand Down

0 comments on commit 51d9d02

Please sign in to comment.