diff --git a/.chloggen/9480-configgrpc-option-wrapper.yaml b/.chloggen/9480-configgrpc-option-wrapper.yaml new file mode 100644 index 00000000000..22e14f870e3 --- /dev/null +++ b/.chloggen/9480-configgrpc-option-wrapper.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'deprecation' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: configgrpc + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Deprecate `ClientConfig.ToClientConn`/`ServerConfig.ToServer` in favor of `ToClientConnWithOptions`/`ToServerWithOptions`" + +# One or more tracking issues or pull requests related to the change +issues: [9480] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Users providing a grpc.DialOption/grpc.ServerOption should now wrap them into + a generic option with `WithGrpcDialOption`/`WithGrpcServerOption`. + + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/add-pipeline-module.yaml b/.chloggen/add-pipeline-module.yaml new file mode 100644 index 00000000000..60819a20673 --- /dev/null +++ b/.chloggen/add-pipeline-module.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pipeline + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds new `pipeline` module to house the concept of pipeline ID and Signal. + +# One or more tracking issues or pull requests related to the change +issues: [11209] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/consumertest-profiles-samplecount.yaml b/.chloggen/consumertest-profiles-samplecount.yaml new file mode 100644 index 00000000000..6101b539b06 --- /dev/null +++ b/.chloggen/consumertest-profiles-samplecount.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: consumertest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce SampleCount method in ProfilesSink struct. + +# One or more tracking issues or pull requests related to the change +issues: [11225] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/use-cobra-add-version.yaml b/.chloggen/use-cobra-add-version.yaml new file mode 100644 index 00000000000..1d5eddf6b89 --- /dev/null +++ b/.chloggen/use-cobra-add-version.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: mdatagen + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use cobra for the command, add version flag + +# One or more tracking issues or pull requests related to the change +issues: [11196] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/Makefile b/Makefile index b1c821ac21e..f69d242129d 100644 --- a/Makefile +++ b/Makefile @@ -304,11 +304,13 @@ check-contrib: -replace go.opentelemetry.io/collector/extension/zpagesextension=$(CURDIR)/extension/zpagesextension \ -replace go.opentelemetry.io/collector/featuregate=$(CURDIR)/featuregate \ -replace go.opentelemetry.io/collector/internal/globalgates=$(CURDIR)/internal/globalgates \ + -replace go.opentelemetry.io/collector/internal/globalsignal=$(CURDIR)/internal/globalsignal \ -replace go.opentelemetry.io/collector/otelcol=$(CURDIR)/otelcol \ -replace go.opentelemetry.io/collector/otelcol/otelcoltest=$(CURDIR)/otelcol/otelcoltest \ -replace go.opentelemetry.io/collector/pdata=$(CURDIR)/pdata \ -replace go.opentelemetry.io/collector/pdata/testdata=$(CURDIR)/pdata/testdata \ -replace go.opentelemetry.io/collector/pdata/pprofile=$(CURDIR)/pdata/pprofile \ + -replace go.opentelemetry.io/collector/pipeline=$(CURDIR)/pipeline \ -replace go.opentelemetry.io/collector/processor=$(CURDIR)/processor \ -replace go.opentelemetry.io/collector/processor/batchprocessor=$(CURDIR)/processor/batchprocessor \ -replace go.opentelemetry.io/collector/processor/memorylimiterprocessor=$(CURDIR)/processor/memorylimiterprocessor \ @@ -371,11 +373,13 @@ restore-contrib: -dropreplace go.opentelemetry.io/collector/extension/zpagesextension \ -dropreplace go.opentelemetry.io/collector/featuregate \ -dropreplace go.opentelemetry.io/collector/internal/globalgates \ + -dropreplace go.opentelemetry.io/collector/internal/globalsignal \ -dropreplace go.opentelemetry.io/collector/otelcol \ -dropreplace go.opentelemetry.io/collector/otelcol/otelcoltest \ -dropreplace go.opentelemetry.io/collector/pdata \ -dropreplace go.opentelemetry.io/collector/pdata/testdata \ -dropreplace go.opentelemetry.io/collector/pdata/pprofile \ + -dropreplace go.opentelemetry.io/collector/pipeline \ -dropreplace go.opentelemetry.io/collector/processor \ -dropreplace go.opentelemetry.io/collector/processor/batchprocessor \ -dropreplace go.opentelemetry.io/collector/processor/memorylimiterprocessor \ diff --git a/VERSIONING.md b/VERSIONING.md index 8da5ba3ba23..68d69ee23fe 100644 --- a/VERSIONING.md +++ b/VERSIONING.md @@ -1,13 +1,42 @@ -# Versioning +# Versioning and stability -This document describes the versioning policy for this repository. This policy -is designed so that the following goal can be achieved: +The OpenTelemetry Collector SIG produces several artifacts for [a variety of audiences](CONTRIBUTING.md#target-audiences). This document describes the versioning and support policy for these artifacts. These policies are designed so that the following goal can be achieved: -**Users are provided a codebase of value that is stable and secure.** +**Users are provided software artifacts of value that are stable and secure.** -## Public API expectations +The policies are divided depending on the artifact's target audience. While an artifact is supported, [critical bugs](docs/release.md#bugfix-release-criteria) and security vulnerabilities MUST be addressed. The main criteria for the length of support for an artifact is how easy it is for an artifact's target audience to adapt to disruptive changes. -The following public API expectations apply to all modules in opentelemetry-collector and opentelemetry-collector-contrib. +These policies reflect the current consensus of the OpenTelemetry Collector SIG. They are subject to change as the project evolves. + +## Software artifacts for end users + +Software artifacts intended for [end users](CONTRIBUTING.md#end-users) of the OpenTelemetry Collector include +- Binary distributions of the OpenTelemetry Collector. +- Go modules that expose Collector components, such as receivers, processors, connectors, extensions and exporters. + +These artifacts are versioned according to the [semantic versioning v2.0.0](https://semver.org/) specification. + +### General considerations + +Binary distributions produced by the Collector SIG contain components and features with varying [levels of stability](README.md#stability-levels). We abide by the following principles to relate the Collector's version to the stability of its components and features: + +* The Collector's core framework behavior MUST be stable in order for a Collector distribution to be v1.0.0 or higher. +* Users can easily understand when they are opting in to use a component or feature that is not stable. + * The Collector MUST be configurable so that unstable components or features can be excluded ensuring that a fully stable configuration is possible. + * The Collector's telemetry (e.g. Collector logs) MUST provide the ability to identify usage of unstable components or features. + +### Long-term support after v1 + +The OpenTelemetry Collector SIG provides long-term support for stable binary distributions of the OpenTelemetry Collector and its components. The following policies apply to long-term support for any major version starting on v1: + +* A binary distribution of the OpenTelemetry Collector MUST be supported for a minimum of **one year** after the release of the next major version of said distribution. +* Components MUST be supported for a minimum of **6 months** after the release of the next major version of said component or after the component has been marked as deprecated. If a component has been deprecated for 6 months it MAY be removed from a binary distribution of the OpenTelemetry Collector. This does not imply a major version change in the Collector distribution. + +## Go modules + +Go modules are intended to be used by [component developers](CONTRIBUTING.md#component-developers) and [Collector library users](CONTRIBUTING.md#collector-library-users) of the OpenTelemetry Collector + +Unless otherwise specified, the following public API expectations apply to all modules in opentelemetry-collector and opentelemetry-collector-contrib. As a general rule, stability guarantees of modules versioned as `v1` or higher are aligned with [Go 1 compatibility promise](https://go.dev/doc/go1compat). ### General Go API considerations @@ -54,7 +83,7 @@ structure. must continue to be valid after a change to the validation rules, except when the configuration struct would cause an error on its intended usage (e.g. when calling a method or when passed to any method or function in any module under opentelemetry-collector). -## Versioning and module schema +### Module versioning and schema * Versioning of this project will be idiomatic of a Go project using [Go modules](https://golang.org/ref/mod#versions). @@ -135,3 +164,10 @@ on its intended usage (e.g. when calling a method or when passed to any method o * Contrib modules will be kept up to date with this project's releases. * GitHub releases will be made for all releases. * Go modules will be made available at Go package mirrors. + +### Long-term support after v1 + +The OpenTelemetry Collector SIG provides long-term support for stable Go modules. Support for modules depend on the module's [target audiences](CONTRIBUTING.md#target-audiences). The following policies apply to long-term support for any major version starting on v1: + +- Modules intended for **component developers** MUST be supported for a minimum of **1 year** after the release of the next major version of said module or after the module has been marked as deprecated. +- Modules intended for **Collector library users** MUST be supported for a minimum of **6 months** after the release of the next major version of said module or after the module has been marked as deprecated. diff --git a/cmd/mdatagen/README.md b/cmd/mdatagen/README.md index 075a1fa9674..5eb7586596e 100644 --- a/cmd/mdatagen/README.md +++ b/cmd/mdatagen/README.md @@ -57,7 +57,7 @@ You can run `cd cmd/mdatagen && $(GOCMD) install .` to install the `mdatagen` to ## Contributing to the Metadata Generator -The code for generating the documentation can be found in [loader.go](./internal/loader.go) and the templates for rendering the documentation can be found in [templates](internal/templates). +The code for generating the documentation can be found in [loader.go](./internal/loader.go) and the templates for rendering the documentation can be found in [templates](./internal/templates). When making updates to the metadata generator or introducing support for new functionality: 1. Ensure the [metadata-schema.yaml](./metadata-schema.yaml) and [metadata.yaml](./metadata.yaml) files reflect the changes. diff --git a/cmd/mdatagen/go.mod b/cmd/mdatagen/go.mod index dd643f8c13d..300b5c6a8d6 100644 --- a/cmd/mdatagen/go.mod +++ b/cmd/mdatagen/go.mod @@ -4,6 +4,7 @@ go 1.22.0 require ( github.com/google/go-cmp v0.6.0 + github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.109.0 go.opentelemetry.io/collector/config/configtelemetry v0.109.0 @@ -30,6 +31,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect @@ -39,6 +41,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.109.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.109.0 // indirect diff --git a/cmd/mdatagen/go.sum b/cmd/mdatagen/go.sum index f73b8235de3..7dddf67f2b6 100644 --- a/cmd/mdatagen/go.sum +++ b/cmd/mdatagen/go.sum @@ -1,3 +1,4 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -15,6 +16,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -42,6 +45,11 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= +github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= diff --git a/cmd/mdatagen/internal/command.go b/cmd/mdatagen/internal/command.go new file mode 100644 index 00000000000..fbd4088a34f --- /dev/null +++ b/cmd/mdatagen/internal/command.go @@ -0,0 +1,424 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/cmd/mdatagen/internal" + +import ( + "bytes" + "errors" + "fmt" + "go/format" + "os" + "path/filepath" + "regexp" + "runtime/debug" + "strings" + "text/template" + + "github.com/spf13/cobra" + "golang.org/x/text/cases" + "golang.org/x/text/language" +) + +const ( + statusStart = "" + statusEnd = "" +) + +func getVersion() (string, error) { + // the second returned value is a boolean, which is true if the binaries are built with module support. + info, ok := debug.ReadBuildInfo() + if !ok { + return "", errors.New("could not read build info") + } + return info.Main.Version, nil +} + +// NewCommand constructs a new cobra.Command using the given Settings. +// Any URIs specified in CollectorSettings.ConfigProviderSettings.ResolverSettings.URIs +// are considered defaults and will be overwritten by config flags passed as +// command-line arguments to the executable. +// At least one Provider must be set. +func NewCommand() (*cobra.Command, error) { + ver, err := getVersion() + if err != nil { + return nil, err + } + rootCmd := &cobra.Command{ + Use: "mdatagen", + Version: ver, + SilenceUsage: true, + RunE: func(_ *cobra.Command, args []string) error { + return run(args[0]) + }, + } + return rootCmd, nil +} + +func run(ymlPath string) error { + if ymlPath == "" { + return errors.New("argument must be metadata.yaml file") + } + ymlPath, err := filepath.Abs(ymlPath) + if err != nil { + return fmt.Errorf("failed to get absolute path for %v: %w", ymlPath, err) + } + + ymlDir := filepath.Dir(ymlPath) + packageName := filepath.Base(ymlDir) + + md, err := LoadMetadata(ymlPath) + if err != nil { + return fmt.Errorf("failed loading %v: %w", ymlPath, err) + } + + tmplDir := "templates" + + codeDir := filepath.Join(ymlDir, "internal", "metadata") + if err = os.MkdirAll(codeDir, 0700); err != nil { + return fmt.Errorf("unable to create output directory %q: %w", codeDir, err) + } + if md.Status != nil { + if md.Status.Class != "cmd" && md.Status.Class != "pkg" && !md.Status.NotComponent { + if err = generateFile(filepath.Join(tmplDir, "status.go.tmpl"), + filepath.Join(codeDir, "generated_status.go"), md, "metadata"); err != nil { + return err + } + if err = generateFile(filepath.Join(tmplDir, "component_test.go.tmpl"), + filepath.Join(ymlDir, "generated_component_test.go"), md, packageName); err != nil { + return err + } + } + + if err = generateFile(filepath.Join(tmplDir, "package_test.go.tmpl"), + filepath.Join(ymlDir, "generated_package_test.go"), md, packageName); err != nil { + return err + } + + if _, err = os.Stat(filepath.Join(ymlDir, "README.md")); err == nil { + if err = inlineReplace( + filepath.Join(tmplDir, "readme.md.tmpl"), + filepath.Join(ymlDir, "README.md"), + md, statusStart, statusEnd); err != nil { + return err + } + } + } + + toGenerate := map[string]string{} + + if len(md.Telemetry.Metrics) != 0 { // if there are telemetry metrics, generate telemetry specific files + if err = generateFile(filepath.Join(tmplDir, "component_telemetry_test.go.tmpl"), + filepath.Join(ymlDir, "generated_component_telemetry_test.go"), md, packageName); err != nil { + return err + } + toGenerate[filepath.Join(tmplDir, "telemetry.go.tmpl")] = filepath.Join(codeDir, "generated_telemetry.go") + toGenerate[filepath.Join(tmplDir, "telemetry_test.go.tmpl")] = filepath.Join(codeDir, "generated_telemetry_test.go") + } + + if len(md.Metrics) != 0 || len(md.Telemetry.Metrics) != 0 { // if there's metrics or internal metrics, generate documentation for them + toGenerate[filepath.Join(tmplDir, "documentation.md.tmpl")] = filepath.Join(ymlDir, "documentation.md") + } + + for tmpl, dst := range toGenerate { + if err = generateFile(tmpl, dst, md, "metadata"); err != nil { + return err + } + } + + if len(md.Metrics) == 0 && len(md.ResourceAttributes) == 0 { + return nil + } + + if err = os.MkdirAll(filepath.Join(codeDir, "testdata"), 0700); err != nil { + return fmt.Errorf("unable to create output directory %q: %w", filepath.Join(codeDir, "testdata"), err) + } + + toGenerate = map[string]string{ + filepath.Join(tmplDir, "testdata", "config.yaml.tmpl"): filepath.Join(codeDir, "testdata", "config.yaml"), + filepath.Join(tmplDir, "config.go.tmpl"): filepath.Join(codeDir, "generated_config.go"), + filepath.Join(tmplDir, "config_test.go.tmpl"): filepath.Join(codeDir, "generated_config_test.go"), + } + + if len(md.ResourceAttributes) > 0 { // only generate resource files if resource attributes are configured + toGenerate[filepath.Join(tmplDir, "resource.go.tmpl")] = filepath.Join(codeDir, "generated_resource.go") + toGenerate[filepath.Join(tmplDir, "resource_test.go.tmpl")] = filepath.Join(codeDir, "generated_resource_test.go") + } + + if len(md.Metrics) > 0 { // only generate metrics if metrics are present + toGenerate[filepath.Join(tmplDir, "metrics.go.tmpl")] = filepath.Join(codeDir, "generated_metrics.go") + toGenerate[filepath.Join(tmplDir, "metrics_test.go.tmpl")] = filepath.Join(codeDir, "generated_metrics_test.go") + } + + for tmpl, dst := range toGenerate { + if err = generateFile(tmpl, dst, md, "metadata"); err != nil { + return err + } + } + + return nil +} + +func templatize(tmplFile string, md Metadata) *template.Template { + return template.Must( + template. + New(filepath.Base(tmplFile)). + Option("missingkey=error"). + Funcs(map[string]any{ + "publicVar": func(s string) (string, error) { + return FormatIdentifier(s, true) + }, + "attributeInfo": func(an AttributeName) Attribute { + return md.Attributes[an] + }, + "metricInfo": func(mn MetricName) Metric { + return md.Metrics[mn] + }, + "telemetryInfo": func(mn MetricName) Metric { + return md.Telemetry.Metrics[mn] + }, + "parseImportsRequired": func(metrics map[MetricName]Metric) bool { + for _, m := range metrics { + if m.Data().HasMetricInputType() { + return true + } + } + return false + }, + "stringsJoin": strings.Join, + "stringsSplit": strings.Split, + "userLinks": func(elems []string) []string { + result := make([]string, len(elems)) + for i, elem := range elems { + if elem == "open-telemetry/collector-approvers" { + result[i] = "[@open-telemetry/collector-approvers](https://github.com/orgs/open-telemetry/teams/collector-approvers)" + } else { + result[i] = fmt.Sprintf("[@%s](https://www.github.com/%s)", elem, elem) + } + } + return result + }, + "casesTitle": cases.Title(language.English).String, + "toLowerCase": strings.ToLower, + "toCamelCase": func(s string) string { + caser := cases.Title(language.English).String + parts := strings.Split(s, "_") + result := "" + for _, part := range parts { + result += caser(part) + } + return result + }, + "inc": func(i int) int { return i + 1 }, + "distroURL": func(name string) string { + return Distros[name] + }, + "isExporter": func() bool { + return md.Status.Class == "exporter" + }, + "isProcessor": func() bool { + return md.Status.Class == "processor" + }, + "isReceiver": func() bool { + return md.Status.Class == "receiver" + }, + "isExtension": func() bool { + return md.Status.Class == "extension" + }, + "isConnector": func() bool { + return md.Status.Class == "connector" + }, + "isCommand": func() bool { + return md.Status.Class == "cmd" + }, + "supportsLogs": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "logs" { + return true + } + } + } + return false + }, + "supportsMetrics": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "metrics" { + return true + } + } + } + return false + }, + "supportsTraces": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "traces" { + return true + } + } + } + return false + }, + "supportsLogsToLogs": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "logs_to_logs" { + return true + } + } + } + return false + }, + "supportsLogsToMetrics": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "logs_to_metrics" { + return true + } + } + } + return false + }, + "supportsLogsToTraces": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "logs_to_traces" { + return true + } + } + } + return false + }, + "supportsMetricsToLogs": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "metrics_to_logs" { + return true + } + } + } + return false + }, + "supportsMetricsToMetrics": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "metrics_to_metrics" { + return true + } + } + } + return false + }, + "supportsMetricsToTraces": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "metrics_to_traces" { + return true + } + } + } + return false + }, + "supportsTracesToLogs": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "traces_to_logs" { + return true + } + } + } + return false + }, + "supportsTracesToMetrics": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "traces_to_metrics" { + return true + } + } + } + return false + }, + "supportsTracesToTraces": func() bool { + for _, signals := range md.Status.Stability { + for _, s := range signals { + if s == "traces_to_traces" { + return true + } + } + } + return false + }, + "expectConsumerError": func() bool { + return md.Tests.ExpectConsumerError + }, + // ParseFS delegates the parsing of the files to `Glob` + // which uses the `\` as a special character. + // Meaning on windows based machines, the `\` needs to be replaced + // with a `/` for it to find the file. + }).ParseFS(TemplateFS, strings.ReplaceAll(tmplFile, "\\", "/"))) +} + +func inlineReplace(tmplFile string, outputFile string, md Metadata, start string, end string) error { + var readmeContents []byte + var err error + if readmeContents, err = os.ReadFile(outputFile); err != nil { // nolint: gosec + return err + } + + var re = regexp.MustCompile(fmt.Sprintf("%s[\\s\\S]*%s", start, end)) + if !re.Match(readmeContents) { + return nil + } + + tmpl := templatize(tmplFile, md) + buf := bytes.Buffer{} + + if md.GithubProject == "" { + md.GithubProject = "open-telemetry/opentelemetry-collector-contrib" + } + + if err := tmpl.Execute(&buf, TemplateContext{Metadata: md, Package: "metadata"}); err != nil { + return fmt.Errorf("failed executing template: %w", err) + } + + result := buf.String() + + s := re.ReplaceAllString(string(readmeContents), result) + if err := os.WriteFile(outputFile, []byte(s), 0600); err != nil { + return fmt.Errorf("failed writing %q: %w", outputFile, err) + } + + return nil +} + +func generateFile(tmplFile string, outputFile string, md Metadata, goPackage string) error { + tmpl := templatize(tmplFile, md) + buf := bytes.Buffer{} + + if err := tmpl.Execute(&buf, TemplateContext{Metadata: md, Package: goPackage}); err != nil { + return fmt.Errorf("failed executing template: %w", err) + } + + if err := os.Remove(outputFile); err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("unable to remove genererated file %q: %w", outputFile, err) + } + + result := buf.Bytes() + var formatErr error + if strings.HasSuffix(outputFile, ".go") { + if formatted, err := format.Source(buf.Bytes()); err == nil { + result = formatted + } else { + formatErr = fmt.Errorf("failed formatting %s:%w", outputFile, err) + } + } + + if err := os.WriteFile(outputFile, result, 0600); err != nil { + return fmt.Errorf("failed writing %q: %w", outputFile, err) + } + + return formatErr +} diff --git a/cmd/mdatagen/main_test.go b/cmd/mdatagen/internal/command_test.go similarity index 95% rename from cmd/mdatagen/main_test.go rename to cmd/mdatagen/internal/command_test.go index 707f9bc45e1..b1d82b55f58 100644 --- a/cmd/mdatagen/main_test.go +++ b/cmd/mdatagen/internal/command_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package main +package internal import ( "bytes" @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/cmd/mdatagen/internal" "go.opentelemetry.io/collector/component" ) @@ -112,7 +111,7 @@ func TestRunContents(t *testing.T) { tmpdir := filepath.Join(t.TempDir(), "shortname") err := os.MkdirAll(tmpdir, 0750) require.NoError(t, err) - ymlContent, err := os.ReadFile(filepath.Join("internal/testdata", tt.yml)) + ymlContent, err := os.ReadFile(filepath.Join("testdata", tt.yml)) require.NoError(t, err) metadataFile := filepath.Join(tmpdir, "metadata.yaml") require.NoError(t, os.WriteFile(metadataFile, ymlContent, 0600)) @@ -266,7 +265,7 @@ func TestInlineReplace(t *testing.T) { warnings []string stability map[component.StabilityLevel][]string distros []string - codeowners *internal.Codeowners + codeowners *Codeowners githubProject string }{ { @@ -308,7 +307,7 @@ Some info about a component outputFile: "readme_with_status_codeowners_and_seeking_new.md", componentClass: "receiver", distros: []string{"contrib"}, - codeowners: &internal.Codeowners{ + codeowners: &Codeowners{ Active: []string{"foo"}, SeekingNew: true, }, @@ -325,7 +324,7 @@ Some info about a component outputFile: "readme_with_status_codeowners_and_emeritus.md", componentClass: "receiver", distros: []string{"contrib"}, - codeowners: &internal.Codeowners{ + codeowners: &Codeowners{ Active: []string{"foo"}, Emeritus: []string{"bar"}, }, @@ -342,7 +341,7 @@ Some info about a component outputFile: "readme_with_status_codeowners.md", componentClass: "receiver", distros: []string{"contrib"}, - codeowners: &internal.Codeowners{ + codeowners: &Codeowners{ Active: []string{"foo"}, }, }, @@ -426,11 +425,11 @@ Some info about a component if len(tt.stability) > 0 { stability = tt.stability } - md := internal.Metadata{ + md := Metadata{ GithubProject: tt.githubProject, Type: "foo", ShortFolderName: "foo", - Status: &internal.Status{ + Status: &Status{ Stability: stability, Distributions: tt.distros, Class: tt.componentClass, @@ -450,7 +449,7 @@ Some info about a component got, err := os.ReadFile(filepath.Join(tmpdir, "README.md")) // nolint: gosec require.NoError(t, err) got = bytes.ReplaceAll(got, []byte("\r\n"), []byte("\n")) - expected, err := os.ReadFile(filepath.Join("internal/testdata", tt.outputFile)) + expected, err := os.ReadFile(filepath.Join("testdata", tt.outputFile)) require.NoError(t, err) expected = bytes.ReplaceAll(expected, []byte("\r\n"), []byte("\n")) fmt.Println(string(got)) @@ -464,14 +463,14 @@ func TestGenerateStatusMetadata(t *testing.T) { tests := []struct { name string output string - md internal.Metadata + md Metadata expected string }{ { name: "foo component with beta status", - md: internal.Metadata{ + md: Metadata{ Type: "foo", - Status: &internal.Status{ + Status: &Status{ Stability: map[component.StabilityLevel][]string{ component.StabilityLevelBeta: {"metrics"}, }, @@ -499,9 +498,9 @@ const ( }, { name: "foo component with alpha status", - md: internal.Metadata{ + md: Metadata{ Type: "foo", - Status: &internal.Status{ + Status: &Status{ Stability: map[component.StabilityLevel][]string{ component.StabilityLevelAlpha: {"metrics"}, }, @@ -546,14 +545,14 @@ func TestGenerateTelemetryMetadata(t *testing.T) { tests := []struct { name string output string - md internal.Metadata + md Metadata expected string }{ { name: "foo component with beta status", - md: internal.Metadata{ + md: Metadata{ Type: "foo", - Status: &internal.Status{ + Status: &Status{ Stability: map[component.StabilityLevel][]string{ component.StabilityLevelBeta: {"metrics"}, }, @@ -589,9 +588,9 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { }, { name: "foo component with alpha status", - md: internal.Metadata{ + md: Metadata{ Type: "foo", - Status: &internal.Status{ + Status: &Status{ Stability: map[component.StabilityLevel][]string{ component.StabilityLevelAlpha: {"metrics"}, }, diff --git a/cmd/mdatagen/main.go b/cmd/mdatagen/main.go index 0f27b925fce..32bab21949d 100644 --- a/cmd/mdatagen/main.go +++ b/cmd/mdatagen/main.go @@ -1,410 +1,18 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -//go:generate mdatagen metadata.yaml - package main -import ( - "bytes" - "errors" - "flag" - "fmt" - "go/format" - "log" - "os" - "path/filepath" - "regexp" - "strings" - "text/template" +//go:generate mdatagen metadata.yaml - "golang.org/x/text/cases" - "golang.org/x/text/language" +import ( + "github.com/spf13/cobra" "go.opentelemetry.io/collector/cmd/mdatagen/internal" ) -const ( - statusStart = "" - statusEnd = "" -) - func main() { - flag.Usage = func() { - fmt.Fprintf(flag.CommandLine.Output(), "Usage: %s metadata.yaml\n", os.Args[0]) - flag.PrintDefaults() - } - flag.Parse() - yml := flag.Arg(0) - if err := run(yml); err != nil { - log.Fatal(err) - } -} - -func run(ymlPath string) error { - if ymlPath == "" { - return errors.New("argument must be metadata.yaml file") - } - ymlPath, err := filepath.Abs(ymlPath) - if err != nil { - return fmt.Errorf("failed to get absolute path for %v: %w", ymlPath, err) - } - - ymlDir := filepath.Dir(ymlPath) - packageName := filepath.Base(ymlDir) - - md, err := internal.LoadMetadata(ymlPath) - if err != nil { - return fmt.Errorf("failed loading %v: %w", ymlPath, err) - } - - tmplDir := "templates" - - codeDir := filepath.Join(ymlDir, "internal", "metadata") - if err = os.MkdirAll(codeDir, 0700); err != nil { - return fmt.Errorf("unable to create output directory %q: %w", codeDir, err) - } - if md.Status != nil { - if md.Status.Class != "cmd" && md.Status.Class != "pkg" && !md.Status.NotComponent { - if err = generateFile(filepath.Join(tmplDir, "status.go.tmpl"), - filepath.Join(codeDir, "generated_status.go"), md, "metadata"); err != nil { - return err - } - if err = generateFile(filepath.Join(tmplDir, "component_test.go.tmpl"), - filepath.Join(ymlDir, "generated_component_test.go"), md, packageName); err != nil { - return err - } - } - - if err = generateFile(filepath.Join(tmplDir, "package_test.go.tmpl"), - filepath.Join(ymlDir, "generated_package_test.go"), md, packageName); err != nil { - return err - } - - if _, err = os.Stat(filepath.Join(ymlDir, "README.md")); err == nil { - if err = inlineReplace( - filepath.Join(tmplDir, "readme.md.tmpl"), - filepath.Join(ymlDir, "README.md"), - md, statusStart, statusEnd); err != nil { - return err - } - } - } - - toGenerate := map[string]string{} - - if len(md.Telemetry.Metrics) != 0 { // if there are telemetry metrics, generate telemetry specific files - if err = generateFile(filepath.Join(tmplDir, "component_telemetry_test.go.tmpl"), - filepath.Join(ymlDir, "generated_component_telemetry_test.go"), md, packageName); err != nil { - return err - } - toGenerate[filepath.Join(tmplDir, "telemetry.go.tmpl")] = filepath.Join(codeDir, "generated_telemetry.go") - toGenerate[filepath.Join(tmplDir, "telemetry_test.go.tmpl")] = filepath.Join(codeDir, "generated_telemetry_test.go") - } - - if len(md.Metrics) != 0 || len(md.Telemetry.Metrics) != 0 { // if there's metrics or internal metrics, generate documentation for them - toGenerate[filepath.Join(tmplDir, "documentation.md.tmpl")] = filepath.Join(ymlDir, "documentation.md") - } - - for tmpl, dst := range toGenerate { - if err = generateFile(tmpl, dst, md, "metadata"); err != nil { - return err - } - } - - if len(md.Metrics) == 0 && len(md.ResourceAttributes) == 0 { - return nil - } - - if err = os.MkdirAll(filepath.Join(codeDir, "testdata"), 0700); err != nil { - return fmt.Errorf("unable to create output directory %q: %w", filepath.Join(codeDir, "testdata"), err) - } - - toGenerate = map[string]string{ - filepath.Join(tmplDir, "testdata", "config.yaml.tmpl"): filepath.Join(codeDir, "testdata", "config.yaml"), - filepath.Join(tmplDir, "config.go.tmpl"): filepath.Join(codeDir, "generated_config.go"), - filepath.Join(tmplDir, "config_test.go.tmpl"): filepath.Join(codeDir, "generated_config_test.go"), - } - - if len(md.ResourceAttributes) > 0 { // only generate resource files if resource attributes are configured - toGenerate[filepath.Join(tmplDir, "resource.go.tmpl")] = filepath.Join(codeDir, "generated_resource.go") - toGenerate[filepath.Join(tmplDir, "resource_test.go.tmpl")] = filepath.Join(codeDir, "generated_resource_test.go") - } - - if len(md.Metrics) > 0 { // only generate metrics if metrics are present - toGenerate[filepath.Join(tmplDir, "metrics.go.tmpl")] = filepath.Join(codeDir, "generated_metrics.go") - toGenerate[filepath.Join(tmplDir, "metrics_test.go.tmpl")] = filepath.Join(codeDir, "generated_metrics_test.go") - } - - for tmpl, dst := range toGenerate { - if err = generateFile(tmpl, dst, md, "metadata"); err != nil { - return err - } - } - - return nil -} - -func templatize(tmplFile string, md internal.Metadata) *template.Template { - return template.Must( - template. - New(filepath.Base(tmplFile)). - Option("missingkey=error"). - Funcs(map[string]any{ - "publicVar": func(s string) (string, error) { - return internal.FormatIdentifier(s, true) - }, - "attributeInfo": func(an internal.AttributeName) internal.Attribute { - return md.Attributes[an] - }, - "metricInfo": func(mn internal.MetricName) internal.Metric { - return md.Metrics[mn] - }, - "telemetryInfo": func(mn internal.MetricName) internal.Metric { - return md.Telemetry.Metrics[mn] - }, - "parseImportsRequired": func(metrics map[internal.MetricName]internal.Metric) bool { - for _, m := range metrics { - if m.Data().HasMetricInputType() { - return true - } - } - return false - }, - "stringsJoin": strings.Join, - "stringsSplit": strings.Split, - "userLinks": func(elems []string) []string { - result := make([]string, len(elems)) - for i, elem := range elems { - if elem == "open-telemetry/collector-approvers" { - result[i] = "[@open-telemetry/collector-approvers](https://github.com/orgs/open-telemetry/teams/collector-approvers)" - } else { - result[i] = fmt.Sprintf("[@%s](https://www.github.com/%s)", elem, elem) - } - } - return result - }, - "casesTitle": cases.Title(language.English).String, - "toLowerCase": strings.ToLower, - "toCamelCase": func(s string) string { - caser := cases.Title(language.English).String - parts := strings.Split(s, "_") - result := "" - for _, part := range parts { - result += caser(part) - } - return result - }, - "inc": func(i int) int { return i + 1 }, - "distroURL": func(name string) string { - return internal.Distros[name] - }, - "isExporter": func() bool { - return md.Status.Class == "exporter" - }, - "isProcessor": func() bool { - return md.Status.Class == "processor" - }, - "isReceiver": func() bool { - return md.Status.Class == "receiver" - }, - "isExtension": func() bool { - return md.Status.Class == "extension" - }, - "isConnector": func() bool { - return md.Status.Class == "connector" - }, - "isCommand": func() bool { - return md.Status.Class == "cmd" - }, - "supportsLogs": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "logs" { - return true - } - } - } - return false - }, - "supportsMetrics": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "metrics" { - return true - } - } - } - return false - }, - "supportsTraces": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "traces" { - return true - } - } - } - return false - }, - "supportsLogsToLogs": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "logs_to_logs" { - return true - } - } - } - return false - }, - "supportsLogsToMetrics": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "logs_to_metrics" { - return true - } - } - } - return false - }, - "supportsLogsToTraces": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "logs_to_traces" { - return true - } - } - } - return false - }, - "supportsMetricsToLogs": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "metrics_to_logs" { - return true - } - } - } - return false - }, - "supportsMetricsToMetrics": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "metrics_to_metrics" { - return true - } - } - } - return false - }, - "supportsMetricsToTraces": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "metrics_to_traces" { - return true - } - } - } - return false - }, - "supportsTracesToLogs": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "traces_to_logs" { - return true - } - } - } - return false - }, - "supportsTracesToMetrics": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "traces_to_metrics" { - return true - } - } - } - return false - }, - "supportsTracesToTraces": func() bool { - for _, signals := range md.Status.Stability { - for _, s := range signals { - if s == "traces_to_traces" { - return true - } - } - } - return false - }, - "expectConsumerError": func() bool { - return md.Tests.ExpectConsumerError - }, - // ParseFS delegates the parsing of the files to `Glob` - // which uses the `\` as a special character. - // Meaning on windows based machines, the `\` needs to be replaced - // with a `/` for it to find the file. - }).ParseFS(internal.TemplateFS, strings.ReplaceAll(tmplFile, "\\", "/"))) -} - -func inlineReplace(tmplFile string, outputFile string, md internal.Metadata, start string, end string) error { - var readmeContents []byte - var err error - if readmeContents, err = os.ReadFile(outputFile); err != nil { // nolint: gosec - return err - } - - var re = regexp.MustCompile(fmt.Sprintf("%s[\\s\\S]*%s", start, end)) - if !re.Match(readmeContents) { - return nil - } - - tmpl := templatize(tmplFile, md) - buf := bytes.Buffer{} - - if md.GithubProject == "" { - md.GithubProject = "open-telemetry/opentelemetry-collector-contrib" - } - - if err := tmpl.Execute(&buf, internal.TemplateContext{Metadata: md, Package: "metadata"}); err != nil { - return fmt.Errorf("failed executing template: %w", err) - } - - result := buf.String() - - s := re.ReplaceAllString(string(readmeContents), result) - if err := os.WriteFile(outputFile, []byte(s), 0600); err != nil { - return fmt.Errorf("failed writing %q: %w", outputFile, err) - } - - return nil -} - -func generateFile(tmplFile string, outputFile string, md internal.Metadata, goPackage string) error { - tmpl := templatize(tmplFile, md) - buf := bytes.Buffer{} - - if err := tmpl.Execute(&buf, internal.TemplateContext{Metadata: md, Package: goPackage}); err != nil { - return fmt.Errorf("failed executing template: %w", err) - } - - if err := os.Remove(outputFile); err != nil && !errors.Is(err, os.ErrNotExist) { - return fmt.Errorf("unable to remove genererated file %q: %w", outputFile, err) - } - - result := buf.Bytes() - var formatErr error - if strings.HasSuffix(outputFile, ".go") { - if formatted, err := format.Source(buf.Bytes()); err == nil { - result = formatted - } else { - formatErr = fmt.Errorf("failed formatting %s:%w", outputFile, err) - } - } - - if err := os.WriteFile(outputFile, result, 0600); err != nil { - return fmt.Errorf("failed writing %q: %w", outputFile, err) - } - - return formatErr + cmd, err := internal.NewCommand() + cobra.CheckErr(err) + cobra.CheckErr(cmd.Execute()) } diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 9caa916b81c..2bf15063c57 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -226,16 +226,58 @@ func (gcs *ClientConfig) isSchemeHTTPS() bool { // a non-blocking dial (the function won't wait for connections to be // established, and connecting happens in the background). To make it a blocking // dial, use grpc.WithBlock() dial option. -func (gcs *ClientConfig) ToClientConn(ctx context.Context, host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.DialOption) (*grpc.ClientConn, error) { - opts, err := gcs.toDialOptions(ctx, host, settings) +// +// Deprecated: [v0.110.0] If providing a [grpc.DialOption], use [ClientConfig.ToClientConnWithOptions] +// with [WithGrpcDialOption] instead. +func (gcs *ClientConfig) ToClientConn( + ctx context.Context, + host component.Host, + settings component.TelemetrySettings, + grpcOpts ...grpc.DialOption, +) (*grpc.ClientConn, error) { + var extraOpts []ToClientConnOption + for _, grpcOpt := range grpcOpts { + extraOpts = append(extraOpts, WithGrpcDialOption(grpcOpt)) + } + return gcs.ToClientConnWithOptions(ctx, host, settings, extraOpts...) +} + +// ToClientConnOption is a sealed interface wrapping options for [ClientConfig.ToClientConnWithOptions]. +type ToClientConnOption interface { + isToClientConnOption() +} + +type grpcDialOptionWrapper struct { + opt grpc.DialOption +} + +// WithGrpcDialOption wraps a [grpc.DialOption] into a [ToClientConnOption]. +func WithGrpcDialOption(opt grpc.DialOption) ToClientConnOption { + return grpcDialOptionWrapper{opt: opt} +} +func (grpcDialOptionWrapper) isToClientConnOption() {} + +// ToClientConnWithOptions is the same as [ClientConfig.ToClientConn], but uses the [ToClientConnOption] interface for options. +// This method will eventually replace [ClientConfig.ToClientConn]. +func (gcs *ClientConfig) ToClientConnWithOptions( + ctx context.Context, + host component.Host, + settings component.TelemetrySettings, + extraOpts ...ToClientConnOption, +) (*grpc.ClientConn, error) { + grpcOpts, err := gcs.getGrpcDialOptions(ctx, host, settings, extraOpts) if err != nil { return nil, err } - opts = append(opts, extraOpts...) - return grpc.NewClient(gcs.sanitizedEndpoint(), opts...) + return grpc.NewClient(gcs.sanitizedEndpoint(), grpcOpts...) } -func (gcs *ClientConfig) toDialOptions(ctx context.Context, host component.Host, settings component.TelemetrySettings) ([]grpc.DialOption, error) { +func (gcs *ClientConfig) getGrpcDialOptions( + ctx context.Context, + host component.Host, + settings component.TelemetrySettings, + extraOpts []ToClientConnOption, +) ([]grpc.DialOption, error) { var opts []grpc.DialOption if gcs.Compression.IsCompressed() { cp, err := getGRPCCompressionName(gcs.Compression) @@ -312,6 +354,12 @@ func (gcs *ClientConfig) toDialOptions(ctx context.Context, host component.Host, // Enable OpenTelemetry observability plugin. opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelOpts...))) + for _, opt := range extraOpts { + if wrapper, ok := opt.(grpcDialOptionWrapper); ok { + opts = append(opts, wrapper.opt) + } + } + return opts, nil } @@ -335,17 +383,58 @@ func (gss *ServerConfig) Validate() error { return nil } -// ToServer returns a grpc.Server for the configuration -func (gss *ServerConfig) ToServer(_ context.Context, host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.ServerOption) (*grpc.Server, error) { - opts, err := gss.toServerOption(host, settings) +// ToServer returns a [grpc.Server] for the configuration +// +// Deprecated: [v0.110.0] If providing a [grpc.ServerOption], use [ServerConfig.ToServerWithOptions] +// with [WithGrpcServerOption] instead. +func (gss *ServerConfig) ToServer( + ctx context.Context, + host component.Host, + settings component.TelemetrySettings, + grpcOpts ...grpc.ServerOption, +) (*grpc.Server, error) { + var extraOpts []ToServerOption + for _, grpcOpt := range grpcOpts { + extraOpts = append(extraOpts, WithGrpcServerOption(grpcOpt)) + } + return gss.ToServerWithOptions(ctx, host, settings, extraOpts...) +} + +// ToServerOption is a sealed interface wrapping options for [ServerConfig.ToServerWithOptions]. +type ToServerOption interface { + isToServerOption() +} + +type grpcServerOptionWrapper struct { + opt grpc.ServerOption +} + +// WithGrpcServerOption wraps a [grpc.ServerOption] into a [ToServerOption]. +func WithGrpcServerOption(opt grpc.ServerOption) ToServerOption { + return grpcServerOptionWrapper{opt: opt} +} +func (grpcServerOptionWrapper) isToServerOption() {} + +// ToServerWithOptions is the same as [ServerConfig.ToServer], but uses the [ToServerOption] interface for options. +// This method will eventually replace [ServerConfig.ToServer]. +func (gss *ServerConfig) ToServerWithOptions( + _ context.Context, + host component.Host, + settings component.TelemetrySettings, + extraOpts ...ToServerOption, +) (*grpc.Server, error) { + grpcOpts, err := gss.getGrpcServerOptions(host, settings, extraOpts) if err != nil { return nil, err } - opts = append(opts, extraOpts...) - return grpc.NewServer(opts...), nil + return grpc.NewServer(grpcOpts...), nil } -func (gss *ServerConfig) toServerOption(host component.Host, settings component.TelemetrySettings) ([]grpc.ServerOption, error) { +func (gss *ServerConfig) getGrpcServerOptions( + host component.Host, + settings component.TelemetrySettings, + extraOpts []ToServerOption, +) ([]grpc.ServerOption, error) { switch gss.NetAddr.Transport { case confignet.TransportTypeTCP, confignet.TransportTypeTCP4, confignet.TransportTypeTCP6, confignet.TransportTypeUDP, confignet.TransportTypeUDP4, confignet.TransportTypeUDP6: internal.WarnOnUnspecifiedHost(settings.Logger, gss.NetAddr.Endpoint) @@ -435,6 +524,12 @@ func (gss *ServerConfig) toServerOption(host component.Host, settings component. opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...)) + for _, opt := range extraOpts { + if wrapper, ok := opt.(grpcServerOptionWrapper); ok { + opts = append(opts, wrapper.opt) + } + } + return opts, nil } diff --git a/config/configgrpc/configgrpc_test.go b/config/configgrpc/configgrpc_test.go index d2a6b1c15e0..585b0c6d7c1 100644 --- a/config/configgrpc/configgrpc_test.go +++ b/config/configgrpc/configgrpc_test.go @@ -126,11 +126,33 @@ func TestDefaultGrpcClientSettings(t *testing.T) { Insecure: true, }, } - opts, err := gcs.toDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings()) + opts, err := gcs.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings(), []ToClientConnOption{}) require.NoError(t, err) assert.Len(t, opts, 2) } +func TestGrpcClientExtraOption(t *testing.T) { + tt, err := componenttest.SetupTelemetry(componentID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + gcs := &ClientConfig{ + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + extraOpt := grpc.WithUserAgent("test-agent") + opts, err := gcs.getGrpcDialOptions( + context.Background(), + componenttest.NewNopHost(), + tt.TelemetrySettings(), + []ToClientConnOption{WithGrpcDialOption(extraOpt)}, + ) + require.NoError(t, err) + assert.Len(t, opts, 3) + assert.Equal(t, opts[2], extraOpt) +} + func TestAllGrpcClientSettings(t *testing.T) { tt, err := componenttest.SetupTelemetry(componentID) require.NoError(t, err) @@ -231,7 +253,7 @@ func TestAllGrpcClientSettings(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - opts, err := test.settings.toDialOptions(context.Background(), test.host, tt.TelemetrySettings()) + opts, err := test.settings.getGrpcDialOptions(context.Background(), test.host, tt.TelemetrySettings(), []ToClientConnOption{}) require.NoError(t, err) assert.Len(t, opts, 9) }) @@ -244,11 +266,28 @@ func TestDefaultGrpcServerSettings(t *testing.T) { Endpoint: "0.0.0.0:1234", }, } - opts, err := gss.toServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings()) + opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{}) require.NoError(t, err) assert.Len(t, opts, 3) } +func TestGrpcServerExtraOption(t *testing.T) { + gss := &ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: "0.0.0.0:1234", + }, + } + extraOpt := grpc.ConnectionTimeout(1_000_000_000) + opts, err := gss.getGrpcServerOptions( + componenttest.NewNopHost(), + componenttest.NewNopTelemetrySettings(), + []ToServerOption{WithGrpcServerOption(extraOpt)}, + ) + require.NoError(t, err) + assert.Len(t, opts, 4) + assert.Equal(t, opts[3], extraOpt) +} + func TestGrpcServerValidate(t *testing.T) { tests := []struct { gss *ServerConfig @@ -329,7 +368,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) { }, }, } - opts, err := gss.toServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings()) + opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{}) require.NoError(t, err) assert.Len(t, opts, 10) } @@ -488,7 +527,7 @@ func TestUseSecure(t *testing.T) { TLSSetting: configtls.ClientConfig{}, Keepalive: nil, } - dialOpts, err := gcs.toDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings()) + dialOpts, err := gcs.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings(), []ToClientConnOption{}) require.NoError(t, err) assert.Len(t, dialOpts, 2) } @@ -540,7 +579,7 @@ func TestGRPCServerWarning(t *testing.T) { logger, observed := observer.New(zap.DebugLevel) set.Logger = zap.New(logger) - opts, err := test.settings.toServerOption(componenttest.NewNopHost(), set) + opts, err := test.settings.getGrpcServerOptions(componenttest.NewNopHost(), set, []ToServerOption{}) require.NoError(t, err) require.NotNil(t, opts) _ = grpc.NewServer(opts...) diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 24d2905d7af..07ac4a1d8a9 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -68,8 +68,8 @@ is hosted at a different [origin][origin]. If left blank or set to `null`, CORS will not be enabled. - `allowed_origins`: A list of [origins][origin] allowed to send requests to the receiver. An origin may contain a wildcard (`*`) to replace 0 or more - characters (e.g., `https://*.example.com`). To allow any origin, set to - `["*"]`. If no origins are listed, CORS will not be enabled. + characters (e.g., `https://*.example.com`). **Do not use** a plain wildcard + `["*"]`, as our CORS response includes `Access-Control-Allow-Credentials: true`, which makes browsers to **disallow a plain wildcard** (this is a security standard). To allow any origin, you can specify at least the protocol, for example `["https://*", "http://*"]`. If no origins are listed, CORS will not be enabled. - `allowed_headers`: Allow CORS requests to include headers outside the [default safelist][cors-headers]. By default, safelist headers and `X-Requested-With` will be allowed. To allow any request header, set to diff --git a/consumer/consumertest/sink.go b/consumer/consumertest/sink.go index ec35e717ae0..a6d2a424ee1 100644 --- a/consumer/consumertest/sink.go +++ b/consumer/consumertest/sink.go @@ -163,8 +163,9 @@ func (sle *LogsSink) Reset() { // stores all profiles and allows querying them for testing. type ProfilesSink struct { nonMutatingConsumer - mu sync.Mutex - profiles []pprofile.Profiles + mu sync.Mutex + profiles []pprofile.Profiles + sampleCount int } var _ consumerprofiles.Profiles = (*ProfilesSink)(nil) @@ -175,6 +176,7 @@ func (ste *ProfilesSink) ConsumeProfiles(_ context.Context, td pprofile.Profiles defer ste.mu.Unlock() ste.profiles = append(ste.profiles, td) + ste.sampleCount += td.SampleCount() return nil } @@ -189,10 +191,18 @@ func (ste *ProfilesSink) AllProfiles() []pprofile.Profiles { return copyProfiles } +// ProfileRecordCount returns the number of profiles stored by this sink since last Reset. +func (ste *ProfilesSink) SampleCount() int { + ste.mu.Lock() + defer ste.mu.Unlock() + return ste.sampleCount +} + // Reset deletes any stored data. func (ste *ProfilesSink) Reset() { ste.mu.Lock() defer ste.mu.Unlock() ste.profiles = nil + ste.sampleCount = 0 } diff --git a/consumer/consumertest/sink_test.go b/consumer/consumertest/sink_test.go index 3a377345fc4..5d7f7f3bf8a 100644 --- a/consumer/consumertest/sink_test.go +++ b/consumer/consumertest/sink_test.go @@ -71,6 +71,8 @@ func TestProfilesSink(t *testing.T) { want = append(want, td) } assert.Equal(t, want, sink.AllProfiles()) + assert.Equal(t, len(want), sink.SampleCount()) sink.Reset() assert.Empty(t, sink.AllProfiles()) + assert.Empty(t, sink.SampleCount()) } diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index b8d6dee2a75..ecda0e93add 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/status" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -58,7 +59,8 @@ func newExporter(cfg component.Config, set exporter.Settings) *baseExporter { // start actually creates the gRPC connection. The client construction is deferred till this point as this // is the only place we get hold of Extensions which are required to construct auth round tripper. func (e *baseExporter) start(ctx context.Context, host component.Host) (err error) { - if e.clientConn, err = e.config.ClientConfig.ToClientConn(ctx, host, e.settings, grpc.WithUserAgent(e.userAgent)); err != nil { + agentOpt := configgrpc.WithGrpcDialOption(grpc.WithUserAgent(e.userAgent)) + if e.clientConn, err = e.config.ClientConfig.ToClientConnWithOptions(ctx, host, e.settings, agentOpt); err != nil { return err } e.traceExporter = ptraceotlp.NewGRPCClient(e.clientConn) diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index c29940eaee7..8b7ec9dc807 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -49,7 +49,12 @@ func createDefaultConfig() component.Config { } } -func composeSignalURL(oCfg *Config, signalOverrideURL string, signalName string) (string, error) { +// composeSignalURL composes the final URL for the signal (traces, metrics, logs) based on the configuration. +// oCfg is the configuration of the exporter. +// signalOverrideURL is the URL specified in the signal specific configuration (empty if not specified). +// signalName is the name of the signal, e.g. "traces", "metrics", "logs". +// signalVersion is the version of the signal, e.g. "v1" or "v1development". +func composeSignalURL(oCfg *Config, signalOverrideURL string, signalName string, signalVersion string) (string, error) { switch { case signalOverrideURL != "": _, err := url.Parse(signalOverrideURL) @@ -61,9 +66,9 @@ func composeSignalURL(oCfg *Config, signalOverrideURL string, signalName string) return "", fmt.Errorf("either endpoint or %s_endpoint must be specified", signalName) default: if strings.HasSuffix(oCfg.Endpoint, "/") { - return oCfg.Endpoint + "v1/" + signalName, nil + return oCfg.Endpoint + signalVersion + "/" + signalName, nil } - return oCfg.Endpoint + "/v1/" + signalName, nil + return oCfg.Endpoint + "/" + signalVersion + "/" + signalName, nil } } @@ -78,7 +83,7 @@ func createTracesExporter( } oCfg := cfg.(*Config) - oce.tracesURL, err = composeSignalURL(oCfg, oCfg.TracesEndpoint, "traces") + oce.tracesURL, err = composeSignalURL(oCfg, oCfg.TracesEndpoint, "traces", "v1") if err != nil { return nil, err } @@ -104,7 +109,7 @@ func createMetricsExporter( } oCfg := cfg.(*Config) - oce.metricsURL, err = composeSignalURL(oCfg, oCfg.MetricsEndpoint, "metrics") + oce.metricsURL, err = composeSignalURL(oCfg, oCfg.MetricsEndpoint, "metrics", "v1") if err != nil { return nil, err } @@ -130,7 +135,7 @@ func createLogsExporter( } oCfg := cfg.(*Config) - oce.logsURL, err = composeSignalURL(oCfg, oCfg.LogsEndpoint, "logs") + oce.logsURL, err = composeSignalURL(oCfg, oCfg.LogsEndpoint, "logs", "v1") if err != nil { return nil, err } diff --git a/exporter/otlphttpexporter/factory_test.go b/exporter/otlphttpexporter/factory_test.go index 0aef54e96f6..5170e8b4500 100644 --- a/exporter/otlphttpexporter/factory_test.go +++ b/exporter/otlphttpexporter/factory_test.go @@ -215,13 +215,19 @@ func TestComposeSignalURL(t *testing.T) { // Has slash at end cfg.ClientConfig.Endpoint = "http://localhost:4318/" - url, err := composeSignalURL(cfg, "", "traces") + url, err := composeSignalURL(cfg, "", "traces", "v1") require.NoError(t, err) assert.Equal(t, "http://localhost:4318/v1/traces", url) // No slash at end cfg.ClientConfig.Endpoint = "http://localhost:4318" - url, err = composeSignalURL(cfg, "", "traces") + url, err = composeSignalURL(cfg, "", "traces", "v1") require.NoError(t, err) assert.Equal(t, "http://localhost:4318/v1/traces", url) + + // Different version + cfg.ClientConfig.Endpoint = "http://localhost:4318" + url, err = composeSignalURL(cfg, "", "traces", "v2") + require.NoError(t, err) + assert.Equal(t, "http://localhost:4318/v2/traces", url) } diff --git a/internal/globalsignal/Makefile b/internal/globalsignal/Makefile new file mode 100644 index 00000000000..ded7a36092d --- /dev/null +++ b/internal/globalsignal/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/internal/globalsignal/go.mod b/internal/globalsignal/go.mod new file mode 100644 index 00000000000..8ee5f5ddf1e --- /dev/null +++ b/internal/globalsignal/go.mod @@ -0,0 +1,11 @@ +module go.opentelemetry.io/collector/internal/globalsignal + +go 1.22.0 + +require github.com/stretchr/testify v1.9.0 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/internal/globalsignal/go.sum b/internal/globalsignal/go.sum new file mode 100644 index 00000000000..60ce688a041 --- /dev/null +++ b/internal/globalsignal/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/globalsignal/signal.go b/internal/globalsignal/signal.go new file mode 100644 index 00000000000..a10431743b0 --- /dev/null +++ b/internal/globalsignal/signal.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package globalsignal // import "go.opentelemetry.io/collector/internal/globalsignal" + +import ( + "fmt" + "regexp" +) + +// Signal represents the signals supported by the collector. +type Signal struct { + name string +} + +// String returns the string representation of the signal. +func (s Signal) String() string { + return s.name +} + +// MarshalText marshals the Signal. +func (s Signal) MarshalText() (text []byte, err error) { + return []byte(s.name), nil +} + +// signalRegex is used to validate the signal. +// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters. +var signalRegex = regexp.MustCompile(`^[a-z]{1,62}$`) + +// NewSignal creates a Signal. It returns an error if the Signal is invalid. +// A Signal must consist of 1 to 62 lowercase ASCII alphabetic characters. +func NewSignal(signal string) (Signal, error) { + if len(signal) == 0 { + return Signal{}, fmt.Errorf("signal must not be empty") + } + if !signalRegex.MatchString(signal) { + return Signal{}, fmt.Errorf("invalid character(s) in type %q", signal) + } + return Signal{name: signal}, nil +} + +// MustNewSignal creates a Signal. It panics if the Signal is invalid. +// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters. +func MustNewSignal(signal string) Signal { + s, err := NewSignal(signal) + if err != nil { + panic(err) + } + return s +} diff --git a/internal/globalsignal/signal_test.go b/internal/globalsignal/signal_test.go new file mode 100644 index 00000000000..0729c62c0b5 --- /dev/null +++ b/internal/globalsignal/signal_test.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package globalsignal + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_NewSignal(t *testing.T) { + s, err := NewSignal("traces") + require.NoError(t, err) + assert.Equal(t, Signal{name: "traces"}, s) +} + +func Test_NewSignal_Invalid(t *testing.T) { + _, err := NewSignal("") + require.Error(t, err) + _, err = NewSignal("TRACES") + require.Error(t, err) +} + +func Test_MustNewSignal(t *testing.T) { + s := MustNewSignal("traces") + assert.Equal(t, Signal{name: "traces"}, s) +} + +func Test_Signal_String(t *testing.T) { + s := MustNewSignal("traces") + assert.Equal(t, "traces", s.String()) +} + +func Test_Signal_MarshalText(t *testing.T) { + s := MustNewSignal("traces") + b, err := s.MarshalText() + require.NoError(t, err) + assert.Equal(t, []byte("traces"), b) +} diff --git a/pipeline/Makefile b/pipeline/Makefile new file mode 100644 index 00000000000..39734bfaebb --- /dev/null +++ b/pipeline/Makefile @@ -0,0 +1 @@ +include ../Makefile.Common diff --git a/pipeline/go.mod b/pipeline/go.mod new file mode 100644 index 00000000000..d033db037c3 --- /dev/null +++ b/pipeline/go.mod @@ -0,0 +1,16 @@ +module go.opentelemetry.io/collector/pipeline + +go 1.22.0 + +require ( + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/internal/globalsignal v0.109.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/internal/globalsignal => ../internal/globalsignal diff --git a/pipeline/go.sum b/pipeline/go.sum new file mode 100644 index 00000000000..60ce688a041 --- /dev/null +++ b/pipeline/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go new file mode 100644 index 00000000000..ae8ac833cc5 --- /dev/null +++ b/pipeline/pipeline.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pipeline // import "go.opentelemetry.io/collector/pipeline" +import ( + "errors" + "fmt" + "regexp" + "strings" + + "go.opentelemetry.io/collector/internal/globalsignal" +) + +// typeAndNameSeparator is the separator that is used between type and name in type/name composite keys. +const typeAndNameSeparator = "/" + +// ID represents the identity for a pipeline. It combines two values: +// * signal - the Signal of the pipeline. +// * name - the name of that pipeline. +type ID struct { + signal Signal `mapstructure:"-"` + name string `mapstructure:"-"` +} + +// NewID returns a new ID with the given Signal and empty name. +func NewID(signal Signal) ID { + return ID{signal: signal} +} + +// MustNewID builds a Signal and returns a new ID with the given Signal and empty name. +// It panics if the Signal is invalid. +// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters. +func MustNewID(signal string) ID { + return ID{signal: globalsignal.MustNewSignal(signal)} +} + +// NewIDWithName returns a new ID with the given Signal and name. +func NewIDWithName(signal Signal, name string) ID { + return ID{signal: signal, name: name} +} + +// MustNewIDWithName builds a Signal and returns a new ID with the given Signal and name. +// It panics if the Signal is invalid or name is invalid. +// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters. +// A name must consist of 1 to 1024 unicode characters excluding whitespace, control characters, and symbols. +func MustNewIDWithName(signal string, name string) ID { + id := ID{signal: globalsignal.MustNewSignal(signal)} + err := validateName(name) + if err != nil { + panic(err) + } + id.name = name + return id +} + +// Signal returns the Signal of the ID. +func (i ID) Signal() Signal { + return i.signal +} + +// Name returns the name of the ID. +func (i ID) Name() string { + return i.name +} + +// MarshalText implements the encoding.TextMarshaler interface. +// This marshals the Signal and name as one string in the config. +func (i ID) MarshalText() (text []byte, err error) { + return []byte(i.String()), nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (i *ID) UnmarshalText(text []byte) error { + idStr := string(text) + items := strings.SplitN(idStr, typeAndNameSeparator, 2) + var signalStr, nameStr string + if len(items) >= 1 { + signalStr = strings.TrimSpace(items[0]) + } + + if len(items) == 1 && signalStr == "" { + return errors.New("id must not be empty") + } + + if signalStr == "" { + return fmt.Errorf("in %q id: the part before %s should not be empty", idStr, typeAndNameSeparator) + } + + if len(items) > 1 { + // "name" part is present. + nameStr = strings.TrimSpace(items[1]) + if nameStr == "" { + return fmt.Errorf("in %q id: the part after %s should not be empty", idStr, typeAndNameSeparator) + } + if err := validateName(nameStr); err != nil { + return fmt.Errorf("in %q id: %w", nameStr, err) + } + } + + var err error + if i.signal, err = globalsignal.NewSignal(signalStr); err != nil { + return fmt.Errorf("in %q id: %w", idStr, err) + } + i.name = nameStr + + return nil +} + +// String returns the ID string representation as "signal[/name]" format. +func (i ID) String() string { + if i.name == "" { + return i.signal.String() + } + + return i.signal.String() + typeAndNameSeparator + i.name +} + +// nameRegexp is used to validate the name of an ID. A name can consist of +// 1 to 1024 unicode characters excluding whitespace, control characters, and +// symbols. +var nameRegexp = regexp.MustCompile(`^[^\pZ\pC\pS]+$`) + +func validateName(nameStr string) error { + if len(nameStr) > 1024 { + return fmt.Errorf("name %q is longer than 1024 characters (%d characters)", nameStr, len(nameStr)) + } + if !nameRegexp.MatchString(nameStr) { + return fmt.Errorf("invalid character(s) in name %q", nameStr) + } + return nil +} diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go new file mode 100644 index 00000000000..91727381092 --- /dev/null +++ b/pipeline/pipeline_test.go @@ -0,0 +1,133 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pipeline + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/internal/globalsignal" +) + +func Test_NewID(t *testing.T) { + id := NewID(SignalTraces) + assert.Equal(t, ID{signal: SignalTraces}, id) +} + +func Test_MustNewID(t *testing.T) { + id := MustNewID("traces") + assert.Equal(t, ID{signal: SignalTraces}, id) +} + +func Test_NewIDWithName(t *testing.T) { + id := NewIDWithName(SignalTraces, "test") + assert.Equal(t, ID{signal: SignalTraces, name: "test"}, id) +} + +func Test_MustNewIDWithName(t *testing.T) { + id := MustNewIDWithName("traces", "test") + assert.Equal(t, ID{signal: SignalTraces, name: "test"}, id) +} + +func TestMarshalText(t *testing.T) { + id := NewIDWithName(SignalTraces, "name") + got, err := id.MarshalText() + require.NoError(t, err) + assert.Equal(t, id.String(), string(got)) +} + +func TestUnmarshalText(t *testing.T) { + validSignal := globalsignal.MustNewSignal("valid") + var testCases = []struct { + idStr string + expectedErr bool + expectedID ID + }{ + { + idStr: "valid", + expectedID: ID{signal: validSignal, name: ""}, + }, + { + idStr: "valid/valid_name", + expectedID: ID{signal: validSignal, name: "valid_name"}, + }, + { + idStr: " valid / valid_name ", + expectedID: ID{signal: validSignal, name: "valid_name"}, + }, + { + idStr: "valid/中文好", + expectedID: ID{signal: validSignal, name: "中文好"}, + }, + { + idStr: "valid/name-with-dashes", + expectedID: ID{signal: validSignal, name: "name-with-dashes"}, + }, + // issue 10816 + { + idStr: "valid/Linux-Messages-File_01J49HCH3SWFXRVASWFZFRT3J2__processor0__logs", + expectedID: ID{signal: validSignal, name: "Linux-Messages-File_01J49HCH3SWFXRVASWFZFRT3J2__processor0__logs"}, + }, + { + idStr: "valid/1", + expectedID: ID{signal: validSignal, name: "1"}, + }, + { + idStr: "/valid_name", + expectedErr: true, + }, + { + idStr: " /valid_name", + expectedErr: true, + }, + { + idStr: "valid/", + expectedErr: true, + }, + { + idStr: "valid/ ", + expectedErr: true, + }, + { + idStr: " ", + expectedErr: true, + }, + { + idStr: "valid/invalid name", + expectedErr: true, + }, + { + idStr: "valid/" + strings.Repeat("a", 1025), + expectedErr: true, + }, + { + idStr: "INVALID", + expectedErr: true, + }, + { + idStr: "INVALID/name", + expectedErr: true, + }, + } + + for _, test := range testCases { + t.Run(test.idStr, func(t *testing.T) { + id := ID{} + err := id.UnmarshalText([]byte(test.idStr)) + if test.expectedErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, test.expectedID, id) + assert.Equal(t, test.expectedID.Signal(), id.Signal()) + assert.Equal(t, test.expectedID.Name(), id.Name()) + assert.Equal(t, test.expectedID.String(), id.String()) + }) + } +} diff --git a/pipeline/signal.go b/pipeline/signal.go new file mode 100644 index 00000000000..eaa2c75b331 --- /dev/null +++ b/pipeline/signal.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pipeline // import "go.opentelemetry.io/collector/pipeline" + +import ( + "go.opentelemetry.io/collector/internal/globalsignal" +) + +// Signal represents the signals supported by the collector. We currently support +// collecting metrics, traces and logs, this can expand in the future. +type Signal = globalsignal.Signal + +var ( + SignalTraces = globalsignal.MustNewSignal("traces") + SignalMetrics = globalsignal.MustNewSignal("metrics") + SignalLogs = globalsignal.MustNewSignal("logs") +) diff --git a/pipeline/signal_test.go b/pipeline/signal_test.go new file mode 100644 index 00000000000..4e6d17bbcc2 --- /dev/null +++ b/pipeline/signal_test.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pipeline + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_Signal_String(t *testing.T) { + assert.Equal(t, "traces", SignalTraces.String()) + assert.Equal(t, "metrics", SignalMetrics.String()) + assert.Equal(t, "logs", SignalLogs.String()) +} + +func Test_Signal_MarshalText(t *testing.T) { + val, err := SignalTraces.MarshalText() + require.NoError(t, err) + assert.Equal(t, []byte("traces"), val) + + val, err = SignalMetrics.MarshalText() + require.NoError(t, err) + assert.Equal(t, []byte("metrics"), val) + + val, err = SignalLogs.MarshalText() + require.NoError(t, err) + assert.Equal(t, []byte("logs"), val) +} diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index a4c5f08c7a9..82218ef88db 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -89,7 +89,7 @@ func (r *otlpReceiver) startGRPCServer(host component.Host) error { } var err error - if r.serverGRPC, err = r.cfg.GRPC.ToServer(context.Background(), host, r.settings.TelemetrySettings); err != nil { + if r.serverGRPC, err = r.cfg.GRPC.ToServerWithOptions(context.Background(), host, r.settings.TelemetrySettings); err != nil { return err } diff --git a/versions.yaml b/versions.yaml index e015921dd93..82e1bf2b9d6 100644 --- a/versions.yaml +++ b/versions.yaml @@ -21,6 +21,7 @@ module-sets: modules: - go.opentelemetry.io/collector - go.opentelemetry.io/collector/internal/globalgates + - go.opentelemetry.io/collector/internal/globalsignal - go.opentelemetry.io/collector/cmd/builder - go.opentelemetry.io/collector/cmd/mdatagen - go.opentelemetry.io/collector/component @@ -59,6 +60,7 @@ module-sets: - go.opentelemetry.io/collector/otelcol/otelcoltest - go.opentelemetry.io/collector/pdata/pprofile - go.opentelemetry.io/collector/pdata/testdata + - go.opentelemetry.io/collector/pipeline - go.opentelemetry.io/collector/processor - go.opentelemetry.io/collector/processor/batchprocessor - go.opentelemetry.io/collector/processor/memorylimiterprocessor