81 lines
2.2 KiB
TypeScript
81 lines
2.2 KiB
TypeScript
import { Injectable, Logger } from "@nestjs/common";
|
|
import { Prisma } from "@prisma/client";
|
|
import { PrismaService } from "../database/prisma.service";
|
|
|
|
/**
 * Runs tasks under a lease so that a given task key is not executed
 * concurrently, either inside this process or by another instance sharing the
 * same database.
 *
 * Leases are stored as rows in `app_settings`:
 * - `key` is `task_lock:<key>`,
 * - `value` is an owner token unique to one `runWithLease` invocation,
 * - `updated_at` holds the lease EXPIRY time (now + TTL), not a modification
 *   time.
 */
@Injectable()
export class TaskLockService {
  private readonly logger = new Logger(TaskLockService.name);

  /** Keys whose task is currently being started or is running in this process. */
  private readonly activeTasks = new Set<string>();

  constructor(private readonly prisma: PrismaService) {}

  /**
   * Runs `task` only if no other holder currently owns the lease for `key`.
   *
   * The lease is taken once and is not renewed while the task runs, so `ttlMs`
   * should comfortably exceed the task's worst-case duration; once it expires,
   * another instance may acquire the lease even though this task is still
   * running.
   *
   * @param key Logical task name; also used in log messages.
   * @param ttlMs Lease lifetime in milliseconds. Must be positive and finite.
   * @param task The work to run while the lease is held.
   * @param logger Logger that receives the "skipping" warnings.
   * @returns The task's result, or `null` when the task was skipped because it
   *   is already running in this process or the lease is held elsewhere.
   * @throws RangeError When `ttlMs` is not a positive, finite number.
   * @throws Whatever `task` throws, or the database error raised while
   *   acquiring the lease.
   */
  async runWithLease<T>(
    key: string,
    ttlMs: number,
    task: () => Promise<T>,
    logger: Logger,
  ): Promise<T | null> {
    // A zero/negative TTL would create a lease that is already expired and
    // therefore excludes nobody; NaN/Infinity would only fail later inside SQL.
    if (!Number.isFinite(ttlMs) || ttlMs <= 0) {
      throw new RangeError(
        `Invalid lease TTL for ${key}: expected a positive, finite number of milliseconds, got ${ttlMs}`,
      );
    }

    if (this.activeTasks.has(key)) {
      logger.warn(`Skipping ${key}: task is already running in this process`);
      return null;
    }

    // Reserve the key synchronously, before the first await, so that a second
    // call arriving while the lease query is in flight is rejected by the
    // in-process guard instead of racing to the database.
    this.activeTasks.add(key);

    const owner = `${process.pid}-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
    let acquired = false;

    try {
      acquired = await this.acquireLease(key, owner, ttlMs);

      if (!acquired) {
        logger.warn(`Skipping ${key}: lease is already held by another instance`);
        return null;
      }

      return await task();
    } finally {
      // Always free the in-process reservation, including when acquisition
      // failed or threw; only touch the database if we actually own the lease.
      this.activeTasks.delete(key);
      if (acquired) {
        await this.releaseLease(key, owner);
      }
    }
  }

  /**
   * Tries to take the lease for `key` on behalf of `owner`.
   *
   * Inserts the lease row, or overwrites an existing one only when that row
   * has expired (`updated_at < NOW()`) or already belongs to `owner`. The
   * statement returns a row only when the insert or update actually happened.
   *
   * @returns `true` when the lease now belongs to `owner`, `false` when a
   *   live lease is held by someone else.
   */
  private async acquireLease(
    key: string,
    owner: string,
    ttlMs: number,
  ): Promise<boolean> {
    const rows = await this.prisma.$queryRaw<{ key: string }[]>(
      Prisma.sql`
        INSERT INTO app_settings (key, value, updated_at)
        VALUES (${this.getDbKey(key)}, ${owner}, NOW() + (${ttlMs} * INTERVAL '1 millisecond'))
        ON CONFLICT (key) DO UPDATE
        SET value = EXCLUDED.value,
            updated_at = EXCLUDED.updated_at
        WHERE app_settings.updated_at < NOW()
           OR app_settings.value = ${owner}
        RETURNING key
      `,
    );

    return rows.length > 0;
  }

  /**
   * Deletes the lease row for `key`, but only if it is still owned by `owner`,
   * so a lease that expired and was taken over by someone else is left alone.
   *
   * Best-effort: failures are logged and swallowed so they never mask the
   * task's own result or error. A lease that could not be deleted becomes
   * acquirable again once its expiry time has passed.
   */
  private async releaseLease(key: string, owner: string): Promise<void> {
    try {
      await this.prisma.$executeRaw(
        Prisma.sql`
          DELETE FROM app_settings
          WHERE key = ${this.getDbKey(key)}
            AND value = ${owner}
        `,
      );
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      this.logger.warn(`Failed to release task lease ${key}: ${message}`);
    }
  }

  /** Namespaces a task key so lease rows cannot collide with other settings. */
  private getDbKey(key: string): string {
    return `task_lock:${key}`;
  }
}
|