Skip to content

Commit

Permalink
Perform requested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mroz45 committed Oct 16, 2024
1 parent 2d63cef commit f2fc33b
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 16 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/accumulation_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class SerialSequencingQueue {
/// Strategy that describes how to handle items
class Processor {
public:
virtual ~Processor() = default;
/// Process the batch
///
/// This method will be called on each batch in order. Calls to this method
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,8 @@ class InputState: public util::SerialSequencingQueue::Processor {
}

private:
std::unique_ptr<util::SerialSequencingQueue> sequencer_;

// ExecBatch Sequencer
std::unique_ptr<util::SerialSequencingQueue> sequencer_;
// Pending record batches. The latest is the front. Batches cannot be empty.
BackpressureConcurrentQueue<std::shared_ptr<RecordBatch>> queue_;
// Schema associated with the input
Expand Down
1 change: 0 additions & 1 deletion cpp/src/arrow/acero/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,6 @@ void DoRunUnorderedPlanTest(bool l_unordered, bool r_unordered,
DoRunUnorderedPlanTest(l_unordered, r_unordered, l_schema, r_schema,
GetRepeatedOptions(2, "time", {"key"}, 1000),
"out-of-order on-key values");
// "requires sequenced input");
}

struct BasicTestTypes {
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/arrow/acero/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,16 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
public:
/// Create an instance from values
SourceNodeOptions(std::shared_ptr<Schema> output_schema,
std::function<Future<std::optional<ExecBatch>>()> generator)
: output_schema(std::move(output_schema)), generator(std::move(generator)) {}
std::function<Future<std::optional<ExecBatch>>()> generator,
Ordering ordering = Ordering::Unordered())
: output_schema(std::move(output_schema)), generator(std::move(generator)),ordering(std::move(ordering)) {}

/// \brief the schema for batches that will be generated by this source
std::shared_ptr<Schema> output_schema;
/// \brief an asynchronous stream of batches ending with std::nullopt
std::function<Future<std::optional<ExecBatch>>()> generator;

Ordering ordering = Ordering::Unordered();
};

/// \brief a node that generates data from a table already loaded in memory
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ struct SourceNode : ExecNode, public TracedNode {
const auto& source_options = checked_cast<const SourceNodeOptions&>(options);
return plan->EmplaceNode<SourceNode>(plan, source_options.output_schema,
source_options.generator,
Ordering::Implicit());
source_options.ordering);
}

const char* kind_name() const override { return "SourceNode"; }
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1062,16 +1062,18 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
return batch;
});

auto ordering = require_sequenced_output?Ordering::Implicit():Ordering::Unordered();

auto fields = scan_options->dataset_schema->fields();
if (scan_options->add_augmented_fields) {
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
}
}

return acero::MakeExecNode(
"source", plan, {},
acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen)});
acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen), ordering});
}

Result<acero::ExecNode*> MakeAugmentedProjectNode(acero::ExecPlan* plan,
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
public:
explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
bool require_sequenced_output = true)
bool require_sequenced_output = false)
: dataset(std::move(dataset)),
scan_options(std::move(scan_options)),
require_sequenced_output(require_sequenced_output) {}
Expand Down
9 changes: 7 additions & 2 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4015,11 +4015,14 @@ cdef class _ScanNodeOptions(ExecNodeOptions):
def _set_options(self, Dataset dataset, dict scan_options):
cdef:
shared_ptr[CScanOptions] c_scan_options
bint require_sequenced_output=False

c_scan_options = Scanner._make_scan_options(dataset, scan_options)

require_sequenced_output=scan_options.get("require_sequenced_output",False)

self.wrapped.reset(
new CScanNodeOptions(dataset.unwrap(), c_scan_options)
new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output)
)


Expand All @@ -4045,7 +4048,9 @@ class ScanNodeOptions(_ScanNodeOptions):
dataset : pyarrow.dataset.Dataset
The table which acts as the data source.
**kwargs : dict, optional
Scan options. See `Scanner.from_dataset` for possible arguments.
Scan options. See `Scanner.from_dataset` for possible arguments.
    require_sequenced_output : bool, default False
        Whether to require the scan to emit batches in sequence, asserting an
        implicit ordering on the scanned data.
"""

def __init__(self, Dataset dataset, **kwargs):
Expand Down
8 changes: 4 additions & 4 deletions python/pyarrow/acero.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class InMemoryDataset:
ds = DatasetModuleStub


def _dataset_to_decl(dataset, use_threads=True):
decl = Declaration("scan", ScanNodeOptions(dataset, use_threads=use_threads))
def _dataset_to_decl(dataset, use_threads=True, require_sequenced_output=False):
decl = Declaration("scan", ScanNodeOptions(dataset, use_threads=use_threads, require_sequenced_output=require_sequenced_output))

# Get rid of special dataset columns
# "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
Expand Down Expand Up @@ -311,13 +311,13 @@ def _perform_join_asof(left_operand, left_on, left_by,

# Add the join node to the execplan
if isinstance(left_operand, ds.Dataset):
left_source = _dataset_to_decl(left_operand, use_threads=use_threads)
left_source = _dataset_to_decl(left_operand, use_threads=use_threads, require_sequenced_output=True)
else:
left_source = Declaration(
"table_source", TableSourceNodeOptions(left_operand),
)
if isinstance(right_operand, ds.Dataset):
right_source = _dataset_to_decl(right_operand, use_threads=use_threads)
right_source = _dataset_to_decl(right_operand, use_threads=use_threads, require_sequenced_output=True)
else:
right_source = Declaration(
"table_source", TableSourceNodeOptions(right_operand)
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CExpression filter

cdef cppclass CScanNodeOptions "arrow::dataset::ScanNodeOptions"(CExecNodeOptions):
CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options)
CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options, bint require_sequenced_output)

shared_ptr[CScanOptions] scan_options

Expand Down

0 comments on commit f2fc33b

Please sign in to comment.