using System.Text.Json;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using SaasMediaWorker.Configuration;
using SaasMediaWorker.Models;
namespace SaasMediaWorker.Services;
/// <summary>
/// Background service that consumes video-generation jobs from a Redis list and runs
/// each one through the <see cref="VideoRenderPipeline"/>.
/// According to the producer contract, jobs are added with LPUSH by the NestJS
/// VideoGenerationProducer.
/// </summary>
/// <remarks>
/// Why pop from the right end of the list?
/// <list type="bullet">
/// <item><description>Atomic: the pop removes and returns the job in a single command, so a job is never handed to two workers.</description></item>
/// <item><description>FIFO: LPUSH on the producer side plus a right pop here means the oldest job is processed first.</description></item>
/// <item><description>Polling: the pop used here (<c>ListRightPopAsync</c>) does not block, so an empty queue is handled by
/// sleeping <c>PollIntervalSeconds</c> between attempts.</description></item>
/// </list>
/// Concurrency is bounded by a semaphore with <c>MaxConcurrency</c> slots: the consume loop acquires a slot
/// before popping and hands it to the job task, which releases it when the job ends.
/// </remarks>
public class QueueConsumerService : BackgroundService
{
    /// <summary>Back-off after a Redis connection error in the consume loop.</summary>
    private static readonly TimeSpan ConnectionErrorBackoff = TimeSpan.FromSeconds(5);

    /// <summary>Back-off after any other unexpected error in the consume loop.</summary>
    private static readonly TimeSpan UnexpectedErrorBackoff = TimeSpan.FromSeconds(2);

    /// <summary>Pause between failed connection attempts.</summary>
    private static readonly TimeSpan ConnectRetryDelay = TimeSpan.FromSeconds(3);

    /// <summary>Maximum number of payload characters written to the log for a rejected job.</summary>
    private const int PayloadPreviewLength = 200;

    private readonly ILogger<QueueConsumerService> _logger;
    private readonly RedisSettings _redisSettings;
    private readonly WorkerSettings _workerSettings;
    private readonly VideoRenderPipeline _pipeline;
    private readonly DatabaseService _dbService;
    private readonly SemaphoreSlim _concurrencySemaphore;

    // Replaced on reconnect and cleared on shutdown; readers must copy it to a local first.
    private IConnectionMultiplexer? _redis;

    /// <summary>
    /// Creates the consumer and sizes the concurrency semaphore from
    /// <c>WorkerSettings.MaxConcurrency</c>.
    /// </summary>
    public QueueConsumerService(
        ILogger<QueueConsumerService> logger,
        IOptions<RedisSettings> redisSettings,
        IOptions<WorkerSettings> workerSettings,
        VideoRenderPipeline pipeline,
        DatabaseService dbService)
    {
        _logger = logger;
        _redisSettings = redisSettings.Value;
        _workerSettings = workerSettings.Value;
        _pipeline = pipeline;
        _dbService = dbService;
        _concurrencySemaphore = new SemaphoreSlim(_workerSettings.MaxConcurrency);
    }

    /// <summary>
    /// Main consume loop: connects to Redis, then repeatedly acquires a concurrency slot,
    /// pops one job and starts it in the background until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Cancelled by the host when the service is asked to stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "🎬 Queue Consumer başlatıldı — Kuyruk: {Queue}, Eşzamanlılık: {Concurrency}",
            _redisSettings.QueueKey, _workerSettings.MaxConcurrency);

        await ConnectToRedis(stoppingToken);

        while (!stoppingToken.IsCancellationRequested)
        {
            // True only while this iteration owns a slot that has not been handed to a job task.
            // A single release in `finally` keyed on this flag prevents both leaks and double releases.
            var slotHeld = false;
            try
            {
                await _concurrencySemaphore.WaitAsync(stoppingToken);
                slotHeld = true;

                var db = _redis!.GetDatabase();

                // Non-blocking right pop; an empty queue is handled by the poll delay below.
                var result = await db.ListRightPopAsync(new RedisKey(_redisSettings.QueueKey));
                if (result.IsNull)
                {
                    // Give the slot back before sleeping so it is not held while idle.
                    _concurrencySemaphore.Release();
                    slotHeld = false;

                    await Task.Delay(
                        TimeSpan.FromSeconds(_workerSettings.PollIntervalSeconds),
                        stoppingToken);
                    continue;
                }

                var jobJson = result.ToString();

                VideoGenerationJob? job;
                try
                {
                    job = JsonSerializer.Deserialize<VideoGenerationJob>(jobJson);
                }
                catch (JsonException ex)
                {
                    // Malformed JSON is a bad payload, not a consumer fault: skip it without backing off.
                    _logger.LogWarning(ex, "Geçersiz job payload atlandı: {Payload}",
                        PayloadPreview(jobJson));
                    continue;
                }

                if (job == null || string.IsNullOrEmpty(job.ProjectId))
                {
                    _logger.LogWarning("Geçersiz job payload atlandı: {Payload}",
                        PayloadPreview(jobJson));
                    continue;
                }

                _logger.LogInformation(
                    "📥 Job alındı — Project: {ProjectId}, RenderJob: {RenderJobId}",
                    job.ProjectId, job.RenderJobId);

                // Ownership of the slot moves to the job task, which releases it in its own `finally`.
                // An async method never throws synchronously, so clearing the flag first is safe.
                slotHeld = false;
                _ = ProcessJobAsync(job, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("Queue Consumer durduruluyor...");
                break;
            }
            catch (RedisConnectionException ex)
            {
                _logger.LogError(ex, "Redis bağlantı hatası — 5s sonra tekrar denenecek");
                await Task.Delay(ConnectionErrorBackoff, stoppingToken);
                await ConnectToRedis(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Queue Consumer beklenmeyen hata");
                await Task.Delay(UnexpectedErrorBackoff, stoppingToken);
            }
            finally
            {
                if (slotHeld)
                {
                    _concurrencySemaphore.Release();
                }
            }
        }
    }

    /// <summary>
    /// Runs a single job: marks it as processing, executes the render pipeline, records the
    /// outcome in the database and publishes the completion notification.
    /// Always releases the concurrency slot it was handed by the consume loop.
    /// </summary>
    /// <param name="job">The job popped from the queue.</param>
    /// <param name="ct">Token forwarded to the render pipeline.</param>
    private async Task ProcessJobAsync(VideoGenerationJob job, CancellationToken ct)
    {
        var sw = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            // Mark the render job as PROCESSING.
            await _dbService.UpdateRenderJobStatus(
                job.RenderJobId, "PROCESSING", 0, "VIDEO_GENERATION",
                workerVersion: _workerSettings.WorkerVersion,
                workerHostname: Environment.MachineName);

            // Mark the project as generating media.
            await _dbService.UpdateProjectStatus(job.ProjectId, "GENERATING_MEDIA", 5);

            // Progress callback: persists progress and notifies listeners over Redis pub/sub.
            var progressCallback = CreateProgressCallback(job);

            var finalVideoUrl = await _pipeline.ExecuteAsync(job, progressCallback, ct);
            sw.Stop();

            // Success: persist the final state.
            await _dbService.UpdateRenderJobStatus(
                job.RenderJobId, "COMPLETED", 100, "FINALIZATION",
                processingTimeMs: sw.ElapsedMilliseconds);
            await _dbService.UpdateProjectStatus(
                job.ProjectId, "COMPLETED", 100,
                finalVideoUrl: finalVideoUrl);

            // Best-effort notification; it never throws, so it cannot undo the COMPLETED state above.
            await PublishCompletion(job.ProjectId, finalVideoUrl);

            _logger.LogInformation(
                "✅ Video üretimi tamamlandı — Project: {ProjectId}, Süre: {Duration}s, URL: {Url}",
                job.ProjectId, sw.Elapsed.TotalSeconds, finalVideoUrl);
        }
        catch (Exception ex)
        {
            sw.Stop();
            _logger.LogError(ex,
                "❌ Video üretimi başarısız — Project: {ProjectId}, Hata: {Error}",
                job.ProjectId, ex.Message);

            try
            {
                await _dbService.UpdateRenderJobStatus(
                    job.RenderJobId, "FAILED", 0, null,
                    errorMessage: ex.Message,
                    errorStack: ex.StackTrace,
                    processingTimeMs: sw.ElapsedMilliseconds);
                await _dbService.UpdateProjectStatus(
                    job.ProjectId, "FAILED", 0, errorMessage: ex.Message);
            }
            catch (Exception dbEx)
            {
                // This task is fire-and-forget: an exception escaping here would go unobserved,
                // so log it instead of letting it vanish.
                _logger.LogError(dbEx,
                    "Başarısızlık durumu veritabanına yazılamadı — Project: {ProjectId}",
                    job.ProjectId);
            }
        }
        finally
        {
            _concurrencySemaphore.Release();
        }
    }

    /// <summary>
    /// Builds the progress callback handed to the render pipeline. The callback persists the
    /// progress value and stage to the database and publishes them on the Redis progress channel.
    /// Failures are logged as warnings and never propagate to the pipeline.
    /// </summary>
    /// <param name="job">The job whose progress is being reported.</param>
    /// <returns>An async callback taking the progress value and the current stage name.</returns>
    private Func<int, string, Task> CreateProgressCallback(VideoGenerationJob job)
    {
        return async (progress, stage) =>
        {
            try
            {
                await _dbService.UpdateRenderJobStatus(
                    job.RenderJobId, "PROCESSING", progress, stage);
                await _dbService.UpdateProjectStatus(
                    job.ProjectId, "GENERATING_MEDIA", progress);

                // Copy the field once: it can be swapped on reconnect or cleared on shutdown.
                var redis = _redis;
                if (redis != null)
                {
                    var progressPayload = JsonSerializer.Serialize(new
                    {
                        projectId = job.ProjectId,
                        renderJobId = job.RenderJobId,
                        progress,
                        stage,
                        timestamp = DateTime.UtcNow.ToString("O")
                    });
                    await redis.GetSubscriber().PublishAsync(
                        new RedisChannel(_redisSettings.ProgressChannel, RedisChannel.PatternMode.Literal),
                        progressPayload);
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "İlerleme bildirimi gönderilemedi");
            }
        };
    }

    /// <summary>
    /// Publishes the completion event for a project on the Redis completion channel.
    /// Best-effort: a publish failure is logged as a warning and not rethrown, because the
    /// caller has already persisted the COMPLETED state.
    /// </summary>
    /// <param name="projectId">Identifier of the finished project.</param>
    /// <param name="finalVideoUrl">URL of the rendered video.</param>
    private async Task PublishCompletion(string projectId, string finalVideoUrl)
    {
        var redis = _redis;
        if (redis == null) return;

        try
        {
            var payload = JsonSerializer.Serialize(new
            {
                projectId,
                finalVideoUrl,
                status = "COMPLETED",
                timestamp = DateTime.UtcNow.ToString("O")
            });
            await redis.GetSubscriber().PublishAsync(
                new RedisChannel(_redisSettings.CompletionChannel, RedisChannel.PatternMode.Literal),
                payload);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Tamamlanma bildirimi gönderilemedi — Project: {ProjectId}", projectId);
        }
    }

    /// <summary>
    /// Connects to Redis, retrying every three seconds until it succeeds or
    /// <paramref name="ct"/> is cancelled. A previously held connection is disposed once the
    /// new one is in place, so reconnects do not leak multiplexers.
    /// </summary>
    /// <param name="ct">Token that aborts the retry loop.</param>
    private async Task ConnectToRedis(CancellationToken ct)
    {
        var attempts = 0;
        while (!ct.IsCancellationRequested)
        {
            try
            {
                var connection = await ConnectionMultiplexer.ConnectAsync(_redisSettings.ConnectionString);

                var previous = _redis;
                _redis = connection;
                DisposeQuietly(previous);

                _logger.LogInformation("✅ Redis bağlantısı kuruldu");
                return;
            }
            catch (Exception ex)
            {
                attempts++;
                _logger.LogWarning(ex,
                    "Redis bağlantı denemesi #{Attempt} başarısız — 3s sonra tekrar",
                    attempts);
                await Task.Delay(ConnectRetryDelay, ct);
            }
        }
    }

    /// <summary>
    /// Stops the service: cancels and awaits the consume loop, waits for in-flight jobs to
    /// finish within the host's shutdown window, and only then disposes the Redis connection.
    /// </summary>
    /// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queue Consumer durduruluyor...");

        var drainedSlots = 0;
        try
        {
            // Stop the loop first so nothing new is popped and nothing touches a disposed connection.
            await base.StopAsync(cancellationToken);

            // Every in-flight job holds one slot; owning all slots means no job is still running.
            for (var i = 0; i < _workerSettings.MaxConcurrency; i++)
            {
                await _concurrencySemaphore.WaitAsync(cancellationToken);
                drainedSlots++;
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            _logger.LogWarning(
                "Kapatma zaman aşımına uğradı — devam eden job'lar tamamlanmadan çıkılıyor");
        }
        finally
        {
            // Return the drained slots so the semaphore is left in its original state.
            if (drainedSlots > 0)
            {
                _concurrencySemaphore.Release(drainedSlots);
            }

            var redis = _redis;
            _redis = null;
            DisposeQuietly(redis);
        }
    }

    /// <summary>Returns at most the first <see cref="PayloadPreviewLength"/> characters of a payload for logging.</summary>
    private static string PayloadPreview(string payload) =>
        payload[..Math.Min(PayloadPreviewLength, payload.Length)];

    /// <summary>Disposes a multiplexer, logging instead of throwing if disposal fails.</summary>
    private void DisposeQuietly(IConnectionMultiplexer? connection)
    {
        if (connection == null) return;

        try
        {
            connection.Dispose();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Redis bağlantısı kapatılırken hata oluştu");
        }
    }
}