Skip to content

Commit

Permalink
Add pipeChunks transformation function
Browse files Browse the repository at this point in the history
  • Loading branch information
hezhenxing committed Apr 24, 2024
1 parent b60aef3 commit 2ff463e
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions src/Streamly/Internal/Network/Inet/TCP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ module Streamly.Internal.Network.Inet.TCP

-- ** Transformation
, pipeBytes
, pipeChunks
{-
-- ** Sink Servers
Expand Down Expand Up @@ -461,19 +462,20 @@ withInputConnect
:: (MonadCatch m, MonadAsync m)
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Word8
-> Stream m a
-> (Socket -> Stream m a)
-> (Socket -> Stream m a -> m ())
-> Stream m a
withInputConnect addr port input f = S.bracket pre post handler
withInputConnect addr port input fread fwrite = S.bracket pre post handler

where

pre = do
sk <- liftIO $ connect addr port
tid <- fork (ISK.putBytes sk input)
tid <- fork (fwrite sk input)
return (sk, tid)

handler (sk, _) = f sk
handler (sk, _) = fread sk

-- XXX kill the thread immediately?
post (sk, _) = liftIO $ Net.close sk
Expand All @@ -491,4 +493,17 @@ pipeBytes
-> PortNumber
-> Stream m Word8
-> Stream m Word8
pipeBytes addr port input = withInputConnect addr port input ISK.read
pipeBytes addr port input = withInputConnect addr port input ISK.read ISK.putBytes

-- | This is similar to pipeBytes, but works on chunks of data.
--
-- /Pre-release/
--
{-# INLINE pipeChunks #-}
pipeChunks
:: (MonadAsync m, MonadCatch m)
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m (Array Word8)
-> Stream m (Array Word8)
pipeChunks addr port input = withInputConnect addr port input ISK.readChunks ISK.putChunks

0 comments on commit 2ff463e

Please sign in to comment.