Skip to content

Commit

Permalink
Resolved review comments for issue #1744.
Browse files Browse the repository at this point in the history
Signed-off-by: mallikagogoi7 <[email protected]>
  • Loading branch information
mallikagogoi7 committed Aug 18, 2023
1 parent f575689 commit c9692a8
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public FailedHttpResponseInterceptor(final String url){
}

@Override
public void process(HttpResponse response, EntityDetails entity, HttpContext context) throws IOException {
public void process(final HttpResponse response, final EntityDetails entity, final HttpContext context) throws IOException {
if (response.getCode() == ERROR_CODE_500 ||
response.getCode() == ERROR_CODE_400 ||
response.getCode() == ERROR_CODE_404 ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public boolean isTokenExpired(final Integer tokenExpired){
return false;
}

private OAuth20Service getOAuth20ServiceObj(BearerTokenOptions bearerTokenOptions){
private OAuth20Service getOAuth20ServiceObj(final BearerTokenOptions bearerTokenOptions){
return new ServiceBuilder(bearerTokenOptions.getClientId())
.apiSecret(bearerTokenOptions.getClientSecret())
.defaultScope(bearerTokenOptions.getScope())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public PrometheusSinkService(final PrometheusSinkConfiguration prometheusSinkCon
* This method process buffer records and send to Http End points based on configured codec
* @param records Collection of Event
*/
public void output(Collection<Record<Event>> records) {
public void output(final Collection<Record<Event>> records) {
reentrantLock.lock();
try {
records.forEach(record -> {
Expand All @@ -148,23 +148,23 @@ public void output(Collection<Record<Event>> records) {
if (event.getMetadata().getEventType().equals("METRIC")) {
Remote.WriteRequest message = null;
if (event instanceof JacksonGauge) {
JacksonGauge jacksonGauge = (JacksonGauge) event;
final JacksonGauge jacksonGauge = (JacksonGauge) event;
message = buildRemoteWriteRequest(jacksonGauge.getTime(),
jacksonGauge.getStartTime(), jacksonGauge.getValue(), jacksonGauge.getAttributes(),jacksonGauge.getName());
} else if (event instanceof JacksonSum) {
JacksonSum jacksonSum = (JacksonSum) event;
final JacksonSum jacksonSum = (JacksonSum) event;
message = buildRemoteWriteRequest(jacksonSum.getTime(),
jacksonSum.getStartTime(), jacksonSum.getValue(), jacksonSum.getAttributes(), jacksonSum.getName());
} else if (event instanceof JacksonSummary) {
JacksonSummary jacksonSummary = (JacksonSummary) event;
final JacksonSummary jacksonSummary = (JacksonSummary) event;
message = buildRemoteWriteRequest(jacksonSummary.getTime(),
jacksonSummary.getStartTime(), jacksonSummary.getSum(), jacksonSummary.getAttributes(), jacksonSummary.getName());
} else if (event instanceof JacksonHistogram) {
JacksonHistogram jacksonHistogram = (JacksonHistogram) event;
final JacksonHistogram jacksonHistogram = (JacksonHistogram) event;
message = buildRemoteWriteRequest(jacksonHistogram.getTime(),
jacksonHistogram.getStartTime(), jacksonHistogram.getSum(), jacksonHistogram.getAttributes(), jacksonHistogram.getName());
} else if (event instanceof JacksonExponentialHistogram) {
JacksonExponentialHistogram jacksonExpHistogram = (JacksonExponentialHistogram) event;
final JacksonExponentialHistogram jacksonExpHistogram = (JacksonExponentialHistogram) event;
message = buildRemoteWriteRequest(jacksonExpHistogram.getTime(),
jacksonExpHistogram.getStartTime(), jacksonExpHistogram.getSum(), jacksonExpHistogram.getAttributes(), jacksonExpHistogram.getName());
} else {
Expand All @@ -175,23 +175,15 @@ public void output(Collection<Record<Event>> records) {
if (event.getEventHandle() != null) {
this.bufferedEventHandles.add(event.getEventHandle());
}
HttpEndPointResponse failedHttpEndPointResponses = null;
try {
failedHttpEndPointResponses = pushToEndPoint(bytes);

if (failedHttpEndPointResponses != null) {
logFailedData(failedHttpEndPointResponses);
releaseEventHandles(Boolean.FALSE);
prometheusSinkRecordsFailedCounter.increment();
} else {
LOG.info("data pushed to the end point successfully");
releaseEventHandles(Boolean.TRUE);
prometheusSinkRecordsSuccessCounter.increment();
}
} catch (IOException e) {
LOG.error("Error while pushing to the end point ", e);
HttpEndPointResponse failedHttpEndPointResponses = pushToEndPoint(bytes);

if (failedHttpEndPointResponses != null) {
logFailedData(failedHttpEndPointResponses);
releaseEventHandles(Boolean.FALSE);
} else {
LOG.info("data pushed to the end point successfully");
releaseEventHandles(Boolean.TRUE);
}

});

}finally {
Expand All @@ -207,18 +199,18 @@ public void output(Collection<Record<Event>> records) {
* @param attributeMap attributes
* @param metricName metricName
*/
private static Remote.WriteRequest buildRemoteWriteRequest(String time, String startTime,
Double value, Map<String, Object> attributeMap, final String metricName) {
Remote.WriteRequest.Builder writeRequestBuilder = Remote.WriteRequest.newBuilder();
private static Remote.WriteRequest buildRemoteWriteRequest(final String time, final String startTime,
final Double value, final Map<String, Object> attributeMap, final String metricName) {
final Remote.WriteRequest.Builder writeRequestBuilder = Remote.WriteRequest.newBuilder();

Types.TimeSeries.Builder timeSeriesBuilder = Types.TimeSeries.newBuilder();
final Types.TimeSeries.Builder timeSeriesBuilder = Types.TimeSeries.newBuilder();

List<Types.Label> arrayList = new ArrayList<>();
final List<Types.Label> arrayList = new ArrayList<>();

setMetricName(metricName, arrayList);
prepareLabelList(attributeMap, arrayList);

Types.Sample.Builder prometheusSampleBuilder = Types.Sample.newBuilder();
final Types.Sample.Builder prometheusSampleBuilder = Types.Sample.newBuilder();
long timeStampVal;
if (time != null) {
timeStampVal = getTimeStampVal(time);
Expand All @@ -227,44 +219,44 @@ private static Remote.WriteRequest buildRemoteWriteRequest(String time, String s
}

prometheusSampleBuilder.setValue(value).setTimestamp(timeStampVal);
Types.Sample prometheusSample = prometheusSampleBuilder.build();
final Types.Sample prometheusSample = prometheusSampleBuilder.build();

timeSeriesBuilder.addAllLabels(arrayList);
timeSeriesBuilder.addAllSamples(Arrays.asList(prometheusSample));

Types.TimeSeries timeSeries = timeSeriesBuilder.build();
final Types.TimeSeries timeSeries = timeSeriesBuilder.build();
writeRequestBuilder.addAllTimeseries(Arrays.asList(timeSeries));

return writeRequestBuilder.build();
}

private static void prepareLabelList(Map<String, Object> hashMap, List<Types.Label> arrayList) {
for (Map.Entry<String, Object> entry : hashMap.entrySet()) {
String key = sanitizeName(entry.getKey());
Object value = entry.getValue();
private static void prepareLabelList(final Map<String, Object> hashMap, final List<Types.Label> arrayList) {
for (final Map.Entry<String, Object> entry : hashMap.entrySet()) {
final String key = sanitizeName(entry.getKey());
final Object value = entry.getValue();
if (entry.getValue() instanceof Map) {
Object innerMap = entry.getValue();
final Object innerMap = entry.getValue();
prepareLabelList(objectMapper.convertValue(innerMap, Map.class), arrayList);
continue;
}
Types.Label.Builder labelBuilder = Types.Label.newBuilder();
final Types.Label.Builder labelBuilder = Types.Label.newBuilder();
labelBuilder.setName(key).setValue(value.toString());
Types.Label label = labelBuilder.build();
final Types.Label label = labelBuilder.build();
arrayList.add(label);
}
}

private static String sanitizeName(String name) {
private static String sanitizeName(final String name) {
return BODY_PATTERN
.matcher(PREFIX_PATTERN.matcher(name).replaceFirst("_"))
.replaceAll("_");
}

private static long getTimeStampVal(String time) {
LocalDateTime localDateTimeParse = LocalDateTime.parse(time,
private static long getTimeStampVal(final String time) {
final LocalDateTime localDateTimeParse = LocalDateTime.parse(time,
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'"));
LocalDateTime localDateTime = LocalDateTime.parse(localDateTimeParse.toString());
ZonedDateTime zdt = ZonedDateTime.of(localDateTime, ZoneId.systemDefault());
final LocalDateTime localDateTime = LocalDateTime.parse(localDateTimeParse.toString());
final ZonedDateTime zdt = ZonedDateTime.of(localDateTime, ZoneId.systemDefault());
return zdt.toInstant().toEpochMilli();
}

Expand All @@ -273,7 +265,7 @@ private static long getTimeStampVal(String time) {
* @param endPointResponses HttpEndPointResponses.
*/
private void logFailedData(final HttpEndPointResponse endPointResponses) {
FailedDlqData failedDlqData =
final FailedDlqData failedDlqData =
FailedDlqData.builder()
.withUrl(endPointResponses.getUrl())
.withMessage(endPointResponses.getErrorMessage())
Expand All @@ -284,7 +276,7 @@ private void logFailedData(final HttpEndPointResponse endPointResponses) {
}

private void releaseEventHandles(final boolean result) {
for (EventHandle eventHandle : bufferedEventHandles) {
for (final EventHandle eventHandle : bufferedEventHandles) {
eventHandle.release(result);
}
bufferedEventHandles.clear();
Expand All @@ -294,28 +286,30 @@ private void releaseEventHandles(final boolean result) {
* * This method pushes bufferData to configured HttpEndPoints
* @param data byte[] data.
*/
private HttpEndPointResponse pushToEndPoint(final byte[] data) throws IOException {
private HttpEndPointResponse pushToEndPoint(final byte[] data) {
HttpEndPointResponse httpEndPointResponses = null;
final ClassicRequestBuilder classicHttpRequestBuilder =
httpAuthOptions.get(prometheusSinkConfiguration.getUrl()).getClassicHttpRequestBuilder();

final byte[] compressedBufferData = Snappy.compress(data);
HttpEntity entity = new ByteArrayEntity(compressedBufferData,
ContentType.create(prometheusSinkConfiguration.getContentType()), prometheusSinkConfiguration.getEncoding());

classicHttpRequestBuilder.setEntity(entity);
classicHttpRequestBuilder.addHeader("Content-Encoding", prometheusSinkConfiguration.getEncoding());
classicHttpRequestBuilder.addHeader("Content-Type", prometheusSinkConfiguration.getContentType());
classicHttpRequestBuilder.addHeader("X-Prometheus-Remote-Write-Version", prometheusSinkConfiguration.getRemoteWriteVersion());

try {
final byte[] compressedBufferData = Snappy.compress(data);
final HttpEntity entity = new ByteArrayEntity(compressedBufferData,
ContentType.create(prometheusSinkConfiguration.getContentType()), prometheusSinkConfiguration.getEncoding());

classicHttpRequestBuilder.setEntity(entity);
if(AuthTypeOptions.BEARER_TOKEN.equals(prometheusSinkConfiguration.getAuthType()))
accessTokenIfExpired(prometheusSinkConfiguration.getAuthentication().getBearerTokenOptions().getTokenExpired(),prometheusSinkConfiguration.getUrl());

httpAuthOptions.get(prometheusSinkConfiguration.getUrl()).getHttpClientBuilder().build()
.execute(classicHttpRequestBuilder.build(), HttpClientContext.create());
LOG.info("Records successfully pushed to endpoint {}", prometheusSinkConfiguration.getUrl());
prometheusSinkRecordsSuccessCounter.increment();
} catch (IOException e) {
prometheusSinkRecordsFailedCounter.increment();
LOG.info("Records failed to push endpoint {}");
LOG.error("Exception while pushing buffer data to end point. URL : {}, Exception : ", prometheusSinkConfiguration.getUrl(), e);
httpEndPointResponses = new HttpEndPointResponse(prometheusSinkConfiguration.getUrl(), HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
Expand Down Expand Up @@ -43,8 +44,9 @@

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.lenient;

public class PrometheusSinkServiceTest {

Expand Down Expand Up @@ -97,6 +99,10 @@ public class PrometheusSinkServiceTest {

private CloseableHttpResponse closeableHttpResponse;

private Counter prometheusSinkRecordsSuccessCounter;

private Counter prometheusSinkRecordsFailedCounter;

@BeforeEach
void setup() throws IOException {
this.pluginMetrics = mock(PluginMetrics.class);
Expand All @@ -107,10 +113,15 @@ void setup() throws IOException {
this.awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
this.closeableHttpClient = mock(CloseableHttpClient.class);
this.closeableHttpResponse = mock(CloseableHttpResponse.class);
this.prometheusSinkRecordsSuccessCounter = mock(Counter.class);
this.prometheusSinkRecordsFailedCounter = mock(Counter.class);
lenient().when(httpClientBuilder.setConnectionManager(null)).thenReturn(httpClientBuilder);
lenient().when(httpClientBuilder.addResponseInterceptorLast(any(FailedHttpResponseInterceptor.class))).thenReturn(httpClientBuilder);
lenient().when(httpClientBuilder.build()).thenReturn(closeableHttpClient);
lenient().when(closeableHttpClient.execute(any(ClassicHttpRequest.class),any(HttpClientContext.class))).thenReturn(closeableHttpResponse);
when(pluginMetrics.counter(PrometheusSinkService.PROMETHEUS_SINK_RECORDS_SUCCESS_COUNTER)).thenReturn(prometheusSinkRecordsSuccessCounter);
when(pluginMetrics.counter(PrometheusSinkService.PROMETHEUS_SINK_RECORDS_FAILED_COUNTER)).thenReturn(prometheusSinkRecordsFailedCounter);

}

PrometheusSinkService createObjectUnderTest(final int eventCount, final PrometheusSinkConfiguration httpSinkConfig) throws NoSuchFieldException, IllegalAccessException {
Expand Down Expand Up @@ -160,6 +171,7 @@ void prometheus_sink_service_test_output_with_single_record_for_jackson_sum() th
.withData("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")
.withEventMetadata(eventMetadata).build());
Collection<Record<Event>> records = List.of(eventRecord);
objectUnderTest.output(records);
assertDoesNotThrow(() -> { objectUnderTest.output(records);});
}

Expand Down

0 comments on commit c9692a8

Please sign in to comment.