✨ #172 Add Anthropic integration for chat streaming #182

Open · wants to merge 2 commits into base: develop
81 changes: 75 additions & 6 deletions pkg/providers/anthropic/chat_stream.go
@@ -1,16 +1,85 @@
package anthropic

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"

	"glide/pkg/api/schemas"
	"glide/pkg/providers/clients"
	"glide/pkg/telemetry"

	"go.uber.org/zap"
)

func (c *Client) SupportChatStream() bool {
	return true
}

func (c *Client) ChatStream(_ context.Context, _ *schemas.ChatRequest) (clients.ChatStream, error) {
	return nil, clients.ErrChatStreamNotImplemented
}

Review comment (Member):
There has to be a ChatStream() method that would create an instance of AnthropicChatStream in this case.
For example, in the OpenAI case, it looked this way:
https://github.com/EinStack/glide/blob/develop/pkg/providers/openai/chat_stream.go#L162-L177
Without that, there is nothing that would use the AnthropicChatStream struct.
type AnthropicChatStream struct {
	tel       *telemetry.Telemetry
	client    *http.Client
	request   *http.Request
	response  *http.Response
	errMapper *ErrorMapper
}

func NewAnthropicChatStream(tel *telemetry.Telemetry, client *http.Client, request *http.Request, errMapper *ErrorMapper) *AnthropicChatStream {
	return &AnthropicChatStream{
		tel:       tel,
		client:    client,
		request:   request,
		errMapper: errMapper,
	}
}

// Open makes the HTTP request using the provided http.Client to initiate the chat stream.
func (s *AnthropicChatStream) Open(ctx context.Context) error {
	resp, err := s.client.Do(s.request.WithContext(ctx))
	if err != nil {
		s.tel.L().Error("Failed to open chat stream", zap.Error(err))
		// Map the provider-specific error before returning it.
		return s.errMapper.Map(err)
	}

	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		s.tel.L().Warn("Unexpected status code", zap.Int("status", resp.StatusCode))
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	s.response = resp
	s.tel.L().Info("Chat stream opened successfully")
	return nil
}

// Recv listens for and decodes incoming messages from the chat stream into ChatStreamChunk objects.
func (s *AnthropicChatStream) Recv() (*schemas.ChatStreamChunk, error) {
	if s.response == nil {

Review comment (Member):
Does Anthropic use server-sent events (SSE) for chat streaming? If so, you need a special parser to read that stream, just like OpenAI does:
https://github.com/EinStack/glide/blob/develop/pkg/providers/openai/chat_stream.go#L75-L95
SSE has a special format that has to be parsed before you can even unmarshal the real chunk from JSON into an Anthropic chat stream struct.

		s.tel.L().Error("Attempted to receive from an unopened stream")
		return nil, fmt.Errorf("stream not opened")
	}

	decoder := json.NewDecoder(s.response.Body)

Review comment (Member):
We are using just json.Unmarshal() to unmarshal chunks with the default decoder config:
https://github.com/EinStack/glide/blob/develop/pkg/providers/openai/chat_stream.go#L115
so I feel like the same goes for the Anthropic case, too.

	var chunk schemas.ChatStreamChunk

Review comment (Member):
schemas.ChatStreamChunk is Glide's unified schema for a stream chunk, but most likely it's not going to be directly useful for parsing Anthropic chunks. You need to define an Anthropic-specific chunk schema, use it to parse incoming chunks, and then finally remap the useful fields to an instance of schemas.ChatStreamChunk.
This is how it's done in the OpenAI case:
https://github.com/EinStack/glide/blob/develop/pkg/providers/openai/chat_stream.go#L115-L147

	if err := decoder.Decode(&chunk); err != nil {
		if err == io.EOF {
			s.tel.L().Info("Chat stream ended")
			return nil, nil // Stream ended normally.
		}
		s.tel.L().Error("Error during stream processing", zap.Error(err))
		return nil, err // An error occurred during stream processing.
	}

	return &chunk, nil
}

// Close ensures the chat stream is properly terminated by closing the response body.
func (s *AnthropicChatStream) Close() error {
	if s.response != nil {
		s.tel.L().Info("Closing chat stream")
		return s.response.Body.Close()
	}
	return nil
}
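Following the last review comment, a provider-specific chunk type could be unmarshalled first and then remapped to the unified schema. A hedged sketch with stand-in types (`anthropicChunk` follows Anthropic's documented `content_block_delta` event shape, and `unifiedChunk` stands in for `schemas.ChatStreamChunk`):

```go
package main

import "encoding/json"

// anthropicChunk models one streamed Anthropic event payload; only the
// fields needed for text deltas are shown.
type anthropicChunk struct {
	Type  string `json:"type"`
	Delta struct {
		Type string `json:"type"`
		Text string `json:"text"`
	} `json:"delta"`
}

// unifiedChunk is a minimal stand-in for schemas.ChatStreamChunk.
type unifiedChunk struct {
	Content string
}

// toUnified parses an Anthropic chunk and remaps the useful fields into
// the unified schema, as the OpenAI provider does for its own chunks.
func toUnified(raw []byte) (*unifiedChunk, error) {
	var chunk anthropicChunk
	if err := json.Unmarshal(raw, &chunk); err != nil {
		return nil, err
	}

	return &unifiedChunk{Content: chunk.Delta.Text}, nil
}
```

Keeping the provider schema private to the anthropic package means the unified `schemas.ChatStreamChunk` stays the only type that crosses package boundaries.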