Skip to content

Commit

Permalink
fix: yield from handle timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
0xArdi committed Aug 17, 2023
1 parent d2b46ac commit 796a7d1
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 46 deletions.
10 changes: 5 additions & 5 deletions packages/packages.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
"dev": {
"connection/valory/websocket_client/0.1.0": "bafybeicz53kzs5uvyiod2azntl76zwgmpgr22ven4wl5fnwt2m546j3wsu",
"skill/valory/contract_subscription/0.1.0": "bafybeif3hkpgbzuoxsbqxnd752qkvk3onytltrufnyrphnqbi62si4mdhy",
"agent/valory/mech/0.1.0": "bafybeidqhanaygnuv437dvo27xnsx46seqmwbcnsvbnbhdcvrcyxghxdiu",
"skill/valory/multiplexer_abci/0.1.0": "bafybeibjvmrz5r3qgynkpuawwzwmesc5ozmnvs4xykbybo6zno7qpaqg4m",
"skill/valory/task_execution_abci/0.1.0": "bafybeigjibaukyjtza5plzs7voa3oppfz2bj7u7uwxy7rlzzvsseqarihu",
"skill/valory/mech_abci/0.1.0": "bafybeifktss75xgzasqvitwu2ytbzpomy7uaai4fc2hg7koygdjcrk4uci",
"agent/valory/mech/0.1.0": "bafybeihfvjudvxtexjiesqr7qji5jpnsj3yidmj4xnu2qyz3ckcmomtgvq",
"skill/valory/multiplexer_abci/0.1.0": "bafybeieyjeaej6isimm2mh4bmn3q3hjnwpcksus7dhxxif73de5ltspxmi",
"skill/valory/task_execution_abci/0.1.0": "bafybeicgkksijhoffbsot5wbvyskzr3djqocvaewkpqqvch6tgsfhn2b74",
"skill/valory/mech_abci/0.1.0": "bafybeieycyiex6nnef3cd4g7kvaadbu3iduu5djqoydmsdiaovtrngmw7i",
"contract/valory/agent_mech/0.1.0": "bafybeiasgzwum4kypi5d4ubldvj7ihg6tyvpnvt5gyqbx3jdsku7b3gxgy",
"service/valory/mech/0.1.0": "bafybeibj7u7knm3kyjzxa4azcy7hk2r3wq5qniorp4v7f6xrwepm4sdjrm",
"service/valory/mech/0.1.0": "bafybeiayndq6u3ntpkfrh2ftau26wvrxf2wy66rfhtij64zxko7rtpwdze",
"protocol/valory/acn_data_share/0.1.0": "bafybeieyixetwvz767zekhvg7r6etumyanzys6xbalx2brrfswybinnlhi",
"protocol/valory/default/1.0.0": "bafybeiecmut3235aen7wxukllv424f3dysvvlgfmn562kzdunc5hdj3hxu"
},
Expand Down
6 changes: 3 additions & 3 deletions packages/valory/agents/mech/aea-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ skills:
- valory/abstract_abci:0.1.0:bafybeih3bwx5apteinnoxts7sqmjlskntdbo7vvnmdbs5noo2pv76by7fu
- valory/abstract_round_abci:0.1.0:bafybeibqpzbklnljvtc67yon4ciijxj75d7vazm7rurcvbbfnk6jtudukm
- valory/contract_subscription:0.1.0:bafybeif3hkpgbzuoxsbqxnd752qkvk3onytltrufnyrphnqbi62si4mdhy
- valory/mech_abci:0.1.0:bafybeifktss75xgzasqvitwu2ytbzpomy7uaai4fc2hg7koygdjcrk4uci
- valory/multiplexer_abci:0.1.0:bafybeibjvmrz5r3qgynkpuawwzwmesc5ozmnvs4xykbybo6zno7qpaqg4m
- valory/mech_abci:0.1.0:bafybeieycyiex6nnef3cd4g7kvaadbu3iduu5djqoydmsdiaovtrngmw7i
- valory/multiplexer_abci:0.1.0:bafybeieyjeaej6isimm2mh4bmn3q3hjnwpcksus7dhxxif73de5ltspxmi
- valory/registration_abci:0.1.0:bafybeia25gpusnkakb2dp4heqkwtuftbc2apppq3i4bds6sphltsovgzvi
- valory/reset_pause_abci:0.1.0:bafybeigzvwbzktclahjbsyiwqnj6poree4iveon5pric6s5ixb6wrhkdhq
- valory/task_execution_abci:0.1.0:bafybeigjibaukyjtza5plzs7voa3oppfz2bj7u7uwxy7rlzzvsseqarihu
- valory/task_execution_abci:0.1.0:bafybeicgkksijhoffbsot5wbvyskzr3djqocvaewkpqqvch6tgsfhn2b74
- valory/termination_abci:0.1.0:bafybeibcdgnarxyyqexncpfewcemraryywtwueuv7qthsjuean5l77lp2e
- valory/transaction_settlement_abci:0.1.0:bafybeiepus7qsa47gt7dyk32gaqsoae6whjoxfnplttulxrvmcauyerrdm
default_ledger: ethereum
Expand Down
10 changes: 5 additions & 5 deletions packages/valory/services/mech/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license: Apache-2.0
fingerprint:
README.md: bafybeif7ia4jdlazy6745ke2k2x5yoqlwsgwr6sbztbgqtwvs3ndm2p7ba
fingerprint_ignore_patterns: []
agent: valory/mech:0.1.0:bafybeidqhanaygnuv437dvo27xnsx46seqmwbcnsvbnbhdcvrcyxghxdiu
agent: valory/mech:0.1.0:bafybeihfvjudvxtexjiesqr7qji5jpnsj3yidmj4xnu2qyz3ckcmomtgvq
number_of_agents: 4
deployment:
agent:
Expand Down Expand Up @@ -87,7 +87,7 @@ type: skill
validate_timeout: 1205
agent_mech_contract_address: ${AGENT_MECH_CONTRACT_ADDRESS:str:0xFf82123dFB52ab75C417195c5fDB87630145ae81}
reset_period_count: ${RESET_PERIOD_COUNT:int:1000}
task_deadline: ${TASK_DEADLINE:int:240.0}
task_deadline: ${TASK_DEADLINE:float:240.0}
file_hash_to_tools_json: ${FILE_HASH_TO_TOOLS:list:[]}
api_keys_json: ${API_KEYS:list:[]}
1:
Expand Down Expand Up @@ -125,7 +125,7 @@ type: skill
polling_interval: ${POLLING_INTERVAL:int:25}
tendermint_com_url: ${TENDERMINT_COM_URL:str:http://localhost:8080}
tendermint_max_retries: 5
task_deadline: ${TASK_DEADLINE:int:240.0}
task_deadline: ${TASK_DEADLINE:float:240.0}
tendermint_url: ${TENDERMINT_URL:str:http://localhost:26657}
tendermint_p2p_url: ${TM_P2P_ENDPOINT_NODE_1:str:node0:26656}
termination_sleep: ${TERMINATION_SLEEP:int:900}
Expand Down Expand Up @@ -159,7 +159,7 @@ type: skill
retry_timeout: 3
reset_pause_duration: ${RESET_PAUSE_DURATION:int:10}
request_retry_delay: 1.0
task_deadline: ${TASK_DEADLINE:int:240.0}
task_deadline: ${TASK_DEADLINE:float:240.0}
request_timeout: 10.0
round_timeout_seconds: ${ROUND_TIMEOUT:float:150.0}
service_id: mech
Expand Down Expand Up @@ -197,7 +197,7 @@ type: skill
keeper_allowed_retries: 3
keeper_timeout: 30.0
max_attempts: 10
task_deadline: ${TASK_DEADLINE:int:240.0}
task_deadline: ${TASK_DEADLINE:float:240.0}
max_healthcheck: 120
multisend_address: ${MULTISEND_ADDRESS:str:0xA238CBeb142c10Ef7Ad8442C6D1f9E89e07e7761}
on_chain_service_id: ${ON_CHAIN_SERVICE_ID:int:null}
Expand Down
4 changes: 2 additions & 2 deletions packages/valory/skills/mech_abci/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ contracts: []
protocols: []
skills:
- valory/abstract_round_abci:0.1.0:bafybeibqpzbklnljvtc67yon4ciijxj75d7vazm7rurcvbbfnk6jtudukm
- valory/multiplexer_abci:0.1.0:bafybeibjvmrz5r3qgynkpuawwzwmesc5ozmnvs4xykbybo6zno7qpaqg4m
- valory/multiplexer_abci:0.1.0:bafybeieyjeaej6isimm2mh4bmn3q3hjnwpcksus7dhxxif73de5ltspxmi
- valory/registration_abci:0.1.0:bafybeia25gpusnkakb2dp4heqkwtuftbc2apppq3i4bds6sphltsovgzvi
- valory/reset_pause_abci:0.1.0:bafybeigzvwbzktclahjbsyiwqnj6poree4iveon5pric6s5ixb6wrhkdhq
- valory/task_execution_abci:0.1.0:bafybeigjibaukyjtza5plzs7voa3oppfz2bj7u7uwxy7rlzzvsseqarihu
- valory/task_execution_abci:0.1.0:bafybeicgkksijhoffbsot5wbvyskzr3djqocvaewkpqqvch6tgsfhn2b74
- valory/termination_abci:0.1.0:bafybeibcdgnarxyyqexncpfewcemraryywtwueuv7qthsjuean5l77lp2e
- valory/transaction_settlement_abci:0.1.0:bafybeiepus7qsa47gt7dyk32gaqsoae6whjoxfnplttulxrvmcauyerrdm
behaviours:
Expand Down
15 changes: 15 additions & 0 deletions packages/valory/skills/multiplexer_abci/behaviours.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@
class MultiplexerBaseBehaviour(BaseBehaviour, ABC):
"""Base behaviour for the multiplexer_abci skill."""

def _AsyncBehaviour__handle_waiting_for_message(self) -> None:
"""Handle an 'act' tick, when waiting for a message."""
# if there is no message coming, skip.
if self._AsyncBehaviour__notified: # type: ignore
try:
self._AsyncBehaviour__get_generator_act().send(
self._AsyncBehaviour__message # type: ignore
)
except StopIteration:
self._AsyncBehaviour__handle_stop_iteration()
finally:
# wait for the next message
self._AsyncBehaviour__notified = False
self._AsyncBehaviour__message = None

@property
def synchronized_data(self) -> SynchronizedData:
"""Return the synchronized data."""
Expand Down
2 changes: 1 addition & 1 deletion packages/valory/skills/multiplexer_abci/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license: Apache-2.0
aea_version: '>=1.0.0, <2.0.0'
fingerprint:
__init__.py: bafybeifx5c6xdzvj5v6old2ek56fek6zapsfuxgdiokpacjp57td3wbalm
behaviours.py: bafybeibevecu5s3ewnq2btswcsnwqfslw5hz5spkkjtnkggclqa23h222a
behaviours.py: bafybeifc2uzztsh7hy53r7gguxwmz3hlnxxddvrwdcw3i6icohaep2t6hy
dialogues.py: bafybeie777tjh4xvxo5rrig4kq66vxg5vvmyie576ptot43olwrzfrc64a
fsm_specification.yaml: bafybeibmbpdgq7h6sgaxtdb2aawha5xdwd6oszbn3nwr2tolaijoswkfly
handlers.py: bafybeic77bbhvm7yqhimzosdjnmocy4gm677t3f4gp73c5dll5qhn7xeqy
Expand Down
60 changes: 31 additions & 29 deletions packages/valory/skills/task_execution_abci/behaviours.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@
class TaskExecutionBaseBehaviour(BaseBehaviour, ABC):
"""Base behaviour for the task_execution_abci skill."""

def _AsyncBehaviour__handle_waiting_for_message(self) -> None:
"""Handle an 'act' tick, when waiting for a message."""
# if there is no message coming, skip.
if self._AsyncBehaviour__notified: # type: ignore
try:
self._AsyncBehaviour__get_generator_act().send(
self._AsyncBehaviour__message # type: ignore
)
except StopIteration:
self._AsyncBehaviour__handle_stop_iteration()
finally:
# wait for the next message
self._AsyncBehaviour__notified = False
self._AsyncBehaviour__message = None

@property
def synchronized_data(self) -> SynchronizedData:
"""Return the synchronized data."""
Expand All @@ -101,23 +116,6 @@ def __init__(self, **kwargs: Any) -> None:
self._task_deadline: Optional[float] = None
self._processing_task: Optional[Dict[str, Any]] = None

def _AsyncBehaviour__handle_waiting_for_message(self) -> None:
"""Handle an 'act' tick, when waiting for a message."""
# if there is no message coming, skip.
if self._AsyncBehaviour__notified: # type: ignore
try:
self._AsyncBehaviour__get_generator_act().send(
self._AsyncBehaviour__message # type: ignore
)
except StopIteration:
self._AsyncBehaviour__handle_stop_iteration()
finally:
# wait for the next message
self._AsyncBehaviour__notified = False
self._AsyncBehaviour__message = None
else:
self._AsyncBehaviour__get_generator_act().send(None)

def async_act(self) -> Generator: # pylint: disable=R0914,R0915
"""Do the act, supporting asynchronous execution."""

Expand All @@ -144,7 +142,7 @@ def async_act(self) -> Generator: # pylint: disable=R0914,R0915
return
except TimeoutError:
# the task was not ready in time
self._handle_timeout()
yield from self._handle_timeout()
return
payload_content = yield from self.get_payload_content(task_result)
sender = self.context.agent_address
Expand Down Expand Up @@ -265,8 +263,11 @@ def get_task_result( # pylint: disable=R0914,R1710

# Handle unfinished task
if not self._invalid_request and not self._async_result.ready():
self.context.logger.debug("The task is not finished yet.")
if time.time() > self._task_deadline:
time_to_timeout = cast(float, self._task_deadline) - time.time()
self.context.logger.info(
f"Waiting for the task to finish. " f"Timeout in: {time_to_timeout}s."
)
if time_to_timeout <= 0:
raise TimeoutError("The task is not finished in the deadline.")
yield from self.sleep(self.params.sleep_time)
return None
Expand Down Expand Up @@ -479,19 +480,20 @@ def _get_deliver_tx(
"data": data,
}

def _handle_timeout(self) -> None:
def _handle_timeout(self) -> Generator[None, None, None]:
"""Handle a timeout."""
# append to the end of the queue
self.context.logger.info("Task timed out. Re-adding to the queue.")
self.context.shared_state.get("pending_tasks").append(self._processing_task)
# reset the state
self._async_result: Optional[AsyncResult] = None
self.request_id = None
self._is_task_prepared = False
self._invalid_request = False
self._task_deadline: Optional[float] = None
self._processing_task: Optional[Dict[str, Any]] = None
self._task_deadline: Optional[float] = None
self._processing_task: Optional[Dict[str, Any]] = None
self._async_result: Optional[AsyncResult] = None # type: ignore
self.request_id = None # type: ignore
self._is_task_prepared = False # type: ignore
self._invalid_request = False # type: ignore
self._task_deadline: Optional[float] = None # type: ignore
self._processing_task: Optional[Dict[str, Any]] = None # type: ignore
self._task_deadline: Optional[float] = None # type: ignore
self._processing_task: Optional[Dict[str, Any]] = None # type: ignore
# wait for the round timeout s.t. the next task
# has a full round to be executed
yield from self.wait_until_round_end()
Expand Down
2 changes: 1 addition & 1 deletion packages/valory/skills/task_execution_abci/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license: Apache-2.0
aea_version: '>=1.0.0, <2.0.0'
fingerprint:
__init__.py: bafybeihrkpey6kxur2uoimrskq2wfpelqidxeapdxie6iuv2x7dk77ksvu
behaviours.py: bafybeibtcg4wyfehuumjv4eqapkrfsklxfioczju425us2mibfnv4ls5we
behaviours.py: bafybeigimp7nas2ffbbqicbuqdeoznxusnez27h2cbwpk6w426o5gcqoyy
dialogues.py: bafybeibmac3m5u5h6ucoyjr4dazay72dyga656wvjl6z6saapluvjo54ne
fsm_specification.yaml: bafybeia66ok2ll4kjbbmgbocjfape6u6ctacgexrnpgmru6zudr5em7vty
handlers.py: bafybeibe5n7my2vd2wlwo73sbma65epjqc7kxgtittewlylcmvnmoxtxzq
Expand Down

0 comments on commit 796a7d1

Please sign in to comment.