diff --git a/pkg/controller/chi/worker-chi-reconciler.go b/pkg/controller/chi/worker-chi-reconciler.go index 8121f1cfb..34b461740 100644 --- a/pkg/controller/chi/worker-chi-reconciler.go +++ b/pkg/controller/chi/worker-chi-reconciler.go @@ -132,29 +132,29 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *api.ClickHouseInstal } // reconcile reconciles Custom Resource -func (w *worker) reconcile(ctx context.Context, ch *api.ClickHouseInstallation) error { +func (w *worker) reconcile(ctx context.Context, cr *api.ClickHouseInstallation) error { if util.IsContextDone(ctx) { log.V(2).Info("task is done") return nil } - w.a.V(2).M(ch).S().P() - defer w.a.V(2).M(ch).E().P() + w.a.V(2).M(cr).S().P() + defer w.a.V(2).M(cr).E().P() counters := api.NewHostReconcileAttributesCounters() - ch.WalkHosts(func(host *api.Host) error { + cr.WalkHosts(func(host *api.Host) error { counters.Add(host.GetReconcileAttributes()) return nil }) if counters.AddOnly() { - w.a.V(1).M(ch).Info("Enabling full fan-out mode. CHI: %s", util.NamespaceNameString(ch)) + w.a.V(1).M(cr).Info("Enabling full fan-out mode. CHI: %s", util.NamespaceNameString(cr)) ctx = context.WithValue(ctx, common.ReconcileShardsAndHostsOptionsCtxKey, &common.ReconcileShardsAndHostsOptions{ FullFanOut: true, }) } - return ch.WalkTillError( + return cr.WalkTillError( ctx, w.reconcileCRAuxObjectsPreliminary, w.reconcileCluster, @@ -307,7 +307,7 @@ func (w *worker) reconcileHostStatefulSet(ctx context.Context, host *api.Host, o log.V(1).M(host).F().S().Info("reconcile StatefulSet start") defer log.V(1).M(host).F().E().Info("reconcile StatefulSet end") - version := w. getHostSoftwareVersion(ctx, host) + version := w.getHostSoftwareVersion(ctx, host) host.Runtime.CurStatefulSet, _ = w.c.kube.STS().Get(ctx, host) w.a.V(1).M(host).F().Info("Reconcile host: %s. App version: %s", host.GetName(), version) @@ -351,7 +351,7 @@ func (w *worker) getHostSoftwareVersion(ctx context.Context, host *api.Host) str ctx, host, versionOptions{ - skipNew: true, + skipNew: true, skipStoppedAncestor: true, }, ) @@ -399,7 +399,7 @@ func (w *worker) reconcileCluster(ctx context.Context, cluster *api.Cluster) err } } - w.reconcileClusterSecret(ctx , cluster) + w.reconcileClusterSecret(ctx, cluster) pdb := w.task.Creator().CreatePodDisruptionBudget(cluster) if err := w.reconcilePDB(ctx, cluster, pdb); err == nil { @@ -412,7 +412,6 @@ func (w *worker) reconcileCluster(ctx context.Context, cluster *api.Cluster) err return nil } - func (w *worker) reconcileClusterSecret(ctx context.Context, cluster *api.Cluster) { // Add cluster's Auto Secret if cluster.Secret.Source() == api.ClusterSecretSourceAuto { @@ -565,11 +564,6 @@ func (w *worker) reconcileShardService(ctx context.Context, shard api.IShard) er // reconcileHost reconciles specified ClickHouse host func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error { - var ( - reconcileStatefulSetOpts *statefulset.ReconcileOptions - migrateTableOpts *migrateTableOptions - ) - if util.IsContextDone(ctx) { log.V(2).Info("task is done") return nil @@ -589,6 +583,40 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error { // Create artifacts w.stsReconciler.PrepareHostStatefulSetWithStatus(ctx, host, false) + w.reconcileHostPrepare(ctx, host) + w.reconcileHostMain(ctx, host) + // Host is now added and functional + host.GetReconcileAttributes().UnsetAdd() + w.reconcileHostBootstrap(ctx, host) + + now := time.Now() + hostsCompleted := 0 + hostsCount := 0 + host.GetCR().IEnsureStatus().HostCompleted() + if host.GetCR() != nil && host.GetCR().GetStatus() != nil { + hostsCompleted = host.GetCR().GetStatus().GetHostsCompletedCount() + hostsCount = host.GetCR().GetStatus().GetHostsCount() + } + w.a.V(1). + WithEvent(host.GetCR(), common.EventActionProgress, common.EventReasonProgressHostsCompleted). + WithStatusAction(host.GetCR()). + M(host).F(). + Info("[now: %s] %s: %d of %d", now, common.EventReasonProgressHostsCompleted, hostsCompleted, hostsCount) + + _ = w.c.updateCRObjectStatus(ctx, host.GetCR(), types.UpdateStatusOptions{ + CopyStatusOptions: types.CopyStatusOptions{ + MainFields: true, + }, + }) + + metrics.HostReconcilesCompleted(ctx, host.GetCR()) + metrics.HostReconcilesTimings(ctx, host.GetCR(), time.Now().Sub(startTime).Seconds()) + + return nil +} + +// reconcileHostPrepare reconciles specified ClickHouse host +func (w *worker) reconcileHostPrepare(ctx context.Context, host *api.Host) error { // Check whether ClickHouse is running and accessible and what version is available if version, err := w.getHostClickHouseVersion(ctx, host, versionOptions{skipNew: true, skipStoppedAncestor: true}); err == nil { w.a.V(1). @@ -611,6 +639,16 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error { _ = w.completeQueries(ctx, host) } + return nil +} + +// reconcileHostMain reconciles specified ClickHouse host +func (w *worker) reconcileHostMain(ctx context.Context, host *api.Host) error { + var ( + reconcileStatefulSetOpts *statefulset.ReconcileOptions + migrateTableOpts *migrateTableOptions + ) + if err := w.reconcileConfigMapHost(ctx, host); err != nil { metrics.HostReconcilesErrors(ctx, host.GetCR()) w.a.V(1). @@ -660,8 +698,6 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error { _ = w.reconcileHostService(ctx, host) - host.GetReconcileAttributes().UnsetAdd() - // Prepare for tables migration. // Sometimes service needs some time to start after creation|modification before being accessible for usage // Check whether ClickHouse is running and accessible and what version is available. @@ -676,6 +712,11 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error { } _ = w.migrateTables(ctx, host, migrateTableOpts) + return nil +} + +// reconcileHostBootstrap reconciles specified ClickHouse host +func (w *worker) reconcileHostBootstrap(ctx context.Context, host *api.Host) error { if err := w.includeHost(ctx, host); err != nil { metrics.HostReconcilesErrors(ctx, host.GetCR()) w.a.V(1). @@ -700,28 +741,5 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error { Warning("Reconcile Host completed. Host: %s Failed to get ClickHouse version: %s", host.GetName(), version) } - now := time.Now() - hostsCompleted := 0 - hostsCount := 0 - host.GetCR().IEnsureStatus().HostCompleted() - if host.GetCR() != nil && host.GetCR().GetStatus() != nil { - hostsCompleted = host.GetCR().GetStatus().GetHostsCompletedCount() - hostsCount = host.GetCR().GetStatus().GetHostsCount() - } - w.a.V(1). - WithEvent(host.GetCR(), common.EventActionProgress, common.EventReasonProgressHostsCompleted). - WithStatusAction(host.GetCR()). - M(host).F(). - Info("[now: %s] %s: %d of %d", now, common.EventReasonProgressHostsCompleted, hostsCompleted, hostsCount) - - _ = w.c.updateCRObjectStatus(ctx, host.GetCR(), types.UpdateStatusOptions{ - CopyStatusOptions: types.CopyStatusOptions{ - MainFields: true, - }, - }) - - metrics.HostReconcilesCompleted(ctx, host.GetCR()) - metrics.HostReconcilesTimings(ctx, host.GetCR(), time.Now().Sub(startTime).Seconds()) - return nil } diff --git a/pkg/controller/chi/worker.go b/pkg/controller/chi/worker.go index 9ec617ccb..690c01655 100644 --- a/pkg/controller/chi/worker.go +++ b/pkg/controller/chi/worker.go @@ -19,7 +19,6 @@ import ( "errors" "time" - "github.com/altinity/queue" core "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,6 +45,7 @@ import ( commonNormalizer "github.com/altinity/clickhouse-operator/pkg/model/common/normalizer" "github.com/altinity/clickhouse-operator/pkg/model/managers" "github.com/altinity/clickhouse-operator/pkg/util" + "github.com/altinity/queue" ) // FinalizerName specifies name of the finalizer to be used with CHI @@ -68,7 +68,6 @@ type worker struct { } // newWorker -// func (c *Controller) newWorker(q workqueue.RateLimitingInterface) *worker { func (c *Controller) newWorker(q queue.PriorityQueue, sys bool) *worker { start := time.Now() if !sys { @@ -103,25 +102,25 @@ func (c *Controller) newWorker(q queue.PriorityQueue, sys bool) *worker { } } -func configGeneratorOptions(chi *api.ClickHouseInstallation) *config.GeneratorOptions { +func configGeneratorOptions(cr *api.ClickHouseInstallation) *config.GeneratorOptions { return &config.GeneratorOptions{ - Users: chi.GetSpecT().Configuration.Users, - Profiles: chi.GetSpecT().Configuration.Profiles, - Quotas: chi.GetSpecT().Configuration.Quotas, - Settings: chi.GetSpecT().Configuration.Settings, - Files: chi.GetSpecT().Configuration.Files, - DistributedDDL: chi.GetSpecT().Defaults.DistributedDDL, + Users: cr.GetSpecT().Configuration.Users, + Profiles: cr.GetSpecT().Configuration.Profiles, + Quotas: cr.GetSpecT().Configuration.Quotas, + Settings: cr.GetSpecT().Configuration.Settings, + Files: cr.GetSpecT().Configuration.Files, + DistributedDDL: cr.GetSpecT().Defaults.DistributedDDL, } } // newContext creates new reconcile task -func (w *worker) newTask(chi *api.ClickHouseInstallation) { +func (w *worker) newTask(cr *api.ClickHouseInstallation) { w.task = common.NewTask( commonCreator.NewCreator( - chi, - managers.NewConfigFilesGenerator(managers.FilesGeneratorTypeClickHouse, chi, configGeneratorOptions(chi)), + cr, + managers.NewConfigFilesGenerator(managers.FilesGeneratorTypeClickHouse, cr, configGeneratorOptions(cr)), managers.NewContainerManager(managers.ContainerManagerTypeClickHouse), - managers.NewTagManager(managers.TagManagerTypeClickHouse, chi), + managers.NewTagManager(managers.TagManagerTypeClickHouse, cr), managers.NewProbeManager(managers.ProbeManagerTypeClickHouse), managers.NewServiceManager(managers.ServiceManagerTypeClickHouse), managers.NewVolumeManager(managers.VolumeManagerTypeClickHouse), @@ -130,7 +129,7 @@ func (w *worker) newTask(chi *api.ClickHouseInstallation) { managers.NewOwnerReferencesManager(managers.OwnerReferencesManagerTypeClickHouse), namer.New(), commonMacro.New(macro.List), - labeler.New(chi), + labeler.New(cr), ), ) @@ -139,7 +138,7 @@ func (w *worker) newTask(chi *api.ClickHouseInstallation) { w.task, domain.NewHostStatefulSetPoller(domain.NewStatefulSetPoller(w.c.kube), w.c.kube, w.c.ctrlLabeler), w.c.namer, - labeler.New(chi), + labeler.New(cr), storage.NewStorageReconciler(w.task, w.c.namer, w.c.kube.Storage()), w.c.kube, w.c, diff --git a/pkg/controller/chk/worker-chk-reconciler.go b/pkg/controller/chk/worker-chk-reconciler.go index 47f5077f8..3fa716abf 100644 --- a/pkg/controller/chk/worker-chk-reconciler.go +++ b/pkg/controller/chk/worker-chk-reconciler.go @@ -115,29 +115,29 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *apiChk.ClickHouseKee } // reconcile reconciles Custom Resource -func (w *worker) reconcile(ctx context.Context, ch *apiChk.ClickHouseKeeperInstallation) error { +func (w *worker) reconcile(ctx context.Context, cr *apiChk.ClickHouseKeeperInstallation) error { if util.IsContextDone(ctx) { log.V(2).Info("task is done") return nil } - w.a.V(2).M(ch).S().P() - defer w.a.V(2).M(ch).E().P() + w.a.V(2).M(cr).S().P() + defer w.a.V(2).M(cr).E().P() counters := api.NewHostReconcileAttributesCounters() - ch.WalkHosts(func(host *api.Host) error { + cr.WalkHosts(func(host *api.Host) error { counters.Add(host.GetReconcileAttributes()) return nil }) if counters.AddOnly() { - w.a.V(1).M(ch).Info("Enabling full fan-out mode. CHI: %s", util.NamespaceNameString(ch)) + w.a.V(1).M(cr).Info("Enabling full fan-out mode. CHI: %s", util.NamespaceNameString(cr)) ctx = context.WithValue(ctx, common.ReconcileShardsAndHostsOptionsCtxKey, &common.ReconcileShardsAndHostsOptions{ FullFanOut: true, }) } - return ch.WalkTillError( + return cr.WalkTillError( ctx, w.reconcileCRAuxObjectsPreliminary, w.reconcileCluster, @@ -290,7 +290,7 @@ func (w *worker) reconcileHostStatefulSet(ctx context.Context, host *api.Host, o log.V(1).M(host).F().S().Info("reconcile StatefulSet start") defer log.V(1).M(host).F().E().Info("reconcile StatefulSet end") - version := w. getHostSoftwareVersion(ctx, host) + version := w.getHostSoftwareVersion(ctx, host) host.Runtime.CurStatefulSet, _ = w.c.kube.STS().Get(ctx, host) w.a.V(1).M(host).F().Info("Reconcile host: %s. App version: %s", host.GetName(), version) @@ -374,7 +374,7 @@ func (w *worker) reconcileCluster(ctx context.Context, cluster *apiChk.Cluster) } } - w.reconcileClusterSecret(ctx , cluster) + w.reconcileClusterSecret(ctx, cluster) pdb := w.task.Creator().CreatePodDisruptionBudget(cluster) if err := w.reconcilePDB(ctx, cluster, pdb); err == nil { @@ -386,7 +386,6 @@ func (w *worker) reconcileCluster(ctx context.Context, cluster *apiChk.Cluster) return nil } - func (w *worker) reconcileClusterSecret(ctx context.Context, cluster *apiChk.Cluster) { } @@ -503,10 +502,6 @@ func (w *worker) reconcileShardService(ctx context.Context, shard api.IShard) er // reconcileHost reconciles specified ClickHouse host func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error { - var ( - reconcileStatefulSetOpts *statefulset.ReconcileOptions - ) - if util.IsContextDone(ctx) { log.V(2).Info("task is done") return nil @@ -523,6 +518,45 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error { // Create artifacts w.stsReconciler.PrepareHostStatefulSetWithStatus(ctx, host, false) + w.reconcileHostPrepare(ctx, host) + w.reconcileHostMain(ctx, host) + // Host is now added and functional + host.GetReconcileAttributes().UnsetAdd() + w.reconcileHostBootstrap(ctx, host) + + now := time.Now() + hostsCompleted := 0 + hostsCount := 0 + host.GetCR().IEnsureStatus().HostCompleted() + if host.GetCR() != nil && host.GetCR().GetStatus() != nil { + hostsCompleted = host.GetCR().GetStatus().GetHostsCompletedCount() + hostsCount = host.GetCR().GetStatus().GetHostsCount() + } + w.a.V(1). + WithEvent(host.GetCR(), common.EventActionProgress, common.EventReasonProgressHostsCompleted). + WithStatusAction(host.GetCR()). + M(host).F(). + Info("[now: %s] %s: %d of %d", now, common.EventReasonProgressHostsCompleted, hostsCompleted, hostsCount) + + _ = w.c.updateCRObjectStatus(ctx, host.GetCR(), types.UpdateStatusOptions{ + CopyStatusOptions: types.CopyStatusOptions{ + MainFields: true, + }, + }) + return nil +} + +// reconcileHostPrepare reconciles specified ClickHouse host +func (w *worker) reconcileHostPrepare(ctx context.Context, host *api.Host) error { + return nil +} + +// reconcileHostMain reconciles specified ClickHouse host +func (w *worker) reconcileHostMain(ctx context.Context, host *api.Host) error { + var ( + reconcileStatefulSetOpts *statefulset.ReconcileOptions + ) + if err := w.reconcileConfigMapHost(ctx, host); err != nil { w.a.V(1). M(host).F(). @@ -564,26 +598,18 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error { _ = w.reconcileHostService(ctx, host) - host.GetReconcileAttributes().UnsetAdd() + return nil +} - now := time.Now() - hostsCompleted := 0 - hostsCount := 0 - host.GetCR().IEnsureStatus().HostCompleted() - if host.GetCR() != nil && host.GetCR().GetStatus() != nil { - hostsCompleted = host.GetCR().GetStatus().GetHostsCompletedCount() - hostsCount = host.GetCR().GetStatus().GetHostsCount() +// reconcileHostBootstrap reconciles specified ClickHouse host +func (w *worker) reconcileHostBootstrap(ctx context.Context, host *api.Host) error { + if err := w.includeHost(ctx, host); err != nil { + metrics.HostReconcilesErrors(ctx, host.GetCR()) + w.a.V(1). + M(host).F(). + Warning("Reconcile Host interrupted with an error 4. Host: %s Err: %v", host.GetName(), err) + return err } - w.a.V(1). - WithEvent(host.GetCR(), common.EventActionProgress, common.EventReasonProgressHostsCompleted). - WithStatusAction(host.GetCR()). - M(host).F(). - Info("[now: %s] %s: %d of %d", now, common.EventReasonProgressHostsCompleted, hostsCompleted, hostsCount) - _ = w.c.updateCRObjectStatus(ctx, host.GetCR(), types.UpdateStatusOptions{ - CopyStatusOptions: types.CopyStatusOptions{ - MainFields: true, - }, - }) return nil } diff --git a/pkg/controller/chk/worker-exclude-include-wait.go b/pkg/controller/chk/worker-exclude-include-wait.go index 7ba26f5e1..3ca7d0c70 100644 --- a/pkg/controller/chk/worker-exclude-include-wait.go +++ b/pkg/controller/chk/worker-exclude-include-wait.go @@ -20,6 +20,7 @@ import ( log "github.com/altinity/clickhouse-operator/pkg/announcer" apiChk "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse-keeper.altinity.com/v1" + api "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1" "github.com/altinity/clickhouse-operator/pkg/util" ) @@ -58,3 +59,57 @@ func (w *worker) waitForIPAddresses(ctx context.Context, chk *apiChk.ClickHouseK return true }) } + +// shouldIncludeHost determines whether host to be included into cluster after reconciling +func (w *worker) shouldIncludeHost(host *api.Host) bool { + switch { + case host.IsStopped(): + // No need to include stopped host + return false + } + return true +} + +// includeHost includes host back back into ClickHouse clusters +func (w *worker) includeHost(ctx context.Context, host *api.Host) error { + if util.IsContextDone(ctx) { + log.V(2).Info("task is done") + return nil + } + + if !w.shouldIncludeHost(host) { + w.a.V(1). + M(host).F(). + Info("No need to include host into cluster. Host/shard/cluster: %d/%d/%s", + host.Runtime.Address.ReplicaIndex, host.Runtime.Address.ShardIndex, host.Runtime.Address.ClusterName) + return nil + } + + w.a.V(1). + M(host).F(). + Info("Include host into cluster. Host/shard/cluster: %d/%d/%s", + host.Runtime.Address.ReplicaIndex, host.Runtime.Address.ShardIndex, host.Runtime.Address.ClusterName) + + w.includeHostIntoRaftCluster(ctx, host) + + return nil +} + +// includeHostIntoRaftCluster includes host into raft configuration +func (w *worker) includeHostIntoRaftCluster(ctx context.Context, host *api.Host) { + if util.IsContextDone(ctx) { + log.V(2).Info("task is done") + return + } + + w.a.V(1). + M(host).F(). + Info("going to include host. Host/shard/cluster: %d/%d/%s", + host.Runtime.Address.ReplicaIndex, host.Runtime.Address.ShardIndex, host.Runtime.Address.ClusterName) + + // Specify in options to add this host into ClickHouse config file + host.GetCR().GetRuntime().LockCommonConfig() + host.GetReconcileAttributes().UnsetExclude() + _ = w.reconcileConfigMapCommon(ctx, host.GetCR(), w.options()) + host.GetCR().GetRuntime().UnlockCommonConfig() +} diff --git a/pkg/controller/chk/worker.go b/pkg/controller/chk/worker.go index d9536b25b..7823e83a1 100644 --- a/pkg/controller/chk/worker.go +++ b/pkg/controller/chk/worker.go @@ -79,20 +79,20 @@ func (c *Controller) newWorker() *worker { } } -func configGeneratorOptions(chk *apiChk.ClickHouseKeeperInstallation) *config.GeneratorOptions { +func configGeneratorOptions(cr *apiChk.ClickHouseKeeperInstallation) *config.GeneratorOptions { return &config.GeneratorOptions{ - Settings: chk.GetSpecT().Configuration.Settings, - Files: chk.GetSpecT().Configuration.Files, + Settings: cr.GetSpecT().Configuration.Settings, + Files: cr.GetSpecT().Configuration.Files, } } -func (w *worker) newTask(chk *apiChk.ClickHouseKeeperInstallation) { +func (w *worker) newTask(cr *apiChk.ClickHouseKeeperInstallation) { w.task = common.NewTask( commonCreator.NewCreator( - chk, - managers.NewConfigFilesGenerator(managers.FilesGeneratorTypeKeeper, chk, configGeneratorOptions(chk)), + cr, + managers.NewConfigFilesGenerator(managers.FilesGeneratorTypeKeeper, cr, configGeneratorOptions(cr)), managers.NewContainerManager(managers.ContainerManagerTypeKeeper), - managers.NewTagManager(managers.TagManagerTypeKeeper, chk), + managers.NewTagManager(managers.TagManagerTypeKeeper, cr), managers.NewProbeManager(managers.ProbeManagerTypeKeeper), managers.NewServiceManager(managers.ServiceManagerTypeKeeper), managers.NewVolumeManager(managers.VolumeManagerTypeKeeper), @@ -101,7 +101,7 @@ func (w *worker) newTask(chk *apiChk.ClickHouseKeeperInstallation) { managers.NewOwnerReferencesManager(managers.OwnerReferencesManagerTypeKeeper), namer.New(), commonMacro.New(macro.List), - labeler.New(chk), + labeler.New(cr), ), ) @@ -111,7 +111,7 @@ func (w *worker) newTask(chk *apiChk.ClickHouseKeeperInstallation) { //poller.NewHostStatefulSetPoller(poller.NewStatefulSetPoller(w.c.kube), w.c.kube, w.c.labeler), domain.NewHostStatefulSetPoller(domain.NewStatefulSetPoller(w.c.kube), w.c.kube, nil), w.c.namer, - labeler.New(chk), + labeler.New(cr), storage.NewStorageReconciler(w.task, w.c.namer, w.c.kube.Storage()), w.c.kube, statefulset.NewDefaultFallback(),