feat(agent): More agent tweaks

This commit is contained in:
Willie Zutz 2025-06-22 13:35:01 -06:00
parent c3e845e0e2
commit a8eaadc6ed
11 changed files with 169 additions and 26 deletions

View file

@ -66,11 +66,41 @@ const handleEmitterEvents = async (
chatId: string,
startTime: number,
userMessageId: string,
abortController: AbortController,
) => {
let recievedMessage = '';
let sources: any[] = [];
let searchQuery: string | undefined;
let searchUrl: string | undefined;
let isStreamActive = true;
// Keep-alive ping mechanism to prevent reverse proxy timeouts
const pingInterval = setInterval(() => {
if (isStreamActive) {
try {
writer.write(
encoder.encode(
JSON.stringify({
type: 'ping',
timestamp: Date.now(),
}) + '\n',
),
);
} catch (error) {
// If writing fails, the connection is likely closed
clearInterval(pingInterval);
isStreamActive = false;
}
} else {
clearInterval(pingInterval);
}
}, 30000); // Send ping every 30 seconds
// Clean up ping interval if request is cancelled
abortController.signal.addEventListener('abort', () => {
isStreamActive = false;
clearInterval(pingInterval);
});
stream.on('data', (data) => {
const parsedData = JSON.parse(data);
@ -149,6 +179,9 @@ const handleEmitterEvents = async (
});
stream.on('end', () => {
isStreamActive = false;
clearInterval(pingInterval);
const endTime = Date.now();
const duration = endTime - startTime;
@ -190,6 +223,9 @@ const handleEmitterEvents = async (
.execute();
});
stream.on('error', (data) => {
isStreamActive = false;
clearInterval(pingInterval);
const parsedData = JSON.parse(data);
writer.write(
encoder.encode(
@ -413,6 +449,7 @@ export const POST = async (req: Request) => {
message.chatId,
startTime,
message.messageId,
abortController,
);
handleHistorySave(message, humanMessageId, body.focusMode, body.files);

View file

@ -192,6 +192,7 @@ export const POST = async (req: Request) => {
const stream = new ReadableStream({
start(controller) {
let sources: any[] = [];
let isStreamActive = true;
controller.enqueue(
encoder.encode(
@ -202,7 +203,31 @@ export const POST = async (req: Request) => {
),
);
// Keep-alive ping mechanism to prevent reverse proxy timeouts
const pingInterval = setInterval(() => {
if (isStreamActive && !signal.aborted) {
try {
controller.enqueue(
encoder.encode(
JSON.stringify({
type: 'ping',
timestamp: Date.now(),
}) + '\n',
),
);
} catch (error) {
// If enqueueing fails, the connection is likely closed
clearInterval(pingInterval);
isStreamActive = false;
}
} else {
clearInterval(pingInterval);
}
}, 30000); // Send ping every 30 seconds
signal.addEventListener('abort', () => {
isStreamActive = false;
clearInterval(pingInterval);
emitter.removeAllListeners();
try {
@ -244,6 +269,9 @@ export const POST = async (req: Request) => {
emitter.on('end', () => {
if (signal.aborted) return;
isStreamActive = false;
clearInterval(pingInterval);
controller.enqueue(
encoder.encode(
JSON.stringify({
@ -257,6 +285,9 @@ export const POST = async (req: Request) => {
emitter.on('error', (error: any) => {
if (signal.aborted) return;
isStreamActive = false;
clearInterval(pingInterval);
controller.error(error);
});
},