From 6f7ec985341ba2f3d95ee8c73ac8cac96250a4b3 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Mon, 14 Oct 2024 21:55:37 -0700 Subject: [PATCH 1/6] Disable batch option --- exporter/exporterhelper/common.go | 12 +--- .../exporterhelper/internal/base_exporter.go | 18 +---- .../internal/batch_sender_test.go | 71 ++++++++++--------- 3 files changed, 39 insertions(+), 62 deletions(-) diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index de9822ee6aa..ab1f0db4e0b 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -61,19 +61,11 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { return internal.WithCapabilities(capabilities) } -// BatcherOption apply changes to batcher sender. -type BatcherOption = internal.BatcherOption - -// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types. -func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) BatcherOption { - return internal.WithRequestBatchFuncs(mf, msf) -} - // WithBatcher enables batching for an exporter based on custom request types. // For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and // WithRequestBatchFuncs provided. // This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option { - return internal.WithBatcher(cfg, opts...) +func WithBatcher(cfg exporterbatcher.Config) Option { + return internal.WithBatcher(cfg) } diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index f095bd558a2..bf2af3ec066 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -275,30 +275,14 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { } } -// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types. -func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) BatcherOption { - return func(bs *BatchSender) error { - if mf == nil || msf == nil { - return fmt.Errorf("WithRequestBatchFuncs must be provided with non-nil functions") - } - if bs.mergeFunc != nil || bs.mergeSplitFunc != nil { - return fmt.Errorf("WithRequestBatchFuncs can only be used once with request-based exporters") - } - bs.mergeFunc = mf - bs.mergeSplitFunc = msf - return nil - } -} - // WithBatcher enables batching for an exporter based on custom request types. // For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and // WithRequestBatchFuncs provided. // This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option { +func WithBatcher(cfg exporterbatcher.Config) Option { return func(o *BaseExporter) error { o.BatcherCfg = cfg - o.BatcherOpts = opts return nil } } diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index 456218fd914..f6d53bca0e0 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -34,20 +34,20 @@ func TestBatchSender_Merge(t *testing.T) { }{ { name: "split_disabled", - batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + batcherOption: WithBatcher(cfg), }, { name: "split_high_limit", batcherOption: func() Option { c := cfg c.MaxSizeItems = 1000 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + return WithBatcher(c) }(), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) + be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -94,14 +94,14 @@ func TestBatchSender_BatchExportError(t *testing.T) { }{ { name: "merge_only", - batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + batcherOption: WithBatcher(cfg), }, { name: "merge_without_split_triggered", batcherOption: func() Option { c := cfg c.MaxSizeItems = 200 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + return WithBatcher(c) }(), }, { @@ -109,7 +109,7 @@ func TestBatchSender_BatchExportError(t *testing.T) { batcherOption: func() Option { c := cfg c.MaxSizeItems = 20 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + return WithBatcher(c) }(), expectedRequests: 1, expectedItems: 20, @@ -117,7 +117,7 @@ func TestBatchSender_BatchExportError(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) + be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -153,7 +153,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) { cfg.MinSizeItems = 5 cfg.MaxSizeItems = 10 cfg.FlushTimeout = 100 * time.Millisecond - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -190,7 +190,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) { func TestBatchSender_Shutdown(t *testing.T) { batchCfg := exporterbatcher.NewDefaultConfig() batchCfg.MinSizeItems = 10 - be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + be := queueBatchExporter(t, WithBatcher(batchCfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -212,7 +212,8 @@ func TestBatchSender_Disabled(t *testing.T) { cfg.Enabled = false cfg.MaxSizeItems = 5 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(cfg)) require.NotNil(t, be) require.NoError(t, err) @@ -241,7 +242,7 @@ func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.FlushTimeout = 50 * time.Millisecond cfg.MaxSizeItems = 20 - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) + be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -260,8 +261,8 @@ func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { func TestBatchSender_PostShutdown(t *testing.T) { be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(exporterbatcher.NewDefaultConfig())) require.NotNil(t, be) require.NoError(t, err) assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -322,7 +323,8 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { qCfg := exporterqueue.NewDefaultConfig() qCfg.NumConsumers = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(tt.batcherCfg), WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]())) require.NotNil(t, be) require.NoError(t, err) @@ -377,7 +379,8 @@ func TestBatchSender_BatchBlocking(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 3 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -407,7 +410,8 @@ func TestBatchSender_BatchCancelled(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -442,7 +446,8 @@ func TestBatchSender_DrainActiveRequests(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -487,18 +492,9 @@ func TestBatchSender_WithBatcherOption(t *testing.T) { opts: []Option{WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, expectedErr: false, }, - { - name: "funcs_set_twice", - opts: []Option{ - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc)), - }, - expectedErr: true, - }, { name: "nil_funcs", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))}, + opts: []Option{WithBatchFuncs(nil, nil), WithBatcher(exporterbatcher.NewDefaultConfig())}, expectedErr: true, }, } @@ -518,7 +514,8 @@ func TestBatchSender_WithBatcherOption(t *testing.T) { func TestBatchSender_UnstartedShutdown(t *testing.T) { be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(exporterbatcher.NewDefaultConfig())) require.NoError(t, err) err = be.Shutdown(context.Background()) @@ -542,7 +539,8 @@ func TestBatchSender_ShutdownDeadlock(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -578,7 +576,8 @@ func TestBatchSenderWithTimeout(t *testing.T) { tCfg := NewDefaultTimeoutConfig() tCfg.Timeout = 50 * time.Millisecond be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg), WithTimeout(tCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -637,7 +636,8 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) { bCfg.MinSizeItems = 8 bCfg.FlushTimeout = 50 * time.Millisecond be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) sink := newFakeRequestSink() @@ -668,7 +668,8 @@ func TestBatchSenderTimerFlush(t *testing.T) { bCfg.MinSizeItems = 8 bCfg.FlushTimeout = 100 * time.Millisecond be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) sink := newFakeRequestSink() @@ -703,9 +704,9 @@ func TestBatchSenderTimerFlush(t *testing.T) { require.NoError(t, be.Shutdown(context.Background())) } -func queueBatchExporter(t *testing.T, batchOption Option) *BaseExporter { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, batchOption, - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) +func queueBatchExporter(t *testing.T, opts ...Option) *BaseExporter { + opts = append(opts, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, opts...) require.NotNil(t, be) require.NoError(t, err) return be From 5bc5127470fc501030781e15148849d9501dd2a7 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Tue, 15 Oct 2024 13:00:34 -0700 Subject: [PATCH 2/6] Added a change log --- .chloggen/disable-batch-option.yaml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .chloggen/disable-batch-option.yaml diff --git a/.chloggen/disable-batch-option.yaml b/.chloggen/disable-batch-option.yaml new file mode 100644 index 00000000000..a748816e8c2 --- /dev/null +++ b/.chloggen/disable-batch-option.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Disables setting batch option to batch sender directly. + +# One or more tracking issues or pull requests related to the change +issues: [10368] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] \ No newline at end of file From e4b838dbe886967c5b6d7883527cee8a92b0aeea Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 15 Oct 2024 13:06:40 -0700 Subject: [PATCH 3/6] Update .chloggen/disable-batch-option.yaml --- .chloggen/disable-batch-option.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/disable-batch-option.yaml b/.chloggen/disable-batch-option.yaml index a748816e8c2..8e97fc2ce51 100644 --- a/.chloggen/disable-batch-option.yaml +++ b/.chloggen/disable-batch-option.yaml @@ -22,4 +22,4 @@ subtext: # Include 'user' if the change is relevant to end users. # Include 'api' if there is a change to a library API. # Default: '[user]' -change_logs: [api] \ No newline at end of file +change_logs: [api] From 67fee7f568068bed9207e1ffc4f482d57ef65f06 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Tue, 15 Oct 2024 13:14:28 -0700 Subject: [PATCH 4/6] Export WithBatchFunc api --- exporter/exporterhelper/common.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index ab1f0db4e0b..7f396f40776 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -69,3 +69,11 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { func WithBatcher(cfg exporterbatcher.Config) Option { return internal.WithBatcher(cfg) } + +// WithBatchFuncs enables setting custom batch merge functions. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], + msf exporterbatcher.BatchMergeSplitFunc[Request]) Option { + return internal.WithBatchFuncs(mf, msf) +} From d14e14d16237dd3fc293c1164949478c96a8b4b2 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Tue, 15 Oct 2024 14:01:36 -0700 Subject: [PATCH 5/6] Clean up batcher option --- exporter/exporterhelper/internal/base_exporter.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index bf2af3ec066..1aebb318c8f 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -29,9 +29,6 @@ type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender // Option apply changes to BaseExporter. type Option func(*BaseExporter) error -// BatcherOption apply changes to batcher sender. -type BatcherOption func(*BatchSender) error - type BaseExporter struct { component.StartFunc component.ShutdownFunc @@ -64,7 +61,6 @@ type BaseExporter struct { queueCfg exporterqueue.Config queueFactory exporterqueue.Factory[internal.Request] BatcherCfg exporterbatcher.Config - BatcherOpts []BatcherOption } func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) { @@ -109,9 +105,6 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe if be.BatcherCfg.Enabled { bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc) - for _, opt := range be.BatcherOpts { - err = multierr.Append(err, opt(bs)) - } if bs.mergeFunc == nil || bs.mergeSplitFunc == nil { err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")) } From 918bd38a6eb730598c05b2b1235e800879d583b5 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Tue, 15 Oct 2024 16:49:38 -0700 Subject: [PATCH 6/6] Refined change log --- .chloggen/disable-batch-option.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.chloggen/disable-batch-option.yaml b/.chloggen/disable-batch-option.yaml index 8e97fc2ce51..bf077d21888 100644 --- a/.chloggen/disable-batch-option.yaml +++ b/.chloggen/disable-batch-option.yaml @@ -16,6 +16,9 @@ issues: [10368] # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. subtext: + Removed WithRequestBatchFuncs(BatcherOption) in favor of WithBatchFuncs(Option), where | + BatcherOption is a function that operates on batch sender and Option is one that operates | + on BaseExporter # Optional: The change log or logs in which this entry should be included. # e.g. '[user]' or '[user, api]'