Skip to content

Commit

Permalink
improve async error reporting (#4550)
Browse files Browse the repository at this point in the history
Improve asynchronous error reporting for executions, from compute nodes back
to orchestrators and the job store, covering errors raised by the Docker
executor, the S3 publisher, and input sources.

The PR does the following:
1. Enrich S3 errors with the AWS error code and additional metadata
2. Use the new `bacerrors.Error` for errors returned by Docker
3. Add a new `ErrorCode` entry to `models.Event` details, and populate that
value from bacerrors as `{Component}:{ErrorCode}`, such as
`S3Publisher:NoSuchBucket` and `Docker:ImageNotFound`
4. Introduce a new `Details` field on the execution's compute state, which
holds additional metadata about the latest state of the execution,
mainly the `ErrorCode`
5. Publish the `ErrorCode` to OTel analytics


### Examples:
#### Bad docker image
```
→ bacalhau docker run non_existent_image

Job successfully submitted. Job ID: j-29a81940-18a2-44b7-b0da-807d45946f45
Checking job status... (Enter Ctrl+C to exit at any time, your job will continue running):

 TIME          EXEC. ID    TOPIC            EVENT
 22:37:32.323              Submission       Job submitted
 22:37:32.340  e-640f0876  Scheduling       Requested execution on n-7c5b7d69
                                            * NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507
 22:37:34.569  e-640f0876  Exec Scanning    Error: image not available: "non_existent_image"
                                            Hint: To resolve this, either:
                                            1. Check if the image exists in the registry and the name is correct
                                            2. If the image is private, supply the node with valid Docker login credentials using the
                                            DOCKER_USERNAME and DOCKER_PASSWORD environment variables
                                            * ErrorCode: Docker:ImageNotFound
                                            * Image: non_existent_image
 22:37:34.585  e-a3a3afe2  Scheduling       Requested execution on n-7c5b7d69
                                            * NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507
 22:37:36.732  e-a3a3afe2  Exec Scanning    Error: image not available: "non_existent_image"
                                            Hint: To resolve this, either:
                                            1. Check if the image exists in the registry and the name is correct
                                            2. If the image is private, supply the node with valid Docker login credentials using the
                                            DOCKER_USERNAME and DOCKER_PASSWORD environment variables
                                            * ErrorCode: Docker:ImageNotFound
                                            * Image: non_existent_image

Error: job failed

To get more details about the run, execute:
	bacalhau job describe j-29a81940-18a2-44b7-b0da-807d45946f45

To get more details about the run executions, execute:
	bacalhau job executions j-29a81940-18a2-44b7-b0da-807d45946f45


bacalhau job executions j-29a81940-18a2-44b7-b0da-807d45946f45 --output yaml
- AllocatedResources:
    Tasks: {}
  ComputeState:
    Message: 'image not available: "non_existent_image"'
    StateType: 8
  CreateTime: 1727642252340926000
  DesiredState:
    Message: execution failed
    StateType: 2
  EvalID: ecad787d-e72a-4987-b353-cd6552d546bf
  FollowupEvalID: ""
  ID: e-640f0876-119b-40ac-883e-b2126b5a40f3
  JobID: j-29a81940-18a2-44b7-b0da-807d45946f45
  ModifyTime: 1727642254570170000
  Name: ""
  Namespace: default
  NextExecution: ""
  NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507
  PreviousExecution: ""
  PublishedResult:
    Type: ""
  Revision: 3
  RunOutput: null
- AllocatedResources:
    Tasks: {}
  ComputeState:
    Message: 'image not available: "non_existent_image"'
    StateType: 8
  CreateTime: 1727642254585495000
  DesiredState:
    Message: execution failed
    StateType: 2
  EvalID: ef3bae6f-54fb-4f48-9b83-98364049e685
  FollowupEvalID: ""
  ID: e-a3a3afe2-5d12-498f-ad19-86ea00425d30
  JobID: j-29a81940-18a2-44b7-b0da-807d45946f45
  ModifyTime: 1727642256732971000
  Name: ""
  Namespace: default
  NextExecution: ""
  NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507
  PreviousExecution: ""
  PublishedResult:
    Type: ""
  Revision: 3
  RunOutput: null
```

#### Bad S3 bucket
```
→ bacalhau job run docker-s3.yaml
Job successfully submitted. Job ID: j-036bc69b-7b81-489b-a714-d1349d6e6f5b
Checking job status... (Enter Ctrl+C to exit at any time, your job will continue running):

 TIME          EXEC. ID    TOPIC            EVENT
 22:36:57.853              Submission       Job submitted
 22:36:57.868  e-ad0ab10c  Scheduling       Requested execution on n-7c5b7d69
                                            * NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507
 22:36:57.929  e-ad0ab10c  Execution        Running
 22:37:03.414  e-ad0ab10c  Publishing       Error: failed to publish s3 result: operation error S3: PutObject, https response error StatusCode:
                           Results          404, RequestID: 62FSTZ2400AA0782, api error NoSuchBucket: The specified bucket does not exist
                                            * AWSRequestID: 62FSTZ2400AA0782
                                            * ErrorCode: S3Publisher:NoSuchBucket
                                            * Operation: PutObject
                                            * Service: S3
 22:37:03.432  e-995b726b  Scheduling       Requested execution on n-7c5b7d69
                                            * NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507
 22:37:03.482  e-995b726b  Execution        Running
 22:37:07.085  e-995b726b  Publishing       Error: failed to publish s3 result: operation error S3: PutObject, https response error StatusCode:
                           Results          404, RequestID: YNJQY666GB15CT3K, api error NoSuchBucket: The specified bucket does not exist
                                            * Operation: PutObject
                                            * Service: S3
                                            * AWSRequestID: YNJQY666GB15CT3K
                                            * ErrorCode: S3Publisher:NoSuchBucket

Error: job failed

To get more details about the run, execute:
	bacalhau job describe j-036bc69b-7b81-489b-a714-d1349d6e6f5b

To get more details about the run executions, execute:
	bacalhau job executions j-036bc69b-7b81-489b-a714-d1349d6e6f5b

To download the results, execute:
	bacalhau job get j-036bc69b-7b81-489b-a714-d1349d6e6f5b
```
  • Loading branch information
wdbaruni authored Sep 30, 2024
1 parent 160d9b4 commit 3fd303d
Show file tree
Hide file tree
Showing 25 changed files with 624 additions and 172 deletions.
8 changes: 4 additions & 4 deletions cmd/cli/docker/docker_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (s *DockerRunSuite) TestRun_BadExecutables() {
imageName: "badimage", // Bad image
executable: "ls", // Good executable
isValid: false,
errStringContains: "Could not inspect image",
errStringContains: "image not available",
},
"good-image-bad-executable": {
imageName: "ubuntu", // Good image // TODO we consider an untagged image poor practice, fix this
Expand All @@ -443,7 +443,7 @@ func (s *DockerRunSuite) TestRun_BadExecutables() {
imageName: "badimage", // Bad image
executable: "BADEXECUTABLE", // Bad executable
isValid: false,
errStringContains: "Could not inspect image",
errStringContains: "image not available",
},
}

Expand Down Expand Up @@ -491,8 +491,8 @@ func (s *DockerRunSuite) TestRun_InvalidImage() {
// test. Alternatively, we could reduce the complexity and assert the job
// simply failed which is the expected behaviour for an invalid image
s.Require().Len(info.Executions.Items, 2)
s.Contains(info.Executions.Items[0].ComputeState.Message, `Could not inspect image "@" - could be due to repo/image not existing`)
s.Contains(info.Executions.Items[1].ComputeState.Message, `Could not inspect image "@" - could be due to repo/image not existing`)
s.Contains(info.Executions.Items[0].ComputeState.Message, `invalid image format: "@"`)
s.Contains(info.Executions.Items[1].ComputeState.Message, `invalid image format: "@"`)
}

func (s *DockerRunSuite) TestRun_Timeout_DefaultValue() {
Expand Down
24 changes: 17 additions & 7 deletions pkg/analytics/execution_compute_message.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
package analytics

import (
"github.com/bacalhau-project/bacalhau/pkg/models"
)

const ComputeMessageExecutionEventType = "bacalhau.execution_v1.compute_message"

type ExecutionComputeMessage struct {
JobID string `json:"job_id,omitempty"`
ExecutionID string `json:"execution_id,omitempty"`
ComputeMessage string `json:"compute_message,omitempty"`
JobID string `json:"job_id,omitempty"`
ExecutionID string `json:"execution_id,omitempty"`
ComputeMessage string `json:"compute_message,omitempty"`
ComputeErrorCode string `json:"compute_state_error_code,omitempty"`
}

func NewComputeMessageExecutionEvent(jobID string, executionID string, computeMessage string) *Event {
func NewComputeMessageExecutionEvent(e models.Execution) *Event {
var errorCode string
if e.ComputeState.Details != nil {
errorCode = e.ComputeState.Details[models.DetailsKeyErrorCode]
}
return NewEvent(ComputeMessageExecutionEventType, ExecutionComputeMessage{
JobID: jobID,
ExecutionID: executionID,
ComputeMessage: computeMessage,
JobID: e.JobID,
ExecutionID: e.ID,
ComputeMessage: e.ComputeState.Message,
ComputeErrorCode: errorCode,
})
}
25 changes: 21 additions & 4 deletions pkg/analytics/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ type ExecutionEvent struct {

Resources map[string]Resource `json:"resources,omitempty"`

DesiredState string `json:"desired_state,omitempty"`
ComputeState string `json:"compute_state,omitempty"`
DesiredState string `json:"desired_state,omitempty"`
DesiredStateErrorCode string `json:"desired_state_error_code,omitempty"`

ComputeState string `json:"compute_state,omitempty"`
ComputeStateErrorCode string `json:"compute_state_error_code,omitempty"`

PublishedResultType string `json:"publisher_type,omitempty"`

Expand Down Expand Up @@ -77,6 +80,18 @@ func newExecutionEvent(e models.Execution) ExecutionEvent {
exitCode = e.RunOutput.ExitCode
}

var (
desiredStateErrorCode string
computeStateErrorCode string
)

if e.DesiredState.Details != nil {
desiredStateErrorCode = e.DesiredState.Details[models.DetailsKeyErrorCode]
}
if e.ComputeState.Details != nil {
computeStateErrorCode = e.ComputeState.Details[models.DetailsKeyErrorCode]
}

return ExecutionEvent{
// ID fields.
JobID: e.JobID,
Expand All @@ -93,8 +108,10 @@ func newExecutionEvent(e models.Execution) ExecutionEvent {
Resources: resources,

// states.
DesiredState: e.DesiredState.StateType.String(),
ComputeState: e.ComputeState.StateType.String(),
DesiredState: e.DesiredState.StateType.String(),
DesiredStateErrorCode: desiredStateErrorCode,
ComputeState: e.ComputeState.StateType.String(),
ComputeStateErrorCode: computeStateErrorCode,

// publisher if any.
PublishedResultType: e.PublishedResult.Type,
Expand Down
1 change: 1 addition & 0 deletions pkg/bacerrors/codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
ConfigurationError ErrorCode = "ConfigurationError"
DatastoreFailure ErrorCode = "DatastoreFailure"
RequestCancelled ErrorCode = "RequestCancelled"
UnknownError ErrorCode = "UnknownError"
)

func Code(code string) ErrorCode {
Expand Down
21 changes: 20 additions & 1 deletion pkg/bacerrors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type Error interface {
// WithDetails adds or updates the details associated with the error.
WithDetails(details map[string]string) Error

// WithDetail adds or updates a single detail associated with the error.
WithDetail(key, value string) Error

// WithCode sets the ErrorCode for this error.
WithCode(code ErrorCode) Error

Expand Down Expand Up @@ -143,7 +146,23 @@ func (e *errorImpl) WithFailsExecution() Error {
// WithDetails merges the given details into the details already attached to
// the Error and returns the Error itself for chaining.
//
// A key present in both maps takes the value from details; keys that exist
// only on the Error are kept. Entries are copied one by one instead of storing
// the caller's map, so the Error never shares a map with its caller.
func (e *errorImpl) WithDetails(details map[string]string) Error {
	// Allocate lazily so the writes below never target a nil map.
	if e.details == nil {
		e.details = make(map[string]string, len(details))
	}
	for k, v := range details {
		e.details[k] = v
	}
	return e
}

// WithDetail stores value under key in the Error's details, replacing any
// value previously stored under that key, and returns the Error itself for
// chaining.
func (e *errorImpl) WithDetail(key, value string) Error {
	if e.details != nil {
		e.details[key] = value
		return e
	}

	// First detail on this error: create the map with the entry in place.
	e.details = map[string]string{key: value}
	return e
}

Expand Down
73 changes: 73 additions & 0 deletions pkg/bacerrors/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,79 @@ func (suite *ErrorTestSuite) TestErrorWithDetails() {
suite.False(err.Retryable())
suite.False(err.FailsExecution())
suite.Equal(details, err.Details())

// Test appending details
additionalDetails := map[string]string{"key3": "value3", "key2": "newvalue2"}
err = err.WithDetails(additionalDetails)

expectedDetails := map[string]string{"key1": "value1", "key2": "newvalue2", "key3": "value3"}
suite.Equal(expectedDetails, err.Details())
}

// TestErrorWithDetail checks that WithDetail attaches a single key/value pair,
// accumulates pairs across calls, and replaces the value of an existing key.
func (suite *ErrorTestSuite) TestErrorWithDetail() {
	const msg = "TestMessage"
	err := New(msg).WithDetail("key1", "value1")

	// Only the message and the one detail are set; everything else is default.
	suite.Equal(msg, err.Error())
	suite.Empty(err.Hint())
	suite.False(err.Retryable())
	suite.False(err.FailsExecution())
	suite.Equal(map[string]string{"key1": "value1"}, err.Details())

	// A second, distinct key is added alongside the first.
	err = err.WithDetail("key2", "value2")
	suite.Equal(map[string]string{"key1": "value1", "key2": "value2"}, err.Details())

	// Reusing a key replaces its value and leaves the other key untouched.
	err = err.WithDetail("key1", "newvalue1")
	suite.Equal(map[string]string{"key1": "newvalue1", "key2": "value2"}, err.Details())
}

// TestErrorWithCode checks that WithCode sets the error code and that the
// HTTP status code reported afterwards is 400 for BadRequestError.
func (suite *ErrorTestSuite) TestErrorWithCode() {
	const msg = "TestMessage"
	coded := New(msg).WithCode(BadRequestError)

	suite.Equal(msg, coded.Error())
	suite.Equal(BadRequestError, coded.Code())
	suite.Equal(400, coded.HTTPStatusCode()) // BadRequestError should map to 400
}

// TestErrorWithHTTPStatusCode checks that an explicitly set HTTP status code
// is reported back unchanged.
func (suite *ErrorTestSuite) TestErrorWithHTTPStatusCode() {
	const (
		msg    = "TestMessage"
		teapot = 418 // I'm a teapot
	)
	withStatus := New(msg).WithHTTPStatusCode(teapot)

	suite.Equal(msg, withStatus.Error())
	suite.Equal(418, withStatus.HTTPStatusCode())
}

// TestErrorWithComponent checks that WithComponent records the component name
// without altering the error message.
func (suite *ErrorTestSuite) TestErrorWithComponent() {
	const msg = "TestMessage"
	tagged := New(msg).WithComponent("TestComponent")

	suite.Equal(msg, tagged.Error())
	suite.Equal("TestComponent", tagged.Component())
}

// TestErrorChaining checks that every With* builder can be chained in a single
// expression and that each attribute set along the chain is readable afterwards.
func (suite *ErrorTestSuite) TestErrorChaining() {
	chained := New("TestMessage").
		WithHint("TestHint").
		WithRetryable().
		WithFailsExecution().
		WithDetail("key1", "value1").
		WithCode(NotFoundError).
		WithHTTPStatusCode(404).
		WithComponent("TestComponent")

	wantDetails := map[string]string{"key1": "value1"}

	suite.Equal("TestMessage", chained.Error())
	suite.Equal("TestHint", chained.Hint())
	suite.True(chained.Retryable())
	suite.True(chained.FailsExecution())
	suite.Equal(wantDetails, chained.Details())
	suite.Equal(NotFoundError, chained.Code())
	suite.Equal(404, chained.HTTPStatusCode())
	suite.Equal("TestComponent", chained.Component())
}

func (suite *ErrorTestSuite) TestWrapNonBacerror() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/compute/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog/log"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
"github.com/bacalhau-project/bacalhau/pkg/logger"
"github.com/bacalhau-project/bacalhau/pkg/models"
Expand Down Expand Up @@ -314,8 +315,7 @@ func (b Bidder) runResourceBidding(
// calculate resource usage of the job, failure here represents a compute failure.
resourceUsage, err := b.usageCalculator.Calculate(ctx, *job, *resources)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Error calculating resource requirements for job")
return nil, fmt.Errorf("calculating resource usage of job: %w", err)
return nil, bacerrors.Wrap(err, "calculating resource usage of job")
}

// ask the bidding strategy if we should bid on this job
Expand Down
4 changes: 2 additions & 2 deletions pkg/compute/capacity/disk/calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package disk

import (
"context"
"fmt"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/storage"
Expand Down Expand Up @@ -34,7 +34,7 @@ func (c *DiskUsageCalculator) Calculate(ctx context.Context, job models.Job, par
}
volumeSize, err := strg.GetVolumeSize(ctx, *input)
if err != nil {
return nil, fmt.Errorf("error getting job disk space requirements: %w", err)
return nil, bacerrors.Wrap(err, "error getting job disk space requirements")
}
totalDiskRequirements += volumeSize
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/compute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,9 +435,8 @@ func (e *BaseExecutor) publish(ctx context.Context, localExecutionState store.Lo
}
publishedResult, err := jobPublisher.PublishResult(ctx, execution, resultFolder)
if err != nil {
return nil, fmt.Errorf("failed to publish result: %w", err)
return nil, bacerrors.Wrap(err, "failed to publish result")
}

log.Ctx(ctx).Debug().
Str("execution", execution.ID).
Msg("Execution published")
Expand Down
56 changes: 6 additions & 50 deletions pkg/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,50 +36,6 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/util/closer"
)

// ImageUnavailableError reports that an operation on a Docker image could not
// be completed.
type ImageUnavailableError struct {
	// Verb names the attempted operation; it is interpolated into the error
	// message as "Could not <Verb> image ...".
	Verb string

	// Image is the image reference the operation was attempted on.
	Image string

	// Creds are the Docker credentials associated with the attempt; Hint
	// checks their validity to decide whether to suggest supplying them.
	Creds config_legacy.DockerCredentials

	// Err is the underlying error that the message wraps.
	Err error
}

// Error formats Verb and Image into a human-readable message that wraps the
// underlying Err. When Err is nil the message is returned on its own.
func (die ImageUnavailableError) Error() string {
	const format = "Could not %s image %q - could be due to repo/image not existing, " +
		"or registry needing authorization"

	if die.Err == nil {
		// pkgerrors.Wrapf returns nil for a nil cause, so calling Error() on
		// its result would panic; report the message without a cause instead.
		return fmt.Sprintf(format, die.Verb, die.Image)
	}
	return pkgerrors.Wrapf(die.Err, format, die.Verb, die.Image).Error()
}

// Hint suggests supplying Docker login credentials through environment
// variables when the credentials attached to the error are not valid. It
// returns an empty string when they are valid.
func (die ImageUnavailableError) Hint() string {
	if die.Creds.IsValid() {
		return ""
	}

	const advice = "If the image is private, supply the node with valid Docker login credentials " +
		"using the "
	return advice + config_legacy.DockerUsernameEnvVar +
		" and " + config_legacy.DockerPasswordEnvVar +
		" environment variables"
}

// NewImageInspectError builds an ImageUnavailableError whose verb is
// "inspect", carrying the image, credentials and underlying error given.
func NewImageInspectError(image string, creds config_legacy.DockerCredentials, err error) error {
	inspectErr := ImageUnavailableError{Verb: "inspect"}
	inspectErr.Image = image
	inspectErr.Creds = creds
	inspectErr.Err = err
	return inspectErr
}

// NewImagePullError builds an ImageUnavailableError whose verb is "pull",
// carrying the image, credentials and underlying error given.
func NewImagePullError(image string, creds config_legacy.DockerCredentials, err error) error {
	pullErr := ImageUnavailableError{Verb: "pull"}
	pullErr.Image = image
	pullErr.Creds = creds
	pullErr.Err = err
	return pullErr
}

type Client struct {
tracing.TracedClient
}
Expand All @@ -105,7 +61,7 @@ func (c *Client) HostGatewayIP(ctx context.Context) (net.IP, error) {
return net.IP{}, NewDockerError(err)
}
if configs := response.IPAM.Config; len(configs) < 1 {
return net.IP{}, NewCustomDockerError(DockerBridgeNetworkUnattached, "bridge network unattached")
return net.IP{}, NewCustomDockerError(BridgeNetworkUnattached, "bridge network unattached")
} else {
return net.ParseIP(configs[0].Gateway), nil
}
Expand Down Expand Up @@ -166,7 +122,7 @@ func (c *Client) FindContainer(ctx context.Context, label string, value string)
}
}

return "", NewCustomDockerError(DockerContainerNotFound, fmt.Sprintf("unable to find container for %s=%s", label, value))
return "", NewCustomDockerError(ContainerNotFound, fmt.Sprintf("unable to find container for %s=%s", label, value))
}

func (c *Client) FollowLogs(ctx context.Context, id string) (stdout, stderr io.Reader, err error) {
Expand Down Expand Up @@ -214,7 +170,7 @@ func (c *Client) GetOutputStream(ctx context.Context, id string, since string, f
}

if !cont.State.Running {
return nil, NewCustomDockerError(DockerContainerNotRunning, "cannot get logs when container is not running")
return nil, NewCustomDockerError(ContainerNotRunning, "cannot get logs when container is not running")
}

logOptions := container.LogsOptions{
Expand Down Expand Up @@ -268,7 +224,7 @@ func (c *Client) ImagePlatforms(ctx context.Context, image string, dockerCreds c

distribution, err := c.DistributionInspect(ctx, image, authToken)
if err != nil {
return nil, NewImageInspectError(image, dockerCreds, err)
return nil, NewDockerImageError(err, image)
}

return distribution.Platforms, nil
Expand Down Expand Up @@ -363,7 +319,7 @@ func (c *Client) ImageDistribution(
digestParts := strings.Split(repos[0], "@")
digest, err := digest.Parse(digestParts[1])
if err != nil {
return nil, NewCustomDockerError(DockerImageDigestMismatch, "image digest mismatch")
return nil, NewCustomDockerError(ImageDigestMismatch, "image digest mismatch")
}

return &ImageManifest{
Expand All @@ -382,7 +338,7 @@ func (c *Client) ImageDistribution(
authToken := getAuthToken(ctx, image, creds)
dist, err := c.DistributionInspect(ctx, image, authToken)
if err != nil {
return nil, NewImageInspectError(image, creds, err)
return nil, NewDockerImageError(err, image)
}

obj := dist.Descriptor.Digest
Expand Down
Loading

0 comments on commit 3fd303d

Please sign in to comment.