diff --git a/examples/halt.rb b/examples/halt.rb new file mode 100755 index 00000000..a9b95e76 --- /dev/null +++ b/examples/halt.rb @@ -0,0 +1,71 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require_relative 'example_helper' + +example_description = < i + 1) } + plan_action Sleeper, :time => 20 + end + plan_self + end + end + + def run + # Noop + end +end + +if $0 == __FILE__ + puts example_description + + ExampleHelper.world.action_logger.level = Logger::DEBUG + ExampleHelper.world + t = ExampleHelper.world.trigger(Wrapper) + Thread.new do + sleep 8 + ExampleHelper.world.halt(t.id) + end + + ExampleHelper.run_web_console +end diff --git a/lib/dynflow/director.rb b/lib/dynflow/director.rb index 3fc73598..95c9a5e2 100644 --- a/lib/dynflow/director.rb +++ b/lib/dynflow/director.rb @@ -243,6 +243,15 @@ def terminate end end + def halt(event) + manager = @execution_plan_managers[event.execution_plan_id] + return unless manager + + @logger.warn "Halting execution plan #{event.execution_plan_id}" + manager.halt + finish_manager manager + end + private def unless_done(manager, work_items) diff --git a/lib/dynflow/director/execution_plan_manager.rb b/lib/dynflow/director/execution_plan_manager.rb index 07a88c13..db3fad28 100644 --- a/lib/dynflow/director/execution_plan_manager.rb +++ b/lib/dynflow/director/execution_plan_manager.rb @@ -12,6 +12,7 @@ def initialize(world, execution_plan, future) @execution_plan = Type! execution_plan, ExecutionPlan @future = Type! future, Concurrent::Promises::ResolvableFuture @running_steps_manager = RunningStepsManager.new(world) + @halted = false unless [:planned, :paused].include? execution_plan.state raise "execution_plan is not in pending or paused state, it's #{execution_plan.state}" @@ -24,6 +25,11 @@ def start start_run or start_finalize or finish end + def halt + @halted = true + @running_steps_manager.terminate + end + def restart @run_manager = nil @finalize_manager = nil @@ -71,7 +77,7 @@ def event(event) end def done? - (!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?) + @halted || (!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?) end def terminate @@ -87,6 +93,7 @@ def update_steps(steps) def compute_next_from_step(step) raise "run manager not set" unless @run_manager raise "run manager already done" if @run_manager.done? + return [] if @halted next_steps = @run_manager.what_is_next(step) if @run_manager.done? diff --git a/lib/dynflow/director/running_steps_manager.rb b/lib/dynflow/director/running_steps_manager.rb index 802db321..ae58d01e 100644 --- a/lib/dynflow/director/running_steps_manager.rb +++ b/lib/dynflow/director/running_steps_manager.rb @@ -15,6 +15,7 @@ def initialize(world) # to handle potential updates of the step object (that is part of the event) @events = QueueHash.new(Integer, Director::Event) @events_by_request_id = {} + @halted = false end def terminate @@ -26,6 +27,10 @@ def terminate end end + def halt + @halted = true + end + def add(step, work) Type! step, ExecutionPlan::Steps::RunStep @running_steps[step.id] = step @@ -83,6 +88,10 @@ def event(event) event.result.reject UnprocessableEvent.new('step is not suspended, it cannot process events') return [] end + if @halted + event.result.reject UnprocessableEvent.new('execution plan is halted, it cannot receive events') + return [] + end can_run_event = @work_items.empty?(step.id) @events_by_request_id[event.request_id] = event diff --git a/lib/dynflow/dispatcher.rb b/lib/dynflow/dispatcher.rb index dfea6e50..555748b8 100644 --- a/lib/dynflow/dispatcher.rb +++ b/lib/dynflow/dispatcher.rb @@ -28,7 +28,11 @@ module Dispatcher execution_plan_id: type { variants String, NilClass } end - variants Event, Execution, Ping, Status, Planning + Halt = type do + fields! execution_plan_id: String, optional: Algebrick::Types::Boolean + end + + variants Event, Execution, Ping, Status, Planning, Halt end Response = Algebrick.type do diff --git a/lib/dynflow/dispatcher/client_dispatcher.rb b/lib/dynflow/dispatcher/client_dispatcher.rb index fc436660..9c7e9616 100644 --- a/lib/dynflow/dispatcher/client_dispatcher.rb +++ b/lib/dynflow/dispatcher/client_dispatcher.rb @@ -137,7 +137,7 @@ def dispatch_request(request, client_world_id, request_id) (on ~Execution | ~Planning do |execution| AnyExecutor end), - (on ~Event do |event| + (on ~Event | ~Halt do |event| ignore_unknown = event.optional find_executor(event.execution_plan_id) end), @@ -236,7 +236,7 @@ def resolve_tracked_request(id, error = nil) (on Execution.(execution_plan_id: ~any) do |uuid| @world.persistence.load_execution_plan(uuid) end), - (on Event | Ping do + (on Event | Ping | Halt do true end) @tracked_requests.delete(id).success! resolve_to diff --git a/lib/dynflow/dispatcher/executor_dispatcher.rb b/lib/dynflow/dispatcher/executor_dispatcher.rb index d8ae0d12..7d4d6a3a 100644 --- a/lib/dynflow/dispatcher/executor_dispatcher.rb +++ b/lib/dynflow/dispatcher/executor_dispatcher.rb @@ -12,7 +12,8 @@ def handle_request(envelope) on(Planning) { perform_planning(envelope, envelope.message)}, on(Execution) { perform_execution(envelope, envelope.message) }, on(Event) { perform_event(envelope, envelope.message) }, - on(Status) { get_execution_status(envelope, envelope.message) }) + on(Status) { get_execution_status(envelope, envelope.message) }, + on(Halt) { halt_execution_plan(envelope, envelope.message) }) end protected @@ -51,6 +52,11 @@ def when_done(plan, envelope, execution, execution_lock) end end + def halt_execution_plan(envelope, execution_plan_id) + @world.executor.halt execution_plan_id + respond(envelope, Done) + end + def perform_event(envelope, event_request) future = on_finish do |f| f.then do diff --git a/lib/dynflow/executors/abstract/core.rb b/lib/dynflow/executors/abstract/core.rb index cbac0568..7f2ad037 100644 --- a/lib/dynflow/executors/abstract/core.rb +++ b/lib/dynflow/executors/abstract/core.rb @@ -65,6 +65,10 @@ def handle_persistence_error(error, work = nil) end end + def halt(execution_plan_id) + @director.halt execution_plan_id + end + def start_termination(*args) logger.info 'shutting down Core ...' super diff --git a/lib/dynflow/executors/parallel.rb b/lib/dynflow/executors/parallel.rb index b7a05609..d6cc8075 100644 --- a/lib/dynflow/executors/parallel.rb +++ b/lib/dynflow/executors/parallel.rb @@ -56,6 +56,10 @@ def execution_status(execution_plan_id = nil) @core.ask!([:execution_status, execution_plan_id]) end + def halt(execution_plan_id) + @core.tell([:halt, execution_plan_id]) + end + def initialized @core_initialized end diff --git a/lib/dynflow/world.rb b/lib/dynflow/world.rb index d1845899..412ccad7 100644 --- a/lib/dynflow/world.rb +++ b/lib/dynflow/world.rb @@ -251,6 +251,10 @@ def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout) end + def halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future) + publish_request(Dispatcher::Halt[execution_plan_id], accepted, false) + end + def publish_request(request, done, wait_for_accepted, timeout = nil) accepted = Concurrent::Promises.resolvable_future accepted.rescue do |reason|