Skip to content

Commit

Permalink
direct transformers in workflow mode are working
Browse files Browse the repository at this point in the history
  • Loading branch information
sjdv1982 committed Sep 8, 2023
1 parent dbc8b10 commit d919bb9
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 37 deletions.
8 changes: 8 additions & 0 deletions seamless/core/cache/transformation_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ def __str__(self):
def log(*args, **kwargs):
print(*args, **kwargs, file=sys.stderr)

def clear_future_exception(future):
# To avoid "Task exception was never retrieved" messages
try:
future.result()
except Exception:
pass

import datetime
import json
import ast
Expand Down Expand Up @@ -1035,6 +1042,7 @@ async def incref_and_run():

coro = incref_and_run()
fut = asyncio.ensure_future(coro)
fut.add_done_callback(clear_future_exception)
last_result_checksum = None
last_progress = None
fut_done_time = None
Expand Down
1 change: 0 additions & 1 deletion seamless/core/transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,6 @@ def fut_done(fut):

_got_global_info = False
def get_global_info():
return ###
from .cache.database_client import database
global _got_global_info
if _got_global_info:
Expand Down
2 changes: 2 additions & 0 deletions seamless/highlevel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ def load_graph(graph, *, zip=None, cache_ctx=None, static=False, mounts=True, sh
class Checksum:
def __init__(self, checksum):
from seamless import parse_checksum
if isinstance(checksum, Checksum):
checksum = checksum.value
self.value = parse_checksum(checksum, as_bytes=False)

def bytes(self) -> bytes | None:
Expand Down
97 changes: 70 additions & 27 deletions seamless/highlevel/direct/Transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self,
self._resolved = False

self._evaluator_sync = evaluator_sync
self._evaluator_async = evaluator_async
self._evaluator_async = evaluator_async
self._result_checksum = None
self._evaluated = False
self._exception = None
Expand All @@ -34,7 +34,7 @@ def _resolve_sync(self):
raise ValueError("Cannot obtain transformation checksum")
self._transformation_checksum = tf_checksum
except Exception:
self._exception = traceback.format_exc()
self._exception = traceback.format_exc(limit=0).strip("\n") + "\n"
finally:
self._resolved = True

Expand All @@ -47,7 +47,7 @@ async def _resolve_async(self):
raise ValueError("Cannot obtain transformation checksum")
self._transformation_checksum = tf_checksum
except Exception:
self._exception = traceback.format_exc()
self._exception = traceback.format_exc(limit=0).strip("\n") + "\n"
finally:
self._resolved = True

Expand All @@ -65,7 +65,7 @@ def _evaluate_sync(self):
Checksum(result_checksum)
self._result_checksum = result_checksum
except Exception:
self._exception = traceback.format_exc()
self._exception = traceback.format_exc(limit=0).strip("\n") + "\n"
finally:
self._evaluated = True

Expand All @@ -83,52 +83,82 @@ async def _evaluate_async(self):
Checksum(result_checksum)
self._result_checksum = result_checksum
except Exception:
self._exception = traceback.format_exc()
self._exception = traceback.format_exc(limit=0).strip("\n") + "\n"
finally:
self._evaluated = True

def _run_dependencies(self):
for depname, dep in self._upstream_dependencies.items():
dep.start()
for depname, dep in self._upstream_dependencies.items():
dep.compute()
if dep.exception is not None:
msg = "Dependency '{}' has an exception: {}"
raise RuntimeError(msg.format(depname, dep.exception))
try:
self.start()
for depname, dep in self._upstream_dependencies.items():
dep.compute()
if dep.exception is not None:
msg = "Dependency '{}' has an exception:\n{}"
raise RuntimeError(msg.format(depname, dep.exception))
except Exception:
self._exception = traceback.format_exc(limit=0).strip("\n") + "\n"

async def _run_dependencies_async(self):
tasks = {}
for depname, dep in self._upstream_dependencies.items():
tasks[depname] = asyncio.get_event_loop().create_task(dep.computation())
if len(tasks):
await asyncio.gather(tasks.values(), return_exceptions=True)
for depname, _ in tasks.items():
dep = self._upstream_dependencies[depname]
if dep.exception is not None:
msg = "Dependency '{}' has an exception: {}"
raise RuntimeError(msg.format(depname, dep.exception))
await asyncio.gather(*tasks.values(), return_exceptions=True)
for depname, task in tasks.items():
self._future_cleanup(task)
try:
for depname, _ in tasks.items():
dep = self._upstream_dependencies[depname]
if dep.exception is not None:
msg = "Dependency '{}' has an exception:\n{}"
raise RuntimeError(msg.format(depname, dep.exception))
except Exception:
self._exception = traceback.format_exc(limit=0).strip("\n") + "\n"

def compute(self):
self._run_dependencies()
self._evaluate_sync()
if self._future is not None:
self._future.cancel() # redundant
self._future = None
if self._evaluated:
return
loop = asyncio.get_event_loop()
if not loop.is_running():
self.start()
loop.run_until_complete(self._future)
else:
self._run_dependencies()
if self._exception is None:
self._evaluate_sync()
if self._future is not None:
self._future.cancel() # redundant
self._future = None
return self

async def _computation(self):
await self._run_dependencies_async()
if self._exception is None:
await self._evaluate_async()
return self

async def computation(self):
if self._future is not None:
await self._future
self._future = None
else:
await self._run_dependencies_async()
await self._evaluate_async()
await self._computation()
return self

def _future_cleanup(self, fut):
# to avoid "Task exception was never retrieved" messages
# Is this currently triggered?
try:
fut.result()
except Exception:
pass

def start(self):
for depname, dep in self._upstream_dependencies.items():
dep.start()
if self._future is not None:
return
self._future = asyncio.get_event_loop().create_task(self.computation())
self._future = asyncio.get_event_loop().create_task(self._computation())
self._future.add_done_callback(self._future_cleanup)

def as_checksum(self):
from .. import Checksum
Expand Down Expand Up @@ -210,6 +240,19 @@ def clear_exception(self):
self._evaluated = False


@property
def status(self):
try:
if self._exception is not None:
return "Status: exception"
if self._evaluated:
assert self._result_checksum is not None
return "Status: OK"
if self._future is not None:
return "Status: pending"
return "Status: ready"
except Exception:
return "Status: unknown exception"
def transformation_from_dict(transformation_dict, result_celltype, upstream_dependencies = None) -> Transformation:
from .run import run_transformation_dict, run_transformation_dict_async, prepare_transformation_dict
from seamless.core.cache.transformation_cache import tf_get_buffer
Expand Down
19 changes: 10 additions & 9 deletions seamless/highlevel/direct/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ def prepare_transformation_dict(transformation_dict):
from .. import Checksum
from ...core.cache.buffer_remote import write_buffer as remote_write_buffer
from ...core.cache.database_client import database
from .Transformation import Transformation

non_checksum_items = ("__output__", "__language__", "__meta__", "__env__")

argnames = list(transformation_dict.keys())
Expand All @@ -275,6 +277,11 @@ def prepare_transformation_dict(transformation_dict):
pass
elif value is None:
value = Checksum(value)
elif isinstance(value, Transformation):
assert value.status == "Status: OK", value.status # must have been checked before
checksum = value.checksum
assert checksum is not None # can't be true if status is OK
value = Checksum(checksum)
else:
if isinstance(value, bytes):
buf = value
Expand Down Expand Up @@ -440,10 +447,7 @@ def _direct_transformer_to_transformation_dict(
raise ValueError(f"Argument {pinname} (Cell {value}) has no checksum available")
v = Checksum(checksum)
elif isinstance(value, Transformation):
assert value.status == "Status: OK" # must have been checked before
checksum = value.checksum
assert checksum is not None # can't be true if status is OK
v = Checksum(checksum)
v = value
else:
v = value

Expand Down Expand Up @@ -491,7 +495,7 @@ def _get_node_transformation_dependencies(node):
def _node_to_transformation_dict(node):
# builds transformation dict from highlevel.Transformer node
# - node must be unbound (inputs come from .TEMP and .checksum items).
# - Transformation dependencies inside .TEMP must have been resolved
# - Transformation dependencies inside .TEMP can be present
# - The result transformation dict cannot be submitted directly,
# it must still be prepared.

Expand Down Expand Up @@ -608,10 +612,7 @@ def _node_to_transformation_dict(node):
raise ValueError(f"Argument {pinname} (Cell {value}) has no checksum available")
v = Checksum(checksum)
elif isinstance(value, Transformation):
assert value.status == "Status: OK" # must have been checked before
checksum = value.checksum
assert checksum is not None # can't be true if status is OK
v = Checksum(checksum)
v = value
else:
v = value

Expand Down
75 changes: 75 additions & 0 deletions tests/highlevel/imperative-workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from seamless import transformer

@transformer(return_transformation=True)
def add(a, b):
import time
time.sleep(1)
return a + b

@transformer(return_transformation=True)
def mul(a, b):
import time
time.sleep(1)
return a * b

print(add(10,20).compute().value)
print(mul(10,20).compute().value)

tfm1 = mul(8,9)
tfm2 = add(tfm1,4)
print(tfm2.compute().value)
print(tfm1.status, tfm1.exception)
print(tfm2.status, tfm2.exception)

print(mul(13,add(2,2)).compute().value)

print()
print("Error run 0")
tfm = add("zzz",80)
tfm.compute()
print(tfm.exception)
print()

import time

def run(p, q, x, y, z):
pq = add(p, q)
pq_z = mul(pq, z)
xy = mul(x, y)
xy_z = add(xy, z)
result = mul(xy_z, pq_z)

result.compute()
if result.exception is not None:
msg = f"""Something went wrong.
Status and exceptions:
pq: {pq.status}, {pq.exception}
pq_z: {pq_z.status}, {pq_z.exception}
xy: {xy.status}, {xy.exception}
xy_z: {xy_z.status}, {xy_z.exception}
result: {result.status}, {result.exception}
"""
raise RuntimeError(msg)
ret = result.value
return ret

t = time.time()
result = run(2,3,4,5,6) # (2+3 * 6) * (4*5 + 6) = 30 * 26 = 780
print("run() result", result)
print("{:.1f} seconds elapsed".format(time.time()-t)) # should be 3 seconds, rather than 5

print()
print("Error run 1")
import traceback
try:
run("p","q",4,"y",6) # errors in xy_z, propagating to result
except RuntimeError:
traceback.print_exc(1)
print()

print("Error run 2")
try:
run("pp", "qq","x","y", "z") # errors in pq_z and xy, propagating to xy_z and result
except RuntimeError:
traceback.print_exc(1)
1 change: 1 addition & 0 deletions tests/highlevel/test-list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ imperative-async.sh
imperative-async-parallel.py: can be run with delegation using "export DELEGATE=''"
imperative-async-parallel-jupyter.sh: can be run with delegation using "export DELEGATE=''"
imperative-celltypes.py
imperative-workflow.py
imperative-nested.py: requires delegation level 3.
imperative-nested-jupyter.sh: requires delegation level 3.
imperative-nested-async.sh: requires delegation level 3.
Expand Down
Loading

0 comments on commit d919bb9

Please sign in to comment.