Skip to content

Commit

Permalink
fix: allow schema error (#18)
Browse files Browse the repository at this point in the history
* fix: update upstream schema reader to allow returning available schema regardless of error

* fix: update upstream extractor to return upstreams regardless of error

* fix: return extracted upstreams while logging any encountered error
  • Loading branch information
irainia authored Aug 9, 2023
1 parent 88b2f01 commit a96357a
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 41 deletions.
2 changes: 1 addition & 1 deletion task/bq2bq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string
continue
}

return nil, fmt.Errorf("error extracting upstreams: %w", err)
b.logger.Error("error extracting upstreams", err)
}

return upstreams, nil
Expand Down
27 changes: 17 additions & 10 deletions task/bq2bq/upstream/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ func (e *Extractor) extractUpstreamsFromQuery(
resourceGroups := GroupResources(filteredUpstreamResources)

var output []*Upstream
var errorMessages []string

for _, group := range resourceGroups {
schemas, err := ReadSchemasUnderGroup(ctx, e.client, group)
if err != nil {
return nil, err
errorMessages = append(errorMessages, err.Error())
}

nestedable, rest := splitNestedableFromRest(schemas)
Expand All @@ -66,12 +68,15 @@ func (e *Extractor) extractUpstreamsFromQuery(

nestedNodes, err := e.extractNestedNodes(ctx, nestedable, ignoredResources, metResource)
if err != nil {
return nil, err
errorMessages = append(errorMessages, err.Error())
}

output = append(output, nestedNodes...)
}

if len(errorMessages) > 0 {
return output, fmt.Errorf("error reading upstream: [%s]", strings.Join(errorMessages, ", "))
}
return output, nil
}

Expand All @@ -80,17 +85,19 @@ func (e *Extractor) extractNestedNodes(
ignoredResources, metResource map[Resource]bool,
) ([]*Upstream, error) {
var output []*Upstream
var errorMessages []string

for _, sch := range schemas {
if metResource[sch.Resource] {
msg := e.getCircularMessage(metResource)
return nil, fmt.Errorf("circular reference is detected: [%s]", msg)
msg := fmt.Sprintf("circular reference is detected: [%s]", e.getCircularURNs(metResource))
errorMessages = append(errorMessages, msg)
continue
}
metResource[sch.Resource] = true

nodes, err := e.getNodes(ctx, sch, ignoredResources, metResource)
if err != nil {
return nil, err
errorMessages = append(errorMessages, err.Error())
}

output = append(output, &Upstream{
Expand All @@ -99,6 +106,9 @@ func (e *Extractor) extractNestedNodes(
})
}

if len(errorMessages) > 0 {
return output, fmt.Errorf("error getting nested upstream: [%s]", strings.Join(errorMessages, ", "))
}
return output, nil
}

Expand All @@ -117,18 +127,15 @@ func (e *Extractor) getNodes(
}

nodes, err := e.extractUpstreamsFromQuery(ctx, schema.DDL, ignoredResources, metResource, ParseNestedUpsreamsFromDDL)
if err != nil {
return nil, err
}

e.mutex.Lock()
e.schemaToUpstreams[key] = nodes
e.mutex.Unlock()

return nodes, nil
return nodes, err
}

func (*Extractor) getCircularMessage(metResource map[Resource]bool) string {
func (*Extractor) getCircularURNs(metResource map[Resource]bool) string {
var urns []string
for resource := range metResource {
urns = append(urns, resource.URN())
Expand Down
30 changes: 29 additions & 1 deletion task/bq2bq/upstream/extractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,37 @@ func TestExtractor(t *testing.T) {
}).Return(nil).Once()
rowIterator.On("Next", mock.Anything).Return(iterator.Done).Once()

expectedUpstreams := []*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "project_test_1",
Dataset: "dataset_test_1",
Name: "cyclic_test_1",
},
Upstreams: []*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "project_test_3",
Dataset: "dataset_test_3",
Name: "cyclic_test_3",
},
Upstreams: []*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "project_test_2",
Dataset: "dataset_test_2",
Name: "cyclic_test_2",
},
},
},
},
},
},
}

actualUpstreams, actualError := extractor.ExtractUpstreams(ctx, queryRequest, resourcestoIgnore)

assert.Nil(t, actualUpstreams)
assert.EqualValues(t, expectedUpstreams, actualUpstreams)
assert.ErrorContains(t, actualError, "circular reference is detected")
})
})
Expand Down
24 changes: 16 additions & 8 deletions task/bq2bq/upstream/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,17 @@ func ReadSchemasUnderGroup(ctx context.Context, client bqiface.Client, group *Re
}

var schemas []*Schema
var errorMessages []string

for {
var values []bigquery.Value
err := rowIterator.Next(&values)
if err == iterator.Done {
break
}
if err != nil {
return nil, err
if err := rowIterator.Next(&values); err != nil {
if errors.Is(err, iterator.Done) {
break
}

errorMessages = append(errorMessages, err.Error())
continue
}

if len(values) == 0 {
Expand All @@ -58,13 +61,18 @@ func ReadSchemasUnderGroup(ctx context.Context, client bqiface.Client, group *Re

sch, err := convertToSchema(values)
if err != nil {
return nil, err
errorMessages = append(errorMessages, err.Error())
continue
}

schemas = append(schemas, sch)
}

return schemas, nil
if len(errorMessages) > 0 {
err = fmt.Errorf("error encountered when reading schema: [%s]", strings.Join(errorMessages, ", "))
}

return schemas, err
}

func buildQuery(group *ResourceGroup) string {
Expand Down
102 changes: 81 additions & 21 deletions task/bq2bq/upstream/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,33 +40,78 @@ func TestReadSchemasUnderGroup(t *testing.T) {
assert.ErrorContains(t, actualError, unexpectedError.Error())
})

t.Run("should return nil and error if failed getting next value iterator", func(t *testing.T) {
client := new(ClientMock)
queryStatement := new(QueryMock)
rowIterator := new(RowIteratorMock)
t.Run("should return schema and error if failed getting next value iterator", func(t *testing.T) {
t.Run("should return nil schema if no other values available", func(t *testing.T) {
client := new(ClientMock)
queryStatement := new(QueryMock)
rowIterator := new(RowIteratorMock)

ctx := context.Background()
group := &upstream.ResourceGroup{
Project: "project_test",
Dataset: "dataset_test",
Names: []string{"table_test"},
}
ctx := context.Background()
group := &upstream.ResourceGroup{
Project: "project_test",
Dataset: "dataset_test",
Names: []string{"table_test"},
}

queryContent := buildQuery(group)
client.On("Query", queryContent).Return(queryStatement)
queryContent := buildQuery(group)
client.On("Query", queryContent).Return(queryStatement)

queryStatement.On("Read", ctx).Return(rowIterator, nil)
queryStatement.On("Read", ctx).Return(rowIterator, nil)

unexpectedError := errors.New("unexpected error")
rowIterator.On("Next", mock.Anything).Return(unexpectedError)
unexpectedError := errors.New("unexpected error")
rowIterator.On("Next", mock.Anything).Return(unexpectedError).Once()
rowIterator.On("Next", mock.Anything).Return(iterator.Done).Once()

actualSchemas, actualError := upstream.ReadSchemasUnderGroup(ctx, client, group)
actualSchemas, actualError := upstream.ReadSchemasUnderGroup(ctx, client, group)

assert.Nil(t, actualSchemas)
assert.ErrorContains(t, actualError, unexpectedError.Error())
assert.Nil(t, actualSchemas)
assert.ErrorContains(t, actualError, unexpectedError.Error())
})

t.Run("should return available schema if other values available", func(t *testing.T) {
client := new(ClientMock)
queryStatement := new(QueryMock)
rowIterator := new(RowIteratorMock)

ctx := context.Background()
group := &upstream.ResourceGroup{
Project: "project_test",
Dataset: "dataset_test",
Names: []string{"table_test"},
}

queryContent := buildQuery(group)
client.On("Query", queryContent).Return(queryStatement)

queryStatement.On("Read", ctx).Return(rowIterator, nil)

unexpectedError := errors.New("unexpected error")
rowIterator.On("Next", mock.Anything).Return(unexpectedError).Once()
rowIterator.On("Next", mock.Anything).Run(func(args mock.Arguments) {
v := args.Get(0).(*[]bigquery.Value)
*v = []bigquery.Value{"project_test", "dataset_test", "table_test", "BASE TABLE", ""}
}).Return(nil).Once()
rowIterator.On("Next", mock.Anything).Return(iterator.Done).Once()

expectedSchemas := []*upstream.Schema{
{
Resource: upstream.Resource{
Project: "project_test",
Dataset: "dataset_test",
Name: "table_test",
},
Type: upstream.BaseTable,
},
}

actualSchemas, actualError := upstream.ReadSchemasUnderGroup(ctx, client, group)

assert.EqualValues(t, expectedSchemas, actualSchemas)
assert.ErrorContains(t, actualError, unexpectedError.Error())
})
})

t.Run("should return nil and nil if row iterator results in zero value", func(t *testing.T) {
t.Run("should return schema and nil if row iterator results in zero value", func(t *testing.T) {
client := new(ClientMock)
queryStatement := new(QueryMock)
rowIterator := new(RowIteratorMock)
Expand Down Expand Up @@ -95,7 +140,7 @@ func TestReadSchemasUnderGroup(t *testing.T) {
assert.NoError(t, actualError)
})

t.Run("should return nil and error if row iterator cannot be converted to schema", func(t *testing.T) {
t.Run("should return schema and error if row iterator cannot be converted to schema", func(t *testing.T) {
testCases := []struct {
IteratorValues []bigquery.Value
ErrorMessage string
Expand Down Expand Up @@ -144,15 +189,30 @@ func TestReadSchemasUnderGroup(t *testing.T) {

queryStatement.On("Read", ctx).Return(rowIterator, nil)

rowIterator.On("Next", mock.Anything).Run(func(args mock.Arguments) {
v := args.Get(0).(*[]bigquery.Value)
*v = []bigquery.Value{"project_test", "dataset_test", "table_test", "BASE TABLE", ""}
}).Return(nil).Once()
rowIterator.On("Next", mock.Anything).Run(func(args mock.Arguments) {
v := args.Get(0).(*[]bigquery.Value)
*v = test.IteratorValues
}).Return(nil).Once()
rowIterator.On("Next", mock.Anything).Return(iterator.Done).Once()

expectedSchemas := []*upstream.Schema{
{
Resource: upstream.Resource{
Project: "project_test",
Dataset: "dataset_test",
Name: "table_test",
},
Type: upstream.BaseTable,
},
}

actualSchemas, actualError := upstream.ReadSchemasUnderGroup(ctx, client, group)

assert.Nil(t, actualSchemas)
assert.EqualValues(t, expectedSchemas, actualSchemas)
assert.ErrorContains(t, actualError, test.ErrorMessage)
}
})
Expand Down

0 comments on commit a96357a

Please sign in to comment.