Skip to content

Commit

Permalink
Entities support prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Oct 21, 2024
1 parent ba4ab80 commit ce41a6b
Show file tree
Hide file tree
Showing 113 changed files with 8,183 additions and 60 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ ocb:
# Definitions for ProtoBuf generation.

# The source directory for OTLP ProtoBufs.
OPENTELEMETRY_PROTO_SRC_DIR=pdata/internal/opentelemetry-proto
OPENTELEMETRY_PROTO_SRC_DIR?=pdata/internal/opentelemetry-proto

# The branch matching the current version of the proto to use
OPENTELEMETRY_PROTO_VERSION=v1.3.1
Expand Down Expand Up @@ -200,7 +200,7 @@ genproto: genproto-cleanup
# Call a sub-make to ensure OPENTELEMETRY_PROTO_FILES is populated
$(MAKE) genproto_sub
$(MAKE) fmt
$(MAKE) genproto-cleanup
# $(MAKE) genproto-cleanup

genproto_sub:
@echo Generating code for the following files:
Expand Down Expand Up @@ -234,8 +234,8 @@ genproto_sub:
cp -R $(PROTO_INTERMEDIATE_DIR)/$(PROTO_PACKAGE)/* $(PROTO_TARGET_GEN_DIR)/
rm -rf $(PROTO_INTERMEDIATE_DIR)/go.opentelemetry.io

@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/*
@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/.* > /dev/null 2>&1 || true
#@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/*
#@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/.* > /dev/null 2>&1 || true

# Generate structs, functions and tests for pdata package. Must be used after any changes
# to proto and after running `make genproto`
Expand Down
11 changes: 11 additions & 0 deletions cmd/mdatagen/internal/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@ func (t telemetry) Levels() map[string]interface{} {
return levels
}

type Entity struct {
// Type of the entity.
Type string `mapstructure:"type"`
// Identifying attributes of the entity.
IDAttributes []AttributeName `mapstructure:"id_attributes"`
// Descriptive attributes of the entity.
DescriptiveAttributes []AttributeName `mapstructure:"descriptive_attributes"`
}

type Metadata struct {
// Type of the component.
Type string `mapstructure:"type"`
Expand All @@ -284,6 +293,8 @@ type Metadata struct {
SemConvVersion string `mapstructure:"sem_conv_version"`
// ResourceAttributes that can be emitted by the component.
ResourceAttributes map[AttributeName]Attribute `mapstructure:"resource_attributes"`
// Entities associated with the emitted resource attributes.
Entities []Entity `mapstructure:"entities"`
// Attributes emitted by one or more metrics.
Attributes map[AttributeName]Attribute `mapstructure:"attributes"`
// Metrics that can be emitted by the component.
Expand Down
16 changes: 16 additions & 0 deletions cmd/mdatagen/internal/templates/resource.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ func (rb *ResourceBuilder) Set{{ $name.Render }}(val {{ $attr.Type.Primitive }})
// Emit returns the built resource and resets the internal builder state.
func (rb *ResourceBuilder) Emit() pcommon.Resource {
r := rb.res
{{- range $entity := .Entities }}
{{- range $attr := .IDAttributes }}
_, found{{ $attr.Render }} := r.Attributes().Get("{{ $attr }}")
{{- end }}
if {{ range $i, $attr := .IDAttributes }}{{ if $i }}&& {{ end }}found{{ $attr.Render }} {{ end }} {
ref := pcommon.NewResourceEntityRef()
ref.SetType("{{ $entity.Type }}")
ref.IdAttrKeys().Append({{ range $i, $attr := .IDAttributes }}{{ if $i }}, {{ end }}"{{ $attr }}"{{ end }})
{{- range $attr := .DescriptiveAttributes }}
if _, ok := r.Attributes().Get("{{ $attr }}"); ok {
ref.DescrAttrKeys().Append("{{ $attr }}")
}
{{- end }}
ref.CopyTo(r.Entities().AppendEmpty())
}
{{- end }}
rb.res = pcommon.NewResource()
return r
}
16 changes: 16 additions & 0 deletions cmd/mdatagen/internal/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func (md *Metadata) Validate() error {
if err := md.validateMetrics(); err != nil {
errs = errors.Join(errs, err)
}
if err := md.validateEntities(); err != nil {
errs = errors.Join(errs, err)
}
return errs
}

Expand Down Expand Up @@ -96,6 +99,7 @@ func (s *Status) validateStability() error {
c != "traces" &&
c != "logs" &&
c != "profiles" &&
c != "entities" &&
c != "traces_to_traces" &&
c != "traces_to_metrics" &&
c != "traces_to_logs" &&
Expand Down Expand Up @@ -143,6 +147,18 @@ func (md *Metadata) validateMetrics() error {
return errs
}

func (md *Metadata) validateEntities() error {
var errs error
for _, entity := range md.Entities {
for _, attr := range append(entity.IDAttributes, entity.DescriptiveAttributes...) {
if _, ok := md.ResourceAttributes[attr]; !ok {
errs = errors.Join(errs, fmt.Errorf("undefined resource attribute: %v", attr))
}
}
}
return errs
}

func (m *Metric) validate() error {
var errs error
if m.Description == "" {
Expand Down
7 changes: 7 additions & 0 deletions cmd/mdatagen/metadata-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ resource_attributes:
# Should be used for deprecated optional resource_attributes that will be removed soon.
if_configured:

# Optional: list of entities associated with the produced resource.
entities:
- type: string
# Array of attribute names that are used to identify the entity.
id_attributes: [string]
# Optional: array of attribute names that are used to describe the entity.
descriptive_attributes: [string]

# Optional: map of attribute definitions with the key being the attribute name and value
# being described below.
Expand Down
177 changes: 168 additions & 9 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ type Logs interface {
consumer.Logs
}

// An Entities connector acts as an exporter from a logs pipeline and a receiver
// to one or more traces, metrics, or logs pipelines.
// Entities feeds a consumer.Traces, consumer.Metrics, or consumer.Entities with data.
//
// Examples:
// - Structured logs containing span information could be consumed and emitted as traces.
// - Metrics could be extracted from structured logs that contain numeric data.
// - Entities could be collected in one pipeline and routed to another logs pipeline
// based on criteria such as attributes or other content of the log. The second
// pipeline can then process and export the log to the appropriate backend.
type Entities interface {
component.Component
consumer.Entities
}

// Settings configures Connector creators.
type Settings struct {
// ID returns the ID of the component that will be created.
Expand Down Expand Up @@ -91,26 +106,42 @@ type Factory interface {
CreateTracesToTraces(ctx context.Context, set Settings, cfg component.Config, next consumer.Traces) (Traces, error)
CreateTracesToMetrics(ctx context.Context, set Settings, cfg component.Config, next consumer.Metrics) (Traces, error)
CreateTracesToLogs(ctx context.Context, set Settings, cfg component.Config, next consumer.Logs) (Traces, error)
CreateTracesToEntities(ctx context.Context, set Settings, cfg component.Config, next consumer.Entities) (Traces, error)

CreateMetricsToTraces(ctx context.Context, set Settings, cfg component.Config, next consumer.Traces) (Metrics, error)
CreateMetricsToMetrics(ctx context.Context, set Settings, cfg component.Config, next consumer.Metrics) (Metrics, error)
CreateMetricsToLogs(ctx context.Context, set Settings, cfg component.Config, next consumer.Logs) (Metrics, error)
CreateMetricsToEntities(ctx context.Context, set Settings, cfg component.Config, next consumer.Entities) (Metrics, error)

CreateLogsToTraces(ctx context.Context, set Settings, cfg component.Config, next consumer.Traces) (Logs, error)
CreateLogsToMetrics(ctx context.Context, set Settings, cfg component.Config, next consumer.Metrics) (Logs, error)
CreateLogsToLogs(ctx context.Context, set Settings, cfg component.Config, next consumer.Logs) (Logs, error)
CreateLogsToEntities(ctx context.Context, set Settings, cfg component.Config, next consumer.Entities) (Logs, error)

CreateEntitiesToTraces(ctx context.Context, set Settings, cfg component.Config, next consumer.Traces) (Entities, error)
CreateEntitiesToMetrics(ctx context.Context, set Settings, cfg component.Config, next consumer.Metrics) (Entities, error)
CreateEntitiesToLogs(ctx context.Context, set Settings, cfg component.Config, next consumer.Logs) (Entities, error)
CreateEntitiesToEntities(ctx context.Context, set Settings, cfg component.Config, next consumer.Entities) (Entities, error)

TracesToTracesStability() component.StabilityLevel
TracesToMetricsStability() component.StabilityLevel
TracesToLogsStability() component.StabilityLevel
TracesToEntitiesStability() component.StabilityLevel

MetricsToTracesStability() component.StabilityLevel
MetricsToMetricsStability() component.StabilityLevel
MetricsToLogsStability() component.StabilityLevel
MetricsToEntitiesStability() component.StabilityLevel

LogsToTracesStability() component.StabilityLevel
LogsToMetricsStability() component.StabilityLevel
LogsToLogsStability() component.StabilityLevel
LogsToEntitiesStability() component.StabilityLevel

EntitiesToTracesStability() component.StabilityLevel
EntitiesToMetricsStability() component.StabilityLevel
EntitiesToLogsStability() component.StabilityLevel
EntitiesToEntitiesStability() component.StabilityLevel

unexportedFactoryFunc()
}
Expand Down Expand Up @@ -163,6 +194,18 @@ func (f CreateTracesToLogsFunc) CreateTracesToLogs(ctx context.Context, set Sett
return f(ctx, set, cfg, next)
}

// CreateTracesToEntitiesFunc is the equivalent of Factory.CreateTracesToEntities().
type CreateTracesToEntitiesFunc func(context.Context, Settings, component.Config, consumer.Entities) (Traces, error)

// CreateTracesToEntities implements Factory.CreateTracesToEntities().
func (f CreateTracesToEntitiesFunc) CreateTracesToEntities(ctx context.Context, set Settings, cfg component.Config,
next consumer.Entities) (Traces, error) {
if f == nil {
return nil, internal.ErrDataTypes(set.ID, pipeline.SignalTraces, pipeline.SignalEntities)
}
return f(ctx, set, cfg, next)
}

// CreateMetricsToTracesFunc is the equivalent of Factory.CreateMetricsToTraces().
type CreateMetricsToTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Metrics, error)

Expand Down Expand Up @@ -196,6 +239,18 @@ func (f CreateMetricsToLogsFunc) CreateMetricsToLogs(ctx context.Context, set Se
return f(ctx, set, cfg, next)
}

// CreateMetricsToEntitiesFunc is the equivalent of Factory.CreateMetricsToEntities().
type CreateMetricsToEntitiesFunc func(context.Context, Settings, component.Config, consumer.Entities) (Metrics, error)

// CreateMetricsToEntities implements Factory.CreateMetricsToEntities().
func (f CreateMetricsToEntitiesFunc) CreateMetricsToEntities(ctx context.Context, set Settings, cfg component.Config,
next consumer.Entities) (Metrics, error) {
if f == nil {
return nil, internal.ErrDataTypes(set.ID, pipeline.SignalMetrics, pipeline.SignalEntities)
}
return f(ctx, set, cfg, next)
}

// CreateLogsToTracesFunc is the equivalent of Factory.CreateLogsToTraces().
type CreateLogsToTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Logs, error)

Expand Down Expand Up @@ -229,6 +284,66 @@ func (f CreateLogsToLogsFunc) CreateLogsToLogs(ctx context.Context, set Settings
return f(ctx, set, cfg, next)
}

// CreateLogsToEntitiesFunc is the equivalent of Factory.CreateLogsToEntities().
type CreateLogsToEntitiesFunc func(context.Context, Settings, component.Config, consumer.Entities) (Logs, error)

// CreateLogsToEntities implements Factory.CreateLogsToEntities().
func (f CreateLogsToEntitiesFunc) CreateLogsToEntities(ctx context.Context, set Settings, cfg component.Config,
next consumer.Entities) (Logs, error) {
if f == nil {
return nil, internal.ErrDataTypes(set.ID, pipeline.SignalLogs, pipeline.SignalEntities)
}
return f(ctx, set, cfg, next)
}

// CreateEntitiesToTracesFunc is the equivalent of Factory.CreateEntitiesToTraces().
type CreateEntitiesToTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Entities, error)

// CreateEntitiesToTraces implements Factory.CreateEntitiesToTraces().
func (f CreateEntitiesToTracesFunc) CreateEntitiesToTraces(ctx context.Context, set Settings, cfg component.Config,
next consumer.Traces) (Entities, error) {
if f == nil {
return nil, internal.ErrDataTypes(set.ID, pipeline.SignalEntities, pipeline.SignalTraces)
}
return f(ctx, set, cfg, next)
}

// CreateEntitiesToMetricsFunc is the equivalent of Factory.CreateEntitiesToMetrics().
type CreateEntitiesToMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Entities, error)

// CreateEntitiesToMetrics implements Factory.CreateEntitiesToMetrics().
func (f CreateEntitiesToMetricsFunc) CreateEntitiesToMetrics(ctx context.Context, set Settings, cfg component.Config,
next consumer.Metrics) (Entities, error) {
if f == nil {
return nil, internal.ErrDataTypes(set.ID, pipeline.SignalEntities, pipeline.SignalMetrics)
}
return f(ctx, set, cfg, next)
}

// CreateEntitiesToLogsFunc is the equivalent of Factory.CreateEntitiesToLogs().
type CreateEntitiesToLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Entities, error)

// CreateEntitiesToLogs implements Factory.CreateEntitiesToLogs().
func (f CreateEntitiesToLogsFunc) CreateEntitiesToLogs(ctx context.Context, set Settings, cfg component.Config,
next consumer.Logs) (Entities, error) {
if f == nil {
return nil, internal.ErrDataTypes(set.ID, pipeline.SignalEntities, pipeline.SignalLogs)
}
return f(ctx, set, cfg, next)
}

// CreateEntitiesToEntitiesFunc is the equivalent of Factory.CreateEntitiesToLogs().
type CreateEntitiesToEntitiesFunc func(context.Context, Settings, component.Config, consumer.Entities) (Entities, error)

// CreateEntitiesToEntities implements Factory.CreateEntitiesToEntities().
func (f CreateEntitiesToEntitiesFunc) CreateEntitiesToEntities(ctx context.Context, set Settings, cfg component.Config,
next consumer.Entities) (Entities, error) {
if f == nil {
return nil, internal.ErrDataTypes(set.ID, pipeline.SignalEntities, pipeline.SignalLogs)
}
return f(ctx, set, cfg, next)
}

// WithTracesToTraces overrides the default "error not supported" implementation for WithTracesToTraces and the default "undefined" stability level.
func WithTracesToTraces(createTracesToTraces CreateTracesToTracesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
Expand Down Expand Up @@ -309,26 +424,42 @@ type factory struct {
CreateTracesToTracesFunc
CreateTracesToMetricsFunc
CreateTracesToLogsFunc
CreateTracesToEntitiesFunc

CreateMetricsToTracesFunc
CreateMetricsToMetricsFunc
CreateMetricsToLogsFunc
CreateMetricsToEntitiesFunc

CreateLogsToTracesFunc
CreateLogsToMetricsFunc
CreateLogsToLogsFunc
CreateLogsToEntitiesFunc

CreateEntitiesToTracesFunc
CreateEntitiesToMetricsFunc
CreateEntitiesToLogsFunc
CreateEntitiesToEntitiesFunc

tracesToTracesStabilityLevel component.StabilityLevel
tracesToMetricsStabilityLevel component.StabilityLevel
tracesToLogsStabilityLevel component.StabilityLevel
tracesToEntitiesStabilityLevel component.StabilityLevel

tracesToTracesStabilityLevel component.StabilityLevel
tracesToMetricsStabilityLevel component.StabilityLevel
tracesToLogsStabilityLevel component.StabilityLevel
metricsToTracesStabilityLevel component.StabilityLevel
metricsToMetricsStabilityLevel component.StabilityLevel
metricsToLogsStabilityLevel component.StabilityLevel
metricsToEntitiesStabilityLevel component.StabilityLevel

metricsToTracesStabilityLevel component.StabilityLevel
metricsToMetricsStabilityLevel component.StabilityLevel
metricsToLogsStabilityLevel component.StabilityLevel
logsToTracesStabilityLevel component.StabilityLevel
logsToMetricsStabilityLevel component.StabilityLevel
logsToLogsStabilityLevel component.StabilityLevel
logsToEntitiesStabilityLevel component.StabilityLevel

logsToTracesStabilityLevel component.StabilityLevel
logsToMetricsStabilityLevel component.StabilityLevel
logsToLogsStabilityLevel component.StabilityLevel
entitiesToTracesStabilityLevel component.StabilityLevel
entitiesToMetricsStabilityLevel component.StabilityLevel
entitiesToLogsStabilityLevel component.StabilityLevel
entitiesToEntitiesStabilityLevel component.StabilityLevel
}

// Type returns the type of component.
Expand All @@ -350,6 +481,10 @@ func (f *factory) TracesToLogsStability() component.StabilityLevel {
return f.tracesToLogsStabilityLevel
}

func (f *factory) TracesToEntitiesStability() component.StabilityLevel {
return f.tracesToEntitiesStabilityLevel
}

func (f *factory) MetricsToTracesStability() component.StabilityLevel {
return f.metricsToTracesStabilityLevel
}
Expand All @@ -362,6 +497,10 @@ func (f *factory) MetricsToLogsStability() component.StabilityLevel {
return f.metricsToLogsStabilityLevel
}

func (f *factory) MetricsToEntitiesStability() component.StabilityLevel {
return f.metricsToEntitiesStabilityLevel
}

func (f *factory) LogsToTracesStability() component.StabilityLevel {
return f.logsToTracesStabilityLevel
}
Expand All @@ -374,6 +513,26 @@ func (f *factory) LogsToLogsStability() component.StabilityLevel {
return f.logsToLogsStabilityLevel
}

func (f *factory) LogsToEntitiesStability() component.StabilityLevel {
return f.logsToEntitiesStabilityLevel
}

func (f *factory) EntitiesToTracesStability() component.StabilityLevel {
return f.entitiesToTracesStabilityLevel
}

func (f *factory) EntitiesToMetricsStability() component.StabilityLevel {
return f.entitiesToMetricsStabilityLevel
}

func (f *factory) EntitiesToLogsStability() component.StabilityLevel {
return f.entitiesToLogsStabilityLevel
}

func (f *factory) EntitiesToEntitiesStability() component.StabilityLevel {
return f.entitiesToEntitiesStabilityLevel
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
Expand Down
Loading

0 comments on commit ce41a6b

Please sign in to comment.