Skip to content

Commit

Permalink
Sequencing output of a scan node
Browse files Browse the repository at this point in the history
  • Loading branch information
mroz45 committed Sep 24, 2024
1 parent c8047bb commit 2d63cef
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 5 deletions.
5 changes: 3 additions & 2 deletions cpp/src/arrow/acero/source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ struct SourceNode : ExecNode, public TracedNode {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "SourceNode"));
const auto& source_options = checked_cast<const SourceNodeOptions&>(options);
return plan->EmplaceNode<SourceNode>(plan, source_options.output_schema,
source_options.generator);
source_options.generator,
Ordering::Implicit());
}

const char* kind_name() const override { return "SourceNode"; }
Expand Down Expand Up @@ -406,7 +407,7 @@ struct SchemaSourceNode : public SourceNode {
struct RecordBatchReaderSourceNode : public SourceNode {
RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
: SourceNode(plan, schema, generator) {}
: SourceNode(plan, schema, generator, Ordering::Implicit()) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1032,10 +1032,10 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
} else {
batch_gen = std::move(merged_batch_gen);
}

int64_t index=require_sequenced_output?0:compute::kUnsequencedIndex;
auto gen = MakeMappedGenerator(
std::move(batch_gen),
[scan_options](const EnumeratedRecordBatch& partial)
[scan_options, index](const EnumeratedRecordBatch& partial)mutable
-> Result<std::optional<compute::ExecBatch>> {
// TODO(ARROW-13263) fragments may be able to attach more guarantees to batches
// than this, for example parquet's row group stats. Failing to do this leaves
Expand All @@ -1057,6 +1057,8 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
batch->values.emplace_back(partial.record_batch.index);
batch->values.emplace_back(partial.record_batch.last);
batch->values.emplace_back(partial.fragment.value->ToString());
if (index!=compute::kUnsequencedIndex)
batch->index = index++;
return batch;
});

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
public:
explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
bool require_sequenced_output = false)
bool require_sequenced_output = true)
: dataset(std::move(dataset)),
scan_options(std::move(scan_options)),
require_sequenced_output(require_sequenced_output) {}
Expand Down

0 comments on commit 2d63cef

Please sign in to comment.