diff --git a/CHANGELOG.md b/CHANGELOG.md index ac3c0fb22..a4b6ab8f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,11 +3,13 @@ Changelog All notable changes to this project will be documented in this file. -## 4.34.0 - TBD +## 4.34.0 - 2024-08-06 ### Added - The `list` subcommand now supports the format `jsonschema`. (@Jeffail) +- Go API: New `WithX` methods added to the environment type. (@Jeffail) +- Go API: New `ConfigWalker` API added to the schema type. (@Jeffail) ## 4.33.0 - 2024-07-19 diff --git a/internal/docs/field_interop.go b/internal/docs/field_interop.go deleted file mode 100644 index 1c0175e9d..000000000 --- a/internal/docs/field_interop.go +++ /dev/null @@ -1,85 +0,0 @@ -package docs - -import ( - "gopkg.in/yaml.v3" -) - -// ComponentFieldsFromConf walks the children of a YAML node and returns a list -// of fields extracted from it. This can be used in order to infer a field spec -// for a parsed component. -// -// TODO: V5 Remove this eventually. -func ComponentFieldsFromConf(conf any) (inferred map[string]FieldSpecs) { - inferred = map[string]FieldSpecs{} - - componentNodes := map[string]yaml.Node{} - - var node yaml.Node - if err := node.Encode(conf); err != nil { - return - } - - if err := node.Decode(componentNodes); err != nil { - return - } - - for k, v := range componentNodes { - inferred[k] = FieldsFromYAML(&v) - } - return -} - -// FieldsFromConf attempts to infer field documents from a config struct. -func FieldsFromConf(conf any) FieldSpecs { - var node yaml.Node - if err := node.Encode(conf); err != nil { - return FieldSpecs{} - } - return FieldsFromYAML(&node) -} - -// ChildDefaultAndTypesFromStruct enriches a field specs children with a type -// string and default value from another field spec inferred from a config -// struct. -// -// TODO: V5 Remove this eventually. 
-func (f FieldSpec) ChildDefaultAndTypesFromStruct(conf any) FieldSpec { - var node yaml.Node - if err := node.Encode(conf); err != nil { - return f - } - f.Children = f.Children.DefaultAndTypeFrom(FieldsFromYAML(&node)) - return f -} - -// DefaultAndTypeFrom enriches a field spec with a type string and default value -// from another field spec. -func (f FieldSpec) DefaultAndTypeFrom(from FieldSpec) FieldSpec { - if f.Default == nil && from.Default != nil { - f.Default = from.Default - } - if f.Type == "" && from.Type != "" { - f.Type = from.Type - } - f.Children = f.Children.DefaultAndTypeFrom(from.Children) - return f -} - -// DefaultAndTypeFrom enriches a field spec with a type string and default value -// from another field spec. -func (f FieldSpecs) DefaultAndTypeFrom(from FieldSpecs) FieldSpecs { - newSpecs := make(FieldSpecs, len(f)) - fromMap := map[string]FieldSpec{} - for _, v := range from { - fromMap[v.Name] = v - } - for i, v := range f { - ref, exists := fromMap[v.Name] - if !exists { - newSpecs[i] = v - continue - } - newSpecs[i] = v.DefaultAndTypeFrom(ref) - } - return newSpecs -} diff --git a/internal/docs/format_yaml.go b/internal/docs/format_yaml.go index 495a1b261..d0d341b8f 100644 --- a/internal/docs/format_yaml.go +++ b/internal/docs/format_yaml.go @@ -961,155 +961,6 @@ func (f FieldSpecs) YAMLToMap(node *yaml.Node, conf ToValueConfig) (map[string]a return resultMap, nil } -//------------------------------------------------------------------------------ - -func walkComponentsYAML(cType Type, node *yaml.Node, prov Provider, fn ComponentWalkYAMLFunc) error { - node = unwrapDocumentNode(node) - - name, spec, err := GetInferenceCandidateFromYAML(prov, cType, node) - if err != nil { - return err - } - - var label string - for i := 0; i < len(node.Content)-1; i += 2 { - if node.Content[i].Value == "label" { - label = node.Content[i+1].Value - break - } - } - - if err := fn(WalkedYAMLComponent{ - ComponentType: cType, - Name: name, - Label: label, - 
Conf: node, - }); err != nil { - return err - } - - reservedFields := ReservedFieldsByType(cType) - for i := 0; i < len(node.Content)-1; i += 2 { - if node.Content[i].Value == name { - if err := spec.Config.WalkYAML(node.Content[i+1], prov, fn); err != nil { - return err - } - continue - } - if node.Content[i].Value == "type" || node.Content[i].Value == "label" { - continue - } - if spec, exists := reservedFields[node.Content[i].Value]; exists { - if err := spec.WalkYAML(node.Content[i+1], prov, fn); err != nil { - return err - } - } - } - return nil -} - -// WalkYAML walks each node of a YAML tree and for any component types within -// the config a provided func is called. -func (f FieldSpec) WalkYAML(node *yaml.Node, prov Provider, fn ComponentWalkYAMLFunc) error { - node = unwrapDocumentNode(node) - - if coreType, isCore := f.Type.IsCoreComponent(); isCore { - switch f.Kind { - case Kind2DArray: - for i := 0; i < len(node.Content); i++ { - for j := 0; j < len(node.Content[i].Content); j++ { - if err := walkComponentsYAML(coreType, node.Content[i].Content[j], prov, fn); err != nil { - return err - } - } - } - case KindArray: - for i := 0; i < len(node.Content); i++ { - if err := walkComponentsYAML(coreType, node.Content[i], prov, fn); err != nil { - return err - } - } - case KindMap: - for i := 0; i < len(node.Content)-1; i += 2 { - if err := walkComponentsYAML(coreType, node.Content[i+1], prov, fn); err != nil { - return err - } - } - default: - if err := walkComponentsYAML(coreType, node, prov, fn); err != nil { - return err - } - } - } else if len(f.Children) > 0 { - switch f.Kind { - case Kind2DArray: - for i := 0; i < len(node.Content); i++ { - for j := 0; j < len(node.Content[i].Content); j++ { - if err := f.Children.WalkYAML(node.Content[i].Content[j], prov, fn); err != nil { - return err - } - } - } - case KindArray: - for i := 0; i < len(node.Content); i++ { - if err := f.Children.WalkYAML(node.Content[i], prov, fn); err != nil { - return err - } - } - 
case KindMap: - for i := 0; i < len(node.Content)-1; i += 2 { - if err := f.Children.WalkYAML(node.Content[i+1], prov, fn); err != nil { - return err - } - } - default: - if err := f.Children.WalkYAML(node, prov, fn); err != nil { - return err - } - } - } - return nil -} - -// ComponentWalkYAMLFunc is called for each component type within a YAML config, -// where the node representing that component is provided along with the type -// and implementation name. -type ComponentWalkYAMLFunc func(c WalkedYAMLComponent) error - -// WalkedYAMLComponent is a struct containing information about a component -// yielded via the WalkYAML method. -type WalkedYAMLComponent struct { - ComponentType Type - Name string - Label string - Conf *yaml.Node -} - -// WalkYAML walks each node of a YAML tree and for any component types within -// the config a provided func is called. -func (f FieldSpecs) WalkYAML(node *yaml.Node, prov Provider, fn ComponentWalkYAMLFunc) error { - node = unwrapDocumentNode(node) - - nodeKeys := map[string]*yaml.Node{} - for i := 0; i < len(node.Content)-1; i += 2 { - nodeKeys[node.Content[i].Value] = node.Content[i+1] - } - - // Following the order of our field specs, walk each field. 
- for _, field := range f { - value, exists := nodeKeys[field.Name] - if !exists { - continue - } - if err := field.WalkYAML(value, prov, fn); err != nil { - return err - } - } - return nil -} - -//------------------------------------------------------------------------------ - func unwrapDocumentNode(node *yaml.Node) *yaml.Node { if node != nil && node.Kind == yaml.DocumentNode && len(node.Content) > 0 { node = node.Content[0] diff --git a/internal/docs/walk_yaml.go b/internal/docs/walk_yaml.go new file mode 100644 index 000000000..8719a07ea --- /dev/null +++ b/internal/docs/walk_yaml.go @@ -0,0 +1,227 @@ +package docs + +import ( + "errors" + "fmt" + "strconv" + "strings" + + "gopkg.in/yaml.v3" +) + +// ErrSkipChildComponents is used as a return value from WalkComponentsYAML to +// indicate that the component currently viewed should not have its child fields +// walked. It is not returned as an error by any function. +var ErrSkipChildComponents = errors.New("skip children") + +// WalkComponentFunc is called for each component type within a YAML config, +// where the node representing that component is provided along with the type +// and implementation name. +type WalkComponentFunc func(c WalkedComponent) error + +// WalkComponentConfig controls the behaviour of a walk function. +type WalkComponentConfig struct { + path string + Provider Provider + Func WalkComponentFunc +} + +func (w WalkComponentConfig) intoPath(str string) WalkComponentConfig { + tmp := w + if tmp.path != "" { + tmp.path += "." + str + } else { + tmp.path = str + } + return tmp +} + +func getYAMLLastLine(node *yaml.Node) int { + if len(node.Content) == 0 { + lines := strings.Count(node.Value, "\n") + return node.Line + lines + } + return getYAMLLastLine(node.Content[len(node.Content)-1]) +} + +// WalkedComponent is a struct containing information about a component yielded +// via the WalkComponentsYAML method. 
+type WalkedComponent struct { + Field FieldSpec + Path string + Label string + Name string + Value *yaml.Node + + LineStart int + LineEnd int + + spec ComponentSpec + conf WalkComponentConfig +} + +// WalkComponentsYAML walks each child field of a given node and for any +// component types within the config the provided func is called. +func (w WalkedComponent) WalkComponentsYAML(fn WalkComponentFunc) error { + tmpConf := w.conf + tmpConf.Func = fn + + reservedFields := ReservedFieldsByType(w.spec.Type) + for i := 0; i < len(w.Value.Content)-1; i += 2 { + if w.Value.Content[i].Value == w.Name { + if err := w.spec.Config.WalkComponentsYAML(tmpConf.intoPath(w.Name), w.Value.Content[i+1]); err != nil { + return err + } + continue + } + if w.Value.Content[i].Value == "type" || w.Value.Content[i].Value == "label" { + continue + } + if spec, exists := reservedFields[w.Value.Content[i].Value]; exists { + if err := spec.WalkComponentsYAML(tmpConf.intoPath(spec.Name), w.Value.Content[i+1]); err != nil { + return err + } + } + } + return nil +} + +func walkComponentYAML(conf WalkComponentConfig, coreType Type, f FieldSpec, node *yaml.Node) error { + node = unwrapDocumentNode(node) + + name, spec, err := GetInferenceCandidateFromYAML(conf.Provider, coreType, node) + if err != nil { + return err + } + + var label string + for i := 0; i < len(node.Content)-1; i += 2 { + if node.Content[i].Value == "label" { + label = node.Content[i+1].Value + break + } + } + + if err := conf.Func(WalkedComponent{ + Field: f, + Label: label, + Name: name, + Path: conf.path, + Value: node, + LineStart: node.Line, + LineEnd: getYAMLLastLine(node), + + spec: spec, + conf: conf, + }); err != nil { + if errors.Is(err, ErrSkipChildComponents) { + err = nil + } + return err + } + + reservedFields := ReservedFieldsByType(coreType) + for i := 0; i < len(node.Content)-1; i += 2 { + if node.Content[i].Value == name { + if err := spec.Config.WalkComponentsYAML(conf.intoPath(name), node.Content[i+1]); err 
!= nil { + return err + } + continue + } + if node.Content[i].Value == "type" || node.Content[i].Value == "label" { + continue + } + if spec, exists := reservedFields[node.Content[i].Value]; exists { + if err := spec.WalkComponentsYAML(conf.intoPath(spec.Name), node.Content[i+1]); err != nil { + return err + } + } + } + return nil +} + +// WalkComponentsYAML walks each node of a YAML tree and for any component types +// within the config a provided func is called. +func (f FieldSpec) WalkComponentsYAML(conf WalkComponentConfig, node *yaml.Node) error { + node = unwrapDocumentNode(node) + + if coreType, isCore := f.Type.IsCoreComponent(); isCore { + switch f.Kind { + case Kind2DArray: + for i := 0; i < len(node.Content); i++ { + for j := 0; j < len(node.Content[i].Content); j++ { + if err := walkComponentYAML(conf.intoPath(fmt.Sprintf("%v.%v", i, j)), coreType, f, node.Content[i].Content[j]); err != nil { + return err + } + } + } + case KindArray: + for i := 0; i < len(node.Content); i++ { + if err := walkComponentYAML(conf.intoPath(strconv.Itoa(i)), coreType, f, node.Content[i]); err != nil { + return err + } + } + case KindMap: + for i := 0; i < len(node.Content)-1; i += 2 { + if err := walkComponentYAML(conf.intoPath(node.Content[i].Value), coreType, f, node.Content[i+1]); err != nil { + return err + } + } + default: + if err := walkComponentYAML(conf, coreType, f, node); err != nil { + return err + } + } + } else if len(f.Children) > 0 { + switch f.Kind { + case Kind2DArray: + for i := 0; i < len(node.Content); i++ { + for j := 0; j < len(node.Content[i].Content); j++ { + if err := f.Children.WalkComponentsYAML(conf.intoPath(fmt.Sprintf("%v.%v", i, j)), node.Content[i].Content[j]); err != nil { + return err + } + } + } + case KindArray: + for i := 0; i < len(node.Content); i++ { + if err := f.Children.WalkComponentsYAML(conf.intoPath(strconv.Itoa(i)), node.Content[i]); err != nil { + return err + } + } + case KindMap: + for i := 0; i < len(node.Content)-1; i += 
2 { + if err := f.Children.WalkComponentsYAML(conf.intoPath(node.Content[i].Value), node.Content[i+1]); err != nil { + return err + } + } + default: + if err := f.Children.WalkComponentsYAML(conf, node); err != nil { + return err + } + } + } + return nil +} + +// WalkComponentsYAML walks each node of a YAML tree and for any component types +// within the config a provided func is called. +func (f FieldSpecs) WalkComponentsYAML(conf WalkComponentConfig, node *yaml.Node) error { + node = unwrapDocumentNode(node) + + nodeKeys := map[string]*yaml.Node{} + for i := 0; i < len(node.Content)-1; i += 2 { + nodeKeys[node.Content[i].Value] = node.Content[i+1] + } + + // Following the order of our field specs, walk each field. + for _, field := range f { + value, exists := nodeKeys[field.Name] + if !exists { + continue + } + if err := field.WalkComponentsYAML(conf.intoPath(field.Name), value); err != nil { + return err + } + } + return nil +} diff --git a/internal/docs/walk_yaml_test.go b/internal/docs/walk_yaml_test.go new file mode 100644 index 000000000..9af0867b0 --- /dev/null +++ b/internal/docs/walk_yaml_test.go @@ -0,0 +1,331 @@ +package docs_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" + + "github.com/redpanda-data/benthos/v4/internal/docs" +) + +func getMockProv(t testing.TB) *docs.MappedDocsProvider { + t.Helper() + + mockProv := docs.NewMappedDocsProvider() + mockProv.RegisterDocs(docs.ComponentSpec{ + Name: "kafka", + Type: docs.TypeInput, + Config: docs.FieldComponent().WithChildren( + docs.FieldString("addresses", "").Array(), + docs.FieldString("topics", "").Array(), + ), + }) + mockProv.RegisterDocs(docs.ComponentSpec{ + Name: "generate", + Type: docs.TypeInput, + Config: docs.FieldComponent().WithChildren( + docs.FieldString("mapping", ""), + ), + }) + mockProv.RegisterDocs(docs.ComponentSpec{ + Name: "dynamic", + Type: docs.TypeInput, + Config: 
docs.FieldComponent().WithChildren( + docs.FieldInput("inputs", "").Map(), + ), + }) + mockProv.RegisterDocs(docs.ComponentSpec{ + Name: "nats", + Type: docs.TypeOutput, + Config: docs.FieldComponent().WithChildren( + docs.FieldString("urls", "").Array(), + docs.FieldString("subject", ""), + docs.FieldInt("max_in_flight", ""), + ), + }) + mockProv.RegisterDocs(docs.ComponentSpec{ + Name: "compress", + Type: docs.TypeProcessor, + Config: docs.FieldComponent().WithChildren( + docs.FieldString("algorithm", ""), + ), + }) + mockProv.RegisterDocs(docs.ComponentSpec{ + Name: "workflow", + Type: docs.TypeProcessor, + Config: docs.FieldComponent().WithChildren( + docs.FieldString("order", "").ArrayOfArrays(), + ), + }) + mockProv.RegisterDocs(docs.ComponentSpec{ + Name: "switch", + Type: docs.TypeProcessor, + Config: docs.FieldComponent().Array().WithChildren( + docs.FieldString("check", ""), + docs.FieldProcessor("processors", "").Array(), + ), + }) + return mockProv +} + +func TestWalkYAML(t *testing.T) { + mockProv := getMockProv(t) + + tests := []struct { + name string + input string + output map[string][2]string + }{ + { + name: "simple input and output", + input: ` +input: + label: a + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + +output: + label: b + nats: + urls: [ nats://127.0.0.1:4222 ] + subject: benthos_messages + max_in_flight: 1 +`, + output: map[string][2]string{ + "input": {"a", "kafka"}, + "output": {"b", "nats"}, + }, + }, + { + name: "switch processor", + input: ` +pipeline: + processors: + - label: a + switch: + - check: 'root = "foobar"' + processors: + - label: b + compress: + algorithm: meow1 + - check: 'root = "foobar2"' + processors: + - label: c + compress: + algorithm: meow2 + - label: d + compress: + algorithm: meow3 +`, + output: map[string][2]string{ + "pipeline.processors.0": {"a", "switch"}, + "pipeline.processors.0.switch.0.processors.0": {"b", "compress"}, + "pipeline.processors.0.switch.1.processors.0": {"c", "compress"}, + 
"pipeline.processors.0.switch.1.processors.1": {"d", "compress"}, + }, + }, + { + name: "nested inputs and processors", + input: ` +input: + label: a + dynamic: + inputs: + foo: + label: b + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: c + compress: + algorithm: meow1 + bar: + label: d + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: e + switch: + - check: 'root = "foobar"' + processors: + - label: f + compress: + algorithm: meow1 + - check: 'root = "foobar2"' + processors: + - label: g + compress: + algorithm: meow2 + - label: h + compress: + algorithm: meow3 +`, + output: map[string][2]string{ + "input": {"a", "dynamic"}, + "input.dynamic.inputs.foo": {"b", "kafka"}, + "input.dynamic.inputs.foo.processors.0": {"c", "compress"}, + "input.dynamic.inputs.bar": {"d", "kafka"}, + "input.processors.0": {"e", "switch"}, + "input.processors.0.switch.0.processors.0": {"f", "compress"}, + "input.processors.0.switch.1.processors.0": {"g", "compress"}, + "input.processors.0.switch.1.processors.1": {"h", "compress"}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + input := &yaml.Node{} + require.NoError(t, yaml.Unmarshal([]byte(test.input), input)) + + res := map[string][2]string{} + + require.NoError(t, configSpec.WalkComponentsYAML(docs.WalkComponentConfig{ + Provider: mockProv, + Func: func(c docs.WalkedComponent) error { + res[c.Path] = [2]string{c.Label, c.Name} + return nil + }, + }, input)) + + assert.Equal(t, test.output, res) + }) + } +} + +func TestWalkYAMLFragmented(t *testing.T) { + mockProv := getMockProv(t) + + input := &yaml.Node{} + require.NoError(t, yaml.Unmarshal([]byte(` +input: + label: a + dynamic: + inputs: + foo: + label: b + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: c + compress: + algorithm: meow1 + bar: + label: d + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: e 
+ switch: + - check: 'root = "foobar"' + processors: + - label: f + compress: + algorithm: meow1 + - check: 'root = "foobar2"' + processors: + - label: g + compress: + algorithm: meow2 + - label: h + compress: + algorithm: meow3 +`), input)) + + res := map[string][2]string{} + + var walkFunc docs.WalkComponentFunc + walkFunc = func(c docs.WalkedComponent) error { + res[c.Path] = [2]string{c.Label, c.Name} + if err := c.WalkComponentsYAML(walkFunc); err != nil { + return err + } + return docs.ErrSkipChildComponents + } + + require.NoError(t, configSpec.WalkComponentsYAML(docs.WalkComponentConfig{ + Provider: mockProv, + Func: walkFunc, + }, input)) + + assert.Equal(t, map[string][2]string{ + "input": {"a", "dynamic"}, + "input.dynamic.inputs.foo": {"b", "kafka"}, + "input.dynamic.inputs.foo.processors.0": {"c", "compress"}, + "input.dynamic.inputs.bar": {"d", "kafka"}, + "input.processors.0": {"e", "switch"}, + "input.processors.0.switch.0.processors.0": {"f", "compress"}, + "input.processors.0.switch.1.processors.0": {"g", "compress"}, + "input.processors.0.switch.1.processors.1": {"h", "compress"}, + }, res) +} + +func TestWalkYAMLLines(t *testing.T) { + mockProv := getMockProv(t) + + input := &yaml.Node{} + require.NoError(t, yaml.Unmarshal([]byte(` +input: + label: a + dynamic: + inputs: + foo: + label: b + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: c + compress: + algorithm: meow1 + bar: + label: d + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: e + switch: + - check: 'root = "foobar"' + processors: + - label: f + compress: + algorithm: meow1 + - check: 'root = "foobar2"' + processors: + - label: g + compress: + algorithm: meow2 + - label: h + compress: + algorithm: meow3 +`), input)) + + res := map[string][2]int{} + + require.NoError(t, configSpec.WalkComponentsYAML(docs.WalkComponentConfig{ + Provider: mockProv, + Func: func(c docs.WalkedComponent) error { + res[c.Path] = 
[2]int{c.LineStart, c.LineEnd} + return nil + }, + }, input)) + + assert.Equal(t, map[string][2]int{ + "input": {3, 35}, + "input.dynamic.inputs.foo": {7, 14}, + "input.dynamic.inputs.foo.processors.0": {12, 14}, + "input.dynamic.inputs.bar": {16, 19}, + "input.processors.0": {21, 35}, + "input.processors.0.switch.0.processors.0": {25, 27}, + "input.processors.0.switch.1.processors.0": {30, 32}, + "input.processors.0.switch.1.processors.1": {33, 35}, + }, res) +} diff --git a/public/service/stream_builder.go b/public/service/stream_builder.go index 555d1a0a3..b272eb41e 100644 --- a/public/service/stream_builder.go +++ b/public/service/stream_builder.go @@ -740,20 +740,6 @@ func (s *StreamBuilder) AsYAML() (string, error) { return string(b), nil } -// WalkedComponent is a struct containing information about a component yielded -// via the WalkComponents method. -type WalkedComponent struct { - ComponentType string - Name string - Label string - confYAML string -} - -// ConfigYAML returns the configuration of a walked component in YAML form. -func (w *WalkedComponent) ConfigYAML() string { - return w.confYAML -} - // WalkComponents walks the Benthos configuration as it is currently built and // for each component type (input, processor, output, etc) calls a provided // function with a struct containing information about the component. 
@@ -777,19 +763,18 @@ func (s *StreamBuilder) WalkComponents(fn func(w *WalkedComponent) error) error return err } - return spec.WalkYAML(&node, s.env.internal, - func(c docs.WalkedYAMLComponent) error { - yamlBytes, err := yaml.Marshal(c.Conf) - if err != nil { - return err + walkConf := docs.WalkComponentConfig{ + Provider: s.env.internal, + Func: func(c docs.WalkedComponent) error { + tmpErr := fn(walkedComponentFromInternal(c)) + if errors.Is(tmpErr, ErrSkipComponents) { + tmpErr = docs.ErrSkipChildComponents } - return fn(&WalkedComponent{ - ComponentType: string(c.ComponentType), - Name: c.Name, - Label: c.Label, - confYAML: string(yamlBytes), - }) - }) + return tmpErr + }, + } + + return spec.WalkComponentsYAML(walkConf, &node) } //------------------------------------------------------------------------------ diff --git a/public/service/stream_builder_test.go b/public/service/stream_builder_test.go index 635be8379..35fd55316 100644 --- a/public/service/stream_builder_test.go +++ b/public/service/stream_builder_test.go @@ -741,7 +741,7 @@ output: name: "generate", conf: `label: "" generate: - mapping: 'root = deleted()'`, + mapping: 'root = deleted()'`, }, { typeStr: "buffer", @@ -794,10 +794,10 @@ output: name: "generate", conf: `label: "" generate: - mapping: 'root = deleted()' + mapping: 'root = deleted()' processors: - - label: "" - mutation: 'root = "hm"'`, + - label: "" + mutation: 'root = "hm"'`, }, { typeStr: "processor", @@ -816,8 +816,8 @@ mutation: 'root = "hm"'`, conf: `label: "" reject: lol nah processors: - - label: "" - mutation: 'root = "eh"'`, + - label: "" + mutation: 'root = "eh"'`, }, { typeStr: "processor", @@ -854,16 +854,16 @@ input: name: "dynamic", conf: `label: "" dynamic: - inputs: - foo: - file: - paths: [aaa.txt]`, + inputs: + foo: + file: + paths: [aaa.txt]`, }, { typeStr: "input", name: "file", conf: `file: - paths: [aaa.txt]`, + paths: [aaa.txt]`, }, { typeStr: "buffer", diff --git a/public/service/stream_config_walker.go 
b/public/service/stream_config_walker.go new file mode 100644 index 000000000..55d2078da --- /dev/null +++ b/public/service/stream_config_walker.go @@ -0,0 +1,122 @@ +package service + +import ( + "errors" + "strings" + + "gopkg.in/yaml.v3" + + "github.com/redpanda-data/benthos/v4/internal/docs" +) + +// ErrSkipComponents is used as a return value from a component walking func to +// indicate that the component currently viewed should not have its child fields +// walked. It is not returned as an error by any function. +var ErrSkipComponents = errors.New("skip components") + +// StreamConfigWalker provides utilities for parsing and then walking stream +// configs, allowing you to analyse the structure of a given config. +type StreamConfigWalker struct { + env *Environment + spec docs.FieldSpecs +} + +// NewStreamConfigWalker creates a component for parsing and then walking stream +// configs, allowing you to analyse the structure of a given config. +func (s *ConfigSchema) NewStreamConfigWalker() *StreamConfigWalker { + return &StreamConfigWalker{ + env: s.env, + spec: s.fields, + } +} + +// WalkedComponent is a struct containing information about a component yielded +// via the WalkComponents method. +type WalkedComponent struct { + ComponentType string + Name string + Path string + Label string + + LineStart int + LineEnd int + + jpPath string // Memoized + + confYAML *yaml.Node + c docs.WalkedComponent +} + +func walkedComponentFromInternal(c docs.WalkedComponent) *WalkedComponent { + return &WalkedComponent{ + ComponentType: string(c.Field.Type), + Name: c.Name, + Path: c.Path, + Label: c.Label, + LineStart: c.LineStart, + LineEnd: c.LineEnd, + confYAML: c.Value, + c: c, + } +} + +// WalkComponentsYAML attempts to walk the config tree from the currently walked +// component, calling the provided func for all child components. 
+func (w *WalkedComponent) WalkComponentsYAML(fn func(w *WalkedComponent) error) error { + return w.c.WalkComponentsYAML(func(c docs.WalkedComponent) error { + tmpErr := fn(walkedComponentFromInternal(c)) + if errors.Is(tmpErr, ErrSkipComponents) { + tmpErr = docs.ErrSkipChildComponents + } + return tmpErr + }) +} + +// PathAsJSONPointer returns the Path, stored as a dot path, as an RFC 6901 JSON pointer, escaping any "~" or "/" found within a path segment. +func (w *WalkedComponent) PathAsJSONPointer() string { + if w.jpPath != "" { + return w.jpPath + } + w.jpPath = "/" + strings.NewReplacer("~", "~0", "/", "~1", ".", "/").Replace(w.Path) // one pass, so a "/" produced from "." is never re-escaped + return w.jpPath +} + +// ConfigYAML returns the configuration of a walked component in YAML form, or an empty string if it cannot be marshalled. +func (w *WalkedComponent) ConfigYAML() string { + yamlBytes, err := docs.MarshalYAML(*w.confYAML) + if err != nil { + return "" + } + return string(yamlBytes) +} + +// ConfigAny returns the configuration of a walked component decoded into a generic Go value. +func (w *WalkedComponent) ConfigAny() (any, error) { + var v any + if err := w.confYAML.Decode(&v); err != nil { + return nil, err + } + return v, nil +} + +// WalkComponentsYAML attempts to parse a YAML config and walk its structure, +// calling a provided function for each component found within the config. 
+func (s *StreamConfigWalker) WalkComponentsYAML(confYAML []byte, fn func(w *WalkedComponent) error) error { + node, err := docs.UnmarshalYAML(confYAML) + if err != nil { + return err + } + + conf := docs.WalkComponentConfig{ + Provider: s.env.internal, + Func: func(c docs.WalkedComponent) error { + tmpErr := fn(walkedComponentFromInternal(c)) + if errors.Is(tmpErr, ErrSkipComponents) { + tmpErr = docs.ErrSkipChildComponents + } + return tmpErr + }, + } + + return s.spec.WalkComponentsYAML(conf, node) +} diff --git a/public/service/stream_config_walker_test.go b/public/service/stream_config_walker_test.go new file mode 100644 index 000000000..3b34a5de9 --- /dev/null +++ b/public/service/stream_config_walker_test.go @@ -0,0 +1,313 @@ +package service_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/internal/docs" + "github.com/redpanda-data/benthos/v4/public/service" +) + +func getMockEnv(t testing.TB) *service.Environment { + t.Helper() + + svc := service.NewEmptyEnvironment() + + require.NoError(t, svc.RegisterInput("kafka", service.NewConfigSpec().Fields( + service.NewStringListField("address"), + service.NewStringListField("topics"), + ), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + return nil, errors.New("nope") + })) + + require.NoError(t, svc.RegisterInput("generate", service.NewConfigSpec().Fields( + service.NewStringField("mapping"), + ), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + return nil, errors.New("nope") + })) + + require.NoError(t, svc.RegisterInput("dynamic", service.NewConfigSpec().Fields( + service.NewInputMapField("inputs"), + ), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + return nil, errors.New("nope") + })) + + require.NoError(t, svc.RegisterOutput("nats", service.NewConfigSpec().Fields( + 
service.NewStringListField("urls"), + service.NewStringField("subject"), + ), func(conf *service.ParsedConfig, mgr *service.Resources) (out service.Output, maxInFlight int, err error) { + return nil, 0, errors.New("nope") + })) + + require.NoError(t, svc.RegisterProcessor("compress", service.NewConfigSpec().Fields( + service.NewStringField("algorithm"), + ), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + return nil, errors.New("nope") + })) + + require.NoError(t, svc.RegisterProcessor("switch", service.NewConfigSpec().Fields( + service.NewObjectListField("", + service.NewStringField("check"), + service.NewProcessorListField("processors"), + ), + ), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + return nil, errors.New("nope") + })) + + return svc +} + +func TestStreamConfigWalkerYAML(t *testing.T) { + mockEnv := getMockEnv(t) + + tests := []struct { + name string + input string + output map[string][2]string + }{ + { + name: "simple input and output", + input: ` +input: + label: a + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + +output: + label: b + nats: + urls: [ nats://127.0.0.1:4222 ] + subject: benthos_messages + max_in_flight: 1 +`, + output: map[string][2]string{ + "input": {"a", "kafka"}, + "output": {"b", "nats"}, + }, + }, + { + name: "switch processor", + input: ` +pipeline: + processors: + - label: a + switch: + - check: 'root = "foobar"' + processors: + - label: b + compress: + algorithm: meow1 + - check: 'root = "foobar2"' + processors: + - label: c + compress: + algorithm: meow2 + - label: d + compress: + algorithm: meow3 +`, + output: map[string][2]string{ + "pipeline.processors.0": {"a", "switch"}, + "pipeline.processors.0.switch.0.processors.0": {"b", "compress"}, + "pipeline.processors.0.switch.1.processors.0": {"c", "compress"}, + "pipeline.processors.0.switch.1.processors.1": {"d", "compress"}, + }, + }, + { + name: "nested inputs and processors", + 
input: ` +input: + label: a + dynamic: + inputs: + foo: + label: b + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: c + compress: + algorithm: meow1 + bar: + label: d + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: e + switch: + - check: 'root = "foobar"' + processors: + - label: f + compress: + algorithm: meow1 + - check: 'root = "foobar2"' + processors: + - label: g + compress: + algorithm: meow2 + - label: h + compress: + algorithm: meow3 +`, + output: map[string][2]string{ + "input": {"a", "dynamic"}, + "input.dynamic.inputs.foo": {"b", "kafka"}, + "input.dynamic.inputs.foo.processors.0": {"c", "compress"}, + "input.dynamic.inputs.bar": {"d", "kafka"}, + "input.processors.0": {"e", "switch"}, + "input.processors.0.switch.0.processors.0": {"f", "compress"}, + "input.processors.0.switch.1.processors.0": {"g", "compress"}, + "input.processors.0.switch.1.processors.1": {"h", "compress"}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + res := map[string][2]string{} + + require.NoError(t, mockEnv.CoreConfigSchema("", ""). + NewStreamConfigWalker(). 
+ WalkComponentsYAML([]byte(test.input), func(w *service.WalkedComponent) error { + res[w.Path] = [2]string{w.Label, w.Name} + return nil + })) + + assert.Equal(t, test.output, res) + }) + } +} + +func TestWalkYAMLFragmented(t *testing.T) { + mockEnv := getMockEnv(t) + + input := ` +input: + label: a + dynamic: + inputs: + foo: + label: b + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: c + compress: + algorithm: meow1 + bar: + label: d + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: e + switch: + - check: 'root = "foobar"' + processors: + - label: f + compress: + algorithm: meow1 + - check: 'root = "foobar2"' + processors: + - label: g + compress: + algorithm: meow2 + - label: h + compress: + algorithm: meow3 +` + + res := map[string][2]string{} + + var walkFunc func(w *service.WalkedComponent) error + walkFunc = func(w *service.WalkedComponent) error { + res[w.Path] = [2]string{w.Label, w.Name} + if err := w.WalkComponentsYAML(walkFunc); err != nil { + return err + } + return docs.ErrSkipChildComponents + } + + require.NoError(t, mockEnv.CoreConfigSchema("", ""). + NewStreamConfigWalker(). 
+ WalkComponentsYAML([]byte(input), walkFunc)) + + assert.Equal(t, map[string][2]string{ + "input": {"a", "dynamic"}, + "input.dynamic.inputs.foo": {"b", "kafka"}, + "input.dynamic.inputs.foo.processors.0": {"c", "compress"}, + "input.dynamic.inputs.bar": {"d", "kafka"}, + "input.processors.0": {"e", "switch"}, + "input.processors.0.switch.0.processors.0": {"f", "compress"}, + "input.processors.0.switch.1.processors.0": {"g", "compress"}, + "input.processors.0.switch.1.processors.1": {"h", "compress"}, + }, res) +} + +func TestWalkYAMLLines(t *testing.T) { + mockEnv := getMockEnv(t) + + input := ` +input: + label: a + dynamic: + inputs: + foo: + label: b + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: c + compress: + algorithm: meow1 + bar: + label: d + kafka: + addresses: [ "foo", "bar" ] + topics: [ "baz" ] + processors: + - label: e + switch: + - check: 'root = "foobar"' + processors: + - label: f + compress: + algorithm: meow1 + - check: 'root = "foobar2"' + processors: + - label: g + compress: + algorithm: meow2 + - label: h + compress: + algorithm: meow3 +` + + res := map[string][2]int{} + + require.NoError(t, mockEnv.CoreConfigSchema("", ""). + NewStreamConfigWalker(). + WalkComponentsYAML([]byte(input), func(w *service.WalkedComponent) error { + res[w.Path] = [2]int{w.LineStart, w.LineEnd} + return nil + })) + + assert.Equal(t, map[string][2]int{ + "input": {3, 35}, + "input.dynamic.inputs.foo": {7, 14}, + "input.dynamic.inputs.foo.processors.0": {12, 14}, + "input.dynamic.inputs.bar": {16, 19}, + "input.processors.0": {21, 35}, + "input.processors.0.switch.0.processors.0": {25, 27}, + "input.processors.0.switch.1.processors.0": {30, 32}, + "input.processors.0.switch.1.processors.1": {33, 35}, + }, res) +}