feat(app): allow stopping requests

Willie Zutz 2025-05-14 11:19:06 -06:00
parent 936f651372
commit ce1a38febc
7 changed files with 227 additions and 68 deletions

@@ -1,6 +1,7 @@
import { ChatOpenAI } from '@langchain/openai';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { Embeddings } from '@langchain/core/embeddings';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import { BaseMessage } from '@langchain/core/messages';
import { StringOutputParser } from '@langchain/core/output_parsers';
import {
ChatPromptTemplate,
MessagesPlaceholder,
@@ -11,19 +12,18 @@ import {
RunnableMap,
RunnableSequence,
} from '@langchain/core/runnables';
import { BaseMessage } from '@langchain/core/messages';
import { StringOutputParser } from '@langchain/core/output_parsers';
import LineListOutputParser from '../outputParsers/listLineOutputParser';
import LineOutputParser from '../outputParsers/lineOutputParser';
import { getDocumentsFromLinks } from '../utils/documents';
import { Document } from 'langchain/document';
import { searchSearxng } from '../searxng';
import path from 'node:path';
import fs from 'node:fs';
import computeSimilarity from '../utils/computeSimilarity';
import formatChatHistoryAsString from '../utils/formatHistory';
import eventEmitter from 'events';
import { StreamEvent } from '@langchain/core/tracers/log_stream';
import { ChatOpenAI } from '@langchain/openai';
import eventEmitter from 'events';
import { Document } from 'langchain/document';
import fs from 'node:fs';
import path from 'node:path';
import LineOutputParser from '../outputParsers/lineOutputParser';
import LineListOutputParser from '../outputParsers/listLineOutputParser';
import { searchSearxng } from '../searxng';
import computeSimilarity from '../utils/computeSimilarity';
import { getDocumentsFromLinks } from '../utils/documents';
import formatChatHistoryAsString from '../utils/formatHistory';
export interface MetaSearchAgentType {
searchAndAnswer: (
@@ -34,6 +34,7 @@ export interface MetaSearchAgentType {
optimizationMode: 'speed' | 'balanced' | 'quality',
fileIds: string[],
systemInstructions: string,
signal: AbortSignal,
) => Promise<eventEmitter>;
}
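
The searchAndAnswer contract now takes a trailing AbortSignal, so the caller decides when an in-flight request should stop. A minimal caller-side sketch, assuming a MetaSearchAgentType instance; the import path and the argument values before the signal are illustrative, since this hunk only shows the tail of the parameter list:

```ts
import type { Embeddings } from '@langchain/core/embeddings';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { BaseMessage } from '@langchain/core/messages';
import type { MetaSearchAgentType } from './metaSearchAgent'; // hypothetical path

// Hypothetical helper: start a search and hand back a stop() that aborts it.
async function startCancellableSearch(
  agent: MetaSearchAgentType,
  message: string,
  history: BaseMessage[],
  llm: BaseChatModel,
  embeddings: Embeddings,
) {
  const controller = new AbortController();

  // The arguments before the signal are assumed from context; only the
  // trailing AbortSignal parameter is introduced by this commit.
  const emitter = await agent.searchAndAnswer(
    message,
    history,
    llm,
    embeddings,
    'balanced', // optimizationMode
    [],         // fileIds
    '',         // systemInstructions
    controller.signal,
  );

  // stop() aborts the retriever chain, the LLM stream, and handleStream.
  return { emitter, stop: () => controller.abort() };
}
```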
@@ -247,6 +248,7 @@ class MetaSearchAgent implements MetaSearchAgentType {
embeddings: Embeddings,
optimizationMode: 'speed' | 'balanced' | 'quality',
systemInstructions: string,
signal: AbortSignal,
) {
return RunnableSequence.from([
RunnableMap.from({
@@ -254,43 +256,58 @@ class MetaSearchAgent implements MetaSearchAgentType {
query: (input: BasicChainInput) => input.query,
chat_history: (input: BasicChainInput) => input.chat_history,
date: () => new Date().toISOString(),
context: RunnableLambda.from(async (input: BasicChainInput) => {
const processedHistory = formatChatHistoryAsString(
input.chat_history,
);
let docs: Document[] | null = null;
let query = input.query;
if (this.config.searchWeb) {
const searchRetrieverChain =
await this.createSearchRetrieverChain(llm);
var date = new Date().toISOString();
const searchRetrieverResult = await searchRetrieverChain.invoke({
chat_history: processedHistory,
query,
date,
});
query = searchRetrieverResult.query;
docs = searchRetrieverResult.docs;
// Store the search query in the context for emitting to the client
if (searchRetrieverResult.searchQuery) {
this.searchQuery = searchRetrieverResult.searchQuery;
context: RunnableLambda.from(
async (
input: BasicChainInput,
options?: { signal?: AbortSignal },
) => {
// Check if the request was aborted
if (options?.signal?.aborted || signal?.aborted) {
console.log('Request cancelled by user');
throw new Error('Request cancelled by user');
}
}
const sortedDocs = await this.rerankDocs(
query,
docs ?? [],
fileIds,
embeddings,
optimizationMode,
);
const processedHistory = formatChatHistoryAsString(
input.chat_history,
);
return sortedDocs;
})
let docs: Document[] | null = null;
let query = input.query;
if (this.config.searchWeb) {
const searchRetrieverChain =
await this.createSearchRetrieverChain(llm);
var date = new Date().toISOString();
const searchRetrieverResult = await searchRetrieverChain.invoke(
{
chat_history: processedHistory,
query,
date,
},
{ signal: options?.signal },
);
query = searchRetrieverResult.query;
docs = searchRetrieverResult.docs;
// Store the search query in the context for emitting to the client
if (searchRetrieverResult.searchQuery) {
this.searchQuery = searchRetrieverResult.searchQuery;
}
}
const sortedDocs = await this.rerankDocs(
query,
docs ?? [],
fileIds,
embeddings,
optimizationMode,
);
return sortedDocs;
},
)
.withConfig({
runName: 'FinalSourceRetriever',
})
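
The context lambda now receives the per-invocation runnable options alongside the signal closed over from createAnsweringChain, checks both before doing any work, and forwards options?.signal into the nested invoke. The same pattern in isolation, as a minimal sketch with illustrative names (not the project's code):

```ts
import { RunnableLambda } from '@langchain/core/runnables';

// Sketch of the pattern above: the factory closes over one AbortSignal, and
// the lambda also checks the signal LangChain threads through its options.
function makeCancellableStep(signal: AbortSignal) {
  return RunnableLambda.from(
    async (input: string, options?: { signal?: AbortSignal }) => {
      if (options?.signal?.aborted || signal.aborted) {
        throw new Error('Request cancelled by user');
      }
      // Real steps would forward the signal to nested calls, e.g.
      // await innerChain.invoke(input, { signal: options?.signal });
      return input.toUpperCase();
    },
  );
}

async function demo() {
  const controller = new AbortController();
  const step = makeCancellableStep(controller.signal);
  console.log(await step.invoke('hello', { signal: controller.signal }));
  // Calling controller.abort() before the invoke makes the lambda throw.
}

demo().catch(console.error);
```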
@@ -450,8 +467,17 @@ class MetaSearchAgent implements MetaSearchAgentType {
stream: AsyncGenerator<StreamEvent, any, any>,
emitter: eventEmitter,
llm: BaseChatModel,
signal: AbortSignal,
) {
if (signal.aborted) {
return;
}
for await (const event of stream) {
if (signal.aborted) {
return;
}
if (
event.event === 'on_chain_end' &&
event.name === 'FinalSourceRetriever'
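
handleStream now returns early when the signal is already aborted and re-checks it on every iteration, so no further events reach the client after a cancel. A stripped-down sketch of that guard, using a hypothetical forwarder rather than the project's code:

```ts
import EventEmitter from 'events';

// Hypothetical forwarder showing the same guard handleStream uses: stop
// consuming the stream as soon as the request is cancelled, without throwing.
async function forwardEvents<T>(
  stream: AsyncGenerator<T>,
  emitter: EventEmitter,
  signal: AbortSignal,
) {
  if (signal.aborted) return;

  for await (const event of stream) {
    if (signal.aborted) return; // cancelled mid-stream: quietly stop
    emitter.emit('data', JSON.stringify(event));
  }

  emitter.emit('end');
}
```

Returning out of a for await...of loop also calls the generator's return(), so the underlying stream is closed rather than left running in the background.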
@@ -544,6 +570,7 @@ class MetaSearchAgent implements MetaSearchAgentType {
optimizationMode: 'speed' | 'balanced' | 'quality',
fileIds: string[],
systemInstructions: string,
signal: AbortSignal,
) {
const emitter = new eventEmitter();
@@ -553,6 +580,7 @@ class MetaSearchAgent implements MetaSearchAgentType {
embeddings,
optimizationMode,
systemInstructions,
signal,
);
const stream = answeringChain.streamEvents(
@@ -562,10 +590,12 @@ class MetaSearchAgent implements MetaSearchAgentType {
},
{
version: 'v1',
// Pass the abort signal to the LLM streaming chain
signal,
},
);
this.handleStream(stream, emitter, llm);
this.handleStream(stream, emitter, llm, signal);
return emitter;
}
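
Passing signal into streamEvents is what lets LangChain cancel the underlying model request mid-generation instead of just discarding its output. A self-contained sketch of that behaviour on a trivial chain, separate from the agent; the model name, prompt, and timeout are illustrative:

```ts
import { StringOutputParser } from '@langchain/core/output_parsers';
import { ChatPromptTemplate } from '@langchain/core/prompts';
import { ChatOpenAI } from '@langchain/openai';

async function demoAbortableStream() {
  const controller = new AbortController();

  const chain = ChatPromptTemplate.fromMessages([['human', '{question}']])
    .pipe(new ChatOpenAI({ model: 'gpt-4o-mini' })) // illustrative model name
    .pipe(new StringOutputParser());

  const stream = chain.streamEvents(
    { question: 'Write a long poem about the sea.' },
    { version: 'v1', signal: controller.signal },
  );

  // Simulate the user pressing "Stop" two seconds into the stream.
  const timer = setTimeout(() => controller.abort(), 2000);

  try {
    for await (const _event of stream) {
      process.stdout.write('.'); // one dot per streamed event
    }
  } catch (err) {
    // The aborted run rejects; fetch-based clients surface an AbortError here.
    console.log('\nstream stopped:', (err as Error).name);
  } finally {
    clearTimeout(timer);
  }
}
```

Inside the agent the same signal also reaches the retriever chain and the handleStream loop, so all stages of a request stop together once the client cancels.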