Skip to content

Commit

Permalink
implemented contest mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
sjdv1982 committed Sep 12, 2023
1 parent fec365c commit 2cdbd88
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 29 deletions.
8 changes: 8 additions & 0 deletions TODO
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@


The database PUT "contest" method takes a checksum of a transformation that has been solved already. It is *moved* to the table of contested transformation, meaning that


Random things TODO regarding integration:

- Rip the entire communion protocol, too much trouble with websockets,
Expand All @@ -18,6 +23,9 @@ Random things TODO regarding integration:
After that, support transformation meta (see old jobless), progress/prelim, logs, hardcancel, etc.
Some kind of response-and-make-a-new-request? We really don't want websockets... UPDATE: use repeated PUTs instead
In general, assistant protocol cannot deal with streaming logs/results, would be nice if it could.
UPDATE: Make a mini-assistant that has meta but also env arguments.
***Dogma: env (recipe) != Transformation.__env__ (schema) !!!***
In addition, implement "contest" API for mini-assistant (tests/highlevel/contest.py can't work without).

Finally, rename imperative-communion and split it into a re-entrant version
(local=False, currently commented out), and the all-local version.
Expand Down
45 changes: 16 additions & 29 deletions seamless/core/cache/database_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,34 +60,7 @@ def connect(self, host, port):

self.active = True

def delete_key(self, key_type, checksum):
assert key_type in [
"buffer_info",
"compilation",
"transformation",
"elision",
"metadata",
"expression",
"structured_cell_join",
]

if not self.active:
return
url = "http://" + self.host + ":" + str(self.port)
request = {
"type": "delete_key",
"key_type": key_type,
"checksum": parse_checksum(checksum)
}
response = session.put(url, data=json.dumps(request))
if response.status_code != 200:
raise Exception((response.status_code, response.text))
return response.json() == True

def delete_syntactic_to_semantic(self, *, semantic, syntactic, celltype, subcelltype):
raise NotImplementedError

def send_put_request(self, request):
def send_put_request(self, request, *, raise_exception=True):
if not self.active:
return
url = "http://" + self.host + ":" + str(self.port)
Expand All @@ -96,7 +69,7 @@ def send_put_request(self, request):
else:
rqbuf = json.dumps(request)
response = session.put(url, data=rqbuf)
if response.status_code != 200:
if raise_exception and response.status_code != 200:
raise Exception((response.status_code, response.text))
return response

Expand Down Expand Up @@ -186,6 +159,20 @@ def set_structured_cell_join(self, checksum, join_dict: dict):
self._log("SET", request["type"], request["type"])
self.send_put_request(request)

def contest(self, transformation_checksum:bytes, result_checksum:bytes):
"""Contests a previously calculated transformation result"""
transformation_checksum = parse_checksum(transformation_checksum, as_bytes=False)
assert transformation_checksum is not None
result_checksum = parse_checksum(result_checksum, as_bytes=False)
assert result_checksum is not None
request = {
"type": "contest",
"checksum": transformation_checksum,
"result": result_checksum,
}
response = self.send_put_request(request, raise_exception=False)
return response.status_code, response.text

def send_get_request(self, request):
if not self.active:
return
Expand Down
30 changes: 30 additions & 0 deletions seamless/core/cache/transformation_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,36 @@ async def stop_loop(timeout):
asyncio.get_event_loop().run_until_complete(fut)
return fut.result()

def contest(self, transformation_checksum:bytes | str):
"""Contests a previously calculated transformation result"""
from seamless.util import parse_checksum
transformation_checksum = parse_checksum(transformation_checksum, as_bytes=True)
if transformation_checksum is None:
raise ValueError("transformation_checksum")

result_checksum, _ = self._get_transformation_result(transformation_checksum)
print("RES", result_checksum.hex())
result_checksum2 = self.known_transformations.pop(transformation_checksum, None)
assert result_checksum is None or result_checksum2 is None or (result_checksum == result_checksum2)

if transformation_checksum in self.transformation_results:
self.transformation_results.pop(transformation_checksum, None)
if result_checksum is not None:
buffer_cache.decref(result_checksum)
self.transformation_logs.pop(transformation_checksum, None)

if result_checksum2 is not None:
self.known_transformations_rev[result_checksum2].remove(transformation_checksum)
result_checksum = result_checksum2

if result_checksum is None:
raise RuntimeError("Unknown transformation result")
status, response = database.contest(transformation_checksum, result_checksum)
if status == 200:
return None
else:
return response

def destroy(self):
# only called when Seamless shuts down
a = self.transformer_to_transformations
Expand Down
25 changes: 25 additions & 0 deletions seamless/highlevel/Transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,31 @@ def cancel(self) -> None:
else:
tf.tf.hard_cancel()

def contest(self) -> str | None:
"""Contest the result of a finished transformer.
This may be useful in the case of non-reproducible transformers.
While the correct solution is to make them deterministic, this method
will allow repeated execution under various conditions, in order to
investigate the issue.
If the transformer has no associated transformation (e.g. undefined inputs)
or the transformation result is not known, an exception is raised.
Otherwise, if the database returns an error message, that is returned as string.
"""
from seamless.core.cache.transformation_cache import transformation_cache
if self._parent() is not None:
tcache = self._parent()._manager.cachemanager.transformation_cache
else:
tcache = transformation_cache
tf_checksum = self.get_transformation_checksum()
if tf_checksum is None:
raise RuntimeError("Transformer has no defined transformation")
return tcache.contest(tf_checksum)

@property
def self(self):
"""Returns a wrapper where the pins are not directly accessible.
Expand Down
47 changes: 47 additions & 0 deletions tests/highlevel/contest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os
import seamless

if "DELEGATE" in os.environ:
seamless.config.delegate()
else:
seamless.config.delegate(level=3)
from seamless.core.transformation import get_global_info
get_global_info() # avoid timing errors

from seamless.highlevel import Context

def func(a,b,c):
import time
time.sleep(3)
return (a+b) * c

ctx = Context()
ctx.tf = func
tf = ctx.tf
tf.a = 2
tf.b = 3
tf.c = 4
import time
t = time.time()
ctx.compute()
print("{:.1f} seconds".format(time.time() - t))
print(tf.result.value)

tf_checksum = tf.get_transformation_checksum()
print("Transformation:", tf_checksum)
result_checksum = tf.result.checksum
print("Result:", result_checksum)

error_msg = tf.contest()
print("Error:", error_msg)
print()

ctx.translate(force=True) # delete tf.result.checksum
t = time.time()
ctx.compute()
print("{:.1f} seconds".format(time.time() - t))
print(tf.result.value)

error_msg = tf.contest()
print("Error:", error_msg)
print()
1 change: 1 addition & 0 deletions tests/highlevel/test-list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ library-reassign.py
library-context-argument.py
library-in-library.py
library-schema.py
contest.py: requires delegation level 3. Can be run with full delegation using "export DELEGATE=''", but this requires the mini-assistant or better.
imperative.py
imperative-jupyter.sh
imperative-async.sh
Expand Down
13 changes: 13 additions & 0 deletions tests/highlevel/test-outputs/contest.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Waiting for: Seamless transformer: .tf.tf
3.1 seconds
<Silk: 20 >
Transformation: ce7f2f2038e94912b7f84ecdcd25284868a7bf017a57eb6decc4feac6b6c6521
Result: 8e843baef228089dc379d4c3b6e28c1bb5d44eee257f1206b5dfee44ef6b05ad
RES 8e843baef228089dc379d4c3b6e28c1bb5d44eee257f1206b5dfee44ef6b05ad
Error: None

1.0 seconds
<Silk: 20 >
RES 8e843baef228089dc379d4c3b6e28c1bb5d44eee257f1206b5dfee44ef6b05ad
Error: None

0 comments on commit 2cdbd88

Please sign in to comment.