Skip to content

Commit

Permalink
Merge pull request #79 from tkowalcz/77-agrona-sleeping-more
Browse files Browse the repository at this point in the history
77 agrona sleeping more
  • Loading branch information
tkowalcz authored Oct 23, 2021
2 parents aeec8d5 + 6f54179 commit 1bcc532
Show file tree
Hide file tree
Showing 20 changed files with 75 additions and 31 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ You can compare this with [design principles of Aeron](https://github.com/real-l
. ▼
┌────────────────────┐ ┌─────────┐ ┌─────────┐
. Log │ │ │ Reading │ │ I/O │
---------------▶│ Log buffer ├--→---→--┤ agent ├--→---→--┤ thread │
. │ │ │ thread │ │ (Netty) │
. Log │ │ │ Log │ │ I/O │
---------------▶│ Log buffer ├--→---→--┤ shipper ├--→---→--┤ thread │
. │ │ │ agent │ │ (Netty) │
└────────────────────┘ └─────────┘ └─────────┘
. ▲
Expand Down
3 changes: 2 additions & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import java.io.IOException;

public class LogBufferAgent implements Agent {
public class LogShipperAgent implements Agent {

public static final int MAX_MESSAGES_TO_RETRIEVE = 100;

Expand All @@ -18,7 +18,7 @@ public class LogBufferAgent implements Agent {
private final LogBufferMessageHandler messageHandler;
private final TimeCappedBatchingStrategy batchStrategy;

public LogBufferAgent(
public LogShipperAgent(
TimeCappedBatchingStrategy batchStrategy,
ManyToOneRingBuffer logBuffer,
OutputBuffer outputBuffer,
Expand Down Expand Up @@ -69,6 +69,6 @@ public void onClose() {

@Override
public String roleName() {
return "ReadingLogBufferAndSendingHttp";
return "LogShipper";
}
}
8 changes: 5 additions & 3 deletions core/src/main/java/pl/tkowalcz/tjahzi/TjahziInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ public LoggingSystem createLoggingSystem(
long batchSizeBytes,
long batchWaitMillis,
int bufferSizeBytes,
long logShipperWakeupIntervalMillis,
long shutdownTimeoutMillis,
boolean offHeap) {
boolean offHeap
) {
bufferSizeBytes = findNearestPowerOfTwo(bufferSizeBytes);
ByteBuffer javaBuffer = allocateJavaBuffer(bufferSizeBytes, offHeap);

Expand All @@ -33,7 +35,7 @@ public LoggingSystem createLoggingSystem(
);

OutputBuffer outputBuffer = new OutputBuffer(PooledByteBufAllocator.DEFAULT.buffer());
LogBufferAgent agent = new LogBufferAgent(
LogShipperAgent agent = new LogShipperAgent(
new TimeCappedBatchingStrategy(
monitoringModule.getClock(),
outputBuffer,
Expand All @@ -52,7 +54,7 @@ public LoggingSystem createLoggingSystem(
);

AgentRunner runner = new AgentRunner(
new SleepingMillisIdleStrategy(),
new SleepingMillisIdleStrategy(logShipperWakeupIntervalMillis),
monitoringModule::addAgentError,
null,
agent
Expand Down
3 changes: 3 additions & 0 deletions core/src/test/java/pl/tkowalcz/tjahzi/HeadersTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ void shouldIncludeAdditionalHeaders() {
0,
0,
1024 * 1024,
250,
10_000,
false
);
Expand Down Expand Up @@ -141,6 +142,7 @@ void shouldHandleCaseWithNoAdditionalHeaders() {
0,
0,
1024 * 1024,
250,
10_000,
false
);
Expand Down Expand Up @@ -195,6 +197,7 @@ void shouldNotOverrideCrucialHeaders() {
0,
0,
1024 * 1024,
250,
10_000,
false
);
Expand Down
12 changes: 6 additions & 6 deletions core/src/test/java/pl/tkowalcz/tjahzi/LogBufferAgentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void shouldSendDataIfOverSizeLimit() throws IOException {
NettyHttpClient httpClient = mock(NettyHttpClient.class);

OutputBuffer outputBuffer = new OutputBuffer(PooledByteBufAllocator.DEFAULT.buffer());
LogBufferAgent agent = new LogBufferAgent(
LogShipperAgent agent = new LogShipperAgent(
new TimeCappedBatchingStrategy(
clock,
outputBuffer,
Expand Down Expand Up @@ -98,7 +98,7 @@ void shouldNotSendDataBelowSizeLimit() throws IOException {

NettyHttpClient httpClient = mock(NettyHttpClient.class);
OutputBuffer outputBuffer = new OutputBuffer(PooledByteBufAllocator.DEFAULT.buffer());
LogBufferAgent agent = new LogBufferAgent(
LogShipperAgent agent = new LogShipperAgent(
new TimeCappedBatchingStrategy(
clock,
outputBuffer,
Expand Down Expand Up @@ -137,7 +137,7 @@ void shouldNotAttemptSendingDataIfThereIsNothingToSend() throws IOException {

NettyHttpClient httpClient = mock(NettyHttpClient.class);
OutputBuffer outputBuffer = new OutputBuffer(PooledByteBufAllocator.DEFAULT.buffer());
LogBufferAgent agent = new LogBufferAgent(
LogShipperAgent agent = new LogShipperAgent(
new TimeCappedBatchingStrategy(
clock,
outputBuffer,
Expand Down Expand Up @@ -186,7 +186,7 @@ void shouldSendDataBelowSizeLimitIfTimeoutExpires() throws IOException {
SettableClock clock = new SettableClock();

OutputBuffer outputBuffer = new OutputBuffer(PooledByteBufAllocator.DEFAULT.buffer());
LogBufferAgent agent = new LogBufferAgent(
LogShipperAgent agent = new LogShipperAgent(
new TimeCappedBatchingStrategy(
clock,
outputBuffer,
Expand Down Expand Up @@ -242,7 +242,7 @@ void shouldDrainAllMessagesOnClose() throws IOException {
SettableClock clock = new SettableClock();

OutputBuffer outputBuffer = new OutputBuffer(PooledByteBufAllocator.DEFAULT.buffer());
LogBufferAgent agent = new LogBufferAgent(
LogShipperAgent agent = new LogShipperAgent(
new TimeCappedBatchingStrategy(
clock,
outputBuffer,
Expand All @@ -260,7 +260,7 @@ void shouldDrainAllMessagesOnClose() throws IOException {
)
);

for (int i = 0; i < LogBufferAgent.MAX_MESSAGES_TO_RETRIEVE * 2 + 1; i++) {
for (int i = 0; i < LogShipperAgent.MAX_MESSAGES_TO_RETRIEVE * 2 + 1; i++) {
logger.log(
42L,
LabelSerializerCreator.from(Map.of()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
class LoggingSystemSanityCheckTest {

@Container
public GenericContainer loki = new GenericContainer("grafana/loki:latest")
public GenericContainer loki = new GenericContainer("grafana/loki:2.3.0")
.withCommand("-config.file=/etc/loki-config.yaml")
.withClasspathResourceMapping("loki-config.yaml",
"/etc/loki-config.yaml",
Expand Down Expand Up @@ -67,6 +67,7 @@ void setUp() {
0,
0,
1024 * 1024,
250,
10_000,
false
);
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/pl/tkowalcz/tjahzi/LoggingSystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
class LoggingSystemTest {

@Container
public GenericContainer loki = new GenericContainer("grafana/loki:latest")
public GenericContainer loki = new GenericContainer("grafana/loki:2.3.0")
.withCommand("-config.file=/etc/loki-config.yaml")
.withClasspathResourceMapping("loki-config.yaml",
"/etc/loki-config.yaml",
Expand Down Expand Up @@ -71,6 +71,7 @@ void setUp() {
1024 * 1024,
0,
0,
250,
10_000,
false
);
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/java/pl/tkowalcz/tjahzi/ReconnectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ void shouldEventuallyReconnectIfLokiWasDownWhenStarting() {
10_000,
1000,
1024 * 1024,
250,
10_000,
false
);
Expand Down Expand Up @@ -134,6 +135,7 @@ void shouldReconnectIfLokiFailed() {
0,
0,
1024 * 1024,
250,
10_000,
false
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ void shouldStopThreads() {
0,
0,
1024 * 1024,
250,
10_000,
false
);
Expand All @@ -83,7 +84,7 @@ void shouldStopThreads() {
assertThat(threadInfos)
.extracting(ThreadInfo::getThreadName)
.contains(
"ReadingLogBufferAndSendingHttp",
"LogShipper",
"tjahzi-worker"
);
});
Expand All @@ -102,7 +103,7 @@ void shouldStopThreads() {
assertThat(threadInfos)
.extracting(ThreadInfo::getThreadName)
.doesNotContain(
"ReadingLogBufferAndSendingHttp",
"LogShipper",
"tjahzi-worker"
);
});
Expand Down
7 changes: 7 additions & 0 deletions log4j2-appender/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ Like
in [promtail configuration](https://grafana.com/docs/loki/latest/clients/promtail/configuration/) `maximum amount of time to wait before sending a batch, even if that batch isn't full`
.

#### logShipperWakeupIntervalMillis (optional, default = 10)

The agent that reads data from log buffer, compresses it and sends to Loki via http is called `LogShipper`. This property
controls how often it wakes up to perform its duties. Other properties control how often the data should be sent to Loki
(`batchSize`, `batchWait`) this one just control how often to wake up and check for these conditions. In versions before
`0.9.17` it was left at default 1ms which caused high CPU usage on some setups.

#### shutdownTimeoutSeconds (optional, default = 10s)

On logging system shutdown (or config reload) Tjahzi will flush its internal buffers so that no logs are lost. This
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class LokiAppenderBuilder<B extends LokiAppenderBuilder<B>> extends Abstr
@PluginBuilderAttribute
private long batchWait = 5;

@PluginBuilderAttribute
private long logShipperWakeupIntervalMillis = 10;

@PluginBuilderAttribute
private int shutdownTimeoutSeconds = 10;

Expand Down Expand Up @@ -141,6 +144,7 @@ public LokiAppender build() {
batchSize,
TimeUnit.SECONDS.toMillis(batchWait),
bufferSizeBytes,
logShipperWakeupIntervalMillis,
TimeUnit.SECONDS.toMillis(shutdownTimeoutSeconds),
isUseOffHeapBuffer()
);
Expand Down Expand Up @@ -247,6 +251,14 @@ public void setBatchWait(long batchWait) {
this.batchWait = batchWait;
}

public long getLogShipperWakeupIntervalMillis() {
return logShipperWakeupIntervalMillis;
}

public void setLogShipperWakeupIntervalMillis(long logShipperWakeupIntervalMillis) {
this.logShipperWakeupIntervalMillis = logShipperWakeupIntervalMillis;
}

public int getShutdownTimeoutSeconds() {
return shutdownTimeoutSeconds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.awaitility.Durations;
import org.junit.jupiter.api.Test;
import pl.tkowalcz.tjahzi.stats.DropwizardMonitoringModule;

Expand Down Expand Up @@ -43,9 +44,12 @@ void shouldInjectMonitoringAndUseIt() {
.get("appender.loki.httpConnectAttempts");
assertThat(connectAttemptsCounter).isNotNull();

await().untilAsserted(() -> {
assertThat(connectAttemptsCounter.getCount()).isEqualTo(1);
}
);
await()
.atMost(Durations.ONE_MINUTE)
.pollInterval(Durations.ONE_SECOND)
.untilAsserted(() -> {
assertThat(connectAttemptsCounter.getCount()).isEqualTo(1);
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public class IntegrationTest {

@Container
public GenericContainer loki = new GenericContainer("grafana/loki:latest")
public GenericContainer loki = new GenericContainer("grafana/loki:2.3.0")
.withCommand("-config.file=/etc/loki-config.yaml")
.withClasspathResourceMapping("loki-config.yaml",
"/etc/loki-config.yaml",
Expand Down Expand Up @@ -51,10 +51,8 @@ public static LoggerContext loadConfig(String fileName) {

return loadConfig(uri);
} catch (URISyntaxException e) {
Assertions.fail(e);
return Assertions.fail(e);
}

return null;
}

public static LoggerContext loadConfig(URI uri) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<ThresholdFilter level="ALL"/>
<PatternLayout>
<Pattern>%X{ctx:tid} [%t] %d{DEFAULT} %5p %c{1} - %m%n%exception{full}</Pattern>
<Pattern>[%t] %d{DEFAULT} %5p %c{1} - %m%n%exception{full}</Pattern>
</PatternLayout>

<Header name="server" value="127.0.0.1"/>
Expand Down
1 change: 1 addition & 0 deletions logback-appender/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,5 @@ or use Tjahzi optimized efficient, low allocation encoder:
| UseOffHeapBuffer | true | Whether Tjahzi should allocate native buffer for `Log buffer` component. We can go into a rabbit hole of divagations what are the implications of this. Most important in our view is that having 10s or 100s of MB of space taken out of heap is not very friendly to garbage collector which might have to occasionally copy it around. |
| BatchSize | 100 KB | Like in [promtail configuration](https://grafana.com/docs/loki/latest/clients/promtail/configuration/) `maximum batch size (in bytes) of logs to accumulate before sending the batch to Loki`.|
| BatchWait | 5s | Like in [promtail configuration](https://grafana.com/docs/loki/latest/clients/promtail/configuration/) `maximum amount of time to wait before sending a batch, even if that batch isn't full`.|
| logShipperWakeupIntervalMillis | 10 | The agent that reads data from log buffer, compresses it and sends to Loki via http is called `LogShipper`. This property controls how often it wakes up to perform its duties. Other properties control how often the data should be sent to Loki (`batchSize`, `batchWait`) this one just control how often to wake up and check for these conditions. In versions before `0.9.17` it was left at default 1ms which caused high CPU usage on some setups. |
| shutdownTimeoutSeconds | 10s | On logging system shutdown (or config reload) Tjahzi will flush its internal buffers so that no logs are lost. This property sets limit on how long to wait for this to complete before proceeding with shutdown. |
3 changes: 2 additions & 1 deletion logback-appender/pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public abstract class LokiAppenderConfigurator extends UnsynchronizedAppenderBas
private long batchSize = 10_2400;
private long batchWait = 5;
private long shutdownTimeoutSeconds = 10;
private long logShipperWakeupIntervalMillis = 10;

private int maxRequestsInFlight = 100;

Expand Down Expand Up @@ -120,6 +121,14 @@ public void setShutdownTimeoutSeconds(long shutdownTimeoutSeconds) {
this.shutdownTimeoutSeconds = shutdownTimeoutSeconds;
}

public long getLogShipperWakeupIntervalMillis() {
return logShipperWakeupIntervalMillis;
}

public void setLogShipperWakeupIntervalMillis(long logShipperWakeupIntervalMillis) {
this.logShipperWakeupIntervalMillis = logShipperWakeupIntervalMillis;
}

public int getMaxRequestsInFlight() {
return maxRequestsInFlight;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public LoggingSystem createAppender() {
configurator.getBatchSize(),
TimeUnit.SECONDS.toMillis(configurator.getBatchWait()),
bufferSizeBytes,
configurator.getLogShipperWakeupIntervalMillis(),
TimeUnit.SECONDS.toMillis(configurator.getShutdownTimeoutSeconds()),
configurator.isUseOffHeapBuffer()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public class IntegrationTest {

@Container
public GenericContainer loki = new GenericContainer("grafana/loki:latest")
public GenericContainer loki = new GenericContainer("grafana/loki:2.3.0")
.withCommand("-config.file=/etc/loki-config.yaml")
.withClasspathResourceMapping("loki-config.yaml",
"/etc/loki-config.yaml",
Expand Down

0 comments on commit 1bcc532

Please sign in to comment.