Skip to content

Commit

Permalink
Merge pull request #171 from GreenmaskIO/fix/unepxpected_error_in_val…
Browse files Browse the repository at this point in the history
…idate

fix: fixed validate command fatal error caused by filtered objects
  • Loading branch information
wwoytenko authored Aug 19, 2024
2 parents 1c4b181 + 91aea48 commit 1ee1757
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 46 deletions.
13 changes: 9 additions & 4 deletions internal/db/postgres/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type Dump struct {
// sortedTablesDumpIds - sorted tables dump ids in topological order
sortedTablesDumpIds []int32
// validate shows that dump worker must be in validation mode
validate bool
validate bool
validateRowsLimit uint64
}

func NewDump(cfg *domains.Config, st storages.Storager, registry *utils.TransformerRegistry) *Dump {
Expand Down Expand Up @@ -257,13 +258,17 @@ func (d *Dump) dumpWorkerRunner(
func (d *Dump) taskProducer(ctx context.Context, tasks chan<- dumpers.DumpTask) func() error {
return func() error {
defer close(tasks)
dataObjects := d.context.DataSectionObjects
if d.validate {
dataObjects = d.context.DataSectionObjectsToValidate
}

for _, dumpObj := range d.context.DataSectionObjects {
for _, dumpObj := range dataObjects {
dumpObj.SetDumpId(d.dumpIdSequence)
var task dumpers.DumpTask
switch v := dumpObj.(type) {
case *entries.Table:
task = dumpers.NewTableDumper(v, d.validate, d.pgDumpOptions.Pgzip)
task = dumpers.NewTableDumper(v, d.validate, d.validateRowsLimit, d.pgDumpOptions.Pgzip)
case *entries.Sequence:
task = dumpers.NewSequenceDumper(v)
case *entries.Blobs:
Expand Down Expand Up @@ -332,7 +337,7 @@ func (d *Dump) setDumpDependenciesGraph(tables []*entries.Table) {
return entry.Oid == oid
})
if idx == -1 {
panic("table not found")
panic(fmt.Sprintf("table not found: oid=%d", oid))
}
t := tables[idx]
// Create dependencies graph with DumpId sequence for easier restoration coordination
Expand Down
34 changes: 13 additions & 21 deletions internal/db/postgres/cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func NewValidate(cfg *domains.Config, registry *utils.TransformerRegistry, st st
d.dumpIdSequence = toc.NewDumpSequence(0)

d.validate = true
d.validateRowsLimit = cfg.Validate.RowsLimit
return &Validate{
Dump: d,
tmpDir: tmpDirName,
Expand Down Expand Up @@ -139,7 +140,7 @@ func (v *Validate) Run(ctx context.Context) (int, error) {
return v.exitCode, nil
}

if err = v.dumpTables(ctx); err != nil {
if err = v.dataDump(ctx); err != nil {
return nonZeroExitCode, err
}

Expand All @@ -152,12 +153,20 @@ func (v *Validate) Run(ctx context.Context) (int, error) {

func (v *Validate) print(ctx context.Context) error {
for _, e := range v.dataEntries {
idx := slices.IndexFunc(v.context.DataSectionObjects, func(entry entries.Entry) bool {
t := entry.(*entries.Table)
idx := slices.IndexFunc(v.context.DataSectionObjectsToValidate, func(entry entries.Entry) bool {
t, ok := entry.(*entries.Table)
if !ok {
return false
}
return t.DumpId == e.DumpId
})

t := v.context.DataSectionObjects[idx].(*entries.Table)
if idx == -1 {
// skip if not in DataSectionObjectsToValidate
continue
}

t := v.context.DataSectionObjectsToValidate[idx].(*entries.Table)
doc, err := v.createDocument(ctx, t)
if err != nil {
return fmt.Errorf("unable to create validation document: %w", err)
Expand Down Expand Up @@ -269,23 +278,6 @@ func (v *Validate) createDocument(ctx context.Context, t *entries.Table) (valida
return doc, nil
}

func (v *Validate) dumpTables(ctx context.Context) error {
var tablesWithTransformers []entries.Entry
for _, item := range v.context.DataSectionObjects {

if t, ok := item.(*entries.Table); ok && len(t.TransformersContext) > 0 {
t.ValidateLimitedRecords = v.config.Validate.RowsLimit
tablesWithTransformers = append(tablesWithTransformers, t)
}
}
v.context.DataSectionObjects = tablesWithTransformers

if err := v.dataDump(ctx); err != nil {
return fmt.Errorf("data stage dumping error: %w", err)
}
return nil
}

func (v *Validate) printValidationWarnings() error {
// TODO: Implement warnings hook, such as logging and HTTP sender
for _, w := range v.context.Warnings {
Expand Down
26 changes: 19 additions & 7 deletions internal/db/postgres/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type RuntimeContext struct {
Types []*toolkit.Type
// DataSectionObjects - list of objects to dump in data-section. There are sequences, tables and large objects
DataSectionObjects []entries.Entry
// DataSectionObjectsToValidate - list of objects to validate in data-section
DataSectionObjectsToValidate []entries.Entry
// Warnings - list of occurred ValidationWarning during validation and config building
Warnings toolkit.ValidationWarnings
// Registry - registry of all the registered transformers definition
Expand Down Expand Up @@ -125,14 +127,24 @@ func NewRuntimeContext(
dataSectionObjects = append(dataSectionObjects, blobEntries)
}

// Generate list of tables that might be validated during the validate command call
var dataSectionObjectsToValidate []entries.Entry
for _, item := range dataSectionObjects {

if t, ok := item.(*entries.Table); ok && len(t.TransformersContext) > 0 {
dataSectionObjectsToValidate = append(dataSectionObjectsToValidate, t)
}
}

return &RuntimeContext{
Tables: tables,
Types: types,
DataSectionObjects: dataSectionObjects,
Warnings: warnings,
Registry: r,
DatabaseSchema: schema,
Graph: graph,
Tables: tables,
Types: types,
DataSectionObjects: dataSectionObjects,
Warnings: warnings,
Registry: r,
DatabaseSchema: schema,
Graph: graph,
DataSectionObjectsToValidate: dataSectionObjectsToValidate,
}, nil
}

Expand Down
22 changes: 12 additions & 10 deletions internal/db/postgres/dumpers/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,30 @@ import (
"fmt"
"io"

"github.com/greenmaskio/greenmask/internal/utils/ioutils"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgproto3"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"

"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
"github.com/greenmaskio/greenmask/internal/storages"
"github.com/greenmaskio/greenmask/internal/utils/ioutils"
)

type TableDumper struct {
table *entries.Table
recordNum uint64
validate bool
usePgzip bool
table *entries.Table
recordNum uint64
validate bool
validateRowsLimit uint64
usePgzip bool
}

func NewTableDumper(table *entries.Table, validate bool, usePgzip bool) *TableDumper {
func NewTableDumper(table *entries.Table, validate bool, rowsLimit uint64, usePgzip bool) *TableDumper {
return &TableDumper{
table: table,
validate: validate,
usePgzip: usePgzip,
table: table,
validate: validate,
usePgzip: usePgzip,
validateRowsLimit: rowsLimit,
}
}

Expand Down Expand Up @@ -162,7 +164,7 @@ func (td *TableDumper) process(ctx context.Context, tx pgx.Tx, w io.WriteCloser,
if td.validate {
// Logic for validation limiter - exit after recordNum rows
td.recordNum++
if td.recordNum == td.table.ValidateLimitedRecords {
if td.recordNum == td.validateRowsLimit {
return nil
}
}
Expand Down
6 changes: 2 additions & 4 deletions internal/db/postgres/entries/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ type Table struct {
CompressedSize int64
ExcludeData bool
Driver *toolkit.Driver
// ValidateLimitedRecords - perform dumping and transformation only for N records and exit
ValidateLimitedRecords uint64
Scores int64
SubsetConds []string
Scores int64
SubsetConds []string
}

func (t *Table) HasCustomTransformer() bool {
Expand Down

0 comments on commit 1ee1757

Please sign in to comment.