fix: prevent Agnes video worker fetch timeout

This commit is contained in:
FengLee
2026-06-06 18:09:32 +08:00
parent e1ec52ab86
commit dd1118dfb8
4 changed files with 121 additions and 7 deletions

View File

@@ -128,12 +128,24 @@ await runTest('Agnes video failures are reported by stage instead of raw fetch f
const executor = read('src/lib/user-api-manifest-executor.ts');
const videoRoute = read('src/app/api/generate/video/route.ts');
const worker = read('src/lib/generation-job-worker.ts');
const runner = read('src/lib/generation-job-runner.ts');
assert.match(executor, /const stage = method === 'GET' \? '上游任务轮询' : '上游任务创建'/);
assert.match(executor, /网络连接失败,请稍后重试/);
assert.match(videoRoute, /上游已返回视频地址,但平台下载或保存结果视频失败/);
assert.match(worker, /creation history persistence failed:/);
assert.match(worker, /\(\$\{url\}\)/);
assert.match(runner, /内部生成请求网络连接失败/);
assert.match(runner, /requestInternalGenerationJson/);
});
await runTest('Agnes video polling progress is forwarded into generation job status', () => {
const executor = read('src/lib/user-api-manifest-executor.ts');
assert.match(executor, /function getManifestProgress/);
assert.match(executor, /getPathValue\(raw,\s*'progress'\)/);
assert.match(executor, /remainingSeconds/);
assert.match(executor, /await input\.onProgress\?\.\(getManifestProgress\(raw,\s*status\)\)/);
});
await runTest('Agnes installer source creates free inactive rows with empty API key and per-row Manifest files', () => {

View File

@@ -25,6 +25,15 @@ await runTest('generation job runner can dispatch reverse-prompt payloads to the
assert.match(source, /const endpoint = type === 'image' \? '\/api\/generate\/image' : type === 'video' \? '\/api\/generate\/video' : '\/api\/generate\/reverse-prompt';/);
});
await runTest('generation job runner uses long-lived internal HTTP requests for slow video jobs', () => {
const source = read('src/lib/generation-job-runner.ts');
assert.match(source, /requestInternalGenerationJson/);
assert.match(source, /GENERATION_INTERNAL_REQUEST_TIMEOUT_MS/);
assert.match(source, /25 \* 60_000/);
assert.match(source, /req\.setTimeout\(timeoutMs/);
assert.doesNotMatch(source, /await fetch\(`\$\{baseUrl\}\$\{endpoint\}`/);
});
await runTest('generation jobs route can list active jobs and accept reverse-prompt submissions', () => {
const source = read('src/app/api/generation-jobs/route.ts');
assert.match(source, /export async function GET\(request: NextRequest\)/);

View File

@@ -1,7 +1,74 @@
import { getInternalGenerationHeaders } from '@/lib/server-api-config';
import { request as httpRequest } from 'http';
import { request as httpsRequest } from 'https';
export type GenerationJobType = 'image' | 'video' | 'reverse-prompt';
type InternalGenerationResponse = {
statusCode: number;
data: Record<string, unknown>;
};
function parsePositiveInt(value: string | undefined, fallback: number): number {
const parsed = Number(value);
return Number.isFinite(parsed) && parsed > 0 ? Math.floor(parsed) : fallback;
}
function getGenerationPayloadTimeoutMs(type: GenerationJobType): number {
const fallback = type === 'video' ? 25 * 60_000 : type === 'reverse-prompt' ? 5 * 60_000 : 15 * 60_000;
return parsePositiveInt(process.env.GENERATION_INTERNAL_REQUEST_TIMEOUT_MS, fallback);
}
function requestInternalGenerationJson(
url: string,
headers: Record<string, string>,
payload: Record<string, unknown>,
timeoutMs: number,
): Promise<InternalGenerationResponse> {
return new Promise((resolve, reject) => {
const target = new URL(url);
const body = JSON.stringify(payload);
const transport = target.protocol === 'https:' ? httpsRequest : httpRequest;
const req = transport(
{
protocol: target.protocol,
hostname: target.hostname,
port: target.port,
method: 'POST',
path: `${target.pathname}${target.search}`,
headers: {
...headers,
'Content-Length': Buffer.byteLength(body),
},
},
res => {
const chunks: Buffer[] = [];
res.on('data', chunk => chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)));
res.once('end', () => {
const raw = Buffer.concat(chunks).toString('utf8');
try {
resolve({
statusCode: res.statusCode || 500,
data: raw ? JSON.parse(raw) as Record<string, unknown> : {},
});
} catch {
resolve({
statusCode: res.statusCode || 500,
data: raw ? { error: raw } : {},
});
}
});
},
);
req.setTimeout(timeoutMs, () => {
req.destroy(new Error(`内部生成请求超时(${Math.ceil(timeoutMs / 1000)} 秒)`));
});
req.once('error', error => reject(error));
req.end(body);
});
}
export async function runGenerationPayload(
type: GenerationJobType,
payload: Record<string, unknown>,
@@ -17,18 +84,24 @@ export async function runGenerationPayload(
if (options.userId) headers['x-miaojing-generation-user-id'] = options.userId;
if (options.jobId) headers['x-miaojing-generation-job-id'] = options.jobId;
const res = await fetch(`${baseUrl}${endpoint}`, {
method: 'POST',
const { statusCode, data } = await requestInternalGenerationJson(
`${baseUrl}${endpoint}`,
headers,
body: JSON.stringify(payload),
payload,
getGenerationPayloadTimeoutMs(type),
).catch(error => {
const message = error instanceof Error ? error.message : String(error || '');
if (/fetch failed|network|ECONNRESET|ETIMEDOUT|EAI_AGAIN|ENOTFOUND|socket|aborted/i.test(message)) {
throw new Error(`内部生成请求网络连接失败:${message || 'request failed'}`);
}
throw error;
});
const data = await res.json().catch(() => ({}));
if (!res.ok) {
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
typeof data?.error === 'string'
? data.error
: `Generation request failed (${res.status})`,
: `Generation request failed (${statusCode})`,
);
}

View File

@@ -249,6 +249,26 @@ function replaceTaskIdPlaceholders(value: unknown, taskId?: string): unknown {
return value;
}
function numberFromUnknown(value: unknown): number | undefined {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : undefined;
}
function getManifestProgress(raw: unknown, status: unknown): Record<string, unknown> {
const percent = numberFromUnknown(getPathValue(raw, 'progress'))
?? numberFromUnknown(getPathValue(raw, 'data.progress'))
?? numberFromUnknown(getPathValue(raw, 'result.progress'));
const remainingSeconds = numberFromUnknown(getPathValue(raw, 'remainingSeconds'))
?? numberFromUnknown(getPathValue(raw, 'remaining_seconds'))
?? numberFromUnknown(getPathValue(raw, 'eta'))
?? numberFromUnknown(getPathValue(raw, 'eta_seconds'));
return {
...(percent !== undefined ? { percent } : {}),
...(remainingSeconds !== undefined ? { remainingSeconds } : {}),
message: typeof status === 'string' ? status : '等待上游任务完成',
};
}
function dataUrlToBlob(value: string): { blob: Blob; fileName: string } | null {
const parsed = parseDataUrlForUpload(value);
if (!parsed) return null;
@@ -407,7 +427,7 @@ async function pollManifestResult(
if ((isFinal && isSuccess) || (!poll.finalValues?.length && isSuccess) || (!poll.successValues?.length && (media.images.length > 0 || media.videos.length > 0))) {
return { raw, ...media };
}
await input.onProgress?.({ message: typeof status === 'string' ? status : '等待上游任务完成' });
await input.onProgress?.(getManifestProgress(raw, status));
}
throw new Error('上游任务轮询超时');