Skip to content

Commit

Permalink
Add Stream.ConcurrentThreadHeavy benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
harendra-kumar committed Jul 9, 2024
1 parent a9a769a commit dfb8212
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 5 deletions.
109 changes: 109 additions & 0 deletions benchmark/Streamly/Benchmark/Data/Stream/ConcurrentThreadHeavy.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
-- |
-- Module : Main
-- Copyright : (c) 2018 Composewell Technologies
--
-- License : BSD3
-- Maintainer : [email protected]

import Control.Concurrent
import Control.Monad (when, replicateM)
import Streamly.Data.Stream.Prelude (Config)

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream

import Test.Tasty (localOption)
import Test.Tasty.Bench

-------------------------------------------------------------------------------
-- Append
-------------------------------------------------------------------------------

-- | Run @tcount@ number of actions concurrently using the given concurrency
-- style. Each thread produces a single output after a delay of @d@
-- microseconds.
--
{-# INLINE append #-}
append :: Int -> Int -> Int -> (Config -> Config) -> IO ()
append buflen tcount d modifier =
let work i = when (d /= 0) (threadDelay d) >> return i
cfg =
modifier
. Stream.maxBuffer buflen
. Stream.maxThreads (-1)
in Stream.fold Fold.drain
$ Stream.parMapM cfg work
$ Stream.fromList [1..tcount]

-- | Run @threads@ concurrently, each producing streams of @elems@ elements
-- with a delay of @d@ microseconds between successive elements, and merge
-- their outputs in a single output stream. The individual streams are produced
-- serially but merged using the provided concurrency style.
--
{-# INLINE concated #-}
concated
:: Int
-> Int
-> Int
-> Int
-> (Config -> Config)
-> IO ()
concated buflen threads d elems modifier =
let work i =
Stream.replicateM i (when (d /= 0) (threadDelay d) >> return i)
cfg =
modifier
. Stream.maxBuffer buflen
. Stream.maxThreads (-1)
in Stream.fold Fold.drain
$ Stream.parConcatMap cfg work
$ Stream.replicate threads elems

appendGroup :: Int -> Int -> Int -> [Benchmark]
appendGroup buflen threads usec =
[ -- bench "serial" $ nfIO $ append buflen threads delay fromSerial
bench "async"
$ nfIO $ append buflen threads usec id
, bench "ordered"
$ nfIO $ append buflen threads usec (Stream.ordered True)
, bench "interleaved"
$ nfIO $ append buflen threads usec (Stream.interleaved True)
, bench "parallel"
$ nfIO $ append buflen threads usec (Stream.eager True)
]

concatGroup :: Int -> Int -> Int -> Int -> [Benchmark]
concatGroup buflen threads usec n =
[ -- bench "serial" $ nfIO $ concated buflen threads usec n serial
bench "async"
$ nfIO $ concated buflen threads usec n id
, bench "ordered"
$ nfIO $ concated buflen threads usec n (Stream.ordered True)
, bench "interleaved"
$ nfIO $ concated buflen threads usec n (Stream.interleaved True)
, bench "parallel"
$ nfIO $ concated buflen threads usec n (Stream.eager True)
]

main :: IO ()
main =
defaultMain $ map (localOption (RelStDev 100000))
[ -- bgroup "append/buf-1-threads-10k-0sec" (appendGroup 1 10000 0)
-- , bgroup "append/buf-100-threads-100k-0sec" (appendGroup 100 100000 0)
bgroup "streamly/threads10k-delay5s-singleton(buf10k)" (appendGroup 10000 10000 5000000)
-- bgroup "concat/buf-1-threads-100k-count-1" (concatGroup 1 100000 0 1)
-- bgroup "concat/buf-1-threads-1-count-10m" (concatGroup 1 1 0 10000000)
, bgroup "streamly/threads100-delay0s-stream500k(buf100)" (concatGroup 100 100 0 500000)

, bench "forkIO/threads10k-delay5s-singleton" $
let delay = threadDelay 5000000
count = 10000 :: Int
list = [1..count]
work i = delay >> return i
in nfIO $ do
ref <- newEmptyMVar
mapM_ (\i -> forkIO (work i >>= \j -> putMVar ref j)) list
replicateM 10000 (takeMVar ref)
]
10 changes: 10 additions & 0 deletions benchmark/streamly-benchmarks.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,16 @@ benchmark Data.Stream.ConcurrentOrdered
else
buildable: True

benchmark Data.Stream.ConcurrentThreadHeavy
import: bench-options-threaded
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream
main-is: ConcurrentThreadHeavy.hs
if flag(use-streamly-core) || impl(ghcjs)
buildable: False
else
buildable: True

benchmark Data.Stream.Rate
import: bench-options-threaded
type: exitcode-stdio-1.0
Expand Down
16 changes: 11 additions & 5 deletions targets/Targets.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,12 @@ targets =
, "fold_parser_grp"
]
)
{-
, ("Data.List",
[ "list_grp"
, "noBench"
, "testDevOnly"
]
)
-}
, ("Data.List.Base",
[ "list_grp"
, "noBench"
Expand Down Expand Up @@ -121,6 +119,11 @@ targets =
, "serial_concurrent_cmp"
]
)
, ("Data.Stream.Adaptive",
[ "concurrent_stream_grp"
, "noTest"
]
)
, ("Data.Stream.Concurrent",
[ "infinite_grp"
, "concurrent_stream_grp"
Expand Down Expand Up @@ -149,14 +152,17 @@ targets =
, "noTest"
]
)
{-
-- devOnly
, ("Data.Stream.ConcurrentThreadHeavy",
[ "concurrent_stream_grp"
, "noTest"
]
)
, ("Data.Stream.Rate",
[ "infinite_grp"
, "concurrent_stream_grp"
, "testDevOnly"
]
)
-}
, ("Data.StreamK",
[ "infinite_grp"
, "serial_stream_grp"
Expand Down

0 comments on commit dfb8212

Please sign in to comment.