Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2377 Support process group level controller services #1840

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

lordgamez
Copy link
Contributor

  • Controller services are only found if they are in the scope of the processor or controller that tries to get it (in the same process group or in a parent group)
  • Tests are modified to use a single root ProcessGroup that contains the used processors and processor handles are only stored as raw pointers
  • Processor start and shutdown is changed to avoid deadlock when a findProcessor call is running on the same process group

https://issues.apache.org/jira/browse/MINIFICPP-2377


Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically main)?

  • Is your initial contribution a single, squashed commit?

For code changes:

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file?
  • If applicable, have you updated the NOTICE file?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.

@@ -200,7 +212,13 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent& timeScheduler, Ev

void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler,
CronDrivenSchedulingAgent& cronScheduler, const std::function<bool(const Processor*)>& filter) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and similar changes in the startProcessingProcessors and startProcessing functions are to avoid deadlock. It appeared in ControllerServiceIntegrationTests when stop() is called while the getControllerService is called in the processor's onTrigger. The stop() function waits for the processor's onTrigger to finish, but it cannot finish the ProcessGroup's findProcessor method because it is waiting for the same mutex which is held here.

We may need to find a better solution as this may cause some concurrency issues I'm not currently aware of. I'm open for suggestions.

Comment on lines 136 to 139
this->mock_s3_request_sender_ptr = new MockS3RequestSender();
std::unique_ptr<minifi::aws::s3::S3RequestSender> mock_s3_request_sender(this->mock_s3_request_sender_ptr);
auto s3_processor_unique_ptr = std::unique_ptr<T>(new T("S3Processor", utils::Identifier(), std::move(mock_s3_request_sender)));
this->s3_processor = s3_processor_unique_ptr.get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this->mock_s3_request_sender_ptr = new MockS3RequestSender();
std::unique_ptr<minifi::aws::s3::S3RequestSender> mock_s3_request_sender(this->mock_s3_request_sender_ptr);
auto s3_processor_unique_ptr = std::unique_ptr<T>(new T("S3Processor", utils::Identifier(), std::move(mock_s3_request_sender)));
this->s3_processor = s3_processor_unique_ptr.get();
auto mock_s3_request_sender = std::make_unique<MockS3RequestSender>();
this->mock_s3_request_sender_ptr = mock_s3_request_sender.get();
auto s3_processor_unique_ptr = std::make_unique<T>("S3Processor", utils::Identifier(), std::move(mock_s3_request_sender));
this->s3_processor = s3_processor_unique_ptr.get();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the first 2 lines, but changing the rest would result in compile errors:

In file included from /usr/include/c++/11/memory:76,
                 from /home/ggyimesi/projects/nifi-minifi-cpp-fork/extensions/aws/tests/S3TestsFixture.h:23,
                 from /home/ggyimesi/projects/nifi-minifi-cpp-fork/extensions/aws/tests/FetchS3ObjectTests.cpp:21:
/usr/include/c++/11/bits/unique_ptr.h: In instantiation of ‘typename std::_MakeUniq<_Tp>::__single_object std::make_unique(_Args&& ...) [with _Tp = org::apache::nifi::minifi::aws::processors::FetchS3Object; _Args = {const char (&)[12], org::apache::nifi::minifi::utils::Identifier, std::unique_ptr<MockS3RequestSender, std::default_delete<MockS3RequestSender> >}; typename std::_MakeUniq<_Tp>::__single_object = std::unique_ptr<org::apache::nifi::minifi::aws::processors::FetchS3Object, std::default_delete<org::apache::nifi::minifi::aws::processors::FetchS3Object> >]’:
/home/ggyimesi/projects/nifi-minifi-cpp-fork/extensions/aws/tests/S3TestsFixture.h:138:55:   required from ‘FlowProcessorS3TestsFixture<T>::FlowProcessorS3TestsFixture() [with T = org::apache::nifi::minifi::aws::processors::FetchS3Object]’
/home/ggyimesi/projects/nifi-minifi-cpp-fork/extensions/aws/tests/FetchS3ObjectTests.cpp:33:31:   required from here
/usr/include/c++/11/bits/unique_ptr.h:962:30: error: ‘org::apache::nifi::minifi::aws::processors::FetchS3Object::FetchS3Object(const string&, const org::apache::nifi::minifi::utils::Identifier&, std::unique_ptr<org::apache::nifi::minifi::aws::s3::S3RequestSender>)’ is private within this context
  962 |     { return unique_ptr<_Tp>(new _Tp(std::forward<_Args>(__args)...)); }
      |                              ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In file included from /home/ggyimesi/projects/nifi-minifi-cpp-fork/extensions/aws/tests/FetchS3ObjectTests.cpp:22:
/home/ggyimesi/projects/nifi-minifi-cpp-fork/extensions/aws/processors/FetchS3Object.h:90:12: note: declared private here
   90 |   explicit FetchS3Object(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
      |            ^~~~~~~~~~~~~

Comment on lines 198 to 201
this->mock_s3_request_sender_ptr = new MockS3RequestSender();
std::unique_ptr<minifi::aws::s3::S3RequestSender> mock_s3_request_sender(this->mock_s3_request_sender_ptr);
auto s3_processor_unique_ptr = std::unique_ptr<T>(new T("S3Processor", utils::Identifier(), std::move(mock_s3_request_sender)));
this->s3_processor = s3_processor_unique_ptr.get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this->mock_s3_request_sender_ptr = new MockS3RequestSender();
std::unique_ptr<minifi::aws::s3::S3RequestSender> mock_s3_request_sender(this->mock_s3_request_sender_ptr);
auto s3_processor_unique_ptr = std::unique_ptr<T>(new T("S3Processor", utils::Identifier(), std::move(mock_s3_request_sender)));
this->s3_processor = s3_processor_unique_ptr.get();
auto mock_s3_request_sender = std::make_unique<MockS3RequestSender>();
this->mock_s3_request_sender_ptr = mock_s3_request_sender.get();
auto s3_processor_unique_ptr = std::make_unique<T>("S3Processor", utils::Identifier(), std::move(mock_s3_request_sender));
this->s3_processor = s3_processor_unique_ptr.get();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated similarly to previous comment.

Comment on lines +64 to 65
auto azure_blob_storage_processor_unique_ptr = std::unique_ptr<ProcessorType>(
new ProcessorType("AzureBlobStorageProcessor", utils::Identifier(), std::move(mock_blob_storage)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
auto azure_blob_storage_processor_unique_ptr = std::unique_ptr<ProcessorType>(
new ProcessorType("AzureBlobStorageProcessor", utils::Identifier(), std::move(mock_blob_storage)));
auto azure_blob_storage_processor_unique_ptr = std::make_unique<ProcessorType>("AzureBlobStorageProcessor", utils::Identifier(), std::move(mock_blob_storage));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot be changed due to the same compile issue mentioned above.

Comment on lines +63 to 64
auto azure_data_lake_storage_unique_ptr = std::unique_ptr<AzureDataLakeStorageProcessor>(
new AzureDataLakeStorageProcessor("AzureDataLakeStorageProcessor", utils::Identifier(), std::move(mock_data_lake_storage_client)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
auto azure_data_lake_storage_unique_ptr = std::unique_ptr<AzureDataLakeStorageProcessor>(
new AzureDataLakeStorageProcessor("AzureDataLakeStorageProcessor", utils::Identifier(), std::move(mock_data_lake_storage_client)));
auto azure_data_lake_storage_unique_ptr = std::make_unique<AzureDataLakeStorageProcessor>(
"AzureDataLakeStorageProcessor", utils::Identifier(), std::move(mock_data_lake_storage_client));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot be changed due to the same compile issue mentioned above.

Comment on lines +45 to 46
auto list_azure_data_lake_storage_unique_ptr = std::unique_ptr<minifi::azure::processors::ListAzureDataLakeStorage>(
new minifi::azure::processors::ListAzureDataLakeStorage("ListAzureDataLakeStorage", utils::Identifier(), std::move(mock_data_lake_storage_client)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
auto list_azure_data_lake_storage_unique_ptr = std::unique_ptr<minifi::azure::processors::ListAzureDataLakeStorage>(
new minifi::azure::processors::ListAzureDataLakeStorage("ListAzureDataLakeStorage", utils::Identifier(), std::move(mock_data_lake_storage_client)));
auto list_azure_data_lake_storage_unique_ptr = std::make_unique<minifi::azure::processors::ListAzureDataLakeStorage>(
"ListAzureDataLakeStorage", utils::Identifier(), std::move(mock_data_lake_storage_client));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot be changed due to the same compile issue mentioned above.

Comment on lines +64 to +65
template<typename T = core::Processor>
T* getProcessor() const { return static_cast<T*>(processor_); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree with this change, this class shouldn't expose its processor. The old pattern works fine, and keeps this utility simpler. And reverting this part would also make the diff smaller.

I like the move over to unique_ptr.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the change of adding the unique_ptr of the root process group in the test plan and that root process group owning the processors and the controllers, retrieving the pointers to the processors became problematic in some cases. For example in test suite initialization:

  PushGrafanaLokiGrpcTestFixture()
      : mock_loki_("10991"),
        test_controller_(std::make_unique<PushGrafanaLokiGrpc>("PushGrafanaLokiGrpc")),
        push_grafana_loki_grpc_(test_controller_.getProcessor<PushGrafanaLokiGrpc>()) {

In this case initializing the processor pointer would be problematic without the getProcessor method, we would need to initialize them separately afterwards in the main constructor block.
Also I think it would make the diff larger, as the common pattern which currently looking like this:

test::SingleProcessorTestController test_controller{std::make_unique<InvokeHTTP>("InvokeHTTP")};
auto invokehttp = test_controller.getProcessor();

would probably need to be changed to something like this:

auto processor = std::make_unique<InvokeHTTP>("InvokeHTTP");
auto invokehttp = processor.get();
test::SingleProcessorTestController test_controller{std::move(processor)};

If you think it's worth reverting and changing the initializations, I'm okay with that too, I just wanted to add some context.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants