Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2294 Flow migration #1850

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -1231,10 +1231,8 @@ In the list below, the names of required properties appear in bold. Any other pr
| invokehttp-proxy-username | | | Username to set when authenticating against proxy |
| invokehttp-proxy-password | | | Password to set when authenticating against proxy<br/>**Sensitive Property: true** |
| Content-type | application/octet-stream | | The Content-Type to specify for when content is being transmitted through a PUT, POST or PATCH. In the case of an empty value after evaluating an expression language expression, Content-Type defaults to |
| send-message-body | true | true<br/>false | DEPRECATED. Only kept for backwards compatibility, no functionality is included. |
| Send Message Body | true | true<br/>false | If true, sends the HTTP message body on POST/PUT/PATCH requests (default). If false, suppresses the message body and content-type header for these requests. |
| Use Chunked Encoding | false | true<br/>false | When POST'ing, PUT'ing or PATCH'ing content set this property to true in order to not pass the 'Content-length' header and instead send 'Transfer-Encoding' with a value of 'chunked'. This will enable the data transfer mechanism which was introduced in HTTP 1.1 to pass data of unknown lengths in chunks. |
| Disable Peer Verification | false | true<br/>false | DEPRECATED. The value is ignored, peer and host verification are always performed when using SSL/TLS. |
| Put Response Body in Attribute | | | If set, the response body received back will be put into an attribute of the original FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. |
| Always Output Response | false | true<br/>false | Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is |
| Penalize on "No Retry" | false | true<br/>false | Enabling this property will penalize FlowFiles that are routed to the "No Retry" relationship. |
Expand Down Expand Up @@ -2041,12 +2039,7 @@ In the list below, the names of required properties appear in bold. Any other pr
| Queue Max Message | 1000 | | Maximum number of messages allowed on the producer queue |
| Compress Codec | none | none<br/>gzip<br/>snappy | compression codec to use for compressing message sets |
| Max Flow Segment Size | 0 B | | Maximum flow content payload segment size for the kafka record. 0 B means unlimited. |
| Security CA | | | DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key |
| Security Cert | | | DEPRECATED in favor of SSL Context Service.Path to client's public key (PEM) used for authentication |
| Security Private Key | | | DEPRECATED in favor of SSL Context Service.Path to client's private key (PEM) used for authentication |
| Security Pass Phrase | | | DEPRECATED in favor of SSL Context Service.Private key passphrase<br/>**Sensitive Property: true** |
| Kafka Key | | | The key to use for the message. If not specified, the UUID of the flow file is used as the message key.<br/>**Supports Expression Language: true** |
| Message Key Field | | | DEPRECATED, does not work -- use Kafka Key instead |
| Debug contexts | | | A comma-separated list of debug contexts to enable.Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all |
| Fail empty flow files | true | true<br/>false | Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka and forwarded to failure. The old behavior is deprecated. Use connections to drop empty flow files! |

Expand Down
1 change: 0 additions & 1 deletion docker/test/integration/features/kafka.feature
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
And the Minifi logs contain the following message: "PublishKafka: client.id [client_no_42]" in less than 10 seconds
And the Minifi logs contain the following message: "PublishKafka: Message Key [unique_message_key_123]" in less than 10 seconds
And the Minifi logs contain the following message: "PublishKafka: DynamicProperty: [retry.backoff.ms] -> [150]" in less than 10 seconds
And the Minifi logs contain the following message: "The Message Key Field property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead." in less than 10 seconds

Scenario: PublishKafka sends flowfiles to failure when the broker is not available
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
Expand Down
2 changes: 1 addition & 1 deletion extensions/librdkafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use_bundled_librdkafka(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})

include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)

file(GLOB SOURCES "*.cpp")
file(GLOB SOURCES "*.cpp" "migrators/*.cpp")

add_minifi_library(minifi-rdkafka-extensions SHARED ${SOURCES})

Expand Down
13 changes: 0 additions & 13 deletions extensions/librdkafka/PublishKafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,6 @@ void PublishKafka::onSchedule(core::ProcessContext& context, core::ProcessSessio
conn_ = std::make_unique<KafkaConnection>(key_);
configureNewConnection(context);

std::string message_key_field;
if (context.getProperty(MessageKeyField, message_key_field) && !message_key_field.empty()) {
logger_->log_error("The {} property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead.", MessageKeyField.name);
}

logger_->log_debug("Successfully configured PublishKafka");
}

Expand Down Expand Up @@ -641,14 +636,6 @@ std::optional<utils::net::SslData> PublishKafka::getSslData(core::ProcessContext
}

utils::net::SslData ssl_data;
if (auto security_ca = context.getProperty(SecurityCA))
ssl_data.ca_loc = *security_ca;
if (auto security_cert = context.getProperty(SecurityCert))
ssl_data.cert_loc = *security_cert;
if (auto security_private_key = context.getProperty(SecurityPrivateKey))
ssl_data.key_loc = *security_private_key;
if (auto security_private_key_pass = context.getProperty(SecurityPrivateKeyPassWord))
ssl_data.key_pw = *security_private_key_pass;
return ssl_data;
}

Expand Down
21 changes: 0 additions & 21 deletions extensions/librdkafka/PublishKafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,26 +148,10 @@ class PublishKafka : public KafkaProcessorBase {
.withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE)
.withDefaultValue("0 B")
.build();
EXTENSIONAPI static constexpr auto SecurityCA = core::PropertyDefinitionBuilder<>::createProperty("Security CA")
.withDescription("DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key")
.build();
EXTENSIONAPI static constexpr auto SecurityCert = core::PropertyDefinitionBuilder<>::createProperty("Security Cert")
.withDescription("DEPRECATED in favor of SSL Context Service.Path to client's public key (PEM) used for authentication")
.build();
EXTENSIONAPI static constexpr auto SecurityPrivateKey = core::PropertyDefinitionBuilder<>::createProperty("Security Private Key")
.withDescription("DEPRECATED in favor of SSL Context Service.Path to client's private key (PEM) used for authentication")
.build();
EXTENSIONAPI static constexpr auto SecurityPrivateKeyPassWord = core::PropertyDefinitionBuilder<>::createProperty("Security Pass Phrase")
.withDescription("DEPRECATED in favor of SSL Context Service.Private key passphrase")
.isSensitive(true)
.build();
EXTENSIONAPI static constexpr auto KafkaKey = core::PropertyDefinitionBuilder<>::createProperty("Kafka Key")
.withDescription("The key to use for the message. If not specified, the UUID of the flow file is used as the message key.")
.supportsExpressionLanguage(true)
.build();
EXTENSIONAPI static constexpr auto MessageKeyField = core::PropertyDefinitionBuilder<>::createProperty("Message Key Field")
.withDescription("DEPRECATED, does not work -- use Kafka Key instead")
.build();
EXTENSIONAPI static constexpr auto DebugContexts = core::PropertyDefinitionBuilder<>::createProperty("Debug contexts")
.withDescription("A comma-separated list of debug contexts to enable."
"Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all")
Expand Down Expand Up @@ -195,12 +179,7 @@ class PublishKafka : public KafkaProcessorBase {
QueueBufferMaxMessage,
CompressCodec,
MaxFlowSegSize,
SecurityCA,
SecurityCert,
SecurityPrivateKey,
SecurityPrivateKeyPassWord,
KafkaKey,
MessageKeyField,
DebugContexts,
FailEmptyFlowFiles
}));
Expand Down
83 changes: 83 additions & 0 deletions extensions/librdkafka/migrators/PublishKafkaMigrator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "PublishKafkaMigrator.h"

#include "core/Resource.h"
#include "core/flow/FlowSchema.h"
#include "controllers/SSLContextService.h"
#include "../PublishKafka.h"


namespace org::apache::nifi::minifi::kafka::migration {

namespace {
constexpr std::string_view DEPRECATED_MESSAGE_KEY_FIELD = "Message Key Field";
constexpr std::string_view DEPRECATED_SECURITY_CA = "Security CA";
constexpr std::string_view DEPRECATED_SECURITY_CERT = "Security Cert";
constexpr std::string_view DEPRECATED_SECURITY_PRIVATE_KEY = "Security Private Key";
constexpr std::string_view DEPRECATED_SECURITY_PASS_PHRASE = "Security Pass Phrase";

void migrateKafkaPropertyToSSLContextService(
const std::string_view deprecated_publish_kafka_property,
const std::string_view ssl_context_service_property,
core::flow::Node& publish_kafka_properties,
core::flow::Node& ssl_controller_service_properties) {
const auto security_ca = publish_kafka_properties.getMember(deprecated_publish_kafka_property);
if (const auto security_ca_str = security_ca ? security_ca.getString() : std::nullopt) {
ssl_controller_service_properties.addMember(ssl_context_service_property, *security_ca_str);
}

std::ignore = publish_kafka_properties.remove(deprecated_publish_kafka_property);
}
} // namespace

void PublishKafkaMigrator::migrate(core::flow::Node& root_node, const core::flow::FlowSchema& schema) {
auto publish_kafka_processors = getProcessors(root_node, schema, "PublishKafka");
for (auto& publish_kafka_processor : publish_kafka_processors) {
auto publish_kafka_properties = publish_kafka_processor[schema.processor_properties];
if (publish_kafka_properties.remove(DEPRECATED_MESSAGE_KEY_FIELD)) {
logger_->log_warn("Removed deprecated property \"{}\" from {}", DEPRECATED_MESSAGE_KEY_FIELD, *publish_kafka_processor[schema.identifier].getString());
}
if (publish_kafka_properties.contains(DEPRECATED_SECURITY_CA) ||
publish_kafka_properties.contains(DEPRECATED_SECURITY_CERT) ||
publish_kafka_properties.contains(DEPRECATED_SECURITY_PRIVATE_KEY) ||
publish_kafka_properties.contains(DEPRECATED_SECURITY_PASS_PHRASE)) {
std::string publish_kafka_id_str = publish_kafka_processor[schema.identifier].getString().value_or(std::string{utils::IdGenerator::getIdGenerator()->generate().to_string()});
auto ssl_context_service_name = fmt::format("GeneratedSSLContextServiceFor_{}", publish_kafka_id_str);
auto root_group = root_node[schema.root_group];
auto controller_services = root_group[schema.controller_services];
auto ssl_controller_service = *controller_services.pushBack();
ssl_controller_service.addMember(schema.name[0], ssl_context_service_name);
ssl_controller_service.addMember(schema.identifier[0], utils::IdGenerator::getIdGenerator()->generate().to_string().c_str());
ssl_controller_service.addMember(schema.type[0], "SSLContextService");

publish_kafka_properties.addMember(processors::PublishKafka::SSLContextService.name, ssl_context_service_name);
auto ssl_controller_service_properties = ssl_controller_service.addObject(schema.controller_service_properties[0]);

migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_CA, controllers::SSLContextService::CACertificate.name, publish_kafka_properties, *ssl_controller_service_properties);
migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_CERT, controllers::SSLContextService::ClientCertificate.name, publish_kafka_properties, *ssl_controller_service_properties);
migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_PRIVATE_KEY, controllers::SSLContextService::PrivateKey.name, publish_kafka_properties, *ssl_controller_service_properties);
migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_PASS_PHRASE, controllers::SSLContextService::Passphrase.name, publish_kafka_properties, *ssl_controller_service_properties);

logger_->log_warn("Removed deprecated Security Properties from {} and replaced them with SSLContextService", *publish_kafka_processor[schema.identifier].getString());
}
}
}

REGISTER_RESOURCE(PublishKafkaMigrator, FlowMigrator);
} // namespace org::apache::nifi::minifi::kafka::migration
Loading