Skip to content

Commit

Permalink
dev: split host reconcile into 3 stages
Browse files Browse the repository at this point in the history
  • Loading branch information
sunsingerus committed Oct 4, 2024
1 parent 8c4f7e8 commit 3fd7dc2
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 95 deletions.
98 changes: 58 additions & 40 deletions pkg/controller/chi/worker-chi-reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,29 +132,29 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *api.ClickHouseInstal
}

// reconcile reconciles Custom Resource
func (w *worker) reconcile(ctx context.Context, ch *api.ClickHouseInstallation) error {
func (w *worker) reconcile(ctx context.Context, cr *api.ClickHouseInstallation) error {
if util.IsContextDone(ctx) {
log.V(2).Info("task is done")
return nil
}

w.a.V(2).M(ch).S().P()
defer w.a.V(2).M(ch).E().P()
w.a.V(2).M(cr).S().P()
defer w.a.V(2).M(cr).E().P()

counters := api.NewHostReconcileAttributesCounters()
ch.WalkHosts(func(host *api.Host) error {
cr.WalkHosts(func(host *api.Host) error {
counters.Add(host.GetReconcileAttributes())
return nil
})

if counters.AddOnly() {
w.a.V(1).M(ch).Info("Enabling full fan-out mode. CHI: %s", util.NamespaceNameString(ch))
w.a.V(1).M(cr).Info("Enabling full fan-out mode. CHI: %s", util.NamespaceNameString(cr))
ctx = context.WithValue(ctx, common.ReconcileShardsAndHostsOptionsCtxKey, &common.ReconcileShardsAndHostsOptions{
FullFanOut: true,
})
}

return ch.WalkTillError(
return cr.WalkTillError(
ctx,
w.reconcileCRAuxObjectsPreliminary,
w.reconcileCluster,
Expand Down Expand Up @@ -307,7 +307,7 @@ func (w *worker) reconcileHostStatefulSet(ctx context.Context, host *api.Host, o
log.V(1).M(host).F().S().Info("reconcile StatefulSet start")
defer log.V(1).M(host).F().E().Info("reconcile StatefulSet end")

version := w. getHostSoftwareVersion(ctx, host)
version := w.getHostSoftwareVersion(ctx, host)
host.Runtime.CurStatefulSet, _ = w.c.kube.STS().Get(ctx, host)

w.a.V(1).M(host).F().Info("Reconcile host: %s. App version: %s", host.GetName(), version)
Expand Down Expand Up @@ -351,7 +351,7 @@ func (w *worker) getHostSoftwareVersion(ctx context.Context, host *api.Host) str
ctx,
host,
versionOptions{
skipNew: true,
skipNew: true,
skipStoppedAncestor: true,
},
)
Expand Down Expand Up @@ -399,7 +399,7 @@ func (w *worker) reconcileCluster(ctx context.Context, cluster *api.Cluster) err
}
}

w.reconcileClusterSecret(ctx , cluster)
w.reconcileClusterSecret(ctx, cluster)

pdb := w.task.Creator().CreatePodDisruptionBudget(cluster)
if err := w.reconcilePDB(ctx, cluster, pdb); err == nil {
Expand All @@ -412,7 +412,6 @@ func (w *worker) reconcileCluster(ctx context.Context, cluster *api.Cluster) err
return nil
}


func (w *worker) reconcileClusterSecret(ctx context.Context, cluster *api.Cluster) {
// Add cluster's Auto Secret
if cluster.Secret.Source() == api.ClusterSecretSourceAuto {
Expand Down Expand Up @@ -565,11 +564,6 @@ func (w *worker) reconcileShardService(ctx context.Context, shard api.IShard) er

// reconcileHost reconciles specified ClickHouse host
func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error {
var (
reconcileStatefulSetOpts *statefulset.ReconcileOptions
migrateTableOpts *migrateTableOptions
)

if util.IsContextDone(ctx) {
log.V(2).Info("task is done")
return nil
Expand All @@ -589,6 +583,40 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error {
// Create artifacts
w.stsReconciler.PrepareHostStatefulSetWithStatus(ctx, host, false)

w.reconcileHostPrepare(ctx, host)
w.reconcileHostMain(ctx, host)
// Host is now added and functional
host.GetReconcileAttributes().UnsetAdd()
w.reconcileHostBootstrap(ctx, host)

now := time.Now()
hostsCompleted := 0
hostsCount := 0
host.GetCR().IEnsureStatus().HostCompleted()
if host.GetCR() != nil && host.GetCR().GetStatus() != nil {
hostsCompleted = host.GetCR().GetStatus().GetHostsCompletedCount()
hostsCount = host.GetCR().GetStatus().GetHostsCount()
}
w.a.V(1).
WithEvent(host.GetCR(), common.EventActionProgress, common.EventReasonProgressHostsCompleted).
WithStatusAction(host.GetCR()).
M(host).F().
Info("[now: %s] %s: %d of %d", now, common.EventReasonProgressHostsCompleted, hostsCompleted, hostsCount)

_ = w.c.updateCRObjectStatus(ctx, host.GetCR(), types.UpdateStatusOptions{
CopyStatusOptions: types.CopyStatusOptions{
MainFields: true,
},
})

metrics.HostReconcilesCompleted(ctx, host.GetCR())
metrics.HostReconcilesTimings(ctx, host.GetCR(), time.Now().Sub(startTime).Seconds())

return nil
}

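// reconcileHostPrepare is the first stage of host reconcile:
// it checks whether ClickHouse is running and accessible on the host and what version is available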
func (w *worker) reconcileHostPrepare(ctx context.Context, host *api.Host) error {
// Check whether ClickHouse is running and accessible and what version is available
if version, err := w.getHostClickHouseVersion(ctx, host, versionOptions{skipNew: true, skipStoppedAncestor: true}); err == nil {
w.a.V(1).
Expand All @@ -611,6 +639,16 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error {
_ = w.completeQueries(ctx, host)
}

return nil
}

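// reconcileHostMain is the second stage of host reconcile:
// it reconciles the host's objects, including its ConfigMap and Service, and then migrates tables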
func (w *worker) reconcileHostMain(ctx context.Context, host *api.Host) error {
var (
reconcileStatefulSetOpts *statefulset.ReconcileOptions
migrateTableOpts *migrateTableOptions
)

if err := w.reconcileConfigMapHost(ctx, host); err != nil {
metrics.HostReconcilesErrors(ctx, host.GetCR())
w.a.V(1).
Expand Down Expand Up @@ -660,8 +698,6 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error {

_ = w.reconcileHostService(ctx, host)

host.GetReconcileAttributes().UnsetAdd()

// Prepare for tables migration.
// Sometimes service needs some time to start after creation|modification before being accessible for usage
// Check whether ClickHouse is running and accessible and what version is available.
Expand All @@ -676,6 +712,11 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error {
}
_ = w.migrateTables(ctx, host, migrateTableOpts)

return nil
}

// reconcileHostBootstrap is the third stage of host reconcile:
// it calls includeHost for the host and reports the ClickHouse version of the reconciled host
func (w *worker) reconcileHostBootstrap(ctx context.Context, host *api.Host) error {
if err := w.includeHost(ctx, host); err != nil {
metrics.HostReconcilesErrors(ctx, host.GetCR())
w.a.V(1).
Expand All @@ -700,28 +741,5 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error {
Warning("Reconcile Host completed. Host: %s Failed to get ClickHouse version: %s", host.GetName(), version)
}

now := time.Now()
hostsCompleted := 0
hostsCount := 0
host.GetCR().IEnsureStatus().HostCompleted()
if host.GetCR() != nil && host.GetCR().GetStatus() != nil {
hostsCompleted = host.GetCR().GetStatus().GetHostsCompletedCount()
hostsCount = host.GetCR().GetStatus().GetHostsCount()
}
w.a.V(1).
WithEvent(host.GetCR(), common.EventActionProgress, common.EventReasonProgressHostsCompleted).
WithStatusAction(host.GetCR()).
M(host).F().
Info("[now: %s] %s: %d of %d", now, common.EventReasonProgressHostsCompleted, hostsCompleted, hostsCount)

_ = w.c.updateCRObjectStatus(ctx, host.GetCR(), types.UpdateStatusOptions{
CopyStatusOptions: types.CopyStatusOptions{
MainFields: true,
},
})

metrics.HostReconcilesCompleted(ctx, host.GetCR())
metrics.HostReconcilesTimings(ctx, host.GetCR(), time.Now().Sub(startTime).Seconds())

return nil
}
29 changes: 14 additions & 15 deletions pkg/controller/chi/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"time"

"github.com/altinity/queue"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand All @@ -46,6 +45,7 @@ import (
commonNormalizer "github.com/altinity/clickhouse-operator/pkg/model/common/normalizer"
"github.com/altinity/clickhouse-operator/pkg/model/managers"
"github.com/altinity/clickhouse-operator/pkg/util"
"github.com/altinity/queue"
)

// FinalizerName specifies name of the finalizer to be used with CHI
Expand All @@ -68,7 +68,6 @@ type worker struct {
}

// newWorker creates new worker
// func (c *Controller) newWorker(q workqueue.RateLimitingInterface) *worker {
func (c *Controller) newWorker(q queue.PriorityQueue, sys bool) *worker {
start := time.Now()
if !sys {
Expand Down Expand Up @@ -103,25 +102,25 @@ func (c *Controller) newWorker(q queue.PriorityQueue, sys bool) *worker {
}
}

func configGeneratorOptions(chi *api.ClickHouseInstallation) *config.GeneratorOptions {
func configGeneratorOptions(cr *api.ClickHouseInstallation) *config.GeneratorOptions {
return &config.GeneratorOptions{
Users: chi.GetSpecT().Configuration.Users,
Profiles: chi.GetSpecT().Configuration.Profiles,
Quotas: chi.GetSpecT().Configuration.Quotas,
Settings: chi.GetSpecT().Configuration.Settings,
Files: chi.GetSpecT().Configuration.Files,
DistributedDDL: chi.GetSpecT().Defaults.DistributedDDL,
Users: cr.GetSpecT().Configuration.Users,
Profiles: cr.GetSpecT().Configuration.Profiles,
Quotas: cr.GetSpecT().Configuration.Quotas,
Settings: cr.GetSpecT().Configuration.Settings,
Files: cr.GetSpecT().Configuration.Files,
DistributedDDL: cr.GetSpecT().Defaults.DistributedDDL,
}
}

// newTask creates new reconcile task
func (w *worker) newTask(chi *api.ClickHouseInstallation) {
func (w *worker) newTask(cr *api.ClickHouseInstallation) {
w.task = common.NewTask(
commonCreator.NewCreator(
chi,
managers.NewConfigFilesGenerator(managers.FilesGeneratorTypeClickHouse, chi, configGeneratorOptions(chi)),
cr,
managers.NewConfigFilesGenerator(managers.FilesGeneratorTypeClickHouse, cr, configGeneratorOptions(cr)),
managers.NewContainerManager(managers.ContainerManagerTypeClickHouse),
managers.NewTagManager(managers.TagManagerTypeClickHouse, chi),
managers.NewTagManager(managers.TagManagerTypeClickHouse, cr),
managers.NewProbeManager(managers.ProbeManagerTypeClickHouse),
managers.NewServiceManager(managers.ServiceManagerTypeClickHouse),
managers.NewVolumeManager(managers.VolumeManagerTypeClickHouse),
Expand All @@ -130,7 +129,7 @@ func (w *worker) newTask(chi *api.ClickHouseInstallation) {
managers.NewOwnerReferencesManager(managers.OwnerReferencesManagerTypeClickHouse),
namer.New(),
commonMacro.New(macro.List),
labeler.New(chi),
labeler.New(cr),
),
)

Expand All @@ -139,7 +138,7 @@ func (w *worker) newTask(chi *api.ClickHouseInstallation) {
w.task,
domain.NewHostStatefulSetPoller(domain.NewStatefulSetPoller(w.c.kube), w.c.kube, w.c.ctrlLabeler),
w.c.namer,
labeler.New(chi),
labeler.New(cr),
storage.NewStorageReconciler(w.task, w.c.namer, w.c.kube.Storage()),
w.c.kube,
w.c,
Expand Down
Loading

0 comments on commit 3fd7dc2

Please sign in to comment.