feat: stream persistent

This commit is contained in:
2026-04-28 22:41:03 -04:00
parent fad8ed865a
commit eb66801f58
7 changed files with 485 additions and 133 deletions

View File

@@ -24,17 +24,19 @@ import (
)
type API struct {
logger *logrus.Entry
store store.Store
client *client.Client
dataDir string
logger *logrus.Entry
store store.Store
client *client.Client
dataDir string
generationManager *generationManager
}
func New(s store.Store, dataDir string, logger *logrus.Logger) *API {
return &API{
store: s,
dataDir: dataDir,
logger: logger.WithField("service", "api"),
store: s,
dataDir: dataDir,
logger: logger.WithField("service", "api"),
generationManager: newGenerationManager(),
}
}
@@ -322,13 +324,18 @@ func (a *API) PostChat(w http.ResponseWriter, r *http.Request) {
return
}
// Send Message
responseStarted, err := a.sendMessage(r.Context(), w, chat.ID, genReq.Model, genReq.Prompt)
// Start Message
chunk, err := a.startMessageGeneration(chat.ID, genReq.Model, genReq.Prompt)
if err != nil {
log.WithError(err).WithField("chat_id", chat.ID).Error("failed to send message")
if !responseStarted {
http.Error(w, "Failed to send message", http.StatusInternalServerError)
}
log.WithError(err).WithField("chat_id", chat.ID).Error("failed to start message generation")
http.Error(w, "Failed to start message generation", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(chunk); err != nil {
log.WithError(err).Error("failed to encode message generation response")
http.Error(w, "Failed to encode message generation response", http.StatusInternalServerError)
}
}
@@ -395,6 +402,68 @@ func (a *API) GetChat(w http.ResponseWriter, r *http.Request) {
}
}
func (a *API) GetChatStream(w http.ResponseWriter, r *http.Request) {
log := a.logger.WithField("handler", "GetChatStreamHandler")
// Parse Chat ID
rawChatID := r.PathValue("chatId")
if rawChatID == "" {
log.Error("missing chat ID parameter")
http.Error(w, "Chat ID is required", http.StatusBadRequest)
return
}
chatID, err := uuid.Parse(rawChatID)
if err != nil {
log.WithError(err).Error("invalid chat ID format")
http.Error(w, "Invalid chat ID format", http.StatusBadRequest)
return
}
// Subscribe Before Snapshot
updates, unsubscribe, active := a.generationManager.subscribe(chatID)
defer unsubscribe()
// Get Chat Snapshot
chat, err := a.store.GetChat(chatID)
if err != nil {
log.WithError(err).WithField("chat_id", chatID).Error("failed to get chat")
http.Error(w, "Failed to get chat", http.StatusInternalServerError)
return
}
// Set Headers
w.Header().Set("Content-Type", "application/x-ndjson")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Transfer-Encoding", "chunked")
flushWriter := newFlushWriter(w)
// Send Snapshot
if err := json.NewEncoder(flushWriter).Encode(&MessageChunk{Chat: toChat(chat)}); err != nil {
log.WithError(err).WithField("chat_id", chatID).Warn("failed to send stream snapshot")
return
}
if !active {
return
}
// Forward Updates
for {
select {
case <-r.Context().Done():
return
case chunk, ok := <-updates:
if !ok {
return
}
if err := json.NewEncoder(flushWriter).Encode(chunk); err != nil {
log.WithError(err).WithField("chat_id", chatID).Warn("client stream disconnected")
return
}
}
}
}
func (a *API) PostChatMessage(w http.ResponseWriter, r *http.Request) {
log := a.logger.WithField("handler", "PostChatMessageHandler")
@@ -424,13 +493,22 @@ func (a *API) PostChatMessage(w http.ResponseWriter, r *http.Request) {
return
}
// Send Message
responseStarted, err := a.sendMessage(r.Context(), w, chatID, genReq.Model, genReq.Prompt)
// Start Message
chunk, err := a.startMessageGeneration(chatID, genReq.Model, genReq.Prompt)
if err != nil {
log.WithError(err).WithField("chat_id", chatID).Error("failed to send message")
if !responseStarted {
http.Error(w, "Failed to send message", http.StatusInternalServerError)
log.WithError(err).WithField("chat_id", chatID).Error("failed to start message generation")
if errors.Is(err, errGenerationActive) {
http.Error(w, "Chat generation already active", http.StatusConflict)
} else {
http.Error(w, "Failed to start message generation", http.StatusInternalServerError)
}
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(chunk); err != nil {
log.WithError(err).Error("failed to encode message generation response")
http.Error(w, "Failed to encode message generation response", http.StatusInternalServerError)
}
}
@@ -456,99 +534,99 @@ func (a *API) getClient() (*client.Client, error) {
return a.client, nil
}
func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uuid.UUID, chatModel, userMessage string) (bool, error) {
func (a *API) startMessageGeneration(chatID uuid.UUID, chatModel, userMessage string) (*MessageChunk, error) {
apiClient, err := a.getClient()
if err != nil {
return false, fmt.Errorf("failed to get client: %w", err)
return nil, fmt.Errorf("failed to get client: %w", err)
}
// Detach Request Context
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Minute*5)
defer cancel()
var chat *store.Chat
var userMsg *store.Message
var assistantMsg *store.Message
var initialChunk *MessageChunk
// Create User Message
userMsg := &store.Message{ChatID: chatID, Role: "user", Content: userMessage}
if err := a.store.SaveChatMessage(userMsg); err != nil {
return false, fmt.Errorf("failed to add user message to chat: %w", err)
}
// Start Generation - The manager reserves the chat before messages are
// persisted, preventing concurrent completions from creating duplicate rows.
if err := a.generationManager.start(chatID, func(_ *generation) error {
// Create User Message
userMsg = &store.Message{ChatID: chatID, Role: "user", Content: userMessage}
if err := a.store.SaveChatMessage(userMsg); err != nil {
return fmt.Errorf("failed to add user message to chat: %w", err)
}
// Get Chat History - Fetch before creating the in-progress assistant message so the
// LLM request does not include an empty assistant response prefill.
chat, err := a.store.GetChat(chatID)
if err != nil {
return false, fmt.Errorf("failed to get chat: %w", err)
}
// Get Chat History - Fetch before creating the in-progress assistant message so the
// LLM request does not include an empty assistant response prefill.
chat, err = a.store.GetChat(chatID)
if err != nil {
return fmt.Errorf("failed to get chat: %w", err)
}
// Add Assistant Response - TODO: Ensure InProgress Flag?
assistantMsg := &store.Message{ChatID: chatID, Role: "assistant"}
if err := a.store.SaveChatMessage(assistantMsg); err != nil {
return false, fmt.Errorf("failed to add assistant message to chat: %w", err)
}
// Add Assistant Response
assistantMsg = &store.Message{ChatID: chatID, Role: "assistant", Status: store.MessageStatusStreaming}
if err := a.store.SaveChatMessage(assistantMsg); err != nil {
return fmt.Errorf("failed to add assistant message to chat: %w", err)
}
// Set Headers
w.Header().Set("Content-Type", "application/x-ndjson")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Transfer-Encoding", "chunked")
// Create Flush Writer
flushWriter := newFlushWriter(w)
// Send Initial Chunk - User Message & Chat
if err := json.NewEncoder(flushWriter).Encode(&MessageChunk{
Chat: toChatNoMessages(chat),
UserMessage: userMsg,
// Create Initial Chunk
initialChunk = &MessageChunk{
Chat: toChatNoMessages(chat),
UserMessage: userMsg,
AssistantMessage: assistantMsg,
}
return nil
}, func(gen *generation) {
a.runMessageGeneration(apiClient, chat, assistantMsg, chatModel, gen)
}); err != nil {
return false, fmt.Errorf("failed to send initial chunk: %w", err)
return nil, err
}
responseStarted := true
streamToClient := true
return initialChunk, nil
}
func (a *API) runMessageGeneration(apiClient *client.Client, chat *store.Chat, assistantMsg *store.Message, chatModel string, gen *generation) {
// Create Generation Context
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
// Send Message
if _, err := apiClient.SendMessage(ctx, chat.Messages, chatModel, func(m *client.MessageChunk) error {
var apiMsgChunk MessageChunk
messageChanged := false
if m.Stats != nil {
messageChanged = true
assistantMsg.Stats = m.Stats
}
if m.Message != nil {
messageChanged = true
assistantMsg.Content += *m.Message
}
if m.Thinking != nil {
messageChanged = true
assistantMsg.Thinking += *m.Thinking
}
// Save Assistant Progress - Persist each streamed update so partial content
// survives client disconnects or upstream stream failures.
// Save And Broadcast Progress
if messageChanged {
if err := a.store.SaveChatMessage(assistantMsg); err != nil {
return fmt.Errorf("failed to save assistant progress: %w", err)
}
apiMsgChunk.AssistantMessage = assistantMsg
}
// Send Progress Chunk - If the browser disconnects, keep the detached
// generation running and continue saving streamed content to the store.
if streamToClient {
if err := json.NewEncoder(flushWriter).Encode(apiMsgChunk); err != nil {
streamToClient = false
a.logger.WithError(err).WithField("chat_id", chat.ID).Warn("client stream disconnected")
}
gen.broadcast(&MessageChunk{AssistantMessage: assistantMsg})
}
return nil
}); err != nil {
return responseStarted, fmt.Errorf("failed to generate text stream: %w", err)
assistantMsg.Status = store.MessageStatusFailed
if saveErr := a.store.SaveChatMessage(assistantMsg); saveErr != nil {
a.logger.WithError(saveErr).WithField("chat_id", chat.ID).Error("failed to save failed assistant message")
}
gen.broadcast(&MessageChunk{AssistantMessage: assistantMsg})
a.logger.WithError(err).WithField("chat_id", chat.ID).Error("failed to generate text stream")
return
}
// Summarize & Update Chat Title
if chat.Title == "" {
var err error
chat.Title, err = apiClient.CreateTitle(ctx, chat.Messages[0].Content, chatModel)
if err != nil {
a.logger.WithError(err).WithField("chat_id", chat.ID).Error("failed to create chat title")
@@ -557,20 +635,11 @@ func (a *API) sendMessage(ctx context.Context, w http.ResponseWriter, chatID uui
}
}
// Update Assistant Message
// Complete Assistant Message
assistantMsg.Status = store.MessageStatusComplete
if err := a.store.SaveChatMessage(assistantMsg); err != nil {
return responseStarted, fmt.Errorf("failed to save assistant message to chat: %w", err)
a.logger.WithError(err).WithField("chat_id", chat.ID).Error("failed to save assistant message")
return
}
// Send Final Chunk
if streamToClient {
if err := json.NewEncoder(flushWriter).Encode(&MessageChunk{
Chat: toChatNoMessages(chat),
AssistantMessage: assistantMsg,
}); err != nil {
return responseStarted, fmt.Errorf("failed to send final chunk: %w", err)
}
}
return responseStarted, nil
gen.broadcast(&MessageChunk{Chat: toChatNoMessages(chat), AssistantMessage: assistantMsg})
}