diff --git a/plugins/extractors/optimus/README.md b/plugins/extractors/optimus/README.md index cafacd44..5801fb2e 100644 --- a/plugins/extractors/optimus/README.md +++ b/plugins/extractors/optimus/README.md @@ -14,6 +14,7 @@ source: | Key | Value | Example | Description | | | :-- | :---- | :------ | :---------- | :- | | `host` | `string` | `optimus.com:80` | Optimus' GRPC host | *required* | +| `max_size_in_mb` | `int` | `45` | Max megabytes for GRPC client to receive message. Default to 45. | | ## Outputs diff --git a/plugins/extractors/optimus/client.go b/plugins/extractors/optimus/client.go index 7fa612f4..12048025 100644 --- a/plugins/extractors/optimus/client.go +++ b/plugins/extractors/optimus/client.go @@ -14,10 +14,10 @@ import ( ) const ( - service = "optimus" - GRPCMaxClientSendSize = 45 << 20 // 45MB - GRPCMaxClientRecvSize = 45 << 20 // 45MB - GRPCMaxRetry uint = 3 + service = "optimus" + GRPCMaxClientSendSizeMB = 45 + GRPCMaxClientRecvSizeMB = 45 + GRPCMaxRetry uint = 3 ) type Client interface { @@ -25,7 +25,7 @@ type Client interface { pb.ProjectServiceClient pb.JobSpecificationServiceClient pb.JobRunServiceClient - Connect(ctx context.Context, host string) error + Connect(ctx context.Context, host string, maxSizeInMB int) error Close() error } @@ -41,11 +41,11 @@ type client struct { conn *grpc.ClientConn } -func (c *client) Connect(ctx context.Context, host string) (err error) { +func (c *client) Connect(ctx context.Context, host string, maxSizeInMB int) (err error) { dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2) defer dialCancel() - if c.conn, err = c.createConnection(dialTimeoutCtx, host); err != nil { + if c.conn, err = c.createConnection(dialTimeoutCtx, host, maxSizeInMB); err != nil { err = errors.Wrap(err, "error creating connection") return } @@ -62,7 +62,11 @@ func (c *client) Close() error { return c.conn.Close() } -func (c *client) createConnection(ctx context.Context, host string) (*grpc.ClientConn, error) { +func (c *client) createConnection(ctx context.Context, host string, maxSizeInMB int) (*grpc.ClientConn, error) { + if maxSizeInMB <= 0 { + maxSizeInMB = GRPCMaxClientRecvSizeMB + } + retryOpts := []grpc_retry.CallOption{ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100 * time.Millisecond)), grpc_retry.WithMax(GRPCMaxRetry), @@ -72,8 +76,8 @@ func (c *client) createConnection(ctx context.Context, host string) (*grpc.Clien grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultCallOptions( - grpc.MaxCallSendMsgSize(GRPCMaxClientSendSize), - grpc.MaxCallRecvMsgSize(GRPCMaxClientRecvSize), + grpc.MaxCallSendMsgSize(GRPCMaxClientSendSizeMB<<20), + grpc.MaxCallRecvMsgSize(maxSizeInMB<<20), ), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc_retry.UnaryClientInterceptor(retryOpts...), diff --git a/plugins/extractors/optimus/optimus.go b/plugins/extractors/optimus/optimus.go index 4c6c3635..efee1091 100644 --- a/plugins/extractors/optimus/optimus.go +++ b/plugins/extractors/optimus/optimus.go @@ -23,7 +23,8 @@ var summary string // Config holds the set of configuration for the bigquery extractor type Config struct { - Host string `mapstructure:"host" validate:"required"` + Host string `mapstructure:"host" validate:"required"` + MaxSizeInMB int `mapstructure:"max_size_in_mb"` } var sampleConfig = ` @@ -64,7 +65,7 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) return plugins.InvalidConfigError{} } - if err := e.client.Connect(ctx, e.config.Host); err != nil { + if err := e.client.Connect(ctx, e.config.Host, e.config.MaxSizeInMB); err != nil { return errors.Wrap(err, "error connecting to host") } diff --git a/plugins/extractors/optimus/optimus_test.go b/plugins/extractors/optimus/optimus_test.go index efb995a4..0f915a21 100644 --- a/plugins/extractors/optimus/optimus_test.go +++ b/plugins/extractors/optimus/optimus_test.go @@ -38,7 +38,7 @@ func TestInit(t *testing.T) { ctx := context.TODO() client := new(mockClient) - client.On("Connect", ctx, validConfig["host"]).Return(nil) + client.On("Connect", ctx, validConfig["host"], 0).Return(nil) defer client.AssertExpectations(t) extr := optimus.New(testutils.Logger, client) @@ -78,8 +78,8 @@ type mockClient struct { mock.Mock } -func (c *mockClient) Connect(ctx context.Context, host string) (err error) { - args := c.Called(ctx, host) +func (c *mockClient) Connect(ctx context.Context, host string, maxSizeInMB int) (err error) { + args := c.Called(ctx, host, maxSizeInMB) return args.Error(0) } @@ -118,7 +118,7 @@ func (c *mockClient) GetJobTask( } func setupExtractExpectation(ctx context.Context, client *mockClient) { - client.On("Connect", ctx, validConfig["host"]).Return(nil).Once() + client.On("Connect", ctx, validConfig["host"], 0).Return(nil).Once() client.On("ListProjects", ctx, &pb.ListProjectsRequest{}, mock.Anything).Return(&pb.ListProjectsResponse{ Projects: []*pb.ProjectSpecification{