feat(app): allow stopping requests
This commit is contained in:
parent
380216e062
commit
9c7ccf42fc
7 changed files with 227 additions and 68 deletions
|
|
@ -1,6 +1,7 @@
|
|||
import { ChatOpenAI } from '@langchain/openai';
|
||||
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
|
||||
import type { Embeddings } from '@langchain/core/embeddings';
|
||||
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
|
||||
import { BaseMessage } from '@langchain/core/messages';
|
||||
import { StringOutputParser } from '@langchain/core/output_parsers';
|
||||
import {
|
||||
ChatPromptTemplate,
|
||||
MessagesPlaceholder,
|
||||
|
|
@ -11,19 +12,18 @@ import {
|
|||
RunnableMap,
|
||||
RunnableSequence,
|
||||
} from '@langchain/core/runnables';
|
||||
import { BaseMessage } from '@langchain/core/messages';
|
||||
import { StringOutputParser } from '@langchain/core/output_parsers';
|
||||
import LineListOutputParser from '../outputParsers/listLineOutputParser';
|
||||
import LineOutputParser from '../outputParsers/lineOutputParser';
|
||||
import { getDocumentsFromLinks } from '../utils/documents';
|
||||
import { Document } from 'langchain/document';
|
||||
import { searchSearxng } from '../searxng';
|
||||
import path from 'node:path';
|
||||
import fs from 'node:fs';
|
||||
import computeSimilarity from '../utils/computeSimilarity';
|
||||
import formatChatHistoryAsString from '../utils/formatHistory';
|
||||
import eventEmitter from 'events';
|
||||
import { StreamEvent } from '@langchain/core/tracers/log_stream';
|
||||
import { ChatOpenAI } from '@langchain/openai';
|
||||
import eventEmitter from 'events';
|
||||
import { Document } from 'langchain/document';
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import LineOutputParser from '../outputParsers/lineOutputParser';
|
||||
import LineListOutputParser from '../outputParsers/listLineOutputParser';
|
||||
import { searchSearxng } from '../searxng';
|
||||
import computeSimilarity from '../utils/computeSimilarity';
|
||||
import { getDocumentsFromLinks } from '../utils/documents';
|
||||
import formatChatHistoryAsString from '../utils/formatHistory';
|
||||
|
||||
export interface MetaSearchAgentType {
|
||||
searchAndAnswer: (
|
||||
|
|
@ -34,6 +34,7 @@ export interface MetaSearchAgentType {
|
|||
optimizationMode: 'speed' | 'balanced' | 'quality',
|
||||
fileIds: string[],
|
||||
systemInstructions: string,
|
||||
signal: AbortSignal,
|
||||
) => Promise<eventEmitter>;
|
||||
}
|
||||
|
||||
|
|
@ -247,6 +248,7 @@ class MetaSearchAgent implements MetaSearchAgentType {
|
|||
embeddings: Embeddings,
|
||||
optimizationMode: 'speed' | 'balanced' | 'quality',
|
||||
systemInstructions: string,
|
||||
signal: AbortSignal,
|
||||
) {
|
||||
return RunnableSequence.from([
|
||||
RunnableMap.from({
|
||||
|
|
@ -254,43 +256,58 @@ class MetaSearchAgent implements MetaSearchAgentType {
|
|||
query: (input: BasicChainInput) => input.query,
|
||||
chat_history: (input: BasicChainInput) => input.chat_history,
|
||||
date: () => new Date().toISOString(),
|
||||
context: RunnableLambda.from(async (input: BasicChainInput) => {
|
||||
const processedHistory = formatChatHistoryAsString(
|
||||
input.chat_history,
|
||||
);
|
||||
|
||||
let docs: Document[] | null = null;
|
||||
let query = input.query;
|
||||
|
||||
if (this.config.searchWeb) {
|
||||
const searchRetrieverChain =
|
||||
await this.createSearchRetrieverChain(llm);
|
||||
var date = new Date().toISOString();
|
||||
const searchRetrieverResult = await searchRetrieverChain.invoke({
|
||||
chat_history: processedHistory,
|
||||
query,
|
||||
date,
|
||||
});
|
||||
|
||||
query = searchRetrieverResult.query;
|
||||
docs = searchRetrieverResult.docs;
|
||||
|
||||
// Store the search query in the context for emitting to the client
|
||||
if (searchRetrieverResult.searchQuery) {
|
||||
this.searchQuery = searchRetrieverResult.searchQuery;
|
||||
context: RunnableLambda.from(
|
||||
async (
|
||||
input: BasicChainInput,
|
||||
options?: { signal?: AbortSignal },
|
||||
) => {
|
||||
// Check if the request was aborted
|
||||
if (options?.signal?.aborted || signal?.aborted) {
|
||||
console.log('Request cancelled by user');
|
||||
throw new Error('Request cancelled by user');
|
||||
}
|
||||
}
|
||||
|
||||
const sortedDocs = await this.rerankDocs(
|
||||
query,
|
||||
docs ?? [],
|
||||
fileIds,
|
||||
embeddings,
|
||||
optimizationMode,
|
||||
);
|
||||
const processedHistory = formatChatHistoryAsString(
|
||||
input.chat_history,
|
||||
);
|
||||
|
||||
return sortedDocs;
|
||||
})
|
||||
let docs: Document[] | null = null;
|
||||
let query = input.query;
|
||||
|
||||
if (this.config.searchWeb) {
|
||||
const searchRetrieverChain =
|
||||
await this.createSearchRetrieverChain(llm);
|
||||
var date = new Date().toISOString();
|
||||
|
||||
const searchRetrieverResult = await searchRetrieverChain.invoke(
|
||||
{
|
||||
chat_history: processedHistory,
|
||||
query,
|
||||
date,
|
||||
},
|
||||
{ signal: options?.signal },
|
||||
);
|
||||
|
||||
query = searchRetrieverResult.query;
|
||||
docs = searchRetrieverResult.docs;
|
||||
|
||||
// Store the search query in the context for emitting to the client
|
||||
if (searchRetrieverResult.searchQuery) {
|
||||
this.searchQuery = searchRetrieverResult.searchQuery;
|
||||
}
|
||||
}
|
||||
|
||||
const sortedDocs = await this.rerankDocs(
|
||||
query,
|
||||
docs ?? [],
|
||||
fileIds,
|
||||
embeddings,
|
||||
optimizationMode,
|
||||
);
|
||||
|
||||
return sortedDocs;
|
||||
},
|
||||
)
|
||||
.withConfig({
|
||||
runName: 'FinalSourceRetriever',
|
||||
})
|
||||
|
|
@ -450,8 +467,17 @@ class MetaSearchAgent implements MetaSearchAgentType {
|
|||
stream: AsyncGenerator<StreamEvent, any, any>,
|
||||
emitter: eventEmitter,
|
||||
llm: BaseChatModel,
|
||||
signal: AbortSignal,
|
||||
) {
|
||||
if (signal.aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
for await (const event of stream) {
|
||||
if (signal.aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
event.event === 'on_chain_end' &&
|
||||
event.name === 'FinalSourceRetriever'
|
||||
|
|
@ -544,6 +570,7 @@ class MetaSearchAgent implements MetaSearchAgentType {
|
|||
optimizationMode: 'speed' | 'balanced' | 'quality',
|
||||
fileIds: string[],
|
||||
systemInstructions: string,
|
||||
signal: AbortSignal,
|
||||
) {
|
||||
const emitter = new eventEmitter();
|
||||
|
||||
|
|
@ -553,6 +580,7 @@ class MetaSearchAgent implements MetaSearchAgentType {
|
|||
embeddings,
|
||||
optimizationMode,
|
||||
systemInstructions,
|
||||
signal,
|
||||
);
|
||||
|
||||
const stream = answeringChain.streamEvents(
|
||||
|
|
@ -562,10 +590,12 @@ class MetaSearchAgent implements MetaSearchAgentType {
|
|||
},
|
||||
{
|
||||
version: 'v1',
|
||||
// Pass the abort signal to the LLM streaming chain
|
||||
signal,
|
||||
},
|
||||
);
|
||||
|
||||
this.handleStream(stream, emitter, llm);
|
||||
this.handleStream(stream, emitter, llm, signal);
|
||||
|
||||
return emitter;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue