Skip to content

Commit

Permalink
feat: add support for partition key isolation property
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Weizhi Xu <[email protected]>
  • Loading branch information
PwzXxm committed Jul 16, 2024
1 parent 869db52 commit 29b7edd
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 4 deletions.
40 changes: 40 additions & 0 deletions client/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,43 @@ func (s *CollectionSuite) TestCreateCollection() {
s.NoError(err)
})

s.Run("create_with_partition_key_isolation", func() {
ds := defaultSchema()
partitionKeyFieldName := "partitionKeyField"
partitionKeyField := entity.NewField().WithName(partitionKeyFieldName).WithDataType(entity.FieldTypeInt64).WithIsPartitionKey(true)
ds.WithField(partitionKeyField)

shardsNum := int32(1)
expectedIsoStr := "true"
IsoKeystr := "partitionkey.isolation"
defer s.resetMock()
s.mock.EXPECT().CreateCollection(mock.Anything, mock.AnythingOfType("*milvuspb.CreateCollectionRequest")).
Run(func(ctx context.Context, req *milvuspb.CreateCollectionRequest) {
s.Equal(testCollectionName, req.GetCollectionName())
sschema := &schemapb.CollectionSchema{}
s.Require().NoError(proto.Unmarshal(req.GetSchema(), sschema))
s.Require().Equal(len(ds.Fields), len(sschema.Fields))
for idx, fieldSchema := range ds.Fields {
s.Equal(fieldSchema.Name, sschema.GetFields()[idx].GetName())
s.Equal(fieldSchema.PrimaryKey, sschema.GetFields()[idx].GetIsPrimaryKey())
s.Equal(fieldSchema.AutoID, sschema.GetFields()[idx].GetAutoID())
s.EqualValues(fieldSchema.DataType, sschema.GetFields()[idx].GetDataType())
}
s.Equal(shardsNum, req.GetShardsNum())
s.Equal(commonpb.ConsistencyLevel_Eventually, req.GetConsistencyLevel())
propsMp := make(map[string]string)
for _, prop := range req.GetProperties() {
propsMp[prop.Key] = prop.Value
}
s.Equal(expectedIsoStr, propsMp[IsoKeystr])
}).
Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)
s.mock.EXPECT().HasCollection(mock.Anything, &milvuspb.HasCollectionRequest{CollectionName: testCollectionName}).Return(&milvuspb.BoolResponse{Status: &commonpb.Status{}, Value: false}, nil)

err := c.CreateCollection(ctx, ds, shardsNum, WithConsistencyLevel(entity.ClEventually), WithCollectionProperty(IsoKeystr, expectedIsoStr))
s.NoError(err)
})

s.Run("invalid_schemas", func() {
type testCase struct {
name string
Expand Down Expand Up @@ -439,6 +476,9 @@ func (s *CollectionSuite) TestAlterCollection() {

err := c.AlterCollection(ctx, testCollectionName, entity.CollectionTTL(100000))
s.NoError(err)

err = c.AlterCollection(ctx, testCollectionName, entity.CollectionPartitionKeyIsolation(true))
s.NoError(err)
})

s.Run("collection_not_exist", func() {
Expand Down
28 changes: 24 additions & 4 deletions entity/collection_attr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ const (
// cakTTL const for collection attribute key TTL in seconds.
cakTTL = `collection.ttl.seconds`
// cakAutoCompaction const for collection attribute key autom compaction enabled.
cakAutoCompaction = `collection.autocompaction.enabled`
akMmap = "mmap.enabled"
databaseReplica = "database.replica"
databaseResourceGroups = "database.resource_groups"
cakAutoCompaction = `collection.autocompaction.enabled`
akMmap = "mmap.enabled"
databaseReplica = "database.replica"
databaseResourceGroups = "database.resource_groups"
caPartitionKeyIsolation = "partitionkey.isolation"
)

// CollectionAttribute is the interface for altering collection attributes.
Expand All @@ -48,6 +49,10 @@ type ttlCollAttr struct {
collAttrBase
}

type partitionKeyIsolationCollAttr struct {
collAttrBase
}

// Valid implements CollectionAttribute.
// checks ttl seconds is valid positive integer.
func (ca collAttrBase) Valid() error {
Expand Down Expand Up @@ -114,6 +119,21 @@ func (ca mmapAttr) Valid() error {
return nil
}

func CollectionPartitionKeyIsolation(enabled bool) partitionKeyIsolationCollAttr {
ca := partitionKeyIsolationCollAttr{}
ca.key = caPartitionKeyIsolation
ca.value = strconv.FormatBool(enabled)
return ca
}

func (ca partitionKeyIsolationCollAttr) Valid() error {
_, err := strconv.ParseBool(ca.value)
if err != nil {
return errors.Wrap(err, "PartitionKeyIsolation setting is not a valid boolean")
}
return nil
}

type DatabaseAttribute = CollectionAttribute

type databaseAttrBase = collAttrBase
Expand Down
49 changes: 49 additions & 0 deletions entity/collection_attr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,52 @@ func (s *CollectionAutoCompactionSuite) TestCollectionAutoCompactionEnabled() {
func TestCollectionAutoCompaction(t *testing.T) {
suite.Run(t, new(CollectionAutoCompactionSuite))
}

type CollectionPartitionKeyIsolationSuite struct {
suite.Suite
}

func (s *CollectionPartitionKeyIsolationSuite) TestValid() {
type testCase struct {
input string
expectErr bool
}

cases := []testCase{
{input: "a", expectErr: true},
{input: "true", expectErr: false},
{input: "false", expectErr: false},
{input: "", expectErr: true},
}

for _, tc := range cases {
s.Run(tc.input, func() {
ca := partitionKeyIsolationCollAttr{}
ca.value = tc.input
err := ca.Valid()
if tc.expectErr {
s.Error(err)
} else {
s.NoError(err)
}
})
}
}

func (s *CollectionPartitionKeyIsolationSuite) TestCollectionAutoCompactionEnabled() {

cases := []bool{true, false}

for _, tc := range cases {
s.Run(fmt.Sprintf("%v", tc), func() {
ca := CollectionPartitionKeyIsolation(tc)
key, value := ca.KeyValue()
s.Equal(caPartitionKeyIsolation, key)
s.Equal(strconv.FormatBool(tc), value)
})
}
}

func TestCollectionPartitionKeyIsolationAttr(t *testing.T) {
suite.Run(t, new(CollectionPartitionKeyIsolationSuite))
}

0 comments on commit 29b7edd

Please sign in to comment.