diff --git a/PROCESSORS.md b/PROCESSORS.md
index b1ea31cb31..b88a38de51 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -1231,10 +1231,8 @@ In the list below, the names of required properties appear in bold. Any other pr
| invokehttp-proxy-username | | | Username to set when authenticating against proxy |
| invokehttp-proxy-password | | | Password to set when authenticating against proxy
**Sensitive Property: true** |
| Content-type | application/octet-stream | | The Content-Type to specify for when content is being transmitted through a PUT, POST or PATCH. In the case of an empty value after evaluating an expression language expression, Content-Type defaults to |
-| send-message-body | true | true
false | DEPRECATED. Only kept for backwards compatibility, no functionality is included. |
| Send Message Body | true | true
false | If true, sends the HTTP message body on POST/PUT/PATCH requests (default). If false, suppresses the message body and content-type header for these requests. |
| Use Chunked Encoding | false | true
false | When POST'ing, PUT'ing or PATCH'ing content set this property to true in order to not pass the 'Content-length' header and instead send 'Transfer-Encoding' with a value of 'chunked'. This will enable the data transfer mechanism which was introduced in HTTP 1.1 to pass data of unknown lengths in chunks. |
-| Disable Peer Verification | false | true
false | DEPRECATED. The value is ignored, peer and host verification are always performed when using SSL/TLS. |
| Put Response Body in Attribute | | | If set, the response body received back will be put into an attribute of the original FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. |
| Always Output Response | false | true
false | Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is |
| Penalize on "No Retry" | false | true
false | Enabling this property will penalize FlowFiles that are routed to the "No Retry" relationship. |
@@ -2041,12 +2039,7 @@ In the list below, the names of required properties appear in bold. Any other pr
| Queue Max Message | 1000 | | Maximum number of messages allowed on the producer queue |
| Compress Codec | none | none
gzip
snappy | compression codec to use for compressing message sets |
| Max Flow Segment Size | 0 B | | Maximum flow content payload segment size for the kafka record. 0 B means unlimited. |
-| Security CA | | | DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key |
-| Security Cert | | | DEPRECATED in favor of SSL Context Service.Path to client's public key (PEM) used for authentication |
-| Security Private Key | | | DEPRECATED in favor of SSL Context Service.Path to client's private key (PEM) used for authentication |
-| Security Pass Phrase | | | DEPRECATED in favor of SSL Context Service.Private key passphrase
**Sensitive Property: true** |
| Kafka Key | | | The key to use for the message. If not specified, the UUID of the flow file is used as the message key.
**Supports Expression Language: true** |
-| Message Key Field | | | DEPRECATED, does not work -- use Kafka Key instead |
| Debug contexts | | | A comma-separated list of debug contexts to enable.Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all |
| Fail empty flow files | true | true
false | Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka and forwarded to failure. The old behavior is deprecated. Use connections to drop empty flow files! |
diff --git a/docker/test/integration/features/kafka.feature b/docker/test/integration/features/kafka.feature
index 02aca2cc05..14c00f2bb4 100644
--- a/docker/test/integration/features/kafka.feature
+++ b/docker/test/integration/features/kafka.feature
@@ -60,7 +60,6 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
And the Minifi logs contain the following message: "PublishKafka: client.id [client_no_42]" in less than 10 seconds
And the Minifi logs contain the following message: "PublishKafka: Message Key [unique_message_key_123]" in less than 10 seconds
And the Minifi logs contain the following message: "PublishKafka: DynamicProperty: [retry.backoff.ms] -> [150]" in less than 10 seconds
- And the Minifi logs contain the following message: "The Message Key Field property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead." in less than 10 seconds
Scenario: PublishKafka sends flowfiles to failure when the broker is not available
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
diff --git a/extensions/librdkafka/CMakeLists.txt b/extensions/librdkafka/CMakeLists.txt
index 74fff6ceda..c7b4d20c67 100644
--- a/extensions/librdkafka/CMakeLists.txt
+++ b/extensions/librdkafka/CMakeLists.txt
@@ -26,7 +26,7 @@ use_bundled_librdkafka(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
-file(GLOB SOURCES "*.cpp")
+file(GLOB SOURCES "*.cpp" "migrators/*.cpp")
add_minifi_library(minifi-rdkafka-extensions SHARED ${SOURCES})
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 657b452f5c..ab22e7abf2 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -388,11 +388,6 @@ void PublishKafka::onSchedule(core::ProcessContext& context, core::ProcessSessio
conn_ = std::make_unique(key_);
configureNewConnection(context);
- std::string message_key_field;
- if (context.getProperty(MessageKeyField, message_key_field) && !message_key_field.empty()) {
- logger_->log_error("The {} property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead.", MessageKeyField.name);
- }
-
logger_->log_debug("Successfully configured PublishKafka");
}
@@ -641,14 +636,6 @@ std::optional PublishKafka::getSslData(core::ProcessContext
}
utils::net::SslData ssl_data;
- if (auto security_ca = context.getProperty(SecurityCA))
- ssl_data.ca_loc = *security_ca;
- if (auto security_cert = context.getProperty(SecurityCert))
- ssl_data.cert_loc = *security_cert;
- if (auto security_private_key = context.getProperty(SecurityPrivateKey))
- ssl_data.key_loc = *security_private_key;
- if (auto security_private_key_pass = context.getProperty(SecurityPrivateKeyPassWord))
- ssl_data.key_pw = *security_private_key_pass;
return ssl_data;
}
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index df38820ec8..46d2ad1a82 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -148,26 +148,10 @@ class PublishKafka : public KafkaProcessorBase {
.withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE)
.withDefaultValue("0 B")
.build();
- EXTENSIONAPI static constexpr auto SecurityCA = core::PropertyDefinitionBuilder<>::createProperty("Security CA")
- .withDescription("DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key")
- .build();
- EXTENSIONAPI static constexpr auto SecurityCert = core::PropertyDefinitionBuilder<>::createProperty("Security Cert")
- .withDescription("DEPRECATED in favor of SSL Context Service.Path to client's public key (PEM) used for authentication")
- .build();
- EXTENSIONAPI static constexpr auto SecurityPrivateKey = core::PropertyDefinitionBuilder<>::createProperty("Security Private Key")
- .withDescription("DEPRECATED in favor of SSL Context Service.Path to client's private key (PEM) used for authentication")
- .build();
- EXTENSIONAPI static constexpr auto SecurityPrivateKeyPassWord = core::PropertyDefinitionBuilder<>::createProperty("Security Pass Phrase")
- .withDescription("DEPRECATED in favor of SSL Context Service.Private key passphrase")
- .isSensitive(true)
- .build();
EXTENSIONAPI static constexpr auto KafkaKey = core::PropertyDefinitionBuilder<>::createProperty("Kafka Key")
.withDescription("The key to use for the message. If not specified, the UUID of the flow file is used as the message key.")
.supportsExpressionLanguage(true)
.build();
- EXTENSIONAPI static constexpr auto MessageKeyField = core::PropertyDefinitionBuilder<>::createProperty("Message Key Field")
- .withDescription("DEPRECATED, does not work -- use Kafka Key instead")
- .build();
EXTENSIONAPI static constexpr auto DebugContexts = core::PropertyDefinitionBuilder<>::createProperty("Debug contexts")
.withDescription("A comma-separated list of debug contexts to enable."
"Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all")
@@ -195,12 +179,7 @@ class PublishKafka : public KafkaProcessorBase {
QueueBufferMaxMessage,
CompressCodec,
MaxFlowSegSize,
- SecurityCA,
- SecurityCert,
- SecurityPrivateKey,
- SecurityPrivateKeyPassWord,
KafkaKey,
- MessageKeyField,
DebugContexts,
FailEmptyFlowFiles
}));
diff --git a/extensions/librdkafka/migrators/PublishKafkaMigrator.cpp b/extensions/librdkafka/migrators/PublishKafkaMigrator.cpp
new file mode 100644
index 0000000000..0981376f92
--- /dev/null
+++ b/extensions/librdkafka/migrators/PublishKafkaMigrator.cpp
@@ -0,0 +1,83 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PublishKafkaMigrator.h"
+
+#include "core/Resource.h"
+#include "core/flow/FlowSchema.h"
+#include "controllers/SSLContextService.h"
+#include "../PublishKafka.h"
+
+
+namespace org::apache::nifi::minifi::kafka::migration {
+
+namespace {
+constexpr std::string_view DEPRECATED_MESSAGE_KEY_FIELD = "Message Key Field";
+constexpr std::string_view DEPRECATED_SECURITY_CA = "Security CA";
+constexpr std::string_view DEPRECATED_SECURITY_CERT = "Security Cert";
+constexpr std::string_view DEPRECATED_SECURITY_PRIVATE_KEY = "Security Private Key";
+constexpr std::string_view DEPRECATED_SECURITY_PASS_PHRASE = "Security Pass Phrase";
+
+void migrateKafkaPropertyToSSLContextService(
+ const std::string_view deprecated_publish_kafka_property,
+ const std::string_view ssl_context_service_property,
+ core::flow::Node& publish_kafka_properties,
+ core::flow::Node& ssl_controller_service_properties) {
+ const auto security_ca = publish_kafka_properties.getMember(deprecated_publish_kafka_property);
+ if (const auto security_ca_str = security_ca ? security_ca.getString() : std::nullopt) {
+ ssl_controller_service_properties.addMember(ssl_context_service_property, *security_ca_str);
+ }
+
+ std::ignore = publish_kafka_properties.remove(deprecated_publish_kafka_property);
+}
+} // namespace
+
+void PublishKafkaMigrator::migrate(core::flow::Node& root_node, const core::flow::FlowSchema& schema) {
+ auto publish_kafka_processors = getProcessors(root_node, schema, "PublishKafka");
+ for (auto& publish_kafka_processor : publish_kafka_processors) {
+ auto publish_kafka_properties = publish_kafka_processor[schema.processor_properties];
+ if (publish_kafka_properties.remove(DEPRECATED_MESSAGE_KEY_FIELD)) {
+ logger_->log_warn("Removed deprecated property \"{}\" from {}", DEPRECATED_MESSAGE_KEY_FIELD, *publish_kafka_processor[schema.identifier].getString());
+ }
+ if (publish_kafka_properties.contains(DEPRECATED_SECURITY_CA) ||
+ publish_kafka_properties.contains(DEPRECATED_SECURITY_CERT) ||
+ publish_kafka_properties.contains(DEPRECATED_SECURITY_PRIVATE_KEY) ||
+ publish_kafka_properties.contains(DEPRECATED_SECURITY_PASS_PHRASE)) {
+ std::string publish_kafka_id_str = publish_kafka_processor[schema.identifier].getString().value_or(std::string{utils::IdGenerator::getIdGenerator()->generate().to_string()});
+ auto ssl_context_service_name = fmt::format("GeneratedSSLContextServiceFor_{}", publish_kafka_id_str);
+ auto root_group = root_node[schema.root_group];
+ auto controller_services = root_group[schema.controller_services];
+ auto ssl_controller_service = *controller_services.pushBack();
+ ssl_controller_service.addMember(schema.name[0], ssl_context_service_name);
+ ssl_controller_service.addMember(schema.identifier[0], utils::IdGenerator::getIdGenerator()->generate().to_string().c_str());
+ ssl_controller_service.addMember(schema.type[0], "SSLContextService");
+
+ publish_kafka_properties.addMember(processors::PublishKafka::SSLContextService.name, ssl_context_service_name);
+ auto ssl_controller_service_properties = ssl_controller_service.addObject(schema.controller_service_properties[0]);
+
+ migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_CA, controllers::SSLContextService::CACertificate.name, publish_kafka_properties, *ssl_controller_service_properties);
+ migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_CERT, controllers::SSLContextService::ClientCertificate.name, publish_kafka_properties, *ssl_controller_service_properties);
+ migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_PRIVATE_KEY, controllers::SSLContextService::PrivateKey.name, publish_kafka_properties, *ssl_controller_service_properties);
+ migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_PASS_PHRASE, controllers::SSLContextService::Passphrase.name, publish_kafka_properties, *ssl_controller_service_properties);
+
+ logger_->log_warn("Removed deprecated Security Properties from {} and replaced them with SSLContextService", *publish_kafka_processor[schema.identifier].getString());
+ }
+ }
+}
+
+REGISTER_RESOURCE(PublishKafkaMigrator, FlowMigrator);
+} // namespace org::apache::nifi::minifi::kafka::migration
diff --git a/extensions/librdkafka/migrators/PublishKafkaMigrator.h b/extensions/librdkafka/migrators/PublishKafkaMigrator.h
new file mode 100644
index 0000000000..04eb11c51e
--- /dev/null
+++ b/extensions/librdkafka/migrators/PublishKafkaMigrator.h
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "core/flow/FlowMigrator.h"
+#include "core/flow/FlowSchema.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::kafka::migration {
+
+class PublishKafkaMigrator final : public core::flow::FlowMigrator {
+ public:
+ explicit PublishKafkaMigrator(const std::string_view name, const utils::Identifier& uuid = {}) : FlowMigrator(name, uuid) {}
+
+ void migrate(core::flow::Node& root_node, const core::flow::FlowSchema& schema) override;
+
+ private:
+ std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger();
+};
+
+
+} // namespace org::apache::nifi::minifi::kafka::migration
diff --git a/extensions/librdkafka/tests/PublishKafkaMigratorTests.cpp b/extensions/librdkafka/tests/PublishKafkaMigratorTests.cpp
new file mode 100644
index 0000000000..afd3188709
--- /dev/null
+++ b/extensions/librdkafka/tests/PublishKafkaMigratorTests.cpp
@@ -0,0 +1,197 @@
+/**
+*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "PublishKafka.h"
+#include "unit/ConfigurationTestController.h"
+#include "core/flow/AdaptiveConfiguration.h"
+
+#include "yaml-cpp/yaml.h"
+
+namespace org::apache::nifi::minifi::test {
+
+TEST_CASE("PublishKafkaMigratorTest yaml") {
+ static constexpr std::string_view ORIGINAL_YAML = R"(
+MiNiFi Config Version: 3
+Flow Controller:
+ name: MiNiFi Flow
+Processors:
+ - name: Get files from /tmp/input
+ id: 7fd166aa-0662-4c42-affa-88f6fb39807f
+ class: org.apache.nifi.processors.standard.GetFile
+ scheduling period: 2 sec
+ scheduling strategy: TIMER_DRIVEN
+ Properties:
+ Input Directory: /tmp/input
+ - name: Publish messages to Kafka topic test
+ id: 8a534b4a-2b4a-4e1e-ab07-8a09fa08f848
+ class: org.apache.nifi.processors.standard.PublishKafka
+ scheduling strategy: EVENT_DRIVEN
+ auto-terminated relationships list:
+ - success
+ - failure
+ Properties:
+ Message Key Field: foo
+ Batch Size: 10
+ Client Name: test-client
+ Compress Codec: none
+ Delivery Guarantee: 1
+ Known Brokers: kafka-broker:9092
+ Message Timeout: 12 sec
+ Request Timeout: 10 sec
+ Topic Name: test
+ Security CA: /tmp/resources/certs/ca-cert
+ Security Cert: /tmp/resources/certs/client_test_client_client.pem
+ Security Pass Phrase: abcdefgh
+ Security Private Key: /tmp/resources/certs/client_test_client_client.key
+Connections:
+ - name: GetFile/success/PublishKafka
+ id: 1edd529e-eee9-4b05-9e35-f1607bb0243b
+ source id: 7fd166aa-0662-4c42-affa-88f6fb39807f
+ source relationship name: success
+ destination id: 8a534b4a-2b4a-4e1e-ab07-8a09fa08f848
+Controller Services: []
+Remote Processing Groups: []
+)";
+ ConfigurationTestController test_controller;
+ std::string serialized_flow_definition;
+ SECTION("YamlConfiguration") {
+ core::YamlConfiguration yaml_config(test_controller.getContext());
+ auto root_flow_definition = yaml_config.getRootFromPayload(std::string{ORIGINAL_YAML});
+ REQUIRE(root_flow_definition);
+ serialized_flow_definition = yaml_config.serialize(*root_flow_definition);
+ }
+ SECTION("Adaptive Yaml Configuration") {
+ core::flow::AdaptiveConfiguration adaptive_configuration(test_controller.getContext());
+ auto root_flow_definition = adaptive_configuration.getRootFromPayload(std::string{ORIGINAL_YAML});
+ REQUIRE(root_flow_definition);
+ serialized_flow_definition = adaptive_configuration.serialize(*root_flow_definition);
+ }
+ YAML::Node migrated_flow = YAML::Load(std::string{serialized_flow_definition});
+ CHECK(migrated_flow["Controller Services"].IsSequence());
+ CHECK(migrated_flow["Controller Services"].size() == 1);
+ CHECK(migrated_flow["Controller Services"][0]["name"].as() == "GeneratedSSLContextServiceFor_8a534b4a-2b4a-4e1e-ab07-8a09fa08f848");
+ CHECK(migrated_flow["Controller Services"][0]["class"].as() == "SSLContextService");
+ CHECK(migrated_flow["Controller Services"][0]["Properties"].IsMap());
+ CHECK(migrated_flow["Controller Services"][0]["Properties"]["CA Certificate"].as() == "/tmp/resources/certs/ca-cert");
+ CHECK(migrated_flow["Controller Services"][0]["Properties"]["Client Certificate"].as() == "/tmp/resources/certs/client_test_client_client.pem");
+ CHECK(migrated_flow["Controller Services"][0]["Properties"]["Private Key"].as() == "/tmp/resources/certs/client_test_client_client.key");
+ CHECK(migrated_flow["Controller Services"][0]["Properties"]["CA Certificate"].as() == "/tmp/resources/certs/ca-cert");
+
+ CHECK(!migrated_flow["Processors"][1]["Properties"]["Message Key Field"].IsDefined());
+
+ CHECK(!migrated_flow["Processors"][1]["Properties"]["Security CA"].IsDefined());
+ CHECK(!migrated_flow["Processors"][1]["Properties"]["Security Cert"].IsDefined());
+ CHECK(!migrated_flow["Processors"][1]["Properties"]["Security Pass Phrase"].IsDefined());
+ CHECK(!migrated_flow["Processors"][1]["Properties"]["Security Private Key"].IsDefined());
+
+ CHECK(migrated_flow["Processors"][1]["Properties"]["SSL Context Service"].as() == "GeneratedSSLContextServiceFor_8a534b4a-2b4a-4e1e-ab07-8a09fa08f848");
+}
+
+TEST_CASE("PublishKafkaMigratorTest json") {
+ static constexpr std::string_view ORIGINAL_JSON = R"(
+{
+ "rootGroup": {
+ "name": "MiNiFi Flow",
+ "processors": [
+ {
+ "name": "Get files from /tmp/input",
+ "identifier": "7fd166aa-0662-4c42-affa-88f6fb39807f",
+ "type": "org.apache.nifi.processors.standard.GetFile",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "schedulingPeriod": "2 sec",
+ "properties": {
+ "Input Directory": "/tmp/input"
+ },
+ "autoTerminatedRelationships": []
+ },
+ {
+ "name": "Publish messages to Kafka topic test",
+ "identifier": "8a534b4a-2b4a-4e1e-ab07-8a09fa08f848",
+ "type": "org.apache.nifi.processors.standard.PublishKafka",
+ "schedulingStrategy": "EVENT_DRIVEN",
+ "properties": {
+ "Batch Size": "10",
+ "Client Name": "test-client",
+ "Compress Codec": "none",
+ "Delivery Guarantee": "1",
+ "Known Brokers": "kafka-broker:9092",
+ "Message Timeout": "12 sec",
+ "Request Timeout": "10 sec",
+ "Topic Name": "test",
+ "Security CA": "/tmp/resources/certs/ca-cert",
+ "Security Cert": "/tmp/resources/certs/client_test_client_client.pem",
+ "Security Pass Phrase": "abcdefgh",
+ "Security Private Key": "/tmp/resources/certs/client_test_client_client.key"
+ },
+ "autoTerminatedRelationships": [
+ "success",
+ "failure"
+ ]
+ }
+ ],
+ "connections": [
+ {
+ "identifier": "1edd529e-eee9-4b05-9e35-f1607bb0243b",
+ "name": "GetFile/success/PublishKafka",
+ "source": {
+ "id": "7fd166aa-0662-4c42-affa-88f6fb39807f"
+ },
+ "destination": {
+ "id": "8a534b4a-2b4a-4e1e-ab07-8a09fa08f848"
+ },
+ "selectedRelationships": [
+ "success"
+ ]
+ }
+ ],
+ "remoteProcessGroups": [],
+ "controllerServices": []
+ }
+}
+)";
+ ConfigurationTestController test_controller;
+ core::flow::AdaptiveConfiguration adaptive_configuration(test_controller.getContext());
+ auto root_flow_definition = adaptive_configuration.getRootFromPayload(std::string{ORIGINAL_JSON});
+ REQUIRE(root_flow_definition);
+ std::string serialized_flow_definition = adaptive_configuration.serialize(*root_flow_definition);
+ rapidjson::Document migrated_flow;
+ const rapidjson::ParseResult json_parse_result = migrated_flow.Parse(serialized_flow_definition.c_str(), serialized_flow_definition.length());
+ REQUIRE(json_parse_result);
+ CHECK(migrated_flow["rootGroup"]["controllerServices"].IsArray());
+ CHECK(migrated_flow["rootGroup"]["controllerServices"].Size() == 1);
+ CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["name"] == "GeneratedSSLContextServiceFor_8a534b4a-2b4a-4e1e-ab07-8a09fa08f848");
+ CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["type"] == "SSLContextService");
+ CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["properties"].IsObject());
+ CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["properties"]["CA Certificate"] == "/tmp/resources/certs/ca-cert");
+ CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["properties"]["Client Certificate"] == "/tmp/resources/certs/client_test_client_client.pem");
+ CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["properties"]["Private Key"] == "/tmp/resources/certs/client_test_client_client.key");
+ CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["properties"]["CA Certificate"] == "/tmp/resources/certs/ca-cert");
+
+ CHECK(!migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Message Key Field"));
+
+ CHECK(!migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security CA"));
+ CHECK(!migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security Cert"));
+ CHECK(!migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security Pass Phrase"));
+ CHECK(!migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security Private Key"));
+
+ CHECK(migrated_flow["rootGroup"]["processors"][1]["properties"]["SSL Context Service"] == "GeneratedSSLContextServiceFor_8a534b4a-2b4a-4e1e-ab07-8a09fa08f848");
+}
+
+
+} // namespace org::apache::nifi::minifi::test
diff --git a/extensions/python/PythonCreator.h b/extensions/python/PythonCreator.h
index 98717a2d5f..c8cc042fc0 100644
--- a/extensions/python/PythonCreator.h
+++ b/extensions/python/PythonCreator.h
@@ -54,7 +54,7 @@ class PythonCreator : public minifi::core::CoreComponent {
~PythonCreator() override {
for (const auto& clazz : registered_classes_) {
- core::getClassLoader().unregisterClass(clazz);
+ core::getClassLoader().unregisterClass(clazz, ResourceType::Processor);
}
}
@@ -88,11 +88,11 @@ class PythonCreator : public minifi::core::CoreComponent {
}
logger_->log_info("Registering NiFi python processor: {}", class_name);
core::getClassLoader().registerClass(class_name, std::make_unique(path.string(), script_name.string(),
- PythonProcessorType::NIFI_TYPE, std::vector{python_lib_path, std::filesystem::path{pathListings.value()}, path.parent_path()}, qualified_module_name));
+ PythonProcessorType::NIFI_TYPE, std::vector{python_lib_path, std::filesystem::path{pathListings.value()}, path.parent_path()}, qualified_module_name), ResourceType::Processor);
} else {
logger_->log_info("Registering MiNiFi python processor: {}", class_name);
core::getClassLoader().registerClass(class_name, std::make_unique(path.string(), script_name.string(),
- PythonProcessorType::MINIFI_TYPE, std::vector{python_lib_path, std::filesystem::path{pathListings.value()}}, qualified_module_name));
+ PythonProcessorType::MINIFI_TYPE, std::vector{python_lib_path, std::filesystem::path{pathListings.value()}}, qualified_module_name), ResourceType::Processor);
}
registered_classes_.push_back(class_name);
try {
diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt
index f62003e758..8b0bcb1b3c 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/extensions/standard-processors/CMakeLists.txt
@@ -20,7 +20,7 @@
include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
-file(GLOB SOURCES "processors/*.cpp" "controllers/*.cpp" "utils/*.cpp" "modbus/*.cpp")
+file(GLOB SOURCES "processors/*.cpp" "controllers/*.cpp" "utils/*.cpp" "modbus/*.cpp" "migrators/*.cpp")
add_minifi_library(minifi-standard-processors SHARED ${SOURCES})
target_include_directories(minifi-standard-processors PUBLIC "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
diff --git a/extensions/standard-processors/migrators/InvokeHTTPMigrator.cpp b/extensions/standard-processors/migrators/InvokeHTTPMigrator.cpp
new file mode 100644
index 0000000000..5072bfd6c6
--- /dev/null
+++ b/extensions/standard-processors/migrators/InvokeHTTPMigrator.cpp
@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "InvokeHTTPMigrator.h"
+
+#include "core/Resource.h"
+#include "flow/FlowSchema.h"
+
+namespace org::apache::nifi::minifi::standard::migration {
+
+void InvokeHTTPMigrator::migrate(core::flow::Node& root_node, const core::flow::FlowSchema& schema) {
+ auto invoke_http_processors = getProcessors(root_node, schema, "InvokeHTTP");
+ for (auto& invoke_http_processor : invoke_http_processors) {
+ auto invoke_http_properties = invoke_http_processor[schema.processor_properties];
+ if (invoke_http_properties.remove("Send Body")) {
+ logger_->log_warn("Removed deprecated property \"Send Body\" from {}", *invoke_http_processor[schema.identifier].getString());
+ }
+ if (invoke_http_properties.remove("Disable Peer Verification")) {
+ logger_->log_warn("Removed deprecated property \"Disable Peer Verification\" from {}", *invoke_http_processor[schema.identifier].getString());
+ }
+ }
+}
+
+REGISTER_RESOURCE(InvokeHTTPMigrator, FlowMigrator);
+} // namespace org::apache::nifi::minifi::standard::migration
+
diff --git a/extensions/standard-processors/migrators/InvokeHTTPMigrator.h b/extensions/standard-processors/migrators/InvokeHTTPMigrator.h
new file mode 100644
index 0000000000..ca2e4bac84
--- /dev/null
+++ b/extensions/standard-processors/migrators/InvokeHTTPMigrator.h
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "core/flow/FlowMigrator.h"
+#include "core/flow/FlowSchema.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::standard::migration {
+
+class InvokeHTTPMigrator final : public core::flow::FlowMigrator {
+ public:
+ explicit InvokeHTTPMigrator(const std::string_view name, const utils::Identifier& uuid = {}) : FlowMigrator(name, uuid) {}
+
+ void migrate(core::flow::Node& root_node, const core::flow::FlowSchema& schema) override;
+
+ private:
+ std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger();
+};
+
+
+} // namespace org::apache::nifi::minifi::standard::migration
diff --git a/extensions/standard-processors/processors/InvokeHTTP.h b/extensions/standard-processors/processors/InvokeHTTP.h
index bb5e9d2837..76211468d1 100644
--- a/extensions/standard-processors/processors/InvokeHTTP.h
+++ b/extensions/standard-processors/processors/InvokeHTTP.h
@@ -131,11 +131,6 @@ class InvokeHTTP : public core::Processor {
"Content-Type defaults to")
.withDefaultValue("application/octet-stream")
.build();
- EXTENSIONAPI static constexpr auto SendBody = core::PropertyDefinitionBuilder<>::createProperty("send-message-body", "Send Body")
- .withDescription("DEPRECATED. Only kept for backwards compatibility, no functionality is included.")
- .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
- .withDefaultValue("true")
- .build();
EXTENSIONAPI static constexpr auto SendMessageBody = core::PropertyDefinitionBuilder<>::createProperty("Send Message Body")
.withDescription("If true, sends the HTTP message body on POST/PUT/PATCH requests (default). "
"If false, suppresses the message body and content-type header for these requests.")
@@ -149,11 +144,6 @@ class InvokeHTTP : public core::Processor {
.withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
.withDefaultValue("false")
.build();
- EXTENSIONAPI static constexpr auto DisablePeerVerification = core::PropertyDefinitionBuilder<>::createProperty("Disable Peer Verification")
- .withDescription("DEPRECATED. The value is ignored, peer and host verification are always performed when using SSL/TLS.")
- .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
- .withDefaultValue("false")
- .build();
EXTENSIONAPI static constexpr auto PutResponseBodyInAttribute = core::PropertyDefinitionBuilder<>::createProperty("Put Response Body in Attribute")
.withDescription("If set, the response body received back will be put into an attribute of the original "
"FlowFile instead of a separate FlowFile. "
@@ -201,10 +191,8 @@ class InvokeHTTP : public core::Processor {
ProxyUsername,
ProxyPassword,
ContentType,
- SendBody,
SendMessageBody,
UseChunkedEncoding,
- DisablePeerVerification,
PutResponseBodyInAttribute,
AlwaysOutputResponse,
PenalizeOnNoRetry,
@@ -213,7 +201,6 @@ class InvokeHTTP : public core::Processor {
DownloadSpeedLimit
});
-
EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success",
"The original FlowFile will be routed upon success (2xx status codes). It will have new attributes detailing the success of the request."};
EXTENSIONAPI static constexpr auto RelResponse = core::RelationshipDefinition{"response",
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index bf444a0cf1..c816e09562 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -888,7 +888,6 @@ Security Properties:
Always Output Response: 'false'
Connection Timeout: 5 s
Content-type: application/octet-stream
- Disable Peer Verification: 'false'
Follow Redirects: 'true'
HTTP Method: POST
Include Date Header: 'true'
@@ -1020,7 +1019,6 @@ Security Properties:
Always Output Response: false
Connection Timeout: 5 s
Content-type: application/octet-stream
- Disable Peer Verification: false
Follow Redirects: true
HTTP Method: POST
Include Date Header: true
diff --git a/libminifi/include/agent/agent_docs.h b/libminifi/include/agent/agent_docs.h
index a234478c35..8814c813a7 100644
--- a/libminifi/include/agent/agent_docs.h
+++ b/libminifi/include/agent/agent_docs.h
@@ -34,7 +34,7 @@
namespace org::apache::nifi::minifi {
enum class ResourceType {
- Processor, ControllerService, InternalResource, DescriptionOnly
+ Processor, ControllerService, InternalResource, DescriptionOnly, FlowMigrator
};
struct ClassDescription {
diff --git a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h
index 96cbd1ba88..67e2ef93f7 100644
--- a/libminifi/include/core/ClassLoader.h
+++ b/libminifi/include/core/ClassLoader.h
@@ -22,16 +22,14 @@
#include
#include
#include