From 21540cd583f83348a6a156e016c538fc5ebf1ba9 Mon Sep 17 00:00:00 2001 From: Ross Jones Date: Wed, 20 Dec 2023 11:15:59 +0000 Subject: [PATCH] Translation converting custom task types to docker tasks (#3108) Adds a translation layer to the orchestrator, allowing for custom job types to be converted to known job types (docker/wasm) for execution on a compute node. The translation mostly consists of specifying the docker image to use, putting the engine parameters in the correct place, and adding metadata to the newly created job. This initial PR only supports `python` and `duckdb` with special support for installing python dependencies implemented within the image. By default translations in the requester node are disabled, but can be enabled with `--requester-job-translation-enabled` --- cmd/cli/devstack/devstack.go | 1 + cmd/cli/exec/exec.go | 19 +- cmd/cli/exec/exec_test.go | 8 +- cmd/cli/serve/serve.go | 1 + cmd/cli/serve/util.go | 2 + cmd/util/flags/configflags/job_translation.go | 12 ++ docker/custom-job-images/Makefile | 6 +- docker/custom-job-images/python/Dockerfile | 3 +- docker/custom-job-images/python/Makefile | 4 +- docker/custom-job-images/python/launcher.py | 81 ++++--- go.mod | 2 + go.sum | 4 + pkg/config/types/generated_constants.go | 1 + pkg/config/types/generated_viper_defaults.go | 2 + pkg/config/types/requester.go | 2 + pkg/models/constants.go | 5 + pkg/models/engine.go | 5 + pkg/models/task_builder.go | 9 + pkg/node/config_defaults.go | 4 + pkg/node/config_requester.go | 3 + pkg/node/requester.go | 7 + pkg/orchestrator/endpoint.go | 31 +++ pkg/orchestrator/endpoint_test.go | 6 +- pkg/translation/translation.go | 118 +++++++++++ pkg/translation/translation_test.go | 200 ++++++++++++++++++ pkg/translation/translators/duckdb.go | 54 +++++ pkg/translation/translators/errors.go | 7 + pkg/translation/translators/python.go | 103 +++++++++ pkg/util/conversion.go | 30 +++ pkg/util/conversion_test.go | 93 ++++++++ 30 files changed, 778 insertions(+), 45 
deletions(-) create mode 100644 cmd/util/flags/configflags/job_translation.go create mode 100644 pkg/models/engine.go create mode 100644 pkg/translation/translation.go create mode 100644 pkg/translation/translation_test.go create mode 100644 pkg/translation/translators/duckdb.go create mode 100644 pkg/translation/translators/errors.go create mode 100644 pkg/translation/translators/python.go create mode 100644 pkg/util/conversion.go create mode 100644 pkg/util/conversion_test.go diff --git a/cmd/cli/devstack/devstack.go b/cmd/cli/devstack/devstack.go index 08ab61205b..282221a3e6 100644 --- a/cmd/cli/devstack/devstack.go +++ b/cmd/cli/devstack/devstack.go @@ -71,6 +71,7 @@ func NewCmd() *cobra.Command { "disable-features": configflags.DisabledFeatureFlags, "capacity": configflags.CapacityFlags, "job-timeouts": configflags.ComputeTimeoutFlags, + "translations": configflags.JobTranslationFlags, } devstackCmd := &cobra.Command{ diff --git a/cmd/cli/exec/exec.go b/cmd/cli/exec/exec.go index b03e89dea7..7fbaeaa071 100644 --- a/cmd/cli/exec/exec.go +++ b/cmd/cli/exec/exec.go @@ -6,8 +6,10 @@ import ( "fmt" "os" "path/filepath" + "strings" "github.com/spf13/cobra" + shellescape "gopkg.in/alessio/shellescape.v1" "k8s.io/kubectl/pkg/util/i18n" "github.com/bacalhau-project/bacalhau/cmd/util" @@ -120,6 +122,14 @@ func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, opti // don't have a template, then we don't know how to submit that job type. jobType = cmdArgs[0] + for i := range cmdArgs { + // If any parameters were quoted, we should make sure we try and add + // them back in after they were stripped for us. 
+ if strings.Contains(cmdArgs[i], " ") { + cmdArgs[i] = shellescape.Quote(cmdArgs[i]) + } + } + tpl, err := NewTemplateMap(embeddedFiles, "templates") if err != nil { return nil, fmt.Errorf("failed to find supported job types, templates missing") @@ -198,6 +208,13 @@ func addInlineContent(ctx context.Context, codeLocation string, job *models.Job) return err } + target := "/code" + + finfo, _ := os.Stat(absPath) + if !finfo.IsDir() { + target = fmt.Sprintf("/code/%s", finfo.Name()) + } + specConfig, err := inline.NewStorage().Upload(ctx, absPath) if err != nil { return fmt.Errorf("failed to attach code '%s' to job submission: %w", codeLocation, err) @@ -206,7 +223,7 @@ func addInlineContent(ctx context.Context, codeLocation string, job *models.Job) job.Tasks[0].InputSources = append(job.Tasks[0].InputSources, &models.InputSource{ Source: &specConfig, Alias: "code", - Target: "/code", + Target: target, }) return nil diff --git a/cmd/cli/exec/exec_test.go b/cmd/cli/exec/exec_test.go index 65116655e4..1a983c0a22 100644 --- a/cmd/cli/exec/exec_test.go +++ b/cmd/cli/exec/exec_test.go @@ -43,11 +43,11 @@ var testcases []testCase = []testCase{ { // bacalhau exec python --version=3.10 -- -c "import this" name: "zen of python", - cmdLine: []string{"python", "--version=3.10", "--", "-c", "\"import this\""}, - expectedUnknownArgs: []string{"--version=3.10", "-c=\"import this\""}, + cmdLine: []string{"python", "--version=3.10", "--", "-c", "import this"}, + expectedUnknownArgs: []string{"--version=3.10", "-c=import this"}, expectedErrMsg: "", jobCommand: "python", - jobArguments: []string{"-c", `"import this"`}, + jobArguments: []string{"-c", "'import this'"}, numInlinedAttachments: 0, numTotalAttachments: 0, }, @@ -91,7 +91,7 @@ var testcases []testCase = []testCase{ expectedUnknownArgs: []string{}, expectedErrMsg: "", jobCommand: "duckdb", - jobArguments: []string{"select * from /inputs/test.csv"}, + jobArguments: []string{"'select * from /inputs/test.csv'"}, 
numInlinedAttachments: 0, numTotalAttachments: 1, }, diff --git a/cmd/cli/serve/serve.go b/cmd/cli/serve/serve.go index e6080c21e7..277fa5a529 100644 --- a/cmd/cli/serve/serve.go +++ b/cmd/cli/serve/serve.go @@ -108,6 +108,7 @@ func NewCmd() *cobra.Command { "requester-store": configflags.RequesterJobStorageFlags, "web-ui": configflags.WebUIFlags, "node-info-store": configflags.NodeInfoStoreFlags, + "translations": configflags.JobTranslationFlags, } serveCmd := &cobra.Command{ diff --git a/cmd/cli/serve/util.go b/cmd/cli/serve/util.go index 720c89c3b8..d61fa8ac15 100644 --- a/cmd/cli/serve/util.go +++ b/cmd/cli/serve/util.go @@ -64,6 +64,7 @@ func GetRequesterConfig() (node.RequesterConfig, error) { if err := config.ForKey(types.NodeRequester, &cfg); err != nil { return node.RequesterConfig{}, err } + return node.NewRequesterConfigWith(node.RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ ExecutionTimeout: time.Duration(cfg.JobDefaults.ExecutionTimeout), @@ -89,6 +90,7 @@ func GetRequesterConfig() (node.RequesterConfig, error) { WorkerEvalDequeueMaxBackoff: time.Duration(cfg.Worker.WorkerEvalDequeueMaxBackoff), S3PreSignedURLExpiration: time.Duration(cfg.StorageProvider.S3.PreSignedURLExpiration), S3PreSignedURLDisabled: cfg.StorageProvider.S3.PreSignedURLDisabled, + TranslationEnabled: cfg.TranslationEnabled, }) } diff --git a/cmd/util/flags/configflags/job_translation.go b/cmd/util/flags/configflags/job_translation.go new file mode 100644 index 0000000000..0817ce4b45 --- /dev/null +++ b/cmd/util/flags/configflags/job_translation.go @@ -0,0 +1,12 @@ +package configflags + +import "github.com/bacalhau-project/bacalhau/pkg/config/types" + +var JobTranslationFlags = []Definition{ + { + FlagName: "requester-job-translation-enabled", + DefaultValue: Default.Node.Requester.TranslationEnabled, + ConfigPath: types.NodeRequesterTranslationEnabled, + Description: `Whether jobs should be translated at the requester node or not. 
Default: false`, + }, +} diff --git a/docker/custom-job-images/Makefile b/docker/custom-job-images/Makefile index 9c87a3cc5e..be0e80ed9b 100644 --- a/docker/custom-job-images/Makefile +++ b/docker/custom-job-images/Makefile @@ -20,6 +20,6 @@ duckdb-local: python-test: - docker run --rm -it -v $(shell pwd)/python/test/single-file:/code bacalhauproject/exec-python-3.11:0.1 python /build/launcher.py -- python hello.py - docker run --rm -it -v $(shell pwd)/python/test/multi-file-reqtxt:/code bacalhauproject/exec-python-3.11:0.1 python /build/launcher.py -- python main.py - docker run --rm -it -v $(shell pwd)/python/test/multi-file-poetry:/code bacalhauproject/exec-python-3.11:0.1 python /build/launcher.py -- poetry run mfp \ No newline at end of file + docker run --rm -it -v $(shell pwd)/python/test/single-file:/code bacalhauproject/exec-python-3.11:0.5 python /build/launcher.py -- python hello.py + docker run --rm -it -v $(shell pwd)/python/test/multi-file-reqtxt:/code bacalhauproject/exec-python-3.11:0.5 python /build/launcher.py -- python main.py + docker run --rm -it -v $(shell pwd)/python/test/multi-file-poetry:/code bacalhauproject/exec-python-3.11:0.5 python /build/launcher.py -- poetry run mfp \ No newline at end of file diff --git a/docker/custom-job-images/python/Dockerfile b/docker/custom-job-images/python/Dockerfile index 11a58a2443..2c8c8b45fe 100644 --- a/docker/custom-job-images/python/Dockerfile +++ b/docker/custom-job-images/python/Dockerfile @@ -27,10 +27,9 @@ RUN python -mpip install --upgrade pip RUN python -mpip install poetry COPY base_requirements.txt /build -COPY launcher.py /build RUN python -mpip install -r /build/base_requirements.txt - +COPY launcher.py /build CMD ["/build/launcher.py"] LABEL org.opencontainers.image.source https://github.com/bacalhau-project/bacalhau-images diff --git a/docker/custom-job-images/python/Makefile b/docker/custom-job-images/python/Makefile index a8adbca3ee..5a6859dc4f 100644 --- 
a/docker/custom-job-images/python/Makefile +++ b/docker/custom-job-images/python/Makefile @@ -1,13 +1,13 @@ MACHINE = $(shell uname -m) USERNAME ?= bacalhauproject -VERSION ?= 0.1 +VERSION ?= 0.5 ifeq ($(MACHINE),x86_64) MACHINE := amd64 endif local: - @echo - Building local python $(VERSION) + @echo - Building local python $(VERSION) - $(MACHINE) docker buildx build \ --platform linux/$(MACHINE) \ -t $(USERNAME)/exec-python-3.11:$(VERSION) \ diff --git a/docker/custom-job-images/python/launcher.py b/docker/custom-job-images/python/launcher.py index 34735297b5..6ae4c412da 100755 --- a/docker/custom-job-images/python/launcher.py +++ b/docker/custom-job-images/python/launcher.py @@ -15,27 +15,45 @@ ) CODE_DIR = "/code" # The mounted code folder -WORKING_DIR = "/app" # Created by the shutil.copytree OUTPUT_DIR = "/outputs" # The output folder def main(): - # Unpack the contents of /code to the working directory which - # will create that working_directory, ignoring the files that - # match the globs in IGNORE - ignore_pattern = shutil.ignore_patterns(*IGNORE) - shutil.copytree(CODE_DIR, WORKING_DIR, ignore=ignore_pattern) - os.chdir(WORKING_DIR) - - # Figure out how to install requirements - for f in ( - single_file, - pyproject, - requirements_txt, - setup_py, - ): - if f(): - break + working_dir = "/app" # Created by the shutil.copytree + + # it's possible we haven't been sent any code (and we're running via -c) + # so let's support not sending code. + if os.path.exists(CODE_DIR): + # Unpack the contents of /code to the working directory which + # will create that working_directory, ignoring the files that + # match the globs in IGNORE + ignore_pattern = shutil.ignore_patterns(*IGNORE) + shutil.copytree(CODE_DIR, working_dir, ignore=ignore_pattern) + os.chdir(working_dir) + + # The inline attachments will have adding the last part of the + # path when adding a directory, and so WORKING_DIR won't contain + # the code, it'll contain that directory. 
In these cases we'll + # just change the WORKING_DIR. + wd_list = os.listdir(working_dir) + if len(wd_list) == 1: + pth = os.path.join(working_dir, wd_list[0]) + if os.path.isdir(pth): + working_dir = pth + + # Figure out how to install requirements + for f in ( + single_file, + pyproject, + requirements_txt, + setup_py, + ): + if f(working_dir): + break + else: + # We will use the current directory as the working directory as + # we won't have created /app with the copy + working_dir = os.curdir # Run the program in that working directory past = False @@ -47,7 +65,7 @@ def main(): past = True cmd = " ".join(args) - proc = subprocess.run(cmd, capture_output=False, shell=True, cwd=WORKING_DIR) + proc = subprocess.run(cmd, capture_output=False, shell=True, cwd=working_dir) def to_requirements_log(stdoutBytes, stderrBytes): @@ -60,16 +78,17 @@ def to_requirements_log(stdoutBytes, stderrBytes): f.write(stderrBytes.decode("utf-8")) -def single_file(): +def single_file(working_dir): """ If we only find a single file ready to be deployed, we'll read pip install instrcutions from the module doc (if it exists). """ installed = 0 doclines = [] - files = glob("*.py", root_dir=WORKING_DIR) + files = glob("*.py", root_dir=working_dir) + if len(files) == 1: - with open(os.path.join(WORKING_DIR, files[0])) as f: + with open(os.path.join(working_dir, files[0])) as f: mod = ast.parse(f.read()) if not mod: return False @@ -84,7 +103,7 @@ def single_file(): line = line.strip() if line.startswith("pip"): proc = subprocess.run( - f"python -m{line}", capture_output=True, shell=True, cwd=WORKING_DIR + f"python -m{line}", capture_output=True, shell=True, cwd=working_dir ) to_requirements_log(proc.stdout, proc.stderr) @@ -93,13 +112,13 @@ def single_file(): return installed > 0 -def pyproject(): +def pyproject(working_dir): """ If there is a pyproject.toml we'll check to see if it is a poetry app, and if so then we will get poetry to install dependencies. 
If not then we will attempt to pip install them. """ - pth = os.path.join(WORKING_DIR, "pyproject.toml") + pth = os.path.join(working_dir, "pyproject.toml") if not os.path.exists(pth): return False @@ -113,13 +132,13 @@ def pyproject(): if not is_poetry: cmd = f"python -mpip install {pth}" - proc = subprocess.run(cmd, capture_output=True, shell=True, cwd=WORKING_DIR) + proc = subprocess.run(cmd, capture_output=True, shell=True, cwd=working_dir) to_requirements_log(proc.stdout, proc.stderr) return True -def requirements_txt(): +def requirements_txt(working_dir): """ Look for a requirements file (or several) based on common names to load the dependencies from @@ -127,13 +146,13 @@ def requirements_txt(): installed = 0 files = ("dev-requirements.txt", "requirements-dev.txt", "requirements.txt") for f in files: - pth = os.path.join(WORKING_DIR, f) + pth = os.path.join(working_dir, f) if os.path.exists(pth): proc = subprocess.run( f"python -mpip install -r {f}", capture_output=True, shell=True, - cwd=WORKING_DIR, + cwd=working_dir, ) to_requirements_log(proc.stdout, proc.stderr) @@ -142,17 +161,17 @@ def requirements_txt(): return installed > 0 -def setup_py(): +def setup_py(working_dir): """ Look for a setup.py file as a last resort and try to install it locally """ - pth = os.path.join(WORKING_DIR, "setup.py") + pth = os.path.join(working_dir, "setup.py") if os.path.exists(pth): proc = subprocess.run( f"python -m pip install -e .", capture_output=True, shell=True, - cwd=WORKING_DIR, + cwd=working_dir, ) to_requirements_log(proc.stdout, proc.stderr) return True diff --git a/go.mod b/go.mod index d40e7f202a..a6013bae87 100644 --- a/go.mod +++ b/go.mod @@ -79,6 +79,7 @@ require ( go.uber.org/zap v1.26.0 golang.org/x/crypto v0.17.0 golang.org/x/exp v0.0.0-20230321023759-10a507213a29 + gopkg.in/alessio/shellescape.v1 v1.0.0-20170105083845-52074bc9df61 k8s.io/apimachinery v0.28.4 k8s.io/kubectl v0.28.4 sigs.k8s.io/yaml v1.4.0 @@ -87,6 +88,7 @@ require ( require ( 
dario.cat/mergo v1.0.0 // indirect github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371 // indirect + github.com/alessio/shellescape v1.4.2 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.13.42 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.12 // indirect diff --git a/go.sum b/go.sum index 23c87411ae..f9e49748db 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= +github.com/alessio/shellescape v1.4.2 h1:MHPfaU+ddJ0/bYWpgIeUnQUqKrlJ1S7BfEYPM4uEoM0= +github.com/alessio/shellescape v1.4.2/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM= github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= @@ -1695,6 +1697,8 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/alessio/shellescape.v1 v1.0.0-20170105083845-52074bc9df61 h1:8ajkpB4hXVftY5ko905id+dOnmorcS2CHNxxHLLDcFM= +gopkg.in/alessio/shellescape.v1 
v1.0.0-20170105083845-52074bc9df61/go.mod h1:IfMagxm39Ys4ybJrDb7W3Ob8RwxftP0Yy+or/NVz1O8= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/config/types/generated_constants.go b/pkg/config/types/generated_constants.go index 421a2cdba1..0591fc0139 100644 --- a/pkg/config/types/generated_constants.go +++ b/pkg/config/types/generated_constants.go @@ -91,6 +91,7 @@ const NodeRequesterNodeRankRandomnessRange = "Node.Requester.NodeRankRandomnessR const NodeRequesterOverAskForBidsFactor = "Node.Requester.OverAskForBidsFactor" const NodeRequesterFailureInjectionConfig = "Node.Requester.FailureInjectionConfig" const NodeRequesterFailureInjectionConfigIsBadActor = "Node.Requester.FailureInjectionConfig.IsBadActor" +const NodeRequesterTranslationEnabled = "Node.Requester.TranslationEnabled" const NodeRequesterEvaluationBroker = "Node.Requester.EvaluationBroker" const NodeRequesterEvaluationBrokerEvalBrokerVisibilityTimeout = "Node.Requester.EvaluationBroker.EvalBrokerVisibilityTimeout" const NodeRequesterEvaluationBrokerEvalBrokerInitialRetryDelay = "Node.Requester.EvaluationBroker.EvalBrokerInitialRetryDelay" diff --git a/pkg/config/types/generated_viper_defaults.go b/pkg/config/types/generated_viper_defaults.go index 277bc56c72..7e0ac97181 100644 --- a/pkg/config/types/generated_viper_defaults.go +++ b/pkg/config/types/generated_viper_defaults.go @@ -113,6 +113,7 @@ func SetDefaults(cfg BacalhauConfig, opts ...SetOption) { p.Viper.SetDefault(NodeRequesterOverAskForBidsFactor, cfg.Node.Requester.OverAskForBidsFactor) p.Viper.SetDefault(NodeRequesterFailureInjectionConfig, cfg.Node.Requester.FailureInjectionConfig) p.Viper.SetDefault(NodeRequesterFailureInjectionConfigIsBadActor, 
cfg.Node.Requester.FailureInjectionConfig.IsBadActor) + p.Viper.SetDefault(NodeRequesterTranslationEnabled, cfg.Node.Requester.TranslationEnabled) p.Viper.SetDefault(NodeRequesterEvaluationBroker, cfg.Node.Requester.EvaluationBroker) p.Viper.SetDefault(NodeRequesterEvaluationBrokerEvalBrokerVisibilityTimeout, cfg.Node.Requester.EvaluationBroker.EvalBrokerVisibilityTimeout.AsTimeDuration()) p.Viper.SetDefault(NodeRequesterEvaluationBrokerEvalBrokerInitialRetryDelay, cfg.Node.Requester.EvaluationBroker.EvalBrokerInitialRetryDelay.AsTimeDuration()) @@ -257,6 +258,7 @@ func Set(cfg BacalhauConfig, opts ...SetOption) { p.Viper.Set(NodeRequesterOverAskForBidsFactor, cfg.Node.Requester.OverAskForBidsFactor) p.Viper.Set(NodeRequesterFailureInjectionConfig, cfg.Node.Requester.FailureInjectionConfig) p.Viper.Set(NodeRequesterFailureInjectionConfigIsBadActor, cfg.Node.Requester.FailureInjectionConfig.IsBadActor) + p.Viper.Set(NodeRequesterTranslationEnabled, cfg.Node.Requester.TranslationEnabled) p.Viper.Set(NodeRequesterEvaluationBroker, cfg.Node.Requester.EvaluationBroker) p.Viper.Set(NodeRequesterEvaluationBrokerEvalBrokerVisibilityTimeout, cfg.Node.Requester.EvaluationBroker.EvalBrokerVisibilityTimeout.AsTimeDuration()) p.Viper.Set(NodeRequesterEvaluationBrokerEvalBrokerInitialRetryDelay, cfg.Node.Requester.EvaluationBroker.EvalBrokerInitialRetryDelay.AsTimeDuration()) diff --git a/pkg/config/types/requester.go b/pkg/config/types/requester.go index de21adac5b..bd4dee6b5d 100644 --- a/pkg/config/types/requester.go +++ b/pkg/config/types/requester.go @@ -17,6 +17,8 @@ type RequesterConfig struct { OverAskForBidsFactor uint `yaml:"OverAskForBidsFactor"` FailureInjectionConfig model.FailureInjectionRequesterConfig `yaml:"FailureInjectionConfig"` + TranslationEnabled bool `yaml:"TranslationEnabled"` + EvaluationBroker EvaluationBrokerConfig `yaml:"EvaluationBroker"` Worker WorkerConfig `yaml:"Worker"` StorageProvider StorageProviderConfig `yaml:"StorageProvider"` diff --git 
a/pkg/models/constants.go b/pkg/models/constants.go index ca952f1e82..fac87da4cc 100644 --- a/pkg/models/constants.go +++ b/pkg/models/constants.go @@ -61,4 +61,9 @@ const ( MetaRequesterID = "bacalhau.org/requester.id" MetaRequesterPublicKey = "bacalhau.org/requester.publicKey" MetaClientID = "bacalhau.org/client.id" + + // Job provenance metadata used to track the origin of a job where + // it may have been translated from another job. + MetaDerivedFrom = "bacalhau.org/derivedFrom" + MetaTranslatedBy = "bacalhau.org/translatedBy" ) diff --git a/pkg/models/engine.go b/pkg/models/engine.go new file mode 100644 index 0000000000..fcae6c970e --- /dev/null +++ b/pkg/models/engine.go @@ -0,0 +1,5 @@ +package models + +func IsDefaultEngineType(kind string) bool { + return kind == EngineDocker || kind == EngineNoop || kind == EngineWasm +} diff --git a/pkg/models/task_builder.go b/pkg/models/task_builder.go index fff7d8f8f2..4756b1eb84 100644 --- a/pkg/models/task_builder.go +++ b/pkg/models/task_builder.go @@ -27,6 +27,15 @@ func (b *TaskBuilder) Engine(engine *SpecConfig) *TaskBuilder { return b } +func (b *TaskBuilder) Meta(key string, value string) *TaskBuilder { + if b.task.Meta == nil { + b.task.Meta = make(map[string]string) + } + + b.task.Meta[key] = value + return b +} + func (b *TaskBuilder) Publisher(publisher *SpecConfig) *TaskBuilder { b.task.Publisher = publisher return b diff --git a/pkg/node/config_defaults.go b/pkg/node/config_defaults.go index 21bab6552b..a0de20399a 100644 --- a/pkg/node/config_defaults.go +++ b/pkg/node/config_defaults.go @@ -54,6 +54,8 @@ var DefaultRequesterConfig = RequesterConfigParams{ S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, + + TranslationEnabled: false, } var TestRequesterConfig = RequesterConfigParams{ @@ -78,6 +80,8 @@ var TestRequesterConfig = RequesterConfigParams{ WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond, WorkerEvalDequeueMaxBackoff: 200 * time.Millisecond, + TranslationEnabled: 
false, + S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, } diff --git a/pkg/node/config_requester.go b/pkg/node/config_requester.go index b12f36d335..ee025b0094 100644 --- a/pkg/node/config_requester.go +++ b/pkg/node/config_requester.go @@ -41,6 +41,9 @@ type RequesterConfigParams struct { WorkerEvalDequeueBaseBackoff time.Duration WorkerEvalDequeueMaxBackoff time.Duration + // Should the orchestrator attempt to translate jobs? + TranslationEnabled bool + S3PreSignedURLDisabled bool S3PreSignedURLExpiration time.Duration } diff --git a/pkg/node/requester.go b/pkg/node/requester.go index 83884da4b9..13e89efcff 100644 --- a/pkg/node/requester.go +++ b/pkg/node/requester.go @@ -19,6 +19,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/pubsub/libp2p" "github.com/bacalhau-project/bacalhau/pkg/requester/pubsub/jobinfo" s3helper "github.com/bacalhau-project/bacalhau/pkg/s3" + "github.com/bacalhau-project/bacalhau/pkg/translation" "github.com/bacalhau-project/bacalhau/pkg/util" libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" @@ -245,6 +246,11 @@ func NewRequesterNode( DefaultJobExecutionTimeout: requesterConfig.JobDefaults.ExecutionTimeout, }) + var translationProvider translation.TranslatorProvider + if requesterConfig.TranslationEnabled { + translationProvider = translation.NewStandardTranslatorsProvider() + } + endpointV2 := orchestrator.NewBaseEndpoint(&orchestrator.BaseEndpointParams{ ID: host.ID().String(), EvaluationBroker: evalBroker, @@ -258,6 +264,7 @@ func NewRequesterNode( transformer.RequesterInfo(host.ID().String(), marshaledPublicKey), transformer.NewInlineStoragePinner(storageProviders), }, + TaskTranslator: translationProvider, ResultTransformer: resultTransformers, }) diff --git a/pkg/orchestrator/endpoint.go b/pkg/orchestrator/endpoint.go index 31f1782533..4bfe196cca 100644 --- a/pkg/orchestrator/endpoint.go +++ b/pkg/orchestrator/endpoint.go @@ -10,6 +10,7 @@ import ( 
"github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/orchestrator/transformer" + "github.com/bacalhau-project/bacalhau/pkg/translation" "github.com/google/uuid" "github.com/rs/zerolog/log" ) @@ -21,6 +22,7 @@ type BaseEndpointParams struct { EventEmitter EventEmitter ComputeProxy compute.Endpoint JobTransformer transformer.JobTransformer + TaskTranslator translation.TranslatorProvider ResultTransformer transformer.ResultTransformer } @@ -31,6 +33,7 @@ type BaseEndpoint struct { eventEmitter EventEmitter computeProxy compute.Endpoint jobTransformer transformer.JobTransformer + taskTranslator translation.TranslatorProvider resultTransformer transformer.ResultTransformer } @@ -42,6 +45,7 @@ func NewBaseEndpoint(params *BaseEndpointParams) *BaseEndpoint { eventEmitter: params.EventEmitter, computeProxy: params.ComputeProxy, jobTransformer: params.JobTransformer, + taskTranslator: params.TaskTranslator, resultTransformer: params.ResultTransformer, } } @@ -60,6 +64,33 @@ func (e *BaseEndpoint) SubmitJob(ctx context.Context, request *SubmitJobRequest) return nil, err } + // We will only perform task translation in the orchestrator if we were provided with a provider + // that can give translators to perform the translation. + if e.taskTranslator != nil { + // Before we create an evaluation for the job, we want to check that none of the job's tasks + // need translating from a custom job type to a known job type (docker, wasm). If they do, + // then we will perform the translation and create the evaluation for the new job instead. + translatedJob, err := translation.Translate(ctx, e.taskTranslator, job) + if err != nil { + return nil, err + } + + // If we have translated the job (i.e. at least one task was translated) then we will switch + // to using the translated job after we have saved it in the jobstore. 
This results in us + // sending the translated job ID to the user for tracking their job, although it will contain + // a reference to the job they submitted. This may cause confusion and we may in future want + // to move to versioning of jobs so that we can present both to the user should they request + // it. + if translatedJob != nil { + translatedJob.Meta[models.MetaDerivedFrom] = job.ID + + job = translatedJob + if err := e.store.CreateJob(ctx, *job); err != nil { + return nil, err + } + } + } + eval := &models.Evaluation{ ID: uuid.NewString(), JobID: job.ID, diff --git a/pkg/orchestrator/endpoint_test.go b/pkg/orchestrator/endpoint_test.go index eb923e5007..9e5addf494 100644 --- a/pkg/orchestrator/endpoint_test.go +++ b/pkg/orchestrator/endpoint_test.go @@ -21,6 +21,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/orchestrator" "github.com/bacalhau-project/bacalhau/pkg/orchestrator/transformer" "github.com/bacalhau-project/bacalhau/pkg/system" + "github.com/bacalhau-project/bacalhau/pkg/translation" "github.com/stretchr/testify/suite" gomock "go.uber.org/mock/gomock" ) @@ -82,6 +83,7 @@ func (s *EndpointSuite) TestInlinePinnerTransformInSubmit() { transformer.JobFn(transformer.IDGenerator), transformer.NewInlineStoragePinner(storageProviders), }, + TaskTranslator: translation.NewStandardTranslatorsProvider(), }) sb := strings.Builder{} @@ -96,11 +98,11 @@ func (s *EndpointSuite) TestInlinePinnerTransformInSubmit() { Name: "testjob", Type: "batch", Tasks: []*models.Task{ - &models.Task{ + { Name: "Task 1", Engine: &models.SpecConfig{Type: models.EngineNoop}, InputSources: []*models.InputSource{ - &models.InputSource{ + { Source: &models.SpecConfig{ Type: models.StorageSourceInline, Params: map[string]interface{}{ diff --git a/pkg/translation/translation.go b/pkg/translation/translation.go new file mode 100644 index 0000000000..a8ff2fd611 --- /dev/null +++ b/pkg/translation/translation.go @@ -0,0 +1,118 @@ +// Package translation provides interfaces for 
translating from a Job to a +// different Job. This allows us to accept more job types than we have +// executors as we translate from the abstract type to the concrete executor. +// +// When presented with a Job, this package iterates through the tasks +// belonging to the job to determine whether any of the tasks have an +// Engine type that is not one of the core executors (docker or wasm). +// If none do, then it returns immediately. +// +// For the discovered tasks, the TranslatorProvider is asked to provide an +// implementation of the Translator interface based on the task's engine type. +// The newly obtained Translator processes the task and returns a new task +// with a known engine type (docker or wasm). Depending on where the +// translation occurs, extra work might result in the generation of a derived +// job. + +package translation + +import ( + "context" + "fmt" + + "github.com/bacalhau-project/bacalhau/pkg/lib/provider" + "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/translation/translators" + "github.com/bacalhau-project/bacalhau/pkg/util/idgen" + "github.com/hashicorp/go-multierror" +) + +// Translator defines what functions are required for a component that +// is able to translate from one job to another. It is important that +// implementors ensure that their implementation is reentrant - which +// means it should not use any mutable state after initialisation. 
+type Translator interface {
+	provider.Providable
+
+	Translate(*models.Task) (*models.Task, error)
+}
+
+// TranslatorProvider is a provider.Provider of Translator implementations
+type TranslatorProvider interface {
+	provider.Provider[Translator]
+}
+
+// NewStandardTranslatorsProvider returns a TranslatorProvider which maps names
+// to implementations of the Translator interface
+func NewStandardTranslatorsProvider() TranslatorProvider {
+	return provider.NewMappedProvider(map[string]Translator{
+		"python": &translators.PythonTranslator{},
+		"duckdb": &translators.DuckDBTranslator{},
+	})
+}
+
+// Translate returns a copy of the original job, with a new ID, in which every task
+// whose engine type has a translator is replaced by its translated equivalent and
+// tasks using a default engine are left in place. It returns a nil job and a nil
+// error when no task needs translating, and an error for unknown engine types.
+func Translate(ctx context.Context, provider TranslatorProvider, original *models.Job) (*models.Job, error) {
+	shouldTr, err := ShouldTranslate(ctx, provider, original.Tasks)
+	if err != nil {
+		return nil, err
+	}
+	if !shouldTr {
+		// Nothing for us to do so we should return immediately
+		return nil, nil
+	}
+
+	newJob := original.Copy()
+	newJob.ID = idgen.NewJobID()
+
+	errs := new(multierror.Error)
+
+	for i := range newJob.Tasks {
+		task := newJob.Tasks[i]
+		kind := task.Engine.Type
+
+		if models.IsDefaultEngineType(kind) {
+			continue // and leave this task in place
+		}
+
+		translator, err := provider.Get(ctx, kind)
+		if err != nil {
+			errs = multierror.Append(errs, err)
+			continue
+		}
+
+		t, err := translator.Translate(task)
+		if err != nil {
+			errs = multierror.Append(errs, err)
+			continue
+		}
+		// Replace the task copied from the original job with its translation
+		newJob.Tasks[i] = t
+	}
+
+	return newJob, errs.ErrorOrNil()
+}
+
+// ShouldTranslate reports whether any of the tasks uses an engine type that the
+// provider can translate, in which case translation needs to go ahead. Engine
+// types that are neither translatable nor a default engine produce an error.
+func ShouldTranslate(ctx context.Context, provider TranslatorProvider, tasks []*models.Task) (bool, error) {
+	errs := new(multierror.Error)
+	needTranslationCount := 0
+
+	for _, task := range tasks {
+		switch kind := task.Engine.Type; {
+		case provider.Has(ctx, kind):
+			needTranslationCount++
+		case kind == models.EngineDocker || kind == models.EngineWasm || kind == models.EngineNoop:
+			// Default engines are left as they are, nothing to translate
+		default:
+			errs = multierror.Append(errs, fmt.Errorf("unknown task type identified in translation: '%s'", kind))
+		}
+	}
+
+	return needTranslationCount > 0, errs.ErrorOrNil()
+}
diff --git a/pkg/translation/translation_test.go b/pkg/translation/translation_test.go
new file mode 100644
index 0000000000..d71a694efb
--- /dev/null
+++ b/pkg/translation/translation_test.go
@@ -0,0 +1,218 @@
+//go:build unit || !integration
+
+package translation_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/bacalhau-project/bacalhau/pkg/models"
+	"github.com/bacalhau-project/bacalhau/pkg/translation"
+	"github.com/stretchr/testify/suite"
+)
+
+type TranslationTestSuite struct {
+	suite.Suite
+	ctx      context.Context
+	provider translation.TranslatorProvider
+}
+
+func TestTranslationTestSuite(t *testing.T) {
+	suite.Run(t, new(TranslationTestSuite))
+}
+
+func (s *TranslationTestSuite) SetupSuite() {
+	s.ctx = context.Background()
+	s.provider = translation.NewStandardTranslatorsProvider()
+}
+
+var testcases = []struct {
+	name     string
+	spec     *models.SpecConfig
+	expected *models.SpecConfig
+}{
+	{
+		name: "python",
+		spec: &models.SpecConfig{
+			Type: "python",
+			Params: map[string]interface{}{
+				"Command":   "python",
+				"Arguments": []interface{}{"-c", "print('Hello, world!')"},
+			},
+		},
+		expected: &models.SpecConfig{
+			Type: "docker",
+			Params: map[string]interface{}{
+				"Image":      "bacalhauproject/exec-python-3.11:0.5",
+				"Entrypoint": []string{},
+				"Parameters": []string{
+					"/build/launcher.py", "--", "python", "-c", "print('Hello, world!')",
+				},
+				"EnvironmentVariables": []string{},
+				"WorkingDirectory":     "",
+			},
+		},
+	},
+	{
+		name: "python with spaces",
+		spec: &models.SpecConfig{
+			Type: "python",
+			Params: map[string]interface{}{
+				"Command":   "python",
+				"Arguments": []interface{}{"-c", `"import this"`},
+			},
+		},
+		expected: &models.SpecConfig{
+			Type: "docker",
+			Params: map[string]interface{}{
+				"Image":      "bacalhauproject/exec-python-3.11:0.5",
+				"Entrypoint": []string{},
+				"Parameters": []string{
+					"/build/launcher.py", "--", "python", "-c", `"import this"`,
+				},
+				"EnvironmentVariables": []string{},
+				"WorkingDirectory":     "",
+			},
+		},
+	},
+}
+
+func (s *TranslationTestSuite) TestTranslate() {
+	for _, tc := range testcases {
+		s.Run(tc.name, func() {
+			job := &models.Job{
+				ID: tc.name,
+				Tasks: []*models.Task{
+					{
+						Name:   "task1",
+						Engine: tc.spec,
+					},
+				},
+			}
+
+			translated, err := translation.Translate(s.ctx, s.provider, job)
+			s.Require().NoError(err)
+
+			s.Require().Equal(tc.expected, translated.Task().Engine)
+		})
+	}
+}
+
+func (s *TranslationTestSuite) TestTranslateWithInvalidEngine() {
+	job := &models.Job{
+		ID: "invalid_engine",
+		Tasks: []*models.Task{
+			{
+				Name: "task1",
+				Engine: &models.SpecConfig{
+					Type: "invalid",
+				},
+			},
+		},
+	}
+
+	_, err := translation.Translate(s.ctx, s.provider, job)
+	s.Require().Error(err)
+}
+
+// A translatable task without a Command must produce an error, not a panic.
+func (s *TranslationTestSuite) TestTranslateWithMissingParameters() {
+	job := &models.Job{
+		ID: "missing_parameters",
+		Tasks: []*models.Task{
+			{
+				Name: "task1",
+				Engine: &models.SpecConfig{
+					Type: "python",
+				},
+			},
+		},
+	}
+
+	_, err := translation.Translate(s.ctx, s.provider, job)
+	s.Require().Error(err)
+}
+
+func (s *TranslationTestSuite) TestTranslateWithDefaultEngine() {
+	job := &models.Job{
+		ID: "default_engine",
+		Tasks: []*models.Task{
+			{
+				Name: "task1",
+				Engine: &models.SpecConfig{
+					Type: "docker",
+				},
+			},
+		},
+	}
+
+	translated, err := translation.Translate(s.ctx, s.provider, job)
+	s.Require().NoError(err)
+	s.Require().Nil(translated)
+}
+
+func (s *TranslationTestSuite) TestTranslateWithMixedEngines() {
+	job := &models.Job{
+		ID: "mixed_engines",
+		Tasks: []*models.Task{
+			{
+				Name: "task1",
+				Engine: &models.SpecConfig{
+					Type: "docker",
+				},
+			},
+			{
+				Name: "task2",
+				Engine: &models.SpecConfig{
+					Type: "duckdb",
+					Params: map[string]interface{}{
+						"Command":   "duckdb",
+						"Arguments": []interface{}{"-csv", "-c", "select * from table;"},
+					},
+				},
+			},
+		},
+	}
+
+	translated, err := translation.Translate(s.ctx, s.provider, job)
+	s.Require().NoError(err)
+	s.Require().NotNil(translated)
+
+	// Before
+	s.Require().Equal("docker", job.Tasks[0].Engine.Type)
+	s.Require().Equal("duckdb", job.Tasks[1].Engine.Type)
+
+	// After
+	s.Require().Equal("docker", translated.Tasks[0].Engine.Type)
+	s.Require().Equal("docker", translated.Tasks[1].Engine.Type)
+}
+
+func (s *TranslationTestSuite) TestShouldTranslateWithDefaultEngine() {
+	tasks := []*models.Task{
+		{
+			Name: "task1",
+			Engine: &models.SpecConfig{
+				Type: "docker",
+			},
+		},
+	}
+
+	should, err := translation.ShouldTranslate(s.ctx, s.provider, tasks)
+	s.Require().NoError(err)
+	s.Require().False(should)
+}
+
+func (s *TranslationTestSuite) TestShouldTranslateWithNonDefaultEngine() {
+	tasks := []*models.Task{
+		{
+			Name: "task1",
+			Engine: &models.SpecConfig{
+				Type: "python",
+			},
+		},
+	}
+
+	should, err := translation.ShouldTranslate(s.ctx, s.provider, tasks)
+	s.Require().NoError(err)
+	s.Require().True(should)
+}
diff --git a/pkg/translation/translators/duckdb.go b/pkg/translation/translators/duckdb.go
new file mode 100644
index 0000000000..fec6c9f785
--- /dev/null
+++ b/pkg/translation/translators/duckdb.go
@@ -0,0 +1,70 @@
+package translators
+
+import (
+	"context"
+
+	"github.com/bacalhau-project/bacalhau/pkg/models"
+	"github.com/bacalhau-project/bacalhau/pkg/util"
+)
+
+// DuckDBImage is the docker image used to execute translated duckdb tasks.
+const DuckDBImage = "bacalhauproject/exec-duckdb:0.1"
+
+// DuckDBTranslator rewrites tasks with a `duckdb` engine into docker tasks
+// that run DuckDBImage.
+type DuckDBTranslator struct{}
+
+// IsInstalled always reports true.
+func (d *DuckDBTranslator) IsInstalled(context.Context) (bool, error) {
+	return true, nil
+}
+
+// Translate builds a task from the original with its engine replaced by the
+// docker equivalent and with metadata recording which translator produced it.
+func (d *DuckDBTranslator) Translate(original *models.Task) (*models.Task, error) {
+	dkrSpec, err := d.dockerEngine(original.Engine)
+	if err != nil {
+		return nil, err
+	}
+
+	builder := original.
+		ToBuilder().
+		Meta(models.MetaTranslatedBy, "translators/duckdb").
+		Engine(dkrSpec)
+
+	return builder.BuildOrDie(), nil
+}
+
+// dockerEngine converts the `Command` and `Arguments` parameters of the
+// original engine spec into a docker engine spec. It returns an error, rather
+// than panicking, when the spec is absent or `Command` is not a string.
+func (d *DuckDBTranslator) dockerEngine(origin *models.SpecConfig) (*models.SpecConfig, error) {
+	// It'd be nice to use pkg/executor/docker/types/EngineSpec here, but it
+	// would mean adding a dependency on yet another package.
+	if origin == nil {
+		return nil, ErrMissingParameters("duckdb")
+	}
+
+	cmd, ok := origin.Params["Command"].(string)
+	if !ok {
+		return nil, ErrMissingParameters("duckdb")
+	}
+
+	args, err := util.InterfaceToStringArray(origin.Params["Arguments"])
+	if err != nil {
+		return nil, err
+	}
+
+	params := make([]string, 0, len(args)+1)
+	params = append(params, cmd)
+	params = append(params, args...)
+
+	spec := models.NewSpecConfig(models.EngineDocker)
+	spec.Params["Image"] = DuckDBImage
+	spec.Params["Entrypoint"] = []string{}
+	spec.Params["Parameters"] = params
+	spec.Params["EnvironmentVariables"] = []string{}
+	spec.Params["WorkingDirectory"] = ""
+
+	return spec, nil
+}
diff --git a/pkg/translation/translators/errors.go b/pkg/translation/translators/errors.go
new file mode 100644
index 0000000000..fa50464ca7
--- /dev/null
+++ b/pkg/translation/translators/errors.go
@@ -0,0 +1,9 @@
+package translators
+
+import "fmt"
+
+// ErrMissingParameters returns an error reporting that the task handed to the
+// named translator lacks the parameters needed to translate it.
+func ErrMissingParameters(trs string) error {
+	return fmt.Errorf("missing parameters in task for '%s' translator", trs)
+}
diff --git a/pkg/translation/translators/python.go b/pkg/translation/translators/python.go
new file mode 100644
index 0000000000..6ad4d27a52
--- /dev/null
+++ b/pkg/translation/translators/python.go
@@ -0,0 +1,131 @@
+package translators
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+
+	"github.com/bacalhau-project/bacalhau/pkg/models"
+	"github.com/bacalhau-project/bacalhau/pkg/util"
+	"golang.org/x/exp/maps"
+)
+
+// PythonPackageDomains lists all of the domains that might be needed to install
+// dependencies at runtime.
+var PythonPackageDomains = []string{
+	"pypi.python.org",
+	"pypi.org",
+	"pythonhosted.org",
+	"files.pythonhosted.org",
+	"repo.anaconda.com",
+	"repo.continuum.io",
+	"conda.anaconda.org",
+}
+
+// SupportedPythonVersions maps the python version to the docker image that
+// provides support for that version.
+var SupportedPythonVersions = map[string]string{
+	"3.11": "bacalhauproject/exec-python-3.11:0.5",
+}
+
+// PythonTranslator rewrites tasks with a `python` engine into docker tasks
+// that run one of the images in SupportedPythonVersions.
+type PythonTranslator struct{}
+
+// IsInstalled always reports true.
+func (p *PythonTranslator) IsInstalled(context.Context) (bool, error) {
+	return true, nil
+}
+
+// Translate builds a task from the original with its engine replaced by the
+// docker equivalent, with metadata recording which translator produced it and
+// with an HTTP network configuration listing PythonPackageDomains.
+func (p *PythonTranslator) Translate(original *models.Task) (*models.Task, error) {
+	dkrSpec, err := p.dockerEngine(original.Engine)
+	if err != nil {
+		return nil, err
+	}
+
+	// Configure the network before deriving the builder so that the built
+	// task carries it whether ToBuilder aliases or copies the original. The
+	// domains are copied so that tasks never share the package-level slice.
+	original.Network = &models.NetworkConfig{
+		Type:    models.NetworkHTTP,
+		Domains: append([]string(nil), PythonPackageDomains...),
+	}
+
+	builder := original.
+		ToBuilder().
+		Meta(models.MetaTranslatedBy, "translators/python").
+		Engine(dkrSpec)
+
+	return builder.BuildOrDie(), nil
+}
+
+// dockerEngine converts the `Command`, `Arguments` and optional `Version`
+// parameters of the original engine spec into a docker engine spec that runs
+// the command through /build/launcher.py. It returns an error, rather than
+// panicking, when the spec is absent, `Command` is not a string, or `Version`
+// is not a supported version string.
+func (p *PythonTranslator) dockerEngine(origin *models.SpecConfig) (*models.SpecConfig, error) {
+	// It'd be nice to use pkg/executor/docker/types/EngineSpec here, but it
+	// would mean adding a dependency on yet another package.
+	if origin == nil {
+		return nil, ErrMissingParameters("python")
+	}
+
+	cmd, ok := origin.Params["Command"].(string)
+	if !ok {
+		return nil, ErrMissingParameters("python")
+	}
+
+	args, err := util.InterfaceToStringArray(origin.Params["Arguments"])
+	if err != nil {
+		return nil, err
+	}
+
+	versionString := "3.11" // Default version
+	if version := origin.Params["Version"]; version != nil {
+		versionString, ok = version.(string)
+		if !ok {
+			return nil, fmt.Errorf("python version must be a string but got %T", version)
+		}
+	}
+
+	image, err := getImageName(versionString)
+	if err != nil {
+		return nil, err
+	}
+
+	params := make([]string, 0, len(args)+3)
+	params = append(params, "/build/launcher.py", "--", cmd)
+	params = append(params, args...)
+
+	spec := models.NewSpecConfig(models.EngineDocker)
+	spec.Params["Image"] = image
+	spec.Params["Entrypoint"] = []string{}
+	spec.Params["Parameters"] = params
+	spec.Params["EnvironmentVariables"] = []string{}
+	spec.Params["WorkingDirectory"] = ""
+
+	return spec, nil
+}
+
+// getImageName returns the docker image registered for the given python
+// version, or an error listing the supported versions when there is none.
+func getImageName(version string) (string, error) {
+	if image, found := SupportedPythonVersions[version]; found {
+		return image, nil
+	}
+
+	// Map iteration order is random, so sort to keep the message stable.
+	versions := maps.Keys(SupportedPythonVersions)
+	sort.Strings(versions)
+
+	var supported strings.Builder
+	for _, v := range versions {
+		fmt.Fprintf(&supported, " * %s\n", v)
+	}
+	return "", fmt.Errorf("unsupported python version: %s\nsupported versions are:\n%s", version, supported.String())
+}
diff --git a/pkg/util/conversion.go b/pkg/util/conversion.go
new file mode 100644
index 0000000000..216f30e440
--- /dev/null
+++ b/pkg/util/conversion.go
@@ -0,0 +1,28 @@
+package util
+
+import "fmt"
+
+// InterfaceToStringArray converts a value taken out of a
+// map[string]interface{} into a []string.
+//
+// A nil source yields a nil slice. A []string is returned as a copy, and a
+// []interface{} has each of its elements formatted with fmt.Sprint. Any other
+// type results in an error.
+func InterfaceToStringArray(source interface{}) ([]string, error) {
+	switch typed := source.(type) {
+	case nil:
+		return nil, nil
+	case []string:
+		// Already the right type, but copy it so that the result never
+		// shares its backing array with the slice held by the caller.
+		return append([]string{}, typed...), nil
+	case []interface{}:
+		result := make([]string, len(typed))
+		for i, v := range typed {
+			result[i] = fmt.Sprint(v)
+		}
+		return result, nil
+	default:
+		return nil, fmt.Errorf("expected []string or []interface{} but got %T", source)
+	}
+}
diff --git a/pkg/util/conversion_test.go b/pkg/util/conversion_test.go
new file mode 100644
index 0000000000..1a3ec4f735
--- /dev/null
+++ b/pkg/util/conversion_test.go
@@ -0,0 +1,99 @@
+//go:build unit || !integration
+
+package util_test
+
+import (
+	"testing"
+
+	"github.com/bacalhau-project/bacalhau/pkg/util"
+	"github.com/stretchr/testify/require"
+)
+
+func TestInterfaceToStringArray(t *testing.T) {
+	testcases := []struct {
+		name        string
+		source      interface{}
+		expected    []string
+		shouldError bool
+	}{
+		{
+			name:        "nil",
+			source:      nil,
+			expected:    nil,
+			shouldError: false,
+		},
+		{
+			name:        "empty",
+			source:      []interface{}{},
+			expected:    []string{},
+			shouldError: false,
+		},
+		{
+			name:        "string",
+			source:      []interface{}{"foo"},
+			expected:    []string{"foo"},
+			shouldError: false,
+		},
+		{
+			name:        "int",
+			source:      []interface{}{1},
+			expected:    []string{"1"},
+			shouldError: false,
+		},
+		{
+			name:        "float",
+			source:      []interface{}{1.1},
+			expected:    []string{"1.1"},
+			shouldError: false,
+		},
+		{
+			name:        "bool",
+			source:      []interface{}{true},
+			expected:    []string{"true"},
+			shouldError: false,
+		},
+		{
+			name:        "mixed",
+			source:      []interface{}{"foo", 1, 1.1, true},
+			expected:    []string{"foo", "1", "1.1", "true"},
+			shouldError: false,
+		},
+		{
+			name:        "map",
+			source:      map[string]interface{}{"foo": "bar"},
+			expected:    nil,
+			shouldError: true,
+		},
+		{
+			name:        "string array",
+			source:      []interface{}{"foo", "bar"},
+			expected:    []string{"foo", "bar"},
+			shouldError: false,
+		},
+		{
+			name:        "typed string array",
+			source:      []string{"foo", "bar"},
+			expected:    []string{"foo", "bar"},
+			shouldError: false,
+		},
+		{
+			name:        "int array",
+			source:      []interface{}{1, 2},
+			expected:    []string{"1", "2"},
+			shouldError: false,
+		},
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			actual, err := util.InterfaceToStringArray(tc.source)
+			if tc.shouldError {
+				require.Error(t, err)
+				return
+			}
+
+			require.NoError(t, err)
+			require.Equal(t, tc.expected, actual)
+		})
+	}
+}