fix: streaming
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-04-28 22:30:37 -04:00
parent 4c1523d81b
commit fad8ed865a

View File

@@ -323,10 +323,13 @@ func (a *API) PostChat(w http.ResponseWriter, r *http.Request) {
}
// Send Message
if err := a.sendMessage(r.Context(), w, chat.ID, genReq.Model, genReq.Prompt); err != nil {
responseStarted, err := a.sendMessage(r.Context(), w, chat.ID, genReq.Model, genReq.Prompt)
if err != nil {
log.WithError(err).WithField("chat_id", chat.ID).Error("failed to send message")
if !responseStarted {
http.Error(w, "Failed to send message", http.StatusInternalServerError)
}
}
}
func (a *API) DeleteChat(w http.ResponseWriter, r *http.Request) {
@@ -421,10 +424,14 @@ func (a *API) PostChatMessage(w http.ResponseWriter, r *http.Request) {
return
}
if err := a.sendMessage(r.Context(), w, chatID, genReq.Model, genReq.Prompt); err != nil {
// Send Message
responseStarted, err := a.sendMessage(r.Context(), w, chatID, genReq.Model, genReq.Prompt)
if err != nil {
log.WithError(err).WithField("chat_id", chatID).Error("failed to send message")
if !responseStarted {
http.Error(w, "Failed to send message", http.StatusInternalServerError)
}
}
}
func (a *API) getClient() (*client.Client, error) {
@@ -449,10 +456,10 @@ func (a *API) getClient() (*client.Client, error) {
return a.client, nil
}
func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uuid.UUID, chatModel, userMessage string) error {
func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uuid.UUID, chatModel, userMessage string) (bool, error) {
apiClient, err := a.getClient()
if err != nil {
return fmt.Errorf("failed to get client: %w", err)
return false, fmt.Errorf("failed to get client: %w", err)
}
// Detach Request Context
@@ -462,19 +469,20 @@ func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uui
// Create User Message
userMsg := &store.Message{ChatID: chatID, Role: "user", Content: userMessage}
if err := a.store.SaveChatMessage(userMsg); err != nil {
return fmt.Errorf("failed to add user message to chat: %w", err)
return false, fmt.Errorf("failed to add user message to chat: %w", err)
}
// Get Chat History - Fetch before creating the in-progress assistant message so the
// LLM request does not include an empty assistant response prefill.
chat, err := a.store.GetChat(chatID)
if err != nil {
return false, fmt.Errorf("failed to get chat: %w", err)
}
// Add Assistant Response - TODO: Ensure InProgress Flag?
assistantMsg := &store.Message{ChatID: chatID, Role: "assistant"}
if err := a.store.SaveChatMessage(assistantMsg); err != nil {
return fmt.Errorf("failed to add assistant message to chat: %w", err)
}
// Get Chat
chat, err := a.store.GetChat(chatID)
if err != nil {
return fmt.Errorf("failed to get chat: %w", err)
return false, fmt.Errorf("failed to add assistant message to chat: %w", err)
}
// Set Headers
@@ -491,35 +499,52 @@ func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uui
Chat: toChatNoMessages(chat),
UserMessage: userMsg,
}); err != nil {
return fmt.Errorf("failed to send initial chunk: %w", err)
return false, fmt.Errorf("failed to send initial chunk: %w", err)
}
responseStarted := true
streamToClient := true
// Send Message
if _, err := apiClient.SendMessage(ctx, chat.Messages, chatModel, func(m *client.MessageChunk) error {
var apiMsgChunk MessageChunk
messageChanged := false
if m.Stats != nil {
messageChanged = true
assistantMsg.Stats = m.Stats
}
if m.Message != nil {
messageChanged = true
assistantMsg.Content += *m.Message
apiMsgChunk.AssistantMessage = assistantMsg
}
if m.Thinking != nil {
messageChanged = true
assistantMsg.Thinking += *m.Thinking
}
// Save Assistant Progress - Persist each streamed update so partial content
// survives client disconnects or upstream stream failures.
if messageChanged {
if err := a.store.SaveChatMessage(assistantMsg); err != nil {
return fmt.Errorf("failed to save assistant progress: %w", err)
}
apiMsgChunk.AssistantMessage = assistantMsg
}
// Send Progress Chunk
// Send Progress Chunk - If the browser disconnects, keep the detached
// generation running and continue saving streamed content to the store.
if streamToClient {
if err := json.NewEncoder(flushWriter).Encode(apiMsgChunk); err != nil {
return fmt.Errorf("failed to send progress chunk: %w", err)
streamToClient = false
a.logger.WithError(err).WithField("chat_id", chat.ID).Warn("client stream disconnected")
}
}
return nil
}); err != nil {
return fmt.Errorf("failed to generate text stream: %w", err)
return responseStarted, fmt.Errorf("failed to generate text stream: %w", err)
}
// Summarize & Update Chat Title
@@ -534,16 +559,18 @@ func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uui
// Update Assistant Message
if err := a.store.SaveChatMessage(assistantMsg); err != nil {
return fmt.Errorf("failed to save assistant message to chat: %w", err)
return responseStarted, fmt.Errorf("failed to save assistant message to chat: %w", err)
}
// Send Final Chunk
if streamToClient {
if err := json.NewEncoder(flushWriter).Encode(&MessageChunk{
Chat: toChatNoMessages(chat),
AssistantMessage: assistantMsg,
}); err != nil {
return fmt.Errorf("failed to send final chunk: %w", err)
return responseStarted, fmt.Errorf("failed to send final chunk: %w", err)
}
}
return nil
return responseStarted, nil
}