diff --git a/.github/workflows/common.yml b/.github/workflows/common.yml index c0c0bb70..7a27bdf0 100644 --- a/.github/workflows/common.yml +++ b/.github/workflows/common.yml @@ -40,7 +40,7 @@ on: description: "The timeout in minutes for the compile and test step." required: false type: number - default: 50 + default: 30 jobs: compile_and_test: @@ -97,7 +97,8 @@ jobs: run: | set -o pipefail - mvn clean install -Dflink.convergence.phase=install -Pcheck-convergence -U -B ${{ env.MVN_CONNECTION_OPTIONS }} \ + mvn clean install -Dflink.convergence.phase=install -Pcheck-convergence \ + -X -U -B ${{ env.MVN_CONNECTION_OPTIONS }} \ -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \ -Dflink.version=${{ inputs.flink_version }} | tee ${{ env.MVN_BUILD_OUTPUT_FILE }} @@ -130,9 +131,45 @@ jobs: mvn clean - - name: Check licensing + - name: Print JVM thread dumps when cancelled + if: ${{ failure() }} run: | - mvn ${MVN_COMMON_OPTIONS} exec:java@check-license -N \ - -Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) $(pwd)" \ - ${{ env.MVN_CONNECTION_OPTIONS }} \ - -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties + # ---------------------------------------------------------------------------- + # Copyright 2023 The Netty Project + # + # ---------------------------------------------------------------------------- + # Source: https://github.com/netty/netty/blob/main/.github/actions/thread-dump-jvms/action.yml + echo "$OSTYPE" + if [[ "$OSTYPE" == "linux-gnu"* ]] && command -v sudo &> /dev/null; then + echo "Setting up JVM thread dumps" + # use jattach so that Java processes in docker containers are also covered + # download jattach + curl -s -L -o /tmp/jattach https://github.com/apangin/jattach/releases/download/v2.1/jattach + if command -v sha256sum &> /dev/null; then + # verify hash of jattach binary + sha256sum -c <(echo "07885fdc782e02e7302c6d190f54c3930afa10a38140365adf54076ec1086a8e /tmp/jattach") || exit 1 + fi + chmod +x /tmp/jattach + for java_pid in $(sudo pgrep java); do + echo "----------------------- pid $java_pid -----------------------" + echo "command line: $(sudo cat /proc/$java_pid/cmdline | xargs -0 echo)" + sudo /tmp/jattach $java_pid jcmd VM.command_line || true + sudo /tmp/jattach $java_pid jcmd "Thread.print -l" + sudo /tmp/jattach $java_pid jcmd GC.heap_info || true + done + else + for java_pid in $(jps -q -J-XX:+PerfDisableSharedMem); do + echo "----------------------- pid $java_pid -----------------------" + jcmd $java_pid VM.command_line || true + jcmd $java_pid Thread.print -l + jcmd $java_pid GC.heap_info || true + done + fi + exit 0 + +# - name: Check licensing +# run: | +# mvn ${MVN_COMMON_OPTIONS} exec:java@check-license -N \ +# -Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) $(pwd)" \ +# ${{ env.MVN_CONNECTION_OPTIONS }} \ +# -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index a30d95a0..a10216d5 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -20,6 +20,7 @@ name: "flink-connector-aws: nightly build" on: schedule: - cron: "0 0 * * *" + workflow_dispatch: jobs: compile_and_test: if: github.repository_owner == 'apache' diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java index 21fe050e..937b6332 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java @@ -28,6 +28,7 @@ import org.apache.flink.util.Preconditions; import io.netty.handler.timeout.ReadTimeoutException; +import org.apache.commons.lang3.RandomUtils; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; @@ -41,6 +42,7 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.time.Duration; +import java.time.Instant; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletionException; @@ -251,6 +253,7 @@ private FanOutShardSubscription openSubscriptionToShard(final StartingPosition s // Errors that occur during the subscription are surfaced here // and to the FanOutShardSubscription // (errors are ignored here once the subscription is open) + LOG.error("Error while trying to subscribe", e); if (waitForSubscriptionLatch.getCount() > 0) { exception.set(e); waitForSubscriptionLatch.countDown(); @@ -258,9 +261,16 @@ private FanOutShardSubscription openSubscriptionToShard(final StartingPosition s }) .subscriber(() -> subscription) .build(); - + int idx = RandomUtils.nextInt(); + LOG.info("Before call subscribeToShard {}", idx); kinesis.subscribeToShard(request, responseHandler); + LOG.info("After call subscribeToShard {} {}", idx, Instant.now()); + LOG.info( + "Wait {} for subscription... {} {}", + subscribeToShardTimeout.toString(), + idx, + Instant.now()); boolean subscriptionTimedOut = !waitForSubscriptionLatch.await( subscribeToShardTimeout.toMillis(), TimeUnit.MILLISECONDS); @@ -268,7 +278,7 @@ private FanOutShardSubscription openSubscriptionToShard(final StartingPosition s if (subscriptionTimedOut) { final String errorMessage = "Timed out acquiring subscription - " + shardId + " (" + consumerArn + ")"; - LOG.error(errorMessage); + LOG.error("{} {} {}", errorMessage, idx, Instant.now()); subscription.cancelSubscription(); handleErrorAndRethrow(new TimeoutException(errorMessage)); } @@ -469,6 +479,7 @@ void requestRecord() { @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; + LOG.info("OnSubscribe"); waitForSubscriptionLatch.countDown(); } diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java index 9985fbf5..48df2faa 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java @@ -21,8 +21,11 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxySyncV2Interface; import com.amazonaws.kinesis.agg.RecordAggregator; +import org.apache.commons.lang3.RandomUtils; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.kinesis.model.Consumer; import software.amazon.awssdk.services.kinesis.model.ConsumerDescription; @@ -44,6 +47,7 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -433,6 +437,8 @@ public SingleShardFanOutKinesisAsyncV2 build() { */ public abstract static class AbstractSingleShardFanOutKinesisAsyncV2 extends KinesisProxyAsyncV2InterfaceAdapter { + private final Logger logger = + LoggerFactory.getLogger(AbstractSingleShardFanOutKinesisAsyncV2.class); private final List requests = new ArrayList<>(); private int remainingSubscriptions; @@ -458,19 +464,29 @@ public CompletableFuture subscribeToShard( final SubscribeToShardResponseHandler responseHandler) { requests.add(request); - + logger.info("SubscribeToShard event"); return CompletableFuture.supplyAsync( () -> { + logger.info("Sending response received message"); responseHandler.responseReceived( SubscribeToShardResponse.builder().build()); + logger.info("Handling event stream"); responseHandler.onEventStream( subscriber -> { final List eventsToSend; - + logger.info( + "SubscribeToShardEvent, remainingSubscriptions={}", + remainingSubscriptions); if (remainingSubscriptions > 0) { + logger.info( + "Subscription event, remainingSubscriptions={}", + remainingSubscriptions); eventsToSend = getEventsToSend(); remainingSubscriptions--; } else { + logger.info( + "end subscription event, remainingSubscriptions={}", + remainingSubscriptions); eventsToSend = Collections.singletonList( SubscribeToShardEvent.builder() @@ -479,23 +495,15 @@ public CompletableFuture subscribeToShard( .build()); } - Subscription subscription = mock(Subscription.class); - Iterator iterator = - eventsToSend.iterator(); - - doAnswer( - a -> { - if (!iterator.hasNext()) { - completeSubscription(subscriber); - } else { - subscriber.onNext(iterator.next()); - } - - return null; - }) - .when(subscription) - .request(anyLong()); - + logger.info("Mock subscription object"); + // Subscription subscription = + // new MockSubscription( + // subscriber, + // eventsToSend, + // this::completeSubscription); + Subscription subscription = + mockSubscriptionObject(subscriber, eventsToSend); + logger.info("onSubscribe"); subscriber.onSubscribe(subscription); }); return null; @@ -506,6 +514,46 @@ void completeSubscription(Subscriber subscr subscriber.onComplete(); } + private Subscription mockSubscriptionObject( + Subscriber subscriber, + List eventsToSend) { + try { + int identifier = RandomUtils.nextInt(); + logger.info("Start mock creation (id {}) {}", identifier, Instant.now()); + Subscription subscription = mock(Subscription.class); + logger.info("Mock created, mocking response (id {}) {}", identifier, Instant.now()); + Iterator iterator = eventsToSend.iterator(); + doAnswer( + a -> { + if (!iterator.hasNext()) { + completeSubscription(subscriber); + } else { + subscriber.onNext(iterator.next()); + } + + return null; + }) + .when(subscription) + .request(anyLong()); + + logger.info("Done mocking (id {}) {}", identifier, Instant.now()); + return subscription; + } catch (Exception e) { + logger.error("Mocking failed {}", Instant.now(), e); + Thread.getAllStackTraces() + .forEach( + ((thread, stackTraceElements) -> { + logger.info( + "{}\n{}", + thread.getName(), + Arrays.stream(stackTraceElements) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n"))); + })); + throw e; + } + } + abstract List getEventsToSend(); } @@ -692,6 +740,39 @@ public void close() { } } + private static class MockSubscription implements Subscription { + + private final Subscriber subscriber; + private final Iterator eventsToSend; + private final java.util.function.Consumer> + onComplete; + + public MockSubscription( + Subscriber subscriber, + List eventsToSend, + java.util.function.Consumer> + onComplete) { + + this.subscriber = subscriber; + this.eventsToSend = eventsToSend.iterator(); + this.onComplete = onComplete; + } + + @Override + public void request(long n) { + if (eventsToSend.hasNext()) { + subscriber.onNext(eventsToSend.next()); + } else { + onComplete.accept(subscriber); + } + } + + @Override + public void cancel() { + // Nothing to do... + } + } + private static Record createRecord(final AtomicInteger sequenceNumber) { return createRecord(randomAlphabetic(32).getBytes(UTF_8), sequenceNumber); } diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/resources/log4j2-test.properties b/flink-connector-aws/flink-connector-kinesis/src/test/resources/log4j2-test.properties index 6faab06b..ace06c6d 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/resources/log4j2-test.properties +++ b/flink-connector-aws/flink-connector-kinesis/src/test/resources/log4j2-test.properties @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF +rootLogger.level = INFO rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger