Skip to content
This repository has been archived by the owner on Oct 20, 2022. It is now read-only.

added ability to set event driven thread count #184

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### Added

- [PR #184](https://github.com/Orange-OpenSource/nifikop/pull/184) - **[Operator/NiFiCluster]** Add ability to set MaxEventDrivenThreadCount.

### Changed

### Deprecated
Expand Down
9 changes: 9 additions & 0 deletions api/v1alpha1/nificluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ type Node struct {
type ReadOnlyConfig struct {
// MaximumTimerDrivenThreadCount define the maximum number of threads for timer driven processors available to the system.
MaximumTimerDrivenThreadCount *int32 `json:"maximumTimerDrivenThreadCount,omitempty"`
// MaximumEventDrivenThreadCount define the maximum number of threads for event driven processors available to the system.
MaximumEventDrivenThreadCount *int32 `json:"maximumEventDrivenThreadCount,omitempty"`
// AdditionalSharedEnvs define a set of additional env variables that will shared between all init containers and
// containers in the pod.
AdditionalSharedEnvs []corev1.EnvVar `json:"additionalSharedEnvs,omitempty"`
Expand Down Expand Up @@ -552,6 +554,13 @@ func (nReadOnlyConfig *ReadOnlyConfig) GetMaximumTimerDrivenThreadCount() int32
return *nReadOnlyConfig.MaximumTimerDrivenThreadCount
}

func (nReadOnlyConfig *ReadOnlyConfig) GetMaximumEventDrivenThreadCount() int32 {
if nReadOnlyConfig.MaximumEventDrivenThreadCount == nil {
return 5
}
return *nReadOnlyConfig.MaximumEventDrivenThreadCount
}

func (nTaskSpec *NifiClusterTaskSpec) GetDurationMinutes() float64 {
if nTaskSpec.RetryDurationMinutes == 0 {
return 5
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions config/crd/bases/nifi.orange.com_nificlusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2713,6 +2713,12 @@ spec:
- name
type: object
type: object
maximumEventDrivenThreadCount:
description: MaximumEventDrivenThreadCount define the maximum
number of threads for event driven processors available
to the system.
format: int32
type: integer
maximumTimerDrivenThreadCount:
description: MaximumTimerDrivenThreadCount define the maximum
number of threads for timer driven processors available
Expand Down Expand Up @@ -3124,6 +3130,12 @@ spec:
- name
type: object
type: object
maximumEventDrivenThreadCount:
description: MaximumEventDrivenThreadCount define the maximum
number of threads for event driven processors available to the
system.
format: int32
type: integer
maximumTimerDrivenThreadCount:
description: MaximumTimerDrivenThreadCount define the maximum
number of threads for timer driven processors available to the
Expand Down
12 changes: 12 additions & 0 deletions helm/nifikop/crds/nifi.orange.com_nificlusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2713,6 +2713,12 @@ spec:
- name
type: object
type: object
maximumEventDrivenThreadCount:
description: MaximumEventDrivenThreadCount define the maximum
number of threads for event driven processors available
to the system.
format: int32
type: integer
maximumTimerDrivenThreadCount:
description: MaximumTimerDrivenThreadCount define the maximum
number of threads for timer driven processors available
Expand Down Expand Up @@ -3124,6 +3130,12 @@ spec:
- name
type: object
type: object
maximumEventDrivenThreadCount:
description: MaximumEventDrivenThreadCount define the maximum
number of threads for event driven processors available to the
system.
format: int32
type: integer
maximumTimerDrivenThreadCount:
description: MaximumTimerDrivenThreadCount define the maximum
number of threads for timer driven processors available to the
Expand Down
4 changes: 3 additions & 1 deletion pkg/clientwrappers/controllersettings/controllersettings.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
var log = ctrl.Log.WithName("controllersettings-method")

func controllerConfigIsSync(cluster *v1alpha1.NifiCluster, entity *nigoapi.ControllerConfigurationEntity) bool {
return cluster.Spec.ReadOnlyConfig.GetMaximumTimerDrivenThreadCount() == entity.Component.MaxTimerDrivenThreadCount
return cluster.Spec.ReadOnlyConfig.GetMaximumTimerDrivenThreadCount() == entity.Component.MaxTimerDrivenThreadCount &&
cluster.Spec.ReadOnlyConfig.GetMaximumEventDrivenThreadCount() == entity.Component.MaxEventDrivenThreadCount
}

func SyncConfiguration(config *clientconfig.NifiConfig, cluster *v1alpha1.NifiCluster) error {
Expand Down Expand Up @@ -50,4 +51,5 @@ func updateControllerConfigEntity(cluster *v1alpha1.NifiCluster, entity *nigoapi
entity.Component = &nigoapi.ControllerConfigurationDto{}
}
entity.Component.MaxTimerDrivenThreadCount = cluster.Spec.ReadOnlyConfig.GetMaximumTimerDrivenThreadCount()
entity.Component.MaxEventDrivenThreadCount = cluster.Spec.ReadOnlyConfig.GetMaximumEventDrivenThreadCount()
}
11 changes: 6 additions & 5 deletions pkg/resources/nifi/nifi.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,9 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
}
}

if r.NifiCluster.Spec.ReadOnlyConfig.MaximumTimerDrivenThreadCount != nil {
if err := r.reconcileMaximumTimerDrivenThreadCount(log); err != nil {
if r.NifiCluster.Spec.ReadOnlyConfig.MaximumTimerDrivenThreadCount != nil ||
r.NifiCluster.Spec.ReadOnlyConfig.MaximumEventDrivenThreadCount != nil {
if err := r.reconcileMaximumThreadCounts(log); err != nil {
return errors.WrapIf(err, "failed to reconcile ressource")
}
}
Expand Down Expand Up @@ -909,7 +910,7 @@ func (r *Reconciler) reconcilePrometheusReportingTask(log logr.Logger) error {
return nil
}

func (r *Reconciler) reconcileMaximumTimerDrivenThreadCount(log logr.Logger) error {
func (r *Reconciler) reconcileMaximumThreadCounts(log logr.Logger) error {
configManager := config.GetClientConfigManager(r.Client, v1alpha1.ClusterReference{
Namespace: r.NifiCluster.Namespace,
Name: r.NifiCluster.Name,
Expand All @@ -919,10 +920,10 @@ func (r *Reconciler) reconcileMaximumTimerDrivenThreadCount(log logr.Logger) err
return err
}

// Sync Maximum Timer Driven Thread Count with NiFi side component
// Sync Maximum Timer Driven Thread Count and Maximum Event Driven Thread Count with NiFi side component
err = controllersettings.SyncConfiguration(clientConfig, r.NifiCluster)
if err != nil {
return errors.WrapIfWithDetails(err, "failed to sync MaximumTimerDrivenThreadCount")
return errors.WrapIfWithDetails(err, "failed to sync MaximumThreadCount configuration")
}

return nil
Expand Down
3 changes: 3 additions & 0 deletions site/docs/5_references/1_nifi_cluster/2_read_only_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ ReadOnlyConfig object specifies the read-only type Nifi config cluster wide, all
readOnlyConfig:
# MaximumTimerDrivenThreadCount define the maximum number of threads for timer driven processors available to the system.
maximumTimerDrivenThreadCount: 30
# MaximumEventDrivenThreadCount define the maximum number of threads for event driven processors available to the system.
maximumEventDrivenThreadCount: 10
# Logback configuration that will be applied to the node
logbackConfig:
# logback.xml configuration that will replace the one produced based on template
Expand Down Expand Up @@ -124,6 +126,7 @@ readOnlyConfig:
|Field|Type|Description|Required|Default|
|-----|----|-----------|--------|--------|
|maximumTimerDrivenThreadCount|int32|define the maximum number of threads for timer driven processors available to the system.|No|nil|
|maximumEventDrivenThreadCount|int32|define the maximum number of threads for event driven processors available to the system.|No|nil|
|additionalSharedEnvs|\[ \][corev1.EnvVar](https://pkg.go.dev/k8s.io/api/core/v1#EnvVar)|define a set of additional env variables that will shared between all init containers and ontainers in the pod..|No|\[ \]|
|nifiProperties|[NifiProperties](#nifiproperties)|nifi.properties configuration that will be applied to the node.|No|nil|
|zookeeperProperties|[ZookeeperProperties](#zookeeperproperties)|zookeeper.properties configuration that will be applied to the node.|No|nil|
Expand Down