Merge pull request #110 from valory-xyz/chore/analysis_invalid_markets
Analysis of invalid markets
jmoreira-valory authored Sep 16, 2024
2 parents 2d7af26 + 5bdd855 commit a72113c
Showing 3 changed files with 531 additions and 60 deletions.
75 changes: 67 additions & 8 deletions scripts/list_finalizing_markets.ipynb
@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": null,
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
@@ -29,7 +29,7 @@
"\"\"\"Script for analyzing market creator markets.\"\"\"\n",
"\n",
"from collections import defaultdict\n",
"from datetime import datetime, timedelta\n",
"from datetime import datetime, timedelta, timezone\n",
"from enum import Enum\n",
"from mech_request_utils import get_mech_requests, IPFS_ADDRESS\n",
"from scipy import stats\n",
@@ -258,7 +258,7 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
@@ -374,7 +374,7 @@
" url = \"\"\n",
" if market.get(\"question\"):\n",
" question_title = market[\"question\"].get(\"title\", \"\")\n",
" url = f'https://aiomen.eth.limo/#/{market.get(\"id\",\"\")}'\n",
" url = f'https://presagio.pages.dev/markets?id={market.get(\"id\",\"\")}'\n",
" return _make_clickable(question_title, url)\n",
"\n",
"\n",
@@ -498,13 +498,13 @@
" return MechDeliverState.TIMEOUT\n",
"\n",
" result = result.replace(\" \", \"\")\n",
" if result == \"{\\\"is_determinable\\\":false}\":\n",
" if \"is_determinable\\\":false\" in result:\n",
" return MechDeliverState.RESP_NOT_DETERMINABLE\n",
" if result == \"{\\\"has_occurred\\\":false}\":\n",
" if \"\\\"has_occurred\\\":false\" in result:\n",
" return MechDeliverState.RESP_NO\n",
" if result == \"{\\\"has_occurred\\\":true}\":\n",
" if \"\\\"has_occurred\\\":true\" in result:\n",
" return MechDeliverState.RESP_YES\n",
" if result.startswith(\"{\\\"is_valid\\\":false\"):\n",
" if \"\\\"is_valid\\\":false\" in result:\n",
" return MechDeliverState.RESP_INVALID_MARKET\n",
"\n",
" return MechDeliverState.UNKNOWN\n",
@@ -659,6 +659,9 @@
"\n",
"print(f\"Filtering {market_states} markets\")\n",
"df_filtered = df[df[\"State\"].isin(market_states)]\n",
"#df_filtered = df[df[\"Current answer\"]==\"Invalid\"]\n",
"\n",
"\n",
"pd.options.display.max_colwidth = 150\n",
"html = df_filtered.to_html(\n",
" escape=False,\n",
@@ -668,6 +671,62 @@
"\n",
"display(HTML(html))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def plot_closed_markets(data: Dict[str, Any], start_timestamp: int, end_timestamp: int) -> None:\n",
" markets = data[\"data\"][\"fixedProductMarketMakers\"]\n",
" \n",
" filtered_markets = [\n",
" item for item in markets\n",
" if item['creationTimestamp'] is not None and\n",
" start_timestamp <= int(item['creationTimestamp']) < end_timestamp\n",
" ]\n",
" \n",
" print(len(filtered_markets))\n",
" answer_counts = {}\n",
"\n",
" for item in filtered_markets:\n",
" current_answer = item.get('currentAnswer')\n",
" \n",
" if current_answer is not None:\n",
" answer_counts[current_answer] = answer_counts.get(current_answer, 0) + 1\n",
" \n",
" # Collect the questions for filtered markets\n",
" question = item.get('question', {}).get('title', 'No Title')\n",
"\n",
" # Prepare data for pie chart\n",
" labels = [f\"{answer_mapping.get(answer.lower(), '--')} ({count})\" for answer, count in answer_counts.items()]\n",
" sizes = list(answer_counts.values())\n",
"\n",
" # Format the dates for the title\n",
" start_datetime = datetime.fromtimestamp(start_timestamp, tz=timezone.utc)\n",
" end_datetime = datetime.fromtimestamp(end_timestamp, tz=timezone.utc)\n",
" start_date_str = start_datetime.isoformat()\n",
" end_date_str = end_datetime.isoformat()\n",
"\n",
" # Plot pie chart\n",
" plt.figure(figsize=(10, 6))\n",
" plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=140, colors=plt.cm.Paired.colors)\n",
" plt.title(f'Distribution of Current Answers in Closed Markets\\nCreation timestamp: >= {start_date_str} UTC < {end_date_str} UTC')\n",
" plt.axis('equal') # Equal aspect ratio ensures that pie is drawn as a circle.\n",
" plt.show()\n",
"\n",
" for item in filtered_markets:\n",
" if item.get('currentAnswer') == \"0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff\":\n",
" print(f'- {item[\"question\"].get(\"title\", \"\")}')\n",
"\n",
"\n",
"\n",
"month= 7\n",
"plot_closed_markets(data, int(datetime(2024, 6, 1, tzinfo=timezone.utc).timestamp()), int(datetime(2024, month+1, 31, tzinfo=timezone.utc).timestamp()))\n",
"\n",
"\n"
]
}
],
"metadata": {
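
A note on the added plotting cell: the hard-coded end bound `datetime(2024, month + 1, 31, ...)` raises `ValueError` whenever `month + 1` has fewer than 31 days, and breaks outright for December. A safer sketch with UTC month boundaries; `month_window` is a hypothetical helper, not part of the commit:

```python
# Hypothetical helper: [start, end) UTC timestamps for one calendar month,
# avoiding hard-coded day numbers such as datetime(2024, month + 1, 31).
from datetime import datetime, timezone
from typing import Tuple

def month_window(year: int, month: int) -> Tuple[int, int]:
    start = datetime(year, month, 1, tzinfo=timezone.utc)
    # The first day of the following month is the exclusive end bound.
    if month == 12:
        end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
    else:
        end = datetime(year, month + 1, 1, tzinfo=timezone.utc)
    return int(start.timestamp()), int(end.timestamp())

# Usage with the cell above, e.g.:
# plot_closed_markets(data, *month_window(2024, 7))
```
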
185 changes: 133 additions & 52 deletions scripts/mech_request_utils.py
@@ -22,6 +22,8 @@
import json
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta
from typing import Any, Dict, List

import requests
@@ -33,9 +35,10 @@
TEXT_ALIGNMENT = 30
MINIMUM_WRITE_FILE_DELAY_SECONDS = 20
MECH_FROM_BLOCK_RANGE = 50000
MECH_REQUESTS_JSON_PATH = "mech_requests.json"
DEFAULT_MECH_REQUESTS_JSON_PATH = "mech_requests.json"
IPFS_ADDRESS = "https://gateway.autonolas.tech/ipfs/"
THEGRAPH_ENDPOINT = "https://api.studio.thegraph.com/query/57238/mech/0.0.2"
THREAD_POOL_EXECUTOR_MAX_WORKERS = 10

REQUESTS_QUERY = """
query requests_query($sender: Bytes, $id_gt: Bytes) {
@@ -52,8 +55,8 @@
"""

DELIVERS_QUERY = """
query delivers_query($requestId: BigInt, $blockNumber_gte: BigInt, $blockNumber_lte: BigInt) {
delivers(where: {requestId: $requestId, blockNumber_gte: $blockNumber_gte, blockNumber_lte: $blockNumber_lte}, orderBy: blockNumber, first: 1000) {
query delivers_query($requestId_in: [BigInt!], $id_gt: Bytes) {
delivers(where: {requestId_in: $requestId_in, id_gt: $id_gt}, orderBy: blockNumber, first: 1000) {
blockNumber
blockTimestamp
id
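
The reworked query batches many request IDs per round trip (`requestId_in`) and pages through results with an `id_gt` cursor, instead of issuing one blockNumber-bounded query per request. The batching pattern it relies on, as a standalone sketch (`chunked` is illustrative, not part of the commit):

```python
from typing import Any, Iterator, List

def chunked(items: List[Any], size: int) -> Iterator[List[Any]]:
    # Yield consecutive slices of at most `size` items.
    for i in range(0, len(items), size):
        yield items[i : i + size]

# One GraphQL round trip per batch of (up to) 100 request IDs; within each
# batch, page forward by passing the last seen deliver id as `id_gt`.
# for batch in chunked(list(pending_mech_requests.values()), 100):
#     ids = [req["requestId"] for req in batch]
#     ...
```
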
@@ -92,70 +95,116 @@ def _populate_missing_requests(sender: str, mech_requests: Dict[str, Any]) -> None:
_write_mech_events_to_file(mech_requests)

_write_mech_events_to_file(mech_requests, True)
print(f"{f'{len(mech_requests)} requests found':>{TEXT_ALIGNMENT}}")


def _populate_missing_responses(mech_requests: Dict[str, Any]) -> None:
def _populate_missing_delivers(mech_requests: Dict[str, Any]) -> None:
transport = RequestsHTTPTransport(url=THEGRAPH_ENDPOINT)
client = Client(transport=transport, fetch_schema_from_transport=True)

for _, mech_request in tqdm(
mech_requests.items(),
desc=f"{'Fetching responses':>{TEXT_ALIGNMENT}}",
NUM_REQUESTS_PER_QUERY = 100
pending_mech_requests = {
k: req for k, req in mech_requests.items() if "deliver" not in req
}

progress_bar = tqdm(
total=len(pending_mech_requests),
desc=f"{'Fetching delivers':>{TEXT_ALIGNMENT}}",
miniters=1,
):
if "deliver" in mech_request:
continue
)

variables = {
"requestId": mech_request["requestId"],
"blockNumber_gte": mech_request["blockNumber"],
"blockNumber_lte": str(
int(mech_request["blockNumber"]) + MECH_FROM_BLOCK_RANGE
),
}
response = client.execute(gql(DELIVERS_QUERY), variable_values=variables)
items = response.get("delivers")
while pending_mech_requests:
picked_requests = list(pending_mech_requests.values())[:NUM_REQUESTS_PER_QUERY]

for mech_request in picked_requests:
del pending_mech_requests[mech_request["id"]]

requestsId_in = [mech_request["requestId"] for mech_request in picked_requests]

mech_delivers = []
id_gt = "0x00"
while True:
variables = {
"requestId_in": requestsId_in,
"id_gt": id_gt,
}
response = client.execute(gql(DELIVERS_QUERY), variable_values=variables)
items = response.get("delivers")

if not items:
break

mech_delivers.extend(items)
id_gt = items[-1]["id"]

# If the user sends requests with the same values (tool, prompt, nonce) it
# will generate the same requestId. Therefore, multiple items can be retrieved
# at this point. We assume the most likely deliver for this request is the
# one with the closest blockNumber among all delivers with the same requestId.
if items:
mech_request["deliver"] = items[0]

#
# In conclusion, for each request in picked_requests, find the deliver with the
# smallest blockNumber that is >= the request's blockNumber.
for mech_request in picked_requests:
for deliver in mech_delivers:
if deliver["requestId"] == mech_request["requestId"] and int(
deliver["blockNumber"]
) >= int(mech_request["blockNumber"]):
mech_request["deliver"] = deliver
break

progress_bar.update(len(picked_requests))
_write_mech_events_to_file(mech_requests)

_write_mech_events_to_file(mech_requests, True)


def _populate_missing_ipfs_contents(mech_requests: Dict[str, Any]) -> None:
for _, mech_request in tqdm(
mech_requests.items(),
desc=f"{'Fetching IPFS contents':>{TEXT_ALIGNMENT}}",
miniters=1,
):
def _populate_event_ipfs_contents(event: Dict[str, Any], url: str) -> None:
response = requests.get(url)
response.raise_for_status() # Raise an exception for HTTP error responses
event["ipfsContents"] = response.json()


def _populate_missing_ipfs_contents(mech_requests: Dict[str, Any]) -> int:
error_count = 0

# Collect all pending events
pending_events = []

for _, mech_request in mech_requests.items():
if "ipfsContents" not in mech_request:
ipfs_hash = mech_request["ipfsHash"]
url = f"{IPFS_ADDRESS}{ipfs_hash}/metadata.json"
response = requests.get(url)
response.raise_for_status()
mech_request["ipfsContents"] = response.json()

if "deliver" not in mech_request:
continue

deliver = mech_request["deliver"]
if "ipfsContents" not in deliver:
ipfs_hash = deliver["ipfsHash"]
request_id = deliver["requestId"]
url = f"{IPFS_ADDRESS}{ipfs_hash}/{request_id}"
response = requests.get(url)
response.raise_for_status()
deliver["ipfsContents"] = response.json()
pending_events.append((mech_request, url))

_write_mech_events_to_file(mech_requests)
if "deliver" in mech_request:
deliver = mech_request["deliver"]
if "ipfsContents" not in deliver:
ipfs_hash = deliver["ipfsHash"]
request_id = deliver["requestId"]
url = f"{IPFS_ADDRESS}{ipfs_hash}/{request_id}"
pending_events.append((deliver, url))

with ThreadPoolExecutor(max_workers=THREAD_POOL_EXECUTOR_MAX_WORKERS) as executor:
futures = [
executor.submit(_populate_event_ipfs_contents, event, url)
for event, url in pending_events
]

for future in tqdm(
as_completed(futures),
total=len(futures),
desc=f"{'Fetching IPFS contents':>{TEXT_ALIGNMENT}}",
miniters=1,
):
try:
future.result()
_write_mech_events_to_file(mech_requests)
except Exception as e: # pylint: disable=broad-except
error_count += 1
print(f"Error occurred: {e}")

_write_mech_events_to_file(mech_requests, True)
return error_count


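
One caveat with the concurrent fetch above: `requests.get` is called without a timeout, so a stalled gateway connection can pin a worker thread indefinitely. A hedged variant of the helper (the 30-second value is an assumption, not from the commit):

```python
import requests
from typing import Any, Dict

def _populate_event_ipfs_contents_bounded(
    event: Dict[str, Any], url: str, timeout_seconds: float = 30.0
) -> None:
    # Same behavior as _populate_event_ipfs_contents, but a gateway that
    # stops responding raises requests.exceptions.Timeout instead of
    # blocking the worker thread forever.
    response = requests.get(url, timeout=timeout_seconds)
    response.raise_for_status()
    event["ipfsContents"] = response.json()
```
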
def _find_duplicate_delivers(
@@ -200,6 +249,7 @@ def _process_duplicate_delivers(mech_requests: Dict[str, Any]) -> None:


last_write_time = 0.0
mech_events_json_path = DEFAULT_MECH_REQUESTS_JSON_PATH


def _write_mech_events_to_file(
@@ -209,24 +259,55 @@ def _write_mech_events_to_file(
now = time.time()

if force_write or (now - last_write_time) >= MINIMUM_WRITE_FILE_DELAY_SECONDS:
with open(MECH_REQUESTS_JSON_PATH, "w", encoding="utf-8") as file:
json.dump({"mechRequests": mech_requests}, file, indent=2, sort_keys=True)
try:
with open(mech_events_json_path, "r", encoding="utf-8") as file:
data = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
data = {}

data["mechRequests"] = mech_requests

with open(mech_events_json_path, "w", encoding="utf-8") as file:
json.dump(data, file, indent=2, sort_keys=True)

last_write_time = now


def get_mech_requests(sender: str) -> Dict[str, Any]:
def get_mech_requests(
sender: str, json_path: str = DEFAULT_MECH_REQUESTS_JSON_PATH
) -> Dict[str, Any]:
"""Get Mech requests populated with the associated response and IPFS contents."""
mech_requests = {}
start_time = time.time()

global mech_events_json_path # pylint: disable=global-statement
mech_events_json_path = json_path

try:
with open(MECH_REQUESTS_JSON_PATH, "r", encoding="UTF-8") as json_file:
with open(mech_events_json_path, "r", encoding="UTF-8") as json_file:
existing_data = json.load(json_file)
mech_requests = existing_data.get("mechRequests", {})
except FileNotFoundError:
pass # File doesn't exist yet, so there are no existing requests
mech_requests = {}

_populate_missing_requests(sender.lower(), mech_requests)
_populate_missing_responses(mech_requests)
_populate_missing_delivers(mech_requests)
_process_duplicate_delivers(mech_requests)
_find_duplicate_delivers(mech_requests)
_populate_missing_ipfs_contents(mech_requests)
error_count = _populate_missing_ipfs_contents(mech_requests)

if error_count > 0:
print(f"{error_count} errors populating IPFS contents. Retrying again...")
_populate_missing_ipfs_contents(mech_requests)

end_time = time.time()
elapsed_time = end_time - start_time
elapsed_timedelta = timedelta(seconds=elapsed_time)
formatted_time = str(elapsed_timedelta)
print(f"Time of execution: {formatted_time}")

return mech_requests


if __name__ == "__main__":
service_safe_address = "0x89c5cc945dd550BcFfb72Fe42BfF002429F46Fec"
get_mech_requests(service_safe_address, f"dataset_{service_safe_address}.json")
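
`get_mech_requests` retries the IPFS population exactly once; errors remaining after the second pass are silently dropped. A bounded retry loop is a natural extension (sketch assuming the module's helpers; `max_retries` is an assumption):

```python
from typing import Any, Dict

def _populate_ipfs_with_retries(
    mech_requests: Dict[str, Any], max_retries: int = 3
) -> int:
    # Re-run _populate_missing_ipfs_contents until it reports no errors
    # or the retry budget is exhausted; return the final error count.
    error_count = 0
    for attempt in range(1, max_retries + 1):
        error_count = _populate_missing_ipfs_contents(mech_requests)
        if error_count == 0:
            break
        print(f"Attempt {attempt}: {error_count} errors populating IPFS contents.")
    return error_count
```
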