From e109055eff5b54f13065fa922f48b3937d86c60e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNithin?= Date: Thu, 16 Nov 2023 15:06:12 +0530 Subject: [PATCH] Remove explicit publish of failed message to EventPublishService MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Nithin --- .../client/listener/TransactionConsumer.java | 24 ++++--- .../client/service/EventPublishService.java | 17 +---- .../service/impl/EventPublishServiceImpl.java | 69 ------------------- 3 files changed, 14 insertions(+), 96 deletions(-) diff --git a/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java b/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java index 021a4c5d..ef034b9a 100644 --- a/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java +++ b/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java @@ -6,7 +6,6 @@ import hlf.java.rest.client.exception.ServiceException; import hlf.java.rest.client.metrics.EmitKafkaCustomMetrics; import hlf.java.rest.client.model.MultiDataTransactionPayload; -import hlf.java.rest.client.service.EventPublishService; import hlf.java.rest.client.service.TransactionFulfillment; import hlf.java.rest.client.util.FabricClientConstants; import java.nio.charset.StandardCharsets; @@ -30,11 +29,8 @@ public class TransactionConsumer { private static final String PAYLOAD_KIND = "payload_kind"; private static final String PL_KIND_MULTI_DATA = "multi_data"; - @Autowired TransactionFulfillment transactionFulfillment; - @Autowired ObjectMapper objectMapper; - - @Autowired(required = false) - EventPublishService eventPublishServiceImpl; + @Autowired private TransactionFulfillment transactionFulfillment; + @Autowired private ObjectMapper objectMapper; /** * This method routes the kafka messages to appropriate methods and acknowledges once processing @@ -125,9 +121,9 @@ public void listen(ConsumerRecord message) { if (isIdentifiableFunction(networkName, contractName, transactionFunctionName) && !transactionParams.isEmpty()) { - if (null != peerNames && !peerNames.isEmpty()) { + if (!peerNames.isEmpty()) { List lstPeerNames = Arrays.asList(peerNames.split(",")); - if (null != lstPeerNames && !lstPeerNames.isEmpty()) { + if (!lstPeerNames.isEmpty()) { if (StringUtils.isNotBlank(collections) && StringUtils.isNotBlank(transientKey)) { transactionFulfillment.writePrivateTransactionToLedger( networkName, @@ -166,13 +162,19 @@ public void listen(ConsumerRecord message) { } } else { - log.info("Incorrect Transaction Payload"); + log.error("Incorrect Transaction Payload"); + throw new ServiceException( + ErrorCode.VALIDATION_FAILED, + "Inbound transaction format is incorrect or doesn't contain valid parameters."); } } catch (FabricTransactionException fte) { - eventPublishServiceImpl.publishTransactionFailureEvent( - fte.getMessage(), networkName, contractName, transactionFunctionName, transactionParams); log.error("Error in Submitting Transaction - Exception - " + fte.getMessage()); + /* + If the error handler has dead letter publish enabled, the errored Record header will be enriched by extracting + the error cause and message from the thrown exception. + */ + throw fte; } catch (Exception ex) { log.error("Error in Kafka Listener - Message Format exception - " + ex.getMessage()); throw new ServiceException(ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, ex.getMessage()); diff --git a/src/main/java/hlf/java/rest/client/service/EventPublishService.java b/src/main/java/hlf/java/rest/client/service/EventPublishService.java index c70fbaa8..ce94f863 100644 --- a/src/main/java/hlf/java/rest/client/service/EventPublishService.java +++ b/src/main/java/hlf/java/rest/client/service/EventPublishService.java @@ -4,7 +4,7 @@ /** * The EventPublishService is a service class, which include the kafka template. It sends the - * Message to the the Event Kafka message topic + * Message to the Event Kafka message topic */ @ConditionalOnProperty("kafka.event-listener.brokerHost") public interface EventPublishService { @@ -49,19 +49,4 @@ boolean publishBlockEvents( String chaincodeName, String functionName, Boolean isPrivateDataPresent); - - /** - * @param errorMsg contents of the error message - * @param networkName channel name in Fabric - * @param contractName chaincode name in the Fabric - * @param functionName function name in a given chaincode. - * @param parameters parameters sent to the chaincode - * @return status of the published message to Kafka, successful or not - */ - boolean publishTransactionFailureEvent( - String errorMsg, - String networkName, - String contractName, - String functionName, - String parameters); } diff --git a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java index 04513a68..7b2d4d83 100644 --- a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java +++ b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java @@ -222,73 +222,4 @@ public void onFailure(Throwable ex) { return status; } - - @Override - public boolean publishTransactionFailureEvent( - String errorMsg, - String networkName, - String contractName, - String functionName, - String parameters) { - boolean status = true; - - try { - - ProducerRecord producerRecord = - new ProducerRecord(topicName, functionName, parameters); - - producerRecord - .headers() - .add(new RecordHeader(FabricClientConstants.ERROR_MSG, errorMsg.getBytes())); - - producerRecord - .headers() - .add( - new RecordHeader( - FabricClientConstants.FABRIC_CHAINCODE_NAME, contractName.getBytes())); - producerRecord - .headers() - .add(new RecordHeader(FabricClientConstants.FABRIC_CHANNEL_NAME, networkName.getBytes())); - - producerRecord - .headers() - .add( - new RecordHeader( - FabricClientConstants.FABRIC_EVENT_TYPE, - FabricClientConstants.FABRIC_EVENT_TYPE_ERROR.getBytes())); - - producerRecord - .headers() - .add( - new RecordHeader( - FabricClientConstants.FABRIC_EVENT_FUNC_NAME, functionName.getBytes())); - - ListenableFuture> future = kafkaTemplate.send(producerRecord); - - future.addCallback( - new ListenableFutureCallback>() { - - @Override - public void onSuccess(SendResult result) { - log.info( - "Sent message=[" - + parameters - + "] with offset=[" - + result.getRecordMetadata().offset() - + "]"); - } - - @Override - public void onFailure(Throwable ex) { - log.error("Unable to send message=[" + parameters + "] due to : " + ex.getMessage()); - } - }); - - } catch (Exception ex) { - status = false; - log.error("Error sending message - " + ex.getMessage()); - } - - return status; - } }