feat: support refreshing Iceberg tables #5707

Open
wants to merge 52 commits into
base: main
Changes from 7 commits
470b09c
Initial commit of refreshing Iceberg.
lbooker42 Jul 2, 2024
a8d957a
Rebased to main.
lbooker42 Jul 2, 2024
264fdb1
Change IcebergInstructions refreshing indicator to enum instead of bo…
lbooker42 Jul 2, 2024
58d0a73
WIP, for review
lbooker42 Jul 3, 2024
e090474
Manual and auto-refreshing working, better documentation.
lbooker42 Jul 23, 2024
57021ad
Addressed more PR comments, some remaining.
lbooker42 Jul 23, 2024
fb882e8
WIP, some PR comments addressed.
lbooker42 Jul 26, 2024
5bbdeb2
WIP, even more PR comments addressed.
lbooker42 Jul 27, 2024
3da205c
Nearly all PR comments addressed.
lbooker42 Jul 27, 2024
91acf9b
merged with main
lbooker42 Jul 27, 2024
08dd329
Adjustment to IcebergInstructions update mode.
lbooker42 Jul 29, 2024
7af0d1d
Added python wrapper for Iceberg refreshing tables.
lbooker42 Jul 29, 2024
2d79c38
Changes to mocked tests for ColumnSourceManager and PartitionAwareSou…
lbooker42 Jul 29, 2024
3809f21
Added DHError handler and add'l documentation to python `snapshots()`…
lbooker42 Jul 30, 2024
5273a15
Fixed typo in JavaDoc
lbooker42 Jul 30, 2024
b9e2c6e
WIP
lbooker42 Jul 31, 2024
9937f79
Suggestion from review
lbooker42 Jul 31, 2024
cd08038
WIP, changes to revert some transaction token code.
lbooker42 Jul 31, 2024
f28325f
Correct logic across multiple transactions.
lbooker42 Aug 21, 2024
2d92b3f
Merged with main
lbooker42 Aug 21, 2024
cd31d82
Moved transaction accumulation to AbstractTableLocationProvider
lbooker42 Aug 23, 2024
d680c0c
Moved transaction accumulation to AbstractTableLocationProvider
lbooker42 Aug 26, 2024
6607fc3
PR comments addressed.
lbooker42 Aug 28, 2024
893336f
Updated to use IcebergTableAdapter and exposed in python. Addressed P…
lbooker42 Aug 30, 2024
68e4546
Incorporated external PR to update PartitioningColumnDataIndex for re…
lbooker42 Aug 30, 2024
273f5c1
Added additional snapshots with removes to IcebergToolsTest resources.
lbooker42 Sep 3, 2024
1e92a19
Merge branch 'main' into lab-iceberg-refreshing
lbooker42 Sep 3, 2024
f72c1b7
Manual and auto refreshing tests for Iceberg.
lbooker42 Sep 4, 2024
5c7ff12
Manual and auto refreshing tests for Iceberg, not passing.
lbooker42 Sep 4, 2024
92eec61
PR comments addressed.
lbooker42 Sep 4, 2024
95194b7
Implemented improved location reference counting in AbstractTableLoca…
lbooker42 Sep 5, 2024
09e2b6e
Fixing doc problem.
lbooker42 Sep 5, 2024
30910dd
For review only, does not compile :(
lbooker42 Sep 12, 2024
dd12240
Compiles now, still many problems
lbooker42 Sep 13, 2024
944dac6
Working through problems.
lbooker42 Sep 13, 2024
912b9f2
Cleanup and minor changes
lbooker42 Sep 13, 2024
e2b6fd0
Refactored ATLP
lbooker42 Sep 18, 2024
72b03be
Merged with main.
lbooker42 Sep 18, 2024
01e50fe
Updated but RCSM still not referenced properly.
lbooker42 Sep 18, 2024
81e88d8
Refreshing tests still need work.
lbooker42 Sep 20, 2024
b37a04e
Better tests and improved liveness management for the TableLocation k…
lbooker42 Sep 25, 2024
dead9c4
Added TLP state (add, append, static, refreshing)
lbooker42 Sep 25, 2024
e5d10e7
Added TLP state (add, append, static, refreshing)
lbooker42 Sep 27, 2024
b30e240
Addressed PR comments, some TODO remaining to address.
lbooker42 Sep 30, 2024
fa9d154
Improved table location management in SourcePartitionedTable
lbooker42 Oct 1, 2024
d807a94
Merge with main
lbooker42 Oct 2, 2024
db2b031
Post-merge cleanup
lbooker42 Oct 2, 2024
746b343
Post-merge cleanup and test updating.
lbooker42 Oct 2, 2024
d69ddcd
Addressed PR comments and test failures.
lbooker42 Oct 4, 2024
06a3bd5
More test failure fixes.
lbooker42 Oct 4, 2024
91ba92c
Liveness management re-ordering in SourcePartitionedTable
lbooker42 Oct 7, 2024
73e8824
Addressing open PR comments.
lbooker42 Oct 9, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.locations.TrackedTableLocationKey;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -18,7 +18,7 @@
/**
* Manager for ColumnSources in a Table.
*/
public interface ColumnSourceManager extends LivenessNode {
public interface ColumnSourceManager extends LivenessReferent {

/**
* Get a map of name to {@link ColumnSource} for the column sources maintained by this manager.
Expand Down Expand Up @@ -112,8 +112,7 @@ public interface ColumnSourceManager extends LivenessNode {
/**
* Remove a table location key from the sources.
*
* @return true if the location key was actually removed
* @param tableLocationKey the location key being removed
*/
boolean removeLocationKey(@NotNull TrackedTableLocationKey tableLocationKey);
void removeLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import io.deephaven.api.Selectable;
import io.deephaven.api.filter.Filter;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.LiveSupplier;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.locations.TrackedTableLocationKey;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
Expand Down Expand Up @@ -222,14 +222,14 @@ private static <T> ColumnSource<? super T> makePartitionSource(@NotNull final Co
}

@Override
protected final Collection<TrackedTableLocationKey> filterLocationKeys(
@NotNull final Collection<TrackedTableLocationKey> foundLocationKeys) {
protected final Collection<LiveSupplier<ImmutableTableLocationKey>> filterLocationKeys(
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> foundLocationKeys) {
if (partitioningColumnFilters.length == 0) {
return foundLocationKeys;
}

final Collection<ImmutableTableLocationKey> immutableTableLocationKeys = foundLocationKeys.stream()
.map(TrackedTableLocationKey::getKey)
.map(LiveSupplier::get)
.collect(Collectors.toList());

// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
Review comment (Member): Check and see if we can just close this ticket, and maybe delete the TODO.

Expand All @@ -242,8 +242,11 @@ protected final Collection<TrackedTableLocationKey> filterLocationKeys(
partitionTableColumnSources.add(makePartitionSource(columnDefinition, immutableTableLocationKeys));
}
// Add the tracked keys to the table
partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(foundLocationKeys,
TrackedTableLocationKey.class, null));
// noinspection unchecked,rawtypes
partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(
(Collection<LiveSupplier>) (Collection) foundLocationKeys,
LiveSupplier.class,
null));

final Table filteredColumnPartitionTable = TableTools
.newTable(foundLocationKeys.size(), partitionTableColumnNames, partitionTableColumnSources)
Expand All @@ -253,7 +256,7 @@ protected final Collection<TrackedTableLocationKey> filterLocationKeys(
}

// Return the filtered keys
final Iterable<TrackedTableLocationKey> iterable =
final Iterable<LiveSupplier<ImmutableTableLocationKey>> iterable =
() -> filteredColumnPartitionTable.columnIterator(TRACKED_KEY_COLUMN_NAME);
return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
}
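
Note on the double cast above: Java generics are invariant, so a `Collection<LiveSupplier<ImmutableTableLocationKey>>` cannot be passed where a `Collection<LiveSupplier>` is expected, and the only class literal available is the raw `LiveSupplier.class`. The sketch below shows the same pattern in isolation. It uses `java.util.function.Supplier` in place of `LiveSupplier`, and `storeAll` is a hypothetical stand-in for an API shaped like `getMemoryColumnSource`; neither is Deephaven code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

final class RawCastSketch {
    // Hypothetical stand-in for an API that takes a collection plus the element's
    // class literal, similar in shape to the getMemoryColumnSource call in the diff.
    static <T> List<T> storeAll(final Collection<T> values, final Class<T> type) {
        final List<T> out = new ArrayList<>();
        for (final T value : values) {
            out.add(type.cast(value)); // runtime check against the (raw) class
        }
        return out;
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<Supplier> storeSuppliers(final Collection<Supplier<String>> suppliers) {
        // There is no Supplier<String>.class, so the element type must be the raw Supplier.
        // Because generics are invariant, the cast has to go through the raw Collection first.
        return storeAll((Collection<Supplier>) (Collection) suppliers, Supplier.class);
    }

    public static void main(final String[] args) {
        final Collection<Supplier<String>> keys = List.of(() -> "a", () -> "b");
        final List<Supplier> stored = storeSuppliers(keys);
        System.out.println(stored.size() + " suppliers stored; first yields " + stored.get(0).get());
    }
}
```

The cast is safe here only because the elements are read back through the same parameterized type; the suppression comment documents that the compiler cannot verify this.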
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.table.impl;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.*;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.*;
Expand Down Expand Up @@ -37,6 +38,26 @@ public class SourcePartitionedTable extends PartitionedTableImpl {
private static final String KEY_COLUMN_NAME = "TableLocationKey";
private static final String CONSTITUENT_COLUMN_NAME = "LocationTable";

/**
* Private constructor for a {@link SourcePartitionedTable}.
*
* @param table the locations table to use for this {@link SourcePartitionedTable}
* @param constituentDefinition The {@link TableDefinition} expected of constituent {@link Table tables}
* @param refreshLocations Whether the set of locations should be refreshed
*/
private SourcePartitionedTable(
@NotNull final Table table,
@NotNull final TableDefinition constituentDefinition,
final boolean refreshLocations) {
super(table,
Set.of(KEY_COLUMN_NAME),
true,
CONSTITUENT_COLUMN_NAME,
constituentDefinition,
refreshLocations,
false);
}

/**
* Construct a {@link SourcePartitionedTable} from the supplied parameters.
* <p>
Expand All @@ -51,29 +72,30 @@ public class SourcePartitionedTable extends PartitionedTableImpl {
* @param refreshSizes Whether the locations found should be refreshed
* @param locationKeyMatcher Function to filter desired location keys
*/
public SourcePartitionedTable(
public static SourcePartitionedTable create(
@NotNull final TableDefinition constituentDefinition,
@NotNull final UnaryOperator<Table> applyTablePermissions,
@NotNull final TableLocationProvider tableLocationProvider,
final boolean refreshLocations,
final boolean refreshSizes,
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher) {
super(new UnderlyingTableMaintainer(
constituentDefinition,
final UnderlyingTableMaintainer maintainer = new UnderlyingTableMaintainer(constituentDefinition,
applyTablePermissions,
tableLocationProvider,
refreshLocations,
refreshSizes,
locationKeyMatcher).result(),
Set.of(KEY_COLUMN_NAME),
true,
CONSTITUENT_COLUMN_NAME,
locationKeyMatcher);

final SourcePartitionedTable sourcePartitionedTable = new SourcePartitionedTable(
maintainer.result(),
constituentDefinition,
refreshLocations,
false);
refreshLocations);

maintainer.assignLivenessManager(sourcePartitionedTable);
return sourcePartitionedTable;
}

private static final class UnderlyingTableMaintainer {
private static final class UnderlyingTableMaintainer extends ReferenceCountedLivenessNode {

private final TableDefinition constituentDefinition;
private final UnaryOperator<Table> applyTablePermissions;
Expand Down Expand Up @@ -103,13 +125,17 @@ private UnderlyingTableMaintainer(
final boolean refreshLocations,
final boolean refreshSizes,
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher) {
super(false);

// Increase the refcount of this liveness node to allow it to manage the subscription buffer.
retainReference();

this.constituentDefinition = constituentDefinition;
this.applyTablePermissions = applyTablePermissions;
this.tableLocationProvider = tableLocationProvider;
this.refreshSizes = refreshSizes;
this.locationKeyMatcher = locationKeyMatcher;

// noinspection resource
resultRows = RowSetFactory.empty().toTracking();
resultTableLocationKeys = ArrayBackedColumnSource.getMemoryColumnSource(TableLocationKey.class, null);
resultLocationTables = ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null);
Expand All @@ -130,6 +156,7 @@ private UnderlyingTableMaintainer(

if (needToRefreshLocations) {
subscriptionBuffer = new TableLocationSubscriptionBuffer(tableLocationProvider);
manage(subscriptionBuffer);
pendingLocationStates = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
readyLocationStates = new IntrusiveDoublyLinkedQueue<>(
Expand All @@ -154,7 +181,6 @@ protected void instrumentedRefresh() {
removedConstituents.forEach(result::unmanage);
removedConstituents = null;
});

processPendingLocations(false);
} else {
subscriptionBuffer = null;
Expand All @@ -166,7 +192,7 @@ protected void instrumentedRefresh() {

final Collection<TableLocation> locations = new ArrayList<>();
tableLocationProvider.getTableLocationKeys(tlk -> {
locations.add(tableLocationProvider.getTableLocation(tlk.getKey()));
locations.add(tableLocationProvider.getTableLocation(tlk.get()));
}, locationKeyMatcher);
try (final RowSet added = sortAndAddLocations(locations.stream())) {
resultRows.insert(added);
Expand All @@ -178,6 +204,15 @@ protected void instrumentedRefresh() {
}
}

/**
* Manage ourselves with the provided liveness manager and clean up the incremented reference count from the
* constructor.
*/
private void assignLivenessManager(final LivenessManager manager) {
manager.manage(this);
dropReference();
}

private QueryTable result() {
return result;
}
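
Note on the liveness handoff above: the maintainer retains a reference to itself in its constructor so that it can manage the subscription buffer before any owner exists, and `assignLivenessManager` later hands ownership to the finished `SourcePartitionedTable` and drops that temporary reference. The sketch below models this with a deliberately simplified reference counter. `CountedNode` is a hypothetical stand-in written for illustration, not `ReferenceCountedLivenessNode`, and it ignores the thread-safety and weak-reference handling a real implementation needs.

```java
import java.util.ArrayList;
import java.util.List;

final class LivenessHandoffSketch {
    /** Simplified, single-threaded stand-in for a reference-counted liveness node. */
    static class CountedNode {
        private int refCount;
        private final List<CountedNode> managed = new ArrayList<>();

        void retainReference() {
            refCount++;
        }

        void dropReference() {
            if (refCount <= 0) {
                throw new IllegalStateException("already released");
            }
            if (--refCount == 0) {
                // Releasing a node releases everything it manages
                managed.forEach(CountedNode::dropReference);
                managed.clear();
            }
        }

        void manage(final CountedNode referent) {
            referent.retainReference();
            managed.add(referent);
        }

        boolean isLive() {
            return refCount > 0;
        }
    }

    static final class Maintainer extends CountedNode {
        final CountedNode subscriptionBuffer = new CountedNode();

        Maintainer() {
            // Temporary self-reference: keeps us live until an owner takes over
            retainReference();
            manage(subscriptionBuffer);
        }

        void assignLivenessManager(final CountedNode owner) {
            owner.manage(this); // the owner now keeps us alive
            dropReference();    // release the constructor's temporary reference
        }
    }

    static Maintainer createOwnedBy(final CountedNode owner) {
        final Maintainer maintainer = new Maintainer();
        maintainer.assignLivenessManager(owner);
        return maintainer;
    }

    public static void main(final String[] args) {
        final CountedNode table = new CountedNode();
        table.retainReference();
        final Maintainer maintainer = createOwnedBy(table);
        System.out.println("live while owned: " + maintainer.isLive());
        table.dropReference();
        System.out.println("live after owner released: " + maintainer.isLive());
    }
}
```

Managing first and dropping second matters: reversing the two calls would let the count reach zero and release the subscription buffer before the owner takes over.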
Expand Down Expand Up @@ -207,7 +242,7 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
constituentDefinition,
"SingleLocationSourceTable-" + tableLocation,
RegionedTableComponentFactoryImpl.INSTANCE,
new SingleTableLocationProvider(tableLocation),
new SingleTableLocationProvider(tableLocation, TableLocationProvider.UpdateMode.ADD_REMOVE),
refreshSizes ? refreshCombiner : null);

// Be careful to propagate the systemic attribute properly to child tables
Expand Down Expand Up @@ -250,7 +285,7 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
locationUpdate.getPendingAddedLocationKeys().stream()
.map(TrackedTableLocationKey::getKey)
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.map(PendingLocationState::new)
Expand All @@ -272,7 +307,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
final Set<ImmutableTableLocationKey> relevantRemovedLocations =
locationUpdate.getPendingRemovedLocationKeys()
.stream()
.map(TrackedTableLocationKey::getKey)
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.collect(Collectors.toSet());
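
Note on the pipeline above: both `processAdditions` and `processRemovals` now unwrap each supplier with `LiveSupplier::get` before applying `locationKeyMatcher`. A minimal sketch of the removal side follows, using `java.util.function.Supplier<String>` as an assumed stand-in for `LiveSupplier<ImmutableTableLocationKey>`; the key strings are invented for illustration.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

final class RemovedKeyFilterSketch {
    static Set<String> relevantRemovedKeys(
            final Collection<Supplier<String>> pendingRemoved,
            final Predicate<String> locationKeyMatcher) {
        return pendingRemoved.stream()
                .map(Supplier::get)         // unwrap the key, as LiveSupplier::get does in the diff
                .filter(locationKeyMatcher) // keep only keys this table cares about
                .collect(Collectors.toSet());
    }

    public static void main(final String[] args) {
        final List<Supplier<String>> pending = List.of(() -> "2024/a", () -> "2023/b");
        System.out.println(relevantRemovedKeys(pending, key -> key.startsWith("2024")));
    }
}
```

Collecting to a set deduplicates keys, which makes later membership checks against the removed locations cheap.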

Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.liveness.LiveSupplier;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
Expand Down Expand Up @@ -101,7 +103,20 @@ public abstract class SourceTable<IMPL_TYPE extends SourceTable<IMPL_TYPE>> exte
}
}

setRefreshing(isRefreshing);
if (isRefreshing) {
setRefreshing(true);
if (locationProvider.getUpdateMode() == TableLocationProvider.UpdateMode.APPEND_ONLY
&& locationProvider.getLocationUpdateMode() == TableLocationProvider.UpdateMode.STATIC) {
// This table is APPEND_ONLY IFF the set of locations is APPEND_ONLY
// and the location contents are STATIC
setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, Boolean.TRUE);
} else if (locationProvider.getUpdateMode() != TableLocationProvider.UpdateMode.ADD_REMOVE
&& locationProvider.getLocationUpdateMode() != TableLocationProvider.UpdateMode.ADD_REMOVE) {
// This table is ADD_ONLY IFF the set of locations is not allowed to remove locations (!ADD_REMOVE)
// and the location contents are not allowed to remove rows (!ADD_REMOVE)
setAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE, Boolean.TRUE);
}
}
}
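
Note on the attribute selection above: the code comments state two rules. A table is append-only if and only if the set of locations is APPEND_ONLY and the location contents are STATIC; otherwise it is add-only if neither the location set nor the location contents may remove anything. The sketch below encodes those two rules as a pure function. The `UpdateMode` enum is a local stand-in (the `ADD_ONLY` value is an assumption, since the diff names only the other three), and `TableGuarantee` is a hypothetical type standing in for the table attributes.

```java
final class UpdateModeAttributeSketch {
    /** Local stand-in for TableLocationProvider.UpdateMode; ADD_ONLY is assumed. */
    enum UpdateMode { STATIC, APPEND_ONLY, ADD_ONLY, ADD_REMOVE }

    /** Hypothetical result type standing in for the table attributes set in the diff. */
    enum TableGuarantee { APPEND_ONLY, ADD_ONLY, NONE }

    static TableGuarantee guaranteeFor(final UpdateMode locationSetMode, final UpdateMode locationContentMode) {
        // Append-only: new locations only arrive at the end, and existing locations never change
        if (locationSetMode == UpdateMode.APPEND_ONLY && locationContentMode == UpdateMode.STATIC) {
            return TableGuarantee.APPEND_ONLY;
        }
        // Add-only: nothing can be removed at either level
        if (locationSetMode != UpdateMode.ADD_REMOVE && locationContentMode != UpdateMode.ADD_REMOVE) {
            return TableGuarantee.ADD_ONLY;
        }
        return TableGuarantee.NONE;
    }

    public static void main(final String[] args) {
        for (final UpdateMode set : UpdateMode.values()) {
            for (final UpdateMode content : UpdateMode.values()) {
                System.out.println(set + " locations, " + content + " contents: " + guaranteeFor(set, content));
            }
        }
    }
}
```

Under these rules a growing location with an append-only location set is add-only rather than append-only, because rows added to an earlier location would land in the middle of the table's row space.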

/**
Expand Down Expand Up @@ -148,33 +163,44 @@ private void initializeAvailableLocations() {
updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer));
} else {
locationProvider.refresh();
final Collection<TrackedTableLocationKey> tableLocationKeys = new ArrayList<>();
locationProvider.getTableLocationKeys(tableLocationKeys::add);
maybeAddLocations(tableLocationKeys);
final Collection<LiveSupplier<ImmutableTableLocationKey>> keySuppliers = new ArrayList<>();
// Manage each of the location keys as we see them (since the TLP is not guaranteeing them outside
// the callback)
locationProvider.getTableLocationKeys(ttlk -> {
if (isRefreshing()) {
manage(ttlk);
}
keySuppliers.add(ttlk);
});
maybeAddLocations(keySuppliers);
if (isRefreshing()) {
// Now we can un-manage the location keys
keySuppliers.forEach(this::unmanage);
}
}
});
locationsInitialized = true;
}
}

private void maybeAddLocations(@NotNull final Collection<TrackedTableLocationKey> locationKeys) {
private void maybeAddLocations(@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> locationKeys) {
if (locationKeys.isEmpty()) {
return;
}
filterLocationKeys(locationKeys)
.parallelStream()
.forEach(lk -> columnSourceManager.addLocation(locationProvider.getTableLocation(lk.getKey())));
.forEach(lk -> columnSourceManager.addLocation(locationProvider.getTableLocation(lk.get())));
}

private TrackedTableLocationKey[] maybeRemoveLocations(
@NotNull final Collection<TrackedTableLocationKey> removedKeys) {
private void maybeRemoveLocations(
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeys) {
if (removedKeys.isEmpty()) {
return TrackedTableLocationKey.ZERO_LENGTH_TRACKED_TABLE_LOCATION_KEY_ARRAY;
return;
}

return filterLocationKeys(removedKeys).stream()
.filter(columnSourceManager::removeLocationKey)
.toArray(TrackedTableLocationKey[]::new);
filterLocationKeys(removedKeys).stream()
.map(LiveSupplier::get)
.forEach(columnSourceManager::removeLocationKey);
}

private void initializeLocationSizes() {
Expand Down Expand Up @@ -216,6 +242,14 @@ private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer loca
protected void instrumentedRefresh() {
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
if (locationProvider.getUpdateMode() != TableLocationProvider.UpdateMode.ADD_REMOVE
&& !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) {
// This TLP doesn't support removed locations, we need to throw an exception.
final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream()
.map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new);
throw new TableLocationRemovedException("Source table does not support removed locations", keys);
}

maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
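
Note on the removal guard above: the poller rejects an update that contains removed locations unless the provider's update mode is ADD_REMOVE, and it does so before any removals or additions are applied. The sketch below isolates that ordering. `LocationRemovedException` is a hypothetical stand-in for `TableLocationRemovedException`, plain strings stand in for location keys, and the `ADD_ONLY` enum value is an assumption.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class RemovalGuardSketch {
    /** Local stand-in for the provider's update mode; ADD_ONLY is assumed. */
    enum UpdateMode { STATIC, APPEND_ONLY, ADD_ONLY, ADD_REMOVE }

    /** Hypothetical stand-in for TableLocationRemovedException. */
    static final class LocationRemovedException extends RuntimeException {
        final List<String> removedKeys;

        LocationRemovedException(final String message, final Collection<String> removedKeys) {
            super(message);
            this.removedKeys = List.copyOf(removedKeys);
        }
    }

    static void applyUpdate(
            final UpdateMode mode,
            final Set<String> liveKeys,
            final Collection<String> removed,
            final Collection<String> added) {
        // Fail before mutating anything if this provider promised never to remove locations
        if (mode != UpdateMode.ADD_REMOVE && !removed.isEmpty()) {
            throw new LocationRemovedException("Source table does not support removed locations", removed);
        }
        // Same order as the diff: removals first, then additions
        liveKeys.removeAll(removed);
        liveKeys.addAll(added);
    }

    public static void main(final String[] args) {
        final Set<String> live = new LinkedHashSet<>(List.of("p=1", "p=2"));
        applyUpdate(UpdateMode.ADD_REMOVE, live, List.of("p=1"), List.of("p=3"));
        System.out.println("after ADD_REMOVE update: " + live);
        try {
            applyUpdate(UpdateMode.APPEND_ONLY, live, List.of("p=2"), List.of());
        } catch (final LocationRemovedException e) {
            System.out.println("rejected removal of " + e.removedKeys);
        }
    }
}
```

Checking before mutating keeps the table consistent with whatever attributes were advertised when it was created.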
Expand Down Expand Up @@ -255,8 +289,8 @@ protected void onRefreshError(@NotNull final Exception error) {
* {@link TableLocationProvider}, but not yet incorporated into the table
* @return A sub-collection of the input
*/
protected Collection<TrackedTableLocationKey> filterLocationKeys(
@NotNull final Collection<TrackedTableLocationKey> foundLocationKeys) {
protected Collection<LiveSupplier<ImmutableTableLocationKey>> filterLocationKeys(
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> foundLocationKeys) {
return foundLocationKeys;
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.deephaven.api.SortColumn;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.table.BasicDataIndex;
import io.deephaven.engine.table.Table;
import io.deephaven.io.log.impl.LogOutputStringImpl;
Expand All @@ -22,7 +22,7 @@
* location allows access to columns, size, and possibly other metadata for a single partition that may be included in a
* source table.
*/
public interface TableLocation extends NamedImplementation, LogOutputAppendable, TableLocationState, LivenessNode {
public interface TableLocation extends NamedImplementation, LogOutputAppendable, TableLocationState, LivenessReferent {

/**
* Listener interface for anything that wants to know about changes to a location.
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ <PARTITION_VALUE_TYPE extends Comparable<PARTITION_VALUE_TYPE>> PARTITION_VALUE_
* @return An immutable version of this key
*/
ImmutableTableLocationKey makeImmutable();

/**
* Release any cached data associated with this key. This would only be called at EOL for this key.
*/
default void clear() {}
}
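
Note on `clear()` above: making it a no-op default method means existing key implementations compile unchanged, while keys that cache data can override it to release that data at end of life. A minimal sketch follows; `LocationKey`, `PlainKey`, and `CachingKey` are hypothetical types invented for illustration, not the Deephaven interfaces.

```java
import java.util.HashMap;
import java.util.Map;

final class ClearableKeySketch {
    /** Hypothetical stand-in for the key interface, with the same no-op default. */
    interface LocationKey {
        default void clear() {}
    }

    /** A key with nothing cached simply inherits the no-op. */
    static final class PlainKey implements LocationKey {
    }

    /** A key that lazily caches metadata and releases it when cleared. */
    static final class CachingKey implements LocationKey {
        private final Map<String, Object> cache = new HashMap<>();

        Object metadata(final String name) {
            return cache.computeIfAbsent(name, n -> "loaded:" + n);
        }

        int cachedEntries() {
            return cache.size();
        }

        @Override
        public void clear() {
            cache.clear();
        }
    }

    public static void main(final String[] args) {
        final CachingKey key = new CachingKey();
        key.metadata("footer");
        System.out.println("cached before clear: " + key.cachedEntries());
        key.clear();
        System.out.println("cached after clear: " + key.cachedEntries());
        new PlainKey().clear(); // no-op by default
    }
}
```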