diff --git a/packages/packages.json b/packages/packages.json index 635fe2e7..b2f9da1f 100644 --- a/packages/packages.json +++ b/packages/packages.json @@ -2,12 +2,12 @@ "dev": { "connection/valory/websocket_client/0.1.0": "bafybeicz53kzs5uvyiod2azntl76zwgmpgr22ven4wl5fnwt2m546j3wsu", "skill/valory/contract_subscription/0.1.0": "bafybeif3hkpgbzuoxsbqxnd752qkvk3onytltrufnyrphnqbi62si4mdhy", - "agent/valory/mech/0.1.0": "bafybeicybguqpwjrpwye3trclcyxt4ca43k4nyrzm2c2zcvbyf5gnokl4u", - "skill/valory/multiplexer_abci/0.1.0": "bafybeibjvmrz5r3qgynkpuawwzwmesc5ozmnvs4xykbybo6zno7qpaqg4m", - "skill/valory/task_execution_abci/0.1.0": "bafybeifd6ubbhqp6xa3vg6mpimw555j7rn2d7n4jybfmwhoqqi4ql2fu2m", - "skill/valory/mech_abci/0.1.0": "bafybeiglgiiyy5kqf3rdaelw47f3hsw2tpihowayjjzmagoiixd6t6dzvy", + "agent/valory/mech/0.1.0": "bafybeifno5k2xf3iksh73nd3cpu3msova7wcsunig24fphew3l424wvzyi", + "skill/valory/multiplexer_abci/0.1.0": "bafybeiagcipcsnjtbxl4hfs6ulydrtxsa4nm4iarek3i5l6x55o6vcsiue", + "skill/valory/task_execution_abci/0.1.0": "bafybeihs7dkagcqzudpjz65wt3bonrisutttteozknhalyyopxop6uegsu", + "skill/valory/mech_abci/0.1.0": "bafybeiej45vz5xp5zl6pii3eg6xv46ddshirqkald62f6cmwwumutgcnpy", "contract/valory/agent_mech/0.1.0": "bafybeiasgzwum4kypi5d4ubldvj7ihg6tyvpnvt5gyqbx3jdsku7b3gxgy", - "service/valory/mech/0.1.0": "bafybeibtkfvqb3tcnakqbzlfb5xnfsmnvnicms7bgvl5x5ramscfqzwsue", + "service/valory/mech/0.1.0": "bafybeiddeiqrio7ylathikmqcblerg6xts35vp6cyylaowmitiljwuvfya", "protocol/valory/acn_data_share/0.1.0": "bafybeieyixetwvz767zekhvg7r6etumyanzys6xbalx2brrfswybinnlhi", "protocol/valory/default/1.0.0": "bafybeiecmut3235aen7wxukllv424f3dysvvlgfmn562kzdunc5hdj3hxu" }, diff --git a/packages/valory/agents/mech/aea-config.yaml b/packages/valory/agents/mech/aea-config.yaml index 2074b7ac..31f7e740 100644 --- a/packages/valory/agents/mech/aea-config.yaml +++ b/packages/valory/agents/mech/aea-config.yaml @@ -34,11 +34,11 @@ skills: - valory/abstract_abci:0.1.0:bafybeih3bwx5apteinnoxts7sqmjlskntdbo7vvnmdbs5noo2pv76by7fu - valory/abstract_round_abci:0.1.0:bafybeibqpzbklnljvtc67yon4ciijxj75d7vazm7rurcvbbfnk6jtudukm - valory/contract_subscription:0.1.0:bafybeif3hkpgbzuoxsbqxnd752qkvk3onytltrufnyrphnqbi62si4mdhy -- valory/mech_abci:0.1.0:bafybeiglgiiyy5kqf3rdaelw47f3hsw2tpihowayjjzmagoiixd6t6dzvy -- valory/multiplexer_abci:0.1.0:bafybeibjvmrz5r3qgynkpuawwzwmesc5ozmnvs4xykbybo6zno7qpaqg4m +- valory/mech_abci:0.1.0:bafybeiej45vz5xp5zl6pii3eg6xv46ddshirqkald62f6cmwwumutgcnpy +- valory/multiplexer_abci:0.1.0:bafybeiagcipcsnjtbxl4hfs6ulydrtxsa4nm4iarek3i5l6x55o6vcsiue - valory/registration_abci:0.1.0:bafybeia25gpusnkakb2dp4heqkwtuftbc2apppq3i4bds6sphltsovgzvi - valory/reset_pause_abci:0.1.0:bafybeigzvwbzktclahjbsyiwqnj6poree4iveon5pric6s5ixb6wrhkdhq -- valory/task_execution_abci:0.1.0:bafybeifd6ubbhqp6xa3vg6mpimw555j7rn2d7n4jybfmwhoqqi4ql2fu2m +- valory/task_execution_abci:0.1.0:bafybeihs7dkagcqzudpjz65wt3bonrisutttteozknhalyyopxop6uegsu - valory/termination_abci:0.1.0:bafybeibcdgnarxyyqexncpfewcemraryywtwueuv7qthsjuean5l77lp2e - valory/transaction_settlement_abci:0.1.0:bafybeiepus7qsa47gt7dyk32gaqsoae6whjoxfnplttulxrvmcauyerrdm default_ledger: ethereum @@ -85,6 +85,7 @@ type: connection config: endpoint: ${str:wss://rpc.gnosischain.com/wss} target_skill_id: valory/contract_subscription:0.1.0 +is_abstract: true --- public_id: valory/contract_subscription:0.1.0:bafybeiby5ajjc7a3m2uq73d2pprx6enqt4ghfcq2gkmrtsr75e4d4napi4 type: skill @@ -101,6 +102,7 @@ models: params: args: use_polling: ${bool:false} +is_abstract: true --- 
public_id: valory/abci:0.1.0 type: connection @@ -145,6 +147,7 @@ models: on_chain_service_id: ${int:1} multisend_address: ${str:0xA238CBeb142c10Ef7Ad8442C6D1f9E89e07e7761} service_registry_address: ${str:0x9338b5153AE39BB89f50468E608eD9d764B755fD} + task_deadline: ${float:240.0} setup: all_participants: ${list:["0x10E867Ac2Fb0Aa156ca81eF440a5cdf373bE1AaC"]} safe_contract_address: ${str:0x5e1D1eb61E1164D5a50b28C575dA73A29595dFf7} diff --git a/packages/valory/services/mech/service.yaml b/packages/valory/services/mech/service.yaml index 379f98d8..d9c4bcfd 100644 --- a/packages/valory/services/mech/service.yaml +++ b/packages/valory/services/mech/service.yaml @@ -7,7 +7,7 @@ license: Apache-2.0 fingerprint: README.md: bafybeif7ia4jdlazy6745ke2k2x5yoqlwsgwr6sbztbgqtwvs3ndm2p7ba fingerprint_ignore_patterns: [] -agent: valory/mech:0.1.0:bafybeicybguqpwjrpwye3trclcyxt4ca43k4nyrzm2c2zcvbyf5gnokl4u +agent: valory/mech:0.1.0:bafybeifno5k2xf3iksh73nd3cpu3msova7wcsunig24fphew3l424wvzyi number_of_agents: 4 deployment: agent: @@ -87,6 +87,7 @@ type: skill validate_timeout: 1205 agent_mech_contract_address: ${AGENT_MECH_CONTRACT_ADDRESS:str:0xFf82123dFB52ab75C417195c5fDB87630145ae81} reset_period_count: ${RESET_PERIOD_COUNT:int:1000} + task_deadline: ${TASK_DEADLINE:float:240.0} file_hash_to_tools_json: ${FILE_HASH_TO_TOOLS:list:[]} api_keys_json: ${API_KEYS:list:[]} 1: @@ -124,6 +125,7 @@ type: skill polling_interval: ${POLLING_INTERVAL:int:25} tendermint_com_url: ${TENDERMINT_COM_URL:str:http://localhost:8080} tendermint_max_retries: 5 + task_deadline: ${TASK_DEADLINE:float:240.0} tendermint_url: ${TENDERMINT_URL:str:http://localhost:26657} tendermint_p2p_url: ${TM_P2P_ENDPOINT_NODE_1:str:node0:26656} termination_sleep: ${TERMINATION_SLEEP:int:900} @@ -157,6 +159,7 @@ type: skill retry_timeout: 3 reset_pause_duration: ${RESET_PAUSE_DURATION:int:10} request_retry_delay: 1.0 + task_deadline: ${TASK_DEADLINE:float:240.0} request_timeout: 10.0 round_timeout_seconds: ${ROUND_TIMEOUT:float:150.0} service_id: mech @@ -194,6 +197,7 @@ type: skill keeper_allowed_retries: 3 keeper_timeout: 30.0 max_attempts: 10 + task_deadline: ${TASK_DEADLINE:float:240.0} max_healthcheck: 120 multisend_address: ${MULTISEND_ADDRESS:str:0xA238CBeb142c10Ef7Ad8442C6D1f9E89e07e7761} on_chain_service_id: ${ON_CHAIN_SERVICE_ID:int:null} diff --git a/packages/valory/skills/mech_abci/skill.yaml b/packages/valory/skills/mech_abci/skill.yaml index 88aaeb57..33454897 100644 --- a/packages/valory/skills/mech_abci/skill.yaml +++ b/packages/valory/skills/mech_abci/skill.yaml @@ -19,10 +19,10 @@ contracts: [] protocols: [] skills: - valory/abstract_round_abci:0.1.0:bafybeibqpzbklnljvtc67yon4ciijxj75d7vazm7rurcvbbfnk6jtudukm -- valory/multiplexer_abci:0.1.0:bafybeibjvmrz5r3qgynkpuawwzwmesc5ozmnvs4xykbybo6zno7qpaqg4m +- valory/multiplexer_abci:0.1.0:bafybeiagcipcsnjtbxl4hfs6ulydrtxsa4nm4iarek3i5l6x55o6vcsiue - valory/registration_abci:0.1.0:bafybeia25gpusnkakb2dp4heqkwtuftbc2apppq3i4bds6sphltsovgzvi - valory/reset_pause_abci:0.1.0:bafybeigzvwbzktclahjbsyiwqnj6poree4iveon5pric6s5ixb6wrhkdhq -- valory/task_execution_abci:0.1.0:bafybeifd6ubbhqp6xa3vg6mpimw555j7rn2d7n4jybfmwhoqqi4ql2fu2m +- valory/task_execution_abci:0.1.0:bafybeihs7dkagcqzudpjz65wt3bonrisutttteozknhalyyopxop6uegsu - valory/termination_abci:0.1.0:bafybeibcdgnarxyyqexncpfewcemraryywtwueuv7qthsjuean5l77lp2e - valory/transaction_settlement_abci:0.1.0:bafybeiepus7qsa47gt7dyk32gaqsoae6whjoxfnplttulxrvmcauyerrdm behaviours: @@ -147,6 +147,7 @@ models: tendermint_url: 
http://localhost:26657 termination_sleep: 900 tx_timeout: 10.0 + task_deadline: 240.0 use_polling: false use_termination: false validate_timeout: 1205 diff --git a/packages/valory/skills/multiplexer_abci/behaviours.py b/packages/valory/skills/multiplexer_abci/behaviours.py index dbfe71e9..b942fd03 100644 --- a/packages/valory/skills/multiplexer_abci/behaviours.py +++ b/packages/valory/skills/multiplexer_abci/behaviours.py @@ -41,6 +41,21 @@ class MultiplexerBaseBehaviour(BaseBehaviour, ABC): """Base behaviour for the multiplexer_abci skill.""" + def _AsyncBehaviour__handle_waiting_for_message(self) -> None: + """Handle an 'act' tick, when waiting for a message.""" + # if there is no message coming, skip. + if self._AsyncBehaviour__notified: # type: ignore + try: + self._AsyncBehaviour__get_generator_act().send( + self._AsyncBehaviour__message # type: ignore + ) + except StopIteration: + self._AsyncBehaviour__handle_stop_iteration() + finally: + # wait for the next message + self._AsyncBehaviour__notified = False + self._AsyncBehaviour__message = None + @property def synchronized_data(self) -> SynchronizedData: """Return the synchronized data.""" @@ -118,7 +133,7 @@ def extend_pending_tasks(self) -> Generator[None, None, List[Dict]]: ]: # store each requests in the pending_tasks list, make sure each req is stored once pending_tasks.append(request) - pending_tasks.sort(key=lambda x: x["block_number"]) + pending_tasks.sort(key=lambda x: x["block_number"], reverse=True) return pending_tasks @@ -149,10 +164,10 @@ def async_act(self) -> Generator: pending_tasks = yield from self.extend_pending_tasks() self.context.shared_state["pending_tasks"] = pending_tasks - if self.context.shared_state.get("pending_tasks", []): - payload_content = MultiplexerRound.EXECUTE_PAYLOAD - elif do_reset: + if do_reset: payload_content = MultiplexerRound.RESET_PAYLOAD + elif self.context.shared_state.get("pending_tasks", []): + payload_content = MultiplexerRound.EXECUTE_PAYLOAD sender = self.context.agent_address payload = MultiplexerPayload(sender=sender, content=payload_content) diff --git a/packages/valory/skills/multiplexer_abci/rounds.py b/packages/valory/skills/multiplexer_abci/rounds.py index 24393ba0..3eedf254 100644 --- a/packages/valory/skills/multiplexer_abci/rounds.py +++ b/packages/valory/skills/multiplexer_abci/rounds.py @@ -79,7 +79,6 @@ def end_block(self) -> Optional[Tuple[BaseSynchronizedData, Event]]: event = Event.WAIT if self.most_voted_payload == self.RESET_PAYLOAD: - period_counter = -1 event = Event.RESET if self.most_voted_payload == self.EXECUTE_PAYLOAD: @@ -152,7 +151,7 @@ class MultiplexerAbciApp(AbciApp[Event]): event_to_timeout: EventToTimeout = { Event.ROUND_TIMEOUT: 30.0, } - cross_period_persisted_keys: FrozenSet[str] = frozenset() + cross_period_persisted_keys: FrozenSet[str] = frozenset([get_name(SynchronizedData.period_counter)]) db_pre_conditions: Dict[AppState, Set[str]] = { MultiplexerRound: set(), } diff --git a/packages/valory/skills/multiplexer_abci/skill.yaml b/packages/valory/skills/multiplexer_abci/skill.yaml index 87b9353d..b19de0a1 100644 --- a/packages/valory/skills/multiplexer_abci/skill.yaml +++ b/packages/valory/skills/multiplexer_abci/skill.yaml @@ -7,13 +7,13 @@ license: Apache-2.0 aea_version: '>=1.0.0, <2.0.0' fingerprint: __init__.py: bafybeifx5c6xdzvj5v6old2ek56fek6zapsfuxgdiokpacjp57td3wbalm - behaviours.py: bafybeibevecu5s3ewnq2btswcsnwqfslw5hz5spkkjtnkggclqa23h222a + behaviours.py: bafybeihyd77y43glxiu75jhgzmzd6xrgr5tkxnz77nbhpstacym6z7zbju dialogues.py: 
bafybeie777tjh4xvxo5rrig4kq66vxg5vvmyie576ptot43olwrzfrc64a fsm_specification.yaml: bafybeibmbpdgq7h6sgaxtdb2aawha5xdwd6oszbn3nwr2tolaijoswkfly handlers.py: bafybeic77bbhvm7yqhimzosdjnmocy4gm677t3f4gp73c5dll5qhn7xeqy models.py: bafybeiets4yg4p7g7mclpdmpekfmbgd6z3dy4x2kvd6rhqhjqr3njdxlcy payloads.py: bafybeibhg7q5ejfhjkjvcfeqjyzp32msn4alu5btnywimh2zd5arr2f2mm - rounds.py: bafybeibsi2tm4sovnaevbc6dw3ljxlc6jkta33k4aqest4q423kb5okrui + rounds.py: bafybeiggh5vy4uc64uautdqqq25zmty4rc4m23bx2gopnornbku7rwl6ze fingerprint_ignore_patterns: [] connections: [] contracts: diff --git a/packages/valory/skills/task_execution_abci/behaviours.py b/packages/valory/skills/task_execution_abci/behaviours.py index 15208c04..d1e3d88b 100644 --- a/packages/valory/skills/task_execution_abci/behaviours.py +++ b/packages/valory/skills/task_execution_abci/behaviours.py @@ -19,6 +19,7 @@ """This package contains round behaviours of TaskExecutionAbciApp.""" import os +import time from abc import ABC from multiprocessing.pool import AsyncResult from typing import Any, Dict, Generator, List, Optional, Set, Tuple, Type, cast @@ -74,6 +75,21 @@ class TaskExecutionBaseBehaviour(BaseBehaviour, ABC): """Base behaviour for the task_execution_abci skill.""" + def _AsyncBehaviour__handle_waiting_for_message(self) -> None: + """Handle an 'act' tick, when waiting for a message.""" + # if there is no message coming, skip. + if self._AsyncBehaviour__notified: # type: ignore + try: + self._AsyncBehaviour__get_generator_act().send( + self._AsyncBehaviour__message # type: ignore + ) + except StopIteration: + self._AsyncBehaviour__handle_stop_iteration() + finally: + # wait for the next message + self._AsyncBehaviour__notified = False + self._AsyncBehaviour__message = None + @property def synchronized_data(self) -> SynchronizedData: """Return the synchronized data.""" @@ -97,23 +113,8 @@ def __init__(self, **kwargs: Any) -> None: self.request_id = None self._is_task_prepared = False self._invalid_request = False - - def _AsyncBehaviour__handle_waiting_for_message(self) -> None: - """Handle an 'act' tick, when waiting for a message.""" - # if there is no message coming, skip. 
- if self._AsyncBehaviour__notified: # type: ignore - try: - self._AsyncBehaviour__get_generator_act().send( - self._AsyncBehaviour__message # type: ignore - ) - except StopIteration: - self._AsyncBehaviour__handle_stop_iteration() - finally: - # wait for the next message - self._AsyncBehaviour__notified = False - self._AsyncBehaviour__message = None - else: - self._AsyncBehaviour__get_generator_act().send(None) + self._task_deadline: Optional[float] = None + self._processing_task: Optional[Dict[str, Any]] = None def async_act(self) -> Generator: # pylint: disable=R0914,R0915 """Do the act, supporting asynchronous execution.""" @@ -134,9 +135,14 @@ def async_act(self) -> Generator: # pylint: disable=R0914,R0915 self.context.params.__dict__["_frozen"] = True with self.context.benchmark_tool.measure(self.behaviour_id).local(): - task_result = yield from self.get_task_result() - if task_result is None: - # the task is not ready yet, check in the next iteration + try: + task_result = yield from self.get_task_result() + if task_result is None: + # the task is not ready yet, check in the next iteration + return + except TimeoutError: + # the task was not ready in time + yield from self._handle_timeout() return payload_content = yield from self.get_payload_content(task_result) sender = self.context.agent_address @@ -193,6 +199,12 @@ def get_task_result( # pylint: disable=R0914,R1710 self.context.logger.info(f"Preparing task with data: {task_data}") self.request_id = task_data["requestId"] self.sender_address = task_data["sender"] + # store the task data so that we can + # add it to the end of the queue if needed + self._processing_task = task_data + # set the deadline for the task + # it's okay to use time.time() here + self._task_deadline = time.time() + self.params.task_deadline task_data_ = task_data["data"] # Verify the data hash and handle encoding @@ -251,7 +263,12 @@ def get_task_result( # pylint: disable=R0914,R1710 # Handle unfinished task if not self._invalid_request and not self._async_result.ready(): - self.context.logger.debug("The task is not finished yet.") + time_to_timeout = cast(float, self._task_deadline) - time.time() + self.context.logger.info( + f"Waiting for the task to finish. Timeout in: {time_to_timeout}s." + ) + if time_to_timeout <= 0: + raise TimeoutError("The task was not finished within the deadline.") yield from self.sleep(self.params.sleep_time) return None @@ -351,6 +368,7 @@ def _prepare_task(self, task_data: Dict[str, Any]) -> None: exec(tool_py, local_namespace) # pylint: disable=W0122 # nosec task_data["method"] = local_namespace["run"] task_data["api_keys"] = self.params.api_keys + task_data["logger"] = self.context.logger task_id = self.context.task_manager.enqueue_task(tool_task, kwargs=task_data) self._async_result = self.context.task_manager.get_task_result(task_id) self._is_task_prepared = True @@ -463,6 +481,23 @@ def _get_deliver_tx( "data": data, } + def _handle_timeout(self) -> Generator[None, None, None]: + """Handle a timeout.""" + # append to the end of the queue + self.context.logger.info("Task timed out. Re-adding to the queue.")
Re-adding to the queue.") + self.context.shared_state.get("pending_tasks").append(self._processing_task) + # reset the state + self._async_result: Optional[AsyncResult] = None # type: ignore + self.request_id = None # type: ignore + self._is_task_prepared = False # type: ignore + self._invalid_request = False # type: ignore + self._task_deadline: Optional[float] = None # type: ignore + self._processing_task: Optional[Dict[str, Any]] = None # type: ignore + self._task_deadline: Optional[float] = None # type: ignore + self._processing_task: Optional[Dict[str, Any]] = None # type: ignore + # wait for the round timeout s.t. the next task + # has a full round to be executed + yield from self.wait_until_round_end() + self.set_done() + class TaskExecutionRoundBehaviour(AbstractRoundBehaviour): """TaskExecutionRoundBehaviour""" diff --git a/packages/valory/skills/task_execution_abci/models.py b/packages/valory/skills/task_execution_abci/models.py index 6130bb04..839eb994 100644 --- a/packages/valory/skills/task_execution_abci/models.py +++ b/packages/valory/skills/task_execution_abci/models.py @@ -75,6 +75,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.ipfs_fetch_timeout = self._ensure( "ipfs_fetch_timeout", kwargs=kwargs, type_=float ) + self.task_deadline = self._ensure("task_deadline", kwargs=kwargs, type_=float) super().__init__(*args, **kwargs) def _nested_list_todict_workaround( diff --git a/packages/valory/skills/task_execution_abci/skill.yaml b/packages/valory/skills/task_execution_abci/skill.yaml index 64eb4bf7..d0e2251b 100644 --- a/packages/valory/skills/task_execution_abci/skill.yaml +++ b/packages/valory/skills/task_execution_abci/skill.yaml @@ -8,13 +8,13 @@ license: Apache-2.0 aea_version: '>=1.0.0, <2.0.0' fingerprint: __init__.py: bafybeihrkpey6kxur2uoimrskq2wfpelqidxeapdxie6iuv2x7dk77ksvu - behaviours.py: bafybeihhddcm6qdavd5w4fshyknn3ukjmjpswwcdem6l4no62nqhbgqkda + behaviours.py: bafybeifvcvpopvceuxltecfrz3765zjhl5vpvry5d6kpqzcm4eqc6rkcgy dialogues.py: bafybeibmac3m5u5h6ucoyjr4dazay72dyga656wvjl6z6saapluvjo54ne fsm_specification.yaml: bafybeia66ok2ll4kjbbmgbocjfape6u6ctacgexrnpgmru6zudr5em7vty handlers.py: bafybeibe5n7my2vd2wlwo73sbma65epjqc7kxgtittewlylcmvnmoxtxzq io_/__init__.py: bafybeifxgmmwjqzezzn3e6keh2bfo4cyo7y5dq2ept3stfmgglbrzfl5rq io_/naive_loader.py: bafybeihqrt34jso7dwfcedh7itmmovfv55tdjhw2tkqifsbiohetbonynu - models.py: bafybeihavofxq3nxt46x74idm2mjl5xxghoqzjtuxnx5i255k6mdwsyyaq + models.py: bafybeifc6emcgxezrhudosk3htpznssel6uml57gkqy6gyrh64efl2af24 payloads.py: bafybeigptsnusjowmqjcxnzc4ct7n2iczuiorlwqsg7dl6ipnwkjb6iqoe rounds.py: bafybeifaza7nzpn7fv6xuk6pcamxne3b5tzqogricjkcvbek5cso2emcnm tasks.py: bafybeicu5t5cvfhbndgpxbbtmp4vbmtyb6fba6vsnlewftvuderxp5lwcy @@ -143,6 +143,7 @@ models: safe_contract_address: '0x0000000000000000000000000000000000000000' share_tm_config_on_startup: false sleep_time: 1 + task_deadline: 240.0 tendermint_check_sleep_delay: 3 tendermint_com_url: http://localhost:8080 tendermint_max_retries: 5 diff --git a/tools/prediction_request.py b/tools/prediction_request.py index 9197f8a4..97d56d41 100644 --- a/tools/prediction_request.py +++ b/tools/prediction_request.py @@ -20,6 +20,7 @@ """This module implements a Mech tool for binary predictions.""" import json +import logging from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, Generator, List, Optional, Tuple @@ -223,6 +224,7 @@ def run(**kwargs) -> Tuple[str, Optional[Dict[str, Any]]]: """Run the task""" tool = kwargs["tool"] prompt = 
kwargs["prompt"] + logger: logging.Logger = kwargs["logger"] max_tokens = kwargs.get("max_tokens", DEFAULT_OPENAI_SETTINGS["max_tokens"]) temperature = kwargs.get("temperature", DEFAULT_OPENAI_SETTINGS["temperature"]) @@ -231,6 +233,7 @@ def run(**kwargs) -> Tuple[str, Optional[Dict[str, Any]]]: raise ValueError(f"Tool {tool} is not supported.") engine = TOOL_TO_ENGINE[tool] + logger.info(f"Fetching additional information") additional_information = ( fetch_additional_information( prompt=prompt, @@ -243,9 +246,11 @@ def run(**kwargs) -> Tuple[str, Optional[Dict[str, Any]]]: if tool == "prediction-online" else "" ) + logger.info(f"Fetched additional information") prediction_prompt = PREDICTION_PROMPT.format( user_prompt=prompt, additional_information=additional_information ) + logger.info(f"Prediction prompt: {prediction_prompt}") moderation_result = openai.Moderation.create(prediction_prompt) if moderation_result["results"][0]["flagged"]: return "Moderation flagged the prompt as in violation of terms.", None @@ -253,6 +258,7 @@ def run(**kwargs) -> Tuple[str, Optional[Dict[str, Any]]]: {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prediction_prompt}, ] + logger.info(f"Messages: {messages}") response = openai.ChatCompletion.create( model=engine, messages=messages, @@ -263,4 +269,5 @@ def run(**kwargs) -> Tuple[str, Optional[Dict[str, Any]]]: request_timeout=150, stop=None, ) + logger.info(f"OpenAI response: {response.choices[0].message.content}") return response.choices[0].message.content, None