Skip to content

Commit

Permalink
MINOR: remove get prefix for internal state methods (apache#17053)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mjsax authored Aug 31, 2024
1 parent 8f4d856 commit fc720d3
Show file tree
Hide file tree
Showing 33 changed files with 110 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ private void emitNonJoinedOuterRecords(final KeyValueStore<TimestampedKeyAndJoin
while (it.hasNext()) {
final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> nextKeyValue = it.next();
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = nextKeyValue.key;
sharedTimeTracker.minTime = timestampedKeyAndJoinSide.getTimestamp();
sharedTimeTracker.minTime = timestampedKeyAndJoinSide.timestamp();
if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
// if windows are open for both joinSides we can break since there are no more candidates to emit
break;
Expand Down Expand Up @@ -250,8 +250,8 @@ private void emitNonJoinedOuterRecords(final KeyValueStore<TimestampedKeyAndJoin
private void forwardNonJoinedOuterRecords(final Record<K, VThis> record,
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide,
final LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
final K key = timestampedKeyAndJoinSide.getKey();
final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
final K key = timestampedKeyAndJoinSide.key();
final long timestamp = timestampedKeyAndJoinSide.timestamp();
final VThis thisValue = getThisValue(leftOrRightValue);
final VOther otherValue = getOtherValue(leftOrRightValue);
final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ public TimestampedKeyAndJoinSide<K> makeOtherKey(final K key, final long timesta

@Override
public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getLeftValue();
return leftOrRightValue.leftValue();
}

@Override
public VRight getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getRightValue();
return leftOrRightValue.rightValue();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public TimestampedKeyAndJoinSide<K> makeOtherKey(final K key, final long timesta

@Override
public VRight getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getRightValue();
return leftOrRightValue.rightValue();
}

@Override
public VLeft getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getLeftValue();
return leftOrRightValue.leftValue();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void close() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

static StateStore readWriteStore(final StateStore store) {
static StateStore wrapWithReadWriteStore(final StateStore store) {
if (store instanceof TimestampedKeyValueStore) {
return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store);
} else if (store instanceof VersionedKeyValueStore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private void runOnce() {
private void performActionsOnTasks() {
tasksAndActionsLock.lock();
try {
for (final TaskAndAction taskAndAction : getTasksAndActions()) {
for (final TaskAndAction taskAndAction : tasksAndActions()) {
final Action action = taskAndAction.action();
switch (action) {
case ADD:
Expand Down Expand Up @@ -458,7 +458,7 @@ private void clearUpdatingAndPausedTasks() {
changelogReader.clear();
}

private List<TaskAndAction> getTasksAndActions() {
private List<TaskAndAction> tasksAndActions() {
final List<TaskAndAction> tasksAndActionsToProcess = new ArrayList<>(tasksAndActions);
tasksAndActions.clear();
return tasksAndActionsToProcess;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

import java.time.Duration;

import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.readWriteStore;
import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;

public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> {

Expand All @@ -60,7 +60,7 @@ protected StateManager stateManager() {
@Override
public <S extends StateStore> S getStateStore(final String name) {
final StateStore store = stateManager.globalStore(name);
return (S) readWriteStore(store);
return (S) wrapWithReadWriteStore(store);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -156,4 +156,4 @@ public void transitionToStandby(final ThreadCache newCache) {
public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) {
cache.addDirtyEntryFlushListener(namespace, listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.readWriteStore;
import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;

public class ProcessorContextImpl extends AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier {
// the below are null for standby tasks
Expand Down Expand Up @@ -182,7 +182,7 @@ public <S extends StateStore> S getStateStore(final String name) {
}

final StateStore store = stateManager.store(name);
return (S) readWriteStore(store);
return (S) wrapWithReadWriteStore(store);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public KeyValueIterator<Bytes, byte[]> backwardAll() {
public void remove(final Bytes rawBaseKey) {
final long timestamp = baseKeySchema.segmentTimestamp(rawBaseKey);
observedStreamTime = Math.max(observedStreamTime, timestamp);
final S segment = segments.getSegmentForTimestamp(timestamp);
final S segment = segments.segmentForTimestamp(timestamp);
if (segment == null) {
return;
}
Expand Down Expand Up @@ -227,7 +227,7 @@ public byte[] get(final Bytes rawKey) {
}
}

final S segment = segments.getSegmentForTimestamp(timestampFromRawKey);
final S segment = segments.segmentForTimestamp(timestampFromRawKey);
if (segment == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long timeFrom,
public void remove(final Bytes key) {
final long timestamp = keySchema.segmentTimestamp(key);
observedStreamTime = Math.max(observedStreamTime, timestamp);
final S segment = segments.getSegmentForTimestamp(timestamp);
final S segment = segments.segmentForTimestamp(timestamp);
if (segment == null) {
return;
}
Expand All @@ -249,7 +249,7 @@ public void remove(final Bytes key) {
@Override
public void remove(final Bytes key, final long timestamp) {
final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, timestamp);
final S segment = segments.getSegmentForTimestamp(timestamp);
final S segment = segments.segmentForTimestamp(timestamp);
if (segment != null) {
segment.deleteRange(keyBytes, keyBytes);
}
Expand Down Expand Up @@ -281,7 +281,7 @@ public byte[] get(final Bytes key) {
key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1);
return null;
}
final S segment = segments.getSegmentForTimestamp(timestampFromKey);
final S segment = segments.segmentForTimestamp(timestampFromKey);
if (segment == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public String segmentName(final long segmentId) {
}

@Override
public S getSegmentForTimestamp(final long timestamp) {
public S segmentForTimestamp(final long timestamp) {
return segments.get(segmentId(timestamp));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ public static <V1, V2> LeftOrRightValue<V1, V2> makeRightValue(final V2 rightVal
return new LeftOrRightValue<>(null, rightValue);
}

public V1 getLeftValue() {
public V1 leftValue() {
return leftValue;
}

public V2 getRightValue() {
public V2 rightValue() {
return rightValue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,17 @@ public byte[] serialize(final String topic, final LeftOrRightValue<V1, V2> data)
return null;
}

final byte[] rawValue = (data.getLeftValue() != null)
? leftSerializer.serialize(topic, data.getLeftValue())
: rightSerializer.serialize(topic, data.getRightValue());
final byte[] rawValue = (data.leftValue() != null)
? leftSerializer.serialize(topic, data.leftValue())
: rightSerializer.serialize(topic, data.rightValue());

if (rawValue == null) {
return null;
}

return ByteBuffer
.allocate(1 + rawValue.length)
.put((byte) (data.getLeftValue() != null ? 1 : 0))
.put((byte) (data.leftValue() != null ? 1 : 0))
.put(rawValue)
.array();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public synchronized void destroy() {
+ "an entire store is closed, via the close() method rather than destroy().");
}

final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
final Bytes keyPrefix = prefixKeyFormatter.prefix();

// this deleteRange() call deletes all entries with the given prefix, because the
// deleteRange() implementation calls Bytes.increment() in order to make keyTo inclusive
Expand Down Expand Up @@ -192,8 +192,8 @@ private synchronized byte[] get(final Bytes key, final Optional<Snapshot> snapsh
}
}

public Snapshot getSnapshot() {
return physicalStore.getSnapshot();
public Snapshot snapshot() {
return physicalStore.snapshot();
}

public void releaseSnapshot(final Snapshot snapshot) {
Expand All @@ -204,14 +204,14 @@ public void releaseSnapshot(final Snapshot snapshot) {
public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
// from bound is inclusive. if the provided bound is null, replace with prefix
final Bytes fromBound = from == null
? prefixKeyFormatter.getPrefix()
? prefixKeyFormatter.prefix()
: prefixKeyFormatter.addPrefix(from);
// to bound is inclusive. if the provided bound is null, replace with the next prefix.
// this requires potentially filtering out the element corresponding to the next prefix
// with empty bytes from the returned iterator. this filtering is accomplished by
// passing the prefix filter into StrippedPrefixKeyValueIteratorAdapter().
final Bytes toBound = to == null
? incrementWithoutOverflow(prefixKeyFormatter.getPrefix())
? incrementWithoutOverflow(prefixKeyFormatter.prefix())
: prefixKeyFormatter.addPrefix(to);
final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
fromBound,
Expand All @@ -226,7 +226,7 @@ public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, fina
@Override
public synchronized KeyValueIterator<Bytes, byte[]> all() {
final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
prefixKeyFormatter.getPrefix(),
prefixKeyFormatter.prefix(),
new BytesSerializer(),
openIterators);
return new StrippedPrefixKeyValueIteratorAdapter(
Expand Down Expand Up @@ -288,7 +288,7 @@ private byte[] removePrefix(final byte[] keyWithPrefix) {
return rawKey;
}

Bytes getPrefix() {
Bytes prefix() {
return Bytes.wrap(prefix);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Regular segments with {@code segmentId >= 0} expire according to the specified
* retention period. "Reserved" segments with {@code segmentId < 0} do not expire
* and are completely separate from regular segments in that methods such as
* {@link #getSegmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)},
* {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)},
* {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
* {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
* only return regular segments and not reserved segments. The methods {@link #flush()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@ private boolean maybeFillIterator() {
// fact all use the same physical RocksDB under-the-hood.
this.snapshotOwner = segment;
// take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency)
this.snapshot = snapshotOwner.getSnapshot();
this.snapshot = snapshotOwner.snapshot();
}

final byte[] rawSegmentValue = segment.get(key, snapshot);
if (rawSegmentValue != null) { // this segment contains record(s) with the specified key
if (segment.id() == -1) { // this is the latestValueStore
final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.timestamp(rawSegmentValue);
if (recordTimestamp <= toTime) {
// latest value satisfies timestamp bound
queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp));
queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.value(rawSegmentValue), recordTimestamp));
}
} else {
// this segment contains records with the specified key and time range
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;

/**
* A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
Expand Down Expand Up @@ -263,7 +262,7 @@ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
iterator,
getSensor,
getDeserializeValue(serdes, wrapped())
StoreQueryUtils.deserializeValue(serdes, wrapped())
);
final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
Expand All @@ -289,7 +288,7 @@ private <R> QueryResult<R> runKeyQuery(final Query<R> query,
final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
final Function<byte[], V> deserializer = getDeserializeValue(serdes, wrapped());
final Function<byte[], V> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
final V value = deserializer.apply(rawResult.getResult());
final QueryResult<V> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
StoreQueryUtils.deserializeValue(serdes, wrapped()),
time,
numOpenIterators,
openIterators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;

/**
* A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its
Expand Down Expand Up @@ -186,7 +185,7 @@ private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped());
final Function<byte[], ValueAndTimestamp<V>> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult());
final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp);
Expand Down Expand Up @@ -224,7 +223,7 @@ private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreIterator(
iterator,
getSensor,
getDeserializeValue(serdes, wrapped()),
StoreQueryUtils.deserializeValue(serdes, wrapped()),
false
);
final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> typedQueryResult =
Expand All @@ -251,7 +250,7 @@ private <R> QueryResult<R> runKeyQuery(final Query<R> query,
final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped());
final Function<byte[], ValueAndTimestamp<V>> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult());
final V plainValue = valueAndTimestamp == null ? null : valueAndTimestamp.value();
final QueryResult<V> typedQueryResult =
Expand Down Expand Up @@ -290,7 +289,7 @@ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreIterator(
iterator,
getSensor,
getDeserializeValue(serdes, wrapped()),
StoreQueryUtils.deserializeValue(serdes, wrapped()),
true
);
final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final
rawResult.getResult(),
iteratorDurationSensor,
time,
StoreQueryUtils.getDeserializeValue(plainValueSerdes),
StoreQueryUtils.deserializeValue(plainValueSerdes),
numOpenIterators,
openIterators
);
Expand Down
Loading

0 comments on commit fc720d3

Please sign in to comment.