Skip to content

Commit

Permalink
perf: optimize pprof split (#3569)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Sep 19, 2024
1 parent ffed89e commit b60259a
Show file tree
Hide file tree
Showing 4 changed files with 485 additions and 8 deletions.
14 changes: 10 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ func extractSampleSeries(
) {
for _, series := range req.Series {
for _, p := range series.Samples {
v := &sampleSeriesVisitor{profile: p.Profile.Profile}
v := &sampleSeriesVisitor{profile: p.Profile}
pprofsplit.VisitSampleSeries(
p.Profile.Profile,
series.Labels,
Expand All @@ -834,22 +834,28 @@ func extractSampleSeries(
CountDiscardedBytes(string(validation.DroppedByRelabelRules), int64(v.discardedBytes))
}
}

return result, bytesRelabelDropped, profilesRelabelDropped
}

type sampleSeriesVisitor struct {
profile *profilev1.Profile
profile *pprof.Profile
exp *pprof.SampleExporter
series []*distributormodel.ProfileSeries

discardedBytes int
discardedProfiles int
}

func (v *sampleSeriesVisitor) VisitProfile(labels []*typesv1.LabelPair) {
v.series = append(v.series, &distributormodel.ProfileSeries{
Samples: []*distributormodel.ProfileSample{{Profile: v.profile}},
Labels: labels,
})
}

func (v *sampleSeriesVisitor) VisitSampleSeries(labels []*typesv1.LabelPair, samples []*profilev1.Sample) {
if v.exp == nil {
v.exp = pprof.NewSampleExporter(v.profile)
v.exp = pprof.NewSampleExporter(v.profile.Profile)
}
v.series = append(v.series, &distributormodel.ProfileSeries{
Samples: []*distributormodel.ProfileSample{{Profile: exportSamples(v.exp, samples)}},
Expand Down
14 changes: 11 additions & 3 deletions pkg/experiment/ingester/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,9 @@ func (s *segment) ingest(ctx context.Context, tenantID string, p *profilev1.Prof
rules := s.sw.limits.IngestionRelabelingRules(tenantID)
usage := s.sw.limits.DistributorUsageGroups(tenantID).GetUsageGroups(tenantID, labels)
appender := &sampleAppender{
id: id,
head: s.headForIngest(k),
exporter: pprofmodel.NewSampleExporter(p),
head: s.headForIngest(k),
profile: p,
id: id,
}
pprofsplit.VisitSampleSeries(p, labels, rules, appender)
size -= appender.discardedBytes
Expand All @@ -467,13 +467,21 @@ func (s *segment) ingest(ctx context.Context, tenantID string, p *profilev1.Prof
type sampleAppender struct {
id uuid.UUID
head *memdb.Head
profile *profilev1.Profile
exporter *pprofmodel.SampleExporter

discardedProfiles int
discardedBytes int
}

func (v *sampleAppender) VisitProfile(labels []*typesv1.LabelPair) {
v.head.Ingest(v.profile, v.id, labels)
}

func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples []*profilev1.Sample) {
if v.exporter == nil {
v.exporter = pprofmodel.NewSampleExporter(v.profile)
}
var n profilev1.Profile
v.exporter.ExportSamples(&n, samples)
v.head.Ingest(&n, v.id, labels)
Expand Down
6 changes: 5 additions & 1 deletion pkg/model/pprof_split/pprof_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
)

type SampleSeriesVisitor interface {
// VisitProfile is called when no sample labels are present in
// the profile, or if all the sample labels are identical.
// Provided labels are the series labels processed with relabeling rules.
VisitProfile([]*typesv1.LabelPair)
VisitSampleSeries([]*typesv1.LabelPair, []*profilev1.Sample)
Discarded(profiles, bytes int)
}
Expand Down Expand Up @@ -44,7 +48,7 @@ func VisitSampleSeries(
}
}
if len(profile.Sample) > 0 {
visitor.VisitSampleSeries(builder.Labels(), profile.Sample)
visitor.VisitProfile(builder.Labels())
}
return
}
Expand Down
Loading

0 comments on commit b60259a

Please sign in to comment.