Skip to content

Commit

Permalink
remove usage of global tagger from dogstasd replay component
Browse files Browse the repository at this point in the history
  • Loading branch information
GustavoCaso committed Oct 17, 2024
1 parent 40c637a commit 100ed74
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl"
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
workloadmetafxmock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx-mock"
"github.com/DataDog/datadog-agent/comp/dogstatsd"
Expand All @@ -38,12 +40,17 @@ func TestDogstatsdMetricsStats(t *testing.T) {
assert := assert.New(t)
var err error

taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule())

deps := fxutil.Test[testDeps](t, fx.Options(
core.MockBundle(),
fx.Supply(core.BundleParams{}),
demultiplexerimpl.MockModule(),
dogstatsd.Bundle(server.Params{Serverless: false}),
defaultforwarder.MockModule(),
fx.Provide(func() tagger.Component {
return taggerComponent
}),
workloadmetafxmock.MockModule(workloadmeta.NewParams()),
))

Expand Down
4 changes: 2 additions & 2 deletions cmd/dogstatsd/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func RunDogstatsdFct(cliParams *CLIParams, defaultConfPath string, defaultLogFil
eventplatformimpl.Module(eventplatformimpl.NewDisabledParams()),
eventplatformreceiverimpl.Module(),
hostnameimpl.Module(),
taggerimpl.OptionalModule(),
taggerimpl.Module(),
// injecting the shared Serializer to FX until we migrate it to a prpoper component. This allows other
// already migrated components to request it.
fx.Provide(func(demuxInstance demultiplexer.Component) serializer.MetricSerializer {
Expand Down Expand Up @@ -191,7 +191,7 @@ func start(
server dogstatsdServer.Component,
_ defaultforwarder.Component,
wmeta optional.Option[workloadmeta.Component],
_ optional.Option[tagger.Component],
_ tagger.Component,
demultiplexer demultiplexer.Component,
_ runner.Component,
_ resources.Component,
Expand Down
6 changes: 5 additions & 1 deletion comp/dogstatsd/replay/impl/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/spf13/afero"

configComponent "github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/tagger"
compdef "github.com/DataDog/datadog-agent/comp/def"
"github.com/DataDog/datadog-agent/comp/dogstatsd/packets"
replay "github.com/DataDog/datadog-agent/comp/dogstatsd/replay/def"
Expand All @@ -26,12 +27,14 @@ import (
type Requires struct {
Lc compdef.Lifecycle
Config configComponent.Component
Tagger tagger.Component
}

// trafficCapture allows capturing traffic from our listeners and writing it to file
type trafficCapture struct {
writer *TrafficCaptureWriter
config model.Reader
tagger tagger.Component
startUpError error

sync.RWMutex
Expand All @@ -41,6 +44,7 @@ type trafficCapture struct {
func NewTrafficCapture(deps Requires) replay.Component {
tc := &trafficCapture{
config: deps.Config,
tagger: deps.Tagger,
}
deps.Lc.Append(compdef.Hook{
OnStart: tc.configure,
Expand All @@ -50,7 +54,7 @@ func NewTrafficCapture(deps Requires) replay.Component {
}

func (tc *trafficCapture) configure(_ context.Context) error {
writer := NewTrafficCaptureWriter(tc.config.GetInt("dogstatsd_capture_depth"))
writer := NewTrafficCaptureWriter(tc.config.GetInt("dogstatsd_capture_depth"), tc.tagger)
if writer == nil {
tc.startUpError = fmt.Errorf("unable to instantiate capture writer")
}
Expand Down
6 changes: 4 additions & 2 deletions comp/dogstatsd/replay/impl/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,19 @@ type TrafficCaptureWriter struct {
oobPacketPoolManager *packets.PoolManager[[]byte]

taggerState map[int32]string
tagger tagger.Component

// Synchronizes access to ongoing, accepting and closing of Traffic
sync.RWMutex
}

// NewTrafficCaptureWriter creates a TrafficCaptureWriter instance.
func NewTrafficCaptureWriter(depth int) *TrafficCaptureWriter {
func NewTrafficCaptureWriter(depth int, tagger tagger.Component) *TrafficCaptureWriter {

return &TrafficCaptureWriter{
Traffic: make(chan *replay.CaptureBuffer, depth),
taggerState: make(map[int32]string),
tagger: tagger,
}
}

Expand Down Expand Up @@ -313,7 +315,7 @@ func (tc *TrafficCaptureWriter) writeState() (int, error) {

// iterate entities
for _, id := range tc.taggerState {
entity, err := tagger.GetEntity(id)
entity, err := tc.tagger.GetEntity(id)
if err != nil {
log.Warnf("There was no entity for container id: %v present in the tagger", entity)
continue
Expand Down
6 changes: 5 additions & 1 deletion comp/dogstatsd/replay/impl/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"go.uber.org/atomic"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
telemetrynoop "github.com/DataDog/datadog-agent/comp/core/telemetry/noopsimpl"
"github.com/DataDog/datadog-agent/comp/dogstatsd/packets"
Expand All @@ -38,7 +40,9 @@ func writerTest(t *testing.T, z bool) {

cfg := config.NewMock(t)

writer := NewTrafficCaptureWriter(1)
taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule())

writer := NewTrafficCaptureWriter(1, taggerComponent)

// initialize telemeytry store
telemetryComponent := fxutil.Test[telemetry.Component](t, telemetrynoop.Module())
Expand Down

0 comments on commit 100ed74

Please sign in to comment.