Skip to content

Commit

Permalink
enhance: add resource group declarative api (#733)
Browse files Browse the repository at this point in the history
issue: milvus-io/milvus#32282

- Add UpdateResourceGroups and modify AddResourceGroup api.

- Add example for add resource group declarative api.

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Jun 17, 2024
1 parent 95550a5 commit d346d3e
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 6 deletions.
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ type Client interface {
// ListResourceGroups returns list of resource group names in current Milvus instance.
ListResourceGroups(ctx context.Context) ([]string, error)
// CreateResourceGroup creates a resource group with provided name.
CreateResourceGroup(ctx context.Context, rgName string) error
CreateResourceGroup(ctx context.Context, rgName string, opts ...CreateResourceGroupOption) error
// UpdateResourceGroups updates resource groups with provided options.
UpdateResourceGroups(ctx context.Context, opts ...UpdateResourceGroupsOption) error
// DescribeResourceGroup returns resource groups information.
DescribeResourceGroup(ctx context.Context, rgName string) (*entity.ResourceGroup, error)
// DropResourceGroup drops the resource group with provided name.
Expand Down
23 changes: 23 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,26 @@ type DropPartitionOption func(*milvuspb.DropPartitionRequest)
type LoadPartitionsOption func(*milvuspb.LoadPartitionsRequest)

type ReleasePartitionsOption func(*milvuspb.ReleasePartitionsRequest)

// CreateResourceGroupOption is an option that is used in CreateResourceGroup API.
type CreateResourceGroupOption func(*milvuspb.CreateResourceGroupRequest)

// WithCreateResourceGroupConfig returns a CreateResourceGroupOption that setup the config.
func WithCreateResourceGroupConfig(config *entity.ResourceGroupConfig) CreateResourceGroupOption {
return func(req *milvuspb.CreateResourceGroupRequest) {
req.Config = config
}
}

// UpdateResourceGroupsOption is an option that is used in UpdateResourceGroups API.
type UpdateResourceGroupsOption func(*milvuspb.UpdateResourceGroupsRequest)

// WithUpdateResourceGroupConfig returns an UpdateResourceGroupsOption that sets the new config to the specified resource group.
func WithUpdateResourceGroupConfig(resourceGroupName string, config *entity.ResourceGroupConfig) UpdateResourceGroupsOption {
return func(urgr *milvuspb.UpdateResourceGroupsRequest) {
if urgr.ResourceGroups == nil {
urgr.ResourceGroups = make(map[string]*entity.ResourceGroupConfig)
}
urgr.ResourceGroups[resourceGroupName] = config
}
}
24 changes: 24 additions & 0 deletions client/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,27 @@ func TestMakeSearchQueryOption(t *testing.T) {
assert.Error(t, err)
})
}

func TestWithUpdateResourceGroupConfig(t *testing.T) {
req := &milvuspb.UpdateResourceGroupsRequest{}

WithUpdateResourceGroupConfig("rg1", &entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 1},
})(req)
WithUpdateResourceGroupConfig("rg2", &entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 2},
})(req)

assert.Equal(t, 2, len(req.ResourceGroups))
assert.Equal(t, int32(1), req.ResourceGroups["rg1"].Requests.NodeNum)
assert.Equal(t, int32(2), req.ResourceGroups["rg2"].Requests.NodeNum)
}

func TestWithCreateResourceGroup(t *testing.T) {
req := &milvuspb.CreateResourceGroupRequest{}

WithCreateResourceGroupConfig(&entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 1},
})(req)
assert.Equal(t, int32(1), req.Config.Requests.NodeNum)
}
25 changes: 24 additions & 1 deletion client/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@ func (c *GrpcClient) ListResourceGroups(ctx context.Context) ([]string, error) {
}

// CreateResourceGroup creates a resource group with provided name.
func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string) error {
func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string, opts ...CreateResourceGroupOption) error {
if c.Service == nil {
return ErrClientNotReady
}

req := &milvuspb.CreateResourceGroupRequest{
ResourceGroup: rgName,
}
for _, opt := range opts {
opt(req)
}

resp, err := c.Service.CreateResourceGroup(ctx, req)
if err != nil {
Expand All @@ -54,6 +57,24 @@ func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string) err
return handleRespStatus(resp)
}

// UpdateResourceGroups updates resource groups with provided options.
func (c *GrpcClient) UpdateResourceGroups(ctx context.Context, opts ...UpdateResourceGroupsOption) error {
if c.Service == nil {
return ErrClientNotReady
}

req := &milvuspb.UpdateResourceGroupsRequest{}
for _, opt := range opts {
opt(req)
}

resp, err := c.Service.UpdateResourceGroups(ctx, req)
if err != nil {
return err
}
return handleRespStatus(resp)
}

// DescribeResourceGroup returns resource groups information.
func (c *GrpcClient) DescribeResourceGroup(ctx context.Context, rgName string) (*entity.ResourceGroup, error) {
if c.Service == nil {
Expand All @@ -80,6 +101,8 @@ func (c *GrpcClient) DescribeResourceGroup(ctx context.Context, rgName string) (
LoadedReplica: rg.GetNumLoadedReplica(),
OutgoingNodeNum: rg.GetNumOutgoingNode(),
IncomingNodeNum: rg.GetNumIncomingNode(),
Config: rg.GetConfig(),
Nodes: rg.GetNodes(),
}

return result, nil
Expand Down
64 changes: 63 additions & 1 deletion client/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
)

type ResourceGroupSuite struct {
Expand Down Expand Up @@ -121,6 +122,68 @@ func (s *ResourceGroupSuite) TestCreateResourceGroup() {
})
}

func (s *ResourceGroupSuite) TestUpdateResourceGroups() {
c := s.client
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.Run("normal_run", func() {
defer s.resetMock()
rgName := randStr(10)

s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.AnythingOfType("*milvuspb.UpdateResourceGroupsRequest")).
Run(func(_ context.Context, req *milvuspb.UpdateResourceGroupsRequest) {
s.Len(req.ResourceGroups, 1)
s.NotNil(req.ResourceGroups[rgName])
s.Equal(int32(1), req.ResourceGroups[rgName].Requests.NodeNum)
}).
Return(&commonpb.Status{}, nil)

err := c.UpdateResourceGroups(ctx, WithUpdateResourceGroupConfig(rgName, &entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 1},
}))
s.NoError(err)
})

s.Run("request_fails", func() {
defer s.resetMock()

rgName := randStr(10)

s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.AnythingOfType("*milvuspb.UpdateResourceGroupsRequest")).
Run(func(_ context.Context, req *milvuspb.UpdateResourceGroupsRequest) {
s.Len(req.ResourceGroups, 1)
s.NotNil(req.ResourceGroups[rgName])
s.Equal(int32(1), req.ResourceGroups[rgName].Requests.NodeNum)
}).
Return(nil, errors.New("mocked grpc error"))

err := c.UpdateResourceGroups(ctx, WithUpdateResourceGroupConfig(rgName, &entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 1},
}))
s.Error(err)
})

s.Run("server_return_err", func() {
defer s.resetMock()

rgName := randStr(10)

s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.AnythingOfType("*milvuspb.UpdateResourceGroupsRequest")).
Run(func(_ context.Context, req *milvuspb.UpdateResourceGroupsRequest) {
s.Len(req.ResourceGroups, 1)
s.NotNil(req.ResourceGroups[rgName])
s.Equal(int32(1), req.ResourceGroups[rgName].Requests.NodeNum)
}).
Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil)

err := c.UpdateResourceGroups(ctx, WithUpdateResourceGroupConfig(rgName, &entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 1},
}))
s.Error(err)
})
}

func (s *ResourceGroupSuite) TestDescribeResourceGroup() {
c := s.client
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -153,7 +216,6 @@ func (s *ResourceGroupSuite) TestDescribeResourceGroup() {
s.Equal(rgName, req.GetResourceGroup())
}).
Call.Return(func(_ context.Context, req *milvuspb.DescribeResourceGroupRequest) *milvuspb.DescribeResourceGroupResponse {

return &milvuspb.DescribeResourceGroupResponse{
Status: &commonpb.Status{},
ResourceGroup: &milvuspb.ResourceGroup{
Expand Down
14 changes: 14 additions & 0 deletions entity/resource_group.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
package entity

import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
)

type (
ResourceGroupConfig = rgpb.ResourceGroupConfig
ResourceGroupLimit = rgpb.ResourceGroupLimit
ResourceGroupTransfer = rgpb.ResourceGroupTransfer
NodeInfo = commonpb.NodeInfo
)

// ResourceGroup information model struct.
type ResourceGroup struct {
Name string
Expand All @@ -8,4 +20,6 @@ type ResourceGroup struct {
LoadedReplica map[string]int32
OutgoingNodeNum map[string]int32
IncomingNodeNum map[string]int32
Config *ResourceGroupConfig
Nodes []*NodeInfo
}
Loading

0 comments on commit d346d3e

Please sign in to comment.