Initial WallMuse project
This commit is contained in:
8
apps/worker-generation/package.json
Normal file
8
apps/worker-generation/package.json
Normal file
@@ -0,0 +1,8 @@
|
||||
{
|
||||
"name": "@wallmuse/worker-generation",
|
||||
"version": "0.1.0",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": { "dev": "tsx src/index.ts", "typecheck": "tsc --noEmit -p tsconfig.json", "build": "tsc -p tsconfig.json" },
|
||||
"dependencies": { "@wallmuse/db": "workspace:*", "@wallmuse/image-pipeline": "workspace:*", "@wallmuse/provider-adapters": "workspace:*", "@wallmuse/shared": "workspace:*", "bullmq": "^5.53.2", "ioredis": "^5.6.1" }
|
||||
}
|
||||
20
apps/worker-generation/src/connection.ts
Normal file
20
apps/worker-generation/src/connection.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
import { Queue } from "bullmq";
|
||||
import IORedis from "ioredis";
|
||||
import type { GenerationWorkerJobData } from "../../../packages/db/src/json-store.js";
|
||||
const defaultGenerationQueueName = "generation.default";
|
||||
|
||||
export function createRedisConnection(): IORedis {
|
||||
return new IORedis(process.env.REDIS_URL ?? "redis://127.0.0.1:6379", { maxRetriesPerRequest: null });
|
||||
}
|
||||
|
||||
export function createGenerationQueue(connection = createRedisConnection()): Queue<GenerationWorkerJobData> {
|
||||
return new Queue<GenerationWorkerJobData>(process.env.GENERATION_QUEUE_NAME ?? defaultGenerationQueueName, {
|
||||
connection,
|
||||
defaultJobOptions: {
|
||||
attempts: Number(process.env.GENERATION_JOB_ATTEMPTS ?? 3),
|
||||
backoff: { type: "exponential", delay: Number(process.env.GENERATION_JOB_BACKOFF_MS ?? 1000) },
|
||||
removeOnComplete: 100,
|
||||
removeOnFail: 100
|
||||
}
|
||||
});
|
||||
}
|
||||
8
apps/worker-generation/src/index.ts
Normal file
8
apps/worker-generation/src/index.ts
Normal file
@@ -0,0 +1,8 @@
|
||||
import { createGenerationWorker } from "./worker.js";
|
||||
const worker = createGenerationWorker();
|
||||
worker.on("ready", () => console.log("[worker-generation] ready"));
|
||||
worker.on("completed", (job) => console.log(`[worker-generation] completed job=${job.id}`));
|
||||
worker.on("failed", (job, error) => console.error(`[worker-generation] failed job=${job?.id ?? "unknown"} ${error.message}`));
|
||||
const shutdown = async () => { await worker.close(); process.exit(0); };
|
||||
process.on("SIGINT", shutdown);
|
||||
process.on("SIGTERM", shutdown);
|
||||
58
apps/worker-generation/src/processor.ts
Normal file
58
apps/worker-generation/src/processor.ts
Normal file
@@ -0,0 +1,58 @@
|
||||
import { JsonWallMuseDb } from "../../../packages/db/src/json-store.js";
|
||||
import type { GenerationWorkerJobData } from "../../../packages/db/src/json-store.js";
|
||||
import { LocalProviderAssetStore } from "@wallmuse/image-pipeline";
|
||||
import { getImageProviderAdapter } from "@wallmuse/provider-adapters";
|
||||
import type { ImageGenerationResult } from "@wallmuse/provider-adapters";
|
||||
import type { AssetKind } from "@wallmuse/shared";
|
||||
import { getGenerationSteps, getTargetSize } from "./state-machine.js";
|
||||
|
||||
export class GenerationProcessor {
|
||||
constructor(private readonly db = JsonWallMuseDb.fromEnv(), private readonly storage = new LocalProviderAssetStore()) {}
|
||||
async process(job: { data: GenerationWorkerJobData; updateProgress(progress: number): Promise<void> }): Promise<void> {
|
||||
const data = job.data;
|
||||
await this.db.init();
|
||||
const attempt = await this.db.incrementGenerationAttempt(data.groupId, data.taskId);
|
||||
const adapter = getImageProviderAdapter(data.providerSlug);
|
||||
let masterStorageUrl = data.referenceAssetUrl;
|
||||
try {
|
||||
for (const step of getGenerationSteps(data)) {
|
||||
await this.db.markGenerationStatus(data.groupId, data.taskId, "running", step.taskStatus, step.progress);
|
||||
await job.updateProgress(step.progress);
|
||||
if (!step.assetKind) continue;
|
||||
const stored = await this.runProviderStep(data, step.assetKind, attempt, masterStorageUrl, step.promptSuffix);
|
||||
if (step.assetKind === "master") masterStorageUrl = stored.storageUrl;
|
||||
}
|
||||
await this.db.markGenerationStatus(data.groupId, data.taskId, "succeeded", "succeeded", 100);
|
||||
await job.updateProgress(100);
|
||||
} catch (error) {
|
||||
const normalized = adapter.normalizeError(error);
|
||||
await this.db.markGenerationFailure(data.groupId, data.taskId, data.retryAssetKind ? "partial_succeeded" : "failed", normalized.code, normalized.message);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
private async runProviderStep(data: GenerationWorkerJobData, assetKind: Extract<AssetKind, "master" | "landscape" | "portrait">, attempt: number, masterStorageUrl?: string, promptSuffix = "") {
|
||||
const adapter = getImageProviderAdapter(data.providerSlug);
|
||||
const callId = `pcl_${crypto.randomUUID()}`;
|
||||
const callLog = await this.db.createProviderCallLog({ id: callId, ...(data.taskId === undefined ? {} : { taskId: data.taskId }), groupId: data.groupId, providerId: data.providerId, modelId: data.modelId, assetKind, attempt });
|
||||
const startedAt = Date.now();
|
||||
try {
|
||||
const size = getTargetSize(data, assetKind);
|
||||
const baseInput = { prompt: `${data.prompt}${promptSuffix}`, model: data.modelSlug, size, width: size.width, height: size.height, responseFormat: "base64" as const, ...(data.negativePrompt === undefined ? {} : { negativePrompt: data.negativePrompt }), ...(data.seed === undefined ? {} : { seed: data.seed }) };
|
||||
const result = assetKind === "master" || !adapter.generateImageToImage ? await adapter.generateTextToImage(baseInput, { requestId: callId, metadata: { assetKind, groupId: data.groupId } }) : await adapter.generateImageToImage({ ...baseInput, images: [{ kind: "url", value: masterStorageUrl ?? data.referenceAssetUrl ?? "" }] }, { requestId: callId, metadata: { assetKind, groupId: data.groupId } });
|
||||
const stored = await this.storeFirstAsset(data, assetKind, result);
|
||||
await this.db.completeProviderCallLog(callLog.id, { status: "succeeded", latencyMs: Date.now() - startedAt, usage: result.usage, rawMetadata: result.rawMetadata });
|
||||
return stored;
|
||||
} catch (error) {
|
||||
const normalized = adapter.normalizeError(error);
|
||||
await this.db.completeProviderCallLog(callLog.id, { status: "failed", latencyMs: Date.now() - startedAt, errorCode: normalized.code, errorMessage: normalized.message });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
private async storeFirstAsset(data: GenerationWorkerJobData, assetKind: Extract<AssetKind, "master" | "landscape" | "portrait">, result: ImageGenerationResult) {
|
||||
const asset = result.assets[0];
|
||||
if (!asset) throw new Error(`Provider returned no assets for ${assetKind}`);
|
||||
const stored = await this.storage.storeProviderAsset({ userId: data.userId, groupId: data.groupId, assetKind, source: asset });
|
||||
await this.db.upsertGeneratedAsset({ groupId: data.groupId, ...(data.taskId === undefined ? {} : { taskId: data.taskId }), userId: data.userId, assetKind, status: "active", bucket: stored.bucket, objectKey: stored.objectKey, storageUrl: stored.storageUrl, publicUrl: stored.publicUrl, mimeType: stored.mimeType, byteSize: stored.byteSize, sha256: stored.sha256, providerId: data.providerId, modelId: data.modelId, ...(stored.width === undefined ? {} : { width: stored.width }), ...(stored.height === undefined ? {} : { height: stored.height }), ...(asset.seed === undefined ? {} : { seed: asset.seed }) });
|
||||
return stored;
|
||||
}
|
||||
}
|
||||
18
apps/worker-generation/src/state-machine.ts
Normal file
18
apps/worker-generation/src/state-machine.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
import type { GenerationWorkerJobData } from "../../../packages/db/src/json-store.js";
|
||||
import type { AssetKind, AspectRatio, GenerationTaskStatus } from "@wallmuse/shared";
|
||||
|
||||
export interface GenerationStep { taskStatus: GenerationTaskStatus; phase: string; assetKind?: Extract<AssetKind, "master" | "landscape" | "portrait">; progress: number; promptSuffix?: string; }
|
||||
const fullPipeline: GenerationStep[] = [
|
||||
{ phase: "validating", taskStatus: "dispatching", progress: 8 },
|
||||
{ phase: "generating_master", taskStatus: "running", assetKind: "master", progress: 24 },
|
||||
{ phase: "deriving_landscape", taskStatus: "running", assetKind: "landscape", progress: 48, promptSuffix: " Adapt this scene into a desktop 16:9 wallpaper while preserving subject, style, color palette, light and material." },
|
||||
{ phase: "deriving_portrait", taskStatus: "running", assetKind: "portrait", progress: 70, promptSuffix: " Adapt this scene into a mobile 9:16 wallpaper while preserving subject, style, color palette, light and material." },
|
||||
{ phase: "downloading", taskStatus: "uploading", progress: 86 },
|
||||
{ phase: "processing", taskStatus: "post_processing", progress: 94 }
|
||||
];
|
||||
export function getGenerationSteps(job: GenerationWorkerJobData): GenerationStep[] { return job.retryAssetKind ? fullPipeline.filter((step) => !step.assetKind || step.assetKind === job.retryAssetKind || step.phase === "downloading" || step.phase === "processing") : fullPipeline; }
|
||||
export function getTargetSize(job: GenerationWorkerJobData, assetKind: Extract<AssetKind, "master" | "landscape" | "portrait">): { aspectRatio: AspectRatio; resolution: "1k" | "2k" | "4k"; width: number; height: number } {
|
||||
const presets = { "1k": { master: [1024, 1024], landscape: [1280, 720], portrait: [720, 1280] }, "2k": { master: [1536, 1536], landscape: [2560, 1440], portrait: [1440, 2560] }, "4k": { master: [2048, 2048], landscape: [3840, 2160], portrait: [2160, 3840] } } as const;
|
||||
const [width, height] = presets[job.resolution][assetKind];
|
||||
return { aspectRatio: assetKind === "portrait" ? "9:16" : assetKind === "landscape" ? "16:9" : "1:1", resolution: job.resolution, width, height };
|
||||
}
|
||||
10
apps/worker-generation/src/worker.ts
Normal file
10
apps/worker-generation/src/worker.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import { Worker } from "bullmq";
|
||||
import type { GenerationWorkerJobData } from "../../../packages/db/src/json-store.js";
|
||||
import { GenerationProcessor } from "./processor.js";
|
||||
import { createRedisConnection } from "./connection.js";
|
||||
const defaultGenerationQueueName = "generation.default";
|
||||
export function createGenerationWorker() {
|
||||
const processor = new GenerationProcessor();
|
||||
const connection = createRedisConnection();
|
||||
return new Worker<GenerationWorkerJobData>(process.env.GENERATION_QUEUE_NAME ?? defaultGenerationQueueName, (job) => processor.process(job), { connection, concurrency: Number(process.env.GENERATION_WORKER_CONCURRENCY ?? 2) });
|
||||
}
|
||||
5
apps/worker-generation/tsconfig.json
Normal file
5
apps/worker-generation/tsconfig.json
Normal file
@@ -0,0 +1,5 @@
|
||||
{
|
||||
"extends": "../../tsconfig.base.json",
|
||||
"compilerOptions": { "outDir": "dist", "types": ["node"] },
|
||||
"include": ["src/**/*.ts"]
|
||||
}
|
||||
Reference in New Issue
Block a user