From c128d21220b8ca4d710c628df5785acb7334d8b8 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Tue, 28 May 2024 13:09:57 +0200 Subject: [PATCH] Organize Buffer's segments as a regular list Previously, Buffer's segments were organized into a circular list. That allowed storing only a single reference to buffer's head, and also facilitated insertion/removal of new list nodes. The downside of a circular list is that one has to always compare a current node with a head when iterating over segments. That complicates the implementation of a public API for segments iterations. See https://github.com/Kotlin/kotlinx-io/issues/135#issuecomment-2125014081 for details on segment iteration API. --- core/apple/src/AppleCore.kt | 6 +- core/apple/src/BuffersApple.kt | 9 ++- core/common/src/Buffer.kt | 133 +++++++++++++++++++++++---------- core/common/src/Buffers.kt | 9 ++- core/common/src/ByteStrings.kt | 9 ++- core/common/src/Segment.kt | 69 ++++++++++------- core/common/src/Utf8.kt | 3 +- core/common/test/util.kt | 10 +-- core/jvm/src/BuffersJvm.kt | 12 +-- core/jvm/src/JvmCore.kt | 6 +- core/jvm/src/SourcesJvm.kt | 3 +- 11 files changed, 162 insertions(+), 107 deletions(-) diff --git a/core/apple/src/AppleCore.kt b/core/apple/src/AppleCore.kt index b277f2c64..36d235078 100644 --- a/core/apple/src/AppleCore.kt +++ b/core/apple/src/AppleCore.kt @@ -52,8 +52,7 @@ private open class OutputStreamSink( source.size -= bytesWritten if (head.pos == head.limit) { - source.head = head.pop() - SegmentPool.recycle(head) + source.recycleHead() } } } @@ -101,8 +100,7 @@ private open class NSInputStreamSource( if (bytesRead == 0L) { if (tail.pos == tail.limit) { // We allocated a tail segment, but didn't end up needing it. Recycle! - sink.head = tail.pop() - SegmentPool.recycle(tail) + sink.recycleTail() } return -1 } diff --git a/core/apple/src/BuffersApple.kt b/core/apple/src/BuffersApple.kt index ef477fe04..587bda03a 100644 --- a/core/apple/src/BuffersApple.kt +++ b/core/apple/src/BuffersApple.kt @@ -8,7 +8,9 @@ package kotlinx.io import kotlinx.cinterop.* -import platform.Foundation.* +import platform.Foundation.NSData +import platform.Foundation.create +import platform.Foundation.data import platform.darwin.NSUIntegerMax import platform.posix.* @@ -44,8 +46,7 @@ internal fun Buffer.readAtMostTo(sink: CPointer, maxLength: Int): In size -= toCopy.toLong() if (s.pos == s.limit) { - head = s.pop() - SegmentPool.recycle(s) + recycleHead() } return toCopy @@ -70,6 +71,6 @@ internal fun Buffer.snapshotAsNSData(): NSData { } curr = curr.next index += length - } while (curr !== head) + } while (curr !== null) return NSData.create(bytesNoCopy = bytes, length = size.convert()) } diff --git a/core/common/src/Buffer.kt b/core/common/src/Buffer.kt index 0d6ffc0cf..97acd56ae 100644 --- a/core/common/src/Buffer.kt +++ b/core/common/src/Buffer.kt @@ -42,6 +42,9 @@ public class Buffer : Source, Sink { @JvmField internal var head: Segment? = null + @JvmField + internal var tail: Segment? = null + /** * The number of bytes accessible for read from this buffer. */ @@ -76,8 +79,7 @@ public class Buffer : Source, Sink { val b = data[pos++] size -= 1L if (pos == limit) { - head = segment.pop() - SegmentPool.recycle(segment) + recycleHead() } else { segment.pos = pos } @@ -102,8 +104,7 @@ public class Buffer : Source, Sink { size -= 2L if (pos == limit) { - head = segment.pop() - SegmentPool.recycle(segment) + recycleHead() } else { segment.pos = pos } @@ -138,8 +139,7 @@ public class Buffer : Source, Sink { size -= 4L if (pos == limit) { - head = segment.pop() - SegmentPool.recycle(segment) + recycleHead() } else { segment.pos = pos } @@ -176,8 +176,7 @@ public class Buffer : Source, Sink { size -= 8L if (pos == limit) { - head = segment.pop() - SegmentPool.recycle(segment) + recycleHead() } else { segment.pos = pos } @@ -242,11 +241,10 @@ public class Buffer : Source, Sink { copy.pos += currentOffset.toInt() copy.limit = minOf(copy.pos + remainingByteCount.toInt(), copy.limit) if (out.head == null) { - copy.prev = copy - copy.next = copy.prev - out.head = copy.next + out.head = copy + out.tail = copy } else { - out.head!!.prev!!.push(copy) + out.tail = out.tail!!.push(copy) } remainingByteCount -= (copy.limit - copy.pos).toLong() currentOffset = 0L @@ -264,7 +262,7 @@ public class Buffer : Source, Sink { if (result == 0L) return 0L // Omit the tail if it's still writable. - val tail = head!!.prev!! + val tail = tail!! if (tail.limit < Segment.SIZE && tail.owner) { result -= (tail.limit - tail.pos).toLong() } @@ -317,8 +315,7 @@ public class Buffer : Source, Sink { head.pos += toSkip if (head.pos == head.limit) { - this.head = head.pop() - SegmentPool.recycle(head) + recycleHead() } } } @@ -336,8 +333,7 @@ public class Buffer : Source, Sink { size -= toCopy.toLong() if (s.pos == s.limit) { - head = s.pop() - SegmentPool.recycle(s) + recycleHead() } return toCopy @@ -377,19 +373,20 @@ public class Buffer : Source, Sink { internal fun writableSegment(minimumCapacity: Int): Segment { require(minimumCapacity >= 1 && minimumCapacity <= Segment.SIZE) { "unexpected capacity" } - if (head == null) { + if (tail == null) { val result = SegmentPool.take() // Acquire a first segment. head = result - result.prev = result - result.next = result + tail = result return result } - var tail = head!!.prev - if (tail!!.limit + minimumCapacity > Segment.SIZE || !tail.owner) { - tail = tail.push(SegmentPool.take()) // Append a new empty segment to fill up. + val t = tail!! + if (t.limit + minimumCapacity > Segment.SIZE || !t.owner) { + val newTail = t.push(SegmentPool.take()) // Append a new empty segment to fill up. + tail = newTail + return newTail } - return tail + return t } override fun write(source: ByteArray, startIndex: Int, endIndex: Int) { @@ -486,7 +483,7 @@ public class Buffer : Source, Sink { while (remainingByteCount > 0L) { // Is a prefix of the source's head segment all that we need to move? if (remainingByteCount < source.head!!.limit - source.head!!.pos) { - val tail = if (head != null) head!!.prev else null + val tail = tail if (tail != null && tail.owner && remainingByteCount + tail.limit - (if (tail.shared) 0 else tail.pos) <= Segment.SIZE ) { @@ -498,7 +495,11 @@ public class Buffer : Source, Sink { } else { // We're going to need another segment. Split the source's head // segment in two, then move the first of those two to this buffer. - source.head = source.head!!.split(remainingByteCount.toInt()) + val newHead = source.head!!.split(remainingByteCount.toInt()) + if (source.head === source.tail) { + source.tail = newHead + } + source.head = newHead } } @@ -506,14 +507,20 @@ public class Buffer : Source, Sink { val segmentToMove = source.head val movedByteCount = (segmentToMove!!.limit - segmentToMove.pos).toLong() source.head = segmentToMove.pop() + if (source.head == null) { + source.tail = null + } if (head == null) { head = segmentToMove - segmentToMove.prev = segmentToMove - segmentToMove.next = segmentToMove.prev + tail = segmentToMove + segmentToMove.prev = null + segmentToMove.next = null } else { - var tail = head!!.prev - tail = tail!!.push(segmentToMove) - tail.compact() + val newTail = tail!!.push(segmentToMove).compact() + tail = newTail + if (newTail.prev == null) { + this.head = newTail + } } source.size -= movedByteCount size += movedByteCount @@ -582,16 +589,15 @@ public class Buffer : Source, Sink { val result = Buffer() if (size == 0L) return result - val head = head!! + val head = this.head!! val headCopy = head.sharedCopy() result.head = headCopy - headCopy.prev = result.head - headCopy.next = headCopy.prev + result.tail = headCopy var s = head.next - while (s !== head) { - headCopy.prev!!.push(s!!.sharedCopy()) + while (s != null) { + result.tail = result.tail!!.push(s.sharedCopy()) s = s.next } @@ -642,6 +648,48 @@ public class Buffer : Source, Sink { return "Buffer(size=$size hex=$builder)" } + + /** + * Unlinks and recycles this buffer's head. + * + * If head had a successor, it'll become a new head. + * Otherwise, both [head] and [tail] will be set to null. + * + * It's up to a caller to ensure that the head exists. + */ + internal fun recycleHead() { + val oldHead = head!! + val nextHead = oldHead.next + head = nextHead + if (nextHead == null) { + tail = null + } else { + nextHead.prev = null + } + oldHead.next = null + SegmentPool.recycle(oldHead) + } + + /** + * Unlinks and recycles this buffer's tail segment. + * + * If tail had a predecessor, it'll become a new tail. + * Otherwise, both [head] and [tail] will be set to null. + * + * It's up to a caller to ensure that the tail exists. + */ + internal fun recycleTail() { + val oldTail = tail!! + val newTail = oldTail.prev + tail = newTail + if (newTail == null) { + head = null + } else { + newTail.next = null + } + oldTail.prev = null + SegmentPool.recycle(oldTail) + } } /** @@ -652,23 +700,26 @@ internal inline fun Buffer.seek( fromIndex: Long, lambda: (Segment?, Long) -> T ): T { - var s: Segment = head ?: return lambda(null, -1L) + if (this.head == null) lambda(null, -1L) if (size - fromIndex < fromIndex) { + var s = tail // We're scanning in the back half of this buffer. Find the segment starting at the back. var offset = size - while (offset > fromIndex) { - s = s.prev!! + while (s != null && offset > fromIndex) { offset -= (s.limit - s.pos).toLong() + if (offset <= fromIndex) break + s = s.prev } return lambda(s, offset) } else { + var s = this.head // We're scanning in the front half of this buffer. Find the segment starting at the front. var offset = 0L - while (true) { + while (s != null) { val nextOffset = offset + (s.limit - s.pos) if (nextOffset > fromIndex) break - s = s.next!! + s = s.next offset = nextOffset } return lambda(s, offset) diff --git a/core/common/src/Buffers.kt b/core/common/src/Buffers.kt index f6d69f1e4..6709aad6b 100644 --- a/core/common/src/Buffers.kt +++ b/core/common/src/Buffers.kt @@ -24,7 +24,7 @@ public fun Buffer.snapshot(): ByteString { check(curr != null) { "Current segment is null" } append(curr.data, curr.pos, curr.limit) curr = curr.next - } while (curr !== head) + } while (curr !== null) } } @@ -53,10 +53,11 @@ public fun Buffer.indexOf(byte: Byte, startIndex: Long = 0, endIndex: Long = siz if (o == -1L) { return -1L } - var segment = seg!! + var segment: Segment? = seg!! var offset = o do { check(endOffset > offset) + segment!! val idx = segment.indexOf( byte, // If start index within this segment, the diff will be positive and @@ -71,8 +72,8 @@ public fun Buffer.indexOf(byte: Byte, startIndex: Long = 0, endIndex: Long = siz return offset + idx.toLong() } offset += segment.size - segment = segment.next!! - } while (segment !== head && offset < endOffset) + segment = segment.next + } while (segment !== null && offset < endOffset) return -1L } } diff --git a/core/common/src/ByteStrings.kt b/core/common/src/ByteStrings.kt index b2d1a6fba..30873d548 100644 --- a/core/common/src/ByteStrings.kt +++ b/core/common/src/ByteStrings.kt @@ -129,9 +129,10 @@ public fun Buffer.indexOf(byteString: ByteString, startIndex: Long = 0): Long { if (o == -1L) { return -1L } - var segment = seg!! + var segment: Segment? = seg var offset = o do { + segment!! // If start index within this segment, the diff will be positive and // we'll scan the segment starting from the corresponding offset. // Otherwise, the diff will be negative and we'll scan the segment from the beginning. @@ -147,7 +148,7 @@ public fun Buffer.indexOf(byteString: ByteString, startIndex: Long = 0): Long { val firstOutboundOffset = maxOf(startOffset, segment.size - byteStringData.size + 1) // Try to find a pattern in all suffixes shorter than the pattern. These suffixes start // in the current segment, but ends in the following segments; thus we're using outbound function. - val idx1 = segment.indexOfBytesOutbound(byteStringData, firstOutboundOffset, head) + val idx1 = segment.indexOfBytesOutbound(byteStringData, firstOutboundOffset) if (idx1 != -1) { // Offset corresponds to the segment's start, idx - to offset within the segment. return offset + idx1.toLong() @@ -155,8 +156,8 @@ public fun Buffer.indexOf(byteString: ByteString, startIndex: Long = 0): Long { // We scanned the whole segment, so let's go to the next one offset += segment.size - segment = segment.next!! - } while (segment !== head && offset + byteString.size <= size) + segment = segment.next + } while (segment !== null && offset + byteString.size <= size) return -1L } } diff --git a/core/common/src/Segment.kt b/core/common/src/Segment.kt index 08d87908c..6332c1a41 100644 --- a/core/common/src/Segment.kt +++ b/core/common/src/Segment.kt @@ -25,7 +25,7 @@ import kotlin.jvm.JvmField /** * A segment of a buffer. * - * Each segment in a buffer is a circularly-linked list node referencing the following and + * Each segment in a buffer is a doubly-linked list node referencing the following and * preceding segments in the buffer. * * Each segment in the pool is a singly-linked list node referencing the rest of segments in the pool. @@ -61,11 +61,11 @@ internal class Segment { @JvmField var owner: Boolean = false - /** Next segment in a linked or circularly-linked list. */ + /** Next segment in a list, or null. */ @JvmField var next: Segment? = null - /** Previous segment in a circularly-linked list. */ + /** Previous segment in the list, or null. */ @JvmField var prev: Segment? = null @@ -93,40 +93,43 @@ internal class Segment { return Segment(data, pos, limit, true, false) } - /** Returns a new segment that its own private copy of the underlying byte array. */ - fun unsharedCopy() = Segment(data.copyOf(), pos, limit, false, true) - /** - * Removes this segment of a circularly-linked list and returns its successor. + * Removes this segment of a list and returns its successor. * Returns null if the list is now empty. */ fun pop(): Segment? { - val result = if (next !== this) next else null - prev!!.next = next - next!!.prev = prev - next = null - prev = null + val result = this.next + if (this.prev != null) { + this.prev!!.next = this.next + } + if (this.next != null) { + this.next!!.prev = this.prev + } + this.next = null + this.prev = null return result } /** - * Appends `segment` after this segment in the circularly-linked list. Returns the pushed segment. + * Appends `segment` after this segment in the list. Returns the pushed segment. */ fun push(segment: Segment): Segment { segment.prev = this - segment.next = next - next!!.prev = segment - next = segment + segment.next = this.next + if (this.next != null) { + this.next!!.prev = segment + } + this.next = segment return segment } /** - * Splits this head of a circularly-linked list into two segments. The first segment contains the + * Splits this head of a list into two segments. The first segment contains the * data in `[pos..pos+byteCount)`. The second segment contains the data in * `[pos+byteCount..limit)`. This can be useful when moving partial segments from one buffer to * another. * - * Returns the new head of the circularly-linked list. + * Returns the new head of the list. */ fun split(byteCount: Int): Segment { require(byteCount > 0 && byteCount <= limit - pos) { "byteCount out of range" } @@ -146,7 +149,12 @@ internal class Segment { prefix.limit = prefix.pos + byteCount pos += byteCount - prev!!.push(prefix) + if (this.prev != null) { + this.prev!!.push(prefix) + } else { + prefix.next = this + this.prev = prefix + } return prefix } @@ -154,15 +162,18 @@ internal class Segment { * Call this when the tail and its predecessor may both be less than half full. This will copy * data so that segments can be recycled. */ - fun compact() { - check(prev !== this) { "cannot compact" } - if (!prev!!.owner) return // Cannot compact: prev isn't writable. + fun compact(): Segment { + check(this.prev !== null) { "cannot compact" } + if (!this.prev!!.owner) return this // Cannot compact: prev isn't writable. val byteCount = limit - pos - val availableByteCount = SIZE - prev!!.limit + if (prev!!.shared) 0 else prev!!.pos - if (byteCount > availableByteCount) return // Cannot compact: not enough writable space. - writeTo(prev!!, byteCount) - pop() + val availableByteCount = SIZE - this.prev!!.limit + if (this.prev!!.shared) 0 else this.prev!!.pos + if (byteCount > availableByteCount) return this // Cannot compact: not enough writable space. + val predecessor = this.prev + writeTo(predecessor!!, byteCount) + val successor = pop() + check(successor === null) SegmentPool.recycle(this) + return predecessor } /** Moves `byteCount` bytes from this segment to `sink`. */ @@ -248,7 +259,7 @@ internal fun Segment.indexOfBytesInbound(bytes: ByteArray, startOffset: Int): In * and continued in the following segments. * `startOffset` is relative and should be within `[0, size)`. */ -internal fun Segment.indexOfBytesOutbound(bytes: ByteArray, startOffset: Int, head: Segment?): Int { +internal fun Segment.indexOfBytesOutbound(bytes: ByteArray, startOffset: Int): Int { var offset = startOffset val firstByte = bytes[0] @@ -267,8 +278,8 @@ internal fun Segment.indexOfBytesOutbound(bytes: ByteArray, startOffset: Int, he // so let's take the next one and continue the scan there. if (scanOffset == seg.size) { val next = seg.next - if (next === head) return -1 - seg = next!! + if (next === null) return -1 + seg = next scanOffset = 0 // we're scanning the next segment right from the beginning } if (element != seg.data[seg.pos + scanOffset]) { diff --git a/core/common/src/Utf8.kt b/core/common/src/Utf8.kt index eaa9912d2..b5028a5d1 100644 --- a/core/common/src/Utf8.kt +++ b/core/common/src/Utf8.kt @@ -602,8 +602,7 @@ private fun Buffer.commonReadUtf8(byteCount: Long): String { size -= byteCount if (s.pos == s.limit) { - head = s.pop() - SegmentPool.recycle(s) + recycleHead() } return result diff --git a/core/common/test/util.kt b/core/common/test/util.kt index a69c024b4..285131242 100644 --- a/core/common/test/util.kt +++ b/core/common/test/util.kt @@ -25,13 +25,13 @@ import kotlin.test.assertEquals import kotlin.test.assertTrue fun segmentSizes(buffer: Buffer): List { - var segment = buffer.head ?: return emptyList() + var segment: Segment? = buffer.head ?: return emptyList() - val sizes = mutableListOf(segment.limit - segment.pos) - segment = segment.next!! - while (segment !== buffer.head) { + val sizes = mutableListOf(segment!!.limit - segment.pos) + segment = segment.next + while (segment !== null) { sizes.add(segment.limit - segment.pos) - segment = segment.next!! + segment = segment.next } return sizes } diff --git a/core/jvm/src/BuffersJvm.kt b/core/jvm/src/BuffersJvm.kt index d4a929cc6..1754e865b 100644 --- a/core/jvm/src/BuffersJvm.kt +++ b/core/jvm/src/BuffersJvm.kt @@ -66,8 +66,7 @@ private fun Buffer.write(input: InputStream, byteCount: Long, forever: Boolean) if (bytesRead == -1) { if (tail.pos == tail.limit) { // We allocated a tail segment, but didn't end up needing it. Recycle! - head = tail.pop() - SegmentPool.recycle(tail) + recycleTail() } if (forever) return throw EOFException("Stream exhausted before $byteCount bytes were read.") @@ -102,10 +101,8 @@ public fun Buffer.readTo(out: OutputStream, byteCount: Long = size) { remainingByteCount -= toCopy.toLong() if (s.pos == s.limit) { - val toRecycle = s - s = toRecycle.pop() - head = s - SegmentPool.recycle(toRecycle) + recycleHead() + s = head } } } @@ -170,8 +167,7 @@ public fun Buffer.readAtMostTo(sink: ByteBuffer): Int { size -= toCopy.toLong() if (s.pos == s.limit) { - head = s.pop() - SegmentPool.recycle(s) + recycleHead() } return toCopy diff --git a/core/jvm/src/JvmCore.kt b/core/jvm/src/JvmCore.kt index fd2a108bf..036138660 100644 --- a/core/jvm/src/JvmCore.kt +++ b/core/jvm/src/JvmCore.kt @@ -52,8 +52,7 @@ private open class OutputStreamSink( source.size -= toCopy if (head.pos == head.limit) { - source.head = head.pop() - SegmentPool.recycle(head) + source.recycleHead() } } } @@ -88,8 +87,7 @@ private open class InputStreamSource( if (bytesRead == -1) { if (tail.pos == tail.limit) { // We allocated a tail segment, but didn't end up needing it. Recycle! - sink.head = tail.pop() - SegmentPool.recycle(tail) + sink.recycleTail() } return -1 } diff --git a/core/jvm/src/SourcesJvm.kt b/core/jvm/src/SourcesJvm.kt index ed7f20fc1..c4d808a57 100644 --- a/core/jvm/src/SourcesJvm.kt +++ b/core/jvm/src/SourcesJvm.kt @@ -46,8 +46,7 @@ private fun Buffer.readStringImpl(byteCount: Long, charset: Charset): String { size -= byteCount if (s.pos == s.limit) { - head = s.pop() - SegmentPool.recycle(s) + recycleHead() } return result