Skip to content

Commit

Permalink
Use MutableByteArray as the primitive for the ring buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
adithyaov committed Jul 4, 2024
1 parent 39a800a commit facacc5
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 193 deletions.
17 changes: 8 additions & 9 deletions benchmark/Streamly/Benchmark/Data/Ring/Unboxed.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
module Main (main) where

import Control.Monad (void)
import GHC.Ptr (Ptr(..))
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Ring as Ring
import qualified Data.Foldable as P
Expand All @@ -25,18 +24,18 @@ import Prelude as P
-- Benchmark ops
-------------------------------------------------------------------------------

unsafeEqArrayN :: (Int, Array.Array Int, (Ring.Ring Int, Ptr Int)) -> Bool
unsafeEqArrayN :: (Int, Array.Array Int, (Ring.Ring Int, Int)) -> Bool
unsafeEqArrayN (value, arr, (ring, rh)) = Ring.unsafeEqArrayN ring rh arr value

unsafeEqArray :: (Array.Array Int, (Ring.Ring Int, Ptr Int)) -> Bool
unsafeEqArray :: (Array.Array Int, (Ring.Ring Int, Int)) -> Bool
unsafeEqArray (arr, (ring, rh)) = Ring.unsafeEqArray ring rh arr

-------------------------------------------------------------------------------
-- Benchmark groups
-------------------------------------------------------------------------------

o_1_space_serial ::
Int -> Array.Array Int -> (Ring.Ring Int, Ptr Int) -> [Benchmark]
Int -> Array.Array Int -> (Ring.Ring Int, Int) -> [Benchmark]
o_1_space_serial value arr (ring, rh) =
[ bench "unsafeEqArrayN" $ nf unsafeEqArrayN (value, arr, (ring, rh))
, bench "unsafeEqArray" $ nf unsafeEqArray (arr, (ring, rh))
Expand All @@ -58,12 +57,12 @@ main = do
alloc value = do
let input = [1 .. value] :: [Int]
let arr = Array.fromList input
(ring, rh) <- Ring.new value
void $ P.foldlM (Ring.unsafeInsert ring) rh input
return (arr, (ring, rh))
ring <- Ring.new value
void $ P.foldlM (Ring.unsafeInsert ring) 0 input
return (arr, ring)

allBenchmarks (arr, (ring, rh)) value =
allBenchmarks (arr, ring) value =
[ bgroup
(o_1_space_prefix moduleName)
(o_1_space_serial value arr (ring, rh))
(o_1_space_serial value arr (ring, 0))
]
5 changes: 2 additions & 3 deletions core/src/Streamly/Internal/Data/Array.hs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ import Data.Proxy (Proxy(..))
import Data.Word (Word8)
import Foreign.C.String (CString)
import Foreign.Ptr (castPtr)
import Foreign.Storable (Storable)
import GHC.Types (SPEC(..))
import Streamly.Internal.Data.Unbox (Unbox(..))
import Prelude hiding (length, null, last, map, (!!), read, concat)
Expand Down Expand Up @@ -212,7 +211,7 @@ last = getIndexRev 0
--
{-# INLINE writeLastN #-}
writeLastN ::
(Storable a, Unbox a, MonadIO m) => Int -> Fold m a (Array a)
(Unbox a, MonadIO m) => Int -> Fold m a (Array a)
writeLastN n
| n <= 0 = fmap (const mempty) FL.drain
| otherwise = unsafeFreeze <$> Fold step initial done done
Expand All @@ -224,7 +223,7 @@ writeLastN n
return $ FL.Partial $ Tuple3Fused' rb rh1 (i + 1)

initial =
let f (a, b) = FL.Partial $ Tuple3Fused' a b (0 :: Int)
let f a = FL.Partial $ Tuple3Fused' a 0 (0 :: Int)
in fmap f $ liftIO $ RB.new n

done (Tuple3Fused' rb rh i) = do
Expand Down
7 changes: 7 additions & 0 deletions core/src/Streamly/Internal/Data/Array/ArrayMacros.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,10 @@

#define INDEX_VALID(i,end,a) i + SIZE_OF(a) <= end
#define INDEX_INVALID(i,end,a) i + SIZE_OF(a) > end

-------------------------------------------------------------------------------
-- Macros to use Unbox with element indices
-------------------------------------------------------------------------------

-- Read or write the element at element index i of arr. The index is
-- multiplied by SIZE_OF of the element type a before being handed to
-- peekAt or pokeAt.
-- The i, arr and val arguments are parenthesized in the expansion so that
-- compound expressions (for example an index written as a sum, or an
-- unparenthesized function application for arr) keep their intended
-- grouping after textual substitution.
#define PEEK_ELEM(a,i,arr) peekAt ((i) * SIZE_OF(a)) (arr)
#define POKE_ELEM(a,i,arr,val) pokeAt ((i) * SIZE_OF(a)) (arr) (val)
29 changes: 14 additions & 15 deletions core/src/Streamly/Internal/Data/Fold/Combinators.hs
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,13 @@ import Data.Either (isLeft, isRight, fromLeft, fromRight)
import Data.Int (Int64)
import Data.Proxy (Proxy(..))
import Data.Word (Word32)
import Foreign.Storable (Storable, peek)
import Streamly.Internal.Data.Unbox (Unbox(..))
import Streamly.Internal.Data.MutArray.Type (MutArray(..))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Pipe.Type (Pipe (..))
import Streamly.Internal.Data.Scan (Scan (..))
import Streamly.Internal.Data.Stream.Type (Stream)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.Unbox (Unbox, sizeOf)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))

import qualified Prelude
Expand Down Expand Up @@ -1564,7 +1563,7 @@ data SplitOnSeqState acc a rb rh w ck =
--
-- /Pre-release/
{-# INLINE takeEndBySeq #-}
takeEndBySeq :: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
takeEndBySeq :: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a) =>
Array.Array a
-> Fold m a b
-> Fold m a b
Expand All @@ -1590,8 +1589,8 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
| SIZE_OF(a) * patLen <= sizeOf (Proxy :: Proxy Word) ->
return $ Partial $ SplitOnSeqWord acc 0 0
| otherwise -> do
(rb, rhead) <- liftIO $ Ring.new patLen
return $ Partial $ SplitOnSeqKR acc 0 rb rhead
rb <- liftIO $ Ring.new patLen
return $ Partial $ SplitOnSeqKR acc 0 rb 0
Done b -> return $ Done b

-- Word pattern related
Expand Down Expand Up @@ -1664,7 +1663,7 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
rh1 <- liftIO $ Ring.unsafeInsert rb rh x
if idx == maxIndex
then do
let fld = Ring.unsafeFoldRing (Ring.ringBound rb)
let fld = Ring.unsafeFoldRing (Ring.ringCapacity rb)
let !ringHash = fld addCksum 0 rb
if ringHash == patHash && Ring.unsafeEqArray rb rh1 patArr
then Done <$> ffinal s1
Expand All @@ -1676,7 +1675,7 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
res <- fstep s x
case res of
Partial s1 -> do
old <- liftIO $ peek rh
(old :: a) <- liftIO $ PEEK_ELEM(a, rh, (Ring.ringContents rb))
rh1 <- liftIO $ Ring.unsafeInsert rb rh x
let ringHash = deltaCksum cksum old x
if ringHash == patHash && Ring.unsafeEqArray rb rh1 patArr
Expand Down Expand Up @@ -1704,7 +1703,7 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
-- /Pre-release/
--
{-# INLINE takeEndBySeq_ #-}
takeEndBySeq_ :: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
takeEndBySeq_ :: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a) =>
Array.Array a
-> Fold m a b
-> Fold m a b
Expand All @@ -1731,8 +1730,8 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
| SIZE_OF(a) * patLen <= sizeOf (Proxy :: Proxy Word) ->
return $ Partial $ SplitOnSeqWord acc 0 0
| otherwise -> do
(rb, rhead) <- liftIO $ Ring.new patLen
return $ Partial $ SplitOnSeqKR acc 0 rb rhead
rb <- liftIO $ Ring.new patLen
return $ Partial $ SplitOnSeqKR acc 0 rb 0
Done b -> return $ Done b

-- Word pattern related
Expand Down Expand Up @@ -1804,14 +1803,14 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
rh1 <- liftIO $ Ring.unsafeInsert rb rh x
if idx == maxIndex
then do
let fld = Ring.unsafeFoldRing (Ring.ringBound rb)
let fld = Ring.unsafeFoldRing (Ring.ringCapacity rb)
let !ringHash = fld addCksum 0 rb
if ringHash == patHash && Ring.unsafeEqArray rb rh1 patArr
then Done <$> ffinal s
else return $ Partial $ SplitOnSeqKRLoop s ringHash rb rh1
else return $ Partial $ SplitOnSeqKR s (idx + 1) rb rh1
step (SplitOnSeqKRLoop s cksum rb rh) x = do
old <- liftIO $ peek rh
old <- liftIO $ PEEK_ELEM(a, rh, (Ring.ringContents rb))
res <- fstep s old
case res of
Partial s1 -> do
Expand Down Expand Up @@ -1841,7 +1840,7 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
if n == 0
then fex s
else do
old <- liftIO $ peek rh
old <- liftIO $ PEEK_ELEM(a, rh, (Ring.ringContents rb))
let rh1 = Ring.advance rb rh
r <- fstep s old
case r of
Expand All @@ -1853,7 +1852,7 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
SplitOnSeqSingle s _ -> fex s
SplitOnSeqWord s idx wrd -> consumeWord s idx wrd
SplitOnSeqWordLoop s wrd -> consumeWord s patLen wrd
SplitOnSeqKR s idx rb _ -> consumeRing s idx rb (Ring.startOf rb)
SplitOnSeqKR s idx rb _ -> consumeRing s idx rb 0
SplitOnSeqKRLoop s _ rb rh -> consumeRing s patLen rb rh

extract = extractFunc fextract
Expand Down Expand Up @@ -1888,7 +1887,7 @@ tee = teeWith (,)

-- XXX use "List" instead of "[]"?, use Array for output to scale it to a large
-- number of consumers? For polymorphic case a vector could be helpful. For
-- Storables we can use arrays. Will need separate APIs for those.
-- Unbox types we can use arrays. Will need separate APIs for those.
--
-- | Distribute one copy of the stream to each fold and collect the results in
-- a container.
Expand Down
14 changes: 6 additions & 8 deletions core/src/Streamly/Internal/Data/Fold/Window.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,12 @@ where

import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Bifunctor(bimap)
import Foreign.Storable (Storable, peek)
import Streamly.Internal.Data.Unbox (Unbox(..))

import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..))
import Streamly.Internal.Data.Tuple.Strict
(Tuple'(..), Tuple3Fused' (Tuple3Fused'))

import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)

import qualified Streamly.Internal.Data.Fold.Type as Fold
import qualified Streamly.Internal.Data.Ring as Ring

Expand Down Expand Up @@ -268,7 +266,7 @@ windowPowerSumFrac p = windowLmap (** p) windowSum
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowRange #-}
windowRange :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe (a, a))
windowRange :: (MonadIO m, Unbox a, Ord a) => Int -> Fold m a (Maybe (a, a))
windowRange n = Fold step initial extract extract

where
Expand All @@ -280,7 +278,7 @@ windowRange n = Fold step initial extract extract
if n <= 0
then error "range: window size must be > 0"
else
let f (a, b) = Partial $ Tuple3Fused' a b (0 :: Int)
let f a = Partial $ Tuple3Fused' a 0 (0 :: Int)
in fmap f $ liftIO $ Ring.new n

step (Tuple3Fused' rb rh i) a = do
Expand All @@ -306,7 +304,7 @@ windowRange n = Fold step initial extract extract
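-- uninitialized if the ring is not full.
-- Using "unsafeForeignPtrToPtr" here is safe as we touch the ring
-- again in "foldFunc".
-- NOTE(review): the peekAt-based read of the ring contents no longer goes
-- through "unsafeForeignPtrToPtr", so the justification above looks stale;
-- confirm and drop it if so.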
x <- liftIO $ peek (unsafeForeignPtrToPtr (Ring.ringStart rb))
x <- liftIO $ peekAt 0 (Ring.ringContents rb)
let accum (mn, mx) a = return (min mn a, max mx a)
fmap Just $ foldFunc i rh accum (x, x) rb

Expand All @@ -323,7 +321,7 @@ windowRange n = Fold step initial extract extract
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowMinimum #-}
windowMinimum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a)
windowMinimum :: (MonadIO m, Unbox a, Ord a) => Int -> Fold m a (Maybe a)
windowMinimum n = fmap (fmap fst) $ windowRange n

-- | The maximum element in a rolling window.
Expand All @@ -336,7 +334,7 @@ windowMinimum n = fmap (fmap fst) $ windowRange n
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowMaximum #-}
windowMaximum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a)
windowMaximum :: (MonadIO m, Unbox a, Ord a) => Int -> Fold m a (Maybe a)
windowMaximum n = fmap (fmap snd) $ windowRange n

-- | Arithmetic mean of elements in a sliding window:
Expand Down
Loading

0 comments on commit facacc5

Please sign in to comment.