This commit is contained in:
Executable
+994
@@ -0,0 +1,994 @@
|
||||
/**
|
||||
* Feeder Service - Senior Level Implementation
|
||||
* Main orchestration service for historical data scanning
|
||||
*/
|
||||
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { FeederScraperService } from './feeder-scraper.service';
|
||||
import { FeederTransformerService } from './feeder-transformer.service';
|
||||
import { FeederPersistenceService } from './feeder-persistence.service';
|
||||
import {
|
||||
Sport,
|
||||
MatchSummary,
|
||||
Competition,
|
||||
LivescoresApiResponse,
|
||||
TransformedPlayer,
|
||||
MatchParticipation,
|
||||
ProcessResult,
|
||||
BasketballPlayerStats,
|
||||
BasketballTeamStats,
|
||||
TransformedMatchStats,
|
||||
MatchOfficial,
|
||||
ParsedMatchHeader,
|
||||
ParsedMarket,
|
||||
DbEventPayload,
|
||||
DbMarketPayload,
|
||||
} from './feeder.types';
|
||||
|
||||
/**
 * Optional switches for `FeederService.processDate`.
 * Both flags are treated as `false` when omitted.
 */
interface ProcessDateOptions {
  /**
   * Keep matches that already exist in storage and pass the "force" flag to
   * per-match processing, instead of skipping them.
   */
  refreshExistingMatches?: boolean;
  /**
   * Drop every match that `isCompletedMatchSummary` does not consider
   * finished before any per-match work is done.
   */
  onlyCompletedMatches?: boolean;
}
|
||||
|
||||
@Injectable()
|
||||
export class FeederService {
|
||||
// Nest logger scoped to this service's class name.
private readonly logger = new Logger(FeederService.name);

// --- Scan defaults ---------------------------------------------------------
// Sports scanned when a caller does not pass an explicit list.
private readonly SPORTS: Sport[] = ['football', 'basketball'];
// Default lower bound (inclusive, YYYY-MM-DD) for runHistoricalScan.
private readonly HISTORICAL_START_DATE = '2023-06-01';
// IANA zone whose calendar day defines "yesterday" for the daily sync and the
// mstUtc window used to filter matches by date.
private readonly DAILY_SYNC_TIME_ZONE = 'Europe/Istanbul';

// --- Rate-limit tuning -----------------------------------------------------
// NOTE(review): none of the three values below is referenced by any method of
// this class as it currently stands (match processing is sequential with
// inline delays). Confirm whether they are still needed.
private readonly CONCURRENCY_LIMIT = 20;
private readonly REQUEST_DELAY_MS = 50;
private readonly MAX_RETRIES = 50;
|
||||
|
||||
/**
 * Collaborators are supplied through constructor injection and stored as
 * private read-only members via parameter properties.
 *
 * @param scraperService - Fetches and parses data from the upstream source.
 * @param transformerService - Converts scraped payloads into storage-ready shapes.
 * @param persistenceService - Reads and writes matches, odds, lineups and scan state.
 */
constructor(
  private readonly scraperService: FeederScraperService,
  private readonly transformerService: FeederTransformerService,
  private readonly persistenceService: FeederPersistenceService,
) {}
|
||||
|
||||
// ============================================
|
||||
// DELAY HELPER
|
||||
// ============================================
|
||||
/**
 * Returns a promise that resolves once a `setTimeout` of `ms` milliseconds
 * fires. Used to pace requests.
 *
 * @param ms - Timer duration in milliseconds.
 */
private delay(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
|
||||
|
||||
/**
 * Returns the calendar date before "today" as observed in `timeZone`,
 * formatted as `YYYY-MM-DD`.
 *
 * Today's year/month/day are read in the given zone through
 * `Intl.DateTimeFormat#formatToParts`; the day is then decremented with UTC
 * date arithmetic so month and year roll-overs are handled by `Date`.
 *
 * @param timeZone - IANA time zone identifier.
 * @returns The previous calendar day in that zone as `YYYY-MM-DD`.
 */
private getYesterdayDateString(timeZone: string): string {
  const todayParts = new Intl.DateTimeFormat('en-CA', {
    timeZone,
    year: 'numeric',
    month: '2-digit',
    day: '2-digit',
  }).formatToParts(new Date());

  const readPart = (type: 'year' | 'month' | 'day'): number =>
    Number(todayParts.find((part) => part.type === type)?.value);

  // Passing (day - 1) lets Date.UTC normalise day 0 to the last day of the
  // previous month, which is equivalent to decrementing afterwards.
  const yesterdayUtc = new Date(
    Date.UTC(readPart('year'), readPart('month') - 1, readPart('day') - 1),
  );

  return yesterdayUtc.toISOString().split('T')[0];
}
|
||||
|
||||
/**
 * Returns the UTC offset of `timeZone` at the instant `date`, in milliseconds
 * (positive east of UTC).
 *
 * The offset is read from the `shortOffset` time-zone label produced by
 * `Intl.DateTimeFormat` (e.g. `GMT+3`, `GMT-5:30`). A label without a signed
 * offset, such as plain `GMT`, yields 0.
 *
 * @param date - Instant at which the offset is evaluated.
 * @param timeZone - IANA time zone identifier.
 */
private getTimeZoneOffsetMs(date: Date, timeZone: string): number {
  const zoneNamePart = new Intl.DateTimeFormat('en-US', {
    timeZone,
    timeZoneName: 'shortOffset',
  })
    .formatToParts(date)
    .find((part) => part.type === 'timeZoneName');
  const label = zoneNamePart?.value || 'GMT+0';

  const parsed = /GMT([+-])(\d{1,2})(?::?(\d{2}))?/.exec(label);
  if (parsed === null) return 0;

  const [, signChar, hourText, minuteText] = parsed;
  const direction = signChar === '-' ? -1 : 1;
  const totalMinutes = Number(hourText || '0') * 60 + Number(minuteText || '0');

  return direction * totalMinutes * 60 * 1000;
}
|
||||
|
||||
/**
 * Computes the first and last Unix second (inclusive) of the calendar day
 * `dateString` as observed in `timeZone`.
 *
 * NOTE(review): the zone offset is sampled at 00:00 UTC of each date rather
 * than at the zone's own midnight. That is exact for zones without an offset
 * change near that instant; confirm if zones with DST are ever passed in.
 *
 * @param dateString - Calendar day as `YYYY-MM-DD`.
 * @param timeZone - IANA time zone identifier.
 * @returns `startTs` (local midnight) and `endTs` (one second before the next
 *          local midnight), both in seconds since the Unix epoch.
 */
private getDayBoundsForTimeZone(
  dateString: string,
  timeZone: string,
): { startTs: number; endTs: number } {
  const [year, month, day] = dateString.split('-').map(Number);

  // Midnight of the given day-of-month in `timeZone`, as epoch milliseconds.
  // Day overflow (e.g. day + 1 past month end) is normalised by Date.UTC.
  const midnightInZoneMs = (dayOfMonth: number): number => {
    const utcMidnightMs = Date.UTC(year, month - 1, dayOfMonth, 0, 0, 0);
    const offsetMs = this.getTimeZoneOffsetMs(new Date(utcMidnightMs), timeZone);
    return utcMidnightMs - offsetMs;
  };

  const dayStartMs = midnightInZoneMs(day);
  const followingDayStartMs = midnightInZoneMs(day + 1);

  return {
    startTs: Math.floor(dayStartMs / 1000),
    endTs: Math.floor((followingDayStartMs - 1) / 1000),
  };
}
|
||||
|
||||
/**
 * Converts a raw score field to a finite number.
 *
 * @param value - Raw score as delivered by the source (any type).
 * @returns The numeric score, or `null` when the value is `null`,
 *          `undefined`, an empty string, or does not convert to a finite number.
 */
private parseScoreValue(value: unknown): number | null {
  const isBlank = value === null || value === undefined || value === '';
  if (isBlank) return null;

  const numeric = Number(value);
  if (!Number.isFinite(numeric)) return null;

  return numeric;
}
|
||||
|
||||
/**
 * Decides whether a match summary describes a finished match.
 *
 * Rules, in order:
 *  1. `statusBoxContent === 'ERT'` (postponed) is never completed.
 *  2. A recognised "finished" value in `state`, `status` or `substate`
 *     (compared trimmed and lower-cased) means completed.
 *  3. Otherwise the match counts as completed when both the home and the away
 *     score parse to finite numbers.
 *
 * NOTE(review): rule 3 does not look at any status field, so a summary that
 * carries scores but none of the recognised status values is also reported as
 * completed. Confirm that this is intended for the daily sync.
 *
 * @param match - Summary row from the source payload.
 */
private isCompletedMatchSummary(match: MatchSummary): boolean {
  if (match.statusBoxContent === 'ERT') return false;

  const normalise = (raw: unknown): string =>
    String(raw || '')
      .trim()
      .toLowerCase();

  const state = normalise(match.state);
  const status = normalise(match.status);
  const substate = normalise(match.substate);

  const finishedStates = ['postgame', 'post'];
  const finishedStatuses = ['played', 'finished', 'ft', 'afterpenalties', 'penalties'];
  const finishedSubstates = ['postgame', 'post', 'played', 'finished', 'ft'];

  const flaggedFinished =
    finishedStates.includes(state) ||
    finishedStatuses.includes(status) ||
    finishedSubstates.includes(substate);
  if (flaggedFinished) return true;

  // Fall back to score presence; either the nested or the flat score fields
  // may be populated.
  const home = this.parseScoreValue(match.score?.home ?? match.homeScore);
  const away = this.parseScoreValue(match.score?.away ?? match.awayScore);

  return home !== null && away !== null;
}
|
||||
|
||||
/**
 * Daily sync: re-processes the completed matches of a single date for each
 * requested sport, refreshing rows that already exist.
 *
 * Sports are handled one after another; each call delegates to `processDate`
 * with `onlyCompletedMatches` and `refreshExistingMatches` both enabled.
 *
 * @param sports - Sports to sync; defaults to the service-wide list.
 * @param targetDateStr - Date as `YYYY-MM-DD`; defaults to yesterday in
 *        `DAILY_SYNC_TIME_ZONE`.
 * @param targetLeagueIds - When non-empty, only these competition ids are kept.
 */
async runPreviousDayCompletedMatchesScan(
  sports: Sport[] = this.SPORTS,
  targetDateStr: string = this.getYesterdayDateString(
    this.DAILY_SYNC_TIME_ZONE,
  ),
  targetLeagueIds: string[] = [],
): Promise<void> {
  const leagueFilterLabel =
    targetLeagueIds.length > 0
      ? `[Filter: ${targetLeagueIds.length} leagues]`
      : '';
  this.logger.log(
    `🗓️ STARTING DAILY COMPLETED MATCH SYNC [Date: ${targetDateStr}] [Sports: ${sports.join(', ')}] ${leagueFilterLabel}`,
  );

  const dailySyncOptions: ProcessDateOptions = {
    onlyCompletedMatches: true,
    refreshExistingMatches: true,
  };

  for (const sport of sports) {
    await this.processDate(
      targetDateStr,
      sport,
      targetLeagueIds,
      dailySyncOptions,
    );
  }

  this.logger.log(
    `✅ DAILY COMPLETED MATCH SYNC FINISHED [Date: ${targetDateStr}]`,
  );
}
|
||||
|
||||
// ============================================
|
||||
// MAIN HISTORICAL SCAN
|
||||
// ============================================
|
||||
/**
 * Walks backwards day by day from two days ago down to `startDateStr`
 * (inclusive), calling `processDate` for every sport on every day.
 *
 * Progress is checkpointed after each day under a state key derived from the
 * sport list (and whether a league filter is active), so an interrupted scan
 * resumes at the day before the last completed one.
 *
 * All date arithmetic is done in UTC. The day strings handed to `processDate`
 * come from `toISOString()` (UTC), so stepping with local-time setters could
 * skip or repeat a day on hosts whose local zone observes DST.
 *
 * @param sports - Sports to scan; defaults to the service-wide list.
 * @param startDateStr - Oldest day to scan, `YYYY-MM-DD`.
 * @param targetLeagueIds - When non-empty, only these competition ids are kept.
 * @throws RangeError when `startDateStr` cannot be parsed as a date.
 */
async runHistoricalScan(
  sports: Sport[] = this.SPORTS,
  startDateStr: string = this.HISTORICAL_START_DATE,
  targetLeagueIds: string[] = [], // Optional league filter
): Promise<void> {
  const MS_PER_DAY = 24 * 60 * 60 * 1000;

  const toDateString = (value: Date): string =>
    value.toISOString().split('T')[0];

  const formatDuration = (ms: number): string => {
    const seconds = Math.floor((ms / 1000) % 60);
    const minutes = Math.floor((ms / (1000 * 60)) % 60);
    const hours = Math.floor(ms / (1000 * 60 * 60));
    return `${hours}h ${minutes}m ${seconds}s`;
  };

  this.logger.log(
    `🚀 STARTING HISTORICAL SCAN [Target: ${sports.join(', ')}] ${targetLeagueIds.length > 0 ? `[Filter: ${targetLeagueIds.length} leagues]` : ''}`,
  );

  const startDate = new Date(startDateStr);
  if (Number.isNaN(startDate.getTime())) {
    // Fail early with a descriptive message instead of an opaque
    // "Invalid time value" from toISOString() further down.
    throw new RangeError(
      `Invalid historical scan start date: "${startDateStr}"`,
    );
  }

  // Newest day to scan: two days ago (UTC midnight). Today and yesterday are
  // handled by the cron jobs (data-fetcher.task.ts) that write to
  // live_matches; the historical scan should only fill the matches table.
  const now = new Date();
  const endDate = new Date(
    Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() - 2),
  );

  const stateKey = `historical_scan_state_${sports.join('_')}${targetLeagueIds.length > 0 ? '_filtered' : ''}_desc`;

  // Without a usable checkpoint the scan starts at endDate and goes backwards.
  let currentDate = new Date(endDate);

  // Resume from saved state
  try {
    const savedState = await this.persistenceService.getState(stateKey);
    if (savedState) {
      const resumeDate = new Date(savedState);
      // The checkpoint must lie inside [startDate, endDate]; an unparsable
      // value fails both comparisons and is ignored.
      if (resumeDate <= endDate && resumeDate >= startDate) {
        // The saved day is already done: continue with the day before it.
        currentDate = new Date(resumeDate);
        currentDate.setUTCDate(currentDate.getUTCDate() - 1);
        this.logger.log(`📍 Resuming from: ${toDateString(currentDate)}`);
      }
    }
  } catch {
    this.logger.warn('Could not read state, starting from beginning');
  }

  this.logger.log(
    `📊 Scanning (Reverse): ${toDateString(currentDate)} ← ${toDateString(startDate)}`,
  );

  let processedDays = 0;
  const scanStartTime = Date.now();

  // Reverse loop: newest day first, stop after startDate has been processed.
  while (currentDate >= startDate) {
    const dateString = toDateString(currentDate);

    for (const sport of sports) {
      await this.processDate(dateString, sport, targetLeagueIds);
    }

    // Checkpoint the finished day.
    await this.persistenceService.setState(stateKey, dateString);

    // --- ETA calculation ---
    processedDays++;
    const totalElapsed = Date.now() - scanStartTime;
    const avgTimePerDay = totalElapsed / processedDays;

    // Days still to do after this one.
    const daysLeft = Math.ceil(
      (currentDate.getTime() - startDate.getTime()) / MS_PER_DAY,
    );
    const estimatedRemainingMs = avgTimePerDay * daysLeft;

    this.logger.log(
      `⏱️ PROGRESS: [${processedDays} days done] | Avg/Day: ${(avgTimePerDay / 1000).toFixed(1)}s | Remaining: ${daysLeft} days | 🏁 ETA: ${formatDuration(estimatedRemainingMs)}`,
    );

    // Step one UTC day back.
    currentDate.setUTCDate(currentDate.getUTCDate() - 1);
  }

  this.logger.log('🎉 HISTORICAL SCAN COMPLETED');
}
|
||||
|
||||
// ============================================
|
||||
// PROCESS SINGLE DATE
|
||||
// ============================================
|
||||
/**
 * Fetches the source snapshot for one calendar day and sport, narrows it to
 * the matches that should be stored, and processes them one at a time.
 *
 * Steps:
 *  1. Fetch the snapshot (up to 3 attempts, retrying only on 502).
 *  2. Keep rows that have an `iddaaCode`.
 *  3. Keep rows whose `mstUtc` lies inside the target day as observed in
 *     `DAILY_SYNC_TIME_ZONE`, then de-duplicate by match id.
 *  4. Apply the optional league filter and completed-only filter.
 *  5. Unless refreshing, drop matches that already exist in storage.
 *  6. Process the remaining matches sequentially with pacing delays, then
 *     give every failed match one retry.
 *
 * Any error is logged and swallowed so one bad day does not abort the
 * caller's loop.
 *
 * @param dateString - Target day as `YYYY-MM-DD`.
 * @param sport - Sport whose snapshot is fetched.
 * @param targetLeagueIds - When non-empty, only these competition ids are kept.
 * @param options - See `ProcessDateOptions`; both flags default to `false`.
 */
private async processDate(
  dateString: string,
  sport: Sport,
  targetLeagueIds: string[] = [],
  options: ProcessDateOptions = {},
): Promise<void> {
  const { onlyCompletedMatches = false, refreshExistingMatches = false } =
    options;

  // Pacing values for this method.
  const SNAPSHOT_ATTEMPTS = 3;
  const SNAPSHOT_RETRY_DELAY_MS = 5000;
  const MATCH_DELAY_MS = 300; // pause before every match
  const COOLDOWN_EVERY = 10; // extra pause after this many processed matches
  const COOLDOWN_MS = 4000;
  const RETRY_DELAY_MS = 2000; // longer pause before each retry

  // Best-effort text for anything that can be thrown.
  const messageOf = (err: unknown): string => {
    const message = (err as { message?: unknown } | null | undefined)
      ?.message;
    return typeof message === 'string' ? message : String(err);
  };

  // True when the error looks like an HTTP 502 from the upstream source.
  const isBadGateway = (err: unknown): boolean => {
    const status = (
      err as { response?: { status?: unknown } } | null | undefined
    )?.response?.status;
    const message = messageOf(err);
    return (
      status === 502 ||
      message.includes('502') ||
      message.includes('Bad Gateway')
    );
  };

  this.logger.log(`[${sport}] 📅 Processing: ${dateString}`);

  try {
    // The upstream endpoint is named "livescores", but this path uses it
    // strictly as a historical source and filters by mstUtc below.
    let response: LivescoresApiResponse | null = null;
    for (let attempt = 1; attempt <= SNAPSHOT_ATTEMPTS; attempt++) {
      try {
        response = await this.scraperService.fetchLivescores(
          dateString,
          sport,
        );
        break;
      } catch (e: unknown) {
        // Only 502s are retried, and only while attempts remain.
        if (!isBadGateway(e) || attempt === SNAPSHOT_ATTEMPTS) throw e;
        this.logger.warn(
          `[${sport}] [${dateString}] Historical source fetch returned 502. Retrying in 5s...`,
        );
        await this.delay(SNAPSHOT_RETRY_DELAY_MS);
      }
    }

    const data = response?.data;
    if (!data?.matches || !data?.competitions) {
      this.logger.warn(`[${sport}] [${dateString}] No data from API`);
      return;
    }

    const rawMatches = Object.values(
      data.matches,
    ) as unknown as MatchSummary[];
    const allMatches = rawMatches.filter((m) => m.iddaaCode);

    // The source can return current live/upcoming matches regardless of the
    // requested date, so rows are filtered by their actual kickoff (mstUtc)
    // to the target day.
    const { startTs: targetDateStartTs, endTs: targetDateEndTs } =
      this.getDayBoundsForTimeZone(dateString, this.DAILY_SYNC_TIME_ZONE);

    const dateFilteredMatches = allMatches.filter(
      (m) => m.mstUtc >= targetDateStartTs && m.mstUtc <= targetDateEndTs,
    );

    const apiReturnedCount = allMatches.length;
    const afterDateFilterCount = dateFilteredMatches.length;

    if (apiReturnedCount > 0 && afterDateFilterCount === 0) {
      this.logger.log(
        `[${sport}] [${dateString}] Historical source returned ${apiReturnedCount} matches, but none belong to the target date after mstUtc filtering. Skipping.`,
      );
      return;
    }

    if (afterDateFilterCount < apiReturnedCount) {
      this.logger.log(
        `[${sport}] [${dateString}] Filtered out ${apiReturnedCount - afterDateFilterCount} off-date rows from historical source payload before processing.`,
      );
    }

    // De-duplicate by id (the last occurrence wins).
    let matchesToProcess = Array.from(
      new Map(dateFilteredMatches.map((m) => [m.id, m])).values(),
    );

    if (targetLeagueIds.length > 0) {
      const wantedLeagues = new Set(targetLeagueIds);
      matchesToProcess = matchesToProcess.filter((m) =>
        wantedLeagues.has(m.competitionId),
      );
    }

    if (onlyCompletedMatches) {
      const beforeCompletedFilter = matchesToProcess.length;
      matchesToProcess = matchesToProcess.filter((m) =>
        this.isCompletedMatchSummary(m),
      );

      if (
        beforeCompletedFilter > 0 &&
        matchesToProcess.length < beforeCompletedFilter
      ) {
        this.logger.log(
          `[${sport}] [${dateString}] Filtered out ${beforeCompletedFilter - matchesToProcess.length} non-completed matches from daily sync payload.`,
        );
      }
    }

    if (matchesToProcess.length === 0) {
      this.logger.log(
        `[${sport}] [${dateString}] No iddaa matches found in source`,
      );
      return;
    }

    // Look up which of the candidates are already stored.
    const allIds = matchesToProcess.map((m) => m.id);
    const existingIds =
      await this.persistenceService.getExistingMatchIds(allIds);
    const totalCount = matchesToProcess.length;

    if (!refreshExistingMatches && existingIds.length > 0) {
      const alreadyStored = new Set(existingIds);
      matchesToProcess = matchesToProcess.filter(
        (m) => !alreadyStored.has(m.id),
      );
    }

    if (matchesToProcess.length === 0) {
      this.logger.log(
        `[${sport}] [${dateString}] All ${totalCount} matches already exist. Skipping...`,
      );
      return;
    }

    if (refreshExistingMatches) {
      this.logger.log(
        `[${sport}] [${dateString}] Refreshing ${matchesToProcess.length} completed matches (${existingIds.length} already existed in matches)`,
      );
    } else {
      this.logger.log(
        `[${sport}] [${dateString}] Processing ${matchesToProcess.length}/${totalCount} matches (Skipped ${existingIds.length} existing)`,
      );
    }

    let successCount = 0;
    const failedMatches: MatchSummary[] = [];

    // First pass: strictly sequential to keep pressure on the source low.
    let processedCount = 0;
    for (const match of matchesToProcess) {
      // Cooldown once COOLDOWN_EVERY matches have actually been processed.
      if (processedCount > 0 && processedCount % COOLDOWN_EVERY === 0) {
        this.logger.log(
          `[${sport}] ⏸️ Processed 10 matches, pausing for cooldown...`,
        );
        await this.delay(COOLDOWN_MS);
      }

      await this.delay(MATCH_DELAY_MS);
      try {
        const result = await this.processSingleMatch(
          match,
          data.competitions,
          sport,
          refreshExistingMatches,
        );

        if (result.success) {
          this.logger.log(
            `[${sport}] ✅ successful for ${match.id} ${match.homeTeam.name} vs ${match.awayTeam.name}`,
          );
          successCount++;
        } else if (result.retryable) {
          this.logger.log(
            `[${sport}] ⚠️ retryable for ${match.id} ${match.homeTeam.name} vs ${match.awayTeam.name}`,
          );
          failedMatches.push(match);
        } else {
          // Not saved and not worth retrying; leave a trace instead of
          // dropping the match silently.
          this.logger.debug(
            `[${sport}] Skipped ${match.id}: not saved and not retryable`,
          );
        }
      } catch (e: unknown) {
        this.logger.warn(
          `[${sport}] Sequential error for ${match.id}: ${messageOf(e)}`,
        );
        failedMatches.push(match);
      }
      processedCount++;
    }

    // Second pass: a single sequential retry for everything that failed.
    if (failedMatches.length > 0) {
      this.logger.log(
        `[${sport}] ⚠️ Retrying ${failedMatches.length} failed matches sequentially...`,
      );

      for (const match of failedMatches) {
        await this.delay(RETRY_DELAY_MS);
        try {
          const result = await this.processSingleMatch(
            match,
            data.competitions,
            sport,
            refreshExistingMatches,
          );
          if (result.success) {
            successCount++;
            this.logger.log(`[${sport}] ✅ Retry successful for ${match.id}`);
          } else {
            this.logger.warn(`[${sport}] ❌ Retry failed for ${match.id}`);
          }
        } catch (e: unknown) {
          this.logger.warn(
            `[${sport}] ❌ Retry exception for ${match.id}: ${messageOf(e)}`,
          );
        }
      }
    }

    this.logger.log(
      `[${sport}] [${dateString}] ✓ Saved ${successCount} matches`,
    );
  } catch (error: unknown) {
    this.logger.error(
      `[${sport}] [${dateString}] ❌ Failed: ${messageOf(error)}`,
    );
  }
}
|
||||
|
||||
// ============================================
|
||||
// REFRESH SINGLE MATCH (On-demand)
|
||||
// ============================================
|
||||
/**
 * Re-fetches and re-saves a single match that is already stored.
 *
 * A `MatchSummary` is rebuilt from the stored record and handed to
 * `processSingleMatch` with the force flag set, so the "already exists"
 * short-circuit is bypassed.
 *
 * NOTE(review): the competition passed along is a placeholder named
 * 'Unknown' with empty slug and country. Whether a full (`scope = 'all'`)
 * refresh can overwrite stored league data with it depends on
 * `persistenceService.saveMatch` — confirm.
 *
 * @param matchId - Id of the stored match.
 * @param scope - `'all'` for a full refresh, or `'lineups'` / `'odds'` to
 *        limit what is fetched and saved.
 * @returns The processing result. `retryable` is `false` when the match is
 *          not in the database and `true` when processing threw.
 */
async refreshMatch(
  matchId: string,
  scope: 'all' | 'lineups' | 'odds' = 'all',
): Promise<ProcessResult> {
  this.logger.log(`🔄 Refreshing match (${scope}) for ${matchId}`);

  const stored = await this.persistenceService.getMatch(matchId);
  if (!stored) {
    this.logger.warn(`[${matchId}] Refresh failed: Match not in DB`);
    return { success: false, retryable: false, error: 'Match not found' };
  }

  // Rebuild the summary shape the processing pipeline expects.
  const leagueId = stored.leagueId;
  const rebuiltSummary: MatchSummary = {
    id: matchId,
    matchName: stored.matchName,
    matchSlug: stored.matchSlug,
    competitionId: leagueId,
    mstUtc: Number(stored.mstUtc),
    iddaaCode: stored.iddaaCode,
    homeTeam: {
      id: stored.homeTeamId,
      name: stored.homeTeam?.name || '',
      slug: stored.homeTeam?.slug || '',
    },
    awayTeam: {
      id: stored.awayTeamId,
      name: stored.awayTeam?.name || '',
      slug: stored.awayTeam?.slug || '',
    },
    score: {
      home: stored.scoreHome,
      away: stored.scoreAway,
    },
  };

  // Stand-in competition entry so the lookup by competitionId resolves.
  const placeholderCompetition: Competition = {
    id: leagueId,
    name: 'Unknown',
    competitionSlug: '',
    country: { id: '', name: '' },
  };
  const competitions: Record<string, Competition> = {
    [leagueId]: placeholderCompetition,
  };

  try {
    const outcome = await this.processSingleMatch(
      rebuiltSummary,
      competitions,
      stored.sport as Sport,
      true, // force: skip the "already exists" check
      scope,
    );
    return outcome;
  } catch (error: any) {
    this.logger.error(`[${matchId}] Refresh exception: ${error.message}`);
    return { success: false, retryable: true, error: error.message };
  }
}
|
||||
|
||||
// ============================================
|
||||
// PROCESS SINGLE MATCH
|
||||
// ============================================
|
||||
/**
 * Fetches every data component of one match that `scope` asks for and
 * persists the result.
 *
 * What is fetched:
 *  - `scope = 'all'`: match header; for basketball the box score and quarter
 *    scores; for football key events, game stats and officials; plus odds.
 *  - `scope = 'lineups'`: for basketball the box score players. For football
 *    nothing is fetched, because formation/substitution fetching is disabled.
 *  - `scope = 'odds'`: odds only.
 *
 * Each component fetch is wrapped individually: a failure is logged and the
 * match is still saved with whatever was collected. If any of those failures
 * was a 502, the result is reported as `retryable` even though the save
 * succeeded, so the caller can try to fill the gaps.
 *
 * @param matchSummary - Summary row identifying the match and its teams.
 * @param competitions - Competitions keyed by id; used to resolve the league.
 * @param sport - Selects the basketball or football fetch path.
 * @param force - When `true`, the "already exists" short-circuit is skipped.
 * @param scope - Which components to fetch and which save routine to use.
 * @returns `success` when the save succeeded without a 502 along the way;
 *          otherwise `retryable` tells the caller whether a retry makes sense.
 */
private async processSingleMatch(
  matchSummary: MatchSummary,
  competitions: Record<string, Competition>,
  sport: Sport,
  force: boolean = false,
  scope: 'all' | 'lineups' | 'odds' = 'all',
): Promise<ProcessResult> {
  const matchId = matchSummary.id;
  const homeTeamId = matchSummary.homeTeam?.id;
  const awayTeamId = matchSummary.awayTeam?.id;

  if (!matchId || !homeTeamId || !awayTeamId) {
    this.logger.warn(`[${matchId}] Skipped: Missing IDs`);
    return { success: false, retryable: false };
  }

  // Skip postponed matches (ERT = "Ertelendi", Turkish for postponed).
  if (matchSummary.statusBoxContent === 'ERT') {
    this.logger.debug(`[${matchId}] Skipped: Postponed match (ERT)`);
    return { success: false, retryable: false };
  }

  // Best-effort text for anything that can be thrown.
  const messageOf = (err: unknown): string => {
    const message = (err as { message?: unknown } | null | undefined)
      ?.message;
    return typeof message === 'string' ? message : String(err);
  };

  // Single definition of "this was a 502", shared by the retry helper and by
  // every catch below, so both always agree on what counts as critical.
  const isBadGateway = (err: unknown): boolean => {
    const status = (
      err as { response?: { status?: unknown } } | null | undefined
    )?.response?.status;
    const message = messageOf(err);
    return (
      status === 502 ||
      message.includes('502') ||
      message.includes('Bad Gateway')
    );
  };

  // Set when any component fetch ultimately failed with a 502; forces a
  // retryable result even if the save itself succeeds.
  let hasCriticalError = false;

  // Runs `fn` up to `retries` times. Only 502s are retried, with a linearly
  // growing pause (baseDelayMs, 2 * baseDelayMs, ...). Any other error, and
  // the error of the last attempt, is rethrown.
  const fetchResilient = async <T>(
    label: string,
    fn: () => Promise<T>,
    retries = 3,
    baseDelayMs = 1000,
  ): Promise<T | null> => {
    for (let attempt = 0; attempt < retries; attempt++) {
      try {
        return await fn();
      } catch (e: unknown) {
        const isLastAttempt = attempt === retries - 1;
        if (isLastAttempt || !isBadGateway(e)) throw e;

        const waitTime = baseDelayMs * (attempt + 1);
        this.logger.debug(
          `[${matchId}] ${label} failed (502). Retrying in ${waitTime}ms...`,
        );
        await this.delay(waitTime);
      }
    }
    // Only reachable when retries <= 0.
    return null;
  };

  try {
    // Unless forced, a match that is already stored counts as done.
    if (!force) {
      const exists = await this.persistenceService.matchExists(matchId);
      if (exists) {
        return { success: true, retryable: false };
      }
    }

    this.logger.debug(
      `[${matchId}] Processing (${scope}): ${matchSummary.matchName}`,
    );

    const league = competitions[matchSummary.competitionId];
    const playersMap = new Map<string, TransformedPlayer>();
    // Stays empty while lineup fetching is disabled (see the football branch).
    const participationData: MatchParticipation[] = [];
    let eventData: DbEventPayload[] = [];
    let stats: TransformedMatchStats | null = null;
    let basketballTeamStats: BasketballTeamStats | null = null;
    const basketballPlayerStats: Partial<BasketballPlayerStats>[] = [];
    let officialsData: MatchOfficial[] = [];

    // 1. Match header (score, status)
    let headerData: ParsedMatchHeader | null = null;
    if (scope === 'all') {
      try {
        headerData = await fetchResilient('Header', () =>
          this.scraperService.fetchMatchHeader(matchId),
        );
      } catch (e: unknown) {
        if (isBadGateway(e)) hasCriticalError = true;
        this.logger.warn(
          `[${matchId}] Header fetch failed: ${messageOf(e)}`,
        );
      }
    }

    // 2. Sport-specific data
    if (sport === 'basketball') {
      // Box score: needed for a full update and for lineups.
      if (scope === 'all' || scope === 'lineups') {
        try {
          const boxData = await fetchResilient('BoxScore', () =>
            this.scraperService.fetchBasketballBoxScore(matchId),
          );
          if (boxData) {
            const homeParsed = this.scraperService.parseBasketballBoxScore(
              boxData.views?.home?.html || '',
            );
            const awayParsed = this.scraperService.parseBasketballBoxScore(
              boxData.views?.away?.html || '',
            );

            // Team totals are only kept for a full update.
            basketballTeamStats =
              scope === 'all'
                ? {
                    home: homeParsed.teamTotals,
                    away: awayParsed.teamTotals,
                  }
                : null;

            if (scope === 'all') {
              try {
                const details = await fetchResilient('QuarterScores', () =>
                  this.scraperService.fetchBasketballDetailsHeader(matchId),
                );
                if (details && basketballTeamStats) {
                  // Quarter scores are merged over the box-score totals.
                  basketballTeamStats.home = {
                    ...basketballTeamStats.home,
                    ...details.home,
                  };
                  basketballTeamStats.away = {
                    ...basketballTeamStats.away,
                    ...details.away,
                  };
                }
              } catch (e: unknown) {
                if (isBadGateway(e)) hasCriticalError = true;
                this.logger.warn(
                  `[${matchId}] Quarter scores fetch failed: ${messageOf(e)}`,
                );
              }
            }

            // Register every named player for stats and for the player map.
            const processPlayers = (
              parsed: typeof homeParsed,
              teamId: string,
            ) => {
              parsed.players.forEach((p) => {
                if (p.name) {
                  // Prefer the id extracted from the page; otherwise derive
                  // one from team id and player name.
                  const id =
                    p.id ||
                    this.transformerService.generateBasketballPlayerId(
                      teamId,
                      p.name,
                    );
                  basketballPlayerStats.push({ ...p, id, teamId });
                  playersMap.set(id, {
                    id,
                    name: p.name,
                    slug: id,
                    teamId,
                  });
                }
              });
            };

            processPlayers(homeParsed, homeTeamId);
            processPlayers(awayParsed, awayTeamId);
          }
        } catch (e: unknown) {
          if (isBadGateway(e)) hasCriticalError = true;
          this.logger.warn(`[${matchId}] Box score failed: ${messageOf(e)}`);
        }
      }
    } else {
      // Football: key events, game stats, officials.

      // Key events
      if (scope === 'all') {
        try {
          const eventsData = await fetchResilient('Events', () =>
            this.scraperService.fetchKeyEvents(matchId),
          );
          if (eventsData?.keyEvents) {
            const transformedEvents =
              this.transformerService.transformKeyEvents(
                eventsData.keyEvents,
                homeTeamId,
                awayTeamId,
                matchId,
              );

            this.transformerService.extractPlayersFromEvents(
              transformedEvents,
              playersMap,
            );

            eventData =
              this.transformerService.prepareEventDataForDb(
                transformedEvents,
              );
          }
        } catch (e: unknown) {
          if (isBadGateway(e)) hasCriticalError = true;
          this.logger.warn(`[${matchId}] Events failed: ${messageOf(e)}`);
        }

        await this.delay(300);
      }

      // Starting formation and substitutes are intentionally NOT fetched.
      // V20 optimisation: the V20 model only uses team stats, so the calls
      // to scraperService.fetchStartingFormation / fetchSubstitutions (whose
      // home/away stats were fed through transformerService.processLineup
      // into playersMap and participationData) were disabled to speed up the
      // feeder and reduce 502 errors.
      // NOTE(review): as a consequence, a football refresh with
      // scope 'lineups' reaches saveLineups with empty collections — confirm
      // that saveLineups tolerates this without discarding stored lineups.

      // Game stats and officials
      if (scope === 'all') {
        try {
          const gameStats = await fetchResilient('Stats', () =>
            this.scraperService.fetchGameStats(matchId),
          );
          stats = this.transformerService.transformGameStats(gameStats);
        } catch (e: unknown) {
          if (isBadGateway(e)) hasCriticalError = true;
          this.logger.warn(`[${matchId}] Stats failed: ${messageOf(e)}`);
        }

        // Officials are parsed out of the match page HTML.
        try {
          const matchPageHtml = await fetchResilient('Officials', () =>
            this.scraperService.fetchMatchPage(
              matchId,
              matchSummary.matchSlug,
              sport,
            ),
          );
          if (matchPageHtml) {
            officialsData =
              this.transformerService.parseOfficials(matchPageHtml);
          }
        } catch (e: unknown) {
          if (isBadGateway(e)) hasCriticalError = true;
          this.logger.warn(`[${matchId}] Officials failed: ${messageOf(e)}`);
        }
      }
    }

    // 3. Odds (full update or odds-only)
    let oddsArray: DbMarketPayload[] = [];
    if (scope === 'all' || scope === 'odds') {
      try {
        let markets: ParsedMarket[] = [];
        if (sport === 'basketball') {
          markets =
            ((await fetchResilient('BucketOdds', () =>
              this.scraperService.fetchBasketballMarkets(matchId),
            )) as ParsedMarket[]) || [];
        } else {
          markets =
            ((await fetchResilient('IddaaOdds', () =>
              this.scraperService.fetchIddaaMarkets(matchId),
            )) as ParsedMarket[]) || [];
        }
        // Both sources yield ParsedMarket[], so one transform serves both.
        oddsArray = this.transformerService.transformIddaaMarkets(markets);
      } catch (e: unknown) {
        if (isBadGateway(e)) hasCriticalError = true;
        this.logger.warn(`[${matchId}] Odds failed: ${messageOf(e)}`);
      }
    }

    // 4. Persist, using the save routine that matches the scope.
    let saved = false;
    if (scope === 'lineups') {
      saved = await this.persistenceService.saveLineups(
        matchId,
        playersMap,
        participationData,
        homeTeamId,
        awayTeamId,
      );
    } else if (scope === 'odds') {
      saved = await this.persistenceService.saveOdds(matchId, oddsArray);
    } else {
      // Full update
      saved = await this.persistenceService.saveMatch(
        sport,
        matchId,
        matchSummary,
        league,
        homeTeamId,
        awayTeamId,
        headerData,
        playersMap,
        participationData,
        eventData,
        stats,
        basketballTeamStats,
        basketballPlayerStats,
        oddsArray,
        officialsData,
      );
    }

    // AI feature calculation (V17, deprecated) is intentionally not run here.
    // That step served the V17 model; the V20 model runs entirely on the
    // Python (ai-engine) side, so the fire-and-forget call to
    // aiFeatureStoreService.calculateAndSaveFeatures(matchId) after a
    // successful save was disabled to avoid wasting resources.

    if (saved && hasCriticalError) {
      // Report only the components this sport and scope actually tried to fetch.
      const triedFootballExtras = scope === 'all' && sport !== 'basketball';
      const triedOdds = scope === 'all' || scope === 'odds';

      const missingParts: string[] = [];
      if (triedFootballExtras && !stats) missingParts.push('Stats');
      if (triedOdds && oddsArray.length === 0) missingParts.push('Odds');
      if (triedFootballExtras && officialsData.length === 0) {
        missingParts.push('Officials');
      }

      this.logger.warn(
        `[${matchId}] Saved with MISSING DATA (502). Missing: [${missingParts.join(', ')}]. Scheduled for retry.`,
      );
      return { success: false, retryable: true };
    }

    return { success: saved, retryable: !saved };
  } catch (error: unknown) {
    const message = messageOf(error);
    // Transient upstream/network conditions, plus unique-constraint clashes
    // that a later retry can resolve.
    const retryableMarkers = [
      '502',
      '504',
      'ECONNABORTED',
      'timeout',
      'ETIMEDOUT',
      'Unique constraint',
    ];
    const isRetryable = retryableMarkers.some((marker) =>
      message.includes(marker),
    );

    if (isRetryable) {
      this.logger.warn(`[${matchId}] ${message} - Will retry`);
    } else {
      this.logger.error(`[${matchId}] ${message} - Not retryable`);
    }

    return { success: false, retryable: isRetryable };
  }
}
|
||||
}
|
||||
Reference in New Issue
Block a user