From 48863ff4cf9146906a7d7879c2ca93265c7ad662 Mon Sep 17 00:00:00 2001 From: Ricardo Noriega De Soto Date: Wed, 20 Apr 2022 15:52:40 +0200 Subject: [PATCH] Restart MicroShift when IP address changes (#650) * The IPWatchController will restart MicroShift when IP address changes have been detected. Signed-off-by: Miguel Angel Ajo Signed-off-by: Ricardo Noriega * Recreate Endpoints after IP change Signed-off-by: Miguel Angel Ajo Co-authored-by: Ricardo Noriega * Fix DeepEqual condition and endpoints creation logic Signed-off-by: Ricardo Noriega --- pkg/cmd/run.go | 5 ++ pkg/controllers/apiservice.go | 109 +++++++++++++++++++++------------- pkg/ipwatch/ipwatch.go | 67 +++++++++++++++++++++ 3 files changed, 139 insertions(+), 42 deletions(-) create mode 100644 pkg/ipwatch/ipwatch.go diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 4a71e156cf6..e3da639354c 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -12,6 +12,7 @@ import ( "github.com/coreos/go-systemd/daemon" "github.com/openshift/microshift/pkg/config" "github.com/openshift/microshift/pkg/controllers" + "github.com/openshift/microshift/pkg/ipwatch" "github.com/openshift/microshift/pkg/kustomize" "github.com/openshift/microshift/pkg/mdns" "github.com/openshift/microshift/pkg/node" @@ -95,6 +96,7 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error { m := servicemanager.NewServiceManager() if config.StringInList("controlplane", cfg.Roles) { util.Must(m.AddService(controllers.NewEtcd(cfg))) + util.Must(m.AddService(ipwatch.NewIPWatchController(cfg))) util.Must(m.AddService(controllers.NewKubeAPIServer(cfg))) util.Must(m.AddService(controllers.NewKubeScheduler(cfg))) util.Must(m.AddService(controllers.NewKubeControllerManager(cfg))) @@ -110,6 +112,9 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error { } if config.StringInList("node", cfg.Roles) { + if len(cfg.Roles) == 1 { + util.Must(m.AddService(ipwatch.NewIPWatchController(cfg))) + } util.Must(m.AddService(node.NewKubeletServer(cfg))) util.Must(m.AddService(node.NewKubeProxyServer(cfg))) } diff --git a/pkg/controllers/apiservice.go b/pkg/controllers/apiservice.go index 3650da490ba..1d0fc499d97 100644 --- a/pkg/controllers/apiservice.go +++ b/pkg/controllers/apiservice.go @@ -18,12 +18,12 @@ package controllers import ( "context" "io/ioutil" + "reflect" "strings" - "k8s.io/klog/v2" - "github.com/openshift/microshift/pkg/assets" "github.com/openshift/microshift/pkg/config" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -31,6 +31,7 @@ import ( coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/util/intstr" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -44,6 +45,64 @@ func createAPIHeadlessSvc(cfg *config.MicroshiftConfig, svcName string, svcPort } client := coreclientv1.NewForConfigOrDie(rest.AddUserAgent(restConfig, "core-agent")) + + if err = ensureService(client, svcName); err != nil { + return errors.Wrapf(err, "Error creating service %q", svcName) + } + + if err != ensureIPEndpoints(client, svcName, cfg.NodeIP, svcPort) { + return errors.Wrapf(err, "Error creating IP endpoints for service %q, IP %q:%d", svcName, cfg.NodeIP, svcPort) + } + + return nil +} + +func ensureIPEndpoints(client *coreclientv1.CoreV1Client, svcName, svcIP string, svcPort int) error { + expectedEndpoints := &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + Kind: "Endpoints", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: svcName, + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: svcIP}}, + Ports: []corev1.EndpointPort{ + { + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + } + + endpoints, err := client.Endpoints("default").Get(context.TODO(), expectedEndpoints.Name, metav1.GetOptions{}) + if err == nil { + if !reflect.DeepEqual(endpoints.Subsets, expectedEndpoints.Subsets) { + klog.Infof("deleting outdated endpoints %s", endpoints.Name) + if err := client.Endpoints("default").Delete(context.TODO(), endpoints.Name, metav1.DeleteOptions{}); err != nil { + return errors.Wrapf(err, "Error deleting outdated endpoints %q", endpoints.Name) + } + } else { + klog.Infof("expected endpoint already exists %s", endpoints.Name) + return nil + } + } + + klog.Infof("creating endpoints %s", endpoints.Name) + _, err = client.Endpoints("default").Create(context.TODO(), expectedEndpoints, metav1.CreateOptions{}) + if err != nil { + klog.Errorf("error creating endpoints %q: %v", endpoints.Name, err) + return err + } + return nil +} + +func ensureService(client *coreclientv1.CoreV1Client, svcName string) error { svc := &corev1.Service{ TypeMeta: metav1.TypeMeta{ Kind: "Service", @@ -64,52 +123,18 @@ func createAPIHeadlessSvc(cfg *config.MicroshiftConfig, svcName string, svcPort }, }, } - _, err = client.Services("default").Get(context.TODO(), svc.Name, metav1.GetOptions{}) + + _, err := client.Services("default").Get(context.TODO(), svc.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { - klog.Infof("Creating service %s", svc.Name) + klog.Infof("creating service %s", svc.Name) _, err = client.Services("default").Create(context.TODO(), svc, metav1.CreateOptions{}) if err != nil { return err } - endpoints := &corev1.Endpoints{ - TypeMeta: metav1.TypeMeta{ - Kind: "Endpoints", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: svcName, - Namespace: "default", - }, - } - - k8s_endpoints, err := client.Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{}) - if err != nil { - klog.Infof("Failed to find kubernetes endpoints") - } - subsets := endpoints.Subsets - for _, sub := range k8s_endpoints.Subsets { - addr := sub.Addresses - ports := []corev1.EndpointPort{ - { - Port: int32(svcPort), - }, - } - subsets = append(subsets, - corev1.EndpointSubset{ - Addresses: addr, - Ports: ports, - }) - } - endpoints.Subsets = subsets - _, err = client.Endpoints("default").Get(context.TODO(), endpoints.Name, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - klog.Infof("Creating endpoints %s", endpoints.Name) - _, err = client.Endpoints("default").Create(context.TODO(), endpoints, metav1.CreateOptions{}) - return err - } } return nil } + func trimFirst(s string, sep string) string { parts := strings.Split(s, sep) return strings.Join(parts[1:], sep) @@ -159,7 +184,7 @@ func createAPIRegistration(cfg *config.MicroshiftConfig) error { } _, err = client.APIServices().Get(context.TODO(), api.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { - klog.Infof("Creating api registration %s", api.Name) + klog.Infof("creating api registration %s", api.Name) _, _ = client.APIServices().Create(context.TODO(), api, metav1.CreateOptions{}) } } @@ -210,7 +235,7 @@ func ApplyDefaultSCCs(cfg *config.MicroshiftConfig) error { } ) if err := assets.ApplySCCs(sccs, nil, nil, kubeconfigPath); err != nil { - klog.Warningf("Failed to apply sccs %v", err) + klog.Warningf("failed to apply sccs %v", err) return err } return nil diff --git a/pkg/ipwatch/ipwatch.go b/pkg/ipwatch/ipwatch.go new file mode 100644 index 00000000000..7347f1f4509 --- /dev/null +++ b/pkg/ipwatch/ipwatch.go @@ -0,0 +1,67 @@ +/* +Copyright © 2022 Microshift Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipwatch + +import ( + "context" + "os" + "time" + + "github.com/openshift/microshift/pkg/config" + "github.com/openshift/microshift/pkg/util" + "k8s.io/klog/v2" +) + +const ipCheckInterval = time.Second * 5 + +type IPWatchController struct { + NodeIP string +} + +func NewIPWatchController(cfg *config.MicroshiftConfig) *IPWatchController { + return &IPWatchController{ + NodeIP: cfg.NodeIP, + } +} + +func (s *IPWatchController) Name() string { return "ipwatch-controller" } +func (s *IPWatchController) Dependencies() []string { + return []string{} +} + +func (c *IPWatchController) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { + defer close(stopped) + ticker := time.NewTicker(ipCheckInterval) + defer ticker.Stop() + + klog.Infof("Starting ipwatch-controller with IP address %q", c.NodeIP) + + for { + select { + case <-ticker.C: + currentIP, _ := util.GetHostIP() + if c.NodeIP != currentIP { + klog.Warningf("IP address has changed from %q to %q, restarting MicroShift", c.NodeIP, currentIP) + os.Exit(0) + return nil + } + + case <-ctx.Done(): + return ctx.Err() + } + } +}