diff --git a/internal/db/postgres/cmd/dump.go b/internal/db/postgres/cmd/dump.go index 6a3b9f45..0a4f6ae6 100644 --- a/internal/db/postgres/cmd/dump.go +++ b/internal/db/postgres/cmd/dump.go @@ -67,7 +67,8 @@ type Dump struct { // sortedTablesDumpIds - sorted tables dump ids in topological order sortedTablesDumpIds []int32 // validate shows that dump worker must be in validation mode - validate bool + validate bool + validateRowsLimit uint64 } func NewDump(cfg *domains.Config, st storages.Storager, registry *utils.TransformerRegistry) *Dump { @@ -257,13 +258,17 @@ func (d *Dump) dumpWorkerRunner( func (d *Dump) taskProducer(ctx context.Context, tasks chan<- dumpers.DumpTask) func() error { return func() error { defer close(tasks) + dataObjects := d.context.DataSectionObjects + if d.validate { + dataObjects = d.context.DataSectionObjectsToValidate + } - for _, dumpObj := range d.context.DataSectionObjects { + for _, dumpObj := range dataObjects { dumpObj.SetDumpId(d.dumpIdSequence) var task dumpers.DumpTask switch v := dumpObj.(type) { case *entries.Table: - task = dumpers.NewTableDumper(v, d.validate, d.pgDumpOptions.Pgzip) + task = dumpers.NewTableDumper(v, d.validate, d.validateRowsLimit, d.pgDumpOptions.Pgzip) case *entries.Sequence: task = dumpers.NewSequenceDumper(v) case *entries.Blobs: @@ -332,7 +337,7 @@ func (d *Dump) setDumpDependenciesGraph(tables []*entries.Table) { return entry.Oid == oid }) if idx == -1 { - panic("table not found") + panic(fmt.Sprintf("table not found: oid=%d", oid)) } t := tables[idx] // Create dependencies graph with DumpId sequence for easier restoration coordination diff --git a/internal/db/postgres/cmd/validate.go b/internal/db/postgres/cmd/validate.go index 90e90078..d756570f 100644 --- a/internal/db/postgres/cmd/validate.go +++ b/internal/db/postgres/cmd/validate.go @@ -63,6 +63,7 @@ func NewValidate(cfg *domains.Config, registry *utils.TransformerRegistry, st st d.dumpIdSequence = toc.NewDumpSequence(0) d.validate = true + d.validateRowsLimit = cfg.Validate.RowsLimit return &Validate{ Dump: d, tmpDir: tmpDirName, @@ -139,7 +140,7 @@ func (v *Validate) Run(ctx context.Context) (int, error) { return v.exitCode, nil } - if err = v.dumpTables(ctx); err != nil { + if err = v.dataDump(ctx); err != nil { return nonZeroExitCode, err } @@ -152,12 +153,20 @@ func (v *Validate) Run(ctx context.Context) (int, error) { func (v *Validate) print(ctx context.Context) error { for _, e := range v.dataEntries { - idx := slices.IndexFunc(v.context.DataSectionObjects, func(entry entries.Entry) bool { - t := entry.(*entries.Table) + idx := slices.IndexFunc(v.context.DataSectionObjectsToValidate, func(entry entries.Entry) bool { + t, ok := entry.(*entries.Table) + if !ok { + return false + } return t.DumpId == e.DumpId }) - t := v.context.DataSectionObjects[idx].(*entries.Table) + if idx == -1 { + // skip if not in DataSectionObjectsToValidate + continue + } + + t := v.context.DataSectionObjectsToValidate[idx].(*entries.Table) doc, err := v.createDocument(ctx, t) if err != nil { return fmt.Errorf("unable to create validation document: %w", err) @@ -269,23 +278,6 @@ func (v *Validate) createDocument(ctx context.Context, t *entries.Table) (valida return doc, nil } -func (v *Validate) dumpTables(ctx context.Context) error { - var tablesWithTransformers []entries.Entry - for _, item := range v.context.DataSectionObjects { - - if t, ok := item.(*entries.Table); ok && len(t.TransformersContext) > 0 { - t.ValidateLimitedRecords = v.config.Validate.RowsLimit - tablesWithTransformers = append(tablesWithTransformers, t) - } - } - v.context.DataSectionObjects = tablesWithTransformers - - if err := v.dataDump(ctx); err != nil { - return fmt.Errorf("data stage dumping error: %w", err) - } - return nil -} - func (v *Validate) printValidationWarnings() error { // TODO: Implement warnings hook, such as logging and HTTP sender for _, w := range v.context.Warnings { diff --git a/internal/db/postgres/context/context.go b/internal/db/postgres/context/context.go index c534a813..a8234152 100644 --- a/internal/db/postgres/context/context.go +++ b/internal/db/postgres/context/context.go @@ -42,6 +42,8 @@ type RuntimeContext struct { Types []*toolkit.Type // DataSectionObjects - list of objects to dump in data-section. There are sequences, tables and large objects DataSectionObjects []entries.Entry + // DataSectionObjectsToValidate - list of objects to validate in data-section + DataSectionObjectsToValidate []entries.Entry // Warnings - list of occurred ValidationWarning during validation and config building Warnings toolkit.ValidationWarnings // Registry - registry of all the registered transformers definition @@ -125,14 +127,24 @@ func NewRuntimeContext( dataSectionObjects = append(dataSectionObjects, blobEntries) } + // Generate list of tables that might be validated during the validate command call + var dataSectionObjectsToValidate []entries.Entry + for _, item := range dataSectionObjects { + + if t, ok := item.(*entries.Table); ok && len(t.TransformersContext) > 0 { + dataSectionObjectsToValidate = append(dataSectionObjectsToValidate, t) + } + } + return &RuntimeContext{ - Tables: tables, - Types: types, - DataSectionObjects: dataSectionObjects, - Warnings: warnings, - Registry: r, - DatabaseSchema: schema, - Graph: graph, + Tables: tables, + Types: types, + DataSectionObjects: dataSectionObjects, + Warnings: warnings, + Registry: r, + DatabaseSchema: schema, + Graph: graph, + DataSectionObjectsToValidate: dataSectionObjectsToValidate, }, nil } diff --git a/internal/db/postgres/dumpers/table.go b/internal/db/postgres/dumpers/table.go index 5ed121d5..48cc97e1 100644 --- a/internal/db/postgres/dumpers/table.go +++ b/internal/db/postgres/dumpers/table.go @@ -19,7 +19,6 @@ import ( "fmt" "io" - "github.com/greenmaskio/greenmask/internal/utils/ioutils" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgproto3" "github.com/rs/zerolog/log" @@ -27,20 +26,23 @@ import ( "github.com/greenmaskio/greenmask/internal/db/postgres/entries" "github.com/greenmaskio/greenmask/internal/storages" + "github.com/greenmaskio/greenmask/internal/utils/ioutils" ) type TableDumper struct { - table *entries.Table - recordNum uint64 - validate bool - usePgzip bool + table *entries.Table + recordNum uint64 + validate bool + validateRowsLimit uint64 + usePgzip bool } -func NewTableDumper(table *entries.Table, validate bool, usePgzip bool) *TableDumper { +func NewTableDumper(table *entries.Table, validate bool, rowsLimit uint64, usePgzip bool) *TableDumper { return &TableDumper{ - table: table, - validate: validate, - usePgzip: usePgzip, + table: table, + validate: validate, + usePgzip: usePgzip, + validateRowsLimit: rowsLimit, } } @@ -162,7 +164,7 @@ func (td *TableDumper) process(ctx context.Context, tx pgx.Tx, w io.WriteCloser, if td.validate { // Logic for validation limiter - exit after recordNum rows td.recordNum++ - if td.recordNum == td.table.ValidateLimitedRecords { + if td.recordNum == td.validateRowsLimit { return nil } } diff --git a/internal/db/postgres/entries/table.go b/internal/db/postgres/entries/table.go index 42109663..bc389a40 100644 --- a/internal/db/postgres/entries/table.go +++ b/internal/db/postgres/entries/table.go @@ -44,10 +44,8 @@ type Table struct { CompressedSize int64 ExcludeData bool Driver *toolkit.Driver - // ValidateLimitedRecords - perform dumping and transformation only for N records and exit - ValidateLimitedRecords uint64 - Scores int64 - SubsetConds []string + Scores int64 + SubsetConds []string } func (t *Table) HasCustomTransformer() bool {