Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring deployment a bit #15

Merged
merged 16 commits into from
Feb 16, 2024
36 changes: 7 additions & 29 deletions examples/cloud_deployment/gcp/deploy.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
import getpass
import os

from prediction_market_agent_tooling.deploy.gcp.deploy import (
deploy_to_gcp,
remove_deployed_gcp_function,
run_deployed_gcp_function,
schedule_deployed_gcp_function,
)
from prediction_market_agent_tooling.deploy.gcp.utils import gcp_function_is_active
from prediction_market_agent_tooling.deploy.agent_example import DeployableCoinFlipAgent
from prediction_market_agent_tooling.markets.markets import MarketType

if __name__ == "__main__":
current_dir = os.path.dirname(os.path.realpath(__file__))
fname = deploy_to_gcp(
requirements_file=None,
extra_deps=[
"git+https://github.com/gnosis/prediction-market-agent-tooling.git@main"
],
function_file=f"{current_dir}/agent.py",
agent = DeployableCoinFlipAgent()
agent.deploy_gcp(
kongzii marked this conversation as resolved.
Show resolved Hide resolved
# TODO: Switch to main.
repository="git+https://github.com/gnosis/prediction-market-agent-tooling.git@peter/refactor-deployment",
market_type=MarketType.MANIFOLD,
labels={
"owner": getpass.getuser()
Expand All @@ -27,18 +17,6 @@
secrets={
"MANIFOLD_API_KEY": f"JUNG_PERSONAL_GMAIL_MANIFOLD_API_KEY:latest"
}, # Must be in the format "env_var_in_container => secret_name:version", you can create secrets using `gcloud secrets create --labels owner=<your-name> <secret-name>` command.
memory=512,
memory=256,
cron_schedule="0 */2 * * *",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much cleaner :). Removing remove_deployed_gcp_function though - living a high-risk lifestyle!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be auto-removed after it's scheduled to run every 2 hours? I thought that was there only for the example purposes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the example was there originally as something that demos all the deployment functionality but destroys everything at the end, kind of like a test that the user could run. Just guarding against the user running it without realising that they're spawning a forever-alive agent! No big deal though

)

# Check that the function is deployed
assert gcp_function_is_active(fname)

# Run the function
response = run_deployed_gcp_function(fname)
assert response.ok

# Schedule the function to run once every 2 hours
schedule_deployed_gcp_function(fname, cron_schedule="0 */2 * * *")

# Delete the function
remove_deployed_gcp_function(fname)
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[mypy]
python_version = 3.9
python_version = 3.10
files = prediction_market_agent_tooling/, tests/, examples/, scripts/
plugins = pydantic.mypy
warn_redundant_casts = True
Expand Down
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 82 additions & 23 deletions prediction_market_agent_tooling/deploy/agent.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import inspect
import os
import tempfile
import time
import typing as t
from decimal import Decimal
from enum import Enum

from pydantic import BaseModel

from prediction_market_agent_tooling.deploy.gcp.deploy import (
deploy_to_gcp,
run_deployed_gcp_function,
schedule_deployed_gcp_function,
)
from prediction_market_agent_tooling.deploy.gcp.utils import gcp_function_is_active
from prediction_market_agent_tooling.markets.data_models import (
AgentMarket,
BetAmount,
Expand All @@ -16,12 +23,19 @@
)


class DeploymentType(str, Enum):
GOOGLE_CLOUD = "google_cloud"
LOCAL = "local"
class DeployableAgent:
def __init__(self) -> None:
self.load()

def __init_subclass__(cls, **kwargs: t.Any) -> None:
if cls.__init__ is not DeployableAgent.__init__:
raise TypeError(
"Cannot override __init__ method of DeployableAgent class, please override the `load` method to set up the agent."
)

def load(self) -> None:
pass

class DeployableAgent(BaseModel):
def pick_markets(self, markets: list[AgentMarket]) -> list[AgentMarket]:
"""
This method should be implemented by the subclass to pick the markets to bet on. By default, it picks only the first market.
Expand All @@ -34,26 +48,72 @@ def answer_binary_market(self, market: AgentMarket) -> bool:
"""
raise NotImplementedError("This method must be implemented by the subclass")

def deploy(
def deploy_local(
self,
market_type: MarketType,
deployment_type: DeploymentType,
sleep_time: float,
timeout: float,
place_bet: bool,
) -> None:
if deployment_type == DeploymentType.GOOGLE_CLOUD:
# Deploy to Google Cloud Functions, and use Google Cloud Scheduler to run the function
raise NotImplementedError(
"TODO not currently possible via DeployableAgent class. See examples/cloud_deployment/ instead."
start_time = time.time()
while True:
self.run(market_type=market_type, _place_bet=place_bet)
time.sleep(sleep_time)
if time.time() - start_time > timeout:
break

def deploy_gcp(
self,
repository: str,
market_type: MarketType,
memory: int,
labels: dict[str, str] | None = None,
env_vars: dict[str, str] | None = None,
secrets: dict[str, str] | None = None,
cron_schedule: str | None = None,
) -> None:
path_to_agent_file = os.path.relpath(inspect.getfile(self.__class__))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will return path to the agent's implementation, not to this abstract class.

Screenshot by Dropbox Capture


entrypoint_template = f"""
from {path_to_agent_file.replace("/", ".").replace(".py", "")} import *
import functions_framework
from prediction_market_agent_tooling.markets.markets import MarketType

@functions_framework.http
def main(request) -> str:
{self.__class__.__name__}().run(market_type={market_type.__class__.__name__}.{market_type.name})
return "Success"
"""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not happy with this, but it works for now. We can iterate again later.

During the implementation, I somehow didn't see how static functions would help in the end. I wanted to use them but then ended up with the current solution.

Please comment if I missed something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wooo nice. The problem I saw with not making run a static function is that the user can now define their agent like:

from pydantic import BaseModel
from prediction_market_agent_tooling.deploy.agent import DeployableAgent

class Foo(BaseModel):
    x: int

class DeployableCoinFlipAgent(DeployableAgent):
    foo: Foo

    ...

so now can have arbitrary dependencies that we need to know about to initialize the agent. e.g. in this example we need to import and create an instance of Foo, so deployment will fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But even if run is a static function, isn't the problem still the same? Even if run is static, they are still able to do

class DeployableCoinFlipAgent(DeployableAgent):
    foo: Foo

so, nothing changes, or does it?

Doing{self.__class__.__name__}() assumes that the class doesn't take any init arguments, but if run is static, the class needs to be initialised somewhere, so it's the same issue.

What we could do is

class DeployableAgent:
   @staticmethod
   def create_agent() -> "DeployableAgent":
      """Override this method if your agents needs custom initialization"""
      return DeployableAgent()

and use {self.__class__.__name__}.create_agent()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say that because the static method can't access self, it doesn't depend on Foo to run it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just tried with this branch, and e.g.

from pydantic import BaseModel

from prediction_market_agent_tooling.deploy.agent import DeployableAgent
from prediction_market_agent_tooling.markets.data_models import AgentMarket
from prediction_market_agent_tooling.markets.markets import MarketType

class Foo(BaseModel):
    answer: bool

class FooAgent(DeployableAgent):
    def __init__(self, foo: Foo):
        self.foo = foo

    def answer_binary_market(self, market: AgentMarket) -> bool:
        return self.foo.answer

if __name__ == "__main__":
    agent = FooAgent(foo=Foo(answer=True))
    agent.deploy_gcp(
        repository="git+https://github.com/gnosis/prediction-market-agent-tooling.git@peter/refactor-deployment",
        market_type=MarketType.MANIFOLD,
        memory=1024,
    )

which produced

import functions_framework
from <some path> import *

from prediction_market_agent_tooling.markets.markets import MarketType


@functions_framework.http
def main(request) -> str:
    FooAgent().run(market_type=MarketType.MANIFOLD)
    return "Success"

(the <some path> string was a bit funky, but ignoring for now). This failed with:

    FooAgent().run(market_type=MarketType.MANIFOLD)
TypeError: FooAgent.__init__() missing 1 required positional argument: 'foo'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'd expect it to fail, as foo is the required argument in your FooAgent.

So you could do

class FooAgent(DeployableAgent):
    def __init__(self, foo: Foo):
        self.foo = foo

    def answer_binary_market(self, market: AgentMarket) -> bool:
        return self.foo.answer

    @staticmethod
    def create_agent() -> "FooAgent":
        return FooAgent(foo=...)

And instead of calling FooAgent() in the generated function, we would call FooAgent.create_agent().

I think our situation is very similar to the MLFlow, https://mlflow.org/docs/latest/_modules/mlflow/pyfunc/model.html#PythonModel, they also have some "parent class" that needs to be subclassed, and then this subclass can be deployed by MLFlow-tools without any worries. They don't allow to change init as well, instead there are special methods that are auto-called in the deployment.


But back to the staticmethod, let's say we have this:

class DeployableAgent:
   ...
   
   @staticmethod
   def run(market_type: MarketType, _place_bet: bool = True) -> None:
        available_markets = [
            x.to_agent_market() for x in get_binary_markets(market_type)
        ]
        markets = self.pick_markets(available_markets)
        for market in markets:
            result = self.answer_binary_market(market)
            if _place_bet:
                print(f"Placing bet on {market} with result {result}")
                place_bet(
                    market=market.original_market,
                    amount=get_tiny_bet(market_type),
                    outcome=result,
                    omen_auto_deposit=True,
                )

Now we don't have self anymore, so we need to change it, but how? We can't do

result = DeployableAgent.answer_binary_market(market)

because that way, anything subclassed won't be used, the parent abstract class will always be used.

And we also can't do

   @staticmethod
   def run(agent: DeployableAgent, market_type: MarketType, _place_bet: bool = True) -> None:
       ...

because we would be again at the square 0, we would need to initialise the agent inside of the generated main function, and that initialisation could fail, so we would need something as create_agent I proposed above anyway.

Or did you have something else in mind?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you're right! Happy to go with this option then. We can add a guard to the base class to throw an exception if the derived class tries to override the init method:

class Base:
    def __init__(self):
        pass

    def __init_subclass__(cls, **kwargs):
        if cls.__init__ is not Base.__init__:
            raise TypeError("Cannot override __init__ method of Base class")


class Derived(Base):
    # Raises a TypeError
    def __init__(self):
        pass

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wau I didn't know about __init_subclass__ :o Added!


gcp_fname = self.get_gcloud_fname(market_type)

with tempfile.NamedTemporaryFile(mode="w", suffix=".py") as f:
f.write(entrypoint_template)
f.flush()

fname = deploy_to_gcp(
gcp_fname=gcp_fname,
requirements_file=None,
extra_deps=[repository],
function_file=f.name,
labels=labels,
env_vars=env_vars,
secrets=secrets,
memory=memory,
)
elif deployment_type == DeploymentType.LOCAL:
start_time = time.time()
while True:
self.run(market_type=market_type, _place_bet=place_bet)
time.sleep(sleep_time)
if time.time() - start_time > timeout:
break

# Check that the function is deployed
if not gcp_function_is_active(fname):
raise RuntimeError("Failed to deploy the function")

# Run the function
response = run_deployed_gcp_function(fname)
if not response.ok:
raise RuntimeError("Failed to run the deployed function")

# Schedule the function
if cron_schedule:
schedule_deployed_gcp_function(fname, cron_schedule=cron_schedule)

def run(self, market_type: MarketType, _place_bet: bool = True) -> None:
available_markets = [
Expand All @@ -71,9 +131,8 @@ def run(self, market_type: MarketType, _place_bet: bool = True) -> None:
omen_auto_deposit=True,
)

@classmethod
def get_gcloud_fname(cls, market_type: MarketType) -> str:
return f"{cls.__class__.__name__.lower()}-{market_type}-{int(time.time())}"
def get_gcloud_fname(self, market_type: MarketType) -> str:
return f"{self.__class__.__name__.lower()}-{market_type}-{int(time.time())}"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to a normal method with self, fixed the name in gcp:

Screenshot by Dropbox Capture



def get_tiny_bet(market_type: MarketType) -> BetAmount:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import random

import functions_framework
from flask.wrappers import Request

from prediction_market_agent_tooling.deploy.agent import DeployableAgent
from prediction_market_agent_tooling.markets.data_models import AgentMarket
from prediction_market_agent_tooling.markets.markets import MarketType


class DeployableCoinFlipAgent(DeployableAgent):
Expand All @@ -16,9 +12,3 @@ def pick_markets(self, markets: list[AgentMarket]) -> list[AgentMarket]:

def answer_binary_market(self, market: AgentMarket) -> bool:
return random.choice([True, False])


@functions_framework.http
def main(request: Request) -> str:
DeployableCoinFlipAgent().run(market_type=MarketType.MANIFOLD)
return "Success"
12 changes: 4 additions & 8 deletions prediction_market_agent_tooling/deploy/gcp/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import requests
from cron_validator import CronValidator

from prediction_market_agent_tooling.deploy.agent import DeployableAgent
from prediction_market_agent_tooling.deploy.gcp.utils import (
gcloud_create_topic_cmd,
gcloud_delete_function_cmd,
Expand All @@ -17,18 +16,17 @@
get_gcloud_function_uri,
get_gcloud_id_token,
)
from prediction_market_agent_tooling.markets.markets import MarketType
from prediction_market_agent_tooling.tools.utils import export_requirements_from_toml


def deploy_to_gcp(
gcp_fname: str,
function_file: str,
requirements_file: t.Optional[str],
extra_deps: list[str],
labels: dict[str, str],
env_vars: dict[str, str],
secrets: dict[str, str],
market_type: MarketType,
labels: dict[str, str] | None,
env_vars: dict[str, str] | None,
secrets: dict[str, str] | None,
memory: int, # in MB
) -> str:
if requirements_file and not os.path.exists(requirements_file):
Expand All @@ -37,8 +35,6 @@ def deploy_to_gcp(
if not os.path.exists(function_file):
raise ValueError(f"File {function_file} does not exist")

gcp_fname = DeployableAgent().get_gcloud_fname(market_type=market_type)

# Make a tempdir to store the requirements file and the function
with tempfile.TemporaryDirectory() as tempdir:
# Copy function_file to tempdir/main.py
Expand Down
21 changes: 12 additions & 9 deletions prediction_market_agent_tooling/deploy/gcp/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ def gcloud_deploy_cmd(
gcp_function_name: str,
source: str,
entry_point: str,
labels: dict[str, str],
env_vars: dict[str, str],
secrets: dict[str, str],
labels: dict[str, str] | None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just make them default empty dicts to save a few chars below on if statements

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't defaulting to dicts/lists/... forbidden pattern in Python, because of mutability issues?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😱 I wasn't aware of that before! TIL...
Explained well here: https://chat.openai.com/share/045afe93-640a-43e7-91b7-24c7e32a197e

Not an issue in this case I guess, as these dicts are only read, not written to. But thanks for the lesson and I retract my suggestion!

env_vars: dict[str, str] | None,
secrets: dict[str, str] | None,
memory: int, # in MB
) -> str:
cmd = (
Expand All @@ -27,12 +27,15 @@ def gcloud_deploy_cmd(
f"--memory {memory}MB "
f"--no-allow-unauthenticated "
)
for k, v in labels.items():
cmd += f"--update-labels {k}={v} "
for k, v in env_vars.items():
cmd += f"--set-env-vars {k}={v} "
for k, v in secrets.items():
cmd += f"--set-secrets {k}={v} "
if labels:
for k, v in labels.items():
cmd += f"--update-labels {k}={v} "
if env_vars:
for k, v in env_vars.items():
cmd += f"--set-env-vars {k}={v} "
if secrets:
for k, v in secrets.items():
cmd += f"--set-secrets {k}={v} "

return cmd

Expand Down
5 changes: 2 additions & 3 deletions tests/deploy/test_deploy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import random

from prediction_market_agent_tooling.deploy.agent import DeployableAgent, DeploymentType
from prediction_market_agent_tooling.deploy.agent import DeployableAgent
from prediction_market_agent_tooling.markets.data_models import AgentMarket
from prediction_market_agent_tooling.markets.markets import MarketType

Expand All @@ -16,10 +16,9 @@ def answer_binary_market(self, market: AgentMarket) -> bool:
return random.choice([True, False])

agent = DeployableCoinFlipAgent()
agent.deploy(
agent.deploy_local(
sleep_time=0.001,
market_type=MarketType.MANIFOLD,
deployment_type=DeploymentType.LOCAL,
timeout=0.01,
place_bet=False,
)
Loading