From 2d63cef748c263cd3546a9affd2e80f8e27cf97e Mon Sep 17 00:00:00 2001 From: kamilt Date: Tue, 24 Sep 2024 11:26:03 +0000 Subject: [PATCH] Sequencing output of a scan node --- cpp/src/arrow/acero/source_node.cc | 5 +++-- cpp/src/arrow/dataset/scanner.cc | 6 ++++-- cpp/src/arrow/dataset/scanner.h | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index 8060e01f074f8..1337dd9c39c8a 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -106,7 +106,8 @@ struct SourceNode : ExecNode, public TracedNode { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "SourceNode")); const auto& source_options = checked_cast(options); return plan->EmplaceNode(plan, source_options.output_schema, - source_options.generator); + source_options.generator, + Ordering::Implicit()); } const char* kind_name() const override { return "SourceNode"; } @@ -406,7 +407,7 @@ struct SchemaSourceNode : public SourceNode { struct RecordBatchReaderSourceNode : public SourceNode { RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr schema, arrow::AsyncGenerator> generator) - : SourceNode(plan, schema, generator) {} + : SourceNode(plan, schema, generator, Ordering::Implicit()) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index a856a792a264f..9eb69e8d1119d 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -1032,10 +1032,10 @@ Result MakeScanNode(acero::ExecPlan* plan, } else { batch_gen = std::move(merged_batch_gen); } - + int64_t index=require_sequenced_output?0:compute::kUnsequencedIndex; auto gen = MakeMappedGenerator( std::move(batch_gen), - [scan_options](const EnumeratedRecordBatch& partial) + [scan_options, index](const EnumeratedRecordBatch& partial)mutable -> Result> { // TODO(ARROW-13263) fragments may be able to attach more guarantees to batches // than this, for example parquet's row group stats. Failing to do this leaves @@ -1057,6 +1057,8 @@ Result MakeScanNode(acero::ExecPlan* plan, batch->values.emplace_back(partial.record_batch.index); batch->values.emplace_back(partial.record_batch.last); batch->values.emplace_back(partial.fragment.value->ToString()); + if (index!=compute::kUnsequencedIndex) + batch->index = index++; return batch; }); diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index d2de267897180..7817e090412ae 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -563,7 +563,7 @@ class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions { public: explicit ScanNodeOptions(std::shared_ptr dataset, std::shared_ptr scan_options, - bool require_sequenced_output = false) + bool require_sequenced_output = true) : dataset(std::move(dataset)), scan_options(std::move(scan_options)), require_sequenced_output(require_sequenced_output) {}