Skip to content

Commit

Permalink
Merge pull request #348 from PrefectHQ/run_until
Browse files Browse the repository at this point in the history
Add utilities for controling loop termination
  • Loading branch information
jlowin authored Oct 9, 2024
2 parents 70ef1dc + b6fc900 commit 33c0701
Show file tree
Hide file tree
Showing 27 changed files with 971 additions and 215 deletions.
2 changes: 1 addition & 1 deletion docs/concepts/tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ Note that this setting reflects the configuration of the `completion_tools` para

import { VersionBadge } from '/snippets/version-badge.mdx'

<VersionBadge version="0.10.0" />
<VersionBadge version="0.10" />

In addition to specifying which agents are automatically given completion tools, you can control which completion tools are generated for a task using the `completion_tools` parameter. This allows you to specify whether you want to provide success and/or failure tools, or even provide custom completion tools.

Expand Down
102 changes: 102 additions & 0 deletions docs/examples/features/early-termination.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
---
title: Early Termination
description: Control workflow execution with flexible termination logic.
icon: circle-stop
---

import { VersionBadge } from "/snippets/version-badge.mdx"

<VersionBadge version="0.11" />

This example demonstrates how to use termination conditions with the `run_until` parameter to control the execution of a ControlFlow workflow. We'll create a simple research workflow that stops under various conditions, showcasing the flexibility of this feature. In this case, we'll allow research to continue until either two topics are researched or 15 LLM calls are made.

## Code

```python
import controlflow as cf
from controlflow.orchestration.conditions import AnyComplete, MaxLLMCalls
from pydantic import BaseModel


class ResearchPoint(BaseModel):
topic: str
key_findings: list[str]


@cf.flow
def research_workflow(topics: list[str]):
if len(topics) < 2:
raise ValueError("At least two topics are required")

research_tasks = [
cf.Task(f"Research {topic}", result_type=ResearchPoint)
for topic in topics
]

# Run tasks with termination conditions
results = cf.run_tasks(
research_tasks,
instructions="Research only one topic at a time.",
run_until=(
AnyComplete(min_complete=2) # stop after two tasks (if there are more than two topics)
| MaxLLMCalls(15) # or stop after 15 LLM calls, whichever comes first
)
)

completed_research = [r for r in results if isinstance(r, ResearchPoint)]
return completed_research
```

<CodeGroup>

Now, if we run this workflow on 4 topics, it will stop after two topics are researched:

```python Example Usage
# Example usage
topics = [
"Artificial Intelligence",
"Quantum Computing",
"Biotechnology",
"Renewable Energy",
]
results = research_workflow(topics)

print(f"Completed research on {len(results)} topics:")
for research in results:
print(f"\nTopic: {research.topic}")
print("Key Findings:")
for finding in research.key_findings:
print(f"- {finding}")
```

```text Result
Completed research on 2 topics:
Topic: Artificial Intelligence
Key Findings:
- Machine Learning and Deep Learning: These are subsets of AI that involve training models on large datasets to make predictions or decisions without being explicitly programmed. They are widely used in various applications, including image and speech recognition, natural language processing, and autonomous vehicles.
- AI Ethics and Bias: As AI systems become more prevalent, ethical concerns such as bias in AI algorithms, data privacy, and the impact on employment are increasingly significant. Ensuring fairness, transparency, and accountability in AI systems is a growing area of focus.
- AI in Healthcare: AI technologies are revolutionizing healthcare through applications in diagnostics, personalized medicine, and patient monitoring. AI can analyze medical data to assist in early disease detection and treatment planning.
- Natural Language Processing (NLP): NLP is a field of AI focused on the interaction between computers and humans through natural language. Recent advancements include transformers and large language models, which have improved the ability of machines to understand and generate human language.
- AI in Autonomous Systems: AI is a crucial component in developing autonomous systems, such as self-driving cars and drones, which require perception, decision-making, and control capabilities to navigate and operate in real-world environments.
Topic: Quantum Computing
Key Findings:
- Quantum Bits (Qubits): Unlike classical bits, qubits can exist in multiple states simultaneously due to superposition. This allows quantum computers to process a vast amount of information at once, offering a potential exponential speed-up over classical computers for certain tasks.
- Quantum Entanglement: This phenomenon allows qubits that are entangled to be correlated with each other, even when separated by large distances. Entanglement is a key resource in quantum computing and quantum communication.
- Quantum Algorithms: Quantum algorithms, such as Shor's algorithm for factoring large numbers and Grover's algorithm for searching unsorted databases, demonstrate the potential power of quantum computing over classical approaches.
- Quantum Error Correction: Quantum systems are prone to errors due to decoherence and noise from the environment. Quantum error correction methods are essential for maintaining the integrity of quantum computations.
- Applications and Challenges: Quantum computing holds promise for solving complex problems in cryptography, material science, and optimization. However, significant technological challenges remain, including maintaining qubit coherence, scaling up the number of qubits, and developing practical quantum software.
```
</CodeGroup>
## Key Concepts

1. **Custom Termination Conditions**: We use a combination of `AnyComplete` and `MaxLLMCalls` conditions to control when the workflow should stop.

2. **Flexible Workflow Control**: By using termination conditions with the `run_until` parameter, we can create more dynamic workflows that adapt to different scenarios. In this case, we're balancing between getting enough research done and limiting resource usage.

3. **Partial Results**: The workflow can end before all tasks are complete, so we handle partial results by filtering for completed `ResearchPoint` objects.

4. **Combining Conditions**: We use the `|` operator to combine multiple termination conditions. ControlFlow also supports `&` for more complex logic.

This example demonstrates how termination conditions provide fine-grained control over workflow execution, allowing you to balance between task completion and resource usage. This can be particularly useful for managing costs, handling time-sensitive operations, or creating more responsive AI workflows.
2 changes: 1 addition & 1 deletion docs/examples/features/memory.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ icon: brain
---
import { VersionBadge } from '/snippets/version-badge.mdx'

<VersionBadge version="0.10.0" />
<VersionBadge version="0.10" />


Memory in ControlFlow allows agents to store and retrieve information across different conversations or workflow executions. This is particularly useful for maintaining context over time or sharing information between separate interactions.
Expand Down
2 changes: 1 addition & 1 deletion docs/guides/default-memory.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ icon: brain
---
import { VersionBadge } from '/snippets/version-badge.mdx'

<VersionBadge version="0.10.0" />
<VersionBadge version="0.10" />
ControlFlow's [memory](/patterns/memory) feature allows agents to store and retrieve information across multiple workflows. Memory modules are backed by a vector database, configured using a `MemoryProvider`.

Setting up a default provider simplifies the process of creating memory objects throughout your application. Once configured, you can create memory objects without specifying a provider each time.
Expand Down
3 changes: 2 additions & 1 deletion docs/mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
"examples/features/tools",
"examples/features/multi-llm",
"examples/features/private-flows",
"examples/features/memory"
"examples/features/memory",
"examples/features/early-termination"
]
},
{
Expand Down
2 changes: 1 addition & 1 deletion docs/patterns/memory.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ icon: bookmark
---
import { VersionBadge } from '/snippets/version-badge.mdx'

<VersionBadge version="0.10.0" />
<VersionBadge version="0.10" />


Within an agentic workflow, information is naturally added to the [thread history](/patterns/history) over time, making available to all agents who participate in the workflow. However, that information is not accessible from other threads, even if they relate to the same objective or resources.
Expand Down
32 changes: 32 additions & 0 deletions docs/patterns/running-tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ description: Control task execution and manage how agents collaborate.
icon: play
---

import { VersionBadge } from "/snippets/version-badge.mdx"

Tasks represent a unit of work that needs to be completed by your agents. To execute that work and retrieve its result, you need to instruct your agents to run the task.


Expand Down Expand Up @@ -356,6 +358,36 @@ Note that the setting `max_llm_calls` on the task results in the task failing if
</Tip>


#### Early termination conditions

<VersionBadge version="0.11" />

ControlFlow supports more flexible control over when an orchestration run should end through the use of `run_until` conditions. These conditions allow you to specify complex termination logic based on various factors such as task completion, failure, or custom criteria.

To use a run until condition, you can pass it to the `run_until` parameter when calling `run`, `run_async`, `run_tasks`, or `run_tasks_async`. For example, the following tasks will run until either one of them is complete or 10 LLM calls have been made:

```python
import controlflow as cf
from controlflow.orchestration.conditions import AnyComplete, MaxLLMCalls

result = cf.run_tasks(
tasks=[cf.Task("write a poem about AI"), cf.Task("write a poem about ML")],
run_until=AnyComplete() | MaxLLMCalls(10)
)
```

(Note that because tasks can be run in parallel, it's possible for both subtasks to be completed.)

Termination conditions can be combined using boolean logic: `|` indicates "or" and `&` indicates "and". A variety of built-in conditions are available:

- `AllComplete()`: stop when all tasks are complete (this is the default behavior)
- `MaxLLMCalls(n: int)`: stop when `n` LLM calls have been made (equivalent to providing `max_llm_calls`)
- `MaxAgentTurns(n: int)`: stop when `n` agent turns have been made (equivalent to providing `max_agent_turns`)
- `AnyComplete(tasks: list[Task], min_complete: int=1)`: stop when at least `min_complete` tasks are complete. If no tasks are provided, all of the orchestrator's tasks will be used.
- `AnyFailed(tasks: list[Task], min_failed: int=1)`: stop when at least `min_failed` tasks have failed. If no tasks are provided, all of the orchestrator's tasks will be used.



### Accessing an orchestrator directly

If you want to "step" through the agentic loop yourself, you can create and invoke an `Orchestrator` directly.
Expand Down
52 changes: 52 additions & 0 deletions examples/early_termination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from pydantic import BaseModel

import controlflow as cf
from controlflow.orchestration.conditions import AnyComplete, MaxLLMCalls


class ResearchPoint(BaseModel):
topic: str
key_findings: list[str]


@cf.flow
def research_workflow(topics: list[str]):
if len(topics) < 2:
raise ValueError("At least two topics are required")

research_tasks = [
cf.Task(f"Research {topic}", result_type=ResearchPoint) for topic in topics
]

# Run tasks until either two topics are researched or 15 LLM calls are made
results = cf.run_tasks(
research_tasks,
instructions="Research only one topic at a time.",
run_until=(
AnyComplete(
min_complete=2
) # stop after two tasks (if there are more than two topics)
| MaxLLMCalls(15) # or stop after 15 LLM calls, whichever comes first
),
)

completed_research = [r for r in results if isinstance(r, ResearchPoint)]
return completed_research


if __name__ == "__main__":
# Example usage
topics = [
"Artificial Intelligence",
"Quantum Computing",
"Biotechnology",
"Renewable Energy",
]
results = research_workflow(topics)

print(f"Completed research on {len(results)} topics:")
for research in results:
print(f"\nTopic: {research.topic}")
print("Key Findings:")
for finding in research.key_findings:
print(f"- {finding}")
118 changes: 118 additions & 0 deletions examples/reasoning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""
This example implements a reasoning loop that lets a relatively simple model
solve difficult problems.
Here, gpt-4o-mini is used to solve a problem that typically requires o1's
reasoning ability.
"""

import argparse

from pydantic import BaseModel, Field

import controlflow as cf
from controlflow.utilities.general import unwrap


class ReasoningStep(BaseModel):
explanation: str = Field(
description="""
A brief (<5 words) description of what you intend to
achieve in this step, to display to the user.
"""
)
reasoning: str = Field(
description="A single step of reasoning, not more than 1 or 2 sentences."
)
found_validated_solution: bool


REASONING_INSTRUCTIONS = """
You are working on solving a difficult problem (the `goal`). Based
on your previous thoughts and the overall goal, please perform **one
reasoning step** that advances you closer to a solution. Document
your thought process and any intermediate steps you take.
After marking this task complete for a single step, you will be
given a new reasoning task to continue working on the problem. The
loop will continue until you have a valid solution.
Complete the task as soon as you have a valid solution.
**Guidelines**
- You will not be able to brute force a solution exhaustively. You
must use your reasoning ability to make a plan that lets you make
progress.
- Each step should be focused on a specific aspect of the problem,
either advancing your understanding of the problem or validating a
solution.
- You should build on previous steps without repeating them.
- Since you will iterate your reasoning, you can explore multiple
approaches in different steps.
- Use logical and analytical thinking to reason through the problem.
- Ensure that your solution is valid and meets all requirements.
- If you find yourself spinning your wheels, take a step back and
re-evaluate your approach.
"""


@cf.flow
def solve_with_reasoning(goal: str, agent: cf.Agent) -> str:
while True:
response: ReasoningStep = cf.run(
objective="""
Carefully read the `goal` and analyze the problem.
Produce a single step of reasoning that advances you closer to a solution.
""",
instructions=REASONING_INSTRUCTIONS,
result_type=ReasoningStep,
agents=[agent],
context=dict(goal=goal),
model_kwargs=dict(tool_choice="required"),
)

if response.found_validated_solution:
if cf.run(
"""
Check your solution to be absolutely sure that it is correct and meets all requirements of the goal. Return True if it does.
""",
result_type=bool,
context=dict(goal=goal),
):
break

return cf.run(objective=goal, agents=[agent])


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Solve a reasoning problem.")
parser.add_argument("--goal", type=str, help="Custom goal to solve", default=None)
args = parser.parse_args()

agent = cf.Agent(name="Definitely not GPT-4o mini", model="openai/gpt-4o-mini")

# Default goal via https://www.reddit.com/r/singularity/comments/1fggo1e/comment/ln3ymsu/
default_goal = """
Using only four instances of the digit 9 and any combination of the following
mathematical operations: the decimal point, parentheses, addition (+),
subtraction (-), multiplication (*), division (/), factorial (!), and square
root (sqrt), create an equation that equals 24.
In order to validate your result, you should test that you have followed the rules:
1. You have used the correct number of variables
2. You have only used 9s and potentially a leading 0 for a decimal
3. You have used valid mathematical symbols
4. Your equation truly equates to 24.
"""

# Use the provided goal if available, otherwise use the default
goal = args.goal if args.goal is not None else default_goal
goal = unwrap(goal)
print(f"The goal is:\n\n{goal}")

result = solve_with_reasoning(goal=goal, agent=agent)

print(f"The solution is:\n\n{result}")
1 change: 1 addition & 0 deletions src/controlflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .tools import tool
from .run import run, run_async, run_tasks, run_tasks_async
from .plan import plan
import controlflow.orchestration


# --- Version ---
Expand Down
8 changes: 7 additions & 1 deletion src/controlflow/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
handle_tool_call_async,
)
from controlflow.utilities.context import ctx
from controlflow.utilities.general import ControlFlowModel, hash_objects
from controlflow.utilities.general import ControlFlowModel, hash_objects, unwrap
from controlflow.utilities.prefect import create_markdown_artifact, prefect_task

if TYPE_CHECKING:
Expand Down Expand Up @@ -128,6 +128,12 @@ def _generate_id(self):
)
)

@field_validator("instructions")
def _validate_instructions(cls, v):
if v:
v = unwrap(v)
return v

@field_validator("tools", mode="before")
def _validate_tools(cls, tools: list[Tool]):
return as_tools(tools or [])
Expand Down
Loading

0 comments on commit 33c0701

Please sign in to comment.