From fad8ed865a13a915074690a10806c6ad9919b465 Mon Sep 17 00:00:00 2001 From: Evan Reichard Date: Tue, 28 Apr 2026 22:30:37 -0400 Subject: [PATCH] fix: streaming --- backend/internal/api/handlers.go | 81 +++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 27 deletions(-) diff --git a/backend/internal/api/handlers.go b/backend/internal/api/handlers.go index 821704b..50da73f 100644 --- a/backend/internal/api/handlers.go +++ b/backend/internal/api/handlers.go @@ -323,9 +323,12 @@ func (a *API) PostChat(w http.ResponseWriter, r *http.Request) { } // Send Message - if err := a.sendMessage(r.Context(), w, chat.ID, genReq.Model, genReq.Prompt); err != nil { + responseStarted, err := a.sendMessage(r.Context(), w, chat.ID, genReq.Model, genReq.Prompt) + if err != nil { log.WithError(err).WithField("chat_id", chat.ID).Error("failed to send message") - http.Error(w, "Failed to send message", http.StatusInternalServerError) + if !responseStarted { + http.Error(w, "Failed to send message", http.StatusInternalServerError) + } } } @@ -421,9 +424,13 @@ func (a *API) PostChatMessage(w http.ResponseWriter, r *http.Request) { return } - if err := a.sendMessage(r.Context(), w, chatID, genReq.Model, genReq.Prompt); err != nil { + // Send Message + responseStarted, err := a.sendMessage(r.Context(), w, chatID, genReq.Model, genReq.Prompt) + if err != nil { log.WithError(err).WithField("chat_id", chatID).Error("failed to send message") - http.Error(w, "Failed to send message", http.StatusInternalServerError) + if !responseStarted { + http.Error(w, "Failed to send message", http.StatusInternalServerError) + } } } @@ -449,10 +456,10 @@ func (a *API) getClient() (*client.Client, error) { return a.client, nil } -func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uuid.UUID, chatModel, userMessage string) error { +func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uuid.UUID, chatModel, userMessage string) (bool, error) { apiClient, err := a.getClient() if err != nil { - return fmt.Errorf("failed to get client: %w", err) + return false, fmt.Errorf("failed to get client: %w", err) } // Detach Request Context @@ -462,19 +469,20 @@ func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uui // Create User Message userMsg := &store.Message{ChatID: chatID, Role: "user", Content: userMessage} if err := a.store.SaveChatMessage(userMsg); err != nil { - return fmt.Errorf("failed to add user message to chat: %w", err) + return false, fmt.Errorf("failed to add user message to chat: %w", err) + } + + // Get Chat History - Fetch before creating the in-progress assistant message so the + // LLM request does not include an empty assistant response prefill. + chat, err := a.store.GetChat(chatID) + if err != nil { + return false, fmt.Errorf("failed to get chat: %w", err) } // Add Assistant Response - TODO: Ensure InProgress Flag? assistantMsg := &store.Message{ChatID: chatID, Role: "assistant"} if err := a.store.SaveChatMessage(assistantMsg); err != nil { - return fmt.Errorf("failed to add assistant message to chat: %w", err) - } - - // Get Chat - chat, err := a.store.GetChat(chatID) - if err != nil { - return fmt.Errorf("failed to get chat: %w", err) + return false, fmt.Errorf("failed to add assistant message to chat: %w", err) } // Set Headers @@ -491,35 +499,52 @@ func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uui Chat: toChatNoMessages(chat), UserMessage: userMsg, }); err != nil { - return fmt.Errorf("failed to send initial chunk: %w", err) + return false, fmt.Errorf("failed to send initial chunk: %w", err) } + responseStarted := true + streamToClient := true // Send Message if _, err := apiClient.SendMessage(ctx, chat.Messages, chatModel, func(m *client.MessageChunk) error { var apiMsgChunk MessageChunk + messageChanged := false if m.Stats != nil { + messageChanged = true assistantMsg.Stats = m.Stats } if m.Message != nil { + messageChanged = true assistantMsg.Content += *m.Message - apiMsgChunk.AssistantMessage = assistantMsg } if m.Thinking != nil { + messageChanged = true assistantMsg.Thinking += *m.Thinking + } + + // Save Assistant Progress - Persist each streamed update so partial content + // survives client disconnects or upstream stream failures. + if messageChanged { + if err := a.store.SaveChatMessage(assistantMsg); err != nil { + return fmt.Errorf("failed to save assistant progress: %w", err) + } apiMsgChunk.AssistantMessage = assistantMsg } - // Send Progress Chunk - if err := json.NewEncoder(flushWriter).Encode(apiMsgChunk); err != nil { - return fmt.Errorf("failed to send progress chunk: %w", err) + // Send Progress Chunk - If the browser disconnects, keep the detached + // generation running and continue saving streamed content to the store. + if streamToClient { + if err := json.NewEncoder(flushWriter).Encode(apiMsgChunk); err != nil { + streamToClient = false + a.logger.WithError(err).WithField("chat_id", chat.ID).Warn("client stream disconnected") + } } return nil }); err != nil { - return fmt.Errorf("failed to generate text stream: %w", err) + return responseStarted, fmt.Errorf("failed to generate text stream: %w", err) } // Summarize & Update Chat Title @@ -534,16 +559,18 @@ func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uui // Update Assistant Message if err := a.store.SaveChatMessage(assistantMsg); err != nil { - return fmt.Errorf("failed to save assistant message to chat: %w", err) + return responseStarted, fmt.Errorf("failed to save assistant message to chat: %w", err) } // Send Final Chunk - if err := json.NewEncoder(flushWriter).Encode(&MessageChunk{ - Chat: toChatNoMessages(chat), - AssistantMessage: assistantMsg, - }); err != nil { - return fmt.Errorf("failed to send final chunk: %w", err) + if streamToClient { + if err := json.NewEncoder(flushWriter).Encode(&MessageChunk{ + Chat: toChatNoMessages(chat), + AssistantMessage: assistantMsg, + }); err != nil { + return responseStarted, fmt.Errorf("failed to send final chunk: %w", err) + } } - return nil + return responseStarted, nil }