diff --git a/core/src/Streamly/Internal/Data/MutArray/Type.hs b/core/src/Streamly/Internal/Data/MutArray/Type.hs index 237707006d..7265043e88 100644 --- a/core/src/Streamly/Internal/Data/MutArray/Type.hs +++ b/core/src/Streamly/Internal/Data/MutArray/Type.hs @@ -100,6 +100,9 @@ module Streamly.Internal.Data.MutArray.Type , fromChunksK , fromChunksRealloced -- fromSmallChunks + , unsafeCreateUsingPtr + , unsafePinnedCreateUsingPtr + -- ** Random writes , putIndex , unsafePutIndex @@ -333,7 +336,7 @@ import Data.Functor.Identity (Identity(..)) import Data.Proxy (Proxy(..)) import Data.Word (Word8, Word16) import Foreign.C.Types (CSize(..), CInt(..)) -import Foreign.Ptr (plusPtr, minusPtr, nullPtr, castPtr) +import Foreign.Ptr (plusPtr, minusPtr, nullPtr) import Streamly.Internal.Data.MutByteArray.Type ( MutByteArray(..) , PinnedState(..) @@ -2450,15 +2453,13 @@ fromPureStream xs = {-# INLINABLE fromPtrN #-} fromPtrN :: MonadIO m => Int -> Ptr Word8 -> m (MutArray Word8) -fromPtrN len addr = do +fromPtrN len addr = -- memcpy is better than stream copy when the size is known. -- XXX We can implement a stream copy in a similar way by streaming Word64 -- first and then remaining Word8. - arr <- emptyOf len - -- Here len == byteLen - _ <- unsafeAsPtr arr - (\ptr byteLen -> liftIO $ c_memcpy ptr addr (fromIntegral byteLen)) - return (arr {arrEnd = len}) + unsafeCreateUsingPtr len + $ \ptr -> liftIO $ c_memcpy ptr addr (fromIntegral len) >> pure len + {-# INLINABLE fromCString# #-} fromCString# :: MonadIO m => Addr# -> m (MutArray Word8) @@ -2492,12 +2493,13 @@ fromW16CString# addr = do let bytes = w16len * 2 -- The array type is inferred from c_memcpy type, therefore, it is not the -- same as the returned array type. - arr :: MutArray Word8 <- emptyOf bytes - _ <- unsafeAsPtr arr (\ptr _ -> liftIO - $ c_memcpy (castPtr ptr) (Ptr addr) (fromIntegral bytes)) - -- CAUTION! The array type is inferred from the return type and may be - -- different from the arr type. - return (arr {arrEnd = bytes}) + arr <- + unsafeCreateUsingPtr + bytes + (\ptr -> + liftIO $ + c_memcpy ptr (Ptr addr) (fromIntegral bytes) >> pure bytes) + pure $ unsafeCast arr ------------------------------------------------------------------------------- -- convert a stream of arrays to a single array by reallocating and copying @@ -2944,6 +2946,32 @@ unsafeAsPtr arr f = (arrContents arr) (\ptr -> f (ptr `plusPtr` arrStart arr) (byteLength arr)) +-- | @unsafeCreateUsingPtr capacity populater@ creates an array of @capacity@ +-- bytes lets the @populater@ function populate it. The @populater@ get the +-- pointer to the array and should return the amount of the capacity populated +-- in bytes. +-- +-- /Unsafe/ because the pointer given should be used with care. Bytes populated +-- should be lesser than the total capacity. +{-# INLINE unsafeCreateUsingPtr #-} +unsafeCreateUsingPtr + :: MonadIO m => Int -> (Ptr Word8 -> m Int) -> m (MutArray Word8) +unsafeCreateUsingPtr cap pop = do + (arr :: MutArray Word8) <- emptyOf cap + len <- Unboxed.unsafeAsPtr (arrContents arr) pop + -- arrStart == 0 + pure (arr { arrEnd = len }) + +-- | Similar to "unsafeCreateUsingPtr" but creates a pinned array. +{-# INLINE unsafePinnedCreateUsingPtr #-} +unsafePinnedCreateUsingPtr + :: MonadIO m => Int -> (Ptr Word8 -> m Int) -> m (MutArray Word8) +unsafePinnedCreateUsingPtr cap pop = do + (arr :: MutArray Word8) <- pinnedEmptyOf cap + len <- Unboxed.unsafeAsPtr (arrContents arr) pop + -- arrStart == 0 + pure (arr { arrEnd = len }) + ------------------------------------------------------------------------------- -- Equality ------------------------------------------------------------------------------- diff --git a/core/src/Streamly/Internal/FileSystem/Handle.hs b/core/src/Streamly/Internal/FileSystem/Handle.hs index 6337156a97..1746456e7d 100644 --- a/core/src/Streamly/Internal/FileSystem/Handle.hs +++ b/core/src/Streamly/Internal/FileSystem/Handle.hs @@ -177,15 +177,10 @@ import qualified Streamly.Internal.Data.StreamK.Type as K (mkStream) {-# INLINABLE getChunk #-} getChunk :: MonadIO m => Int -> Handle -> m (Array Word8) getChunk size h = liftIO $ do - arr :: MArray.MutArray Word8 <- MArray.pinnedEmptyOf size -- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8)) - -- size == byteLen - MArray.unsafePinnedAsPtr arr $ \p byteLen -> do - n <- hGetBufSome h p byteLen - -- XXX shrink only if the diff is significant - return $ - unsafeFreezeWithShrink $ - arr { MArray.arrEnd = n, MArray.arrBound = size } + arr <- MArray.unsafePinnedCreateUsingPtr size $ \p -> hGetBufSome h p size + -- XXX shrink only if the diff is significant + pure $ unsafeFreezeWithShrink arr -- This could be useful in implementing the "reverse" read APIs or if you want -- to read arrays of exact size instead of compacting them later. Compacting diff --git a/src/Streamly/Internal/FileSystem/FD.hs b/src/Streamly/Internal/FileSystem/FD.hs index 028d8dead1..40bfcf3926 100644 --- a/src/Streamly/Internal/FileSystem/FD.hs +++ b/src/Streamly/Internal/FileSystem/FD.hs @@ -218,21 +218,17 @@ openFile path mode = Handle . fst <$> FD.openFile path mode True {-# INLINABLE readArrayUpto #-} readArrayUpto :: Int -> Handle -> IO (Array Word8) readArrayUpto size (Handle fd) = do - arr <- MArray.pinnedEmptyOf size - -- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8)) - -- size == byteLen - MArray.unsafePinnedAsPtr arr $ \p byteLen -> do - -- n <- hGetBufSome h p size + arr <- + MArray.unsafePinnedCreateUsingPtr size $ \p -> + -- n <- hGetBufSome h p size #if MIN_VERSION_base(4,15,0) - n <- RawIO.read fd p 0 byteLen + RawIO.read fd p 0 size #else - n <- RawIO.read fd p byteLen + RawIO.read fd p size #endif - -- XXX shrink only if the diff is significant - -- Use unsafeFreezeWithShrink - return - $ unsafeFreeze - $ arr { MArray.arrEnd = n, MArray.arrBound = size } + -- XXX shrink only if the diff is significant + -- Use unsafeFreezeWithShrink + pure $ unsafeFreeze arr ------------------------------------------------------------------------------- -- Array IO (output) diff --git a/src/Streamly/Internal/Network/Socket.hs b/src/Streamly/Internal/Network/Socket.hs index 0793181ae7..e4d8a8acd3 100644 --- a/src/Streamly/Internal/Network/Socket.hs +++ b/src/Streamly/Internal/Network/Socket.hs @@ -101,7 +101,7 @@ import qualified Streamly.Internal.Data.Array as A ( unsafeFreeze, unsafePinnedAsPtr, pinnedChunksOf, pinnedCreateOf, unsafePinnedCreateOf, lCompactGE ) import qualified Streamly.Internal.Data.MutArray as MArray - (MutArray(..), unsafePinnedAsPtr, pinnedEmptyOf) + (unsafePinnedCreateUsingPtr) import qualified Streamly.Internal.Data.Stream as S (fromStreamK, Stream(..), Step(..)) import qualified Streamly.Internal.Data.StreamK as K (mkStream) @@ -261,17 +261,10 @@ readArrayUptoWith -> h -> IO (Array Word8) readArrayUptoWith f size h = do - arr <- MArray.pinnedEmptyOf size - -- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8)) - -- size == byteLen - MArray.unsafePinnedAsPtr arr $ \p byteLen -> do - n <- f h p byteLen - let v = A.unsafeFreeze - $ arr { MArray.arrEnd = n, MArray.arrBound = size } - - -- XXX shrink only if the diff is significant - -- A.shrinkToFit v - return v + arr <- MArray.unsafePinnedCreateUsingPtr size $ \p -> f h p size + -- XXX shrink only if the diff is significant + -- unsafeFreezeWithShrink + pure $ A.unsafeFreeze arr -- | Read a byte array from a file handle up to a maximum of the requested -- size. If no data is available on the handle it blocks until some data