From 89d1e79ff22b5446d4db7e63400a173b72540c14 Mon Sep 17 00:00:00 2001 From: thomasgouveia Date: Tue, 27 Aug 2024 11:27:55 +0200 Subject: [PATCH 1/7] feat(#5408): add `otlplogfile` exporter This commit adds a new experimental exporter `otlplogfile`, that outputs log records to a JSON line file. It is based on the following specification: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md Signed-off-by: thomasgouveia --- exporters/otlp/otlplog/otlplogfile/README.md | 3 + exporters/otlp/otlplog/otlplogfile/config.go | 54 +++ exporters/otlp/otlplog/otlplogfile/doc.go | 12 + .../otlp/otlplog/otlplogfile/exporter.go | 95 ++++ .../otlp/otlplog/otlplogfile/exporter_test.go | 101 +++++ exporters/otlp/otlplog/otlplogfile/go.mod | 37 ++ exporters/otlp/otlplog/otlplogfile/go.sum | 25 ++ .../otlp/otlplog/otlplogfile/internal/gen.go | 9 + .../internal/transform/attr_test.go | 186 ++++++++ .../otlplogfile/internal/transform/log.go | 411 ++++++++++++++++++ .../internal/transform/log_attr_test.go | 149 +++++++ .../internal/transform/log_test.go | 246 +++++++++++ .../otlplogfile/internal/writer/buffered.go | 53 +++ .../internal/writer/buffered_test.go | 75 ++++ .../otlplogfile/internal/writer/writer.go | 114 +++++ .../internal/writer/writer_test.go | 110 +++++ exporters/otlp/otlplog/otlplogfile/version.go | 9 + .../otlp/otlplog/otlplogfile/version_test.go | 21 + versions.yaml | 1 + 19 files changed, 1711 insertions(+) create mode 100644 exporters/otlp/otlplog/otlplogfile/README.md create mode 100644 exporters/otlp/otlplog/otlplogfile/config.go create mode 100644 exporters/otlp/otlplog/otlplogfile/doc.go create mode 100644 exporters/otlp/otlplog/otlplogfile/exporter.go create mode 100644 exporters/otlp/otlplog/otlplogfile/exporter_test.go create mode 100644 exporters/otlp/otlplog/otlplogfile/go.mod create mode 100644 exporters/otlp/otlplog/otlplogfile/go.sum create mode 100644 exporters/otlp/otlplog/otlplogfile/internal/gen.go create mode 100644 exporters/otlp/otlplog/otlplogfile/internal/transform/attr_test.go create mode 100644 exporters/otlp/otlplog/otlplogfile/internal/transform/log.go create mode 100644 exporters/otlp/otlplog/otlplogfile/internal/transform/log_attr_test.go create mode 100644 exporters/otlp/otlplog/otlplogfile/internal/transform/log_test.go create mode 100644 exporters/otlp/otlplog/otlplogfile/internal/writer/buffered.go create mode 100644 exporters/otlp/otlplog/otlplogfile/internal/writer/buffered_test.go create mode 100644 exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go create mode 100644 exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go create mode 100644 exporters/otlp/otlplog/otlplogfile/version.go create mode 100644 exporters/otlp/otlplog/otlplogfile/version_test.go diff --git a/exporters/otlp/otlplog/otlplogfile/README.md b/exporters/otlp/otlplog/otlplogfile/README.md new file mode 100644 index 00000000000..5f720ff2bd0 --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/README.md @@ -0,0 +1,3 @@ +# OTLP Log File Exporter + +[![PkgGoDev](https://pkg.go.dev/badge/go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile)](https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile) diff --git a/exporters/otlp/otlplog/otlplogfile/config.go b/exporters/otlp/otlplog/otlplogfile/config.go new file mode 100644 index 00000000000..5b0cf3507ab --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/config.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile" + +import "time" + +type fnOpt func(config) config + +func (f fnOpt) applyOption(c config) config { return f(c) } + +// Option sets the configuration value for an Exporter. +type Option interface { + applyOption(config) config +} + +// config contains options for the OTLP Log file exporter. +type config struct { + // Path to a file on disk where records must be appended. + // This file is preferably a json line file as stated in the specification. + // See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md#json-lines-file + // See: https://jsonlines.org + path string + // Duration represents the interval when the buffer should be flushed. + flushInterval time.Duration +} + +func newConfig(options []Option) config { + c := config{ + path: "/var/log/opentelemetry/logs.jsonl", + flushInterval: 5 * time.Second, + } + for _, opt := range options { + c = opt.applyOption(c) + } + return c +} + +// WithFlushInterval configures the duration after which the buffer is periodically flushed to the disk. +func WithFlushInterval(flushInterval time.Duration) Option { + return fnOpt(func(c config) config { + c.flushInterval = flushInterval + return c + }) +} + +// WithPath defines a path to a file where the log records will be written. +// If not set, will default to /var/log/opentelemetry/logs.jsonl. +func WithPath(path string) Option { + return fnOpt(func(c config) config { + c.path = path + return c + }) +} diff --git a/exporters/otlp/otlplog/otlplogfile/doc.go b/exporters/otlp/otlplog/otlplogfile/doc.go new file mode 100644 index 00000000000..8be6faf03c6 --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/doc.go @@ -0,0 +1,12 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +/* +Package otlplogfile provides an OTLP log exporter that outputs log records to a JSON line file. The exporter uses a buffered +file writer to write log records to file to reduce I/O and improve performance. + +All Exporters must be created with [New]. + +See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md +*/ +package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile" diff --git a/exporters/otlp/otlplog/otlplogfile/exporter.go b/exporters/otlp/otlplog/otlplogfile/exporter.go new file mode 100644 index 00000000000..776f3893506 --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/exporter.go @@ -0,0 +1,95 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile" + +import ( + "context" + "sync" + + "google.golang.org/protobuf/encoding/protojson" + + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/transform" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/writer" + "go.opentelemetry.io/otel/sdk/log" + lpb "go.opentelemetry.io/proto/otlp/logs/v1" +) + +// Exporter is an OpenTelemetry log exporter that outputs log records +// into JSON files. The implementation is based on the specification +// defined here: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md +type Exporter struct { + mu sync.Mutex + fw *writer.FileWriter + stopped bool +} + +// Compile-time check that the implementation satisfies the interface. +var _ log.Exporter = &Exporter{} + +// New returns a new [Exporter]. +func New(options ...Option) (*Exporter, error) { + cfg := newConfig(options) + + fw, err := writer.NewFileWriter(cfg.path, cfg.flushInterval) + if err != nil { + return nil, err + } + + return &Exporter{ + fw: fw, + stopped: false, + }, nil +} + +// Export exports logs records to the file. +func (e *Exporter) Export(ctx context.Context, records []log.Record) error { + // Honor context cancellation + if err := ctx.Err(); err != nil { + return err + } + + e.mu.Lock() + defer e.mu.Unlock() + + if e.stopped { + return nil + } + + data := &lpb.LogsData{ + ResourceLogs: transform.ResourceLogs(records), + } + + by, err := protojson.Marshal(data) + if err != nil { + return err + } + + return e.fw.Export(by) +} + +// ForceFlush flushes data to the file. +func (e *Exporter) ForceFlush(_ context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.stopped { + return nil + } + + return e.fw.Flush() +} + +// Shutdown shuts down the exporter. Buffered data is written to disk, +// and opened resources such as file will be closed. +func (e *Exporter) Shutdown(_ context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.stopped { + return nil + } + + e.stopped = true + return e.fw.Shutdown() +} diff --git a/exporters/otlp/otlplog/otlplogfile/exporter_test.go b/exporters/otlp/otlplog/otlplogfile/exporter_test.go new file mode 100644 index 00000000000..7102761d4e7 --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/exporter_test.go @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile" +import ( + "context" + "fmt" + "os" + "path" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/log" + + sdklog "go.opentelemetry.io/otel/sdk/log" +) + +// tempFile creates a temporary file for the given test case and returns its path on disk. +// The file is automatically cleaned up when the test ends. +func tempFile(tb testing.TB) string { + f, err := os.CreateTemp(tb.TempDir(), tb.Name()) + assert.NoError(tb, err, "must not error when creating temporary file") + tb.Cleanup(func() { + assert.NoError(tb, os.RemoveAll(path.Dir(f.Name())), "must clean up files after being written") + }) + return f.Name() +} + +// makeRecords is a helper function to generate an array of log record with the desired size. +func makeRecords(count int, message string) []sdklog.Record { + var records []sdklog.Record + for i := 0; i < count; i++ { + r := sdklog.Record{} + r.SetSeverityText("INFO") + r.SetSeverity(log.SeverityInfo) + r.SetBody(log.StringValue(message)) + r.SetTimestamp(time.Now()) + r.SetObservedTimestamp(time.Now()) + records = append(records, r) + } + return records +} + +func TestExporter(t *testing.T) { + filepath := tempFile(t) + records := makeRecords(1, "hello, world!") + + exporter, err := New(WithPath(filepath)) + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, exporter.Shutdown(context.TODO())) + }) + + err = exporter.Export(context.TODO(), records) + assert.NoError(t, err) + err = exporter.ForceFlush(context.TODO()) + assert.NoError(t, err) +} + +func TestExporterConcurrentSafe(t *testing.T) { + filepath := tempFile(t) + exporter, err := New(WithPath(filepath)) + require.NoError(t, err, "New()") + + const goroutines = 10 + + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + runs := new(uint64) + for i := 0; i < goroutines; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + _ = exporter.Export(ctx, makeRecords(1, fmt.Sprintf("log from goroutine %d", i))) + _ = exporter.ForceFlush(ctx) + atomic.AddUint64(runs, 1) + } + } + }() + } + + for atomic.LoadUint64(runs) == 0 { + runtime.Gosched() + } + + assert.NoError(t, exporter.Shutdown(ctx), "must not error when shutting down") + cancel() + wg.Wait() +} diff --git a/exporters/otlp/otlplog/otlplogfile/go.mod b/exporters/otlp/otlplog/otlplogfile/go.mod new file mode 100644 index 00000000000..24a13b3f912 --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/go.mod @@ -0,0 +1,37 @@ +module go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile + +go 1.22 + +require ( + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel v1.29.0 + go.opentelemetry.io/otel/log v0.5.0 + go.opentelemetry.io/otel/sdk v1.29.0 + go.opentelemetry.io/otel/sdk/log v0.4.0 + go.opentelemetry.io/otel/trace v1.29.0 + go.opentelemetry.io/proto/otlp v1.3.1 + google.golang.org/protobuf v1.34.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/otel/metric v1.29.0 // indirect + golang.org/x/sys v0.24.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/otel => ../../../.. + +replace go.opentelemetry.io/otel/sdk/log => ../../../../sdk/log + +replace go.opentelemetry.io/otel/sdk => ../../../../sdk + +replace go.opentelemetry.io/otel/log => ../../../../log + +replace go.opentelemetry.io/otel/trace => ../../../../trace + +replace go.opentelemetry.io/otel/metric => ../../../../metric diff --git a/exporters/otlp/otlplog/otlplogfile/go.sum b/exporters/otlp/otlplog/otlplogfile/go.sum new file mode 100644 index 00000000000..d6412c2392c --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/go.sum @@ -0,0 +1,25 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/exporters/otlp/otlplog/otlplogfile/internal/gen.go b/exporters/otlp/otlplog/otlplogfile/internal/gen.go new file mode 100644 index 00000000000..1e49596a597 --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/internal/gen.go @@ -0,0 +1,9 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal" + +//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlplog/transform/attr_test.go.tmpl "--data={}" --out=transform/attr_test.go +//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlplog/transform/log.go.tmpl "--data={}" --out=transform/log.go +//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlplog/transform/log_attr_test.go.tmpl "--data={}" --out=transform/log_attr_test.go +//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlplog/transform/log_test.go.tmpl "--data={}" --out=transform/log_test.go diff --git a/exporters/otlp/otlplog/otlplogfile/internal/transform/attr_test.go b/exporters/otlp/otlplog/otlplogfile/internal/transform/attr_test.go new file mode 100644 index 00000000000..da5bf0e7c9c --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/internal/transform/attr_test.go @@ -0,0 +1,186 @@ +// Code created by gotmpl. DO NOT MODIFY. +// source: internal/shared/otlp/otlplog/transform/attr_test.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transform + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/attribute" + cpb "go.opentelemetry.io/proto/otlp/common/v1" +) + +var ( + attrBool = attribute.Bool("bool", true) + attrBoolSlice = attribute.BoolSlice("bool slice", []bool{true, false}) + attrInt = attribute.Int("int", 1) + attrIntSlice = attribute.IntSlice("int slice", []int{-1, 1}) + attrInt64 = attribute.Int64("int64", 1) + attrInt64Slice = attribute.Int64Slice("int64 slice", []int64{-1, 1}) + attrFloat64 = attribute.Float64("float64", 1) + attrFloat64Slice = attribute.Float64Slice("float64 slice", []float64{-1, 1}) + attrString = attribute.String("string", "o") + attrStringSlice = attribute.StringSlice("string slice", []string{"o", "n"}) + attrInvalid = attribute.KeyValue{ + Key: attribute.Key("invalid"), + Value: attribute.Value{}, + } + + valBoolTrue = &cpb.AnyValue{Value: &cpb.AnyValue_BoolValue{BoolValue: true}} + valBoolFalse = &cpb.AnyValue{Value: &cpb.AnyValue_BoolValue{BoolValue: false}} + valBoolSlice = &cpb.AnyValue{Value: &cpb.AnyValue_ArrayValue{ + ArrayValue: &cpb.ArrayValue{ + Values: []*cpb.AnyValue{valBoolTrue, valBoolFalse}, + }, + }} + valIntOne = &cpb.AnyValue{Value: &cpb.AnyValue_IntValue{IntValue: 1}} + valIntNOne = &cpb.AnyValue{Value: &cpb.AnyValue_IntValue{IntValue: -1}} + valIntSlice = &cpb.AnyValue{Value: &cpb.AnyValue_ArrayValue{ + ArrayValue: &cpb.ArrayValue{ + Values: []*cpb.AnyValue{valIntNOne, valIntOne}, + }, + }} + valDblOne = &cpb.AnyValue{Value: &cpb.AnyValue_DoubleValue{DoubleValue: 1}} + valDblNOne = &cpb.AnyValue{Value: &cpb.AnyValue_DoubleValue{DoubleValue: -1}} + valDblSlice = &cpb.AnyValue{Value: &cpb.AnyValue_ArrayValue{ + ArrayValue: &cpb.ArrayValue{ + Values: []*cpb.AnyValue{valDblNOne, valDblOne}, + }, + }} + valStrO = &cpb.AnyValue{Value: &cpb.AnyValue_StringValue{StringValue: "o"}} + valStrN = &cpb.AnyValue{Value: &cpb.AnyValue_StringValue{StringValue: "n"}} + valStrSlice = &cpb.AnyValue{Value: &cpb.AnyValue_ArrayValue{ + ArrayValue: &cpb.ArrayValue{ + Values: []*cpb.AnyValue{valStrO, valStrN}, + }, + }} + + kvBool = &cpb.KeyValue{Key: "bool", Value: valBoolTrue} + kvBoolSlice = &cpb.KeyValue{Key: "bool slice", Value: valBoolSlice} + kvInt = &cpb.KeyValue{Key: "int", Value: valIntOne} + kvIntSlice = &cpb.KeyValue{Key: "int slice", Value: valIntSlice} + kvInt64 = &cpb.KeyValue{Key: "int64", Value: valIntOne} + kvInt64Slice = &cpb.KeyValue{Key: "int64 slice", Value: valIntSlice} + kvFloat64 = &cpb.KeyValue{Key: "float64", Value: valDblOne} + kvFloat64Slice = &cpb.KeyValue{Key: "float64 slice", Value: valDblSlice} + kvString = &cpb.KeyValue{Key: "string", Value: valStrO} + kvStringSlice = &cpb.KeyValue{Key: "string slice", Value: valStrSlice} + kvInvalid = &cpb.KeyValue{ + Key: "invalid", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "INVALID"}, + }, + } +) + +func TestAttrTransforms(t *testing.T) { + type attrTest struct { + name string + in []attribute.KeyValue + want []*cpb.KeyValue + } + + for _, test := range []attrTest{ + {"nil", nil, nil}, + {"empty", []attribute.KeyValue{}, nil}, + { + "invalid", + []attribute.KeyValue{attrInvalid}, + []*cpb.KeyValue{kvInvalid}, + }, + { + "bool", + []attribute.KeyValue{attrBool}, + []*cpb.KeyValue{kvBool}, + }, + { + "bool slice", + []attribute.KeyValue{attrBoolSlice}, + []*cpb.KeyValue{kvBoolSlice}, + }, + { + "int", + []attribute.KeyValue{attrInt}, + []*cpb.KeyValue{kvInt}, + }, + { + "int slice", + []attribute.KeyValue{attrIntSlice}, + []*cpb.KeyValue{kvIntSlice}, + }, + { + "int64", + []attribute.KeyValue{attrInt64}, + []*cpb.KeyValue{kvInt64}, + }, + { + "int64 slice", + []attribute.KeyValue{attrInt64Slice}, + []*cpb.KeyValue{kvInt64Slice}, + }, + { + "float64", + []attribute.KeyValue{attrFloat64}, + []*cpb.KeyValue{kvFloat64}, + }, + { + "float64 slice", + []attribute.KeyValue{attrFloat64Slice}, + []*cpb.KeyValue{kvFloat64Slice}, + }, + { + "string", + []attribute.KeyValue{attrString}, + []*cpb.KeyValue{kvString}, + }, + { + "string slice", + []attribute.KeyValue{attrStringSlice}, + []*cpb.KeyValue{kvStringSlice}, + }, + { + "all", + []attribute.KeyValue{ + attrBool, + attrBoolSlice, + attrInt, + attrIntSlice, + attrInt64, + attrInt64Slice, + attrFloat64, + attrFloat64Slice, + attrString, + attrStringSlice, + attrInvalid, + }, + []*cpb.KeyValue{ + kvBool, + kvBoolSlice, + kvInt, + kvIntSlice, + kvInt64, + kvInt64Slice, + kvFloat64, + kvFloat64Slice, + kvString, + kvStringSlice, + kvInvalid, + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Run("Attrs", func(t *testing.T) { + assert.ElementsMatch(t, test.want, Attrs(test.in)) + }) + t.Run("AttrIter", func(t *testing.T) { + s := attribute.NewSet(test.in...) + assert.ElementsMatch(t, test.want, AttrIter(s.Iter())) + }) + }) + } +} diff --git a/exporters/otlp/otlplog/otlplogfile/internal/transform/log.go b/exporters/otlp/otlplog/otlplogfile/internal/transform/log.go new file mode 100644 index 00000000000..668cf08d6df --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/internal/transform/log.go @@ -0,0 +1,411 @@ +// Code created by gotmpl. DO NOT MODIFY. +// source: internal/shared/otlp/otlplog/transform/log.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package transform provides transformation functionality from the +// sdk/log data-types into OTLP data-types. +package transform // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/transform" + +import ( + "sync" + "time" + + cpb "go.opentelemetry.io/proto/otlp/common/v1" + lpb "go.opentelemetry.io/proto/otlp/logs/v1" + rpb "go.opentelemetry.io/proto/otlp/resource/v1" + + "go.opentelemetry.io/otel/attribute" + api "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/log" +) + +// ResourceLogs returns an slice of OTLP ResourceLogs generated from records. +func ResourceLogs(records []log.Record) []*lpb.ResourceLogs { + if len(records) == 0 { + return nil + } + + resMap := resourceLogsMapPool.Get().(map[attribute.Distinct]*lpb.ResourceLogs) + defer func() { + clear(resMap) + resourceLogsMapPool.Put(resMap) + }() + resourceLogsMap(&resMap, records) + + out := make([]*lpb.ResourceLogs, 0, len(resMap)) + for _, rl := range resMap { + out = append(out, rl) + } + return out +} + +var resourceLogsMapPool = sync.Pool{ + New: func() any { + return make(map[attribute.Distinct]*lpb.ResourceLogs) + }, +} + +func resourceLogsMap(dst *map[attribute.Distinct]*lpb.ResourceLogs, records []log.Record) { + for _, r := range records { + res := r.Resource() + rl, ok := (*dst)[res.Equivalent()] + if !ok { + rl = new(lpb.ResourceLogs) + if res.Len() > 0 { + rl.Resource = &rpb.Resource{ + Attributes: AttrIter(res.Iter()), + } + } + rl.SchemaUrl = res.SchemaURL() + (*dst)[res.Equivalent()] = rl + } + rl.ScopeLogs = ScopeLogs(records) + } +} + +// ScopeLogs returns a slice of OTLP ScopeLogs generated from recoreds. +func ScopeLogs(records []log.Record) []*lpb.ScopeLogs { + scopeMap := scopeLogsMapPool.Get().(map[instrumentation.Scope]*lpb.ScopeLogs) + defer func() { + clear(scopeMap) + scopeLogsMapPool.Put(scopeMap) + }() + scopeLogsMap(&scopeMap, records) + + out := make([]*lpb.ScopeLogs, 0, len(scopeMap)) + for _, sl := range scopeMap { + out = append(out, sl) + } + return out +} + +var scopeLogsMapPool = sync.Pool{ + New: func() any { + return make(map[instrumentation.Scope]*lpb.ScopeLogs) + }, +} + +func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.Record) { + for _, r := range records { + scope := r.InstrumentationScope() + sl, ok := (*dst)[scope] + if !ok { + sl = new(lpb.ScopeLogs) + var emptyScope instrumentation.Scope + if scope != emptyScope { + sl.Scope = &cpb.InstrumentationScope{ + Name: scope.Name, + Version: scope.Version, + } + sl.SchemaUrl = scope.SchemaURL + } + (*dst)[scope] = sl + } + sl.LogRecords = append(sl.LogRecords, LogRecord(r)) + } +} + +// LogRecord returns an OTLP LogRecord generated from record. +func LogRecord(record log.Record) *lpb.LogRecord { + r := &lpb.LogRecord{ + TimeUnixNano: timeUnixNano(record.Timestamp()), + ObservedTimeUnixNano: timeUnixNano(record.ObservedTimestamp()), + SeverityNumber: SeverityNumber(record.Severity()), + SeverityText: record.SeverityText(), + Body: LogAttrValue(record.Body()), + Attributes: make([]*cpb.KeyValue, 0, record.AttributesLen()), + Flags: uint32(record.TraceFlags()), + // TODO: DroppedAttributesCount: /* ... */, + } + record.WalkAttributes(func(kv api.KeyValue) bool { + r.Attributes = append(r.Attributes, LogAttr(kv)) + return true + }) + if tID := record.TraceID(); tID.IsValid() { + r.TraceId = tID[:] + } + if sID := record.SpanID(); sID.IsValid() { + r.SpanId = sID[:] + } + return r +} + +// timeUnixNano returns t as a Unix time, the number of nanoseconds elapsed +// since January 1, 1970 UTC as uint64. The result is undefined if the Unix +// time in nanoseconds cannot be represented by an int64 (a date before the +// year 1678 or after 2262). timeUnixNano on the zero Time returns 0. The +// result does not depend on the location associated with t. +func timeUnixNano(t time.Time) uint64 { + if t.IsZero() { + return 0 + } + return uint64(t.UnixNano()) +} + +// AttrIter transforms an [attribute.Iterator] into OTLP key-values. +func AttrIter(iter attribute.Iterator) []*cpb.KeyValue { + l := iter.Len() + if l == 0 { + return nil + } + + out := make([]*cpb.KeyValue, 0, l) + for iter.Next() { + out = append(out, Attr(iter.Attribute())) + } + return out +} + +// Attrs transforms a slice of [attribute.KeyValue] into OTLP key-values. +func Attrs(attrs []attribute.KeyValue) []*cpb.KeyValue { + if len(attrs) == 0 { + return nil + } + + out := make([]*cpb.KeyValue, 0, len(attrs)) + for _, kv := range attrs { + out = append(out, Attr(kv)) + } + return out +} + +// Attr transforms an [attribute.KeyValue] into an OTLP key-value. +func Attr(kv attribute.KeyValue) *cpb.KeyValue { + return &cpb.KeyValue{Key: string(kv.Key), Value: AttrValue(kv.Value)} +} + +// AttrValue transforms an [attribute.Value] into an OTLP AnyValue. +func AttrValue(v attribute.Value) *cpb.AnyValue { + av := new(cpb.AnyValue) + switch v.Type() { + case attribute.BOOL: + av.Value = &cpb.AnyValue_BoolValue{ + BoolValue: v.AsBool(), + } + case attribute.BOOLSLICE: + av.Value = &cpb.AnyValue_ArrayValue{ + ArrayValue: &cpb.ArrayValue{ + Values: boolSliceValues(v.AsBoolSlice()), + }, + } + case attribute.INT64: + av.Value = &cpb.AnyValue_IntValue{ + IntValue: v.AsInt64(), + } + case attribute.INT64SLICE: + av.Value = &cpb.AnyValue_ArrayValue{ + ArrayValue: &cpb.ArrayValue{ + Values: int64SliceValues(v.AsInt64Slice()), + }, + } + case attribute.FLOAT64: + av.Value = &cpb.AnyValue_DoubleValue{ + DoubleValue: v.AsFloat64(), + } + case attribute.FLOAT64SLICE: + av.Value = &cpb.AnyValue_ArrayValue{ + ArrayValue: &cpb.ArrayValue{ + Values: float64SliceValues(v.AsFloat64Slice()), + }, + } + case attribute.STRING: + av.Value = &cpb.AnyValue_StringValue{ + StringValue: v.AsString(), + } + case attribute.STRINGSLICE: + av.Value = &cpb.AnyValue_ArrayValue{ + ArrayValue: &cpb.ArrayValue{ + Values: stringSliceValues(v.AsStringSlice()), + }, + } + default: + av.Value = &cpb.AnyValue_StringValue{ + StringValue: "INVALID", + } + } + return av +} + +func boolSliceValues(vals []bool) []*cpb.AnyValue { + converted := make([]*cpb.AnyValue, len(vals)) + for i, v := range vals { + converted[i] = &cpb.AnyValue{ + Value: &cpb.AnyValue_BoolValue{ + BoolValue: v, + }, + } + } + return converted +} + +func int64SliceValues(vals []int64) []*cpb.AnyValue { + converted := make([]*cpb.AnyValue, len(vals)) + for i, v := range vals { + converted[i] = &cpb.AnyValue{ + Value: &cpb.AnyValue_IntValue{ + IntValue: v, + }, + } + } + return converted +} + +func float64SliceValues(vals []float64) []*cpb.AnyValue { + converted := make([]*cpb.AnyValue, len(vals)) + for i, v := range vals { + converted[i] = &cpb.AnyValue{ + Value: &cpb.AnyValue_DoubleValue{ + DoubleValue: v, + }, + } + } + return converted +} + +func stringSliceValues(vals []string) []*cpb.AnyValue { + converted := make([]*cpb.AnyValue, len(vals)) + for i, v := range vals { + converted[i] = &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{ + StringValue: v, + }, + } + } + return converted +} + +// Attrs transforms a slice of [api.KeyValue] into OTLP key-values. +func LogAttrs(attrs []api.KeyValue) []*cpb.KeyValue { + if len(attrs) == 0 { + return nil + } + + out := make([]*cpb.KeyValue, 0, len(attrs)) + for _, kv := range attrs { + out = append(out, LogAttr(kv)) + } + return out +} + +// LogAttr transforms an [api.KeyValue] into an OTLP key-value. +func LogAttr(attr api.KeyValue) *cpb.KeyValue { + return &cpb.KeyValue{ + Key: attr.Key, + Value: LogAttrValue(attr.Value), + } +} + +// LogAttrValues transforms a slice of [api.Value] into an OTLP []AnyValue. +func LogAttrValues(vals []api.Value) []*cpb.AnyValue { + if len(vals) == 0 { + return nil + } + + out := make([]*cpb.AnyValue, 0, len(vals)) + for _, v := range vals { + out = append(out, LogAttrValue(v)) + } + return out +} + +// LogAttrValue transforms an [api.Value] into an OTLP AnyValue. +func LogAttrValue(v api.Value) *cpb.AnyValue { + av := new(cpb.AnyValue) + switch v.Kind() { + case api.KindBool: + av.Value = &cpb.AnyValue_BoolValue{ + BoolValue: v.AsBool(), + } + case api.KindInt64: + av.Value = &cpb.AnyValue_IntValue{ + IntValue: v.AsInt64(), + } + case api.KindFloat64: + av.Value = &cpb.AnyValue_DoubleValue{ + DoubleValue: v.AsFloat64(), + } + case api.KindString: + av.Value = &cpb.AnyValue_StringValue{ + StringValue: v.AsString(), + } + case api.KindBytes: + av.Value = &cpb.AnyValue_BytesValue{ + BytesValue: v.AsBytes(), + } + case api.KindSlice: + av.Value = &cpb.AnyValue_ArrayValue{ + ArrayValue: &cpb.ArrayValue{ + Values: LogAttrValues(v.AsSlice()), + }, + } + case api.KindMap: + av.Value = &cpb.AnyValue_KvlistValue{ + KvlistValue: &cpb.KeyValueList{ + Values: LogAttrs(v.AsMap()), + }, + } + default: + av.Value = &cpb.AnyValue_StringValue{ + StringValue: "INVALID", + } + } + return av +} + +// SeverityNumber transforms a [log.Severity] into an OTLP SeverityNumber. +func SeverityNumber(s api.Severity) lpb.SeverityNumber { + switch s { + case api.SeverityTrace: + return lpb.SeverityNumber_SEVERITY_NUMBER_TRACE + case api.SeverityTrace2: + return lpb.SeverityNumber_SEVERITY_NUMBER_TRACE2 + case api.SeverityTrace3: + return lpb.SeverityNumber_SEVERITY_NUMBER_TRACE3 + case api.SeverityTrace4: + return lpb.SeverityNumber_SEVERITY_NUMBER_TRACE4 + case api.SeverityDebug: + return lpb.SeverityNumber_SEVERITY_NUMBER_DEBUG + case api.SeverityDebug2: + return lpb.SeverityNumber_SEVERITY_NUMBER_DEBUG2 + case api.SeverityDebug3: + return lpb.SeverityNumber_SEVERITY_NUMBER_DEBUG3 + case api.SeverityDebug4: + return lpb.SeverityNumber_SEVERITY_NUMBER_DEBUG4 + case api.SeverityInfo: + return lpb.SeverityNumber_SEVERITY_NUMBER_INFO + case api.SeverityInfo2: + return lpb.SeverityNumber_SEVERITY_NUMBER_INFO2 + case api.SeverityInfo3: + return lpb.SeverityNumber_SEVERITY_NUMBER_INFO3 + case api.SeverityInfo4: + return lpb.SeverityNumber_SEVERITY_NUMBER_INFO4 + case api.SeverityWarn: + return lpb.SeverityNumber_SEVERITY_NUMBER_WARN + case api.SeverityWarn2: + return lpb.SeverityNumber_SEVERITY_NUMBER_WARN2 + case api.SeverityWarn3: + return lpb.SeverityNumber_SEVERITY_NUMBER_WARN3 + case api.SeverityWarn4: + return lpb.SeverityNumber_SEVERITY_NUMBER_WARN4 + case api.SeverityError: + return lpb.SeverityNumber_SEVERITY_NUMBER_ERROR + case api.SeverityError2: + return lpb.SeverityNumber_SEVERITY_NUMBER_ERROR2 + case api.SeverityError3: + return lpb.SeverityNumber_SEVERITY_NUMBER_ERROR3 + case api.SeverityError4: + return lpb.SeverityNumber_SEVERITY_NUMBER_ERROR4 + case api.SeverityFatal: + return lpb.SeverityNumber_SEVERITY_NUMBER_FATAL + case api.SeverityFatal2: + return lpb.SeverityNumber_SEVERITY_NUMBER_FATAL2 + case api.SeverityFatal3: + return lpb.SeverityNumber_SEVERITY_NUMBER_FATAL3 + case api.SeverityFatal4: + return lpb.SeverityNumber_SEVERITY_NUMBER_FATAL4 + } + return lpb.SeverityNumber_SEVERITY_NUMBER_UNSPECIFIED +} diff --git a/exporters/otlp/otlplog/otlplogfile/internal/transform/log_attr_test.go b/exporters/otlp/otlplog/otlplogfile/internal/transform/log_attr_test.go new file mode 100644 index 00000000000..6de65e77fbe --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/internal/transform/log_attr_test.go @@ -0,0 +1,149 @@ +// Code created by gotmpl. DO NOT MODIFY. +// source: internal/shared/otlp/otlplog/transform/log_attr_test.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transform + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/log" + cpb "go.opentelemetry.io/proto/otlp/common/v1" +) + +var ( + logAttrBool = log.Bool("bool", true) + logAttrInt = log.Int("int", 1) + logAttrInt64 = log.Int64("int64", 1) + logAttrFloat64 = log.Float64("float64", 1) + logAttrString = log.String("string", "o") + logAttrBytes = log.Bytes("bytes", []byte("test")) + logAttrSlice = log.Slice("slice", log.BoolValue(true)) + logAttrMap = log.Map("map", logAttrString) + logAttrEmpty = log.Empty("") + + kvBytes = &cpb.KeyValue{ + Key: "bytes", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_BytesValue{ + BytesValue: []byte("test"), + }, + }, + } + kvSlice = &cpb.KeyValue{ + Key: "slice", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_ArrayValue{ + ArrayValue: &cpb.ArrayValue{ + Values: []*cpb.AnyValue{valBoolTrue}, + }, + }, + }, + } + kvMap = &cpb.KeyValue{ + Key: "map", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_KvlistValue{ + KvlistValue: &cpb.KeyValueList{ + Values: []*cpb.KeyValue{kvString}, + }, + }, + }, + } + kvEmpty = &cpb.KeyValue{ + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "INVALID"}, + }, + } +) + +func TestLogAttrs(t *testing.T) { + type logAttrTest struct { + name string + in []log.KeyValue + want []*cpb.KeyValue + } + + for _, test := range []logAttrTest{ + {"nil", nil, nil}, + {"len(0)", []log.KeyValue{}, nil}, + { + "empty", + []log.KeyValue{logAttrEmpty}, + []*cpb.KeyValue{kvEmpty}, + }, + { + "bool", + []log.KeyValue{logAttrBool}, + []*cpb.KeyValue{kvBool}, + }, + { + "int", + []log.KeyValue{logAttrInt}, + []*cpb.KeyValue{kvInt}, + }, + { + "int64", + []log.KeyValue{logAttrInt64}, + []*cpb.KeyValue{kvInt64}, + }, + { + "float64", + []log.KeyValue{logAttrFloat64}, + []*cpb.KeyValue{kvFloat64}, + }, + { + "string", + []log.KeyValue{logAttrString}, + []*cpb.KeyValue{kvString}, + }, + { + "bytes", + []log.KeyValue{logAttrBytes}, + []*cpb.KeyValue{kvBytes}, + }, + { + "slice", + []log.KeyValue{logAttrSlice}, + []*cpb.KeyValue{kvSlice}, + }, + { + "map", + []log.KeyValue{logAttrMap}, + []*cpb.KeyValue{kvMap}, + }, + { + "all", + []log.KeyValue{ + logAttrBool, + logAttrInt, + logAttrInt64, + logAttrFloat64, + logAttrString, + logAttrBytes, + logAttrSlice, + logAttrMap, + logAttrEmpty, + }, + []*cpb.KeyValue{ + kvBool, + kvInt, + kvInt64, + kvFloat64, + kvString, + kvBytes, + kvSlice, + kvMap, + kvEmpty, + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + assert.ElementsMatch(t, test.want, LogAttrs(test.in)) + }) + } +} diff --git a/exporters/otlp/otlplog/otlplogfile/internal/transform/log_test.go b/exporters/otlp/otlplog/otlplogfile/internal/transform/log_test.go new file mode 100644 index 00000000000..0cf8918c2d9 --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/internal/transform/log_test.go @@ -0,0 +1,246 @@ +// Code created by gotmpl. DO NOT MODIFY. +// source: internal/shared/otlp/otlplog/transform/log_test.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transform + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + cpb "go.opentelemetry.io/proto/otlp/common/v1" + lpb "go.opentelemetry.io/proto/otlp/logs/v1" + rpb "go.opentelemetry.io/proto/otlp/resource/v1" + + api "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/log" + "go.opentelemetry.io/otel/sdk/log/logtest" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" +) + +var ( + // Sat Jan 01 2000 00:00:00 GMT+0000. + ts = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0)) + obs = ts.Add(30 * time.Second) + + alice = api.String("user", "alice") + bob = api.String("user", "bob") + + pbAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "alice"}, + }} + pbBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "bob"}, + }} + + sevA = api.SeverityInfo + sevB = api.SeverityError + + pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO + pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR + + bodyA = api.StringValue("a") + bodyB = api.StringValue("b") + + pbBodyA = &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{ + StringValue: "a", + }, + } + pbBodyB = &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{ + StringValue: "b", + }, + } + + spanIDA = []byte{0, 0, 0, 0, 0, 0, 0, 1} + spanIDB = []byte{0, 0, 0, 0, 0, 0, 0, 2} + traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} + traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2} + flagsA = byte(1) + flagsB = byte(0) + + scope = instrumentation.Scope{ + Name: "test/code/path", + Version: "v0.1.0", + SchemaURL: semconv.SchemaURL, + } + pbScope = &cpb.InstrumentationScope{ + Name: "test/code/path", + Version: "v0.1.0", + } + + res = resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("test server"), + semconv.ServiceVersion("v0.1.0"), + ) + pbRes = &rpb.Resource{ + Attributes: []*cpb.KeyValue{ + { + Key: "service.name", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "test server"}, + }, + }, + { + Key: "service.version", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"}, + }, + }, + }, + } + + records = func() []log.Record { + var out []log.Record + + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevA, + SeverityText: "A", + Body: bodyA, + Attributes: []api.KeyValue{alice}, + TraceID: trace.TraceID(traceIDA), + SpanID: trace.SpanID(spanIDA), + TraceFlags: trace.TraceFlags(flagsA), + InstrumentationScope: &scope, + Resource: res, + }.NewRecord()) + + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevA, + SeverityText: "A", + Body: bodyA, + Attributes: []api.KeyValue{bob}, + TraceID: trace.TraceID(traceIDA), + SpanID: trace.SpanID(spanIDA), + TraceFlags: trace.TraceFlags(flagsA), + InstrumentationScope: &scope, + Resource: res, + }.NewRecord()) + + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevB, + SeverityText: "B", + Body: bodyB, + Attributes: []api.KeyValue{alice}, + TraceID: trace.TraceID(traceIDB), + SpanID: trace.SpanID(spanIDB), + TraceFlags: trace.TraceFlags(flagsB), + InstrumentationScope: &scope, + Resource: res, + }.NewRecord()) + + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevB, + SeverityText: "B", + Body: bodyB, + Attributes: []api.KeyValue{bob}, + TraceID: trace.TraceID(traceIDB), + SpanID: trace.SpanID(spanIDB), + TraceFlags: trace.TraceFlags(flagsB), + InstrumentationScope: &scope, + Resource: res, + }.NewRecord()) + + return out + }() + + pbLogRecords = []*lpb.LogRecord{ + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevA, + SeverityText: "A", + Body: pbBodyA, + Attributes: []*cpb.KeyValue{pbAlice}, + Flags: uint32(flagsA), + TraceId: traceIDA, + SpanId: spanIDA, + }, + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevA, + SeverityText: "A", + Body: pbBodyA, + Attributes: []*cpb.KeyValue{pbBob}, + Flags: uint32(flagsA), + TraceId: traceIDA, + SpanId: spanIDA, + }, + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevB, + SeverityText: "B", + Body: pbBodyB, + Attributes: []*cpb.KeyValue{pbAlice}, + Flags: uint32(flagsB), + TraceId: traceIDB, + SpanId: spanIDB, + }, + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevB, + SeverityText: "B", + Body: pbBodyB, + Attributes: []*cpb.KeyValue{pbBob}, + Flags: uint32(flagsB), + TraceId: traceIDB, + SpanId: spanIDB, + }, + } + + pbScopeLogs = &lpb.ScopeLogs{ + Scope: pbScope, + SchemaUrl: semconv.SchemaURL, + LogRecords: pbLogRecords, + } + + pbResourceLogs = &lpb.ResourceLogs{ + Resource: pbRes, + SchemaUrl: semconv.SchemaURL, + ScopeLogs: []*lpb.ScopeLogs{pbScopeLogs}, + } +) + +func TestResourceLogs(t *testing.T) { + want := []*lpb.ResourceLogs{pbResourceLogs} + assert.Equal(t, want, ResourceLogs(records)) +} + +func TestSeverityNumber(t *testing.T) { + for i := 0; i <= int(api.SeverityFatal4); i++ { + want := lpb.SeverityNumber(i) + want += lpb.SeverityNumber_SEVERITY_NUMBER_UNSPECIFIED + assert.Equal(t, want, SeverityNumber(api.Severity(i))) + } +} + +func BenchmarkResourceLogs(b *testing.B) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + var out []*lpb.ResourceLogs + for pb.Next() { + out = ResourceLogs(records) + } + _ = out + }) +} diff --git a/exporters/otlp/otlplog/otlplogfile/internal/writer/buffered.go b/exporters/otlp/otlplog/otlplogfile/internal/writer/buffered.go new file mode 100644 index 00000000000..ba332b1c8a9 --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/internal/writer/buffered.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package writer // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/writer" + +import ( + "bufio" + "errors" + "io" +) + +// flusher implementations are responsible for ensuring that any buffered or pending data +// is written out or processed. +type flusher interface { + // Flush writes any buffered data to the underlying storage. + // It returns an error if the data could not be flushed. + Flush() error +} + +// bufferedWriter is intended to use more memory +// in order to optimize writing to disk to help improve performance. +type bufferedWriter struct { + wrapped io.Closer + buffer *bufio.Writer +} + +// Ensure that the implementation satisfies the interface at compile-time. +var ( + _ io.WriteCloser = (*bufferedWriter)(nil) + _ flusher = (*bufferedWriter)(nil) +) + +func newBufferedWriteCloser(f io.WriteCloser) io.WriteCloser { + return &bufferedWriter{ + wrapped: f, + buffer: bufio.NewWriter(f), + } +} + +func (bw *bufferedWriter) Write(p []byte) (n int, err error) { + return bw.buffer.Write(p) +} + +func (bw *bufferedWriter) Close() error { + return errors.Join( + bw.buffer.Flush(), + bw.wrapped.Close(), + ) +} + +func (bw *bufferedWriter) Flush() error { + return bw.buffer.Flush() +} diff --git a/exporters/otlp/otlplog/otlplogfile/internal/writer/buffered_test.go b/exporters/otlp/otlplog/otlplogfile/internal/writer/buffered_test.go new file mode 100644 index 00000000000..c98748f34ae --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/internal/writer/buffered_test.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package writer + +import ( + "bytes" + "errors" + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + msg = "hello, world!" + + sizeByte = 1 + sizeKiloByte = 1 << (10 * iota) + sizeMegaByte +) + +type noopWriteCloser struct { + w io.Writer +} + +func (wc *noopWriteCloser) Write(p []byte) (int, error) { return wc.w.Write(p) } +func (wc *noopWriteCloser) Close() error { return nil } + +func TestBufferedWrites(t *testing.T) { + t.Parallel() + + b := bytes.NewBuffer(nil) + w := newBufferedWriteCloser(&noopWriteCloser{b}) + + _, err := w.Write([]byte(msg)) + assert.NoError(t, err, "Must not error when writing data") + assert.NoError(t, w.Close(), "Must not error when closing writer") + assert.Equal(t, msg, b.String(), "Must match the expected string") +} + +var errBenchmark error + +func BenchmarkBufferedWriter(b *testing.B) { + for _, payloadSize := range []int{ + 10 * sizeKiloByte, + 100 * sizeKiloByte, + sizeMegaByte, + 10 * sizeMegaByte, + } { + payload := make([]byte, payloadSize) + for i := 0; i < payloadSize; i++ { + payload[i] = 'a' + } + + for name, w := range map[string]io.WriteCloser{ + "raw-file": tempFile(b), + "buffered-file": newBufferedWriteCloser(tempFile(b)), + } { + w := w + b.Run(fmt.Sprintf("%s_%d_bytes", name, payloadSize), func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + var err error + for i := 0; i < b.N; i++ { + _, err = w.Write(payload) + } + + errBenchmark = errors.Join(err, w.Close()) + }) + } + } +} diff --git a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go new file mode 100644 index 00000000000..b9ac2cf5ead --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go @@ -0,0 +1,114 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package writer // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/writer" + +import ( + "fmt" + "io" + "os" + "sync" + "time" +) + +// FileWriter writes data to a configured file. +// It is buffered to reduce I/O operations to improve performance. +type FileWriter struct { + path string + file io.WriteCloser + mu sync.Mutex + + flushInterval time.Duration + flushTicker *time.Ticker + stopTicker chan struct{} +} + +var _ flusher = (*FileWriter)(nil) + +// NewFileWriter initializes a file writer for the file at the given path. +func NewFileWriter(path string, flushInterval time.Duration) (*FileWriter, error) { + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + + fw := &FileWriter{ + path: path, + flushInterval: flushInterval, + file: newBufferedWriteCloser(file), + } + + if fw.flushInterval > 0 { + fw.startFlusher() + } + + return fw, nil +} + +// Export writes the given data in the file. +func (w *FileWriter) Export(data []byte) error { + w.mu.Lock() + defer w.mu.Unlock() + + if _, err := w.file.Write(data); err != nil { + return err + } + + // As stated in the specification, line separator is \n. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md#json-lines-file + if _, err := io.WriteString(w.file, "\n"); err != nil { + return err + } + + return nil +} + +// Shutdown stops the flusher. It also stops the flush ticker if set. +func (w *FileWriter) Shutdown() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.flushTicker != nil { + close(w.stopTicker) + } + return w.file.Close() +} + +// Flush writes buffered data to disk. +func (w *FileWriter) Flush() error { + ff, ok := w.file.(flusher) + if !ok { + return nil + } + + w.mu.Lock() + defer w.mu.Unlock() + + return ff.Flush() +} + +// startFlusher starts the flusher to periodically flush the buffer. +func (w *FileWriter) startFlusher() { + w.mu.Lock() + defer w.mu.Unlock() + + ff, ok := w.file.(flusher) + if !ok { + return + } + + w.stopTicker = make(chan struct{}) + w.flushTicker = time.NewTicker(w.flushInterval) + go func() { + for { + select { + case <-w.flushTicker.C: + _ = ff.Flush() + case <-w.stopTicker: + w.flushTicker.Stop() + w.flushTicker = nil + return + } + } + }() +} diff --git a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go new file mode 100644 index 00000000000..e9374997872 --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go @@ -0,0 +1,110 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package writer + +import ( + "context" + "fmt" + "os" + "path" + "runtime" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// tempFile creates a temporary file for the given test case and returns its path on disk. +// The file is automatically cleaned up when the test ends. +func tempFile(tb testing.TB) *os.File { + f, err := os.CreateTemp(tb.TempDir(), tb.Name()) + require.NoError(tb, err, "must not error when creating temporary file") + tb.Cleanup(func() { + assert.NoError(tb, os.RemoveAll(path.Dir(f.Name())), "must clean up files after being written") + }) + return f +} + +func TestNewFileWriter(t *testing.T) { + f := tempFile(t) + + writer, err := NewFileWriter(f.Name(), 0) + // nolint: errcheck + defer writer.Shutdown() + + assert.NoError(t, err, "must not error when creating the file writer") + assert.Equal(t, f.Name(), writer.path, "writer file path must be the same than the file path") + + // Ensure file was created + _, err = os.Stat(f.Name()) + assert.NoError(t, err, "must not error when trying to retrieve file stats") +} + +func TestFileWriterExport(t *testing.T) { + f := tempFile(t) + + writer, err := NewFileWriter(f.Name(), 0) + // nolint: errcheck + defer writer.Shutdown() + require.NoError(t, err, "must not error when creating the file writer") + + data := []byte("helloworld") + assert.NoError(t, writer.Export(data)) + + // Force data to be written to disk. + _ = writer.Flush() + + // Read file and verify content + content, err := os.ReadFile(f.Name()) + require.NoError(t, err, "must not error when reading file content") + assert.Equal(t, "helloworld\n", string(content)) +} + +func TestFileWriterShutdown(t *testing.T) { + f := tempFile(t) + + writer, err := NewFileWriter(f.Name(), 0) + require.NoError(t, err, "must not error when creating the file writer") + assert.NoError(t, writer.Shutdown(), "must not error when calling Shutdown()") +} + +func TestFileWriterConcurrentSafe(t *testing.T) { + f := tempFile(t) + + writer, err := NewFileWriter(f.Name(), 0) + require.NoError(t, err, "must not error when creating the file writer") + + const goroutines = 10 + + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + runs := new(uint64) + for i := 0; i < goroutines; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + _ = writer.Export([]byte(fmt.Sprintf("data from goroutine %d", i))) + _ = writer.Flush() + atomic.AddUint64(runs, 1) + } + } + }() + } + + for atomic.LoadUint64(runs) == 0 { + runtime.Gosched() + } + + assert.NoError(t, writer.Shutdown(), "must not error when shutting down") + cancel() + wg.Wait() +} diff --git a/exporters/otlp/otlplog/otlplogfile/version.go b/exporters/otlp/otlplog/otlplogfile/version.go new file mode 100644 index 00000000000..aa0c05a2912 --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/version.go @@ -0,0 +1,9 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile" + +// Version is the current release version of the OpenTelemetry OTLP file logs exporter in use. +func Version() string { + return "0.1.0" +} diff --git a/exporters/otlp/otlplog/otlplogfile/version_test.go b/exporters/otlp/otlplog/otlplogfile/version_test.go new file mode 100644 index 00000000000..dca31c9a0af --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/version_test.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlplogfile + +import ( + "regexp" + "testing" + + "github.com/stretchr/testify/assert" +) + +// regex taken from https://github.com/Masterminds/semver/tree/v3.1.1 +var versionRegex = regexp.MustCompile(`^v?([0-9]+)(\.[0-9]+)?(\.[0-9]+)?` + + `(-([0-9A-Za-z\-]+(\.[0-9A-Za-z\-]+)*))?` + + `(\+([0-9A-Za-z\-]+(\.[0-9A-Za-z\-]+)*))?$`) + +func TestVersionSemver(t *testing.T) { + v := Version() + assert.NotNil(t, versionRegex.FindStringSubmatch(v), "version is not semver: %s", v) +} diff --git a/versions.yaml b/versions.yaml index 3ba611d7136..5658bad7f16 100644 --- a/versions.yaml +++ b/versions.yaml @@ -38,6 +38,7 @@ module-sets: modules: - go.opentelemetry.io/otel/log - go.opentelemetry.io/otel/sdk/log + - go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile - go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc - go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp - go.opentelemetry.io/otel/exporters/stdout/stdoutlog From 7dde8dc27aeccbf5476ebab95f4397945a7c88dc Mon Sep 17 00:00:00 2001 From: thomasgouveia Date: Tue, 27 Aug 2024 14:15:59 +0200 Subject: [PATCH 2/7] docs(#5408): add example file Signed-off-by: thomasgouveia --- .../otlp/otlplog/otlplogfile/example_test.go | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 exporters/otlp/otlplog/otlplogfile/example_test.go diff --git a/exporters/otlp/otlplog/otlplogfile/example_test.go b/exporters/otlp/otlplog/otlplogfile/example_test.go new file mode 100644 index 00000000000..a46b9fc1e2a --- /dev/null +++ b/exporters/otlp/otlplog/otlplogfile/example_test.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlplogfile_test + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile" + "go.opentelemetry.io/otel/log/global" + "go.opentelemetry.io/otel/sdk/log" +) + +func Example() { + ctx := context.Background() + exp, err := otlplogfile.New( + otlplogfile.WithPath("/tmp/otlp-logs.jsonl"), + otlplogfile.WithFlushInterval(time.Second), + ) + if err != nil { + panic(err) + } + + processor := log.NewBatchProcessor(exp) + provider := log.NewLoggerProvider(log.WithProcessor(processor)) + defer func() { + if err := provider.Shutdown(ctx); err != nil { + panic(err) + } + }() + + global.SetLoggerProvider(provider) + + // From here, the provider can be used by instrumentation to collect + // telemetry. +} From 0f3e7c86bdbb6168549d06f25151db39d2da6205 Mon Sep 17 00:00:00 2001 From: thomasgouveia Date: Tue, 27 Aug 2024 14:19:45 +0200 Subject: [PATCH 3/7] docs(#5408): update CHANGELOG Signed-off-by: thomasgouveia --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2591358863c..2a67bcbf05c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- Add `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile ` experimental logs exporter. (#5408) + ### Removed - Drop support for [Go 1.21]. (#5736, #5740) From 37c4f5e0869c0bdb0ed191c310473ca087045642 Mon Sep 17 00:00:00 2001 From: thomasgouveia Date: Tue, 27 Aug 2024 14:31:15 +0200 Subject: [PATCH 4/7] docs(#5408): update CHANGELOG Signed-off-by: thomasgouveia --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a67bcbf05c..967adedc682 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added -- Add `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile ` experimental logs exporter. (#5408) +- Add `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile ` experimental logs exporter. (#5408, #5743) ### Removed From cc10281ea650c79d39bb4e44cab06c4cf5fdd801 Mon Sep 17 00:00:00 2001 From: Thomas Gouveia <47056759+thomasgouveia@users.noreply.github.com> Date: Tue, 3 Sep 2024 09:08:41 +0200 Subject: [PATCH 5/7] chore: apply suggestions Co-authored-by: Damien Mathieu <42@dmathieu.com> --- CHANGELOG.md | 2 +- exporters/otlp/otlplog/otlplogfile/exporter.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 967adedc682..f1a205f5505 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added -- Add `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile ` experimental logs exporter. (#5408, #5743) +- Add `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile ` experimental logs exporter. (#5743) ### Removed diff --git a/exporters/otlp/otlplog/otlplogfile/exporter.go b/exporters/otlp/otlplog/otlplogfile/exporter.go index 776f3893506..1fed76a4920 100644 --- a/exporters/otlp/otlplog/otlplogfile/exporter.go +++ b/exporters/otlp/otlplog/otlplogfile/exporter.go @@ -16,8 +16,8 @@ import ( ) // Exporter is an OpenTelemetry log exporter that outputs log records -// into JSON files. The implementation is based on the specification -// defined here: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md +// into files, as JSON. The implementation is based on the specification +// defined here: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.36.0/specification/protocol/file-exporter.md type Exporter struct { mu sync.Mutex fw *writer.FileWriter From fc7212eddd4d50a0509087447b85830fb68b8fe2 Mon Sep 17 00:00:00 2001 From: Thomas Gouveia <47056759+thomasgouveia@users.noreply.github.com> Date: Tue, 3 Sep 2024 09:09:06 +0200 Subject: [PATCH 6/7] chore: apply suggestions Co-authored-by: Damien Mathieu <42@dmathieu.com> --- exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go index b9ac2cf5ead..dc44eb54c86 100644 --- a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go +++ b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go @@ -55,7 +55,7 @@ func (w *FileWriter) Export(data []byte) error { } // As stated in the specification, line separator is \n. - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md#json-lines-file + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.36.0/specification/protocol/file-exporter.md#json-lines-file if _, err := io.WriteString(w.file, "\n"); err != nil { return err } From 7244adccf204f8d3e6e30dba1109d74b0b6a0efa Mon Sep 17 00:00:00 2001 From: thomasgouveia Date: Fri, 6 Sep 2024 14:32:15 +0200 Subject: [PATCH 7/7] feat(#5408): use stdout as default output Signed-off-by: thomasgouveia --- exporters/otlp/otlplog/otlplogfile/config.go | 83 ++++++++++++------- .../otlp/otlplog/otlplogfile/example_test.go | 2 +- .../otlp/otlplog/otlplogfile/exporter.go | 17 ++-- .../otlp/otlplog/otlplogfile/exporter_test.go | 12 +-- .../otlplogfile/internal/writer/writer.go | 47 +++++------ .../internal/writer/writer_test.go | 9 +- 6 files changed, 91 insertions(+), 79 deletions(-) diff --git a/exporters/otlp/otlplog/otlplogfile/config.go b/exporters/otlp/otlplog/otlplogfile/config.go index 5b0cf3507ab..d765601f8af 100644 --- a/exporters/otlp/otlplog/otlplogfile/config.go +++ b/exporters/otlp/otlplog/otlplogfile/config.go @@ -3,52 +3,71 @@ package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile" -import "time" +import ( + "errors" + "fmt" + "io" + "os" + "time" +) -type fnOpt func(config) config - -func (f fnOpt) applyOption(c config) config { return f(c) } - -// Option sets the configuration value for an Exporter. -type Option interface { - applyOption(config) config -} +// Option configures a field of the configuration or return an error if needed. +type Option func(*config) (*config, error) // config contains options for the OTLP Log file exporter. type config struct { - // Path to a file on disk where records must be appended. - // This file is preferably a json line file as stated in the specification. - // See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md#json-lines-file - // See: https://jsonlines.org - path string + // Out is the output where the records should be written. + out io.WriteCloser // Duration represents the interval when the buffer should be flushed. flushInterval time.Duration } -func newConfig(options []Option) config { - c := config{ - path: "/var/log/opentelemetry/logs.jsonl", +func newConfig(options []Option) (*config, error) { + c := &config{ + out: os.Stdout, flushInterval: 5 * time.Second, } + + var configErr error for _, opt := range options { - c = opt.applyOption(c) + if _, err := opt(c); err != nil { + configErr = errors.Join(configErr, err) + } } - return c + + if configErr != nil { + return nil, configErr + } + + return c, nil } -// WithFlushInterval configures the duration after which the buffer is periodically flushed to the disk. -func WithFlushInterval(flushInterval time.Duration) Option { - return fnOpt(func(c config) config { - c.flushInterval = flushInterval - return c - }) +// WithFile configures a file where the records will be exported. +// An error is returned if the file could not be created or opened. +func WithFile(path string) Option { + return func(c *config) (*config, error) { + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + + return WithWriter(file)(c) + } +} + +// WithWriter configures the destination where the exporter should output +// the records. By default, if not specified, stdout is used. +func WithWriter(w io.WriteCloser) Option { + return func(c *config) (*config, error) { + c.out = w + return c, nil + } } -// WithPath defines a path to a file where the log records will be written. -// If not set, will default to /var/log/opentelemetry/logs.jsonl. -func WithPath(path string) Option { - return fnOpt(func(c config) config { - c.path = path - return c - }) +// WithFlushInterval configures the duration after which the buffer is periodically flushed to the output. +func WithFlushInterval(flushInterval time.Duration) Option { + return func(c *config) (*config, error) { + c.flushInterval = flushInterval + return c, nil + } } diff --git a/exporters/otlp/otlplog/otlplogfile/example_test.go b/exporters/otlp/otlplog/otlplogfile/example_test.go index a46b9fc1e2a..a6a2fda925b 100644 --- a/exporters/otlp/otlplog/otlplogfile/example_test.go +++ b/exporters/otlp/otlplog/otlplogfile/example_test.go @@ -15,7 +15,7 @@ import ( func Example() { ctx := context.Background() exp, err := otlplogfile.New( - otlplogfile.WithPath("/tmp/otlp-logs.jsonl"), + otlplogfile.WithFile("/tmp/otlp-logs.jsonl"), otlplogfile.WithFlushInterval(time.Second), ) if err != nil { diff --git a/exporters/otlp/otlplog/otlplogfile/exporter.go b/exporters/otlp/otlplog/otlplogfile/exporter.go index 1fed76a4920..12c24086bac 100644 --- a/exporters/otlp/otlplog/otlplogfile/exporter.go +++ b/exporters/otlp/otlplog/otlplogfile/exporter.go @@ -20,7 +20,7 @@ import ( // defined here: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.36.0/specification/protocol/file-exporter.md type Exporter struct { mu sync.Mutex - fw *writer.FileWriter + w *writer.Writer stopped bool } @@ -29,15 +29,18 @@ var _ log.Exporter = &Exporter{} // New returns a new [Exporter]. func New(options ...Option) (*Exporter, error) { - cfg := newConfig(options) + cfg, err := newConfig(options) + if err != nil { + return nil, err + } - fw, err := writer.NewFileWriter(cfg.path, cfg.flushInterval) + w, err := writer.New(cfg.out, cfg.flushInterval) if err != nil { return nil, err } return &Exporter{ - fw: fw, + w: w, stopped: false, }, nil } @@ -65,7 +68,7 @@ func (e *Exporter) Export(ctx context.Context, records []log.Record) error { return err } - return e.fw.Export(by) + return e.w.Export(by) } // ForceFlush flushes data to the file. @@ -77,7 +80,7 @@ func (e *Exporter) ForceFlush(_ context.Context) error { return nil } - return e.fw.Flush() + return e.w.Flush() } // Shutdown shuts down the exporter. Buffered data is written to disk, @@ -91,5 +94,5 @@ func (e *Exporter) Shutdown(_ context.Context) error { } e.stopped = true - return e.fw.Shutdown() + return e.w.Shutdown() } diff --git a/exporters/otlp/otlplog/otlplogfile/exporter_test.go b/exporters/otlp/otlplog/otlplogfile/exporter_test.go index 7102761d4e7..4d1da14ac75 100644 --- a/exporters/otlp/otlplog/otlplogfile/exporter_test.go +++ b/exporters/otlp/otlplog/otlplogfile/exporter_test.go @@ -23,13 +23,13 @@ import ( // tempFile creates a temporary file for the given test case and returns its path on disk. // The file is automatically cleaned up when the test ends. -func tempFile(tb testing.TB) string { +func tempFile(tb testing.TB) *os.File { f, err := os.CreateTemp(tb.TempDir(), tb.Name()) assert.NoError(tb, err, "must not error when creating temporary file") tb.Cleanup(func() { assert.NoError(tb, os.RemoveAll(path.Dir(f.Name())), "must clean up files after being written") }) - return f.Name() + return f } // makeRecords is a helper function to generate an array of log record with the desired size. @@ -48,10 +48,10 @@ func makeRecords(count int, message string) []sdklog.Record { } func TestExporter(t *testing.T) { - filepath := tempFile(t) + file := tempFile(t) records := makeRecords(1, "hello, world!") - exporter, err := New(WithPath(filepath)) + exporter, err := New(WithWriter(file)) assert.NoError(t, err) t.Cleanup(func() { assert.NoError(t, exporter.Shutdown(context.TODO())) @@ -64,8 +64,8 @@ func TestExporter(t *testing.T) { } func TestExporterConcurrentSafe(t *testing.T) { - filepath := tempFile(t) - exporter, err := New(WithPath(filepath)) + file := tempFile(t) + exporter, err := New(WithWriter(file)) require.NoError(t, err, "New()") const goroutines = 10 diff --git a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go index dc44eb54c86..ad77ffa1287 100644 --- a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go +++ b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go @@ -4,38 +4,29 @@ package writer // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/writer" import ( - "fmt" "io" - "os" "sync" "time" ) -// FileWriter writes data to a configured file. +// Writer writes data to the configured io.WriteCloser. // It is buffered to reduce I/O operations to improve performance. -type FileWriter struct { - path string - file io.WriteCloser - mu sync.Mutex +type Writer struct { + out io.WriteCloser + mu sync.Mutex flushInterval time.Duration flushTicker *time.Ticker stopTicker chan struct{} } -var _ flusher = (*FileWriter)(nil) +var _ flusher = (*Writer)(nil) -// NewFileWriter initializes a file writer for the file at the given path. -func NewFileWriter(path string, flushInterval time.Duration) (*FileWriter, error) { - file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644) - if err != nil { - return nil, fmt.Errorf("failed to open file: %w", err) - } - - fw := &FileWriter{ - path: path, +// New initializes a writer for the given io.WriteCloser. +func New(w io.WriteCloser, flushInterval time.Duration) (*Writer, error) { + fw := &Writer{ flushInterval: flushInterval, - file: newBufferedWriteCloser(file), + out: newBufferedWriteCloser(w), } if fw.flushInterval > 0 { @@ -46,17 +37,17 @@ func NewFileWriter(path string, flushInterval time.Duration) (*FileWriter, error } // Export writes the given data in the file. -func (w *FileWriter) Export(data []byte) error { +func (w *Writer) Export(data []byte) error { w.mu.Lock() defer w.mu.Unlock() - if _, err := w.file.Write(data); err != nil { + if _, err := w.out.Write(data); err != nil { return err } // As stated in the specification, line separator is \n. - // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.36.0/specification/protocol/file-exporter.md#json-lines-file - if _, err := io.WriteString(w.file, "\n"); err != nil { + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md#json-lines-file + if _, err := io.WriteString(w.out, "\n"); err != nil { return err } @@ -64,19 +55,19 @@ func (w *FileWriter) Export(data []byte) error { } // Shutdown stops the flusher. It also stops the flush ticker if set. -func (w *FileWriter) Shutdown() error { +func (w *Writer) Shutdown() error { w.mu.Lock() defer w.mu.Unlock() if w.flushTicker != nil { close(w.stopTicker) } - return w.file.Close() + return w.out.Close() } // Flush writes buffered data to disk. -func (w *FileWriter) Flush() error { - ff, ok := w.file.(flusher) +func (w *Writer) Flush() error { + ff, ok := w.out.(flusher) if !ok { return nil } @@ -88,11 +79,11 @@ func (w *FileWriter) Flush() error { } // startFlusher starts the flusher to periodically flush the buffer. -func (w *FileWriter) startFlusher() { +func (w *Writer) startFlusher() { w.mu.Lock() defer w.mu.Unlock() - ff, ok := w.file.(flusher) + ff, ok := w.out.(flusher) if !ok { return } diff --git a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go index e9374997872..1c450f86bb1 100644 --- a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go +++ b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go @@ -31,12 +31,11 @@ func tempFile(tb testing.TB) *os.File { func TestNewFileWriter(t *testing.T) { f := tempFile(t) - writer, err := NewFileWriter(f.Name(), 0) + writer, err := New(f, 0) // nolint: errcheck defer writer.Shutdown() assert.NoError(t, err, "must not error when creating the file writer") - assert.Equal(t, f.Name(), writer.path, "writer file path must be the same than the file path") // Ensure file was created _, err = os.Stat(f.Name()) @@ -46,7 +45,7 @@ func TestNewFileWriter(t *testing.T) { func TestFileWriterExport(t *testing.T) { f := tempFile(t) - writer, err := NewFileWriter(f.Name(), 0) + writer, err := New(f, 0) // nolint: errcheck defer writer.Shutdown() require.NoError(t, err, "must not error when creating the file writer") @@ -66,7 +65,7 @@ func TestFileWriterExport(t *testing.T) { func TestFileWriterShutdown(t *testing.T) { f := tempFile(t) - writer, err := NewFileWriter(f.Name(), 0) + writer, err := New(f, 0) require.NoError(t, err, "must not error when creating the file writer") assert.NoError(t, writer.Shutdown(), "must not error when calling Shutdown()") } @@ -74,7 +73,7 @@ func TestFileWriterShutdown(t *testing.T) { func TestFileWriterConcurrentSafe(t *testing.T) { f := tempFile(t) - writer, err := NewFileWriter(f.Name(), 0) + writer, err := New(f, 0) require.NoError(t, err, "must not error when creating the file writer") const goroutines = 10