diff --git a/cmd/profilecli/main.go b/cmd/profilecli/main.go index 05ad4a4ba0..e59bef762c 100644 --- a/cmd/profilecli/main.go +++ b/cmd/profilecli/main.go @@ -79,6 +79,10 @@ func main() { queryTracerCmd := app.Command("query-tracer", "Analyze query traces.") queryTracerParams := addQueryTracerParams(queryTracerCmd) + queryBlockCmd := app.Command("query-blocks", "Query on local/remote blocks") + queryBlockSeriesCmd := queryBlockCmd.Command("series", "Request series labels on a local/remote block") + queryBlockSeriesParams := addQueryBlockSeriesParams(queryBlockSeriesCmd) + uploadCmd := app.Command("upload", "Upload profile(s).") uploadParams := addUploadParams(uploadCmd) @@ -131,6 +135,10 @@ func main() { if err := querySeries(ctx, querySeriesParams); err != nil { os.Exit(checkError(err)) } + case queryBlockSeriesCmd.FullCommand(): + if err := queryBlockSeries(ctx, queryBlockSeriesParams); err != nil { + os.Exit(checkError(err)) + } case queryLabelValuesCardinalityCmd.FullCommand(): if err := queryLabelValuesCardinality(ctx, queryLabelValuesCardinalityParams); err != nil { diff --git a/cmd/profilecli/output.go b/cmd/profilecli/output.go new file mode 100644 index 0000000000..498ba6ea86 --- /dev/null +++ b/cmd/profilecli/output.go @@ -0,0 +1,25 @@ +package main + +import ( + "encoding/json" + "os" + + "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" +) + +func outputSeries(result []*typesv1.Labels) error { + enc := json.NewEncoder(os.Stdout) + m := make(map[string]interface{}) + for _, s := range result { + for k := range m { + delete(m, k) + } + for _, l := range s.Labels { + m[l.Name] = l.Value + } + if err := enc.Encode(m); err != nil { + return err + } + } + return nil +} diff --git a/cmd/profilecli/query-blocks.go b/cmd/profilecli/query-blocks.go new file mode 100644 index 0000000000..c9355733c8 --- /dev/null +++ b/cmd/profilecli/query-blocks.go @@ -0,0 +1,112 @@ +package main + +import ( + "context" + "fmt" + "math" + + "connectrpc.com/connect" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + + ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1" + "github.com/grafana/pyroscope/pkg/objstore" + objstoreclient "github.com/grafana/pyroscope/pkg/objstore/client" + "github.com/grafana/pyroscope/pkg/objstore/providers/filesystem" + "github.com/grafana/pyroscope/pkg/objstore/providers/gcs" + "github.com/grafana/pyroscope/pkg/phlaredb" +) + +type queryBlockParams struct { + LocalPath string + BucketName string + BlockIds []string + TenanId string + ObjectStoreType string + Query string +} + +type queryBlockSeriesParams struct { + *queryBlockParams + LabelNames []string +} + +func addQueryBlockParams(queryCmd commander) *queryBlockParams { + params := new(queryBlockParams) + queryCmd.Flag("local-path", "Path to blocks directory.").Default("./data/anonymous/local").StringVar(¶ms.LocalPath) + queryCmd.Flag("bucket-name", "The name of the object storage bucket.").StringVar(¶ms.BucketName) + queryCmd.Flag("object-store-type", "The type of the object storage (e.g., gcs).").Default("gcs").StringVar(¶ms.ObjectStoreType) + queryCmd.Flag("block-ids", "List of blocks ids to query on").StringsVar(¶ms.BlockIds) + queryCmd.Flag("tenant-id", "Tenant id of the queried block for remote bucket").StringVar(¶ms.TenanId) + queryCmd.Flag("query", "Label selector to query.").Default("{}").StringVar(¶ms.Query) + return params +} + +func addQueryBlockSeriesParams(queryCmd commander) *queryBlockSeriesParams { + params := new(queryBlockSeriesParams) + params.queryBlockParams = addQueryBlockParams(queryCmd) + queryCmd.Flag("label-names", "Filter returned labels to the supplied label names. Without any filter all labels are returned.").StringsVar(¶ms.LabelNames) + return params +} + +func queryBlockSeries(ctx context.Context, params *queryBlockSeriesParams) error { + level.Info(logger).Log("msg", "query-block series", "labelNames", fmt.Sprintf("%v", params.LabelNames), + "blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath", params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenanId) + + bucket, err := getBucket(ctx, params) + if err != nil { + return err + } + + blockQuerier := phlaredb.NewBlockQuerier(ctx, bucket) + + var from, to int64 + from, to = math.MaxInt64, math.MinInt64 + var targetBlockQueriers phlaredb.Queriers + for _, blockId := range params.queryBlockParams.BlockIds { + meta, err := blockQuerier.BlockMeta(ctx, blockId) + if err != nil { + return err + } + from = min(from, meta.MinTime.Time().UnixMilli()) + to = max(to, meta.MaxTime.Time().UnixMilli()) + targetBlockQueriers = append(targetBlockQueriers, phlaredb.NewSingleBlockQuerierFromMeta(ctx, bucket, meta)) + } + + response, err := targetBlockQueriers.Series(ctx, connect.NewRequest( + &ingestv1.SeriesRequest{ + Start: from, + End: to, + Matchers: []string{params.Query}, + LabelNames: params.LabelNames, + }, + )) + if err != nil { + return err + } + + return outputSeries(response.Msg.LabelsSet) +} + +func getBucket(ctx context.Context, params *queryBlockSeriesParams) (objstore.Bucket, error) { + if params.BucketName != "" { + return getRemoteBucket(ctx, params) + } else { + return filesystem.NewBucket(params.LocalPath) + } +} + +func getRemoteBucket(ctx context.Context, params *queryBlockSeriesParams) (objstore.Bucket, error) { + if params.TenanId == "" { + return nil, errors.New("specify tenant id for remote bucket") + } + return objstoreclient.NewBucket(ctx, objstoreclient.Config{ + StorageBackendConfig: objstoreclient.StorageBackendConfig{ + Backend: params.ObjectStoreType, + GCS: gcs.Config{ + BucketName: params.BucketName, + }, + }, + StoragePrefix: fmt.Sprintf("%s/phlaredb", params.TenanId), + }, params.BucketName) +} diff --git a/cmd/profilecli/query.go b/cmd/profilecli/query.go index c965f302c4..4e993c0b0d 100644 --- a/cmd/profilecli/query.go +++ b/cmd/profilecli/query.go @@ -3,7 +3,6 @@ package main import ( "bytes" "context" - "encoding/json" "fmt" "io" "os" @@ -320,22 +319,8 @@ func querySeries(ctx context.Context, params *querySeriesParams) (err error) { return errors.Errorf("unknown api type %s", params.APIType) } - enc := json.NewEncoder(os.Stdout) - m := make(map[string]interface{}) - for _, s := range result { - for k := range m { - delete(m, k) - } - for _, l := range s.Labels { - m[l.Name] = l.Value - } - if err := enc.Encode(m); err != nil { - return err - } - } - - return nil - + err = outputSeries(result) + return err } type queryLabelValuesCardinalityParams struct { diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 0349e426b6..f4267136c1 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -159,6 +159,23 @@ func (b *BlockQuerier) BlockMetas(ctx context.Context) (metas []*block.Meta, _ e return metas[0 : pos+1], nil } +func (b *BlockQuerier) BlockMeta(ctx context.Context, name string) (meta *block.Meta, _ error) { + path := filepath.Join(name, block.MetaFilename) + metaReader, err := b.bkt.Get(ctx, path) + if err != nil { + level.Error(b.logger).Log("msg", "error reading block meta", "block", path, "err", err) + return nil, err + } + + meta, err = block.Read(metaReader) + if err != nil { + level.Error(b.logger).Log("msg", "error parsing block meta", "block", path, "err", err) + return nil, err + } + + return meta, nil +} + // Sync gradually scans the available blocks. If there are any changes to the // last run it will Open/Close new/no longer existing ones. func (b *BlockQuerier) Sync(ctx context.Context) error { diff --git a/pkg/phlaredb/block_querier_test.go b/pkg/phlaredb/block_querier_test.go index 4ce7732152..3aade6c2de 100644 --- a/pkg/phlaredb/block_querier_test.go +++ b/pkg/phlaredb/block_querier_test.go @@ -1386,3 +1386,21 @@ func testSelectMergeByStacktracesRace(t testing.TB, times int) { require.NoError(t, g.Wait()) require.NoError(t, querier.Close()) } + +func TestBlockMeta_loadsMetasIndividually(t *testing.T) { + path := "./block/testdata/" + bucket, err := filesystem.NewBucket(path) + require.NoError(t, err) + + ctx := context.Background() + blockQuerier := NewBlockQuerier(ctx, bucket) + metas, err := blockQuerier.BlockMetas(ctx) + require.NoError(t, err) + + for _, meta := range metas { + singleMeta, err := blockQuerier.BlockMeta(ctx, meta.ULID.String()) + require.NoError(t, err) + + require.Equal(t, meta, singleMeta) + } +}