Skip to content

Commit

Permalink
Add RejectedExecutionHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
kchrusciel committed Mar 16, 2020
1 parent 23f4997 commit 7cca4d5
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 0 deletions.
40 changes: 40 additions & 0 deletions RejectedExecutionHandler/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>pl.codecouple</groupId>
<artifactId>RejectedExecutionHandler</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>1.12</maven.compiler.source>
<maven.compiler.target>1.12</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.15.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.3.3</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package pl.codecouple;


import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class RejectedExecutionHandlerTest {

@Test
void shouldThrowRejectedExecutionExceptionWithAbortPolicy() {
// given
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1,
1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new ThreadPoolExecutor.AbortPolicy()
);
// when
threadPool.submit(() -> sleep(10_000));
threadPool.submit(() -> sleep(10_000));
// then
assertThatExceptionOfType(RejectedExecutionException.class)
.isThrownBy(() -> threadPool.submit(() -> sleep(10_000)));
}

@Test
void shouldNotThrowRejectedExecutionExceptionWithDiscardPolicy() {
// given
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1,
1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardPolicy()
);
// when
threadPool.submit(() -> sleep(10_000));
threadPool.submit(() -> sleep(10_000));
// then
assertThatCode(() -> threadPool.submit(() -> sleep(10_000)))
.doesNotThrowAnyException();
}

@Test
void shouldReturnNewestElementsWithDiscardOldestPolicy() throws InterruptedException {
// given
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1,
1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
// when
threadPool.execute(() -> sleep(100));

BlockingQueue<String> queue = new LinkedBlockingDeque<>();
threadPool.execute(() -> queue.offer("Oldest"));
threadPool.execute(() -> queue.offer("Job"));
threadPool.execute(() -> queue.offer("Newest"));

threadPool.awaitTermination(100, TimeUnit.MILLISECONDS);

List<String> results = new ArrayList<>();
queue.drainTo(results);

// then
assertThat(results).containsExactlyInAnyOrder("Job", "Newest")
.doesNotContain("Oldest");
}

@Test
void shouldSaveRejectedTasksWithCustomPolicy() {
// given
RejectedTasksRepository repository = Mockito.mock(RejectedTasksRepository.class);
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1,
1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new CustomRejectedExecutionHandler(repository)
);
// when
threadPool.submit(() -> sleep(10_000));
threadPool.submit(() -> sleep(10_000));
threadPool.submit(() -> sleep(10_000));

// then
verify(repository).save(anyString());
}

@Test
void shouldBlockCallerThread() {
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1,
1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy());

threadPool.submit(() -> sleep(1_000));
threadPool.submit(() -> sleep(1_000));

long startTime = System.currentTimeMillis();
threadPool.submit(() -> sleep(1_000));

long blockedDuration = System.currentTimeMillis() - startTime;

assertThat(blockedDuration).isGreaterThanOrEqualTo(1_000);
}

void sleep(long millis) {
try {
Thread.sleep(millis);
} catch(InterruptedException e) {
e.printStackTrace();
}
}

}

class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

private final RejectedTasksRepository rejectedTasksRepository;

CustomRejectedExecutionHandler(final RejectedTasksRepository rejectedTasksRepository) {
this.rejectedTasksRepository = rejectedTasksRepository;
}

@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
rejectedTasksRepository.save(r.toString());
Executors.newFixedThreadPool(10);
}
}

interface RejectedTasksRepository {

void save(final String taskName);

}

0 comments on commit 7cca4d5

Please sign in to comment.