From 764831dcbfe2f8d811b8dcc67363394960359427 Mon Sep 17 00:00:00 2001
From: Jamil Shamy <4977827+jamlo@users.noreply.github.com>
Date: Tue, 8 Oct 2024 10:51:16 -0400
Subject: [PATCH 1/4] Improve API error handling (#4607)

This PR fixes the "FromHttpResponse" function. Returning two errors from a
function is confusing and uncommon in Go, especially when those two errors
are all the function returns. The new code returns a single error, which is
always an APIError, and it also improves the wording of the error messages,
adding more detail to the output.

Further tweaks to the error message handling can be made later if needed.

Note: this is identical to [PR 4565](https://github.com/bacalhau-project/bacalhau/pull/4565),
though created from a branch, and not a fork.
---
 pkg/publicapi/apimodels/error.go  | 19 ++++++++++++-------
 pkg/publicapi/client/v2/client.go | 27 ++++++++-------------------
 2 files changed, 20 insertions(+), 26 deletions(-)

diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go
index 6d2bc98bfd..842fe5734e 100644
--- a/pkg/publicapi/apimodels/error.go
+++ b/pkg/publicapi/apimodels/error.go
@@ -2,7 +2,6 @@ package apimodels
 
 import (
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -67,23 +66,29 @@ func (e *APIError) Error() string {
 }
 
 // Parse HTTP Response to APIError
-func FromHttpResponse(resp *http.Response) (*APIError, error) {
-
+func GenerateAPIErrorFromHTTPResponse(resp *http.Response) *APIError {
 	if resp == nil {
-		return nil, errors.New("response is nil, cannot be unmarsheld to APIError")
+		return NewAPIError(0, "API call error, invalid response")
 	}
 
 	defer resp.Body.Close()
 
 	body, err := io.ReadAll(resp.Body)
 	if err != nil {
-		return nil, fmt.Errorf("error reading response body: %w", err)
+		return NewAPIError(
+			resp.StatusCode,
+			fmt.Sprintf("Unable to read API call response body. Error: %q", err.Error()))
 	}
 
 	var apiErr APIError
 	err = json.Unmarshal(body, &apiErr)
 	if err != nil {
-		return nil, fmt.Errorf("error parsing response body: %w", err)
+		return NewAPIError(
+			resp.StatusCode,
+			fmt.Sprintf("Unable to parse API call response body. Error: %q. Body received: %q",
+				err.Error(),
+				string(body),
+			))
 	}
 
 	// If the JSON didn't include a status code, use the HTTP Status
@@ -91,7 +96,7 @@ func FromHttpResponse(resp *http.Response) (*APIError, error) {
 		apiErr.HTTPStatusCode = resp.StatusCode
 	}
 
-	return &apiErr, nil
+	return &apiErr
 }
 
 // FromBacError converts a bacerror.Error to an APIError
diff --git a/pkg/publicapi/client/v2/client.go b/pkg/publicapi/client/v2/client.go
index 18f687ed34..95979028be 100644
--- a/pkg/publicapi/client/v2/client.go
+++ b/pkg/publicapi/client/v2/client.go
@@ -74,18 +74,12 @@ func (c *httpClient) Get(ctx context.Context, endpoint string, in apimodels.GetR
 		return apimodels.NewUnauthorizedError("invalid token")
 	}
 
-	var apiError *apimodels.APIError
 	if resp.StatusCode != http.StatusOK {
-		apiError, err = apimodels.FromHttpResponse(resp)
-		if err != nil {
-			return err
+		if apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp); apiError != nil {
+			return apiError
 		}
 	}
 
-	if apiError != nil {
-		return apiError
-	}
-
 	defer resp.Body.Close()
 
 	if out != nil {
@@ -116,18 +110,12 @@ func (c *httpClient) write(ctx context.Context, verb, endpoint string, in apimod
 		return apimodels.ErrInvalidToken
 	}
 
-	var apiError *apimodels.APIError
 	if resp.StatusCode != http.StatusOK {
-		apiError, err = apimodels.FromHttpResponse(resp)
-		if err != nil {
-			return err
+		if apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp); apiError != nil {
+			return apiError
 		}
 	}
 
-	if apiError != nil {
-		return apiError
-	}
-
 	if out != nil {
 		if err := decodeBody(resp, &out); err != nil {
 			return err
 		}
@@ -362,12 +350,13 @@ func (c *httpClient) interceptError(ctx context.Context, err error, resp *http.R
 			WithCode(bacerrors.UnauthorizedError)
 	}
 
-	apiError, apiErr := apimodels.FromHttpResponse(resp)
-	if apiErr == nil {
+	apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp)
+	if apiError != nil {
 		return apiError.ToBacError()
 	}
 
-	return bacerrors.Wrap(apiErr, "server error").
+	return bacerrors.New("server error").
+		WithHTTPStatusCode(http.StatusInternalServerError).
+		WithCode(bacerrors.InternalError)
 }

From 54e8e0771fc668865e748916d70d054ebc0cd158 Mon Sep 17 00:00:00 2001
From: Jamil Shamy <4977827+jamlo@users.noreply.github.com>
Date: Wed, 9 Oct 2024 07:42:50 -0400
Subject: [PATCH 2/4] Support running Bacalhau in Docker compose (#4596)

This pull request addresses issue #4595
---
 .pre-commit-config.yaml                       |   2 +-
 test-integration/Dockerfile-ClientNode        |  27 +++
 test-integration/Dockerfile-ComputeNode       |  24 +++
 .../Dockerfile-DockerImageRegistryNode        |  24 +++
 test-integration/Dockerfile-RequesterNode     |  22 ++
 test-integration/README.md                    | 198 ++++++++++++++++++
 test-integration/certificates/README.md       |   9 +
 .../certificates/generate_leaf_certs.sh       |  71 +++++++
 .../certificates/generate_root_ca.sh          |  29 +++
 .../bacalhau-container-img-registry-node.crt  |  33 +++
 .../bacalhau-container-img-registry-node.key  |  52 +++++
 .../bacalhau_test_root_ca.crt                 |  31 +++
 .../bacalhau_test_root_ca.key                 |  52 +++++
 test-integration/compute_node_image_setup.sh  |  38 ++++
 test-integration/docker-compose.yml           | 117 +++++++++++
 15 files changed, 728 insertions(+), 1 deletion(-)
 create mode 100644 test-integration/Dockerfile-ClientNode
 create mode 100644 test-integration/Dockerfile-ComputeNode
 create mode 100644 test-integration/Dockerfile-DockerImageRegistryNode
 create mode 100644 test-integration/Dockerfile-RequesterNode
 create mode 100644 test-integration/README.md
 create mode 100644 test-integration/certificates/README.md
 create mode 100755 test-integration/certificates/generate_leaf_certs.sh
 create mode 100755 test-integration/certificates/generate_root_ca.sh
 create mode 100644 test-integration/certificates/generated_assets/bacalhau-container-img-registry-node.crt
 create mode 100644 test-integration/certificates/generated_assets/bacalhau-container-img-registry-node.key
 create mode 100644 test-integration/certificates/generated_assets/bacalhau_test_root_ca.crt
 create mode 100644 test-integration/certificates/generated_assets/bacalhau_test_root_ca.key
 create mode 100755 test-integration/compute_node_image_setup.sh
 create mode 100644 test-integration/docker-compose.yml

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ee7d6b8d4a..ee46828daa 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -8,7 +8,7 @@ repos:
       - id: detect-aws-credentials
         args: [--allow-missing-credentials]
       - id: detect-private-key
-        exclude: testdata/.*
+        exclude: 'testdata/.*|test-integration/certificates/.*'
      - id: check-yaml
      - id: check-json
  - repo: https://github.com/astral-sh/ruff-pre-commit
diff --git a/test-integration/Dockerfile-ClientNode b/test-integration/Dockerfile-ClientNode
new file mode 100644
index 0000000000..da31f340f7
--- /dev/null
+++ b/test-integration/Dockerfile-ClientNode
@@ -0,0 +1,27 @@
+# Use the docker:dind image as the base image
+FROM docker:dind
+
+# Set the working directory
+WORKDIR /app
+
+# Install curl and bash
+RUN apk update && apk add --no-cache curl bash
+
+# Install the ca-certificates package
+RUN apk add --no-cache ca-certificates
+
+# Copy a root ca into the image
+COPY certificates/generated_assets/bacalhau_test_root_ca.crt /usr/local/share/ca-certificates/bacalhau_test_root_ca.crt
+
+# Update CA certificates
+RUN update-ca-certificates
+
+# Download and execute the Bash script from the given URL
+RUN curl -sSL https://get.bacalhau.org/install.sh | bash
+
+# Download the binary, make it executable, and move it to /usr/local/bin
+RUN curl -o /tmp/mc https://dl.min.io/client/mc/release/linux-amd64/mc \
+    && chmod +x /tmp/mc \
+    && mv /tmp/mc /usr/local/bin/
+
+ENTRYPOINT ["dockerd-entrypoint.sh"]
diff --git a/test-integration/Dockerfile-ComputeNode b/test-integration/Dockerfile-ComputeNode
new file mode 100644
index 0000000000..7a6cc4ebaa
--- /dev/null
+++ b/test-integration/Dockerfile-ComputeNode
@@ -0,0 +1,24 @@
+# Use the docker:dind image as the base image
+FROM docker:dind
+
+# Set the working directory
+WORKDIR /app
+
+# Install curl and bash
+RUN apk update && apk add --no-cache curl bash
+
+# Install the ca-certificates package
+RUN apk add --no-cache ca-certificates
+
+# Copy a root ca into the image
+COPY certificates/generated_assets/bacalhau_test_root_ca.crt /usr/local/share/ca-certificates/bacalhau_test_root_ca.crt
+
+# Update CA certificates
+RUN update-ca-certificates
+
+# Download and execute the Bash script from the given URL
+RUN curl -sSL https://get.bacalhau.org/install.sh | bash
+
+COPY compute_node_image_setup.sh compute_node_image_setup.sh
+ENTRYPOINT ["/usr/bin/env"]
+CMD ./compute_node_image_setup.sh
diff --git a/test-integration/Dockerfile-DockerImageRegistryNode b/test-integration/Dockerfile-DockerImageRegistryNode
new file mode 100644
index 0000000000..9c38ba886e
--- /dev/null
+++ b/test-integration/Dockerfile-DockerImageRegistryNode
@@ -0,0 +1,24 @@
+FROM registry:2
+
+# Install curl and bash
+RUN apk update && apk add --no-cache curl bash
+
+# Install the ca-certificates package
+RUN apk add --no-cache ca-certificates
+
+# Copy a root ca into the image
+COPY certificates/generated_assets/bacalhau_test_root_ca.crt /usr/local/share/ca-certificates/bacalhau_test_root_ca.crt
+
+# Create a directory to store certificates to be used by the registry
+RUN mkdir /certs
+
+# Copy the certificate and key from the local directory to /certs
+COPY certificates/generated_assets/bacalhau-container-img-registry-node.crt /certs/
+COPY certificates/generated_assets/bacalhau-container-img-registry-node.key /certs/
+
+# Ensure proper permissions for certs
+RUN chmod 600 /certs/bacalhau-container-img-registry-node.key
+RUN chmod 644 /certs/bacalhau-container-img-registry-node.crt
+
+# Expose the registry's default port
+EXPOSE 5000 443
diff --git a/test-integration/Dockerfile-RequesterNode b/test-integration/Dockerfile-RequesterNode
new file mode 100644
index 0000000000..cbbd207c32
--- /dev/null
+++ b/test-integration/Dockerfile-RequesterNode
@@ -0,0 +1,22 @@
+# Use the docker:dind image as the base image
+FROM docker:dind
+
+# Set the working directory
+WORKDIR /app
+
+# Install curl and bash
+RUN apk update && apk add --no-cache curl bash
+
+# Install the ca-certificates package
+RUN apk add --no-cache ca-certificates
+
+# Copy a root ca into the image
+COPY certificates/generated_assets/bacalhau_test_root_ca.crt /usr/local/share/ca-certificates/bacalhau_test_root_ca.crt
+
+# Update CA certificates
+RUN update-ca-certificates
+
+# Download and execute the Bash script from the given URL
+RUN curl -sSL https://get.bacalhau.org/install.sh | bash
+
+ENTRYPOINT ["dockerd-entrypoint.sh"]
diff --git a/test-integration/README.md b/test-integration/README.md
new file mode 100644
index 0000000000..bada3d6e3c
--- /dev/null
+++ b/test-integration/README.md
@@ -0,0 +1,198 @@
+# Running Bacalhau on Docker
+
+## Overview
+
+Since Bacalhau is a distributed system with multiple components, it is critical to have a reliable method for end-to-end testing. Additionally, it is important that these tests closely resemble a real production environment without relying on mocks.
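+
+For example, once the stack described below is up, an end-to-end check can be driven from the client container with real jobs rather than mocks. This is just a sketch; the Setup section below walks through each step:
+
+```shell
+# List the nodes that registered with the requester, then run a real workload
+docker exec bacalhau-client-node-container bacalhau node list
+docker exec bacalhau-client-node-container bacalhau docker run hello-world
+```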
+
+This setup addresses those needs by running Bacalhau inside containers while also supporting Docker workloads within these containers (using Docker-in-Docker, or DinD).
+
+## Architecture
+
+- A Requester Docker container, running Bacalhau as a requester node.
+- A Compute Docker container, running Bacalhau as a compute node, configured to run Docker containers inside it.
+- A Bacalhau Client Docker container to act as a jumpbox for interacting with this Bacalhau deployment.
+- A [Registry](https://github.com/distribution/distribution/) Docker container to act as the local container image registry.
+- A MinIO Docker container to support running S3-compatible input/output jobs.
+- Docker Compose is used to create 5 services: the Requester node, the Compute node, the Client CLI node, the Registry node, and the MinIO node.
+- All the services are connected to the same Docker network, allowing them to communicate over the bridged network.
+- All the containers have an injected custom Certificate Authority, which is used for a portion of the internal TLS communication.
+  - TODO: Expand the TLS setup to more components. For now, it is used for the registry communication only.
+
+## Setup
+
+---
+### Build the Docker Images
+
+Build the Requester Node image:
+```shell
+docker build -f Dockerfile-RequesterNode -t bacalhau-requester-node-image .
+```
+
+Build the Compute Node image:
+```shell
+docker build -f Dockerfile-ComputeNode -t bacalhau-compute-node-image .
+```
+
+Build the Client Node image:
+```shell
+docker build -f Dockerfile-ClientNode -t bacalhau-client-node-image .
+```
+
+Build the Registry Node image:
+```shell
+docker build -f Dockerfile-DockerImageRegistryNode -t bacalhau-container-img-registry-node-image .
+```
+
+After running these commands, you should see the above images created:
+```shell
+docker image ls
+```
+---
+### Running the setup
+
+Run Docker Compose:
+```shell
+docker-compose up
+```
+
+Access the utility client container to use the Bacalhau CLI:
+```shell
+docker exec -it bacalhau-client-node-container /bin/bash
+```
+
+Once inside the container, you can run the following commands to verify the setup:
+```shell
+# You should see two nodes: a Requester and a Compute node
+bacalhau node list
+```
+
+Run a test workload:
+```shell
+bacalhau docker run hello-world
+
+# Describe the job; it should have completed successfully.
+bacalhau job describe ........
+```
+
+In another terminal window, you can follow the logs of the Requester and Compute nodes:
+```shell
+docker logs bacalhau-requester-node-container -f
+docker logs bacalhau-compute-node-container -f
+```
+
+---
+### Setting Up MinIO
+
+Access the utility client container to use the Bacalhau CLI:
+```shell
+docker exec -it bacalhau-client-node-container /bin/bash
+```
+
+Set up an alias for the MinIO CLI:
+```shell
+# The environment variables are already injected in
+# the container, no need to replace them yourself.
+mc alias set bacalhau-minio "http://${BACALHAU_MINIO_NODE_HOST}:9000" "${MINIO_ROOT_USER}" "${MINIO_ROOT_PASSWORD}"
+mc admin info bacalhau-minio
+```
+
+Create a bucket and add some files:
+```shell
+mc mb bacalhau-minio/my-data-bucket
+mc ls bacalhau-minio/my-data-bucket/section1/
+echo "This is a sample text hello hello." > example.txt
+mc cp example.txt bacalhau-minio/my-data-bucket/section1/
+```
+
+Run a job with data input from the MinIO bucket:
+
+```shell
+# Content of aws-test-job.yaml below
+bacalhau job run aws-test-job.yaml
+```
+
+```yaml
+Name: S3 Job Data Access Test
+Type: batch
+Count: 1
+Tasks:
+  - Name: main
+    Engine:
+      Type: docker
+      Params:
+        Image: ubuntu:latest
+        Entrypoint:
+          - /bin/bash
+        Parameters:
+          - "-c"
+          - "cat /put-my-s3-data-here/example.txt"
+    InputSources:
+      - Target: "/put-my-s3-data-here"
+        Source:
+          Type: s3
+          Params:
+            Bucket: "my-data-bucket"
+            Key: "section1/"
+            Endpoint: "http://bacalhau-minio-node:9000"
+            Region: "us-east-1" # If no region is added, it fails, even for MinIO
+```
+
+---
+### Setting up the private registry
+
+This Docker Compose deployment has a private registry deployed on its own node. It allows us to
+create tests and experiment with Docker image jobs without needing to use DockerHub in any way.
+
+From inside the client container, let's pull an image from DockerHub, push it to our own private registry,
+then run a Docker job using the image from our private registry.
+
+```shell
+# pull from docker hub
+docker pull ubuntu
+
+# tag the image to prepare it to be pushed to our private registry
+docker image tag ubuntu bacalhau-container-img-registry-node:5000/firstbacalhauimage
+
+# push the image to our private registry
+docker push bacalhau-container-img-registry-node:5000/firstbacalhauimage
+```
+
+Now, let's create a job that references that image in the private registry:
+
+```shell
+# Content of private-registry-test-job.yaml below
+bacalhau job run private-registry-test-job.yaml
+```
+
+```yaml
+Name: Job to test using local registry images
+Type: batch
+Count: 1
+Tasks:
+  - Name: main
+    Engine:
+      Type: docker
+      Params:
+        Image: bacalhau-container-img-registry-node:5000/firstbacalhauimage
+        Entrypoint:
+          - /bin/bash
+        Parameters:
+          - "-c"
+          - "echo test-local-registry"
+```
+
+---
+### Notes:
+
+If, after running `docker-compose up`, you face issues with the image registry node starting, try removing the image registry Docker volume:
+
+```shell
+# Destroy the deployment
+docker-compose down
+
+# Remove registry volume
+docker volume rm test-integration_registry-volume
+
+# Create deployment again
+docker-compose up
+```
diff --git a/test-integration/certificates/README.md b/test-integration/certificates/README.md
new file mode 100644
index 0000000000..f993908841
--- /dev/null
+++ b/test-integration/certificates/README.md
@@ -0,0 +1,9 @@
+# Certificate Generation
+
+The script in this folder lets you generate certificates that are signed by a root CA, providing the
+CN and SAN for these leaf certs. The generated certs will be added to the `generated_assets` directory.
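+
+Once you have generated a leaf certificate (see the usage example below), you can sanity-check that it chains back to the test root CA. This is a sketch using standard OpenSSL tooling; substitute the common name you actually used:
+
+```shell
+openssl verify -CAfile generated_assets/bacalhau_test_root_ca.crt \
+  generated_assets/my-bacalhau-requester-node.crt
+```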
+ +Usage: `./generate_leaf_certs.sh ` +```shell +./generate_leaf_certs.sh my-bacalhau-requester-node +``` diff --git a/test-integration/certificates/generate_leaf_certs.sh b/test-integration/certificates/generate_leaf_certs.sh new file mode 100755 index 0000000000..0411adc9d3 --- /dev/null +++ b/test-integration/certificates/generate_leaf_certs.sh @@ -0,0 +1,71 @@ +#!/bin/bash + +# Set variables +ROOT_CA_CERT="generated_assets/bacalhau_test_root_ca.crt" +ROOT_CA_KEY="generated_assets/bacalhau_test_root_ca.key" +DAYS_VALID=1825 # 5 years + +# Organization name and country (same as before) +ORG_NAME="Bacalhau" +COUNTRY="US" + +# Check if the input argument is provided +if [[ -z "$1" ]]; then + echo "Error: Please provide a string for the Common Name and Subject Alternative Names." + exit 1 +fi + +COMMON_NAME="$1" +OUTPUT_CERT="generated_assets/${COMMON_NAME}.crt" +OUTPUT_KEY="generated_assets/${COMMON_NAME}.key" +CSR_PATH="generated_assets/${COMMON_NAME}.csr" +CNF_PATH="generated_assets/${COMMON_NAME}.cnf" + +# Check if the files already exist +if [[ -f "${OUTPUT_CERT}" ]] || [[ -f "${OUTPUT_KEY}" ]]; then + echo "Error: One or both of the following files already exist:" + [[ -f "${OUTPUT_CERT}" ]] && echo " - ${OUTPUT_CERT}" + [[ -f "${OUTPUT_KEY}" ]] && echo " - ${OUTPUT_KEY}" + echo "Please remove or rename the existing files before running this script." + exit 1 +fi + +# Generate a private key for the new certificate +echo "Generating certificate signed by the root CA..." +openssl genpkey -algorithm RSA -out "${OUTPUT_KEY}" -pkeyopt rsa_keygen_bits:4096 + +# Create an OpenSSL configuration file for the SAN +cat > "${CNF_PATH}" </dev/null 2>&1; then + echo "dockerd is available! Now Starting Bacalhau as a compute node" + bacalhau config set compute.auth.token="${NETWORK_AUTH_TOKEN}" + bacalhau serve --compute -c compute.orchestrators="nats://${REQUESTER_NODE_LINK}:4222" + # Wait for any process to exit + wait -n + + # Exit with status of process that exited first + exit $? + fi + + # Wait before retrying + echo "dockerd is not available yet. Retrying in ${RETRY_INTERVAL} seconds..." + sleep "${RETRY_INTERVAL}" + + # Increment attempt counter + attempt=$((attempt + 1)) +done + +echo "dockerd did not become available within ${TOTAL_WAIT_TIME_FOR_DOCKERD} seconds." 
+exit 1 diff --git a/test-integration/docker-compose.yml b/test-integration/docker-compose.yml new file mode 100644 index 0000000000..2340fba1a6 --- /dev/null +++ b/test-integration/docker-compose.yml @@ -0,0 +1,117 @@ +x-common-env-variables: &common-env-variables + NETWORK_AUTH_TOKEN: "i_am_very_secret_token" + BACALHAU_API_PORT: "1234" + MINIO_ROOT_USER: "minioadmin" + MINIO_ROOT_PASSWORD: "minioadminpass" + AWS_ACCESS_KEY_ID: "minioadmin" + AWS_SECRET_ACCESS_KEY: "minioadminpass" + +networks: + bacalhau-network: + driver: bridge + +volumes: + minio-volume: + driver: local + registry-volume: + driver: local + +services: + bacalhau-minio-node: + image: quay.io/minio/minio + container_name: bacalhau-minio-node-container + command: server /data --console-address ":9001" + volumes: + - minio-volume:/data + restart: always + networks: + - bacalhau-network + environment: *common-env-variables + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ] + interval: 1s + timeout: 5s + retries: 30 + start_period: 2s + + bacalhau-container-img-registry-node: + image: bacalhau-container-img-registry-node-image + container_name: bacalhau-container-img-registry-container + volumes: + - registry-volume:/var/lib/registry + restart: always + networks: + - bacalhau-network + environment: + REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY: /var/lib/registry + REGISTRY_HTTP_ADDR: "0.0.0.0:5000" + REGISTRY_HTTP_TLS_CERTIFICATE: "/certs/bacalhau-container-img-registry-node.crt" + REGISTRY_HTTP_TLS_KEY: "/certs/bacalhau-container-img-registry-node.key" + healthcheck: + test: [ "CMD-SHELL", "nc -zv localhost 5000" ] + interval: 1s + timeout: 5s + retries: 30 + start_period: 2s + + bacalhau-requester-node: + image: bacalhau-requester-node-image + container_name: bacalhau-requester-node-container + networks: + - bacalhau-network + environment: *common-env-variables + depends_on: + bacalhau-minio-node: + condition: service_healthy + privileged: true + command: + - /bin/bash + - -c + - | + bacalhau config set "orchestrator.auth.token" "$${NETWORK_AUTH_TOKEN}" && bacalhau serve --orchestrator -c api.port=$${BACALHAU_API_PORT} + healthcheck: + test: [ "CMD-SHELL", "nc -zv localhost 1234" ] + interval: 1s + timeout: 5s + retries: 30 + start_period: 2s + + bacalhau-compute-node: + image: bacalhau-compute-node-image + container_name: bacalhau-compute-node-container + privileged: true + networks: + - bacalhau-network + depends_on: + bacalhau-requester-node: + condition: service_healthy + bacalhau-container-img-registry-node: + condition: service_healthy + environment: + <<: *common-env-variables + REQUESTER_NODE_LINK: 'bacalhau-requester-node' + healthcheck: + test: [ "CMD-SHELL", "nc -zv localhost 1234" ] + interval: 1s + timeout: 5s + retries: 30 + start_period: 2s + + bacalhau-client-node: + image: bacalhau-client-node-image + container_name: bacalhau-client-node-container + privileged: true + networks: + - bacalhau-network + depends_on: + bacalhau-requester-node: + condition: service_healthy + bacalhau-compute-node: + condition: service_healthy + bacalhau-container-img-registry-node: + condition: service_healthy + environment: + <<: *common-env-variables + BACALHAU_API_HOST: 'bacalhau-requester-node' + BACALHAU_COMPUTE_NODE_HOST: 'bacalhau-compute-node' + BACALHAU_MINIO_NODE_HOST: 'bacalhau-minio-node' From bfc1578958cc2143ab04546073b0f6832ae4b2fb Mon Sep 17 00:00:00 2001 From: Walid Baruni Date: Wed, 9 Oct 2024 14:49:18 +0200 Subject: [PATCH 3/4] fix cspell and disable golangci-lint 
spellcheck (#4610) --- .cspell-code.json | 52 ---------------- .cspell/custom-dictionary.txt | 61 ++++++++++++++++++ .golangci.yml | 7 --- DESIGN.md | 17 +---- Makefile | 4 +- clients/python/docs/OrchestratorApi.md | 12 ++-- cmd/cli/agent/alive_test.go | 4 +- cmd/cli/agent/node_test.go | 4 +- cmd/cli/agent/version_test.go | 4 +- cmd/cli/docker/docker_run_cli_test.go | 22 ++++--- cmd/cli/docker/docker_run_test.go | 14 ++--- cmd/cli/exec/exec.go | 2 +- cmd/cli/exec/exec_test.go | 2 +- cmd/cli/exec/templates.go | 6 +- cmd/cli/helpers/helpers.go | 2 +- cmd/cli/serve/serve.go | 4 +- cmd/cli/version/version.go | 4 +- cmd/cli/version/version_test.go | 9 +-- cmd/cli/wasm/wasm_run.go | 4 +- cmd/testing/base.go | 2 +- cmd/util/flags/cliflags/job.go | 2 +- cmd/util/flags/configflags/register.go | 30 ++++----- cmd/util/opts/storage_specconfig.go | 2 +- cmd/util/tokens.go | 2 +- cspell.json | 8 --- cspell.yaml | 62 +++++++++++++++++++ .../provisioning/dashboards/dashboard.json | 12 ++-- pkg/authz/policies/policy_ns_anon.rego | 4 +- pkg/compute/executor.go | 26 ++++---- pkg/compute/executor_buffer.go | 6 +- pkg/compute/store/test/store_suite.go | 2 +- pkg/config/types/bacalhau_test.go | 40 ++++++------ pkg/config/types/gen/generate.go | 4 +- pkg/devstack/devstack.go | 4 +- pkg/executor/docker/executor.go | 8 +-- pkg/executor/docker/executor_test.go | 2 +- pkg/executor/docker/models/types_test.go | 20 +++--- pkg/executor/docker/network.go | 8 +-- pkg/executor/wasm/executor.go | 8 +-- pkg/executor/wasm/loader.go | 4 +- pkg/jobstore/boltdb/store_test.go | 2 +- .../collections/hashed_priority_queue_test.go | 8 +-- pkg/lib/collections/priority_queue.go | 10 +-- .../collections/priority_queue_base_test.go | 32 +++++----- pkg/lib/collections/priority_queue_test.go | 8 +-- pkg/lib/crypto/certificate.go | 2 +- pkg/lib/crypto/certificate_test.go | 6 +- pkg/lib/policy/scrypt.go | 2 +- pkg/logger/wasm/logmanager.go | 7 ++- pkg/models/event_test.go | 2 +- pkg/models/execution.go | 2 +- pkg/models/network.go | 20 +++--- pkg/models/network_test.go | 4 +- pkg/nats/proxy/compute_proxy.go | 7 ++- pkg/nats/stream/consumer_client.go | 7 ++- pkg/nats/stream/types.go | 2 +- pkg/nats/transport/nats.go | 2 +- pkg/node/factories.go | 8 +-- pkg/node/utils.go | 2 +- pkg/publicapi/apimodels/error.go | 2 +- pkg/publicapi/endpoint/orchestrator/node.go | 2 +- pkg/publicapi/middleware/version.go | 26 ++++---- pkg/publicapi/middleware/version_test.go | 6 +- pkg/repo/migrations/v3_4.go | 2 +- pkg/routing/tracing/tracing.go | 2 +- pkg/s3/errors_test.go | 2 + pkg/storage/inline/storage.go | 15 ++--- pkg/storage/s3/storage_test.go | 2 + pkg/test/compute/resourcelimits_test.go | 4 +- pkg/test/executor/test_runner.go | 4 +- pkg/test/scenario/resolver.go | 2 +- pkg/test/scenario/responses.go | 2 +- pkg/test/scenario/results.go | 2 +- pkg/util/generic/broadcaster_test.go | 7 ++- pkg/util/idgen/short_id_test.go | 2 + python/mkdocs.yml | 2 +- test-integration/README.md | 2 +- 77 files changed, 382 insertions(+), 327 deletions(-) delete mode 100644 .cspell-code.json delete mode 100644 cspell.json create mode 100644 cspell.yaml diff --git a/.cspell-code.json b/.cspell-code.json deleted file mode 100644 index 2e846c2e82..0000000000 --- a/.cspell-code.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - "version": "0.2", - "language": "en", - "allowCompoundWords": true, - "dictionaryDefinitions": [ - { - "name": "custom-dictionary", - "path": "./.cspell/custom-dictionary.txt", - "addWords": true - } - ], - "dictionaries": [ - "en", - "custom-words", - 
"custom-dictionary" - ], - "ignorePaths": [ - "**/package.json", - "**/docs/package-lock.json", - "**/docs/docs/examples/model-training/Stable-Diffusion-Dreambooth/index.md", - "docs/docs/examples/model-training/Training-Tensorflow-Model/index.md", - "./webui/build", - "./webui/node_modules", - "./webui/package.json", - "./webui/package-lock.json", - "./.gitprecommit", - "./webui/tsconfig.json", - "./vendor", - "go.sum", - "go.mod", - "go.work.sum", - "apps" - ], - "ignoreRegExpList": [ - "Urls", - "Email", - "RsaCert", - "SshRsa", - "Base64MultiLine", - "Base64SingleLine", - "CommitHash", - "CommitHashLink", - "CStyleHexValue", - "CSSHexValue", - "SHA", - "HashStrings", - "UnicodeRef", - "UUID", - "/github.com.*/", - "/\\w+{12,}/" - ] -} diff --git a/.cspell/custom-dictionary.txt b/.cspell/custom-dictionary.txt index 4866751ba1..bb8b2c1c1b 100644 --- a/.cspell/custom-dictionary.txt +++ b/.cspell/custom-dictionary.txt @@ -355,6 +355,7 @@ wasmlogs wasmmodels wazero wdbaruni's +simonwo webui wesbos winderresearch @@ -371,3 +372,63 @@ yyyymmddhhmm zarr zerolog zidane +IMDC +kvstore +unmarshalling +Nowf +pkey +machineid +bacerror +Nacked +pqueue +Routez +Connz +Subsz +nuid +Noticef +Warnf +Debugf +Tracef +sresource +Syncer +mathgo +providables +JSONV +Idxs +boltdblib +hclog +THAMTShard +mergo +serde +qdisc +puuid +pkgs +pscbin +rocm +strg +otlploggrpc +yacspin +APITLSCA +APITLSCA +Milli +Errf +doesn +cicd +nvme +fdisk +mdstat +xcom +Fooco +Eventuallyf +Truef +sekret +Equalf +Doesnt +HAMT +dagpb +Berdly +frrist +swaggo +isbadactor +installationid +firstbacalhauimage \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml index 8a7a7ba501..641b6e1d6e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -54,12 +54,6 @@ linters-settings: - shadow lll: line-length: 140 - misspell: - locale: US - ignore-words: - - favour - - cancelled - - cancelling nolintlint: allow-leading-space: true # don't require machine-readable nolint directives (i.e. with no leading space) allow-unused: true # report any unused nolint directives @@ -89,7 +83,6 @@ linters: - govet - ineffassign - lll - - misspell - mnd - nakedret - noctx diff --git a/DESIGN.md b/DESIGN.md index 62d45785aa..c322baa3ee 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -103,16 +103,11 @@ Ideally, we will also allow much more fine-grained control, specifying location, - She has a file `process.py` which includes the python code necessary to execute in a function called 'downscale()' which takes a file handle to local, processes it, and returns a bytestream. - She executes the following command: ``` -ifps job submit -f process.py -r requirements.txt -c QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR +ipfs job submit -f process.py -r requirements.txt -c QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR ``` - This runs the command in a local executor, first installing all the python packages necessary, and then executing them, on the subset of data available on that node. - Once complete, the system returns the CID of the updated dataset that she can download. -- **SCENARIO 3** Want to burst to cloud but cannot move entire dataset in short time - - DHASH CAN YOU HELP FLESH OUT - - **PUSH COMPUTE INTO GENE SEQUENCER** - - **PIPE TO S3** - ## Components to Build - Build an application that listens for jobs over NATS, receives payment somehow, runs the job in {kubernetes, docker, idk}, and returns the result to the use (ideally the 'result' is in the form of an ipfs object and we can just return the hash). 
@@ -125,13 +120,3 @@ ifps job submit -f process.py -r requirements.txt -c QmbWqxBEKC3P8tqsKc98xmWNzrz Bacalhau means cod (the fish) in Portuguese (where several folks were brainstorming this topic). Compute-Over-Data == Cod == Bacalhau - -## Prior Art / Parallel Projects -* IPFS-FAN - distributed serverless - https://research.protocol.ai/publications/ipfs-fan-a-function-addressable-computation-network/delarocha2021a.pdf -* IPLS : A Framework for Decentralized Federated Learning- https://arxiv.org/pdf/2101.01901v1.pdf -* Interplanetary Distributed Computing (2018) - https://github.com/yenkuanlee/IPDC -* IPTF - IPFS + TensorFlow (2018) - https://github.com/tesserai/iptf -* Lurk -> Run queries over Filecoin Sealed Data (no public paper yet) -* Radix - Nomad based scheduler for IPFS cluster (only) - high level spec doc https://docs.google.com/document/d/18hdYBmDlvusEOQ-iSNIO_IAEOvJVFL1MyAU_B8hON9Q/edit?usp=sharing -* Bringing Arbitrary Compute to Authoritative Data https://queue.acm.org/detail.cfm?id=2645649 -* Manta: a scalable, distributed object store https://github.com/joyent/manta diff --git a/Makefile b/Makefile index e33f0023a5..beab2c26cf 100644 --- a/Makefile +++ b/Makefile @@ -517,5 +517,5 @@ else endif .PHONY: spellcheck-code -spellcheck-code: ## Runs a spellchecker over all code - MVP just does one file - cspell -c .cspell-code.json lint ./pkg/authn/** +spellcheck-code: + cspell lint -c cspell.yaml --quiet "**/*.{go,js,ts,jsx,tsx,md,yml,yaml,json}" diff --git a/clients/python/docs/OrchestratorApi.md b/clients/python/docs/OrchestratorApi.md index e8f4b2c85b..8303d5e504 100644 --- a/clients/python/docs/OrchestratorApi.md +++ b/clients/python/docs/OrchestratorApi.md @@ -35,7 +35,7 @@ from pprint import pprint api_instance = bacalhau_apiclient.OrchestratorApi() id = 'id_example' # str | ID to get the job for include = 'include_example' # str | Takes history and executions as options. If empty will not include anything else. (optional) -limit = 56 # int | Number of history or exeuctions to fetch. Should be used in conjugation with include (optional) +limit = 56 # int | Number of history or executions to fetch. Should be used in conjugation with include (optional) try: # Returns a job. @@ -47,11 +47,11 @@ except ApiException as e: ### Parameters -Name | Type | Description | Notes -------------- | ------------- | ------------- | ------------- - **id** | **str**| ID to get the job for | - **include** | **str**| Takes history and executions as options. If empty will not include anything else. | [optional] - **limit** | **int**| Number of history or exeuctions to fetch. Should be used in conjugation with include | [optional] +Name | Type | Description | Notes +------------- | ------------- |--------------------------------------------------------------------------------------| ------------- + **id** | **str**| ID to get the job for | + **include** | **str**| Takes history and executions as options. If empty will not include anything else. | [optional] + **limit** | **int**| Number of history or executions to fetch. 
Should be used in conjugation with include | [optional] ### Return type diff --git a/cmd/cli/agent/alive_test.go b/cmd/cli/agent/alive_test.go index b25b07be67..469c6944d1 100644 --- a/cmd/cli/agent/alive_test.go +++ b/cmd/cli/agent/alive_test.go @@ -30,7 +30,7 @@ func (s *AliveSuite) TestAliveJSONOutput() { aliveInfo := &apimodels.IsAliveResponse{} err = marshaller.JSONUnmarshalWithMax([]byte(out), &aliveInfo) - s.Require().NoError(err, "Could not unmarshall the output into json - %+v", err) + s.Require().NoError(err, "Could not unmarshal the output into json - %+v", err) s.Require().True(aliveInfo.IsReady()) } @@ -40,6 +40,6 @@ func (s *AliveSuite) TestAliveYAMLOutput() { aliveInfo := &apimodels.IsAliveResponse{} err = marshaller.YAMLUnmarshalWithMax([]byte(out), &aliveInfo) - s.Require().NoError(err, "Could not unmarshall the output into yaml - %+v", out) + s.Require().NoError(err, "Could not unmarshal the output into yaml - %+v", out) s.Require().True(aliveInfo.IsReady()) } diff --git a/cmd/cli/agent/node_test.go b/cmd/cli/agent/node_test.go index 8fe7096a2b..5121e9bb65 100644 --- a/cmd/cli/agent/node_test.go +++ b/cmd/cli/agent/node_test.go @@ -28,7 +28,7 @@ func (s *NodeSuite) TestNodeJSONOutput() { nodeInfo := &models.NodeState{} err = marshaller.JSONUnmarshalWithMax([]byte(out), &nodeInfo) - s.Require().NoError(err, "Could not unmarshall the output into json - %+v", out) + s.Require().NoError(err, "Could not unmarshal the output into json - %+v", out) s.Require().Equal(s.Node.ID, nodeInfo.Info.ID(), "Node ID does not match in json.") } @@ -38,6 +38,6 @@ func (s *NodeSuite) TestNodeYAMLOutput() { nodeInfo := &models.NodeState{} err = marshaller.YAMLUnmarshalWithMax([]byte(out), &nodeInfo) - s.Require().NoError(err, "Could not unmarshall the output into yaml - %+v", out) + s.Require().NoError(err, "Could not unmarshal the output into yaml - %+v", out) s.Require().Equal(s.Node.ID, nodeInfo.Info.ID(), "Node ID does not match in yaml.") } diff --git a/cmd/cli/agent/version_test.go b/cmd/cli/agent/version_test.go index e7fe5421ef..8083ebfb36 100644 --- a/cmd/cli/agent/version_test.go +++ b/cmd/cli/agent/version_test.go @@ -45,7 +45,7 @@ func (s *VersionSuite) TestVersionJSONOutput() { expectedVersion := version.Get() printedVersion := &models.BuildVersionInfo{} err = marshaller.JSONUnmarshalWithMax([]byte(out), &printedVersion) - s.Require().NoError(err, "Could not unmarshall the output into json - %+v", out) + s.Require().NoError(err, "Could not unmarshal the output into json - %+v", out) s.Require().Equal(expectedVersion, printedVersion, "Versions do not match in json.") } @@ -58,6 +58,6 @@ func (s *VersionSuite) TestVersionYAMLOutput() { expectedVersion := version.Get() printedVersion := &models.BuildVersionInfo{} err = marshaller.YAMLUnmarshalWithMax([]byte(out), &printedVersion) - s.Require().NoError(err, "Could not unmarshall the output into yaml - %+v", out) + s.Require().NoError(err, "Could not unmarshal the output into yaml - %+v", out) s.Require().Equal(expectedVersion, printedVersion, "Versions do not match in yaml.") } diff --git a/cmd/cli/docker/docker_run_cli_test.go b/cmd/cli/docker/docker_run_cli_test.go index 9f5f8d2539..42ecf4dd0d 100644 --- a/cmd/cli/docker/docker_run_cli_test.go +++ b/cmd/cli/docker/docker_run_cli_test.go @@ -1,5 +1,7 @@ //go:build unit || !integration +/* spell-checker: disable */ + package docker import ( @@ -431,7 +433,7 @@ func TestJobFlagParsing(t *testing.T) { }, expectedError: false, }, - // TODO(forrest): if/when validtion on the network 
config is adjusted expect this test to fail. + // TODO(forrest): if/when validation on the network config is adjusted expect this test to fail. { name: "with none network and domains", flags: []string{"--network=none", "--domain=example.com", "--domain=example.io", "image:tag"}, @@ -487,30 +489,30 @@ func TestJobFlagParsing(t *testing.T) { }, { name: "with s3 publisher", - flags: []string{"--publisher=s3://mybucket/mykey", "image:tag"}, + flags: []string{"--publisher=s3://myBucket/myKey", "image:tag"}, assertJob: func(t *testing.T, j *models.Job) { defaultJobAssertions(t, j) task := j.Task() s3publisher, err := publisher_s3.DecodePublisherSpec(task.Publisher) require.NoError(t, err) assert.Equal(t, publisher_s3.PublisherSpec{ - Bucket: "mybucket", - Key: "mykey", + Bucket: "myBucket", + Key: "myKey", }, s3publisher) }, expectedError: false, }, { name: "with s3 publisher with opts", - flags: []string{"-p=s3://mybucket/mykey,opt=region=us-west-2,opt=endpoint=https://s3.custom.com", "image:tag"}, + flags: []string{"-p=s3://myBucket/myKey,opt=region=us-west-2,opt=endpoint=https://s3.custom.com", "image:tag"}, assertJob: func(t *testing.T, j *models.Job) { defaultJobAssertions(t, j) task := j.Task() s3publisher, err := publisher_s3.DecodePublisherSpec(task.Publisher) require.NoError(t, err) assert.Equal(t, publisher_s3.PublisherSpec{ - Bucket: "mybucket", - Key: "mykey", + Bucket: "myBucket", + Key: "myKey", Region: "us-west-2", Endpoint: "https://s3.custom.com", }, s3publisher) @@ -519,15 +521,15 @@ func TestJobFlagParsing(t *testing.T) { }, { name: "with s3 publisher with options", - flags: []string{"-p=s3://mybucket/mykey,option=region=us-west-2,option=endpoint=https://s3.custom.com", "image:tag"}, + flags: []string{"-p=s3://myBucket/myKey,option=region=us-west-2,option=endpoint=https://s3.custom.com", "image:tag"}, assertJob: func(t *testing.T, j *models.Job) { defaultJobAssertions(t, j) task := j.Task() s3publisher, err := publisher_s3.DecodePublisherSpec(task.Publisher) require.NoError(t, err) assert.Equal(t, publisher_s3.PublisherSpec{ - Bucket: "mybucket", - Key: "mykey", + Bucket: "myBucket", + Key: "myKey", Region: "us-west-2", Endpoint: "https://s3.custom.com", }, s3publisher) diff --git a/cmd/cli/docker/docker_run_test.go b/cmd/cli/docker/docker_run_test.go index dd0e603f7a..13c3ece935 100644 --- a/cmd/cli/docker/docker_run_test.go +++ b/cmd/cli/docker/docker_run_test.go @@ -165,12 +165,12 @@ func (s *DockerRunSuite) TestRun_SubmitUrlInputs() { {inputURL: InputURL{url: "https://raw.githubusercontent.com/bacalhau-project/bacalhau/main/main.go", pathInContainer: "/inputs", filename: "main.go", flag: "-i"}}, } - for _, turls := range testURLs { + for _, urls := range testURLs { ctx := context.Background() flagsArray := []string{"docker", "run"} - flagsArray = append(flagsArray, turls.inputURL.flag, turls.inputURL.url) - flagsArray = append(flagsArray, "ubuntu", "cat", fmt.Sprintf("%s/%s", turls.inputURL.pathInContainer, turls.inputURL.filename)) + flagsArray = append(flagsArray, urls.inputURL.flag, urls.inputURL.url) + flagsArray = append(flagsArray, "ubuntu", "cat", fmt.Sprintf("%s/%s", urls.inputURL.pathInContainer, urls.inputURL.filename)) _, out, err := s.ExecuteTestCobraCommand(flagsArray...) 
s.Require().NoError(err, "Error submitting job") @@ -180,8 +180,8 @@ func (s *DockerRunSuite) TestRun_SubmitUrlInputs() { s.Require().Equal(1, len(j.Task().InputSources), "Number of job urls != # of test urls.") urlSpec, err := storage_url.DecodeSpec(j.Task().InputSources[0].Source) s.Require().NoError(err) - s.Require().Equal(turls.inputURL.url, urlSpec.URL, "Test URL not equal to URL from job.") - s.Require().Equal(turls.inputURL.pathInContainer, j.Task().InputSources[0].Target, "Test Path not equal to Path from job.") + s.Require().Equal(urls.inputURL.url, urlSpec.URL, "Test URL not equal to URL from job.") + s.Require().Equal(urls.inputURL.pathInContainer, j.Task().InputSources[0].Target, "Test Path not equal to Path from job.") } } @@ -252,8 +252,8 @@ func (s *DockerRunSuite) TestRun_SubmitWorkdir() { }{ {workdir: "", errorCode: 0}, {workdir: "/", errorCode: 0}, - {workdir: "./mydir", errorCode: 1}, - {workdir: "../mydir", errorCode: 1}, + {workdir: "./myDir", errorCode: 1}, + {workdir: "../myDir", errorCode: 1}, {workdir: "http://foo.com", errorCode: 1}, {workdir: "/foo//", errorCode: 0}, // double forward slash is allowed in unix {workdir: "/foo//bar", errorCode: 0}, diff --git a/cmd/cli/exec/exec.go b/cmd/cli/exec/exec.go index 7c6af0ea1a..3996edd794 100644 --- a/cmd/cli/exec/exec.go +++ b/cmd/cli/exec/exec.go @@ -228,7 +228,7 @@ func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, opti job.Task().Env = options.TaskSettings.EnvironmentVariables job.Task().InputSources = options.TaskSettings.InputSources.Values() if err != nil { - return nil, fmt.Errorf("parsing job labes: %w", err) + return nil, fmt.Errorf("parsing job labels: %w", err) } job.Constraints, err = options.JobSettings.Constraints() if err != nil { diff --git a/cmd/cli/exec/exec_test.go b/cmd/cli/exec/exec_test.go index 608d3f05a5..076811f186 100644 --- a/cmd/cli/exec/exec_test.go +++ b/cmd/cli/exec/exec_test.go @@ -37,7 +37,7 @@ var testcases []testCase = []testCase{ { // bacalhau exec ruby -e "puts 'hello'" name: "no ruby here", - cmdLine: []string{"ruby", "-e", "\"puts 'helllo'\""}, + cmdLine: []string{"ruby", "-e", "\"puts 'hello'\""}, expectedUnknownArgs: []string{}, expectedErrMsg: "the job type 'ruby' is not supported", }, diff --git a/cmd/cli/exec/templates.go b/cmd/cli/exec/templates.go index fb2f286acd..e65fee7898 100644 --- a/cmd/cli/exec/templates.go +++ b/cmd/cli/exec/templates.go @@ -24,8 +24,8 @@ type TemplateMap struct { m map[string]string } -func NewTemplateMap(fsys fs.ReadDirFS, tplPath string) (*TemplateMap, error) { - entries, err := fsys.ReadDir(tplPath) +func NewTemplateMap(fSys fs.ReadDirFS, tplPath string) (*TemplateMap, error) { + entries, err := fSys.ReadDir(tplPath) if err != nil { return nil, err } @@ -41,7 +41,7 @@ func NewTemplateMap(fsys fs.ReadDirFS, tplPath string) (*TemplateMap, error) { name := nameFromFile(entry.Name()) - fd, err := fsys.Open(path.Join(tplPath, entry.Name())) + fd, err := fSys.Open(path.Join(tplPath, entry.Name())) if err != nil { return nil, err } diff --git a/cmd/cli/helpers/helpers.go b/cmd/cli/helpers/helpers.go index 1f639a074c..487d2b3d9e 100644 --- a/cmd/cli/helpers/helpers.go +++ b/cmd/cli/helpers/helpers.go @@ -62,7 +62,7 @@ func BuildJobFromFlags( labels, err := jobSettings.Labels() if err != nil { - return nil, fmt.Errorf("receieved invalid job labels: %w", err) + return nil, fmt.Errorf("received invalid job labels: %w", err) } job := &models.Job{ Name: jobSettings.Name(), diff --git a/cmd/cli/serve/serve.go b/cmd/cli/serve/serve.go 
index 8886b58ae4..e2bdb8a2f3 100644 --- a/cmd/cli/serve/serve.go +++ b/cmd/cli/serve/serve.go @@ -277,12 +277,12 @@ func parseServerAPIHost(host string) (string, error) { // We should check that the value gives us an address type // we can use to get our IP address. If it doesn't, we should // panic. - atype, ok := network.AddressTypeFromString(host) + addrType, ok := network.AddressTypeFromString(host) if !ok { return "", fmt.Errorf("invalid address type in Server API Host config: %s", host) } - addr, err := network.GetNetworkAddress(atype, network.AllAddresses) + addr, err := network.GetNetworkAddress(addrType, network.AllAddresses) if err != nil { return "", fmt.Errorf("failed to get network address for Server API Host: %s: %w", host, err) } diff --git a/cmd/cli/version/version.go b/cmd/cli/version/version.go index 9e53db306e..a2ce1e1dc0 100644 --- a/cmd/cli/version/version.go +++ b/cmd/cli/version/version.go @@ -128,10 +128,10 @@ func (oV *VersionOptions) Run( } else { // NB(forrest): since `GetAllVersions` is an API call - in the event the server is un-reachable // we timeout after 3 seconds to avoid waiting on an unavailable server to return its version information. - vctx, cancel := context.WithTimeout(ctx, time.Second*3) + vCtx, cancel := context.WithTimeout(ctx, time.Second*3) defer cancel() var err error - versions, err = util.GetAllVersions(vctx, cfg, api, r) + versions, err = util.GetAllVersions(vCtx, cfg, api, r) if err != nil { // No error on fail of version check. Just print as much as we can. log.Ctx(ctx).Warn().Err(err).Msg("failed to get updated versions") diff --git a/cmd/cli/version/version_test.go b/cmd/cli/version/version_test.go index a3d127e593..afbb2078c7 100644 --- a/cmd/cli/version/version_test.go +++ b/cmd/cli/version/version_test.go @@ -18,11 +18,12 @@ package version_test import ( "testing" - "github.com/bacalhau-project/bacalhau/cmd/util" - "github.com/bacalhau-project/bacalhau/pkg/lib/marshaller" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/bacalhau-project/bacalhau/cmd/util" + "github.com/bacalhau-project/bacalhau/pkg/lib/marshaller" + cmdtesting "github.com/bacalhau-project/bacalhau/cmd/testing" "github.com/bacalhau-project/bacalhau/cmd/util/output" ) @@ -51,7 +52,7 @@ func (suite *VersionSuite) TestVersionJSONOutput() { jsonDoc := &util.Versions{} err = marshaller.JSONUnmarshalWithMax([]byte(out), &jsonDoc) - require.NoError(suite.T(), err, "Could not unmarshall the output into json - %+v", err) + require.NoError(suite.T(), err, "Could not unmarshal the output into json - %+v", err) require.Equal(suite.T(), jsonDoc.ClientVersion.GitCommit, jsonDoc.ServerVersion.GitCommit, "Client and Server do not match in json.") } @@ -61,7 +62,7 @@ func (suite *VersionSuite) TestVersionYAMLOutput() { yamlDoc := &util.Versions{} err = marshaller.YAMLUnmarshalWithMax([]byte(out), &yamlDoc) - require.NoError(suite.T(), err, "Could not unmarshall the output into yaml - %+v", err) + require.NoError(suite.T(), err, "Could not unmarshal the output into yaml - %+v", err) require.Equal(suite.T(), yamlDoc.ClientVersion.GitCommit, yamlDoc.ServerVersion.GitCommit, "Client and Server do not match in yaml.") } diff --git a/cmd/cli/wasm/wasm_run.go b/cmd/cli/wasm/wasm_run.go index f3d93ca422..e01e0697d0 100644 --- a/cmd/cli/wasm/wasm_run.go +++ b/cmd/cli/wasm/wasm_run.go @@ -190,7 +190,7 @@ func build(ctx context.Context, args []string, opts *WasmRunOptions) (*models.Jo if err != nil { return nil, err } - envar, err := 
parse.StringSliceToMap(opts.EnvironmentVariables) + envVar, err := parse.StringSliceToMap(opts.EnvironmentVariables) if err != nil { return nil, fmt.Errorf("wasm env vars invalid: %w", err) } @@ -198,7 +198,7 @@ func build(ctx context.Context, args []string, opts *WasmRunOptions) (*models.Jo WithParameters(args[1:]...). WithEntrypoint(opts.Entrypoint). WithImportModules(opts.ImportModules). - WithEnvironmentVariables(envar). + WithEnvironmentVariables(envVar). Build() if err != nil { return nil, err diff --git a/cmd/testing/base.go b/cmd/testing/base.go index 304e1a005d..655834547a 100644 --- a/cmd/testing/base.go +++ b/cmd/testing/base.go @@ -112,7 +112,7 @@ func (s *BaseSuite) ExecuteTestCobraCommandWithStdin(stdin io.Reader, args ...st buf := new(bytes.Buffer) root := cli.NewRootCmd() root.SetOut(buf) - // TODO(forrest): we should separate the ouputs from a command into different buffers for stderr and sdtout, otherwise + // TODO(forrest): we should separate the outputs from a command into different buffers for stderr and stdout, otherwise // log lines and other outputs (like the update checker) will be included in the returned buffer, and commands // that make assertions on the output containing specific values, or being marshaller-able to yaml will fail. root.SetErr(buf) diff --git a/cmd/util/flags/cliflags/job.go b/cmd/util/flags/cliflags/job.go index f9f4a55b23..b9e89aed8a 100644 --- a/cmd/util/flags/cliflags/job.go +++ b/cmd/util/flags/cliflags/job.go @@ -71,7 +71,7 @@ func (j *JobSettings) Constraints() ([]*models.LabelSelectorRequirement, error) } // TODO(forrest): based on a conversation with walid we should be returning an error here if at anypoint if a label -// if provided that is invalid. We cannont remove them as we did previously. +// if provided that is invalid. We cannot remove them as we did previously. func (j *JobSettings) Labels() (map[string]string, error) { parsedLabels := make(map[string]string) rawLabels := j.labels diff --git a/cmd/util/flags/configflags/register.go b/cmd/util/flags/configflags/register.go index b46a8d1fb8..550b2baefa 100644 --- a/cmd/util/flags/configflags/register.go +++ b/cmd/util/flags/configflags/register.go @@ -46,7 +46,7 @@ func BindFlags(v *viper.Viper, register map[string][]Definition) error { for _, def := range defs { // sanity check to ensure we are not binding a config key on more than one flag. if dup, ok := seen[def.ConfigPath]; ok && !def.Deprecated { - return fmt.Errorf("DEVELOPER ERROR: duplicate regsistration of config key %s for flag %s"+ + return fmt.Errorf("DEVELOPER ERROR: duplicate registration of config key %s for flag %s"+ " previously registered on on flag %s", def.ConfigPath, def.FlagName, dup.FlagName) } if !def.Deprecated { @@ -79,43 +79,43 @@ func PreRun(v *viper.Viper, flags map[string][]Definition) func(*cobra.Command, // This method should be called before the command runs to register flags accordingly. 
func RegisterFlags(cmd *cobra.Command, register map[string][]Definition) error { for name, defs := range register { - fset := pflag.NewFlagSet(name, pflag.ContinueOnError) + flagSet := pflag.NewFlagSet(name, pflag.ContinueOnError) // Determine the type of the default value for _, def := range defs { switch v := def.DefaultValue.(type) { case int: - fset.Int(def.FlagName, v, def.Description) + flagSet.Int(def.FlagName, v, def.Description) case uint64: - fset.Uint64(def.FlagName, v, def.Description) + flagSet.Uint64(def.FlagName, v, def.Description) case bool: - fset.Bool(def.FlagName, v, def.Description) + flagSet.Bool(def.FlagName, v, def.Description) case string: - fset.String(def.FlagName, v, def.Description) + flagSet.String(def.FlagName, v, def.Description) case []string: - fset.StringSlice(def.FlagName, v, def.Description) + flagSet.StringSlice(def.FlagName, v, def.Description) case map[string]string: - fset.StringToString(def.FlagName, v, def.Description) + flagSet.StringToString(def.FlagName, v, def.Description) case models.JobSelectionDataLocality: - fset.Var(flags.DataLocalityFlag(&v), def.FlagName, def.Description) + flagSet.Var(flags.DataLocalityFlag(&v), def.FlagName, def.Description) case logger.LogMode: - fset.Var(flags.LoggingFlag(&v), def.FlagName, def.Description) + flagSet.Var(flags.LoggingFlag(&v), def.FlagName, def.Description) case time.Duration: - fset.DurationVar(&v, def.FlagName, v, def.Description) + flagSet.DurationVar(&v, def.FlagName, v, def.Description) case types.Duration: - fset.DurationVar((*time.Duration)(&v), def.FlagName, time.Duration(v), def.Description) + flagSet.DurationVar((*time.Duration)(&v), def.FlagName, time.Duration(v), def.Description) case types.ResourceType: - fset.String(def.FlagName, string(v), def.Description) + flagSet.String(def.FlagName, string(v), def.Description) default: return fmt.Errorf("unhandled type: %T for flag %s", v, def.FlagName) } if def.Deprecated { - flag := fset.Lookup(def.FlagName) + flag := flagSet.Lookup(def.FlagName) flag.Deprecated = def.DeprecatedMessage flag.Hidden = true } } - cmd.PersistentFlags().AddFlagSet(fset) + cmd.PersistentFlags().AddFlagSet(flagSet) } return nil } diff --git a/cmd/util/opts/storage_specconfig.go b/cmd/util/opts/storage_specconfig.go index bb6715b1f0..8c4ee7c489 100644 --- a/cmd/util/opts/storage_specconfig.go +++ b/cmd/util/opts/storage_specconfig.go @@ -73,7 +73,7 @@ func (o *StorageSpecConfigOpt) Set(value string) error { options[k] = v } default: - return fmt.Errorf("unpexted key %s in field %s", key, field) + return fmt.Errorf("unexpected key %s in field %s", key, field) } } alias := sourceURI diff --git a/cmd/util/tokens.go b/cmd/util/tokens.go index 545ed244fc..3c46c0161b 100644 --- a/cmd/util/tokens.go +++ b/cmd/util/tokens.go @@ -46,7 +46,7 @@ func writeTokens(path string, t tokens) error { return json.NewEncoder(file).Encode(t) } -// Read the authorization crdential associated with the passed API base URL. If +// Read the authorization credentials associated with the passed API base URL. If // there is no credential currently stored, ReadToken will return nil with no // error. 
func ReadToken(path string, apiURL string) (*apimodels.HTTPCredential, error) { diff --git a/cspell.json b/cspell.json deleted file mode 100644 index acd34f1354..0000000000 --- a/cspell.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "version": "0.2", - "ignorePaths": [], - "dictionaryDefinitions": [], - "dictionaries": [], - "ignoreWords": [], - "import": [".cspell-code.json"] -} diff --git a/cspell.yaml b/cspell.yaml new file mode 100644 index 0000000000..274eb310ff --- /dev/null +++ b/cspell.yaml @@ -0,0 +1,62 @@ +version: '0.2' +language: en +allowCompoundWords: true + +# Dictionary configurations +dictionaryDefinitions: + - name: custom-dictionary + path: ./.cspell/custom-dictionary.txt + addWords: true + +dictionaries: + # General dictionaries + - en + - en-gb + # Programming language-specific dictionaries + - python + - golang + - typescript + - node + - html + - css + - cpp + # Technology-specific dictionaries + - k8s + - terraform + # Custom dictionaries + - custom-words + - custom-dictionary + +# Paths to ignore +ignorePaths: + - python/mkdocs.yml + - webui/build + - webui/node_modules + - webui/lib/api/generated/** + +# Patterns to ignore +ignoreRegExpList: + # Internet and email + - Urls + - Email + # Cryptography and security + - RsaCert + - SshRsa + - SHA + # Encoding + - Base64 + - Base64MultiLine + - Base64SingleLine + - HexDigits + # Programming-related + - CommitHash + - CommitHashLink + - CStyleHexValue + - CSSHexValue + - EscapedUnicodeCharacters + - EscapeCharacters + - HashStrings + - UnicodeRef + - UUID + # Custom patterns + - /github.com.*/ diff --git a/ops/metrics/grafana/provisioning/dashboards/dashboard.json b/ops/metrics/grafana/provisioning/dashboards/dashboard.json index ecee403d32..9e05872bdf 100644 --- a/ops/metrics/grafana/provisioning/dashboards/dashboard.json +++ b/ops/metrics/grafana/provisioning/dashboards/dashboard.json @@ -182,7 +182,7 @@ "useBackend": false } ], - "title": "Jobs Receieved", + "title": "Jobs Received", "type": "stat" }, { @@ -656,7 +656,7 @@ "refId": "A" } ], - "title": "Averagef HTTP Requests Duration over 5min", + "title": "Average HTTP Requests Duration over 5min", "type": "timeseries" }, { @@ -930,7 +930,7 @@ "useBackend": false } ], - "title": "Evaluatio Broker Cancelable", + "title": "Evaluation Broker Cancelable", "type": "stat" }, { @@ -1004,7 +1004,7 @@ "useBackend": false } ], - "title": "Evaluatio Broker Inflight", + "title": "Evaluation Broker Inflight", "type": "stat" }, { @@ -1078,7 +1078,7 @@ "useBackend": false } ], - "title": "Evaluatio Broker Pending", + "title": "Evaluation Broker Pending", "type": "stat" }, { @@ -1152,7 +1152,7 @@ "useBackend": false } ], - "title": "Evaluatio Broker Waiting", + "title": "Evaluation Broker Waiting", "type": "stat" }, { diff --git a/pkg/authz/policies/policy_ns_anon.rego b/pkg/authz/policies/policy_ns_anon.rego index b243fd2dd9..c56ab79d13 100644 --- a/pkg/authz/policies/policy_ns_anon.rego +++ b/pkg/authz/policies/policy_ns_anon.rego @@ -30,7 +30,7 @@ allow if { namespace_readable(job_namespace_perms) } -# Allow reading all other endpoints, inclduing by users who don't have a token +# Allow reading all other endpoints, including by users who don't have a token allow if { input.http.path != job_endpoint not is_legacy_api @@ -51,7 +51,7 @@ allow if { not input.http.path[3] in ["submit", "cancel"] } -# Allow posting to auth endpoints, neccessary to get a token in the first place +# Allow posting to auth endpoints, necessary to get a token in the first place allow if { input.http.path[2] == 
"auth" } diff --git a/pkg/compute/executor.go b/pkg/compute/executor.go index 3908042204..f27a5fac7e 100644 --- a/pkg/compute/executor.go +++ b/pkg/compute/executor.go @@ -65,31 +65,31 @@ func NewBaseExecutor(params BaseExecutorParams) *BaseExecutor { func prepareInputVolumes( ctx context.Context, - strgprovider storage.StorageProvider, + storageProvider storage.StorageProvider, storageDirectory string, inputSources ...*models.InputSource) ( []storage.PreparedStorage, func(context.Context) error, error, ) { - inputVolumes, err := storage.ParallelPrepareStorage(ctx, strgprovider, storageDirectory, inputSources...) + inputVolumes, err := storage.ParallelPrepareStorage(ctx, storageProvider, storageDirectory, inputSources...) if err != nil { return nil, nil, err } return inputVolumes, func(ctx context.Context) error { - return storage.ParallelCleanStorage(ctx, strgprovider, inputVolumes) + return storage.ParallelCleanStorage(ctx, storageProvider, inputVolumes) }, nil } func prepareWasmVolumes( ctx context.Context, - strgprovider storage.StorageProvider, + storageProvider storage.StorageProvider, storageDirectory string, wasmEngine wasmmodels.EngineSpec) ( map[string][]storage.PreparedStorage, func(context.Context) error, error, ) { - importModuleVolumes, err := storage.ParallelPrepareStorage(ctx, strgprovider, storageDirectory, wasmEngine.ImportModules...) + importModuleVolumes, err := storage.ParallelPrepareStorage(ctx, storageProvider, storageDirectory, wasmEngine.ImportModules...) if err != nil { return nil, nil, err } - entryModuleVolumes, err := storage.ParallelPrepareStorage(ctx, strgprovider, storageDirectory, wasmEngine.EntryModule) + entryModuleVolumes, err := storage.ParallelPrepareStorage(ctx, storageProvider, storageDirectory, wasmEngine.EntryModule) if err != nil { return nil, nil, err } @@ -100,8 +100,8 @@ func prepareWasmVolumes( } cleanup := func(ctx context.Context) error { - err1 := storage.ParallelCleanStorage(ctx, strgprovider, importModuleVolumes) - err2 := storage.ParallelCleanStorage(ctx, strgprovider, entryModuleVolumes) + err1 := storage.ParallelCleanStorage(ctx, storageProvider, importModuleVolumes) + err2 := storage.ParallelCleanStorage(ctx, storageProvider, entryModuleVolumes) if err1 != nil || err2 != nil { return fmt.Errorf("Error cleaning up WASM volumes: %v, %v", err1, err2) } @@ -119,21 +119,21 @@ func prepareWasmVolumes( // // For example, an InputCleanupFn might be responsible for deallocating storage used // for input volumes, or deleting temporary input files that were created as part of the -// job's execution. The nature of it operation depends on the storage provided by `strgprovider` and +// job's execution. The nature of it operation depends on the storage provided by `storageProvider` and // input sources of the jobs associated tasks. For the case of a wasm job its input and entry module storage volumes // should be removed via the method after the jobs execution reaches a terminal state. type InputCleanupFn = func(context.Context) error func PrepareRunArguments( ctx context.Context, - strgprovider storage.StorageProvider, + storageProvider storage.StorageProvider, storageDirectory string, execution *models.Execution, resultsDir string, ) (*executor.RunCommandRequest, InputCleanupFn, error) { var cleanupFuncs []func(context.Context) error - inputVolumes, inputCleanup, err := prepareInputVolumes(ctx, strgprovider, storageDirectory, execution.Job.Task().InputSources...) 
+ inputVolumes, inputCleanup, err := prepareInputVolumes(ctx, storageProvider, storageDirectory, execution.Job.Task().InputSources...) if err != nil { return nil, nil, err } @@ -162,7 +162,7 @@ func PrepareRunArguments( return nil, nil, err } - volumes, wasmCleanup, err := prepareWasmVolumes(ctx, strgprovider, storageDirectory, wasmEngine) + volumes, wasmCleanup, err := prepareWasmVolumes(ctx, storageProvider, storageDirectory, wasmEngine) if err != nil { return nil, nil, err } @@ -259,7 +259,7 @@ func (e *BaseExecutor) Start(ctx context.Context, execution *models.Execution) * log.Ctx(ctx).Debug().Msg("starting execution") if e.failureInjection.IsBadActor { - result.Err = fmt.Errorf("i am a baaad node. i failed execution %s", execution.ID) + result.Err = fmt.Errorf("i am a bad node. i failed execution %s", execution.ID) return result } diff --git a/pkg/compute/executor_buffer.go b/pkg/compute/executor_buffer.go index cc3fb6e12b..764149dfe6 100644 --- a/pkg/compute/executor_buffer.go +++ b/pkg/compute/executor_buffer.go @@ -154,7 +154,7 @@ func (s *ExecutorBuffer) deque() { // There are at most max matches, so try at most that many times max := s.queuedTasks.Len() for i := 0; i < max; i++ { - qitem := s.queuedTasks.DequeueWhere(func(task *bufferTask) bool { + qItem := s.queuedTasks.DequeueWhere(func(task *bufferTask) bool { // If we don't have enough resources to run this task, then we will skip it queuedResources := task.localExecutionState.Execution.TotalAllocatedResources() allocatedResources := s.runningCapacity.AddIfHasCapacity(ctx, *queuedResources) @@ -174,13 +174,13 @@ func (s *ExecutorBuffer) deque() { return true }) - if qitem == nil { + if qItem == nil { // We didn't find anything in the queue that matches our resource availability so we will // break out of this look as there is nothing else to find break } - task := qitem.Value + task := qItem.Value // Move the execution to the running list and remove from the list of enqueued IDs // before we actually run the task diff --git a/pkg/compute/store/test/store_suite.go b/pkg/compute/store/test/store_suite.go index 80c3147323..87808bde0d 100644 --- a/pkg/compute/store/test/store_suite.go +++ b/pkg/compute/store/test/store_suite.go @@ -18,7 +18,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/test/mock" ) -type StoreCreator func(ctx context.Context, dbpath string) (store.ExecutionStore, error) +type StoreCreator func(ctx context.Context, dbPath string) (store.ExecutionStore, error) type StoreSuite struct { suite.Suite diff --git a/pkg/config/types/bacalhau_test.go b/pkg/config/types/bacalhau_test.go index f203374a60..5eebad8821 100644 --- a/pkg/config/types/bacalhau_test.go +++ b/pkg/config/types/bacalhau_test.go @@ -120,9 +120,9 @@ func TestBacalhauMergeNew(t *testing.T) { NameProvider: "test", } other := Bacalhau{} - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.Equal(t, base, MergeNewd) + assert.Equal(t, base, mergedNew) }) t.Run("MergeNew overwrites existing fields", func(t *testing.T) { @@ -139,12 +139,12 @@ func TestBacalhauMergeNew(t *testing.T) { }, StrictVersionMatch: true, } - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.Equal(t, "otherhost", MergeNewd.API.Host) - assert.Equal(t, 8080, MergeNewd.API.Port) - assert.Equal(t, "test", MergeNewd.NameProvider) - assert.True(t, MergeNewd.StrictVersionMatch) + assert.Equal(t, "otherhost", mergedNew.API.Host) + assert.Equal(t, 8080, 
mergedNew.API.Port) + assert.Equal(t, "test", mergedNew.NameProvider) + assert.True(t, mergedNew.StrictVersionMatch) }) t.Run("MergeNew with nested structs", func(t *testing.T) { @@ -165,11 +165,11 @@ func TestBacalhauMergeNew(t *testing.T) { }, }, } - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.True(t, MergeNewd.Orchestrator.Enabled) - assert.Equal(t, "base.local", MergeNewd.Orchestrator.Host) - assert.Equal(t, Duration(10), MergeNewd.Orchestrator.NodeManager.DisconnectTimeout) + assert.True(t, mergedNew.Orchestrator.Enabled) + assert.Equal(t, "base.local", mergedNew.Orchestrator.Host) + assert.Equal(t, Duration(10), mergedNew.Orchestrator.NodeManager.DisconnectTimeout) }) t.Run("MergeNew with slices", func(t *testing.T) { @@ -183,9 +183,9 @@ func TestBacalhauMergeNew(t *testing.T) { Orchestrators: []string{"nats://127.0.0.1:4223", "nats://127.0.0.1:4224"}, }, } - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.Equal(t, []string{"nats://127.0.0.1:4223", "nats://127.0.0.1:4224"}, MergeNewd.Compute.Orchestrators) + assert.Equal(t, []string{"nats://127.0.0.1:4223", "nats://127.0.0.1:4224"}, mergedNew.Compute.Orchestrators) }) t.Run("MergeNew doesn't affect original configs", func(t *testing.T) { @@ -200,11 +200,11 @@ func TestBacalhauMergeNew(t *testing.T) { Host: "otherhost", }, } - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.NotEqual(t, base, MergeNewd) - assert.NotEqual(t, other, MergeNewd) + assert.NotEqual(t, base, mergedNew) + assert.NotEqual(t, other, mergedNew) assert.Equal(t, "localhost", base.API.Host) assert.Equal(t, "otherhost", other.API.Host) }) @@ -235,11 +235,11 @@ func TestBacalhauMergeNew(t *testing.T) { }, }, } - MergeNewd, err := base.MergeNew(other) + mergedNew, err := base.MergeNew(other) require.NoError(t, err) - assert.Equal(t, 1, MergeNewd.JobDefaults.Batch.Priority) - assert.Equal(t, "1000m", MergeNewd.JobDefaults.Batch.Task.Resources.CPU) - assert.Equal(t, "1Gb", MergeNewd.JobDefaults.Batch.Task.Resources.Memory) + assert.Equal(t, 1, mergedNew.JobDefaults.Batch.Priority) + assert.Equal(t, "1000m", mergedNew.JobDefaults.Batch.Task.Resources.CPU) + assert.Equal(t, "1Gb", mergedNew.JobDefaults.Batch.Task.Resources.Memory) }) } diff --git a/pkg/config/types/gen/generate.go b/pkg/config/types/gen/generate.go index ef00c27352..af644e9f22 100644 --- a/pkg/config/types/gen/generate.go +++ b/pkg/config/types/gen/generate.go @@ -113,8 +113,8 @@ func WriteConstants(fieldInfos map[string]FieldInfo, w io.Writer) error { func ConfigFieldMap(dir string) map[string]FieldInfo { // Parse the package directory - fset := token.NewFileSet() - pkgs, err := parser.ParseDir(fset, dir, nil, parser.ParseComments) + fileSet := token.NewFileSet() + pkgs, err := parser.ParseDir(fileSet, dir, nil, parser.ParseComments) if err != nil { log.Fatal(err) } diff --git a/pkg/devstack/devstack.go b/pkg/devstack/devstack.go index fed9d32e7c..19bb791ff9 100644 --- a/pkg/devstack/devstack.go +++ b/pkg/devstack/devstack.go @@ -126,11 +126,11 @@ func Setup( if isComputeNode { // We have multiple process on the same machine, all wanting to listen on a HTTP port // and so we will give each compute node a random open port to listen on. 
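The hunk below renames the free-port variable. For readers outside the codebase: helpers like `network.GetFreePort` are conventionally implemented by binding to port 0 and reading back the kernel-assigned port. A minimal sketch of that convention (the helper's real implementation may differ):

```go
package main

import (
	"fmt"
	"net"
)

// getFreePort sketches the usual "bind to port 0" trick: the kernel picks
// an unused port, which we read back before releasing the listener. Note
// the inherent race: the port may be taken again between Close and reuse,
// which is acceptable for a local devstack.
func getFreePort() (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}

func main() {
	port, err := getFreePort()
	if err != nil {
		panic(err)
	}
	fmt.Println("free port:", port) // e.g. 54123
}
```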
- fport, err := network.GetFreePort() + freePort, err := network.GetFreePort() if err != nil { return nil, errors.Wrap(err, "failed to get free port for local publisher") } - cfg.Publishers.Types.Local.Port = fport + cfg.Publishers.Types.Local.Port = freePort } cfg.Orchestrator.Enabled = isRequesterNode diff --git a/pkg/executor/docker/executor.go b/pkg/executor/docker/executor.go index 4ead9dea53..adb77ac97d 100644 --- a/pkg/executor/docker/executor.go +++ b/pkg/executor/docker/executor.go @@ -222,7 +222,7 @@ func (e *Executor) doWait(ctx context.Context, out chan *models.RunCommandResult out <- handle.result } else { // NB(forrest): this shouldn't happen with the wasm and docker executors, but handling it as it - // represents a significant error in executor logic, which may occur in future pluggable executor impls. + // represents a significant error in executor logic, which may occur in future pluggable executor impl. errCh <- fmt.Errorf("execution (%s) result is nil", handle.executionID) } } @@ -473,8 +473,8 @@ func makeContainerMounts( return nil, fmt.Errorf("output volume has no Location: %+v", output) } - srcd := filepath.Join(resultsDir, output.Name) - if err := os.Mkdir(srcd, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W); err != nil { + srcDir := filepath.Join(resultsDir, output.Name) + if err := os.Mkdir(srcDir, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W); err != nil { return nil, fmt.Errorf("failed to create results dir for execution: %w", err) } @@ -486,7 +486,7 @@ func makeContainerMounts( // this is an output volume so can be written to ReadOnly: false, // we create a named folder in the job results folder for this output - Source: srcd, + Source: srcDir, // the path of the output volume is from the perspective of inside the container Target: output.Path, }) diff --git a/pkg/executor/docker/executor_test.go b/pkg/executor/docker/executor_test.go index 085ddb239a..f1c8a3812d 100644 --- a/pkg/executor/docker/executor_test.go +++ b/pkg/executor/docker/executor_test.go @@ -436,7 +436,7 @@ func (s *ExecutorTestSuite) TestDockerExecutionCancellation() { // This is important to do. In our docker executor, we set active to true, before calling the docker client with ContainerStart // Hence there is a bit of time before the container actually gets started. The correct way of identifying that whether - // a contianer has started or not is via activeCh. We want to make sure that contianer is started before canceling the execution. + // a container has started or not is via activeCh. We want to make sure that container is started before canceling the execution. handler, _ := s.executor.handlers.Get(executionID) <-handler.activeCh diff --git a/pkg/executor/docker/models/types_test.go b/pkg/executor/docker/models/types_test.go index 4c0cc0f4a1..ff179f18fa 100644 --- a/pkg/executor/docker/models/types_test.go +++ b/pkg/executor/docker/models/types_test.go @@ -21,14 +21,14 @@ func TestDockerEngineBuilder_RoundTrip(t *testing.T) { { name: "valid spec all fields", builder: func() *DockerEngineBuilder { - return NewDockerEngineBuilder("myimage"). + return NewDockerEngineBuilder("myImage"). WithEntrypoint("bash", "-c"). WithEnvironmentVariables("KEY1=VALUE1", "KEY2=VALUE2"). WithWorkingDirectory("/app"). 
WithParameters("arg1", "arg2") }, expectedSpec: EngineSpec{ - Image: "myimage", + Image: "myImage", Entrypoint: []string{"bash", "-c"}, EnvironmentVariables: []string{"KEY1=VALUE1", "KEY2=VALUE2"}, WorkingDirectory: "/app", @@ -38,13 +38,13 @@ func TestDockerEngineBuilder_RoundTrip(t *testing.T) { { name: "valid spec no entry point", builder: func() *DockerEngineBuilder { - return NewDockerEngineBuilder("myimage"). + return NewDockerEngineBuilder("myImage"). WithEnvironmentVariables("KEY1=VALUE1", "KEY2=VALUE2"). WithWorkingDirectory("/app"). WithParameters("arg1", "arg2") }, expectedSpec: EngineSpec{ - Image: "myimage", + Image: "myImage", EnvironmentVariables: []string{"KEY1=VALUE1", "KEY2=VALUE2"}, WorkingDirectory: "/app", Parameters: []string{"arg1", "arg2"}, @@ -53,13 +53,13 @@ func TestDockerEngineBuilder_RoundTrip(t *testing.T) { { name: "valid spec no env var", builder: func() *DockerEngineBuilder { - return NewDockerEngineBuilder("myimage"). + return NewDockerEngineBuilder("myImage"). WithEntrypoint("bash", "-c"). WithWorkingDirectory("/app"). WithParameters("arg1", "arg2") }, expectedSpec: EngineSpec{ - Image: "myimage", + Image: "myImage", Entrypoint: []string{"bash", "-c"}, WorkingDirectory: "/app", Parameters: []string{"arg1", "arg2"}, @@ -68,13 +68,13 @@ func TestDockerEngineBuilder_RoundTrip(t *testing.T) { { name: "valid spec no params", builder: func() *DockerEngineBuilder { - return NewDockerEngineBuilder("myimage"). + return NewDockerEngineBuilder("myImage"). WithEntrypoint("bash", "-c"). WithEnvironmentVariables("KEY1=VALUE1", "KEY2=VALUE2"). WithWorkingDirectory("/app") }, expectedSpec: EngineSpec{ - Image: "myimage", + Image: "myImage", Entrypoint: []string{"bash", "-c"}, EnvironmentVariables: []string{"KEY1=VALUE1", "KEY2=VALUE2"}, WorkingDirectory: "/app", @@ -83,13 +83,13 @@ func TestDockerEngineBuilder_RoundTrip(t *testing.T) { { name: "valid spec no working dir", builder: func() *DockerEngineBuilder { - return NewDockerEngineBuilder("myimage"). + return NewDockerEngineBuilder("myImage"). WithEntrypoint("bash", "-c"). WithEnvironmentVariables("KEY1=VALUE1", "KEY2=VALUE2"). 
WithParameters("arg1", "arg2") }, expectedSpec: EngineSpec{ - Image: "myimage", + Image: "myImage", Entrypoint: []string{"bash", "-c"}, EnvironmentVariables: []string{"KEY1=VALUE1", "KEY2=VALUE2"}, Parameters: []string{"arg1", "arg2"}, diff --git a/pkg/executor/docker/network.go b/pkg/executor/docker/network.go index 73c22df42a..21495c7c3b 100644 --- a/pkg/executor/docker/network.go +++ b/pkg/executor/docker/network.go @@ -135,10 +135,10 @@ func (e *Executor) createHTTPGateway( } // Create the gateway container initially attached to the *host* network - domainList, derr := json.Marshal(networkConfig.DomainSet()) - clientList, cerr := json.Marshal([]string{subnet}) - if derr != nil || cerr != nil { - return nil, nil, pkgerrors.Wrap(errors.Join(derr, cerr), "error preparing gateway config") + domainList, dErr := json.Marshal(networkConfig.DomainSet()) + clientList, cErr := json.Marshal([]string{subnet}) + if dErr != nil || cErr != nil { + return nil, nil, pkgerrors.Wrap(errors.Join(dErr, cErr), "error preparing gateway config") } gatewayContainer, err := e.client.ContainerCreate(ctx, &container.Config{ diff --git a/pkg/executor/wasm/executor.go b/pkg/executor/wasm/executor.go index d53c9ad8d6..026069e709 100644 --- a/pkg/executor/wasm/executor.go +++ b/pkg/executor/wasm/executor.go @@ -273,18 +273,18 @@ func (e *Executor) makeFsFromStorage( return nil, fmt.Errorf("output volume has no path: %+v", output) } - srcd := filepath.Join(jobResultsDir, output.Name) + srcDir := filepath.Join(jobResultsDir, output.Name) log.Ctx(ctx).Debug(). Str("output", output.Name). - Str("dir", srcd). + Str("dir", srcDir). Msg("Collecting output") - err = os.Mkdir(srcd, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W) + err = os.Mkdir(srcDir, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W) if err != nil { return nil, err } - err = rootFs.Mount(output.Name, touchfs.New(srcd)) + err = rootFs.Mount(output.Name, touchfs.New(srcDir)) if err != nil { return nil, err } diff --git a/pkg/executor/wasm/loader.go b/pkg/executor/wasm/loader.go index 9f9e205855..e8f7e3dbfc 100644 --- a/pkg/executor/wasm/loader.go +++ b/pkg/executor/wasm/loader.go @@ -25,7 +25,7 @@ import ( // the WebAssembly program, allowing the user to deploy self-contained // WebAssembly blobs. See the introductory talk at https://youtu.be/6zJkMLzXbQc. // -// This works by using the "module name" field of a WebAssmelby import header, +// This works by using the "module name" field of a WebAssembly import header, // (which for user-supplied modules is arbitrary) as a hint to the loader as to // where the dependency lives and how to retrieve it. The module still needs to // be specified as input data for the job (a previous implementation of the @@ -102,7 +102,7 @@ func (loader *ModuleLoader) loadModule(ctx context.Context, m storage.PreparedSt // InstantiateRemoteModule loads and instantiates the remote module and all of // its dependencies. It only looks in the job's input storage specs for modules. // -// This function calls itself reucrsively for any discovered dependencies on the +// This function calls itself recursively for any discovered dependencies on the // loaded modules, so that the returned module has all of its dependencies fully // instantiated and is ready to use. 
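The godoc above describes depth-first, recursive resolution of WebAssembly import dependencies. A minimal sketch of that shape, using hypothetical names (`module`, `index`) that are not part of this codebase; dependencies are resolved before the module itself and memoized so shared dependencies are instantiated once (cycle detection is omitted for brevity):

```go
package main

import "fmt"

// module stands in for a parsed WebAssembly module; deps are the "module
// name" hints read from its import headers.
type module struct {
	name string
	deps []string
}

// instantiate resolves dependencies depth-first before the module itself,
// memoizing in done so shared dependencies are only instantiated once.
func instantiate(name string, index map[string]module, done map[string]bool) error {
	if done[name] {
		return nil
	}
	m, ok := index[name]
	if !ok {
		return fmt.Errorf("module %q not found in job inputs", name)
	}
	for _, dep := range m.deps {
		if err := instantiate(dep, index, done); err != nil {
			return err
		}
	}
	done[name] = true // all imports satisfied; safe to instantiate
	return nil
}

func main() {
	index := map[string]module{
		"app":     {name: "app", deps: []string{"helpers"}},
		"helpers": {name: "helpers"},
	}
	fmt.Println(instantiate("app", index, map[string]bool{})) // <nil>
}
```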
func (loader *ModuleLoader) InstantiateRemoteModule(ctx context.Context, m storage.PreparedStorage) (api.Module, error) { diff --git a/pkg/jobstore/boltdb/store_test.go b/pkg/jobstore/boltdb/store_test.go index b5e8f7cc11..671c6a1678 100644 --- a/pkg/jobstore/boltdb/store_test.go +++ b/pkg/jobstore/boltdb/store_test.go @@ -851,7 +851,7 @@ func (s *BoltJobstoreTestSuite) TestGetExecutions() { s.Equal(2, len(state)) s.Equal(state[0].GetModifyTime().Before(state[1].GetModifyTime()), true) - // When OrderBy is set to Modified At With Reverese + // When OrderBy is set to Modified At With Reverse state, err = s.store.GetExecutions(s.ctx, jobstore.GetExecutionsOptions{ JobID: "160", OrderBy: "modified_at", diff --git a/pkg/lib/collections/hashed_priority_queue_test.go b/pkg/lib/collections/hashed_priority_queue_test.go index f5838fbe99..967301fc6b 100644 --- a/pkg/lib/collections/hashed_priority_queue_test.go +++ b/pkg/lib/collections/hashed_priority_queue_test.go @@ -136,10 +136,10 @@ func (s *HashedPriorityQueueSuite) TestDuplicateKeys() { } for _, exp := range expected { - qitem := pq.Dequeue() - s.Require().NotNil(qitem) - s.Require().Equal(exp.v, qitem.Value) - s.Require().Equal(exp.p, qitem.Priority) + qItem := pq.Dequeue() + s.Require().NotNil(qItem) + s.Require().Equal(exp.v, qItem.Value) + s.Require().Equal(exp.p, qItem.Priority) } s.Require().True(pq.IsEmpty()) diff --git a/pkg/lib/collections/priority_queue.go b/pkg/lib/collections/priority_queue.go index 8448111d3e..fda6953f23 100644 --- a/pkg/lib/collections/priority_queue.go +++ b/pkg/lib/collections/priority_queue.go @@ -158,19 +158,19 @@ func (pq *PriorityQueue[T]) DequeueWhere(matcher MatchingFunction[T]) *QueueItem // If any iteration does not generate a match, the item is requeued in a temporary // queue reading for requeueing on this queue later on. 
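The "temporary queue" described in the comment above is the heart of DequeueWhere: pop until a match, park non-matching items, then push them back so nothing is lost. A generic sketch of the same strategy, independent of this package's QueueItem and priority types:

```go
package collections

// dequeueWhere pops items until one satisfies match, parking the
// non-matching ones, then pushes them back onto the queue afterwards.
// This mirrors the strategy in the comment above; the real queue also
// carries per-item priorities.
func dequeueWhere[T any](pop func() (T, bool), push func(T), match func(T) bool) (result T, found bool) {
	var unmatched []T
	for {
		item, ok := pop()
		if !ok {
			break
		}
		if match(item) {
			result, found = item, true
			break
		}
		unmatched = append(unmatched, item)
	}
	// Re-add the items that were not matched back onto the queue.
	for _, item := range unmatched {
		push(item)
	}
	return result, found
}
```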
for pq.internalQueue.Len() > 0 { - qitem := pq.dequeue() + qItem := pq.dequeue() - if qitem == nil { + if qItem == nil { return nil } - if matcher(qitem.Value) { - result = qitem + if matcher(qItem.Value) { + result = qItem break } // Add to the queue - unmatched = append(unmatched, qitem) + unmatched = append(unmatched, qItem) } // Re-add the items that were not matched back onto the Q diff --git a/pkg/lib/collections/priority_queue_base_test.go b/pkg/lib/collections/priority_queue_base_test.go index 0a5db6f5cd..e8de45f323 100644 --- a/pkg/lib/collections/priority_queue_base_test.go +++ b/pkg/lib/collections/priority_queue_base_test.go @@ -36,10 +36,10 @@ func (s *PriorityQueueTestSuite) TestSimple() { } for _, tc := range expected { - qitem := pq.Dequeue() - s.Require().NotNil(qitem) - s.Require().Equal(tc.v, qitem.Value) - s.Require().Equal(tc.p, qitem.Priority) + qItem := pq.Dequeue() + s.Require().NotNil(qItem) + s.Require().Equal(tc.v, qItem.Value) + s.Require().Equal(tc.p, qItem.Priority) } s.Require().True(pq.IsEmpty()) @@ -63,10 +63,10 @@ func (s *PriorityQueueTestSuite) TestSimpleMin() { } for _, tc := range expected { - qitem := pq.Dequeue() - s.Require().NotNil(qitem) - s.Require().Equal(tc.v, qitem.Value) - s.Require().Equal(tc.p, qitem.Priority) + qItem := pq.Dequeue() + s.Require().NotNil(qItem) + s.Require().Equal(tc.v, qItem.Value) + s.Require().Equal(tc.p, qItem.Priority) } s.Require().True(pq.IsEmpty()) @@ -74,8 +74,8 @@ func (s *PriorityQueueTestSuite) TestSimpleMin() { func (s *PriorityQueueTestSuite) TestEmpty() { pq := s.NewQueue() - qitem := pq.Dequeue() - s.Require().Nil(qitem) + qItem := pq.Dequeue() + s.Require().Nil(qItem) s.Require().True(pq.IsEmpty()) } @@ -91,13 +91,13 @@ func (s *PriorityQueueTestSuite) TestDequeueWhere() { count := pq.Len() - qitem := pq.DequeueWhere(func(possibleMatch TestData) bool { + qItem := pq.DequeueWhere(func(possibleMatch TestData) bool { return possibleMatch.id == "B" }) - s.Require().NotNil(qitem) - s.Require().Equal(TestData{"B", 2}, qitem.Value) - s.Require().Equal(int64(3), qitem.Priority) + s.Require().NotNil(qItem) + s.Require().Equal(TestData{"B", 2}, qItem.Value) + s.Require().Equal(int64(3), qItem.Priority) s.Require().Equal(count-1, pq.Len()) } @@ -105,11 +105,11 @@ func (s *PriorityQueueTestSuite) TestDequeueWhereFail() { pq := s.NewQueue() pq.Enqueue(TestData{"A", 1}, 4) - qitem := pq.DequeueWhere(func(possibleMatch TestData) bool { + qItem := pq.DequeueWhere(func(possibleMatch TestData) bool { return possibleMatch.id == "Z" }) - s.Require().Nil(qitem) + s.Require().Nil(qItem) } func (s *PriorityQueueTestSuite) TestPeek() { diff --git a/pkg/lib/collections/priority_queue_test.go b/pkg/lib/collections/priority_queue_test.go index ec3adeb23d..0c7bacc917 100644 --- a/pkg/lib/collections/priority_queue_test.go +++ b/pkg/lib/collections/priority_queue_test.go @@ -53,10 +53,10 @@ func (s *PriorityQueueSuite) TestDuplicateKeys() { } for _, exp := range expected { - qitem := pq.Dequeue() - s.Require().NotNil(qitem) - s.Require().Equal(exp.v, qitem.Value) - s.Require().Equal(exp.p, qitem.Priority) + qItem := pq.Dequeue() + s.Require().NotNil(qItem) + s.Require().Equal(exp.v, qItem.Value) + s.Require().Equal(exp.p, qItem.Priority) } s.Require().True(pq.IsEmpty()) diff --git a/pkg/lib/crypto/certificate.go b/pkg/lib/crypto/certificate.go index eaedc61769..b326ae9735 100644 --- a/pkg/lib/crypto/certificate.go +++ b/pkg/lib/crypto/certificate.go @@ -75,7 +75,7 @@ func NewSignedCertificate(parent Certificate, ipAddress []net.IP) 
(Certificate, return Certificate{cert: cert, parent: &parent, key: certPrivKey}, nil } -func (cert *Certificate) MarshalCertficate(out io.Writer) error { +func (cert *Certificate) MarshalCertificate(out io.Writer) error { var parent *x509.Certificate var signingKey *rsa.PrivateKey diff --git a/pkg/lib/crypto/certificate_test.go b/pkg/lib/crypto/certificate_test.go index 18de762b50..22a2590c30 100644 --- a/pkg/lib/crypto/certificate_test.go +++ b/pkg/lib/crypto/certificate_test.go @@ -28,7 +28,7 @@ func TestProducesValidCertificate(t *testing.T) { cert := getTestSelfSignedCert(t) var buf bytes.Buffer - err := cert.MarshalCertficate(&buf) + err := cert.MarshalCertificate(&buf) require.NoError(t, err) block, rest := pem.Decode(buf.Bytes()) @@ -49,7 +49,7 @@ func TestProducesSignedCertificate(t *testing.T) { require.NotNil(t, cert) var buf bytes.Buffer - err = cert.MarshalCertficate(&buf) + err = cert.MarshalCertificate(&buf) require.NoError(t, err) block, rest := pem.Decode(buf.Bytes()) @@ -61,7 +61,7 @@ func TestProducesSignedCertificate(t *testing.T) { require.NotNil(t, parsed) buf.Reset() - err = parent.MarshalCertficate(&buf) + err = parent.MarshalCertificate(&buf) require.NoError(t, err) pool := x509.NewCertPool() diff --git a/pkg/lib/policy/scrypt.go b/pkg/lib/policy/scrypt.go index 15e7e41da7..f69f5ca52f 100644 --- a/pkg/lib/policy/scrypt.go +++ b/pkg/lib/policy/scrypt.go @@ -30,7 +30,7 @@ var scryptFn = rego.Function2( Memoize: true, Nondeterministic: false, }, - func(bctx rego.BuiltinContext, passwordTerm, saltTerm *ast.Term) (*ast.Term, error) { + func(bCtx rego.BuiltinContext, passwordTerm, saltTerm *ast.Term) (*ast.Term, error) { var password, salt string if err := ast.As(passwordTerm.Value, &password); err != nil { return nil, err diff --git a/pkg/logger/wasm/logmanager.go b/pkg/logger/wasm/logmanager.go index 8184afaff9..b2c3735409 100644 --- a/pkg/logger/wasm/logmanager.go +++ b/pkg/logger/wasm/logmanager.go @@ -10,10 +10,11 @@ import ( "sync" "time" + "github.com/rs/zerolog/log" + "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/util" "github.com/bacalhau-project/bacalhau/pkg/util/generic" - "github.com/rs/zerolog/log" ) const ( @@ -161,11 +162,11 @@ func (lm *LogManager) Drain() { } func (lm *LogManager) GetWriters() (io.WriteCloser, io.WriteCloser) { - writerFunc := func(strm LogStreamType) func([]byte) *LogMessage { + writerFunc := func(stream LogStreamType) func([]byte) *LogMessage { return func(b []byte) *LogMessage { m := LogMessage{ Timestamp: time.Now().Unix(), - Stream: strm, + Stream: stream, } m.Data = append([]byte(nil), b...) 
return &m diff --git a/pkg/models/event_test.go b/pkg/models/event_test.go index 7d739f2de7..0056836d22 100644 --- a/pkg/models/event_test.go +++ b/pkg/models/event_test.go @@ -159,7 +159,7 @@ func (suite *EventTestSuite) TestGetJobStateIfPresent() { invalidState := "InvalidState" eventWithInvalidState := models.NewEvent(suite.topic).WithDetail(models.DetailsKeyNewState, invalidState) state, err = eventWithInvalidState.GetJobStateIfPresent() - suite.NoError(err) // models.JobStateType.UnmarshallText() does not return an error for invalid states + suite.NoError(err) // models.JobStateType.UnmarshalText() does not return an error for invalid states suite.Equal(models.JobStateTypeUndefined, state) } diff --git a/pkg/models/execution.go b/pkg/models/execution.go index ab54ebfee5..844fa26119 100644 --- a/pkg/models/execution.go +++ b/pkg/models/execution.go @@ -42,7 +42,7 @@ func (s ExecutionStateType) IsUndefined() bool { return s == ExecutionStateUndefined } -func (s ExecutionStateType) IsTermainl() bool { +func (s ExecutionStateType) IsTerminal() bool { return s == ExecutionStateBidRejected || s == ExecutionStateCompleted || s == ExecutionStateFailed || diff --git a/pkg/models/network.go b/pkg/models/network.go index ed2d063ad2..080b0c28a3 100644 --- a/pkg/models/network.go +++ b/pkg/models/network.go @@ -118,7 +118,7 @@ func (n *NetworkConfig) Validate() (err error) { err = errors.Join(err, fmt.Errorf("invalid networking type %q", n.Type)) } - // TODO(forrest): should return an error if the network type is not HTTP and domanins are set. + // TODO(forrest): should return an error if the network type is not HTTP and domains are set. for _, domain := range n.Domains { if domainRegex.MatchString(domain) { continue @@ -210,28 +210,28 @@ func matchDomain(left, right string) (diff int) { return diff } - lcur, rcur := len(lefts)-1, len(rights)-1 - for lcur >= 0 && rcur >= 0 { + lCur, rCur := len(lefts)-1, len(rights)-1 + for lCur >= 0 && rCur >= 0 { // If neither is a blank, these components need to match. - if lefts[lcur] != wildcard && rights[rcur] != wildcard { - if diff = strings.Compare(lefts[lcur], rights[rcur]); diff != 0 { + if lefts[lCur] != wildcard && rights[rCur] != wildcard { + if diff = strings.Compare(lefts[lCur], rights[rCur]); diff != 0 { return diff } } // If both are blanks, they match. - if lefts[lcur] == wildcard || rights[rcur] == wildcard { + if lefts[lCur] == wildcard || rights[rCur] == wildcard { break } // Blank means we are matching any subdomains, so only the rest of // the domain needs to match for this to work. 
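The loop being touched below compares domain labels from the right (TLD first) and treats a wildcard label as matching any remaining subdomains. A simplified boolean sketch of that idea — the real matchDomain returns an ordering diff for sorting rather than a bool, and the wildcard spelling here is an assumption:

```go
package models

import "strings"

// wildcardMatch reports whether name matches pattern, comparing labels
// right to left; a "*" label matches everything below that suffix.
// Simplified from matchDomain, which returns a sort-order diff instead.
func wildcardMatch(pattern, name string) bool {
	p := strings.Split(pattern, ".")
	n := strings.Split(name, ".")
	i, j := len(p)-1, len(n)-1
	for i >= 0 && j >= 0 {
		if p[i] == "*" || n[j] == "*" { // wildcard: the rest of the subdomain is free
			return true
		}
		if !strings.EqualFold(p[i], n[j]) {
			return false
		}
		i--
		j--
	}
	return i < 0 && j < 0
}
```

For example, `wildcardMatch("*.foo.com", "a.b.foo.com")` matches "com" and "foo" from the right and then hits the wildcard, so it returns true.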
- if lefts[lcur] != wildcard { - lcur -= 1 + if lefts[lCur] != wildcard { + lCur -= 1 } - if rights[rcur] != wildcard { - rcur -= 1 + if rights[rCur] != wildcard { + rCur -= 1 } } diff --git a/pkg/models/network_test.go b/pkg/models/network_test.go index 7c3c5265b4..e64c927613 100644 --- a/pkg/models/network_test.go +++ b/pkg/models/network_test.go @@ -108,8 +108,8 @@ func TestDomainMatching(t *testing.T) { {require.Less, "zzz.com", "foo.com"}, {require.Greater, "aaa.com", "foo.com"}, {require.Equal, "FOO.com", "foo.COM"}, - {require.Less, "bfoo.com", "afoo.com"}, - {require.Greater, "afoo.com", "bfoo.com"}, + {require.Less, "bFoo.com", "aFoo.com"}, + {require.Greater, "aFoo.com", "bFoo.com"}, {require.Less, "x-foo.com", ".foo.com"}, } diff --git a/pkg/nats/proxy/compute_proxy.go b/pkg/nats/proxy/compute_proxy.go index 498eb94b13..97d2bebb32 100644 --- a/pkg/nats/proxy/compute_proxy.go +++ b/pkg/nats/proxy/compute_proxy.go @@ -6,12 +6,13 @@ import ( "fmt" "time" + "github.com/nats-io/nats.go" + "github.com/rs/zerolog/log" + "github.com/bacalhau-project/bacalhau/pkg/compute" "github.com/bacalhau-project/bacalhau/pkg/lib/concurrency" "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/nats/stream" - "github.com/nats-io/nats.go" - "github.com/rs/zerolog/log" ) const ( @@ -35,7 +36,7 @@ func NewComputeProxy(params ComputeProxyParams) (*ComputeProxy, error) { sc, err := stream.NewConsumerClient(stream.ConsumerClientParams{ Conn: params.Conn, Config: stream.StreamConsumerClientConfig{ - StreamCancellationBufferDuration: 5 * time.Second, //nolinter:gomnd + StreamCancellationBufferDuration: 5 * time.Second, //nolint:gomnd }, }) if err != nil { diff --git a/pkg/nats/stream/consumer_client.go b/pkg/nats/stream/consumer_client.go index f996763482..f4dea71f1e 100644 --- a/pkg/nats/stream/consumer_client.go +++ b/pkg/nats/stream/consumer_client.go @@ -9,11 +9,12 @@ import ( "sync" "time" - "github.com/bacalhau-project/bacalhau/pkg/lib/concurrency" "github.com/nats-io/nats.go" "github.com/nats-io/nuid" "github.com/rs/zerolog/log" "github.com/samber/lo" + + "github.com/bacalhau-project/bacalhau/pkg/lib/concurrency" ) // RequestChanLen Default request channel length for buffering asynchronous results. @@ -26,7 +27,7 @@ const ( heartBeatPrefix = "_HEARTBEAT" inboxPrefixLen = len(inboxPrefix) replySuffixLen = 8 // Gives us 62^8 - rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + rDigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" base = 62 nuidSize = 22 ) @@ -219,7 +220,7 @@ func (nc *ConsumerClient) newRespInbox() string { rn := nc.respRand.Int63() for i := 0; i < replySuffixLen; i++ { - sb.WriteByte(rdigits[rn%base]) + sb.WriteByte(rDigits[rn%base]) rn /= base } diff --git a/pkg/nats/stream/types.go b/pkg/nats/stream/types.go index d3cc84ca3b..f6bf8be717 100644 --- a/pkg/nats/stream/types.go +++ b/pkg/nats/stream/types.go @@ -49,7 +49,7 @@ type StreamInfo struct { // CreatedAt represents the time the stream was created. CreatedAt time.Time // Function to cancel the stream. This is useful in the event the consumer client - // is no longer interested in the stream. The cancel function is inovked informing the + // is no longer interested in the stream. The cancel function is invoked informing the // producer to no longer serve the stream. 
Cancel context.CancelFunc } diff --git a/pkg/nats/transport/nats.go b/pkg/nats/transport/nats.go index 5c1bff4c8f..e96c5bac86 100644 --- a/pkg/nats/transport/nats.go +++ b/pkg/nats/transport/nats.go @@ -264,7 +264,7 @@ func (t *NATSTransport) CallbackProxy() compute.Callback { return t.callbackProxy } -// RegistrationProxy returns the previoously created registration proxy. +// ManagementProxy returns the previously created registration proxy. func (t *NATSTransport) ManagementProxy() compute.ManagementEndpoint { return t.managementProxy } diff --git a/pkg/node/factories.go b/pkg/node/factories.go index 325f25225f..01a59b9ce3 100644 --- a/pkg/node/factories.go +++ b/pkg/node/factories.go @@ -130,7 +130,7 @@ func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) Authenticators func(ctx context.Context, nodeConfig NodeConfig) (authn.Provider, error) { var allErr error - authns := make(map[string]authn.Authenticator, len(nodeConfig.BacalhauConfig.API.Auth.Methods)) + auths := make(map[string]authn.Authenticator, len(nodeConfig.BacalhauConfig.API.Auth.Methods)) for name, authnConfig := range nodeConfig.BacalhauConfig.API.Auth.Methods { switch authnConfig.Type { case string(authn.MethodTypeChallenge): @@ -140,7 +140,7 @@ func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) Authenticators continue } - authns[name] = challenge.NewAuthenticator( + auths[name] = challenge.NewAuthenticator( methodPolicy, challenge.NewStringMarshaller(nodeConfig.NodeID), userKey.PrivateKey(), @@ -153,7 +153,7 @@ func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) Authenticators continue } - authns[name] = ask.NewAuthenticator( + auths[name] = ask.NewAuthenticator( methodPolicy, userKey.PrivateKey(), nodeConfig.NodeID, @@ -163,7 +163,7 @@ func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) Authenticators } } - return provider.NewMappedProvider(authns), allErr + return provider.NewMappedProvider(auths), allErr }, ) } diff --git a/pkg/node/utils.go b/pkg/node/utils.go index 69ea4218f0..28673152d7 100644 --- a/pkg/node/utils.go +++ b/pkg/node/utils.go @@ -56,7 +56,7 @@ func getTLSCertificate(cfg types.Bacalhau) (string, string, error) { return "", "", err } else if caCert, err := crypto.NewSelfSignedCertificate(privKey, false, ips); err != nil { return "", "", errors.Wrap(err, "failed to generate server certificate") - } else if err = caCert.MarshalCertficate(certFile); err != nil { + } else if err = caCert.MarshalCertificate(certFile); err != nil { return "", "", errors.Wrap(err, "failed to write server certificate") } cert = certFile.Name() diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go index 842fe5734e..81845fdad9 100644 --- a/pkg/publicapi/apimodels/error.go +++ b/pkg/publicapi/apimodels/error.go @@ -65,7 +65,7 @@ func (e *APIError) Error() string { return e.Message } -// Parse HTTP Resposne to APIError +// Parse HTTP Response to APIError func GenerateAPIErrorFromHTTPResponse(resp *http.Response) *APIError { if resp == nil { return NewAPIError(0, "API call error, invalid response") diff --git a/pkg/publicapi/endpoint/orchestrator/node.go b/pkg/publicapi/endpoint/orchestrator/node.go index 18a60baa6b..890b4aa135 100644 --- a/pkg/publicapi/endpoint/orchestrator/node.go +++ b/pkg/publicapi/endpoint/orchestrator/node.go @@ -14,7 +14,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/util" ) -// godoc for Orchstrator GetNode +// godoc for Orchestrator GetNode // // @ID orchestrator/getNode // @Summary Get an orchestrator node diff 
--git a/pkg/publicapi/middleware/version.go b/pkg/publicapi/middleware/version.go index bda88e0ad2..292e98c6d2 100644 --- a/pkg/publicapi/middleware/version.go +++ b/pkg/publicapi/middleware/version.go @@ -34,32 +34,32 @@ func VersionNotifyLogger(logger *zerolog.Logger, serverVersion semver.Version) e // instructs logger to extract given list of headers from request. LogHeaders: []string{apimodels.HTTPHeaderBacalhauGitVersion}, LogValuesFunc: func(c echo.Context, v echomiddelware.RequestLoggerValues) error { - notif := Notification{ + notification := Notification{ RequestID: v.RequestID, ClientID: c.Response().Header().Get(apimodels.HTTPHeaderClientID), ServerVersion: serverVersion.String(), } defer func() { - if notif.Message != "" { + if notification.Message != "" { logger.WithLevel(zerolog.DebugLevel). - Str("ClientID", notif.ClientID). - Str("RequestID", notif.RequestID). - Str("ClientVersion", notif.ClientVersion). - Str("ServerVersion", notif.ServerVersion). - Msg(notif.Message) + Str("ClientID", notification.ClientID). + Str("RequestID", notification.RequestID). + Str("ClientVersion", notification.ClientVersion). + Str("ServerVersion", notification.ServerVersion). + Msg(notification.Message) } }() cVersion := v.Headers[apimodels.HTTPHeaderBacalhauGitVersion] if len(cVersion) == 0 { // version header is empty, cannot parse it - notif.Message = "received request from client without version" + notification.Message = "received request from client without version" return nil } if len(cVersion) > 1 { // version header contained multiple fields - notif.Message = fmt.Sprintf("received request from client with multiple versions: %s", cVersion) + notification.Message = fmt.Sprintf("received request from client with multiple versions: %s", cVersion) return nil } @@ -67,20 +67,20 @@ func VersionNotifyLogger(logger *zerolog.Logger, serverVersion semver.Version) e clientVersion, err := semver.NewVersion(cVersion[0]) if err != nil { // cannot parse client version, should notify - notif.Message = fmt.Sprintf("received request with invalid client version: %s", cVersion[0]) + notification.Message = fmt.Sprintf("received request with invalid client version: %s", cVersion[0]) return nil } // extract parsed client version for comparison - notif.ClientVersion = clientVersion.String() + notification.ClientVersion = clientVersion.String() diff := serverVersion.Compare(clientVersion) switch diff { case 1: // client version is less than server version - notif.Message = "received request from outdated client" + notification.Message = "received request from outdated client" case -1: // server version is less than client version - notif.Message = "received request from newer client" + notification.Message = "received request from newer client" case 0: // versions are the same, don't notify } diff --git a/pkg/publicapi/middleware/version_test.go b/pkg/publicapi/middleware/version_test.go index f49b1a2d9c..1887b3600a 100644 --- a/pkg/publicapi/middleware/version_test.go +++ b/pkg/publicapi/middleware/version_test.go @@ -103,9 +103,9 @@ func (suite *VersionNotifyTestSuite) TestLogVersionNotify() { if suite.buf.Len() == 0 { suite.Equalf("", tc.expectedMessage, "unexpected notification") } else { - notif := suite.parseMessage(suite.buf.String()) - suite.Contains(notif.Message, tc.expectedMessage) - suite.Equal(tc.expectedClientVersion, notif.ClientVersion) + notification := suite.parseMessage(suite.buf.String()) + suite.Contains(notification.Message, tc.expectedMessage) + suite.Equal(tc.expectedClientVersion, 
notification.ClientVersion) } }) } diff --git a/pkg/repo/migrations/v3_4.go b/pkg/repo/migrations/v3_4.go index 939499a914..23bd35065b 100644 --- a/pkg/repo/migrations/v3_4.go +++ b/pkg/repo/migrations/v3_4.go @@ -57,7 +57,7 @@ func V3MigrationWithConfig(globalCfg system.GlobalConfig) repo.Migration { } // update the legacy version file so older versions fail gracefully. if err := r.WriteLegacyVersion(repo.Version4); err != nil { - return fmt.Errorf("updating repo.verion: %w", err) + return fmt.Errorf("updating repo.version: %w", err) } if err := r.WriteLastUpdateCheck(time.UnixMilli(0)); err != nil { return err diff --git a/pkg/routing/tracing/tracing.go b/pkg/routing/tracing/tracing.go index 1efadffe62..ed936e15c0 100644 --- a/pkg/routing/tracing/tracing.go +++ b/pkg/routing/tracing/tracing.go @@ -62,7 +62,7 @@ func (r *NodeStore) GetByPrefix(ctx context.Context, prefix string) (models.Node log.Ctx(ctx).Trace(). Dur("duration", dur). Str("prefix", prefix). - Msg("node retrieved by previus") + Msg("node retrieved by previous") }() return r.delegate.GetByPrefix(ctx, prefix) diff --git a/pkg/s3/errors_test.go b/pkg/s3/errors_test.go index 3e72863698..096da13e93 100644 --- a/pkg/s3/errors_test.go +++ b/pkg/s3/errors_test.go @@ -1,5 +1,7 @@ //go:build unit || !integration +/* spell-checker: disable */ + package s3 import ( diff --git a/pkg/storage/inline/storage.go b/pkg/storage/inline/storage.go index 927110ae63..75ca770ff5 100644 --- a/pkg/storage/inline/storage.go +++ b/pkg/storage/inline/storage.go @@ -33,11 +33,12 @@ import ( "os" "path/filepath" + "github.com/c2h5oh/datasize" + "github.com/vincent-petithory/dataurl" + "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/storage" "github.com/bacalhau-project/bacalhau/pkg/util/targzip" - "github.com/c2h5oh/datasize" - "github.com/vincent-petithory/dataurl" ) // The maximum size that will be stored inline without gzip compression. 
@@ -74,8 +75,8 @@ func (i *InlineStorage) GetVolumeSize(_ context.Context, spec models.InputSource } if data.ContentType() == gzipMimeType { - size, derr := targzip.UncompressedSize(bytes.NewReader(data.Data)) - return size.Bytes(), derr + size, dErr := targzip.UncompressedSize(bytes.NewReader(data.Data)) + return size.Bytes(), dErr } else { return uint64(len(data.Data)), nil } @@ -128,13 +129,13 @@ func (i *InlineStorage) PrepareStorage(_ context.Context, storageDirectory strin return storage.StorageVolume{}, err } - _, werr := tempfile.Write(data.Data) - cerr := tempfile.Close() + _, wErr := tempfile.Write(data.Data) + cErr := tempfile.Close() return storage.StorageVolume{ Type: storage.StorageVolumeConnectorBind, Source: tempfile.Name(), Target: spec.Target, - }, errors.Join(werr, cerr) + }, errors.Join(wErr, cErr) } } diff --git a/pkg/storage/s3/storage_test.go b/pkg/storage/s3/storage_test.go index 5016a01ae9..2aa3519edc 100644 --- a/pkg/storage/s3/storage_test.go +++ b/pkg/storage/s3/storage_test.go @@ -1,5 +1,7 @@ //go:build integration || !unit +/* spell-checker: disable */ + package s3_test import ( diff --git a/pkg/test/compute/resourcelimits_test.go b/pkg/test/compute/resourcelimits_test.go index 43b510904c..f4635b271d 100644 --- a/pkg/test/compute/resourcelimits_test.go +++ b/pkg/test/compute/resourcelimits_test.go @@ -162,7 +162,7 @@ func (suite *ComputeNodeResourceLimitsSuite) TestTotalResourceLimits() { }) require.NoError(suite.T(), err) - // sleep a bit here to simulate jobs being sumbmitted over time + // sleep a bit here to simulate jobs being submitted over time time.Sleep((10 + time.Duration(rand.Intn(10))) * time.Millisecond) } @@ -363,7 +363,7 @@ func (suite *ComputeNodeResourceLimitsSuite) TestParallelGPU() { require.NoError(suite.T(), err) jobIds = append(jobIds, submittedJob.JobID) - // sleep a bit here to simulate jobs being sumbmitted over time + // sleep a bit here to simulate jobs being submitted over time // and to give time for compute nodes to accept and run the jobs // this needs to be less than the time the job lasts // so we are running jobs in parallel diff --git a/pkg/test/executor/test_runner.go b/pkg/test/executor/test_runner.go index a9a24b7dca..acd6bb37ee 100644 --- a/pkg/test/executor/test_runner.go +++ b/pkg/test/executor/test_runner.go @@ -87,9 +87,9 @@ func RunTestCase( execution.AllocateResources(job.Task().Name, models.Resources{}) resultsDirectory := t.TempDir() - strgProvider := stack.Nodes[0].ComputeNode.Storages + storageProvider := stack.Nodes[0].ComputeNode.Storages - runCommandArguments, cleanup, err := compute.PrepareRunArguments(ctx, strgProvider, t.TempDir(), execution, resultsDirectory) + runCommandArguments, cleanup, err := compute.PrepareRunArguments(ctx, storageProvider, t.TempDir(), execution, resultsDirectory) require.NoError(t, err) t.Cleanup(func() { if err := cleanup(ctx); err != nil { diff --git a/pkg/test/scenario/resolver.go b/pkg/test/scenario/resolver.go index 9fa1dad27d..0ca4dc75d4 100644 --- a/pkg/test/scenario/resolver.go +++ b/pkg/test/scenario/resolver.go @@ -183,7 +183,7 @@ func GetFilteredExecutionStates(jobState *JobState, filterState models.Execution func WaitForTerminalStates() StateChecks { return func(state *JobState) (bool, error) { for _, executionState := range state.Executions { - if !executionState.ComputeState.StateType.IsTermainl() { + if !executionState.ComputeState.StateType.IsTerminal() { return false, nil } } diff --git a/pkg/test/scenario/responses.go b/pkg/test/scenario/responses.go index 
b78d2f8eb8..217cb11d12 100644 --- a/pkg/test/scenario/responses.go +++ b/pkg/test/scenario/responses.go @@ -21,7 +21,7 @@ func SubmitJobSuccess() CheckSubmitResponse { return fmt.Errorf("expected job response, got nil") } if len(response.Warnings) > 0 { - return fmt.Errorf("unexpted warnings returned when submitting job: %v", response.Warnings) + return fmt.Errorf("unexpected warnings returned when submitting job: %v", response.Warnings) } return nil } diff --git a/pkg/test/scenario/results.go b/pkg/test/scenario/results.go index 7d2825e8b2..e62ee66428 100644 --- a/pkg/test/scenario/results.go +++ b/pkg/test/scenario/results.go @@ -64,7 +64,7 @@ func FileEquals( } } -// ManyCheckes returns a CheckResults that runs the passed checkers and returns +// ManyChecks returns a CheckResults that runs the passed checkers and returns // an error if any of them fail. func ManyChecks(checks ...CheckResults) CheckResults { return func(resultsDir string) error { diff --git a/pkg/util/generic/broadcaster_test.go b/pkg/util/generic/broadcaster_test.go index 0f4debc615..ca10bf067d 100644 --- a/pkg/util/generic/broadcaster_test.go +++ b/pkg/util/generic/broadcaster_test.go @@ -5,10 +5,11 @@ package generic_test import ( "testing" - _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/util/generic" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + + _ "github.com/bacalhau-project/bacalhau/pkg/logger" + "github.com/bacalhau-project/bacalhau/pkg/util/generic" ) type BroadcasterTestSuite struct { @@ -50,7 +51,7 @@ func (s *BroadcasterTestSuite) TestBroadcasterAutoclose() { require.Error(s.T(), err) } -func (s *BroadcasterTestSuite) TestBroadcasterSubUnsub() { +func (s *BroadcasterTestSuite) TestBroadcasterSubUnsubscribe() { ch1, err1 := s.broadcaster.Subscribe() ch2, err2 := s.broadcaster.Subscribe() require.NoError(s.T(), err1) diff --git a/pkg/util/idgen/short_id_test.go b/pkg/util/idgen/short_id_test.go index 36079088cc..230d08471b 100644 --- a/pkg/util/idgen/short_id_test.go +++ b/pkg/util/idgen/short_id_test.go @@ -1,5 +1,7 @@ //go:build unit || !integration +/* spell-checker: disable */ + package idgen import ( diff --git a/python/mkdocs.yml b/python/mkdocs.yml index 0b6a73d46a..3710c19320 100644 --- a/python/mkdocs.yml +++ b/python/mkdocs.yml @@ -1,4 +1,4 @@ -site_name: Bacalahu SDK +site_name: Bacalhau SDK site_url: https://github.com/bacalhau-project/bacalhau repo_url: https://github.com/bacalhau-project/bacalhau/python repo_name: bacalhau-project/bacalhau-sdk diff --git a/test-integration/README.md b/test-integration/README.md index bada3d6e3c..48f9eabed4 100644 --- a/test-integration/README.md +++ b/test-integration/README.md @@ -91,7 +91,7 @@ docker exec -it bacalhau-client-node-container /bin/bash Setup an alias for the Minio CLI ```shell # The environment variables are already injected in -# the container, no need to replce them yourself. +# the container, no need to replace them yourself. 
mc alias set bacalhau-minio "http://${BACALHAU_MINIO_NODE_HOST}:9000" "${MINIO_ROOT_USER}" "${MINIO_ROOT_PASSWORD}" mc admin info bacalhau-minio ``` From b9d87f0203709c508b4a62be665a20b4369f95da Mon Sep 17 00:00:00 2001 From: Walid Baruni Date: Wed, 9 Oct 2024 17:19:48 +0200 Subject: [PATCH 4/4] remove event emitter (#4601) --- pkg/eventhandler/chained_handlers.go | 61 ---------- pkg/eventhandler/chained_handlers_test.go | 111 ------------------ pkg/eventhandler/context_provider.go | 81 ------------- pkg/eventhandler/interfaces.go | 21 ---- .../mock_eventhandler/mock_contextprovider.go | 49 -------- .../mock_eventhandler/mock_handlers.go | 55 --------- pkg/eventhandler/tracer.go | 78 ------------ pkg/models/job_event_string.go | 36 ------ pkg/models/jobevent.go | 107 ----------------- pkg/node/requester.go | 31 +---- pkg/orchestrator/callback.go | 22 +--- pkg/orchestrator/endpoint.go | 10 -- pkg/orchestrator/event_emitter.go | 96 --------------- pkg/orchestrator/planner/event_emitter.go | 54 --------- 14 files changed, 8 insertions(+), 804 deletions(-) delete mode 100644 pkg/eventhandler/chained_handlers.go delete mode 100644 pkg/eventhandler/chained_handlers_test.go delete mode 100644 pkg/eventhandler/context_provider.go delete mode 100644 pkg/eventhandler/interfaces.go delete mode 100644 pkg/eventhandler/mock_eventhandler/mock_contextprovider.go delete mode 100644 pkg/eventhandler/mock_eventhandler/mock_handlers.go delete mode 100644 pkg/eventhandler/tracer.go delete mode 100644 pkg/models/job_event_string.go delete mode 100644 pkg/models/jobevent.go delete mode 100644 pkg/orchestrator/event_emitter.go delete mode 100644 pkg/orchestrator/planner/event_emitter.go diff --git a/pkg/eventhandler/chained_handlers.go b/pkg/eventhandler/chained_handlers.go deleted file mode 100644 index 93cbd95273..0000000000 --- a/pkg/eventhandler/chained_handlers.go +++ /dev/null @@ -1,61 +0,0 @@ -package eventhandler - -import ( - "context" - "fmt" - "time" - - "github.com/rs/zerolog/log" - - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// An event handler implementation that chains multiple event handlers, and accepts a context provider -// to setup up the context once for all handlers. -type ChainedJobEventHandler struct { - eventHandlers []JobEventHandler - contextProvider ContextProvider -} - -func NewChainedJobEventHandler(contextProvider ContextProvider) *ChainedJobEventHandler { - return &ChainedJobEventHandler{contextProvider: contextProvider} -} - -func (r *ChainedJobEventHandler) AddHandlers(handlers ...JobEventHandler) { - r.eventHandlers = append(r.eventHandlers, handlers...) -} - -func (r *ChainedJobEventHandler) HandleJobEvent(ctx context.Context, event models.JobEvent) (err error) { - startTime := time.Now() - defer logEvent(ctx, event, startTime)(&err) - - if r.eventHandlers == nil { - return fmt.Errorf("no event handlers registered") - } - - jobCtx := r.contextProvider.GetContext(ctx, event.JobID) - - // All handlers are called, unless one of them returns an error. - for _, handler := range r.eventHandlers { - if err = handler.HandleJobEvent(jobCtx, event); err != nil { //nolint:gocritic - return err - } - } - return nil -} - -func logEvent(ctx context.Context, event models.JobEvent, startTime time.Time) func(*error) { - return func(handlerError *error) { - logMsg := log.Ctx(ctx).Debug(). - Str("EventName", event.EventName.String()). - Str("JobID", event.JobID). - Str("NodeID", event.SourceNodeID). - Str("Status", event.Status). 
- Dur("HandleDuration", time.Since(startTime)) - if *handlerError != nil { - logMsg = logMsg.AnErr("HandlerError", *handlerError) - } - - logMsg.Msg("Handled event") - } -} diff --git a/pkg/eventhandler/chained_handlers_test.go b/pkg/eventhandler/chained_handlers_test.go deleted file mode 100644 index cf92117769..0000000000 --- a/pkg/eventhandler/chained_handlers_test.go +++ /dev/null @@ -1,111 +0,0 @@ -//go:build unit || !integration - -package eventhandler - -import ( - "context" - "fmt" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.uber.org/mock/gomock" - - "github.com/bacalhau-project/bacalhau/pkg/eventhandler/mock_eventhandler" - "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// In order for 'go test' to run this suite, we need to create -// a normal test function and pass our suite to suite.Run -func TestChainedHandlers(t *testing.T) { - suite.Run(t, new(jobEventHandlerSuite)) -} - -type jobEventHandlerSuite struct { - suite.Suite - ctrl *gomock.Controller - chainedHandler *ChainedJobEventHandler - handler1 *mock_eventhandler.MockJobEventHandler - handler2 *mock_eventhandler.MockJobEventHandler - contextProvider *mock_eventhandler.MockContextProvider - context context.Context - event models.JobEvent -} - -// Before each test -func (suite *jobEventHandlerSuite) SetupTest() { - suite.ctrl = gomock.NewController(suite.T()) - suite.handler1 = mock_eventhandler.NewMockJobEventHandler(suite.ctrl) - suite.handler2 = mock_eventhandler.NewMockJobEventHandler(suite.ctrl) - suite.contextProvider = mock_eventhandler.NewMockContextProvider(suite.ctrl) - suite.chainedHandler = NewChainedJobEventHandler(suite.contextProvider) - suite.context = context.WithValue(context.Background(), "test", "test") - suite.event = models.JobEvent{ - EventName: models.JobEventCreated, - JobID: uuid.NewString(), - SourceNodeID: "nodeA", - Status: "this is a test event", - } - logger.ConfigureTestLogging(suite.T()) -} - -func (suite *jobEventHandlerSuite) TearDownTest() { - suite.ctrl.Finish() -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEvent() { - suite.chainedHandler.AddHandlers(suite.handler1, suite.handler2) - ctx := context.Background() - - // assert context provider is called with the correct context and job id - suite.contextProvider.EXPECT().GetContext(ctx, suite.event.JobID).Return(suite.context) - - // assert both handlers are called with the context provider's context and event - gomock.InOrder( - suite.handler1.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - suite.handler2.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - ) - - // assert no error was returned - require.NoError(suite.T(), suite.chainedHandler.HandleJobEvent(ctx, suite.event)) -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEventLazilyAdded() { - suite.chainedHandler.AddHandlers(suite.handler1) - suite.chainedHandler.AddHandlers(suite.handler2) - ctx := context.Background() - - // assert context provider is called with the correct context and job id - suite.contextProvider.EXPECT().GetContext(ctx, suite.event.JobID).Return(suite.context) - - // assert both handlers are called with the context provider's context and event - gomock.InOrder( - suite.handler1.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - suite.handler2.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - ) - - 
// assert no error was returned - require.NoError(suite.T(), suite.chainedHandler.HandleJobEvent(ctx, suite.event)) -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEventError() { - suite.chainedHandler.AddHandlers(suite.handler1) - suite.chainedHandler.AddHandlers(suite.handler2) - ctx := context.Background() - mockError := fmt.Errorf("i am an error") - - // assert context provider is called with the correct context and job id - suite.contextProvider.EXPECT().GetContext(ctx, suite.event.JobID).Return(suite.context) - - // mock first handler to return an error, and don't expect the second handler to be called - suite.handler1.EXPECT().HandleJobEvent(suite.context, suite.event).Return(mockError) - - // assert no error was returned - require.Equal(suite.T(), mockError, suite.chainedHandler.HandleJobEvent(ctx, suite.event)) -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEventEmptyHandlers() { - require.Error(suite.T(), suite.chainedHandler.HandleJobEvent(context.Background(), suite.event)) -} diff --git a/pkg/eventhandler/context_provider.go b/pkg/eventhandler/context_provider.go deleted file mode 100644 index 774878edcd..0000000000 --- a/pkg/eventhandler/context_provider.go +++ /dev/null @@ -1,81 +0,0 @@ -package eventhandler - -import ( - "context" - "sync" - - "go.opentelemetry.io/otel/attribute" - oteltrace "go.opentelemetry.io/otel/trace" - - "github.com/bacalhau-project/bacalhau/pkg/models" - "github.com/bacalhau-project/bacalhau/pkg/telemetry" -) - -// Interface for a context provider that can be used to generate a context to be used to handle -// job events. -type ContextProvider interface { - GetContext(ctx context.Context, jobID string) context.Context -} - -// TracerContextProvider is a context provider that generates a context along with tracing information. -// It also implements JobEventHandler to end the local lifecycle context for a job when it is completed. -type TracerContextProvider struct { - nodeID string - jobNodeContexts map[string]context.Context // per-node job lifecycle - contextMutex sync.RWMutex -} - -func NewTracerContextProvider(nodeID string) *TracerContextProvider { - return &TracerContextProvider{ - nodeID: nodeID, - jobNodeContexts: make(map[string]context.Context), - } -} - -func (t *TracerContextProvider) GetContext(ctx context.Context, jobID string) context.Context { - t.contextMutex.Lock() - defer t.contextMutex.Unlock() - - jobCtx, _ := telemetry.Span(ctx, "pkg/eventhandler/JobEventHandler.HandleJobEvent", - oteltrace.WithSpanKind(oteltrace.SpanKindInternal), - oteltrace.WithAttributes( - attribute.String(telemetry.TracerAttributeNameNodeID, t.nodeID), - attribute.String(telemetry.TracerAttributeNameJobID, jobID), - ), - ) - - // keep the latest context to clean it up during shutdown if necessary - t.jobNodeContexts[jobID] = jobCtx - return jobCtx -} - -func (t *TracerContextProvider) HandleJobEvent(ctx context.Context, event models.JobEvent) error { - // If the event is known to be terminal, end the local lifecycle context: - if event.EventName.IsTerminal() { - t.endJobNodeContext(ctx, event.JobID) - } - - return nil -} - -func (t *TracerContextProvider) Shutdown() error { - t.contextMutex.RLock() - defer t.contextMutex.RUnlock() - - for _, ctx := range t.jobNodeContexts { - oteltrace.SpanFromContext(ctx).End() - } - - // clear the maps - t.jobNodeContexts = make(map[string]context.Context) - - return nil -} - -// endJobNodeContext ends the local lifecycle context for a job. 
-func (t *TracerContextProvider) endJobNodeContext(ctx context.Context, jobID string) { - oteltrace.SpanFromContext(ctx).End() - t.contextMutex.Lock() - defer t.contextMutex.Unlock() - delete(t.jobNodeContexts, jobID) -} diff --git a/pkg/eventhandler/interfaces.go b/pkg/eventhandler/interfaces.go deleted file mode 100644 index 5c332daa28..0000000000 --- a/pkg/eventhandler/interfaces.go +++ /dev/null @@ -1,21 +0,0 @@ -package eventhandler - -//go:generate mockgen --source interfaces.go --destination mock_eventhandler/mock_handlers.go --package mock_eventhandler - -import ( - "context" - - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// A job event handler is a component that is notified of events related to jobs. -type JobEventHandler interface { - HandleJobEvent(ctx context.Context, event models.JobEvent) error -} - -// function that implements the JobEventHandler interface -type JobEventHandlerFunc func(ctx context.Context, event models.JobEvent) error - -func (f JobEventHandlerFunc) HandleJobEvent(ctx context.Context, event models.JobEvent) error { - return f(ctx, event) -} diff --git a/pkg/eventhandler/mock_eventhandler/mock_contextprovider.go b/pkg/eventhandler/mock_eventhandler/mock_contextprovider.go deleted file mode 100644 index 4bb1fc0721..0000000000 --- a/pkg/eventhandler/mock_eventhandler/mock_contextprovider.go +++ /dev/null @@ -1,49 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: system/context_provider.go - -// Package mock_system is a generated GoMock package. -package mock_eventhandler - -import ( - context "context" - reflect "reflect" - - gomock "go.uber.org/mock/gomock" -) - -// MockContextProvider is a mock of ContextProvider interface. -type MockContextProvider struct { - ctrl *gomock.Controller - recorder *MockContextProviderMockRecorder -} - -// MockContextProviderMockRecorder is the mock recorder for MockContextProvider. -type MockContextProviderMockRecorder struct { - mock *MockContextProvider -} - -// NewMockContextProvider creates a new mock instance. -func NewMockContextProvider(ctrl *gomock.Controller) *MockContextProvider { - mock := &MockContextProvider{ctrl: ctrl} - mock.recorder = &MockContextProviderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockContextProvider) EXPECT() *MockContextProviderMockRecorder { - return m.recorder -} - -// GetContext mocks base method. -func (m *MockContextProvider) GetContext(ctx context.Context, jobID string) context.Context { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetContext", ctx, jobID) - ret0, _ := ret[0].(context.Context) - return ret0 -} - -// GetContext indicates an expected call of GetContext. -func (mr *MockContextProviderMockRecorder) GetContext(ctx, jobID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetContext", reflect.TypeOf((*MockContextProvider)(nil).GetContext), ctx, jobID) -} diff --git a/pkg/eventhandler/mock_eventhandler/mock_handlers.go b/pkg/eventhandler/mock_eventhandler/mock_handlers.go deleted file mode 100644 index 832e5914df..0000000000 --- a/pkg/eventhandler/mock_eventhandler/mock_handlers.go +++ /dev/null @@ -1,55 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: interfaces.go -// -// Generated by this command: -// -// mockgen --source interfaces.go --destination mock_eventhandler/mock_handlers.go --package mock_eventhandler -// - -// Package mock_eventhandler is a generated GoMock package. 
-package mock_eventhandler
-
-import (
-	context "context"
-	reflect "reflect"
-
-	models "github.com/bacalhau-project/bacalhau/pkg/models"
-	gomock "go.uber.org/mock/gomock"
-)
-
-// MockJobEventHandler is a mock of JobEventHandler interface.
-type MockJobEventHandler struct {
-	ctrl     *gomock.Controller
-	recorder *MockJobEventHandlerMockRecorder
-}
-
-// MockJobEventHandlerMockRecorder is the mock recorder for MockJobEventHandler.
-type MockJobEventHandlerMockRecorder struct {
-	mock *MockJobEventHandler
-}
-
-// NewMockJobEventHandler creates a new mock instance.
-func NewMockJobEventHandler(ctrl *gomock.Controller) *MockJobEventHandler {
-	mock := &MockJobEventHandler{ctrl: ctrl}
-	mock.recorder = &MockJobEventHandlerMockRecorder{mock}
-	return mock
-}
-
-// EXPECT returns an object that allows the caller to indicate expected use.
-func (m *MockJobEventHandler) EXPECT() *MockJobEventHandlerMockRecorder {
-	return m.recorder
-}
-
-// HandleJobEvent mocks base method.
-func (m *MockJobEventHandler) HandleJobEvent(ctx context.Context, event models.JobEvent) error {
-	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "HandleJobEvent", ctx, event)
-	ret0, _ := ret[0].(error)
-	return ret0
-}
-
-// HandleJobEvent indicates an expected call of HandleJobEvent.
-func (mr *MockJobEventHandlerMockRecorder) HandleJobEvent(ctx, event any) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleJobEvent", reflect.TypeOf((*MockJobEventHandler)(nil).HandleJobEvent), ctx, event)
-}
diff --git a/pkg/eventhandler/tracer.go b/pkg/eventhandler/tracer.go
deleted file mode 100644
index 4db8f63dcb..0000000000
--- a/pkg/eventhandler/tracer.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package eventhandler
-
-import (
-	"context"
-	"fmt"
-	"io/fs"
-	"os"
-
-	"github.com/rs/zerolog"
-
-	"github.com/bacalhau-project/bacalhau/pkg/lib/marshaller"
-	"github.com/bacalhau-project/bacalhau/pkg/models"
-)
-
-// Tracer is a JobEventHandler that will marshal the received event to a
-// file-based log.
-//
-// Note that we don't need any mutexes here because writing to an os.File is
-// thread-safe (see https://github.com/rs/zerolog/blob/master/writer.go#L33)
-type Tracer struct {
-	LogFile *os.File
-	Logger  zerolog.Logger
-}
-
-const eventTracerFilePerms fs.FileMode = 0644
-
-// Returns an eventhandler.Tracer that writes to the given path, or an
-// error if the file can't be opened.
-func NewTracer(path string) (*Tracer, error) {
-	return NewTracerToFile(path)
-}
-
-// Returns an eventhandler.Tracer that writes to the specified filename, or an
-// error if the file can't be opened.
-func NewTracerToFile(filename string) (*Tracer, error) {
-	file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, eventTracerFilePerms)
-	if err != nil {
-		return nil, err
-	}
-
-	return &Tracer{
-		LogFile: file,
-		Logger:  zerolog.New(file).With().Timestamp().Logger(),
-	}, nil
-}
-
-// HandleJobEvent implements JobEventHandler
-func (t *Tracer) HandleJobEvent(ctx context.Context, event models.JobEvent) error {
-	trace(t.Logger, event)
-	return nil
-}
-
-func trace[Event any](log zerolog.Logger, event Event) {
-	log.Log().
-		Str("Type", fmt.Sprintf("%T", event)).
-		Func(func(e *zerolog.Event) {
-			// TODO: #828 Potential hotspot - marshaling is expensive, and
-			// we do it for every event.
-			eventJSON, err := marshaller.JSONMarshalWithMax(event)
-			if err == nil {
-				e.RawJSON("Event", eventJSON)
-			} else {
-				e.AnErr("MarshalError", err)
-			}
-		}).Send()
-}
-
-func (t *Tracer) Shutdown() error {
-	if t.LogFile != nil {
-		err := t.LogFile.Close()
-		t.LogFile = nil
-		t.Logger = zerolog.Nop()
-		return err
-	}
-	return nil
-}
-
-var _ JobEventHandler = (*Tracer)(nil)
diff --git a/pkg/models/job_event_string.go b/pkg/models/job_event_string.go
deleted file mode 100644
index 1d345bf277..0000000000
--- a/pkg/models/job_event_string.go
+++ /dev/null
@@ -1,36 +0,0 @@
-// Code generated by "stringer -type=JobEventType --trimprefix=JobEvent --output job_event_string.go"; DO NOT EDIT.
-
-package models
-
-import "strconv"
-
-func _() {
-	// An "invalid array index" compiler error signifies that the constant values have changed.
-	// Re-run the stringer command to generate them again.
-	var x [1]struct{}
-	_ = x[jobEventUndefined-0]
-	_ = x[JobEventCreated-1]
-	_ = x[JobEventBid-2]
-	_ = x[JobEventBidAccepted-3]
-	_ = x[JobEventBidRejected-4]
-	_ = x[JobEventComputeError-5]
-	_ = x[JobEventResultsProposed-6]
-	_ = x[JobEventResultsAccepted-7]
-	_ = x[JobEventResultsRejected-8]
-	_ = x[JobEventResultsPublished-9]
-	_ = x[JobEventError-10]
-	_ = x[JobEventCanceled-11]
-	_ = x[JobEventCompleted-12]
-	_ = x[jobEventDone-13]
-}
-
-const _JobEventType_name = "jobEventUndefinedCreatedBidBidAcceptedBidRejectedComputeErrorResultsProposedResultsAcceptedResultsRejectedResultsPublishedErrorCanceledCompletedjobEventDone"
-
-var _JobEventType_index = [...]uint8{0, 17, 24, 27, 38, 49, 61, 76, 91, 106, 122, 127, 135, 144, 156}
-
-func (i JobEventType) String() string {
-	if i < 0 || i >= JobEventType(len(_JobEventType_index)-1) {
-		return "JobEventType(" + strconv.FormatInt(int64(i), 10) + ")"
-	}
-	return _JobEventType_name[_JobEventType_index[i]:_JobEventType_index[i+1]]
-}
diff --git a/pkg/models/jobevent.go b/pkg/models/jobevent.go
deleted file mode 100644
index 54de944a22..0000000000
--- a/pkg/models/jobevent.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package models
-
-import (
-	"fmt"
-	"time"
-)
-
-//go:generate stringer -type=JobEventType --trimprefix=JobEvent --output job_event_string.go
-type JobEventType int
-
-const (
-	jobEventUndefined JobEventType = iota // must be first
-
-	// Job has been created on the requester node
-	JobEventCreated
-
-	// a compute node bid on a job
-	JobEventBid
-
-	// a requester node accepted or rejected a job bid
-	JobEventBidAccepted
-	JobEventBidRejected
-
-	// a compute node had an error running a job
-	JobEventComputeError
-
-	// a compute node completed running a job and proposed its results
-	JobEventResultsProposed
-
-	// a Requester node accepted the results from a node for a job
-	JobEventResultsAccepted
-
-	// a Requester node rejected the results from a node for a job
-	JobEventResultsRejected
-
-	// once the results have been accepted or rejected
-	// the compute node will publish them and issue this event
-	JobEventResultsPublished
-
-	// a requester node declared an error running a job
-	JobEventError
-
-	// a user canceled a job
-	JobEventCanceled
-
-	// a job has been completed
-	JobEventCompleted
-
-	jobEventDone // must be last
-)
-
-func (je JobEventType) IsUndefined() bool {
-	return je == jobEventUndefined
-}
-
-// IsTerminal returns true if the given event type signals the end of the
-// lifecycle of a job. After this, all nodes can safely ignore the job.
-func (je JobEventType) IsTerminal() bool {
-	return je == JobEventError || je == JobEventCompleted || je == JobEventCanceled
-}
-
-func ParseJobEventType(str string) (JobEventType, error) {
-	for typ := jobEventUndefined + 1; typ < jobEventDone; typ++ {
-		if equal(typ.String(), str) {
-			return typ, nil
-		}
-	}
-
-	return jobEventUndefined, fmt.Errorf(
-		"executor: unknown job event type '%s'", str)
-}
-
-func JobEventTypes() []JobEventType {
-	var res []JobEventType
-	for typ := jobEventUndefined + 1; typ < jobEventDone; typ++ {
-		res = append(res, typ)
-	}
-
-	return res
-}
-
-func (je JobEventType) MarshalText() ([]byte, error) {
-	return []byte(je.String()), nil
-}
-
-func (je *JobEventType) UnmarshalText(text []byte) (err error) {
-	name := string(text)
-	*je, err = ParseJobEventType(name)
-	return
-}
-
-// TODO remove this https://github.com/bacalhau-project/bacalhau/issues/4185
-type JobEvent struct {
-	JobID string `json:"JobID,omitempty" example:"9304c616-291f-41ad-b862-54e133c0149e"`
-	// compute execution identifier
-	ExecutionID string `json:"ExecutionID,omitempty" example:"9304c616-291f-41ad-b862-54e133c0149e"`
-	// the node that emitted this event
-	SourceNodeID string `json:"SourceNodeID,omitempty" example:"QmXaXu9N5GNetatsvwnTfQqNtSeKAD6uCmarbh3LMRYAcF"`
-	// the node that this event is for
-	// e.g. "AcceptJobBid" is emitted by the Requester but targets a compute node
-	TargetNodeID string `json:"TargetNodeID,omitempty" example:"QmdZQ7ZbhnvWY1J12XYKGHApJ6aufKyLNSvf8jZBrBaAVL"`
-
-	EventName JobEventType `json:"EventName,omitempty"`
-	Status    string       `json:"Status,omitempty" example:"Got results proposal of length: 0"`
-
-	EventTime time.Time `json:"EventTime,omitempty" example:"2022-11-17T13:32:55.756658941Z"`
-}
diff --git a/pkg/node/requester.go b/pkg/node/requester.go
index ee73b00b9c..96166b5285 100644
--- a/pkg/node/requester.go
+++ b/pkg/node/requester.go
@@ -36,7 +36,6 @@ import (
 	"github.com/bacalhau-project/bacalhau/pkg/util"
 
 	"github.com/bacalhau-project/bacalhau/pkg/compute"
-	"github.com/bacalhau-project/bacalhau/pkg/eventhandler"
 	"github.com/bacalhau-project/bacalhau/pkg/jobstore"
 	"github.com/bacalhau-project/bacalhau/pkg/orchestrator/selection/discovery"
 	"github.com/bacalhau-project/bacalhau/pkg/orchestrator/selection/ranking"
@@ -76,14 +75,6 @@ func NewRequesterNode(
 		return nil, err
 	}
 
-	// prepare event handlers
-	tracerContextProvider := eventhandler.NewTracerContextProvider(nodeID)
-	localJobEventConsumer := eventhandler.NewChainedJobEventHandler(tracerContextProvider)
-
-	eventEmitter := orchestrator.NewEventEmitter(orchestrator.EventEmitterParams{
-		EventConsumer: localJobEventConsumer,
-	})
-
 	jobStore, err := createJobStore(ctx, cfg)
 	if err != nil {
 		return nil, err
@@ -120,12 +111,6 @@ func NewRequesterNode(
 			JobStore: jobStore,
 		}),
 
-		// planner that publishes events on job completion or failure
-		planner.NewEventEmitter(planner.EventEmitterParams{
-			ID:           nodeID,
-			EventEmitter: eventEmitter,
-		}),
-
 		// logs job completion or failure
 		planner.NewLoggingPlanner(),
 	)
@@ -227,7 +212,6 @@ func NewRequesterNode(
 	endpointV2 := orchestrator.NewBaseEndpoint(&orchestrator.BaseEndpointParams{
 		ID:             nodeID,
 		Store:          jobStore,
-		EventEmitter:   eventEmitter,
 		ComputeProxy:   computeProxy,
 		JobTransformer: jobTransformers,
 		TaskTranslator: translationProvider,
@@ -268,12 +252,6 @@ func NewRequesterNode(
 	)
 	auth_endpoint.BindEndpoint(ctx, apiServer.Router, authenticators)
 
-	// order of event handlers is important as triggering some handlers might depend on the state of others.
-	localJobEventConsumer.AddHandlers(
-		// ends the span for the job if a terminal event is received
-		tracerContextProvider,
-	)
-
 	// ncl
 	subscriber, err := ncl.NewSubscriber(transportLayer.Client(),
 		ncl.WithSubscriberMessageSerDeRegistry(messageSerDeRegistry),
@@ -302,10 +280,6 @@ func NewRequesterNode(
 		}
 		evalBroker.SetEnabled(false)
 
-		cleanupErr = tracerContextProvider.Shutdown()
-		if cleanupErr != nil {
-			util.LogDebugIfContextCancelled(ctx, cleanupErr, "failed to shutdown tracer context provider")
-		}
 		// Close the jobstore after the evaluation broker is disabled
 		cleanupErr = jobStore.Close(ctx)
 		if cleanupErr != nil {
@@ -317,9 +291,8 @@ func NewRequesterNode(
 	// It provides the compute call back endpoints for interacting with compute nodes.
 	// e.g. bidding, job completions, cancellations, and failures
 	callback := orchestrator.NewCallback(&orchestrator.CallbackParams{
-		ID:           nodeID,
-		EventEmitter: eventEmitter,
-		Store:        jobStore,
+		ID:    nodeID,
+		Store: jobStore,
 	})
 	if err = transportLayer.RegisterComputeCallback(callback); err != nil {
 		return nil, err
diff --git a/pkg/orchestrator/callback.go b/pkg/orchestrator/callback.go
index 4550bf8369..6be49b86b4 100644
--- a/pkg/orchestrator/callback.go
+++ b/pkg/orchestrator/callback.go
@@ -13,23 +13,20 @@ import (
 )
 
 type CallbackParams struct {
-	ID           string
-	Store        jobstore.Store
-	EventEmitter EventEmitter
+	ID    string
+	Store jobstore.Store
 }
 
 // Callback base implementation of requester Endpoint
 type Callback struct {
-	id           string
-	store        jobstore.Store
-	eventEmitter EventEmitter
+	id    string
+	store jobstore.Store
 }
 
 func NewCallback(params *CallbackParams) *Callback {
 	return &Callback{
-		id:           params.ID,
-		store:        params.Store,
-		eventEmitter: params.EventEmitter,
+		id:    params.ID,
+		store: params.Store,
 	}
 }
 
@@ -96,16 +93,11 @@ func (e *Callback) OnBidComplete(ctx context.Context, response compute.BidResult
 		log.Ctx(ctx).Error().Err(err).Msgf("[OnBidComplete] failed to commit transaction")
 		return
 	}
-
-	if response.Accepted {
-		e.eventEmitter.EmitBidReceived(ctx, response)
-	}
 }
 
 func (e *Callback) OnRunComplete(ctx context.Context, result compute.RunResult) {
 	log.Ctx(ctx).Debug().Msgf("Requester node %s received RunComplete for execution: %s from %s",
 		e.id, result.ExecutionID, result.SourcePeerID)
-	e.eventEmitter.EmitRunComplete(ctx, result)
 
 	txContext, err := e.store.BeginTx(ctx)
 	if err != nil {
@@ -223,8 +215,6 @@ func (e *Callback) OnComputeFailure(ctx context.Context, result compute.ComputeE
 		log.Ctx(ctx).Error().Err(err).Msgf("[OnComputeFailure] failed to commit transaction")
 		return
 	}
-
-	e.eventEmitter.EmitComputeFailure(ctx, result.ExecutionID, result)
 }
 
 // enqueueEvaluation enqueues an evaluation to allow the scheduler to either accept the bid, or find a new node
diff --git a/pkg/orchestrator/endpoint.go b/pkg/orchestrator/endpoint.go
index dfe1a5da90..caf28c0d78 100644
--- a/pkg/orchestrator/endpoint.go
+++ b/pkg/orchestrator/endpoint.go
@@ -24,7 +24,6 @@ import (
 type BaseEndpointParams struct {
 	ID             string
 	Store          jobstore.Store
-	EventEmitter   EventEmitter
 	ComputeProxy   compute.Endpoint
 	JobTransformer transformer.JobTransformer
 	TaskTranslator translation.TranslatorProvider
@@ -34,7 +33,6 @@ type BaseEndpointParams struct {
 type BaseEndpoint struct {
 	id             string
 	store          jobstore.Store
-	eventEmitter   EventEmitter
 	computeProxy   compute.Endpoint
 	jobTransformer transformer.JobTransformer
 	taskTranslator translation.TranslatorProvider
@@ -45,7 +43,6 @@ func NewBaseEndpoint(params *BaseEndpointParams) *BaseEndpoint {
 	return &BaseEndpoint{
 		id:             params.ID,
 		store:          params.Store,
-		eventEmitter:   params.EventEmitter,
 		computeProxy:   params.ComputeProxy,
 		jobTransformer: params.JobTransformer,
 		taskTranslator: params.TaskTranslator,
@@ -145,7 +142,6 @@ func (e *BaseEndpoint) SubmitJob(ctx context.Context, request *SubmitJobRequest)
 		return nil, err
 	}
 
-	e.eventEmitter.EmitJobCreated(ctx, *job)
 	return &SubmitJobResponse{
 		JobID:        job.ID,
 		EvaluationID: eval.ID,
@@ -223,12 +219,6 @@ func (e *BaseEndpoint) StopJob(ctx context.Context, request *StopJobRequest) (St
 		return StopJobResponse{}, err
 	}
 
-	e.eventEmitter.EmitEventSilently(ctx, models.JobEvent{
-		JobID:     request.JobID,
-		EventName: models.JobEventCanceled,
-		Status:    request.Reason,
-		EventTime: time.Now(),
-	})
 	return StopJobResponse{
 		EvaluationID: evalID,
 	}, nil
diff --git a/pkg/orchestrator/event_emitter.go b/pkg/orchestrator/event_emitter.go
deleted file mode 100644
index 513caa25f5..0000000000
--- a/pkg/orchestrator/event_emitter.go
+++ /dev/null
@@ -1,96 +0,0 @@
-package orchestrator
-
-import (
-	"context"
-	"time"
-
-	"github.com/rs/zerolog/log"
-
-	"github.com/bacalhau-project/bacalhau/pkg/compute"
-	"github.com/bacalhau-project/bacalhau/pkg/eventhandler"
-	"github.com/bacalhau-project/bacalhau/pkg/models"
-)
-
-// A quick workaround to publish job events locally, as we still have some types that rely
-// on job events to update their states (e.g. localdb) and to take actions (e.g. websockets and logging).
-// TODO: create a strongly typed local event emitter, and update localdb directly from the
-// requester instead of consuming events.
-type EventEmitterParams struct {
-	EventConsumer eventhandler.JobEventHandler
-}
-
-type EventEmitter struct {
-	eventConsumer eventhandler.JobEventHandler
-}
-
-func NewEventEmitter(params EventEmitterParams) EventEmitter {
-	return EventEmitter{
-		eventConsumer: params.EventConsumer,
-	}
-}
-
-func (e EventEmitter) EmitJobCreated(
-	ctx context.Context, job models.Job) {
-	event := models.JobEvent{
-		JobID:        job.ID,
-		SourceNodeID: job.Meta[models.MetaRequesterID],
-		EventName:    models.JobEventCreated,
-		EventTime:    time.Now(),
-	}
-	e.EmitEventSilently(ctx, event)
-}
-
-func (e EventEmitter) EmitBidReceived(
-	ctx context.Context, result compute.BidResult) {
-	e.EmitEventSilently(ctx, e.constructEvent(result.RoutingMetadata, result.ExecutionMetadata, models.JobEventBid))
-}
-
-func (e EventEmitter) EmitBidAccepted(
-	ctx context.Context, request compute.BidAcceptedRequest, response compute.BidAcceptedResponse) {
-	e.EmitEventSilently(ctx, e.constructEvent(request.RoutingMetadata, response.ExecutionMetadata, models.JobEventBidAccepted))
-}
-
-func (e EventEmitter) EmitBidRejected(
-	ctx context.Context, request compute.BidRejectedRequest, response compute.BidRejectedResponse) {
-	e.EmitEventSilently(ctx, e.constructEvent(request.RoutingMetadata, response.ExecutionMetadata, models.JobEventBidRejected))
-}
-
-func (e EventEmitter) EmitRunComplete(ctx context.Context, response compute.RunResult) {
-	e.EmitEventSilently(ctx, e.constructEvent(response.RoutingMetadata, response.ExecutionMetadata, models.JobEventResultsProposed))
-}
-
-func (e EventEmitter) EmitComputeFailure(ctx context.Context, executionID string, err error) {
-	event := models.JobEvent{
-		ExecutionID: executionID,
-		EventName:   models.JobEventComputeError,
-		Status:      err.Error(),
-		EventTime:   time.Now(),
-	}
-	e.EmitEventSilently(ctx, event)
-}
-
-func (e EventEmitter) constructEvent(
-	routingMetadata compute.RoutingMetadata,
-	executionMetadata compute.ExecutionMetadata,
-	eventName models.JobEventType) models.JobEvent {
-	return models.JobEvent{
-		TargetNodeID: routingMetadata.TargetPeerID,
-		SourceNodeID: routingMetadata.SourcePeerID,
-		JobID:        executionMetadata.JobID,
-		ExecutionID:  executionMetadata.ExecutionID,
-		EventName:    eventName,
-		EventTime:    time.Now(),
-	}
-}
-
-func (e EventEmitter) EmitEvent(ctx context.Context, event models.JobEvent) error {
-	return e.eventConsumer.HandleJobEvent(ctx, event)
-}
-
-func (e EventEmitter) EmitEventSilently(ctx context.Context, event models.JobEvent) {
-	err := e.EmitEvent(ctx, event)
-	if err != nil {
-		log.Ctx(ctx).Error().Err(err).Msgf("failed to emit event %+v", event)
-	}
-}
diff --git a/pkg/orchestrator/planner/event_emitter.go b/pkg/orchestrator/planner/event_emitter.go
deleted file mode 100644
index e7ea6bea28..0000000000
--- a/pkg/orchestrator/planner/event_emitter.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package planner
-
-import (
-	"context"
-	"time"
-
-	"github.com/bacalhau-project/bacalhau/pkg/models"
-	"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
-)
-
-// EventEmitter is a planner implementation that emits events based on the job state.
-type EventEmitter struct {
-	id           string
-	eventEmitter orchestrator.EventEmitter
-}
-
-// EventEmitterParams holds the parameters for creating a new EventEmitter.
-type EventEmitterParams struct {
-	ID           string
-	EventEmitter orchestrator.EventEmitter
-}
-
-// NewEventEmitter creates a new instance of EventEmitter.
-func NewEventEmitter(params EventEmitterParams) *EventEmitter {
-	return &EventEmitter{
-		id:           params.ID,
-		eventEmitter: params.EventEmitter,
-	}
-}
-
-// Process emits a completion or failure event when the plan's desired job state is terminal.
-func (s *EventEmitter) Process(ctx context.Context, plan *models.Plan) error {
-	var eventName models.JobEventType
-	switch plan.DesiredJobState {
-	case models.JobStateTypeCompleted:
-		eventName = models.JobEventCompleted
-	case models.JobStateTypeFailed:
-		eventName = models.JobEventError
-	default:
-	}
-	if !eventName.IsUndefined() {
-		s.eventEmitter.EmitEventSilently(ctx, models.JobEvent{
-			SourceNodeID: s.id,
-			JobID:        plan.Job.ID,
-			Status:       plan.UpdateMessage,
-			EventName:    eventName,
-			EventTime:    time.Now(),
-		})
-	}
-	return nil
-}
-
-// compile-time check whether the EventEmitter implements the Planner interface.
-var _ orchestrator.Planner = (*EventEmitter)(nil)
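
For reviewers who want a compact picture of the event plumbing this patch deletes, below is a minimal, self-contained sketch of the chained dispatch pattern, reconstructed only from the deleted tests and interfaces above: the chain resolves a per-job context once via a ContextProvider, invokes each JobEventHandler in registration order, short-circuits on the first error, and treats an empty chain as an error. The JobEvent, ContextProvider, and handler types here are simplified stand-ins, not the project's real models, and the error message is illustrative.

package main

import (
	"context"
	"errors"
	"fmt"
)

// JobEvent is a simplified stand-in for models.JobEvent.
type JobEvent struct{ JobID string }

// JobEventHandler mirrors the deleted interface: a component notified of job events.
type JobEventHandler interface {
	HandleJobEvent(ctx context.Context, event JobEvent) error
}

// JobEventHandlerFunc adapts a plain function into a JobEventHandler.
type JobEventHandlerFunc func(ctx context.Context, event JobEvent) error

func (f JobEventHandlerFunc) HandleJobEvent(ctx context.Context, event JobEvent) error {
	return f(ctx, event)
}

// ContextProvider mirrors the deleted interface: it yields the context used to handle an event.
type ContextProvider interface {
	GetContext(ctx context.Context, jobID string) context.Context
}

// passthroughContextProvider is a trivial provider for the example; the real
// TracerContextProvider attached a tracing span per job instead.
type passthroughContextProvider struct{}

func (passthroughContextProvider) GetContext(ctx context.Context, _ string) context.Context {
	return ctx
}

// ChainedJobEventHandler sketches the behavior exercised by the deleted test suite.
type ChainedJobEventHandler struct {
	contextProvider ContextProvider
	handlers        []JobEventHandler
}

func (c *ChainedJobEventHandler) AddHandlers(handlers ...JobEventHandler) {
	c.handlers = append(c.handlers, handlers...)
}

func (c *ChainedJobEventHandler) HandleJobEvent(ctx context.Context, event JobEvent) error {
	if len(c.handlers) == 0 {
		return errors.New("no handlers registered")
	}
	// Resolve the per-job context once, then hand it to every handler in order.
	jobCtx := c.contextProvider.GetContext(ctx, event.JobID)
	for _, handler := range c.handlers {
		// The first failing handler short-circuits the rest of the chain.
		if err := handler.HandleJobEvent(jobCtx, event); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	chain := &ChainedJobEventHandler{contextProvider: passthroughContextProvider{}}
	chain.AddHandlers(JobEventHandlerFunc(func(_ context.Context, event JobEvent) error {
		fmt.Println("handled event for job", event.JobID)
		return nil
	}))
	if err := chain.HandleJobEvent(context.Background(), JobEvent{JobID: "job-1"}); err != nil {
		fmt.Println("dispatch failed:", err)
	}
}

This matches the expectations in the deleted suite: GetContext is consulted once with the event's job ID, handlers run in registration order, the first error stops the remaining handlers, and dispatching with no registered handlers is itself an error.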