-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[processor/logdedup] feat: add ottl condition to logdedup processor #35443
base: main
Are you sure you want to change the base?
[processor/logdedup] feat: add ottl condition to logdedup processor #35443
Conversation
wg sync.WaitGroup | ||
mux sync.Mutex | ||
emitInterval time.Duration | ||
condition *expr.OTTLCondition[ottllog.TransformContext] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are doing this in this repo, we should be able to make use of this internal package:
func NewBoolExprForLog(conditions []string, functions map[string]ottl.Factory[ottllog.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings) (expr.BoolExpr[ottllog.TransformContext], error) { |
Can you investigate and see if we can switch over to using this package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great find, looks like it works just fine. That way we don't use the bindplane-agent/expr package 👍
Thanks Brandon
|
||
logs.RemoveIf(func(logRecord plog.LogRecord) bool { | ||
var conditionMatch bool | ||
if p.conditionString == "true" || p.conditionString == "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this empty condition string here, an empty string won't compile so the processor won't run if you specify an empty string as the condition.
logMatch, err := p.condition.Match(ctx, logCtx) | ||
conditionMatch = err == nil && logMatch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should log if this errors
| Field | Type | Default | Description | | ||
| --- | --- | --- | --- | | ||
| interval | duration | `10s` | The interval at which logs are aggregated. The counter will reset after each interval. | | ||
| condition | string | `true` | An [OTTL] expression used to match which log records to sample from. All paths in the [log context] are available to reference. All [converters] are available to use. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Intuitively, as a user I think I would expect a condition to control whether or not a log is deduplicated, not whether it is kept at all. In other words, I would have expected that logs that do not match the condition pass through the processor without being affected.
The actual meaning seems to be that the condition acts as a filter for the logs. The problem is, we already have ways to filter data, so I'm unconvinced this functionality belongs in this component. The user can always filter a log stream before sending it through this component.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahhh good catch, I had forgotten to change the description when I edited the readme.
I just updated it to be accurate with what the condition does.
The condition is actually used to determine what logs are deduped. Any logs that don't match the condition are just exported without processing.
|
||
logs.RemoveIf(func(logRecord plog.LogRecord) bool { | ||
// no need to evaluate condition if it is "true" | ||
conditionMatch := p.conditionString == "true" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of computing this boolean for every single log record that flows through the processor, we can just compute it once in the factory and save it on the receiver (rather than saving the string).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better yet, just break this into two functions and save the appropriate function onto the reciever:
func dedupMatches(logRecord plog.LogRecord) bool {
logCtx := ottllog.NewTransformContext(logRecord, scope, resource, sl, rl)
logMatch, err := p.condition.Eval(ctx, logCtx)
if err != nil {
p.logger.Error("error matching condition", zap.Error(err))
return false
}
return logMatch
}
func dedupAll(logRecord plog.LogRecord) bool {
p.remover.RemoveFields(logRecord)
p.aggregator.Add(resource, scope, logRecord)
return true
}
type logDedupProcessor struct {
...
matchFunc func(plog.LogRecord) bool
}
// in factory
matchFunc := dedupAll
if c.Condition != defaultCondition {
matchFunc = dedupMatches
}
...
return &logDedupProcessor{
...
matchFunc: matchFunc,
}
// here, in the receiver
logs.RemoveIf(p.matchFunc)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good ideas, thanks for the suggestions. Just committed a refactor.
if processorCfg.Condition == defaultCondition { | ||
processor.matchFunc = processor.dedupAll | ||
} else { | ||
condition, err := filterottl.NewBoolExprForLog([]string{processorCfg.Condition}, filterottl.StandardLogFuncs(), ottl.PropagateError, settings.TelemetrySettings) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When parameter lists cause very long lines we can break them up for clarity or move them into a function.
condition, err := filterottl.NewBoolExprForLog([]string{processorCfg.Condition}, filterottl.StandardLogFuncs(), ottl.PropagateError, settings.TelemetrySettings) | |
condition, err := filterottl.NewBoolExprForLog( | |
[]string{processorCfg.Condition}, | |
filterottl.StandardLogFuncs(), | |
ottl.PropagateError, | |
settings.TelemetrySettings, | |
) |
logs.RemoveIf(func(logRecord plog.LogRecord) bool { | ||
return p.matchFunc(ctx, logRecord, scope, resource, sl, rl) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The matchFunc signature ended up being very verbose. Maybe we should just bypass RemoveIf
altogether when appropriate.
if p.condition == nil {
// Don't worry about RemoveIf
p.remover.RemoveFields(logRecord)
p.aggregator.Add(resource, scope, logRecord)
continue
}
// now worry about RemoveIf
logs.RemoveIf(func(logRecord plog.LogRecord) bool {
logCtx := ottllog.NewTransformContext(logRecord, scope, resource, sl, rl)
logMatch, err := p.condition.Eval(ctx, logCtx)
if err != nil {
p.logger.Error("error matching condition", zap.Error(err))
return false
}
if logMatch {
p.remover.RemoveFields(logRecord)
p.aggregator.Add(resource, scope, logRecord)
}
return logMatch
})
You might also consider moving the two common lines into a function to ensure we don't update one and not the other later.
Description:
Adds OTTL Condition field to Deduplicate Logs Processor
Link to tracking Issue: Closes #35440
Testing:
Documentation: Added documentation to the logdedup processor README about the condition field and an example configuration with a condition.