diff --git a/.mockery.yaml b/.mockery.yaml index caf85a81da..376a9390f0 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -37,6 +37,3 @@ packages: interfaces: SegmentWriterClient: IngesterClient: - github.com/grafana/pyroscope/pkg/experiment/metastore/blockcleaner: - interfaces: - Cleaner: diff --git a/pkg/experiment/metastore/blockcleaner/block_cleaner.go b/pkg/experiment/metastore/blockcleaner/block_cleaner.go deleted file mode 100644 index f70868bb3a..0000000000 --- a/pkg/experiment/metastore/blockcleaner/block_cleaner.go +++ /dev/null @@ -1,435 +0,0 @@ -package blockcleaner - -import ( - "context" - "encoding/binary" - "flag" - "fmt" - "path/filepath" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/objstore" - "go.etcd.io/bbolt" - "golang.org/x/sync/errgroup" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" - - "github.com/grafana/pyroscope/pkg/experiment/metastore/raftleader" - "github.com/grafana/pyroscope/pkg/experiment/metastore/raftlogpb" - "github.com/grafana/pyroscope/pkg/util" -) - -const ( - removedBlocksBucketName = "removed-blocks" -) - -var removedBlocksBucketNameBytes = []byte(removedBlocksBucketName) - -type Config struct { - CompactedBlocksCleanupInterval time.Duration `yaml:"compacted_blocks_cleanup_interval"` - CompactedBlocksCleanupDelay time.Duration `yaml:"compacted_blocks_cleanup_delay"` -} - -func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.DurationVar(&cfg.CompactedBlocksCleanupDelay, prefix+"compacted-blocks-cleanup-delay", time.Minute*30, "The grace period for permanently deleting compacted blocks.") - f.DurationVar(&cfg.CompactedBlocksCleanupInterval, prefix+"compacted-blocks-cleanup-interval", time.Minute, "The interval at which block cleanup is performed.") -} - -type CleanerLifecycler interface { - raftleader.LeaderRoutine - LoadMarkers() -} - -type RaftLog[Req, Resp proto.Message] interface { - ApplyCommand(req Req) (resp Resp, err error) -} - -type RaftLogCleanBlocks RaftLog[*raftlogpb.CleanBlocksRequest, *anypb.Any] - -type metrics struct { - markedBlocks *prometheus.CounterVec - expiredBlocks *prometheus.CounterVec - bucketObjectRemovals *prometheus.CounterVec -} - -func newMetrics(reg prometheus.Registerer) *metrics { - m := &metrics{ - markedBlocks: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "pyroscope", - Subsystem: "metastore", - Name: "block_cleaner_marked_block_count", - Help: "The number of blocks marked as removed", - }, []string{"tenant", "shard"}), - expiredBlocks: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "pyroscope", - Subsystem: "metastore", - Name: "block_cleaner_expired_block_count", - Help: "The number of marked blocks that expired and were removed", - }, []string{"tenant", "shard"}), - bucketObjectRemovals: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "pyroscope", - Subsystem: "metastore", - Name: "block_cleaner_bucket_removal_count", - Help: "The number of expired blocks that were removed from the bucket", - }, []string{"tenant", "shard"}), - } - if reg != nil { - util.Register(reg, - m.markedBlocks, - m.expiredBlocks, - m.bucketObjectRemovals, - ) - } - return m -} - -type BlockCleaner struct { - blocks map[string]struct{} - blocksMu sync.Mutex - - raftLog RaftLogCleanBlocks - db *bbolt.DB - bkt objstore.Bucket - logger log.Logger - cfg *Config - metrics *metrics - - started bool - 
mu sync.Mutex - wg sync.WaitGroup - cancel func() - isLeader bool -} - -func New( - raftLog RaftLogCleanBlocks, - db *bbolt.DB, - logger log.Logger, - config *Config, - bkt objstore.Bucket, - reg prometheus.Registerer, -) *BlockCleaner { - return newBlockCleaner(raftLog, db, logger, config, bkt, reg) -} - -func newBlockCleaner( - raftLog RaftLogCleanBlocks, - db *bbolt.DB, - logger log.Logger, - config *Config, - bkt objstore.Bucket, - reg prometheus.Registerer, -) *BlockCleaner { - return &BlockCleaner{ - blocks: make(map[string]struct{}), - raftLog: raftLog, - db: db, - logger: logger, - cfg: config, - bkt: bkt, - metrics: newMetrics(reg), - } -} - -type blockRemovalContext struct { - tenant string - expiryTs int64 -} - -func (c *BlockCleaner) LoadMarkers() { - c.mu.Lock() - defer c.mu.Unlock() - - _ = c.db.View(func(tx *bbolt.Tx) error { - bkt := tx.Bucket(removedBlocksBucketNameBytes) - if bkt == nil { - return nil - } - return bkt.ForEachBucket(func(k []byte) error { - shardBkt := bkt.Bucket(k) - if shardBkt == nil { - return nil - } - return shardBkt.ForEach(func(k, v []byte) error { - if len(k) < 34 { - return fmt.Errorf("block key too short (expected 34 chars, was %d)", len(k)) - } - blockId := string(k[:26]) - c.blocks[blockId] = struct{}{} - return nil - }) - }) - }) - level.Info(c.logger).Log("msg", "loaded metastore block deletion markers", "marker_count", len(c.blocks)) -} - -func (c *BlockCleaner) MarkBlock(shard uint32, tenant string, blockId string, deletedTs int64) error { - if c.IsMarked(blockId) { - return nil - } - err := c.db.Update(func(tx *bbolt.Tx) error { - bkt, err := tx.CreateBucketIfNotExists(removedBlocksBucketNameBytes) - if err != nil { - return err - } - shardBkt, err := getOrCreateSubBucket(bkt, getShardBucketName(shard)) - if err != nil { - return err - } - expiryTs := deletedTs + c.cfg.CompactedBlocksCleanupDelay.Milliseconds() - blockKey := getBlockKey(blockId, expiryTs, tenant) - - return shardBkt.Put(blockKey, []byte{}) - }) - if err != nil { - return err - } - c.blocksMu.Lock() - defer c.blocksMu.Unlock() - c.blocks[blockId] = struct{}{} - c.metrics.markedBlocks.WithLabelValues(tenant, fmt.Sprint(shard)).Inc() - return nil -} - -func (c *BlockCleaner) IsMarked(blockId string) bool { - c.blocksMu.Lock() - defer c.blocksMu.Unlock() - _, ok := c.blocks[blockId] - return ok -} - -func (c *BlockCleaner) Start() { - c.mu.Lock() - defer c.mu.Unlock() - if c.started { - level.Info(c.logger).Log("msg", "blockc cleaner already started") - return - } - ctx, cancel := context.WithCancel(context.Background()) - c.cancel = cancel - c.started = true - c.isLeader = true - go c.loop(ctx) - level.Info(c.logger).Log("msg", "block cleaner started") -} - -func (c *BlockCleaner) Stop() { - c.mu.Lock() - defer c.mu.Unlock() - if !c.started { - level.Warn(c.logger).Log("msg", "block cleaner already stopped") - return - } - c.cancel() - c.started = false - c.isLeader = false - c.wg.Wait() - level.Info(c.logger).Log("msg", "block cleaner stopped") -} - -func (c *BlockCleaner) loop(ctx context.Context) { - t := time.NewTicker(c.cfg.CompactedBlocksCleanupInterval) - defer func() { - t.Stop() - }() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - _, err := c.raftLog.ApplyCommand(&raftlogpb.CleanBlocksRequest{}) - if err != nil { - _ = level.Error(c.logger).Log("msg", "failed to apply clean blocks command", "err", err) - } - } - } -} - -func (c *BlockCleaner) RemoveExpiredBlocks(now int64) error { - shards, err := c.listShards() - if err != nil { - 
panic(fmt.Errorf("failed to list shards for pending block removals: %w", err)) - } - g, ctx := errgroup.WithContext(context.Background()) - for _, shard := range shards { - g.Go(func() error { - c.wg.Add(1) - defer c.wg.Done() - return c.cleanShard(ctx, shard, now) - }) - } - err = g.Wait() - if err != nil { - level.Warn(c.logger).Log("msg", "error during pending block removal", "err", err) - } - return err -} - -func (c *BlockCleaner) listShards() ([]uint32, error) { - shards := make([]uint32, 0) - err := c.db.View(func(tx *bbolt.Tx) error { - bkt, err := getPendingBlockRemovalsBucket(tx) - if err != nil { - return err - } - return bkt.ForEachBucket(func(k []byte) error { - shards = append(shards, binary.BigEndian.Uint32(k)) - return nil - }) - }) - if err != nil { - return nil, err - } - return shards, nil -} - -func (c *BlockCleaner) cleanShard(ctx context.Context, shard uint32, now int64) error { - blocks, err := c.listBlocks(shard) - if err != nil { - level.Warn(c.logger).Log("msg", "failed to list removed blocks for shard", "err", err, "shard", shard) - return err - } - level.Info(c.logger).Log("msg", "cleaning removed blocks in shard", "shard", shard, "blocks", len(blocks)) - cntDeleted := 0 - cntDeletedBucket := 0 - for blockId, removalContext := range blocks { - if removalContext.expiryTs < now { - metricLabels := []string{removalContext.tenant, fmt.Sprint(shard)} - if c.isLeader { - var key string - if removalContext.tenant != "" { - key = filepath.Join("blocks", fmt.Sprint(shard), removalContext.tenant, blockId, "block.bin") - } else { - key = filepath.Join("segments", fmt.Sprint(shard), "anonymous", blockId, "block.bin") - } - level.Debug(c.logger).Log( - "msg", "removing block from bucket", - "shard", shard, - "tenant", removalContext.tenant, - "blockId", blockId, - "expiryTs", removalContext.expiryTs, - "bucket_key", key) - err := c.bkt.Delete(ctx, key) - if err != nil { - level.Warn(c.logger).Log( - "msg", "failed to remove block from bucket", - "err", err, - "blockId", blockId, - "shard", shard, - "tenant", removalContext.tenant) - // TODO(aleks-p): Detect if the error is "object does not exist" or something else. Handle each case appropriately. 
- continue - } - c.metrics.bucketObjectRemovals.WithLabelValues(metricLabels...).Inc() - cntDeletedBucket++ - } - err = c.removeBlock(blockId, shard, removalContext) - if err != nil { - level.Warn(c.logger).Log( - "msg", "failed to remove block from pending block removals", - "err", err, - "blockId", blockId, - "shard", shard, - "tenant", removalContext.tenant, - "expiry", removalContext.expiryTs) - } - level.Debug(c.logger).Log( - "msg", "removed block from pending block removals", - "blockId", blockId, - "shard", shard, - "tenant", removalContext.tenant, - "expiryTs", removalContext.expiryTs) - c.metrics.expiredBlocks.WithLabelValues(metricLabels...).Inc() - cntDeleted++ - } - } - level.Info(c.logger).Log("msg", "finished shard cleanup", "shard", shard, "blocks_removed", cntDeleted, "blocks_removed_bucket", cntDeletedBucket) - return nil -} - -func (c *BlockCleaner) listBlocks(shard uint32) (map[string]*blockRemovalContext, error) { - blocks := make(map[string]*blockRemovalContext) - err := c.db.View(func(tx *bbolt.Tx) error { - bkt, err := getPendingBlockRemovalsBucket(tx) - if err != nil { - return err - } - shardBkt := bkt.Bucket(getShardBucketName(shard)) - if shardBkt == nil { - return nil - } - return shardBkt.ForEach(func(k, v []byte) error { - if len(k) < 34 { - return fmt.Errorf("block key too short (expected 34 chars, was %d)", len(k)) - } - blockId := string(k[:26]) - blocks[blockId] = &blockRemovalContext{ - expiryTs: int64(binary.BigEndian.Uint64(k[26:34])), - tenant: string(k[34:]), - } - return nil - }) - }) - if err != nil { - return nil, err - } - return blocks, nil -} - -func (c *BlockCleaner) removeBlock(blockId string, shard uint32, removalContext *blockRemovalContext) error { - err := c.db.Update(func(tx *bbolt.Tx) error { - bkt, err := getPendingBlockRemovalsBucket(tx) - if err != nil { - return err - } - shardBkt := bkt.Bucket(getShardBucketName(shard)) - if shardBkt == nil { - return errors.New("no bucket found for shard when clearing pending block removal") - } - blockKey := getBlockKey(blockId, removalContext.expiryTs, removalContext.tenant) - - return shardBkt.Delete(blockKey) - }) - if err != nil { - return err - } - c.blocksMu.Lock() - defer c.blocksMu.Unlock() - delete(c.blocks, blockId) - return nil -} - -func getPendingBlockRemovalsBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) { - bkt := tx.Bucket(removedBlocksBucketNameBytes) - if bkt == nil { - return nil, bbolt.ErrBucketNotFound - } - return bkt, nil -} - -func getOrCreateSubBucket(parent *bbolt.Bucket, name []byte) (*bbolt.Bucket, error) { - bucket := parent.Bucket(name) - if bucket == nil { - return parent.CreateBucket(name) - } - return bucket, nil -} - -func getShardBucketName(shard uint32) []byte { - shardBucketName := make([]byte, 4) - binary.BigEndian.PutUint32(shardBucketName, shard) - return shardBucketName -} - -func getBlockKey(blockId string, expiryTs int64, tenant string) []byte { - blockKey := make([]byte, 26+8+len(tenant)) - copy(blockKey[:26], blockId) - binary.BigEndian.PutUint64(blockKey[26:34], uint64(expiryTs)) - copy(blockKey[34:], tenant) - return blockKey -} diff --git a/pkg/experiment/metastore/blockcleaner/block_cleaner_test.go b/pkg/experiment/metastore/blockcleaner/block_cleaner_test.go deleted file mode 100644 index f48025d89b..0000000000 --- a/pkg/experiment/metastore/blockcleaner/block_cleaner_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package blockcleaner - -import ( - "bytes" - "context" - "crypto/rand" - "fmt" - "path/filepath" - "testing" - "time" - - "github.com/oklog/ulid" 
- "github.com/stretchr/testify/require" - "go.etcd.io/bbolt" - - "github.com/grafana/pyroscope/pkg/objstore/providers/memory" - "github.com/grafana/pyroscope/pkg/util" -) - -func Test_AddAndCheck(t *testing.T) { - cleaner := newBlockCleaner(nil, createDb(t), util.Logger, &Config{CompactedBlocksCleanupDelay: time.Second * 2}, memory.NewInMemBucket(), nil) - - blockId := ulid.MustNew(ulid.Now(), rand.Reader).String() - err := cleaner.MarkBlock(0, "tenant", blockId, 1000) - require.NoError(t, err) - - require.True(t, cleaner.IsMarked(blockId)) -} - -func Test_AddAndRemove(t *testing.T) { - cleaner := newBlockCleaner(nil, createDb(t), util.Logger, &Config{CompactedBlocksCleanupDelay: time.Second * 2}, memory.NewInMemBucket(), nil) - cleaner.isLeader = true - - blockId := ulid.MustNew(ulid.Now(), rand.Reader).String() - err := cleaner.MarkBlock(0, "tenant", blockId, 1000) - require.NoError(t, err) - err = cleaner.bkt.Upload(context.Background(), fmt.Sprintf("blocks/0/tenant/%s/block.bin", blockId), bytes.NewReader([]byte{1, 2, 3})) - require.NoError(t, err) - - err = cleaner.RemoveExpiredBlocks(5000) - require.NoError(t, err) - require.False(t, cleaner.IsMarked(blockId)) - inBucket, err := cleaner.bkt.Exists(context.Background(), fmt.Sprintf("blocks/0/tenant/%s/block.bin", blockId)) - require.NoError(t, err) - require.False(t, inBucket) -} - -func createDb(t *testing.T) *bbolt.DB { - opts := *bbolt.DefaultOptions - opts.ReadOnly = false - opts.NoSync = true - db, err := bbolt.Open(filepath.Join(t.TempDir(), "db.boltdb"), 0644, &opts) - require.NoError(t, err) - return db -} diff --git a/pkg/experiment/metastore/blockcleaner/deletion_markers.go b/pkg/experiment/metastore/blockcleaner/deletion_markers.go new file mode 100644 index 0000000000..656aaf54ea --- /dev/null +++ b/pkg/experiment/metastore/blockcleaner/deletion_markers.go @@ -0,0 +1,258 @@ +package blockcleaner + +import ( + "encoding/binary" + "flag" + "fmt" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "go.etcd.io/bbolt" + + "github.com/grafana/pyroscope/pkg/util" +) + +type metrics struct { + markedBlocks *prometheus.CounterVec + expiredBlocks *prometheus.CounterVec +} + +func newMetrics(reg prometheus.Registerer) *metrics { + m := &metrics{ + markedBlocks: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "pyroscope", + Subsystem: "metastore", + Name: "block_cleaner_marked_block_count", + Help: "The number of blocks marked as removed", + }, []string{"tenant", "shard"}), + expiredBlocks: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "pyroscope", + Subsystem: "metastore", + Name: "block_cleaner_expired_block_count", + Help: "The number of marked blocks that expired and were removed", + }, []string{"tenant", "shard"}), + } + if reg != nil { + util.Register(reg, + m.markedBlocks, + m.expiredBlocks, + ) + } + return m +} + +const ( + removedBlocksBucketName = "removed-blocks" +) + +var removedBlocksBucketNameBytes = []byte(removedBlocksBucketName) + +type Config struct { + CompactedBlocksCleanupInterval time.Duration `yaml:"compacted_blocks_cleanup_interval"` + CompactedBlocksCleanupDelay time.Duration `yaml:"compacted_blocks_cleanup_delay"` +} + +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.CompactedBlocksCleanupDelay, prefix+"compacted-blocks-cleanup-delay", time.Minute*30, "The grace period for permanently deleting compacted blocks.") + 
f.DurationVar(&cfg.CompactedBlocksCleanupInterval, prefix+"compacted-blocks-cleanup-interval", time.Minute, "The interval at which block cleanup is performed.") +} + +type BlockRemovalContext struct { + Shard uint32 + Tenant string + ExpiryTs int64 +} + +type DeletionMarkers struct { + blockMarkers map[string]*BlockRemovalContext + mu sync.Mutex + + db *bbolt.DB + logger log.Logger + cfg *Config + metrics *metrics +} + +func NewDeletionMarkers(db *bbolt.DB, cfg *Config, logger log.Logger, reg prometheus.Registerer) *DeletionMarkers { + return &DeletionMarkers{ + blockMarkers: make(map[string]*BlockRemovalContext), + db: db, + logger: logger, + cfg: cfg, + metrics: newMetrics(reg), + } +} + +func (m *DeletionMarkers) Load() { + m.mu.Lock() + defer m.mu.Unlock() + m.blockMarkers = make(map[string]*BlockRemovalContext) + + _ = m.db.View(func(tx *bbolt.Tx) error { + bkt := tx.Bucket(removedBlocksBucketNameBytes) + if bkt == nil { + return nil + } + return bkt.ForEachBucket(func(k []byte) error { + shardBkt := bkt.Bucket(k) + if shardBkt == nil { + return nil + } + shard := binary.BigEndian.Uint32(k) + return shardBkt.ForEach(func(k, v []byte) error { + if len(k) < 34 { + return fmt.Errorf("block key too short (expected 34 chars, was %d)", len(k)) + } + blockId := string(k[:26]) + m.blockMarkers[blockId] = &BlockRemovalContext{ + Shard: shard, + Tenant: string(k[34:]), + ExpiryTs: int64(binary.BigEndian.Uint64(k[26:34])), + } + return nil + }) + }) + }) + level.Info(m.logger).Log("msg", "loaded metastore block deletion markers", "marker_count", len(m.blockMarkers)) +} + +func (m *DeletionMarkers) Mark(shard uint32, tenant string, blockId string, deletedTs int64) error { + if m.IsMarked(blockId) { + return nil + } + expiryTs := deletedTs + m.cfg.CompactedBlocksCleanupDelay.Milliseconds() + err := m.db.Update(func(tx *bbolt.Tx) error { + bkt, err := tx.CreateBucketIfNotExists(removedBlocksBucketNameBytes) + if err != nil { + return err + } + shardBkt, err := getOrCreateSubBucket(bkt, getShardBucketName(shard)) + if err != nil { + return err + } + blockKey := getBlockKey(blockId, expiryTs, tenant) + + return shardBkt.Put(blockKey, []byte{}) + }) + if err != nil { + return err + } + m.mu.Lock() + defer m.mu.Unlock() + m.blockMarkers[blockId] = &BlockRemovalContext{ + Shard: shard, + Tenant: tenant, + ExpiryTs: expiryTs, + } + m.metrics.markedBlocks.WithLabelValues(tenant, fmt.Sprint(shard)).Inc() + return nil +} + +func (m *DeletionMarkers) IsMarked(blockId string) bool { + m.mu.Lock() + defer m.mu.Unlock() + _, ok := m.blockMarkers[blockId] + return ok +} + +func (m *DeletionMarkers) FindExpiredMarkers(now int64) map[string]*BlockRemovalContext { + blocks := make(map[string]*BlockRemovalContext) + m.mu.Lock() + defer m.mu.Unlock() + for b, removalContext := range m.blockMarkers { + if removalContext.ExpiryTs < now { + blocks[b] = removalContext + } + } + return blocks +} + +func (m *DeletionMarkers) Remove(markers map[string]*BlockRemovalContext) error { + m.mu.Lock() + defer m.mu.Unlock() + if len(markers) == 0 { + return nil + } + markersPerShard := make(map[uint32]map[string]*BlockRemovalContext) + for blockId, removalContext := range markers { + s, ok := markersPerShard[removalContext.Shard] + if !ok { + s = make(map[string]*BlockRemovalContext) + markersPerShard[removalContext.Shard] = s + } + s[blockId] = removalContext + } + err := m.db.Update(func(tx *bbolt.Tx) error { + bkt, err := getPendingBlockRemovalsBucket(tx) + if err != nil { + return err + } + for shard, shardMarkers := range 
markersPerShard { + shardBkt, err := getOrCreateSubBucket(bkt, getShardBucketName(shard)) + if err != nil { + return err + } + for b, m := range shardMarkers { + key := getBlockKey(b, m.ExpiryTs, m.Tenant) + err := shardBkt.Delete(key) + if err != nil { + return err + } + } + } + return nil + }) + if err != nil { + return err + } + for b, removalContext := range markers { + delete(m.blockMarkers, b) + level.Debug(m.logger).Log( + "msg", "removed block from pending block removals", + "blockId", b, + "Shard", removalContext.Shard, + "Tenant", removalContext.Tenant, + "ExpiryTs", removalContext.ExpiryTs) + m.metrics.expiredBlocks.WithLabelValues(removalContext.Tenant, fmt.Sprint(removalContext.Shard)).Inc() + } + level.Info(m.logger).Log("msg", "finished deletion marker cleanup", "markers_removed", len(markers)) + return nil +} + +func (m *DeletionMarkers) Reload(db *bbolt.DB) { + m.db = db + m.Load() +} + +func getPendingBlockRemovalsBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) { + bkt := tx.Bucket(removedBlocksBucketNameBytes) + if bkt == nil { + return nil, bbolt.ErrBucketNotFound + } + return bkt, nil +} + +func getOrCreateSubBucket(parent *bbolt.Bucket, name []byte) (*bbolt.Bucket, error) { + bucket := parent.Bucket(name) + if bucket == nil { + return parent.CreateBucket(name) + } + return bucket, nil +} + +func getShardBucketName(shard uint32) []byte { + shardBucketName := make([]byte, 4) + binary.BigEndian.PutUint32(shardBucketName, shard) + return shardBucketName +} + +func getBlockKey(blockId string, expiryTs int64, tenant string) []byte { + blockKey := make([]byte, 26+8+len(tenant)) + copy(blockKey[:26], blockId) + binary.BigEndian.PutUint64(blockKey[26:34], uint64(expiryTs)) + copy(blockKey[34:], tenant) + return blockKey +} diff --git a/pkg/experiment/metastore/blockcleaner/deletion_markers_test.go b/pkg/experiment/metastore/blockcleaner/deletion_markers_test.go new file mode 100644 index 0000000000..549a5671d0 --- /dev/null +++ b/pkg/experiment/metastore/blockcleaner/deletion_markers_test.go @@ -0,0 +1,38 @@ +package blockcleaner + +import ( + "crypto/rand" + "path/filepath" + "testing" + "time" + + "github.com/oklog/ulid" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" + + "github.com/grafana/pyroscope/pkg/util" +) + +func Test_AddAndCheck(t *testing.T) { + markers := NewDeletionMarkers( + createDb(t), + &Config{CompactedBlocksCleanupDelay: time.Second * 2}, + util.Logger, + nil, + ) + + blockId := ulid.MustNew(ulid.Now(), rand.Reader).String() + err := markers.Mark(0, "Tenant", blockId, 1000) + require.NoError(t, err) + + require.True(t, markers.IsMarked(blockId)) +} + +func createDb(t *testing.T) *bbolt.DB { + opts := *bbolt.DefaultOptions + opts.ReadOnly = false + opts.NoSync = true + db, err := bbolt.Open(filepath.Join(t.TempDir(), "db.boltdb"), 0644, &opts) + require.NoError(t, err) + return db +} diff --git a/pkg/experiment/metastore/metastore.go b/pkg/experiment/metastore/metastore.go index d4d70cdf2f..8936c28021 100644 --- a/pkg/experiment/metastore/metastore.go +++ b/pkg/experiment/metastore/metastore.go @@ -146,7 +146,7 @@ type Metastore struct { placementMgr *adaptiveplacement.Manager dnsProvider *dns.Provider dlq *dlq.Recovery - blockCleaner blockcleaner.CleanerLifecycler + blockCleaner *blockCleaner } func New( @@ -168,6 +168,7 @@ func New( client: client, placementMgr: placementMgr, } + m.state = newMetastoreState(m.logger, m.db, m.reg, &m.config.Compaction, &m.config.Index) m.leaderhealth = raftleader.NewRaftLeaderHealthObserver(logger, reg) m.dlq = 
dlq.NewRecovery(dlq.RecoveryConfig{ Period: config.DLQRecoveryPeriod, @@ -190,20 +191,18 @@ func (m *Metastore) starting(context.Context) error { if err := m.db.open(false); err != nil { return fmt.Errorf("failed to initialize database: %w", err) } - blockCleaner := blockcleaner.New( - m, - m.db.boltdb, - m.logger, - &m.config.BlockCleaner, - m.bucket, - m.reg, - ) - blockCleaner.LoadMarkers() - m.blockCleaner = blockCleaner - m.state = newMetastoreState(m.logger, m.db, m.reg, &m.config.Compaction, &m.config.Index, blockCleaner) + // deletion markers rely on the db being opened + m.state.deletionMarkers = blockcleaner.NewDeletionMarkers(m.db.boltdb, &m.config.BlockCleaner, m.logger, m.reg) + m.state.deletionMarkers.Load() + if err := m.initRaft(); err != nil { return fmt.Errorf("failed to initialize raft: %w", err) } + // the block cleaner needs raft to apply a raft command, we hold a reference here because we control the lifecycle + m.blockCleaner = newBlockCleaner(&m.config.BlockCleaner, m.raft, &m.config.Raft, m.bucket, m.logger, m.reg) + // the raft command is implemented in metastoreState, we need to pass a reference to determine the scope of the cleanup + m.state.blockCleaner = m.blockCleaner + m.blockCleaner.Start() return nil } @@ -266,11 +265,9 @@ func (m *Metastore) initRaft() (err error) { if st == raft.Leader { m.dlq.Start() m.placementMgr.Start() - m.blockCleaner.Start() } else { m.dlq.Stop() m.placementMgr.Stop() - m.blockCleaner.Stop() } }) return nil diff --git a/pkg/experiment/metastore/metastore_compaction_planner_test.go b/pkg/experiment/metastore/metastore_compaction_planner_test.go index ea369f1aad..f73ba25dc7 100644 --- a/pkg/experiment/metastore/metastore_compaction_planner_test.go +++ b/pkg/experiment/metastore/metastore_compaction_planner_test.go @@ -7,13 +7,12 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + "github.com/grafana/pyroscope/pkg/experiment/metastore/blockcleaner" "github.com/grafana/pyroscope/pkg/experiment/metastore/index" - "github.com/grafana/pyroscope/pkg/test/mocks/mockblockcleaner" "github.com/grafana/pyroscope/pkg/util" ) @@ -66,11 +65,10 @@ func initState(tb testing.TB) *metastoreState { db := newDB(config, util.Logger, newMetastoreMetrics(reg)) err := db.open(false) require.NoError(tb, err) - blockCleaner := mockblockcleaner.NewMockCleaner(tb) - blockCleaner.On("IsMarked", mock.Anything).Return(false).Maybe() - blockCleaner.On("MarkBlock", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + deletionMarkers := blockcleaner.NewDeletionMarkers(db.boltdb, &blockcleaner.Config{}, util.Logger, nil) - m := newMetastoreState(util.Logger, db, reg, &config.Compaction, &index.DefaultConfig, blockCleaner) + m := newMetastoreState(util.Logger, db, reg, &config.Compaction, &index.DefaultConfig) + m.deletionMarkers = deletionMarkers require.NotNil(tb, m) return m } @@ -92,7 +90,8 @@ func getQueueLen(m *metastoreState, shard int, tenant string, level int) int { } func verifyCompactionState(t *testing.T, m *metastoreState) { - stateFromDb := newMetastoreState(util.Logger, m.db, prometheus.DefaultRegisterer, m.compactionConfig, &index.DefaultConfig, m.blockCleaner) + stateFromDb := newMetastoreState(util.Logger, m.db, prometheus.DefaultRegisterer, m.compactionConfig, m.indexConfig) + stateFromDb.deletionMarkers = 
m.deletionMarkers err := m.db.boltdb.View(func(tx *bbolt.Tx) error { return stateFromDb.restoreCompactionPlan(tx) }) diff --git a/pkg/experiment/metastore/metastore_state.go b/pkg/experiment/metastore/metastore_state.go index 8e87a98b3c..e3e03e6cbe 100644 --- a/pkg/experiment/metastore/metastore_state.go +++ b/pkg/experiment/metastore/metastore_state.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.etcd.io/bbolt" + "github.com/grafana/pyroscope/pkg/experiment/metastore/blockcleaner" "github.com/grafana/pyroscope/pkg/experiment/metastore/compactionpb" "github.com/grafana/pyroscope/pkg/experiment/metastore/index" ) @@ -24,20 +25,17 @@ type tenantShard struct { shard uint32 } -type BlockCleaner interface { - MarkBlock(shard uint32, tenant string, blockId string, deletedTs int64) error - IsMarked(blockId string) bool - RemoveExpiredBlocks(now int64) error -} - type metastoreState struct { logger log.Logger + reg prometheus.Registerer compactionMetrics *compactionMetrics compactionConfig *CompactionConfig - indexConfig *index.Config - index *index.Index - blockCleaner BlockCleaner + index *index.Index + indexConfig *index.Config + + deletionMarkers *blockcleaner.DeletionMarkers + blockCleaner *blockCleaner compactionMutex sync.Mutex compactionJobBlockQueues map[tenantShard]*compactionJobBlockQueue @@ -57,10 +55,10 @@ func newMetastoreState( reg prometheus.Registerer, compactionCfg *CompactionConfig, indexCfg *index.Config, - blockCleaner BlockCleaner, ) *metastoreState { return &metastoreState{ logger: logger, + reg: reg, index: index.NewIndex(newIndexStore(db, logger), logger, indexCfg), db: db, compactionJobBlockQueues: make(map[tenantShard]*compactionJobBlockQueue), @@ -68,17 +66,17 @@ func newMetastoreState( compactionMetrics: newCompactionMetrics(reg), compactionConfig: compactionCfg, indexConfig: indexCfg, - blockCleaner: blockCleaner, } } func (m *metastoreState) reset(db *boltdb) { m.compactionMutex.Lock() + defer m.compactionMutex.Unlock() clear(m.compactionJobBlockQueues) m.index = index.NewIndex(newIndexStore(db, m.logger), m.logger, m.indexConfig) m.compactionJobQueue = newJobQueue(m.compactionConfig.JobLeaseDuration.Nanoseconds()) m.db = db - m.compactionMutex.Unlock() + m.deletionMarkers.Reload(db.boltdb) } func (m *metastoreState) restore(db *boltdb) error { diff --git a/pkg/experiment/metastore/metastore_state_add_block.go b/pkg/experiment/metastore/metastore_state_add_block.go index 27344ac848..ac0cb4a315 100644 --- a/pkg/experiment/metastore/metastore_state_add_block.go +++ b/pkg/experiment/metastore/metastore_state_add_block.go @@ -52,7 +52,7 @@ func (m *Metastore) AddRecoveredBlock(_ context.Context, req *metastorev1.AddBlo } func (m *metastoreState) applyAddBlock(log *raft.Log, request *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) { - if m.blockCleaner.IsMarked(request.Block.Id) { + if m.deletionMarkers.IsMarked(request.Block.Id) { _ = level.Warn(m.logger).Log("msg", "block already added and compacted", "block_id", request.Block.Id) return &metastorev1.AddBlockResponse{}, nil } diff --git a/pkg/experiment/metastore/metastore_state_clean_blocks.go b/pkg/experiment/metastore/metastore_state_clean_blocks.go index 3009b55a3e..7d8cda8df2 100644 --- a/pkg/experiment/metastore/metastore_state_clean_blocks.go +++ b/pkg/experiment/metastore/metastore_state_clean_blocks.go @@ -1,17 +1,156 @@ package metastore import ( + "context" + "crypto/rand" + "fmt" + "path/filepath" + "sync" + "sync/atomic" + "time" + + 
"github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/hashicorp/raft" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/anypb" + "github.com/grafana/pyroscope/pkg/experiment/metastore/blockcleaner" "github.com/grafana/pyroscope/pkg/experiment/metastore/raftlogpb" ) -func (m *Metastore) ApplyCommand(req *raftlogpb.CleanBlocksRequest) (resp *anypb.Any, err error) { - _, resp, err = applyCommand[*raftlogpb.CleanBlocksRequest, *anypb.Any](m.raft, req, m.config.Raft.ApplyTimeout) - return resp, err +type blockCleaner struct { + cfg *blockcleaner.Config + + raft *raft.Raft + raftCfg *RaftConfig + + bucket objstore.Bucket + + wg sync.WaitGroup + done chan struct{} + + logger log.Logger + bucketObjectRemovals *prometheus.CounterVec + + lastRequestId string +} + +func newBlockCleaner( + cfg *blockcleaner.Config, + r *raft.Raft, + raftCfg *RaftConfig, + bucket objstore.Bucket, + logger log.Logger, + reg prometheus.Registerer, +) *blockCleaner { + m := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "pyroscope", + Subsystem: "metastore", + Name: "block_cleaner_bucket_removal_count", + Help: "The number of expired blocks that were removed from the bucket", + }, []string{"tenant", "shard"}) + if reg != nil { + reg.MustRegister(m) + } + return &blockCleaner{ + cfg: cfg, + raft: r, + raftCfg: raftCfg, + bucket: bucket, + done: make(chan struct{}), + logger: logger, + bucketObjectRemovals: m, + } +} + +func (c *blockCleaner) Start() { + c.wg.Add(1) + go c.runLoop() +} + +func (c *blockCleaner) runLoop() { + t := time.NewTicker(c.cfg.CompactedBlocksCleanupInterval) + defer func() { + t.Stop() + c.wg.Done() + }() + for { + select { + case <-c.done: + return + case <-t.C: + if c.raft.State() != raft.Leader { + continue + } + requestId := ulid.MustNew(ulid.Now(), rand.Reader).String() + c.lastRequestId = requestId + req := &raftlogpb.CleanBlocksRequest{RequestId: requestId} + _, _, err := applyCommand[*raftlogpb.CleanBlocksRequest, *anypb.Any](c.raft, req, c.raftCfg.ApplyTimeout) + if err != nil { + _ = level.Error(c.logger).Log("msg", "failed to apply clean blocks command", "err", err) + } + } + } +} + +func (c *blockCleaner) Stop() { + close(c.done) + c.wg.Wait() } func (m *metastoreState) applyCleanBlocks(log *raft.Log, request *raftlogpb.CleanBlocksRequest) (*anypb.Any, error) { - return nil, m.blockCleaner.RemoveExpiredBlocks(log.AppendedAt.UnixMilli()) + expired := m.deletionMarkers.FindExpiredMarkers(log.AppendedAt.UnixMilli()) + level.Info(m.logger).Log( + "msg", "cleaning expired block deletion markers", + "count", len(expired), + "request_id", request.RequestId, + "stored_request_id", m.blockCleaner.lastRequestId, + ) + cleanBucket := m.blockCleaner.lastRequestId == request.RequestId + if cleanBucket { + var cnt atomic.Int64 + g, grpCtx := errgroup.WithContext(context.Background()) + for b, removalContext := range expired { + g.Go(func() error { + var key string + if removalContext.Tenant != "" { + key = filepath.Join("blocks", fmt.Sprint(removalContext.Shard), removalContext.Tenant, b, "block.bin") + } else { + key = filepath.Join("segments", fmt.Sprint(removalContext.Shard), "anonymous", b, "block.bin") + } + level.Debug(m.logger).Log( + "msg", "removing block from bucket", + "shard", removalContext.Shard, + "tenant", removalContext.Tenant, + "blockId", b, + "expiryTs", removalContext.ExpiryTs, + "bucket_key", key) + err := 
m.blockCleaner.bucket.Delete(grpCtx, key) + if err != nil { + level.Warn(m.logger).Log( + "msg", "failed to remove block from bucket", + "err", err, + "blockId", b, + "shard", removalContext.Shard, + "tenant", removalContext.Tenant) + // TODO(aleks-p): Detect if the error is "object does not exist" or something else. Handle each case appropriately. + return err + } + m.blockCleaner.bucketObjectRemovals.WithLabelValues(removalContext.Tenant, fmt.Sprint(removalContext.Shard)).Inc() + cnt.Add(1) + return nil + }) + } + err := g.Wait() + level.Info(m.logger).Log("msg", "finished bucket cleanup", "blocks_removed", cnt.Load()) + if err != nil { + return nil, err + } + return nil, m.deletionMarkers.Remove(expired) + } + return nil, m.deletionMarkers.Remove(expired) } diff --git a/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go b/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go index 479022d52e..8b01e7d687 100644 --- a/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go +++ b/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go @@ -200,7 +200,7 @@ func (m *metastoreState) pollCompactionJobs(request *compactorv1.PollCompactionJ for key, blocks := range stateUpdate.deletedBlocks { for _, block := range blocks { - err = m.blockCleaner.MarkBlock(key.shard, key.tenant, block, raftAppendedAtNanos/time.Millisecond.Nanoseconds()) + err = m.deletionMarkers.Mark(key.shard, key.tenant, block, raftAppendedAtNanos/time.Millisecond.Nanoseconds()) if err != nil { panic(fatalCommandError{fmt.Errorf("error persisting block removals, %w", err)}) } diff --git a/pkg/experiment/metastore/metastore_state_test.go b/pkg/experiment/metastore/metastore_state_test.go index 38bbc39f4b..f7218a4269 100644 --- a/pkg/experiment/metastore/metastore_state_test.go +++ b/pkg/experiment/metastore/metastore_state_test.go @@ -23,7 +23,7 @@ func TestMetadataStateManagement(t *testing.T) { err := db.open(false) require.NoError(t, err) - m := newMetastoreState(util.Logger, db, reg, &config.Compaction, &index.DefaultConfig, nil) + m := newMetastoreState(util.Logger, db, reg, &config.Compaction, &index.DefaultConfig) require.NotNil(t, m) t.Run("restore compaction state", func(t *testing.T) { diff --git a/pkg/experiment/metastore/raftlogpb/raflog.pb.go b/pkg/experiment/metastore/raftlogpb/raflog.pb.go index 517878d222..fe89a628ea 100644 --- a/pkg/experiment/metastore/raftlogpb/raflog.pb.go +++ b/pkg/experiment/metastore/raftlogpb/raflog.pb.go @@ -131,6 +131,8 @@ type CleanBlocksRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` } func (x *CleanBlocksRequest) Reset() { @@ -165,6 +167,13 @@ func (*CleanBlocksRequest) Descriptor() ([]byte, []int) { return file_experiment_metastore_raftlogpb_raflog_proto_rawDescGZIP(), []int{1} } +func (x *CleanBlocksRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + var File_experiment_metastore_raftlogpb_raflog_proto protoreflect.FileDescriptor var file_experiment_metastore_raftlogpb_raflog_proto_rawDesc = []byte{ @@ -176,28 +185,30 @@ var file_experiment_metastore_raftlogpb_raflog_proto_rawDesc = []byte{ 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x14, 0x0a, 0x12, + 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x33, 0x0a, 0x12, 0x43, 0x6c, 0x65, 0x61, 0x6e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x2a, 0x90, 0x01, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, - 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, - 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x44, 0x44, - 0x5f, 0x42, 0x4c, 0x4f, 0x43, 0x4b, 0x10, 0x01, 0x12, 0x2c, 0x0a, 0x28, 0x43, 0x4f, 0x4d, 0x4d, - 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x50, 0x4f, 0x4c, 0x4c, 0x5f, 0x43, 0x4f, - 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x4a, 0x4f, 0x42, 0x53, 0x5f, 0x53, 0x54, - 0x41, 0x54, 0x55, 0x53, 0x10, 0x02, 0x12, 0x1d, 0x0a, 0x19, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, - 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4c, 0x45, 0x41, 0x4e, 0x5f, 0x42, 0x4c, 0x4f, - 0x43, 0x4b, 0x53, 0x10, 0x03, 0x42, 0x98, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x72, 0x61, - 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x42, 0x0b, 0x52, 0x61, 0x66, 0x6c, 0x6f, 0x67, 0x50, 0x72, - 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, - 0x6f, 0x70, 0x65, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, - 0x6e, 0x74, 0x2f, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x72, 0x61, 0x66, - 0x74, 0x6c, 0x6f, 0x67, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x52, 0x58, 0x58, 0xaa, 0x02, 0x07, 0x52, - 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0xca, 0x02, 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, - 0xe2, 0x02, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, + 0x64, 0x2a, 0x90, 0x01, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x54, 0x79, 0x70, + 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x43, + 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x44, 0x44, 0x5f, + 0x42, 0x4c, 0x4f, 0x43, 0x4b, 0x10, 0x01, 0x12, 0x2c, 0x0a, 0x28, 0x43, 0x4f, 0x4d, 0x4d, 0x41, + 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x50, 0x4f, 0x4c, 0x4c, 0x5f, 0x43, 0x4f, 0x4d, + 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x4a, 0x4f, 0x42, 0x53, 0x5f, 0x53, 0x54, 0x41, + 0x54, 0x55, 0x53, 0x10, 0x02, 0x12, 0x1d, 0x0a, 0x19, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, + 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4c, 0x45, 0x41, 0x4e, 0x5f, 0x42, 0x4c, 0x4f, 0x43, + 0x4b, 0x53, 0x10, 0x03, 0x42, 0x98, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x72, 0x61, 0x66, + 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x42, 0x0b, 0x52, 0x61, 0x66, 0x6c, 0x6f, 0x67, 0x50, 0x72, 0x6f, + 0x74, 0x6f, 0x50, 0x01, 
0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, + 0x70, 0x65, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, + 0x74, 0x2f, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x72, 0x61, 0x66, 0x74, + 0x6c, 0x6f, 0x67, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x52, 0x58, 0x58, 0xaa, 0x02, 0x07, 0x52, 0x61, + 0x66, 0x74, 0x4c, 0x6f, 0x67, 0xca, 0x02, 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0xe2, + 0x02, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/experiment/metastore/raftlogpb/raflog.proto b/pkg/experiment/metastore/raftlogpb/raflog.proto index 5041b8db37..879e549819 100644 --- a/pkg/experiment/metastore/raftlogpb/raflog.proto +++ b/pkg/experiment/metastore/raftlogpb/raflog.proto @@ -14,4 +14,6 @@ enum CommandType { COMMAND_TYPE_CLEAN_BLOCKS = 3; } -message CleanBlocksRequest {} +message CleanBlocksRequest { + string request_id = 1; +} diff --git a/pkg/experiment/metastore/raftlogpb/raflog_vtproto.pb.go b/pkg/experiment/metastore/raftlogpb/raflog_vtproto.pb.go index 092d03e15f..a2f2329e46 100644 --- a/pkg/experiment/metastore/raftlogpb/raflog_vtproto.pb.go +++ b/pkg/experiment/metastore/raftlogpb/raflog_vtproto.pb.go @@ -93,6 +93,13 @@ func (m *CleanBlocksRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.RequestId) > 0 { + i -= len(m.RequestId) + copy(dAtA[i:], m.RequestId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.RequestId))) + i-- + dAtA[i] = 0xa + } return len(dAtA) - i, nil } @@ -119,6 +126,10 @@ func (m *CleanBlocksRequest) SizeVT() (n int) { } var l int _ = l + l = len(m.RequestId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -256,6 +267,38 @@ func (m *CleanBlocksRequest) UnmarshalVT(dAtA []byte) error { return fmt.Errorf("proto: CleanBlocksRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RequestId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:])
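
For reference, the deletion markers introduced in deletion_markers.go are stored in bbolt under a fixed-layout key: a 26-character ULID block ID, an 8-byte big-endian expiry timestamp in milliseconds, then the tenant name. The following is a minimal standalone sketch of that encoding and decoding; the helper names encodeMarkerKey/decodeMarkerKey and the sample values are illustrative only and mirror getBlockKey and the parsing in DeletionMarkers.Load, they are not part of the patch.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeMarkerKey lays out a deletion-marker key the same way getBlockKey does:
// 26 bytes of ULID block ID, 8 bytes of big-endian expiry timestamp (ms),
// followed by the tenant name.
func encodeMarkerKey(blockID string, expiryTs int64, tenant string) []byte {
	key := make([]byte, 26+8+len(tenant))
	copy(key[:26], blockID)
	binary.BigEndian.PutUint64(key[26:34], uint64(expiryTs))
	copy(key[34:], tenant)
	return key
}

// decodeMarkerKey reverses the layout, as DeletionMarkers.Load does when it
// rebuilds the in-memory marker map from bbolt.
func decodeMarkerKey(key []byte) (blockID string, expiryTs int64, tenant string, err error) {
	if len(key) < 34 {
		return "", 0, "", fmt.Errorf("block key too short (expected at least 34 bytes, was %d)", len(key))
	}
	return string(key[:26]), int64(binary.BigEndian.Uint64(key[26:34])), string(key[34:]), nil
}

func main() {
	// The block ID below is a placeholder 26-character string, not a real ULID.
	key := encodeMarkerKey("01J8ZQ0000000000000000TEST", 1_700_000_000_000, "tenant-a")
	id, ts, tenant, _ := decodeMarkerKey(key)
	fmt.Println(id, ts, tenant) // 01J8ZQ0000000000000000TEST 1700000000000 tenant-a
}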