From 475a0277e79ed6a96780f847d2c5060ef1b3f5bd Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Fri, 28 Jan 2022 14:46:06 +0100 Subject: [PATCH] Add support for halting execution plans Up until now, Dynflow only had support for clean cancellation. Upon cancellation a cancel event was sent to all cancellable suspended and running steps and it was the action's responsibility to cancel itself. However, there were actions which just weren't cancellable at all. This commit adds support for halting execution plans. The actions are completely unaware of this; on halt, Dynflow just destroys its internal state about a given execution plan and turns it to the stopped state. It does not remove running steps from workers as we currently don't have any real means of doing so. Any pending events will be rejected. In other words, halting an execution plan ensures it will not run any further. --- examples/halt.rb | 71 +++++++++++++++++++ lib/dynflow/director.rb | 9 +++ .../director/execution_plan_manager.rb | 9 ++- lib/dynflow/director/running_steps_manager.rb | 9 +++ lib/dynflow/dispatcher.rb | 6 +- lib/dynflow/dispatcher/client_dispatcher.rb | 4 +- lib/dynflow/dispatcher/executor_dispatcher.rb | 8 ++- lib/dynflow/executors/abstract/core.rb | 4 ++ lib/dynflow/executors/parallel.rb | 4 ++ lib/dynflow/world.rb | 4 ++ 10 files changed, 123 insertions(+), 5 deletions(-) create mode 100755 examples/halt.rb diff --git a/examples/halt.rb b/examples/halt.rb new file mode 100755 index 000000000..a9b95e766 --- /dev/null +++ b/examples/halt.rb @@ -0,0 +1,71 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require_relative 'example_helper' + +example_description = < i + 1) } + plan_action Sleeper, :time => 20 + end + plan_self + end + end + + def run + # Noop + end +end + +if $0 == __FILE__ + puts example_description + + ExampleHelper.world.action_logger.level = Logger::DEBUG + ExampleHelper.world + t = ExampleHelper.world.trigger(Wrapper) + Thread.new do + sleep 8 + 
ExampleHelper.world.halt(t.id) + end + + ExampleHelper.run_web_console +end diff --git a/lib/dynflow/director.rb b/lib/dynflow/director.rb index 3fc735985..95c9a5e26 100644 --- a/lib/dynflow/director.rb +++ b/lib/dynflow/director.rb @@ -243,6 +243,15 @@ def terminate end end + def halt(event) + manager = @execution_plan_managers[event.execution_plan_id] + return unless manager + + @logger.warn "Halting execution plan #{event.execution_plan_id}" + manager.halt + finish_manager manager + end + private def unless_done(manager, work_items) diff --git a/lib/dynflow/director/execution_plan_manager.rb b/lib/dynflow/director/execution_plan_manager.rb index 07a88c13e..db3fad28c 100644 --- a/lib/dynflow/director/execution_plan_manager.rb +++ b/lib/dynflow/director/execution_plan_manager.rb @@ -12,6 +12,7 @@ def initialize(world, execution_plan, future) @execution_plan = Type! execution_plan, ExecutionPlan @future = Type! future, Concurrent::Promises::ResolvableFuture @running_steps_manager = RunningStepsManager.new(world) + @halted = false unless [:planned, :paused].include? execution_plan.state raise "execution_plan is not in pending or paused state, it's #{execution_plan.state}" @@ -24,6 +25,11 @@ def start start_run or start_finalize or finish end + def halt + @halted = true + @running_steps_manager.terminate + end + def restart @run_manager = nil @finalize_manager = nil @@ -71,7 +77,7 @@ def event(event) end def done? - (!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?) + @halted || (!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?) end def terminate @@ -87,6 +93,7 @@ def update_steps(steps) def compute_next_from_step(step) raise "run manager not set" unless @run_manager raise "run manager already done" if @run_manager.done? + return [] if @halted next_steps = @run_manager.what_is_next(step) if @run_manager.done? 
diff --git a/lib/dynflow/director/running_steps_manager.rb b/lib/dynflow/director/running_steps_manager.rb index 802db3213..ae58d01e2 100644 --- a/lib/dynflow/director/running_steps_manager.rb +++ b/lib/dynflow/director/running_steps_manager.rb @@ -15,6 +15,7 @@ def initialize(world) # to handle potential updates of the step object (that is part of the event) @events = QueueHash.new(Integer, Director::Event) @events_by_request_id = {} + @halted = false end def terminate @@ -26,6 +27,10 @@ def terminate end end + def halt + @halted = true + end + def add(step, work) Type! step, ExecutionPlan::Steps::RunStep @running_steps[step.id] = step @@ -83,6 +88,10 @@ def event(event) event.result.reject UnprocessableEvent.new('step is not suspended, it cannot process events') return [] end + if @halted + event.result.reject UnprocessableEvent.new('execution plan is halted, it cannot receive events') + return [] + end can_run_event = @work_items.empty?(step.id) @events_by_request_id[event.request_id] = event diff --git a/lib/dynflow/dispatcher.rb b/lib/dynflow/dispatcher.rb index dfea6e504..555748b82 100644 --- a/lib/dynflow/dispatcher.rb +++ b/lib/dynflow/dispatcher.rb @@ -28,7 +28,11 @@ module Dispatcher execution_plan_id: type { variants String, NilClass } end - variants Event, Execution, Ping, Status, Planning + Halt = type do + fields! 
execution_plan_id: String, optional: Algebrick::Types::Boolean + end + + variants Event, Execution, Ping, Status, Planning, Halt end Response = Algebrick.type do diff --git a/lib/dynflow/dispatcher/client_dispatcher.rb b/lib/dynflow/dispatcher/client_dispatcher.rb index fc436660f..9c7e96169 100644 --- a/lib/dynflow/dispatcher/client_dispatcher.rb +++ b/lib/dynflow/dispatcher/client_dispatcher.rb @@ -137,7 +137,7 @@ def dispatch_request(request, client_world_id, request_id) (on ~Execution | ~Planning do |execution| AnyExecutor end), - (on ~Event do |event| + (on ~Event | ~Halt do |event| ignore_unknown = event.optional find_executor(event.execution_plan_id) end), @@ -236,7 +236,7 @@ def resolve_tracked_request(id, error = nil) (on Execution.(execution_plan_id: ~any) do |uuid| @world.persistence.load_execution_plan(uuid) end), - (on Event | Ping do + (on Event | Ping | Halt do true end) @tracked_requests.delete(id).success! resolve_to diff --git a/lib/dynflow/dispatcher/executor_dispatcher.rb b/lib/dynflow/dispatcher/executor_dispatcher.rb index d8ae0d12c..7d4d6a3a4 100644 --- a/lib/dynflow/dispatcher/executor_dispatcher.rb +++ b/lib/dynflow/dispatcher/executor_dispatcher.rb @@ -12,7 +12,8 @@ def handle_request(envelope) on(Planning) { perform_planning(envelope, envelope.message)}, on(Execution) { perform_execution(envelope, envelope.message) }, on(Event) { perform_event(envelope, envelope.message) }, - on(Status) { get_execution_status(envelope, envelope.message) }) + on(Status) { get_execution_status(envelope, envelope.message) }, + on(Halt) { halt_execution_plan(envelope, envelope.message) }) end protected @@ -51,6 +52,11 @@ def when_done(plan, envelope, execution, execution_lock) end end + def halt_execution_plan(envelope, execution_plan_id) + @world.executor.halt execution_plan_id + respond(envelope, Done) + end + def perform_event(envelope, event_request) future = on_finish do |f| f.then do diff --git a/lib/dynflow/executors/abstract/core.rb 
b/lib/dynflow/executors/abstract/core.rb index cbac0568d..7f2ad0379 100644 --- a/lib/dynflow/executors/abstract/core.rb +++ b/lib/dynflow/executors/abstract/core.rb @@ -65,6 +65,10 @@ def handle_persistence_error(error, work = nil) end end + def halt(execution_plan_id) + @director.halt execution_plan_id + end + def start_termination(*args) logger.info 'shutting down Core ...' super diff --git a/lib/dynflow/executors/parallel.rb b/lib/dynflow/executors/parallel.rb index b7a056094..d6cc8075b 100644 --- a/lib/dynflow/executors/parallel.rb +++ b/lib/dynflow/executors/parallel.rb @@ -56,6 +56,10 @@ def execution_status(execution_plan_id = nil) @core.ask!([:execution_status, execution_plan_id]) end + def halt(execution_plan_id) + @core.tell([:halt, execution_plan_id]) + end + def initialized @core_initialized end diff --git a/lib/dynflow/world.rb b/lib/dynflow/world.rb index d18458999..412ccad7a 100644 --- a/lib/dynflow/world.rb +++ b/lib/dynflow/world.rb @@ -251,6 +251,10 @@ def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout) end + def halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future) + publish_request(Dispatcher::Halt[execution_plan_id], accepted, false) + end + def publish_request(request, done, wait_for_accepted, timeout = nil) accepted = Concurrent::Promises.resolvable_future accepted.rescue do |reason|