Skip to content

Commit

Permalink
fix: dedupe owners in upsert/patch asset
Browse files Browse the repository at this point in the history
  • Loading branch information
Chief-Rishab committed Sep 27, 2023
1 parent fa6266d commit 14020c9
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 51 deletions.
40 changes: 35 additions & 5 deletions internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"context"
"errors"
"fmt"
"github.com/raystack/compass/core/namespace"
"github.com/raystack/compass/pkg/grpc_interceptor"
"time"

"github.com/r3labs/diff/v2"
"github.com/raystack/compass/core/asset"
"github.com/raystack/compass/core/namespace"
"github.com/raystack/compass/core/star"
"github.com/raystack/compass/core/user"
"github.com/raystack/compass/pkg/grpc_interceptor"
"github.com/raystack/compass/pkg/statsd"
compassv1beta1 "github.com/raystack/compass/proto/raystack/compass/v1beta1"
"github.com/r3labs/diff/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -465,7 +465,7 @@ func (server *APIServer) buildAsset(baseAsset *compassv1beta1.UpsertAssetRequest
}

var owners []user.User
for _, owner := range baseAsset.GetOwners() {
for _, owner := range dedupe(baseAsset.GetOwners()) {
owners = append(owners, user.User{
ID: owner.Id,
UUID: owner.Uuid,
Expand Down Expand Up @@ -547,7 +547,7 @@ func decodePatchAssetToMap(pb *compassv1beta1.UpsertPatchAssetRequest_Asset) map
}
if len(pb.GetOwners()) > 0 {
ownersMap := []map[string]interface{}{}
ownersPB := pb.GetOwners()
ownersPB := dedupe(pb.GetOwners())
for _, ownerPB := range ownersPB {
ownerMap := map[string]interface{}{}
if ownerPB.GetId() != "" {
Expand All @@ -570,6 +570,36 @@ func decodePatchAssetToMap(pb *compassv1beta1.UpsertPatchAssetRequest_Asset) map
return m
}

// dedupe returns owners with duplicate entries removed, preserving the order
// in which the kept owners first appear.
//
// An owner is dropped when its Id, Uuid or Email matches a non-empty value
// already recorded from an owner kept earlier in the slice. Empty identifiers
// are never recorded, so they never cause a match; an owner with no
// identifiers at all is therefore always kept.
//
// Nil entries are skipped: reading fields through a nil *User would panic
// here, and callers that read owner fields directly would panic as well.
//
// The returned slice is always non-nil and holds the same *User pointers as
// the input (the elements are not copied).
func dedupe(owners []*compassv1beta1.User) []*compassv1beta1.User {
	n := len(owners)
	uniq := make([]*compassv1beta1.User, 0, n)
	ids := make(map[string]struct{}, n)
	uuids := make(map[string]struct{}, n)
	emails := make(map[string]struct{}, n)
	for _, o := range owners {
		if o == nil {
			// Nothing to identify or keep; avoids a nil pointer dereference.
			continue
		}
		if _, ok := ids[o.Id]; ok {
			continue
		}
		if _, ok := uuids[o.Uuid]; ok {
			continue
		}
		if _, ok := emails[o.Email]; ok {
			continue
		}
		// Record only non-empty identifiers so that partially specified
		// owners (e.g. email only) do not collide on the empty string.
		if o.Id != "" {
			ids[o.Id] = struct{}{}
		}
		if o.Uuid != "" {
			uuids[o.Uuid] = struct{}{}
		}
		if o.Email != "" {
			emails[o.Email] = struct{}{}
		}
		uniq = append(uniq, o)
	}
	return uniq
}

// assetToProto transforms struct to proto
func assetToProto(a asset.Asset, withChangelog bool) (assetPB *compassv1beta1.Asset, err error) {
var data *structpb.Struct
Expand Down
38 changes: 25 additions & 13 deletions internal/server/v1beta1/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@ import (
"context"
"errors"
"fmt"
"github.com/raystack/compass/core/namespace"
"github.com/raystack/compass/pkg/grpc_interceptor"
"reflect"
"testing"
"time"

"github.com/raystack/compass/core/namespace"
"github.com/raystack/compass/pkg/grpc_interceptor"

"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/r3labs/diff/v2"
"github.com/raystack/compass/core/asset"
"github.com/raystack/compass/core/star"
"github.com/raystack/compass/core/user"
"github.com/raystack/compass/internal/server/v1beta1/mocks"
compassv1beta1 "github.com/raystack/compass/proto/raystack/compass/v1beta1"
"github.com/raystack/salt/log"
"github.com/r3labs/diff/v2"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -364,8 +365,13 @@ func TestUpsertAsset(t *testing.T) {
Service: "kafka",
Data: &structpb.Struct{},
Url: "https://sample-url.com",
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "[email protected]", Provider: "provider"}},
},
Owners: []*compassv1beta1.User{
{Id: "id", Uuid: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8", Email: "[email protected]", Provider: "provider"},
// the following users should get de-duplicated.
{Id: "id"},
{Uuid: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8"},
{Email: "[email protected]"},
}},
Upstreams: []*compassv1beta1.LineageNode{
{
Urn: "upstream-1",
Expand Down Expand Up @@ -498,7 +504,7 @@ func TestUpsertAsset(t *testing.T) {
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
Owners: []user.User{{ID: "id", UUID: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8", Email: "[email protected]", Provider: "provider"}},
}
upstreams := []string{"upstream-1"}
downstreams := []string{"downstream-1", "downstream-2"}
Expand Down Expand Up @@ -576,7 +582,13 @@ func TestUpsertPatchAsset(t *testing.T) {
Service: "kafka",
Data: &structpb.Struct{},
Url: "https://sample-url.com",
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "[email protected]", Provider: "provider"}},
Owners: []*compassv1beta1.User{
{Id: "id", Uuid: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8", Email: "[email protected]", Provider: "provider"},
// the following users should get de-duplicated.
{Id: "id"},
{Uuid: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8"},
{Email: "[email protected]"},
},
},
Upstreams: []*compassv1beta1.LineageNode{
{
Expand Down Expand Up @@ -606,7 +618,7 @@ func TestUpsertPatchAsset(t *testing.T) {
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url-old.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
Owners: []user.User{{ID: "id", UUID: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8", Email: "[email protected]", Provider: "provider"}},
}
)
ctx := user.NewContext(context.Background(), user.User{UUID: userUUID})
Expand Down Expand Up @@ -724,7 +736,7 @@ func TestUpsertPatchAsset(t *testing.T) {
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
Owners: []user.User{{ID: "id", UUID: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8", Email: "[email protected]", Provider: "provider"}},
}
upstreams := []string{"upstream-1"}
downstreams := []string{"downstream-1", "downstream-2"}
Expand Down Expand Up @@ -761,7 +773,7 @@ func TestUpsertPatchAsset(t *testing.T) {
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url-old.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
Owners: []user.User{{ID: "id", UUID: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8", Email: "[email protected]", Provider: "provider"}},
}

assetWithID := patchedAsset
Expand All @@ -781,7 +793,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Name: wrapperspb.String("new-name"),
Service: "kafka",
Data: &structpb.Struct{},
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "[email protected]", Provider: "provider"}},
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8", Email: "[email protected]", Provider: "provider"}},
},
},
ExpectStatus: codes.OK,
Expand All @@ -807,7 +819,7 @@ func TestUpsertPatchAsset(t *testing.T) {
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url-old.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
Owners: []user.User{{ID: "id", UUID: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8", Email: "[email protected]", Provider: "provider"}},
}

assetWithID := patchedAsset
Expand All @@ -827,7 +839,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Name: wrapperspb.String("new-name"),
Service: "kafka",
Data: &structpb.Struct{},
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "[email protected]", Provider: "provider"}},
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "1aecb8b3-23a9-4456-8ebd-3aafc746fff8", Email: "[email protected]", Provider: "provider"}},
},
OverwriteLineage: true,
},
Expand Down
36 changes: 26 additions & 10 deletions internal/store/postgres/asset_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/raystack/compass/core/namespace"
"strings"
"time"

"github.com/raystack/compass/core/namespace"

sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/r3labs/diff/v2"
"github.com/raystack/compass/core/asset"
"github.com/raystack/compass/core/user"
"github.com/r3labs/diff/v2"
)

// AssetRepository is a type that manages user operation to the primary database
Expand Down Expand Up @@ -721,32 +722,47 @@ func (r *AssetRepository) compareOwners(current, newOwners []user.User) (toInser
}

func (r *AssetRepository) createOrFetchUsers(ctx context.Context, tx *sqlx.Tx, ns *namespace.Namespace, users []user.User) ([]user.User, error) {
ids := make(map[string]struct{}, len(users))
var results []user.User
for _, u := range users {
if u.ID != "" {
if _, ok := ids[u.ID]; ok {
continue
}
ids[u.ID] = struct{}{}
results = append(results, u)
continue
}

var (
userID string
fetchedUser user.User
err error
)
if u.UUID != "" {
fetchedUser, err = r.userRepo.GetByUUID(ctx, u.UUID)
fetchedUser, err = r.userRepo.GetByUUIDWithTx(ctx, tx, u.UUID)
} else {
fetchedUser, err = r.userRepo.GetByEmail(ctx, u.Email)
fetchedUser, err = r.userRepo.GetByEmailWithTx(ctx, tx, u.Email)
}
if err == nil {
userID = fetchedUser.ID
}
if errors.As(err, &user.NotFoundError{}) {
switch {
case errors.As(err, &user.NotFoundError{}):
u.Provider = r.defaultUserProvider
userID, err = r.userRepo.CreateWithTx(ctx, tx, ns, &u)
if err != nil {
return nil, fmt.Errorf("error creating owner: %w", err)
}
}
if err != nil {

case err != nil:
return nil, fmt.Errorf("error getting owner's ID: %w", err)

case err == nil:
userID = fetchedUser.ID
}

if _, ok := ids[userID]; ok {
continue
}
ids[userID] = struct{}{}
u.ID = userID
results = append(results, u)
}
Expand Down
13 changes: 8 additions & 5 deletions internal/store/postgres/asset_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
sq "github.com/Masterminds/squirrel"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/ory/dockertest/v3"
"github.com/r3labs/diff/v2"
"github.com/raystack/compass/core/asset"
"github.com/raystack/compass/core/user"
"github.com/raystack/compass/internal/store/postgres"
"github.com/raystack/salt/log"
"github.com/ory/dockertest/v3"
"github.com/r3labs/diff/v2"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -986,8 +986,10 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
Type: "table",
Service: "bigquery",
Owners: []user.User{
stripUserID(r.users[1]),
r.users[1],
{Email: r.users[2].Email},
{UUID: r.users[2].UUID}, // should get deduplicated by ID on fetch by UUID
{ID: r.users[1].ID}, // should get deduplicated by ID
},
UpdatedBy: r.users[0],
}
Expand All @@ -1000,7 +1002,7 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
actual, err := r.repository.GetByID(r.ctx, ast.ID)
r.NoError(err)

r.Len(actual.Owners, len(ast.Owners))
r.Len(actual.Owners, 2)
r.Equal(r.users[1].ID, actual.Owners[0].ID)
r.Equal(r.users[2].ID, actual.Owners[1].ID)
})
Expand All @@ -1013,6 +1015,7 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
Owners: []user.User{
{Email: "[email protected]"},
{UUID: "108151e5-4c9f-4951-a8e1-6966b5aa2bb6"},
{Email: "[email protected]"}, // should get deduplicated by ID on fetch user by email
},
UpdatedBy: r.users[0],
}
Expand All @@ -1024,7 +1027,7 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
actual, err := r.repository.GetByID(r.ctx, id)
r.NoError(err)

r.Len(actual.Owners, len(ast.Owners))
r.Len(actual.Owners, 2)
r.Equal(ast.Owners[0].Email, actual.Owners[0].Email)
r.Equal(ast.Owners[1].UUID, actual.Owners[1].UUID)
})
Expand Down
Loading

0 comments on commit 14020c9

Please sign in to comment.