From a8eaadc6edda8e793ae58cb6ca5595abbb0f17e9 Mon Sep 17 00:00:00 2001
From: Willie Zutz
Date: Sun, 22 Jun 2025 13:35:01 -0600
Subject: [PATCH] feat(agent): More agent tweaks

---
 src/app/api/chat/route.ts              | 37 +++++++++++++++++++++
 src/app/api/search/route.ts            | 31 +++++++++++++++++
 src/components/ChatWindow.tsx          |  8 +++++
 src/lib/agents/analyzerAgent.ts        | 23 +++++++++++--
 src/lib/agents/synthesizerAgent.ts     | 26 ++++++++++-----
 src/lib/agents/taskManagerAgent.ts     |  2 ++
 src/lib/agents/webSearchAgent.ts       | 46 ++++++++++++++++++++------
 src/lib/prompts/analyzer.ts            |  1 +
 src/lib/prompts/taskBreakdown.ts       |  7 ++++
 src/lib/search/agentSearch.ts          |  1 +
 src/lib/utils/analyzePreviewContent.ts | 13 +++++---
 11 files changed, 169 insertions(+), 26 deletions(-)

diff --git a/src/app/api/chat/route.ts b/src/app/api/chat/route.ts
index 607ab2d..68f346b 100644
--- a/src/app/api/chat/route.ts
+++ b/src/app/api/chat/route.ts
@@ -66,11 +66,41 @@ const handleEmitterEvents = async (
   chatId: string,
   startTime: number,
   userMessageId: string,
+  abortController: AbortController,
 ) => {
   let recievedMessage = '';
   let sources: any[] = [];
   let searchQuery: string | undefined;
   let searchUrl: string | undefined;
+  let isStreamActive = true;
+
+  // Keep-alive ping mechanism to prevent reverse proxy timeouts
+  const pingInterval = setInterval(() => {
+    if (isStreamActive) {
+      try {
+        writer.write(
+          encoder.encode(
+            JSON.stringify({
+              type: 'ping',
+              timestamp: Date.now(),
+            }) + '\n',
+          ),
+        );
+      } catch (error) {
+        // If writing fails, the connection is likely closed
+        clearInterval(pingInterval);
+        isStreamActive = false;
+      }
+    } else {
+      clearInterval(pingInterval);
+    }
+  }, 30000); // Send ping every 30 seconds
+
+  // Clean up ping interval if request is cancelled
+  abortController.signal.addEventListener('abort', () => {
+    isStreamActive = false;
+    clearInterval(pingInterval);
+  });
 
   stream.on('data', (data) => {
     const parsedData = JSON.parse(data);
@@ -149,6 +179,9 @@ const handleEmitterEvents = async (
   });
 
   stream.on('end', () => {
+    isStreamActive = false;
+    clearInterval(pingInterval);
+
     const endTime = Date.now();
     const duration = endTime - startTime;
@@ -190,6 +223,9 @@ const handleEmitterEvents = async (
       .execute();
   });
   stream.on('error', (data) => {
+    isStreamActive = false;
+    clearInterval(pingInterval);
+
     const parsedData = JSON.parse(data);
     writer.write(
       encoder.encode(
@@ -413,6 +449,7 @@ export const POST = async (req: Request) => {
     message.chatId,
     startTime,
     message.messageId,
+    abortController,
   );
 
   handleHistorySave(message, humanMessageId, body.focusMode, body.files);
diff --git a/src/app/api/search/route.ts b/src/app/api/search/route.ts
index b31babe..c76018f 100644
--- a/src/app/api/search/route.ts
+++ b/src/app/api/search/route.ts
@@ -192,6 +192,7 @@ export const POST = async (req: Request) => {
   const stream = new ReadableStream({
     start(controller) {
       let sources: any[] = [];
+      let isStreamActive = true;
 
       controller.enqueue(
         encoder.encode(
@@ -202,7 +203,31 @@ export const POST = async (req: Request) => {
         ),
       );
 
+      // Keep-alive ping mechanism to prevent reverse proxy timeouts
+      const pingInterval = setInterval(() => {
+        if (isStreamActive && !signal.aborted) {
+          try {
+            controller.enqueue(
+              encoder.encode(
+                JSON.stringify({
+                  type: 'ping',
+                  timestamp: Date.now(),
+                }) + '\n',
+              ),
+            );
+          } catch (error) {
+            // If enqueueing fails, the connection is likely closed
+            clearInterval(pingInterval);
+            isStreamActive = false;
+          }
+        } else {
+          clearInterval(pingInterval);
+        }
+      }, 30000); // Send ping every 30 seconds
+
       signal.addEventListener('abort', () => {
+        isStreamActive = false;
+        clearInterval(pingInterval);
         emitter.removeAllListeners();
 
         try {
@@ -244,6 +269,9 @@ export const POST = async (req: Request) => {
       emitter.on('end', () => {
         if (signal.aborted) return;
 
+        isStreamActive = false;
+        clearInterval(pingInterval);
+
         controller.enqueue(
           encoder.encode(
             JSON.stringify({
@@ -257,6 +285,9 @@ export const POST = async (req: Request) => {
       emitter.on('error', (error: any) => {
         if (signal.aborted) return;
 
+        isStreamActive = false;
+        clearInterval(pingInterval);
+
         controller.error(error);
       });
     },
diff --git a/src/components/ChatWindow.tsx b/src/components/ChatWindow.tsx
index f97e827..7581706 100644
--- a/src/components/ChatWindow.tsx
+++ b/src/components/ChatWindow.tsx
@@ -431,6 +431,14 @@ const ChatWindow = ({ id }: { id?: string }) => {
       return;
     }
 
+    // Handle ping messages to keep connection alive (no action needed)
+    if (data.type === 'ping') {
+      console.debug('Ping received');
+      // Ping messages are used to keep the connection alive during long requests
+      // No action is required on the frontend
+      return;
+    }
+
     if (data.type === 'agent_action') {
       const agentActionEvent: AgentActionEvent = {
         action: data.data.action,
diff --git a/src/lib/agents/analyzerAgent.ts b/src/lib/agents/analyzerAgent.ts
index 3fad35b..b35ea37 100644
--- a/src/lib/agents/analyzerAgent.ts
+++ b/src/lib/agents/analyzerAgent.ts
@@ -101,10 +101,27 @@ export class AnalyzerAgent {
     );
 
     console.log('Next action response:', nextActionContent);
-    //}
 
-    if (!nextActionContent.startsWith('good_content')) {
-      if (nextActionContent.startsWith('need_user_info')) {
+    if (
+      !nextActionContent.startsWith('good_content') &&
+      !nextActionContent.startsWith('`good_content`')
+    ) {
+      // If we don't have enough information, but we still have available tasks, proceed with the next task
+
+      if (state.tasks && state.tasks.length > 0) {
+        const hasMoreTasks = state.currentTaskIndex < state.tasks.length - 1;
+
+        if (hasMoreTasks) {
+          return new Command({
+            goto: 'task_manager',
+          });
+        }
+      }
+
+      if (
+        nextActionContent.startsWith('need_user_info') ||
+        nextActionContent.startsWith('`need_user_info`')
+      ) {
         const moreUserInfoPrompt = await ChatPromptTemplate.fromTemplate(
           additionalUserInputPrompt,
         ).format({
diff --git a/src/lib/agents/synthesizerAgent.ts b/src/lib/agents/synthesizerAgent.ts
index 431e921..63297f7 100644
--- a/src/lib/agents/synthesizerAgent.ts
+++ b/src/lib/agents/synthesizerAgent.ts
@@ -52,6 +52,8 @@ Your task is to provide answers that are:
 
 ### Citation Requirements
 - Cite every single fact, statement, or sentence using [number] notation corresponding to the source from the provided \`context\`
+- If a statement is based on AI model inference or training data, it must be marked as \`[AI]\` and not cited from the context
+- If a statement is based on previous messages in the conversation history, it must be marked as \`[Hist]\` and not cited from the context
 - Integrate citations naturally at the end of sentences or clauses as appropriate. For example, "The Eiffel Tower is one of the most visited landmarks in the world[1]."
 - Ensure that **every sentence in your response includes at least one citation**, even when information is inferred or connected to general knowledge available in the provided context
 - Use multiple sources for a single detail if applicable, such as, "Paris is a cultural hub, attracting millions of visitors annually[1][2]."
@@ -68,20 +70,29 @@ Your task is to provide answers that are:
 
 ${this.personaInstructions}
 
-User Query: ${state.originalQuery || state.query}
+# Conversation History Context:
+${
+  removeThinkingBlocksFromMessages(state.messages)
+    .map((msg) => `<${msg.getType()}>${msg.content}`)
+    .join('\n') || 'No previous conversation context'
+}
 
-Available Information:
+# Available Information:
 ${state.relevantDocuments
   .map(
     (doc, index) =>
       `<${index + 1}>\n
-${doc.metadata.title}\n
-${doc.metadata?.url.toLowerCase().includes('file') ? '' : '\n' + doc.metadata.url + '\n'}
-\n${doc.pageContent}\n\n
-`,
+      ${doc.metadata.title}\n
+      ${doc.metadata?.url.toLowerCase().includes('file') ? '' : '\n' + doc.metadata.url + '\n'}
+      \n${doc.pageContent}\n\n
+      `,
   )
   .join('\n')}
-`;
+
+# User Query: ${state.originalQuery || state.query}
+
+Answer the user query:
+  `;
 
     // Stream the response in real-time using LLM streaming capabilities
     let fullResponse = '';
@@ -99,7 +110,6 @@ ${doc.metadata?.url.toLowerCase().includes('file') ? '' : '\n' + doc.metada
 
     const stream = await this.llm.stream(
       [
-        ...removeThinkingBlocksFromMessages(state.messages),
         new SystemMessage(synthesisPrompt),
         new HumanMessage(state.originalQuery || state.query),
       ],
diff --git a/src/lib/agents/taskManagerAgent.ts b/src/lib/agents/taskManagerAgent.ts
index 098a104..3d1cb87 100644
--- a/src/lib/agents/taskManagerAgent.ts
+++ b/src/lib/agents/taskManagerAgent.ts
@@ -121,6 +121,8 @@ export class TaskManagerAgent {
     // Parse the response to extract tasks
     const responseContent = taskBreakdownResult.content as string;
+
+    console.log('Task breakdown response:', responseContent);
     const taskLines = responseContent
       .split('\n')
       .filter((line) => line.trim().startsWith('TASK:'))
diff --git a/src/lib/agents/webSearchAgent.ts b/src/lib/agents/webSearchAgent.ts
index f562da5..a2c2e41 100644
--- a/src/lib/agents/webSearchAgent.ts
+++ b/src/lib/agents/webSearchAgent.ts
@@ -17,23 +17,27 @@
 import { AgentState } from './agentState';
 import { setTemperature } from '../utils/modelUtils';
 import { Embeddings } from '@langchain/core/embeddings';
 import { removeThinkingBlocksFromMessages } from '../utils/contentUtils';
+import computeSimilarity from '../utils/computeSimilarity';
 
 export class WebSearchAgent {
   private llm: BaseChatModel;
   private emitter: EventEmitter;
   private systemInstructions: string;
   private signal: AbortSignal;
+  private embeddings: Embeddings;
 
   constructor(
     llm: BaseChatModel,
     emitter: EventEmitter,
     systemInstructions: string,
     signal: AbortSignal,
+    embeddings: Embeddings,
   ) {
     this.llm = llm;
     this.emitter = emitter;
     this.systemInstructions = systemInstructions;
     this.signal = signal;
+    this.embeddings = embeddings;
   }
 
   /**
@@ -138,16 +142,33 @@ export class WebSearchAgent {
     let bannedSummaryUrls = state.bannedSummaryUrls || [];
     let bannedPreviewUrls = state.bannedPreviewUrls || [];
 
+    const queryVector = await this.embeddings.embedQuery(
+      state.originalQuery + ' ' + currentTask,
+    );
 
-    // Extract preview content from top 8 search results for analysis
-    const previewContents: PreviewContent[] = searchResults.results
-      .filter(
-        (result) =>
-          !bannedSummaryUrls.includes(result.url) &&
-          !bannedPreviewUrls.includes(result.url),
-      ) // Filter out banned URLs first
-      .slice(0, 8) // Then take top 8 results
-      .map((result) => ({
+    // Filter out banned URLs first
+    const filteredResults = searchResults.results.filter(
+      (result) =>
+        !bannedSummaryUrls.includes(result.url) &&
+        !bannedPreviewUrls.includes(result.url),
+    );
+
+    // Calculate similarities for all filtered results
+    const resultsWithSimilarity = await Promise.all(
+      filteredResults.map(async (result) => {
+        const vector = await this.embeddings.embedQuery(
+          result.title + ' ' + (result.content || ''),
+        );
+        const similarity = computeSimilarity(vector, queryVector);
+        return { result, similarity };
+      }),
+    );
+
+    // Sort by relevance score and take top 8 results
+    const previewContents: PreviewContent[] = resultsWithSimilarity
+      .sort((a, b) => b.similarity - a.similarity)
+      .slice(0, 8)
+      .map(({ result }) => ({
         title: result.title || 'Untitled',
         snippet: result.content || '',
         url: result.url,
@@ -181,6 +202,7 @@ export class WebSearchAgent {
 
       previewAnalysisResult = await analyzePreviewContent(
         previewContents,
+        state.query,
         currentTask,
         removeThinkingBlocksFromMessages(state.messages),
         this.llm,
@@ -267,7 +289,9 @@ export class WebSearchAgent {
       });
 
-      // Summarize the top 2 search results
-      for (const result of searchResults.results.slice(0, 8)) {
+      // Summarize the top 8 most relevant results
+      for (const result of resultsWithSimilarity
+        .slice(0, 8)
+        .map((r) => r.result)) {
         if (this.signal.aborted) {
           console.warn('Search operation aborted by signal');
           break; // Exit if the operation is aborted
@@ -381,7 +405,7 @@ export class WebSearchAgent {
     console.log(responseMessage);
 
     return new Command({
-      goto: 'task_manager', // Route back to task manager to check if more tasks remain
+      goto: 'analyzer', // Route back to analyzer to process the results
       update: {
         messages: [new AIMessage(responseMessage)],
         relevantDocuments: documents,
diff --git a/src/lib/prompts/analyzer.ts b/src/lib/prompts/analyzer.ts
index f6c2fce..aec9fc0 100644
--- a/src/lib/prompts/analyzer.ts
+++ b/src/lib/prompts/analyzer.ts
@@ -9,6 +9,7 @@ Your task is to analyze the provided context and determine if we have enough inf
 # Response Options Decision Tree
 
 ## Step 1: Check if content is sufficient
+- If your training data and the provided context contain enough information to answer the user's query → respond with \`good_content\`
 - If the context fully answers the user's query with complete information → respond with \`good_content\`
 - If the user is requesting to use the existing context to answer their query → respond with \`good_content\`
 - If the user is requesting to avoid web searches → respond with \`good_content\`
diff --git a/src/lib/prompts/taskBreakdown.ts b/src/lib/prompts/taskBreakdown.ts
index 045821c..2d811b4 100644
--- a/src/lib/prompts/taskBreakdown.ts
+++ b/src/lib/prompts/taskBreakdown.ts
@@ -22,6 +22,8 @@ export const taskBreakdownPrompt = `You are a task breakdown specialist. Your jo
 3. Maintain **specific details** like quantities, measurements, and qualifiers
 4. Use **clear, unambiguous language** in each sub-question
 5. Keep the **same question type** (factual, analytical, etc.)
+6. Avoid introducing **new concepts** or information not present in the original question
+7. **Do not** repeat the same question multiple times; each sub-question should be unique and focused on a specific aspect of the original query
 
 ## Examples:
 
@@ -58,6 +60,11 @@ TASK: What are the side effects of aspirin?
 TASK: What are the side effects of ibuprofen?
 TASK: What are the side effects of acetaminophen?
 
+**Input**: "What day is New Year's Day this year?"
+**Analysis**: Single focused question, no breakdown needed
+**Output**:
+TASK: What day is New Year's Day this year?
+
 ## Your Task:
 
 Analyze this user question: "{query}"
diff --git a/src/lib/search/agentSearch.ts b/src/lib/search/agentSearch.ts
index 829cdc1..41729c2 100644
--- a/src/lib/search/agentSearch.ts
+++ b/src/lib/search/agentSearch.ts
@@ -61,6 +61,7 @@ export class AgentSearch {
       emitter,
       systemInstructions,
       signal,
+      embeddings,
     );
     this.analyzerAgent = new AnalyzerAgent(
       llm,
diff --git a/src/lib/utils/analyzePreviewContent.ts b/src/lib/utils/analyzePreviewContent.ts
index 4325276..f8638bf 100644
--- a/src/lib/utils/analyzePreviewContent.ts
+++ b/src/lib/utils/analyzePreviewContent.ts
@@ -19,6 +19,7 @@ export type PreviewContent = {
 export const analyzePreviewContent = async (
   previewContents: PreviewContent[],
   query: string,
+  taskQuery: string,
   chatHistory: BaseMessage[],
   llm: BaseChatModel,
   systemInstructions: string,
@@ -60,14 +61,15 @@
 Snippet: ${content.snippet}
 
   console.log(`Invoking LLM for preview content analysis`);
 
   const analysisResponse = await llm.invoke(
-    `${systemPrompt}You are a preview content analyzer, tasked with determining if search result snippets contain sufficient information to answer a user's query.
+    `${systemPrompt}You are a preview content analyzer, tasked with determining if search result snippets contain sufficient information to answer the Task Query.
 
 # Instructions
-- Analyze the provided search result previews (titles + snippets), and chat history context to determine if they collectively contain enough information to provide a complete and accurate answer to the user's query
+- Analyze the provided search result previews (titles + snippets), and chat history context to determine if they collectively contain enough information to provide a complete and accurate answer to the Task Query
 - You must make a binary decision: either the preview content is sufficient OR it is not sufficient
-- If the preview content can provide a complete answer to the query, respond with "sufficient"
-- If the preview content lacks important details, requires deeper analysis, or cannot fully answer the query, respond with "not_needed: [specific reason why full content analysis is required]"
+- If the preview content can provide a complete answer to the Task Query, respond with "sufficient"
+- If the preview content lacks important details, requires deeper analysis, or cannot fully answer the Task Query, respond with "not_needed: [specific reason why full content analysis is required]"
 - Be specific in your reasoning when the content is not sufficient
+- The original query is provided for additional context; use it only to clarify overall expectations and intent. You do **not** need to answer the original query directly or completely
 - Output your decision inside a \`decision\` XML tag
 
 # Information Context:
@@ -79,6 +81,9 @@ ${formattedChatHistory ? formattedChatHistory : 'No previous conversation contex
 
 # User Query:
 ${query}
 
+# Task Query (what to answer):
+${taskQuery}
+
 # Search Result Previews to Analyze:
 ${formattedPreviewContent}
 `,
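
Note for reviewers — consuming the keep-alive frames: both routes above now interleave `{"type":"ping"}` objects with regular events on the same newline-delimited JSON stream, and ChatWindow.tsx drops them with an early return. Any other consumer of these endpoints needs the same handling. A minimal sketch of the client side, assuming one JSON object per newline-terminated line as the routes write them (the helper names `readChatStream` and `onEvent` are illustrative, not part of this patch):

```ts
type StreamEvent = { type: string; [key: string]: unknown };

async function readChatStream(
  response: Response,
  onEvent: (event: StreamEvent) => void,
): Promise<void> {
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffered = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffered += decoder.decode(value, { stream: true });

    // Events are newline-delimited JSON objects.
    const lines = buffered.split('\n');
    buffered = lines.pop() ?? ''; // keep any partial line for the next chunk

    for (const line of lines) {
      if (!line.trim()) continue;
      const event = JSON.parse(line) as StreamEvent;
      // Keep-alive frames exist only to defeat proxy idle timeouts;
      // drop them before they reach application logic.
      if (event.type === 'ping') continue;
      onEvent(event);
    }
  }

  // Flush a trailing event that wasn't newline-terminated, if any.
  if (buffered.trim()) {
    const event = JSON.parse(buffered) as StreamEvent;
    if (event.type !== 'ping') onEvent(event);
  }
}
```

The two details that matter are buffering partial lines across reads and filtering `ping` frames out before application logic sees them.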
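Note for reviewers — the rerank step in webSearchAgent.ts: preview candidates are now ordered by embedding similarity to the combined `originalQuery + currentTask` text instead of by search-engine position. The implementation of `computeSimilarity` is not part of this diff; assuming it is cosine similarity, the whole step reduces to the sketch below (helper names are illustrative):

```ts
// Cosine similarity between two equal-length vectors (assumed behavior
// of computeSimilarity; its actual implementation is not in this diff).
function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Rank candidates against a query vector and keep the top k items.
function topKBySimilarity<T>(
  candidates: { item: T; vector: number[] }[],
  queryVector: number[],
  k: number,
): T[] {
  return candidates
    .map(({ item, vector }) => ({
      item,
      similarity: cosineSimilarity(vector, queryVector),
    }))
    .sort((a, b) => b.similarity - a.similarity)
    .slice(0, k)
    .map(({ item }) => item);
}
```

One design note: `filteredResults.map(async ...)` issues one `embedQuery` call per result, so each search pays N+1 embedding requests; if the embeddings provider supports batching (e.g. `embedDocuments`), that overhead could be reduced to a single call.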