Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2462 - Divide libminifi #1870

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ endfunction()
SET(TEST_DIR ${CMAKE_SOURCE_DIR}/libminifi/test)
include(Extensions)

add_subdirectory(core)
add_subdirectory(utils)
add_subdirectory(extension-utils)
add_subdirectory(libminifi)

if (ENABLE_ALL OR ENABLE_AZURE)
Expand Down
6 changes: 3 additions & 3 deletions controller/MiNiFiController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ std::shared_ptr<minifi::core::controller::ControllerService> getControllerServic
const std::string &service_name) {
std::string nifi_configuration_class_name = "adaptiveconfiguration";

minifi::core::extension::ExtensionManager::get().initialize(configuration);
minifi::core::extension::ExtensionManagerImpl::get().initialize(configuration);

configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
auto flow_configuration = minifi::core::createFlowConfiguration(
Expand Down Expand Up @@ -78,7 +78,7 @@ std::shared_ptr<minifi::controllers::SSLContextService> getSSLContextService(con
if (nullptr == secure_context) {
std::string secureStr;
if (configuration->get(minifi::Configure::nifi_remote_input_secure, secureStr) && minifi::utils::string::toBool(secureStr).value_or(false)) {
secure_context = std::make_shared<minifi::controllers::SSLContextService>("ControllerSocketProtocolSSL", configuration);
secure_context = std::make_shared<minifi::controllers::SSLContextServiceImpl>("ControllerSocketProtocolSSL", configuration);
secure_context->onEnable();
}
} else {
Expand All @@ -96,7 +96,7 @@ int main(int argc, char **argv) {
return -1;
}

const auto configuration = std::make_shared<minifi::Configure>();
const auto configuration = std::make_shared<minifi::ConfigureImpl>();
configuration->setHome(minifi_home);
configuration->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);

Expand Down
19 changes: 10 additions & 9 deletions controller/tests/ControllerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "controllers/SSLContextService.h"
#include "utils/StringUtils.h"
#include "state/UpdateController.h"
#include "core/state/nodes/ResponseNodeLoader.h"

using namespace std::literals::chrono_literals;

Expand Down Expand Up @@ -205,10 +206,10 @@ class TestControllerSocketReporter : public c2::ControllerSocketReporter {
}
};

class TestControllerServiceProvider : public core::controller::ControllerServiceProvider {
class TestControllerServiceProvider : public core::controller::ControllerServiceProviderImpl {
public:
explicit TestControllerServiceProvider(std::shared_ptr<controllers::SSLContextService> ssl_context_service)
: core::controller::ControllerServiceProvider("TestControllerServiceProvider"),
: core::controller::ControllerServiceProviderImpl("TestControllerServiceProvider"),
ssl_context_service_(std::move(ssl_context_service)) {
}
std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string&) const override {
Expand Down Expand Up @@ -243,7 +244,7 @@ class ControllerTestFixture {
};

ControllerTestFixture()
: configuration_(std::make_shared<minifi::Configure>()),
: configuration_(std::make_shared<minifi::ConfigureImpl>()),
controller_(std::make_shared<TestStateController>()),
update_sink_(std::make_unique<TestUpdateSink>(controller_)) {
configuration_->set(minifi::Configure::controller_socket_host, "localhost");
Expand All @@ -253,7 +254,7 @@ class ControllerTestFixture {
configuration_->set(minifi::Configure::nifi_security_client_pass_phrase, "abcdefgh");
configuration_->set(minifi::Configure::nifi_security_client_ca_certificate, (minifi::utils::file::FileUtils::get_executable_dir() / "resources" / "root-ca.pem").string());
configuration_->set(minifi::Configure::controller_ssl_context_service, "SSLContextService");
ssl_context_service_ = std::make_shared<controllers::SSLContextService>("SSLContextService", configuration_);
ssl_context_service_ = std::make_shared<controllers::SSLContextServiceImpl>("SSLContextService", configuration_);
ssl_context_service_->onEnable();
controller_service_provider_ = std::make_unique<TestControllerServiceProvider>(ssl_context_service_);
controller_socket_data_.host = "localhost";
Expand Down Expand Up @@ -482,7 +483,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test manifest getter", "[controllerTest
}

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

Expand All @@ -505,7 +506,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test jstack getter", "[controllerTests]
}

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

Expand Down Expand Up @@ -533,7 +534,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle getter", "[controller
}

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

Expand All @@ -547,7 +548,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle is created to non-exi
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

Expand All @@ -561,7 +562,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Debug bundle retrieval fails if target
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);

auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
reporter->initialize(configuration_, response_node_loader);
initalizeControllerSocket(reporter);

Expand Down
3 changes: 3 additions & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
add_library(minifi-core INTERFACE)
target_include_directories(minifi-core INTERFACE include)
target_link_libraries(minifi-core INTERFACE gsl-lite)
75 changes: 75 additions & 0 deletions core/include/minifi-cpp/Connection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* @file Connection.h
* Connection class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>
#include <set>
#include <string>
#include <vector>
#include <map>
#include <mutex>
#include <atomic>
#include <algorithm>
#include <utility>
#include "core/Core.h"
#include "core/Connectable.h"
#include "core/logging/Logger.h"
#include "core/Relationship.h"
#include "core/FlowFile.h"
#include "core/Repository.h"
#include "minifi-cpp/utils/Literals.h"

namespace org::apache::nifi::minifi {

class Connection : public virtual core::Connectable {
public:
~Connection() override = default;

static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_COUNT = 2000;
static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE = 100_MB;

virtual void setSourceUUID(const utils::Identifier &uuid) = 0;
virtual void setDestinationUUID(const utils::Identifier &uuid) = 0;
virtual utils::Identifier getSourceUUID() const = 0;
virtual utils::Identifier getDestinationUUID() const = 0;
virtual void setSource(core::Connectable* source) = 0;
virtual core::Connectable* getSource() const = 0;
virtual void setDestination(core::Connectable* dest) = 0;
virtual core::Connectable* getDestination() const = 0;
virtual void addRelationship(core::Relationship relationship) = 0;
virtual const std::set<core::Relationship> &getRelationships() const = 0;
virtual void setBackpressureThresholdCount(uint64_t size) = 0;
virtual uint64_t getBackpressureThresholdCount() const = 0;
virtual void setBackpressureThresholdDataSize(uint64_t size) = 0;
virtual uint64_t getBackpressureThresholdDataSize() const = 0;
virtual void setSwapThreshold(uint64_t size) = 0;
virtual void setFlowExpirationDuration(std::chrono::milliseconds duration) = 0;
virtual std::chrono::milliseconds getFlowExpirationDuration() const = 0;
virtual void setDropEmptyFlowFiles(bool drop) = 0;
virtual bool getDropEmptyFlowFiles() const = 0;
virtual bool isEmpty() const = 0;
virtual bool backpressureThresholdReached() const = 0;
virtual uint64_t getQueueSize() const = 0;
virtual uint64_t getQueueDataSize() = 0;
virtual void multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) = 0;
virtual std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) = 0;
virtual void drain(bool delete_permanently) = 0;
};
} // namespace org::apache::nifi::minifi
File renamed without changes.
55 changes: 55 additions & 0 deletions core/include/minifi-cpp/FlowFileRecord.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* @file FlowFileRecord.h
* Flow file record class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>
#include <string>
#include <vector>
#include <queue>
#include <map>
#include <mutex>
#include <atomic>
#include <iostream>
#include <sstream>
#include <fstream>
#include <set>
#include "minifi-cpp/core/ContentRepository.h"
#include "minifi-cpp/core/FlowFile.h"
#include "minifi-cpp/core/Repository.h"
#include "io/OutputStream.h"

namespace org::apache::nifi::minifi {

class FlowFileRecord : public virtual core::FlowFile {
public:
virtual bool Serialize(io::OutputStream &outStream) = 0;

//! Serialize and Persistent to the repository
virtual bool Persist(const std::shared_ptr<core::Repository>& flowRepository) = 0;

static std::shared_ptr<FlowFileRecord> DeSerialize(std::span<const std::byte> buffer, const std::shared_ptr<core::ContentRepository> &content_repo, utils::Identifier &container);
static std::shared_ptr<FlowFileRecord> DeSerialize(io::InputStream &stream, const std::shared_ptr<core::ContentRepository> &content_repo, utils::Identifier &container);
static std::shared_ptr<FlowFileRecord> DeSerialize(const std::string& key, const std::shared_ptr<core::Repository>& flowRepository,
const std::shared_ptr<core::ContentRepository> &content_repo, utils::Identifier &container);

virtual std::string getContentFullPath() const = 0;
};

} // namespace org::apache::nifi::minifi
61 changes: 61 additions & 0 deletions core/include/minifi-cpp/ResourceClaim.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string>
#include <vector>
#include <queue>
#include <map>
#include <memory>
#include <mutex>
#include <atomic>
#include "core/Core.h"
#include "core/StreamManager.h"
#include "properties/Configure.h"
#include "utils/Id.h"

namespace org::apache::nifi::minifi {

namespace core {
class ContentRepository;
} // namespace core

class ResourceClaim {
public:
using Path = std::string;

virtual ~ResourceClaim() = default;
virtual void increaseFlowFileRecordOwnedCount() = 0;
virtual void decreaseFlowFileRecordOwnedCount() = 0;
virtual uint64_t getFlowFileRecordOwnedCount() = 0;
virtual Path getContentFullPath() const = 0;
virtual bool exists() = 0;

static std::shared_ptr<ResourceClaim> create(std::shared_ptr<core::ContentRepository> repository);

virtual std::ostream& write(std::ostream& stream) const = 0;

friend std::ostream& operator<<(std::ostream& stream, const ResourceClaim& claim) {
return claim.write(stream);
}

friend std::ostream& operator<<(std::ostream& stream, const std::shared_ptr<ResourceClaim>& claim) {
return claim->write(stream);
}
};

} // namespace org::apache::nifi::minifi
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <vector>
#include <memory>

#include "core/FlowFile.h"
#include "minifi-cpp/core/FlowFile.h"
#include "utils/Id.h"

namespace org::apache::nifi::minifi {
Expand Down
71 changes: 71 additions & 0 deletions core/include/minifi-cpp/agent/agent_docs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <map>
#include <string>
#include <utility>
#include <vector>

#include "minifi-cpp/core/Annotation.h"
#include "minifi-cpp/core/DynamicProperty.h"
#include "minifi-cpp/core/OutputAttributeDefinition.h"
#include "minifi-cpp/core/Property.h"
#include "minifi-cpp/core/Relationship.h"
#include "minifi-cpp/core/RelationshipDefinition.h"

namespace org::apache::nifi::minifi {

enum class ResourceType {
Processor, ControllerService, InternalResource, DescriptionOnly
};

struct ClassDescription {
ResourceType type_ = ResourceType::Processor;
std::string short_name_{};
std::string full_name_{};
std::string description_{};
std::vector<core::Property> class_properties_{};
std::span<const core::DynamicProperty> dynamic_properties_{};
std::vector<core::Relationship> class_relationships_{};
std::span<const core::OutputAttributeReference> output_attributes_{};
bool supports_dynamic_properties_ = false;
bool supports_dynamic_relationships_ = false;
std::string inputRequirement_{};
bool isSingleThreaded_ = false;
};

struct Components {
std::vector<ClassDescription> processors_;
std::vector<ClassDescription> controller_services_;
std::vector<ClassDescription> other_components_;

[[nodiscard]] bool empty() const noexcept {
return processors_.empty() && controller_services_.empty() && other_components_.empty();
}
};

class AgentDocs {
public:
static const std::map<std::string, Components>& getClassDescriptions();
static std::map<std::string, Components>& getMutableClassDescriptions();

template<typename Class, ResourceType Type>
static void createClassDescription(const std::string& group, const std::string& name);
};

} // namespace org::apache::nifi::minifi
Loading
Loading