+108
-21
@@ -23,9 +23,8 @@ import {
|
||||
import { TaskLockService } from "./task-lock.service";
|
||||
import { FeederService } from "../modules/feeder/feeder.service";
|
||||
|
||||
// ────────────────────────────────────────────────────────────────
|
||||
|
||||
// Types
|
||||
// ────────────────────────────────────────────────────────────────
|
||||
|
||||
interface LiveScoreTeamPayload {
|
||||
id: string;
|
||||
@@ -93,9 +92,9 @@ interface PendingPredictionRunForSettlement {
|
||||
|
||||
type SportType = "football" | "basketball";
|
||||
|
||||
// ────────────────────────────────────────────────────────────────
|
||||
|
||||
// Service
|
||||
// ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@Injectable()
|
||||
export class DataFetcherTask {
|
||||
@@ -108,7 +107,7 @@ export class DataFetcherTask {
|
||||
private readonly scraper: FeederScraperService,
|
||||
private readonly taskLock: TaskLockService,
|
||||
private readonly feeder: FeederService,
|
||||
) {}
|
||||
) { }
|
||||
|
||||
// ────────────────────────────────────────────────────────────
|
||||
// CRON 1: Main sync — every 15 minutes
|
||||
@@ -200,10 +199,20 @@ export class DataFetcherTask {
|
||||
|
||||
this.logger.log("syncLiveMatches START");
|
||||
|
||||
// 4-day forward window: pull today .. +3 days so upcoming matches enter
|
||||
// live_matches early and their odds are refreshed every cycle. That rolling
|
||||
// refresh is what lets the odds-movement / steam monitor (and forward CLV)
|
||||
// see a real opening→closing range. Finished/live matches are already
|
||||
// excluded from odds re-fetch in fetchOddsForMatches(), so closed-match
|
||||
// odds and their ranges are never re-pulled.
|
||||
const SYNC_DAYS_AHEAD = 4;
|
||||
const today = getDateStringInTimeZone(new Date(), this.timeZone);
|
||||
const tomorrow = getShiftedDateStringInTimeZone(1, this.timeZone);
|
||||
await this.syncMatchList(today);
|
||||
await this.syncMatchList(tomorrow);
|
||||
for (let dayOffset = 1; dayOffset < SYNC_DAYS_AHEAD; dayOffset++) {
|
||||
await this.syncMatchList(
|
||||
getShiftedDateStringInTimeZone(dayOffset, this.timeZone),
|
||||
);
|
||||
}
|
||||
await this.updateLiveScores();
|
||||
await this.archiveNewlyFinishedMatches(today);
|
||||
await this.settlePredictionRuns();
|
||||
@@ -324,13 +333,13 @@ export class DataFetcherTask {
|
||||
const scoreAway = matchData.awayScore ?? null;
|
||||
const htScoreHome = this.asInt(
|
||||
matchData.score?.ht?.home ??
|
||||
matchData.htHomeScore ??
|
||||
matchData.homeHtScore,
|
||||
matchData.htHomeScore ??
|
||||
matchData.homeHtScore,
|
||||
);
|
||||
const htScoreAway = this.asInt(
|
||||
matchData.score?.ht?.away ??
|
||||
matchData.htAwayScore ??
|
||||
matchData.awayHtScore,
|
||||
matchData.htAwayScore ??
|
||||
matchData.awayHtScore,
|
||||
);
|
||||
const storedStatus = deriveStoredMatchStatus({
|
||||
state: matchData.state,
|
||||
@@ -893,9 +902,9 @@ export class DataFetcherTask {
|
||||
]);
|
||||
const sidelined = match.matchSlug
|
||||
? await this.scraper.fetchSidelinedPlayers(
|
||||
match.id,
|
||||
match.matchSlug,
|
||||
)
|
||||
match.id,
|
||||
match.matchSlug,
|
||||
)
|
||||
: null;
|
||||
|
||||
// Normalize to same home.xi/away.xi format used by processMatchOdds
|
||||
@@ -969,9 +978,9 @@ export class DataFetcherTask {
|
||||
const targetMatches =
|
||||
topLeagueIds.size > 0
|
||||
? allMatches.filter(
|
||||
(m) =>
|
||||
!!m.competitionId && topLeagueIds.has(String(m.competitionId)),
|
||||
)
|
||||
(m) =>
|
||||
!!m.competitionId && topLeagueIds.has(String(m.competitionId)),
|
||||
)
|
||||
: allMatches;
|
||||
|
||||
if (targetMatches.length === 0) {
|
||||
@@ -1167,7 +1176,7 @@ export class DataFetcherTask {
|
||||
updatedAt: new Date(),
|
||||
},
|
||||
})
|
||||
.catch(() => {});
|
||||
.catch(() => { });
|
||||
this.logger.debug(
|
||||
`[${sport}] Marked as POSTPONED: ${match.matchName}`,
|
||||
);
|
||||
@@ -1240,6 +1249,77 @@ export class DataFetcherTask {
|
||||
// (Preserved from original — no logic changes)
|
||||
// ────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Persist live odds movement into the structured odd_selections / odds_history
|
||||
* tables (the live odds elsewhere are only a JSON blob on live_matches). This
|
||||
* is what makes opening→closing ranges and steam queryable in SQL. Mirrors the
|
||||
* historical feeder's change-detection (feeder-persistence.service.ts) but for
|
||||
* the live HTML odds format. Change-only writes; ALL markets in the payload
|
||||
* are tracked (so anomalies/steam on any bet type are captured).
|
||||
*/
|
||||
private async persistOddsHistory(
|
||||
matchId: string,
|
||||
odds: Record<string, Record<string, number>>,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const cats = await this.prisma.oddCategory.findMany({
|
||||
where: { matchId },
|
||||
include: { selections: true },
|
||||
});
|
||||
for (const [catName, sels] of Object.entries(odds)) {
|
||||
let category = cats.find((c) => c.name === catName);
|
||||
if (!category) {
|
||||
category = await this.prisma.oddCategory.create({
|
||||
data: { matchId, name: catName },
|
||||
include: { selections: true },
|
||||
});
|
||||
cats.push(category);
|
||||
}
|
||||
for (const [selName, value] of Object.entries(sels)) {
|
||||
const sVal = String(value);
|
||||
if (!selName || !sVal || !(value > 0)) continue;
|
||||
const existing = category.selections.find((s) => s.name === selName);
|
||||
if (existing) {
|
||||
if (existing.oddValue !== sVal) {
|
||||
const oldVal = parseFloat(existing.oddValue || "0");
|
||||
const newVal = parseFloat(sVal);
|
||||
if (!isNaN(oldVal) && !isNaN(newVal) && oldVal > 0) {
|
||||
await this.prisma.oddsHistory.create({
|
||||
data: {
|
||||
selectionId: existing.dbId,
|
||||
matchId,
|
||||
previousValue: oldVal,
|
||||
newValue: newVal,
|
||||
},
|
||||
});
|
||||
}
|
||||
await this.prisma.oddSelection.update({
|
||||
where: { dbId: existing.dbId },
|
||||
data: { oddValue: sVal },
|
||||
});
|
||||
existing.oddValue = sVal;
|
||||
}
|
||||
} else {
|
||||
const created = await this.prisma.oddSelection.create({
|
||||
data: {
|
||||
categoryId: category.dbId,
|
||||
name: selName,
|
||||
oddValue: sVal,
|
||||
openingValue: sVal,
|
||||
},
|
||||
});
|
||||
category.selections.push(created);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err: unknown) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
this.logger.debug(
|
||||
`odds_history persist skipped for ${matchId}: ${message}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async processMatchOdds(match: LiveMatchOddsTarget): Promise<void> {
|
||||
const matchSlug = match.matchSlug || "match";
|
||||
const sport = String(match.sport || "football").toLowerCase();
|
||||
@@ -1408,6 +1488,13 @@ export class DataFetcherTask {
|
||||
},
|
||||
});
|
||||
|
||||
// Fill the structured odds_history table from this pre-match odds refresh,
|
||||
// so opening→closing ranges & steam are captured in the DB (not just the
|
||||
// live_matches JSON blob). Runs only for pre-match matches reached here.
|
||||
if (Object.keys(odds).length > 0) {
|
||||
await this.persistOddsHistory(match.id, odds);
|
||||
}
|
||||
|
||||
if (
|
||||
Object.keys(odds).length > 0 ||
|
||||
refereeName ||
|
||||
@@ -1651,9 +1738,9 @@ export class DataFetcherTask {
|
||||
|
||||
const score = this.isRecord(value.score)
|
||||
? {
|
||||
home: this.asInt(value.score.home),
|
||||
away: this.asInt(value.score.away),
|
||||
}
|
||||
home: this.asInt(value.score.home),
|
||||
away: this.asInt(value.score.away),
|
||||
}
|
||||
: null;
|
||||
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user