Skip to content

Commit

Permalink
wip #1
Browse files Browse the repository at this point in the history
  • Loading branch information
yannick committed Jun 19, 2024
1 parent d3941bd commit c1e3f74
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 1 deletion.
15 changes: 15 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ plugins {
alias(libs.plugins.hivemq.extension)
alias(libs.plugins.defaults)
alias(libs.plugins.license)
alias(libs.plugins.kotlin)
}

group = "com.hivemq.extensions"
Expand Down Expand Up @@ -42,6 +43,20 @@ testing {
implementation(libs.mockito)
}
}
"integrationTest"(JvmTestSuite::class) {
dependencies {
compileOnly(libs.jetbrains.annotations)
implementation(libs.awaitility)
implementation(libs.aws.sdkv2.cloudwatch)
implementation(libs.hivemq.mqttClient)
implementation(libs.kotlin.stdlib)
implementation(libs.okhttp)
implementation(libs.testcontainers.hivemq)
implementation(libs.testcontainers.junitJupiter)
implementation(libs.testcontainers.localstack)
runtimeOnly(libs.logback.classic)
}
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,22 +1,37 @@
[versions]
awaitility = "4.2.1"
aws-sdkv2 = "2.25.69"
dropwizard-metrics-cloudwatch = "2.0.8"
hivemq-extensionSdk = "4.6.4"
hivemq-mqttClient = "1.3.3"
jaxb-api = "2.3.3"
jaxb-impl = "2.3.9"
jetbrains-annotations = "24.1.0"
junit-jupiter = "5.10.0"
logback = "1.5.6"
mockito = "5.12.0"
testcontainers = "1.19.8"
okhttp = "4.12.0"
kotlin = "2.0.0"

[libraries]
awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }
aws-sdkv2-cloudwatch = { module = "software.amazon.awssdk:cloudwatch", version.ref = "aws-sdkv2" }
dropwizard-metrics-cloudwatch = { module = "io.github.azagniotov:dropwizard-metrics-cloudwatch", version.ref = "dropwizard-metrics-cloudwatch" }
hivemq-mqttClient = { module = "com.hivemq:hivemq-mqtt-client", version.ref = "hivemq-mqttClient" }
jaxb-api = { module = "jakarta.xml.bind:jakarta.xml.bind-api", version.ref = "jaxb-api" }
jaxb-impl = { module = "com.sun.xml.bind:jaxb-impl", version.ref = "jaxb-impl" }
jetbrains-annotations = { module = "org.jetbrains:annotations", version.ref = "jetbrains-annotations" }
mockito = { module = "org.mockito:mockito-core", version.ref = "mockito" }
testcontainers-hivemq = { module = "org.testcontainers:hivemq", version.ref = "testcontainers" }
testcontainers-junitJupiter = { module = "org.testcontainers:junit-jupiter", version.ref = "testcontainers" }
logback-classic = { module = "ch.qos.logback:logback-classic", version.ref = "logback" }
okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" }
kotlin-stdlib = { module = "org.jetbrains.kotlin:kotlin-stdlib", version.ref = "kotlin" }
testcontainers-localstack = { module = "org.testcontainers:localstack", version.ref = "testcontainers" }

[plugins]
hivemq-extension = { id = "com.hivemq.extension", version = "3.1.0" }
defaults = { id = "io.github.sgtsilvio.gradle.defaults", version = "0.2.0" }
license = { id = "com.github.hierynomus.license", version = "0.16.1" }
kotlin = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.hivemq.extensions.aws.cloudwatch

import org.testcontainers.utility.DockerImageName

val LOCALSTACK_DOCKER_IMAGE: DockerImageName = DockerImageName.parse("localstack/localstack:3.2.0")

val HIVEMQ_DOCKER_IMAGE: DockerImageName = DockerImageName.parse("hivemq/hivemq4:4.28.2")

Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.hivemq.extensions.aws.cloudwatch

import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Timeout
import org.testcontainers.containers.Network
import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.containers.localstack.LocalStackContainer.Service.CLOUDWATCH
import org.testcontainers.hivemq.HiveMQContainer
import org.testcontainers.utility.MountableFile
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient
import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataRequest
import software.amazon.awssdk.services.cloudwatch.model.Metric
import software.amazon.awssdk.services.cloudwatch.model.MetricDataQuery
import software.amazon.awssdk.services.cloudwatch.model.MetricStat
import software.amazon.awssdk.services.cloudwatch.model.Statistic
import java.net.URI
import java.time.Duration
import java.time.Instant
import java.util.concurrent.TimeUnit

class EndToEndIT {

@Test
@Timeout(value = 3, unit = TimeUnit.MINUTES)
fun endToEnd() {
val network = Network.newNetwork()

val localStack = LocalStackContainer(LOCALSTACK_DOCKER_IMAGE).apply {
withServices(CLOUDWATCH)
withNetwork(network)
withNetworkAliases("localstack")
}
localStack.start()

val hivemq = HiveMQContainer(HIVEMQ_DOCKER_IMAGE).apply {
withExtension(MountableFile.forClasspathResource("hivemq-aws-cloudwatch-extension"))
withFileInExtensionHomeFolder(
MountableFile.forClasspathResource("extension-config.xml"),
"hivemq-aws-cloudwatch-extension",
"extension-config.xml"
)
withEnv("AWS_ENDPOINT_OVERRIDE", URI("http://localstack:4566").toString())
withEnv("AWS_REGION_OVERRIDE", localStack.region)
withEnv("AWS_ACCESS_KEY_ID", localStack.accessKey)
withEnv("AWS_SECRET_ACCESS_KEY", localStack.secretKey)
withLogConsumer { println("HIVEMQ: " + it.utf8StringWithoutLineEnding) }
withNetwork(network)
}
hivemq.start()

val credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create(localStack.accessKey, localStack.secretKey))

val cloudWatchClient = CloudWatchClient.builder() //
.credentialsProvider(credentialsProvider) //
.endpointOverride(localStack.getEndpointOverride(CLOUDWATCH)) //
.region(Region.of(localStack.region)) //
.build()


await().timeout(Duration.ofMinutes(2)).until {
cloudWatchClient.listMetrics().metrics().any {
it.metricName() == "com.hivemq.messages.incoming.publish.count"
}
}

val metric = Metric.builder()//
.namespace("hivemq-metrics")//
.metricName("com.hivemq.messages.incoming.publish.count")//
.dimensions(emptyList())//
.build()

val metricStat = MetricStat.builder()
.stat(Statistic.MAXIMUM.toString())
.period(60)
.metric(metric)
.build()

val metricDataQuery = MetricDataQuery.builder()
.id("m1")
.metricStat(metricStat)
.returnData(true)
.build()

await().timeout(Duration.ofMinutes(2)).until {
val request = GetMetricDataRequest.builder()
.startTime(Instant.now().minusSeconds(3600))
.endTime(Instant.now())
.metricDataQueries(listOf(metricDataQuery))
.build()
val response = cloudWatchClient.getMetricData(request)
response.metricDataResults().maxOf {
it.values()[0]
} == 0.0
}

val mqttClient = Mqtt5Client.builder().serverHost(hivemq.host).serverPort(hivemq.mqttPort).buildBlocking()
mqttClient.connect()
mqttClient.publishWith().topic("wabern").send()

await().timeout(Duration.ofMinutes(2)).until {
val request = GetMetricDataRequest.builder()
.startTime(Instant.now().minusSeconds(3600))
.endTime(Instant.now())
.metricDataQueries(listOf(metricDataQuery))
.build()
val response = cloudWatchClient.getMetricData(request)
response.metricDataResults().maxOf {
it.values()[0]
} == 1.0
}

cloudWatchClient.close()
}
}
7 changes: 7 additions & 0 deletions src/integrationTest/resources/extension-config.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cloudwatch-extension-configuration>
<report-interval>1</report-interval>
<metrics>
<metric>com.hivemq.messages.incoming.publish.count</metric>
</metrics>
</cloudwatch-extension-configuration>
37 changes: 37 additions & 0 deletions src/integrationTest/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<!--
Copyright 2019-present HiveMQ GmbH
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-30(%d %level)- %msg%n%ex</pattern>
</encoder>
</appender>

<root level="DEBUG">
<appender-ref ref="STDOUT"/>
</root>

<!-- silence test containers (https://www.testcontainers.org/supported_docker_environment/logging_config/) -->
<logger name="org.testcontainers" level="INFO"/>
<logger name="com.github.dockerjava" level="WARN"/>

<!-- silence netty -->
<logger name="io.netty" level="WARN"/>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;

import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -63,7 +65,16 @@ void startCloudWatchReporter(
} else {
final Duration apiTimeout = cloudWatchConfig.getApiTimeout().map(Duration::ofMillis).orElse(null);

final String endpointOverride = System.getenv("AWS_ENDPOINT_OVERRIDE");
final URI endpoint = URI.create(endpointOverride);
System.out.println("ENDPOINT IS " + endpoint);

final String awsRegion = System.getenv("AWS_REGION_OVERRIDE");
System.out.println("AWS_REGION IS " + awsRegion);

final CloudWatchAsyncClient cloudWatchAsync = CloudWatchAsyncClient.builder()
.endpointOverride(endpoint)
.region(Region.of(awsRegion))
.credentialsProvider(DefaultCredentialsProvider.create())
.asyncConfiguration(ClientAsyncConfiguration.builder()
.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, executorService)
Expand All @@ -75,9 +86,11 @@ void startCloudWatchReporter(
.build();

cloudWatchReporter = CloudWatchReporter.forRegistry(metricRegistry, cloudWatchAsync, METRIC_NAMESPACE)
.withZeroValuesSubmission()
.withReportRawCountValue()
.filter(new ConfiguredMetricsFilter(configuration.getEnabledMetrics()))
.build();
cloudWatchReporter.start(cloudWatchConfig.getReportInterval(), TimeUnit.MINUTES);
cloudWatchReporter.start(cloudWatchConfig.getReportInterval(), TimeUnit.SECONDS);
log.info("Started CloudWatchReporter for {} HiveMQ metrics", configuration.getEnabledMetrics().size());
}
}
Expand Down

0 comments on commit c1e3f74

Please sign in to comment.