Skip to content

Commit

Permalink
feat(bigquery): add clustering fields and extra fields for partition (#…
Browse files Browse the repository at this point in the history
…472)

* add clustering fields
* add partition details for different kinds of partitioning in bigquery - time and range

---------

Co-authored-by: Anjali Aggarwal <[email protected]>
  • Loading branch information
anjali9791 and anjaliagg9791 authored Feb 24, 2023
1 parent a8e255c commit a79fdde
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 14 deletions.
38 changes: 27 additions & 11 deletions plugins/extractors/bigquery/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,33 @@ source:

## Outputs

| Field | Sample Value |
|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `resource.urn` | `project_id.dataset_name.table_name` |
| `resource.name` | `table_name` |
| `resource.service` | `bigquery` |
| `description` | `table description` |
| `profile.total_rows` | `2100` |
| `profile.usage_count` | `15` |
| `profile.joins` | [][Join](#Join) |
| `profile.filters` | [`"WHERE t.param_3 = 'the_param' AND t.column_1 = \"xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx\""`,`"WHERE event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"`] |
| `schema` | [][Column](#column) |
| Field | Sample Value | Description |
|:-------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------|
| `resource.urn` | `project_id.dataset_name.table_name` | |
| `resource.name` | `table_name` | |
| `resource.service` | `bigquery` | |
| `description` | `table description` | |
| `profile.total_rows` | `2100` | |
| `profile.usage_count` | `15` | |
| `profile.joins` | [][Join](#Join) | |
| `profile.filters` | [`"WHERE t.param_3 = 'the_param' AND t.column_1 = \"xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx\""`,`"WHERE event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"`] | |
| `schema` | [][Column](#column) | |
| `properties.partition_data` | `"partition_data": {"partition_field": "data_date", "require_partition_filter": false, "time_partition": {"partition_by": "DAY","partition_expire": 0 } }` | partition related data for time and range partitioning. |
| `properties.clustering_fields` | `['created_at', 'updated_at']` | list of fields on which the table is clustered |
| `properties.partition_field` | `created_at` | returns the field on which table is time partitioned |

### Partition Data

| Field | Sample Value | Description |
|:------------------------------------------|:-------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `partition_field` | `created_at` | field on which the table is partitioned either by TimePartitioning or RangePartitioning. In case field is empty for TimePartitioning _PARTITIONTIME is returned instead of empty. |
| `require_partition_filter` | `true` | boolean value which denotes if every query on the bigquery table must include at least one predicate that only references the partitioning column |
| `time_partition.partition_by` | `HOUR` | returns partition type HOUR/DAY/MONTH/YEAR |
| `time_partition.partition_expire_seconds` | `0` | time in which data will expire from this partition. If 0 it will not expire. |
| `range_partition.interval` | `10` | width of a interval range |
| `range_partition.start` | `0` | start value for partition inclusive of this value |
| `range_partition.end` | `100` | end value for partition exclusive of this value |


### Column

Expand Down
30 changes: 27 additions & 3 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,34 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu
tableURN := plugins.BigQueryURN(t.ProjectID, t.DatasetID, t.TableID)

tableProfile := e.buildTableProfile(tableURN, tableStats)

var partitionField string
partitionData := make(map[string]interface{})
if md.TimePartitioning != nil {
partitionField = md.TimePartitioning.Field
if partitionField == "" {
partitionField = "_PARTITIONTIME"
}
partitionData["partition_field"] = partitionField
partitionData["time_partition"] = map[string]interface{}{
"partition_by": string(md.TimePartitioning.Type),
"partition_expire_seconds": md.TimePartitioning.Expiration.Seconds(),
}
} else if md.RangePartitioning != nil {
partitionData["partition_field"] = md.RangePartitioning.Field
partitionData["range_partition"] = map[string]interface{}{
"start": md.RangePartitioning.Range.Start,
"end": md.RangePartitioning.Range.End,
"interval": md.RangePartitioning.Range.Interval,
}
}
partitionData["require_partition_filter"] = md.RequirePartitionFilter

var clusteringFields []interface{}
if md.Clustering != nil && len(md.Clustering.Fields) > 0 {
clusteringFields = make([]interface{}, len(md.Clustering.Fields))
for idx, field := range md.Clustering.Fields {
clusteringFields[idx] = field
}
}

var previewFields []string
Expand All @@ -271,7 +295,6 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu
if err != nil {
e.logger.Warn("error building preview", "err", err, "table", tableFQN)
}

}

table, err := anypb.New(&v1beta2.Table{
Expand All @@ -284,7 +307,8 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu
"dataset": t.DatasetID,
"project": t.ProjectID,
"type": string(md.Type),
"partition_field": partitionField,
"partition_data": partitionData,
"clustering_fields": clusteringFields,
}),
CreateTime: timestamppb.New(md.CreationTime),
UpdateTime: timestamppb.New(md.LastModifiedTime),
Expand Down

0 comments on commit a79fdde

Please sign in to comment.