Skip to content

Commit

Permalink
Organize Buffer's segments as a regular list
Browse files Browse the repository at this point in the history
Previously, Buffer's segments were organized into a circular list.
That allowed storing only a single reference to buffer's head,
and also facilitated insertion/removal of new list nodes.
The downside of a circular list is that one has
to always compare a current node with a head when
iterating over segments.
That complicates the implementation of a public API
for segments iterations.

See #135 (comment)
for details on segment iteration API.
  • Loading branch information
fzhinkin committed Jun 6, 2024
1 parent 7c4d095 commit c128d21
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 107 deletions.
6 changes: 2 additions & 4 deletions core/apple/src/AppleCore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ private open class OutputStreamSink(
source.size -= bytesWritten

if (head.pos == head.limit) {
source.head = head.pop()
SegmentPool.recycle(head)
source.recycleHead()
}
}
}
Expand Down Expand Up @@ -101,8 +100,7 @@ private open class NSInputStreamSource(
if (bytesRead == 0L) {
if (tail.pos == tail.limit) {
// We allocated a tail segment, but didn't end up needing it. Recycle!
sink.head = tail.pop()
SegmentPool.recycle(tail)
sink.recycleTail()
}
return -1
}
Expand Down
9 changes: 5 additions & 4 deletions core/apple/src/BuffersApple.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
package kotlinx.io

import kotlinx.cinterop.*
import platform.Foundation.*
import platform.Foundation.NSData
import platform.Foundation.create
import platform.Foundation.data
import platform.darwin.NSUIntegerMax
import platform.posix.*

Expand Down Expand Up @@ -44,8 +46,7 @@ internal fun Buffer.readAtMostTo(sink: CPointer<uint8_tVar>, maxLength: Int): In
size -= toCopy.toLong()

if (s.pos == s.limit) {
head = s.pop()
SegmentPool.recycle(s)
recycleHead()
}

return toCopy
Expand All @@ -70,6 +71,6 @@ internal fun Buffer.snapshotAsNSData(): NSData {
}
curr = curr.next
index += length
} while (curr !== head)
} while (curr !== null)
return NSData.create(bytesNoCopy = bytes, length = size.convert())
}
133 changes: 92 additions & 41 deletions core/common/src/Buffer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class Buffer : Source, Sink {
@JvmField
internal var head: Segment? = null

@JvmField
internal var tail: Segment? = null

/**
* The number of bytes accessible for read from this buffer.
*/
Expand Down Expand Up @@ -76,8 +79,7 @@ public class Buffer : Source, Sink {
val b = data[pos++]
size -= 1L
if (pos == limit) {
head = segment.pop()
SegmentPool.recycle(segment)
recycleHead()
} else {
segment.pos = pos
}
Expand All @@ -102,8 +104,7 @@ public class Buffer : Source, Sink {
size -= 2L

if (pos == limit) {
head = segment.pop()
SegmentPool.recycle(segment)
recycleHead()
} else {
segment.pos = pos
}
Expand Down Expand Up @@ -138,8 +139,7 @@ public class Buffer : Source, Sink {
size -= 4L

if (pos == limit) {
head = segment.pop()
SegmentPool.recycle(segment)
recycleHead()
} else {
segment.pos = pos
}
Expand Down Expand Up @@ -176,8 +176,7 @@ public class Buffer : Source, Sink {
size -= 8L

if (pos == limit) {
head = segment.pop()
SegmentPool.recycle(segment)
recycleHead()
} else {
segment.pos = pos
}
Expand Down Expand Up @@ -242,11 +241,10 @@ public class Buffer : Source, Sink {
copy.pos += currentOffset.toInt()
copy.limit = minOf(copy.pos + remainingByteCount.toInt(), copy.limit)
if (out.head == null) {
copy.prev = copy
copy.next = copy.prev
out.head = copy.next
out.head = copy
out.tail = copy
} else {
out.head!!.prev!!.push(copy)
out.tail = out.tail!!.push(copy)
}
remainingByteCount -= (copy.limit - copy.pos).toLong()
currentOffset = 0L
Expand All @@ -264,7 +262,7 @@ public class Buffer : Source, Sink {
if (result == 0L) return 0L

// Omit the tail if it's still writable.
val tail = head!!.prev!!
val tail = tail!!
if (tail.limit < Segment.SIZE && tail.owner) {
result -= (tail.limit - tail.pos).toLong()
}
Expand Down Expand Up @@ -317,8 +315,7 @@ public class Buffer : Source, Sink {
head.pos += toSkip

if (head.pos == head.limit) {
this.head = head.pop()
SegmentPool.recycle(head)
recycleHead()
}
}
}
Expand All @@ -336,8 +333,7 @@ public class Buffer : Source, Sink {
size -= toCopy.toLong()

if (s.pos == s.limit) {
head = s.pop()
SegmentPool.recycle(s)
recycleHead()
}

return toCopy
Expand Down Expand Up @@ -377,19 +373,20 @@ public class Buffer : Source, Sink {
internal fun writableSegment(minimumCapacity: Int): Segment {
require(minimumCapacity >= 1 && minimumCapacity <= Segment.SIZE) { "unexpected capacity" }

if (head == null) {
if (tail == null) {
val result = SegmentPool.take() // Acquire a first segment.
head = result
result.prev = result
result.next = result
tail = result
return result
}

var tail = head!!.prev
if (tail!!.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()) // Append a new empty segment to fill up.
val t = tail!!
if (t.limit + minimumCapacity > Segment.SIZE || !t.owner) {
val newTail = t.push(SegmentPool.take()) // Append a new empty segment to fill up.
tail = newTail
return newTail
}
return tail
return t
}

override fun write(source: ByteArray, startIndex: Int, endIndex: Int) {
Expand Down Expand Up @@ -486,7 +483,7 @@ public class Buffer : Source, Sink {
while (remainingByteCount > 0L) {
// Is a prefix of the source's head segment all that we need to move?
if (remainingByteCount < source.head!!.limit - source.head!!.pos) {
val tail = if (head != null) head!!.prev else null
val tail = tail
if (tail != null && tail.owner &&
remainingByteCount + tail.limit - (if (tail.shared) 0 else tail.pos) <= Segment.SIZE
) {
Expand All @@ -498,22 +495,32 @@ public class Buffer : Source, Sink {
} else {
// We're going to need another segment. Split the source's head
// segment in two, then move the first of those two to this buffer.
source.head = source.head!!.split(remainingByteCount.toInt())
val newHead = source.head!!.split(remainingByteCount.toInt())
if (source.head === source.tail) {
source.tail = newHead
}
source.head = newHead
}
}

// Remove the source's head segment and append it to our tail.
val segmentToMove = source.head
val movedByteCount = (segmentToMove!!.limit - segmentToMove.pos).toLong()
source.head = segmentToMove.pop()
if (source.head == null) {
source.tail = null
}
if (head == null) {
head = segmentToMove
segmentToMove.prev = segmentToMove
segmentToMove.next = segmentToMove.prev
tail = segmentToMove
segmentToMove.prev = null
segmentToMove.next = null
} else {
var tail = head!!.prev
tail = tail!!.push(segmentToMove)
tail.compact()
val newTail = tail!!.push(segmentToMove).compact()
tail = newTail
if (newTail.prev == null) {
this.head = newTail
}
}
source.size -= movedByteCount
size += movedByteCount
Expand Down Expand Up @@ -582,16 +589,15 @@ public class Buffer : Source, Sink {
val result = Buffer()
if (size == 0L) return result

val head = head!!
val head = this.head!!
val headCopy = head.sharedCopy()

result.head = headCopy
headCopy.prev = result.head
headCopy.next = headCopy.prev
result.tail = headCopy

var s = head.next
while (s !== head) {
headCopy.prev!!.push(s!!.sharedCopy())
while (s != null) {
result.tail = result.tail!!.push(s.sharedCopy())
s = s.next
}

Expand Down Expand Up @@ -642,6 +648,48 @@ public class Buffer : Source, Sink {

return "Buffer(size=$size hex=$builder)"
}

/**
* Unlinks and recycles this buffer's head.
*
* If head had a successor, it'll become a new head.
* Otherwise, both [head] and [tail] will be set to null.
*
* It's up to a caller to ensure that the head exists.
*/
internal fun recycleHead() {
val oldHead = head!!
val nextHead = oldHead.next
head = nextHead
if (nextHead == null) {
tail = null
} else {
nextHead.prev = null
}
oldHead.next = null
SegmentPool.recycle(oldHead)
}

/**
* Unlinks and recycles this buffer's tail segment.
*
* If tail had a predecessor, it'll become a new tail.
* Otherwise, both [head] and [tail] will be set to null.
*
* It's up to a caller to ensure that the tail exists.
*/
internal fun recycleTail() {
val oldTail = tail!!
val newTail = oldTail.prev
tail = newTail
if (newTail == null) {
head = null
} else {
newTail.next = null
}
oldTail.prev = null
SegmentPool.recycle(oldTail)
}
}

/**
Expand All @@ -652,23 +700,26 @@ internal inline fun <T> Buffer.seek(
fromIndex: Long,
lambda: (Segment?, Long) -> T
): T {
var s: Segment = head ?: return lambda(null, -1L)
if (this.head == null) lambda(null, -1L)

if (size - fromIndex < fromIndex) {
var s = tail
// We're scanning in the back half of this buffer. Find the segment starting at the back.
var offset = size
while (offset > fromIndex) {
s = s.prev!!
while (s != null && offset > fromIndex) {
offset -= (s.limit - s.pos).toLong()
if (offset <= fromIndex) break
s = s.prev
}
return lambda(s, offset)
} else {
var s = this.head
// We're scanning in the front half of this buffer. Find the segment starting at the front.
var offset = 0L
while (true) {
while (s != null) {
val nextOffset = offset + (s.limit - s.pos)
if (nextOffset > fromIndex) break
s = s.next!!
s = s.next
offset = nextOffset
}
return lambda(s, offset)
Expand Down
9 changes: 5 additions & 4 deletions core/common/src/Buffers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public fun Buffer.snapshot(): ByteString {
check(curr != null) { "Current segment is null" }
append(curr.data, curr.pos, curr.limit)
curr = curr.next
} while (curr !== head)
} while (curr !== null)
}
}

Expand Down Expand Up @@ -53,10 +53,11 @@ public fun Buffer.indexOf(byte: Byte, startIndex: Long = 0, endIndex: Long = siz
if (o == -1L) {
return -1L
}
var segment = seg!!
var segment: Segment? = seg!!
var offset = o
do {
check(endOffset > offset)
segment!!
val idx = segment.indexOf(
byte,
// If start index within this segment, the diff will be positive and
Expand All @@ -71,8 +72,8 @@ public fun Buffer.indexOf(byte: Byte, startIndex: Long = 0, endIndex: Long = siz
return offset + idx.toLong()
}
offset += segment.size
segment = segment.next!!
} while (segment !== head && offset < endOffset)
segment = segment.next
} while (segment !== null && offset < endOffset)
return -1L
}
}
9 changes: 5 additions & 4 deletions core/common/src/ByteStrings.kt
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ public fun Buffer.indexOf(byteString: ByteString, startIndex: Long = 0): Long {
if (o == -1L) {
return -1L
}
var segment = seg!!
var segment: Segment? = seg
var offset = o
do {
segment!!
// If start index within this segment, the diff will be positive and
// we'll scan the segment starting from the corresponding offset.
// Otherwise, the diff will be negative and we'll scan the segment from the beginning.
Expand All @@ -147,16 +148,16 @@ public fun Buffer.indexOf(byteString: ByteString, startIndex: Long = 0): Long {
val firstOutboundOffset = maxOf(startOffset, segment.size - byteStringData.size + 1)
// Try to find a pattern in all suffixes shorter than the pattern. These suffixes start
// in the current segment, but ends in the following segments; thus we're using outbound function.
val idx1 = segment.indexOfBytesOutbound(byteStringData, firstOutboundOffset, head)
val idx1 = segment.indexOfBytesOutbound(byteStringData, firstOutboundOffset)
if (idx1 != -1) {
// Offset corresponds to the segment's start, idx - to offset within the segment.
return offset + idx1.toLong()
}

// We scanned the whole segment, so let's go to the next one
offset += segment.size
segment = segment.next!!
} while (segment !== head && offset + byteString.size <= size)
segment = segment.next
} while (segment !== null && offset + byteString.size <= size)
return -1L
}
}
Expand Down
Loading

0 comments on commit c128d21

Please sign in to comment.