Skip to content

Commit

Permalink
implement DelayQueue (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
Krastanov authored Aug 8, 2023
1 parent 91c7e36 commit bd473f4
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 2 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# News

## v1.4.0 - 2023-08-07

- Implement a `DelayQueue`, i.e. a `QueueStore` with latency between the store and take events.
- Bugfix to `QueueStore` and `StackStore` for take events on empty stores.

## v1.3.0 - 2023-08-07

- Implement ordered versions of `Store`, namely `QueueStore` and `StackStore`.
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ license = "MIT"
desc = "A discrete event process oriented simulation framework."
authors = ["Ben Lauwens and SimJulia and ConcurrentSim contributors"]
repo = "https://github.com/JuliaDynamics/ConcurrentSim.jl.git"
version = "1.3.0"
version = "1.4.0"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
2 changes: 2 additions & 0 deletions docs/src/examples/Latency.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

# Event Latency

There is a built-in [`DelayQueue`](@ref) if you need a store [`Store`](@ref) with latency between `put!` and `take!` events. However here, we show you how you could have built one for yourself. If you modify this in order to construct a particularly useful type of latency store, please contribute it to the library through a pull request.

## Description
In this example we show how to separate the time delay between processes from the processes themselves. We model a communications channel, called a `Cable`, where a sender sends messages regularly each `SEND_PERIOD` time units and a receiver listens each `RECEIVE_PERIOD`. The messages in the cable have a delay fo `DELAY_DURATION` until they reach the recevier.

Expand Down
4 changes: 3 additions & 1 deletion src/ConcurrentSim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ module ConcurrentSim
export @resumable, @yield
export AbstractProcess, Simulation, run, now, active_process, StopSimulation
export Process, @process, interrupt
export Container, Resource, Store, StackStore, QueueStore, put!, get, cancel, request, tryrequest, release
export Container, Resource, Store, StackStore, QueueStore, DelayQueue
export put!, get, cancel, request, tryrequest, release
export nowDatetime

include("base.jl")
Expand All @@ -29,6 +30,7 @@ module ConcurrentSim
include("resources/containers.jl")
include("resources/stores.jl")
include("resources/ordered_stores.jl")
include("resources/delayed_stores.jl")
include("utils/time.jl")
include("deprecated_aliased.jl")
end
58 changes: 58 additions & 0 deletions src/resources/delayed_stores.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
@doc raw"""
DelayQueue{T}
A queue in which items are stored in a FIFO order, but are only available after a delay.
```jldoctest
julia> sim = Simulation()
queue = DelayQueue{Symbol}(sim, 10)
@resumable function producer(env, queue)
for item in [:a,:b,:a,:c]
@info "putting $item at time $(now(env))"
put!(queue, item)
@yield timeout(env, 2)
end
end
@resumable function consumer(env, queue)
@yield timeout(env, 5)
while true
t = @yield take!(queue)
@info "taking $(t) at time $(now(env))"
end
end
@process producer(sim, queue)
@process consumer(sim, queue)
run(sim, 30)
[ Info: putting a at time 0.0
[ Info: putting b at time 2.0
[ Info: putting a at time 4.0
[ Info: putting c at time 6.0
[ Info: taking a at time 10.0
[ Info: taking b at time 12.0
[ Info: taking a at time 14.0
[ Info: taking c at time 16.0
```
"""
mutable struct DelayQueue{T}
store::QueueStore{T, Int}
delay::Float64
end
function DelayQueue(env::Environment, delay)
return DelayQueue(QueueStore{Any}(env), float(delay))
end
function DelayQueue{T}(env::Environment, delay) where T
return DelayQueue(QueueStore{T}(env), float(delay))
end

@resumable function latency(env::Environment, channel::DelayQueue, value)
@yield timeout(channel.store.env, channel.delay)
put!(channel.store, value)
end

function Base.put!(channel::DelayQueue, value)
@process latency(channel.store.env, channel, value) # start the process, but do not wait on it
end

function Base.take!(channel::DelayQueue)
get(channel.store)
end
2 changes: 2 additions & 0 deletions src/resources/ordered_stores.jl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ end

function do_get(sto::StackStore{N, T}, get_ev::Get, key::StoreGetKey{T}) where {N, T<:Number}
key.filter !== get_any_item && error("Filtering not supported for `StackStore`. Use an unordered store instead, or submit a feature request for implementing filtering to our issue tracker.")
isempty(sto.items) && return true
item = pop!(sto.items)
sto.load -= one(UInt)
schedule(get_ev; value=item)
Expand All @@ -96,6 +97,7 @@ end

function do_get(sto::QueueStore{N, T}, get_ev::Get, key::StoreGetKey{T}) where {N, T<:Number}
key.filter !== get_any_item && error("Filtering not supported for `QueueStore`. Use an unordered store instead, or submit a feature request for implementing filtering to our issue tracker.")
isempty(sto.items) && return true
item = dequeue!(sto.items)
sto.load -= one(UInt)
schedule(get_ev; value=item)
Expand Down
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ println("Starting tests with $(Threads.nthreads()) threads out of `Sys.CPU_THREA
@doset "resources_containers_deprecated"
@doset "resources_stores"
@doset "resources_stores_deprecated"
@doset "resources_fancy_stores"
@doset "resource_priorities"
@doset "utils_time"
VERSION >= v"1.9" && @doset "doctests"
Expand Down
29 changes: 29 additions & 0 deletions test/test_resources_fancy_stores.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using ConcurrentSim
using ResumableFunctions

@resumable function producer(env, queue)
for item in [:a,:b,:a,:c]
@info "putting $item at time $(now(env))"
put!(queue, item)
@yield timeout(env, 2)
end
end
@resumable function consumer(env, queue)
@yield timeout(env, 5)
while true
t = @yield take!(queue)
@info "taking $(t) at time $(now(env))"
end
end

function runsim(storeconstructor)
sim = Simulation()
queue = storeconstructor(sim)
@process producer(sim, queue)
@process consumer(sim, queue)
run(sim, 30)
end

runsim(sim->DelayQueue{Symbol}(sim, 10))
runsim(sim->QueueStore{Symbol}(sim))
runsim(sim->Store{Symbol}(sim))

2 comments on commit bd473f4

@Krastanov
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/89243

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v1.4.0 -m "<description of version>" bd473f4709c60b288dd4910829d003a05c97eb62
git push origin v1.4.0

Please sign in to comment.