From 76b3134a4edd06c2910eb91ed98934e9ddafd7ad Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Thu, 1 Aug 2024 13:02:33 -0700
Subject: [PATCH 1/4] Reduce overall chunker memory use

When the chunker is not able to fill a chunk with data, it allocates a
new buffer for the partial chunk of data. With many files this results
in the allocation of many small buffers of varying sizes, leading to
heap fragmentation. This PR allocates a new buffer from the pool only
if doing so would save space; otherwise it uses the partially filled
(over-allocated) chunk. This keeps all chunker allocation sizes at
powers of 2. Heap fragmentation is reduced at the cost of some
temporary over-allocation. The advantage is that the over-allocation is
much shorter-lived than the heap fragmentation.
---
 chunker/splitting.go      | 34 ++++++++++++++++++++++++----------
 chunker/splitting_test.go |  3 +--
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/chunker/splitting.go b/chunker/splitting.go
index 64306943b..9557b452b 100644
--- a/chunker/splitting.go
+++ b/chunker/splitting.go
@@ -5,6 +5,7 @@
 package chunk
 
 import (
+    "errors"
     "io"
 
     logging "github.com/ipfs/go-log/v2"
@@ -13,6 +14,10 @@ import (
 
 var log = logging.Logger("chunk")
 
+// Maximum allowed chunk over-allocation without re-allocating a buffer of the
+// exact size needed to hold data.
+const maxOverAllocBytes = 1024
+
 // A Splitter reads bytes from a Reader and creates "chunks" (byte slices)
 // that can be used to build DAG nodes.
 type Splitter interface {
@@ -81,19 +86,28 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
 
     full := pool.Get(int(ss.size))
     n, err := io.ReadFull(ss.r, full)
-    switch err {
-    case io.ErrUnexpectedEOF:
-        ss.err = io.EOF
-        small := make([]byte, n)
-        copy(small, full)
-        pool.Put(full)
-        return small, nil
-    case nil:
-        return full, nil
-    default:
+    if err != nil {
+        if errors.Is(err, io.ErrUnexpectedEOF) {
+            ss.err = io.EOF
+            // Do not return an empty buffer.
+            if n == 0 {
+                pool.Put(full)
+                return nil, nil
+            }
+            // If reallocating from pool would save space.
+            if n <= (cap(full) >> 1) {
+                small := pool.Get(n)
+                copy(small, full)
+                pool.Put(full)
+                return small, nil
+            }
+            // Use overallocated chunk.
+            return full[:n], nil
+        }
         pool.Put(full)
         return nil, err
     }
+    return full, nil
 }
 
 // Reader returns the io.Reader associated to this Splitter.
diff --git a/chunker/splitting_test.go b/chunker/splitting_test.go
index 23170ee37..c35815f07 100644
--- a/chunker/splitting_test.go
+++ b/chunker/splitting_test.go
@@ -33,7 +33,7 @@ func TestSizeSplitterOverAllocate(t *testing.T) {
     if err != nil {
         t.Fatal(err)
     }
-    if cap(chunk) > len(chunk) {
+    if cap(chunk)-len(chunk) > cap(chunk)/2 {
         t.Fatal("chunk capacity too large")
     }
 }
@@ -89,7 +89,6 @@ func TestSizeSplitterFillsChunks(t *testing.T) {
     sofar := 0
     whole := make([]byte, max)
     for chunk := range c {
-
         bc := b[sofar : sofar+len(chunk)]
         if !bytes.Equal(bc, chunk) {
             t.Fatalf("chunk not correct: (sofar: %d) %d != %d, %v != %v", sofar, len(bc), len(chunk), bc[:100], chunk[:100])
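Note: the behavioral change in NextBytes is easiest to see in isolation. The
sketch below restates the rule (shrinkToFit is a hypothetical helper, not part
of this patch; it assumes go-buffer-pool's pool.Get/pool.Put as imported in
splitting.go): a short read is copied into a smaller pooled buffer only when
that at least halves the capacity; otherwise the over-allocated buffer is
kept, so every chunk the splitter returns still has a power-of-two capacity
from the pool's buckets.

    package chunk // sketch only; this helper does not exist in the patch

    import pool "github.com/libp2p/go-buffer-pool"

    // shrinkToFit trims a pooled buffer down to the n bytes actually read.
    func shrinkToFit(full []byte, n int) []byte {
        if n == 0 {
            pool.Put(full) // nothing was read; recycle and return no chunk
            return nil
        }
        if n <= cap(full)/2 {
            // A buffer from a smaller power-of-two pool bucket can hold the data.
            small := pool.Get(n)
            copy(small, full)
            pool.Put(full)
            return small
        }
        // Keep the over-allocated buffer: the unused part is less than half the
        // capacity and lives only as long as the chunk itself.
        return full[:n]
    }

For a 4096-byte splitter, for example, a 3000-byte final chunk keeps its
4096-capacity buffer, while a 1500-byte one is copied into a 2048-capacity
buffer obtained from the pool.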
From 32a7fbc750caffe3ee50139f503eaf64c21f9375 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Thu, 1 Aug 2024 13:31:01 -0700
Subject: [PATCH 2/4] update changelog

---
 CHANGELOG.md         | 1 +
 chunker/splitting.go | 4 ----
 2 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e1be6d134..c2877a24d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,6 +22,7 @@ The following emojis are used to highlight certain changes:
 - `bitswap` unify logger names to use uniform format bitswap/path/pkgname
 - `gateway` now always returns meaningful cache-control headers for generated HTML listings of UnixFS directories
 - generate random test data using `ipfs/go-test` instead of internal util code
+- Reduce overall chunker memory use by reducing heap fragmentation
 
 ### Removed
 
diff --git a/chunker/splitting.go b/chunker/splitting.go
index 9557b452b..8b7d36502 100644
--- a/chunker/splitting.go
+++ b/chunker/splitting.go
@@ -14,10 +14,6 @@ import (
 
 var log = logging.Logger("chunk")
 
-// Maximum allowed chunk over-allocation without re-allocating a buffer of the
-// exact size needed to hold data.
-const maxOverAllocBytes = 1024
-
 // A Splitter reads bytes from a Reader and creates "chunks" (byte slices)
 // that can be used to build DAG nodes.
 type Splitter interface {

From 30beb291ac15cb42762b6af5a8666905b1d50a99 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Fri, 2 Aug 2024 10:17:14 -0700
Subject: [PATCH 3/4] Benchmark pool vs no pool for many files

---
 chunker/benchmark_test.go | 36 ++++++++++++++++++++++
 chunker/splitting_test.go | 63 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 99 insertions(+)

diff --git a/chunker/benchmark_test.go b/chunker/benchmark_test.go
index 5069b0653..9374da45e 100644
--- a/chunker/benchmark_test.go
+++ b/chunker/benchmark_test.go
@@ -57,3 +57,39 @@ func benchmarkChunkerSize(b *testing.B, ns newSplitter, size int) {
     }
     Res = Res + res
 }
+
+func benchmarkFilesAlloc(b *testing.B, ns newSplitter) {
+    const (
+        chunkSize   = 4096
+        minDataSize = 20000
+        maxDataSize = 60000
+        fileCount   = 10000
+    )
+    rng := rand.New(rand.NewSource(1))
+    data := make([]byte, maxDataSize)
+    rng.Read(data)
+
+    b.SetBytes(maxDataSize)
+    b.ReportAllocs()
+    b.ResetTimer()
+
+    var res uint64
+
+    for i := 0; i < b.N; i++ {
+        for j := 0; j < fileCount; j++ {
+            fileSize := rng.Intn(maxDataSize-minDataSize) + minDataSize
+            r := ns(bytes.NewReader(data[:fileSize]))
+            for {
+                chunk, err := r.NextBytes()
+                if err != nil {
+                    if err == io.EOF {
+                        break
+                    }
+                    b.Fatal(err)
+                }
+                res = res + uint64(len(chunk))
+            }
+        }
+    }
+    Res = Res + res
+}
diff --git a/chunker/splitting_test.go b/chunker/splitting_test.go
index c35815f07..f999185df 100644
--- a/chunker/splitting_test.go
+++ b/chunker/splitting_test.go
@@ -2,6 +2,7 @@ package chunk
 
 import (
     "bytes"
+    "errors"
     "io"
     "testing"
 
@@ -126,3 +127,65 @@ func BenchmarkDefault(b *testing.B) {
         return DefaultSplitter(r)
     })
 }
+
+// BenchmarkFilesAllocPool benchmarks splitter that uses go-buffer-pool,
+// simulating use in unixfs with many small files.
+func BenchmarkFilesAllocPool(b *testing.B) {
+    const fileBlockSize = 4096
+
+    benchmarkFilesAlloc(b, func(r io.Reader) Splitter {
+        return NewSizeSplitter(r, fileBlockSize)
+    })
+}
+
+// BenchmarkFilesAllocNoPool benchmarks splitter that does not use
+// go-buffer-pool, simulating use in unixfs with many small files.
+func BenchmarkFilesAllocNoPool(b *testing.B) {
+    const fileBlockSize = 4096
+
+    benchmarkFilesAlloc(b, func(r io.Reader) Splitter {
+        return &sizeSplitterNoPool{
+            r:    r,
+            size: uint32(fileBlockSize),
+        }
+    })
+}
+
+// sizeSplitterNoPool implements Splitter that allocates without pool. Provided
+// for benchmarking against implementation with pool.
+type sizeSplitterNoPool struct {
+    r    io.Reader
+    size uint32
+    err  error
+}
+
+func (ss *sizeSplitterNoPool) NextBytes() ([]byte, error) {
+    const maxOverAllocBytes = 512
+
+    if ss.err != nil {
+        return nil, ss.err
+    }
+
+    full := make([]byte, ss.size)
+    n, err := io.ReadFull(ss.r, full)
+    if err != nil {
+        if errors.Is(err, io.ErrUnexpectedEOF) {
+            ss.err = io.EOF
+            if n == 0 {
+                return nil, nil
+            }
+            if cap(full)-n < maxOverAllocBytes {
+                return full[:n], nil
+            }
+            small := make([]byte, n)
+            copy(small, full)
+            return small, nil
+        }
+        return nil, err
+    }
+    return full, nil
+}
+
+func (ss *sizeSplitterNoPool) Reader() io.Reader {
+    return ss.r
+}
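Note: the two benchmarks above are intended to be read side by side. They can
be run with something like `go test -run='^$' -bench=FilesAlloc -benchmem
./chunker` from the repository root (the exact command is only a suggestion);
with -benchmem, the allocs/op and B/op columns of BenchmarkFilesAllocPool
versus BenchmarkFilesAllocNoPool show what the buffer pool buys under the
many-small-files workload that benchmarkFilesAlloc simulates.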
From 5f2dbb0acb1fc348fcf66e86eada5a7d9e15b57e Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Mon, 5 Aug 2024 03:45:07 -0700
Subject: [PATCH 4/4] Limit amount of overallocation

---
 chunker/splitting.go      | 46 +++++++++++++++++++++++++++------------
 chunker/splitting_test.go |  7 +----
 2 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/chunker/splitting.go b/chunker/splitting.go
index 8b7d36502..6340c0f50 100644
--- a/chunker/splitting.go
+++ b/chunker/splitting.go
@@ -7,6 +7,7 @@ package chunk
 import (
     "errors"
     "io"
+    "math/bits"
 
     logging "github.com/ipfs/go-log/v2"
     pool "github.com/libp2p/go-buffer-pool"
@@ -14,6 +15,10 @@ import (
 
 var log = logging.Logger("chunk")
 
+// maxOverAllocBytes is the maximum unused space a chunk can have without being
+// reallocated to a smaller size to fit the data.
+const maxOverAllocBytes = 1024
+
 // A Splitter reads bytes from a Reader and creates "chunks" (byte slices)
 // that can be used to build DAG nodes.
 type Splitter interface {
@@ -85,20 +90,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
     if err != nil {
         if errors.Is(err, io.ErrUnexpectedEOF) {
             ss.err = io.EOF
-            // Do not return an empty buffer.
-            if n == 0 {
-                pool.Put(full)
-                return nil, nil
-            }
-            // If reallocating from pool would save space.
-            if n <= (cap(full) >> 1) {
-                small := pool.Get(n)
-                copy(small, full)
-                pool.Put(full)
-                return small, nil
-            }
-            // Use overallocated chunk.
-            return full[:n], nil
+            return reallocChunk(full, n), nil
         }
         pool.Put(full)
         return nil, err
@@ -106,6 +98,32 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
     return full, nil
 }
 
+func reallocChunk(full []byte, n int) []byte {
+    // Do not return an empty buffer.
+    if n == 0 {
+        pool.Put(full)
+        return nil
+    }
+
+    // If chunk is close enough to fully used.
+    if cap(full)-n <= maxOverAllocBytes {
+        return full[:n]
+    }
+
+    var small []byte
+    // If reallocating to the nearest power of two saves space without leaving
+    // too much unused space.
+    powTwoSize := 1 << bits.Len32(uint32(n-1))
+    if powTwoSize-n <= maxOverAllocBytes {
+        small = make([]byte, n, powTwoSize)
+    } else {
+        small = make([]byte, n)
+    }
+    copy(small, full)
+    pool.Put(full)
+    return small
+}
+
 // Reader returns the io.Reader associated to this Splitter.
 func (ss *sizeSplitterv2) Reader() io.Reader {
     return ss.r
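Note: to get a concrete feel for the thresholds in reallocChunk, the snippet
below is an illustration only (it is not part of the patch); it applies the
same rules to a few short-read sizes n, assuming the pooled buffer has an
assumed capacity of 4096 bytes:

    package main

    import (
        "fmt"
        "math/bits"
    )

    func main() {
        const capFull, maxOver = 4096, 1024 // assumed pool bucket and threshold
        for _, n := range []int{3500, 1500, 2500} {
            // Next power of two >= n, as computed by reallocChunk.
            powTwo := 1 << bits.Len32(uint32(n-1))
            switch {
            case capFull-n <= maxOver:
                fmt.Printf("n=%d: keep full[:n], capacity stays %d\n", n, capFull)
            case powTwo-n <= maxOver:
                fmt.Printf("n=%d: copy into make([]byte, %d, %d)\n", n, n, powTwo)
            default:
                fmt.Printf("n=%d: copy into make([]byte, %d) (exact size)\n", n, n)
            }
        }
    }

For a 4096-byte pooled buffer this keeps the over-allocated buffer at n=3500
(596 unused bytes), shrinks n=1500 into a 2048-capacity buffer (a power of
two, 548 bytes unused), and allocates exactly 2500 bytes at n=2500, since
rounding up to 4096 would again leave more than maxOverAllocBytes unused.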
diff --git a/chunker/splitting_test.go b/chunker/splitting_test.go
index f999185df..d21faf512 100644
--- a/chunker/splitting_test.go
+++ b/chunker/splitting_test.go
@@ -34,7 +34,7 @@ func TestSizeSplitterOverAllocate(t *testing.T) {
     if err != nil {
         t.Fatal(err)
     }
-    if cap(chunk)-len(chunk) > cap(chunk)/2 {
+    if cap(chunk)-len(chunk) > maxOverAllocBytes {
         t.Fatal("chunk capacity too large")
     }
 }
@@ -160,8 +160,6 @@ type sizeSplitterNoPool struct {
 }
 
 func (ss *sizeSplitterNoPool) NextBytes() ([]byte, error) {
-    const maxOverAllocBytes = 512
-
     if ss.err != nil {
         return nil, ss.err
     }
@@ -174,9 +172,6 @@ func (ss *sizeSplitterNoPool) NextBytes() ([]byte, error) {
             if n == 0 {
                 return nil, nil
             }
-            if cap(full)-n < maxOverAllocBytes {
-                return full[:n], nil
-            }
             small := make([]byte, n)
             copy(small, full)
             return small, nil