Skip to content

Commit

Permalink
Add ReRun job API supported (caoyingjunz#542)
Browse files Browse the repository at this point in the history
  • Loading branch information
caoyingjunz authored Oct 4, 2024
1 parent 00d00b1 commit 511d4ef
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 7 deletions.
2 changes: 2 additions & 0 deletions api/server/router/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func (cr *clusterRouter) initRoutes(httpEngine *gin.Engine) {
kubeRoute.GET("/ws", cr.webShell)
// node ws
kubeRoute.GET("/nodes/ws", cr.nodeWebShell)
// 重启Job action=rerun
kubeRoute.POST("/clusters/:cluster/namespaces/:namespace/jobs/:name", cr.ReRunJob)
}

// 从 pixiu 缓存中获取 kubernetes 对象
Expand Down
9 changes: 2 additions & 7 deletions api/server/router/cluster/helm_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,14 @@ import (
"github.com/gin-gonic/gin"

"github.com/caoyingjunz/pixiu/api/server/httputils"
"github.com/caoyingjunz/pixiu/pkg/types"
)

type HelmMeta struct {
Cluster string `uri:"cluster" binding:"required"`
Namespace string `uri:"namespace" binding:"required"`
Name string `uri:"name"`
}

func (cr *clusterRouter) ListReleases(c *gin.Context) {
r := httputils.NewResponse()
var (
err error
helmMeta HelmMeta
helmMeta types.PixiuObjectMeta
)
if err = c.ShouldBindUri(&helmMeta); err != nil {
httputils.SetFailed(c, r, err)
Expand Down
48 changes: 48 additions & 0 deletions api/server/router/cluster/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2024 The Pixiu Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
"github.com/gin-gonic/gin"

"github.com/caoyingjunz/pixiu/api/server/httputils"
"github.com/caoyingjunz/pixiu/pkg/types"
)

type Action struct {
Act string `form:"action" binding:"required"`
ResourceVersion string `form:"resourceVersion" binding:"required"`
}

func (cr *clusterRouter) ReRunJob(c *gin.Context) {
r := httputils.NewResponse()
var (
jobMeta types.PixiuObjectMeta
action Action
err error
)
if err = httputils.ShouldBindAny(c, nil, &jobMeta, &action); err != nil {
httputils.SetFailed(c, r, err)
return
}
if err = cr.c.Cluster().ReRunJob(c, jobMeta.Cluster, jobMeta.Namespace, jobMeta.Name, action.ResourceVersion); err != nil {
httputils.SetFailed(c, r, err)
return
}

httputils.SetSuccess(c, r)
}
56 changes: 56 additions & 0 deletions pkg/controller/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/casbin/casbin/v2"
"github.com/gorilla/websocket"
"helm.sh/helm/v3/pkg/release"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -81,6 +82,8 @@ type Interface interface {

// WatchPodLog 实时获取 pod 的日志
WatchPodLog(ctx context.Context, cluster string, namespace string, podName string, containerName string, tailLine int64, w http.ResponseWriter, r *http.Request) error
// ReRunJob 重新执行指定任务
ReRunJob(ctx context.Context, cluster string, namespace string, jobName string, resourceVersion string) error

// ListReleases 获取 tenant release 列表
ListReleases(ctx context.Context, cluster string, namespace string) ([]*release.Release, error)
Expand Down Expand Up @@ -385,6 +388,59 @@ func (c *cluster) WatchPodLog(ctx context.Context, cluster string, namespace str
return nil
}

const Retries = 3

// ReRunJob 重新运行(创建)任务,通过先删除在创建的方式实现,极端情况下可能导致 job 丢失
func (c *cluster) ReRunJob(ctx context.Context, cluster string, namespace string, jobName string, resourceVersion string) error {
cs, err := c.GetClusterSetByName(ctx, cluster)
if err != nil {
return err
}

job, err := cs.Client.BatchV1().Jobs(namespace).Get(ctx, jobName, metav1.GetOptions{})
if err != nil {
return err
}
if job.ResourceVersion != resourceVersion {
return fmt.Errorf("please apply your changes to the latest and re-run")
}

newJob := *job
// 重置不必要字段
newJob.ResourceVersion = ""
newJob.ObjectMeta.UID = ""
newJob.Status = batchv1.JobStatus{}
// 重置 uid 和 label
delete(newJob.Spec.Selector.MatchLabels, "controller-uid")
delete(newJob.Spec.Selector.MatchLabels, "batch.kubernetes.io/controller-uid")
delete(newJob.Spec.Template.ObjectMeta.Labels, "controller-uid")
delete(newJob.Spec.Template.ObjectMeta.Labels, "batch.kubernetes.io/controller-uid")
delete(newJob.Spec.Template.ObjectMeta.Labels, "batch.kubernetes.io/job-name")
delete(newJob.Spec.Template.ObjectMeta.Labels, "job-name")

// TODO: 备份一次job,避免失败job丢失
// 2. 删除job
if err = cs.Client.BatchV1().Jobs(namespace).Delete(ctx, jobName, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("failed to rerun job(%s) %v", jobName, err)
}

var jobErr error
// 3. 新建job,最多重试 3 次
for i := 0; i < Retries; i++ {
_, jobErr = cs.Client.BatchV1().Jobs(namespace).Create(ctx, &newJob, metav1.CreateOptions{})
if jobErr != nil {
time.Sleep(time.Second)
continue
}
break
}
if jobErr != nil {
return fmt.Errorf("failed to rerun job(%s) %v", jobName, err)
}

return nil
}

// AggregateEvents 聚合 k8s 资源的所有 events,比如 kind 为 deployment 时,则聚合 deployment,所属 rs 以及 pod 的事件
func (c *cluster) AggregateEvents(ctx context.Context, cluster string, namespace string, name string, kind string) (*v1.EventList, error) {
clusterSet, err := c.GetClusterSetByName(ctx, cluster)
Expand Down
6 changes: 6 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
"github.com/caoyingjunz/pixiu/pkg/db/model"
)

type PixiuObjectMeta struct {
Cluster string `uri:"cluster" binding:"required"`
Namespace string `uri:"namespace" binding:"required"`
Name string `uri:"name"`
}

type PixiuMeta struct {
// pixiu 对象 ID
Id int64 `json:"id"`
Expand Down

0 comments on commit 511d4ef

Please sign in to comment.