From 388b832f78d684bff72304d7c73daaab1a81ef4b Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 31 May 2024 10:29:39 +0800 Subject: [PATCH] enhance: Use collection cache in write operations (#752) See also #265 --------- Signed-off-by: Congqi Xia --- client/data_test.go | 88 ----------- client/insert.go | 48 +++--- client/insert_test.go | 283 ++++++++++++++++++++++++++++++++-- test/testcases/insert_test.go | 4 +- test/testcases/upsert_test.go | 4 +- 5 files changed, 294 insertions(+), 133 deletions(-) diff --git a/client/data_test.go b/client/data_test.go index 1e1b7295b..46631b6f0 100644 --- a/client/data_test.go +++ b/client/data_test.go @@ -100,94 +100,6 @@ func TestGrpcClientFlush(t *testing.T) { }) } -func TestGrpcClientUpsert(t *testing.T) { - ctx := context.Background() - - c := testClient(ctx, t) - - t.Run("test create failure due to meta", func(t *testing.T) { - mockServer.DelInjection(MHasCollection) // collection does not exist - ids, err := c.Upsert(ctx, testCollectionName, "") - assert.Nil(t, ids) - assert.NotNil(t, err) - - // partition not exists - mockServer.SetInjection(MHasCollection, hasCollectionDefault) - ids, err = c.Upsert(ctx, testCollectionName, "_part_not_exists") - assert.Nil(t, ids) - assert.NotNil(t, err) - - // field not in collection - mockServer.SetInjection(MDescribeCollection, describeCollectionInjection(t, 0, testCollectionName, defaultSchema())) - vectors := generateFloatVector(10, testVectorDim) - ids, err = c.Upsert(ctx, testCollectionName, "", - entity.NewColumnInt64("extra_field", []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), entity.NewColumnFloatVector(testVectorField, testVectorDim, vectors)) - assert.Nil(t, ids) - assert.NotNil(t, err) - - // field type not match - mockServer.SetInjection(MDescribeCollection, describeCollectionInjection(t, 0, testCollectionName, defaultSchema())) - ids, err = c.Upsert(ctx, testCollectionName, "", - entity.NewColumnInt32("int64", []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), 
entity.NewColumnFloatVector(testVectorField, testVectorDim, vectors)) - assert.Nil(t, ids) - assert.NotNil(t, err) - - // missing field - ids, err = c.Upsert(ctx, testCollectionName, "") - assert.Nil(t, ids) - assert.NotNil(t, err) - - // column len not match - ids, err = c.Upsert(ctx, testCollectionName, "", entity.NewColumnInt64("int64", []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}), - entity.NewColumnFloatVector(testVectorField, testVectorDim, vectors)) - assert.Nil(t, ids) - assert.NotNil(t, err) - - // column len not match - ids, err = c.Upsert(ctx, testCollectionName, "", entity.NewColumnInt64("int64", []int64{1, 2, 3}), - entity.NewColumnFloatVector(testVectorField, testVectorDim, vectors)) - assert.Nil(t, ids) - assert.NotNil(t, err) - - // dim not match - ids, err = c.Upsert(ctx, testCollectionName, "", entity.NewColumnInt64("int64", []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), - entity.NewColumnFloatVector(testVectorField, testVectorDim*2, vectors)) - assert.Nil(t, ids) - assert.NotNil(t, err) - }) - - mockServer.SetInjection(MHasCollection, hasCollectionDefault) - mockServer.SetInjection(MDescribeCollection, describeCollectionInjection(t, 0, testCollectionName, defaultSchema())) - - vector := generateFloatVector(4096, testVectorDim) - mockServer.SetInjection(MUpsert, func(_ context.Context, raw proto.Message) (proto.Message, error) { - req, ok := raw.(*milvuspb.UpsertRequest) - resp := &milvuspb.MutationResult{} - if !ok { - s, err := BadRequestStatus() - resp.Status = s - return resp, err - } - assert.EqualValues(t, 4096, req.GetNumRows()) - assert.Equal(t, testCollectionName, req.GetCollectionName()) - intIds := &schemapb.IDs_IntId{ - IntId: &schemapb.LongArray{ - Data: make([]int64, 4096), - }, - } - resp.IDs = &schemapb.IDs{ - IdField: intIds, - } - s, err := SuccessStatus() - resp.Status = s - return resp, err - }) - _, err := c.Upsert(ctx, testCollectionName, "", // use default partition - entity.NewColumnFloatVector(testVectorField, testVectorDim, vector)) - 
assert.Nil(t, err) - mockServer.DelInjection(MUpsert) -} - func TestGrpcDeleteByPks(t *testing.T) { ctx := context.Background() diff --git a/client/insert.go b/client/insert.go index 2f52f0e2e..d7b91dfaa 100644 --- a/client/insert.go +++ b/client/insert.go @@ -34,26 +34,20 @@ func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName if c.Service == nil { return nil, ErrClientNotReady } - // 1. validation for all input params - // collection - if err := c.checkCollectionExists(ctx, collName); err != nil { - return nil, err - } - if partitionName != "" { - err := c.checkPartitionExists(ctx, collName, partitionName) + var schema *entity.Schema + collInfo, ok := MetaCache.getCollectionInfo(collName) + if !ok { + coll, err := c.DescribeCollection(ctx, collName) if err != nil { return nil, err } - } - // fields - var rowSize int - coll, err := c.DescribeCollection(ctx, collName) - if err != nil { - return nil, err + schema = coll.Schema + } else { + schema = collInfo.Schema } // convert columns to field data - fieldsData, rowSize, err := c.processInsertColumns(coll.Schema, columns...) + fieldsData, rowSize, err := c.processInsertColumns(schema, columns...) if err != nil { return nil, err } @@ -77,7 +71,7 @@ func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName } MetaCache.setSessionTs(collName, resp.Timestamp) // 3. parse id column - return entity.IDColumns(coll.Schema, resp.GetIDs(), 0, -1) + return entity.IDColumns(schema, resp.GetIDs(), 0, -1) } func (c *GrpcClient) processInsertColumns(colSchema *entity.Schema, columns ...entity.Column) ([]*schemapb.FieldData, int, error) { @@ -350,25 +344,19 @@ func (c *GrpcClient) Upsert(ctx context.Context, collName string, partitionName if c.Service == nil { return nil, ErrClientNotReady } - // 1. 
validation for all input params - // collection - if err := c.checkCollectionExists(ctx, collName); err != nil { - return nil, err - } - if partitionName != "" { - err := c.checkPartitionExists(ctx, collName, partitionName) + var schema *entity.Schema + collInfo, ok := MetaCache.getCollectionInfo(collName) + if !ok { + coll, err := c.DescribeCollection(ctx, collName) if err != nil { return nil, err } - } - // fields - var rowSize int - coll, err := c.DescribeCollection(ctx, collName) - if err != nil { - return nil, err + schema = coll.Schema + } else { + schema = collInfo.Schema } - fieldsData, rowSize, err := c.processInsertColumns(coll.Schema, columns...) + fieldsData, rowSize, err := c.processInsertColumns(schema, columns...) if err != nil { return nil, err } @@ -392,7 +380,7 @@ func (c *GrpcClient) Upsert(ctx context.Context, collName string, partitionName } MetaCache.setSessionTs(collName, resp.Timestamp) // 3. parse id column - return entity.IDColumns(coll.Schema, resp.GetIDs(), 0, -1) + return entity.IDColumns(schema, resp.GetIDs(), 0, -1) } // BulkInsert data files(json, numpy, etc.) 
on MinIO/S3 storage, read and parse them into sealed segments diff --git a/client/insert_test.go b/client/insert_test.go index 550319429..693c7b0ce 100644 --- a/client/insert_test.go +++ b/client/insert_test.go @@ -35,20 +35,11 @@ func (s *InsertSuite) TestInsertFail() { s.Run("collection_not_exist", func() { defer s.resetMock() - s.setupHasCollection() + s.setupDescribeCollectionError(commonpb.ErrorCode_UnexpectedError, errors.New("mocked")) _, err := c.Insert(ctx, testCollectionName, "") s.Error(err) }) - s.Run("partition_not_exist", func() { - defer s.resetMock() - s.setupHasCollection(testCollectionName) - s.setupHasPartition(testCollectionName) - - _, err := c.Insert(ctx, testCollectionName, "partition_name") - s.Error(err) - }) - s.Run("field_not_exist", func() { defer s.resetMock() s.setupHasCollection(testCollectionName) @@ -302,6 +293,276 @@ func (s *InsertSuite) TestInsertSuccess() { }) } -func TestGrpcInsert(t *testing.T) { +type UpsertSuite struct { + MockSuiteBase +} + +func (s *UpsertSuite) TestUpsertFail() { + c := s.client + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("collection_not_exist", func() { + defer s.resetMock() + s.setupDescribeCollectionError(commonpb.ErrorCode_UnexpectedError, errors.New("mocked")) + _, err := c.Upsert(ctx, testCollectionName, "") + s.Error(err) + }) + + s.Run("field_not_exist", func() { + defer s.resetMock() + s.setupHasCollection(testCollectionName) + s.setupHasPartition(testCollectionName, "partition_1") + s.setupDescribeCollection(testCollectionName, entity.NewSchema(). + WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)). 
+ WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")), + ) + + _, err := c.Upsert(ctx, testCollectionName, "partition_1", + entity.NewColumnInt64("extra", []int64{1}), + ) + s.Error(err) + }) + + s.Run("missing_field_without_default_value", func() { + defer s.resetMock() + s.setupHasCollection(testCollectionName) + s.setupHasPartition(testCollectionName, "partition_1") + + s.setupDescribeCollection(testCollectionName, entity.NewSchema(). + WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)). + WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")), + ) + + _, err := c.Upsert(ctx, testCollectionName, "partition_1", + entity.NewColumnInt64("ID", []int64{1}), + ) + s.Error(err) + }) + + s.Run("column_len_not_match", func() { + defer s.resetMock() + s.setupHasCollection(testCollectionName) + s.setupHasPartition(testCollectionName, "partition_1") + + s.setupDescribeCollection(testCollectionName, entity.NewSchema(). + WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)). + WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")), + ) + + _, err := c.Upsert(ctx, testCollectionName, "partition_1", + entity.NewColumnInt64("ID", []int64{1, 2}), + entity.NewColumnFloatVector("vector", 128, [][]float32{}), + ) + s.Error(err) + }) + + s.Run("duplicated column", func() { + defer s.resetMock() + s.setupHasCollection(testCollectionName) + s.setupHasPartition(testCollectionName, "partition_1") + + s.setupDescribeCollection(testCollectionName, entity.NewSchema(). + WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)). 
+ WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")), + ) + + s.mock.EXPECT().Upsert(mock.Anything, mock.AnythingOfType("*milvuspb.UpsertRequest")). + Run(func(ctx context.Context, req *milvuspb.UpsertRequest) { + s.Equal(1, len(req.GetFieldsData())) + }).Return(&milvuspb.MutationResult{ + Status: &commonpb.Status{}, + IDs: &schemapb.IDs{ + IdField: &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: []int64{1}, + }, + }, + }, + }, nil) + + _, err := c.Upsert(ctx, testCollectionName, "partition_1", + entity.NewColumnFloatVector("vector", 128, generateFloatVector(1, 128)), + entity.NewColumnFloatVector("vector", 128, generateFloatVector(1, 128)), + ) + + s.Error(err) + }) + + s.Run("dim_not_match", func() { + defer s.resetMock() + s.setupHasCollection(testCollectionName) + s.setupHasPartition(testCollectionName, "partition_1") + + s.setupDescribeCollection(testCollectionName, entity.NewSchema(). + WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)). + WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")), + ) + + _, err := c.Upsert(ctx, testCollectionName, "partition_1", + entity.NewColumnInt64("ID", []int64{1}), + entity.NewColumnFloatVector("vector", 8, [][]float32{{0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8}}), + ) + s.Error(err) + }) + + s.Run("server_insert_fail", func() { + defer s.resetMock() + s.setupHasCollection(testCollectionName) + s.setupHasPartition(testCollectionName, "partition_1") + + s.setupDescribeCollection(testCollectionName, entity.NewSchema(). + WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)). 
+ WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")), + ) + + s.mock.EXPECT().Upsert(mock.Anything, mock.AnythingOfType("*milvuspb.UpsertRequest")).Return( + &milvuspb.MutationResult{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil, + ) + + _, err := c.Upsert(ctx, testCollectionName, "partition_1", + entity.NewColumnFloatVector("vector", 128, generateFloatVector(1, 128)), + ) + + s.Error(err) + }) + + s.Run("server_connection_error", func() { + defer s.resetMock() + s.setupHasCollection(testCollectionName) + s.setupHasPartition(testCollectionName, "partition_1") + + s.setupDescribeCollection(testCollectionName, entity.NewSchema(). + WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)). + WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")), + ) + + s.mock.EXPECT().Upsert(mock.Anything, mock.AnythingOfType("*milvuspb.UpsertRequest")).Return( + nil, errors.New("mocked error"), + ) + + _, err := c.Upsert(ctx, testCollectionName, "partition_1", + entity.NewColumnFloatVector("vector", 128, generateFloatVector(1, 128)), + ) + + s.Error(err) + }) +} + +func (s *UpsertSuite) TestUpsertSuccess() { + c := s.client + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Run("non_dynamic_schema", func() { + defer s.resetMock() + s.setupHasCollection(testCollectionName) + s.setupHasPartition(testCollectionName, "partition_1") + + s.setupDescribeCollection(testCollectionName, entity.NewSchema(). + WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)). 
+ WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")), + ) + + s.mock.EXPECT().Upsert(mock.Anything, mock.AnythingOfType("*milvuspb.UpsertRequest")). + Run(func(ctx context.Context, req *milvuspb.UpsertRequest) { + s.Equal(1, len(req.GetFieldsData())) + }).Return(&milvuspb.MutationResult{ + Status: &commonpb.Status{}, + IDs: &schemapb.IDs{ + IdField: &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: []int64{1}, + }, + }, + }, + }, nil) + + r, err := c.Upsert(ctx, testCollectionName, "partition_1", + entity.NewColumnFloatVector("vector", 128, generateFloatVector(1, 128)), + ) + + s.NoError(err) + s.Equal(1, r.Len()) + }) + + s.Run("dynamic_field_schema", func() { + defer s.resetMock() + s.setupHasCollection(testCollectionName) + s.setupHasPartition(testCollectionName, "partition_1") + + s.setupDescribeCollection(testCollectionName, entity.NewSchema(). + WithDynamicFieldEnabled(true). + WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)). + WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")), + ) + + s.mock.EXPECT().Upsert(mock.Anything, mock.AnythingOfType("*milvuspb.UpsertRequest")). 
+ Run(func(ctx context.Context, req *milvuspb.UpsertRequest) { + s.Equal(2, len(req.GetFieldsData())) + var found bool + for _, fd := range req.GetFieldsData() { + if fd.GetFieldName() == "" && fd.GetIsDynamic() { + found = true + break + } + } + s.True(found) + }).Return(&milvuspb.MutationResult{ + Status: &commonpb.Status{}, + IDs: &schemapb.IDs{ + IdField: &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: []int64{1}, + }, + }, + }, + }, nil) + + r, err := c.Upsert(ctx, testCollectionName, "partition_1", + entity.NewColumnInt64("extra", []int64{1}), + entity.NewColumnFloatVector("vector", 128, generateFloatVector(1, 128)), + ) + + s.NoError(err) + s.Equal(1, r.Len()) + }) + + s.Run("missing_field_with_default_value", func() { + s.T().Skip("skip for default value test") + defer s.resetMock() + s.setupHasCollection(testCollectionName) + s.setupHasPartition(testCollectionName, "partition_1") + + s.setupDescribeCollection(testCollectionName, entity.NewSchema(). + WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)). + WithField(entity.NewField().WithName("default_value").WithDataType(entity.FieldTypeInt64)). + WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")), + ) + + s.mock.EXPECT().Upsert(mock.Anything, mock.AnythingOfType("*milvuspb.UpsertRequest")). 
+ Run(func(ctx context.Context, req *milvuspb.UpsertRequest) { + s.Equal(2, len(req.GetFieldsData())) + }).Return(&milvuspb.MutationResult{ + Status: &commonpb.Status{}, + IDs: &schemapb.IDs{ + IdField: &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: []int64{1}, + }, + }, + }, + }, nil) + + r, err := c.Upsert(ctx, testCollectionName, "partition_1", + entity.NewColumnFloatVector("vector", 128, generateFloatVector(1, 128)), + ) + s.NoError(err) + s.Equal(1, r.Len()) + }) +} + +func TestWrite(t *testing.T) { suite.Run(t, new(InsertSuite)) + suite.Run(t, new(UpsertSuite)) } diff --git a/test/testcases/insert_test.go b/test/testcases/insert_test.go index 406aef110..43f0d6b22 100644 --- a/test/testcases/insert_test.go +++ b/test/testcases/insert_test.go @@ -133,7 +133,7 @@ func TestInsertNotExistCollection(t *testing.T) { // insert intColumn, floatColumn, vecColumn := common.GenDefaultColumnData(0, common.DefaultNb, common.DefaultDim) _, errInsert := mc.Insert(ctx, "notExist", "", intColumn, floatColumn, vecColumn) - common.CheckErr(t, errInsert, false, "does not exist") + common.CheckErr(t, errInsert, false, "can't find collection") } // test insert into an not existed partition @@ -149,7 +149,7 @@ func TestInsertNotExistPartition(t *testing.T) { // insert _, floatColumn, vecColumn := common.GenDefaultColumnData(0, common.DefaultNb, common.DefaultDim) _, errInsert := mc.Insert(ctx, collName, "aaa", floatColumn, vecColumn) - common.CheckErr(t, errInsert, false, "does not exist") + common.CheckErr(t, errInsert, false, "partition not found") } // test insert data into collection that has all scala fields diff --git a/test/testcases/upsert_test.go b/test/testcases/upsert_test.go index 34a6f4a70..e51c368ab 100644 --- a/test/testcases/upsert_test.go +++ b/test/testcases/upsert_test.go @@ -224,11 +224,11 @@ func TestUpsertNotExistCollectionPartition(t *testing.T) { // upsert not exist partition _, floatColumn, vecColumn := common.GenDefaultColumnData(0, 
common.DefaultNb, common.DefaultDim) _, errUpsert := mc.Upsert(ctx, collName, "aaa", floatColumn, vecColumn) - common.CheckErr(t, errUpsert, false, "does not exist") + common.CheckErr(t, errUpsert, false, "can not assign primary field data when auto id enabled") // upsert not exist collection _, errUpsert = mc.Upsert(ctx, "aaa", "", floatColumn, vecColumn) - common.CheckErr(t, errUpsert, false, "does not exist") + common.CheckErr(t, errUpsert, false, "can't find collection") } // test upsert with invalid column data