Skip to content

Commit

Permalink
feat(optimus): expose configuring grpc max receive size (#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
StewartJingga authored Jun 13, 2022
1 parent b94d77f commit a358b93
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 16 deletions.
1 change: 1 addition & 0 deletions plugins/extractors/optimus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ source:
| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :- |
| `host` | `string` | `optimus.com:80` | Optimus' GRPC host | *required* |
| `max_size_in_mb` | `int` | `45` | Max megabytes for GRPC client to receive message. Default to 45. | |

## Outputs

Expand Down
24 changes: 14 additions & 10 deletions plugins/extractors/optimus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ import (
)

const (
service = "optimus"
GRPCMaxClientSendSize = 45 << 20 // 45MB
GRPCMaxClientRecvSize = 45 << 20 // 45MB
GRPCMaxRetry uint = 3
service = "optimus"
GRPCMaxClientSendSizeMB = 45
GRPCMaxClientRecvSizeMB = 45
GRPCMaxRetry uint = 3
)

type Client interface {
pb.NamespaceServiceClient
pb.ProjectServiceClient
pb.JobSpecificationServiceClient
pb.JobRunServiceClient
Connect(ctx context.Context, host string) error
Connect(ctx context.Context, host string, maxSizeInMB int) error
Close() error
}

Expand All @@ -41,11 +41,11 @@ type client struct {
conn *grpc.ClientConn
}

func (c *client) Connect(ctx context.Context, host string) (err error) {
func (c *client) Connect(ctx context.Context, host string, maxSizeInMB int) (err error) {
dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2)
defer dialCancel()

if c.conn, err = c.createConnection(dialTimeoutCtx, host); err != nil {
if c.conn, err = c.createConnection(dialTimeoutCtx, host, maxSizeInMB); err != nil {
err = errors.Wrap(err, "error creating connection")
return
}
Expand All @@ -62,7 +62,11 @@ func (c *client) Close() error {
return c.conn.Close()
}

func (c *client) createConnection(ctx context.Context, host string) (*grpc.ClientConn, error) {
func (c *client) createConnection(ctx context.Context, host string, maxSizeInMB int) (*grpc.ClientConn, error) {
if maxSizeInMB <= 0 {
maxSizeInMB = GRPCMaxClientRecvSizeMB
}

retryOpts := []grpc_retry.CallOption{
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100 * time.Millisecond)),
grpc_retry.WithMax(GRPCMaxRetry),
Expand All @@ -72,8 +76,8 @@ func (c *client) createConnection(ctx context.Context, host string) (*grpc.Clien
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(GRPCMaxClientSendSize),
grpc.MaxCallRecvMsgSize(GRPCMaxClientRecvSize),
grpc.MaxCallSendMsgSize(GRPCMaxClientSendSizeMB<<20),
grpc.MaxCallRecvMsgSize(maxSizeInMB<<20),
),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(retryOpts...),
Expand Down
5 changes: 3 additions & 2 deletions plugins/extractors/optimus/optimus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ var summary string

// Config holds the set of configuration for the bigquery extractor
type Config struct {
Host string `mapstructure:"host" validate:"required"`
Host string `mapstructure:"host" validate:"required"`
MaxSizeInMB int `mapstructure:"max_size_in_mb"`
}

var sampleConfig = `
Expand Down Expand Up @@ -64,7 +65,7 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})
return plugins.InvalidConfigError{}
}

if err := e.client.Connect(ctx, e.config.Host); err != nil {
if err := e.client.Connect(ctx, e.config.Host, e.config.MaxSizeInMB); err != nil {
return errors.Wrap(err, "error connecting to host")
}

Expand Down
8 changes: 4 additions & 4 deletions plugins/extractors/optimus/optimus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestInit(t *testing.T) {
ctx := context.TODO()

client := new(mockClient)
client.On("Connect", ctx, validConfig["host"]).Return(nil)
client.On("Connect", ctx, validConfig["host"], 0).Return(nil)
defer client.AssertExpectations(t)

extr := optimus.New(testutils.Logger, client)
Expand Down Expand Up @@ -78,8 +78,8 @@ type mockClient struct {
mock.Mock
}

func (c *mockClient) Connect(ctx context.Context, host string) (err error) {
args := c.Called(ctx, host)
func (c *mockClient) Connect(ctx context.Context, host string, maxSizeInMB int) (err error) {
args := c.Called(ctx, host, maxSizeInMB)

return args.Error(0)
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func (c *mockClient) GetJobTask(
}

func setupExtractExpectation(ctx context.Context, client *mockClient) {
client.On("Connect", ctx, validConfig["host"]).Return(nil).Once()
client.On("Connect", ctx, validConfig["host"], 0).Return(nil).Once()

client.On("ListProjects", ctx, &pb.ListProjectsRequest{}, mock.Anything).Return(&pb.ListProjectsResponse{
Projects: []*pb.ProjectSpecification{
Expand Down

0 comments on commit a358b93

Please sign in to comment.