diff --git a/Makefile b/Makefile index beab2c26cf..54da79acc6 100644 --- a/Makefile +++ b/Makefile @@ -185,10 +185,10 @@ release-bacalhau-flyte: resolve-earthly # Target: build ################################################################################ .PHONY: build -build: resolve-earthly build-bacalhau build-plugins +build: resolve-earthly build-bacalhau .PHONY: build-ci -build-ci: build-bacalhau install-plugins +build-ci: build-bacalhau .PHONY: build-dev build-dev: build-ci @@ -208,7 +208,7 @@ build-webui: resolve-earthly ################################################################################ # Target: build-bacalhau ################################################################################ -${BINARY_PATH}: build-bacalhau build-plugins +${BINARY_PATH}: build-bacalhau .PHONY: build-bacalhau build-bacalhau: binary-web binary @@ -306,7 +306,7 @@ images: docker/.pulled # Target: clean ################################################################################ .PHONY: clean -clean: clean-plugins +clean: ${GO} clean ${RM} -r bin/* ${RM} -r webui/build/* @@ -385,14 +385,6 @@ devstack-250: devstack-20: go run . devstack --compute-nodes 20 -.PHONY: devstack-noop -devstack-noop: - go run . devstack --noop - -.PHONY: devstack-noop-100 -devstack-noop-100: - go run . devstack --noop --compute-nodes 100 - .PHONY: devstack-race devstack-race: go run -race . devstack @@ -476,46 +468,6 @@ security: release: build-bacalhau cp bin/bacalhau . -ifeq ($(OS),Windows_NT) - detected_OS := Windows -else - detected_OS := $(shell sh -c 'uname 2>/dev/null || echo Unknown') -endif - -# TODO make the plugin path configurable instead of using the bacalhau config path. -BACALHAU_CONFIG_PATH := $(shell echo $$BACALHAU_PATH) -INSTALL_PLUGINS_DEST := $(if $(BACALHAU_CONFIG_PATH),$(BACALHAU_CONFIG_PATH)plugins/,~/.bacalhau/plugins/) - -EXECUTOR_PLUGINS := $(wildcard ./pkg/executor/plugins/executors/*/.) - -# TODO fix install on windows -ifeq ($(detected_OS),Windows) - build-plugins clean-plugins install-plugins: - @echo "Skipping executor plugins on Windows" -else - build-plugins: plugins-build - clean-plugins: plugins-clean - install-plugins: plugins-install - - .PHONY: plugins-build $(EXECUTOR_PLUGINS) - - plugins-build: $(EXECUTOR_PLUGINS) - @echo "Building executor plugins..." - @$(foreach plugin,$(EXECUTOR_PLUGINS),$(MAKE) --no-print-directory -C $(plugin) &&) true - - .PHONY: plugins-clean $(addsuffix .clean,$(EXECUTOR_PLUGINS)) - - plugins-clean: $(addsuffix .clean,$(EXECUTOR_PLUGINS)) - @echo "Cleaning executor plugins..." - @$(foreach plugin,$(addsuffix .clean,$(EXECUTOR_PLUGINS)),$(MAKE) --no-print-directory -C $(basename $(plugin)) clean &&) true - - .PHONY: plugins-install $(addsuffix .install,$(EXECUTOR_PLUGINS)) - - plugins-install: plugins-build $(addsuffix .install,$(EXECUTOR_PLUGINS)) - @echo "Installing executor plugins..." - @$(foreach plugin,$(addsuffix .install,$(EXECUTOR_PLUGINS)),mkdir -p $(INSTALL_PLUGINS_DEST) && cp $(basename $(plugin))/bin/* $(INSTALL_PLUGINS_DEST) &&) true -endif - .PHONY: spellcheck-code spellcheck-code: cspell lint -c cspell.yaml --quiet "**/*.{go,js,ts,jsx,tsx,md,yml,yaml,json}" diff --git a/go.mod b/go.mod index 8ca5ac5afd..6be2986261 100644 --- a/go.mod +++ b/go.mod @@ -22,8 +22,6 @@ require ( github.com/golang-jwt/jwt v3.2.2+incompatible github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.1 - github.com/hashicorp/go-hclog v1.6.3 - github.com/hashicorp/go-plugin v1.6.0 github.com/hashicorp/go-retryablehttp v0.7.7 github.com/imdario/mergo v0.3.16 github.com/ipfs/boxo v0.18.0 @@ -120,7 +118,6 @@ require ( github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect github.com/gorilla/mux v1.8.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/hashicorp/yamux v0.1.1 // indirect github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -133,14 +130,12 @@ require ( github.com/lestrrat-go/option v1.0.1 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/minio/highwayhash v1.0.3 // indirect - github.com/mitchellh/go-testing-interface v1.0.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect github.com/multiformats/go-multihash v0.2.3 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/nats-io/jwt/v2 v2.7.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect - github.com/oklog/run v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect @@ -188,7 +183,6 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect @@ -279,7 +273,7 @@ require ( golang.org/x/time v0.6.0 golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect - google.golang.org/grpc v1.66.1 + google.golang.org/grpc v1.66.1 // indirect google.golang.org/protobuf v1.34.2 gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index 30f730af03..f908e81593 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,6 @@ github.com/bmatcuk/doublestar/v4 v4.6.1 h1:FH9SifrbvJhnlQpztAx++wlkk70QBf0iBWDwN github.com/bmatcuk/doublestar/v4 v4.6.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= -github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= -github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8= github.com/bytecodealliance/wasmtime-go/v3 v3.0.2 h1:3uZCA/BLTIu+DqCfguByNMJa2HVHpXvjfy0Dy7g6fuA= github.com/bytecodealliance/wasmtime-go/v3 v3.0.2/go.mod h1:RnUjnIXxEJcL6BgCvNyzCCRzZcxCgsZCi+RNlvYor5Q= github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b h1:6+ZFm0flnudZzdSE0JxlhR2hKnGPcNB35BjQf4RYQDY= @@ -683,8 +681,6 @@ github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB1 github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= -github.com/hashicorp/go-plugin v1.6.0 h1:wgd4KxHJTVGGqWBq4QPB1i5BZNEx9BR8+OFmHDmTk8A= -github.com/hashicorp/go-plugin v1.6.0/go.mod h1:lBS5MtSSBZk0SHc66KACcjjlU6WzEVP/8pwz68aMkCI= github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -695,8 +691,6 @@ github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= -github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= @@ -814,8 +808,6 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0 github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jedib0t/go-pretty/v6 v6.5.3 h1:GIXn6Er/anHTkVUoufs7ptEvxdD6KIhR7Axa2wYCPF0= github.com/jedib0t/go-pretty/v6 v6.5.3/go.mod h1:5LQIxa52oJ/DlDSLv0HEkWOFMDGoWkJb9ss5KqPpJBg= -github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= -github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -926,7 +918,6 @@ github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8 github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= @@ -957,8 +948,6 @@ github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dz github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/mitchellh/go-testing-interface v1.0.0 h1:fzU/JVNcaqHQEcVFAKeR41fkiLdIPrefOvVG1VZ96U0= -github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= @@ -1024,8 +1013,6 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= -github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY= github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM= github.com/open-policy-agent/opa v0.60.0 h1:ZPoPt4yeNs5UXCpd/P/btpSyR8CR0wfhVoh9BOwgJNs= @@ -1178,7 +1165,6 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -1612,7 +1598,6 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/pkg/config/types/paths.go b/pkg/config/types/paths.go index f74c4ccc55..c4e773edbf 100644 --- a/pkg/config/types/paths.go +++ b/pkg/config/types/paths.go @@ -110,19 +110,6 @@ func (b Bacalhau) ResultsStorageDir() (string, error) { return path, nil } -const PluginsDirName = "plugins" - -func (b Bacalhau) PluginsDir() (string, error) { - if b.DataDir == "" { - return "", fmt.Errorf("data dir not set") - } - path := filepath.Join(b.DataDir, PluginsDirName) - if err := ensureDir(path); err != nil { - return "", fmt.Errorf("getting plugins path: %w", err) - } - return path, nil -} - const ExecutionStoreFileName = "state_boltdb.db" func (b Bacalhau) ExecutionStoreFilePath() (string, error) { diff --git a/pkg/executor/plugins/executors/docker/Makefile b/pkg/executor/plugins/executors/docker/Makefile deleted file mode 100644 index 84bcf56356..0000000000 --- a/pkg/executor/plugins/executors/docker/Makefile +++ /dev/null @@ -1,5 +0,0 @@ -docker: main.go - go build -o ./bin/bacalhau-docker-executor main.go - -clean: - rm -f ./bin/bacalhau-docker-executor diff --git a/pkg/executor/plugins/executors/docker/bin/.gitignore b/pkg/executor/plugins/executors/docker/bin/.gitignore deleted file mode 100644 index c538edc3e7..0000000000 --- a/pkg/executor/plugins/executors/docker/bin/.gitignore +++ /dev/null @@ -1 +0,0 @@ -bacalhau-docker-executor diff --git a/pkg/executor/plugins/executors/docker/bin/.keep b/pkg/executor/plugins/executors/docker/bin/.keep deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/pkg/executor/plugins/executors/docker/main.go b/pkg/executor/plugins/executors/docker/main.go deleted file mode 100644 index b560e1b5f7..0000000000 --- a/pkg/executor/plugins/executors/docker/main.go +++ /dev/null @@ -1,57 +0,0 @@ -package main - -import ( - "os" - "time" - - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" - - "github.com/bacalhau-project/bacalhau/pkg/config/types" - "github.com/bacalhau-project/bacalhau/pkg/executor/docker" - "github.com/bacalhau-project/bacalhau/pkg/executor/plugins/grpc" -) - -const PluggableExecutorPluginName = "PLUGGABLE_EXECUTOR" - -// HandshakeConfig is used to just do a basic handshake between -// a plugin and host. If the handshake fails, a user friendly error is shown. -// This prevents users from executing bad plugins or executing a plugin -// directory. It is a UX feature, not a security feature. -var HandshakeConfig = plugin.HandshakeConfig{ - ProtocolVersion: 1, - MagicCookieKey: "EXECUTOR_PLUGIN", - MagicCookieValue: "bacalhau_executor", -} - -func main() { // Create an hclog.Logger - logger := hclog.New(&hclog.LoggerOptions{ - Name: "docker-plugin", - Output: os.Stderr, - Level: hclog.Trace, - }) - - cfg := types.DockerManifestCache{ - Size: 1000, - TTL: types.Duration(1 * time.Hour), - Refresh: types.Duration(1 * time.Hour), - } - dockerExecutor, err := docker.NewExecutor( - "bacalhau-pluggable-executor-docker", - types.Docker{ManifestCache: cfg}, - ) - if err != nil { - logger.Error(err.Error()) - } - - plugin.Serve(&plugin.ServeConfig{ - HandshakeConfig: HandshakeConfig, - Plugins: map[string]plugin.Plugin{ - PluggableExecutorPluginName: &grpc.ExecutorGRPCPlugin{ - Impl: dockerExecutor, - }, - }, - Logger: logger, - GRPCServer: plugin.DefaultGRPCServer, - }) -} diff --git a/pkg/executor/plugins/executors/wasm/Makefile b/pkg/executor/plugins/executors/wasm/Makefile deleted file mode 100644 index f62172bbc6..0000000000 --- a/pkg/executor/plugins/executors/wasm/Makefile +++ /dev/null @@ -1,5 +0,0 @@ -wasm: main.go - go build -o ./bin/bacalhau-wasm-executor main.go - -clean: - rm -f ./bin/bacalhau-wasm-executor diff --git a/pkg/executor/plugins/executors/wasm/bin/.gitignore b/pkg/executor/plugins/executors/wasm/bin/.gitignore deleted file mode 100644 index 7f34dc1a2c..0000000000 --- a/pkg/executor/plugins/executors/wasm/bin/.gitignore +++ /dev/null @@ -1 +0,0 @@ -bacalhau-wasm-executor diff --git a/pkg/executor/plugins/executors/wasm/bin/.keep b/pkg/executor/plugins/executors/wasm/bin/.keep deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/pkg/executor/plugins/executors/wasm/main.go b/pkg/executor/plugins/executors/wasm/main.go deleted file mode 100644 index 2712113123..0000000000 --- a/pkg/executor/plugins/executors/wasm/main.go +++ /dev/null @@ -1,47 +0,0 @@ -package main - -import ( - "log" - "os" - - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" - - "github.com/bacalhau-project/bacalhau/pkg/executor/plugins/grpc" - "github.com/bacalhau-project/bacalhau/pkg/executor/wasm" -) - -const PluggableExecutorPluginName = "PLUGGABLE_EXECUTOR" - -// HandshakeConfig is used to just do a basic handshake between -// a plugin and host. If the handshake fails, a user friendly error is shown. -// This prevents users from executing bad plugins or executing a plugin -// directory. It is a UX feature, not a security feature. -var HandshakeConfig = plugin.HandshakeConfig{ - ProtocolVersion: 1, - MagicCookieKey: "EXECUTOR_PLUGIN", - MagicCookieValue: "bacalhau_executor", -} - -func main() { - logger := hclog.New(&hclog.LoggerOptions{ - Name: "wasm-plugin", - Output: os.Stderr, - Level: hclog.Trace, - }) - wasmExecutor, err := wasm.NewExecutor() - if err != nil { - log.Fatal(err) - } - - plugin.Serve(&plugin.ServeConfig{ - HandshakeConfig: HandshakeConfig, - Plugins: map[string]plugin.Plugin{ - PluggableExecutorPluginName: &grpc.ExecutorGRPCPlugin{ - Impl: wasmExecutor, - }, - }, - Logger: logger, - GRPCServer: plugin.DefaultGRPCServer, - }) -} diff --git a/pkg/executor/plugins/grpc/client.go b/pkg/executor/plugins/grpc/client.go deleted file mode 100644 index 1e843ac62d..0000000000 --- a/pkg/executor/plugins/grpc/client.go +++ /dev/null @@ -1,192 +0,0 @@ -package grpc - -import ( - "context" - "encoding/json" - "io" - - "github.com/bacalhau-project/bacalhau/pkg/bidstrategy" - "github.com/bacalhau-project/bacalhau/pkg/executor" - "github.com/bacalhau-project/bacalhau/pkg/executor/plugins/grpc/proto" - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// TODO: Complete protobuf structure, rather than merely wrapping serialized JSON bytes in protobuf containers. -// Details in: https://github.com/bacalhau-project/bacalhau/issues/2700 - -var _ (executor.Executor) = (*GRPCClient)(nil) - -type GRPCClient struct { - client proto.ExecutorClient -} - -func (c *GRPCClient) IsInstalled(ctx context.Context) (bool, error) { - resp, err := c.client.IsInstalled(ctx, &proto.IsInstalledRequest{}) - if err != nil { - return false, err - } - return resp.Installed, nil -} - -func (c *GRPCClient) ShouldBid(ctx context.Context, request bidstrategy.BidStrategyRequest) (bidstrategy.BidStrategyResponse, error) { - b, err := json.Marshal(request) - if err != nil { - return bidstrategy.BidStrategyResponse{}, err - } - resp, err := c.client.ShouldBid(ctx, &proto.ShouldBidRequest{ - BidRequest: b, - }) - if err != nil { - return bidstrategy.BidStrategyResponse{}, err - } - var out bidstrategy.BidStrategyResponse - if err := json.Unmarshal(resp.BidResponse, &out); err != nil { - return bidstrategy.BidStrategyResponse{}, nil - } - return out, nil -} - -func (c *GRPCClient) ShouldBidBasedOnUsage( - ctx context.Context, - request bidstrategy.BidStrategyRequest, - usage models.Resources) (bidstrategy.BidStrategyResponse, error) { - reqBytes, err := json.Marshal(request) - if err != nil { - return bidstrategy.BidStrategyResponse{}, err - } - usageBytes, err := json.Marshal(usage) - if err != nil { - return bidstrategy.BidStrategyResponse{}, err - } - resp, err := c.client.ShouldBidBasedOnUsage(ctx, &proto.ShouldBidBasedOnUsageRequest{ - BidRequest: reqBytes, - Usage: usageBytes, - }) - if err != nil { - return bidstrategy.BidStrategyResponse{}, err - } - var out bidstrategy.BidStrategyResponse - if err := json.Unmarshal(resp.BidResponse, &out); err != nil { - return bidstrategy.BidStrategyResponse{}, nil - } - return out, nil -} - -func (c *GRPCClient) Run(ctx context.Context, args *executor.RunCommandRequest) (*models.RunCommandResult, error) { - b, err := json.Marshal(args) - if err != nil { - return nil, err - } - resp, err := c.client.Run(ctx, &proto.RunCommandRequest{Params: b}) - if err != nil { - return nil, err - } - out := new(models.RunCommandResult) - if err := json.Unmarshal(resp.Params, out); err != nil { - return nil, err - } - return out, nil -} - -func (c *GRPCClient) Start(ctx context.Context, request *executor.RunCommandRequest) error { - b, err := json.Marshal(request) - if err != nil { - return err - } - _, err = c.client.Start(ctx, &proto.RunCommandRequest{Params: b}) - if err != nil { - return err - } - - return nil -} - -func (c *GRPCClient) Wait(ctx context.Context, executionID string) (<-chan *models.RunCommandResult, <-chan error) { - // Create output and error channels - resultC := make(chan *models.RunCommandResult, 1) - errC := make(chan error, 1) - - // Initialize the WaitRequest - waitReq := &proto.WaitRequest{ - ExecutionID: executionID, - } - - // Make a server-streaming RPC call - stream, err := c.client.Wait(ctx, waitReq) - if err != nil { - errC <- err - return resultC, errC - } - - go func() { - defer close(resultC) - defer close(errC) - - // block until we receive a message from the stream or an error. - resp, err := stream.Recv() - if err != nil { - errC <- err - return - } - - // Convert proto.WaitResponse to models.RunCommandResult - out := new(models.RunCommandResult) - if err := json.Unmarshal(resp.Params, out); err != nil { - errC <- err - return - } - - // Send the result to the channel - resultC <- out - }() - - return resultC, errC -} - -func (c *GRPCClient) Cancel(ctx context.Context, id string) error { - _, err := c.client.Cancel(ctx, &proto.CancelCommandRequest{ExecutionID: id}) - if err != nil { - return err - } - return nil -} - -func (c *GRPCClient) GetLogStream(ctx context.Context, request executor.LogStreamRequest) (io.ReadCloser, error) { - respStream, err := c.client.GetOutputStream(ctx, &proto.OutputStreamRequest{ - ExecutionID: request.ExecutionID, - History: request.Tail, - Follow: request.Follow, - }) - if err != nil { - return nil, err - } - - return &StreamReader{stream: respStream}, nil -} - -type StreamReader struct { - stream proto.Executor_GetOutputStreamClient - buffer []byte -} - -func (sr *StreamReader) Read(p []byte) (n int, err error) { - if len(sr.buffer) == 0 { // if buffer is empty, fill it by reading from the stream - response, err := sr.stream.Recv() - if err != nil { - if err == io.EOF { - return 0, nil - } - return 0, err - } - sr.buffer = response.Data - } - - n = copy(p, sr.buffer) // copy from buffer to p - sr.buffer = sr.buffer[n:] // update buffer - - return n, nil -} - -func (sr *StreamReader) Close() error { - return sr.stream.CloseSend() -} diff --git a/pkg/executor/plugins/grpc/interface.go b/pkg/executor/plugins/grpc/interface.go deleted file mode 100644 index d735597feb..0000000000 --- a/pkg/executor/plugins/grpc/interface.go +++ /dev/null @@ -1,25 +0,0 @@ -package grpc - -import ( - "context" - - "github.com/hashicorp/go-plugin" - "google.golang.org/grpc" - - "github.com/bacalhau-project/bacalhau/pkg/executor" - "github.com/bacalhau-project/bacalhau/pkg/executor/plugins/grpc/proto" -) - -type ExecutorGRPCPlugin struct { - plugin.Plugin - Impl executor.Executor -} - -func (p *ExecutorGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { - proto.RegisterExecutorServer(s, &GRPCServer{Impl: p.Impl}) - return nil -} - -func (p *ExecutorGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { - return &GRPCClient{client: proto.NewExecutorClient(c)}, nil -} diff --git a/pkg/executor/plugins/grpc/proto/Makefile b/pkg/executor/plugins/grpc/proto/Makefile deleted file mode 100644 index 94852376ca..0000000000 --- a/pkg/executor/plugins/grpc/proto/Makefile +++ /dev/null @@ -1,4 +0,0 @@ -.PHONY: all -all: executor.proto - @echo "Done elsewhere" - # protoc --go_out=plugins=grpc:. executor.proto diff --git a/pkg/executor/plugins/grpc/proto/executor.pb.go b/pkg/executor/plugins/grpc/proto/executor.pb.go deleted file mode 100644 index c28e0497c7..0000000000 --- a/pkg/executor/plugins/grpc/proto/executor.pb.go +++ /dev/null @@ -1,951 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.1 -// protoc v4.24.3 -// source: executor.proto - -package proto - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type StartResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *StartResponse) Reset() { - *x = StartResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *StartResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StartResponse) ProtoMessage() {} - -func (x *StartResponse) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StartResponse.ProtoReflect.Descriptor instead. -func (*StartResponse) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{0} -} - -type RunCommandRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Params []byte `protobuf:"bytes,1,opt,name=Params,proto3" json:"Params,omitempty"` -} - -func (x *RunCommandRequest) Reset() { - *x = RunCommandRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *RunCommandRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*RunCommandRequest) ProtoMessage() {} - -func (x *RunCommandRequest) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use RunCommandRequest.ProtoReflect.Descriptor instead. -func (*RunCommandRequest) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{1} -} - -func (x *RunCommandRequest) GetParams() []byte { - if x != nil { - return x.Params - } - return nil -} - -type RunCommandResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Params []byte `protobuf:"bytes,1,opt,name=Params,proto3" json:"Params,omitempty"` -} - -func (x *RunCommandResponse) Reset() { - *x = RunCommandResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *RunCommandResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*RunCommandResponse) ProtoMessage() {} - -func (x *RunCommandResponse) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use RunCommandResponse.ProtoReflect.Descriptor instead. -func (*RunCommandResponse) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{2} -} - -func (x *RunCommandResponse) GetParams() []byte { - if x != nil { - return x.Params - } - return nil -} - -type CancelCommandRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ExecutionID string `protobuf:"bytes,1,opt,name=ExecutionID,proto3" json:"ExecutionID,omitempty"` -} - -func (x *CancelCommandRequest) Reset() { - *x = CancelCommandRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *CancelCommandRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*CancelCommandRequest) ProtoMessage() {} - -func (x *CancelCommandRequest) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use CancelCommandRequest.ProtoReflect.Descriptor instead. -func (*CancelCommandRequest) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{3} -} - -func (x *CancelCommandRequest) GetExecutionID() string { - if x != nil { - return x.ExecutionID - } - return "" -} - -type CancelCommandResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *CancelCommandResponse) Reset() { - *x = CancelCommandResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *CancelCommandResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*CancelCommandResponse) ProtoMessage() {} - -func (x *CancelCommandResponse) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use CancelCommandResponse.ProtoReflect.Descriptor instead. -func (*CancelCommandResponse) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{4} -} - -type IsInstalledRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *IsInstalledRequest) Reset() { - *x = IsInstalledRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *IsInstalledRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*IsInstalledRequest) ProtoMessage() {} - -func (x *IsInstalledRequest) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[5] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use IsInstalledRequest.ProtoReflect.Descriptor instead. -func (*IsInstalledRequest) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{5} -} - -type IsInstalledResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Installed bool `protobuf:"varint,1,opt,name=Installed,proto3" json:"Installed,omitempty"` -} - -func (x *IsInstalledResponse) Reset() { - *x = IsInstalledResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[6] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *IsInstalledResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*IsInstalledResponse) ProtoMessage() {} - -func (x *IsInstalledResponse) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[6] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use IsInstalledResponse.ProtoReflect.Descriptor instead. -func (*IsInstalledResponse) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{6} -} - -func (x *IsInstalledResponse) GetInstalled() bool { - if x != nil { - return x.Installed - } - return false -} - -type ShouldBidRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - BidRequest []byte `protobuf:"bytes,1,opt,name=BidRequest,proto3" json:"BidRequest,omitempty"` -} - -func (x *ShouldBidRequest) Reset() { - *x = ShouldBidRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[7] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ShouldBidRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ShouldBidRequest) ProtoMessage() {} - -func (x *ShouldBidRequest) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[7] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ShouldBidRequest.ProtoReflect.Descriptor instead. -func (*ShouldBidRequest) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{7} -} - -func (x *ShouldBidRequest) GetBidRequest() []byte { - if x != nil { - return x.BidRequest - } - return nil -} - -type ShouldBidBasedOnUsageRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - BidRequest []byte `protobuf:"bytes,1,opt,name=BidRequest,proto3" json:"BidRequest,omitempty"` - Usage []byte `protobuf:"bytes,2,opt,name=Usage,proto3" json:"Usage,omitempty"` -} - -func (x *ShouldBidBasedOnUsageRequest) Reset() { - *x = ShouldBidBasedOnUsageRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ShouldBidBasedOnUsageRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ShouldBidBasedOnUsageRequest) ProtoMessage() {} - -func (x *ShouldBidBasedOnUsageRequest) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[8] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ShouldBidBasedOnUsageRequest.ProtoReflect.Descriptor instead. -func (*ShouldBidBasedOnUsageRequest) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{8} -} - -func (x *ShouldBidBasedOnUsageRequest) GetBidRequest() []byte { - if x != nil { - return x.BidRequest - } - return nil -} - -func (x *ShouldBidBasedOnUsageRequest) GetUsage() []byte { - if x != nil { - return x.Usage - } - return nil -} - -// shared by both semantic and resource bid -type ShouldBidResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - BidResponse []byte `protobuf:"bytes,1,opt,name=BidResponse,proto3" json:"BidResponse,omitempty"` -} - -func (x *ShouldBidResponse) Reset() { - *x = ShouldBidResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[9] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ShouldBidResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ShouldBidResponse) ProtoMessage() {} - -func (x *ShouldBidResponse) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[9] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ShouldBidResponse.ProtoReflect.Descriptor instead. -func (*ShouldBidResponse) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{9} -} - -func (x *ShouldBidResponse) GetBidResponse() []byte { - if x != nil { - return x.BidResponse - } - return nil -} - -type OutputStreamRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ExecutionID string `protobuf:"bytes,1,opt,name=ExecutionID,proto3" json:"ExecutionID,omitempty"` - History bool `protobuf:"varint,2,opt,name=History,proto3" json:"History,omitempty"` - Follow bool `protobuf:"varint,3,opt,name=Follow,proto3" json:"Follow,omitempty"` -} - -func (x *OutputStreamRequest) Reset() { - *x = OutputStreamRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[10] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *OutputStreamRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*OutputStreamRequest) ProtoMessage() {} - -func (x *OutputStreamRequest) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[10] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use OutputStreamRequest.ProtoReflect.Descriptor instead. -func (*OutputStreamRequest) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{10} -} - -func (x *OutputStreamRequest) GetExecutionID() string { - if x != nil { - return x.ExecutionID - } - return "" -} - -func (x *OutputStreamRequest) GetHistory() bool { - if x != nil { - return x.History - } - return false -} - -func (x *OutputStreamRequest) GetFollow() bool { - if x != nil { - return x.Follow - } - return false -} - -type OutputStreamResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Data []byte `protobuf:"bytes,1,opt,name=Data,proto3" json:"Data,omitempty"` -} - -func (x *OutputStreamResponse) Reset() { - *x = OutputStreamResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[11] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *OutputStreamResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*OutputStreamResponse) ProtoMessage() {} - -func (x *OutputStreamResponse) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[11] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use OutputStreamResponse.ProtoReflect.Descriptor instead. -func (*OutputStreamResponse) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{11} -} - -func (x *OutputStreamResponse) GetData() []byte { - if x != nil { - return x.Data - } - return nil -} - -type WaitRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ExecutionID string `protobuf:"bytes,1,opt,name=ExecutionID,proto3" json:"ExecutionID,omitempty"` -} - -func (x *WaitRequest) Reset() { - *x = WaitRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_executor_proto_msgTypes[12] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *WaitRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*WaitRequest) ProtoMessage() {} - -func (x *WaitRequest) ProtoReflect() protoreflect.Message { - mi := &file_executor_proto_msgTypes[12] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use WaitRequest.ProtoReflect.Descriptor instead. -func (*WaitRequest) Descriptor() ([]byte, []int) { - return file_executor_proto_rawDescGZIP(), []int{12} -} - -func (x *WaitRequest) GetExecutionID() string { - if x != nil { - return x.ExecutionID - } - return "" -} - -var File_executor_proto protoreflect.FileDescriptor - -var file_executor_proto_rawDesc = []byte{ - 0x0a, 0x0e, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x0f, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x72, 0x74, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x2b, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43, - 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, - 0x06, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x50, - 0x61, 0x72, 0x61, 0x6d, 0x73, 0x22, 0x2c, 0x0a, 0x12, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, - 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x50, - 0x61, 0x72, 0x61, 0x6d, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x50, 0x61, 0x72, - 0x61, 0x6d, 0x73, 0x22, 0x38, 0x0a, 0x14, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x43, 0x6f, 0x6d, - 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x45, - 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0b, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x22, 0x17, 0x0a, - 0x15, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x14, 0x0a, 0x12, 0x49, 0x73, 0x49, 0x6e, 0x73, 0x74, - 0x61, 0x6c, 0x6c, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x33, 0x0a, 0x13, - 0x49, 0x73, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x65, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x65, - 0x64, 0x22, 0x32, 0x0a, 0x10, 0x53, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x42, 0x69, 0x64, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x42, 0x69, 0x64, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x42, 0x69, 0x64, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x54, 0x0a, 0x1c, 0x53, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x42, - 0x69, 0x64, 0x42, 0x61, 0x73, 0x65, 0x64, 0x4f, 0x6e, 0x55, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x42, 0x69, 0x64, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x42, 0x69, 0x64, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x55, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x55, 0x73, 0x61, 0x67, 0x65, 0x22, 0x35, 0x0a, 0x11, 0x53, - 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x42, 0x69, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x69, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x42, 0x69, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x69, 0x0a, 0x13, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x45, 0x78, 0x65, - 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, - 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x48, - 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x48, 0x69, - 0x73, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x22, 0x2a, 0x0a, - 0x14, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x2f, 0x0a, 0x0b, 0x57, 0x61, 0x69, - 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x45, 0x78, 0x65, 0x63, - 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x45, - 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x32, 0xa9, 0x04, 0x0a, 0x08, 0x45, - 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x12, 0x3a, 0x0a, 0x03, 0x52, 0x75, 0x6e, 0x12, 0x18, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x37, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x18, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, - 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x37, 0x0a, 0x04, - 0x57, 0x61, 0x69, 0x74, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x61, 0x69, - 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x43, 0x0a, 0x06, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x12, - 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x43, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x43, 0x6f, 0x6d, 0x6d, 0x61, - 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x44, 0x0a, 0x0b, 0x49, 0x73, - 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x65, 0x64, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2e, 0x49, 0x73, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x65, 0x64, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x49, 0x73, 0x49, - 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x3e, 0x0a, 0x09, 0x53, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x42, 0x69, 0x64, 0x12, 0x17, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x42, 0x69, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, - 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x42, 0x69, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x56, 0x0a, 0x15, 0x53, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x42, 0x69, 0x64, 0x42, 0x61, 0x73, - 0x65, 0x64, 0x4f, 0x6e, 0x55, 0x73, 0x61, 0x67, 0x65, 0x12, 0x23, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2e, 0x53, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x42, 0x69, 0x64, 0x42, 0x61, 0x73, 0x65, 0x64, - 0x4f, 0x6e, 0x55, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x42, 0x69, 0x64, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4c, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x4f, - 0x75, 0x74, 0x70, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1a, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x23, 0x5a, 0x21, 0x2e, 0x2f, 0x70, 0x6b, 0x67, 0x2f, - 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, - 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, -} - -var ( - file_executor_proto_rawDescOnce sync.Once - file_executor_proto_rawDescData = file_executor_proto_rawDesc -) - -func file_executor_proto_rawDescGZIP() []byte { - file_executor_proto_rawDescOnce.Do(func() { - file_executor_proto_rawDescData = protoimpl.X.CompressGZIP(file_executor_proto_rawDescData) - }) - return file_executor_proto_rawDescData -} - -var file_executor_proto_msgTypes = make([]protoimpl.MessageInfo, 13) -var file_executor_proto_goTypes = []interface{}{ - (*StartResponse)(nil), // 0: proto.StartResponse - (*RunCommandRequest)(nil), // 1: proto.RunCommandRequest - (*RunCommandResponse)(nil), // 2: proto.RunCommandResponse - (*CancelCommandRequest)(nil), // 3: proto.CancelCommandRequest - (*CancelCommandResponse)(nil), // 4: proto.CancelCommandResponse - (*IsInstalledRequest)(nil), // 5: proto.IsInstalledRequest - (*IsInstalledResponse)(nil), // 6: proto.IsInstalledResponse - (*ShouldBidRequest)(nil), // 7: proto.ShouldBidRequest - (*ShouldBidBasedOnUsageRequest)(nil), // 8: proto.ShouldBidBasedOnUsageRequest - (*ShouldBidResponse)(nil), // 9: proto.ShouldBidResponse - (*OutputStreamRequest)(nil), // 10: proto.OutputStreamRequest - (*OutputStreamResponse)(nil), // 11: proto.OutputStreamResponse - (*WaitRequest)(nil), // 12: proto.WaitRequest -} -var file_executor_proto_depIdxs = []int32{ - 1, // 0: proto.Executor.Run:input_type -> proto.RunCommandRequest - 1, // 1: proto.Executor.Start:input_type -> proto.RunCommandRequest - 12, // 2: proto.Executor.Wait:input_type -> proto.WaitRequest - 3, // 3: proto.Executor.Cancel:input_type -> proto.CancelCommandRequest - 5, // 4: proto.Executor.IsInstalled:input_type -> proto.IsInstalledRequest - 7, // 5: proto.Executor.ShouldBid:input_type -> proto.ShouldBidRequest - 8, // 6: proto.Executor.ShouldBidBasedOnUsage:input_type -> proto.ShouldBidBasedOnUsageRequest - 10, // 7: proto.Executor.GetOutputStream:input_type -> proto.OutputStreamRequest - 2, // 8: proto.Executor.Run:output_type -> proto.RunCommandResponse - 0, // 9: proto.Executor.Start:output_type -> proto.StartResponse - 2, // 10: proto.Executor.Wait:output_type -> proto.RunCommandResponse - 4, // 11: proto.Executor.Cancel:output_type -> proto.CancelCommandResponse - 6, // 12: proto.Executor.IsInstalled:output_type -> proto.IsInstalledResponse - 9, // 13: proto.Executor.ShouldBid:output_type -> proto.ShouldBidResponse - 9, // 14: proto.Executor.ShouldBidBasedOnUsage:output_type -> proto.ShouldBidResponse - 11, // 15: proto.Executor.GetOutputStream:output_type -> proto.OutputStreamResponse - 8, // [8:16] is the sub-list for method output_type - 0, // [0:8] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_executor_proto_init() } -func file_executor_proto_init() { - if File_executor_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_executor_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StartResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*RunCommandRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*RunCommandResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CancelCommandRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CancelCommandResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*IsInstalledRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*IsInstalledResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ShouldBidRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ShouldBidBasedOnUsageRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ShouldBidResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*OutputStreamRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*OutputStreamResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_executor_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*WaitRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_executor_proto_rawDesc, - NumEnums: 0, - NumMessages: 13, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_executor_proto_goTypes, - DependencyIndexes: file_executor_proto_depIdxs, - MessageInfos: file_executor_proto_msgTypes, - }.Build() - File_executor_proto = out.File - file_executor_proto_rawDesc = nil - file_executor_proto_goTypes = nil - file_executor_proto_depIdxs = nil -} diff --git a/pkg/executor/plugins/grpc/proto/executor.proto b/pkg/executor/plugins/grpc/proto/executor.proto deleted file mode 100644 index ef64b00406..0000000000 --- a/pkg/executor/plugins/grpc/proto/executor.proto +++ /dev/null @@ -1,73 +0,0 @@ - -syntax = "proto3"; -package proto; - -// TODO: Complete these structure, rather than merely wrapping serialized JSON bytes in protobuf containers. -// Details in: https://github.com/bacalhau-project/bacalhau/issues/2700 - -message StartResponse { - -} - -message RunCommandRequest { - bytes Params = 1; -} - -message RunCommandResponse { - bytes Params = 1; -} - -message CancelCommandRequest { - string ExecutionID =1; -} - -message CancelCommandResponse { - -} - -message IsInstalledRequest { - -} - -message IsInstalledResponse { - bool Installed = 1; -} - -message ShouldBidRequest { - bytes BidRequest = 1; -} - -message ShouldBidBasedOnUsageRequest { - bytes BidRequest = 1; - bytes Usage = 2; -} - -// shared by both semantic and resource bid -message ShouldBidResponse { - bytes BidResponse = 1; -} - -message OutputStreamRequest { - string ExecutionID = 1; - bool History = 2; - bool Follow = 3; -} - -message OutputStreamResponse { - bytes Data =1; -} - -message WaitRequest { - string ExecutionID = 1; -} - -service Executor { - rpc Run(RunCommandRequest) returns (RunCommandResponse); - rpc Start(RunCommandRequest) returns (StartResponse); - rpc Wait(WaitRequest) returns (stream RunCommandResponse); - rpc Cancel(CancelCommandRequest) returns (CancelCommandResponse); - rpc IsInstalled(IsInstalledRequest) returns (IsInstalledResponse); - rpc ShouldBid(ShouldBidRequest) returns (ShouldBidResponse); - rpc ShouldBidBasedOnUsage(ShouldBidBasedOnUsageRequest) returns (ShouldBidResponse); - rpc GetOutputStream(OutputStreamRequest) returns (stream OutputStreamResponse); -} diff --git a/pkg/executor/plugins/grpc/proto/executor_grpc.pb.go b/pkg/executor/plugins/grpc/proto/executor_grpc.pb.go deleted file mode 100644 index 29132d3715..0000000000 --- a/pkg/executor/plugins/grpc/proto/executor_grpc.pb.go +++ /dev/null @@ -1,412 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.24.3 -// source: executor.proto - -package proto - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -// ExecutorClient is the client API for Executor service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type ExecutorClient interface { - Run(ctx context.Context, in *RunCommandRequest, opts ...grpc.CallOption) (*RunCommandResponse, error) - Start(ctx context.Context, in *RunCommandRequest, opts ...grpc.CallOption) (*StartResponse, error) - Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (Executor_WaitClient, error) - Cancel(ctx context.Context, in *CancelCommandRequest, opts ...grpc.CallOption) (*CancelCommandResponse, error) - IsInstalled(ctx context.Context, in *IsInstalledRequest, opts ...grpc.CallOption) (*IsInstalledResponse, error) - ShouldBid(ctx context.Context, in *ShouldBidRequest, opts ...grpc.CallOption) (*ShouldBidResponse, error) - ShouldBidBasedOnUsage(ctx context.Context, in *ShouldBidBasedOnUsageRequest, opts ...grpc.CallOption) (*ShouldBidResponse, error) - GetOutputStream(ctx context.Context, in *OutputStreamRequest, opts ...grpc.CallOption) (Executor_GetOutputStreamClient, error) -} - -type executorClient struct { - cc grpc.ClientConnInterface -} - -func NewExecutorClient(cc grpc.ClientConnInterface) ExecutorClient { - return &executorClient{cc} -} - -func (c *executorClient) Run(ctx context.Context, in *RunCommandRequest, opts ...grpc.CallOption) (*RunCommandResponse, error) { - out := new(RunCommandResponse) - err := c.cc.Invoke(ctx, "/proto.Executor/Run", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *executorClient) Start(ctx context.Context, in *RunCommandRequest, opts ...grpc.CallOption) (*StartResponse, error) { - out := new(StartResponse) - err := c.cc.Invoke(ctx, "/proto.Executor/Start", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *executorClient) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (Executor_WaitClient, error) { - stream, err := c.cc.NewStream(ctx, &Executor_ServiceDesc.Streams[0], "/proto.Executor/Wait", opts...) - if err != nil { - return nil, err - } - x := &executorWaitClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Executor_WaitClient interface { - Recv() (*RunCommandResponse, error) - grpc.ClientStream -} - -type executorWaitClient struct { - grpc.ClientStream -} - -func (x *executorWaitClient) Recv() (*RunCommandResponse, error) { - m := new(RunCommandResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *executorClient) Cancel(ctx context.Context, in *CancelCommandRequest, opts ...grpc.CallOption) (*CancelCommandResponse, error) { - out := new(CancelCommandResponse) - err := c.cc.Invoke(ctx, "/proto.Executor/Cancel", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *executorClient) IsInstalled(ctx context.Context, in *IsInstalledRequest, opts ...grpc.CallOption) (*IsInstalledResponse, error) { - out := new(IsInstalledResponse) - err := c.cc.Invoke(ctx, "/proto.Executor/IsInstalled", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *executorClient) ShouldBid(ctx context.Context, in *ShouldBidRequest, opts ...grpc.CallOption) (*ShouldBidResponse, error) { - out := new(ShouldBidResponse) - err := c.cc.Invoke(ctx, "/proto.Executor/ShouldBid", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *executorClient) ShouldBidBasedOnUsage(ctx context.Context, in *ShouldBidBasedOnUsageRequest, opts ...grpc.CallOption) (*ShouldBidResponse, error) { - out := new(ShouldBidResponse) - err := c.cc.Invoke(ctx, "/proto.Executor/ShouldBidBasedOnUsage", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *executorClient) GetOutputStream(ctx context.Context, in *OutputStreamRequest, opts ...grpc.CallOption) (Executor_GetOutputStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &Executor_ServiceDesc.Streams[1], "/proto.Executor/GetOutputStream", opts...) - if err != nil { - return nil, err - } - x := &executorGetOutputStreamClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Executor_GetOutputStreamClient interface { - Recv() (*OutputStreamResponse, error) - grpc.ClientStream -} - -type executorGetOutputStreamClient struct { - grpc.ClientStream -} - -func (x *executorGetOutputStreamClient) Recv() (*OutputStreamResponse, error) { - m := new(OutputStreamResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// ExecutorServer is the server API for Executor service. -// All implementations must embed UnimplementedExecutorServer -// for forward compatibility -type ExecutorServer interface { - Run(context.Context, *RunCommandRequest) (*RunCommandResponse, error) - Start(context.Context, *RunCommandRequest) (*StartResponse, error) - Wait(*WaitRequest, Executor_WaitServer) error - Cancel(context.Context, *CancelCommandRequest) (*CancelCommandResponse, error) - IsInstalled(context.Context, *IsInstalledRequest) (*IsInstalledResponse, error) - ShouldBid(context.Context, *ShouldBidRequest) (*ShouldBidResponse, error) - ShouldBidBasedOnUsage(context.Context, *ShouldBidBasedOnUsageRequest) (*ShouldBidResponse, error) - GetOutputStream(*OutputStreamRequest, Executor_GetOutputStreamServer) error - mustEmbedUnimplementedExecutorServer() -} - -// UnimplementedExecutorServer must be embedded to have forward compatible implementations. -type UnimplementedExecutorServer struct { -} - -func (UnimplementedExecutorServer) Run(context.Context, *RunCommandRequest) (*RunCommandResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Run not implemented") -} -func (UnimplementedExecutorServer) Start(context.Context, *RunCommandRequest) (*StartResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Start not implemented") -} -func (UnimplementedExecutorServer) Wait(*WaitRequest, Executor_WaitServer) error { - return status.Errorf(codes.Unimplemented, "method Wait not implemented") -} -func (UnimplementedExecutorServer) Cancel(context.Context, *CancelCommandRequest) (*CancelCommandResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Cancel not implemented") -} -func (UnimplementedExecutorServer) IsInstalled(context.Context, *IsInstalledRequest) (*IsInstalledResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method IsInstalled not implemented") -} -func (UnimplementedExecutorServer) ShouldBid(context.Context, *ShouldBidRequest) (*ShouldBidResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ShouldBid not implemented") -} -func (UnimplementedExecutorServer) ShouldBidBasedOnUsage(context.Context, *ShouldBidBasedOnUsageRequest) (*ShouldBidResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ShouldBidBasedOnUsage not implemented") -} -func (UnimplementedExecutorServer) GetOutputStream(*OutputStreamRequest, Executor_GetOutputStreamServer) error { - return status.Errorf(codes.Unimplemented, "method GetOutputStream not implemented") -} -func (UnimplementedExecutorServer) mustEmbedUnimplementedExecutorServer() {} - -// UnsafeExecutorServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to ExecutorServer will -// result in compilation errors. -type UnsafeExecutorServer interface { - mustEmbedUnimplementedExecutorServer() -} - -func RegisterExecutorServer(s grpc.ServiceRegistrar, srv ExecutorServer) { - s.RegisterService(&Executor_ServiceDesc, srv) -} - -func _Executor_Run_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(RunCommandRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ExecutorServer).Run(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/proto.Executor/Run", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ExecutorServer).Run(ctx, req.(*RunCommandRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Executor_Start_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(RunCommandRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ExecutorServer).Start(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/proto.Executor/Start", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ExecutorServer).Start(ctx, req.(*RunCommandRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Executor_Wait_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(WaitRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(ExecutorServer).Wait(m, &executorWaitServer{stream}) -} - -type Executor_WaitServer interface { - Send(*RunCommandResponse) error - grpc.ServerStream -} - -type executorWaitServer struct { - grpc.ServerStream -} - -func (x *executorWaitServer) Send(m *RunCommandResponse) error { - return x.ServerStream.SendMsg(m) -} - -func _Executor_Cancel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(CancelCommandRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ExecutorServer).Cancel(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/proto.Executor/Cancel", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ExecutorServer).Cancel(ctx, req.(*CancelCommandRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Executor_IsInstalled_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(IsInstalledRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ExecutorServer).IsInstalled(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/proto.Executor/IsInstalled", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ExecutorServer).IsInstalled(ctx, req.(*IsInstalledRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Executor_ShouldBid_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ShouldBidRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ExecutorServer).ShouldBid(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/proto.Executor/ShouldBid", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ExecutorServer).ShouldBid(ctx, req.(*ShouldBidRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Executor_ShouldBidBasedOnUsage_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ShouldBidBasedOnUsageRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ExecutorServer).ShouldBidBasedOnUsage(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/proto.Executor/ShouldBidBasedOnUsage", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ExecutorServer).ShouldBidBasedOnUsage(ctx, req.(*ShouldBidBasedOnUsageRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Executor_GetOutputStream_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(OutputStreamRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(ExecutorServer).GetOutputStream(m, &executorGetOutputStreamServer{stream}) -} - -type Executor_GetOutputStreamServer interface { - Send(*OutputStreamResponse) error - grpc.ServerStream -} - -type executorGetOutputStreamServer struct { - grpc.ServerStream -} - -func (x *executorGetOutputStreamServer) Send(m *OutputStreamResponse) error { - return x.ServerStream.SendMsg(m) -} - -// Executor_ServiceDesc is the grpc.ServiceDesc for Executor service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var Executor_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "proto.Executor", - HandlerType: (*ExecutorServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Run", - Handler: _Executor_Run_Handler, - }, - { - MethodName: "Start", - Handler: _Executor_Start_Handler, - }, - { - MethodName: "Cancel", - Handler: _Executor_Cancel_Handler, - }, - { - MethodName: "IsInstalled", - Handler: _Executor_IsInstalled_Handler, - }, - { - MethodName: "ShouldBid", - Handler: _Executor_ShouldBid_Handler, - }, - { - MethodName: "ShouldBidBasedOnUsage", - Handler: _Executor_ShouldBidBasedOnUsage_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "Wait", - Handler: _Executor_Wait_Handler, - ServerStreams: true, - }, - { - StreamName: "GetOutputStream", - Handler: _Executor_GetOutputStream_Handler, - ServerStreams: true, - }, - }, - Metadata: "executor.proto", -} diff --git a/pkg/executor/plugins/grpc/server.go b/pkg/executor/plugins/grpc/server.go deleted file mode 100644 index d7b87798f6..0000000000 --- a/pkg/executor/plugins/grpc/server.go +++ /dev/null @@ -1,166 +0,0 @@ -package grpc - -import ( - "context" - "encoding/json" - "fmt" - "io" - - "github.com/bacalhau-project/bacalhau/pkg/bidstrategy" - "github.com/bacalhau-project/bacalhau/pkg/executor" - "github.com/bacalhau-project/bacalhau/pkg/executor/plugins/grpc/proto" - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -const ( - DefaultStreamBufferSize = 1024 -) - -// TODO: Complete protobuf structure, rather than merely wrapping serialized JSON bytes in protobuf containers. -// Details in: https://github.com/bacalhau-project/bacalhau/issues/2700 - -type GRPCServer struct { - Impl executor.Executor - - proto.UnimplementedExecutorServer -} - -func (s *GRPCServer) Start(_ context.Context, request *proto.RunCommandRequest) (*proto.StartResponse, error) { - // NB(forrest): A new context is created for the `Start` operation because `Start` initiates a - // long-running operation. The context passed as an argument to this method is tied to the gRPC request and is - // canceled when this method returns. By creating a separate context, we ensure that `Start` has a lifecycle - // independent of the gRPC request. - ctx := context.Background() - args := new(executor.RunCommandRequest) - if err := json.Unmarshal(request.Params, args); err != nil { - return nil, err - } - if err := s.Impl.Start(ctx, args); err != nil { - return nil, err - } - return &proto.StartResponse{}, nil -} - -func (s *GRPCServer) Wait(request *proto.WaitRequest, server proto.Executor_WaitServer) error { - // NB(forrest): The context obtained from `server.Context()` is appropriate to use here because `Wait` - // is a streaming RPC. The context remains active for the entire lifetime of the stream and is - // only canceled when the client or server closes the stream. This behavior is in contrast to - // unary RPCs (like `Start` and `Run`), where the context is tied to the individual request. - ctx := server.Context() - waitC, errC := s.Impl.Wait(ctx, request.GetExecutionID()) - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-errC: - return err - case res := <-waitC: - resp, err := json.Marshal(res) - if err != nil { - return err - } - if err := server.Send(&proto.RunCommandResponse{Params: resp}); err != nil { - return err - } - } - return nil -} - -func (s *GRPCServer) Run(ctx context.Context, request *proto.RunCommandRequest) (*proto.RunCommandResponse, error) { - args := new(executor.RunCommandRequest) - if err := json.Unmarshal(request.Params, args); err != nil { - return nil, err - } - result, err := s.Impl.Run(ctx, args) - if err != nil { - return nil, err - } - b, err := json.Marshal(result) - if err != nil { - return nil, err - } - return &proto.RunCommandResponse{Params: b}, nil -} - -func (s *GRPCServer) Cancel(ctx context.Context, request *proto.CancelCommandRequest) (*proto.CancelCommandResponse, error) { - err := s.Impl.Cancel(ctx, request.ExecutionID) - if err != nil { - return nil, err - } - return &proto.CancelCommandResponse{}, nil -} - -func (s *GRPCServer) IsInstalled(ctx context.Context, _ *proto.IsInstalledRequest) (*proto.IsInstalledResponse, error) { - installed, err := s.Impl.IsInstalled(ctx) - if err != nil { - return nil, err - } - return &proto.IsInstalledResponse{Installed: installed}, nil -} - -func (s *GRPCServer) ShouldBid(ctx context.Context, request *proto.ShouldBidRequest) (*proto.ShouldBidResponse, error) { - var args bidstrategy.BidStrategyRequest - if err := json.Unmarshal(request.BidRequest, &args); err != nil { - return nil, err - } - result, err := s.Impl.ShouldBid(ctx, args) - if err != nil { - return nil, err - } - b, err := json.Marshal(result) - if err != nil { - return nil, err - } - return &proto.ShouldBidResponse{BidResponse: b}, nil -} - -func (s *GRPCServer) ShouldBidBasedOnUsage( - ctx context.Context, - request *proto.ShouldBidBasedOnUsageRequest) (*proto.ShouldBidResponse, error) { - var bidReq bidstrategy.BidStrategyRequest - if err := json.Unmarshal(request.BidRequest, &bidReq); err != nil { - return nil, err - } - var usage models.Resources - if err := json.Unmarshal(request.Usage, &usage); err != nil { - return nil, err - } - result, err := s.Impl.ShouldBidBasedOnUsage(ctx, bidReq, usage) - if err != nil { - return nil, err - } - b, err := json.Marshal(result) - if err != nil { - return nil, err - } - return &proto.ShouldBidResponse{BidResponse: b}, nil -} - -func (s *GRPCServer) GetLogStream(request *proto.OutputStreamRequest, server proto.Executor_GetOutputStreamServer) error { - ctx := server.Context() - result, err := s.Impl.GetLogStream(ctx, executor.LogStreamRequest{ - ExecutionID: request.ExecutionID, - Tail: request.History, - Follow: request.Follow, - }) - if err != nil { - return err - } - defer result.Close() - - buffer := make([]byte, DefaultStreamBufferSize) - for { - n, err := result.Read(buffer) - if err != nil { - if err == io.EOF { - break - } - return fmt.Errorf("failed to read data: %w", err) - } - - res := &proto.OutputStreamResponse{Data: buffer[:n]} - if err := server.Send(res); err != nil { - return fmt.Errorf("failed to send data: %w", err) - } - } - return nil -} diff --git a/pkg/executor/util/pluggable_executor.go b/pkg/executor/util/pluggable_executor.go deleted file mode 100644 index 243302cde6..0000000000 --- a/pkg/executor/util/pluggable_executor.go +++ /dev/null @@ -1,141 +0,0 @@ -package util - -import ( - "context" - "fmt" - "os" - "os/exec" - "path/filepath" - - "github.com/bacalhau-project/bacalhau/pkg/executor" - "github.com/bacalhau-project/bacalhau/pkg/executor/plugins/grpc" - "github.com/hashicorp/go-plugin" -) - -func NewPluginExecutorManager() *PluginExecutorManager { - return &PluginExecutorManager{ - registered: make(map[string]PluginExecutorManagerConfig), - active: make(map[string]*activeExecutor), - } -} - -type PluginExecutorManager struct { - registered map[string]PluginExecutorManagerConfig - active map[string]*activeExecutor -} - -func (e *PluginExecutorManager) Get(ctx context.Context, key string) (executor.Executor, error) { - engine, ok := e.active[key] - if !ok { - return nil, fmt.Errorf("plugin %s not found", key) - } - return engine.Impl, nil -} - -func (e *PluginExecutorManager) Has(ctx context.Context, key string) bool { - _, ok := e.active[key] - return ok -} - -// Keys returns the keys of the registered executors -func (e *PluginExecutorManager) Keys(ctx context.Context) []string { - keys := make([]string, 0, len(e.active)) - for k := range e.active { - keys = append(keys, k) - } - return keys -} - -// compile-time check that PluginExecutorManager implements ExecutorProvider -var _ executor.ExecutorProvider = (*PluginExecutorManager)(nil) - -type activeExecutor struct { - Impl executor.Executor - Closer func() -} - -type PluginExecutorManagerConfig struct { - Name string - Path string - Command string - ProtocolVersion uint - MagicCookieKey string - MagicCookieValue string -} - -func (e *PluginExecutorManager) RegisterPlugin(config PluginExecutorManagerConfig) error { - _, ok := e.registered[config.Name] - if ok { - return fmt.Errorf("duplicate registration of executor %s", config.Name) - } - - if pluginBin, err := os.Stat(filepath.Join(config.Path, config.Command)); err != nil { - return err - } else if pluginBin.IsDir() { - return fmt.Errorf("plugin location is directory, expected binary") - } - // TODO check if binary is executable - - e.registered[config.Name] = config - return nil -} - -func (e *PluginExecutorManager) Start(ctx context.Context) error { - for name, config := range e.registered { - pluginExecutor, closer, err := e.dispense(name, config) - if err != nil { - return err - } - e.active[name] = &activeExecutor{ - Impl: pluginExecutor, - Closer: closer, - } - } - return nil -} - -func (e *PluginExecutorManager) Stop(ctx context.Context) error { - for _, active := range e.active { - active.Closer() - } - return nil -} - -const PluggableExecutorPluginName = "PLUGGABLE_EXECUTOR" - -func (e *PluginExecutorManager) dispense(name string, config PluginExecutorManagerConfig) (executor.Executor, func(), error) { - client := plugin.NewClient(&plugin.ClientConfig{ - Plugins: map[string]plugin.Plugin{ - PluggableExecutorPluginName: &grpc.ExecutorGRPCPlugin{}, - }, - AllowedProtocols: []plugin.Protocol{ - plugin.ProtocolNetRPC, plugin.ProtocolGRPC}, - HandshakeConfig: plugin.HandshakeConfig{ - ProtocolVersion: config.ProtocolVersion, - MagicCookieKey: config.MagicCookieKey, - MagicCookieValue: config.MagicCookieValue, - }, - //nolint:gosec - Cmd: exec.Command(filepath.Join(config.Path, config.Command)), - }) - - rpcClient, err := client.Client() - if err != nil { - client.Kill() - return nil, nil, err - } - - raw, err := rpcClient.Dispense(PluggableExecutorPluginName) - if err != nil { - client.Kill() - return nil, nil, err - } - - pluginExecutor, ok := raw.(executor.Executor) - if !ok { - client.Kill() - return nil, nil, fmt.Errorf("plugin is not of type: PluggableExecutor") - } - - return pluginExecutor, func() { client.Kill() }, nil -} diff --git a/pkg/executor/util/utils.go b/pkg/executor/util/utils.go index ba2f7ef71e..b40b02b5e2 100644 --- a/pkg/executor/util/utils.go +++ b/pkg/executor/util/utils.go @@ -130,27 +130,3 @@ func NewNoopExecutors(config noop_executor.ExecutorConfig) executor.ExecutorProv noopExecutor := noop_executor.NewNoopExecutorWithConfig(config) return provider.NewNoopProvider[executor.Executor](noopExecutor) } - -type PluginExecutorOptions struct { - Plugins []PluginExecutorManagerConfig -} - -func NewPluginExecutorProvider( - ctx context.Context, - cm *system.CleanupManager, - pluginOptions PluginExecutorOptions, -) (executor.ExecutorProvider, error) { - pe := NewPluginExecutorManager() - for _, cfg := range pluginOptions.Plugins { - if err := pe.RegisterPlugin(cfg); err != nil { - return nil, err - } - } - if err := pe.Start(ctx); err != nil { - return nil, err - } - - cm.RegisterCallbackWithContext(pe.Stop) - - return pe, nil -} diff --git a/pkg/node/factories.go b/pkg/node/factories.go index 01a59b9ce3..d10fa144b1 100644 --- a/pkg/node/factories.go +++ b/pkg/node/factories.go @@ -14,7 +14,6 @@ import ( baccrypto "github.com/bacalhau-project/bacalhau/pkg/lib/crypto" "github.com/bacalhau-project/bacalhau/pkg/lib/policy" "github.com/bacalhau-project/bacalhau/pkg/lib/provider" - "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/publisher" publisher_util "github.com/bacalhau-project/bacalhau/pkg/publisher/util" "github.com/bacalhau-project/bacalhau/pkg/storage" @@ -76,39 +75,6 @@ func NewStandardExecutorsFactory(cfg types.EngineConfig) ExecutorsFactory { }) } -func NewPluginExecutorFactory(pluginPath string) ExecutorsFactory { - return ExecutorsFactoryFunc( - func(ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error) { - pr, err := executor_util.NewPluginExecutorProvider( - ctx, - nodeConfig.CleanupManager, - executor_util.PluginExecutorOptions{ - Plugins: []executor_util.PluginExecutorManagerConfig{ - { - Name: models.EngineDocker, - Path: pluginPath, - Command: "bacalhau-docker-executor", - ProtocolVersion: 1, - MagicCookieKey: "EXECUTOR_PLUGIN", - MagicCookieValue: "bacalhau_executor", - }, - { - Name: models.EngineWasm, - Path: pluginPath, - Command: "bacalhau-wasm-executor", - ProtocolVersion: 1, - MagicCookieKey: "EXECUTOR_PLUGIN", - MagicCookieValue: "bacalhau_executor", - }, - }, - }) - if err != nil { - return nil, err - } - return provider.NewConfiguredProvider(pr, nodeConfig.BacalhauConfig.Engines.Disabled), err - }) -} - func NewStandardPublishersFactory(cfg types.Bacalhau) PublishersFactory { return PublishersFactoryFunc( func( diff --git a/pkg/node/node.go b/pkg/node/node.go index 0c6e8f370b..663c09e350 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -65,19 +65,6 @@ type NodeDependencyInjector struct { AuthenticatorsFactory AuthenticatorsFactory } -func NewExecutorPluginNodeDependencyInjector( - cfg types.Bacalhau, - userKey *baccrypto.UserKey, - pluginPath string, -) NodeDependencyInjector { - return NodeDependencyInjector{ - StorageProvidersFactory: NewStandardStorageProvidersFactory(cfg), - ExecutorsFactory: NewPluginExecutorFactory(pluginPath), - PublishersFactory: NewStandardPublishersFactory(cfg), - AuthenticatorsFactory: NewStandardAuthenticatorsFactory(userKey), - } -} - func NewStandardNodeDependencyInjector(cfg types.Bacalhau, userKey *baccrypto.UserKey) NodeDependencyInjector { return NodeDependencyInjector{ StorageProvidersFactory: NewStandardStorageProvidersFactory(cfg), diff --git a/pkg/repo/fs.go b/pkg/repo/fs.go index 4e7724bcbe..eff5adcdee 100644 --- a/pkg/repo/fs.go +++ b/pkg/repo/fs.go @@ -208,7 +208,6 @@ func (fsr *FsRepo) WriteRunInfo(ctx context.Context, summaryShellVariablesString func (fsr *FsRepo) EnsureRepoPathsConfigured(c config_legacy.ReadWriter) { c.SetIfAbsent(legacy_types.AuthTokensPath, fsr.join(config_legacy.TokensPath)) c.SetIfAbsent(legacy_types.UserKeyPath, fsr.join(config_legacy.UserPrivateKeyFileName)) - c.SetIfAbsent(legacy_types.NodeExecutorPluginPath, fsr.join(config_legacy.PluginsPath)) // NB(forrest): pay attention to the subtle name difference here c.SetIfAbsent(legacy_types.NodeComputeStoragePath, fsr.join(config_legacy.ComputeStoragesPath)) diff --git a/pkg/repo/migrations/v3_4_test.go b/pkg/repo/migrations/v3_4_test.go index 803226fef7..8cbd9fc39b 100644 --- a/pkg/repo/migrations/v3_4_test.go +++ b/pkg/repo/migrations/v3_4_test.go @@ -75,7 +75,6 @@ func (suite *V3MigrationsTestSuite) TestV3MigrationWithFullRepo() { suite.DirExists(filepath.Join(suite.TempDir, "orchestrator_store")) suite.DirExists(filepath.Join(suite.TempDir, "orchestrator_store", "nats-store")) suite.FileExists(filepath.Join(suite.TempDir, "orchestrator_store", "jobs.db")) - suite.DirExists(filepath.Join(suite.TempDir, "plugins")) suite.FileExists(filepath.Join(suite.TempDir, "repo.version")) suite.FileExists(filepath.Join(suite.TempDir, "update.json")) suite.FileExists(filepath.Join(suite.TempDir, "user_id.pem")) @@ -186,7 +185,6 @@ func (suite *V3MigrationsTestSuite) TestV3MigrationWithMinimalRepo() { suite.DirExists(filepath.Join(suite.TempDir, "orchestrator_store")) suite.NoDirExists(filepath.Join(suite.TempDir, "orchestrator_store", "nats-store")) suite.NoFileExists(filepath.Join(suite.TempDir, "orchestrator_store", "jobs.db")) - suite.DirExists(filepath.Join(suite.TempDir, "plugins")) suite.FileExists(filepath.Join(suite.TempDir, "repo.version")) suite.FileExists(filepath.Join(suite.TempDir, "update.json")) suite.FileExists(filepath.Join(suite.TempDir, "user_id.pem")) @@ -249,7 +247,6 @@ func (suite *V3MigrationsTestSuite) TestV3MigrationWithOrchestratorRepo() { suite.DirExists(filepath.Join(suite.TempDir, "orchestrator_store")) suite.DirExists(filepath.Join(suite.TempDir, "orchestrator_store", "nats-store")) suite.FileExists(filepath.Join(suite.TempDir, "orchestrator_store", "jobs.db")) - suite.DirExists(filepath.Join(suite.TempDir, "plugins")) suite.FileExists(filepath.Join(suite.TempDir, "repo.version")) suite.FileExists(filepath.Join(suite.TempDir, "update.json")) suite.FileExists(filepath.Join(suite.TempDir, "user_id.pem")) @@ -362,7 +359,6 @@ func (suite *V3MigrationsTestSuite) TestV3MigrationWithComputeRepo() { suite.DirExists(filepath.Join(suite.TempDir, "orchestrator_store")) suite.NoDirExists(filepath.Join(suite.TempDir, "orchestrator_store", "nats-store")) suite.NoFileExists(filepath.Join(suite.TempDir, "orchestrator_store", "jobs.db")) - suite.DirExists(filepath.Join(suite.TempDir, "plugins")) suite.FileExists(filepath.Join(suite.TempDir, "repo.version")) suite.FileExists(filepath.Join(suite.TempDir, "update.json")) suite.FileExists(filepath.Join(suite.TempDir, "user_id.pem")) diff --git a/plugins/Makefile b/plugins/Makefile deleted file mode 100644 index 6c250ebdef..0000000000 --- a/plugins/Makefile +++ /dev/null @@ -1,10 +0,0 @@ -TOPTARGETS := build clean - -SUBDIRS := executors - -$(TOPTARGETS): $(SUBDIRS) -$(SUBDIRS): - @echo "$@:" - @$(MAKE) -C $@ $(MAKECMDGOALS) - -.PHONY: $(TOPTARGETS) $(SUBDIRS) diff --git a/plugins/README.md b/plugins/README.md deleted file mode 100644 index f497a80a02..0000000000 --- a/plugins/README.md +++ /dev/null @@ -1,9 +0,0 @@ - -# Plugins - -This directory is the root directory for various plugins for Bacalhau. - - -## Executors - -The [executors plugins](./executors/README.md) implement the default pluggable executors for Bacalhau, along with the protobuf IDL required to implement new pluggable executors. diff --git a/plugins/executors/Makefile b/plugins/executors/Makefile deleted file mode 100644 index bc905f6934..0000000000 --- a/plugins/executors/Makefile +++ /dev/null @@ -1,26 +0,0 @@ -PLUGINS = -TMP_OUT = ../../ - -# As this is building Go specific libraries, it is expected -# that this will live inside each PLUGINS which is Go-based, -# but is implemented here to allow us to iteratively move the -# existing plugins. Requires the installation of -# -# $ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28 -# $ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2 -# -# We should move this task to the individual plugins that require -# it once they exist. -idl: executor.proto - @echo " Building protos" - @ if ! which protoc > /dev/null; then \ - echo "error: protoc not installed" >&2; \ - exit 1; \ - fi - protoc --proto_path=. --go-grpc_out=$(TMP_OUT) --go_out=$(TMP_OUT) executor.proto - -build: idl - @echo " No plugins to build yet" - -clean: - @echo " Executors: Nothing to clean" diff --git a/plugins/executors/README.md b/plugins/executors/README.md deleted file mode 100644 index d7e03af3c7..0000000000 --- a/plugins/executors/README.md +++ /dev/null @@ -1,19 +0,0 @@ - -# Executor plugins - -Executors are responsible for running jobs in Bacalhau, and this directory is the home of the builtin Bacalhau plugins for Docker and WebAssembly jobs. - - -## WIP: Building plugins - -To build the default plugins once they are implemented here, you can use the provided Makefile. - -```sh -# Build -make build - -# Cleanup -make clean -``` - -This will run the build, and clean, tasks respectively for each plugin that lives in this directory. diff --git a/plugins/executors/executor.proto b/plugins/executors/executor.proto deleted file mode 100644 index 5783cbb14b..0000000000 --- a/plugins/executors/executor.proto +++ /dev/null @@ -1,75 +0,0 @@ - -syntax = "proto3"; -package proto; - -option go_package = "./pkg/executor/plugins/grpc/proto"; - -// TODO: Complete these structure, rather than merely wrapping serialized JSON bytes in protobuf containers. -// Details in: https://github.com/bacalhau-project/bacalhau/issues/2700 - -message StartResponse { - -} - -message RunCommandRequest { - bytes Params = 1; -} - -message RunCommandResponse { - bytes Params = 1; -} - -message CancelCommandRequest { - string ExecutionID =1; -} - -message CancelCommandResponse { - -} - -message IsInstalledRequest { - -} - -message IsInstalledResponse { - bool Installed = 1; -} - -message ShouldBidRequest { - bytes BidRequest = 1; -} - -message ShouldBidBasedOnUsageRequest { - bytes BidRequest = 1; - bytes Usage = 2; -} - -// shared by both semantic and resource bid -message ShouldBidResponse { - bytes BidResponse = 1; -} - -message OutputStreamRequest { - string ExecutionID = 1; - bool History = 2; - bool Follow = 3; -} - -message OutputStreamResponse { - bytes Data =1; -} - -message WaitRequest { - string ExecutionID = 1; -} - -service Executor { - rpc Run(RunCommandRequest) returns (RunCommandResponse); - rpc Start(RunCommandRequest) returns (StartResponse); - rpc Wait(WaitRequest) returns (stream RunCommandResponse); - rpc Cancel(CancelCommandRequest) returns (CancelCommandResponse); - rpc IsInstalled(IsInstalledRequest) returns (IsInstalledResponse); - rpc ShouldBid(ShouldBidRequest) returns (ShouldBidResponse); - rpc ShouldBidBasedOnUsage(ShouldBidBasedOnUsageRequest) returns (ShouldBidResponse); - rpc GetOutputStream(OutputStreamRequest) returns (stream OutputStreamResponse); -}