feat: support refreshing Iceberg tables #5707
engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocationProvider.java
/**
 * Notify the listener of a {@link TableLocationKey} encountered while initiating or maintaining the location
 * subscription. This should occur at most once per location, but the order of delivery is <i>not</i>
 * guaranteed.
 *
 * @param tableLocationKey The new table location key
 */
- void handleTableLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);
+ void handleTableLocationKeyAdded(@NotNull ImmutableTableLocationKey tableLocationKey);
Good change, may be breaking for DHE, please consult Andy pre-merge.
void beginTransaction();

void endTransaction();

/**
 * Notify the listener of a {@link TableLocationKey} encountered while initiating or maintaining the location
 * subscription. This should occur at most once per location, but the order of delivery is <i>not</i>
Consider whether we can have add + remove + add. What about remove + add in the same pull?
Should document that this may change the "at most once per location" guarantee, and define semantics.
I think it should be something like:
We allow re-add of a removed TLK. Downstream consumers should process these in an order that respects delivery and transactionality.
Within one transaction, expect at most one of "remove" or "add" for a given TLK.
Within one transaction, we can allow remove followed by add, but not add followed by remove. This dictates that we deliver pending removes before pending adds in processPending.
That is, one transaction allows:
- Replace a TLK (remove followed by add)
- Remove a TLK (remove)
- Add a TLK (add)
Double add, double remove, or add followed by remove is right out.
Processing an addition to a transaction.
- Remove: If there's an existing accumulated remove, error. Else, if there's an existing accumulated add, error. Else, accumulate the remove.
- Add: If there's an existing accumulated add, error. Else, accumulate the add.
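The accumulation rules above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: `TransactionAccumulator` is a hypothetical class, not part of the PR, and a plain `String` stands in for a TableLocationKey.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical accumulator for one open transaction; String stands in for a TLK. */
final class TransactionAccumulator {
    private final Set<String> removes = new LinkedHashSet<>();
    private final Set<String> adds = new LinkedHashSet<>();

    /** Accumulate a remove; rejects double remove and add followed by remove. */
    void remove(String key) {
        if (removes.contains(key)) {
            throw new IllegalStateException("Double remove of " + key);
        }
        if (adds.contains(key)) {
            throw new IllegalStateException("Add followed by remove of " + key);
        }
        removes.add(key);
    }

    /** Accumulate an add; rejects double add. A prior remove is fine (replace). */
    void add(String key) {
        if (!adds.add(key)) {
            throw new IllegalStateException("Double add of " + key);
        }
    }

    Set<String> removes() {
        return removes;
    }

    Set<String> adds() {
        return adds;
    }
}
```

With this shape, remove("a") followed by add("a") is accepted as a replace, while add("b") followed by remove("b") throws.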
Across multiple transactions delivered as a batch, ensure that the right end-state is achieved.
- Add + remove collapses pairwise to no-op
- Remove + add (assuming prior add) should be processed in order. We might very well choose to not allow re-add at this time, I don't expect Iceberg to do this. If we do allow it, we need to be conscious that the removed location's region(s) need(s) to be used for previous data, while the added one needs to be used for current data.
- Multiple adds or removes without their opposite intervening is an error.
A null token should be handled exactly the same as a single-element transaction.
Processing a transaction:
- Process removes first. If there's an add pending, delete it and swallow the remove. Else, if there's a remove pending, error. Else, store the remove as pending.
- Process adds. If there's an add pending, error. Else, store the add as pending.
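A minimal sketch of folding completed transactions into the pending state, following the two steps above. `PendingLocationChanges` and its method names are hypothetical, and `String` again stands in for a TLK. Note that this sketch does not roll back a transaction that fails part-way through.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical pending-state holder; not the PR's actual subscription buffer. */
final class PendingLocationChanges {
    private final Set<String> pendingRemoves = new LinkedHashSet<>();
    private final Set<String> pendingAdds = new LinkedHashSet<>();

    /** Fold one completed transaction into the pending state, removes first. */
    void applyTransaction(Collection<String> removes, Collection<String> adds) {
        for (String key : removes) {
            if (pendingAdds.remove(key)) {
                continue; // pending add + remove collapses to a no-op
            }
            if (!pendingRemoves.add(key)) {
                throw new IllegalStateException("Remove already pending for " + key);
            }
        }
        for (String key : adds) {
            if (!pendingAdds.add(key)) {
                throw new IllegalStateException("Add already pending for " + key);
            }
        }
    }

    Set<String> pendingRemoves() {
        return pendingRemoves;
    }

    Set<String> pendingAdds() {
        return pendingAdds;
    }
}
```

A consumer draining this state would deliver `pendingRemoves` before `pendingAdds`, which preserves the remove-then-add ordering for a replaced key.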
Note: removal support means that RegionedColumnSources may no longer be immutable! We need to be sure that we are aware of whether a particular TLP might remove data, and ensure that in those cases the RCS is not marked immutable. REVISED: ONLY REPLACE IS AN ISSUE FOR IMMUTABILITY, AS LONG AS WE DON'T REUSE SLOTS.
We discussed that TLPs should probably specify whether they are guaranteeing that they will never remove TLKs, and whether their TLs will never remove or modify rows. I think if and when we encounter data sources that require modify support, we should probably just use SourcePartitionedTable instead of PartitionAwareSourceTable.
I'm not sure if I need to handle the RCS immutability question in this PR since Iceberg will not modify rows.
Removing a region makes the values in the corresponding row key range disappear. That's OK for immutability.
If you allow a new region to use the same slot, or allow the old region to reincarnate in the same slot potentially with different data, you are violating immutability.
Not reusing slots means that a long-lived iceberg table may eventually exhaust its row key space.
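To make the trade-off concrete, here is a sketch of a slot allocator that never reuses slots. `RegionSlotAllocator` and the bit widths passed to it are hypothetical and chosen only for illustration; they are not the engine's actual constants.

```java
/** Hypothetical allocator: each region gets a fresh slot, and released slots are never handed out again. */
final class RegionSlotAllocator {
    private final int maxSlots;
    private int nextSlot = 0;

    RegionSlotAllocator(int regionIndexBits) {
        if (regionIndexBits < 1 || regionIndexBits > 30) {
            throw new IllegalArgumentException("regionIndexBits must be in [1, 30]");
        }
        this.maxSlots = 1 << regionIndexBits;
    }

    /** Hand out the next unused slot, or fail once the slot space is used up. */
    int allocate() {
        if (nextSlot == maxSlots) {
            throw new IllegalStateException("Row key space exhausted after " + maxSlots + " regions");
        }
        return nextSlot++;
    }

    /** Removing a region only makes its row key range disappear; the slot stays retired. */
    void release(int slot) {
        // Intentionally empty: reusing the slot could expose different data at the same row keys.
    }

    /** First row key of a slot, assuming each region owns 2^rowBitsPerRegion keys. */
    static long firstRowKey(int slot, int rowBitsPerRegion) {
        return (long) slot << rowBitsPerRegion;
    }
}
```

Because `release` frees nothing, a table that keeps adding and removing locations consumes slots monotonically, which is the exhaustion risk described above.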
Replace (remove + add of a TLK) requires some kind of versioning of the TL, in a way that the TLK is aware of in order to ensure that we provide the table with the right TL for the version. AbstractTableLocationProvider's location caching layer is not currently sufficient for atomically replacing TLs.
I think we need to get slightly more complicated; I was wrong, the TableLocation is not sufficient for reference counting. We instead need to register "ownership interest" by TableLocationKey.
We should introduce ReferenceCountedImmutableTableLocationKey, and use that as the type delivered to TableLocationProvider.Listeners.
AbstractTableLocationProvider should bifurcate its state internally into:
- "live" set of TLKs (RCITLKs). Live set is set of keys shown to static consumers and new listeners.
- "available" map of TLK -> Object (which may be the TL, or the TLK). Available map allows TLs to be accessed. Keys are superset of live set.
Increments:
- Ref count on RCITLK to be held by ATLP as long as the TLK is in the “live” set;
- Ref count bumped before delivery to any Listener, once per listener.
Decrements:
- Listeners responsible to decrement if OBE, for example add followed by remove in a subscription buffer.
- SourceTable responsible to decrement if filtered out.
- RCSM responsible to decrement upon processing remove at end of cycle, or in its own destroy().
Notes:
- ATLP must unwrap TLK before giving to makeTableLocation
- RCITLK.onReferenceCountAtZero removes the TLK (and any TL that exists) from the available map. If the TL existed, sends a null update, and clears column locations.
- It's not bad that the RCITLK hard refs the ATLP; we already ref it from the SourceTable.
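The ownership scheme above could look roughly like the following. This is a simplified sketch: `RefCountedKey` is a hypothetical stand-in for the proposed ReferenceCountedImmutableTableLocationKey, `String` stands in for the key, and real reference counting in the engine has more rules (for example, around resurrecting an object whose count has already reached zero).

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted key; removes itself from the "available" map at zero. */
final class RefCountedKey {
    private final String key;
    private final Map<String, Object> available;
    private final AtomicInteger refCount = new AtomicInteger();

    RefCountedKey(String key, Map<String, Object> available) {
        this.key = key;
        this.available = available;
        // Until a location is created, the available map holds the key itself.
        available.put(key, key);
    }

    /** Called by the provider while the key is live, and once per listener before delivery. */
    void retain() {
        refCount.incrementAndGet();
    }

    /** Called by whoever took a reference once it no longer needs the key. */
    void release() {
        final int remaining = refCount.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("Released more often than retained: " + key);
        }
        if (remaining == 0) {
            onReferenceCountAtZero();
        }
    }

    private void onReferenceCountAtZero() {
        // Drop the key (and any location stored under it) from the available map.
        available.remove(key);
    }

    String get() {
        return key;
    }
}
```

In this sketch the provider holds one reference for as long as the key is in its live set, and each listener holds its own, so the entry stays available until the last interested party releases it.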
Enterprise note:
RemoteTableDataService is a non-issue, since it’s only used with Deephaven format. Meaning, we don’t need to extend this across the wire. Andy may have to deal with that, or we may have to evolve the API, in some future use case.
Missing feature:
TLP needs to advertise its update model.
This might be:
Single-partition, add-only -> append-only table -> partition removal bad
Multi-partition, add-only -> add-only table -> partition removal bad
Multi-partition, partition removes possible -> no shifts or mods (still immutable) -> partition removal OK
Not exposing any other models at this time (e.g. partitions that can have mods, shifts, removes; if we want that, use partitioned tables).
engine/table/src/main/java/io/deephaven/engine/table/impl/PartitionAwareSourceTable.java
final Collection<ImmutableTableLocationKey> immutableTableLocationKeys = foundLocationKeys.stream()
        .map(LiveSupplier::get)
        .collect(Collectors.toList());

// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
Check and see if we can just close this ticket, and maybe delete the todo.
Finished table data infrastructure. May need to look further at Iceberg code and tests.
Minor commentary. We can dig into the rest of the Iceberg layer tomorrow.
...le/src/main/java/io/deephaven/engine/table/impl/locations/impl/FilteredTableDataService.java
final TableLocationProvider.Listener outputListener = getWrapped();
if (outputListener != null) {
    outputListener.handleTableLocationKeysUpdate(addedKeys, removedKeys);
    // Produce filtered lists of added and removed keys.
    final Collection<LiveSupplier<ImmutableTableLocationKey>> filteredAddedKeys = addedKeys.stream()
It would be nice to be able to generate these collections only once, rather than once per listener. Doing so would require something of a redesign; we'd probably need to work more like a SubscriptionAggregator. Let's comment (without using the word "todo" ;-)) on that optimization, but defer it for now.
 * @return the most permissive mode encountered in the stream
 */
public static TableUpdateMode mostPermissiveMode(Stream<TableUpdateMode> modes) {
    // Analyze the location update modes of the input providers to determine the location update mode
You said this was inappropriate.
 * @param modes a stream of modes
 * @return the most permissive mode encountered in the stream
 */
public static TableUpdateMode mostPermissiveMode(Stream<TableUpdateMode> modes) {
As-is, this could be simplified to sorting. I guess it could become more complicated if we introduce capabilities that don't "nest" in the future.
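One way to read "simplified to sorting" is to take the maximum by declaration order. The sketch below assumes the four constants are declared from least to most permissive (STATIC, APPEND_ONLY, ADD_ONLY, ADD_REMOVE) and that an empty stream should yield STATIC; both are assumptions, and `UpdateModeSketch` is a stand-in enum rather than the PR's TableUpdateMode.

```java
import java.util.Comparator;
import java.util.stream.Stream;

/** Stand-in enum; constants are assumed to be ordered from least to most permissive. */
enum UpdateModeSketch {
    STATIC, APPEND_ONLY, ADD_ONLY, ADD_REMOVE;

    /** Most permissive mode in the stream, relying on enum natural (ordinal) order. */
    static UpdateModeSketch mostPermissive(Stream<UpdateModeSketch> modes) {
        // Assumption: an empty stream means there is nothing that can change, hence STATIC.
        return modes.max(Comparator.naturalOrder()).orElse(STATIC);
    }
}
```

This only works while the capabilities nest; if a future mode is not comparable with the others, an explicit merge function would be needed instead of an ordering.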
/**
 * Records the guarantees that the table offers regarding addition and removal of data elements. This can apply to
 * {@link SourceTable source table} location additions and removals or row additions and removals within a location.
 */
public enum TableUpdateMode {
    STATIC, APPEND_ONLY, ADD_ONLY, ADD_REMOVE;

    /**
     * Returns true if the addition is allowed.
Suggested change:
-  * Returns true if the addition is allowed.
+  * Returns true if addition is allowed.
@@ -22,6 +29,9 @@ public boolean addAllowed() {
        }
    }

    /**
     * Returns true if the removal is allowed.
Suggested change:
-  * Returns true if the removal is allowed.
+  * Returns true if removal is allowed.
Partial review of Iceberg-specific code.
import org.apache.iceberg.Snapshot;
import org.jetbrains.annotations.NotNull;

public interface IcebergTable extends Table {
I think we might want to note that these methods are optional, and in practice only supported when the implementation requested supports manual updating.
 * @param tableDefinition The {@link TableDefinition} describing the table schema
 * @param description A human-readable description for this table
 * @param componentFactory A component factory for creating column source managers
 * @param locationProvider A {@link io.deephaven.engine.table.impl.locations.TableLocationProvider}, for use in
This is a lie now.
public static Builder builder() {
    return ImmutableIcebergUpdateMode.builder();
}
I think builder() and the Builder interface should be private at this time.
@Value.Immutable
@BuildableStyle
public abstract class IcebergUpdateMode {
I think we might want to consider whether initial snapshot should be a parameter.
}

@Value.Default
public long autoRefreshMs() {
This field should be documented. We should also have a @Check to ensure it's only set with compatible modes.
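A sketch of the kind of validation being asked for, written as plain Java so it stands alone. With Immutables, the body of `check()` would typically live in a method annotated `@Value.Check`. The class name, the mode names, and the default interval below are all assumptions made for illustration; the PR's IcebergUpdateMode may be shaped differently.

```java
/** Hypothetical stand-in for an update-mode value object with a refresh interval. */
final class RefreshConfigSketch {
    enum Type { STATIC, MANUAL_REFRESHING, AUTO_REFRESHING }

    /** Assumed default; the real default is not stated in this conversation. */
    static final long DEFAULT_AUTO_REFRESH_MS = 60_000L;

    final Type type;
    final long autoRefreshMs;

    private RefreshConfigSketch(Type type, long autoRefreshMs) {
        this.type = type;
        this.autoRefreshMs = autoRefreshMs;
        check();
    }

    static RefreshConfigSketch of(Type type) {
        return new RefreshConfigSketch(type, DEFAULT_AUTO_REFRESH_MS);
    }

    static RefreshConfigSketch of(Type type, long autoRefreshMs) {
        return new RefreshConfigSketch(type, autoRefreshMs);
    }

    /** Reject a non-positive interval, and a custom interval on a mode that never auto-refreshes. */
    private void check() {
        if (autoRefreshMs <= 0) {
            throw new IllegalArgumentException("autoRefreshMs must be positive");
        }
        if (type != Type.AUTO_REFRESHING && autoRefreshMs != DEFAULT_AUTO_REFRESH_MS) {
            throw new IllegalArgumentException("autoRefreshMs may only be set for AUTO_REFRESHING");
        }
    }
}
```

One limitation of comparing against the default is that explicitly setting the default value on an incompatible mode goes undetected; an optional field would avoid that, at the cost of a slightly larger API.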
        }
    }
}
@Deprecated(forRemoval = true)
I'm OK with deleting these, unless there's a strong consensus otherwise.
 */
public Snapshot currentSnapshot() {
    final List<Snapshot> snapshots = listSnapshots();
    if (snapshots.isEmpty()) {
You pointed out that this should probably not be an error.
 * <td>The snapshot identifier (can be used for updating the table or loading a specific snapshot)</td>
 * </tr>
 * <tr>
 * <td>TimestampMs</td>
This is inaccurate.
// Find the snapshot with the given snapshot id
final Snapshot tableSnapshot =
        snapshot(tableSnapshotId).orElseThrow(() -> new IllegalArgumentException(
                "Snapshot with id " + tableSnapshotId + " not found for table " + tableIdentifier));

return table(tableSnapshot, null);
Please apply this kind of change anywhere we can avoid duplication.
Suggested change:
- // Find the snapshot with the given snapshot id
- final Snapshot tableSnapshot =
-         snapshot(tableSnapshotId).orElseThrow(() -> new IllegalArgumentException(
-                 "Snapshot with id " + tableSnapshotId + " not found for table " + tableIdentifier));
- return table(tableSnapshot, null);
+ return table(tableSnapshotId, null);
/**
 * Read a snapshot of an Iceberg table from the Iceberg catalog.
 *
 * @param tableSnapshot The snapshot id to load
Suggested change:
-  * @param tableSnapshot The snapshot id to load
+  * @param tableSnapshot The snapshot to load
        @Nullable final IcebergInstructions instructions) {

    // Do we want the latest or a specific snapshot?
    final Snapshot snapshot = tableSnapshot != null ? tableSnapshot : table.currentSnapshot();
If we're supposed to get the current snapshot, we should refresh() first.
if (!initialized) {
    refreshSnapshot();
    activationSuccessful(this);
    initialized = true;
}
activationSuccessful(this) needs to be unconditional, and happen at the end of the method.
The remainder looks like a job for an implementation of doInitialization(), and a call to ensureInitialized().
 * Refresh the table location provider with the latest snapshot from the catalog. This method will identify new
 * locations and removed locations.
 */
private void refreshSnapshot() {
I'm not clear what the behavior should be if there's no current snapshot. Empty? Get one? I think requiring the user to call update() or specify an initial snapshot is reasonable, but then refreshSnapshot() with no snapshot should be a no-op.
@Override
protected @NotNull ColumnLocation makeColumnLocation(@NotNull String name) {
    return null;
Should this throw a UOE, or return a dummy?
if (null == ProcessEnvironment.tryGet()) {
    ProcessEnvironment.basicServerInitialization(Configuration.getInstance(),
            "AbstractTableLocationProviderTest", new StreamLoggerImpl());
}
I think this should come out.
@Override
public void tearDown() throws Exception {
    super.tearDown();
}
Suggested change:
- @Override
- public void tearDown() throws Exception {
-     super.tearDown();
- }
Add two methods of refreshing tables:
Example code:
Java automatic and manually refreshing tables
Python automatic and manually refreshing tables