Skip to content

Commit

Permalink
Support grpc connection pooling
Browse files Browse the repository at this point in the history
See also #637

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Dec 8, 2023
1 parent 43226a3 commit 3ef070c
Show file tree
Hide file tree
Showing 21 changed files with 470 additions and 241 deletions.
12 changes: 8 additions & 4 deletions client/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (

// GetVersion returns milvus server version information.
func (c *GrpcClient) GetVersion(ctx context.Context) (string, error) {
if c.Service == nil {
service := c.Service(ctx)
if service == nil {
return "", ErrClientNotReady
}
resp, err := c.Service.GetVersion(ctx, &milvuspb.GetVersionRequest{})
defer service.Close()
resp, err := service.GetVersion(ctx, &milvuspb.GetVersionRequest{})
if err != nil {
return "", err
}
Expand All @@ -32,10 +34,12 @@ func (c *GrpcClient) GetVersion(ctx context.Context) (string, error) {

// CheckHealth returns milvus state
func (c *GrpcClient) CheckHealth(ctx context.Context) (*entity.MilvusState, error) {
if c.Service == nil {
service := c.Service(ctx)
if service == nil {
return nil, ErrClientNotReady
}
resp, err := c.Service.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
defer service.Close()
resp, err := service.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
if err != nil {
return nil, err
}
Expand Down
18 changes: 12 additions & 6 deletions client/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@ import (

// CreateAlias creates an alias for collection
func (c *GrpcClient) CreateAlias(ctx context.Context, collName string, alias string) error {
if c.Service == nil {
service := c.Service(ctx)
if service == nil {
return ErrClientNotReady
}
defer service.Close()

req := &milvuspb.CreateAliasRequest{
DbName: "", // reserved
CollectionName: collName,
Alias: alias,
}

resp, err := c.Service.CreateAlias(ctx, req)
resp, err := service.CreateAlias(ctx, req)
if err != nil {
return err
}
Expand All @@ -47,16 +49,18 @@ func (c *GrpcClient) CreateAlias(ctx context.Context, collName string, alias str

// DropAlias drops the specified Alias
func (c *GrpcClient) DropAlias(ctx context.Context, alias string) error {
if c.Service == nil {
service := c.Service(ctx)
if service == nil {
return ErrClientNotReady
}
defer service.Close()

req := &milvuspb.DropAliasRequest{
DbName: "", // reserved
Alias: alias,
}

resp, err := c.Service.DropAlias(ctx, req)
resp, err := service.DropAlias(ctx, req)
if err != nil {
return err
}
Expand All @@ -69,17 +73,19 @@ func (c *GrpcClient) DropAlias(ctx context.Context, alias string) error {

// AlterAlias changes collection alias to provided alias
func (c *GrpcClient) AlterAlias(ctx context.Context, collName string, alias string) error {
if c.Service == nil {
service := c.Service(ctx)
if service == nil {
return ErrClientNotReady
}
defer service.Close()

req := &milvuspb.AlterAliasRequest{
DbName: "", // reserved
CollectionName: collName,
Alias: alias,
}

resp, err := c.Service.AlterAlias(ctx, req)
resp, err := service.AlterAlias(ctx, req)
if err != nil {
return err
}
Expand Down
24 changes: 16 additions & 8 deletions client/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (

// CreateCredential create new user and password
func (c *GrpcClient) CreateCredential(ctx context.Context, username string, password string) error {
if c.Service == nil {
service := c.Service(ctx)
if service == nil {
return ErrClientNotReady
}
defer service.Close()
req := &milvuspb.CreateCredentialRequest{
Username: username,
Password: crypto.Base64Encode(password),
}
resp, err := c.Service.CreateCredential(ctx, req)
resp, err := service.CreateCredential(ctx, req)
if err != nil {
return err
}
Expand All @@ -29,15 +31,17 @@ func (c *GrpcClient) CreateCredential(ctx context.Context, username string, pass

// UpdateCredential update password for a user
func (c *GrpcClient) UpdateCredential(ctx context.Context, username string, oldPassword string, newPassword string) error {
if c.Service == nil {
service := c.Service(ctx)
if service == nil {
return ErrClientNotReady
}
defer service.Close()
req := &milvuspb.UpdateCredentialRequest{
Username: username,
OldPassword: crypto.Base64Encode(oldPassword),
NewPassword: crypto.Base64Encode(newPassword),
}
resp, err := c.Service.UpdateCredential(ctx, req)
resp, err := service.UpdateCredential(ctx, req)
if err != nil {
return err
}
Expand All @@ -50,13 +54,15 @@ func (c *GrpcClient) UpdateCredential(ctx context.Context, username string, oldP

// DeleteCredential delete a user
func (c *GrpcClient) DeleteCredential(ctx context.Context, username string) error {
if c.Service == nil {
service := c.Service(ctx)
if service == nil {
return ErrClientNotReady
}
defer service.Close()
req := &milvuspb.DeleteCredentialRequest{
Username: username,
}
resp, err := c.Service.DeleteCredential(ctx, req)
resp, err := service.DeleteCredential(ctx, req)
if err != nil {
return err
}
Expand All @@ -69,11 +75,13 @@ func (c *GrpcClient) DeleteCredential(ctx context.Context, username string) erro

// ListCredUsers list all usernames
func (c *GrpcClient) ListCredUsers(ctx context.Context) ([]string, error) {
if c.Service == nil {
service := c.Service(ctx)
if service == nil {
return nil, ErrClientNotReady
}
defer service.Close()
req := &milvuspb.ListCredUsersRequest{}
resp, err := c.Service.ListCredUsers(ctx, req)
resp, err := service.ListCredUsers(ctx, req)
if err != nil {
return nil, err
}
Expand Down
13 changes: 10 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

"google.golang.org/grpc"

grpcpool "github.com/processout/grpc-go-pool"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"

"github.com/milvus-io/milvus-sdk-go/v2/entity"
Expand Down Expand Up @@ -238,11 +240,16 @@ func NewClient(ctx context.Context, config Config) (Client, error) {
// Parse grpc options
options := c.config.getDialOption()

// Connect the grpc server.
if err := c.connect(ctx, addr, options...); err != nil {
factory := func() (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
return c.connect(ctx, addr, options...)
}
pool, err := grpcpool.New(factory, c.config.ConnPoolInit, c.config.ConnPoolMax, c.config.ConnPoolIdleTimeout)
if err != nil {
return nil, err
}

c.connPool = pool
return c, nil
}

Expand Down
2 changes: 2 additions & 0 deletions client/client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (s *MockSuiteBase) SetupTest() {
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(s.mockDialer),
},
ConnPoolInit: 1,
ConnPoolMax: 1,
})
s.Require().NoError(err)
s.setupConnect()
Expand Down
11 changes: 9 additions & 2 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -150,6 +152,8 @@ func TestGrpcClientNil(t *testing.T) {
if m.Name == "Close" || m.Name == "Connect" || // skip connect & close
m.Name == "UsingDatabase" || // skip use database
m.Name == "Search" || // type alias MetricType treated as string
m.Name == "QueryByPks" ||
m.Name == "Service" ||
m.Name == "CalcDistance" ||
m.Name == "ManualCompaction" || // time.Duration hard to detect in reflect
m.Name == "Insert" || m.Name == "Upsert" { // complex methods with ...
Expand Down Expand Up @@ -221,7 +225,7 @@ func TestGrpcClientConnect(t *testing.T) {
Config{
Address: "bufnet",
DialOptions: []grpc.DialOption{
grpc.WithBlock(), grpc.WithInsecure(), grpc.WithContextDialer(bufDialer),
grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(bufDialer),
},
})
assert.Nil(t, err)
Expand Down Expand Up @@ -303,7 +307,10 @@ func TestGrpcClientRetryPolicy(t *testing.T) {
assert.Nil(t, err)
defer client.Close()

greeterClient := helloworld.NewGreeterClient(client.(*GrpcClient).Conn)
conn, err := client.(*GrpcClient).connPool.Get(context.TODO())
require.NoError(t, err)

greeterClient := helloworld.NewGreeterClient(conn)
ctx := context.Background()
name := fmt.Sprintf("hello world %d", time.Now().Second())
res, err := greeterClient.SayHello(ctx, &helloworld.HelloRequest{Name: name})
Expand Down
Loading

0 comments on commit 3ef070c

Please sign in to comment.