This commit is contained in:
2026-04-19 13:23:00 +03:00
parent e4c74025e5
commit 1346924387
25 changed files with 1639 additions and 1076 deletions
+177 -98
View File
@@ -1,4 +1,4 @@
import { Injectable, Logger } from "@nestjs/common";
import { Injectable, Logger } from "@nestjs/common";
import { Cron } from "@nestjs/schedule";
import { HttpService } from "@nestjs/axios";
import { PrismaService } from "../database/prisma.service";
@@ -8,10 +8,22 @@ import * as fs from "fs";
import * as path from "path";
import { Prisma } from "@prisma/client";
import { SidelinedResponse } from "../modules/feeder/feeder.types";
import {
FINISHED_STATE_VALUES_FOR_DB,
FINISHED_STATUS_VALUES_FOR_DB,
LIVE_STATE_VALUES_FOR_DB,
LIVE_STATUS_VALUES_FOR_DB,
} from "../common/utils/match-status.util";
import {
getDateStringInTimeZone,
getDayBoundsForTimeZone,
getShiftedDateStringInTimeZone,
} from "../common/utils/timezone.util";
import { TaskLockService } from "./task-lock.service";
// ────────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────────
// Types
// ────────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────────
interface LiveScoreTeamPayload {
id: string;
@@ -64,75 +76,119 @@ interface LiveLineupsJson {
type SportType = "football" | "basketball";
// ────────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────────
// Service
// ────────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────────
@Injectable()
export class DataFetcherTask {
private readonly logger = new Logger(DataFetcherTask.name);
private readonly timeZone = "Europe/Istanbul";
constructor(
private readonly httpService: HttpService,
private readonly prisma: PrismaService,
private readonly scraper: FeederScraperService,
private readonly taskLock: TaskLockService,
) {}
// ────────────────────────────────────────────────────────────
// CRON 1: Main sync every 15 minutes
// Phases: match list live scores odds lineups
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
// CRON 1: Main sync — every 15 minutes
// Phases: match list → live scores → odds → lineups
// ────────────────────────────────────────────────────────────
@Cron("*/15 * * * *")
async syncLiveMatches(): Promise<void> {
if (this.shouldSkipInHistoricalMode("syncLiveMatches")) return;
this.logger.log("━━━ syncLiveMatches START ━━━");
const today = new Date().toISOString().split("T")[0];
// Phase 1: Match list (football + basketball)
await this.syncMatchList(today);
// Phase 2: Live score updates
await this.updateLiveScores();
// Phase 3: Odds + referee + lineups + sidelined (via processMatchOdds)
await this.fetchOddsForMatches();
// Phase 4: Fill missing lineups (backup for edge cases)
await this.fillMissingLineups();
this.logger.log("━━━ syncLiveMatches END ━━━");
await this.taskLock.runWithLease(
"syncLiveMatches",
30 * 60 * 1000,
async () => {
await this.runLiveSync();
},
this.logger,
);
}
// ────────────────────────────────────────────────────────────
// CRON 2: Daily cleanup + full sync 07:00 Istanbul
// Truncates live_matches, then runs full sync
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
// CRON 2: Daily cleanup + full sync — 07:00 Istanbul
// Preserve yesterday as a fallback until the 08:00 archive job completes.
// ────────────────────────────────────────────────────────────
@Cron("0 7 * * *", { timeZone: "Europe/Istanbul" })
async cleanAndFullSync(): Promise<void> {
if (this.shouldSkipInHistoricalMode("cleanAndFullSync")) return;
this.logger.log("🧹 cleanAndFullSync: Truncating live_matches...");
await this.taskLock.runWithLease(
"cleanAndFullSync",
2 * 60 * 60 * 1000,
async () => {
this.logger.log(
"cleanAndFullSync: Pruning stale live_matches while preserving yesterday for archive fallback...",
);
try {
const deleted = await this.prisma.liveMatch.deleteMany({});
this.logger.log(
`🧹 Deleted ${deleted.count} live matches. Starting full sync...`,
);
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
this.logger.error(`Truncate failed: ${message}`);
return;
}
try {
const yesterdayDate = getShiftedDateStringInTimeZone(
-1,
this.timeZone,
);
const { startMs: yesterdayStartMs } = getDayBoundsForTimeZone(
yesterdayDate,
this.timeZone,
);
const cutoffDate = new Date(yesterdayStartMs);
// Run full sync immediately after cleanup
await this.syncLiveMatches();
const deleted = await this.prisma.liveMatch.deleteMany({
where: {
OR: [
{ mstUtc: { lt: BigInt(yesterdayStartMs) } },
{
AND: [
{ mstUtc: null },
{ updatedAt: { lt: cutoffDate } },
{
OR: [
{ status: { in: FINISHED_STATUS_VALUES_FOR_DB } },
{ state: { in: FINISHED_STATE_VALUES_FOR_DB } },
],
},
],
},
],
},
});
this.logger.log(
`Pruned ${deleted.count} stale live matches. Starting full sync...`,
);
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
this.logger.error(`Stale live_match cleanup failed: ${message}`);
return;
}
await this.runLiveSync();
},
this.logger,
);
}
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
// Phase 1: Fetch match list for all sports
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
private async runLiveSync(): Promise<void> {
if (this.shouldSkipInHistoricalMode("syncLiveMatches")) return;
this.logger.log("syncLiveMatches START");
const today = getDateStringInTimeZone(new Date(), this.timeZone);
await this.syncMatchList(today);
await this.updateLiveScores();
await this.fetchOddsForMatches();
await this.fillMissingLineups();
this.logger.log("syncLiveMatches END");
}
private async syncMatchList(date: string): Promise<void> {
// Football
@@ -141,7 +197,7 @@ export class DataFetcherTask {
await this.fetchMatchesForSport("football", date, footballLeagues);
} else {
this.logger.warn(
"top_leagues.json is missing/empty writing ALL football matches",
"top_leagues.json is missing/empty — writing ALL football matches",
);
await this.fetchMatchesForSport("football", date, new Set());
}
@@ -170,17 +226,18 @@ export class DataFetcherTask {
}
}
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
// Phase 2: Live score updates (merged from live-updater.task)
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
private async updateLiveScores(): Promise<void> {
try {
const liveMatches = await this.prisma.liveMatch.findMany({
where: {
state: {
in: ["live", "firsthalf", "secondhalf", "1H", "2H", "HT", "LIVE"],
},
OR: [
{ state: { in: LIVE_STATE_VALUES_FOR_DB } },
{ status: { in: LIVE_STATUS_VALUES_FOR_DB } },
],
},
select: { id: true, matchSlug: true },
});
@@ -191,7 +248,7 @@ export class DataFetcherTask {
}
this.logger.log(
`📡 Updating scores for ${liveMatches.length} live matches`,
`📡 Updating scores for ${liveMatches.length} live matches`,
);
for (const match of liveMatches) {
@@ -219,19 +276,19 @@ export class DataFetcherTask {
}
}
this.logger.log("📡 Live score update complete");
this.logger.log("📡 Live score update complete");
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
this.logger.error(`Live score update failed: ${message}`);
}
}
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
// Phase 3: Odds + referee + lineups + sidelined
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
private async fetchOddsForMatches(): Promise<void> {
this.logger.log("💰 Fetching odds for live matches...");
this.logger.log("💰 Fetching odds for live matches...");
try {
// Load both league filters
@@ -266,11 +323,11 @@ export class DataFetcherTask {
});
if (matchesToFetch.length === 0) {
this.logger.log("💰 No matches to fetch odds for");
this.logger.log("💰 No matches to fetch odds for");
return;
}
this.logger.log(`💰 Fetching odds for ${matchesToFetch.length} matches`);
this.logger.log(`💰 Fetching odds for ${matchesToFetch.length} matches`);
let successCount = 0;
let errorCount = 0;
@@ -299,7 +356,7 @@ export class DataFetcherTask {
// Retry failed matches (502/Timeout)
if (failedMatches.length > 0) {
this.logger.warn(
`⚠️ Retrying ${failedMatches.length} failed matches (502/Timeout)...`,
`⚠️ Retrying ${failedMatches.length} failed matches (502/Timeout)...`,
);
for (const match of failedMatches) {
@@ -307,19 +364,19 @@ export class DataFetcherTask {
try {
await this.processMatchOdds(match);
successCount++;
this.logger.log(` Retry successful for match ${match.id}`);
this.logger.log(`✅ Retry successful for match ${match.id}`);
} catch (retryErr: unknown) {
const message =
retryErr instanceof Error ? retryErr.message : String(retryErr);
this.logger.error(
` Retry failed for match ${match.id}: ${message}`,
`❌ Retry failed for match ${match.id}: ${message}`,
);
}
}
}
this.logger.log(
`💰 Odds complete: ${successCount} success, ${errorCount} errors (initially)`,
`💰 Odds complete: ${successCount} success, ${errorCount} errors (initially)`,
);
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
@@ -327,14 +384,36 @@ export class DataFetcherTask {
}
}
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
// Phase 4: Fill missing lineups (backup)
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
private async fillMissingLineups(): Promise<void> {
try {
const matchesToUpdate = await this.prisma.liveMatch.findMany({
where: { status: { notIn: ["FT", "post", "postGame"] } },
where: {
sport: "football",
NOT: {
OR: [
{ status: { in: FINISHED_STATUS_VALUES_FOR_DB } },
{ state: { in: FINISHED_STATE_VALUES_FOR_DB } },
{
AND: [
{ scoreHome: { not: null } },
{ scoreAway: { not: null } },
{
NOT: {
OR: [
{ status: { in: LIVE_STATUS_VALUES_FOR_DB } },
{ state: { in: LIVE_STATE_VALUES_FOR_DB } },
],
},
},
],
},
],
},
},
select: { id: true, matchSlug: true, lineups: true, sport: true },
take: 30,
});
@@ -345,11 +424,11 @@ export class DataFetcherTask {
);
if (toUpdate.length === 0) {
this.logger.debug("👕 All lineups already filled");
this.logger.debug("👕 All lineups already filled");
return;
}
this.logger.log(`👕 Filling lineups for ${toUpdate.length} matches...`);
this.logger.log(`👕 Filling lineups for ${toUpdate.length} matches...`);
for (const match of toUpdate) {
try {
@@ -374,7 +453,7 @@ export class DataFetcherTask {
},
});
this.logger.log(`👕 Lineups filled for match ${match.id}`);
this.logger.log(`👕 Lineups filled for match ${match.id}`);
await this.delay(500);
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
@@ -387,9 +466,9 @@ export class DataFetcherTask {
}
}
// ────────────────────────────────────────────────────────────
// Unified match fetcher DRY for football + basketball
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
// Unified match fetcher — DRY for football + basketball
// ────────────────────────────────────────────────────────────
private async fetchMatchesForSport(
sport: SportType,
@@ -650,7 +729,7 @@ export class DataFetcherTask {
upsertCount + skippedCount === targetMatches.length
) {
this.logger.log(
`[${sport}] Progress: ${upsertCount + skippedCount}/${targetMatches.length} (Saved: ${upsertCount}, Skipped: ${skippedCount})`,
`[${sport}] ⏳ Progress: ${upsertCount + skippedCount}/${targetMatches.length} (Saved: ${upsertCount}, Skipped: ${skippedCount})`,
);
}
} catch (err: unknown) {
@@ -668,10 +747,10 @@ export class DataFetcherTask {
}
}
// ────────────────────────────────────────────────────────────
// processMatchOdds odds + referee + lineups + sidelined
// (Preserved from original no logic changes)
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
// processMatchOdds — odds + referee + lineups + sidelined
// (Preserved from original — no logic changes)
// ────────────────────────────────────────────────────────────
private async processMatchOdds(match: LiveMatchOddsTarget): Promise<void> {
const matchSlug = match.matchSlug || "match";
@@ -687,7 +766,7 @@ export class DataFetcherTask {
let lineups: LiveLineupsJson | null = null;
let sidelined: SidelinedResponse | null = null;
// 1. Fetch Odds from İddaa page
// 1. Fetch Odds from İddaa page
const oddsUrl = `https://www.mackolik.com/${sportPath}/${matchSlug}/iddaa/${match.id}`;
try {
const response = await firstValueFrom(
@@ -722,7 +801,7 @@ export class DataFetcherTask {
typeof mainResp.data === "string" ? mainResp.data : "",
);
} catch {
// Non-critical referee is optional
// Non-critical — referee is optional
}
}
@@ -751,7 +830,7 @@ export class DataFetcherTask {
subs: substitutions?.stats?.away || [],
},
};
this.logger.log(`👥 Lineups found for ${match.matchName}`);
this.logger.log(`👥 Lineups found for ${match.matchName}`);
} else {
this.logger.debug(`No lineups (yet) for ${match.matchName}`);
}
@@ -779,7 +858,7 @@ export class DataFetcherTask {
sidelined.awayTeam?.totalSidelined > 0
) {
this.logger.log(
`🚑 Sidelined: ${sidelined.homeTeam.totalSidelined}(H) - ${sidelined.awayTeam.totalSidelined}(A) for ${match.matchName}`,
`🚑 Sidelined: ${sidelined.homeTeam.totalSidelined}(H) - ${sidelined.awayTeam.totalSidelined}(A) for ${match.matchName}`,
);
}
}
@@ -813,22 +892,22 @@ export class DataFetcherTask {
sidelined.awayTeam.totalSidelined > 0))
) {
this.logger.log(
` Loop update: ${match.matchName} | Odds: ${Object.keys(odds).length} | Ref: ${refereeName || "N/A"} | Lineups: ${lineups ? "Yes" : "No"} | Sidelined: ${sidelined ? "Yes" : "No"}`,
`✅ Loop update: ${match.matchName} | Odds: ${Object.keys(odds).length} | Ref: ${refereeName || "N/A"} | Lineups: ${lineups ? "Yes" : "No"} | Sidelined: ${sidelined ? "Yes" : "No"}`,
);
} else {
this.logger.debug(
` No detailed data for ${match.matchName}, marked check.`,
`❕ No detailed data for ${match.matchName}, marked check.`,
);
}
}
// ────────────────────────────────────────────────────────────
// HTML Extraction Helpers (preserved no logic changes)
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
// HTML Extraction Helpers (preserved — no logic changes)
// ────────────────────────────────────────────────────────────
/**
* Extract odds from Mackolik HTML page
* Returns structured odds object: { "MS": {"1": 2.10, "X": 3.40}, "AU25": {"Alt": 2.05, "Üst": 1.75} }
* Returns structured odds object: { "MS": {"1": 2.10, "X": 3.40}, "AU25": {"Alt": 2.05, "Üst": 1.75} }
*/
private extractOddsFromHtml(
html: string,
@@ -914,17 +993,17 @@ export class DataFetcherTask {
const lower = name.toLowerCase();
// Specific & Compound names FIRST
if (lower.includes("ilk yarı/maç sonucu")) return "HTFT";
if (lower.includes("1. yarı sonucu")) return "HT";
if (lower.includes("çifte şans")) return "CS";
if (lower.includes("ilk yarı/maç sonucu")) return "HTFT";
if (lower.includes("1. yarı sonucu")) return "HT";
if (lower.includes("çifte şans")) return "CS";
// General names LATER
if (lower.includes("maç sonucu") && !lower.includes("handikap"))
if (lower.includes("maç sonucu") && !lower.includes("handikap"))
return "MS";
if (lower.includes("karşılıklı gol")) return "KG";
if (lower.includes("2,5 alt/üst") || lower.includes("2.5")) return "AU25";
if (lower.includes("1,5 alt/üst") || lower.includes("1.5")) return "AU15";
if (lower.includes("3,5 alt/üst") || lower.includes("3.5")) return "AU35";
if (lower.includes("karşılıklı gol")) return "KG";
if (lower.includes("2,5 alt/üst") || lower.includes("2.5")) return "AU25";
if (lower.includes("1,5 alt/üst") || lower.includes("1.5")) return "AU15";
if (lower.includes("3,5 alt/üst") || lower.includes("3.5")) return "AU35";
return null;
}
@@ -934,7 +1013,7 @@ export class DataFetcherTask {
*/
private extractRefereeFromHtml(html: string): string | null {
try {
// Strategy 1: Mackolik officials section head referee in '--main' list item
// Strategy 1: Mackolik officials section — head referee in '--main' list item
const mainOfficialPattern =
/official-list-item--main[^>]*>\s*(?:<[^>]*>\s*)*?<span[^>]*official-name[^>]*>\s*([^<]+)/i;
const mainMatch = mainOfficialPattern.exec(html);
@@ -970,9 +1049,9 @@ export class DataFetcherTask {
return null;
}
// ────────────────────────────────────────────────────────────
// Low-level Helpers (preserved no logic changes)
// ────────────────────────────────────────────────────────────
// ────────────────────────────────────────────────────────────
// Low-level Helpers (preserved — no logic changes)
// ────────────────────────────────────────────────────────────
private shouldSkipInHistoricalMode(jobName: string): boolean {
if (process.env.FEEDER_MODE === "historical") {
+23 -12
View File
@@ -1,12 +1,16 @@
import { Injectable, Logger } from "@nestjs/common";
import { Cron } from "@nestjs/schedule";
import { FeederService } from "../modules/feeder/feeder.service";
import { TaskLockService } from "./task-lock.service";
@Injectable()
export class HistoricalResultsSyncTask {
private readonly logger = new Logger(HistoricalResultsSyncTask.name);
constructor(private readonly feederService: FeederService) {}
constructor(
private readonly feederService: FeederService,
private readonly taskLock: TaskLockService,
) {}
private shouldSkipInHistoricalMode(jobName: string): boolean {
if (process.env.FEEDER_MODE === "historical") {
@@ -25,17 +29,24 @@ export class HistoricalResultsSyncTask {
return;
}
this.logger.log(
"Starting previous-day completed match sync for football and basketball...",
);
await this.taskLock.runWithLease(
"syncPreviousDayCompletedMatches",
6 * 60 * 60 * 1000,
async () => {
this.logger.log(
"Starting previous-day completed match sync for football and basketball...",
);
try {
await this.feederService.runPreviousDayCompletedMatchesScan();
this.logger.log("Previous-day completed match sync finished");
} catch (error: any) {
this.logger.error(
`Previous-day completed match sync failed: ${error.message}`,
);
}
try {
await this.feederService.runPreviousDayCompletedMatchesScan();
this.logger.log("Previous-day completed match sync finished");
} catch (error: any) {
this.logger.error(
`Previous-day completed match sync failed: ${error.message}`,
);
}
},
this.logger,
);
}
}
+126 -70
View File
@@ -1,12 +1,28 @@
import { Injectable, Logger } from "@nestjs/common";
import { Cron } from "@nestjs/schedule";
import { PrismaService } from "../database/prisma.service";
import {
FINISHED_STATE_VALUES_FOR_DB,
FINISHED_STATUS_VALUES_FOR_DB,
LIVE_STATE_VALUES_FOR_DB,
LIVE_STATUS_VALUES_FOR_DB,
} from "../common/utils/match-status.util";
import {
getDateOnlyValueForTimeZone,
getShiftedDateStringInTimeZone,
getDayBoundsForTimeZone,
} from "../common/utils/timezone.util";
import { TaskLockService } from "./task-lock.service";
@Injectable()
export class LimitResetterTask {
private readonly logger = new Logger(LimitResetterTask.name);
private readonly timeZone = "Europe/Istanbul";
constructor(private readonly prisma: PrismaService) {}
constructor(
private readonly prisma: PrismaService,
private readonly taskLock: TaskLockService,
) {}
private shouldSkipInHistoricalMode(jobName: string): boolean {
if (process.env.FEEDER_MODE === "historical") {
@@ -22,34 +38,39 @@ export class LimitResetterTask {
@Cron("0 3 * * *", { timeZone: "Europe/Istanbul" })
async resetUsageLimits() {
if (this.shouldSkipInHistoricalMode("resetUsageLimits")) return;
this.logger.log("Starting daily usage limit reset job...");
await this.taskLock.runWithLease(
"resetUsageLimits",
30 * 60 * 1000,
async () => {
this.logger.log("Starting daily usage limit reset job...");
try {
const today = new Date();
today.setHours(0, 0, 0, 0);
try {
const today = getDateOnlyValueForTimeZone(this.timeZone);
// Reset all limits that were last reset before today
const result = await this.prisma.usageLimit.updateMany({
where: {
lastResetDate: { lt: today },
},
data: {
analysisCount: 0,
couponCount: 0,
lastResetDate: today,
},
});
const result = await this.prisma.usageLimit.updateMany({
where: {
lastResetDate: { lt: today },
},
data: {
analysisCount: 0,
couponCount: 0,
lastResetDate: today,
},
});
if (result.count > 0) {
this.logger.log(
`Usage limits for ${result.count} users have been reset`,
);
} else {
this.logger.log("No user limits needed resetting");
}
} catch (error: any) {
this.logger.error(`Limit reset job failed: ${error.message}`);
}
if (result.count > 0) {
this.logger.log(
`Usage limits for ${result.count} users have been reset`,
);
} else {
this.logger.log("No user limits needed resetting");
}
} catch (error: any) {
this.logger.error(`Limit reset job failed: ${error.message}`);
}
},
this.logger,
);
}
/**
@@ -58,37 +79,65 @@ export class LimitResetterTask {
@Cron("0 4 * * *", { timeZone: "Europe/Istanbul" })
async cleanupOldData() {
if (this.shouldSkipInHistoricalMode("cleanupOldData")) return;
this.logger.log("Starting data cleanup job...");
await this.taskLock.runWithLease(
"cleanupOldData",
60 * 60 * 1000,
async () => {
this.logger.log("Starting data cleanup job...");
try {
const thirtyDaysAgo = new Date();
thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);
try {
const thirtyDaysAgo = new Date();
thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);
// Delete old AI prediction logs
const deletedLogs = await this.prisma.aiPredictionsLog.deleteMany({
where: {
createdAt: { lt: thirtyDaysAgo },
},
});
const deletedLogs = await this.prisma.aiPredictionsLog.deleteMany({
where: {
createdAt: { lt: thirtyDaysAgo },
},
});
// Delete old live matches (finished more than 1 day ago)
// Historical data is already persisted in the 'matches' table
const oneDayAgo = new Date();
oneDayAgo.setDate(oneDayAgo.getDate() - 1);
const yesterdayDate = getShiftedDateStringInTimeZone(
-1,
this.timeZone,
);
const { startMs: yesterdayStartMs } = getDayBoundsForTimeZone(
yesterdayDate,
this.timeZone,
);
const liveMatchCutoff = new Date(yesterdayStartMs);
const deletedLiveMatches = await this.prisma.liveMatch.deleteMany({
where: {
state: "Finished",
updatedAt: { lt: oneDayAgo },
},
});
const deletedLiveMatches = await this.prisma.liveMatch.deleteMany({
where: {
updatedAt: { lt: liveMatchCutoff },
OR: [
{ status: { in: FINISHED_STATUS_VALUES_FOR_DB } },
{ state: { in: FINISHED_STATE_VALUES_FOR_DB } },
{
AND: [
{ scoreHome: { not: null } },
{ scoreAway: { not: null } },
{
NOT: {
OR: [
{ status: { in: LIVE_STATUS_VALUES_FOR_DB } },
{ state: { in: LIVE_STATE_VALUES_FOR_DB } },
],
},
},
],
},
],
},
});
this.logger.log(
`Cleanup complete: ${deletedLogs.count} old logs, ${deletedLiveMatches.count} old live matches`,
);
} catch (error: any) {
this.logger.error(`Cleanup job failed: ${error.message}`);
}
this.logger.log(
`Cleanup complete: ${deletedLogs.count} old logs, ${deletedLiveMatches.count} old live matches`,
);
} catch (error: any) {
this.logger.error(`Cleanup job failed: ${error.message}`);
}
},
this.logger,
);
}
/**
@@ -97,26 +146,33 @@ export class LimitResetterTask {
@Cron("0 0 * * *", { timeZone: "Europe/Istanbul" })
async checkSubscriptions() {
if (this.shouldSkipInHistoricalMode("checkSubscriptions")) return;
this.logger.log("Checking expired subscriptions...");
await this.taskLock.runWithLease(
"checkSubscriptions",
30 * 60 * 1000,
async () => {
this.logger.log("Checking expired subscriptions...");
try {
const now = new Date();
try {
const now = new Date();
const result = await this.prisma.user.updateMany({
where: {
subscriptionStatus: "active",
subscriptionExpiresAt: { lt: now },
},
data: {
subscriptionStatus: "expired",
},
});
const result = await this.prisma.user.updateMany({
where: {
subscriptionStatus: "active",
subscriptionExpiresAt: { lt: now },
},
data: {
subscriptionStatus: "expired",
},
});
if (result.count > 0) {
this.logger.log(`${result.count} subscriptions marked as expired`);
}
} catch (error: any) {
this.logger.error(`Subscription check failed: ${error.message}`);
}
if (result.count > 0) {
this.logger.log(`${result.count} subscriptions marked as expired`);
}
} catch (error: any) {
this.logger.error(`Subscription check failed: ${error.message}`);
}
},
this.logger,
);
}
}
+80
View File
@@ -0,0 +1,80 @@
import { Injectable, Logger } from "@nestjs/common";
import { Prisma } from "@prisma/client";
import { PrismaService } from "../database/prisma.service";
@Injectable()
export class TaskLockService {
private readonly logger = new Logger(TaskLockService.name);
private readonly activeTasks = new Set<string>();
constructor(private readonly prisma: PrismaService) {}
async runWithLease<T>(
key: string,
ttlMs: number,
task: () => Promise<T>,
logger: Logger,
): Promise<T | null> {
if (this.activeTasks.has(key)) {
logger.warn(`Skipping ${key}: task is already running in this process`);
return null;
}
const owner = `${process.pid}-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
const acquired = await this.acquireLease(key, owner, ttlMs);
if (!acquired) {
logger.warn(`Skipping ${key}: lease is already held by another instance`);
return null;
}
this.activeTasks.add(key);
try {
return await task();
} finally {
this.activeTasks.delete(key);
await this.releaseLease(key, owner);
}
}
private async acquireLease(
key: string,
owner: string,
ttlMs: number,
): Promise<boolean> {
const rows = await this.prisma.$queryRaw<{ key: string }[]>(
Prisma.sql`
INSERT INTO app_settings (key, value, updated_at)
VALUES (${this.getDbKey(key)}, ${owner}, NOW() + (${ttlMs} * INTERVAL '1 millisecond'))
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value,
updated_at = EXCLUDED.updated_at
WHERE app_settings.updated_at < NOW()
OR app_settings.value = ${owner}
RETURNING key
`,
);
return rows.length > 0;
}
private async releaseLease(key: string, owner: string): Promise<void> {
try {
await this.prisma.$executeRaw(
Prisma.sql`
DELETE FROM app_settings
WHERE key = ${this.getDbKey(key)}
AND value = ${owner}
`,
);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Failed to release task lease ${key}: ${message}`);
}
}
private getDbKey(key: string): string {
return `task_lock:${key}`;
}
}
+7 -3
View File
@@ -1,15 +1,14 @@
import { Module } from "@nestjs/common";
import { ScheduleModule } from "@nestjs/schedule";
import { HttpModule } from "@nestjs/axios";
import { DataFetcherTask } from "./data-fetcher.task";
import { HistoricalResultsSyncTask } from "./historical-results-sync.task";
import { LimitResetterTask } from "./limit-resetter.task";
import { TaskLockService } from "./task-lock.service";
import { DatabaseModule } from "../database/database.module";
import { FeederModule } from "../modules/feeder/feeder.module";
@Module({
imports: [
ScheduleModule.forRoot(),
HttpModule.register({
timeout: 30000,
headers: {
@@ -20,7 +19,12 @@ import { FeederModule } from "../modules/feeder/feeder.module";
DatabaseModule,
FeederModule,
],
providers: [DataFetcherTask, HistoricalResultsSyncTask, LimitResetterTask],
providers: [
TaskLockService,
DataFetcherTask,
HistoricalResultsSyncTask,
LimitResetterTask,
],
exports: [DataFetcherTask, HistoricalResultsSyncTask, LimitResetterTask],
})
export class TasksModule {}