Skip to content

Commit

Permalink
feat: add new prediction request sme
Browse files Browse the repository at this point in the history
  • Loading branch information
0xArdi committed Sep 12, 2023
1 parent b12eb4a commit f98590f
Showing 1 changed file with 353 additions and 0 deletions.
353 changes: 353 additions & 0 deletions tools/prediction_request_sme.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------

"""This module implements a Mech tool for binary predictions."""

import json
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, Generator, List, Optional, Tuple

import openai
import requests
from bs4 import BeautifulSoup
from googleapiclient.discovery import build


NUM_URLS_EXTRACT = 5
DEFAULT_OPENAI_SETTINGS = {
"max_tokens": 500,
"temperature": 0.7,
}
ALLOWED_TOOLS = [
"prediction-offline",
"prediction-online",
]
TOOL_TO_ENGINE = {
"prediction-offline": "gpt-3.5-turbo",
"prediction-online": "gpt-3.5-turbo",
}

PREDICTION_PROMPT = """
You are an LLM inside a multi-agent system that takes in a prompt of a user requesting a probability estimation
for a given event. You are provided with an input under the label "USER_PROMPT". You must follow the instructions
under the label "INSTRUCTIONS". You must provide your response in the format specified under "OUTPUT_FORMAT".
INSTRUCTIONS
* Read the input under the label "USER_PROMPT" delimited by three backticks.
* The "USER_PROMPT" specifies an event.
* The event will only have two possible outcomes: either the event will happen or the event will not happen.
* If the event has more than two possible outcomes, you must ignore the rest of the instructions and output the response "Error".
* You must provide a probability estimation of the event happening, based on your training data.
* You are provided an itemized list of information under the label "ADDITIONAL_INFORMATION" delimited by three backticks.
* You can use any item in "ADDITIONAL_INFORMATION" in addition to your training data.
* If an item in "ADDITIONAL_INFORMATION" is not relevant, you must ignore that item for the estimation.
* You must provide your response in the format specified under "OUTPUT_FORMAT".
* Do not include any other contents in your response.
USER_PROMPT:
```
{user_prompt}
```
ADDITIONAL_INFORMATION:
```
{additional_information}
```
OUTPUT_FORMAT
* Your output response must be only a single JSON object to be parsed by Python's "json.loads()".
* The JSON must contain four fields: "p_yes", "p_no", "confidence", and "info_utility".
* Each item in the JSON must have a value between 0 and 1.
- "p_yes": Estimated probability that the event in the "USER_PROMPT" occurs.
- "p_no": Estimated probability that the event in the "USER_PROMPT" does not occur.
- "confidence": A value between 0 and 1 indicating the confidence in the prediction. 0 indicates lowest
confidence value; 1 maximum confidence value.
- "info_utility": Utility of the information provided in "ADDITIONAL_INFORMATION" to help you make the prediction.
0 indicates lowest utility; 1 maximum utility.
* The sum of "p_yes" and "p_no" must equal 1.
* Output only the JSON object. Do not include any other contents in your response.
"""

URL_QUERY_PROMPT = """
You are an LLM inside a multi-agent system that takes in a prompt of a user requesting a probability estimation
for a given event. You are provided with an input under the label "USER_PROMPT". You must follow the instructions
under the label "INSTRUCTIONS". You must provide your response in the format specified under "OUTPUT_FORMAT".
INSTRUCTIONS
* Read the input under the label "USER_PROMPT" delimited by three backticks.
* The "USER_PROMPT" specifies an event.
* The event will only have two possible outcomes: either the event will happen or the event will not happen.
* If the event has more than two possible outcomes, you must ignore the rest of the instructions and output the response "Error".
* You must provide your response in the format specified under "OUTPUT_FORMAT".
* Do not include any other contents in your response.
USER_PROMPT:
```
{user_prompt}
```
OUTPUT_FORMAT
* Your output response must be only a single JSON object to be parsed by Python's "json.loads()".
* The JSON must contain two fields: "queries", and "urls".
- "queries": An array of strings of size between 1 and 5. Each string must be a search engine query that can help obtain relevant information to estimate
the probability that the event in "USER_PROMPT" occurs. You must provide original information in each query, and they should not overlap
or lead to obtain the same set of results.
* Output only the JSON object. Do not include any other contents in your response.
"""


SME_GENERATION_MARKET_PROMPT = """
task question: "{question}"
"""

SME_GENERATION_SYSTEM_PROMPT = """
This task requires answering Yes or No to a specific question related to certain knowledge domains. The final opinion to the question should be determined by one or more subject matter experts (SME) of the related domains. You need to generate one or more SME roles and their role introduction that you believe to be helpful in forming a correct answer to question in the task.
Examples:
task question: "Will Apple release iphone 15 by 1 October 2023?"
[
{
"sme": "Technology Analyst",
"sme_introduction": "You are a seasoned technology analyst AI assistant. Your goal is to do comprehensive research on the news on the tech companies and answer investor's interested questions in a trustful and accurate way."
}
]
---
task question: "Will the newly elected ceremonial president of Singapore face any political scandals by 13 September 2023?"
[
{
"sme": "Political Commentator",
"sme_introduction": "You are an experienced political commentator in Asia. Your main objective is to produce comprehensive, insightful and impartial analysis based on the relevant political news and your politic expertise to form an answer to the question releted to a political event or politician."
}
]
---
task question: "Will the air strike conflict in Sudan be resolved by 13 September 2023?"
[
{
"sme: "Military Expert",
"sme_introduction": "You are an experienced expert in military operation and industry. Your main goal is to faithfully and accurately answer a military related question based on the provided intelligence and your professional experience"
},
{
"sme: "Diplomat",
"sme_introduction": "You are an senior deplomat who engages in diplomacy to foster peaceful relations, negotiate agreements, and navigate complex political, economic, and social landscapes. You need to form an opinion on a question related to international conflicts based on the related information and your understading in geopolitics."
},
]
"""



def search_google(query: str, api_key: str, engine: str, num: int = 3) -> List[str]:
service = build("customsearch", "v1", developerKey=api_key)
search = (
service.cse()
.list(
q=query,
cx=engine,
num=num,
)
.execute()
)
return [result["link"] for result in search["items"]]


def get_urls_from_queries(queries: List[str], api_key: str, engine: str) -> List[str]:
"""Get URLs from search engine queries"""
results = []
for query in queries:
for url in search_google(
query=query,
api_key=api_key,
engine=engine,
num=3, # Number of returned results
):
results.append(url)
unique_results = list(set(results))
return unique_results


def extract_text(
html: str,
num_words: int = 300, # TODO: summerise using GPT instead of limit
) -> str:
"""Extract text from a single HTML document"""
soup = BeautifulSoup(html, "html.parser")
for script in soup(["script", "style"]):
script.extract()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = "\n".join(chunk for chunk in chunks if chunk)
return text[:num_words]


def process_in_batches(
urls: List[str], window: int = 5, timeout: int = 10
) -> Generator[None, None, List[Tuple[Future, str]]]:
"""Iter URLs in batches."""
with ThreadPoolExecutor() as executor:
for i in range(0, len(urls), window):
batch = urls[i : i + window]
futures = [(executor.submit(requests.get, url, timeout=timeout), url) for url in batch]
yield futures

def extract_texts(urls: List[str], num_words: int = 300) -> List[str]:
"""Extract texts from URLs"""
max_allowed = 5
extracted_texts = []
count = 0
stop = False
for batch in process_in_batches(urls=urls):
for future, url in batch:
try:
result = future.result()
if result.status_code != 200:
continue
extracted_texts.append(extract_text(html=result.text, num_words=num_words))
count += 1
if count >= max_allowed:
stop = True
break
except requests.exceptions.ReadTimeout:
print(f"Request timed out: {url}.")
except Exception as e:
print(f"An error occurred: {e}")
if stop:
break
return extracted_texts


def fetch_additional_information(
prompt: str,
engine: str,
temperature: float,
max_tokens: int,
google_api_key: str,
google_engine: str,
) -> str:
"""Fetch additional information."""
url_query_prompt = URL_QUERY_PROMPT.format(user_prompt=prompt)
moderation_result = openai.Moderation.create(url_query_prompt)
if moderation_result["results"][0]["flagged"]:
return ""
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": url_query_prompt},
]
response = openai.ChatCompletion.create(
model=engine,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
n=1,
timeout=90,
request_timeout=90,
stop=None,
)
json_data = json.loads(response.choices[0].message.content)
urls = get_urls_from_queries(
json_data["queries"],
api_key=google_api_key,
engine=google_engine,
)
texts = extract_texts(urls)
return "\n".join(["- " + text for text in texts])


def get_sme_role(engine, temperature, max_tokens, prompt) -> Tuple[str, str]:
"""Get SME title and introduction"""
market_question = SME_GENERATION_MARKET_PROMPT.format(question=prompt)
system_prompt = SME_GENERATION_SYSTEM_PROMPT

messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": market_question},
]
response = openai.ChatCompletion.create(
model=engine,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
n=1,
timeout=150,
request_timeout=150,
stop=None,
)
generated_sme_roles = response.choices[0].message.content
# check whether the generated_sme_roles is valid json
sme = json.loads(generated_sme_roles)
return sme["sme"], sme["sme_introduction"]


def run(**kwargs) -> Tuple[str, Optional[Dict[str, Any]]]:
"""Run the task"""
tool = kwargs["tool"]
prompt = kwargs["prompt"]
max_tokens = kwargs.get("max_tokens", DEFAULT_OPENAI_SETTINGS["max_tokens"])
temperature = kwargs.get("temperature", DEFAULT_OPENAI_SETTINGS["temperature"])

openai.api_key = kwargs["api_keys"]["openai"]
if tool not in ALLOWED_TOOLS:
raise ValueError(f"Tool {tool} is not supported.")

engine = TOOL_TO_ENGINE[tool]

try:
sme, sme_introduction = get_sme_role(
engine,
temperature,
max_tokens,
prompt,
)
except Exception as e:
print(f"An error occurred during SME role creation: {e}")
print("Using default SME introduction.")
sme_introduction = "You are a helpful assistant."

additional_information = (
fetch_additional_information(
prompt=prompt,
engine=engine,
temperature=temperature,
max_tokens=max_tokens,
google_api_key=kwargs["api_keys"]["google_api_key"],
google_engine=kwargs["api_keys"]["google_engine_id"],
)
if tool == "prediction-online"
else ""
)
prediction_prompt = PREDICTION_PROMPT.format(
user_prompt=prompt, additional_information=additional_information
)
moderation_result = openai.Moderation.create(prediction_prompt)
if moderation_result["results"][0]["flagged"]:
return "Moderation flagged the prompt as in violation of terms.", None
messages = [
{"role": "system", "content": sme_introduction},
{"role": "user", "content": prediction_prompt},
]
response = openai.ChatCompletion.create(
model=engine,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
n=1,
timeout=150,
request_timeout=150,
stop=None,
)
return response.choices[0].message.content, None

0 comments on commit f98590f

Please sign in to comment.