feat: More detailed tracing for distributors #14504

Open · wants to merge 4 commits into base: main

Changes from 3 commits
79 changes: 48 additions & 31 deletions pkg/distributor/distributor.go
@@ -182,6 +182,7 @@ type Distributor struct {
kafkaWriteBytesTotal prometheus.Counter
kafkaWriteLatency prometheus.Histogram
kafkaRecordsPerRequest prometheus.Histogram
kafkaBytesPerRecord prometheus.Histogram
}

// New creates a distributor.
@@ -304,6 +305,12 @@ func New(
Help: "The number of records a single per-partition write request has been split into.",
Buckets: prometheus.ExponentialBuckets(1, 2, 8),
}),
kafkaBytesPerRecord: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Name: "distributor_kafka_bytes_per_record",
Help: "The size in bytes of a single record in a per-partition write request.",
Buckets: prometheus.ExponentialBuckets(1, 2, 8),
}),
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
kafkaWriter: kafkaWriter,
partitionRing: partitionRing,
@@ -594,36 +601,40 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
var descs [maxExpectedReplicationSet]ring.InstanceDesc

tracker := pushTracker{
kafkaTracker := pushTracker{
Contributor:
Something that is unclear to me: what was the behavior before, when sharing a common tracker, and what is the behavior now, with separate trackers?

Contributor Author:

The API behaviour is exactly the same. The only benefit of the separate trackers is that we can monitor when all the ingesters complete vs. when all the Kafka writes complete. Before, the tracker only fired "done" once both sets of writes had completed, and there was no way to tell which one was faster.

Initially I did this so we could create a histogram from the timings, but I've since replaced it with spans. We do get sub-spans from the ingesters once we hit their Push endpoint, but there are no sub-spans from writing to Kafka, which is what I wanted to amend here.
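
To make that concrete, here is a minimal, self-contained sketch of the two-tracker wait loop (hypothetical types and timings, not the actual Loki code): each write path gets its own tracker, so each span can be finished as soon as its own path reports done, instead of waiting for both.

package main

import (
	"context"
	"fmt"
	"time"
)

// tracker mirrors the shape of pushTracker: buffered channels so senders never block.
type tracker struct {
	done chan struct{}
	err  chan error
}

func newTracker() tracker {
	return tracker{done: make(chan struct{}, 1), err: make(chan error, 1)}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	kafka, ingesters := newTracker(), newTracker()

	// Simulate the two write paths completing at different times.
	go func() { time.Sleep(10 * time.Millisecond); kafka.done <- struct{}{} }()
	go func() { time.Sleep(30 * time.Millisecond); ingesters.done <- struct{}{} }()

	kafkaDone, ingestersDone := false, false
	for !kafkaDone || !ingestersDone {
		select {
		case err := <-kafka.err:
			fmt.Println("kafka path failed:", err)
			return
		case err := <-ingesters.err:
			fmt.Println("ingester path failed:", err)
			return
		case <-kafka.done:
			kafkaDone = true // this is where the kafka span would be finished
			fmt.Println("kafka path done")
		case <-ingesters.done:
			ingestersDone = true // this is where the ingester span would be finished
			fmt.Println("ingester path done")
		case <-ctx.Done():
			fmt.Println("request cancelled:", ctx.Err())
			return
		}
	}
	fmt.Println("both paths done")
}

With a single shared tracker, the loop above collapses to one done channel that only fires after both paths have finished, which is why the per-path timing was not observable before.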

done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each
err: make(chan error, 1),
}
streamsToWrite := 0
if d.cfg.IngesterEnabled {
streamsToWrite += len(streams)
}
var kafkaSpan opentracing.Span
if d.cfg.KafkaEnabled {
streamsToWrite += len(streams)
}
// We must correctly set streamsPending before beginning any writes to ensure we don't have a race between finishing all of one path before starting the other.
tracker.streamsPending.Store(int32(streamsToWrite))
var kafkaCtx context.Context
kafkaSpan, kafkaCtx = opentracing.StartSpanFromContext(ctx, "sendStreamsToKafka")
kafkaTracker.streamsPending.Store(int32(len(streams)))

if d.cfg.KafkaEnabled {
subring, err := d.partitionRing.PartitionRing().ShuffleShard(tenantID, d.validator.IngestionPartitionsTenantShardSize(tenantID))
if err != nil {
return nil, err
}
// We don't need to create a new context like the ingester writes, because we don't return unless all writes have succeeded.
d.sendStreamsToKafka(ctx, streams, tenantID, &tracker, subring)
d.sendStreamsToKafka(kafkaCtx, streams, tenantID, &kafkaTracker, subring)
}

ingesterTracker := pushTracker{
done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each
err: make(chan error, 1),
}
var ingesterSpan opentracing.Span
if d.cfg.IngesterEnabled {
var ingesterCtx context.Context
ingesterSpan, ingesterCtx = opentracing.StartSpanFromContext(ctx, "sendStreamsToIngesters")
ingesterTracker.streamsPending.Store(int32(len(streams)))

streamTrackers := make([]streamTracker, len(streams))
streamsByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.InstanceDesc{}

if err := func() error {
sp := opentracing.SpanFromContext(ctx)
sp := opentracing.SpanFromContext(ingesterCtx)
if sp != nil {
sp.LogKV("event", "started to query ingesters ring")
defer func() {
Expand Down Expand Up @@ -654,10 +665,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

for ingester, streams := range streamsByIngester {
func(ingester ring.InstanceDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
// Use a background context to make sure all ingesters get samples even if we return early.
localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
localCtx = user.InjectOrgID(localCtx, tenantID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
if sp := opentracing.SpanFromContext(ingesterCtx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
select {
Expand All @@ -667,7 +678,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
case d.ingesterTasks <- pushIngesterTask{
ingester: ingester,
streamTracker: samples,
pushTracker: &tracker,
pushTracker: &ingesterTracker,
ctx: localCtx,
cancel: cancel,
}:
Expand All @@ -677,14 +688,26 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

select {
case err := <-tracker.err:
return nil, err
case <-tracker.done:
return &logproto.PushResponse{}, validationErr
case <-ctx.Done():
return nil, ctx.Err()
kafkaDone := !d.cfg.KafkaEnabled
ingestersDone := !d.cfg.IngesterEnabled
for !kafkaDone || !ingestersDone {
select {
// If either tracker errors, return the first one
case err := <-ingesterTracker.err:
return nil, err
case err := <-kafkaTracker.err:
return nil, err
case <-kafkaTracker.done:
kafkaSpan.Finish()
kafkaDone = true
case <-ingesterTracker.done:
ingesterSpan.Finish()
ingestersDone = true
case <-ctx.Done():
return nil, ctx.Err()
}
}
return &logproto.PushResponse{}, validationErr
}

func (d *Distributor) trackDiscardedData(
@@ -836,15 +859,6 @@ func (d *Distributor) createShard(lbls labels.Labels, streamPattern string, shar
}
}

// maxT returns the highest between two given timestamps.
func maxT(t1, t2 time.Time) time.Time {
if t1.Before(t2) {
return t2
}

return t1
}

func (d *Distributor) truncateLines(vContext validationContext, stream *logproto.Stream) {
if !vContext.maxLineSizeTruncate {
return
@@ -970,6 +984,9 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
return fmt.Errorf("failed to marshal write request to records: %w", err)
}

for _, record := range records {
d.kafkaBytesPerRecord.Observe(float64(len(record.Value)))
}
d.kafkaRecordsPerRequest.Observe(float64(len(records)))

produceResults := d.kafkaWriter.ProduceSync(ctx, records)
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
@@ -50,7 +50,7 @@ import (

var (
success = &logproto.PushResponse{}
ctx = user.InjectOrgID(context.Background(), "test")
ctx, _ = context.WithTimeout(user.InjectOrgID(context.Background(), "test"), 10*time.Second)
)

func TestDistributor(t *testing.T) {
5 changes: 5 additions & 0 deletions pkg/kafka/client/writer_client.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kerr"
@@ -277,6 +278,10 @@ func (c *Producer) updateMetricsLoop() {
// This function honors the configured max buffered bytes and refuses to produce a record, returning kgo.ErrMaxBuffered,
// if the configured limit is reached.
func (c *Producer) ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults {
span, ctx := opentracing.StartSpanFromContext(ctx, "kafka.produceSync")
span.LogKV("records", len(records))
defer span.Finish()

var (
remaining = atomic.NewInt64(int64(len(records)))
done = make(chan struct{})
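
As a usage note, a caller of ProduceSync can treat the buffer-full case separately from other produce failures. A hedged sketch follows; the helper name and the exact import path for the producer package are assumptions, not part of this diff.

package example

import (
	"context"
	"errors"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/grafana/loki/v3/pkg/kafka/client"
)

// produceOne sends a single record and distinguishes "producer buffer full"
// (kgo.ErrMaxBuffered, as documented on ProduceSync) from other failures.
func produceOne(ctx context.Context, producer *client.Producer, topic string, payload []byte) error {
	records := []*kgo.Record{{Topic: topic, Value: payload}}

	results := producer.ProduceSync(ctx, records)
	if err := results.FirstErr(); err != nil {
		if errors.Is(err, kgo.ErrMaxBuffered) {
			// The client hit its configured max buffered bytes; callers may
			// prefer to back off and retry instead of failing the request.
			return fmt.Errorf("kafka producer buffer full: %w", err)
		}
		return fmt.Errorf("failed to produce record: %w", err)
	}
	return nil
}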
7 changes: 5 additions & 2 deletions pkg/loghttp/push/push.go
@@ -11,6 +11,7 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"

"github.com/grafana/loki/pkg/push"

@@ -20,13 +21,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"

loki_util "github.com/grafana/loki/v3/pkg/util"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/util"
loki_util "github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/unmarshal"
unmarshal2 "github.com/grafana/loki/v3/pkg/util/unmarshal/legacy"
@@ -107,6 +107,9 @@ type Stats struct {
}

func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker, logPushRequestStreams bool) (*logproto.PushRequest, error) {
span, ctx := opentracing.StartSpanFromContext(r.Context(), "parseRequest")
defer span.Finish()
r = r.WithContext(ctx)
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker, logPushRequestStreams, logger)
if err != nil {
return nil, err
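
Because the parseRequest span is stored back on the request via r.WithContext, the request parsers invoked below it can attach their own events to the same trace. A minimal, hypothetical sketch of what that could look like inside a parser (the helper name is illustrative and not part of this diff):

package example

import (
	"net/http"

	"github.com/opentracing/opentracing-go"

	"github.com/grafana/loki/v3/pkg/logproto"
)

// logParseEvent is a hypothetical helper a RequestParser could call to attach
// an event to the active "parseRequest" span, if one is on the request context.
func logParseEvent(r *http.Request, req *logproto.PushRequest) {
	if sp := opentracing.SpanFromContext(r.Context()); sp != nil {
		sp.LogKV("event", "decoded push payload", "streams", len(req.Streams))
	}
}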