fix: patch streaming API code #1693
Conversation
…tead of JSON with newlines
Stream steps seem OK now:

% curl --request POST \
--url http://localhost:8283/api/agents/agent-aeb9453f-eedb-4e12-8824-651885927e5f/messages \
--header 'accept: application/json' \
--header 'authorization: Bearer password' \
--header 'content-type: application/json' \
--data '
{
"messages": [
{
"text": "Hi is anyone there?",
"role": "user"
}
],
"stream_steps": true,
"stream_tokens": false
}
'
data: [DONE_GEN]
data: {"id":"message-de410269-9f9c-4a8f-b55b-252a2fb43866","date":"2024-08-28T17:04:55.730026+00:00","internal_monologue":"Chad keeps repeating the same phrase despite my varying responses. It's beginning to feel like an echo in here. Should I try a different approach or continue with the current one? Finding the correct path to redirect this conversation might require some improvisation."}
data: {"id":"message-de410269-9f9c-4a8f-b55b-252a2fb43866","date":"2024-08-28T17:04:55.730071+00:00","function_call":{"name":"send_message","arguments":"{\n \"message\": \"As assuredly as the sun rises, Chad, I'm here and ready to assist. Just out of curiosity, do you consider pineapple a suitable topping for pizza?\"\n}"}}
data: {"id":"message-289cd479-2047-41f3-8916-86c12dd386bb","date":"2024-08-28T17:04:55.730096+00:00","function_return":"None","status":"success"}
data: [DONE_STEP]
data: [DONE]
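The SSE stream above interleaves JSON payloads with plain-text sentinel markers ([DONE_GEN], [DONE_STEP], [DONE]). A minimal sketch of client-side parsing (parse_sse_lines is a hypothetical helper, not part of the MemGPT client):

```python
import json

# Sentinel markers from the stream above; passed through as plain strings,
# everything else is decoded as JSON.
SENTINELS = {"[DONE_GEN]", "[DONE_STEP]", "[DONE]"}

def parse_sse_lines(lines):
    events = []
    for line in lines:
        line = line.strip()
        if not line.startswith("data: "):
            continue  # skip blank lines / SSE keep-alive comments
        payload = line[len("data: "):]
        if payload in SENTINELS:
            events.append(payload)
        else:
            events.append(json.loads(payload))
    return events

# Example using a truncated version of the stream shown above
stream = [
    "data: [DONE_GEN]",
    'data: {"id": "message-de410269", "internal_monologue": "thinking..."}',
    "data: [DONE_STEP]",
    "data: [DONE]",
]
for event in parse_sse_lines(stream):
    print(event)
```

In a real client the lines would come from an HTTP response iterated with streaming enabled rather than a hard-coded list.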
…that we can pass a MemGPT message ID back in the chunks of our streaming API (previously we hadn't created a Message so there was no Message ID to pass back by the time the streaming started)
…was an unsupported data type with the existing set of schemas
…ctor (TODO this is intended to be used to allow creating a message in agent.py that uses the ID that came inside of the ChatCompletionResponse
… the chunks/message objects
… pass through in the chunks from the MemGPT API
… or not we want to use timestamps/ids from the chunks coming from the server, or from a message we created ahead of time
…on the return payload from any MemGPT message POST SSE return
…e for streaming, but this enables actually persisting them (previously they were just fake throwaways). NOTE: this isn't really done very well since it has a hack assuming the 'message-' prefix on a ChatCompletionResponse means that we intended to use the .id property on the ChatCompletionResponse in subsequent Message creations
Streaming tokens now seems to be working:
…ng back to duplicated code style
# If we are streaming, we needed to create a Message ID ahead of time,
# and now we want to use it in the creation of the Message object
# TODO figure out a cleaner way to do this
response_message_id: Optional[str] = None,
@sarahwooders FYI we are now passing response_message_id into handle_ai_response for the special case where we created the Message object before we started unpacking it / turning it into inner thoughts / actions / etc.
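A sketch of how the pre-generated ID threads through (handle_ai_response and the response_message_id kwarg are from the diff; the Message class here is a simplified stand-in, not the real Pydantic model):

```python
from typing import Optional

class Message:
    """Simplified stand-in for the real Message model."""
    def __init__(self, text: str, id: Optional[str] = None):
        # When streaming, reuse the ID already sent out in the chunks;
        # otherwise let the constructor mint a fresh one.
        self.id = id if id is not None else "message-<new-uuid>"
        self.text = text

def handle_ai_response(text: str, response_message_id: Optional[str] = None) -> Message:
    """Handles parsing and function execution (heavily simplified sketch)."""
    if response_message_id is not None:
        # Failsafe from the diff: pre-generated IDs must look like Message IDs
        assert response_message_id.startswith("message-"), response_message_id
    return Message(text=text, id=response_message_id)
```

The point is that the same ID streamed back in the chunks is the one persisted on the Message created here.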
"""Handles parsing and function execution"""

# Hacky failsafe for now to make sure we didn't implement the streaming Message ID creation incorrectly
if response_message_id is not None:
    assert response_message_id.startswith("message-"), response_message_id
@sarahwooders We can cut this out later, but I think it's fine to leave in for now while the streaming code is in flux (or at least until we add streaming unit tests that test for persistence of IDs that are streamed back).
response_message,
# TODO this is kind of hacky, find a better way to handle this
# the only time we set up message creation ahead of time is when streaming is on
response_message_id=response.id if stream else None,
@sarahwooders translation: If we're streaming (tokens), then we want to create a Message.id ahead of time so that the chunks we return via the API (and via the client once we add support) have ids attached to them. However, this all happens before the MemGPT agent logic loop that takes a ChatCompletionResponse as input (which is the final result of a stream, not the intermediate result). So that means we need to modify handle_ai_response to (in the streaming case) accept a pre-generated Message.id, and use it when we create the Message objects inside of handle_ai_response.
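The call-site decision reduces to a one-liner; a tiny sketch (FakeResponse is a hypothetical stand-in for a ChatCompletionResponse):

```python
class FakeResponse:
    """Hypothetical stand-in for a ChatCompletionResponse."""
    id = "message-abc"

def pick_response_message_id(response, stream: bool):
    # Mirrors the diff: response_message_id=response.id if stream else None.
    # Only when token streaming is on do we reuse the pre-generated id.
    return response.id if stream else None
```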
@@ -423,7 +423,7 @@ def send_message(
 ) -> MemGPTResponse:
     messages = [MessageCreate(role=role, text=message, name=name)]
     # TODO: figure out how to handle stream_steps and stream_tokens
-    request = MemGPTRequest(messages=messages, stream_steps=stream)
+    request = MemGPTRequest(messages=messages, stream_steps=stream, return_message_object=True)
@sarahwooders return_message_object=True means the type that comes back is Message. return_message_object=False means the type that comes back is InnerThoughts / FunctionCall / ... (these are now typed too, vs previously they were dicts).
memgpt/schemas/enums.py
Outdated
@@ -5,6 +5,7 @@ class MessageRole(str, Enum):
     assistant = "assistant"
     user = "user"
     tool = "tool"
+    function = "function"  # NOTE: deprecated, use tool
@sarahwooders This is still supported in the OpenAI API
can you remove the note?
@@ -12,7 +13,9 @@ class BaseMemGPTMessage(BaseModel):

    @field_serializer("date")
    def serialize_datetime(self, dt: datetime, _info):
        return dt.now(timezone.utc).isoformat()
@4shub @goetzrobin FYI this was a pretty bad bug that was previously causing all message streaming response chunks to have newly created timestamps
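To see why this was a bug: datetime.now is a classmethod, so dt.now(timezone.utc) ignores dt entirely and returns the current wall-clock time. A minimal illustration of the buggy pattern vs. one possible fix (plain functions here, no Pydantic; this is a sketch, not the actual patch):

```python
from datetime import datetime, timezone

def buggy_serialize(dt: datetime) -> str:
    # BUG: datetime.now is a classmethod, so `dt` is ignored and every
    # call stamps the current time instead of the stored one.
    return dt.now(timezone.utc).isoformat()

def fixed_serialize(dt: datetime) -> str:
    # Uses the stored value, normalized to UTC.
    return dt.astimezone(timezone.utc).isoformat()

original = datetime(2024, 8, 28, 17, 4, 55, tzinfo=timezone.utc)
print(fixed_serialize(original))  # 2024-08-28T17:04:55+00:00
```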
@@ -32,6 +35,20 @@ class FunctionCall(BaseModel):
     arguments: str


class FunctionCallDelta(BaseModel):
@sarahwooders I didn't have to make a Delta / Chunk specific model for InnerMonologue since InnerMonologue has chunk support built in - InnerMonologue.arguments can just be partial pieces:

InnerMonologue.arguments = "hello there"

to

InnerMonologue.arguments = "hello "
InnerMonologue.arguments = "there"

However, FunctionCall is problematic since, at least with the OpenAI API, the stream back usually starts with just name, then chunks of the arguments:

FunctionCall.name: "send_message"
FunctionCall.arguments: "{ 'content': ...

to

FunctionCall.name: "send_message"
FunctionCall.arguments: "{ "
FunctionCall.arguments: "'content:'"
...

So we need a new Pydantic model that supports optional attributes when name is null or arguments is null (technically you should never have the case where both are null, but I'm not sure how you set that up in Pydantic + it's probably not worth the hassle).
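The shape of that delta model can be sketched with a dataclass (the real model is Pydantic; the both-null guard here is hand-rolled where Pydantic would use a model validator):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class FunctionCallDelta:
    # Each field is optional so a chunk can carry just the function name,
    # or just a fragment of the arguments string.
    name: Optional[str] = None
    arguments: Optional[str] = None

    def __post_init__(self):
        # "You should never have both be null" -- enforced by hand here
        if self.name is None and self.arguments is None:
            raise ValueError("name and arguments cannot both be None")

# Reassembling an OpenAI-style stream: name arrives first, then argument chunks
chunks = [
    FunctionCallDelta(name="send_message"),
    FunctionCallDelta(arguments='{ '),
    FunctionCallDelta(arguments='"content": "hi" }'),
]
arguments = "".join(c.arguments for c in chunks if c.arguments is not None)
print(arguments)  # { "content": "hi" }
```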
text=openai_message_dict["content"],
name=openai_message_dict["name"] if "name" in openai_message_dict else None,
tool_calls=openai_message_dict["tool_calls"] if "tool_calls" in openai_message_dict else None,
tool_call_id=openai_message_dict["tool_call_id"] if "tool_call_id" in openai_message_dict else None,
created_at=created_at,
)
if id is not None:
@sarahwooders I couldn't figure out a clean way to refactor this such that when id (the kwarg) is not None, we pass it through, and when it is None, we omit it (and let Message's constructor do the default init). I tried making a message_args = dict(...) version where we then add id to the arg dictionary, then do Message(**message_args), but that started throwing Pydantic validation errors, so I just did the simple code duplication version.
# the non-streaming message types
MemGPTMessage,
LegacyMemGPTMessage,
# the streaming message types
@sarahwooders the "streaming message types" actually include MemGPTMessage, since MemGPTMessage types (FunctionCall, InternalMonologue, ...) all natively support streaming (as mentioned in an earlier comment).
@4shub change back dates to be true, but the timestamp is the same
- stream_steps == true and stream_tokens == false
- stream_steps == true and stream_tokens == true
- id and created_at that get returned on the POST SSE stream are the same as what we get with a subsequent cursor fetch (this makes sure that the persistence code worked)