From 29b7edd0a0f2ca60794ec7de7be176ba4396a32d Mon Sep 17 00:00:00 2001 From: Patrick Weizhi Xu Date: Tue, 16 Jul 2024 11:28:17 +0800 Subject: [PATCH] feat: add support for partition key isolation property Signed-off-by: Patrick Weizhi Xu --- client/collection_test.go | 40 +++++++++++++++++++++++++++ entity/collection_attr.go | 28 ++++++++++++++++--- entity/collection_attr_test.go | 49 ++++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 4 deletions(-) diff --git a/client/collection_test.go b/client/collection_test.go index d0021e9b7..c3a30cee5 100644 --- a/client/collection_test.go +++ b/client/collection_test.go @@ -155,6 +155,43 @@ func (s *CollectionSuite) TestCreateCollection() { s.NoError(err) }) + s.Run("create_with_partition_key_isolation", func() { + ds := defaultSchema() + partitionKeyFieldName := "partitionKeyField" + partitionKeyField := entity.NewField().WithName(partitionKeyFieldName).WithDataType(entity.FieldTypeInt64).WithIsPartitionKey(true) + ds.WithField(partitionKeyField) + + shardsNum := int32(1) + expectedIsoStr := "true" + IsoKeystr := "partitionkey.isolation" + defer s.resetMock() + s.mock.EXPECT().CreateCollection(mock.Anything, mock.AnythingOfType("*milvuspb.CreateCollectionRequest")). + Run(func(ctx context.Context, req *milvuspb.CreateCollectionRequest) { + s.Equal(testCollectionName, req.GetCollectionName()) + sschema := &schemapb.CollectionSchema{} + s.Require().NoError(proto.Unmarshal(req.GetSchema(), sschema)) + s.Require().Equal(len(ds.Fields), len(sschema.Fields)) + for idx, fieldSchema := range ds.Fields { + s.Equal(fieldSchema.Name, sschema.GetFields()[idx].GetName()) + s.Equal(fieldSchema.PrimaryKey, sschema.GetFields()[idx].GetIsPrimaryKey()) + s.Equal(fieldSchema.AutoID, sschema.GetFields()[idx].GetAutoID()) + s.EqualValues(fieldSchema.DataType, sschema.GetFields()[idx].GetDataType()) + } + s.Equal(shardsNum, req.GetShardsNum()) + s.Equal(commonpb.ConsistencyLevel_Eventually, req.GetConsistencyLevel()) + propsMp := make(map[string]string) + for _, prop := range req.GetProperties() { + propsMp[prop.Key] = prop.Value + } + s.Equal(expectedIsoStr, propsMp[IsoKeystr]) + }). + Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil) + s.mock.EXPECT().HasCollection(mock.Anything, &milvuspb.HasCollectionRequest{CollectionName: testCollectionName}).Return(&milvuspb.BoolResponse{Status: &commonpb.Status{}, Value: false}, nil) + + err := c.CreateCollection(ctx, ds, shardsNum, WithConsistencyLevel(entity.ClEventually), WithCollectionProperty(IsoKeystr, expectedIsoStr)) + s.NoError(err) + }) + s.Run("invalid_schemas", func() { type testCase struct { name string @@ -439,6 +476,9 @@ func (s *CollectionSuite) TestAlterCollection() { err := c.AlterCollection(ctx, testCollectionName, entity.CollectionTTL(100000)) s.NoError(err) + + err = c.AlterCollection(ctx, testCollectionName, entity.CollectionPartitionKeyIsolation(true)) + s.NoError(err) }) s.Run("collection_not_exist", func() { diff --git a/entity/collection_attr.go b/entity/collection_attr.go index 7ada43802..0e6c27fea 100644 --- a/entity/collection_attr.go +++ b/entity/collection_attr.go @@ -22,10 +22,11 @@ const ( // cakTTL const for collection attribute key TTL in seconds. cakTTL = `collection.ttl.seconds` // cakAutoCompaction const for collection attribute key autom compaction enabled. - cakAutoCompaction = `collection.autocompaction.enabled` - akMmap = "mmap.enabled" - databaseReplica = "database.replica" - databaseResourceGroups = "database.resource_groups" + cakAutoCompaction = `collection.autocompaction.enabled` + akMmap = "mmap.enabled" + databaseReplica = "database.replica" + databaseResourceGroups = "database.resource_groups" + caPartitionKeyIsolation = "partitionkey.isolation" ) // CollectionAttribute is the interface for altering collection attributes. @@ -48,6 +49,10 @@ type ttlCollAttr struct { collAttrBase } +type partitionKeyIsolationCollAttr struct { + collAttrBase +} + // Valid implements CollectionAttribute. // checks ttl seconds is valid positive integer. func (ca collAttrBase) Valid() error { @@ -114,6 +119,21 @@ func (ca mmapAttr) Valid() error { return nil } +func CollectionPartitionKeyIsolation(enabled bool) partitionKeyIsolationCollAttr { + ca := partitionKeyIsolationCollAttr{} + ca.key = caPartitionKeyIsolation + ca.value = strconv.FormatBool(enabled) + return ca +} + +func (ca partitionKeyIsolationCollAttr) Valid() error { + _, err := strconv.ParseBool(ca.value) + if err != nil { + return errors.Wrap(err, "PartitionKeyIsolation setting is not a valid boolean") + } + return nil +} + type DatabaseAttribute = CollectionAttribute type databaseAttrBase = collAttrBase diff --git a/entity/collection_attr_test.go b/entity/collection_attr_test.go index b4f42e4a4..5ac4e7a62 100644 --- a/entity/collection_attr_test.go +++ b/entity/collection_attr_test.go @@ -130,3 +130,52 @@ func (s *CollectionAutoCompactionSuite) TestCollectionAutoCompactionEnabled() { func TestCollectionAutoCompaction(t *testing.T) { suite.Run(t, new(CollectionAutoCompactionSuite)) } + +type CollectionPartitionKeyIsolationSuite struct { + suite.Suite +} + +func (s *CollectionPartitionKeyIsolationSuite) TestValid() { + type testCase struct { + input string + expectErr bool + } + + cases := []testCase{ + {input: "a", expectErr: true}, + {input: "true", expectErr: false}, + {input: "false", expectErr: false}, + {input: "", expectErr: true}, + } + + for _, tc := range cases { + s.Run(tc.input, func() { + ca := partitionKeyIsolationCollAttr{} + ca.value = tc.input + err := ca.Valid() + if tc.expectErr { + s.Error(err) + } else { + s.NoError(err) + } + }) + } +} + +func (s *CollectionPartitionKeyIsolationSuite) TestCollectionAutoCompactionEnabled() { + + cases := []bool{true, false} + + for _, tc := range cases { + s.Run(fmt.Sprintf("%v", tc), func() { + ca := CollectionPartitionKeyIsolation(tc) + key, value := ca.KeyValue() + s.Equal(caPartitionKeyIsolation, key) + s.Equal(strconv.FormatBool(tc), value) + }) + } +} + +func TestCollectionPartitionKeyIsolationAttr(t *testing.T) { + suite.Run(t, new(CollectionPartitionKeyIsolationSuite)) +}