Skip to content

Commit

Permalink
Introduce and use unsafe(Pinned)CreateUsingPtr
Browse files Browse the repository at this point in the history
  • Loading branch information
adithyaov committed Jul 25, 2024
1 parent 64895d2 commit d7c4042
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 45 deletions.
54 changes: 41 additions & 13 deletions core/src/Streamly/Internal/Data/MutArray/Type.hs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ module Streamly.Internal.Data.MutArray.Type
, fromChunksK
, fromChunksRealloced -- fromSmallChunks

, unsafeCreateUsingPtr
, unsafePinnedCreateUsingPtr

-- ** Random writes
, putIndex
, unsafePutIndex
Expand Down Expand Up @@ -333,7 +336,7 @@ import Data.Functor.Identity (Identity(..))
import Data.Proxy (Proxy(..))
import Data.Word (Word8, Word16)
import Foreign.C.Types (CSize(..), CInt(..))
import Foreign.Ptr (plusPtr, minusPtr, nullPtr, castPtr)
import Foreign.Ptr (plusPtr, minusPtr, nullPtr)
import Streamly.Internal.Data.MutByteArray.Type
( MutByteArray(..)
, PinnedState(..)
Expand Down Expand Up @@ -2450,15 +2453,13 @@ fromPureStream xs =

{-# INLINABLE fromPtrN #-}
fromPtrN :: MonadIO m => Int -> Ptr Word8 -> m (MutArray Word8)
fromPtrN len addr = do
fromPtrN len addr =
-- memcpy is better than stream copy when the size is known.
-- XXX We can implement a stream copy in a similar way by streaming Word64
-- first and then remaining Word8.
arr <- emptyOf len
-- Here len == byteLen
_ <- unsafeAsPtr arr
(\ptr byteLen -> liftIO $ c_memcpy ptr addr (fromIntegral byteLen))
return (arr {arrEnd = len})
unsafeCreateUsingPtr len
$ \ptr -> liftIO $ c_memcpy ptr addr (fromIntegral len) >> pure len


{-# INLINABLE fromCString# #-}
fromCString# :: MonadIO m => Addr# -> m (MutArray Word8)
Expand Down Expand Up @@ -2492,12 +2493,13 @@ fromW16CString# addr = do
let bytes = w16len * 2
-- The array type is inferred from c_memcpy type, therefore, it is not the
-- same as the returned array type.
arr :: MutArray Word8 <- emptyOf bytes
_ <- unsafeAsPtr arr (\ptr _ -> liftIO
$ c_memcpy (castPtr ptr) (Ptr addr) (fromIntegral bytes))
-- CAUTION! The array type is inferred from the return type and may be
-- different from the arr type.
return (arr {arrEnd = bytes})
arr <-
unsafeCreateUsingPtr
bytes
(\ptr ->
liftIO $
c_memcpy ptr (Ptr addr) (fromIntegral bytes) >> pure bytes)
pure $ unsafeCast arr

-------------------------------------------------------------------------------
-- convert a stream of arrays to a single array by reallocating and copying
Expand Down Expand Up @@ -2944,6 +2946,32 @@ unsafeAsPtr arr f =
(arrContents arr)
(\ptr -> f (ptr `plusPtr` arrStart arr) (byteLength arr))

-- | @unsafeCreateUsingPtr capacity populater@ creates an array of @capacity@
-- bytes lets the @populater@ function populate it. The @populater@ get the
-- pointer to the array and should return the amount of the capacity populated
-- in bytes.
--
-- /Unsafe/ because the pointer given should be used with care. Bytes populated
-- should be lesser than the total capacity.
{-# INLINE unsafeCreateUsingPtr #-}
unsafeCreateUsingPtr
:: MonadIO m => Int -> (Ptr Word8 -> m Int) -> m (MutArray Word8)
unsafeCreateUsingPtr cap pop = do
(arr :: MutArray Word8) <- emptyOf cap
len <- Unboxed.unsafeAsPtr (arrContents arr) pop
-- arrStart == 0
pure (arr { arrEnd = len })

-- | Similar to "unsafeCreateUsingPtr" but creates a pinned array.
{-# INLINE unsafePinnedCreateUsingPtr #-}
unsafePinnedCreateUsingPtr
:: MonadIO m => Int -> (Ptr Word8 -> m Int) -> m (MutArray Word8)
unsafePinnedCreateUsingPtr cap pop = do
(arr :: MutArray Word8) <- pinnedEmptyOf cap
len <- Unboxed.unsafeAsPtr (arrContents arr) pop
-- arrStart == 0
pure (arr { arrEnd = len })

-------------------------------------------------------------------------------
-- Equality
-------------------------------------------------------------------------------
Expand Down
11 changes: 3 additions & 8 deletions core/src/Streamly/Internal/FileSystem/Handle.hs
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,10 @@ import qualified Streamly.Internal.Data.StreamK.Type as K (mkStream)
{-# INLINABLE getChunk #-}
getChunk :: MonadIO m => Int -> Handle -> m (Array Word8)
getChunk size h = liftIO $ do
arr :: MArray.MutArray Word8 <- MArray.pinnedEmptyOf size
-- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
-- size == byteLen
MArray.unsafePinnedAsPtr arr $ \p byteLen -> do
n <- hGetBufSome h p byteLen
-- XXX shrink only if the diff is significant
return $
unsafeFreezeWithShrink $
arr { MArray.arrEnd = n, MArray.arrBound = size }
arr <- MArray.unsafePinnedCreateUsingPtr size $ \p -> hGetBufSome h p size
-- XXX shrink only if the diff is significant
pure $ unsafeFreezeWithShrink arr

-- This could be useful in implementing the "reverse" read APIs or if you want
-- to read arrays of exact size instead of compacting them later. Compacting
Expand Down
20 changes: 8 additions & 12 deletions src/Streamly/Internal/FileSystem/FD.hs
Original file line number Diff line number Diff line change
Expand Up @@ -218,21 +218,17 @@ openFile path mode = Handle . fst <$> FD.openFile path mode True
{-# INLINABLE readArrayUpto #-}
readArrayUpto :: Int -> Handle -> IO (Array Word8)
readArrayUpto size (Handle fd) = do
arr <- MArray.pinnedEmptyOf size
-- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
-- size == byteLen
MArray.unsafePinnedAsPtr arr $ \p byteLen -> do
-- n <- hGetBufSome h p size
arr <-
MArray.unsafePinnedCreateUsingPtr size $ \p ->
-- n <- hGetBufSome h p size
#if MIN_VERSION_base(4,15,0)
n <- RawIO.read fd p 0 byteLen
RawIO.read fd p 0 size
#else
n <- RawIO.read fd p byteLen
RawIO.read fd p size
#endif
-- XXX shrink only if the diff is significant
-- Use unsafeFreezeWithShrink
return
$ unsafeFreeze
$ arr { MArray.arrEnd = n, MArray.arrBound = size }
-- XXX shrink only if the diff is significant
-- Use unsafeFreezeWithShrink
pure $ unsafeFreeze arr

-------------------------------------------------------------------------------
-- Array IO (output)
Expand Down
17 changes: 5 additions & 12 deletions src/Streamly/Internal/Network/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ import qualified Streamly.Internal.Data.Array as A
( unsafeFreeze, unsafePinnedAsPtr, pinnedChunksOf,
pinnedCreateOf, unsafePinnedCreateOf, lCompactGE )
import qualified Streamly.Internal.Data.MutArray as MArray
(MutArray(..), unsafePinnedAsPtr, pinnedEmptyOf)
(unsafePinnedCreateUsingPtr)
import qualified Streamly.Internal.Data.Stream as S (fromStreamK, Stream(..), Step(..))
import qualified Streamly.Internal.Data.StreamK as K (mkStream)

Expand Down Expand Up @@ -261,17 +261,10 @@ readArrayUptoWith
-> h
-> IO (Array Word8)
readArrayUptoWith f size h = do
arr <- MArray.pinnedEmptyOf size
-- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
-- size == byteLen
MArray.unsafePinnedAsPtr arr $ \p byteLen -> do
n <- f h p byteLen
let v = A.unsafeFreeze
$ arr { MArray.arrEnd = n, MArray.arrBound = size }

-- XXX shrink only if the diff is significant
-- A.shrinkToFit v
return v
arr <- MArray.unsafePinnedCreateUsingPtr size $ \p -> f h p size
-- XXX shrink only if the diff is significant
-- unsafeFreezeWithShrink
pure $ A.unsafeFreeze arr

-- | Read a byte array from a file handle up to a maximum of the requested
-- size. If no data is available on the handle it blocks until some data
Expand Down

0 comments on commit d7c4042

Please sign in to comment.