feat(agent): Stream agent messages, sources, tool calls, etc.

This commit is contained in:
Willie Zutz 2025-08-03 15:48:34 -06:00
parent d63196b2e8
commit 3e238303b0
14 changed files with 550 additions and 506 deletions

View file

@ -4,6 +4,7 @@ import {
BaseMessage,
HumanMessage,
SystemMessage,
AIMessage,
} from '@langchain/core/messages';
import { Embeddings } from '@langchain/core/embeddings';
import { EventEmitter } from 'events';
@ -17,6 +18,7 @@ import {
} from '@/lib/tools/agents';
import { formatDateForLLM } from '../utils';
import { getModelName } from '../utils/modelUtils';
import { removeThinkingBlocks } from '../utils/contentUtils';
/**
* Simplified Agent using createReactAgent
@ -451,19 +453,6 @@ Use all available tools strategically to provide comprehensive, well-researched,
// Initialize agent with the provided focus mode and file context
const agent = this.initializeAgent(focusMode, fileIds);
// Emit initial agent action
this.emitter.emit(
'data',
JSON.stringify({
type: 'agent_action',
data: {
action: 'simplified_agent_start',
message: `Starting simplified agent search in ${focusMode} mode`,
details: `Processing query with ${fileIds.length} files available`,
},
}),
);
// Prepare initial state
const initialState = {
messages: [...history, new HumanMessage(query)],
@ -489,25 +478,165 @@ Use all available tools strategically to provide comprehensive, well-researched,
signal: this.signal,
};
// Execute the agent
const result = await agent.invoke(initialState, config);
// Use streamEvents to capture both tool calls and token-level streaming
const eventStream = agent.streamEvents(initialState, {
...config,
version: 'v2',
});
// Collect relevant documents from tool execution history
let finalResult: any = null;
let collectedDocuments: any[] = [];
let currentResponseBuffer = '';
// Get the relevant docs from the current agent state
if (result && result.relevantDocuments) {
collectedDocuments.push(...result.relevantDocuments);
// Process the event stream
for await (const event of eventStream) {
// Handle different event types
if (
event.event === 'on_chain_end' &&
event.name === 'RunnableSequence'
) {
finalResult = event.data.output;
// Collect relevant documents from the final result
if (finalResult && finalResult.relevantDocuments) {
collectedDocuments.push(...finalResult.relevantDocuments);
}
}
// Collect sources from tool results
if (
event.event === 'on_chain_end' &&
(event.name.includes('search') ||
event.name.includes('Search') ||
event.name.includes('tool') ||
event.name.includes('Tool'))
) {
// Handle LangGraph state updates with relevantDocuments
if (event.data?.output && Array.isArray(event.data.output)) {
for (const item of event.data.output) {
if (
item.update &&
item.update.relevantDocuments &&
Array.isArray(item.update.relevantDocuments)
) {
collectedDocuments.push(...item.update.relevantDocuments);
}
}
}
}
// Emit sources as we collect them
if (collectedDocuments.length > 0) {
this.emitter.emit(
'data',
JSON.stringify({
type: 'sources',
data: collectedDocuments,
searchQuery: '',
searchUrl: '',
}),
);
}
// Handle streaming tool calls (for thought messages)
if (event.event === 'on_chat_model_end' && event.data.output) {
const output = event.data.output;
if (
output._getType() === 'ai' &&
output.tool_calls &&
output.tool_calls.length > 0
) {
const aiMessage = output as AIMessage;
// Process each tool call and emit thought messages
for (const toolCall of aiMessage.tool_calls || []) {
if (toolCall && toolCall.name) {
const toolName = toolCall.name;
const toolArgs = toolCall.args || {};
// Create user-friendly messages for different tools using markdown components
let toolMarkdown = '';
switch (toolName) {
case 'web_search':
toolMarkdown = `<ToolCall type="search" query="${(toolArgs.query || 'relevant information').replace(/"/g, '&quot;')}"></ToolCall>`;
break;
case 'file_search':
toolMarkdown = `<ToolCall type="file" query="${(toolArgs.query || 'relevant information').replace(/"/g, '&quot;')}"></ToolCall>`;
break;
case 'url_summarization':
if (Array.isArray(toolArgs.urls)) {
toolMarkdown = `<ToolCall type="url" count="${toolArgs.urls.length}"></ToolCall>`;
} else {
toolMarkdown = `<ToolCall type="url" count="1"></ToolCall>`;
}
break;
default:
toolMarkdown = `<ToolCall type="${toolName}"></ToolCall>`;
}
// Emit the thought message
this.emitter.emit(
'data',
JSON.stringify({
type: 'tool_call',
data: {
// messageId: crypto.randomBytes(7).toString('hex'),
content: toolMarkdown,
},
}),
);
}
}
}
}
// Handle token-level streaming for the final response
if (event.event === 'on_chat_model_stream' && event.data.chunk) {
const chunk = event.data.chunk;
if (chunk.content && typeof chunk.content === 'string') {
// If this is the first token, emit sources if we have them
if (currentResponseBuffer === '' && collectedDocuments.length > 0) {
this.emitter.emit(
'data',
JSON.stringify({
type: 'sources',
data: collectedDocuments,
searchQuery: '',
searchUrl: '',
}),
);
}
// Add the token to our buffer
currentResponseBuffer += chunk.content;
// Emit the individual token
this.emitter.emit(
'data',
JSON.stringify({
type: 'response',
data: chunk.content,
}),
);
}
}
}
// Add collected documents to result for source tracking
const finalResult = {
...result,
relevantDocuments: collectedDocuments,
};
// Emit the final sources used for the response
if (collectedDocuments.length > 0) {
this.emitter.emit(
'data',
JSON.stringify({
type: 'sources',
data: collectedDocuments,
searchQuery: '',
searchUrl: '',
}),
);
}
// Extract final message and emit as response
// If we didn't get any streamed tokens but have a final result, emit it
if (
currentResponseBuffer === '' &&
finalResult &&
finalResult.messages &&
finalResult.messages.length > 0
@ -516,23 +645,7 @@ Use all available tools strategically to provide comprehensive, well-researched,
finalResult.messages[finalResult.messages.length - 1];
if (finalMessage && finalMessage.content) {
console.log('SimplifiedAgent: Emitting final response');
// Emit the sources used for the response
if (
finalResult.relevantDocuments &&
finalResult.relevantDocuments.length > 0
) {
this.emitter.emit(
'data',
JSON.stringify({
type: 'sources',
data: finalResult.relevantDocuments,
searchQuery: '',
searchUrl: '',
}),
);
}
console.log('SimplifiedAgent: Emitting complete response (fallback)');
this.emitter.emit(
'data',
@ -541,23 +654,22 @@ Use all available tools strategically to provide comprehensive, well-researched,
data: finalMessage.content,
}),
);
} else {
console.warn('SimplifiedAgent: No valid final message found');
this.emitter.emit(
'data',
JSON.stringify({
type: 'response',
data: 'I apologize, but I was unable to generate a complete response to your query. Please try rephrasing your question or providing more specific details.',
}),
);
}
} else {
console.warn('SimplifiedAgent: No result messages found');
}
// If we still have no response, emit a fallback message
if (
currentResponseBuffer === '' &&
(!finalResult ||
!finalResult.messages ||
finalResult.messages.length === 0)
) {
console.warn('SimplifiedAgent: No valid response found');
this.emitter.emit(
'data',
JSON.stringify({
type: 'response',
data: 'I encountered an issue while processing your request. Please try again with a different query.',
data: 'I apologize, but I was unable to generate a complete response to your query. Please try rephrasing your question or providing more specific details.',
}),
);
}
@ -577,7 +689,7 @@ Use all available tools strategically to provide comprehensive, well-researched,
console.error('SimplifiedAgent: Error during search and answer:', error);
// Handle specific error types
if (error.name === 'AbortError') {
if (error.name === 'AbortError' || this.signal.aborted) {
console.warn('SimplifiedAgent: Operation was aborted');
this.emitter.emit(
'data',