diff --git a/block_test.go b/block_test.go index 3f39a899..cec33349 100644 --- a/block_test.go +++ b/block_test.go @@ -16,16 +16,15 @@ package tsdb import ( "context" "encoding/binary" - "errors" "io/ioutil" "math/rand" "os" "path/filepath" - "strconv" "testing" "github.com/go-kit/kit/log" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" @@ -57,7 +56,7 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, os.RemoveAll(tmpdir)) }() - blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1)) + blockDir := CreateBlock(t, tmpdir, GenSeries(1, 1, 0, 1)) b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) @@ -77,7 +76,7 @@ func TestCreateBlock(t *testing.T) { defer func() { testutil.Ok(t, os.RemoveAll(tmpdir)) }() - b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil) + b, err := OpenBlock(nil, CreateBlock(t, tmpdir, GenSeries(1, 1, 0, 10)), nil) if err == nil { testutil.Ok(t, b.Close()) } @@ -134,7 +133,7 @@ func TestCorruptedChunk(t *testing.T) { testutil.Ok(t, os.RemoveAll(tmpdir)) }() - blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1)) + blockDir := CreateBlock(t, tmpdir, GenSeries(1, 1, 0, 1)) files, err := sequenceFiles(chunkDir(blockDir)) testutil.Ok(t, err) testutil.Assert(t, len(files) > 0, "No chunk created.") @@ -168,7 +167,7 @@ func TestBlockSize(t *testing.T) { // Create a block and compare the reported size vs actual disk size. { - blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100)) + blockDirInit = CreateBlock(t, tmpdir, GenSeries(10, 1, 1, 100)) blockInit, err = OpenBlock(nil, blockDirInit, nil) testutil.Ok(t, err) defer func() { @@ -204,76 +203,6 @@ func TestBlockSize(t *testing.T) { } } -// createBlock creates a block with given set of series and returns its dir. -func createBlock(tb testing.TB, dir string, series []Series) string { - head := createHead(tb, series) - compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil) - testutil.Ok(tb, err) - - testutil.Ok(tb, os.MkdirAll(dir, 0777)) - - // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). - // Because of this block intervals are always +1 than the total samples it includes. - ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) - testutil.Ok(tb, err) - return filepath.Join(dir, ulid.String()) -} - -func createHead(tb testing.TB, series []Series) *Head { - head, err := NewHead(nil, nil, nil, 2*60*60*1000) - testutil.Ok(tb, err) - defer head.Close() - - app := head.Appender() - for _, s := range series { - ref := uint64(0) - it := s.Iterator() - for it.Next() { - t, v := it.At() - if ref != 0 { - err := app.AddFast(ref, t, v) - if err == nil { - continue - } - } - ref, err = app.Add(s.Labels(), t, v) - testutil.Ok(tb, err) - } - testutil.Ok(tb, it.Err()) - } - err = app.Commit() - testutil.Ok(tb, err) - return head -} - -const ( - defaultLabelName = "labelName" - defaultLabelValue = "labelValue" -) - -// genSeries generates series with a given number of labels and values. -func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series { - if totalSeries == 0 || labelCount == 0 { - return nil - } - - series := make([]Series, totalSeries) - - for i := 0; i < totalSeries; i++ { - lbls := make(map[string]string, labelCount) - lbls[defaultLabelName] = strconv.Itoa(i) - for j := 1; len(lbls) < labelCount; j++ { - lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) - } - samples := make([]tsdbutil.Sample, 0, maxt-mint+1) - for t := mint; t < maxt; t++ { - samples = append(samples, sample{t: t, v: rand.Float64()}) - } - series[i] = newSeries(lbls, samples) - } - return series -} - // populateSeries generates series from given labels, mint and maxt. func populateSeries(lbls []map[string]string, mint, maxt int64) []Series { if len(lbls) == 0 { diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index e3dc530a..23a72d64 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -32,11 +32,18 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" + "gopkg.in/alecthomas/kingpin.v2" + "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/labels" - "gopkg.in/alecthomas/kingpin.v2" +) + +const ( + printBlocksTableHeader = "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES" + defaultAnalyzeLimit = "20" + timeDelta = 30000 ) func main() { @@ -62,7 +69,7 @@ func execute() (err error) { analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.") analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String() - analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int() + analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default(defaultAnalyzeLimit).Int() dumpCmd = cli.Command("dump", "dump samples from a TSDB") dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64() @@ -95,7 +102,7 @@ func execute() (err error) { if err != nil { return err } - printBlocks(blocks, listCmdHumanReadable) + printBlocks(os.Stdout, blocks, listCmdHumanReadable) case analyzeCmd.FullCommand(): db, err := tsdb.OpenDBReadOnly(*analyzePath, nil) if err != nil { @@ -110,21 +117,12 @@ func execute() (err error) { if err != nil { return err } - var block tsdb.BlockReader - if *analyzeBlockID != "" { - for _, b := range blocks { - if b.Meta().ULID.String() == *analyzeBlockID { - block = b - break - } - } - } else if len(blocks) > 0 { - block = blocks[len(blocks)-1] - } - if block == nil { - return fmt.Errorf("block not found") + block, err := extractBlock(blocks, analyzeBlockID) + if err != nil { + return err } - return analyzeBlock(block, *analyzeLimit) + + return analyzeBlock(os.Stdout, block, *analyzeLimit) case dumpCmd.FullCommand(): db, err := tsdb.OpenDBReadOnly(*dumpPath, nil) if err != nil { @@ -140,6 +138,25 @@ func execute() (err error) { return nil } +// extractBlock takes a slice of BlockReader and returns a specific block by ID. +func extractBlock(blocks []tsdb.BlockReader, analyzeBlockID *string) (tsdb.BlockReader, error) { + var block tsdb.BlockReader + if *analyzeBlockID != "" { + for _, b := range blocks { + if b.Meta().ULID.String() == *analyzeBlockID { + block = b + break + } + } + } else if len(blocks) > 0 { + block = blocks[len(blocks)-1] + } + if block == nil { + return nil, fmt.Errorf("block not found") + } + return block, nil +} + type writeBenchmark struct { outPath string samplesFile string @@ -235,8 +252,6 @@ func (b *writeBenchmark) run() error { return nil } -const timeDelta = 30000 - func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) { var mu sync.Mutex var total uint64 @@ -434,11 +449,11 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { return mets, nil } -func printBlocks(blocks []tsdb.BlockReader, humanReadable *bool) { - tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) +func printBlocks(w io.Writer, blocks []tsdb.BlockReader, humanReadable *bool) { + tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0) defer tw.Flush() - fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES") + fmt.Fprintln(tw, printBlocksTableHeader) for _, b := range blocks { meta := b.Meta() @@ -461,12 +476,12 @@ func getFormatedTime(timestamp int64, humanReadable *bool) string { return strconv.FormatInt(timestamp, 10) } -func analyzeBlock(b tsdb.BlockReader, limit int) error { +func analyzeBlock(w io.Writer, b tsdb.BlockReader, limit int) error { meta := b.Meta() - fmt.Printf("Block ID: %s\n", meta.ULID) + fmt.Fprintf(w, "Block ID: %s\n", meta.ULID) // Presume 1ms resolution that Prometheus uses. - fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String()) - fmt.Printf("Series: %d\n", meta.Stats.NumSeries) + fmt.Fprintf(w, "Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String()) + fmt.Fprintf(w, "Series: %d\n", meta.Stats.NumSeries) ir, err := b.Index() if err != nil { return err @@ -477,7 +492,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error { if err != nil { return err } - fmt.Printf("Label names: %d\n", len(allLabelNames)) + fmt.Fprintf(w, "Label names: %d\n", len(allLabelNames)) type postingInfo struct { key string @@ -489,7 +504,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error { sort.Slice(postingInfos, func(i, j int) bool { return postingInfos[i].metric > postingInfos[j].metric }) for i, pc := range postingInfos { - fmt.Printf("%d %s\n", pc.metric, pc.key) + fmt.Fprintf(w, "%d %s\n", pc.metric, pc.key) if i >= limit { break } @@ -523,15 +538,15 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error { if p.Err() != nil { return p.Err() } - fmt.Printf("Postings (unique label pairs): %d\n", len(labelpairsUncovered)) - fmt.Printf("Postings entries (total label pairs): %d\n", entries) + fmt.Fprintf(w, "Postings (unique label pairs): %d\n", len(labelpairsUncovered)) + fmt.Fprintf(w, "Postings entries (total label pairs): %d\n", entries) postingInfos = postingInfos[:0] for k, m := range labelpairsUncovered { postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))}) } - fmt.Printf("\nLabel pairs most involved in churning:\n") + fmt.Fprintf(w, "\nLabel pairs most involved in churning:\n") printInfo(postingInfos) postingInfos = postingInfos[:0] @@ -539,7 +554,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error { postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))}) } - fmt.Printf("\nLabel names most involved in churning:\n") + fmt.Fprintf(w, "\nLabel names most involved in churning:\n") printInfo(postingInfos) postingInfos = postingInfos[:0] @@ -547,7 +562,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error { postingInfos = append(postingInfos, postingInfo{k, m}) } - fmt.Printf("\nMost common label pairs:\n") + fmt.Fprintf(w, "\nMost common label pairs:\n") printInfo(postingInfos) postingInfos = postingInfos[:0] @@ -571,7 +586,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error { postingInfos = append(postingInfos, postingInfo{n, cumulativeLength}) } - fmt.Printf("\nLabel names with highest cumulative label value length:\n") + fmt.Fprintf(w, "\nLabel names with highest cumulative label value length:\n") printInfo(postingInfos) postingInfos = postingInfos[:0] @@ -582,7 +597,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error { } postingInfos = append(postingInfos, postingInfo{n, uint64(lv.Len())}) } - fmt.Printf("\nHighest cardinality labels:\n") + fmt.Fprintf(w, "\nHighest cardinality labels:\n") printInfo(postingInfos) postingInfos = postingInfos[:0] @@ -610,7 +625,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error { postingInfos = append(postingInfos, postingInfo{n, uint64(count)}) } } - fmt.Printf("\nHighest cardinality metric names:\n") + fmt.Fprintf(w, "\nHighest cardinality metric names:\n") printInfo(postingInfos) return nil } diff --git a/cmd/tsdb/main_test.go b/cmd/tsdb/main_test.go new file mode 100644 index 00000000..63d0429f --- /dev/null +++ b/cmd/tsdb/main_test.go @@ -0,0 +1,227 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "fmt" + "io/ioutil" + "os" + "strconv" + "strings" + "testing" + "text/tabwriter" + "time" + + "github.com/prometheus/tsdb" +) + +func createRoDb(t *testing.T) (*tsdb.DBReadOnly, func()) { + tmpdir, err := ioutil.TempDir("", "test") + if err != nil { + os.RemoveAll(tmpdir) + t.Error(err) + } + + safeDBOptions := *tsdb.DefaultOptions + safeDBOptions.RetentionDuration = 0 + + tsdb.CreateBlock(nil, tmpdir, tsdb.GenSeries(1, 1, 0, 1)) + + dbRO, err := tsdb.OpenDBReadOnly(tmpdir, nil) + if err != nil { + t.Error(err) + } + + return dbRO, func() { + os.RemoveAll(tmpdir) + } +} + +func TestPrintBlocks(t *testing.T) { + db, closeFn := createRoDb(t) + defer closeFn() + + var b bytes.Buffer + hr := false + tw := tabwriter.NewWriter(&b, 0, 0, 2, ' ', 0) + defer tw.Flush() + + // Set table header. + _, err := fmt.Fprintln(&b, printBlocksTableHeader) + if err != nil { + t.Error(err) + } + + // Test table header. + actual := b.String() + expected := fmt.Sprintln(printBlocksTableHeader) + if expected != actual { + t.Errorf("expected (%#v) != actual (%#v)", expected, actual) + } + + // Set table contents. + blocks, err := db.Blocks() + if err != nil { + t.Error(err) + } + meta := blocks[0].Meta() + + _, err = fmt.Fprintf(&b, + "%v\t%v\t%v\t%v\t%v\t%v\n", + meta.ULID, + getFormatedTime(meta.MinTime, &hr), + getFormatedTime(meta.MaxTime, &hr), + meta.Stats.NumSamples, + meta.Stats.NumChunks, + meta.Stats.NumSeries, + ) + + if err != nil { + t.Error(err) + } + + // Test table contents. + blocks, err = db.Blocks() + if err != nil { + t.Error(err) + } + + var actualStdout bytes.Buffer + printBlocks(&actualStdout, blocks, &hr) + + actual = actualStdout.String() + actual = strings.Replace(actual, " ", "", -1) + actual = strings.Replace(actual, "\t", "", -1) + actual = strings.Replace(actual, "\n", "", -1) + + expected = b.String() + expected = strings.Replace(expected, " ", "", -1) + expected = strings.Replace(expected, "\t", "", -1) + expected = strings.Replace(expected, "\n", "", -1) + + if expected != actual { + t.Errorf("expected (%#v) != actual (%#v)", b.String(), actualStdout.String()) + } +} + +func TestExtractBlock(t *testing.T) { + db, closeFn := createRoDb(t) + defer closeFn() + + blocks, err := db.Blocks() + if err != nil { + t.Error(err) + } + + var analyzeBlockID string + + // Pass: analyze last block (default). + block, err := extractBlock(blocks, &analyzeBlockID) + if err != nil { + t.Error(err) + } + if block == nil { + t.Error("block shouldn't be nil") + } + + // Pass: analyze specific block. + analyzeBlockID = block.Meta().ULID.String() + block, err = extractBlock(blocks, &analyzeBlockID) + if err != nil { + t.Error(err) + } + if block == nil { + t.Error("block shouldn't be nil") + } + + // Fail: analyze non-existing block + analyzeBlockID = "foo" + block, err = extractBlock(blocks, &analyzeBlockID) + if err == nil { + t.Errorf("Analyzing block ID %q should throw error", analyzeBlockID) + } + if block != nil { + t.Error("block should be nil") + } +} + +func TestAnalyzeBlocks(t *testing.T) { + db, closeFn := createRoDb(t) + defer closeFn() + + blocks, err := db.Blocks() + if err != nil { + t.Error(err) + } + + var analyzeBlockID string + block, err := extractBlock(blocks, &analyzeBlockID) + if err != nil { + t.Error(err) + } + if block == nil { + t.Errorf("block shouldn't be nil") + } + + dal, err := strconv.Atoi(defaultAnalyzeLimit) + if err != nil { + t.Error(err) + } + + var ( + expected bytes.Buffer + actual bytes.Buffer + ) + + // Actual output. + err = analyzeBlock(&actual, block, dal) + if err != nil { + t.Error(err) + } + + act := actual.String() + act = strings.Replace(act, " ", "", -1) + act = strings.Replace(act, "\t", "", -1) + act = strings.Replace(act, "\n", "", -1) + + // Expected output. + meta := block.Meta() + fmt.Fprintf(&expected, "Block ID: %s\n", meta.ULID) + fmt.Fprintf(&expected, "Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String()) + fmt.Fprintf(&expected, "Series: %d\n", 1) + fmt.Fprintf(&expected, "Label names: %d\n", 1) + fmt.Fprintf(&expected, "Postings (unique label pairs): %d\n", 1) + fmt.Fprintf(&expected, "Postings entries (total label pairs): %d\n", 1) + fmt.Fprintf(&expected, "\nLabel pairs most involved in churning:\n") + fmt.Fprintf(&expected, "1 %s=0", tsdb.MockDefaultLabelName) + fmt.Fprintf(&expected, "\nLabel names most involved in churning:\n") + fmt.Fprintf(&expected, "1 %s", tsdb.MockDefaultLabelName) + fmt.Fprintf(&expected, "\nMost common label pairs:\n") + fmt.Fprintf(&expected, "1 %s=0", tsdb.MockDefaultLabelName) + fmt.Fprintf(&expected, "\nLabel names with highest cumulative label value length:\n") + fmt.Fprintf(&expected, "1 %s", tsdb.MockDefaultLabelName) + fmt.Fprintf(&expected, "\nHighest cardinality labels:\n") + fmt.Fprintf(&expected, "1 %s", tsdb.MockDefaultLabelName) + fmt.Fprintf(&expected, "\nHighest cardinality metric names:\n") + + exp := expected.String() + exp = strings.Replace(exp, " ", "", -1) + exp = strings.Replace(exp, "\t", "", -1) + exp = strings.Replace(exp, "\n", "", -1) + + if exp != act { + t.Errorf("expected (%#v) != actual (%#v)", expected.String(), actual.String()) + } +} diff --git a/compact_test.go b/compact_test.go index 18990ed5..3824f856 100644 --- a/compact_test.go +++ b/compact_test.go @@ -837,7 +837,7 @@ func BenchmarkCompaction(b *testing.B) { blockDirs := make([]string, 0, len(c.ranges)) var blocks []*Block for _, r := range c.ranges { - block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil) + block, err := OpenBlock(nil, CreateBlock(b, dir, GenSeries(nSeries, 10, r[0], r[1])), nil) testutil.Ok(b, err) blocks = append(blocks, block) defer func() { @@ -923,9 +923,9 @@ func TestCancelCompactions(t *testing.T) { }() // Create some blocks to fall within the compaction range. - createBlock(t, tmpdir, genSeries(10, 10000, 0, 1000)) - createBlock(t, tmpdir, genSeries(10, 10000, 1000, 2000)) - createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one. + CreateBlock(t, tmpdir, GenSeries(10, 10000, 0, 1000)) + CreateBlock(t, tmpdir, GenSeries(10, 10000, 1000, 2000)) + CreateBlock(t, tmpdir, GenSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one. // Copy the db so we have an exact copy to compare compaction times. tmpdirCopy := tmpdir + "Copy" @@ -1009,7 +1009,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { {MinTime: 150, MaxTime: 200}, } for _, m := range blocks { - createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) + CreateBlock(t, db.Dir(), GenSeries(1, 1, m.MinTime, m.MaxTime)) } testutil.Ok(t, db.reload()) testutil.Equals(t, len(blocks), len(db.Blocks()), "unexpected block count after a reload") @@ -1032,7 +1032,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { expBlocks := bootStrap(db) // Create a block that will trigger the reload to fail. - blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300)) + blockPath := CreateBlock(t, db.Dir(), GenSeries(1, 1, 200, 300)) lastBlockIndex := path.Join(blockPath, indexFilename) actBlocks, err := blockDirs(db.Dir()) testutil.Ok(t, err) diff --git a/db_test.go b/db_test.go index 25fb8a7e..3a37a5c3 100644 --- a/db_test.go +++ b/db_test.go @@ -95,7 +95,7 @@ func TestDB_reloadOrder(t *testing.T) { {MinTime: 100, MaxTime: 110}, } for _, m := range metas { - createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) + CreateBlock(t, db.Dir(), GenSeries(1, 1, m.MinTime, m.MaxTime)) } testutil.Ok(t, db.reload()) @@ -986,7 +986,7 @@ func TestTombstoneCleanFail(t *testing.T) { // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. totalBlocks := 2 for i := 0; i < totalBlocks; i++ { - blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1)) + blockDir := CreateBlock(t, db.Dir(), GenSeries(1, 1, 0, 1)) block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. @@ -1030,7 +1030,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil) + block, err := OpenBlock(nil, CreateBlock(c.t, dest, GenSeries(1, 1, 0, 1)), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) @@ -1070,7 +1070,7 @@ func TestTimeRetention(t *testing.T) { } for _, m := range blocks { - createBlock(t, db.Dir(), genSeries(10, 10, m.MinTime, m.MaxTime)) + CreateBlock(t, db.Dir(), GenSeries(10, 10, m.MinTime, m.MaxTime)) } testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. @@ -1106,7 +1106,7 @@ func TestSizeRetention(t *testing.T) { } for _, m := range blocks { - createBlock(t, db.Dir(), genSeries(100, 10, m.MinTime, m.MaxTime)) + CreateBlock(t, db.Dir(), GenSeries(100, 10, m.MinTime, m.MaxTime)) } // Test that registered size matches the actual disk size. @@ -1498,7 +1498,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - createBlock(t, dir, genSeries(1, 1, 1000, 2000)) + CreateBlock(t, dir, GenSeries(1, 1, 1000, 2000)) db, err := Open(dir, nil, nil, nil) testutil.Ok(t, err) @@ -1514,7 +1514,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - createBlock(t, dir, genSeries(1, 1, 1000, 6000)) + CreateBlock(t, dir, GenSeries(1, 1, 1000, 6000)) testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) w, err := wal.New(nil, nil, path.Join(dir, "wal"), false) @@ -1635,7 +1635,7 @@ func TestNoEmptyBlocks(t *testing.T) { {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, } for _, m := range blocks { - createBlock(t, db.Dir(), genSeries(2, 2, m.MinTime, m.MaxTime)) + CreateBlock(t, db.Dir(), GenSeries(2, 2, m.MinTime, m.MaxTime)) } oldBlocks := db.Blocks() @@ -2117,7 +2117,7 @@ func TestVerticalCompaction(t *testing.T) { }() for _, series := range c.blockSeries { - createBlock(t, tmpdir, series) + CreateBlock(t, tmpdir, series) } opts := *DefaultOptions opts.AllowOverlappingBlocks = true @@ -2177,7 +2177,7 @@ func TestBlockRanges(t *testing.T) { // Test that the compactor doesn't create overlapping blocks // when a non standard block already exists. firstBlockMaxT := int64(3) - createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT)) + CreateBlock(t, dir, GenSeries(1, 1, 0, firstBlockMaxT)) db, err := Open(dir, logger, nil, DefaultOptions) if err != nil { t.Fatalf("Opening test storage failed: %s", err) @@ -2227,7 +2227,7 @@ func TestBlockRanges(t *testing.T) { testutil.Ok(t, db.Close()) thirdBlockMaxt := secondBlockMaxt + 2 - createBlock(t, dir, genSeries(1, 1, secondBlockMaxt+1, thirdBlockMaxt)) + CreateBlock(t, dir, GenSeries(1, 1, secondBlockMaxt+1, thirdBlockMaxt)) db, err = Open(dir, logger, nil, DefaultOptions) if err != nil { @@ -2285,7 +2285,7 @@ func TestDBReadOnly(t *testing.T) { } for _, m := range dbBlocks { - createBlock(t, dbDir, genSeries(1, 1, m.MinTime, m.MaxTime)) + CreateBlock(t, dbDir, GenSeries(1, 1, m.MinTime, m.MaxTime)) } expSeriesCount++ } diff --git a/head_test.go b/head_test.go index 040ae828..37d29378 100644 --- a/head_test.go +++ b/head_test.go @@ -36,7 +36,7 @@ import ( ) func BenchmarkCreateSeries(b *testing.B) { - series := genSeries(b.N, 10, 0, 0) + series := GenSeries(b.N, 10, 0, 0) h, err := NewHead(nil, nil, nil, 10000) testutil.Ok(b, err) diff --git a/mocks.go b/mocks.go new file mode 100644 index 00000000..d11103ae --- /dev/null +++ b/mocks.go @@ -0,0 +1,157 @@ +/* + * Copyright 2019 The Prometheus Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tsdb + +import ( + "context" + "math/rand" + "os" + "path/filepath" + "sort" + "strconv" + "testing" + + "github.com/go-kit/kit/log" + + "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/tsdbutil" +) + +// This file holds types and functions that are used for testing +// purposes. + +const ( + MockDefaultLabelName = "labelName" + MockDefaultLabelValue = "labelValue" +) + +type mockSeries struct { + labels func() labels.Labels + iterator func() SeriesIterator +} + +// CreateBlock creates a block with given set of series and returns its dir. +// Intended for testing purposes. +func CreateBlock(tb testing.TB, dir string, series []Series) string { + head := createHead(tb, series) + compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil) + testutil.Ok(tb, err) + + testutil.Ok(tb, os.MkdirAll(dir, 0777)) + + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) + testutil.Ok(tb, err) + return filepath.Join(dir, ulid.String()) +} + +func createHead(tb testing.TB, series []Series) *Head { + head, err := NewHead(nil, nil, nil, 2*60*60*1000) + testutil.Ok(tb, err) + defer head.Close() + + app := head.Appender() + for _, s := range series { + ref := uint64(0) + it := s.Iterator() + for it.Next() { + t, v := it.At() + if ref != 0 { + err := app.AddFast(ref, t, v) + if err == nil { + continue + } + } + ref, err = app.Add(s.Labels(), t, v) + testutil.Ok(tb, err) + } + testutil.Ok(tb, it.Err()) + } + err = app.Commit() + testutil.Ok(tb, err) + return head +} + +// GenSeries generates series with a given number of labels and values. +// Intended for testing purposes. +func GenSeries(totalSeries, labelCount int, mint, maxt int64) []Series { + if totalSeries == 0 || labelCount == 0 { + return nil + } + + series := make([]Series, totalSeries) + + for i := 0; i < totalSeries; i++ { + lbls := make(map[string]string, labelCount) + lbls[MockDefaultLabelName] = strconv.Itoa(i) + for j := 1; len(lbls) < labelCount; j++ { + lbls[MockDefaultLabelName+strconv.Itoa(j)] = MockDefaultLabelValue + strconv.Itoa(j) + } + samples := make([]tsdbutil.Sample, 0, maxt-mint+1) + for t := mint; t < maxt; t++ { + samples = append(samples, sample{t: t, v: rand.Float64()}) + } + series[i] = newSeries(lbls, samples) + } + return series +} + +func newSeries(l map[string]string, s []tsdbutil.Sample) Series { + return &mockSeries{ + labels: func() labels.Labels { return labels.FromMap(l) }, + iterator: func() SeriesIterator { return newListSeriesIterator(s) }, + } +} +func (m *mockSeries) Labels() labels.Labels { return m.labels() } +func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } + +type listSeriesIterator struct { + list []tsdbutil.Sample + idx int +} + +func newListSeriesIterator(list []tsdbutil.Sample) *listSeriesIterator { + return &listSeriesIterator{list: list, idx: -1} +} + +func (it *listSeriesIterator) At() (int64, float64) { + s := it.list[it.idx] + return s.T(), s.V() +} + +func (it *listSeriesIterator) Next() bool { + it.idx++ + return it.idx < len(it.list) +} + +func (it *listSeriesIterator) Seek(t int64) bool { + if it.idx == -1 { + it.idx = 0 + } + // Do binary search between current position and end. + it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { + s := it.list[i+it.idx] + return s.T() >= t + }) + + return it.idx < len(it.list) +} + +func (it *listSeriesIterator) Err() error { + return nil +} diff --git a/querier_test.go b/querier_test.go index 2be48fcd..5a4a4f8f 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1428,56 +1428,6 @@ func (m mockIndex) LabelNames() ([]string, error) { return labelNames, nil } -type mockSeries struct { - labels func() labels.Labels - iterator func() SeriesIterator -} - -func newSeries(l map[string]string, s []tsdbutil.Sample) Series { - return &mockSeries{ - labels: func() labels.Labels { return labels.FromMap(l) }, - iterator: func() SeriesIterator { return newListSeriesIterator(s) }, - } -} -func (m *mockSeries) Labels() labels.Labels { return m.labels() } -func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } - -type listSeriesIterator struct { - list []tsdbutil.Sample - idx int -} - -func newListSeriesIterator(list []tsdbutil.Sample) *listSeriesIterator { - return &listSeriesIterator{list: list, idx: -1} -} - -func (it *listSeriesIterator) At() (int64, float64) { - s := it.list[it.idx] - return s.T(), s.V() -} - -func (it *listSeriesIterator) Next() bool { - it.idx++ - return it.idx < len(it.list) -} - -func (it *listSeriesIterator) Seek(t int64) bool { - if it.idx == -1 { - it.idx = 0 - } - // Do binary search between current position and end. - it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { - s := it.list[i+it.idx] - return s.T() >= t - }) - - return it.idx < len(it.list) -} - -func (it *listSeriesIterator) Err() error { - return nil -} - func BenchmarkQueryIterator(b *testing.B) { cases := []struct { numBlocks int @@ -1516,14 +1466,14 @@ func BenchmarkQueryIterator(b *testing.B) { mint := i*int64(c.numSamplesPerSeriesPerBlock) - offset maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1 if len(prefilledLabels) == 0 { - generatedSeries = genSeries(c.numSeries, 10, mint, maxt) + generatedSeries = GenSeries(c.numSeries, 10, mint, maxt) for _, s := range generatedSeries { prefilledLabels = append(prefilledLabels, s.Labels().Map()) } } else { generatedSeries = populateSeries(prefilledLabels, mint, maxt) } - block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + block, err := OpenBlock(nil, CreateBlock(b, dir, generatedSeries), nil) testutil.Ok(b, err) blocks = append(blocks, block) defer block.Close() @@ -1590,14 +1540,14 @@ func BenchmarkQuerySeek(b *testing.B) { mint := i*int64(c.numSamplesPerSeriesPerBlock) - offset maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1 if len(prefilledLabels) == 0 { - generatedSeries = genSeries(c.numSeries, 10, mint, maxt) + generatedSeries = GenSeries(c.numSeries, 10, mint, maxt) for _, s := range generatedSeries { prefilledLabels = append(prefilledLabels, s.Labels().Map()) } } else { generatedSeries = populateSeries(prefilledLabels, mint, maxt) } - block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + block, err := OpenBlock(nil, CreateBlock(b, dir, generatedSeries), nil) testutil.Ok(b, err) blocks = append(blocks, block) defer block.Close() @@ -1735,14 +1685,14 @@ func BenchmarkSetMatcher(b *testing.B) { mint := i * int64(c.numSamplesPerSeriesPerBlock) maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1 if len(prefilledLabels) == 0 { - generatedSeries = genSeries(c.numSeries, 10, mint, maxt) + generatedSeries = GenSeries(c.numSeries, 10, mint, maxt) for _, s := range generatedSeries { prefilledLabels = append(prefilledLabels, s.Labels().Map()) } } else { generatedSeries = populateSeries(prefilledLabels, mint, maxt) } - block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + block, err := OpenBlock(nil, CreateBlock(b, dir, generatedSeries), nil) testutil.Ok(b, err) blocks = append(blocks, block) defer block.Close() @@ -2090,8 +2040,8 @@ func TestClose(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - createBlock(t, dir, genSeries(1, 1, 0, 10)) - createBlock(t, dir, genSeries(1, 1, 10, 20)) + CreateBlock(t, dir, GenSeries(1, 1, 0, 10)) + CreateBlock(t, dir, GenSeries(1, 1, 10, 20)) db, err := Open(dir, nil, nil, DefaultOptions) if err != nil { @@ -2155,7 +2105,7 @@ func BenchmarkQueries(b *testing.B) { testutil.Ok(b, os.RemoveAll(dir)) }() - series := genSeries(nSeries, 5, 1, int64(nSamples)) + series := GenSeries(nSeries, 5, 1, int64(nSamples)) // Add some common labels to make the matchers select these series. { @@ -2181,7 +2131,7 @@ func BenchmarkQueries(b *testing.B) { qs := []Querier{} for x := 0; x <= 10; x++ { - block, err := OpenBlock(nil, createBlock(b, dir, series), nil) + block, err := OpenBlock(nil, CreateBlock(b, dir, series), nil) testutil.Ok(b, err) q, err := NewBlockQuerier(block, 1, int64(nSamples)) testutil.Ok(b, err)