Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pod template support for init containers #5750

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/deployment/configuration/general.rst
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ as the base container configuration for all primary containers. If both containe
names exist in the default PodTemplate, Flyte first applies the default
configuration, followed by the primary configuration.

Note: Init containers can be configured with similar granularity using "default-init"
and "primary-init" init container names.

The ``containers`` field is required in each k8s PodSpec. If no default
configuration is desired, specifying a container with a name other than "default"
or "primary" (for example, "noop") is considered best practice. Since Flyte only
Expand Down
16 changes: 9 additions & 7 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,15 @@
return nil
}

func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilotPod *v1.PodSpec, iFace *core.TypedInterface, taskExecMetadata core2.TaskExecutionMetadata, inputPaths io.InputFilePaths, outputPaths io.OutputFilePaths, pilot *core.DataLoadingConfig) error {
func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilotPod *v1.PodSpec, iFace *core.TypedInterface, taskExecMetadata core2.TaskExecutionMetadata, inputPaths io.InputFilePaths, outputPaths io.OutputFilePaths, pilot *core.DataLoadingConfig) (string, error) {
if pilot == nil || !pilot.Enabled {
return nil
return "", nil
}

logger.Infof(ctx, "CoPilot Enabled for task [%s]", taskExecMetadata.GetTaskExecutionID().GetID().TaskId.Name)
shareProcessNamespaceEnabled := true
coPilotPod.ShareProcessNamespace = &shareProcessNamespaceEnabled
primaryInitContainerName := ""
if iFace != nil {
if iFace.Inputs != nil && len(iFace.Inputs.Variables) > 0 {
inPath := cfg.DefaultInputDataPath
Expand All @@ -231,13 +232,14 @@
// Lets add the Inputs init container
args, err := DownloadCommandArgs(inputPaths.GetInputPath(), outputPaths.GetOutputPrefixPath(), inPath, format, iFace.Inputs)
if err != nil {
return err
return primaryInitContainerName, err

Check warning on line 235 in flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go#L235

Added line #L235 was not covered by tests
}
downloader, err := FlyteCoPilotContainer(flyteInitContainerName, cfg, args, inputsVolumeMount)
if err != nil {
return err
return primaryInitContainerName, err

Check warning on line 239 in flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go#L239

Added line #L239 was not covered by tests
}
coPilotPod.InitContainers = append(coPilotPod.InitContainers, downloader)
primaryInitContainerName = downloader.Name
}

if iFace.Outputs != nil && len(iFace.Outputs.Variables) > 0 {
Expand All @@ -260,15 +262,15 @@
// Lets add the Inputs init container
args, err := SidecarCommandArgs(outPath, outputPaths.GetOutputPrefixPath(), outputPaths.GetRawOutputPrefix(), cfg.StartTimeout.Duration, iFace)
if err != nil {
return err
return primaryInitContainerName, err

Check warning on line 265 in flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go#L265

Added line #L265 was not covered by tests
}
sidecar, err := FlyteCoPilotContainer(flyteSidecarContainerName, cfg, args, outputsVolumeMount)
if err != nil {
return err
return primaryInitContainerName, err

Check warning on line 269 in flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go#L269

Added line #L269 was not covered by tests
}
coPilotPod.Containers = append(coPilotPod.Containers, sidecar)
}
}

return nil
return primaryInitContainerName, nil
}
24 changes: 18 additions & 6 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,9 @@ func TestAddCoPilotToPod(t *testing.T) {
InputPath: "in",
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)
assert.NoError(t, err)
assert.Equal(t, "test-downloader", primaryInitContainerName)
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, iface, &pod)
})
Expand All @@ -545,7 +547,9 @@ func TestAddCoPilotToPod(t *testing.T) {
InputPath: "in",
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, nil, taskMetadata, inputPaths, opath, pilot))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, nil, taskMetadata, inputPaths, opath, pilot)
assert.NoError(t, err)
assert.Empty(t, primaryInitContainerName)
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, nil, &pod)
})
Expand All @@ -565,7 +569,9 @@ func TestAddCoPilotToPod(t *testing.T) {
InputPath: "in",
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)
assert.NoError(t, err)
assert.Equal(t, "test-downloader", primaryInitContainerName)
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, iface, &pod)
})
Expand All @@ -584,7 +590,9 @@ func TestAddCoPilotToPod(t *testing.T) {
InputPath: "in",
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)
assert.NoError(t, err)
assert.Empty(t, primaryInitContainerName)
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, iface, &pod)
})
Expand All @@ -603,11 +611,15 @@ func TestAddCoPilotToPod(t *testing.T) {
InputPath: "in",
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)
assert.NoError(t, err)
assert.Empty(t, primaryInitContainerName)
assert.Len(t, pod.Volumes, 0)
})

t.Run("nil", func(t *testing.T) {
assert.NoError(t, AddCoPilotToPod(ctx, cfg, nil, nil, taskMetadata, inputPaths, opath, nil))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, nil, nil, taskMetadata, inputPaths, opath, nil)
assert.NoError(t, err)
assert.Empty(t, primaryInitContainerName)
})
}
64 changes: 58 additions & 6 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
const SIGKILL = 137

const defaultContainerTemplateName = "default"
const defaultInitContainerTemplateName = "default-init"
const primaryContainerTemplateName = "primary"
const primaryInitContainerTemplateName = "primary-init"
const PrimaryContainerKey = "primary_container_name"

// AddRequiredNodeSelectorRequirements adds the provided v1.NodeSelectorRequirement
Expand Down Expand Up @@ -387,14 +389,17 @@
dataLoadingConfig = pod.GetDataConfig()
}

primaryInitContainerName := ""

if dataLoadingConfig != nil {
if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot,
primaryContainer, taskTemplate.Interface, dataLoadingConfig); err != nil {
return nil, nil, err
}

if err := AddCoPilotToPod(ctx, config.GetK8sPluginConfig().CoPilot, podSpec, taskTemplate.GetInterface(),
tCtx.TaskExecutionMetadata(), tCtx.InputReader(), tCtx.OutputWriter(), dataLoadingConfig); err != nil {
primaryInitContainerName, err = AddCoPilotToPod(ctx, config.GetK8sPluginConfig().CoPilot, podSpec, taskTemplate.GetInterface(),
tCtx.TaskExecutionMetadata(), tCtx.InputReader(), tCtx.OutputWriter(), dataLoadingConfig)
if err != nil {

Check warning on line 402 in flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go#L400-L402

Added lines #L400 - L402 were not covered by tests
return nil, nil, err
}
}
Expand All @@ -406,7 +411,7 @@
}

// merge PodSpec and ObjectMeta with configuration pod template (if exists)
podSpec, objectMeta, err = MergeWithBasePodTemplate(ctx, tCtx, podSpec, objectMeta, primaryContainerName)
podSpec, objectMeta, err = MergeWithBasePodTemplate(ctx, tCtx, podSpec, objectMeta, primaryContainerName, primaryInitContainerName)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -495,7 +500,7 @@
// MergeWithBasePodTemplate attempts to merge the provided PodSpec and ObjectMeta with the configuration PodTemplate for
// this task.
func MergeWithBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutionContext,
podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, primaryContainerName string) (*v1.PodSpec, *metav1.ObjectMeta, error) {
podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, primaryContainerName string, primaryInitContainerName string) (*v1.PodSpec, *metav1.ObjectMeta, error) {

// attempt to retrieve base PodTemplate
podTemplate, err := getBasePodTemplate(ctx, tCtx, DefaultPodTemplateStore)
Expand All @@ -507,7 +512,7 @@
}

// merge podSpec with podTemplate
mergedPodSpec, err := mergePodSpecs(&podTemplate.Template.Spec, podSpec, primaryContainerName)
mergedPodSpec, err := mergePodSpecs(&podTemplate.Template.Spec, podSpec, primaryContainerName, primaryInitContainerName)
if err != nil {
return nil, nil, err
}
Expand All @@ -524,7 +529,7 @@
// mergePodSpecs merges the two provided PodSpecs. This process uses the first as the base configuration, where values
// set by the first PodSpec are overwritten by the second in the return value. Additionally, this function applies
// container-level configuration from the basePodSpec.
func mergePodSpecs(basePodSpec *v1.PodSpec, podSpec *v1.PodSpec, primaryContainerName string) (*v1.PodSpec, error) {
func mergePodSpecs(basePodSpec *v1.PodSpec, podSpec *v1.PodSpec, primaryContainerName string, primaryInitContainerName string) (*v1.PodSpec, error) {
if basePodSpec == nil || podSpec == nil {
return nil, errors.New("neither the basePodSpec or the podSpec can be nil")
}
Expand All @@ -539,6 +544,16 @@
}
}

// extract defaultInitContainerTemplate and primaryInitContainerTemplate
var defaultInitContainerTemplate, primaryInitContainerTemplate *v1.Container
for i := 0; i < len(basePodSpec.InitContainers); i++ {
if basePodSpec.InitContainers[i].Name == defaultInitContainerTemplateName {
defaultInitContainerTemplate = &basePodSpec.InitContainers[i]
} else if basePodSpec.InitContainers[i].Name == primaryInitContainerTemplateName {
primaryInitContainerTemplate = &basePodSpec.InitContainers[i]
}
}

// merge PodTemplate PodSpec with podSpec
var mergedPodSpec *v1.PodSpec = basePodSpec.DeepCopy()
if err := mergo.Merge(mergedPodSpec, podSpec, mergo.WithOverride, mergo.WithAppendSlice); err != nil {
Expand Down Expand Up @@ -580,6 +595,43 @@
}

mergedPodSpec.Containers = mergedContainers

// merge PodTemplate init containers
var mergedInitContainers []v1.Container
for _, initContainer := range podSpec.InitContainers {
// if applicable start with defaultContainerTemplate
var mergedInitContainer *v1.Container
if defaultInitContainerTemplate != nil {
mergedInitContainer = defaultInitContainerTemplate.DeepCopy()
}

// if applicable merge with primaryInitContainerTemplate
if initContainer.Name == primaryInitContainerName && primaryInitContainerTemplate != nil {
if mergedInitContainer == nil {
mergedInitContainer = primaryInitContainerTemplate.DeepCopy()
} else {
err := mergo.Merge(mergedInitContainer, primaryInitContainerTemplate, mergo.WithOverride, mergo.WithAppendSlice)
if err != nil {
return nil, err

Check warning on line 615 in flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go#L615

Added line #L615 was not covered by tests
}
}
}

// if applicable merge with existing init initContainer
if mergedInitContainer == nil {
mergedInitContainers = append(mergedInitContainers, initContainer)
} else {
err := mergo.Merge(mergedInitContainer, initContainer, mergo.WithOverride, mergo.WithAppendSlice)
if err != nil {
return nil, err

Check warning on line 626 in flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go#L626

Added line #L626 was not covered by tests
}

mergedInitContainers = append(mergedInitContainers, *mergedInitContainer)
}
}

mergedPodSpec.InitContainers = mergedInitContainers

return mergedPodSpec, nil
}

Expand Down
Loading
Loading