Skip to content

Commit

Permalink
Add support for halting execution plans
Browse files Browse the repository at this point in the history
Up until now, Dynflow only had support for clean cancellation. Upon cancellation
a cancel event was sent to all cancellable suspended and running steps and it
was the action's responsibility to cancel itself. However, there were actions
which just weren't cancellable at all.

This commit adds support for halting execution plans. The actions are completely
unaware of this, on halt dynflow just destroys its internal state about a given
execution plan and turns it to stopped state. It does not remove running steps
from workers as we currently don't have any real means of doing so. Any pending
events will be rejected. In other words, halting an execution plan ensure it
will not run any further.
  • Loading branch information
adamruzicka committed Jul 28, 2022
1 parent 5ed8bcc commit 475a027
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 5 deletions.
71 changes: 71 additions & 0 deletions examples/halt.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require_relative 'example_helper'

example_description = <<DESC
Halting example
===================
This example shows, how halting works in Dynflow. It spawns a single action,
which in turn spawns a few evented actions and a single action which occupies
the executor for a long time.
Once the halt event is sent, the execution plan is halted, suspended steps
stay suspended forever, running steps stay running until they actually finish
the current run and the execution state is flipped over to stopped state.
You can see the details at #{ExampleHelper::DYNFLOW_URL}
DESC

class EventedCounter < Dynflow::Action
def run(event = nil)
output[:counter] ||= 0
output[:counter] += 1
action_logger.info "Iteration #{output[:counter]}"

if output[:counter] < input[:count]
plan_event(:tick, 5)
suspend
end
action_logger.info "Done"
end
end

class Sleeper < Dynflow::Action
def run
sleep input[:time]
end
end

class Wrapper < Dynflow::Action
def plan
sequence do
concurrence do
5.times { |i| plan_action(EventedCounter, :count => i + 1) }
plan_action Sleeper, :time => 20
end
plan_self
end
end

def run
# Noop
end
end

if $0 == __FILE__
puts example_description

ExampleHelper.world.action_logger.level = Logger::DEBUG
ExampleHelper.world
t = ExampleHelper.world.trigger(Wrapper)
Thread.new do
sleep 8
ExampleHelper.world.halt(t.id)
end

ExampleHelper.run_web_console
end
9 changes: 9 additions & 0 deletions lib/dynflow/director.rb
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ def terminate
end
end

def halt(event)
manager = @execution_plan_managers[event.execution_plan_id]
return unless manager

@logger.warn "Halting execution plan #{event.execution_plan_id}"
manager.halt
finish_manager manager
end

private

def unless_done(manager, work_items)
Expand Down
9 changes: 8 additions & 1 deletion lib/dynflow/director/execution_plan_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def initialize(world, execution_plan, future)
@execution_plan = Type! execution_plan, ExecutionPlan
@future = Type! future, Concurrent::Promises::ResolvableFuture
@running_steps_manager = RunningStepsManager.new(world)
@halted = false

unless [:planned, :paused].include? execution_plan.state
raise "execution_plan is not in pending or paused state, it's #{execution_plan.state}"
Expand All @@ -24,6 +25,11 @@ def start
start_run or start_finalize or finish
end

def halt
@halted = true
@running_steps_manager.terminate
end

def restart
@run_manager = nil
@finalize_manager = nil
Expand Down Expand Up @@ -71,7 +77,7 @@ def event(event)
end

def done?
(!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?)
@halted || (!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?)
end

def terminate
Expand All @@ -87,6 +93,7 @@ def update_steps(steps)
def compute_next_from_step(step)
raise "run manager not set" unless @run_manager
raise "run manager already done" if @run_manager.done?
return [] if @halted

next_steps = @run_manager.what_is_next(step)
if @run_manager.done?
Expand Down
9 changes: 9 additions & 0 deletions lib/dynflow/director/running_steps_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def initialize(world)
# to handle potential updates of the step object (that is part of the event)
@events = QueueHash.new(Integer, Director::Event)
@events_by_request_id = {}
@halted = false
end

def terminate
Expand All @@ -26,6 +27,10 @@ def terminate
end
end

def halt
@halted = true
end

def add(step, work)
Type! step, ExecutionPlan::Steps::RunStep
@running_steps[step.id] = step
Expand Down Expand Up @@ -83,6 +88,10 @@ def event(event)
event.result.reject UnprocessableEvent.new('step is not suspended, it cannot process events')
return []
end
if @halted
event.result.reject UnprocessableEvent.new('execution plan is halted, it cannot receive events')
return []
end

can_run_event = @work_items.empty?(step.id)
@events_by_request_id[event.request_id] = event
Expand Down
6 changes: 5 additions & 1 deletion lib/dynflow/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ module Dispatcher
execution_plan_id: type { variants String, NilClass }
end

variants Event, Execution, Ping, Status, Planning
Halt = type do
fields! execution_plan_id: String, optional: Algebrick::Types::Boolean
end

variants Event, Execution, Ping, Status, Planning, Halt
end

Response = Algebrick.type do
Expand Down
4 changes: 2 additions & 2 deletions lib/dynflow/dispatcher/client_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def dispatch_request(request, client_world_id, request_id)
(on ~Execution | ~Planning do |execution|
AnyExecutor
end),
(on ~Event do |event|
(on ~Event | ~Halt do |event|
ignore_unknown = event.optional
find_executor(event.execution_plan_id)
end),
Expand Down Expand Up @@ -236,7 +236,7 @@ def resolve_tracked_request(id, error = nil)
(on Execution.(execution_plan_id: ~any) do |uuid|
@world.persistence.load_execution_plan(uuid)
end),
(on Event | Ping do
(on Event | Ping | Halt do
true
end)
@tracked_requests.delete(id).success! resolve_to
Expand Down
8 changes: 7 additions & 1 deletion lib/dynflow/dispatcher/executor_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ def handle_request(envelope)
on(Planning) { perform_planning(envelope, envelope.message)},
on(Execution) { perform_execution(envelope, envelope.message) },
on(Event) { perform_event(envelope, envelope.message) },
on(Status) { get_execution_status(envelope, envelope.message) })
on(Status) { get_execution_status(envelope, envelope.message) },
on(Halt) { halt_execution_plan(envelope, envelope.message) })
end

protected
Expand Down Expand Up @@ -51,6 +52,11 @@ def when_done(plan, envelope, execution, execution_lock)
end
end

def halt_execution_plan(envelope, execution_plan_id)
@world.executor.halt execution_plan_id
respond(envelope, Done)
end

def perform_event(envelope, event_request)
future = on_finish do |f|
f.then do
Expand Down
4 changes: 4 additions & 0 deletions lib/dynflow/executors/abstract/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def handle_persistence_error(error, work = nil)
end
end

def halt(execution_plan_id)
@director.halt execution_plan_id
end

def start_termination(*args)
logger.info 'shutting down Core ...'
super
Expand Down
4 changes: 4 additions & 0 deletions lib/dynflow/executors/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def execution_status(execution_plan_id = nil)
@core.ask!([:execution_status, execution_plan_id])
end

def halt(execution_plan_id)
@core.tell([:halt, execution_plan_id])
end

def initialized
@core_initialized
end
Expand Down
4 changes: 4 additions & 0 deletions lib/dynflow/world.rb
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent
publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout)
end

def halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future)
publish_request(Dispatcher::Halt[execution_plan_id], accepted, false)
end

def publish_request(request, done, wait_for_accepted, timeout = nil)
accepted = Concurrent::Promises.resolvable_future
accepted.rescue do |reason|
Expand Down

0 comments on commit 475a027

Please sign in to comment.