Skip to content

Commit

Permalink
Translation converting custom task types to docker tasks (#3108)
Browse files Browse the repository at this point in the history
Adds a translation layer to the orchestrator, allowing for custom job
types to be converted to known job types (docker/wasm) for execution on
a compute node. The translation most consists of specifying the docker
image to use, putting the engine parameters in the correct place, and
adding metadata to the newly created job.

This initial PR only support `python` and `duckdb` with special support
for installing python dependencies implemented within the image.

By default translations in the requester node are disabled, but can be
enabled with `--requester-job-translation-enabled`
  • Loading branch information
rossjones authored Dec 20, 2023
1 parent e0198f3 commit 21540cd
Show file tree
Hide file tree
Showing 30 changed files with 778 additions and 45 deletions.
1 change: 1 addition & 0 deletions cmd/cli/devstack/devstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func NewCmd() *cobra.Command {
"disable-features": configflags.DisabledFeatureFlags,
"capacity": configflags.CapacityFlags,
"job-timeouts": configflags.ComputeTimeoutFlags,
"translations": configflags.JobTranslationFlags,
}

devstackCmd := &cobra.Command{
Expand Down
19 changes: 18 additions & 1 deletion cmd/cli/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/spf13/cobra"
shellescape "gopkg.in/alessio/shellescape.v1"
"k8s.io/kubectl/pkg/util/i18n"

"github.com/bacalhau-project/bacalhau/cmd/util"
Expand Down Expand Up @@ -120,6 +122,14 @@ func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, opti
// don't have a template, then we don't know how to submit that job type.
jobType = cmdArgs[0]

for i := range cmdArgs {
// If any parameters were quoted, we should make sure we try and add
// them back in after they were stripped for us.
if strings.Contains(cmdArgs[i], " ") {
cmdArgs[i] = shellescape.Quote(cmdArgs[i])
}
}

tpl, err := NewTemplateMap(embeddedFiles, "templates")
if err != nil {
return nil, fmt.Errorf("failed to find supported job types, templates missing")
Expand Down Expand Up @@ -198,6 +208,13 @@ func addInlineContent(ctx context.Context, codeLocation string, job *models.Job)
return err
}

target := "/code"

finfo, _ := os.Stat(absPath)
if !finfo.IsDir() {
target = fmt.Sprintf("/code/%s", finfo.Name())
}

specConfig, err := inline.NewStorage().Upload(ctx, absPath)
if err != nil {
return fmt.Errorf("failed to attach code '%s' to job submission: %w", codeLocation, err)
Expand All @@ -206,7 +223,7 @@ func addInlineContent(ctx context.Context, codeLocation string, job *models.Job)
job.Tasks[0].InputSources = append(job.Tasks[0].InputSources, &models.InputSource{
Source: &specConfig,
Alias: "code",
Target: "/code",
Target: target,
})

return nil
Expand Down
8 changes: 4 additions & 4 deletions cmd/cli/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ var testcases []testCase = []testCase{
{
// bacalhau exec python --version=3.10 -- -c "import this"
name: "zen of python",
cmdLine: []string{"python", "--version=3.10", "--", "-c", "\"import this\""},
expectedUnknownArgs: []string{"--version=3.10", "-c=\"import this\""},
cmdLine: []string{"python", "--version=3.10", "--", "-c", "import this"},
expectedUnknownArgs: []string{"--version=3.10", "-c=import this"},
expectedErrMsg: "",
jobCommand: "python",
jobArguments: []string{"-c", `"import this"`},
jobArguments: []string{"-c", "'import this'"},
numInlinedAttachments: 0,
numTotalAttachments: 0,
},
Expand Down Expand Up @@ -91,7 +91,7 @@ var testcases []testCase = []testCase{
expectedUnknownArgs: []string{},
expectedErrMsg: "",
jobCommand: "duckdb",
jobArguments: []string{"select * from /inputs/test.csv"},
jobArguments: []string{"'select * from /inputs/test.csv'"},
numInlinedAttachments: 0,
numTotalAttachments: 1,
},
Expand Down
1 change: 1 addition & 0 deletions cmd/cli/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func NewCmd() *cobra.Command {
"requester-store": configflags.RequesterJobStorageFlags,
"web-ui": configflags.WebUIFlags,
"node-info-store": configflags.NodeInfoStoreFlags,
"translations": configflags.JobTranslationFlags,
}

serveCmd := &cobra.Command{
Expand Down
2 changes: 2 additions & 0 deletions cmd/cli/serve/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func GetRequesterConfig() (node.RequesterConfig, error) {
if err := config.ForKey(types.NodeRequester, &cfg); err != nil {
return node.RequesterConfig{}, err
}

return node.NewRequesterConfigWith(node.RequesterConfigParams{
JobDefaults: transformer.JobDefaults{
ExecutionTimeout: time.Duration(cfg.JobDefaults.ExecutionTimeout),
Expand All @@ -89,6 +90,7 @@ func GetRequesterConfig() (node.RequesterConfig, error) {
WorkerEvalDequeueMaxBackoff: time.Duration(cfg.Worker.WorkerEvalDequeueMaxBackoff),
S3PreSignedURLExpiration: time.Duration(cfg.StorageProvider.S3.PreSignedURLExpiration),
S3PreSignedURLDisabled: cfg.StorageProvider.S3.PreSignedURLDisabled,
TranslationEnabled: cfg.TranslationEnabled,
})
}

Expand Down
12 changes: 12 additions & 0 deletions cmd/util/flags/configflags/job_translation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package configflags

import "github.com/bacalhau-project/bacalhau/pkg/config/types"

var JobTranslationFlags = []Definition{
{
FlagName: "requester-job-translation-enabled",
DefaultValue: Default.Node.Requester.TranslationEnabled,
ConfigPath: types.NodeRequesterTranslationEnabled,
Description: `Whether jobs should be translated at the requester node or not. Default: false`,
},
}
6 changes: 3 additions & 3 deletions docker/custom-job-images/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ duckdb-local:


python-test:
docker run --rm -it -v $(shell pwd)/python/test/single-file:/code bacalhauproject/exec-python-3.11:0.1 python /build/launcher.py -- python hello.py
docker run --rm -it -v $(shell pwd)/python/test/multi-file-reqtxt:/code bacalhauproject/exec-python-3.11:0.1 python /build/launcher.py -- python main.py
docker run --rm -it -v $(shell pwd)/python/test/multi-file-poetry:/code bacalhauproject/exec-python-3.11:0.1 python /build/launcher.py -- poetry run mfp
docker run --rm -it -v $(shell pwd)/python/test/single-file:/code bacalhauproject/exec-python-3.11:0.5 python /build/launcher.py -- python hello.py
docker run --rm -it -v $(shell pwd)/python/test/multi-file-reqtxt:/code bacalhauproject/exec-python-3.11:0.5 python /build/launcher.py -- python main.py
docker run --rm -it -v $(shell pwd)/python/test/multi-file-poetry:/code bacalhauproject/exec-python-3.11:0.5 python /build/launcher.py -- poetry run mfp
3 changes: 1 addition & 2 deletions docker/custom-job-images/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ RUN python -mpip install --upgrade pip
RUN python -mpip install poetry

COPY base_requirements.txt /build
COPY launcher.py /build
RUN python -mpip install -r /build/base_requirements.txt


COPY launcher.py /build
CMD ["/build/launcher.py"]

LABEL org.opencontainers.image.source https://github.com/bacalhau-project/bacalhau-images
Expand Down
4 changes: 2 additions & 2 deletions docker/custom-job-images/python/Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
MACHINE = $(shell uname -m)
USERNAME ?= bacalhauproject
VERSION ?= 0.1
VERSION ?= 0.5

ifeq ($(MACHINE),x86_64)
MACHINE := amd64
endif

local:
@echo - Building local python $(VERSION)
@echo - Building local python $(VERSION) - $(MACHINE)
docker buildx build \
--platform linux/$(MACHINE) \
-t $(USERNAME)/exec-python-3.11:$(VERSION) \
Expand Down
81 changes: 50 additions & 31 deletions docker/custom-job-images/python/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,45 @@
)

CODE_DIR = "/code" # The mounted code folder
WORKING_DIR = "/app" # Created by the shutil.copytree
OUTPUT_DIR = "/outputs" # The output folder


def main():
# Unpack the contents of /code to the working directory which
# will create that working_directory, ignoring the files that
# match the globs in IGNORE
ignore_pattern = shutil.ignore_patterns(*IGNORE)
shutil.copytree(CODE_DIR, WORKING_DIR, ignore=ignore_pattern)
os.chdir(WORKING_DIR)

# Figure out how to install requirements
for f in (
single_file,
pyproject,
requirements_txt,
setup_py,
):
if f():
break
working_dir = "/app" # Created by the shutil.copytree

# it's possible we haven't been sent any code (and we're running via -c)
# so let's support not sending code.
if os.path.exists(CODE_DIR):
# Unpack the contents of /code to the working directory which
# will create that working_directory, ignoring the files that
# match the globs in IGNORE
ignore_pattern = shutil.ignore_patterns(*IGNORE)
shutil.copytree(CODE_DIR, working_dir, ignore=ignore_pattern)
os.chdir(working_dir)

# The inline attachments will have adding the last part of the
# path when adding a directory, and so WORKING_DIR won't contain
# the code, it'll contain that directory. In these cases we'll
# just change the WORKING_DIR.
wd_list = os.listdir(working_dir)
if len(wd_list) == 1:
pth = os.path.join(working_dir, wd_list[0])
if os.path.isdir(pth):
working_dir = pth

# Figure out how to install requirements
for f in (
single_file,
pyproject,
requirements_txt,
setup_py,
):
if f(working_dir):
break
else:
# We will use the current directory as the working directory as
# we won't have created /app with the copy
working_dir = os.curdir

# Run the program in that working directory
past = False
Expand All @@ -47,7 +65,7 @@ def main():
past = True

cmd = " ".join(args)
proc = subprocess.run(cmd, capture_output=False, shell=True, cwd=WORKING_DIR)
proc = subprocess.run(cmd, capture_output=False, shell=True, cwd=working_dir)


def to_requirements_log(stdoutBytes, stderrBytes):
Expand All @@ -60,16 +78,17 @@ def to_requirements_log(stdoutBytes, stderrBytes):
f.write(stderrBytes.decode("utf-8"))


def single_file():
def single_file(working_dir):
"""
If we only find a single file ready to be deployed, we'll read pip install instrcutions
from the module doc (if it exists).
"""
installed = 0
doclines = []
files = glob("*.py", root_dir=WORKING_DIR)
files = glob("*.py", root_dir=working_dir)

if len(files) == 1:
with open(os.path.join(WORKING_DIR, files[0])) as f:
with open(os.path.join(working_dir, files[0])) as f:
mod = ast.parse(f.read())
if not mod:
return False
Expand All @@ -84,7 +103,7 @@ def single_file():
line = line.strip()
if line.startswith("pip"):
proc = subprocess.run(
f"python -m{line}", capture_output=True, shell=True, cwd=WORKING_DIR
f"python -m{line}", capture_output=True, shell=True, cwd=working_dir
)
to_requirements_log(proc.stdout, proc.stderr)

Expand All @@ -93,13 +112,13 @@ def single_file():
return installed > 0


def pyproject():
def pyproject(working_dir):
"""
If there is a pyproject.toml we'll check to see if it is a poetry app, and if
so then we will get poetry to install dependencies. If not then we will attempt
to pip install them.
"""
pth = os.path.join(WORKING_DIR, "pyproject.toml")
pth = os.path.join(working_dir, "pyproject.toml")
if not os.path.exists(pth):
return False

Expand All @@ -113,27 +132,27 @@ def pyproject():
if not is_poetry:
cmd = f"python -mpip install {pth}"

proc = subprocess.run(cmd, capture_output=True, shell=True, cwd=WORKING_DIR)
proc = subprocess.run(cmd, capture_output=True, shell=True, cwd=working_dir)
to_requirements_log(proc.stdout, proc.stderr)

return True


def requirements_txt():
def requirements_txt(working_dir):
"""
Look for a requirements file (or several) based on common names to load the
dependencies from
"""
installed = 0
files = ("dev-requirements.txt", "requirements-dev.txt", "requirements.txt")
for f in files:
pth = os.path.join(WORKING_DIR, f)
pth = os.path.join(working_dir, f)
if os.path.exists(pth):
proc = subprocess.run(
f"python -mpip install -r {f}",
capture_output=True,
shell=True,
cwd=WORKING_DIR,
cwd=working_dir,
)
to_requirements_log(proc.stdout, proc.stderr)

Expand All @@ -142,17 +161,17 @@ def requirements_txt():
return installed > 0


def setup_py():
def setup_py(working_dir):
"""
Look for a setup.py file as a last resort and try to install it locally
"""
pth = os.path.join(WORKING_DIR, "setup.py")
pth = os.path.join(working_dir, "setup.py")
if os.path.exists(pth):
proc = subprocess.run(
f"python -m pip install -e .",
capture_output=True,
shell=True,
cwd=WORKING_DIR,
cwd=working_dir,
)
to_requirements_log(proc.stdout, proc.stderr)
return True
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ require (
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.17.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
gopkg.in/alessio/shellescape.v1 v1.0.0-20170105083845-52074bc9df61
k8s.io/apimachinery v0.28.4
k8s.io/kubectl v0.28.4
sigs.k8s.io/yaml v1.4.0
Expand All @@ -87,6 +88,7 @@ require (
require (
dario.cat/mergo v1.0.0 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371 // indirect
github.com/alessio/shellescape v1.4.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.42 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.12 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/alessio/shellescape v1.4.2 h1:MHPfaU+ddJ0/bYWpgIeUnQUqKrlJ1S7BfEYPM4uEoM0=
github.com/alessio/shellescape v1.4.2/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30=
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM=
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
Expand Down Expand Up @@ -1695,6 +1697,8 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alessio/shellescape.v1 v1.0.0-20170105083845-52074bc9df61 h1:8ajkpB4hXVftY5ko905id+dOnmorcS2CHNxxHLLDcFM=
gopkg.in/alessio/shellescape.v1 v1.0.0-20170105083845-52074bc9df61/go.mod h1:IfMagxm39Ys4ybJrDb7W3Ob8RwxftP0Yy+or/NVz1O8=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
1 change: 1 addition & 0 deletions pkg/config/types/generated_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const NodeRequesterNodeRankRandomnessRange = "Node.Requester.NodeRankRandomnessR
const NodeRequesterOverAskForBidsFactor = "Node.Requester.OverAskForBidsFactor"
const NodeRequesterFailureInjectionConfig = "Node.Requester.FailureInjectionConfig"
const NodeRequesterFailureInjectionConfigIsBadActor = "Node.Requester.FailureInjectionConfig.IsBadActor"
const NodeRequesterTranslationEnabled = "Node.Requester.TranslationEnabled"
const NodeRequesterEvaluationBroker = "Node.Requester.EvaluationBroker"
const NodeRequesterEvaluationBrokerEvalBrokerVisibilityTimeout = "Node.Requester.EvaluationBroker.EvalBrokerVisibilityTimeout"
const NodeRequesterEvaluationBrokerEvalBrokerInitialRetryDelay = "Node.Requester.EvaluationBroker.EvalBrokerInitialRetryDelay"
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/types/generated_viper_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func SetDefaults(cfg BacalhauConfig, opts ...SetOption) {
p.Viper.SetDefault(NodeRequesterOverAskForBidsFactor, cfg.Node.Requester.OverAskForBidsFactor)
p.Viper.SetDefault(NodeRequesterFailureInjectionConfig, cfg.Node.Requester.FailureInjectionConfig)
p.Viper.SetDefault(NodeRequesterFailureInjectionConfigIsBadActor, cfg.Node.Requester.FailureInjectionConfig.IsBadActor)
p.Viper.SetDefault(NodeRequesterTranslationEnabled, cfg.Node.Requester.TranslationEnabled)
p.Viper.SetDefault(NodeRequesterEvaluationBroker, cfg.Node.Requester.EvaluationBroker)
p.Viper.SetDefault(NodeRequesterEvaluationBrokerEvalBrokerVisibilityTimeout, cfg.Node.Requester.EvaluationBroker.EvalBrokerVisibilityTimeout.AsTimeDuration())
p.Viper.SetDefault(NodeRequesterEvaluationBrokerEvalBrokerInitialRetryDelay, cfg.Node.Requester.EvaluationBroker.EvalBrokerInitialRetryDelay.AsTimeDuration())
Expand Down Expand Up @@ -257,6 +258,7 @@ func Set(cfg BacalhauConfig, opts ...SetOption) {
p.Viper.Set(NodeRequesterOverAskForBidsFactor, cfg.Node.Requester.OverAskForBidsFactor)
p.Viper.Set(NodeRequesterFailureInjectionConfig, cfg.Node.Requester.FailureInjectionConfig)
p.Viper.Set(NodeRequesterFailureInjectionConfigIsBadActor, cfg.Node.Requester.FailureInjectionConfig.IsBadActor)
p.Viper.Set(NodeRequesterTranslationEnabled, cfg.Node.Requester.TranslationEnabled)
p.Viper.Set(NodeRequesterEvaluationBroker, cfg.Node.Requester.EvaluationBroker)
p.Viper.Set(NodeRequesterEvaluationBrokerEvalBrokerVisibilityTimeout, cfg.Node.Requester.EvaluationBroker.EvalBrokerVisibilityTimeout.AsTimeDuration())
p.Viper.Set(NodeRequesterEvaluationBrokerEvalBrokerInitialRetryDelay, cfg.Node.Requester.EvaluationBroker.EvalBrokerInitialRetryDelay.AsTimeDuration())
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/types/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type RequesterConfig struct {
OverAskForBidsFactor uint `yaml:"OverAskForBidsFactor"`
FailureInjectionConfig model.FailureInjectionRequesterConfig `yaml:"FailureInjectionConfig"`

TranslationEnabled bool `yaml:"TranslationEnabled"`

EvaluationBroker EvaluationBrokerConfig `yaml:"EvaluationBroker"`
Worker WorkerConfig `yaml:"Worker"`
StorageProvider StorageProviderConfig `yaml:"StorageProvider"`
Expand Down
Loading

0 comments on commit 21540cd

Please sign in to comment.