From d35d63e748c350fd4ee3056074de1cb0101e280a Mon Sep 17 00:00:00 2001 From: superstar54 Date: Mon, 2 Sep 2024 16:53:17 +0200 Subject: [PATCH] Support multiple scheduler. 1) can run multiple runner (daemon) for the scheduler, each runner will listen to the `scheduler_queue`, and the prefetch_count is set to 1, thus each runner can only launch one Scheduler process. 2) The scheduler process listen to the `workgraph_queue` to launch WorkGraph 3) the scheduler recieve rpc call to launch WorkGrpah 4) user can submit workgraph to the workgraph queue, or select the shceduler to run it by pk --- aiida_workgraph/cli/cmd_scheduler.py | 32 +-- aiida_workgraph/engine/launch.py | 84 +------- aiida_workgraph/engine/override.py | 71 +++++++ aiida_workgraph/engine/scheduler/client.py | 106 +++++++++- aiida_workgraph/engine/scheduler/scheduler.py | 11 +- aiida_workgraph/utils/control.py | 22 +- aiida_workgraph/workgraph.py | 30 +-- docs/source/howto/scheduler.ipynb | 200 +++++++++++------- tests/test_scheduler.py | 3 +- 9 files changed, 343 insertions(+), 216 deletions(-) create mode 100644 aiida_workgraph/engine/override.py diff --git a/aiida_workgraph/cli/cmd_scheduler.py b/aiida_workgraph/cli/cmd_scheduler.py index d8c52d09..52cb54d3 100644 --- a/aiida_workgraph/cli/cmd_scheduler.py +++ b/aiida_workgraph/cli/cmd_scheduler.py @@ -1,27 +1,11 @@ from aiida_workgraph.cli.cmd_workgraph import workgraph import click -from pathlib import Path from aiida.cmdline.utils import decorators, echo +from aiida.cmdline.commands.cmd_daemon import validate_daemon_workers from aiida.cmdline.params import options from aiida_workgraph.engine.scheduler.client import get_scheduler_client import sys -REACT_PORT = "3000" - - -def get_package_root(): - """Returns the root directory of the package.""" - current_file = Path(__file__) - # Root directory of your package - return current_file.parent - - -def get_pid_file_path(): - """Get the path to the PID file in the desired directory.""" - from aiida.manage.configuration.settings import AIIDA_CONFIG_FOLDER - - return AIIDA_CONFIG_FOLDER / "scheduler_processes.pid" - @workgraph.group("scheduler") def scheduler(): @@ -31,7 +15,7 @@ def scheduler(): @scheduler.command() def worker(): """Start the scheduler application.""" - from aiida_workgraph.engine.launch import start_scheduler_worker + from aiida_workgraph.engine.scheduler.client import start_scheduler_worker click.echo("Starting the scheduler worker...") @@ -40,17 +24,20 @@ def worker(): @scheduler.command() @click.option("--foreground", is_flag=True, help="Run in foreground.") +@click.argument("number", required=False, type=int, callback=validate_daemon_workers) @options.TIMEOUT(default=None, required=False, type=int) @decorators.with_dbenv() @decorators.requires_broker @decorators.check_circus_zmq_version -def start(foreground, timeout): +def start(foreground, number, timeout): """Start the scheduler application.""" + from aiida_workgraph.engine.scheduler.client import start_scheduler_process click.echo("Starting the scheduler process...") client = get_scheduler_client() - client.start_daemon(foreground=foreground) + client.start_daemon(number_workers=number, foreground=foreground, timeout=timeout) + start_scheduler_process(number) @scheduler.command() @@ -86,10 +73,11 @@ def stop(ctx, no_wait, all_profiles, timeout): @scheduler.command(hidden=True) @click.option("--foreground", is_flag=True, help="Run in foreground.") +@click.argument("number", required=False, type=int, callback=validate_daemon_workers) @decorators.with_dbenv() @decorators.requires_broker @decorators.check_circus_zmq_version -def start_circus(foreground): +def start_circus(foreground, number): """This will actually launch the circus daemon, either daemonized in the background or in the foreground. If run in the foreground all logs are redirected to stdout. @@ -97,7 +85,7 @@ def start_circus(foreground): .. note:: this should not be called directly from the commandline! """ - get_scheduler_client()._start_daemon(foreground=foreground) + get_scheduler_client()._start_daemon(number_workers=number, foreground=foreground) @scheduler.command() diff --git a/aiida_workgraph/engine/launch.py b/aiida_workgraph/engine/launch.py index 96324c75..25b8c1e8 100644 --- a/aiida_workgraph/engine/launch.py +++ b/aiida_workgraph/engine/launch.py @@ -27,6 +27,13 @@ LOGGER = AIIDA_LOGGER.getChild("engine.launch") +""" +Note: I modified the run_get_node and submit functions to include the parent_pid argument. +This is necessary for keeping track of the provenance of the processes. + +""" + + def run_get_node( process_class, *args, **kwargs ) -> tuple[dict[str, t.Any] | None, "ProcessNode"]: @@ -170,80 +177,3 @@ def submit( time.sleep(wait_interval) return node - - -def start_scheduler_worker(foreground: bool = False) -> None: - """Start a scheduler worker for the currently configured profile. - - :param foreground: If true, the logging will be configured to write to stdout, otherwise it will be configured to - write to the scheduler log file. - """ - import asyncio - import signal - import sys - - from aiida.common.log import configure_logging - from aiida.manage import get_config_option, get_manager - from aiida_workgraph.engine.scheduler import WorkGraphScheduler - from aiida_workgraph.engine.scheduler.client import ( - get_scheduler_client, - get_scheduler, - ) - from aiida.engine.processes.launcher import ProcessLauncher - from aiida.engine import persistence - from plumpy.persistence import LoadSaveContext - from aiida.engine.daemon.worker import shutdown_worker - - daemon_client = get_scheduler_client() - configure_logging( - daemon=not foreground, daemon_log_file=daemon_client.daemon_log_file - ) - - LOGGER.debug(f"sys.executable: {sys.executable}") - LOGGER.debug(f"sys.path: {sys.path}") - - try: - manager = get_manager() - # runner = manager.create_daemon_runner() - runner = manager.create_runner(broker_submit=True) - manager.set_runner(runner) - except Exception: - LOGGER.exception("daemon worker failed to start") - raise - - if isinstance(rlimit := get_config_option("daemon.recursion_limit"), int): - LOGGER.info("Setting maximum recursion limit of daemon worker to %s", rlimit) - sys.setrecursionlimit(rlimit) - - signals = (signal.SIGTERM, signal.SIGINT) - for s in signals: - # https://github.com/python/mypy/issues/12557 - runner.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown_worker(runner))) # type: ignore[misc] - - try: - running_scheduler = get_scheduler() - runner_loop = runner.loop - task_receiver = ProcessLauncher( - loop=runner_loop, - persister=manager.get_persister(), - load_context=LoadSaveContext(runner=runner), - loader=persistence.get_object_loader(), - ) - asyncio.run( - task_receiver._continue( - communicator=None, pid=running_scheduler, nowait=True - ) - ) - except ValueError: - print("Starting a new Scheduler") - process_inited = instantiate_process(runner, WorkGraphScheduler) - runner.loop.create_task(process_inited.step_until_terminated()) - - try: - LOGGER.info("Starting a daemon worker") - runner.start() - except SystemError as exception: - LOGGER.info("Received a SystemError: %s", exception) - runner.close() - - LOGGER.info("Daemon worker started") diff --git a/aiida_workgraph/engine/override.py b/aiida_workgraph/engine/override.py new file mode 100644 index 00000000..020be102 --- /dev/null +++ b/aiida_workgraph/engine/override.py @@ -0,0 +1,71 @@ +from plumpy.process_comms import RemoteProcessThreadController +from typing import Any, Optional + +""" +Note: I modified the the create_daemon_runner function and RemoteProcessThreadController +to include the queue_name argument. + +""" + + +def create_daemon_runner( + manager, queue_name: str = None, loop: Optional["asyncio.AbstractEventLoop"] = None +) -> "Runner": + """Create and return a new daemon runner. + This is used by workers when the daemon is running and in testing. + :param loop: the (optional) asyncio event loop to use + :return: a runner configured to work in the daemon configuration + """ + from plumpy.persistence import LoadSaveContext + from aiida.engine import persistence + from aiida.engine.processes.launcher import ProcessLauncher + from plumpy.communications import convert_to_comm + + runner = manager.create_runner(broker_submit=True, loop=loop) + runner_loop = runner.loop + # Listen for incoming launch requests + task_receiver = ProcessLauncher( + loop=runner_loop, + persister=manager.get_persister(), + load_context=LoadSaveContext(runner=runner), + loader=persistence.get_object_loader(), + ) + + def callback(_comm, msg): + print("Received message: {}".format(msg)) + import asyncio + + asyncio.run(task_receiver(_comm, msg)) + print("task_receiver._continue done") + return True + + assert runner.communicator is not None, "communicator not set for runner" + if queue_name is not None: + print("queue_name: {}".format(queue_name)) + queue = runner.communicator._communicator.task_queue( + queue_name, prefetch_count=1 + ) + # queue.add_task_subscriber(callback) + # important to convert the callback + converted = convert_to_comm(task_receiver, runner.communicator._loop) + queue.add_task_subscriber(converted) + else: + runner.communicator.add_task_subscriber(task_receiver) + return runner + + +class ControllerWithQueueName(RemoteProcessThreadController): + def __init__(self, queue_name: str, **kwargs): + super().__init__(**kwargs) + self.queue_name = queue_name + + def task_send(self, message: Any, no_reply: bool = False) -> Optional[Any]: + """ + Send a task to be performed using the communicator + + :param message: the task message + :param no_reply: if True, this call will be fire-and-forget, i.e. no return value + :return: the response from the remote side (if no_reply=False) + """ + queue = self._communicator.task_queue(self.queue_name) + return queue.task_send(message, no_reply=no_reply) diff --git a/aiida_workgraph/engine/scheduler/client.py b/aiida_workgraph/engine/scheduler/client.py index 1774eb64..1beaeac8 100644 --- a/aiida_workgraph/engine/scheduler/client.py +++ b/aiida_workgraph/engine/scheduler/client.py @@ -4,8 +4,11 @@ from aiida.common.exceptions import ConfigurationError import os from typing import Optional +from aiida.common.log import AIIDA_LOGGER +from typing import List WORKGRAPH_BIN = shutil.which("workgraph") +LOGGER = AIIDA_LOGGER.getChild("engine.launch") class SchedulerClient(DaemonClient): @@ -102,6 +105,7 @@ def cmd_start_daemon( self.profile.name, "scheduler", "start-circus", + str(number_workers), ] if foreground: @@ -114,7 +118,7 @@ def cmd_start_daemon_worker(self) -> list[str]: """Return the command to start a daemon worker process.""" return [self._workgraph_bin, "-p", self.profile.name, "scheduler", "worker"] - def _start_daemon(self, foreground: bool = False) -> None: + def _start_daemon(self, number_workers: int = 1, foreground: bool = False) -> None: """Start the daemon. .. warning:: This will daemonize the current process and put it in the background. It is most likely not what @@ -149,7 +153,7 @@ def _start_daemon(self, foreground: bool = False) -> None: { "cmd": " ".join(self.cmd_start_daemon_worker), "name": self.daemon_name, - "numprocesses": 1, + "numprocesses": number_workers, "virtualenv": self.virtualenv, "copy_env": True, "stdout_stream": { @@ -210,7 +214,7 @@ def get_scheduler_client(profile_name: Optional[str] = None) -> "SchedulerClient return SchedulerClient(profile) -def get_scheduler(): +def get_scheduler() -> List[int]: from aiida.orm import QueryBuilder from aiida_workgraph.engine.scheduler import WorkGraphScheduler @@ -224,7 +228,95 @@ def get_scheduler(): } qb.append(WorkGraphScheduler, filters=filters, project=projections, tag="process") results = qb.all() - if len(results) == 0: - raise ValueError("No scheduler found. Please start the scheduler first.") - scheduler_id = results[0][0] - return scheduler_id + pks = [r[0] for r in results] + return pks + + +def start_scheduler_worker(foreground: bool = False) -> None: + """Start a scheduler worker for the currently configured profile. + + :param foreground: If true, the logging will be configured to write to stdout, otherwise it will be configured to + write to the scheduler log file. + """ + import asyncio + import signal + import sys + from aiida_workgraph.engine.scheduler.client import get_scheduler_client + from aiida_workgraph.engine.override import create_daemon_runner + + from aiida.common.log import configure_logging + from aiida.manage import get_config_option + from aiida.engine.daemon.worker import shutdown_worker + + daemon_client = get_scheduler_client() + configure_logging( + daemon=not foreground, daemon_log_file=daemon_client.daemon_log_file + ) + + LOGGER.debug(f"sys.executable: {sys.executable}") + LOGGER.debug(f"sys.path: {sys.path}") + + try: + manager = get_manager() + runner = create_daemon_runner(manager, queue_name="scheduler_queue") + except Exception: + LOGGER.exception("daemon worker failed to start") + raise + + if isinstance(rlimit := get_config_option("daemon.recursion_limit"), int): + LOGGER.info("Setting maximum recursion limit of daemon worker to %s", rlimit) + sys.setrecursionlimit(rlimit) + + signals = (signal.SIGTERM, signal.SIGINT) + for s in signals: + # https://github.com/python/mypy/issues/12557 + runner.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown_worker(runner))) # type: ignore[misc] + + try: + LOGGER.info("Starting a daemon worker") + runner.start() + except SystemError as exception: + LOGGER.info("Received a SystemError: %s", exception) + runner.close() + + LOGGER.info("Daemon worker started") + + +def start_scheduler_process(number: int = 1) -> None: + """Start or restart the specified number of scheduler processes.""" + from aiida_workgraph.engine.scheduler import WorkGraphScheduler + from aiida_workgraph.engine.scheduler.client import get_scheduler + from aiida_workgraph.utils.control import create_scheduler_action + from aiida_workgraph.engine.utils import instantiate_process + + try: + schedulers: List[int] = get_scheduler() + existing_schedulers_count = len(schedulers) + print( + "Found {} existing scheduler(s): {}".format( + existing_schedulers_count, " ".join([str(pk) for pk in schedulers]) + ) + ) + + count = 0 + + # Restart existing schedulers if they exceed the number to start + for pk in schedulers[:number]: + create_scheduler_action(pk) + print(f"Scheduler with pk {pk} running.") + count += 1 + # not running + for pk in schedulers[number:]: + print(f"Scheduler with pk {pk} not running.") + + # Start new schedulers if more are needed + runner = get_manager().get_runner() + for i in range(count, number): + process_inited = instantiate_process(runner, WorkGraphScheduler) + process_inited.runner.persister.save_checkpoint(process_inited) + process_inited.close() + create_scheduler_action(process_inited.node.pk) + print(f"Scheduler with pk {process_inited.node.pk} running.") + + except Exception as e: + raise (f"An error occurred while starting schedulers: {e}") diff --git a/aiida_workgraph/engine/scheduler/scheduler.py b/aiida_workgraph/engine/scheduler/scheduler.py index 5d91c489..53f77cfa 100644 --- a/aiida_workgraph/engine/scheduler/scheduler.py +++ b/aiida_workgraph/engine/scheduler/scheduler.py @@ -778,15 +778,12 @@ def update_task_state( ) -> None: """Update task state when the task is finished.""" - print("update task state: ", pk, name) task = self.ctx._workgraph[pk]["_tasks"][name] # print(f"set task result: {name}") node = self.get_task_state_info(pk, name, "process") - print("node", node) if isinstance(node, orm.ProcessNode): # print(f"set task result: {name} process") state = node.process_state.value.upper() - print("state", state) if node.is_finished_ok: self.set_task_state_info(pk, task["name"], "state", state) if task["metadata"]["node_type"].upper() == "WORKGRAPH": @@ -1637,18 +1634,18 @@ def message_receive( def call_on_receive_workgraph_message(self, _comm, msg): """Call on receive workgraph message.""" # self.report(f"Received workgraph message: {msg}") - pk = int(msg) + pk = msg["args"]["pid"] # To avoid "DbNode is not persistent", we need to schedule the call self._schedule_rpc(self.launch_workgraph, pk=pk) return True def add_workgraph_subsriber(self) -> None: """Add workgraph subscriber.""" - queue_name = "scheduler_queue" - # self.report(f"Add workgraph subscriber on queue: {queue_name}") + queue_name = "workgraph_queue" + self.report(f"Add workgraph subscriber on queue: {queue_name}") comm = self.runner.communicator._communicator queue = comm.task_queue(queue_name, prefetch_count=1000) - queue.add_task_subscriber(self.callback) + queue.add_task_subscriber(self.call_on_receive_workgraph_message) def finalize_workgraph(self, pk: int) -> t.Optional[ExitCode]: """""" diff --git a/aiida_workgraph/utils/control.py b/aiida_workgraph/utils/control.py index 8b54f439..4fa50be6 100644 --- a/aiida_workgraph/utils/control.py +++ b/aiida_workgraph/utils/control.py @@ -1,6 +1,7 @@ from aiida.manage import get_manager from aiida import orm from aiida.engine.processes import control +from aiida_workgraph.engine.override import ControllerWithQueueName def create_task_action( @@ -22,10 +23,23 @@ def create_scheduler_action( ): """Send workgraph task to scheduler.""" - controller = get_manager().get_process_controller() - message = str(pk) - queue = controller._communicator.task_queue("scheduler_queue") - queue.task_send(message) + manager = get_manager() + controller = ControllerWithQueueName( + queue_name="scheduler_queue", communicator=manager.get_communicator() + ) + controller.continue_process(pk, nowait=False) + + +def create_workgraph_action( + pk: int, +): + """Send workgraph task to scheduler.""" + + manager = get_manager() + controller = ControllerWithQueueName( + queue_name="workgraph_queue", communicator=manager.get_communicator() + ) + controller.continue_process(pk, nowait=False) def get_task_state_info(node, name: str, key: str) -> str: diff --git a/aiida_workgraph/workgraph.py b/aiida_workgraph/workgraph.py index 647f8778..e99a34f6 100644 --- a/aiida_workgraph/workgraph.py +++ b/aiida_workgraph/workgraph.py @@ -124,15 +124,6 @@ def submit( restart (bool): Restart the process, and reset the modified tasks, then only re-run the modified tasks. new (bool): Submit a new process. """ - from aiida_workgraph.engine.scheduler.client import get_scheduler - - if to_scheduler: - try: - get_scheduler() - except ValueError as e: - print(e) - return - # set task inputs if inputs is not None: for name, input in inputs.items(): @@ -439,27 +430,14 @@ def continue_process_in_scheduler(self, to_scheduler: Union[int, bool]) -> None: """ from aiida_workgraph.utils.control import ( create_task_action, - create_scheduler_action, + create_workgraph_action, ) - from aiida_workgraph.engine.scheduler.client import get_scheduler - import kiwipy try: - if isinstance(to_scheduler, int): - scheduler_pk = get_scheduler() - create_task_action(scheduler_pk, [self.pk], action="launch_workgraph") + if isinstance(to_scheduler, int) and not isinstance(to_scheduler, bool): + create_task_action(to_scheduler, [self.pk], action="launch_workgraph") else: - create_scheduler_action(self.pk) - except ValueError: - print( - """Scheduler is not running. -Please start the scheduler first with `aiida-workgraph scheduler start`""" - ) - except kiwipy.exceptions.UnroutableError: - print( - """Scheduler exists, but the daemon is not running. -Please start the scheduler first with `aiida-workgraph scheduler start`""" - ) + create_workgraph_action(self.pk) except Exception as e: print("""An unexpected error occurred:""", e) diff --git a/docs/source/howto/scheduler.ipynb b/docs/source/howto/scheduler.ipynb index ad046289..e88f2f50 100644 --- a/docs/source/howto/scheduler.ipynb +++ b/docs/source/howto/scheduler.ipynb @@ -6,42 +6,93 @@ "source": [ "# Scheduler\n", "\n", - "Start a scheduler daemon:\n", + "## Overview\n", + "\n", + "This documentation provides a guide on using the `aiida-workgraph` Scheduler to manage `WorkGraph` processes efficiently.\n", + "\n", + "### Background\n", + "\n", + "Traditional workflow processes, particularly in nested structures like `PwBandsWorkChain`, tend to create multiple Workflow processes in a waiting state, while only a few `CalcJob` processes run actively. This results in inefficient resource usage. The `WorkChain` structure makes it challenging to eliminate these waiting processes due to its encapsulated logic.\n", + "\n", + "In contrast, the `WorkGraph` offers a more clear task dependency and allow other process to run its tasks in a controllable way. In a scheduler, one only need create the `WorkGraph` process in the database, not run it via a daemon worker.\n", + "\n", + "### Process Comparison: `PwBands` Case\n", + "\n", + "- **Old Approach**: 300 Workflow processes (Bands, Relax, Base) + 100 CalcJob processes.\n", + "- **New Approach**: 1 Scheduler process + 100 CalcJob processes.\n", + "\n", + "This new approach significantly reduces the number of active processes and mitigates the risk of deadlocks.\n", + "\n", + "## Getting Started with the Scheduler\n", + "\n", + "### Starting the Scheduler\n", + "\n", + "To launch a scheduler daemon:\n", "\n", "```console\n", "workgraph scheduler start\n", "```\n", "\n", - "Check the status of the scheduler:\n", + "### Monitoring the Scheduler\n", + "\n", + "To check the current status of the scheduler:\n", "\n", "```console\n", "workgraph scheduler status\n", "```\n", "\n", - "Stop the scheduler:\n", + "### Stopping the Scheduler\n", + "\n", + "To stop the scheduler daemon:\n", "\n", "```console\n", "workgraph scheduler stop\n", "```\n", "\n", - "## Submit workgraph to the scheduler\n", - "Set `to_scheduler` to `True` when submitting a workgraph to the scheduler:\n", + "## Submitting WorkGraphs to the Scheduler\n", + "\n", + "To submit a WorkGraph to the scheduler, set the `to_scheduler` flag to `True`:\n", "\n", "```python\n", "wg.submit(to_scheduler=True)\n", - "```" + "```\n", + "\n", + "\n", + "### Using Multiple Schedulers\n", + "\n", + "For environments with a high volume of WorkGraphs, starting multiple schedulers can enhance throughput:\n", + "\n", + "```console\n", + "workgraph scheduler start 2\n", + "```\n", + "\n", + "WorkGraphs will be automatically distributed among available schedulers.\n", + "\n", + "#### Specifying a Scheduler\n", + "\n", + "To submit a WorkGraph to a specific scheduler using its primary key (`pk`):\n", + "\n", + "```python\n", + "wg.submit(to_scheduler=pk_scheduler)\n", + "```\n", + "\n", + "### Best Practices for Scheduler Usage\n", + "\n", + "While a single scheduler suffices for most use cases, scaling up the number of schedulers may be beneficial when significantly increasing the number of task workers (created by `verdi daemon start`). A general rule is to maintain a ratio of less than 5 workers per scheduler.\n", + "\n", + "## Example" ] }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "WorkGraph process created, PK: 134971\n", + "WorkGraph process created, PK: 142617\n", "State of WorkGraph : FINISHED\n", "Result of add2 : 4\n" ] @@ -78,7 +129,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 2, "metadata": {}, "outputs": [ { @@ -94,41 +145,41 @@ " viewBox=\"0.00 0.00 1036.43 720.06\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n", "\n", "\n", - "\n", + "\n", "\n", - "N134971\n", + "N142617\n", "\n", - "WorkGraph<test_scheduler> (134971)\n", + "WorkGraph<test_scheduler> (142617)\n", "State: finished\n", "Exit Code: 0\n", "\n", - "\n", + "\n", "\n", - "N134974\n", + "N142620\n", "\n", - "ArithmeticAddCalculation (134974)\n", + "ArithmeticAddCalculation (142620)\n", "State: finished\n", "Exit Code: 0\n", "\n", - "\n", - "\n", - "N134971->N134974\n", + "\n", + "\n", + "N142617->N142620\n", "\n", "\n", "CALL_CALC\n", "add1\n", "\n", - "\n", + "\n", "\n", - "N134979\n", + "N142625\n", "\n", - "ArithmeticAddCalculation (134979)\n", + "ArithmeticAddCalculation (142625)\n", "State: finished\n", "Exit Code: 0\n", "\n", - "\n", - "\n", - "N134971->N134979\n", + "\n", + "\n", + "N142617->N142625\n", "\n", "\n", "CALL_CALC\n", @@ -141,111 +192,111 @@ "InstalledCode (37)\n", "add@localhost\n", "\n", - "\n", + "\n", "\n", - "N37->N134971\n", + "N37->N142617\n", "\n", "\n", "INPUT_WORK\n", - "wg__tasks__add2__properties__code__value\n", + "wg__tasks__add1__properties__code__value\n", "\n", - "\n", + "\n", "\n", - "N37->N134971\n", + "N37->N142617\n", "\n", "\n", "INPUT_WORK\n", - "wg__tasks__add1__properties__code__value\n", + "wg__tasks__add2__properties__code__value\n", "\n", - "\n", + "\n", "\n", - "N134975\n", + "N142621\n", "\n", - "RemoteData (134975)\n", + "RemoteData (142621)\n", "@localhost\n", "\n", - "\n", - "\n", - "N134974->N134975\n", + "\n", + "\n", + "N142620->N142621\n", "\n", "\n", "CREATE\n", "remote_folder\n", "\n", - "\n", + "\n", "\n", - "N134976\n", + "N142622\n", "\n", - "FolderData (134976)\n", + "FolderData (142622)\n", "\n", - "\n", - "\n", - "N134974->N134976\n", + "\n", + "\n", + "N142620->N142622\n", "\n", "\n", "CREATE\n", "retrieved\n", "\n", - "\n", + "\n", "\n", - "N134977\n", + "N142623\n", "\n", - "Int (134977)\n", + "Int (142623)\n", "\n", - "\n", - "\n", - "N134974->N134977\n", + "\n", + "\n", + "N142620->N142623\n", "\n", "\n", "CREATE\n", "sum\n", "\n", - "\n", - "\n", - "N134977->N134979\n", + "\n", + "\n", + "N142623->N142625\n", "\n", "\n", "INPUT_CALC\n", "y\n", "\n", - "\n", + "\n", "\n", - "N134980\n", + "N142626\n", "\n", - "RemoteData (134980)\n", + "RemoteData (142626)\n", "@localhost\n", "\n", - "\n", - "\n", - "N134979->N134980\n", + "\n", + "\n", + "N142625->N142626\n", "\n", "\n", "CREATE\n", "remote_folder\n", "\n", - "\n", + "\n", "\n", - "N134981\n", + "N142627\n", "\n", - "FolderData (134981)\n", + "FolderData (142627)\n", "\n", - "\n", - "\n", - "N134979->N134981\n", + "\n", + "\n", + "N142625->N142627\n", "\n", "\n", "CREATE\n", "retrieved\n", "\n", - "\n", + "\n", "\n", - "N134982\n", + "N142628\n", "\n", - "Int (134982)\n", + "Int (142628)\n", "\n", - "\n", + "\n", "\n", - "N134979->N134982\n", + "N142625->N142628\n", "\n", "\n", "CREATE\n", @@ -255,10 +306,10 @@ "\n" ], "text/plain": [ - "" + "" ] }, - "execution_count": 6, + "execution_count": 2, "metadata": {}, "output_type": "execute_result" } @@ -272,7 +323,14 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Conclusion" + "\n", + "## Checkpointing\n", + "\n", + "The Scheduler checkpoints its status to the database whenever a WorkGraph is updated, ensuring that the Scheduler can recover its state in case of a crash or restart. This feature is particularly useful for long-running WorkGraphs.\n", + "\n", + "## Conclusion\n", + "\n", + "The Scheduler offers a streamlined approach to managing complex workflows, significantly reducing active process counts and improving resource efficiency." ] } ], diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index f51a1320..c6308551 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -6,7 +6,7 @@ from aiida import orm -@pytest.skip("Skip for now") +@pytest.mark.skip("Skip for now") @pytest.mark.usefixtures("started_daemon_client") def test_scheduler(decorated_add: Callable, started_scheduler_client) -> None: """Test graph build.""" @@ -14,7 +14,6 @@ def test_scheduler(decorated_add: Callable, started_scheduler_client) -> None: add1 = wg.add_task(decorated_add, x=2, y=3) add2 = wg.add_task(decorated_add, "add2", x=3, y=add1.outputs["result"]) # use run to check if graph builder workgraph can be submit inside the engine - pk = get_scheduler() wg.submit(to_scheduler=True, wait=True) pk = get_scheduler() report = get_workchain_report(orm.load(pk), "REPORT")