__env__ now removed from transformation checksums.
"metalike" has been renamed "tf_dunder" .
This reflects that it contains __meta__, __env__,
and some other double underscore methods
that complement transformations.
sjdv1982 committed Sep 12, 2023
1 parent 2cdbd88 commit b23ff17
Showing 8 changed files with 52 additions and 49 deletions.
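
For illustration, a minimal usage sketch of the renamed keyword argument. The checksum values below are placeholders, not from this commit; the dunder dict may carry __meta__, __env__, __compilers__ and __languages__, none of which affect the transformation checksum itself.

import seamless

# Placeholder values for illustration only; real checksums would come from an actual workflow.
tf_checksum = "<transformation checksum (hex)>"
env_checksum = "<environment description checksum (hex)>"

# The dunder dict (formerly "metalike") now also carries __env__.
tf_dunder = {
    "__meta__": {"local": False},
    "__env__": env_checksum,
}

result_checksum = seamless.run_transformation(
    tf_checksum, fingertip=False, tf_dunder=tf_dunder
)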
5 changes: 0 additions & 5 deletions TODO
@@ -1,8 +1,3 @@


The database PUT "contest" method takes a checksum of a transformation that has been solved already. It is *moved* to the table of contested transformation, meaning that


Random things TODO regarding integration:

- Rip the entire communion protocol, too much trouble with websockets,
8 changes: 4 additions & 4 deletions seamless/__init__.py
@@ -84,7 +84,7 @@ def activate_transformations():
from .core.cache.transformation_cache import transformation_cache
transformation_cache.active = True

def run_transformation(checksum, *, fingertip=False, metalike=None, new_event_loop=False):
def run_transformation(checksum, *, fingertip=False, tf_dunder=None, new_event_loop=False):
from seamless.util import is_forked
if running_in_jupyter and not new_event_loop:
raise RuntimeError("'run_transformation' cannot be called from within Jupyter. Use 'await run_transformation_async' instead")
@@ -100,13 +100,13 @@ def run_transformation(checksum, *, fingertip=False, metalike=None, new_event_lo

from .core.cache.transformation_cache import transformation_cache
checksum = parse_checksum(checksum, as_bytes=True)
return transformation_cache.run_transformation(checksum, fingertip=fingertip, metalike=metalike, new_event_loop=new_event_loop)
return transformation_cache.run_transformation(checksum, fingertip=fingertip, tf_dunder=tf_dunder, new_event_loop=new_event_loop)

async def run_transformation_async(checksum, *, fingertip, metalike=None):
async def run_transformation_async(checksum, *, fingertip, tf_dunder=None):
from .core.cache.transformation_cache import transformation_cache
checksum = parse_checksum(checksum, as_bytes=True)
transformation_cache.transformation_exceptions.pop(checksum, None)
return await transformation_cache.run_transformation_async(checksum,metalike=metalike, fingertip=fingertip)
return await transformation_cache.run_transformation_async(checksum,tf_dunder=tf_dunder, fingertip=fingertip)

_original_event_loop = asyncio.get_event_loop()
def check_original_event_loop():
5 changes: 3 additions & 2 deletions seamless/assistant_client.py
@@ -3,7 +3,7 @@
import atexit
#session = aiohttp.ClientSession()

async def run_job(checksum):
async def run_job(checksum, tf_dunder):
from . import parse_checksum
from .config import get_assistant
checksum = parse_checksum(checksum)
@@ -13,8 +13,9 @@ async def run_job(checksum):

# One session per request is really bad... but what can we do?
async with aiohttp.ClientSession() as session:
data={"checksum":checksum, "dunder":tf_dunder}
while 1:
async with session.put(assistant, data=checksum) as response:
async with session.put(assistant, json=data) as response:
content = await response.read()
if response.status == 202:
await asyncio.sleep(0.1)
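
For reference, a sketch of the request body as it now goes over the wire. The assistant URL below is a placeholder (the real one comes from seamless.config.get_assistant()); the payload shape follows the change above, i.e. the checksum plus the dunder dict.

import aiohttp

async def submit_job(checksum_hex, tf_dunder, assistant_url="http://localhost:5533"):
    # Same payload shape as run_job above; assistant_url is a placeholder default.
    payload = {"checksum": checksum_hex, "dunder": tf_dunder}
    async with aiohttp.ClientSession() as session:
        async with session.put(assistant_url, json=payload) as response:
            return response.status, await response.read()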
37 changes: 18 additions & 19 deletions seamless/core/cache/transformation_cache.py
@@ -127,16 +127,12 @@ def tf_get_buffer(transformation):
for k in transformation:
if k.isupper():
continue
if k in ("__compilers__", "__languages__", "__meta__"):
if k in ("__compilers__", "__languages__", "__meta__", "__env__"):
continue
v = transformation[k]
if k in ("__language__", "__output__", "__as__", "__format__"):
d[k] = v
continue
elif k == "__env__":
checksum = v
d[k] = checksum
continue
celltype, subcelltype, checksum = v
d[k] = celltype, subcelltype, checksum
buffer = json_dumps(d, as_bytes=True) + b"\n"
@@ -191,7 +187,11 @@ def __init__(self):
self.rev_transformation_jobs = {} # job-to-tf-checksum
self.job_progress = {}

self.transformer_to_transformations = {} # 1:1, transformations as tf-checksums
# 1:1, transformations as tf-checksums.
# Note that tf_checksum does not exactly correspond to the serialized transformation dict,
# (see Transformer.get_transformation_checksum)
self.transformer_to_transformations = {}

self.transformations_to_transformers = {} # 1:list, transformations as tf-checksums

self.remote_transformers = {}
@@ -404,7 +404,7 @@ async def incref_transformation(self, transformation, transformer, *, transforma
buffer_cache.incref(result_checksum, False)
incref_transformation(tf_checksum, tf_buffer, transformation)
else:
# Just to reflect updates in __meta__ etc.
# Just to reflect updates in __meta__, __env__ etc.
self.transformations[tf_checksum] = transformation

tf = self.transformations_to_transformers[tf_checksum]
@@ -450,7 +450,7 @@ async def incref_transformation(self, transformation, transformer, *, transforma
if result_checksum is not None:
if isinstance(transformer, Transformer):
#print("CACHE HIT", transformer, result_checksum.hex())
from ...metalevel.debugmount import debugmountmanager
from ...metalevel.debugmount import debugmountmanager
if debugmountmanager.is_mounted(transformer):
debugmountmanager.debug_result(transformer, result_checksum)
return
@@ -992,7 +992,7 @@ def hard_cancel(self, transformer=None, *, tf_checksum=None):
return
self._hard_cancel(job)

async def run_transformation_async(self, tf_checksum, *, fingertip, metalike=None):
async def run_transformation_async(self, tf_checksum, *, fingertip, tf_dunder=None):
from . import CacheMissError

result_checksum, prelim = self._get_transformation_result(tf_checksum)
@@ -1003,11 +1003,11 @@ async def run_transformation_async(self, tf_checksum, *, fingertip, metalike=Non
transformation = await self.serve_get_transformation(tf_checksum, None)
if transformation is None:
raise CacheMissError(tf_checksum.hex())
if metalike is not None:
if tf_dunder is not None:
transformation = transformation.copy()
for k in ("__compilers__", "__languages__", "__meta__"):
if k in metalike:
transformation[k] = metalike[k]
for k in ("__compilers__", "__languages__", "__meta__", "__env__"):
if k in tf_dunder:
transformation[k] = tf_dunder[k]
for k,v in transformation.items():
if k in ("__language__", "__output__", "__as__",):
continue
@@ -1026,7 +1026,7 @@ async def run_transformation_async(self, tf_checksum, *, fingertip, metalike=Non
transformer = DummyTransformer(tf_checksum)
async def incref_and_run():
result = await self.incref_transformation(
transformation, transformer,
transformation, transformer,
transformation_build_exception=None
)
if result is not None:
@@ -1075,7 +1075,7 @@ async def incref_and_run():
self.register_known_transformation(tf_checksum, result_checksum)
return result_checksum

def run_transformation(self, tf_checksum, *, fingertip, metalike=None, new_event_loop=False):
def run_transformation(self, tf_checksum, *, fingertip, tf_dunder=None, new_event_loop=False):
event_loop = asyncio.get_event_loop()
if event_loop.is_running() or new_event_loop:
# To support run_transformation inside transformer code
@@ -1085,7 +1085,7 @@ def run_transformation(self, tf_checksum, *, fingertip, metalike=None, new_event
# Therefore, we can't update the Seamless workflow graph, but we shouldn't have to
# The use case is essentially: using the functional style under Jupyter
def func():
coro = self.run_transformation_async(tf_checksum, fingertip=fingertip, metalike=metalike)
coro = self.run_transformation_async(tf_checksum, fingertip=fingertip, tf_dunder=tf_dunder)
# The following hangs, even for a "dummy" coroutine:
# future = asyncio.run_coroutine_threadsafe(coro, event_loop)
# return future.result()
@@ -1110,7 +1110,7 @@ async def stop_loop(timeout):
await asyncio.sleep(0.01)
try:
return loop.run_until_complete(
self.run_transformation_async(tf_checksum, fingertip=fingertip, metalike=metalike)
self.run_transformation_async(tf_checksum, fingertip=fingertip, tf_dunder=tf_dunder)
)
finally:
for task in asyncio.all_tasks(loop):
@@ -1119,7 +1119,7 @@ async def stop_loop(timeout):

else:
fut = asyncio.ensure_future(
self.run_transformation_async(tf_checksum, fingertip=fingertip, metalike=metalike)
self.run_transformation_async(tf_checksum, fingertip=fingertip, tf_dunder=tf_dunder)
)
asyncio.get_event_loop().run_until_complete(fut)
return fut.result()
@@ -1132,7 +1132,6 @@ def contest(self, transformation_checksum:bytes | str):
raise ValueError("transformation_checksum")

result_checksum, _ = self._get_transformation_result(transformation_checksum)
print("RES", result_checksum.hex())
result_checksum2 = self.known_transformations.pop(transformation_checksum, None)
assert result_checksum is None or result_checksum2 is None or (result_checksum == result_checksum2)

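
As a consequence of the tf_get_buffer change in this file, two transformations that differ only in __env__ now yield the same buffer and hence the same transformation checksum. A paraphrased sketch of that effect, using plain json/hashlib rather than Seamless internals and placeholder dict contents:

import hashlib
import json

def tf_buffer_sketch(transformation):
    # Simplified mirror of tf_get_buffer after this commit: __compilers__,
    # __languages__, __meta__ and now also __env__ are skipped.
    skipped = {"__compilers__", "__languages__", "__meta__", "__env__"}
    d = {k: v for k, v in transformation.items() if k not in skipped}
    return (json.dumps(d, sort_keys=True) + "\n").encode()

tf_a = {"__language__": "python", "code": "<code checksum>", "__env__": "<env checksum 1>"}
tf_b = {"__language__": "python", "code": "<code checksum>", "__env__": "<env checksum 2>"}

# Identical buffers, identical checksums: __env__ no longer matters.
assert hashlib.sha3_256(tf_buffer_sketch(tf_a)).digest() == \
       hashlib.sha3_256(tf_buffer_sketch(tf_b)).digest()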
22 changes: 11 additions & 11 deletions seamless/core/direct/run.py
@@ -144,17 +144,17 @@ def run_transformation_dict(transformation_dict, fingertip):
transformation
)
if result_checksum is not None and not prelim:
metalike, syntactic_cache = None, []
tf_dunder, syntactic_cache = None, []
else:
assert _parent_process_queue is not None
metalike = {}
for k in ("__compilers__", "__languages__", "__meta__"):
tf_dunder = {}
for k in ("__compilers__", "__languages__", "__meta__", "__env__"):
if k in transformation_dict:
metalike[k] = transformation_dict[k]
meta = metalike.get("__meta__")
tf_dunder[k] = transformation_dict[k]
meta = tf_dunder.get("__meta__")
if meta is None:
meta = {}
metalike["__meta__"] = meta
tf_dunder["__meta__"] = meta
if meta.get("local") is None:
# local (fat) by default
meta["local"] = True
@@ -173,7 +173,7 @@ def run_transformation_dict(transformation_dict, fingertip):
assert syn_buffer is not None
syntactic_cache.append((celltype, subcelltype, syn_buffer))
else:
metalike, syntactic_cache = None, []
tf_dunder, syntactic_cache = None, []

result = None
def result_callback(result2):
@@ -185,7 +185,7 @@ def result_callback(result2):
result_callback,
transformation.hex(),
transformation_dict,
metalike,
tf_dunder,
syntactic_cache,
increfed,
fingertip
@@ -634,7 +634,7 @@ def _wait():
result_callback,
transformation,
transformation_dict,
metalike,
tf_dunder,
syntactic_cache,
increfed,
fingertip
@@ -643,13 +643,13 @@ def _wait():
#print(f"Delegate to parent: {transformation}, fingertip = {fingertip}, stack = {TRANSFORMATION_STACK}")
assert transformation not in TRANSFORMATION_STACK
_parent_process_queue.put(
(7, (transformation, metalike, syntactic_cache, fingertip))
(7, (transformation, tf_dunder, syntactic_cache, fingertip))
)
for (
result_callback,
transformation,
transformation_dict,
metalike,
tf_dunder,
syntactic_cache,
increfed,
fingertip
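
A condensed sketch of the dunder assembly performed above in run_transformation_dict: the four dunder keys are copied from the transformation dict, and __meta__["local"] defaults to True (local, "fat" execution) when unset. The function name is illustrative, not part of the module:

def collect_tf_dunder(transformation_dict):
    # Copy the dunder keys that travel alongside the transformation.
    tf_dunder = {}
    for k in ("__compilers__", "__languages__", "__meta__", "__env__"):
        if k in transformation_dict:
            tf_dunder[k] = transformation_dict[k]
    # Local ("fat") by default, unless __meta__ explicitly says otherwise.
    meta = dict(tf_dunder.get("__meta__") or {})
    if meta.get("local") is None:
        meta["local"] = True
    tf_dunder["__meta__"] = meta
    return tf_dunder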
15 changes: 12 additions & 3 deletions seamless/core/transformation.py
@@ -432,8 +432,17 @@ async def _execute_remote(self,
prelim_callback, progress_callback
):
from seamless.assistant_client import run_job
tf_dunder = {}
tf = self.transformation
for k in ("__compilers__", "__languages__", "__meta__", "__env__"):
if k in tf:
tf_dunder[k] = tf[k]
if tf_dunder.get("__meta__", {}).get("local", OverflowError) != OverflowError:
meta = tf_dunder["__meta__"].copy()
meta.pop("local")
tf_dunder["__meta__"] = meta
try:
result = await run_job(self.checksum)
result = await run_job(self.checksum, tf_dunder)
except RuntimeError as exc:
raise RemoteJobError(str(exc)) from None
if result is None:
@@ -753,7 +762,7 @@ async def get_result_checksum(result_buffer):
raise Exception("Unknown return message '{}'".format(msg))
elif status == 7:
# run_transformation
tf_checksum, metalike, syntactic_cache, fingertip = msg
tf_checksum, tf_dunder, syntactic_cache, fingertip = msg
assert not is_forked()
for celltype, subcelltype, buf in syntactic_cache:
# TODO: create a transformation_cache method and invoke it, common with other code
@@ -775,7 +784,7 @@
buffer_cache.decref(syn_checksum)
print_info(f"Nested local transformation job`: {tf_checksum}, forked = {is_forked()}")
fut = asyncio.ensure_future(
run_transformation_async(tf_checksum, metalike=metalike, fingertip=fingertip)
run_transformation_async(tf_checksum, tf_dunder=tf_dunder, fingertip=fingertip)
)
def fut_done(fut):
print_info(f"Finished nested local transformation job: {tf_checksum}")
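
A condensed sketch of the dunder preparation in _execute_remote above: the dunder keys are copied from the transformation, and any __meta__["local"] entry (the OverflowError sentinel in the original simply tests for the key's presence) is dropped before the job is handed to the assistant. The function name is illustrative:

def dunder_for_remote(transformation):
    tf_dunder = {
        k: transformation[k]
        for k in ("__compilers__", "__languages__", "__meta__", "__env__")
        if k in transformation
    }
    meta = tf_dunder.get("__meta__")
    if meta is not None and "local" in meta:
        # Copy before mutating, so the original transformation dict stays untouched.
        meta = dict(meta)
        del meta["local"]
        tf_dunder["__meta__"] = meta
    return tf_dunder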
6 changes: 3 additions & 3 deletions seamless/highlevel/Transformer.py
@@ -1084,13 +1084,13 @@ def get_transformation_checksum(self) -> Optional[str]:
In addition, it may contain the following special keys:
- __output__: the name (usually "result") and (sub)celltype of the output pin
If it has a hash pattern, this is appended as the fourth element.
- __env__: the checksum of the environment description
- __as__: a dictionary of pin-to-variable renames (pins.pinname.as_ attribute)
- __format__: a dictionary that contains deepcell and filesystem attributes
Finally, it may contain additional information that is not reflected
in this checksum:
- __env__: the checksum of the environment description
- __meta__: meta information (Transformer.meta).
- __compilers__: context-wide compiler definitions.
- __languages__: context-wide language definition.
@@ -1099,11 +1099,11 @@ def get_transformation_checksum(self) -> Optional[str]:
`seamless.run_transformation(checksum)`.
`ctx.resolve(checksum, "plain")` will return the transformation dict,
minus __meta__, __compilers__ and __languages__. The checksum is
minus __env__, __meta__, __compilers__ and __languages__. The checksum is
treated like any other buffer, i.e. including database, assistant etc.
With Transformation.get_transformation_dict(), you can obtain the full transformation dict,
including __meta__, __compilers__ and __languages__.
including __env__, __meta__, __compilers__ and __languages__.
"""
_ = self._get_parent2()
htf = self._get_htf()
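
A short usage sketch of the behaviour documented above. It assumes the usual high-level workflow setup and a configured database/buffer store, so that the transformation checksum can later be resolved and re-run:

from seamless.highlevel import Context
import seamless

ctx = Context()

def add(a, b):
    return a + b

ctx.add = add            # becomes a Transformer
ctx.add.a = 2
ctx.add.b = 3
ctx.compute()

checksum = ctx.add.get_transformation_checksum()

# The resolved buffer is the transformation dict minus __env__, __meta__,
# __compilers__ and __languages__ (per the docstring above).
transformation_dict = ctx.resolve(checksum, "plain")

# The checksum alone is enough to re-run the transformation, e.g. elsewhere.
result_checksum = seamless.run_transformation(checksum)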
3 changes: 1 addition & 2 deletions tests/highlevel/environment4.seamless
@@ -1,5 +1,5 @@
{
"__seamless__": "0.8",
"__seamless__": "0.11",
"connections": [],
"lib": [],
"nodes": [
@@ -12,7 +12,6 @@
"input": "d0a1b2af1705c1b8495b00145082ef7470384e62ac1c4d9b9cdbbe0476c28f8c",
"input_auth": "d0a1b2af1705c1b8495b00145082ef7470384e62ac1c4d9b9cdbbe0476c28f8c",
"input_buffer": "d0a1b2af1705c1b8495b00145082ef7470384e62ac1c4d9b9cdbbe0476c28f8c",
"result": "fa2fe6c9c0556871073be9a00d6d29bd3b9b6dd560587ee6e8c163755bf669d3",
"result_schema": "d0a1b2af1705c1b8495b00145082ef7470384e62ac1c4d9b9cdbbe0476c28f8c",
"schema": "d0a1b2af1705c1b8495b00145082ef7470384e62ac1c4d9b9cdbbe0476c28f8c"
},
