diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index f1f82651..20265273 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -88,12 +88,14 @@ public class KinesisStreamsSource private final Configuration sourceConfig; private final KinesisDeserializationSchema deserializationSchema; private final KinesisShardAssigner kinesisShardAssigner; + private final boolean preserveShardOrder; KinesisStreamsSource( String streamArn, Configuration sourceConfig, KinesisDeserializationSchema deserializationSchema, - KinesisShardAssigner kinesisShardAssigner) { + KinesisShardAssigner kinesisShardAssigner, + boolean preserveShardOrder) { Preconditions.checkNotNull(streamArn); Preconditions.checkArgument(!streamArn.isEmpty(), "stream ARN cannot be empty string"); Preconditions.checkNotNull(sourceConfig); @@ -103,6 +105,7 @@ public class KinesisStreamsSource this.sourceConfig = sourceConfig; this.deserializationSchema = deserializationSchema; this.kinesisShardAssigner = kinesisShardAssigner; + this.preserveShardOrder = preserveShardOrder; } /** @@ -167,7 +170,8 @@ public SplitEnumerator c sourceConfig, createKinesisStreamProxy(sourceConfig), kinesisShardAssigner, - checkpoint); + checkpoint, + preserveShardOrder); } @Override diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java index dabfedac..2e74d052 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java @@ -55,6 +55,7 @@ public class KinesisStreamsSourceBuilder { private Configuration sourceConfig; private KinesisDeserializationSchema deserializationSchema; private KinesisShardAssigner kinesisShardAssigner = ShardAssignerFactory.uniformShardAssigner(); + private boolean preserveShardOrder = true; public KinesisStreamsSourceBuilder setStreamArn(String streamArn) { this.streamArn = streamArn; @@ -84,8 +85,17 @@ public KinesisStreamsSourceBuilder setKinesisShardAssigner( return this; } + public KinesisStreamsSourceBuilder setPreserveShardOrder(boolean preserveShardOrder) { + this.preserveShardOrder = preserveShardOrder; + return this; + } + public KinesisStreamsSource build() { return new KinesisStreamsSource<>( - streamArn, sourceConfig, deserializationSchema, kinesisShardAssigner); + streamArn, + sourceConfig, + deserializationSchema, + kinesisShardAssigner, + preserveShardOrder); } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java index aa8079c6..98000efc 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java +++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java @@ -24,7 +24,8 @@ import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.Date; +import java.time.Instant; +import java.util.Optional; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT; @@ -42,20 +43,25 @@ private KinesisStreamsSourceConfigUtil() { * configuration. * * @param sourceConfig the configuration to parse timestamp from - * @return the timestamp + * @return {@link Optional} containing the initial timestamp if configured, an empty {@link + * Optional} otherwise */ - public static Date parseStreamTimestampStartingPosition(final Configuration sourceConfig) { + public static Optional parseStreamTimestampStartingPosition( + final Configuration sourceConfig) { Preconditions.checkNotNull(sourceConfig); String timestamp = sourceConfig.get(STREAM_INITIAL_TIMESTAMP); + if (timestamp == null || timestamp.isEmpty()) { + return Optional.empty(); + } try { String format = sourceConfig.get(STREAM_TIMESTAMP_DATE_FORMAT); SimpleDateFormat customDateFormat = new SimpleDateFormat(format); - return customDateFormat.parse(timestamp); + return Optional.of(customDateFormat.parse(timestamp).toInstant()); } catch (IllegalArgumentException | NullPointerException exception) { throw new IllegalArgumentException(exception); } catch (ParseException exception) { - return new Date((long) (Double.parseDouble(timestamp) * 1000)); + return Optional.of(Instant.ofEpochMilli((long) (Double.parseDouble(timestamp) * 1000))); } } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisShardSplitWithAssignmentStatus.java 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisShardSplitWithAssignmentStatus.java new file mode 100644 index 00000000..4fb79357 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisShardSplitWithAssignmentStatus.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.enumerator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; + +import java.util.Objects; + +/** Kinesis shard split with assignment status. 
*/ +@Internal +public class KinesisShardSplitWithAssignmentStatus { + private final KinesisShardSplit kinesisShardSplit; + private final SplitAssignmentStatus splitAssignmentStatus; + + public KinesisShardSplitWithAssignmentStatus( + KinesisShardSplit kinesisShardSplit, SplitAssignmentStatus splitAssignmentStatus) { + this.kinesisShardSplit = kinesisShardSplit; + this.splitAssignmentStatus = splitAssignmentStatus; + } + + public KinesisShardSplit split() { + return kinesisShardSplit; + } + + public SplitAssignmentStatus assignmentStatus() { + return splitAssignmentStatus; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KinesisShardSplitWithAssignmentStatus that = (KinesisShardSplitWithAssignmentStatus) o; + return Objects.equals(kinesisShardSplit, that.kinesisShardSplit) + && Objects.equals(splitAssignmentStatus, that.splitAssignmentStatus); + } + + @Override + public int hashCode() { + return Objects.hash(kinesisShardSplit, splitAssignmentStatus); + } + + @Override + public String toString() { + return "KinesisShardSplitWithAssignmentStatus{" + + "kinesisShardSplit=" + + kinesisShardSplit + + ", splitAssignmentStatus=" + + splitAssignmentStatus + + '}'; + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java index 9f8b7f08..741bc5fc 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java @@ -19,25 
+19,33 @@ package org.apache.flink.connector.kinesis.source.enumerator; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition; +import org.apache.flink.connector.kinesis.source.enumerator.tracker.SplitTracker; +import org.apache.flink.connector.kinesis.source.event.SplitsFinishedEvent; import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException; +import org.apache.flink.connector.kinesis.source.proxy.ListShardsStartingPosition; import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.StartingPosition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary; import javax.annotation.Nullable; import java.io.IOException; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -45,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS; @@ -67,10 +76,9 @@ public class 
KinesisStreamsSourceEnumerator private final StreamProxy streamProxy; private final KinesisShardAssigner shardAssigner; private final ShardAssignerContext shardAssignerContext; + private final SplitTracker splitTracker; private final Map> splitAssignment = new HashMap<>(); - private final Set assignedSplitIds = new HashSet<>(); - private final Set unassignedSplits; private String lastSeenShardId; @@ -80,7 +88,8 @@ public KinesisStreamsSourceEnumerator( Configuration sourceConfig, StreamProxy streamProxy, KinesisShardAssigner shardAssigner, - KinesisStreamsSourceEnumeratorState state) { + KinesisStreamsSourceEnumeratorState state, + boolean preserveShardOrder) { this.context = context; this.streamArn = streamArn; this.sourceConfig = sourceConfig; @@ -89,23 +98,23 @@ public KinesisStreamsSourceEnumerator( this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context); if (state == null) { this.lastSeenShardId = null; - this.unassignedSplits = new HashSet<>(); + this.splitTracker = new SplitTracker(preserveShardOrder); } else { this.lastSeenShardId = state.getLastSeenShardId(); - this.unassignedSplits = state.getUnassignedSplits(); + this.splitTracker = new SplitTracker(preserveShardOrder, state.getKnownSplits()); } } @Override public void start() { if (lastSeenShardId == null) { - context.callAsync(this::initialDiscoverSplits, this::assignSplits); + context.callAsync(this::initialDiscoverSplits, this::processDiscoveredSplits); } final long shardDiscoveryInterval = sourceConfig.get(SHARD_DISCOVERY_INTERVAL_MILLIS); context.callAsync( this::periodicallyDiscoverSplits, - this::assignSplits, + this::processDiscoveredSplits, shardDiscoveryInterval, shardDiscoveryInterval); } @@ -116,24 +125,28 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } @Override - public void addSplitsBack(List splits, int subtaskId) { - if (!splitAssignment.containsKey(subtaskId)) { - LOG.warn( - "Unable to add splits back for subtask {} since it 
is not assigned any splits. Splits: {}", - subtaskId, - splits); - return; + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof SplitsFinishedEvent) { + handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent); } + } - for (KinesisShardSplit split : splits) { - splitAssignment.get(subtaskId).remove(split); - assignedSplitIds.remove(split.splitId()); - unassignedSplits.add(split); - } + private void handleFinishedSplits(int subtask, SplitsFinishedEvent splitsFinishedEvent) { + splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds()); + splitAssignment + .get(subtask) + .removeIf( + split -> + splitsFinishedEvent + .getFinishedSplitIds() + .contains(split.splitId())); + + assignSplits(); + } - // Assign the unassignedSplits - // We did not discover any new splits, so we put in an empty list - assignSplits(Collections.emptyList(), null); + @Override + public void addSplitsBack(List splits, int subtaskId) { + throw new UnsupportedOperationException("Partial recovery is not supported"); } @Override @@ -143,7 +156,9 @@ public void addReader(int subtaskId) { @Override public KinesisStreamsSourceEnumeratorState snapshotState(long checkpointId) throws Exception { - return new KinesisStreamsSourceEnumeratorState(unassignedSplits, lastSeenShardId); + List splitStates = + splitTracker.snapshotState(checkpointId); + return new KinesisStreamsSourceEnumeratorState(splitStates, lastSeenShardId); } @Override @@ -151,9 +166,91 @@ public void close() throws IOException { streamProxy.close(); } - private List initialDiscoverSplits() { - List shards = streamProxy.listShards(streamArn, lastSeenShardId); - return mapToSplits(shards, sourceConfig.get(STREAM_INITIAL_POSITION)); + @VisibleForTesting + List initialDiscoverSplits() { + StreamDescriptionSummary streamDescriptionSummary = + streamProxy.getStreamDescriptionSummary(streamArn); + InitialPosition initialPosition = sourceConfig.get(STREAM_INITIAL_POSITION); + 
Instant currentTimestamp = Instant.now(); + Optional initialTimestamp = parseStreamTimestampStartingPosition(sourceConfig); + + ListShardsStartingPosition shardDiscoveryStartingPosition = + getInitialPositionForShardDiscovery(initialPosition, currentTimestamp); + StartingPosition initialStartingPosition = + getInitialStartingPosition(initialPosition, currentTimestamp); + + List shards; + try { + shards = streamProxy.listShards(streamArn, shardDiscoveryStartingPosition); + } catch (InvalidArgumentException e) { + // If initial timestamp is older than retention period of the stream, + // fall back to TRIM_HORIZON for shard discovery + Instant trimHorizonTimestamp = + Instant.now() + .minus( + streamDescriptionSummary.retentionPeriodHours(), + ChronoUnit.HOURS); + boolean isInitialTimestampBeforeTrimHorizon = + initialTimestamp.map(trimHorizonTimestamp::isAfter).orElse(false); + if (initialPosition.equals(InitialPosition.AT_TIMESTAMP) + && isInitialTimestampBeforeTrimHorizon) { + LOG.warn( + "Configured initial position timestamp is before stream TRIM_HORIZON. 
Falling back to TRIM_HORIZON"); + shards = streamProxy.listShards(streamArn, ListShardsStartingPosition.fromStart()); + } else { + throw new KinesisStreamsSourceException("Unable to list shards", e); + } + } + + return mapToSplits(shards, initialStartingPosition); + } + + @VisibleForTesting + ListShardsStartingPosition getInitialPositionForShardDiscovery( + InitialPosition initialPosition, Instant currentTime) { + switch (initialPosition) { + case LATEST: + return ListShardsStartingPosition.fromTimestamp(currentTime); + case AT_TIMESTAMP: + Instant timestamp = + parseStreamTimestampStartingPosition(sourceConfig) + .orElseThrow( + () -> + new IllegalArgumentException( + "Stream initial timestamp must be specified when initial position set to AT_TIMESTAMP")); + return ListShardsStartingPosition.fromTimestamp(timestamp); + case TRIM_HORIZON: + return ListShardsStartingPosition.fromStart(); + } + + throw new IllegalArgumentException( + "Unsupported initial position configuration " + initialPosition); + } + + @VisibleForTesting + StartingPosition getInitialStartingPosition( + InitialPosition initialPosition, Instant currentTime) { + switch (initialPosition) { + case LATEST: + // If LATEST is requested, we still set the starting position to the time of + // startup. This way, the job starts reading from a deterministic timestamp + // (i.e. time of job submission), even if it enters a restart loop immediately + // after submission. 
+ return StartingPosition.fromTimestamp(currentTime); + case AT_TIMESTAMP: + Instant timestamp = + parseStreamTimestampStartingPosition(sourceConfig) + .orElseThrow( + () -> + new IllegalArgumentException( + "Stream initial timestamp must be specified when initial position set to AT_TIMESTAMP")); + return StartingPosition.fromTimestamp(timestamp); + case TRIM_HORIZON: + return StartingPosition.fromStart(); + } + + throw new IllegalArgumentException( + "Unsupported initial position configuration " + initialPosition); } /** @@ -163,36 +260,29 @@ private List initialDiscoverSplits() { * @return list of discovered splits */ private List periodicallyDiscoverSplits() { - List shards = streamProxy.listShards(streamArn, lastSeenShardId); + List shards = + streamProxy.listShards( + streamArn, ListShardsStartingPosition.fromShardId(lastSeenShardId)); // Any shard discovered after the initial startup should be read from the start, since they // come from resharding - return mapToSplits(shards, InitialPosition.TRIM_HORIZON); + return mapToSplits(shards, StartingPosition.fromStart()); } private List mapToSplits( - List shards, InitialPosition initialPosition) { - StartingPosition startingPosition; - switch (initialPosition) { - case LATEST: - // If LATEST is requested, we still set the starting position to the time of - // startup. This way, the job starts reading from a deterministic timestamp - // (i.e. time of job submission), even if it enters a restart loop immediately - // after submission. 
- startingPosition = StartingPosition.fromTimestamp(Instant.now()); - break; - case AT_TIMESTAMP: - startingPosition = - StartingPosition.fromTimestamp( - parseStreamTimestampStartingPosition(sourceConfig).toInstant()); - break; - case TRIM_HORIZON: - default: - startingPosition = StartingPosition.fromStart(); - } + List shards, StartingPosition startingPosition) { List splits = new ArrayList<>(); for (Shard shard : shards) { - splits.add(new KinesisShardSplit(streamArn, shard.shardId(), startingPosition)); + Set parentShardIds = new HashSet<>(); + if (shard.parentShardId() != null) { + parentShardIds.add(shard.parentShardId()); + } + if (shard.adjacentParentShardId() != null) { + parentShardIds.add(shard.adjacentParentShardId()); + } + splits.add( + new KinesisShardSplit( + streamArn, shard.shardId(), startingPosition, parentShardIds)); } return splits; @@ -206,37 +296,39 @@ private List mapToSplits( * @param discoveredSplits list of discovered splits * @param throwable thrown when discovering splits. Will be null if no throwable thrown. */ - private void assignSplits(List discoveredSplits, Throwable throwable) { + private void processDiscoveredSplits( + List discoveredSplits, Throwable throwable) { if (throwable != null) { throw new KinesisStreamsSourceException("Failed to list shards.", throwable); } + splitTracker.addSplits(discoveredSplits); + updateLastSeenShardId(discoveredSplits); + if (context.registeredReaders().size() < context.currentParallelism()) { LOG.info( "Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. 
Required number of readers: {}, Registered readers: {}", context.currentParallelism(), context.registeredReaders().size()); - unassignedSplits.addAll(discoveredSplits); return; } + assignSplits(); + } + + private void assignSplits() { Map> newSplitAssignments = new HashMap<>(); - for (KinesisShardSplit split : unassignedSplits) { - assignSplitToSubtask(split, newSplitAssignments); - } - unassignedSplits.clear(); - for (KinesisShardSplit split : discoveredSplits) { + for (KinesisShardSplit split : splitTracker.splitsAvailableForAssignment()) { assignSplitToSubtask(split, newSplitAssignments); } - updateLastSeenShardId(discoveredSplits); updateSplitAssignment(newSplitAssignments); context.assignSplits(new SplitsAssignment<>(newSplitAssignments)); } private void assignSplitToSubtask( KinesisShardSplit split, Map> newSplitAssignments) { - if (assignedSplitIds.contains(split.splitId())) { + if (splitTracker.isAssigned(split.splitId())) { LOG.info( "Skipping assignment of shard {} from stream {} because it is already assigned.", split.getShardId(), @@ -261,7 +353,7 @@ private void assignSplitToSubtask( subtaskList.add(split); newSplitAssignments.put(selectedSubtask, subtaskList); } - assignedSplitIds.add(split.splitId()); + splitTracker.markAsAssigned(Collections.singletonList(split)); } private void updateLastSeenShardId(List discoveredSplits) { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorState.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorState.java index 4ebf6676..0ed1ef5c 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorState.java +++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorState.java @@ -19,11 +19,10 @@ package org.apache.flink.connector.kinesis.source.enumerator; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import javax.annotation.Nullable; -import java.util.Set; +import java.util.List; /** * State for the {@link KinesisStreamsSourceEnumerator}. This class is stored in state, so any @@ -31,12 +30,12 @@ */ @Internal public class KinesisStreamsSourceEnumeratorState { - private final Set unassignedSplits; + private final List knownSplits; @Nullable private final String lastSeenShardId; public KinesisStreamsSourceEnumeratorState( - Set unassignedSplits, String lastSeenShardId) { - this.unassignedSplits = unassignedSplits; + List knownSplits, String lastSeenShardId) { + this.knownSplits = knownSplits; this.lastSeenShardId = lastSeenShardId; } @@ -44,7 +43,7 @@ public String getLastSeenShardId() { return lastSeenShardId; } - public Set getUnassignedSplits() { - return unassignedSplits; + public List getKnownSplits() { + return knownSplits; } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializer.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializer.java index 886728d3..4dadfcf0 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializer.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializer.java @@ -19,6 +19,7 @@ package 
org.apache.flink.connector.kinesis.source.enumerator; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer; @@ -29,7 +30,10 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; /** Used to serialize and deserialize the {@link KinesisStreamsSourceEnumeratorState}. */ @@ -37,7 +41,8 @@ public class KinesisStreamsSourceEnumeratorStateSerializer implements SimpleVersionedSerializer { - private static final int CURRENT_VERSION = 0; + private static final Set COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1)); + private static final int CURRENT_VERSION = 1; private final KinesisShardSplitSerializer splitSerializer; @@ -64,10 +69,39 @@ public byte[] serialize(KinesisStreamsSourceEnumeratorState kinesisStreamsSource out.writeUTF(kinesisStreamsSourceEnumeratorState.getLastSeenShardId()); } - out.writeInt(kinesisStreamsSourceEnumeratorState.getUnassignedSplits().size()); + out.writeInt(kinesisStreamsSourceEnumeratorState.getKnownSplits().size()); out.writeInt(splitSerializer.getVersion()); - for (KinesisShardSplit split : - kinesisStreamsSourceEnumeratorState.getUnassignedSplits()) { + for (KinesisShardSplitWithAssignmentStatus split : + kinesisStreamsSourceEnumeratorState.getKnownSplits()) { + byte[] serializedSplit = splitSerializer.serialize(split.split()); + out.writeInt(serializedSplit.length); + out.write(serializedSplit); + out.writeInt(split.assignmentStatus().getStatusCode()); + } + + out.flush(); + + return baos.toByteArray(); + } + } + + /** Used to test backwards compatibility of state. 
*/ + @VisibleForTesting + byte[] serializeV0(KinesisStreamsSourceEnumeratorStateV0 kinesisStreamsSourceEnumeratorState) + throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + + boolean hasLastSeenShardId = + kinesisStreamsSourceEnumeratorState.getLastSeenShardId() != null; + out.writeBoolean(hasLastSeenShardId); + if (hasLastSeenShardId) { + out.writeUTF(kinesisStreamsSourceEnumeratorState.getLastSeenShardId()); + } + + out.writeInt(kinesisStreamsSourceEnumeratorState.getKnownSplits().size()); + out.writeInt(splitSerializer.getVersion()); + for (KinesisShardSplit split : kinesisStreamsSourceEnumeratorState.getKnownSplits()) { byte[] serializedSplit = splitSerializer.serialize(split); out.writeInt(serializedSplit.length); out.write(serializedSplit); @@ -85,7 +119,7 @@ public KinesisStreamsSourceEnumeratorState deserialize( try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedEnumeratorState); DataInputStream in = new DataInputStream(bais)) { - if (version != getVersion()) { + if (!COMPATIBLE_VERSIONS.contains(version)) { throw new VersionMismatchException( "Trying to deserialize KinesisStreamsSourceEnumeratorState serialized with unsupported version " + version @@ -102,20 +136,23 @@ public KinesisStreamsSourceEnumeratorState deserialize( final int numUnassignedSplits = in.readInt(); final int splitSerializerVersion = in.readInt(); - if (splitSerializerVersion != splitSerializer.getVersion()) { - throw new VersionMismatchException( - "Trying to deserialize KinesisShardSplit serialized with unsupported version " - + splitSerializerVersion - + ". 
Serializer version is " - + splitSerializer.getVersion()); - } - Set unassignedSplits = new HashSet<>(numUnassignedSplits); + + List knownSplits = + new ArrayList<>(numUnassignedSplits); for (int i = 0; i < numUnassignedSplits; i++) { int serializedLength = in.readInt(); byte[] serializedSplit = new byte[serializedLength]; if (in.read(serializedSplit) != -1) { - unassignedSplits.add( - splitSerializer.deserialize(splitSerializerVersion, serializedSplit)); + KinesisShardSplit deserializedSplit = + splitSerializer.deserialize(splitSerializerVersion, serializedSplit); + SplitAssignmentStatus assignmentStatus = SplitAssignmentStatus.UNASSIGNED; + if (version == CURRENT_VERSION) { + assignmentStatus = SplitAssignmentStatus.fromStatusCode(in.readInt()); + } + knownSplits.add( + new KinesisShardSplitWithAssignmentStatus( + deserializedSplit, assignmentStatus)); + } else { throw new IOException( "Unexpectedly reading more bytes than is present in stream."); @@ -126,7 +163,7 @@ public KinesisStreamsSourceEnumeratorState deserialize( throw new IOException("Unexpected trailing bytes when deserializing."); } - return new KinesisStreamsSourceEnumeratorState(unassignedSplits, lastSeenShardId); + return new KinesisStreamsSourceEnumeratorState(knownSplits, lastSeenShardId); } } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateV0.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateV0.java new file mode 100644 index 00000000..8ea25092 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateV0.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.enumerator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * State for the {@link KinesisStreamsSourceEnumerator}. This class is stored in state, so any + * changes need to be backwards compatible + * + * @deprecated This class is preserved to test state compatibility only. 
+ */ +@Internal +@Deprecated +public class KinesisStreamsSourceEnumeratorStateV0 { + private final List unassignedSplits; + @Nullable private final String lastSeenShardId; + + public KinesisStreamsSourceEnumeratorStateV0( + List unassignedSplits, String lastSeenShardId) { + this.unassignedSplits = unassignedSplits; + this.lastSeenShardId = lastSeenShardId; + } + + public String getLastSeenShardId() { + return lastSeenShardId; + } + + public List getKnownSplits() { + return unassignedSplits; + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/SplitAssignmentStatus.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/SplitAssignmentStatus.java new file mode 100644 index 00000000..fab9858f --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/SplitAssignmentStatus.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source.enumerator; + +import org.apache.flink.annotation.Internal; + +/** + * Assignment status of {@link org.apache.flink.connector.kinesis.source.split.KinesisShardSplit}. + */ +@Internal +public enum SplitAssignmentStatus { + ASSIGNED(0), + UNASSIGNED(1); + + private final int statusCode; + + SplitAssignmentStatus(int statusCode) { + this.statusCode = statusCode; + } + + public int getStatusCode() { + return statusCode; + } + + public static SplitAssignmentStatus fromStatusCode(int statusCode) { + for (SplitAssignmentStatus status : SplitAssignmentStatus.values()) { + if (status.getStatusCode() == statusCode) { + return status; + } + } + + throw new IllegalArgumentException("Unknown status code: " + statusCode); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java new file mode 100644 index 00000000..dcd60fbd --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.enumerator.tracker; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardSplitWithAssignmentStatus; +import org.apache.flink.connector.kinesis.source.enumerator.SplitAssignmentStatus; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** This class is used to track shard hierarchy. */ +@Internal +public class SplitTracker { + /** + * Flag controlling if tracker should wait before all parent splits will be completed before + * assigning split to readers. + */ + private final boolean preserveShardOrdering; + + /** Map of all discovered splits that have not been completed. Split id is used as a key. */ + private final Map knownSplits = new ConcurrentHashMap<>(); + + /** Set of ids for currently assigned splits. 
*/ + private final Set assignedSplits = new HashSet<>(); + + public SplitTracker(boolean preserveShardOrdering) { + this(preserveShardOrdering, Collections.emptyList()); + } + + public SplitTracker( + boolean preserveShardOrdering, + List initialState) { + this.preserveShardOrdering = preserveShardOrdering; + + initialState.forEach( + splitWithStatus -> { + knownSplits.put(splitWithStatus.split().splitId(), splitWithStatus.split()); + if (SplitAssignmentStatus.ASSIGNED.equals(splitWithStatus.assignmentStatus())) { + assignedSplits.add(splitWithStatus.split().splitId()); + } + }); + } + + /** + * Add newly discovered splits to tracker. + * + * @param splitsToAdd collection of splits to add to tracking + */ + public void addSplits(Collection splitsToAdd) { + splitsToAdd.forEach(split -> knownSplits.put(split.splitId(), split)); + } + + /** + * Mark splits as assigned. Assigned splits will no longer be returned as pending splits. + * + * @param splitsToAssign collection of splits to mark as assigned + */ + public void markAsAssigned(Collection splitsToAssign) { + splitsToAssign.forEach(split -> assignedSplits.add(split.splitId())); + } + + /** + * Mark splits with specified ids as finished. + * + * @param finishedSplitIds collection of split ids to mark as finished + */ + public void markAsFinished(Collection finishedSplitIds) { + finishedSplitIds.forEach( + splitId -> { + assignedSplits.remove(splitId); + knownSplits.remove(splitId); + }); + } + + /** + * Checks if split with specified id had been assigned to the reader. + * + * @param splitId split id + * @return {@code true} if split had been assigned, otherwise {@code false} + */ + public boolean isAssigned(String splitId) { + return assignedSplits.contains(splitId); + } + + /** + * Returns list of splits that can be assigned to readers. Does not include splits that are + * already assigned or finished. If shard ordering is enabled, only splits with finished parents + * will be returned. 
+ * + * @return list of splits that can be assigned to readers. + */ + public List splitsAvailableForAssignment() { + return knownSplits.values().stream() + .filter( + split -> { + boolean splitIsNotAssigned = !isAssigned(split.splitId()); + if (preserveShardOrdering) { + // Check if all parent splits were finished + return splitIsNotAssigned + && verifyAllParentSplitsAreFinished(split); + } else { + return splitIsNotAssigned; + } + }) + .collect(Collectors.toList()); + } + + /** + * Prepare split graph representation to store in state. Method returns only splits that are + * currently assigned to readers or unassigned. Finished splits are not included in the result. + * + * @param checkpointId id of the checkpoint + * @return list of splits with current assignment status + */ + public List snapshotState(long checkpointId) { + return knownSplits.values().stream() + .map( + split -> { + SplitAssignmentStatus assignmentStatus = + isAssigned(split.splitId()) + ? SplitAssignmentStatus.ASSIGNED + : SplitAssignmentStatus.UNASSIGNED; + return new KinesisShardSplitWithAssignmentStatus( + split, assignmentStatus); + }) + .collect(Collectors.toList()); + } + + private boolean verifyAllParentSplitsAreFinished(KinesisShardSplit split) { + boolean allParentsFinished = true; + for (String parentSplitId : split.getParentShardIds()) { + allParentsFinished = allParentsFinished && isFinished(parentSplitId); + } + + return allParentsFinished; + } + + private boolean isFinished(String splitId) { + return !knownSplits.containsKey(splitId); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/event/SplitsFinishedEvent.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/event/SplitsFinishedEvent.java new file mode 100644 index 00000000..6eeb0811 --- /dev/null +++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/event/SplitsFinishedEvent.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.event; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceEvent; + +import java.util.Set; + +/** Source event used by source reader to communicate that splits are finished to enumerator. 
*/ +@Internal +public class SplitsFinishedEvent implements SourceEvent { + private static final long serialVersionUID = 1L; + + private final Set finishedSplitIds; + + public SplitsFinishedEvent(Set finishedSplitIds) { + this.finishedSplitIds = finishedSplitIds; + } + + public Set getFinishedSplitIds() { + return finishedSplitIds; + } + + @Override + public String toString() { + return "SplitsFinishedEvent{" + + "finishedSplitIds=[" + + String.join(",", finishedSplitIds) + + "]}"; + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java index 40b0ec2f..9e91df5c 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java @@ -21,8 +21,12 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; @@ -30,8 +34,7 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import 
software.amazon.awssdk.services.kinesis.model.Shard; - -import javax.annotation.Nullable; +import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary; import java.io.IOException; import java.time.Instant; @@ -43,6 +46,7 @@ /** Implementation of the {@link StreamProxy} for Kinesis data streams. */ @Internal public class KinesisStreamProxy implements StreamProxy { + private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamProxy.class); private final KinesisClient kinesisClient; private final SdkHttpClient httpClient; @@ -55,7 +59,15 @@ public KinesisStreamProxy(KinesisClient kinesisClient, SdkHttpClient httpClient) } @Override - public List listShards(String streamArn, @Nullable String lastSeenShardId) { + public StreamDescriptionSummary getStreamDescriptionSummary(String streamArn) { + DescribeStreamSummaryResponse response = + kinesisClient.describeStreamSummary( + DescribeStreamSummaryRequest.builder().streamARN(streamArn).build()); + return response.streamDescriptionSummary(); + } + + @Override + public List listShards(String streamArn, ListShardsStartingPosition startingPosition) { List shards = new ArrayList<>(); ListShardsResponse listShardsResponse; @@ -65,8 +77,10 @@ public List listShards(String streamArn, @Nullable String lastSeenShardId kinesisClient.listShards( ListShardsRequest.builder() .streamARN(streamArn) - .exclusiveStartShardId( - nextToken == null ? lastSeenShardId : null) + .shardFilter( + nextToken == null + ? startingPosition.getShardFilter() + : null) .nextToken(nextToken) .build()); @@ -120,7 +134,7 @@ private String getShardIterator( (Instant) startingPosition.getStartingMarker()); } else { throw new IllegalArgumentException( - "Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object."); + "Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. 
Must be a Instant object."); } break; case AT_SEQUENCE_NUMBER: diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/ListShardsStartingPosition.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/ListShardsStartingPosition.java new file mode 100644 index 00000000..331ea67e --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/ListShardsStartingPosition.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.proxy; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; + +import java.time.Instant; + +/** Starting position to perform list shard request. 
*/ +@Internal +public class ListShardsStartingPosition { + private final ShardFilter shardFilter; + + private ListShardsStartingPosition(ShardFilter shardFilter) { + this.shardFilter = shardFilter; + } + + /** Returns shard filter used to perform listShard request. */ + public ShardFilter getShardFilter() { + return shardFilter; + } + + /** Used to get shards that were active at or after specified timestamp. */ + public static ListShardsStartingPosition fromTimestamp(Instant timestamp) { + Preconditions.checkNotNull(timestamp, "timestamp cannot be null"); + + return new ListShardsStartingPosition( + ShardFilter.builder() + .type(ShardFilterType.FROM_TIMESTAMP) + .timestamp(timestamp) + .build()); + } + + /** Used to get shards after specified shard id. */ + public static ListShardsStartingPosition fromShardId(String exclusiveStartShardId) { + Preconditions.checkNotNull(exclusiveStartShardId, "exclusiveStartShardId cannot be null"); + + return new ListShardsStartingPosition( + ShardFilter.builder() + .type(ShardFilterType.AFTER_SHARD_ID) + .shardId(exclusiveStartShardId) + .build()); + } + + /** Used to get all shards starting from TRIM_HORIZON. 
*/ + public static ListShardsStartingPosition fromStart() { + return new ListShardsStartingPosition( + ShardFilter.builder().type(ShardFilterType.FROM_TRIM_HORIZON).build()); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/StreamProxy.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/StreamProxy.java index 6f79d4ef..86678d87 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/StreamProxy.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/StreamProxy.java @@ -23,8 +23,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.Shard; - -import javax.annotation.Nullable; +import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary; import java.io.Closeable; import java.util.List; @@ -32,16 +31,23 @@ /** Interface for a StreamProxy to interact with Streams service in a given region. */ @Internal public interface StreamProxy extends Closeable { + /** + * Obtains stream metadata. + * + * @param streamArn the ARN of the stream + * @return stream information. + */ + StreamDescriptionSummary getStreamDescriptionSummary(String streamArn); /** * Obtains the shards associated with a given stream. * * @param streamArn the ARN of the stream - * @param lastSeenShardId the last seen shard Id. Used for reducing the number of results - * returned. + * @param startingPosition starting position for shard discovery request. 
Used to skip already + * discovered/not used shards * @return shard list */ - List listShards(String streamArn, @Nullable String lastSeenShardId); + List listShards(String streamArn, ListShardsStartingPosition startingPosition); /** * Retrieves records from the stream. diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java index 0eb7c1a2..0fa5c0b4 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java @@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kinesis.source.event.SplitsFinishedEvent; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; @@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.model.Record; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -47,7 +49,7 @@ public class KinesisStreamsSourceReader extends SingleThreadMultiplexSourceReaderBase< Record, T, KinesisShardSplit, KinesisShardSplitState> { - private static final Logger log = LoggerFactory.getLogger(KinesisStreamsSourceReader.class); + private static 
final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceReader.class); private final Map shardMetricGroupMap; public KinesisStreamsSourceReader( @@ -63,6 +65,8 @@ public KinesisStreamsSourceReader( @Override protected void onSplitFinished(Map finishedSplitIds) { + context.sendSourceEventToCoordinator( + new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet()))); finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup); } @@ -93,7 +97,7 @@ private void registerShardMetricGroup(KinesisShardSplit split) { this.shardMetricGroupMap.put( split.getShardId(), new KinesisShardMetrics(split, context.metricGroup())); } else { - log.warn( + LOG.warn( "Metric group for shard with id {} has already been registered.", split.getShardId()); } @@ -104,7 +108,7 @@ private void unregisterShardMetricGroup(String shardId) { if (removed != null) { removed.unregister(); } else { - log.warn( + LOG.warn( "Shard metric group unregister failed. Metric group for {} does not exist.", shardId); } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java index cdf22538..b7646f99 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java @@ -28,8 +28,11 @@ import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import 
software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import javax.annotation.Nullable; @@ -49,6 +52,7 @@ */ @Internal public class PollingKinesisShardSplitReader implements SplitReader { + private static final Logger LOG = LoggerFactory.getLogger(PollingKinesisShardSplitReader.class); private static final RecordsWithSplitIds INCOMPLETE_SHARD_EMPTY_RECORDS = new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, false); @@ -70,11 +74,20 @@ public RecordsWithSplitIds fetch() throws IOException { return INCOMPLETE_SHARD_EMPTY_RECORDS; } - GetRecordsResponse getRecordsResponse = - kinesis.getRecords( - splitState.getStreamArn(), - splitState.getShardId(), - splitState.getNextStartingPosition()); + GetRecordsResponse getRecordsResponse; + try { + getRecordsResponse = + kinesis.getRecords( + splitState.getStreamArn(), + splitState.getShardId(), + splitState.getNextStartingPosition()); + } catch (ResourceNotFoundException e) { + LOG.warn( + "Failed to fetch records from shard {}: shard no longer exists. 
Marking split as complete", + splitState.getSplitId()); + return new KinesisRecordsWithSplitIds( + Collections.emptyIterator(), splitState.getSplitId(), true); + } boolean isComplete = getRecordsResponse.nextShardIterator() == null; shardMetricGroupMap diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java index 2b75d7cd..309701de 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java @@ -23,7 +23,9 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -39,15 +41,22 @@ public final class KinesisShardSplit implements SourceSplit { private final String streamArn; private final String shardId; private final StartingPosition startingPosition; + private final Set parentShardIds; - public KinesisShardSplit(String streamArn, String shardId, StartingPosition startingPosition) { + public KinesisShardSplit( + String streamArn, + String shardId, + StartingPosition startingPosition, + Set parentShardIds) { checkNotNull(streamArn, "streamArn cannot be null"); checkNotNull(shardId, "shardId cannot be null"); checkNotNull(startingPosition, "startingPosition cannot be null"); + checkNotNull(parentShardIds, "parentShardIds cannot be null"); this.streamArn = streamArn; this.shardId = shardId; this.startingPosition = startingPosition; + 
this.parentShardIds = new HashSet<>(parentShardIds); } @Override @@ -67,6 +76,10 @@ public StartingPosition getStartingPosition() { return startingPosition; } + public Set getParentShardIds() { + return parentShardIds; + } + @Override public String toString() { return "KinesisShardSplit{" @@ -78,6 +91,9 @@ public String toString() { + '\'' + ", startingPosition=" + startingPosition + + ", parentShardIds=[" + + String.join(",", parentShardIds) + + ']' + '}'; } @@ -92,11 +108,12 @@ public boolean equals(Object o) { KinesisShardSplit that = (KinesisShardSplit) o; return Objects.equals(streamArn, that.streamArn) && Objects.equals(shardId, that.shardId) - && Objects.equals(startingPosition, that.startingPosition); + && Objects.equals(startingPosition, that.startingPosition) + && Objects.equals(parentShardIds, that.parentShardIds); } @Override public int hashCode() { - return Objects.hash(streamArn, shardId, startingPosition); + return Objects.hash(streamArn, shardId, startingPosition, parentShardIds); } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java index fca87341..b5b17119 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.kinesis.source.split; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.io.VersionMismatchException; @@ -30,6 +31,9 @@ import 
java.io.DataOutputStream; import java.io.IOException; import java.time.Instant; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; /** * Serializes and deserializes the {@link KinesisShardSplit}. This class needs to handle @@ -38,7 +42,8 @@ @Internal public class KinesisShardSplitSerializer implements SimpleVersionedSerializer { - private static final int CURRENT_VERSION = 0; + private static final int CURRENT_VERSION = 1; + private static final Set COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1)); @Override public int getVersion() { @@ -47,6 +52,39 @@ public int getVersion() { @Override public byte[] serialize(KinesisShardSplit split) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + + out.writeUTF(split.getStreamArn()); + out.writeUTF(split.getShardId()); + out.writeUTF(split.getStartingPosition().getShardIteratorType().toString()); + if (split.getStartingPosition().getStartingMarker() == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + Object startingMarker = split.getStartingPosition().getStartingMarker(); + out.writeBoolean(startingMarker instanceof Instant); + if (startingMarker instanceof Instant) { + out.writeLong(((Instant) startingMarker).toEpochMilli()); + } + out.writeBoolean(startingMarker instanceof String); + if (startingMarker instanceof String) { + out.writeUTF((String) startingMarker); + } + } + out.writeInt(split.getParentShardIds().size()); + for (String parentShardId : split.getParentShardIds()) { + out.writeUTF(parentShardId); + } + + out.flush(); + return baos.toByteArray(); + } + } + + /** This method used only to test backwards compatibility of deserialization logic. 
*/ + @VisibleForTesting + byte[] serializeV0(KinesisShardSplit split) throws IOException { try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(baos)) { @@ -76,7 +114,7 @@ public byte[] serialize(KinesisShardSplit split) throws IOException { public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { - if (version != getVersion()) { + if (!COMPATIBLE_VERSIONS.contains(version)) { throw new VersionMismatchException( "Trying to deserialize KinesisShardSplit serialized with unsupported version " + version @@ -99,8 +137,19 @@ public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOEx } } + Set parentShardIds = new HashSet<>(); + if (version == CURRENT_VERSION) { + int parentShardCount = in.readInt(); + for (int i = 0; i < parentShardCount; i++) { + parentShardIds.add(in.readUTF()); + } + } + return new KinesisShardSplit( - streamArn, shardId, new StartingPosition(shardIteratorType, startingMarker)); + streamArn, + shardId, + new StartingPosition(shardIteratorType, startingMarker), + parentShardIds); } } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java index 64c0480b..54e3a9b0 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java @@ -39,7 +39,8 @@ public KinesisShardSplit getKinesisShardSplit() { return new KinesisShardSplit( 
kinesisShardSplit.getStreamArn(), kinesisShardSplit.getShardId(), - nextStartingPosition); + nextStartingPosition, + kinesisShardSplit.getParentShardIds()); } public String getSplitId() { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtilTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtilTest.java index 89a4a2ff..766da116 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtilTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtilTest.java @@ -23,7 +23,7 @@ import org.junit.jupiter.api.Test; import java.text.SimpleDateFormat; -import java.util.Date; +import java.time.Instant; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT; @@ -35,8 +35,10 @@ class KinesisStreamsSourceConfigUtilTest { @Test void testParseStreamTimestampUsingDefaultFormat() throws Exception { String timestamp = "2023-04-13T09:18:00.0+01:00"; - Date expectedTimestamp = - new SimpleDateFormat(STREAM_TIMESTAMP_DATE_FORMAT.defaultValue()).parse(timestamp); + Instant expectedTimestamp = + new SimpleDateFormat(STREAM_TIMESTAMP_DATE_FORMAT.defaultValue()) + .parse(timestamp) + .toInstant(); Configuration sourceConfig = new Configuration(); sourceConfig.set(STREAM_INITIAL_TIMESTAMP, timestamp); @@ -44,14 +46,14 @@ void testParseStreamTimestampUsingDefaultFormat() throws Exception { assertThat( KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( sourceConfig)) 
- .isEqualTo(expectedTimestamp); + .contains(expectedTimestamp); } @Test void testParseStreamTimestampUsingCustomFormat() throws Exception { String format = "yyyy-MM-dd'T'HH:mm"; String timestamp = "2023-04-13T09:23"; - Date expectedTimestamp = new SimpleDateFormat(format).parse(timestamp); + Instant expectedTimestamp = new SimpleDateFormat(format).parse(timestamp).toInstant(); Configuration sourceConfig = new Configuration(); sourceConfig.set(STREAM_INITIAL_TIMESTAMP, timestamp); @@ -60,42 +62,41 @@ void testParseStreamTimestampUsingCustomFormat() throws Exception { assertThat( KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( sourceConfig)) - .isEqualTo(expectedTimestamp); + .contains(expectedTimestamp); } @Test void testParseStreamTimestampEpoch() { - long epoch = 1681910583L; - Date expectedTimestamp = new Date(epoch * 1000); + long epochMillis = 1681910583745L; + Instant expectedTimestamp = Instant.ofEpochMilli(epochMillis); Configuration sourceConfig = new Configuration(); - sourceConfig.set(STREAM_INITIAL_TIMESTAMP, String.valueOf(epoch)); + sourceConfig.set(STREAM_INITIAL_TIMESTAMP, String.valueOf(epochMillis / 1000.0)); assertThat( KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( sourceConfig)) - .isEqualTo(expectedTimestamp); + .contains(expectedTimestamp); } @Test - void testParseStreamTimestampParseError() { - String badTimestamp = "badTimestamp"; - + void testParseStreamTimestampTimestampNotSpecified() { Configuration sourceConfig = new Configuration(); - sourceConfig.set(STREAM_INITIAL_TIMESTAMP, badTimestamp); - assertThatExceptionOfType(NumberFormatException.class) - .isThrownBy( - () -> - KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( - sourceConfig)); + assertThat( + KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( + sourceConfig)) + .isEmpty(); } @Test - void testParseStreamTimestampTimestampNotSpecified() { + void testParseStreamTimestampParseError() { + String 
badTimestamp = "badTimestamp"; + Configuration sourceConfig = new Configuration(); + sourceConfig.set(STREAM_INITIAL_TIMESTAMP, badTimestamp); - assertThatExceptionOfType(IllegalArgumentException.class) + assertThatExceptionOfType(NumberFormatException.class) .isThrownBy( () -> KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializerTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializerTest.java index 79b9e5a5..97e8d266 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializerTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializerTest.java @@ -20,18 +20,19 @@ import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer; +import org.apache.flink.connector.kinesis.source.util.TestUtil; import org.apache.flink.core.io.VersionMismatchException; import org.junit.jupiter.api.Test; import java.io.IOException; -import java.util.Set; +import java.util.List; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import static java.util.Arrays.copyOf; import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId; -import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; @@ -39,15 +40,11 @@ class KinesisStreamsSourceEnumeratorStateSerializerTest { @Test void testSerializeAndDeserializeEverythingSpecified() throws Exception { - Set completedShardIds = - Stream.of("shardId-000000000001", "shardId-000000000002", "shardId-000000000003") - .collect(Collectors.toSet()); - Set unassignedSplits = - Stream.of(getTestSplit(generateShardId(1)), getTestSplit(generateShardId(2))) - .collect(Collectors.toSet()); - String lastSeenShardId = "shardId-000000000002"; + List splitsToStore = + getSplits(IntStream.rangeClosed(0, 3), IntStream.rangeClosed(4, 10)); + String lastSeenShardId = generateShardId(10); KinesisStreamsSourceEnumeratorState initialState = - new KinesisStreamsSourceEnumeratorState(unassignedSplits, lastSeenShardId); + new KinesisStreamsSourceEnumeratorState(splitsToStore, lastSeenShardId); KinesisShardSplitSerializer splitSerializer = new KinesisShardSplitSerializer(); KinesisStreamsSourceEnumeratorStateSerializer serializer = @@ -62,15 +59,11 @@ void testSerializeAndDeserializeEverythingSpecified() throws Exception { @Test void testDeserializeWithWrongVersionStateSerializer() throws Exception { - Set completedShardIds = - Stream.of("shardId-000000000001", "shardId-000000000002", "shardId-000000000003") - .collect(Collectors.toSet()); - Set unassignedSplits = - Stream.of(getTestSplit(generateShardId(1)), getTestSplit(generateShardId(2))) - .collect(Collectors.toSet()); - String lastSeenShardId = "shardId-000000000002"; + List splitsToStore = + getSplits(IntStream.rangeClosed(0, 3), IntStream.rangeClosed(4, 10)); + String lastSeenShardId = generateShardId(10); KinesisStreamsSourceEnumeratorState initialState = - new KinesisStreamsSourceEnumeratorState(unassignedSplits, lastSeenShardId); + new KinesisStreamsSourceEnumeratorState(splitsToStore, lastSeenShardId); KinesisShardSplitSerializer splitSerializer = new KinesisShardSplitSerializer(); 
KinesisStreamsSourceEnumeratorStateSerializer serializer = @@ -93,15 +86,11 @@ void testDeserializeWithWrongVersionStateSerializer() throws Exception { @Test void testDeserializeWithWrongVersionSplitSerializer() throws Exception { - Set completedShardIds = - Stream.of("shardId-000000000001", "shardId-000000000002", "shardId-000000000003") - .collect(Collectors.toSet()); - Set unassignedSplits = - Stream.of(getTestSplit(generateShardId(1)), getTestSplit(generateShardId(2))) - .collect(Collectors.toSet()); - String lastSeenShardId = "shardId-000000000002"; + List splitsToStore = + getSplits(IntStream.rangeClosed(0, 3), IntStream.rangeClosed(4, 10)); + String lastSeenShardId = generateShardId(10); KinesisStreamsSourceEnumeratorState initialState = - new KinesisStreamsSourceEnumeratorState(unassignedSplits, lastSeenShardId); + new KinesisStreamsSourceEnumeratorState(splitsToStore, lastSeenShardId); KinesisShardSplitSerializer splitSerializer = new KinesisShardSplitSerializer(); KinesisStreamsSourceEnumeratorStateSerializer serializer = @@ -116,21 +105,17 @@ void testDeserializeWithWrongVersionSplitSerializer() throws Exception { .isThrownBy(() -> serializer.deserialize(serializer.getVersion(), serialized)) .withMessageContaining( "Trying to deserialize KinesisShardSplit serialized with unsupported version") - .withMessageContaining(String.valueOf(serializer.getVersion())) - .withMessageContaining(String.valueOf(wrongVersionStateSerializer.getVersion())); + .withMessageContaining(String.valueOf(splitSerializer.getVersion())) + .withMessageContaining(String.valueOf(wrongVersionSplitSerializer.getVersion())); } @Test void testSerializeWithTrailingBytes() throws Exception { - Set completedShardIds = - Stream.of("shardId-000000000001", "shardId-000000000002", "shardId-000000000003") - .collect(Collectors.toSet()); - Set unassignedSplits = - Stream.of(getTestSplit(generateShardId(1)), getTestSplit(generateShardId(2))) - .collect(Collectors.toSet()); - String lastSeenShardId 
= "shardId-000000000002"; + List splitsToStore = + getSplits(IntStream.rangeClosed(0, 3), IntStream.rangeClosed(4, 10)); + String lastSeenShardId = generateShardId(10); KinesisStreamsSourceEnumeratorState initialState = - new KinesisStreamsSourceEnumeratorState(unassignedSplits, lastSeenShardId); + new KinesisStreamsSourceEnumeratorState(splitsToStore, lastSeenShardId); KinesisShardSplitSerializer splitSerializer = new KinesisShardSplitSerializer(); KinesisStreamsSourceEnumeratorStateSerializer serializer = @@ -145,6 +130,57 @@ void testSerializeWithTrailingBytes() throws Exception { .withMessageContaining("Unexpected trailing bytes when deserializing."); } + @Test + void testDeserializeCompatibilityWithV0() throws Exception { + // V0 state only contains UNASSIGNED splits + List splitsWithStatusToStore = + getSplits(IntStream.empty(), IntStream.rangeClosed(10, 15)); + String lastSeenShardId = + splitsWithStatusToStore.get(splitsWithStatusToStore.size() - 1).split().splitId(); + KinesisStreamsSourceEnumeratorState expectedState = + new KinesisStreamsSourceEnumeratorState(splitsWithStatusToStore, lastSeenShardId); + + int version = 0; + List splitsToStoreForOldVersion = + splitsWithStatusToStore.stream() + .map(KinesisShardSplitWithAssignmentStatus::split) + .collect(Collectors.toList()); + KinesisStreamsSourceEnumeratorStateV0 initialState = + new KinesisStreamsSourceEnumeratorStateV0( + splitsToStoreForOldVersion, lastSeenShardId); + + KinesisShardSplitSerializer splitSerializer = new KinesisShardSplitSerializer(); + KinesisStreamsSourceEnumeratorStateSerializer serializer = + new KinesisStreamsSourceEnumeratorStateSerializer(splitSerializer); + + byte[] serialized = serializer.serializeV0(initialState); + KinesisStreamsSourceEnumeratorState deserializedState = + serializer.deserialize(version, serialized); + + assertThat(deserializedState).usingRecursiveComparison().isEqualTo(expectedState); + } + + private List getSplits( + IntStream assignedShardIdRange, 
IntStream unassignedShardIdRange) { + Stream assignedSplits = + assignedShardIdRange + .mapToObj(TestUtil::generateShardId) + .map(TestUtil::getTestSplit) + .map( + split -> + new KinesisShardSplitWithAssignmentStatus( + split, SplitAssignmentStatus.ASSIGNED)); + Stream unassignedSplits = + unassignedShardIdRange + .mapToObj(TestUtil::generateShardId) + .map(TestUtil::getTestSplit) + .map( + split -> + new KinesisShardSplitWithAssignmentStatus( + split, SplitAssignmentStatus.UNASSIGNED)); + return Stream.concat(assignedSplits, unassignedSplits).collect(Collectors.toList()); + } + private static class WrongVersionStateSerializer extends KinesisStreamsSourceEnumeratorStateSerializer { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java index b6056e4b..e8e5e89f 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition; import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory; +import org.apache.flink.connector.kinesis.source.proxy.ListShardsStartingPosition; import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.util.TestUtil; @@ -36,10 +37,10 @@ import 
software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.stream.Stream; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION; @@ -80,7 +81,8 @@ void testStartWithoutStateDiscoversAndAssignsShards( sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), - null); + null, + true); // When enumerator starts enumerator.start(); // Then initial discovery scheduled, with periodic discovery after @@ -108,7 +110,7 @@ void testStartWithoutStateDiscoversAndAssignsShards( assertThat( initialSplitAssignment.assignment().get(subtaskId).stream() .map(KinesisShardSplit::getShardId)) - .containsExactly(shardIds); + .containsExactlyInAnyOrder(shardIds); assertThat( initialSplitAssignment.assignment().get(subtaskId).stream() .map(KinesisShardSplit::getStartingPosition)) @@ -137,7 +139,7 @@ void testStartWithoutStateDiscoversAndAssignsShards( assertThat( afterReshardingSplitAssignment.assignment().get(subtaskId).stream() .map(KinesisShardSplit::getShardId)) - .containsExactly(additionalShards); + .containsExactlyInAnyOrder(additionalShards); assertThat( afterReshardingSplitAssignment.assignment().get(subtaskId).stream() .map(KinesisShardSplit::getStartingPosition)) @@ -160,10 +162,9 @@ void testStartWithStateDoesNotAssignCompletedShards( TestKinesisStreamProxy streamProxy = getTestStreamProxy(); final String completedShard = generateShardId(0); final String lastSeenShard = generateShardId(1); - final Set completedShardIds = Collections.singleton(completedShard); KinesisStreamsSourceEnumeratorState state = - new KinesisStreamsSourceEnumeratorState(Collections.emptySet(), lastSeenShard); + new KinesisStreamsSourceEnumeratorState(Collections.emptyList(), lastSeenShard); final 
Configuration sourceConfig = new Configuration(); sourceConfig.set(STREAM_INITIAL_POSITION, initialPosition); @@ -177,7 +178,8 @@ void testStartWithStateDoesNotAssignCompletedShards( sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), - state); + state, + true); // When enumerator starts enumerator.start(); // Then no initial discovery is scheduled, but a periodic discovery is scheduled @@ -203,7 +205,7 @@ completedShard, lastSeenShard, generateShardId(2), generateShardId(3) assertThat( firstUpdateSplitAssignment.assignment().get(subtaskId).stream() .map(KinesisShardSplit::getShardId)) - .containsExactly(generateShardId(2), generateShardId(3)); + .containsExactlyInAnyOrder(generateShardId(2), generateShardId(3)); assertThat( firstUpdateSplitAssignment.assignment().get(subtaskId).stream() .map(KinesisShardSplit::getStartingPosition)) @@ -214,57 +216,42 @@ completedShard, lastSeenShard, generateShardId(2), generateShardId(3) } } - @Test - void testReturnedSplitsWillBeReassigned() throws Throwable { + @ParameterizedTest + @MethodSource("provideInitialPositionForShardDiscovery") + void testInitialPositionForListShardsMapping( + Instant currentTimestamp, + InitialPosition initialPosition, + String initialTimestamp, + ListShardsStartingPosition expected) + throws Exception { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { TestKinesisStreamProxy streamProxy = getTestStreamProxy(); - KinesisStreamsSourceEnumerator enumerator = - getSimpleEnumeratorWithNoState(context, streamProxy); - // Given enumerator is initialised with one registered reader, with 4 shards in stream - final int subtaskId = 1; - context.registerReader(TestUtil.getTestReaderInfo(subtaskId)); - enumerator.addReader(subtaskId); - String[] shardIds = - new String[] { - generateShardId(0), - generateShardId(1), - generateShardId(2), - generateShardId(3) - }; - streamProxy.addShards(shardIds); + final Configuration sourceConfig = new 
Configuration(); + sourceConfig.set(STREAM_INITIAL_POSITION, initialPosition); + sourceConfig.set(STREAM_INITIAL_TIMESTAMP, initialTimestamp); - // When first discovery runs - context.runNextOneTimeCallable(); - SplitsAssignment initialSplitAssignment = - context.getSplitsAssignmentSequence().get(0); + KinesisStreamsSourceEnumerator enumerator = + new KinesisStreamsSourceEnumerator( + context, + STREAM_ARN, + sourceConfig, + streamProxy, + ShardAssignerFactory.uniformShardAssigner(), + null, + true); - // Then all 4 shards discovered on startup - assertThat(initialSplitAssignment.assignment()).containsOnlyKeys(subtaskId); assertThat( - initialSplitAssignment.assignment().get(subtaskId).stream() - .map(KinesisShardSplit::getShardId)) - .containsExactly(shardIds); - - // Given one shard split is returned - KinesisShardSplit returnedSplit = - initialSplitAssignment.assignment().get(subtaskId).get(0); - enumerator.addSplitsBack(Collections.singletonList(returnedSplit), subtaskId); - - // When first periodic discovery runs - context.runPeriodicCallable(0); - // Then returned split will be assigned - SplitsAssignment firstReturnedSplitAssignment = - context.getSplitsAssignmentSequence().get(1); - assertThat(firstReturnedSplitAssignment.assignment()).containsOnlyKeys(subtaskId); - assertThat(firstReturnedSplitAssignment.assignment().get(subtaskId)) - .containsExactly(returnedSplit); + enumerator.getInitialPositionForShardDiscovery( + initialPosition, currentTimestamp)) + .usingRecursiveComparison() + .isEqualTo(expected); } } @Test - void testAddSplitsBackWithoutSplitIsNoOp() throws Throwable { + void testAddSplitsBackThrowsException() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { TestKinesisStreamProxy streamProxy = getTestStreamProxy(); @@ -272,56 +259,8 @@ void testAddSplitsBackWithoutSplitIsNoOp() throws Throwable { getSimpleEnumeratorWithNoState(context, streamProxy); List splits = 
Collections.singletonList(getTestSplit()); - // Given enumerator has no assigned splits - // When we add splits back - // Then handled gracefully with no exception thrown - assertThatNoException().isThrownBy(() -> enumerator.addSplitsBack(splits, 1)); - } - } - - @Test - void testAddSplitsBackAssignsUnassignedSplits() throws Throwable { - try (MockSplitEnumeratorContext context = - new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { - TestKinesisStreamProxy streamProxy = getTestStreamProxy(); - KinesisStreamsSourceEnumerator enumerator = - getSimpleEnumeratorWithNoState(context, streamProxy); - - // Given enumerator has assigned splits - final int subtaskId = 1; - context.registerReader(TestUtil.getTestReaderInfo(subtaskId)); - enumerator.addReader(subtaskId); - String[] shardIds = - new String[] { - generateShardId(0), - generateShardId(1), - generateShardId(2), - generateShardId(3) - }; - streamProxy.addShards(shardIds); - context.runNextOneTimeCallable(); - - // Ensure that there are splits assigned - SplitsAssignment initialSplitAssignment = - context.getSplitsAssignmentSequence().get(0); - assertThat(initialSplitAssignment.assignment()).containsOnlyKeys(subtaskId); - assertThat( - initialSplitAssignment.assignment().get(subtaskId).stream() - .map(KinesisShardSplit::getShardId)) - .containsExactly(shardIds); - - // When we add splits back - KinesisShardSplit returnedSplit = - initialSplitAssignment.assignment().get(subtaskId).get(0); - enumerator.addSplitsBack(Collections.singletonList(returnedSplit), 1); - - // Then splits are reassigned - assertThat(context.getSplitsAssignmentSequence()).hasSizeGreaterThan(1); - SplitsAssignment secondSplitAssignment = - context.getSplitsAssignmentSequence().get(1); - assertThat(secondSplitAssignment.assignment()).containsOnlyKeys(subtaskId); - assertThat(secondSplitAssignment.assignment().get(subtaskId)) - .containsExactly(returnedSplit); + assertThatExceptionOfType(UnsupportedOperationException.class) + .isThrownBy(() -> 
enumerator.addSplitsBack(splits, 1)); } } @@ -354,7 +293,8 @@ void testAssignSplitsSurfacesThrowableIfUnableToListShards() throws Throwable { sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), - null); + null, + true); enumerator.start(); // Given List Shard request throws an Exception @@ -383,7 +323,8 @@ void testAssignSplitsHandlesRepeatSplitsGracefully() throws Throwable { sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), - null); + null, + true); enumerator.start(); // Given enumerator is initialised with one registered reader, with 4 shards in stream @@ -409,7 +350,7 @@ void testAssignSplitsHandlesRepeatSplitsGracefully() throws Throwable { assertThat( initialSplitAssignment.assignment().get(subtaskId).stream() .map(KinesisShardSplit::getShardId)) - .containsExactly(shardIds); + .containsExactlyInAnyOrder(shardIds); // Given ListShards doesn't respect lastSeenShardId, and returns already assigned shards streamProxy.setShouldRespectLastSeenShardId(false); @@ -436,7 +377,8 @@ void testAssignSplitWithoutRegisteredReaders() throws Throwable { sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), - null); + null, + true); enumerator.start(); // Given enumerator is initialised without a reader @@ -494,7 +436,8 @@ void testAssignSplitWithInsufficientRegisteredReaders() throws Throwable { sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), - null); + null, + true); enumerator.start(); // Given enumerator is initialised without only one reader @@ -560,7 +503,8 @@ void testRestoreFromStateRemembersLastSeenShardId() throws Throwable { sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), - null); + null, + true); enumerator.start(); // Given enumerator is initialised with one registered reader, with 4 shards in stream @@ -587,7 +531,8 @@ void testRestoreFromStateRemembersLastSeenShardId() throws Throwable { sourceConfig, streamProxy, 
ShardAssignerFactory.uniformShardAssigner(), - snapshottedState); + snapshottedState, + true); restoredEnumerator.start(); // Given enumerator is initialised with one registered reader, with 4 shards in stream restoredContext.registerReader(TestUtil.getTestReaderInfo(subtaskId)); @@ -595,7 +540,11 @@ void testRestoreFromStateRemembersLastSeenShardId() throws Throwable { restoredContext.runPeriodicCallable(0); // Then ListShards receives a ListShards call with the lastSeenShardId - assertThat(streamProxy.getLastProvidedLastSeenShardId()) + assertThat( + streamProxy + .getLastProvidedListShardStartingPosition() + .getShardFilter() + .shardId()) .isEqualTo(shardIds[shardIds.length - 1]); } } @@ -613,7 +562,8 @@ void testHandleUnrecognisedSourceEventIsNoOp() throws Throwable { sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), - null); + null, + true); assertThatNoException() .isThrownBy(() -> enumerator.handleSourceEvent(1, new SourceEvent() {})); @@ -633,7 +583,8 @@ void testCloseClosesStreamProxy() throws Throwable { sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), - null); + null, + true); enumerator.start(); assertThatNoException().isThrownBy(enumerator::close); @@ -651,7 +602,8 @@ private KinesisStreamsSourceEnumerator getSimpleEnumeratorWithNoState( sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), - null); + null, + true); enumerator.start(); assertThat(context.getOneTimeCallables()).hasSize(1); assertThat(context.getPeriodicCallables()).hasSize(1); @@ -667,4 +619,26 @@ private static Stream provideInitialPositions() { "2023-04-13T09:18:00.0+01:00", ShardIteratorType.AT_TIMESTAMP)); } + + private static Stream provideInitialPositionForShardDiscovery() { + Instant currentTimestamp = Instant.now(); + + return Stream.of( + Arguments.of( + currentTimestamp, + InitialPosition.LATEST, + "", + ListShardsStartingPosition.fromTimestamp(currentTimestamp)), + Arguments.of( + currentTimestamp, + 
InitialPosition.TRIM_HORIZON, + "", + ListShardsStartingPosition.fromStart()), + Arguments.of( + currentTimestamp, + InitialPosition.AT_TIMESTAMP, + "1719776523", + ListShardsStartingPosition.fromTimestamp( + Instant.ofEpochSecond(1719776523)))); + } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java index 24bfda1e..8ba3473b 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.flink.connector.kinesis.source.enumerator.assigner; import org.apache.flink.api.connector.source.ReaderInfo; diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTrackerTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTrackerTest.java new file mode 100644 index 00000000..601a4408 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTrackerTest.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source.enumerator.tracker; + +import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardSplitWithAssignmentStatus; +import org.apache.flink.connector.kinesis.source.enumerator.SplitAssignmentStatus; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId; +import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +class SplitTrackerTest { + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSplitWithoutParentAreAvailableToAssign(boolean preserveShardOrdering) { + SplitTracker splitTracker = new SplitTracker(preserveShardOrdering); + + KinesisShardSplit split = getTestSplit(generateShardId(1), Collections.emptySet()); + + splitTracker.addSplits(Collections.singletonList(split)); + + List pendingSplits = splitTracker.splitsAvailableForAssignment(); + + assertThat(pendingSplits).containsExactly(split); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStateSnapshot(boolean preserveShardOrdering) { + List assignedSplits = + Arrays.asList( + // Shards without parents + getTestSplit(generateShardId(1), Collections.emptySet()), + getTestSplit(generateShardId(2), Collections.emptySet())); + List unassignedSplits = + Arrays.asList( + // Shards without parents + getTestSplit(generateShardId(3), Collections.emptySet()), + // Shards produced by splitting parent shard + 
getTestSplit(generateShardId(4), Collections.singleton(generateShardId(1))), + getTestSplit(generateShardId(5), Collections.singleton(generateShardId(1))), + // Shard produced by merging 2 parent shards + getTestSplit( + generateShardId(6), + new HashSet<>( + Arrays.asList(generateShardId(2), generateShardId(3))))); + + List assignedSplitsWithStatus = + assignedSplits.stream() + .map( + split -> + new KinesisShardSplitWithAssignmentStatus( + split, SplitAssignmentStatus.ASSIGNED)) + .collect(Collectors.toList()); + List unassignedSplitsWithStatus = + unassignedSplits.stream() + .map( + split -> + new KinesisShardSplitWithAssignmentStatus( + split, SplitAssignmentStatus.UNASSIGNED)) + .collect(Collectors.toList()); + List expectedState = + new ArrayList<>(assignedSplitsWithStatus); + expectedState.addAll(unassignedSplitsWithStatus); + + SplitTracker splitTracker = new SplitTracker(preserveShardOrdering); + + splitTracker.addSplits(assignedSplits); + splitTracker.addSplits(unassignedSplits); + splitTracker.markAsAssigned(assignedSplits); + + // Verify that produced state is the same + assertThat(splitTracker.snapshotState(0)) + .containsExactlyInAnyOrderElementsOf(expectedState); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSplitSnapshotStateDoesNotIncludeFinishedSplits(boolean preserveShardOrdering) { + List splits = + Arrays.asList( + // Shards without parents + getTestSplit(generateShardId(1), Collections.emptySet()), + getTestSplit(generateShardId(2), Collections.emptySet()), + getTestSplit(generateShardId(3), Collections.emptySet())); + + List expectedSplitState = + splits.stream() + .map( + split -> + new KinesisShardSplitWithAssignmentStatus( + split, SplitAssignmentStatus.ASSIGNED)) + .collect(Collectors.toList()); + + SplitTracker splitTracker = new SplitTracker(preserveShardOrdering); + splitTracker.addSplits(splits); + splitTracker.markAsAssigned(splits); + + String splitIdToFinish = 
expectedSplitState.get(0).split().splitId(); + splitTracker.markAsFinished(Collections.singletonList(splitIdToFinish)); + + // Verify that state does not contain finished split + assertThat(splitTracker.snapshotState(0)) + .doesNotContain(expectedSplitState.get(0)) + .containsExactlyInAnyOrder(expectedSplitState.get(1), expectedSplitState.get(2)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStateRestore(boolean preserveShardOrdering) { + List splits = + Arrays.asList( + // Shards without parents + getTestSplit(generateShardId(1), Collections.emptySet()), + getTestSplit(generateShardId(2), Collections.emptySet()), + getTestSplit(generateShardId(3), Collections.emptySet()), + // Shards produced by splitting parent shard + getTestSplit(generateShardId(4), Collections.singleton(generateShardId(1))), + getTestSplit(generateShardId(5), Collections.singleton(generateShardId(1))), + // Shard produced by merging 2 parent shards + getTestSplit( + generateShardId(6), + new HashSet<>( + Arrays.asList(generateShardId(2), generateShardId(3))))); + + Stream assignedSplits = + splits.subList(0, 3).stream() + .map( + split -> + new KinesisShardSplitWithAssignmentStatus( + split, SplitAssignmentStatus.ASSIGNED)); + Stream unassignedSplits = + splits.subList(3, splits.size()).stream() + .map( + split -> + new KinesisShardSplitWithAssignmentStatus( + split, SplitAssignmentStatus.UNASSIGNED)); + List initialState = + Stream.concat(assignedSplits, unassignedSplits).collect(Collectors.toList()); + + SplitTracker splitTracker = new SplitTracker(preserveShardOrdering, initialState); + + // Verify that produced state is the same + assertThat(splitTracker.snapshotState(0)).containsExactlyInAnyOrderElementsOf(initialState); + } + + // ----------------------------------------------------------------------------------------- + // Shard ordering disabled + // ----------------------------------------------------------------------------------------- + + 
@Test + public void testUnorderedAllUnassignedSplitsAvailableForAssignment() { + List splits = + Arrays.asList( + // Shards without parents + getTestSplit(generateShardId(1), Collections.emptySet()), + getTestSplit(generateShardId(2), Collections.emptySet()), + getTestSplit(generateShardId(3), Collections.emptySet()), + // Shards produced by splitting parent shard + getTestSplit(generateShardId(4), Collections.singleton(generateShardId(1))), + getTestSplit(generateShardId(5), Collections.singleton(generateShardId(1))), + // Shard produced by merging 2 parent shards + getTestSplit( + generateShardId(6), + new HashSet<>( + Arrays.asList(generateShardId(2), generateShardId(3))))); + + SplitTracker splitTracker = new SplitTracker(false); + splitTracker.addSplits(splits); + + List pendingSplits = splitTracker.splitsAvailableForAssignment(); + + assertThat(pendingSplits).containsExactlyInAnyOrderElementsOf(splits); + } + + @Test + public void testUnorderedAssignedSplitsNoLongerReturnedAsAvailableToAssign() { + List assignedSplits = + Arrays.asList( + // Shards without parents + getTestSplit(generateShardId(1), Collections.emptySet()), + getTestSplit(generateShardId(2), Collections.emptySet()), + getTestSplit(generateShardId(3), Collections.emptySet())); + List unassignedSplits = + Arrays.asList( + // Shards produced by splitting parent shard + getTestSplit(generateShardId(4), Collections.singleton(generateShardId(1))), + getTestSplit(generateShardId(5), Collections.singleton(generateShardId(1))), + // Shard produced by merging 2 parent shards + getTestSplit( + generateShardId(6), + new HashSet<>( + Arrays.asList(generateShardId(2), generateShardId(3))))); + + SplitTracker splitTracker = new SplitTracker(false); + splitTracker.addSplits(assignedSplits); + splitTracker.addSplits(unassignedSplits); + + splitTracker.markAsAssigned(assignedSplits); + + List pendingSplits = splitTracker.splitsAvailableForAssignment(); + + 
assertThat(pendingSplits).containsExactlyInAnyOrderElementsOf(unassignedSplits); + } + + // ----------------------------------------------------------------------------------------- + // Shard ordering enabled + // ----------------------------------------------------------------------------------------- + + @Test + public void testOrderedAllUnassignedSplitsWithoutParentsAvailableForAssignment() { + List splitsWithoutParents = + Arrays.asList( + // Shards without parents + getTestSplit(generateShardId(1), Collections.emptySet()), + getTestSplit(generateShardId(2), Collections.emptySet()), + getTestSplit(generateShardId(3), Collections.emptySet())); + List splitsWithParents = + Arrays.asList( + // Shards produced by splitting parent shard + getTestSplit(generateShardId(4), Collections.singleton(generateShardId(1))), + getTestSplit(generateShardId(5), Collections.singleton(generateShardId(1))), + // Shard produced by merging 2 parent shards + getTestSplit( + generateShardId(6), + new HashSet<>( + Arrays.asList(generateShardId(2), generateShardId(3))))); + + SplitTracker splitTracker = new SplitTracker(true); + splitTracker.addSplits(splitsWithParents); + splitTracker.addSplits(splitsWithoutParents); + + List pendingSplits = splitTracker.splitsAvailableForAssignment(); + + assertThat(pendingSplits).containsExactlyInAnyOrderElementsOf(splitsWithoutParents); + } + + @Test + public void testOrderedMarkingParentSplitAsFinishedMakesChildrenAvailableForAssignment() { + List splits = + Arrays.asList( + // Shards without parents + getTestSplit(generateShardId(0), Collections.emptySet()), + getTestSplit(generateShardId(1), Collections.emptySet()), + getTestSplit(generateShardId(2), Collections.emptySet()), + // Shards produced by splitting parent shard + getTestSplit(generateShardId(3), Collections.singleton(generateShardId(0))), + getTestSplit(generateShardId(4), Collections.singleton(generateShardId(0))), + // Shard produced by merging 2 parent shards + getTestSplit( + 
generateShardId(5), + new HashSet<>( + Arrays.asList(generateShardId(1), generateShardId(2))))); + + SplitTracker splitTracker = new SplitTracker(true); + splitTracker.addSplits(splits); + + splitTracker.markAsAssigned(splits.subList(0, 3)); + // All splits without parents were assigned, no eligible splits + assertThat(splitTracker.splitsAvailableForAssignment()).isEmpty(); + + // Split 0 has 2 children (shard split) + splitTracker.markAsFinished(Collections.singletonList(splits.get(0).splitId())); + assertThat(splitTracker.splitsAvailableForAssignment()) + .containsExactlyInAnyOrder(splits.get(3), splits.get(4)); + } + + @Test + public void + testOrderedAllParentSplitShouldBeMarkedAsFinishedForChildrenToBecomeAvailableForAssignment() { + List splits = + Arrays.asList( + // Shards without parents + getTestSplit(generateShardId(0), Collections.emptySet()), + getTestSplit(generateShardId(1), Collections.emptySet()), + getTestSplit(generateShardId(2), Collections.emptySet()), + // Shards produced by splitting parent shard + getTestSplit(generateShardId(3), Collections.singleton(generateShardId(0))), + getTestSplit(generateShardId(4), Collections.singleton(generateShardId(0))), + // Shard produced by merging 2 parent shards + getTestSplit( + generateShardId(5), + new HashSet<>( + Arrays.asList(generateShardId(1), generateShardId(2))))); + + SplitTracker splitTracker = new SplitTracker(true); + splitTracker.addSplits(splits); + + splitTracker.markAsAssigned(splits.subList(0, 3)); + // All splits without parents were assigned, no eligible splits + assertThat(splitTracker.splitsAvailableForAssignment()).isEmpty(); + + // Splits 1 and 2 has 1 common child (shard merge) + // Marking split 1 as finished would not make child available, because another parent is + // still being read + splitTracker.markAsFinished(Collections.singletonList(splits.get(1).splitId())); + assertThat(splitTracker.splitsAvailableForAssignment()).isEmpty(); + + // Marking split 2 as finished makes 
child split available since both parent are finished + // now + splitTracker.markAsFinished(Collections.singletonList(splits.get(2).splitId())); + assertThat(splitTracker.splitsAvailableForAssignment()).containsExactly(splits.get(5)); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java index f53b1cee..6eb1e230 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java @@ -22,12 +22,14 @@ import org.apache.flink.connector.kinesis.source.util.KinesisClientProvider.ListShardItem; import org.apache.flink.connector.kinesis.source.util.KinesisClientProvider.TestingKinesisClient; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.NullAndEmptySource; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.MethodSource; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; @@ -36,7 +38,9 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import 
software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary; import java.time.Instant; import java.util.ArrayList; @@ -54,12 +58,47 @@ class KinesisStreamProxyTest { private static final SdkHttpClient HTTP_CLIENT = ApacheHttpClient.builder().build(); + private static final String STREAM_ARN = + "arn:aws:kinesis:us-east-1:123456789012:stream/stream-name"; + + private TestingKinesisClient testKinesisClient; + private KinesisStreamProxy kinesisStreamProxy; + + @BeforeEach + public void setUp() { + testKinesisClient = new TestingKinesisClient(); + kinesisStreamProxy = new KinesisStreamProxy(testKinesisClient, HTTP_CLIENT); + } + + @Test + public void testDescribeStreamSummary() { + StreamDescriptionSummary streamDescriptionSummary = + StreamDescriptionSummary.builder() + .streamARN(STREAM_ARN) + .streamCreationTimestamp(Instant.now()) + .retentionPeriodHours(24) + .build(); + DescribeStreamSummaryResponse describeStreamSummaryResponse = + DescribeStreamSummaryResponse.builder() + .streamDescriptionSummary(streamDescriptionSummary) + .build(); + testKinesisClient.setDescribeStreamSummaryResponse(describeStreamSummaryResponse); + testKinesisClient.setDescribeStreamSummaryRequestValidation( + request -> { + DescribeStreamSummaryRequest expectedRequest = + DescribeStreamSummaryRequest.builder().streamARN(STREAM_ARN).build(); + assertThat(request).isEqualTo(expectedRequest); + }); + + StreamDescriptionSummary response = + kinesisStreamProxy.getStreamDescriptionSummary(STREAM_ARN); + + assertThat(response).isEqualTo(streamDescriptionSummary); + } + @ParameterizedTest - @NullAndEmptySource - @ValueSource(strings = {"shardId-000000000002"}) - void testListShardsSingleCall(String lastSeenShardId) { - final 
String streamArn = - "arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0"; + @MethodSource("provideListShardStartingPosition") + void testListShardsSingleCall(final ListShardsStartingPosition startingPosition) { final List expectedShards = getTestShards(0, 3); List listShardItems = @@ -67,66 +106,67 @@ void testListShardsSingleCall(String lastSeenShardId) { ListShardItem.builder() .validation( getListShardRequestValidation( - streamArn, lastSeenShardId, null)) + STREAM_ARN, + startingPosition.getShardFilter(), + null)) .shards(expectedShards) .nextToken(null) .build()); - TestingKinesisClient testKinesisClient = new TestingKinesisClient(); testKinesisClient.setListShardsResponses(listShardItems); - KinesisStreamProxy kinesisStreamProxy = - new KinesisStreamProxy(testKinesisClient, HTTP_CLIENT); - - assertThat(kinesisStreamProxy.listShards(streamArn, lastSeenShardId)) + assertThat(kinesisStreamProxy.listShards(STREAM_ARN, startingPosition)) .isEqualTo(expectedShards); } + private static Stream provideListShardStartingPosition() { + return Stream.of( + ListShardsStartingPosition.fromStart(), + ListShardsStartingPosition.fromTimestamp(Instant.ofEpochSecond(1720622954)), + ListShardsStartingPosition.fromShardId(generateShardId(12))); + } + @Test void testListShardsMultipleCalls() { - final String streamArn = - "arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0"; final String lastSeenShardId = "shardId-000000000000"; final List expectedShards = getTestShards(0, 3); + ListShardsStartingPosition startingPosition = + ListShardsStartingPosition.fromShardId(lastSeenShardId); List listShardItems = Stream.of( ListShardItem.builder() .validation( getListShardRequestValidation( - streamArn, lastSeenShardId, null)) + STREAM_ARN, + startingPosition.getShardFilter(), + null)) .shards(expectedShards.subList(0, 1)) .nextToken("next-token-1") .build(), ListShardItem.builder() .validation( getListShardRequestValidation( - streamArn, null, 
"next-token-1")) + STREAM_ARN, null, "next-token-1")) .shards(expectedShards.subList(1, 2)) .nextToken("next-token-2") .build(), ListShardItem.builder() .validation( getListShardRequestValidation( - streamArn, null, "next-token-2")) + STREAM_ARN, null, "next-token-2")) .shards(expectedShards.subList(2, 4)) .nextToken(null) .build()) .collect(Collectors.toList()); - TestingKinesisClient testKinesisClient = new TestingKinesisClient(); testKinesisClient.setListShardsResponses(listShardItems); - KinesisStreamProxy kinesisStreamProxy = - new KinesisStreamProxy(testKinesisClient, HTTP_CLIENT); - - assertThat(kinesisStreamProxy.listShards(streamArn, lastSeenShardId)) + assertThat(kinesisStreamProxy.listShards(STREAM_ARN, startingPosition)) .isEqualTo(expectedShards); } @Test void testGetRecordsInitialReadFromTrimHorizon() { - final String streamArn = - "arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0"; final String shardId = "shardId-000000000002"; final StartingPosition startingPosition = StartingPosition.fromStart(); @@ -137,12 +177,11 @@ void testGetRecordsInitialReadFromTrimHorizon() { .nextShardIterator("next-iterator") .build(); - TestingKinesisClient testKinesisClient = new TestingKinesisClient(); testKinesisClient.setNextShardIterator(expectedShardIterator); testKinesisClient.setShardIteratorValidation( validateEqual( GetShardIteratorRequest.builder() - .streamARN(streamArn) + .streamARN(STREAM_ARN) .shardId(shardId) .shardIteratorType(ShardIteratorType.TRIM_HORIZON) .build())); @@ -150,21 +189,16 @@ void testGetRecordsInitialReadFromTrimHorizon() { testKinesisClient.setGetRecordsValidation( validateEqual( GetRecordsRequest.builder() - .streamARN(streamArn) + .streamARN(STREAM_ARN) .shardIterator(expectedShardIterator) .build())); - KinesisStreamProxy kinesisStreamProxy = - new KinesisStreamProxy(testKinesisClient, HTTP_CLIENT); - - assertThat(kinesisStreamProxy.getRecords(streamArn, shardId, startingPosition)) + 
assertThat(kinesisStreamProxy.getRecords(STREAM_ARN, shardId, startingPosition)) .isEqualTo(expectedGetRecordsResponse); } @Test void testGetRecordsInitialReadFromTimestamp() { - final String streamArn = - "arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0"; final String shardId = "shardId-000000000002"; final Instant timestamp = Instant.now(); final StartingPosition startingPosition = StartingPosition.fromTimestamp(timestamp); @@ -176,12 +210,11 @@ void testGetRecordsInitialReadFromTimestamp() { .nextShardIterator("next-iterator") .build(); - TestingKinesisClient testKinesisClient = new TestingKinesisClient(); testKinesisClient.setNextShardIterator(expectedShardIterator); testKinesisClient.setShardIteratorValidation( validateEqual( GetShardIteratorRequest.builder() - .streamARN(streamArn) + .streamARN(STREAM_ARN) .shardId(shardId) .shardIteratorType(ShardIteratorType.AT_TIMESTAMP) .timestamp(timestamp) @@ -190,21 +223,16 @@ void testGetRecordsInitialReadFromTimestamp() { testKinesisClient.setGetRecordsValidation( validateEqual( GetRecordsRequest.builder() - .streamARN(streamArn) + .streamARN(STREAM_ARN) .shardIterator(expectedShardIterator) .build())); - KinesisStreamProxy kinesisStreamProxy = - new KinesisStreamProxy(testKinesisClient, HTTP_CLIENT); - - assertThat(kinesisStreamProxy.getRecords(streamArn, shardId, startingPosition)) + assertThat(kinesisStreamProxy.getRecords(STREAM_ARN, shardId, startingPosition)) .isEqualTo(expectedGetRecordsResponse); } @Test void testGetRecordsInitialReadFromSequenceNumber() { - final String streamArn = - "arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0"; final String shardId = "shardId-000000000002"; final String sequenceNumber = "some-sequence-number"; final StartingPosition startingPosition = @@ -217,12 +245,11 @@ void testGetRecordsInitialReadFromSequenceNumber() { .nextShardIterator("next-iterator") .build(); - TestingKinesisClient testKinesisClient = new TestingKinesisClient(); 
testKinesisClient.setNextShardIterator(expectedShardIterator); testKinesisClient.setShardIteratorValidation( validateEqual( GetShardIteratorRequest.builder() - .streamARN(streamArn) + .streamARN(STREAM_ARN) .shardId(shardId) .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER) .startingSequenceNumber(sequenceNumber) @@ -231,14 +258,11 @@ void testGetRecordsInitialReadFromSequenceNumber() { testKinesisClient.setGetRecordsValidation( validateEqual( GetRecordsRequest.builder() - .streamARN(streamArn) + .streamARN(STREAM_ARN) .shardIterator(expectedShardIterator) .build())); - KinesisStreamProxy kinesisStreamProxy = - new KinesisStreamProxy(testKinesisClient, HTTP_CLIENT); - - assertThat(kinesisStreamProxy.getRecords(streamArn, shardId, startingPosition)) + assertThat(kinesisStreamProxy.getRecords(STREAM_ARN, shardId, startingPosition)) .isEqualTo(expectedGetRecordsResponse); } @@ -262,10 +286,6 @@ void testConsecutiveGetRecordsUsesShardIteratorFromResponse() { .nextShardIterator("third-shard-iterator") .build(); - TestingKinesisClient testKinesisClient = new TestingKinesisClient(); - KinesisStreamProxy kinesisStreamProxy = - new KinesisStreamProxy(testKinesisClient, HTTP_CLIENT); - // When read for the first time testKinesisClient.setNextShardIterator(firstShardIterator); // Then getShardIterator called @@ -308,8 +328,6 @@ void testConsecutiveGetRecordsUsesShardIteratorFromResponse() { @Test void testGetRecordsEagerlyRetriesExpiredIterators() { - final String streamArn = - "arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0"; final String shardId = "shardId-000000000002"; final StartingPosition startingPosition = StartingPosition.fromStart(); @@ -321,10 +339,6 @@ void testGetRecordsEagerlyRetriesExpiredIterators() { .nextShardIterator(secondShardIterator) .build(); - TestingKinesisClient testKinesisClient = new TestingKinesisClient(); - KinesisStreamProxy kinesisStreamProxy = - new KinesisStreamProxy(testKinesisClient, HTTP_CLIENT); - // When 
expired shard iterator is thrown on the first GetRecords() call AtomicBoolean firstGetRecordsCall = new AtomicBoolean(true); testKinesisClient.setNextShardIterator(firstShardIterator); @@ -340,21 +354,19 @@ void testGetRecordsEagerlyRetriesExpiredIterators() { // Then getRecords called with second shard iterator validateEqual( GetRecordsRequest.builder() - .streamARN(streamArn) + .streamARN(STREAM_ARN) .shardIterator(secondShardIterator) .build()); }); // Then getRecords called with second shard iterator - assertThat(kinesisStreamProxy.getRecords(streamArn, shardId, startingPosition)) + assertThat(kinesisStreamProxy.getRecords(STREAM_ARN, shardId, startingPosition)) .isEqualTo(getRecordsResponse); assertThat(firstGetRecordsCall.get()).isFalse(); } @Test void testGetRecordsHandlesCompletedShard() { - final String streamArn = - "arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0"; final String shardId = "shardId-000000000002"; final String sequenceNumber = "some-sequence-number"; final StartingPosition startingPosition = @@ -365,12 +377,11 @@ void testGetRecordsHandlesCompletedShard() { final GetRecordsResponse expectedGetRecordsResponse = GetRecordsResponse.builder().records(Record.builder().build()).build(); - TestingKinesisClient testKinesisClient = new TestingKinesisClient(); testKinesisClient.setNextShardIterator(expectedShardIterator); testKinesisClient.setShardIteratorValidation( validateEqual( GetShardIteratorRequest.builder() - .streamARN(streamArn) + .streamARN(STREAM_ARN) .shardId(shardId) .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER) .startingSequenceNumber(sequenceNumber) @@ -379,16 +390,13 @@ void testGetRecordsHandlesCompletedShard() { testKinesisClient.setGetRecordsValidation( validateEqual( GetRecordsRequest.builder() - .streamARN(streamArn) + .streamARN(STREAM_ARN) .shardIterator(expectedShardIterator) .build())); - KinesisStreamProxy kinesisStreamProxy = - new KinesisStreamProxy(testKinesisClient, HTTP_CLIENT); - 
assertThatNoException() .isThrownBy( - () -> kinesisStreamProxy.getRecords(streamArn, shardId, startingPosition)); + () -> kinesisStreamProxy.getRecords(STREAM_ARN, shardId, startingPosition)); } @Test @@ -411,12 +419,12 @@ private List getTestShards(final int startShardId, final int endShardId) } private Consumer getListShardRequestValidation( - final String streamArn, final String startShardId, final String nextToken) { + final String streamArn, final ShardFilter shardFilter, final String nextToken) { return req -> { ListShardsRequest expectedReq = ListShardsRequest.builder() .streamARN(streamArn) - .exclusiveStartShardId(startShardId) + .shardFilter(shardFilter) .nextToken(nextToken) .build(); assertThat(req).isEqualTo(expectedReq); diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/ListShardsStartingPositionTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/ListShardsStartingPositionTest.java new file mode 100644 index 00000000..b94bba6f --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/ListShardsStartingPositionTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.proxy; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; + +import java.time.Instant; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +class ListShardsStartingPositionTest { + @Test + void testShardPositionFromTimestamp() { + Instant timestamp = Instant.ofEpochMilli(1720543032243L); + ListShardsStartingPosition startingPosition = + ListShardsStartingPosition.fromTimestamp(timestamp); + + ShardFilter expected = + ShardFilter.builder() + .type(ShardFilterType.FROM_TIMESTAMP) + .timestamp(timestamp) + .build(); + + assertThat(startingPosition.getShardFilter()).isEqualTo(expected); + } + + @Test + void testShardPositionFromTimestampShouldFailOnNullTimestamp() { + assertThatThrownBy(() -> ListShardsStartingPosition.fromTimestamp(null)) + .isInstanceOf(NullPointerException.class); + } + + @Test + void testShardPositionFromShardId() { + String shardId = "shard-00000000002"; + ListShardsStartingPosition startingPosition = + ListShardsStartingPosition.fromShardId(shardId); + + ShardFilter expected = + ShardFilter.builder().type(ShardFilterType.AFTER_SHARD_ID).shardId(shardId).build(); + + assertThat(startingPosition.getShardFilter()).isEqualTo(expected); + } + + @Test + void testShardPositionFromShardIdShouldFailOnNullTimestamp() { + assertThatThrownBy(() -> 
ListShardsStartingPosition.fromShardId(null)) + .isInstanceOf(NullPointerException.class); + } + + @Test + void testShardPositionFromStart() { + ListShardsStartingPosition startingPosition = ListShardsStartingPosition.fromStart(); + + ShardFilter expected = + ShardFilter.builder().type(ShardFilterType.FROM_TRIM_HORIZON).build(); + + assertThat(startingPosition.getShardFilter()).isEqualTo(expected); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java index 55bd2bee..2b7703a4 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java @@ -18,10 +18,12 @@ package org.apache.flink.connector.kinesis.source.reader; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kinesis.source.event.SplitsFinishedEvent; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.model.TestData; import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; @@ -29,6 +31,7 @@ import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; import 
org.apache.flink.connector.kinesis.source.util.KinesisContextProvider; import org.apache.flink.connector.kinesis.source.util.TestUtil; +import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.apache.flink.metrics.testutils.MetricListener; import org.junit.jupiter.api.BeforeEach; @@ -38,6 +41,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; @@ -47,6 +51,7 @@ import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; class KinesisStreamsSourceReaderTest { + private TestingReaderContext testingReaderContext; private KinesisStreamsSourceReader sourceReader; private MetricListener metricListener; private Map shardMetricGroupMap; @@ -62,14 +67,17 @@ public void init() { FutureCompletingBlockingQueue> elementsQueue = new FutureCompletingBlockingQueue<>(); + testingReaderContext = + KinesisContextProvider.KinesisTestingContext.getKinesisTestingContext( + metricListener); sourceReader = new KinesisStreamsSourceReader<>( elementsQueue, - new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get), + new SingleThreadFetcherManager<>( + elementsQueue, splitReaderSupplier::get, new Configuration()), new KinesisStreamsRecordEmitter<>(null), new Configuration(), - KinesisContextProvider.KinesisTestingContext.getKinesisTestingContext( - metricListener), + testingReaderContext, shardMetricGroupMap); } @@ -90,6 +98,25 @@ void testToSplitType() throws Exception { .isEqualTo(splitState.getKinesisShardSplit()); } + @Test + void testOnSplitFinishedEventSent() { + KinesisShardSplit split = getTestSplit(); + + testingReaderContext.clearSentEvents(); + + sourceReader.onSplitFinished( + Collections.singletonMap(split.splitId(), new KinesisShardSplitState(split))); + + List events = testingReaderContext.getSentEvents(); + + Set expectedSplitIds = Collections.singleton(split.splitId()); + 
assertThat(events) + .singleElement() + .isInstanceOf(SplitsFinishedEvent.class) + .usingRecursiveComparison() + .isEqualTo(new SplitsFinishedEvent(expectedSplitIds)); + } + @Test void testOnSplitFinishedShardMetricGroupUnregistered() throws Exception { KinesisShardSplit split = getTestSplit(); diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java index 09f27d51..7ef1080e 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -90,6 +91,27 @@ void testAssignedSplitHasNoRecordsHandledGracefully() throws Exception { assertThat(retrievedRecords.finishedSplits()).isEmpty(); } + @Test + void testSplitWithExpiredShardHandledAsCompleted() throws Exception { + // Given assigned split with expired shard + KinesisShardSplit testSplit = getTestSplit(TEST_SHARD_ID); + testStreamProxy.addShards(testSplit.getShardId()); + testStreamProxy.setGetRecordsExceptionSupplier( + () -> + ResourceNotFoundException.builder() + .message("Shard " + testSplit.getShardId() + " does not exist") + .build()); + splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(testSplit))); + + // When fetching records + RecordsWithSplitIds 
retrievedRecords = splitReader.fetch(); + + // Then retrieve no records and mark split as complete + assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); + assertThat(retrievedRecords.nextSplit()).isNull(); + assertThat(retrievedRecords.finishedSplits()).containsExactly(testSplit.splitId()); + } + @Test void testSingleAssignedSplitAllConsumed() throws Exception { // Given assigned split with records diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java index 45f24369..72692d74 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java @@ -20,38 +20,30 @@ import org.apache.flink.core.io.VersionMismatchException; +import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.stream.Stream; +import static org.apache.flink.connector.kinesis.source.util.TestUtil.STREAM_ARN; +import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId; import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; class 
KinesisShardSplitSerializerTest { - @Test - void testSerializeAndDeserializeEverythingSpecified() throws Exception { - final KinesisShardSplit initialSplit = getTestSplit(); - - KinesisShardSplitSerializer serializer = new KinesisShardSplitSerializer(); - - byte[] serialized = serializer.serialize(initialSplit); - KinesisShardSplit deserializedSplit = - serializer.deserialize(serializer.getVersion(), serialized); - - assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit); - } - @ParameterizedTest - @MethodSource("provideStartingPositions") - void testSerializeAndDeserializeWithStartingPosition(StartingPosition startingPosition) + @MethodSource("provideKinesisShardSplits") + void testSerializeAndDeserializeEverythingSpecified(KinesisShardSplit initialSplit) throws Exception { - final KinesisShardSplit initialSplit = getTestSplit(startingPosition); - KinesisShardSplitSerializer serializer = new KinesisShardSplitSerializer(); byte[] serialized = serializer.serialize(initialSplit); @@ -61,11 +53,39 @@ void testSerializeAndDeserializeWithStartingPosition(StartingPosition startingPo assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit); } - private static Stream provideStartingPositions() { + private static Stream provideKinesisShardSplits() { return Stream.of( - StartingPosition.fromStart(), - StartingPosition.continueFromSequenceNumber("some-sequence-number"), - StartingPosition.fromTimestamp(Instant.ofEpochMilli(1683817847000L))); + getTestSplit(), + getTestSplit(generateShardId(2), Collections.singleton(generateShardId(1))), + getTestSplit( + generateShardId(5), + new HashSet<>(Arrays.asList(generateShardId(1), generateShardId(2)))), + getTestSplit(StartingPosition.fromStart()), + getTestSplit(StartingPosition.continueFromSequenceNumber("some-sequence-number")), + getTestSplit(StartingPosition.fromTimestamp(Instant.ofEpochMilli(1683817847000L)))); + } + + @Test + void testDeserializeVersion0() throws Exception { + 
final KinesisShardSplitSerializer serializer = new KinesisShardSplitSerializer(); + + final KinesisShardSplit initialSplit = + new KinesisShardSplit( + STREAM_ARN, + generateShardId(10), + StartingPosition.continueFromSequenceNumber("some-sequence-number"), + new HashSet<>(Arrays.asList(generateShardId(2), generateShardId(5)))); + + byte[] oldSerializedState = serializer.serializeV0(initialSplit); + KinesisShardSplit deserializedSplit = serializer.deserialize(0, oldSerializedState); + + assertThat(deserializedSplit) + .usingRecursiveComparison( + RecursiveComparisonConfiguration.builder() + .withIgnoredFields("parentShardIds") + .build()) + .isEqualTo(initialSplit); + assertThat(deserializedSplit.getParentShardIds()).isNotNull().matches(Set::isEmpty); } @Test @@ -76,16 +96,16 @@ void testDeserializeWrongVersion() throws Exception { KinesisShardSplitSerializer serializer = new KinesisShardSplitSerializer(); KinesisShardSplitSerializer wrongVersionSerializer = new WrongVersionSerializer(); - byte[] serialized = serializer.serialize(initialSplit); + byte[] serialized = wrongVersionSerializer.serialize(initialSplit); assertThatExceptionOfType(VersionMismatchException.class) .isThrownBy( () -> - wrongVersionSerializer.deserialize( - serializer.getVersion(), serialized)) + serializer.deserialize( + wrongVersionSerializer.getVersion(), serialized)) .withMessageContaining( "Trying to deserialize KinesisShardSplit serialized with unsupported version ") - .withMessageContaining(String.valueOf(wrongVersionSerializer.getVersion())) - .withMessageContaining(String.valueOf(serializer.getVersion())); + .withMessageContaining(String.valueOf(serializer.getVersion())) + .withMessageContaining(String.valueOf(wrongVersionSerializer.getVersion())); } private static class WrongVersionSerializer extends KinesisShardSplitSerializer { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java index 21ffaf14..baefeb41 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java @@ -1,8 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.flink.connector.kinesis.source.split; import nl.jqno.equalsverifier.EqualsVerifier; import org.junit.jupiter.api.Test; +import java.util.Collections; +import java.util.Set; + import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; class KinesisShardSplitTest { @@ -11,28 +32,44 @@ class KinesisShardSplitTest { "arn:aws:kinesis:us-east-1:290038087681:stream/keeneses_stream"; private static final String SHARD_ID = "shardId-000000000002"; private static final StartingPosition STARTING_POSITION = StartingPosition.fromStart(); + private static final Set PARENT_SHARD_IDS = Collections.emptySet(); @Test void testStreamArnNull() { assertThatExceptionOfType(NullPointerException.class) - .isThrownBy(() -> new KinesisShardSplit(null, SHARD_ID, STARTING_POSITION)) + .isThrownBy( + () -> + new KinesisShardSplit( + null, SHARD_ID, STARTING_POSITION, PARENT_SHARD_IDS)) .withMessageContaining("streamArn cannot be null"); } @Test void testShardIdNull() { assertThatExceptionOfType(NullPointerException.class) - .isThrownBy(() -> new KinesisShardSplit(STREAM_ARN, null, STARTING_POSITION)) + .isThrownBy( + () -> + new KinesisShardSplit( + STREAM_ARN, null, STARTING_POSITION, PARENT_SHARD_IDS)) .withMessageContaining("shardId cannot be null"); } @Test void testStartingPositionNull() { assertThatExceptionOfType(NullPointerException.class) - .isThrownBy(() -> new KinesisShardSplit(STREAM_ARN, SHARD_ID, null)) + .isThrownBy( + () -> new KinesisShardSplit(STREAM_ARN, SHARD_ID, null, PARENT_SHARD_IDS)) .withMessageContaining("startingPosition cannot be null"); } + @Test + void testParentShardIdsNull() { + assertThatExceptionOfType(NullPointerException.class) + .isThrownBy( + () -> new KinesisShardSplit(STREAM_ARN, SHARD_ID, STARTING_POSITION, null)) + .withMessageContaining("parentShardIds cannot be null"); + } + @Test void testEquals() { EqualsVerifier.forClass(KinesisShardSplit.class).verify(); diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/StartingPositionTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/StartingPositionTest.java index c23af0ad..763510e5 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/StartingPositionTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/StartingPositionTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.flink.connector.kinesis.source.split; import nl.jqno.equalsverifier.EqualsVerifier; diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisClientProvider.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisClientProvider.java index e7913c78..c691fa15 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisClientProvider.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisClientProvider.java @@ -23,6 +23,8 @@ import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.KinesisServiceClientConfiguration; import software.amazon.awssdk.services.kinesis.model.AccessDeniedException; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.ExpiredNextTokenException; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; @@ -64,6 +66,8 @@ public static class TestingKinesisClient implements KinesisClient { private Consumer getShardIteratorValidation; private GetRecordsResponse getRecordsResponse; private Consumer getRecordsValidation; + private DescribeStreamSummaryResponse describeStreamSummaryResponse; + private Consumer describeStreamSummaryRequestValidation; private boolean closed = false; @Override @@ -136,6 +140,26 @@ public GetRecordsResponse getRecords(GetRecordsRequest getRecordsRequest) return getRecordsResponse; } + public void setDescribeStreamSummaryResponse( + DescribeStreamSummaryResponse 
describeStreamSummaryResponse) { + this.describeStreamSummaryResponse = describeStreamSummaryResponse; + } + + public void setDescribeStreamSummaryRequestValidation( + Consumer describeStreamSummaryRequestValidation) { + this.describeStreamSummaryRequestValidation = describeStreamSummaryRequestValidation; + } + + @Override + public DescribeStreamSummaryResponse describeStreamSummary( + DescribeStreamSummaryRequest describeStreamSummaryRequest) + throws ResourceNotFoundException, LimitExceededException, InvalidArgumentException, + AccessDeniedException, AwsServiceException, SdkClientException, + KinesisException { + describeStreamSummaryRequestValidation.accept(describeStreamSummaryRequest); + return describeStreamSummaryResponse; + } + @Override public KinesisServiceClientConfiguration serviceClientConfiguration() { // This is not used diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java index 06511399..99ea2ad9 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java @@ -18,17 +18,21 @@ package org.apache.flink.connector.kinesis.source.util; +import org.apache.flink.connector.kinesis.source.proxy.ListShardsStartingPosition; import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; import org.apache.flink.connector.kinesis.source.split.StartingPosition; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.jetbrains.annotations.Nullable; import 
software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; +import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary; import java.io.IOException; +import java.time.Instant; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -52,21 +56,36 @@ public static TestKinesisStreamProxy getTestStreamProxy() { * behavior. */ public static class TestKinesisStreamProxy implements StreamProxy { + // Describe stream summary configuration + private Instant creationTimestamp = Instant.now(); + private int retentionPeriodHours = 24; // List shards configuration private final List shards = new ArrayList<>(); private Supplier listShardsExceptionSupplier; private boolean shouldRespectLastSeenShardId = true; - private String lastProvidedLastSeenShardId; + private ListShardsStartingPosition lastProvidedListShardStartingPosition; // GetRecords configuration + private Supplier getRecordsExceptionSupplier; private final Map>> storedRecords = new HashMap<>(); private boolean shouldCompleteNextShard = false; private boolean closed = false; @Override - public List listShards(String streamArn, @Nullable String lastSeenShardId) { - this.lastProvidedLastSeenShardId = lastSeenShardId; + public StreamDescriptionSummary getStreamDescriptionSummary(String streamArn) { + return StreamDescriptionSummary.builder() + .streamARN(streamArn) + .streamName(streamArn.substring(streamArn.lastIndexOf('/') + 1)) + .retentionPeriodHours(retentionPeriodHours) + .streamCreationTimestamp(creationTimestamp) + .build(); + } + + @Override + public List listShards( + String streamArn, ListShardsStartingPosition startingPosition) { + this.lastProvidedListShardStartingPosition = startingPosition; if 
(listShardsExceptionSupplier != null) { try { @@ -77,8 +96,11 @@ public List listShards(String streamArn, @Nullable String lastSeenShardId } List results = new ArrayList<>(); + ShardFilter shardFilter = startingPosition.getShardFilter(); for (Shard shard : shards) { - if (shouldRespectLastSeenShardId && shard.shardId().equals(lastSeenShardId)) { + if (shouldRespectLastSeenShardId + && shardFilter.type().equals(ShardFilterType.AFTER_SHARD_ID) + && shard.shardId().equals(shardFilter.shardId())) { results.clear(); continue; } @@ -92,6 +114,10 @@ public GetRecordsResponse getRecords( String streamArn, String shardId, StartingPosition startingPosition) { ShardHandle shardHandle = new ShardHandle(streamArn, shardId); + if (getRecordsExceptionSupplier != null) { + throw getRecordsExceptionSupplier.get(); + } + List records = null; if (storedRecords.containsKey(shardHandle)) { records = storedRecords.get(shardHandle).poll(); @@ -104,8 +130,18 @@ public GetRecordsResponse getRecords( .build(); } - public String getLastProvidedLastSeenShardId() { - return lastProvidedLastSeenShardId; + public void setStreamSummary(Instant creationTimestamp, int retentionPeriodHours) { + this.creationTimestamp = creationTimestamp; + this.retentionPeriodHours = retentionPeriodHours; + } + + public void setGetRecordsExceptionSupplier( + Supplier getRecordsExceptionSupplier) { + this.getRecordsExceptionSupplier = getRecordsExceptionSupplier; + } + + public ListShardsStartingPosition getLastProvidedListShardStartingPosition() { + return lastProvidedListShardStartingPosition; } public void addShards(String... 
shardIds) { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java index eb9394aa..362b20d5 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java @@ -32,7 +32,9 @@ import software.amazon.awssdk.services.kinesis.model.Record; import java.time.Instant; +import java.util.Collections; import java.util.Optional; +import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; @@ -65,12 +67,24 @@ public static KinesisShardSplit getTestSplit(String shardId) { return getTestSplit(STREAM_ARN, shardId); } + public static KinesisShardSplit getTestSplit(String shardId, Set parentShards) { + return getTestSplit(STREAM_ARN, shardId, parentShards); + } + public static KinesisShardSplit getTestSplit(String streamArn, String shardId) { - return new KinesisShardSplit(streamArn, shardId, StartingPosition.fromStart()); + return new KinesisShardSplit( + streamArn, shardId, StartingPosition.fromStart(), Collections.emptySet()); + } + + public static KinesisShardSplit getTestSplit( + String streamArn, String shardId, Set parentShards) { + return new KinesisShardSplit( + streamArn, shardId, StartingPosition.fromStart(), parentShards); } public static KinesisShardSplit getTestSplit(StartingPosition startingPosition) { - return new KinesisShardSplit(STREAM_ARN, SHARD_ID, startingPosition); + return new KinesisShardSplit( + STREAM_ARN, SHARD_ID, startingPosition, Collections.emptySet()); } public static ReaderInfo getTestReaderInfo(final int subtaskId) {