Skip to content

Commit

Permalink
fix: generated upstreams (#10)
Browse files Browse the repository at this point in the history
* fix: update to remove duplicate upstreams

* fix: update regex pattern to include more cases

* fix: include multiple sources which are specified together under from clause

* fix: error when encountering nil upstreams

* fix: avoid infinite loop when encountering already met resources

* refactor: add limited re-use for extractor to slightly improve the performance

* feat: catch wild-card source query

* fix: capture table from wild character for schema

* fix: undetected cyclic upstream

* fix: return error if circular reference is detected

* chore: update plugin version to 0.3.11

* fix: error when dealing with wild card query

* refactor: move out suffix to a constant
  • Loading branch information
irainia authored Jul 21, 2023
1 parent 50a2ca4 commit e1d5edf
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 32 deletions.
18 changes: 17 additions & 1 deletion task/bq2bq/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
)

const (
MaxBQClientReuse = 5
MaxBQClientReuse = 5
MaxExtractorReuse = 10
)

type DefaultBQClientFactory struct {
Expand Down Expand Up @@ -56,13 +57,28 @@ func (fac *DefaultBQClientFactory) New(ctx context.Context, svcAccount string) (
}

// DefaultUpstreamExtractorFactory builds UpstreamExtractors, reusing a
// cached instance for a bounded number of calls to limit construction cost.
type DefaultUpstreamExtractorFactory struct {
	// mu guards cachedExtractor and timeUsed against concurrent New calls.
	mu sync.Mutex

	// cachedExtractor is the most recently constructed extractor; it is
	// handed out up to MaxExtractorReuse times before being replaced.
	cachedExtractor UpstreamExtractor
	// timeUsed counts how many times cachedExtractor has been returned.
	timeUsed int
}

// New returns an UpstreamExtractor for the given BigQuery client, recycling
// the previously built extractor up to MaxExtractorReuse times before
// constructing a fresh one. Safe for concurrent use.
//
// NOTE(review): the cache is not keyed on client, so a recycled extractor
// keeps whichever client it was originally built with — confirm callers
// always pass an equivalent client within a reuse window.
func (d *DefaultUpstreamExtractorFactory) New(client bqiface.Client) (UpstreamExtractor, error) {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Serve from the cache while it still has uses remaining.
	if d.timeUsed < MaxExtractorReuse && d.cachedExtractor != nil {
		d.timeUsed++
		return d.cachedExtractor, nil
	}

	fresh, err := upstream.NewExtractor(client)
	if err != nil {
		return nil, fmt.Errorf("error initializing extractor: %w", err)
	}

	d.cachedExtractor = fresh
	d.timeUsed = 1
	return fresh, nil
}
4 changes: 3 additions & 1 deletion task/bq2bq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat
}

flattenedUpstreams := upstream.FlattenUpstreams(upstreams)
formattedUpstreams := b.formatUpstreams(flattenedUpstreams, func(r upstream.Resource) string {
uniqueUpstreams := upstream.UniqueFilterResources(flattenedUpstreams)

formattedUpstreams := b.formatUpstreams(uniqueUpstreams, func(r upstream.Resource) string {
name := fmt.Sprintf("%s:%s.%s", r.Project, r.Dataset, r.Name)
return fmt.Sprintf(plugin.DestinationURNFormat, selfTable.Type, name)
})
Expand Down
25 changes: 24 additions & 1 deletion task/bq2bq/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`
})

t.Run("should generate unique dependencies for select statements", func(t *testing.T) {
expectedDeps := []string{"bigquery://proj:dataset.table1"}
expectedDeps := []string{"bigquery://proj:dataset.table1", "bigquery://proj:dataset.table2"}
query := "Select * from proj.dataset.table1 t1 join proj.dataset.table1 t2 on t1.col1 = t2.col1"
data := plugin.GenerateDependenciesRequest{
Assets: plugin.Assets{
Expand Down Expand Up @@ -337,6 +337,29 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`
extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Upstreams: []*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
},
},
},
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
},
{
Resource: upstream.Resource{
Project: "proj",
Expand Down
4 changes: 2 additions & 2 deletions task/bq2bq/optimus-plugin-bq2bq.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
name: bq2bq
description: BigQuery to BigQuery transformation task
plugintype: task
pluginversion: 0.3.10 # update this with expected tag before release
image: docker.io/gotocompany/optimus-task-bq2bq-executor:0.3.10
pluginversion: 0.3.11 # update this with expected tag before release
image: docker.io/gotocompany/optimus-task-bq2bq-executor:0.3.11
entrypoint:
script: "python3 /opt/bumblebee/main.py"
questions:
Expand Down
63 changes: 50 additions & 13 deletions task/bq2bq/upstream/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ package upstream
import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"

	"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
)

type Extractor struct {
mutex *sync.Mutex
client bqiface.Client

schemaToUpstreams map[string][]*Upstream
Expand All @@ -19,6 +23,7 @@ func NewExtractor(client bqiface.Client) (*Extractor, error) {
}

return &Extractor{
mutex: &sync.Mutex{},
client: client,
schemaToUpstreams: make(map[string][]*Upstream),
}, nil
Expand All @@ -30,10 +35,15 @@ func (e *Extractor) ExtractUpstreams(ctx context.Context, query string, resource
ignoredResources[r] = true
}

return e.extractUpstreamsFromQuery(ctx, query, ignoredResources, ParseTopLevelUpstreamsFromQuery)
metResource := make(map[Resource]bool)
return e.extractUpstreamsFromQuery(ctx, query, ignoredResources, metResource, ParseTopLevelUpstreamsFromQuery)
}

func (e *Extractor) extractUpstreamsFromQuery(ctx context.Context, query string, ignoredResources map[Resource]bool, parseFn QueryParser) ([]*Upstream, error) {
func (e *Extractor) extractUpstreamsFromQuery(
ctx context.Context, query string,
ignoredResources, metResource map[Resource]bool,
parseFn QueryParser,
) ([]*Upstream, error) {
upstreamResources := parseFn(query)

uniqueUpstreamResources := UniqueFilterResources(upstreamResources)
Expand All @@ -42,7 +52,7 @@ func (e *Extractor) extractUpstreamsFromQuery(ctx context.Context, query string,

resourceGroups := GroupResources(filteredUpstreamResources)

output := make([]*Upstream, 0)
var output []*Upstream
for _, group := range resourceGroups {
schemas, err := ReadSchemasUnderGroup(ctx, e.client, group)
if err != nil {
Expand All @@ -54,7 +64,7 @@ func (e *Extractor) extractUpstreamsFromQuery(ctx context.Context, query string,
restsNodes := convertSchemasToNodes(rest)
output = append(output, restsNodes...)

nestedNodes, err := e.extractNestedNodes(ctx, nestedable, ignoredResources)
nestedNodes, err := e.extractNestedNodes(ctx, nestedable, ignoredResources, metResource)
if err != nil {
return nil, err
}
Expand All @@ -65,37 +75,64 @@ func (e *Extractor) extractUpstreamsFromQuery(ctx context.Context, query string,
return output, nil
}

func (e *Extractor) extractNestedNodes(ctx context.Context, schemas []*Schema, ignoredResources map[Resource]bool) ([]*Upstream, error) {
output := make([]*Upstream, len(schemas))
func (e *Extractor) extractNestedNodes(
ctx context.Context, schemas []*Schema,
ignoredResources, metResource map[Resource]bool,
) ([]*Upstream, error) {
var output []*Upstream

for i, sch := range schemas {
nodes, err := e.getNodes(ctx, sch, ignoredResources)
for _, sch := range schemas {
if metResource[sch.Resource] {
msg := e.getCircularMessage(metResource)
return nil, fmt.Errorf("circular reference is detected: [%s]", msg)
}
metResource[sch.Resource] = true

nodes, err := e.getNodes(ctx, sch, ignoredResources, metResource)
if err != nil {
return nil, err
}

output[i] = &Upstream{
output = append(output, &Upstream{
Resource: sch.Resource,
Upstreams: nodes,
}
})
}

return output, nil
}

func (e *Extractor) getNodes(ctx context.Context, schema *Schema, ignoredResources map[Resource]bool) ([]*Upstream, error) {
func (e *Extractor) getNodes(
ctx context.Context, schema *Schema,
ignoredResources, metResource map[Resource]bool,
) ([]*Upstream, error) {
key := schema.Resource.URN()

if existingNodes, ok := e.schemaToUpstreams[key]; ok {
e.mutex.Lock()
existingNodes, ok := e.schemaToUpstreams[key]
e.mutex.Unlock()

if ok {
return existingNodes, nil
}

nodes, err := e.extractUpstreamsFromQuery(ctx, schema.DDL, ignoredResources, ParseNestedUpsreamsFromDDL)
nodes, err := e.extractUpstreamsFromQuery(ctx, schema.DDL, ignoredResources, metResource, ParseNestedUpsreamsFromDDL)
if err != nil {
return nil, err
}

e.mutex.Lock()
e.schemaToUpstreams[key] = nodes
e.mutex.Unlock()

return nodes, nil
}

// getCircularMessage renders the set of resources visited while walking
// upstreams as a comma-separated list of URNs, used to describe a detected
// circular reference.
func (*Extractor) getCircularMessage(metResource map[Resource]bool) string {
	urns := make([]string, 0, len(metResource))
	for resource := range metResource {
		urns = append(urns, resource.URN())
	}

	// Map iteration order is random in Go; sort the URNs so the resulting
	// error message is deterministic and testable.
	sort.Strings(urns)

	return strings.Join(urns, ", ")
}
46 changes: 45 additions & 1 deletion task/bq2bq/upstream/extractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestExtractor(t *testing.T) {
{
Message: "should return filtered upstreams for select statements with ignore statement",
QueryRequest: "Select * from /* @ignoreupstream */ proj.dataset.table1",
ExpectedUpstreams: []*upstream.Upstream{},
ExpectedUpstreams: nil,
},
{
Message: "should return filtered upstreams for select statements with ignore statement for view",
Expand Down Expand Up @@ -191,5 +191,49 @@ func TestExtractor(t *testing.T) {
assert.EqualValues(t, expectedUpstreams, actualUpstreams)
assert.NoError(t, actualError)
})

t.Run("should return error if circular reference is detected", func(t *testing.T) {
client := new(ClientMock)
query := new(QueryMock)
rowIterator := new(RowIteratorMock)
resourcestoIgnore := []upstream.Resource{}

extractor, err := upstream.NewExtractor(client)
assert.NotNil(t, extractor)
assert.NoError(t, err)

ctx := context.Background()
queryRequest := "select * from `project_test_1.dataset_test_1.cyclic_test_1`"

client.On("Query", mock.Anything).Return(query)

query.On("Read", mock.Anything).Return(rowIterator, nil)

rowIterator.On("Next", mock.Anything).Run(func(args mock.Arguments) {
v := args.Get(0).(*[]bigquery.Value)
*v = []bigquery.Value{"project_test_1", "dataset_test_1", "cyclic_test_1", "VIEW", "select * from project_test_3.dataset_test_3.cyclic_test_3"}
}).Return(nil).Once()
rowIterator.On("Next", mock.Anything).Return(iterator.Done).Once()
rowIterator.On("Next", mock.Anything).Run(func(args mock.Arguments) {
v := args.Get(0).(*[]bigquery.Value)
*v = []bigquery.Value{"project_test_3", "dataset_test_3", "cyclic_test_3", "VIEW", "select * from project_test_2.dataset_test_2.cyclic_test_2"}
}).Return(nil).Once()
rowIterator.On("Next", mock.Anything).Return(iterator.Done).Once()
rowIterator.On("Next", mock.Anything).Run(func(args mock.Arguments) {
v := args.Get(0).(*[]bigquery.Value)
*v = []bigquery.Value{"project_test_2", "dataset_test_2", "cyclic_test_2", "VIEW", "select * from project_test_1.dataset_test_1.cyclic_test_1"}
}).Return(nil).Once()
rowIterator.On("Next", mock.Anything).Return(iterator.Done).Once()
rowIterator.On("Next", mock.Anything).Run(func(args mock.Arguments) {
v := args.Get(0).(*[]bigquery.Value)
*v = []bigquery.Value{"project_test_1", "dataset_test_1", "cyclic_test_1", "VIEW", "select * from project_test_3.dataset_test_3.cyclic_test_3"}
}).Return(nil).Once()
rowIterator.On("Next", mock.Anything).Return(iterator.Done).Once()

actualUpstreams, actualError := extractor.ExtractUpstreams(ctx, queryRequest, resourcestoIgnore)

assert.Nil(t, actualUpstreams)
assert.ErrorContains(t, actualError, "circular reference is detected")
})
})
}
16 changes: 14 additions & 2 deletions task/bq2bq/upstream/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ type QueryParser func(query string) []Resource

var (
topLevelUpstreamsPattern = regexp.MustCompile("" +
"(?i)(?:FROM)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
"(?i)(?:FROM)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-\\*?]+)`?" +
"|" +
"(?i)(?:JOIN)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
"|" +
"(?i)(?:WITH)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?\\s+(?:AS)")
"(?i)(?:WITH)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?\\s+(?:AS)" +
"|" +
"(?i)(?:VIEW)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
"|" +
"(?i)(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`\\s*(?:AS)?")

queryCommentPatterns = regexp.MustCompile("(--.*)|(((/\\*)+?[\\w\\W]*?(\\*/)+))")
helperPattern = regexp.MustCompile("(\\/\\*\\s*(@[a-zA-Z0-9_-]+)\\s*\\*\\/)")
Expand All @@ -39,6 +43,10 @@ func ParseTopLevelUpstreamsFromQuery(query string) []Resource {
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 5, 6, 7, 8
case "with":
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 9, 10, 11, 12
case "view":
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 13, 14, 15, 16
default:
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 17, 18, 19, 20
}

project := match[projectIdx]
Expand All @@ -53,6 +61,10 @@ func ParseTopLevelUpstreamsFromQuery(query string) []Resource {
continue
}

if clause == "view" {
continue
}

resource := Resource{
Project: project,
Dataset: dataset,
Expand Down
Loading

0 comments on commit e1d5edf

Please sign in to comment.