Files
ContentGen_BE/media-worker/Services/QueueConsumerService.cs
Harun CAN 85c35c73e8
Some checks failed
Backend Deploy 🚀 / build-and-deploy (push) Has been cancelled
main
2026-03-29 12:43:49 +03:00

256 lines
9.3 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Text.Json;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using SaasMediaWorker.Configuration;
using SaasMediaWorker.Models;
namespace SaasMediaWorker.Services;
/// <summary>
/// Background service that consumes video generation jobs from a Redis list.
/// According to the original notes, jobs are enqueued with LPUSH by the NestJS
/// <c>VideoGenerationProducer</c>.
///
/// How the queue is read:
/// - Atomic: <c>ListRightPopAsync</c> (RPOP) removes the job from the list in the same
///   operation that returns it, so two workers never receive the same entry.
/// - Non-blocking: the pop returns immediately; when the list is empty the loop sleeps
///   for <c>WorkerSettings.PollIntervalSeconds</c> before polling again. (Earlier comments
///   described this as BRPOP with a timeout, which is not what the code does.)
/// - FIFO: LPUSH on the producer side + RPOP here means the oldest job is handled first.
///
/// Concurrency is bounded by a <see cref="SemaphoreSlim"/>: a slot is taken before each
/// pop and is either released right away (nothing to do / error) or handed to the
/// background job task, which releases it when the job finishes.
/// </summary>
public class QueueConsumerService : BackgroundService
{
    private readonly ILogger<QueueConsumerService> _logger;
    private readonly RedisSettings _redisSettings;
    private readonly WorkerSettings _workerSettings;
    private readonly VideoRenderPipeline _pipeline;
    private readonly DatabaseService _dbService;
    private readonly SemaphoreSlim _concurrencySemaphore;

    // Replaced on every (re)connect and cleared on stop; readers must copy it to a local first.
    private IConnectionMultiplexer? _redis;

    /// <summary>
    /// Creates the consumer and sizes the concurrency limiter from
    /// <c>WorkerSettings.MaxConcurrency</c>.
    /// </summary>
    /// <param name="logger">Logger for this service.</param>
    /// <param name="redisSettings">Redis connection string, queue key and pub/sub channel names.</param>
    /// <param name="workerSettings">Concurrency, polling interval and worker version.</param>
    /// <param name="pipeline">Pipeline that renders a job and returns the final video URL.</param>
    /// <param name="dbService">Persistence for render job and project status.</param>
    public QueueConsumerService(
        ILogger<QueueConsumerService> logger,
        IOptions<RedisSettings> redisSettings,
        IOptions<WorkerSettings> workerSettings,
        VideoRenderPipeline pipeline,
        DatabaseService dbService)
    {
        _logger = logger;
        _redisSettings = redisSettings.Value;
        _workerSettings = workerSettings.Value;
        _pipeline = pipeline;
        _dbService = dbService;

        // A limit of 0 would create a semaphore that can never be acquired (the worker would
        // start and silently process nothing); fall back to a single slot in that case.
        _concurrencySemaphore = new SemaphoreSlim(Math.Max(1, _workerSettings.MaxConcurrency));
    }

    /// <summary>
    /// Main loop: connects to Redis, then repeatedly takes a concurrency slot, pops one job
    /// and starts processing it in the background. Returns normally when
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "🎬 Queue Consumer başlatıldı — Kuyruk: {Queue}, Eşzamanlılık: {Concurrency}",
            _redisSettings.QueueKey, _workerSettings.MaxConcurrency);

        try
        {
            await ConnectToRedis(stoppingToken);

            while (!stoppingToken.IsCancellationRequested)
            {
                await _concurrencySemaphore.WaitAsync(stoppingToken);

                // The slot acquired above is released in the finally below unless it has been
                // handed to ProcessJobAsync. Keeping the release in one place guarantees it
                // happens exactly once per acquisition.
                var slotHandedOff = false;
                var reconnect = false;
                var backoff = TimeSpan.Zero;

                try
                {
                    var db = _redis!.GetDatabase();

                    // RPOP: returns immediately, null when the list is empty.
                    var result = await db.ListRightPopAsync(new RedisKey(_redisSettings.QueueKey));

                    if (result.IsNull)
                    {
                        backoff = TimeSpan.FromSeconds(_workerSettings.PollIntervalSeconds);
                    }
                    else
                    {
                        var jobJson = result.ToString();
                        var job = JsonSerializer.Deserialize<VideoGenerationJob>(jobJson);

                        if (job == null || string.IsNullOrEmpty(job.ProjectId))
                        {
                            // Log at most the first 200 characters of the rejected payload.
                            _logger.LogWarning("Geçersiz job payload atlandı: {Payload}",
                                jobJson[..Math.Min(200, jobJson.Length)]);
                        }
                        else
                        {
                            _logger.LogInformation(
                                "📥 Job alındı — Project: {ProjectId}, RenderJob: {RenderJobId}",
                                job.ProjectId, job.RenderJobId);

                            // Fire-and-forget: ProcessJobAsync handles its own errors and
                            // releases the slot when the job ends.
                            _ = ProcessJobAsync(job, stoppingToken);
                            slotHandedOff = true;
                        }
                    }
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown: let the outer handler log it (the finally still frees the slot).
                    throw;
                }
                catch (RedisConnectionException ex)
                {
                    _logger.LogError(ex, "Redis bağlantı hatası — 5s sonra tekrar denenecek");
                    backoff = TimeSpan.FromSeconds(5);
                    reconnect = true;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Queue Consumer beklenmeyen hata");
                    backoff = TimeSpan.FromSeconds(2);
                }
                finally
                {
                    if (!slotHandedOff)
                    {
                        _concurrencySemaphore.Release();
                    }
                }

                // Waiting happens after the slot is released so an idle or failing poller
                // never holds capacity.
                if (backoff > TimeSpan.Zero)
                {
                    await Task.Delay(backoff, stoppingToken);
                }

                if (reconnect)
                {
                    await ConnectToRedis(stoppingToken);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Cancellation can surface from the semaphore wait, a back-off delay or the
            // connect retry loop; all of them mean a normal shutdown.
            _logger.LogInformation("Queue Consumer durduruluyor...");
        }
    }

    /// <summary>
    /// Processes a single job: marks it PROCESSING, runs the render pipeline, persists the
    /// result and publishes a completion message. Any failure is written to the database as
    /// FAILED. Always releases the concurrency slot taken by <see cref="ExecuteAsync"/>.
    /// </summary>
    /// <param name="job">The job popped from the queue.</param>
    /// <param name="ct">Cancellation token forwarded to the render pipeline.</param>
    private async Task ProcessJobAsync(VideoGenerationJob job, CancellationToken ct)
    {
        var sw = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            // Mark the render job as PROCESSING and record which worker picked it up.
            await _dbService.UpdateRenderJobStatus(
                job.RenderJobId, "PROCESSING", 0, "VIDEO_GENERATION",
                workerVersion: _workerSettings.WorkerVersion,
                workerHostname: Environment.MachineName);

            // Move the project into the media generation state.
            await _dbService.UpdateProjectStatus(job.ProjectId, "GENERATING_MEDIA", 5);

            var progressCallback = CreateProgressCallback(job);

            var finalVideoUrl = await _pipeline.ExecuteAsync(job, progressCallback, ct);
            sw.Stop();

            // Success: persist the final state.
            await _dbService.UpdateRenderJobStatus(
                job.RenderJobId, "COMPLETED", 100, "FINALIZATION",
                processingTimeMs: sw.ElapsedMilliseconds);
            await _dbService.UpdateProjectStatus(
                job.ProjectId, "COMPLETED", 100,
                finalVideoUrl: finalVideoUrl);

            // The result is already stored as COMPLETED; a failed notification must not fall
            // into the failure handler below and overwrite that with FAILED.
            try
            {
                await PublishCompletion(job.ProjectId, finalVideoUrl);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Tamamlanma bildirimi gönderilemedi");
            }

            _logger.LogInformation(
                "✅ Video üretimi tamamlandı — Project: {ProjectId}, Süre: {Duration}s, URL: {Url}",
                job.ProjectId, sw.Elapsed.TotalSeconds, finalVideoUrl);
        }
        catch (Exception ex)
        {
            sw.Stop();
            _logger.LogError(ex,
                "❌ Video üretimi başarısız — Project: {ProjectId}, Hata: {Error}",
                job.ProjectId, ex.Message);

            await MarkJobFailedAsync(job, ex, sw.ElapsedMilliseconds);
        }
        finally
        {
            _concurrencySemaphore.Release();
        }
    }

    /// <summary>
    /// Writes the FAILED state for the render job and the project. Each write is guarded
    /// separately: this runs inside a fire-and-forget task, so an exception thrown here would
    /// otherwise vanish unobserved, and one failing write should not prevent the other.
    /// </summary>
    /// <param name="job">The job that failed.</param>
    /// <param name="error">The exception that caused the failure.</param>
    /// <param name="elapsedMs">Processing time until the failure, in milliseconds.</param>
    private async Task MarkJobFailedAsync(VideoGenerationJob job, Exception error, long elapsedMs)
    {
        try
        {
            await _dbService.UpdateRenderJobStatus(
                job.RenderJobId, "FAILED", 0, null,
                errorMessage: error.Message,
                errorStack: error.StackTrace,
                processingTimeMs: elapsedMs);
        }
        catch (Exception dbEx)
        {
            _logger.LogError(dbEx,
                "Render job FAILED durumu yazılamadı — RenderJob: {RenderJobId}",
                job.RenderJobId);
        }

        try
        {
            await _dbService.UpdateProjectStatus(
                job.ProjectId, "FAILED", 0, errorMessage: error.Message);
        }
        catch (Exception dbEx)
        {
            _logger.LogError(dbEx,
                "Proje FAILED durumu yazılamadı — Project: {ProjectId}",
                job.ProjectId);
        }
    }

    /// <summary>
    /// Builds the progress callback handed to the render pipeline. Each invocation stores the
    /// progress in the database and publishes it on the Redis progress channel. Reporting is
    /// best-effort: failures are logged as warnings and never propagate to the pipeline.
    /// </summary>
    /// <param name="job">The job whose progress is reported.</param>
    /// <returns>A delegate taking the progress value and the current stage name.</returns>
    private Func<int, string, Task> CreateProgressCallback(VideoGenerationJob job)
    {
        return async (progress, stage) =>
        {
            try
            {
                await _dbService.UpdateRenderJobStatus(
                    job.RenderJobId, "PROCESSING", progress, stage);
                await _dbService.UpdateProjectStatus(
                    job.ProjectId, "GENERATING_MEDIA", progress);

                // Read the field once: it can be swapped by a reconnect or cleared on stop.
                var redis = _redis;
                if (redis != null)
                {
                    var progressPayload = JsonSerializer.Serialize(new
                    {
                        projectId = job.ProjectId,
                        renderJobId = job.RenderJobId,
                        progress,
                        stage,
                        timestamp = DateTime.UtcNow.ToString("O")
                    });

                    await redis.GetSubscriber().PublishAsync(
                        new RedisChannel(_redisSettings.ProgressChannel, RedisChannel.PatternMode.Literal),
                        progressPayload);
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "İlerleme bildirimi gönderilemedi");
            }
        };
    }

    /// <summary>
    /// Publishes a COMPLETED message for the project on the Redis completion channel.
    /// Does nothing when there is no Redis connection.
    /// </summary>
    /// <param name="projectId">Project whose video finished rendering.</param>
    /// <param name="finalVideoUrl">URL of the rendered video.</param>
    private async Task PublishCompletion(string projectId, string finalVideoUrl)
    {
        // Read the field once: it can be swapped by a reconnect or cleared on stop.
        var redis = _redis;
        if (redis == null)
        {
            return;
        }

        var payload = JsonSerializer.Serialize(new
        {
            projectId,
            finalVideoUrl,
            status = "COMPLETED",
            timestamp = DateTime.UtcNow.ToString("O")
        });

        await redis.GetSubscriber().PublishAsync(
            new RedisChannel(_redisSettings.CompletionChannel, RedisChannel.PatternMode.Literal),
            payload);
    }

    /// <summary>
    /// Connects to Redis, retrying every 3 seconds until it succeeds or
    /// <paramref name="ct"/> is cancelled. On success the new connection replaces
    /// <c>_redis</c> and the previous connection, if any, is disposed.
    /// </summary>
    /// <param name="ct">Cancels the retry loop.</param>
    private async Task ConnectToRedis(CancellationToken ct)
    {
        var attempts = 0;
        while (!ct.IsCancellationRequested)
        {
            IConnectionMultiplexer connection;
            try
            {
                connection = await ConnectionMultiplexer.ConnectAsync(_redisSettings.ConnectionString);
            }
            catch (Exception ex)
            {
                attempts++;
                _logger.LogWarning(ex,
                    "Redis bağlantı denemesi #{Attempt} başarısız — 3s sonra tekrar",
                    attempts);
                await Task.Delay(3000, ct);
                continue;
            }

            // Swap first, then dispose the old multiplexer so a reconnect does not leak the
            // previous connection.
            var previous = _redis;
            _redis = connection;
            previous?.Dispose();

            _logger.LogInformation("✅ Redis bağlantısı kuruldu");
            return;
        }
    }

    /// <summary>
    /// Stops the consumer. The base implementation is awaited first so the polling loop has
    /// been signalled (and normally has exited) before the Redis connection is disposed.
    /// </summary>
    /// <param name="cancellationToken">Limits how long the shutdown may take.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queue Consumer durduruluyor...");
        try
        {
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            // Clear the field before disposing so jobs that are still finishing skip their
            // (best-effort) publishes instead of using a disposed connection.
            var redis = _redis;
            _redis = null;
            redis?.Dispose();
        }
    }
}