From 0d09adbe7847014e4d7e7feb20ac252bd0e5ff4a Mon Sep 17 00:00:00 2001
From: Ross Jones
Date: Thu, 7 Dec 2023 12:24:21 +0000
Subject: [PATCH] Copy large inline sources to IPFS (#3073)

Currently an inline attachment sent to the requester endpoint will be
replaced with an IPFS CID if the inline source is above a certain size.
We want the same functionality in the orchestrator endpoint so that we
can submit jobs without inflating the size of the data store
unnecessarily.

The implementation of the transformer is copied from the requester node.
---
 pkg/node/requester.go                   |   1 +
 pkg/orchestrator/endpoint_test.go       | 148 ++++++++++++++++++++++++
 pkg/orchestrator/transformer/storage.go |  51 ++++++++
 3 files changed, 200 insertions(+)
 create mode 100644 pkg/orchestrator/endpoint_test.go
 create mode 100644 pkg/orchestrator/transformer/storage.go

diff --git a/pkg/node/requester.go b/pkg/node/requester.go
index 5c77572dfb..83884da4b9 100644
--- a/pkg/node/requester.go
+++ b/pkg/node/requester.go
@@ -256,6 +256,7 @@ func NewRequesterNode(
 			transformer.NameOptional(),
 			transformer.DefaultsApplier(requesterConfig.JobDefaults),
 			transformer.RequesterInfo(host.ID().String(), marshaledPublicKey),
+			transformer.NewInlineStoragePinner(storageProviders),
 		},
 		ResultTransformer: resultTransformers,
 	})
diff --git a/pkg/orchestrator/endpoint_test.go b/pkg/orchestrator/endpoint_test.go
new file mode 100644
index 0000000000..eb923e5007
--- /dev/null
+++ b/pkg/orchestrator/endpoint_test.go
@@ -0,0 +1,148 @@
+//go:build integration || !unit
+
+package orchestrator_test
+
+import (
+	"context"
+	"encoding/base64"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/bacalhau-project/bacalhau/pkg/config/types"
+	"github.com/bacalhau-project/bacalhau/pkg/eventhandler"
+	"github.com/bacalhau-project/bacalhau/pkg/ipfs"
+	"github.com/bacalhau-project/bacalhau/pkg/jobstore/inmemory"
+	"github.com/bacalhau-project/bacalhau/pkg/models"
+	"github.com/bacalhau-project/bacalhau/pkg/node"
+	"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
+	"github.com/bacalhau-project/bacalhau/pkg/orchestrator/transformer"
+	"github.com/bacalhau-project/bacalhau/pkg/system"
+	"github.com/stretchr/testify/suite"
+	gomock "go.uber.org/mock/gomock"
+)
+
+type EndpointSuite struct {
+	suite.Suite
+
+	ctx context.Context
+	cm  *system.CleanupManager
+
+	client ipfs.Client
+	node   *ipfs.Node
+}
+
+func TestEndpointSuite(t *testing.T) {
+	suite.Run(t, new(EndpointSuite))
+}
+
+func (s *EndpointSuite) SetupSuite() {
+	s.ctx = context.Background()
+	s.cm = system.NewCleanupManager()
+
+	// Spin up a private in-process IPFS node for the suite to pin against.
+	node, err := ipfs.NewNodeWithConfig(s.ctx, s.cm, types.IpfsConfig{PrivateInternal: true})
+	s.Require().NoError(err)
+	s.node = node
+
+	s.client = ipfs.NewClient(s.node.Client().API)
+}
+
+func (s *EndpointSuite) TearDownSuite() {
+	s.node.Close(s.ctx)
+}
+
+func (s *EndpointSuite) TestInlinePinnerTransformInSubmit() {
+	ctrl := gomock.NewController(s.T())
+	defer ctrl.Finish()
+
+	tracerContextProvider := eventhandler.NewTracerContextProvider("test")
+	localJobEventConsumer := eventhandler.NewChainedJobEventHandler(tracerContextProvider)
+	eventEmitter := orchestrator.NewEventEmitter(orchestrator.EventEmitterParams{
+		EventConsumer: localJobEventConsumer,
+	})
+
+	storageFactory := node.NewStandardStorageProvidersFactory()
+	storageProviders, err := storageFactory.Get(s.ctx, node.NodeConfig{
+		CleanupManager: s.cm,
+		IPFSClient:     s.client,
+	})
+	s.Require().NoError(err)
+
+	evalBroker := orchestrator.NewMockEvaluationBroker(ctrl)
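+	// Enqueue should be called exactly once, with the evaluation created for
+	// the submitted job.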
+	evalBroker.EXPECT().Enqueue(gomock.Any()).Return(nil)
+
+	endpoint := orchestrator.NewBaseEndpoint(&orchestrator.BaseEndpointParams{
+		ID:               "test_endpoint",
+		EvaluationBroker: evalBroker,
+		Store:            inmemory.NewInMemoryJobStore(),
+		EventEmitter:     eventEmitter,
+		JobTransformer: transformer.ChainedTransformer[*models.Job]{
+			transformer.JobFn(transformer.IDGenerator),
+			transformer.NewInlineStoragePinner(storageProviders),
+		},
+	})
+
+	// Build a 10KiB payload, comfortably over the inline size limit.
+	sb := strings.Builder{}
+	sb.Grow(1024 * 10)
+	for i := 0; i < 1024; i++ {
+		_, _ = sb.WriteString("HelloWorld")
+	}
+	base64Content := base64.StdEncoding.EncodeToString([]byte(sb.String()))
+
+	request := &orchestrator.SubmitJobRequest{
+		Job: &models.Job{
+			Name: "testjob",
+			Type: "batch",
+			Tasks: []*models.Task{
+				{
+					Name:   "Task 1",
+					Engine: &models.SpecConfig{Type: models.EngineNoop},
+					InputSources: []*models.InputSource{
+						{
+							Source: &models.SpecConfig{
+								Type: models.StorageSourceInline,
+								Params: map[string]interface{}{
+									"URL": fmt.Sprintf("data:text/html;base64,%s", base64Content),
+								},
+							},
+							Target: "fake-target",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	response, err := endpoint.SubmitJob(s.ctx, request)
+	s.Require().NoError(err)
+	s.Require().NotEmpty(response.JobID)
+	s.Require().NotEmpty(response.EvaluationID)
+	s.Require().Empty(response.Warnings)
+
+	// Because we pass pointers and call the endpoint directly, we expect the
+	// job in the request to have been transformed in place, with the input
+	// source now being IPFS.
+	isource := request.Job.Task().InputSources[0]
+	s.Require().Equal("ipfs", isource.Source.Type)
+	s.Require().NotEmpty(isource.Source.Params["CID"])
+
+	cid := isource.Source.Params["CID"].(string)
+
+	// We can't rely on GetCidSize to retrieve an accurate file size; it appears
+	// to be out by 11 bytes, so we fetch the content and check its size on disk
+	// instead.
+	// size, err := s.client.GetCidSize(s.ctx, cid)
+	// s.Require().NoError(err)
+	// s.Require().Equal(sb.Len(), size) // 10240 == 10251?
+
+	tmpDir := s.T().TempDir()
+	target := filepath.Join(tmpDir, "inline-transform-test.txt")
+
+	err = s.client.Get(s.ctx, cid, target)
+	s.Require().NoError(err)
+
+	fileinfo, err := os.Stat(target)
+	s.Require().NoError(err)
+	s.Require().Equal(int64(sb.Len()), fileinfo.Size())
+}
diff --git a/pkg/orchestrator/transformer/storage.go b/pkg/orchestrator/transformer/storage.go
new file mode 100644
index 0000000000..836d64d29d
--- /dev/null
+++ b/pkg/orchestrator/transformer/storage.go
@@ -0,0 +1,51 @@
+package transformer
+
+import (
+	"context"
+
+	"github.com/bacalhau-project/bacalhau/pkg/model"
+	"github.com/bacalhau-project/bacalhau/pkg/models"
+	modelsutils "github.com/bacalhau-project/bacalhau/pkg/models/utils"
+	"github.com/bacalhau-project/bacalhau/pkg/storage"
+	"github.com/bacalhau-project/bacalhau/pkg/storage/copy"
+	"github.com/c2h5oh/datasize"
+	"github.com/rs/zerolog/log"
+)
+
+// The maximum sizes that an individual inline storage spec and all inline
+// storage specs together (respectively) can take up before being pinned to
+// IPFS storage.
+const (
+	maximumIndividualSpec datasize.ByteSize = 4 * datasize.KB
+	maximumTotalSpec      datasize.ByteSize = 4 * datasize.KB
+)
+
+// NewInlineStoragePinner returns a job transformer that limits the space taken
+// up by inline data. It scans a job for input sources that store their data
+// inline and moves any that are too large into IPFS storage. It also limits
+// the total size taken up by inline specs: if that limit is exceeded, it moves
+// the largest specs into IPFS.
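+//
+// For example, with the 4KB limits defined above, the 10KiB inline payload in
+// the accompanying test is copied to IPFS and its input source is rewritten to
+// an "ipfs" spec carrying the resulting CID.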
+func NewInlineStoragePinner(provider storage.StorageProvider) JobTransformer {
+	f := func(ctx context.Context, j *models.Job) error {
+		// Both providers must be available: inline to read the data back out,
+		// and IPFS to pin it.
+		hasInline := provider.Has(ctx, model.StorageSourceInline.String())
+		hasIPFS := provider.Has(ctx, model.StorageSourceIPFS.String())
+		if !hasInline || !hasIPFS {
+			log.Ctx(ctx).Warn().Msg("Skipping inline data transform because the required storage providers are not installed")
+			return nil
+		}
+
+		_, err := copy.CopyOversize(
+			ctx,
+			provider,
+			modelsutils.AllInputSources(j),
+			model.StorageSourceInline.String(),
+			model.StorageSourceIPFS.String(),
+			maximumIndividualSpec,
+			maximumTotalSpec,
+		)
+
+		return err
+	}
+
+	return JobFn(f)
+}
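
For reviewers unfamiliar with copy.CopyOversize, the sketch below shows the
selection logic the doc comment describes: move every inline spec over the
individual limit to IPFS, then evict the largest remaining specs until the
total inline footprint fits. This is an illustrative reconstruction only; the
spec type and chooseToMove helper are hypothetical names, not the actual
pkg/storage/copy API, and the real implementation may differ in detail.

package main

import (
	"fmt"
	"sort"
)

// spec is a stand-in for an inline input source and its payload size.
type spec struct {
	name string
	size uint64
}

// chooseToMove picks the specs to copy to IPFS: every spec larger than
// maxIndividual, then (largest first) enough of the rest to bring the
// total inline footprint under maxTotal.
func chooseToMove(specs []spec, maxIndividual, maxTotal uint64) []spec {
	var move, keep []spec
	var total uint64
	for _, s := range specs {
		if s.size > maxIndividual {
			move = append(move, s)
		} else {
			keep = append(keep, s)
			total += s.size
		}
	}
	// Evict the biggest remaining specs until we fit the total budget.
	sort.Slice(keep, func(i, j int) bool { return keep[i].size > keep[j].size })
	for _, s := range keep {
		if total <= maxTotal {
			break
		}
		move = append(move, s)
		total -= s.size
	}
	return move
}

func main() {
	specs := []spec{{"a", 10240}, {"b", 1024}, {"c", 3072}, {"d", 2048}}
	// With 4096-byte limits: "a" exceeds the individual limit outright, and
	// the remainder (1024+3072+2048 = 6144 bytes) is over the total budget,
	// so the largest remaining spec "c" is evicted, leaving 3072 bytes inline.
	for _, s := range chooseToMove(specs, 4096, 4096) {
		fmt.Printf("move %s (%d bytes) to IPFS\n", s.name, s.size)
	}
}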