Copy large inline sources to IPFS (#3073)
Currently, an inline attachment sent to the requester endpoint is
replaced with an IPFS CID if the inline source is above a certain
size. We want the same functionality in the orchestrator endpoint so
that we can submit jobs without worrying that we are needlessly
inflating the size of the data store.

The implementation of the transformer is copied from the requester node.
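
To make the threshold concrete, here is a minimal, self-contained sketch of the size check involved. The 4 KB limit and the data: URL encoding are taken from the diff below; the decodedSize helper, the maxInline constant, and the sample payload are hypothetical, for illustration only.

package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// maxInline mirrors maximumIndividualSpec (4 KB) from
// pkg/orchestrator/transformer/storage.go in the diff below.
const maxInline = 4 * 1024

// decodedSize is a hypothetical helper: it returns the payload size of a
// base64-encoded data: URL, which is what the pinner effectively compares
// against the limit.
func decodedSize(dataURL string) (int, error) {
	idx := strings.Index(dataURL, "base64,")
	if idx < 0 {
		return 0, fmt.Errorf("not a base64 data URL")
	}
	raw, err := base64.StdEncoding.DecodeString(dataURL[idx+len("base64,"):])
	if err != nil {
		return 0, err
	}
	return len(raw), nil
}

func main() {
	payload := strings.Repeat("HelloWorld", 1024) // 10,240 bytes, over the limit
	url := "data:text/html;base64," + base64.StdEncoding.EncodeToString([]byte(payload))

	n, err := decodedSize(url)
	if err != nil {
		panic(err)
	}
	if n > maxInline {
		fmt.Printf("inline source is %d bytes (limit %d): pin to IPFS\n", n, maxInline)
	}
}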
rossjones authored Dec 7, 2023
1 parent f75b705 commit 0d09adb
Showing 3 changed files with 200 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/node/requester.go
@@ -256,6 +256,7 @@ func NewRequesterNode(
 			transformer.NameOptional(),
 			transformer.DefaultsApplier(requesterConfig.JobDefaults),
 			transformer.RequesterInfo(host.ID().String(), marshaledPublicKey),
+			transformer.NewInlineStoragePinner(storageProviders),
 		},
 		ResultTransformer: resultTransformers,
 	})
148 changes: 148 additions & 0 deletions pkg/orchestrator/endpoint_test.go
@@ -0,0 +1,148 @@
//go:build integration || !unit

package orchestrator_test

import (
	"context"
	"encoding/base64"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"testing"

	"github.com/bacalhau-project/bacalhau/pkg/config/types"
	"github.com/bacalhau-project/bacalhau/pkg/eventhandler"
	"github.com/bacalhau-project/bacalhau/pkg/ipfs"
	"github.com/bacalhau-project/bacalhau/pkg/jobstore/inmemory"
	"github.com/bacalhau-project/bacalhau/pkg/models"
	"github.com/bacalhau-project/bacalhau/pkg/node"
	"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
	"github.com/bacalhau-project/bacalhau/pkg/orchestrator/transformer"
	"github.com/bacalhau-project/bacalhau/pkg/system"
	"github.com/stretchr/testify/suite"
	gomock "go.uber.org/mock/gomock"
)

type EndpointSuite struct {
	suite.Suite

	ctx context.Context
	cm  *system.CleanupManager

	client ipfs.Client
	node   *ipfs.Node
}

func TestEndpointSuite(t *testing.T) {
	suite.Run(t, new(EndpointSuite))
}

func (s *EndpointSuite) SetupSuite() {
	s.ctx = context.Background()
	s.cm = system.NewCleanupManager()

	node, _ := ipfs.NewNodeWithConfig(s.ctx, s.cm, types.IpfsConfig{PrivateInternal: true})
	s.node = node

	s.client = ipfs.NewClient(s.node.Client().API)
}

func (s *EndpointSuite) TearDownSuite() {
	s.node.Close(s.ctx)
}

func (s *EndpointSuite) TestInlinePinnerTransformInSubmit() {
	ctrl := gomock.NewController(s.T())
	defer ctrl.Finish()

	tracerContextProvider := eventhandler.NewTracerContextProvider("test")
	localJobEventConsumer := eventhandler.NewChainedJobEventHandler(tracerContextProvider)
	eventEmitter := orchestrator.NewEventEmitter(orchestrator.EventEmitterParams{
		EventConsumer: localJobEventConsumer,
	})

	storageFactory := node.NewStandardStorageProvidersFactory()
	storageProviders, err := storageFactory.Get(s.ctx, node.NodeConfig{
		CleanupManager: s.cm,
		IPFSClient:     s.client,
	})
	s.Require().NoError(err)

	evalBroker := orchestrator.NewMockEvaluationBroker(ctrl)
	evalBroker.EXPECT().Enqueue(gomock.Any()).Return(nil)

	endpoint := orchestrator.NewBaseEndpoint(&orchestrator.BaseEndpointParams{
		ID:               "test_endpoint",
		EvaluationBroker: evalBroker,
		Store:            inmemory.NewInMemoryJobStore(),
		EventEmitter:     eventEmitter,
		JobTransformer: transformer.ChainedTransformer[*models.Job]{
			transformer.JobFn(transformer.IDGenerator),
			transformer.NewInlineStoragePinner(storageProviders),
		},
	})

	sb := strings.Builder{}
	sb.Grow(1024 * 10)
	for i := 0; i < 1024; i++ {
		_, _ = sb.WriteString("HelloWorld")
	}
	base64Content := base64.StdEncoding.EncodeToString([]byte(sb.String()))

	request := &orchestrator.SubmitJobRequest{
		&models.Job{
			Name: "testjob",
			Type: "batch",
			Tasks: []*models.Task{
				&models.Task{
					Name:   "Task 1",
					Engine: &models.SpecConfig{Type: models.EngineNoop},
					InputSources: []*models.InputSource{
						&models.InputSource{
							Source: &models.SpecConfig{
								Type: models.StorageSourceInline,
								Params: map[string]interface{}{
									"URL": fmt.Sprintf("data:text/html;base64,%s", base64Content),
								},
							},
							Target: "fake-target",
						},
					},
				},
			},
		},
	}

	response, err := endpoint.SubmitJob(s.ctx, request)
	s.Require().NoError(err)
	s.Require().NotEmpty(response.JobID)
	s.Require().NotEmpty(response.EvaluationID)
	s.Require().Empty(response.Warnings)

	// Because we are using pointers and calling directly, we expect the job in
	// the request to have been transformed, with the input source now being IPFS.
	isource := request.Job.Task().InputSources[0]
	s.Require().Equal(isource.Source.Type, "ipfs")
	s.Require().NotEmpty(isource.Source.Params["CID"])

	cid := isource.Source.Params["CID"].(string)

	// We can't rely on GetCidSize to retrieve an accurate file size (it seems
	// to be out by 11 bytes), so we fetch the content and stat it instead.
	// size, err := s.client.GetCidSize(s.ctx, cid)
	// s.Require().NoError(err)
	// s.Require().Equal(sb.Len(), size) // 10240 == 10251?

	tmpDir := s.T().TempDir()
	target := filepath.Join(tmpDir, "inline-transform-test.txt")

	err = s.client.Get(s.ctx, cid, target)
	s.Require().NoError(err)

	fileinfo, err := os.Stat(target)
	s.Require().NoError(err)
	s.Require().Equal(int64(sb.Len()), fileinfo.Size())
}
51 changes: 51 additions & 0 deletions pkg/orchestrator/transformer/storage.go
@@ -0,0 +1,51 @@
package transformer

import (
	"context"

	"github.com/bacalhau-project/bacalhau/pkg/model"
	"github.com/bacalhau-project/bacalhau/pkg/models"
	modelsutils "github.com/bacalhau-project/bacalhau/pkg/models/utils"
	"github.com/bacalhau-project/bacalhau/pkg/storage"
	"github.com/bacalhau-project/bacalhau/pkg/storage/copy"
	"github.com/c2h5oh/datasize"
	"github.com/rs/zerolog/log"
)

// The maximum size that an individual inline storage spec and all inline
// storage specs (respectively) can take up before being pinned to IPFS
// storage.
const (
	maximumIndividualSpec datasize.ByteSize = 4 * datasize.KB
	maximumTotalSpec      datasize.ByteSize = 4 * datasize.KB
)

// NewInlineStoragePinner returns a job transformer that limits the space
// taken up by inline data. It scans a job for StorageSpec objects that
// store their data inline and moves any that are too large into IPFS
// storage. It also limits the total size taken up by inline specs; if this
// limit is exceeded, it moves the largest specs into IPFS.
func NewInlineStoragePinner(provider storage.StorageProvider) JobTransformer {
	f := func(ctx context.Context, j *models.Job) error {
		hasInline := provider.Has(ctx, model.StorageSourceInline.String())
		hasIPFS := provider.Has(ctx, model.StorageSourceIPFS.String())
		if !hasInline || !hasIPFS {
			log.Ctx(ctx).Warn().Msg("Skipping inline data transform because storage is not installed")
			return nil
		}

		_, err := copy.CopyOversize(
			ctx,
			provider,
			modelsutils.AllInputSources(j),
			model.StorageSourceInline.String(),
			model.StorageSourceIPFS.String(),
			maximumIndividualSpec,
			maximumTotalSpec,
		)

		return err
	}

	return JobFn(f)
}
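
The selection policy described in the comment above (move anything over the individual limit, then evict the largest remaining inline specs until the total fits) can be sketched as follows. This is a conceptual illustration only, assuming the behavior stated in the doc comment; the real logic lives in pkg/storage/copy.CopyOversize, whose internals may differ, and pickOversize is a hypothetical name.

package main

import (
	"fmt"
	"sort"
)

const (
	maxIndividual = 4 * 1024 // per-spec limit, as in storage.go
	maxTotal      = 4 * 1024 // combined inline limit, as in storage.go
)

// pickOversize returns the indices of specs to move to IPFS: anything over
// the individual limit, plus the largest remaining inline specs until what
// stays inline fits under maxTotal.
func pickOversize(sizes []int) []int {
	type spec struct{ idx, size int }
	var inline []spec
	var move []int
	total := 0
	for i, s := range sizes {
		if s > maxIndividual {
			move = append(move, i)
		} else {
			inline = append(inline, spec{i, s})
			total += s
		}
	}
	// Evict the largest inline specs until the remainder fits.
	sort.Slice(inline, func(a, b int) bool { return inline[a].size > inline[b].size })
	for _, sp := range inline {
		if total <= maxTotal {
			break
		}
		move = append(move, sp.idx)
		total -= sp.size
	}
	return move
}

func main() {
	// [0 1]: spec 0 exceeds the individual limit; spec 1 is evicted so the
	// remaining inline total (2000 + 100) fits under maxTotal.
	fmt.Println(pickOversize([]int{5000, 3000, 2000, 100}))
}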
