Skip to content

Commit

Permalink
flaky test check
Browse files Browse the repository at this point in the history
  • Loading branch information
z3d1k committed Feb 3, 2024
1 parent f39b39e commit 2931c05
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 10 deletions.
17 changes: 9 additions & 8 deletions .github/workflows/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ on:
description: "The timeout in minutes for the compile and test step."
required: false
type: number
default: 50
default: 30

jobs:
compile_and_test:
Expand Down Expand Up @@ -97,7 +97,8 @@ jobs:
run: |
set -o pipefail
mvn clean install -Dflink.convergence.phase=install -Pcheck-convergence -U -B ${{ env.MVN_CONNECTION_OPTIONS }} \
mvn clean install -Dflink.convergence.phase=install -Pcheck-convergence \
-X -U -B ${{ env.MVN_CONNECTION_OPTIONS }} \
-DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \
-Dflink.version=${{ inputs.flink_version }} | tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
Expand Down Expand Up @@ -130,9 +131,9 @@ jobs:
mvn clean
- name: Check licensing
run: |
mvn ${MVN_COMMON_OPTIONS} exec:java@check-license -N \
-Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) $(pwd)" \
${{ env.MVN_CONNECTION_OPTIONS }} \
-Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
# - name: Check licensing
# run: |
# mvn ${MVN_COMMON_OPTIONS} exec:java@check-license -N \
# -Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) $(pwd)" \
# ${{ env.MVN_CONNECTION_OPTIONS }} \
# -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
3 changes: 3 additions & 0 deletions flink-connector-aws/flink-connector-kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ under the License.
<aws.kinesis-kpl.version>0.14.1</aws.kinesis-kpl.version>
<aws.dynamodbstreams-kinesis-adapter.version>1.5.3</aws.dynamodbstreams-kinesis-adapter.version>
<hamcrest.version>1.3</hamcrest.version>

<!-- <flink.forkCountUnitTest>1</flink.forkCountUnitTest>-->
<!-- <flink.forkCountITCase>1</flink.forkCountITCase>-->
</properties>

<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.amazonaws.kinesis.agg.RecordAggregator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.Consumer;
import software.amazon.awssdk.services.kinesis.model.ConsumerDescription;
Expand Down Expand Up @@ -433,6 +435,8 @@ public SingleShardFanOutKinesisAsyncV2 build() {
*/
public abstract static class AbstractSingleShardFanOutKinesisAsyncV2
extends KinesisProxyAsyncV2InterfaceAdapter {
private final Logger logger =
LoggerFactory.getLogger(AbstractSingleShardFanOutKinesisAsyncV2.class);

private final List<SubscribeToShardRequest> requests = new ArrayList<>();
private int remainingSubscriptions;
Expand Down Expand Up @@ -461,16 +465,31 @@ public CompletableFuture<Void> subscribeToShard(

return CompletableFuture.supplyAsync(
() -> {
logger.info(
"{}: Sending response received message",
Thread.currentThread().getName());
responseHandler.responseReceived(
SubscribeToShardResponse.builder().build());
logger.info("{}: Handling event stream", Thread.currentThread().getName());
responseHandler.onEventStream(
subscriber -> {
final List<SubscribeToShardEvent> eventsToSend;

logger.info(
"{}: SubscribeToShardEvent, remainingSubscriptions={}",
Thread.currentThread().getName(),
remainingSubscriptions);
if (remainingSubscriptions > 0) {
logger.info(
"{}: Subscription event, remainingSubscriptions={}",
Thread.currentThread().getName(),
remainingSubscriptions);
eventsToSend = getEventsToSend();
remainingSubscriptions--;
} else {
logger.info(
"{}: end subscription event, remainingSubscriptions={}",
Thread.currentThread().getName(),
remainingSubscriptions);
eventsToSend =
Collections.singletonList(
SubscribeToShardEvent.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.level = INFO
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
Expand Down

0 comments on commit 2931c05

Please sign in to comment.