diff --git a/README.adoc b/README.adoc
index 6fda0d4..710aa0c 100644
--- a/README.adoc
+++ b/README.adoc
@@ -37,6 +37,7 @@ lookups.hostname.file,etc/hostname.json,Path to username-to-hostname lookup tabl
lookups.appname.file,etc/appname.json,Path to username-to-appname lookup table
payload.splitRegex, \n (newline), A regex based on which incoming requests will be split into multiple outgoing messages
payload.splitEnabled, false, Sets whether splitting incoming messages by splitRegex is enabled
+prometheus.port, 1234, Port used by the server that provides DropWizard metrics
|===
=== Lookup tables
diff --git a/etc/config.properties b/etc/config.properties
index 722a5b7..1e5ea81 100644
--- a/etc/config.properties
+++ b/etc/config.properties
@@ -22,3 +22,5 @@ lookups.appname.file=etc/appname.json
payload.splitRegex=\n
payload.splitEnabled=false
+
+prometheus.port=1234
diff --git a/pom.xml b/pom.xml
index 20b14cf..36bf106 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,8 +15,10 @@
2.23.1
${java.version}
${java.version}
+ 4.2.8
4.1.108.Final
UTF-8
+ 0.16.0
0.0.1
1.0.1
4.0.1
@@ -90,6 +92,43 @@
netty-handler
${netty.version}
+
+
+ io.dropwizard.metrics
+ metrics-core
+ ${metrics.version}
+
+
+ io.dropwizard.metrics
+ metrics-jmx
+ ${metrics.version}
+
+
+ io.prometheus
+ simpleclient
+ ${prometheus-simpleclient.version}
+
+
+ io.prometheus
+ simpleclient_dropwizard
+ ${prometheus-simpleclient.version}
+
+
+ io.prometheus
+ simpleclient_servlet
+ ${prometheus-simpleclient.version}
+
+
+ io.prometheus
+ simpleclient_hotspot
+ ${prometheus-simpleclient.version}
+
+
+
+ org.eclipse.jetty
+ jetty-servlet
+ 10.0.15
+
org.apache.logging.log4j
diff --git a/src/main/java/com/teragrep/lsh_01/HttpServer.java b/src/main/java/com/teragrep/lsh_01/HttpServer.java
new file mode 100644
index 0000000..8c2eccb
--- /dev/null
+++ b/src/main/java/com/teragrep/lsh_01/HttpServer.java
@@ -0,0 +1,25 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01;
+
+import java.io.Closeable;
+
+public interface HttpServer extends Runnable, Closeable {
+}
diff --git a/src/main/java/com/teragrep/lsh_01/Main.java b/src/main/java/com/teragrep/lsh_01/Main.java
index ba5c235..9249639 100644
--- a/src/main/java/com/teragrep/lsh_01/Main.java
+++ b/src/main/java/com/teragrep/lsh_01/Main.java
@@ -19,13 +19,20 @@
*/
package com.teragrep.lsh_01;
+import com.codahale.metrics.MetricRegistry;
import com.teragrep.lsh_01.authentication.BasicAuthentication;
import com.teragrep.lsh_01.authentication.BasicAuthenticationFactory;
import com.teragrep.lsh_01.config.*;
+import com.teragrep.lsh_01.metrics.HttpReport;
+import com.teragrep.lsh_01.metrics.JmxReport;
+import com.teragrep.lsh_01.metrics.Report;
+import com.teragrep.lsh_01.metrics.Slf4jReport;
import com.teragrep.lsh_01.pool.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.IOException;
+
public class Main {
private final static Logger LOGGER = LogManager.getLogger(Main.class);
@@ -38,6 +45,7 @@ public static void main(String[] args) {
InternalEndpointUrlConfig internalEndpointUrlConfig = new InternalEndpointUrlConfig();
LookupConfig lookupConfig = new LookupConfig();
PayloadConfig payloadConfig = new PayloadConfig();
+ MetricsConfig metricsConfig = new MetricsConfig();
try {
nettyConfig.validate();
relpConfig.validate();
@@ -45,6 +53,7 @@ public static void main(String[] args) {
internalEndpointUrlConfig.validate();
lookupConfig.validate();
payloadConfig.validate();
+ metricsConfig.validate();
}
catch (IllegalArgumentException e) {
LOGGER.error("Can't parse config properly: {}", e.getMessage());
@@ -57,28 +66,32 @@ public static void main(String[] args) {
LOGGER.info("Got payload config: <[{}]>", payloadConfig);
LOGGER.info("Authentication required: <[{}]>", securityConfig.authRequired);
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ // metrics
+ MetricRegistry metricRegistry = new MetricRegistry();
+ Report report = new Slf4jReport(
+ new JmxReport(new HttpReport(metricRegistry, metricsConfig.prometheusPort), metricRegistry),
+ metricRegistry
+ );
+
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, metricRegistry);
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
- RelpConversion relpConversion = new RelpConversion(
- pool,
- securityConfig,
- basicAuthentication,
- lookupConfig,
- payloadConfig
+ IMessageHandler relpConversion = new MetricRelpConversion(
+ new RelpConversion(pool, securityConfig, basicAuthentication, lookupConfig, payloadConfig),
+ metricRegistry
);
try (
- NettyHttpServer server = new NettyHttpServer(
- nettyConfig,
- relpConversion,
- null,
- 200,
- internalEndpointUrlConfig
+ HttpServer server = new MetricHttpServer(
+ new NettyHttpServer(nettyConfig, relpConversion, null, 200, internalEndpointUrlConfig),
+ report
)
) {
server.run();
}
+ catch (IOException e) {
+ throw new IllegalArgumentException("Failed to close the server: " + e.getMessage());
+ }
finally {
pool.close();
}
diff --git a/src/main/java/com/teragrep/lsh_01/MetricHttpServer.java b/src/main/java/com/teragrep/lsh_01/MetricHttpServer.java
new file mode 100644
index 0000000..6575046
--- /dev/null
+++ b/src/main/java/com/teragrep/lsh_01/MetricHttpServer.java
@@ -0,0 +1,54 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01;
+
+import com.teragrep.lsh_01.metrics.Report;
+
+import java.io.IOException;
+
+/**
+ * Decorator for a HttpServer that applies metrics.
+ */
+public class MetricHttpServer implements HttpServer {
+
+ private final HttpServer server;
+ private final Report report;
+
+ public MetricHttpServer(HttpServer server, Report report) {
+ this.server = server;
+ this.report = report;
+ }
+
+ @Override
+ public void run() {
+ report.start();
+ server.run();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ server.close();
+ }
+ finally {
+ report.close();
+ }
+ }
+}
diff --git a/src/main/java/com/teragrep/lsh_01/MetricRelpConversion.java b/src/main/java/com/teragrep/lsh_01/MetricRelpConversion.java
new file mode 100644
index 0000000..9d58257
--- /dev/null
+++ b/src/main/java/com/teragrep/lsh_01/MetricRelpConversion.java
@@ -0,0 +1,75 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Timer;
+import com.teragrep.lsh_01.authentication.Subject;
+
+import java.util.Map;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * Decorator for IMessageHandler. Responsible for reporting metrics.
+ */
+public class MetricRelpConversion implements IMessageHandler {
+
+ private final IMessageHandler relpConversion;
+ private final MetricRegistry metricRegistry;
+ private final Timer sendLatency;
+
+ public MetricRelpConversion(IMessageHandler relpConversion, MetricRegistry metricRegistry) {
+ this.relpConversion = relpConversion;
+ this.metricRegistry = metricRegistry;
+ this.sendLatency = metricRegistry
+ .timer(name(MetricRelpConversion.class, "sendLatency"), () -> new Timer(new SlidingWindowReservoir(10000)));
+ }
+
+ @Override
+ public boolean onNewMessage(Subject subject, Map headers, String body) {
+ boolean sent;
+ try (Timer.Context ctx = sendLatency.time()) {
+ sent = relpConversion.onNewMessage(subject, headers, body);
+ }
+ return sent;
+ }
+
+ @Override
+ public IMessageHandler copy() {
+ return new MetricRelpConversion(relpConversion.copy(), metricRegistry);
+ }
+
+ @Override
+ public Subject asSubject(String token) {
+ return relpConversion.asSubject(token);
+ }
+
+ @Override
+ public boolean requiresToken() {
+ return relpConversion.requiresToken();
+ }
+
+ @Override
+ public Map responseHeaders() {
+ return relpConversion.responseHeaders();
+ }
+}
diff --git a/src/main/java/com/teragrep/lsh_01/NettyHttpServer.java b/src/main/java/com/teragrep/lsh_01/NettyHttpServer.java
index ae80997..353bc95 100644
--- a/src/main/java/com/teragrep/lsh_01/NettyHttpServer.java
+++ b/src/main/java/com/teragrep/lsh_01/NettyHttpServer.java
@@ -31,7 +31,6 @@
import com.teragrep.lsh_01.util.CustomRejectedExecutionHandler;
import com.teragrep.lsh_01.util.SslHandlerProvider;
-import java.io.Closeable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -41,7 +40,7 @@
/**
* Created by joaoduarte on 11/10/2017.
*/
-public class NettyHttpServer implements Runnable, Closeable {
+public class NettyHttpServer implements HttpServer {
private final ServerBootstrap serverBootstrap;
private final String host;
diff --git a/src/main/java/com/teragrep/lsh_01/config/MetricsConfig.java b/src/main/java/com/teragrep/lsh_01/config/MetricsConfig.java
new file mode 100644
index 0000000..382249a
--- /dev/null
+++ b/src/main/java/com/teragrep/lsh_01/config/MetricsConfig.java
@@ -0,0 +1,44 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01.config;
+
+public class MetricsConfig implements Validateable {
+
+ public final int prometheusPort;
+
+ public MetricsConfig() {
+ PropertiesReaderUtilityClass propertiesReader = new PropertiesReaderUtilityClass(
+ System.getProperty("properties.file", "etc/config.properties")
+ );
+ prometheusPort = propertiesReader.getIntProperty("prometheus.port");
+ }
+
+ @Override
+ public void validate() {
+ if (prometheusPort < 1 || prometheusPort > 65535) {
+ throw new IllegalArgumentException("prometheus.port wasn't a valid port");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "RelpConfig{" + "prometheusPort=" + prometheusPort + "}";
+ }
+}
diff --git a/src/main/java/com/teragrep/lsh_01/metrics/HttpReport.java b/src/main/java/com/teragrep/lsh_01/metrics/HttpReport.java
new file mode 100644
index 0000000..2d7cf9b
--- /dev/null
+++ b/src/main/java/com/teragrep/lsh_01/metrics/HttpReport.java
@@ -0,0 +1,73 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.dropwizard.DropwizardExports;
+import io.prometheus.client.exporter.MetricsServlet;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+import java.io.IOException;
+
+public final class HttpReport implements Report {
+
+ private final Server jettyServer;
+ private final MetricRegistry metricRegistry;
+
+ public HttpReport(MetricRegistry metricRegistry, int prometheusPort) {
+ this.metricRegistry = metricRegistry;
+ jettyServer = new Server(prometheusPort);
+ }
+
+ @Override
+ public void start() {
+ // prometheus-exporter
+ CollectorRegistry.defaultRegistry.register(new DropwizardExports(metricRegistry));
+
+ ServletContextHandler context = new ServletContextHandler();
+ context.setContextPath("/");
+ jettyServer.setHandler(context);
+
+ MetricsServlet metricsServlet = new MetricsServlet();
+ ServletHolder servletHolder = new ServletHolder(metricsServlet);
+ context.addServlet(servletHolder, "/metrics");
+
+ // Start the webserver.
+ try {
+ jettyServer.start();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ jettyServer.stop();
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/src/main/java/com/teragrep/lsh_01/metrics/JmxReport.java b/src/main/java/com/teragrep/lsh_01/metrics/JmxReport.java
new file mode 100644
index 0000000..b4738fe
--- /dev/null
+++ b/src/main/java/com/teragrep/lsh_01/metrics/JmxReport.java
@@ -0,0 +1,48 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jmx.JmxReporter;
+
+import java.io.IOException;
+
+public final class JmxReport implements Report {
+
+ private final Report report;
+ private final JmxReporter jmxReporter;
+
+ public JmxReport(Report report, MetricRegistry metricRegistry) {
+ this.report = report;
+ this.jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
+ }
+
+ @Override
+ public void start() {
+ jmxReporter.start();
+ report.start();
+ }
+
+ @Override
+ public void close() throws IOException {
+ report.close();
+ jmxReporter.close();
+ }
+}
diff --git a/src/main/java/com/teragrep/lsh_01/metrics/Report.java b/src/main/java/com/teragrep/lsh_01/metrics/Report.java
new file mode 100644
index 0000000..2c05202
--- /dev/null
+++ b/src/main/java/com/teragrep/lsh_01/metrics/Report.java
@@ -0,0 +1,27 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01.metrics;
+
+import java.io.Closeable;
+
+public interface Report extends Closeable {
+
+ void start();
+}
diff --git a/src/main/java/com/teragrep/lsh_01/metrics/Slf4jReport.java b/src/main/java/com/teragrep/lsh_01/metrics/Slf4jReport.java
new file mode 100644
index 0000000..1d5d08d
--- /dev/null
+++ b/src/main/java/com/teragrep/lsh_01/metrics/Slf4jReport.java
@@ -0,0 +1,55 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Slf4jReporter;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public final class Slf4jReport implements Report {
+
+ private final Report report;
+ private final Slf4jReporter slf4jReporter;
+
+ public Slf4jReport(Report report, MetricRegistry metricRegistry) {
+ this.report = report;
+ this.slf4jReporter = Slf4jReporter
+ .forRegistry(metricRegistry)
+ .outputTo(LoggerFactory.getLogger(Slf4jReport.class))
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ @Override
+ public void start() {
+ slf4jReporter.start(1, TimeUnit.MINUTES);
+ report.start();
+ }
+
+ @Override
+ public void close() throws IOException {
+ report.close();
+ slf4jReporter.close();
+ }
+}
diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java
index bbb2f36..e10b333 100644
--- a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java
+++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java
@@ -19,6 +19,8 @@
*/
package com.teragrep.lsh_01.pool;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
import com.teragrep.rlp_01.RelpBatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -26,15 +28,28 @@
import java.io.IOException;
import java.util.concurrent.TimeoutException;
+import static com.codahale.metrics.MetricRegistry.name;
+
public class ManagedRelpConnection implements IManagedRelpConnection {
private static final Logger LOGGER = LogManager.getLogger(ManagedRelpConnection.class);
private final IRelpConnection relpConnection;
private boolean hasConnected;
- public ManagedRelpConnection(IRelpConnection relpConnection) {
+ // metrics
+ private final Counter records;
+ private final Counter bytes;
+ private final Counter resends;
+ private final Counter retriedConnects;
+
+ public ManagedRelpConnection(IRelpConnection relpConnection, MetricRegistry metricRegistry) {
this.relpConnection = relpConnection;
this.hasConnected = false;
+
+ this.records = metricRegistry.counter(name(ManagedRelpConnection.class, "records"));
+ this.bytes = metricRegistry.counter(name(ManagedRelpConnection.class, "bytes"));
+ this.resends = metricRegistry.counter(name(ManagedRelpConnection.class, "resends"));
+ this.retriedConnects = metricRegistry.counter(name(ManagedRelpConnection.class, "retriedConnects"));
}
private void connect() {
@@ -54,6 +69,7 @@ private void connect() {
);
try {
+ retriedConnects.inc();
Thread.sleep(relpConnection.relpConfig().relpReconnectInterval);
}
catch (InterruptedException exception) {
@@ -89,11 +105,14 @@ public void ensureSent(byte[] bytes) {
relpBatch.retryAllFailed();
this.tearDown();
this.connect();
+ resends.inc();
}
else {
notSent = false;
}
}
+ records.inc();
+ this.bytes.inc(bytes.length);
}
@Override
diff --git a/src/main/java/com/teragrep/lsh_01/pool/MetricRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/MetricRelpConnection.java
new file mode 100644
index 0000000..d9946c8
--- /dev/null
+++ b/src/main/java/com/teragrep/lsh_01/pool/MetricRelpConnection.java
@@ -0,0 +1,138 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01.pool;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Timer;
+import com.teragrep.lsh_01.config.RelpConfig;
+import com.teragrep.rlp_01.RelpBatch;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * Decorator for IRelpConnection. Responsible for reporting metrics.
+ */
+public class MetricRelpConnection implements IRelpConnection {
+
+ private final IRelpConnection relpConnection;
+ private final Counter connects;
+ private final Timer connectLatency;
+
+ public MetricRelpConnection(IRelpConnection relpConnection, MetricRegistry metricRegistry) {
+ this.relpConnection = relpConnection;
+ this.connects = metricRegistry.counter(name(MetricRelpConnection.class, "connects"));
+ this.connectLatency = metricRegistry
+ .timer(name(MetricRelpConnection.class, "connectLatency"), () -> new Timer(new SlidingWindowReservoir(10000)));
+ }
+
+ @Override
+ public boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException {
+ final Timer.Context context = connectLatency.time(); // reset the time (new context)
+ boolean connected = relpConnection.connect(hostname, port);
+ /*
+ Not closing the context in case of an exception thrown in .connect() will leave the timer.context
+ for garbage collector to remove. This will happen even if the context is closed because of how
+ the Timer is implemented.
+ */
+ context.close(); // manually close here, so the timer is only updated if no exceptions were thrown
+ connects.inc();
+ return connected;
+ }
+
+ @Override
+ public int getReadTimeout() {
+ return this.relpConnection.getReadTimeout();
+ }
+
+ @Override
+ public void setReadTimeout(int readTimeout) {
+ this.relpConnection.setReadTimeout(readTimeout);
+ }
+
+ @Override
+ public int getWriteTimeout() {
+ return this.relpConnection.getWriteTimeout();
+ }
+
+ @Override
+ public void setWriteTimeout(int writeTimeout) {
+ this.relpConnection.setWriteTimeout(writeTimeout);
+ }
+
+ @Override
+ public int getConnectionTimeout() {
+ return this.relpConnection.getConnectionTimeout();
+ }
+
+ @Override
+ public void setConnectionTimeout(int timeout) {
+ this.relpConnection.setConnectionTimeout(timeout);
+ }
+
+ @Override
+ public void setKeepAlive(boolean on) {
+ this.relpConnection.setKeepAlive(on);
+ }
+
+ @Override
+ public int getRxBufferSize() {
+ return this.relpConnection.getRxBufferSize();
+ }
+
+ @Override
+ public void setRxBufferSize(int size) {
+ this.relpConnection.setRxBufferSize(size);
+ }
+
+ @Override
+ public int getTxBufferSize() {
+ return this.relpConnection.getTxBufferSize();
+ }
+
+ @Override
+ public void setTxBufferSize(int size) {
+ this.relpConnection.setTxBufferSize(size);
+ }
+
+ @Override
+ public void tearDown() {
+ this.relpConnection.tearDown();
+ }
+
+ @Override
+ public boolean disconnect() throws IOException, IllegalStateException, TimeoutException {
+ return this.relpConnection.disconnect();
+ }
+
+ @Override
+ public void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException {
+ this.relpConnection.commit(relpBatch);
+ }
+
+ @Override
+ public RelpConfig relpConfig() {
+ return this.relpConnection.relpConfig();
+ }
+}
diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java
index c8ef654..86f7940 100644
--- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java
+++ b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java
@@ -19,6 +19,7 @@
*/
package com.teragrep.lsh_01.pool;
+import com.codahale.metrics.MetricRegistry;
import com.teragrep.lsh_01.config.RelpConfig;
import com.teragrep.rlp_01.RelpConnection;
@@ -27,17 +28,21 @@
public class RelpConnectionFactory implements Supplier {
private final RelpConfig relpConfig;
+ private final MetricRegistry metricRegistry;
- public RelpConnectionFactory(RelpConfig relpConfig) {
+ public RelpConnectionFactory(RelpConfig relpConfig, MetricRegistry metricRegistry) {
this.relpConfig = relpConfig;
+ this.metricRegistry = metricRegistry;
}
@Override
public IManagedRelpConnection get() {
- RelpConnection relpConnection = new RelpConnection();
+ IRelpConnection relpConnection = new MetricRelpConnection(
+ new RelpConnectionWithConfig(new RelpConnection(), relpConfig),
+ metricRegistry
+ );
- RelpConnectionWithConfig relpConnectionWithConfig = new RelpConnectionWithConfig(relpConnection, relpConfig);
- IManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnectionWithConfig);
+ IManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection, metricRegistry);
if (relpConfig.rebindEnabled) {
managedRelpConnection = new RebindableRelpConnection(managedRelpConnection, relpConfig.rebindRequestAmount);
diff --git a/src/test/java/CredentialsTest.java b/src/test/java/com/teragrep/lsh_01/CredentialsTest.java
similarity index 95%
rename from src/test/java/CredentialsTest.java
rename to src/test/java/com/teragrep/lsh_01/CredentialsTest.java
index c2bdfb5..e1e9572 100644
--- a/src/test/java/CredentialsTest.java
+++ b/src/test/java/com/teragrep/lsh_01/CredentialsTest.java
@@ -17,8 +17,10 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
+package com.teragrep.lsh_01;
+
+import com.codahale.metrics.MetricRegistry;
import com.teragrep.lsh_01.authentication.BasicAuthentication;
-import com.teragrep.lsh_01.RelpConversion;
import com.teragrep.lsh_01.authentication.BasicAuthenticationFactory;
import com.teragrep.lsh_01.config.LookupConfig;
import com.teragrep.lsh_01.config.PayloadConfig;
@@ -54,7 +56,7 @@ public void testNoAuthRequired() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -82,7 +84,7 @@ public void testAuthRequired() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -105,7 +107,7 @@ public void testValidBase64ButNoColon() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -128,7 +130,7 @@ public void testMultipleColons() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -149,7 +151,7 @@ public void testInvalidBase64Auth() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -171,7 +173,7 @@ public void testNonBasicAuth() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -196,7 +198,7 @@ public void testWrongCredentials() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -221,7 +223,7 @@ public void testEmptyUsername() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -246,7 +248,7 @@ public void testEmptyPassword() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -269,7 +271,7 @@ public void testNullToken() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
diff --git a/src/test/java/EndToEndTest.java b/src/test/java/com/teragrep/lsh_01/EndToEndTest.java
similarity index 99%
rename from src/test/java/EndToEndTest.java
rename to src/test/java/com/teragrep/lsh_01/EndToEndTest.java
index b308120..29f7f7f 100644
--- a/src/test/java/EndToEndTest.java
+++ b/src/test/java/com/teragrep/lsh_01/EndToEndTest.java
@@ -17,7 +17,8 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
-import com.teragrep.lsh_01.Main;
+package com.teragrep.lsh_01;
+
import com.teragrep.lsh_01.config.NettyConfig;
import com.teragrep.lsh_01.util.RelpServer;
import org.junit.jupiter.api.*;
diff --git a/src/test/java/LookupTest.java b/src/test/java/com/teragrep/lsh_01/LookupTest.java
similarity index 97%
rename from src/test/java/LookupTest.java
rename to src/test/java/com/teragrep/lsh_01/LookupTest.java
index 2d2a9b0..8c06896 100644
--- a/src/test/java/LookupTest.java
+++ b/src/test/java/com/teragrep/lsh_01/LookupTest.java
@@ -17,8 +17,10 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
+package com.teragrep.lsh_01;
+
+import com.codahale.metrics.MetricRegistry;
import com.teragrep.jlt_01.StringLookupTable;
-import com.teragrep.lsh_01.RelpConversion;
import com.teragrep.lsh_01.authentication.BasicAuthentication;
import com.teragrep.lsh_01.authentication.BasicAuthenticationFactory;
import com.teragrep.lsh_01.authentication.Subject;
@@ -61,7 +63,7 @@ public void testAppnameLookup() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -86,7 +88,7 @@ public void testHostnameLookup() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
@@ -113,7 +115,7 @@ public void testMissingLookups() {
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
RelpConversion relpConversion = new RelpConversion(
pool,
diff --git a/src/test/java/com/teragrep/lsh_01/MetricTest.java b/src/test/java/com/teragrep/lsh_01/MetricTest.java
new file mode 100644
index 0000000..11a829f
--- /dev/null
+++ b/src/test/java/com/teragrep/lsh_01/MetricTest.java
@@ -0,0 +1,254 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.teragrep.lsh_01.authentication.BasicAuthenticationFactory;
+import com.teragrep.lsh_01.authentication.SubjectAnonymous;
+import com.teragrep.lsh_01.config.LookupConfig;
+import com.teragrep.lsh_01.config.PayloadConfig;
+import com.teragrep.lsh_01.config.RelpConfig;
+import com.teragrep.lsh_01.config.SecurityConfig;
+import com.teragrep.lsh_01.fakes.RelpConnectionFactoryFake;
+import com.teragrep.lsh_01.fakes.RelpConnectionFake;
+import com.teragrep.lsh_01.fakes.ResendingRelpConnectionFake;
+import com.teragrep.lsh_01.fakes.ThrowingRelpConnectionFake;
+import com.teragrep.lsh_01.pool.*;
+import com.teragrep.rlo_14.Facility;
+import com.teragrep.rlo_14.Severity;
+import com.teragrep.rlo_14.SyslogMessage;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+public class MetricTest {
+
+ @Test
+ public void testRecordMetric() {
+ final int messages = 100;
+
+ final SyslogMessage syslogMessage = new SyslogMessage()
+ .withSeverity(Severity.INFORMATIONAL)
+ .withFacility(Facility.LOCAL0)
+ .withMsgId("123")
+ .withMsg("test");
+
+ RelpConfig relpConfig = new RelpConfig();
+ IRelpConnection relpConnection = new RelpConnectionFake(relpConfig);
+ MetricRegistry registry = new MetricRegistry();
+
+ try (ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection, registry)) {
+ for (int i = 0; i < messages; i++) {
+ managedRelpConnection
+ .ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ Counter recordsCounter = registry.counter(name(ManagedRelpConnection.class, "records"));
+ Assertions.assertEquals(messages, recordsCounter.getCount());
+ }
+
+ @Test
+ public void testBytesMetric() {
+ final int messages = 100;
+
+ final SyslogMessage syslogMessage = new SyslogMessage()
+ .withSeverity(Severity.INFORMATIONAL)
+ .withFacility(Facility.LOCAL0)
+ .withMsgId("123")
+ .withMsg("test");
+
+ final byte[] bytes = syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8);
+ final int bytesLength = bytes.length;
+
+ RelpConfig relpConfig = new RelpConfig();
+ IRelpConnection relpConnection = new RelpConnectionFake(relpConfig);
+ MetricRegistry registry = new MetricRegistry();
+
+ try (ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection, registry)) {
+ for (int i = 0; i < messages; i++) {
+ managedRelpConnection.ensureSent(bytes);
+ }
+ }
+
+ Counter bytesCounter = registry.counter(name(ManagedRelpConnection.class, "bytes"));
+ Assertions.assertEquals(bytesLength * 100L, bytesCounter.getCount());
+ }
+
+ @Test
+ public void testResendsMetric() {
+ final int messages = 100;
+ final int resends = 50;
+
+ final SyslogMessage syslogMessage = new SyslogMessage()
+ .withSeverity(Severity.INFORMATIONAL)
+ .withFacility(Facility.LOCAL0)
+ .withMsgId("123")
+ .withMsg("test");
+
+ RelpConfig relpConfig = new RelpConfig();
+ IRelpConnection relpConnection = new ResendingRelpConnectionFake(new RelpConnectionFake(relpConfig), resends);
+ MetricRegistry registry = new MetricRegistry();
+
+ try (ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection, registry)) {
+ for (int i = 0; i < messages; i++) {
+ managedRelpConnection
+ .ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ Counter resendsCounter = registry.counter(name(ManagedRelpConnection.class, "resends"));
+ Assertions.assertEquals(resends, resendsCounter.getCount());
+ }
+
+ @Test
+ public void testConnectsMetric() {
+ final int messages = 100;
+
+ final SyslogMessage syslogMessage = new SyslogMessage()
+ .withSeverity(Severity.INFORMATIONAL)
+ .withFacility(Facility.LOCAL0)
+ .withMsgId("123")
+ .withMsg("test");
+
+ RelpConfig relpConfig = new RelpConfig();
+ MetricRegistry registry = new MetricRegistry();
+ // use a fake RelpConnection that forces a resend, so the connection is done in ManagedRelpConnection
+ IRelpConnection relpConnection = new MetricRelpConnection(
+ new ResendingRelpConnectionFake(new RelpConnectionFake(relpConfig), 1),
+ registry
+ );
+
+ try (ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection, registry)) {
+ for (int i = 0; i < messages; i++) {
+ managedRelpConnection
+ .ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ Counter connectsCounter = registry.counter(name(MetricRelpConnection.class, "connects"));
+ Assertions.assertEquals(1, connectsCounter.getCount()); // just the initial connect (1)
+ }
+
+ @Test
+ public void testRetriedConnectsMetric() {
+ System.setProperty("relp.reconnectInterval", "1"); // set reconnect interval so the test is faster
+
+ final int messages = 100;
+ final int reconnects = 10;
+
+ final SyslogMessage syslogMessage = new SyslogMessage()
+ .withSeverity(Severity.INFORMATIONAL)
+ .withFacility(Facility.LOCAL0)
+ .withMsgId("123")
+ .withMsg("test");
+
+ RelpConfig relpConfig = new RelpConfig();
+ // ManagedRelpConnection only makes a connection if messages aren't going through, so a resending fake has to be used as well
+ IRelpConnection relpConnection = new ThrowingRelpConnectionFake(
+ new ResendingRelpConnectionFake(new RelpConnectionFake(relpConfig), 1),
+ reconnects
+ );
+ MetricRegistry registry = new MetricRegistry();
+
+ try (ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection, registry)) {
+ for (int i = 0; i < messages; i++) {
+ managedRelpConnection
+ .ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ System.clearProperty("relp.reconnectInterval");
+ Counter retriedConnectsCounter = registry.counter(name(ManagedRelpConnection.class, "retriedConnects"));
+
+ Assertions.assertEquals(reconnects, retriedConnectsCounter.getCount());
+ }
+
+ @Test
+ public void testSendLatencyMetric() { // latency of the whole process, message in -> message out
+ final int messages = 10;
+ final int sendLatency = 10; // sleep for 10ms after sending a message (commit)
+
+ RelpConfig relpConfig = new RelpConfig();
+ MetricRegistry registry = new MetricRegistry();
+
+ // RelpConnectionFactory that provides fake RelpConnections
+ RelpConnectionFactoryFake relpConnectionFactory = new RelpConnectionFactoryFake(
+ sendLatency,
+ 0,
+ relpConfig,
+ registry
+ );
+
+ // the message processing starts from RelpConversion
+ IMessageHandler relpConversion = new MetricRelpConversion(
+ new RelpConversion(new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()), new SecurityConfig(), new BasicAuthenticationFactory().create(), new LookupConfig(), new PayloadConfig()), registry
+ );
+
+ for (int i = 0; i < messages; i++) {
+ relpConversion.onNewMessage(new SubjectAnonymous(), new HashMap<>(), "");
+ }
+
+ Timer sendLatencyTimer = registry.timer(name(MetricRelpConversion.class, "sendLatency"));
+
+ // mean rate means how many timer updates per second there were
+ Assertions.assertTrue(sendLatencyTimer.getMeanRate() > 0);
+ Assertions.assertTrue(sendLatencyTimer.getMeanRate() <= (double) 1000 / sendLatency);
+ }
+
+ @Test
+ public void testConnectLatencyMetric() {
+ final int messages = 10;
+ final int connectLatency = 200; // sleep for 200ms after connecting to RELP
+
+ final SyslogMessage syslogMessage = new SyslogMessage()
+ .withSeverity(Severity.INFORMATIONAL)
+ .withFacility(Facility.LOCAL0)
+ .withMsgId("123")
+ .withMsg("test");
+
+ RelpConfig relpConfig = new RelpConfig();
+ MetricRegistry registry = new MetricRegistry();
+ // use a fake RelpConnection that forces a resend, so the connection is done in ManagedRelpConnection
+ IRelpConnection relpConnection = new MetricRelpConnection(
+ new ResendingRelpConnectionFake(new RelpConnectionFake(relpConfig, 0, connectLatency), 1),
+ registry
+ );
+
+ try (ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection, registry)) {
+ for (int i = 0; i < messages; i++) {
+ managedRelpConnection
+ .ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ Timer connectLatencyTimer = registry.timer(name(MetricRelpConnection.class, "connectLatency"));
+
+ // mean rate means how many timer updates per second there were
+ Assertions.assertTrue(connectLatencyTimer.getMeanRate() >= 0); // rate exists
+ Assertions.assertTrue(connectLatencyTimer.getMeanRate() <= (double) 1000 / connectLatency); // rate is lower or equal to the highest possible
+ }
+}
diff --git a/src/test/java/PayloadTest.java b/src/test/java/com/teragrep/lsh_01/PayloadTest.java
similarity index 99%
rename from src/test/java/PayloadTest.java
rename to src/test/java/com/teragrep/lsh_01/PayloadTest.java
index 66eda15..369f76f 100644
--- a/src/test/java/PayloadTest.java
+++ b/src/test/java/com/teragrep/lsh_01/PayloadTest.java
@@ -17,7 +17,8 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
-import com.teragrep.lsh_01.Payload;
+package com.teragrep.lsh_01;
+
import com.teragrep.lsh_01.config.PayloadConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
diff --git a/src/test/java/RebindTest.java b/src/test/java/com/teragrep/lsh_01/RebindTest.java
similarity index 95%
rename from src/test/java/RebindTest.java
rename to src/test/java/com/teragrep/lsh_01/RebindTest.java
index b03fe5e..8e730ee 100644
--- a/src/test/java/RebindTest.java
+++ b/src/test/java/com/teragrep/lsh_01/RebindTest.java
@@ -17,6 +17,9 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
+package com.teragrep.lsh_01;
+
+import com.codahale.metrics.MetricRegistry;
import com.teragrep.lsh_01.config.RelpConfig;
import com.teragrep.lsh_01.pool.*;
import com.teragrep.lsh_01.util.CountingFrameDelegate;
@@ -66,8 +69,7 @@ void tearDown() {
public void testRebind() {
RelpConfig relpConfig = new RelpConfig();
RebindableRelpConnection connection = new RebindableRelpConnection(
- new ManagedRelpConnection(new RelpConnectionWithConfig(new RelpConnection(), relpConfig)),
- relpConfig.rebindRequestAmount
+ new ManagedRelpConnection(new RelpConnectionWithConfig(new RelpConnection(), relpConfig), new MetricRegistry()), relpConfig.rebindRequestAmount
);
SyslogMessage syslogMessage = new SyslogMessage()
@@ -93,7 +95,7 @@ public void testRebindMultipleConnections() {
RelpConfig relpConfig = new RelpConfig();
// Multiple connections together using the RelpConnectionFactory and the Pool
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
IManagedRelpConnection firstConnection = pool.get();
@@ -134,7 +136,7 @@ public void testRebindDisabled() {
RelpConfig relpConfig = new RelpConfig();
- RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig, new MetricRegistry());
Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
IManagedRelpConnection connection = pool.get();
@@ -157,7 +159,8 @@ public void testRebindDisabled() {
@Test
public void testCloseWithoutConnecting() {
ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(
- new RelpConnectionWithConfig(new RelpConnection(), new RelpConfig())
+ new RelpConnectionWithConfig(new RelpConnection(), new RelpConfig()),
+ new MetricRegistry()
);
Assertions.assertDoesNotThrow(managedRelpConnection::close);
}
diff --git a/src/test/java/com/teragrep/lsh_01/fakes/RelpConnectionFactoryFake.java b/src/test/java/com/teragrep/lsh_01/fakes/RelpConnectionFactoryFake.java
new file mode 100644
index 0000000..3cc437a
--- /dev/null
+++ b/src/test/java/com/teragrep/lsh_01/fakes/RelpConnectionFactoryFake.java
@@ -0,0 +1,55 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01.fakes;
+
+import com.codahale.metrics.MetricRegistry;
+import com.teragrep.lsh_01.config.RelpConfig;
+import com.teragrep.lsh_01.pool.IManagedRelpConnection;
+import com.teragrep.lsh_01.pool.ManagedRelpConnection;
+import com.teragrep.lsh_01.pool.MetricRelpConnection;
+
+import java.util.function.Supplier;
+
+public class RelpConnectionFactoryFake implements Supplier {
+
+ private final int sendLatency;
+ private final int connectLatency;
+ private final RelpConfig relpConfig;
+ private final MetricRegistry metricRegistry;
+
+ public RelpConnectionFactoryFake(
+ int sendLatency,
+ int connectLatency,
+ RelpConfig relpConfig,
+ MetricRegistry metricRegistry
+ ) {
+ this.sendLatency = sendLatency;
+ this.connectLatency = connectLatency;
+ this.relpConfig = relpConfig;
+ this.metricRegistry = metricRegistry;
+ }
+
+ @Override
+ public IManagedRelpConnection get() {
+ return new ManagedRelpConnection(
+ new MetricRelpConnection(new RelpConnectionFake(relpConfig, sendLatency, connectLatency), metricRegistry), metricRegistry
+ );
+ }
+}
diff --git a/src/test/java/com/teragrep/lsh_01/fakes/RelpConnectionFake.java b/src/test/java/com/teragrep/lsh_01/fakes/RelpConnectionFake.java
new file mode 100644
index 0000000..ce1d386
--- /dev/null
+++ b/src/test/java/com/teragrep/lsh_01/fakes/RelpConnectionFake.java
@@ -0,0 +1,139 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01.fakes;
+
+import com.teragrep.lsh_01.config.RelpConfig;
+import com.teragrep.lsh_01.pool.IRelpConnection;
+import com.teragrep.rlp_01.RelpBatch;
+
+public final class RelpConnectionFake implements IRelpConnection {
+
+ private final RelpConfig relpConfig;
+ private final int sendLatency;
+ private final int connectLatency;
+
+ public RelpConnectionFake(RelpConfig relpConfig) {
+ this(relpConfig, 0, 0);
+ }
+
+ public RelpConnectionFake(RelpConfig relpConfig, int sendLatency, int connectLatency) {
+ this.relpConfig = relpConfig;
+ this.sendLatency = sendLatency;
+ this.connectLatency = connectLatency;
+ }
+
+ @Override
+ public int getReadTimeout() {
+ return 0;
+ }
+
+ @Override
+ public void setReadTimeout(int readTimeout) {
+ // no-op in fake
+ }
+
+ @Override
+ public int getWriteTimeout() {
+ return 0;
+ }
+
+ @Override
+ public void setWriteTimeout(int writeTimeout) {
+ // no-op in fake
+ }
+
+ @Override
+ public int getConnectionTimeout() {
+ return 0;
+ }
+
+ @Override
+ public void setConnectionTimeout(int timeout) {
+ // no-op in fake
+ }
+
+ @Override
+ public void setKeepAlive(boolean on) {
+ // no-op in fake
+ }
+
+ @Override
+ public int getRxBufferSize() {
+ return 0;
+ }
+
+ @Override
+ public void setRxBufferSize(int size) {
+ // no-op in fake
+ }
+
+ @Override
+ public int getTxBufferSize() {
+ return 0;
+ }
+
+ @Override
+ public void setTxBufferSize(int size) {
+ // no-op in fake
+ }
+
+ @Override
+ public boolean connect(String hostname, int port) {
+ try {
+ Thread.sleep(connectLatency);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return true;
+ }
+
+ @Override
+ public void tearDown() {
+ // no-op in fake
+ }
+
+ @Override
+ public boolean disconnect() {
+ return true;
+ }
+
+ @Override
+ public void commit(RelpBatch relpBatch) {
+ try {
+ Thread.sleep(sendLatency);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ // remove all the requests from relpBatch in the fake
+ // so that the batch will return true in verifyTransactionAll()
+ while (relpBatch.getWorkQueueLength() > 0) {
+ long reqId = relpBatch.popWorkQueue();
+ relpBatch.removeRequest(reqId);
+ }
+ }
+
+ @Override
+ public RelpConfig relpConfig() {
+ return relpConfig;
+ }
+}
diff --git a/src/test/java/com/teragrep/lsh_01/fakes/ResendingRelpConnectionFake.java b/src/test/java/com/teragrep/lsh_01/fakes/ResendingRelpConnectionFake.java
new file mode 100644
index 0000000..88104b7
--- /dev/null
+++ b/src/test/java/com/teragrep/lsh_01/fakes/ResendingRelpConnectionFake.java
@@ -0,0 +1,128 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01.fakes;
+
+import com.teragrep.lsh_01.config.RelpConfig;
+import com.teragrep.lsh_01.pool.IRelpConnection;
+import com.teragrep.rlp_01.RelpBatch;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Fake that resends data until the amount given is reached.
+ */
+public class ResendingRelpConnectionFake implements IRelpConnection {
+
+ final private IRelpConnection relpConnection;
+ final private int commitLimit;
+ private int timesCommitted;
+
+ public ResendingRelpConnectionFake(IRelpConnection relpConnection, int commitLimit) {
+ this.relpConnection = relpConnection;
+ this.commitLimit = commitLimit;
+ }
+
+ @Override
+ public int getReadTimeout() {
+ return relpConnection.getReadTimeout();
+ }
+
+ @Override
+ public void setReadTimeout(int readTimeout) {
+ relpConnection.setReadTimeout(readTimeout);
+ }
+
+ @Override
+ public int getWriteTimeout() {
+ return relpConnection.getWriteTimeout();
+ }
+
+ @Override
+ public void setWriteTimeout(int writeTimeout) {
+ relpConnection.setWriteTimeout(writeTimeout);
+ }
+
+ @Override
+ public int getConnectionTimeout() {
+ return relpConnection.getConnectionTimeout();
+ }
+
+ @Override
+ public void setConnectionTimeout(int timeout) {
+ relpConnection.setConnectionTimeout(timeout);
+ }
+
+ @Override
+ public void setKeepAlive(boolean on) {
+ relpConnection.setKeepAlive(on);
+ }
+
+ @Override
+ public int getRxBufferSize() {
+ return relpConnection.getRxBufferSize();
+ }
+
+ @Override
+ public void setRxBufferSize(int size) {
+ relpConnection.setRxBufferSize(size);
+ }
+
+ @Override
+ public int getTxBufferSize() {
+ return relpConnection.getTxBufferSize();
+ }
+
+ @Override
+ public void setTxBufferSize(int size) {
+ relpConnection.setTxBufferSize(size);
+ }
+
+ @Override
+ public boolean connect(String hostname, int port) throws IOException, TimeoutException {
+ return relpConnection.connect(hostname, port);
+ }
+
+ @Override
+ public void tearDown() {
+ relpConnection.tearDown();
+ }
+
+ @Override
+ public boolean disconnect() throws IOException, IllegalStateException, TimeoutException {
+ return relpConnection.disconnect();
+ }
+
+ @Override
+ public void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException {
+ timesCommitted++;
+ if (timesCommitted <= commitLimit) {
+ // no-op, will lead to verifyTransactionAll to return false
+ }
+ else {
+ relpConnection.commit(relpBatch);
+ }
+ }
+
+ @Override
+ public RelpConfig relpConfig() {
+ return relpConnection.relpConfig();
+ }
+}
diff --git a/src/test/java/com/teragrep/lsh_01/fakes/ThrowingRelpConnectionFake.java b/src/test/java/com/teragrep/lsh_01/fakes/ThrowingRelpConnectionFake.java
new file mode 100644
index 0000000..fedcb17
--- /dev/null
+++ b/src/test/java/com/teragrep/lsh_01/fakes/ThrowingRelpConnectionFake.java
@@ -0,0 +1,126 @@
+/*
+ logstash-http-input to syslog bridge
+ Copyright 2024 Suomen Kanuuna Oy
+
+ Derivative Work of Elasticsearch
+ Copyright 2012-2015 Elasticsearch
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.lsh_01.fakes;
+
+import com.teragrep.lsh_01.config.RelpConfig;
+import com.teragrep.lsh_01.pool.IRelpConnection;
+import com.teragrep.rlp_01.RelpBatch;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Fake that throws an exception when connecting until the amount of connect attempts reach the given limit.
+ */
+public final class ThrowingRelpConnectionFake implements IRelpConnection {
+
+ final private IRelpConnection relpConnection;
+ final private int limit;
+ private int timesConnected;
+
+ public ThrowingRelpConnectionFake(IRelpConnection relpConnection, int limit) {
+ this.relpConnection = relpConnection;
+ this.limit = limit;
+ }
+
+ @Override
+ public int getReadTimeout() {
+ return relpConnection.getReadTimeout();
+ }
+
+ @Override
+ public void setReadTimeout(int readTimeout) {
+ relpConnection.setReadTimeout(readTimeout);
+ }
+
+ @Override
+ public int getWriteTimeout() {
+ return relpConnection.getWriteTimeout();
+ }
+
+ @Override
+ public void setWriteTimeout(int writeTimeout) {
+ relpConnection.setWriteTimeout(writeTimeout);
+ }
+
+ @Override
+ public int getConnectionTimeout() {
+ return relpConnection.getConnectionTimeout();
+ }
+
+ @Override
+ public void setConnectionTimeout(int timeout) {
+ relpConnection.setConnectionTimeout(timeout);
+ }
+
+ @Override
+ public void setKeepAlive(boolean on) {
+ relpConnection.setKeepAlive(on);
+ }
+
+ @Override
+ public int getRxBufferSize() {
+ return relpConnection.getRxBufferSize();
+ }
+
+ @Override
+ public void setRxBufferSize(int size) {
+ relpConnection.setRxBufferSize(size);
+ }
+
+ @Override
+ public int getTxBufferSize() {
+ return relpConnection.getTxBufferSize();
+ }
+
+ @Override
+ public void setTxBufferSize(int size) {
+ relpConnection.setTxBufferSize(size);
+ }
+
+ @Override
+ public boolean connect(String hostname, int port) throws IOException, TimeoutException {
+ timesConnected++;
+ if (timesConnected <= limit) {
+ throw new IOException("Fake exception");
+ }
+ return relpConnection.connect(hostname, port);
+ }
+
+ @Override
+ public void tearDown() {
+ relpConnection.tearDown();
+ }
+
+ @Override
+ public boolean disconnect() throws IOException, IllegalStateException, TimeoutException {
+ return relpConnection.disconnect();
+ }
+
+ @Override
+ public void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException {
+ relpConnection.commit(relpBatch);
+ }
+
+ @Override
+ public RelpConfig relpConfig() {
+ return relpConnection.relpConfig();
+ }
+}