Merge branch 'main' into loadams/revert-pin-transformers

loadams committed Sep 3, 2024
2 parents b84f2bf + 3ed3aa2 commit e5379e0
Showing 4 changed files with 87 additions and 85 deletions.
mii/entrypoints/api_server.py (27 changes: 14 additions & 13 deletions)
@@ -6,13 +6,14 @@
 import json
 import grpc
 import argparse
+from typing import AsyncGenerator

 # Third-party imports
 import uvicorn
 import mii
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
-from fastapi.responses import JSONResponse, Response
+from fastapi.responses import StreamingResponse, JSONResponse, Response
 from mii.grpc_related.proto.modelresponse_pb2_grpc import ModelResponseStub
 from mii.grpc_related.proto import modelresponse_pb2
 from mii.utils import kwarg_dict_to_proto
@@ -81,18 +82,18 @@ async def generate(request: CompletionRequest) -> Response:

     # Streaming case
     if request.stream:
-        return JSONResponse({"error": "Streaming is not yet supported."},
-                            status_code=400)
-        # async def StreamResults() -> AsyncGenerator[bytes, None]:
-        #     # Send an empty chunk to start the stream and prevent timeout
-        #     yield ""
-        #     async for response_chunk in stub.GeneratorReplyStream(requestData):
-        #         # Send the response chunk
-        #         responses = [obj.response for obj in response_chunk.response]
-        #         dataOut = {"text": responses}
-        #         yield f"data: {json.dumps(dataOut)}\n\n"
-        #     yield f"data: [DONE]\n\n"
-        # return StreamingResponse(StreamResults(), media_type="text/event-stream")
+
+        async def StreamResults() -> AsyncGenerator[bytes, None]:
+            # Send an empty chunk to start the stream and prevent timeout
+            yield ""
+            async for response_chunk in stub.GeneratorReplyStream(requestData):
+                # Send the response chunk
+                responses = [obj.response for obj in response_chunk.response]
+                dataOut = {"text": responses}
+                yield f"data: {json.dumps(dataOut)}\n\n"
+            yield f"data: [DONE]\n\n"
+
+        return StreamingResponse(StreamResults(), media_type="text/event-stream")

     # Non-streaming case
     responseData = await stub.GeneratorReply(requestData)
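For reference, the generate handler above streams Server-Sent Events, so a client has to read the response incrementally. Below is a minimal consumer sketch; the host, port, route, and payload shape are assumptions for illustration, not confirmed by this diff:

import json

import requests

# Hypothetical host and route; adjust to wherever api_server.py is served.
resp = requests.post("http://localhost:8000/generate",
                     json={"prompt": "Hello", "stream": True},
                     stream=True)
for line in resp.iter_lines(decode_unicode=True):
    # Skip the keep-alive empty chunk and the blank SSE separators.
    if not line or not line.startswith("data: "):
        continue
    payload = line[len("data: "):]
    if payload == "[DONE]":  # sentinel emitted by StreamResults
        break
    print(json.loads(payload)["text"])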
mii/entrypoints/data_models.py (2 changes: 1 addition & 1 deletion)
@@ -9,7 +9,7 @@
 import time

 import shortuuid
-from pydantic import BaseModel, BaseSettings, Field
+from mii.pydantic_v1 import BaseModel, BaseSettings, Field


 class ErrorResponse(BaseModel):
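Context for this change: requirements.txt in this same commit pins pydantic>=2.0.0, while data_models.py still uses the pydantic v1 API (BaseSettings is no longer importable from the top-level pydantic namespace in 2.x). Routing the import through mii.pydantic_v1 lets the code run against either major version. A sketch of what such a compatibility module typically contains; the actual contents of mii/pydantic_v1.py are not shown in this diff:

# Re-export the v1 API from wherever the installed pydantic provides it.
try:
    from pydantic.v1 import BaseModel, BaseSettings, Field  # pydantic >= 2.x
except ImportError:
    from pydantic import BaseModel, BaseSettings, Field  # pydantic 1.x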
mii/entrypoints/openai_api_server.py (140 changes: 69 additions & 71 deletions)
@@ -10,14 +10,14 @@
 import argparse
 import json
 import os
-from typing import Optional, List, Union
+from typing import AsyncGenerator, Optional, List, Union
 from transformers import AutoTokenizer
 import codecs

 from fastapi import FastAPI, Depends, HTTPException, Response
 from fastapi.exceptions import RequestValidationError
 from fastapi.middleware.cors import CORSMiddleware
-from fastapi.responses import JSONResponse
+from fastapi.responses import StreamingResponse, JSONResponse
 from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBearer

 import shortuuid
@@ -31,16 +31,16 @@
 from .data_models import (
     ChatCompletionRequest,
     ChatCompletionResponse,
-    # ChatCompletionResponseStreamChoice,
-    # ChatCompletionStreamResponse,
+    ChatCompletionResponseStreamChoice,
+    ChatCompletionStreamResponse,
     ChatMessage,
     ChatCompletionResponseChoice,
     CompletionRequest,
     CompletionResponse,
     CompletionResponseChoice,
-    # DeltaMessage,
-    # CompletionResponseStreamChoice,
-    # CompletionStreamResponse,
+    DeltaMessage,
+    CompletionResponseStreamChoice,
+    CompletionStreamResponse,
     ErrorResponse,
     ModelCard,
     ModelList,
@@ -202,42 +202,41 @@ async def create_chat_completion(request: ChatCompletionRequest):

     # Streaming case
     if request.stream:
-        return create_error_response(
-            ErrorCode.VALIDATION_TYPE_ERROR,
-            f"Streaming is not yet supported.",
-        )
-        # async def StreamResults() -> AsyncGenerator[bytes, None]:
-        #     # First chunk with role
-        #     firstChoices = []
-        #     for _ in range(request.n):
-        #         firstChoice = ChatCompletionResponseStreamChoice(
-        #             index=len(firstChoices),
-        #             delta=DeltaMessage(role=response_role),
-        #             finish_reason=None,
-        #         )
-        #         firstChoices.append(firstChoice)
-
-        #     chunk = ChatCompletionStreamResponse(
-        #         id=id, choices=firstChoices, model=app_settings.model_id
-        #     )
-        #     yield f"data: {chunk.json(exclude_unset=True, ensure_ascii=False)}\n\n"
-        #     async for response_chunk in stub.GeneratorReplyStream(requestData):
-        #         streamChoices = []
-
-        #         for c in response_chunk.response:
-        #             choice = ChatCompletionResponseStreamChoice(
-        #                 index=len(streamChoices),
-        #                 delta=DeltaMessage(content=c.response),
-        #                 finish_reason=None if c.finish_reason == "none" else c.finish_reason,
-        #             )
-        #             streamChoices.append(choice)
-
-        #         chunk = ChatCompletionStreamResponse(
-        #             id=id, choices=streamChoices, model=app_settings.model_id
-        #         )
-        #         yield f"data: {chunk.json(exclude_unset=True, ensure_ascii=False)}\n\n"
-        #     yield "data: [DONE]\n\n"
-        # return StreamingResponse(StreamResults(), media_type="text/event-stream")
+
+        async def StreamResults() -> AsyncGenerator[bytes, None]:
+            # First chunk with role
+            firstChoices = []
+            for _ in range(request.n):
+                firstChoice = ChatCompletionResponseStreamChoice(
+                    index=len(firstChoices),
+                    delta=DeltaMessage(role=response_role),
+                    finish_reason=None,
+                )
+                firstChoices.append(firstChoice)
+
+            chunk = ChatCompletionStreamResponse(id=id,
+                                                 choices=firstChoices,
+                                                 model=app_settings.model_id)
+            yield f"data: {chunk.json(exclude_unset=True, ensure_ascii=False)}\n\n"
+            async for response_chunk in stub.GeneratorReplyStream(requestData):
+                streamChoices = []
+
+                for c in response_chunk.response:
+                    choice = ChatCompletionResponseStreamChoice(
+                        index=len(streamChoices),
+                        delta=DeltaMessage(content=c.response),
+                        finish_reason=None
+                        if c.finish_reason == "none" else c.finish_reason,
+                    )
+                    streamChoices.append(choice)
+
+                chunk = ChatCompletionStreamResponse(id=id,
+                                                     choices=streamChoices,
+                                                     model=app_settings.model_id)
+                yield f"data: {chunk.json(exclude_unset=True, ensure_ascii=False)}\n\n"
+            yield "data: [DONE]\n\n"
+
+        return StreamingResponse(StreamResults(), media_type="text/event-stream")

     # Non-streaming case
     responseData = await stub.GeneratorReply(requestData)
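Since this server mimics the OpenAI chat API, the re-enabled stream can be exercised with the official openai Python client. A sketch, where the base URL, API key, and model name are assumptions; point them at wherever this server is deployed and whatever model it loaded:

from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="unused")
stream = client.chat.completions.create(
    model="mistralai/Mistral-7B-Instruct-v0.1",  # illustrative model id
    messages=[{"role": "user", "content": "Say hi"}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:  # the first chunk carries only the role
        print(delta.content, end="", flush=True)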
@@ -330,34 +329,33 @@ async def create_completion(request: CompletionRequest):
     id = f"cmpl-{shortuuid.random()}"
     # Streaming case
     if request.stream:
-        return create_error_response(
-            ErrorCode.VALIDATION_TYPE_ERROR,
-            f"Streaming is not yet supported.",
-        )
-        # async def StreamResults() -> AsyncGenerator[bytes, None]:
-        #     # Send an empty chunk to start the stream and prevent timeout
-        #     yield ""
-        #     async for response_chunk in stub.GeneratorReplyStream(requestData):
-        #         streamChoices = []
-
-        #         for c in response_chunk.response:
-        #             choice = CompletionResponseStreamChoice(
-        #                 index=len(streamChoices),
-        #                 text=c.response,
-        #                 logprobs=None,
-        #                 finish_reason=None if c.finish_reason == "none" else c.finish_reason,
-        #             )
-        #             streamChoices.append(choice)
-
-        #         chunk = CompletionStreamResponse(
-        #             id=id,
-        #             object="text_completion",
-        #             choices=streamChoices,
-        #             model=app_settings.model_id,
-        #         )
-        #         yield f"data: {chunk.json(exclude_unset=True, ensure_ascii=False)}\n\n"
-        #     yield "data: [DONE]\n\n"
-        # return StreamingResponse(StreamResults(), media_type="text/event-stream")
+
+        async def StreamResults() -> AsyncGenerator[bytes, None]:
+            # Send an empty chunk to start the stream and prevent timeout
+            yield ""
+            async for response_chunk in stub.GeneratorReplyStream(requestData):
+                streamChoices = []
+
+                for c in response_chunk.response:
+                    choice = CompletionResponseStreamChoice(
+                        index=len(streamChoices),
+                        text=c.response,
+                        logprobs=None,
+                        finish_reason=None
+                        if c.finish_reason == "none" else c.finish_reason,
+                    )
+                    streamChoices.append(choice)
+
+                chunk = CompletionStreamResponse(
+                    id=id,
+                    object="text_completion",
+                    choices=streamChoices,
+                    model=app_settings.model_id,
+                )
+                yield f"data: {chunk.json(exclude_unset=True, ensure_ascii=False)}\n\n"
+            yield "data: [DONE]\n\n"
+
+        return StreamingResponse(StreamResults(), media_type="text/event-stream")

     # Non-streaming case
     responseData = await stub.GeneratorReply(requestData)
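Each yield in StreamResults above becomes one SSE frame on the wire. An illustrative completion-stream frame (field values invented, and the exact field set depends on how CompletionStreamResponse serializes with exclude_unset=True) looks roughly like:

data: {"id": "cmpl-abc123", "object": "text_completion", "choices": [{"index": 0, "text": " world", "logprobs": null, "finish_reason": null}], "model": "example-model"}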
requirements/requirements.txt (3 changes: 3 additions & 0 deletions)
@@ -2,13 +2,16 @@ accelerate
 asyncio
 deepspeed>=0.15.0
 deepspeed-kernels
+fastapi
+fastchat
 Flask-RESTful
 grpcio
 grpcio-tools
 Pillow
 pydantic>=2.0.0
 pyzmq
 safetensors
+shortuuid
 torch
 transformers
 ujson
