Skip to content

Commit

Permalink
[FLINK-32218][Connector/Kinesis] Add support for parent-child shard o…
Browse files Browse the repository at this point in the history
…rdering
  • Loading branch information
z3d1k committed Jul 10, 2024
1 parent 7f3483c commit 93d96e7
Show file tree
Hide file tree
Showing 34 changed files with 1,754 additions and 395 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ public class KinesisStreamsSource<T>
private final Configuration sourceConfig;
private final KinesisDeserializationSchema<T> deserializationSchema;
private final KinesisShardAssigner kinesisShardAssigner;
private final boolean preserveShardOrder;

KinesisStreamsSource(
String streamArn,
Configuration sourceConfig,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner kinesisShardAssigner) {
KinesisShardAssigner kinesisShardAssigner,
boolean preserveShardOrder) {
Preconditions.checkNotNull(streamArn);
Preconditions.checkArgument(!streamArn.isEmpty(), "stream ARN cannot be empty string");
Preconditions.checkNotNull(sourceConfig);
Expand All @@ -103,6 +105,7 @@ public class KinesisStreamsSource<T>
this.sourceConfig = sourceConfig;
this.deserializationSchema = deserializationSchema;
this.kinesisShardAssigner = kinesisShardAssigner;
this.preserveShardOrder = preserveShardOrder;
}

/**
Expand Down Expand Up @@ -167,7 +170,8 @@ public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> c
sourceConfig,
createKinesisStreamProxy(sourceConfig),
kinesisShardAssigner,
checkpoint);
checkpoint,
preserveShardOrder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class KinesisStreamsSourceBuilder<T> {
private Configuration sourceConfig;
private KinesisDeserializationSchema<T> deserializationSchema;
private KinesisShardAssigner kinesisShardAssigner = ShardAssignerFactory.uniformShardAssigner();
private boolean preserveShardOrder = true;

public KinesisStreamsSourceBuilder<T> setStreamArn(String streamArn) {
this.streamArn = streamArn;
Expand Down Expand Up @@ -84,8 +85,17 @@ public KinesisStreamsSourceBuilder<T> setKinesisShardAssigner(
return this;
}

public KinesisStreamsSourceBuilder<T> setPreserveShardOrder(boolean preserveShardOrder) {
this.preserveShardOrder = preserveShardOrder;
return this;
}

public KinesisStreamsSource<T> build() {
return new KinesisStreamsSource<>(
streamArn, sourceConfig, deserializationSchema, kinesisShardAssigner);
streamArn,
sourceConfig,
deserializationSchema,
kinesisShardAssigner,
preserveShardOrder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.time.Instant;
import java.util.Optional;

import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP;
import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
Expand All @@ -42,20 +43,25 @@ private KinesisStreamsSourceConfigUtil() {
* configuration.
*
* @param sourceConfig the configuration to parse timestamp from
* @return the timestamp
* @return {@link Optional} containing the initial timestamp if configured, an empty {@link
* Optional} otherwise
*/
public static Date parseStreamTimestampStartingPosition(final Configuration sourceConfig) {
public static Optional<Instant> parseStreamTimestampStartingPosition(
final Configuration sourceConfig) {
Preconditions.checkNotNull(sourceConfig);
String timestamp = sourceConfig.get(STREAM_INITIAL_TIMESTAMP);
if (timestamp == null || timestamp.isEmpty()) {
return Optional.empty();
}

try {
String format = sourceConfig.get(STREAM_TIMESTAMP_DATE_FORMAT);
SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
return customDateFormat.parse(timestamp);
return Optional.of(customDateFormat.parse(timestamp).toInstant());
} catch (IllegalArgumentException | NullPointerException exception) {
throw new IllegalArgumentException(exception);
} catch (ParseException exception) {
return new Date((long) (Double.parseDouble(timestamp) * 1000));
return Optional.of(Instant.ofEpochMilli((long) (Double.parseDouble(timestamp) * 1000)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kinesis.source.enumerator;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;

import java.util.Objects;

/** Kinesis shard split with assignment status. */
@Internal
public class KinesisShardSplitWithAssignmentStatus {
private final KinesisShardSplit kinesisShardSplit;
private final SplitAssignmentStatus splitAssignmentStatus;

public KinesisShardSplitWithAssignmentStatus(
KinesisShardSplit kinesisShardSplit, SplitAssignmentStatus splitAssignmentStatus) {
this.kinesisShardSplit = kinesisShardSplit;
this.splitAssignmentStatus = splitAssignmentStatus;
}

public KinesisShardSplit split() {
return kinesisShardSplit;
}

public SplitAssignmentStatus assignmentStatus() {
return splitAssignmentStatus;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KinesisShardSplitWithAssignmentStatus that = (KinesisShardSplitWithAssignmentStatus) o;
return Objects.equals(kinesisShardSplit, that.kinesisShardSplit)
&& Objects.equals(splitAssignmentStatus, that.splitAssignmentStatus);
}

@Override
public int hashCode() {
return Objects.hash(kinesisShardSplit, splitAssignmentStatus);
}

@Override
public String toString() {
return "KinesisShardSplitWithAssignmentStatus{"
+ "kinesisShardSplit="
+ kinesisShardSplit
+ ", splitAssignmentStatus="
+ splitAssignmentStatus
+ '}';
}
}
Loading

0 comments on commit 93d96e7

Please sign in to comment.