From d919bb96fa384d1ffc3befcc13143dda58aada52 Mon Sep 17 00:00:00 2001 From: Sjoerd de Vries Date: Fri, 8 Sep 2023 12:21:49 +0200 Subject: [PATCH] direct transformers in workflow mode are working --- seamless/core/cache/transformation_cache.py | 8 ++ seamless/core/transformation.py | 1 - seamless/highlevel/__init__.py | 2 + seamless/highlevel/direct/Transformation.py | 97 ++++++++++++----- seamless/highlevel/direct/run.py | 19 ++-- tests/highlevel/imperative-workflow.py | 75 +++++++++++++ tests/highlevel/test-list.txt | 1 + .../test-outputs/imperative-workflow.out | 100 ++++++++++++++++++ 8 files changed, 266 insertions(+), 37 deletions(-) create mode 100644 tests/highlevel/imperative-workflow.py create mode 100644 tests/highlevel/test-outputs/imperative-workflow.out diff --git a/seamless/core/cache/transformation_cache.py b/seamless/core/cache/transformation_cache.py index e21e5c79..da0c48b5 100644 --- a/seamless/core/cache/transformation_cache.py +++ b/seamless/core/cache/transformation_cache.py @@ -13,6 +13,13 @@ def __str__(self): def log(*args, **kwargs): print(*args, **kwargs, file=sys.stderr) +def clear_future_exception(future): + # To avoid "Task exception was never retrieved" messages + try: + future.result() + except Exception: + pass + import datetime import json import ast @@ -1035,6 +1042,7 @@ async def incref_and_run(): coro = incref_and_run() fut = asyncio.ensure_future(coro) + fut.add_done_callback(clear_future_exception) last_result_checksum = None last_progress = None fut_done_time = None diff --git a/seamless/core/transformation.py b/seamless/core/transformation.py index 76f510d4..974bce1f 100644 --- a/seamless/core/transformation.py +++ b/seamless/core/transformation.py @@ -927,7 +927,6 @@ def fut_done(fut): _got_global_info = False def get_global_info(): - return ### from .cache.database_client import database global _got_global_info if _got_global_info: diff --git a/seamless/highlevel/__init__.py b/seamless/highlevel/__init__.py index 44f7b817..31d84256 100644 --- a/seamless/highlevel/__init__.py +++ b/seamless/highlevel/__init__.py @@ -129,6 +129,8 @@ def load_graph(graph, *, zip=None, cache_ctx=None, static=False, mounts=True, sh class Checksum: def __init__(self, checksum): from seamless import parse_checksum + if isinstance(checksum, Checksum): + checksum = checksum.value self.value = parse_checksum(checksum, as_bytes=False) def bytes(self) -> bytes | None: diff --git a/seamless/highlevel/direct/Transformation.py b/seamless/highlevel/direct/Transformation.py index c1ef5960..d0209371 100644 --- a/seamless/highlevel/direct/Transformation.py +++ b/seamless/highlevel/direct/Transformation.py @@ -20,7 +20,7 @@ def __init__(self, self._resolved = False self._evaluator_sync = evaluator_sync - self._evaluator_async = evaluator_async + self._evaluator_async = evaluator_async self._result_checksum = None self._evaluated = False self._exception = None @@ -34,7 +34,7 @@ def _resolve_sync(self): raise ValueError("Cannot obtain transformation checksum") self._transformation_checksum = tf_checksum except Exception: - self._exception = traceback.format_exc() + self._exception = traceback.format_exc(limit=0).strip("\n") + "\n" finally: self._resolved = True @@ -47,7 +47,7 @@ async def _resolve_async(self): raise ValueError("Cannot obtain transformation checksum") self._transformation_checksum = tf_checksum except Exception: - self._exception = traceback.format_exc() + self._exception = traceback.format_exc(limit=0).strip("\n") + "\n" finally: self._resolved = True @@ -65,7 +65,7 @@ def _evaluate_sync(self): Checksum(result_checksum) self._result_checksum = result_checksum except Exception: - self._exception = traceback.format_exc() + self._exception = traceback.format_exc(limit=0).strip("\n") + "\n" finally: self._evaluated = True @@ -83,52 +83,82 @@ async def _evaluate_async(self): Checksum(result_checksum) self._result_checksum = result_checksum except Exception: - self._exception = traceback.format_exc() + self._exception = traceback.format_exc(limit=0).strip("\n") + "\n" finally: self._evaluated = True def _run_dependencies(self): - for depname, dep in self._upstream_dependencies.items(): - dep.start() - for depname, dep in self._upstream_dependencies.items(): - dep.compute() - if dep.exception is not None: - msg = "Dependency '{}' has an exception: {}" - raise RuntimeError(msg.format(depname, dep.exception)) + try: + self.start() + for depname, dep in self._upstream_dependencies.items(): + dep.compute() + if dep.exception is not None: + msg = "Dependency '{}' has an exception:\n{}" + raise RuntimeError(msg.format(depname, dep.exception)) + except Exception: + self._exception = traceback.format_exc(limit=0).strip("\n") + "\n" async def _run_dependencies_async(self): tasks = {} for depname, dep in self._upstream_dependencies.items(): tasks[depname] = asyncio.get_event_loop().create_task(dep.computation()) if len(tasks): - await asyncio.gather(tasks.values(), return_exceptions=True) - for depname, _ in tasks.items(): - dep = self._upstream_dependencies[depname] - if dep.exception is not None: - msg = "Dependency '{}' has an exception: {}" - raise RuntimeError(msg.format(depname, dep.exception)) + await asyncio.gather(*tasks.values(), return_exceptions=True) + for depname, task in tasks.items(): + self._future_cleanup(task) + try: + for depname, _ in tasks.items(): + dep = self._upstream_dependencies[depname] + if dep.exception is not None: + msg = "Dependency '{}' has an exception:\n{}" + raise RuntimeError(msg.format(depname, dep.exception)) + except Exception: + self._exception = traceback.format_exc(limit=0).strip("\n") + "\n" def compute(self): - self._run_dependencies() - self._evaluate_sync() - if self._future is not None: - self._future.cancel() # redundant - self._future = None + if self._evaluated: + return + loop = asyncio.get_event_loop() + if not loop.is_running(): + self.start() + loop.run_until_complete(self._future) + else: + self._run_dependencies() + if self._exception is None: + self._evaluate_sync() + if self._future is not None: + self._future.cancel() # redundant + self._future = None + return self + + async def _computation(self): + await self._run_dependencies_async() + if self._exception is None: + await self._evaluate_async() return self async def computation(self): if self._future is not None: await self._future - self._future = None else: - await self._run_dependencies_async() - await self._evaluate_async() + await self._computation() return self + def _future_cleanup(self, fut): + # to avoid "Task exception was never retrieved" messages + # Is this currently triggered? + try: + fut.result() + except Exception: + pass + def start(self): + for depname, dep in self._upstream_dependencies.items(): + dep.start() if self._future is not None: return - self._future = asyncio.get_event_loop().create_task(self.computation()) + self._future = asyncio.get_event_loop().create_task(self._computation()) + self._future.add_done_callback(self._future_cleanup) def as_checksum(self): from .. import Checksum @@ -210,6 +240,19 @@ def clear_exception(self): self._evaluated = False + @property + def status(self): + try: + if self._exception is not None: + return "Status: exception" + if self._evaluated: + assert self._result_checksum is not None + return "Status: OK" + if self._future is not None: + return "Status: pending" + return "Status: ready" + except Exception: + return "Status: unknown exception" def transformation_from_dict(transformation_dict, result_celltype, upstream_dependencies = None) -> Transformation: from .run import run_transformation_dict, run_transformation_dict_async, prepare_transformation_dict from seamless.core.cache.transformation_cache import tf_get_buffer diff --git a/seamless/highlevel/direct/run.py b/seamless/highlevel/direct/run.py index 0af16d5a..58fca188 100644 --- a/seamless/highlevel/direct/run.py +++ b/seamless/highlevel/direct/run.py @@ -260,6 +260,8 @@ def prepare_transformation_dict(transformation_dict): from .. import Checksum from ...core.cache.buffer_remote import write_buffer as remote_write_buffer from ...core.cache.database_client import database + from .Transformation import Transformation + non_checksum_items = ("__output__", "__language__", "__meta__", "__env__") argnames = list(transformation_dict.keys()) @@ -275,6 +277,11 @@ def prepare_transformation_dict(transformation_dict): pass elif value is None: value = Checksum(value) + elif isinstance(value, Transformation): + assert value.status == "Status: OK", value.status # must have been checked before + checksum = value.checksum + assert checksum is not None # can't be true if status is OK + value = Checksum(checksum) else: if isinstance(value, bytes): buf = value @@ -440,10 +447,7 @@ def _direct_transformer_to_transformation_dict( raise ValueError(f"Argument {pinname} (Cell {value}) has no checksum available") v = Checksum(checksum) elif isinstance(value, Transformation): - assert value.status == "Status: OK" # must have been checked before - checksum = value.checksum - assert checksum is not None # can't be true if status is OK - v = Checksum(checksum) + v = value else: v = value @@ -491,7 +495,7 @@ def _get_node_transformation_dependencies(node): def _node_to_transformation_dict(node): # builds transformation dict from highlevel.Transformer node # - node must be unbound (inputs come from .TEMP and .checksum items). - # - Transformation dependencies inside .TEMP must have been resolved + # - Transformation dependencies inside .TEMP can be present # - The result transformation dict cannot be submitted directly, # it must still be prepared. @@ -608,10 +612,7 @@ def _node_to_transformation_dict(node): raise ValueError(f"Argument {pinname} (Cell {value}) has no checksum available") v = Checksum(checksum) elif isinstance(value, Transformation): - assert value.status == "Status: OK" # must have been checked before - checksum = value.checksum - assert checksum is not None # can't be true if status is OK - v = Checksum(checksum) + v = value else: v = value diff --git a/tests/highlevel/imperative-workflow.py b/tests/highlevel/imperative-workflow.py new file mode 100644 index 00000000..b23dcd0d --- /dev/null +++ b/tests/highlevel/imperative-workflow.py @@ -0,0 +1,75 @@ +from seamless import transformer + +@transformer(return_transformation=True) +def add(a, b): + import time + time.sleep(1) + return a + b + +@transformer(return_transformation=True) +def mul(a, b): + import time + time.sleep(1) + return a * b + +print(add(10,20).compute().value) +print(mul(10,20).compute().value) + +tfm1 = mul(8,9) +tfm2 = add(tfm1,4) +print(tfm2.compute().value) +print(tfm1.status, tfm1.exception) +print(tfm2.status, tfm2.exception) + +print(mul(13,add(2,2)).compute().value) + +print() +print("Error run 0") +tfm = add("zzz",80) +tfm.compute() +print(tfm.exception) +print() + +import time + +def run(p, q, x, y, z): + pq = add(p, q) + pq_z = mul(pq, z) + xy = mul(x, y) + xy_z = add(xy, z) + result = mul(xy_z, pq_z) + + result.compute() + if result.exception is not None: + msg = f"""Something went wrong. + +Status and exceptions: +pq: {pq.status}, {pq.exception} +pq_z: {pq_z.status}, {pq_z.exception} +xy: {xy.status}, {xy.exception} +xy_z: {xy_z.status}, {xy_z.exception} +result: {result.status}, {result.exception} +""" + raise RuntimeError(msg) + ret = result.value + return ret + +t = time.time() +result = run(2,3,4,5,6) # (2+3 * 6) * (4*5 + 6) = 30 * 26 = 780 +print("run() result", result) +print("{:.1f} seconds elapsed".format(time.time()-t)) # should be 3 seconds, rather than 5 + +print() +print("Error run 1") +import traceback +try: + run("p","q",4,"y",6) # errors in xy_z, propagating to result +except RuntimeError: + traceback.print_exc(1) +print() + +print("Error run 2") +try: + run("pp", "qq","x","y", "z") # errors in pq_z and xy, propagating to xy_z and result +except RuntimeError: + traceback.print_exc(1) diff --git a/tests/highlevel/test-list.txt b/tests/highlevel/test-list.txt index 20ae3b70..6a59656a 100644 --- a/tests/highlevel/test-list.txt +++ b/tests/highlevel/test-list.txt @@ -124,6 +124,7 @@ imperative-async.sh imperative-async-parallel.py: can be run with delegation using "export DELEGATE=''" imperative-async-parallel-jupyter.sh: can be run with delegation using "export DELEGATE=''" imperative-celltypes.py +imperative-workflow.py imperative-nested.py: requires delegation level 3. imperative-nested-jupyter.sh: requires delegation level 3. imperative-nested-async.sh: requires delegation level 3. diff --git a/tests/highlevel/test-outputs/imperative-workflow.out b/tests/highlevel/test-outputs/imperative-workflow.out new file mode 100644 index 00000000..a1ce9dd8 --- /dev/null +++ b/tests/highlevel/test-outputs/imperative-workflow.out @@ -0,0 +1,100 @@ +30 +200 +76 +Status: OK None +Status: OK None +52 + +Error run 0 +seamless.core.transformation.SeamlessTransformationError: Traceback (most recent call last): + File "transformer", line 7, in + result = add(a=a,b=b) + File "transformer", line 5, in add + return a + b +TypeError: can only concatenate str (not "int") to str +************************************************* +Execution time: 1.0 seconds + + +run() result 780 +3.0 seconds elapsed + +Error run 1 +Traceback (most recent call last): + File "/cwd/seamless/tests/highlevel/imperative-workflow.py", line 66, in + run("p","q",4,"y",6) # errors in xy_z, propagating to result +RuntimeError: Something went wrong. + +Status and exceptions: +pq: Status: OK, None +pq_z: Status: OK, None +xy: Status: OK, None +xy_z: Status: exception, seamless.core.transformation.SeamlessTransformationError: Traceback (most recent call last): + File "transformer", line 7, in + result = add(a=a,b=b) + File "transformer", line 5, in add + return a + b +TypeError: can only concatenate str (not "int") to str +************************************************* +Execution time: 1.0 seconds + +result: Status: exception, RuntimeError: Dependency 'a' has an exception: +seamless.core.transformation.SeamlessTransformationError: Traceback (most recent call last): + File "transformer", line 7, in + result = add(a=a,b=b) + File "transformer", line 5, in add + return a + b +TypeError: can only concatenate str (not "int") to str +************************************************* +Execution time: 1.0 seconds + + + +Error run 2 +Traceback (most recent call last): + File "/cwd/seamless/tests/highlevel/imperative-workflow.py", line 73, in + run("pp", "qq","x","y", "z") # errors in pq_z and xy, propagating to xy_z and result +RuntimeError: Something went wrong. + +Status and exceptions: +pq: Status: OK, None +pq_z: Status: exception, seamless.core.transformation.SeamlessTransformationError: Traceback (most recent call last): + File "transformer", line 7, in + result = mul(a=a,b=b) + File "transformer", line 5, in mul + return a * b +TypeError: can't multiply sequence by non-int of type 'str' +************************************************* +Execution time: 1.0 seconds + +xy: Status: exception, seamless.core.transformation.SeamlessTransformationError: Traceback (most recent call last): + File "transformer", line 7, in + result = mul(a=a,b=b) + File "transformer", line 5, in mul + return a * b +TypeError: can't multiply sequence by non-int of type 'str' +************************************************* +Execution time: 1.0 seconds + +xy_z: Status: exception, RuntimeError: Dependency 'a' has an exception: +seamless.core.transformation.SeamlessTransformationError: Traceback (most recent call last): + File "transformer", line 7, in + result = mul(a=a,b=b) + File "transformer", line 5, in mul + return a * b +TypeError: can't multiply sequence by non-int of type 'str' +************************************************* +Execution time: 1.0 seconds + +result: Status: exception, RuntimeError: Dependency 'a' has an exception: +RuntimeError: Dependency 'a' has an exception: +seamless.core.transformation.SeamlessTransformationError: Traceback (most recent call last): + File "transformer", line 7, in + result = mul(a=a,b=b) + File "transformer", line 5, in mul + return a * b +TypeError: can't multiply sequence by non-int of type 'str' +************************************************* +Execution time: 1.0 seconds + +