Skip to content

Commit

Permalink
enhance: Use collection cache in write operations (#752)
Browse files Browse the repository at this point in the history
See also #265

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored May 31, 2024
1 parent f477955 commit 388b832
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 133 deletions.
88 changes: 0 additions & 88 deletions client/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,94 +100,6 @@ func TestGrpcClientFlush(t *testing.T) {
})
}

// TestGrpcClientUpsert exercises GrpcClient.Upsert against the mock Milvus
// server: first a series of expected-failure validations driven by mock
// injections, then one successful upsert of 4096 rows.
// NOTE(review): relies on mockServer injection ordering — Set/Del calls below
// are order-sensitive; do not reorder the sub-cases.
func TestGrpcClientUpsert(t *testing.T) {
ctx := context.Background()

c := testClient(ctx, t)

t.Run("test create failure due to meta", func(t *testing.T) {
mockServer.DelInjection(MHasCollection) // collection does not exist
ids, err := c.Upsert(ctx, testCollectionName, "")
assert.Nil(t, ids)
assert.NotNil(t, err)

// partition not exists
mockServer.SetInjection(MHasCollection, hasCollectionDefault)
ids, err = c.Upsert(ctx, testCollectionName, "_part_not_exists")
assert.Nil(t, ids)
assert.NotNil(t, err)

// field not in collection
mockServer.SetInjection(MDescribeCollection, describeCollectionInjection(t, 0, testCollectionName, defaultSchema()))
vectors := generateFloatVector(10, testVectorDim)
ids, err = c.Upsert(ctx, testCollectionName, "",
entity.NewColumnInt64("extra_field", []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), entity.NewColumnFloatVector(testVectorField, testVectorDim, vectors))
assert.Nil(t, ids)
assert.NotNil(t, err)

// field type not match: schema declares int64, an int32 column is supplied
mockServer.SetInjection(MDescribeCollection, describeCollectionInjection(t, 0, testCollectionName, defaultSchema()))
ids, err = c.Upsert(ctx, testCollectionName, "",
entity.NewColumnInt32("int64", []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), entity.NewColumnFloatVector(testVectorField, testVectorDim, vectors))
assert.Nil(t, ids)
assert.NotNil(t, err)

// missing field: no columns at all
ids, err = c.Upsert(ctx, testCollectionName, "")
assert.Nil(t, ids)
assert.NotNil(t, err)

// column len not match: 9 ints vs 10 vectors
ids, err = c.Upsert(ctx, testCollectionName, "", entity.NewColumnInt64("int64", []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}),
entity.NewColumnFloatVector(testVectorField, testVectorDim, vectors))
assert.Nil(t, ids)
assert.NotNil(t, err)

// column len not match: 3 ints vs 10 vectors
ids, err = c.Upsert(ctx, testCollectionName, "", entity.NewColumnInt64("int64", []int64{1, 2, 3}),
entity.NewColumnFloatVector(testVectorField, testVectorDim, vectors))
assert.Nil(t, ids)
assert.NotNil(t, err)

// dim not match: column declares testVectorDim*2 but data was built at testVectorDim
ids, err = c.Upsert(ctx, testCollectionName, "", entity.NewColumnInt64("int64", []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}),
entity.NewColumnFloatVector(testVectorField, testVectorDim*2, vectors))
assert.Nil(t, ids)
assert.NotNil(t, err)
})

// Success path: restore meta injections so collection/schema lookups pass.
mockServer.SetInjection(MHasCollection, hasCollectionDefault)
mockServer.SetInjection(MDescribeCollection, describeCollectionInjection(t, 0, testCollectionName, defaultSchema()))

vector := generateFloatVector(4096, testVectorDim)
// Inject the Upsert handler: verify the request the client built, then
// return a MutationResult with 4096 zero-valued int64 IDs.
mockServer.SetInjection(MUpsert, func(_ context.Context, raw proto.Message) (proto.Message, error) {
req, ok := raw.(*milvuspb.UpsertRequest)
resp := &milvuspb.MutationResult{}
if !ok {
s, err := BadRequestStatus()
resp.Status = s
return resp, err
}
assert.EqualValues(t, 4096, req.GetNumRows())
assert.Equal(t, testCollectionName, req.GetCollectionName())
intIds := &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: make([]int64, 4096),
},
}
resp.IDs = &schemapb.IDs{
IdField: intIds,
}
s, err := SuccessStatus()
resp.Status = s
return resp, err
})
_, err := c.Upsert(ctx, testCollectionName, "", // use default partition
entity.NewColumnFloatVector(testVectorField, testVectorDim, vector))
assert.Nil(t, err)
mockServer.DelInjection(MUpsert) // clean up so later tests see no Upsert handler
}

func TestGrpcDeleteByPks(t *testing.T) {
ctx := context.Background()

Expand Down
48 changes: 18 additions & 30 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,20 @@ func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName
if c.Service == nil {
return nil, ErrClientNotReady
}
// 1. validation for all input params
// collection
if err := c.checkCollectionExists(ctx, collName); err != nil {
return nil, err
}
if partitionName != "" {
err := c.checkPartitionExists(ctx, collName, partitionName)
var schema *entity.Schema
collInfo, ok := MetaCache.getCollectionInfo(collName)
if !ok {
coll, err := c.DescribeCollection(ctx, collName)
if err != nil {
return nil, err
}
}
// fields
var rowSize int
coll, err := c.DescribeCollection(ctx, collName)
if err != nil {
return nil, err
schema = coll.Schema
} else {
schema = collInfo.Schema
}

// convert columns to field data
fieldsData, rowSize, err := c.processInsertColumns(coll.Schema, columns...)
fieldsData, rowSize, err := c.processInsertColumns(schema, columns...)
if err != nil {
return nil, err
}
Expand All @@ -77,7 +71,7 @@ func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName
}
MetaCache.setSessionTs(collName, resp.Timestamp)
// 3. parse id column
return entity.IDColumns(coll.Schema, resp.GetIDs(), 0, -1)
return entity.IDColumns(schema, resp.GetIDs(), 0, -1)
}

func (c *GrpcClient) processInsertColumns(colSchema *entity.Schema, columns ...entity.Column) ([]*schemapb.FieldData, int, error) {
Expand Down Expand Up @@ -350,25 +344,19 @@ func (c *GrpcClient) Upsert(ctx context.Context, collName string, partitionName
if c.Service == nil {
return nil, ErrClientNotReady
}
// 1. validation for all input params
// collection
if err := c.checkCollectionExists(ctx, collName); err != nil {
return nil, err
}
if partitionName != "" {
err := c.checkPartitionExists(ctx, collName, partitionName)
var schema *entity.Schema
collInfo, ok := MetaCache.getCollectionInfo(collName)
if !ok {
coll, err := c.DescribeCollection(ctx, collName)
if err != nil {
return nil, err
}
}
// fields
var rowSize int
coll, err := c.DescribeCollection(ctx, collName)
if err != nil {
return nil, err
schema = coll.Schema
} else {
schema = collInfo.Schema
}

fieldsData, rowSize, err := c.processInsertColumns(coll.Schema, columns...)
fieldsData, rowSize, err := c.processInsertColumns(schema, columns...)
if err != nil {
return nil, err
}
Expand All @@ -392,7 +380,7 @@ func (c *GrpcClient) Upsert(ctx context.Context, collName string, partitionName
}
MetaCache.setSessionTs(collName, resp.Timestamp)
// 3. parse id column
return entity.IDColumns(coll.Schema, resp.GetIDs(), 0, -1)
return entity.IDColumns(schema, resp.GetIDs(), 0, -1)
}

// BulkInsert data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
Expand Down
Loading

0 comments on commit 388b832

Please sign in to comment.