-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement a custom filtering method #737
Conversation
Implement a custom filtering method to handle potential RPC timeouts or hanging.
95db6f9
to
55bbd35
Compare
def rpc_call_with_timeout(func: Callable, timeout: int) -> Any: | ||
"""Execute an RPC call with a timeout.""" | ||
|
||
with concurrent.futures.ThreadPoolExecutor(RPC_CALL_MAX_WORKERS) as executor: | ||
# submit the function with the RPC call to the executor | ||
future = executor.submit(func) | ||
|
||
try: | ||
# wait for the result with a timeout | ||
data = future.result(timeout=timeout) | ||
except TimeoutError: | ||
# handle the case where the execution times out | ||
err = f"The RPC didn't respond within {timeout} seconds." | ||
return None, err | ||
|
||
# check if an error occurred | ||
if isinstance(data, str): | ||
# handle the case where the execution fails | ||
return None, data | ||
|
||
return data, None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the purpose of this method, you're creating a thread pool executor and you're only submitting one function at a time which means you're basically executing one function at a time instead of executing multiple function calls parallely
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's basically for the timeout to avoid hanging:
:param timeout: a timeout in seconds to interrupt the operation in case the RPC request hangs. |
filtering_operation = self.batch_filter_wrapper( | ||
event, match_single, match_any, to_block_adj, from_block | ||
) | ||
batch_result, err = rpc_call_with_timeout(filtering_operation, timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See the comment above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Proposed changes
Implement a custom filtering method to handle RPC timeouts.
Fixes
Fixes valory-xyz/open-autonomy#2121.
Types of changes
What types of changes does your code introduce to agents-aea?
Put an
x
in the boxes that applyChecklist
Put an
x
in the boxes that apply.develop
branch (left side). Also you should start your branch off ourdevelop
.Further comments
n/a