Skip to content

Commit

Permalink
flaky test check
Browse files Browse the repository at this point in the history
  • Loading branch information
z3d1k committed Feb 4, 2024
1 parent f39b39e commit dc05cd7
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 29 deletions.
51 changes: 44 additions & 7 deletions .github/workflows/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ on:
description: "The timeout in minutes for the compile and test step."
required: false
type: number
default: 50
default: 30

jobs:
compile_and_test:
Expand Down Expand Up @@ -97,7 +97,8 @@ jobs:
run: |
set -o pipefail
mvn clean install -Dflink.convergence.phase=install -Pcheck-convergence -U -B ${{ env.MVN_CONNECTION_OPTIONS }} \
mvn clean install -Dflink.convergence.phase=install -Pcheck-convergence \
-X -U -B ${{ env.MVN_CONNECTION_OPTIONS }} \
-DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \
-Dflink.version=${{ inputs.flink_version }} | tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
Expand Down Expand Up @@ -130,9 +131,45 @@ jobs:
mvn clean
- name: Check licensing
- name: Print JVM thread dumps when cancelled
if: ${{ failure() }}
run: |
mvn ${MVN_COMMON_OPTIONS} exec:java@check-license -N \
-Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) $(pwd)" \
${{ env.MVN_CONNECTION_OPTIONS }} \
-Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
# ----------------------------------------------------------------------------
# Copyright 2023 The Netty Project
#
# ----------------------------------------------------------------------------
# Source: https://github.com/netty/netty/blob/main/.github/actions/thread-dump-jvms/action.yml
echo "$OSTYPE"
if [[ "$OSTYPE" == "linux-gnu"* ]] && command -v sudo &> /dev/null; then
echo "Setting up JVM thread dumps"
# use jattach so that Java processes in docker containers are also covered
# download jattach
curl -s -L -o /tmp/jattach https://github.com/apangin/jattach/releases/download/v2.1/jattach
if command -v sha256sum &> /dev/null; then
# verify hash of jattach binary
sha256sum -c <(echo "07885fdc782e02e7302c6d190f54c3930afa10a38140365adf54076ec1086a8e /tmp/jattach") || exit 1
fi
chmod +x /tmp/jattach
for java_pid in $(sudo pgrep java); do
echo "----------------------- pid $java_pid -----------------------"
echo "command line: $(sudo cat /proc/$java_pid/cmdline | xargs -0 echo)"
sudo /tmp/jattach $java_pid jcmd VM.command_line || true
sudo /tmp/jattach $java_pid jcmd "Thread.print -l"
sudo /tmp/jattach $java_pid jcmd GC.heap_info || true
done
else
for java_pid in $(jps -q -J-XX:+PerfDisableSharedMem); do
echo "----------------------- pid $java_pid -----------------------"
jcmd $java_pid VM.command_line || true
jcmd $java_pid Thread.print -l
jcmd $java_pid GC.heap_info || true
done
fi
exit 0
# - name: Check licensing
# run: |
# mvn ${MVN_COMMON_OPTIONS} exec:java@check-license -N \
# -Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) $(pwd)" \
# ${{ env.MVN_CONNECTION_OPTIONS }} \
# -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
1 change: 1 addition & 0 deletions .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ name: "flink-connector-aws: nightly build"
on:
schedule:
- cron: "0 0 * * *"
workflow_dispatch:
jobs:
compile_and_test:
if: github.repository_owner == 'apache'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.util.Preconditions;

import io.netty.handler.timeout.ReadTimeoutException;
import org.apache.commons.lang3.RandomUtils;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
Expand All @@ -41,6 +42,7 @@
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -251,24 +253,32 @@ private FanOutShardSubscription openSubscriptionToShard(final StartingPosition s
// Errors that occur during the subscription are surfaced here
// and to the FanOutShardSubscription
// (errors are ignored here once the subscription is open)
LOG.error("Error while trying to subscribe", e);
if (waitForSubscriptionLatch.getCount() > 0) {
exception.set(e);
waitForSubscriptionLatch.countDown();
}
})
.subscriber(() -> subscription)
.build();

int idx = RandomUtils.nextInt();
LOG.info("Before call subscribeToShard {}", idx);
kinesis.subscribeToShard(request, responseHandler);
LOG.info("After call subscribeToShard {} {}", idx, Instant.now());

LOG.info(
"Wait {} for subscription... {} {}",
subscribeToShardTimeout.toString(),
idx,
Instant.now());
boolean subscriptionTimedOut =
!waitForSubscriptionLatch.await(
subscribeToShardTimeout.toMillis(), TimeUnit.MILLISECONDS);

if (subscriptionTimedOut) {
final String errorMessage =
"Timed out acquiring subscription - " + shardId + " (" + consumerArn + ")";
LOG.error(errorMessage);
LOG.error("{} {} {}", errorMessage, idx, Instant.now());
subscription.cancelSubscription();
handleErrorAndRethrow(new TimeoutException(errorMessage));
}
Expand Down Expand Up @@ -469,6 +479,7 @@ void requestRecord() {
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
LOG.info("OnSubscribe");
waitForSubscriptionLatch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxySyncV2Interface;

import com.amazonaws.kinesis.agg.RecordAggregator;
import org.apache.commons.lang3.RandomUtils;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.Consumer;
import software.amazon.awssdk.services.kinesis.model.ConsumerDescription;
Expand All @@ -44,6 +47,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -433,6 +437,8 @@ public SingleShardFanOutKinesisAsyncV2 build() {
*/
public abstract static class AbstractSingleShardFanOutKinesisAsyncV2
extends KinesisProxyAsyncV2InterfaceAdapter {
private final Logger logger =
LoggerFactory.getLogger(AbstractSingleShardFanOutKinesisAsyncV2.class);

private final List<SubscribeToShardRequest> requests = new ArrayList<>();
private int remainingSubscriptions;
Expand All @@ -458,19 +464,29 @@ public CompletableFuture<Void> subscribeToShard(
final SubscribeToShardResponseHandler responseHandler) {

requests.add(request);

logger.info("SubscribeToShard event");
return CompletableFuture.supplyAsync(
() -> {
logger.info("Sending response received message");
responseHandler.responseReceived(
SubscribeToShardResponse.builder().build());
logger.info("Handling event stream");
responseHandler.onEventStream(
subscriber -> {
final List<SubscribeToShardEvent> eventsToSend;

logger.info(
"SubscribeToShardEvent, remainingSubscriptions={}",
remainingSubscriptions);
if (remainingSubscriptions > 0) {
logger.info(
"Subscription event, remainingSubscriptions={}",
remainingSubscriptions);
eventsToSend = getEventsToSend();
remainingSubscriptions--;
} else {
logger.info(
"end subscription event, remainingSubscriptions={}",
remainingSubscriptions);
eventsToSend =
Collections.singletonList(
SubscribeToShardEvent.builder()
Expand All @@ -479,23 +495,15 @@ public CompletableFuture<Void> subscribeToShard(
.build());
}

Subscription subscription = mock(Subscription.class);
Iterator<SubscribeToShardEvent> iterator =
eventsToSend.iterator();

doAnswer(
a -> {
if (!iterator.hasNext()) {
completeSubscription(subscriber);
} else {
subscriber.onNext(iterator.next());
}

return null;
})
.when(subscription)
.request(anyLong());

logger.info("Mock subscription object");
// Subscription subscription =
// new MockSubscription(
// subscriber,
// eventsToSend,
// this::completeSubscription);
Subscription subscription =
mockSubscriptionObject(subscriber, eventsToSend);
logger.info("onSubscribe");
subscriber.onSubscribe(subscription);
});
return null;
Expand All @@ -506,6 +514,46 @@ void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscr
subscriber.onComplete();
}

private Subscription mockSubscriptionObject(
Subscriber<? super SubscribeToShardEventStream> subscriber,
List<SubscribeToShardEvent> eventsToSend) {
try {
int identifier = RandomUtils.nextInt();
logger.info("Start mock creation (id {}) {}", identifier, Instant.now());
Subscription subscription = mock(Subscription.class);
logger.info("Mock created, mocking response (id {}) {}", identifier, Instant.now());
Iterator<SubscribeToShardEvent> iterator = eventsToSend.iterator();
doAnswer(
a -> {
if (!iterator.hasNext()) {
completeSubscription(subscriber);
} else {
subscriber.onNext(iterator.next());
}

return null;
})
.when(subscription)
.request(anyLong());

logger.info("Done mocking (id {}) {}", identifier, Instant.now());
return subscription;
} catch (Exception e) {
logger.error("Mocking failed {}", Instant.now(), e);
Thread.getAllStackTraces()
.forEach(
((thread, stackTraceElements) -> {
logger.info(
"{}\n{}",
thread.getName(),
Arrays.stream(stackTraceElements)
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n")));
}));
throw e;
}
}

abstract List<SubscribeToShardEvent> getEventsToSend();
}

Expand Down Expand Up @@ -692,6 +740,39 @@ public void close() {
}
}

private static class MockSubscription implements Subscription {

private final Subscriber<? super SubscribeToShardEventStream> subscriber;
private final Iterator<SubscribeToShardEvent> eventsToSend;
private final java.util.function.Consumer<Subscriber<? super SubscribeToShardEventStream>>
onComplete;

public MockSubscription(
Subscriber<? super SubscribeToShardEventStream> subscriber,
List<SubscribeToShardEvent> eventsToSend,
java.util.function.Consumer<Subscriber<? super SubscribeToShardEventStream>>
onComplete) {

this.subscriber = subscriber;
this.eventsToSend = eventsToSend.iterator();
this.onComplete = onComplete;
}

@Override
public void request(long n) {
if (eventsToSend.hasNext()) {
subscriber.onNext(eventsToSend.next());
} else {
onComplete.accept(subscriber);
}
}

@Override
public void cancel() {
// Nothing to do...
}
}

private static Record createRecord(final AtomicInteger sequenceNumber) {
return createRecord(randomAlphabetic(32).getBytes(UTF_8), sequenceNumber);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.level = INFO
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
Expand Down

0 comments on commit dc05cd7

Please sign in to comment.