← Back to all posts

RabbitMQ ile Event-Driven Architecture

RabbitMQ ile Event-Driven Architecture

RabbitMQ ile Event-Driven Architecture

Neden Event-Driven?

Modüler monolith yazısında modüller arası iletişimi InProcessEventBus ile çözdük. Aynı process içinde MediatR üzerinden event yayımlıyorduk. Bu yaklaşım tek sunucu, tek uygulama için yeterlidir.

Ama şu sorular ortaya çıktığında tablo değişir:

  • Orders servisi ödeme işlemini beklerken ne yapacak?
  • Notifications servisi çöktüğünde sipariş oluşturma da mı durmalı?
  • Aynı event'e 5 farklı servis tepki vermeli — hepsini senkron mu çağıracağız?
  • Trafik aniden 10 katına çıktı — sistemi nasıl esnetebiliriz?

Bu soruların cevabı mesaj kuyruğu ve event-driven architecture'dır.


Temel Kavramlar

Senkron vs Asenkron İletişim

Senkron (REST/gRPC):
OrderService → [HTTP] → PaymentService → [HTTP] → NotificationService
     ↑                       ↑                           ↑
   Bekle                   Bekle                       Bekle
   200ms                   150ms                        80ms
                                               Toplam: ~430ms
   PaymentService çöktüyse → OrderService de hata alır

Asenkron (Event-Driven):
OrderService → [Event] → RabbitMQ ──┬→ PaymentService   (kendi hızında)
                                     ├→ InventoryService (kendi hızında)
                                     └→ Notifications    (kendi hızında)
     ↑
   5ms (mesajı kuyruğa bıraktı, işi bitti)
   PaymentService çöktüyse → Mesaj kuyrukta bekler, servis kalktığında işlenir

RabbitMQ Bileşenleri

Producer (Yayımlayan)
    ↓ mesaj gönderir
Exchange (Dağıtıcı)
    ↓ routing rule'a göre iletir
Queue (Kuyruk)
    ↓ mesajı saklar
Consumer (Tüketen)
    ↓ işler, ack gönderir

Exchange Tipleri:

Direct Exchange:
  Routing key tam eşleşmesi
  "order.created" → sadece "order.created" kuyruğuna

Fanout Exchange:
  Routing key yok, herkese gönder
  Bir event → tüm bağlı kuyruklara

Topic Exchange:
  Pattern eşleşmesi (* tek kelime, # çok kelime)
  "order.*" → "order.created", "order.cancelled"
  "order.#" → "order.payment.failed" de dahil

Headers Exchange:
  Header bilgisine göre routing
  Nadiren kullanılır

Kurulum

Docker ile RabbitMQ

# docker-compose.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.13-management
    container_name: rabbitmq
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
      RABBITMQ_DEFAULT_VHOST: myapp
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  rabbitmq_data:
docker-compose up -d
# Management UI: http://localhost:15672
# Kullanıcı: admin / admin123

NuGet Paketleri

# MassTransit — RabbitMQ üzerinde soyutlama katmanı
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ

# Ya da ham RabbitMQ client
dotnet add package RabbitMQ.Client

MassTransit mi, ham client mi?

MassTransit; retry, dead-letter, saga, outbox ve serialization gibi production'da ihtiyaç duyulan her şeyi hazır sunar. Ham client daha fazla kontrol verir ama daha fazla iş gerektirir. Bu yazıda MassTransit kullanacağız — production'da tercih edilen yol budur.


Proje Yapısı

src/
├── Shared/
│   └── MyApp.Shared/
│       └── Contracts/              ← Paylaşılan mesaj sözleşmeleri
│           ├── Orders/
│           │   ├── OrderCreated.cs
│           │   └── OrderCancelled.cs
│           └── Payments/
│               ├── PaymentCompleted.cs
│               └── PaymentFailed.cs
│
├── Modules/
│   ├── MyApp.Orders/
│   ├── MyApp.Payments/
│   └── MyApp.Notifications/
│
└── MyApp.API/
    └── Program.cs

Mesaj Sözleşmeleri

Mesaj sözleşmeleri tüm servisler tarafından görülebilen ayrı bir projede yaşar. Bu sayede publisher ve consumer aynı modeli kullanır.

// Shared/Contracts/Orders/OrderCreated.cs
namespace MyApp.Shared.Contracts.Orders;

public record OrderCreated
{
    public Guid OrderId { get; init; }
    public Guid CustomerId { get; init; }
    public string CustomerEmail { get; init; } = string.Empty;
    public decimal TotalAmount { get; init; }
    public string Currency { get; init; } = "TRY";
    public List<OrderCreatedItem> Items { get; init; } = [];
    public string DeliveryAddress { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }
}

public record OrderCreatedItem
{
    public Guid ProductId { get; init; }
    public string ProductName { get; init; } = string.Empty;
    public int Quantity { get; init; }
    public decimal UnitPrice { get; init; }
}

// Shared/Contracts/Orders/OrderCancelled.cs
public record OrderCancelled
{
    public Guid OrderId { get; init; }
    public Guid CustomerId { get; init; }
    public string CancelReason { get; init; } = string.Empty;
    public DateTime CancelledAt { get; init; }
}

// Shared/Contracts/Payments/PaymentCompleted.cs
public record PaymentCompleted
{
    public Guid PaymentId { get; init; }
    public Guid OrderId { get; init; }
    public decimal Amount { get; init; }
    public string PaymentMethod { get; init; } = string.Empty;
    public DateTime CompletedAt { get; init; }
}

// Shared/Contracts/Payments/PaymentFailed.cs
public record PaymentFailed
{
    public Guid OrderId { get; init; }
    public string FailureReason { get; init; } = string.Empty;
    public int AttemptCount { get; init; }
    public DateTime FailedAt { get; init; }
}

MassTransit Konfigürasyonu

// Program.cs
builder.Services.AddMassTransit(x =>
{
    // Consumer'ları otomatik keşfet
    x.AddConsumers(typeof(Program).Assembly);

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(builder.Configuration["RabbitMQ:Host"], "/myapp", h =>
        {
            h.Username(builder.Configuration["RabbitMQ:Username"]!);
            h.Password(builder.Configuration["RabbitMQ:Password"]!);
        });

        // Retry politikası
        cfg.UseMessageRetry(r =>
        {
            r.Exponential(
                retryLimit: 5,
                minInterval: TimeSpan.FromSeconds(1),
                maxInterval: TimeSpan.FromSeconds(30),
                intervalDelta: TimeSpan.FromSeconds(2));

            // Bu hatalarda retry yapma — iş hatası, tekrar denemek anlamsız
            r.Ignore<ValidationException>();
            r.Ignore<ArgumentException>();
        });

        // Dead-letter queue için
        cfg.UseDeadLetterQueue();

        // Consumer endpoint'lerini yapılandır
        cfg.ConfigureEndpoints(context);
    });
});
// appsettings.json
{
  "RabbitMQ": {
    "Host": "localhost",
    "Username": "admin",
    "Password": "admin123"
  }
}

Publisher — Event Yayımlamak

Orders Modülü — Sipariş Oluşturuldu

// MyApp.Orders/Application/Commands/CreateOrder/CreateOrderCommandHandler.cs
public class CreateOrderCommandHandler
    : IRequestHandler<CreateOrderCommand, Guid>
{
    private readonly OrdersDbContext _context;
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly ILogger<CreateOrderCommandHandler> _logger;

    public CreateOrderCommandHandler(
        OrdersDbContext context,
        IPublishEndpoint publishEndpoint,
        ILogger<CreateOrderCommandHandler> logger)
    {
        _context = context;
        _publishEndpoint = publishEndpoint;
        _logger = logger;
    }

    public async Task<Guid> Handle(
        CreateOrderCommand command,
        CancellationToken ct)
    {
        // Sipariş oluştur
        var order = Order.Create(command.CustomerId, command.DeliveryAddress);

        foreach (var line in command.Lines)
        {
            var product = await _context.Products
                .FindAsync(line.ProductId, ct)
                ?? throw new NotFoundException(nameof(Product), line.ProductId);

            order.AddItem(product, line.Quantity);
        }

        _context.Orders.Add(order);
        await _context.SaveChangesAsync(ct);

        // Event yayımla — RabbitMQ'ya gönder
        await _publishEndpoint.Publish(new OrderCreated
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            CustomerEmail = command.CustomerEmail,
            TotalAmount = order.TotalAmount,
            Currency = "TRY",
            Items = order.Items.Select(i => new OrderCreatedItem
            {
                ProductId = i.ProductId,
                ProductName = i.ProductName,
                Quantity = i.Quantity,
                UnitPrice = i.UnitPrice
            }).ToList(),
            DeliveryAddress = order.DeliveryAddress,
            CreatedAt = order.CreatedAt
        }, ct);

        _logger.LogInformation(
            "Sipariş oluşturuldu ve event yayımlandı. OrderId: {OrderId}",
            order.Id);

        return order.Id;
    }
}

Consumer — Event Tüketmek

Payments Modülü — Ödeme Başlat

// MyApp.Payments/Consumers/OrderCreatedConsumer.cs
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly PaymentsDbContext _context;
    private readonly IPaymentGateway _paymentGateway;
    private readonly ILogger<OrderCreatedConsumer> _logger;

    public OrderCreatedConsumer(
        PaymentsDbContext context,
        IPaymentGateway paymentGateway,
        ILogger<OrderCreatedConsumer> logger)
    {
        _context = context;
        _paymentGateway = paymentGateway;
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var message = context.Message;

        _logger.LogInformation(
            "OrderCreated event alındı. OrderId: {OrderId}", message.OrderId);

        // Idempotency kontrolü — aynı mesaj iki kez gelebilir
        var existingPayment = await _context.Payments
            .FirstOrDefaultAsync(p => p.OrderId == message.OrderId);

        if (existingPayment is not null)
        {
            _logger.LogWarning(
                "Bu sipariş için ödeme zaten başlatılmış. OrderId: {OrderId}",
                message.OrderId);
            return; // Tekrar işleme — güvenle atla
        }

        // Ödeme kaydı oluştur
        var payment = Payment.Create(
            message.OrderId,
            message.TotalAmount,
            message.Currency);

        _context.Payments.Add(payment);
        await _context.SaveChangesAsync();

        // Ödeme gateway'e gönder
        var result = await _paymentGateway.ProcessAsync(new PaymentRequest
        {
            OrderId = message.OrderId,
            Amount = message.TotalAmount,
            Currency = message.Currency
        });

        if (result.IsSuccess)
        {
            payment.Complete(result.TransactionId);
            await _context.SaveChangesAsync();

            // Ödeme tamamlandı event'i yayımla
            await context.Publish(new PaymentCompleted
            {
                PaymentId = payment.Id,
                OrderId = message.OrderId,
                Amount = message.TotalAmount,
                PaymentMethod = result.PaymentMethod,
                CompletedAt = DateTime.UtcNow
            });
        }
        else
        {
            payment.Fail(result.ErrorMessage);
            await _context.SaveChangesAsync();

            // Ödeme başarısız event'i yayımla
            await context.Publish(new PaymentFailed
            {
                OrderId = message.OrderId,
                FailureReason = result.ErrorMessage,
                AttemptCount = 1,
                FailedAt = DateTime.UtcNow
            });
        }
    }
}

Inventory Modülü — Stok Rezervasyonu

// MyApp.Inventory/Consumers/OrderCreatedConsumer.cs
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly InventoryDbContext _context;
    private readonly ILogger<OrderCreatedConsumer> _logger;

    public OrderCreatedConsumer(
        InventoryDbContext context,
        ILogger<OrderCreatedConsumer> logger)
    {
        _context = context;
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var message = context.Message;

        foreach (var item in message.Items)
        {
            var stock = await _context.Stocks
                .FirstOrDefaultAsync(s => s.ProductId == item.ProductId);

            if (stock is null || stock.Available < item.Quantity)
            {
                _logger.LogWarning(
                    "Yetersiz stok. ProductId: {ProductId}, İstenen: {Quantity}",
                    item.ProductId, item.Quantity);

                // Stok yetersizse sipariş iptal event'i fırlatılabilir
                await context.Publish(new OrderCancelled
                {
                    OrderId = message.OrderId,
                    CustomerId = message.CustomerId,
                    CancelReason = $"Ürün stokta yok: {item.ProductName}",
                    CancelledAt = DateTime.UtcNow
                });

                return;
            }

            stock.Reserve(item.Quantity);
        }

        await _context.SaveChangesAsync();

        _logger.LogInformation(
            "Stok rezervasyonu tamamlandı. OrderId: {OrderId}",
            message.OrderId);
    }
}

Notifications Modülü — Bildirim Gönder

// MyApp.Notifications/Consumers/OrderCreatedConsumer.cs
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly IEmailService _emailService;
    private readonly ISmsService _smsService;
    private readonly ILogger<OrderCreatedConsumer> _logger;

    public OrderCreatedConsumer(
        IEmailService emailService,
        ISmsService smsService,
        ILogger<OrderCreatedConsumer> logger)
    {
        _emailService = emailService;
        _smsService = smsService;
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var message = context.Message;

        // Email bildirimi
        await _emailService.SendAsync(new EmailMessage
        {
            To = message.CustomerEmail,
            Subject = $"Siparişiniz Alındı — #{message.OrderId:N}",
            Template = "order-confirmation",
            Data = new
            {
                OrderId = message.OrderId,
                TotalAmount = message.TotalAmount,
                Currency = message.Currency,
                Items = message.Items
            }
        });

        _logger.LogInformation(
            "Sipariş onay emaili gönderildi. OrderId: {OrderId}",
            message.OrderId);
    }
}

// PaymentFailed event'ini dinle
public class PaymentFailedConsumer : IConsumer<PaymentFailed>
{
    private readonly IEmailService _emailService;
    private readonly ICustomerRepository _customerRepo;

    public PaymentFailedConsumer(
        IEmailService emailService,
        ICustomerRepository customerRepo)
    {
        _emailService = emailService;
        _customerRepo = customerRepo;
    }

    public async Task Consume(ConsumeContext<PaymentFailed> context)
    {
        var message = context.Message;

        var customer = await _customerRepo.GetByOrderIdAsync(message.OrderId);
        if (customer is null) return;

        await _emailService.SendAsync(new EmailMessage
        {
            To = customer.Email,
            Subject = "Ödeme İşleminiz Başarısız Oldu",
            Template = "payment-failed",
            Data = new
            {
                OrderId = message.OrderId,
                Reason = message.FailureReason
            }
        });
    }
}

Hata Yönetimi

Dead Letter Queue

İşlenemeyen mesajlar Dead Letter Queue'ya (DLQ) düşer. Bunlar kaybolmaz, incelenebilir ve yeniden işlenebilir.

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(...);

        // Dead letter queue yapılandırması
        cfg.ReceiveEndpoint("orders-created", e =>
        {
            e.ConfigureConsumer<OrderCreatedConsumer>(context);

            // 5 retry sonrası DLQ'ya gönder
            e.UseMessageRetry(r => r.Immediate(5));

            e.BindDeadLetterQueue(
                "orders-created-dlq",
                "orders-dead-letter",
                dlq =>
                {
                    dlq.Durable = true;
                    dlq.AutoDelete = false;
                });
        });
    });
});

DLQ Mesajlarını İzleme ve Yeniden İşleme

// MyApp.API/Controllers/DeadLetterController.cs
[ApiController]
[Route("api/dead-letters")]
[Authorize(Roles = "Admin")]
public class DeadLetterController : ControllerBase
{
    private readonly IConnection _connection;
    private readonly ILogger<DeadLetterController> _logger;

    public DeadLetterController(
        IConnection connection,
        ILogger<DeadLetterController> logger)
    {
        _connection = connection;
        _logger = logger;
    }

    [HttpGet("{queueName}")]
    public IActionResult GetDeadLetters(string queueName, int count = 10)
    {
        using var channel = _connection.CreateModel();

        var messages = new List<object>();

        for (int i = 0; i < count; i++)
        {
            var result = channel.BasicGet($"{queueName}-dlq", autoAck: false);
            if (result is null) break;

            var body = Encoding.UTF8.GetString(result.Body.ToArray());
            messages.Add(new
            {
                DeliveryTag = result.DeliveryTag,
                RoutingKey = result.RoutingKey,
                Body = body,
                Headers = result.BasicProperties.Headers
            });

            // Ack gönderme — mesaj kuyrukta kalsın
            channel.BasicNack(result.DeliveryTag, false, true);
        }

        return Ok(messages);
    }

    [HttpPost("{queueName}/requeue")]
    public IActionResult RequeueDeadLetters(string queueName)
    {
        using var channel = _connection.CreateModel();

        int count = 0;
        BasicGetResult? result;

        while ((result = channel.BasicGet($"{queueName}-dlq", autoAck: false)) != null)
        {
            // Ana kuyruğa yeniden gönder
            channel.BasicPublish(
                exchange: "",
                routingKey: queueName,
                basicProperties: result.BasicProperties,
                body: result.Body);

            channel.BasicAck(result.DeliveryTag, false);
            count++;
        }

        _logger.LogInformation(
            "{Count} mesaj yeniden kuyruğa alındı. Queue: {Queue}",
            count, queueName);

        return Ok(new { RequeuedCount = count });
    }
}

Outbox Pattern — Veri Tutarlılığı

En kritik sorun: veritabanına kaydettik ama mesaj gönderemedik. Ya da mesaj gitti ama veritabanı işlemi başarısız oldu.

Outbox Pattern bu problemi çözer: mesaj önce veritabanına yazılır, sonra arka planda RabbitMQ'ya iletilir. İkisi aynı transaction içinde olduğu için tutarlılık garantilenir.

// MassTransit Outbox — tek satır kurulum
builder.Services.AddMassTransit(x =>
{
    // Entity Framework Outbox
    x.AddEntityFrameworkOutbox<AppDbContext>(o =>
    {
        o.UseSqlServer();
        o.UseBusOutbox(); // Background servis mesajları iletir

        // Her 1 saniyede bir pending mesajları kontrol et
        o.QueryDelay = TimeSpan.FromSeconds(1);
    });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(...);
        cfg.ConfigureEndpoints(context);
    });
});
// AppDbContext'e Outbox tabloları ekle
public class AppDbContext : DbContext
{
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // MassTransit Outbox tablo yapılandırması
        modelBuilder.AddInboxStateEntity();
        modelBuilder.AddOutboxMessageEntity();
        modelBuilder.AddOutboxStateEntity();

        base.OnModelCreating(modelBuilder);
    }
}
# Migration oluştur
dotnet ef migrations add AddMassTransitOutbox
dotnet ef database update

Artık handler'da publish işlemi otomatik olarak outbox'a yazılır:

public async Task<Guid> Handle(CreateOrderCommand command, CancellationToken ct)
{
    var order = Order.Create(command.CustomerId, command.DeliveryAddress);
    _context.Orders.Add(order);

    // Bu publish aynı transaction içinde outbox tablosuna yazılır
    // RabbitMQ'ya iletim background'da garantili şekilde yapılır
    await _publishEndpoint.Publish(new OrderCreated
    {
        OrderId = order.Id,
        // ...
    }, ct);

    // İkisi aynı transaction — ya ikisi de başarılı ya ikisi de başarısız
    await _context.SaveChangesAsync(ct);

    return order.Id;
}

Idempotency — Aynı Mesajı İki Kez İşlemek

Network sorunları, retry mekanizmaları veya yeniden kuyruk işlemleri aynı mesajın birden fazla kez teslim edilmesine yol açabilir. Consumer'larınız bunu güvenle yönetmeli.

// İdempotency için InboxState kullanımı
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly PaymentsDbContext _context;

    public OrderCreatedConsumer(PaymentsDbContext context)
    {
        _context = context;
    }

    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        // MassTransit MessageId ile idempotency kontrolü
        var messageId = context.MessageId ?? Guid.NewGuid();

        var alreadyProcessed = await _context.ProcessedMessages
            .AnyAsync(m => m.MessageId == messageId);

        if (alreadyProcessed)
        {
            // Aynı mesaj daha önce işlendi — güvenle atla
            return;
        }

        // İşle
        var payment = Payment.Create(
            context.Message.OrderId,
            context.Message.TotalAmount,
            context.Message.Currency);

        _context.Payments.Add(payment);

        // İşlenmiş mesaj kaydı — tekrar işlemeyi önler
        _context.ProcessedMessages.Add(new ProcessedMessage
        {
            MessageId = messageId,
            ProcessedAt = DateTime.UtcNow
        });

        await _context.SaveChangesAsync();
    }
}

Saga — Uzun Süren İş Akışları

Birden fazla servisin koordineli çalışması gereken iş akışlarında Saga kullanılır. Sipariş → Ödeme → Stok → Kargo akışı buna iyi bir örnektir.

// MyApp.Orders/Sagas/OrderSaga.cs
public class OrderSagaState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; } = string.Empty;
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; }
    public DateTime? PaymentCompletedAt { get; set; }
    public DateTime? StockReservedAt { get; set; }
}

public class OrderSaga : MassTransitStateMachine<OrderSagaState>
{
    public State Submitted { get; private set; } = null!;
    public State PaymentPending { get; private set; } = null!;
    public State PaymentFailed { get; private set; } = null!;
    public State StockReserving { get; private set; } = null!;
    public State Completed { get; private set; } = null!;
    public State Cancelled { get; private set; } = null!;

    public Event<OrderCreated> OrderCreatedEvent { get; private set; } = null!;
    public Event<PaymentCompleted> PaymentCompletedEvent { get; private set; } = null!;
    public Event<PaymentFailed> PaymentFailedEvent { get; private set; } = null!;

    public OrderSaga()
    {
        InstanceState(x => x.CurrentState);

        Event(() => OrderCreatedEvent,
            x => x.CorrelateById(m => m.Message.OrderId));

        Event(() => PaymentCompletedEvent,
            x => x.CorrelateById(m => m.Message.OrderId));

        Event(() => PaymentFailedEvent,
            x => x.CorrelateById(m => m.Message.OrderId));

        Initially(
            When(OrderCreatedEvent)
                .Then(ctx =>
                {
                    ctx.Saga.OrderId = ctx.Message.OrderId;
                    ctx.Saga.CustomerId = ctx.Message.CustomerId;
                    ctx.Saga.TotalAmount = ctx.Message.TotalAmount;
                    ctx.Saga.CreatedAt = DateTime.UtcNow;
                })
                .TransitionTo(PaymentPending));

        During(PaymentPending,
            When(PaymentCompletedEvent)
                .Then(ctx =>
                {
                    ctx.Saga.PaymentCompletedAt = DateTime.UtcNow;
                })
                .TransitionTo(StockReserving),

            When(PaymentFailedEvent)
                .TransitionTo(Cancelled));

        During(StockReserving,
            When(PaymentCompletedEvent)
                .TransitionTo(Completed));
    }
}
// Saga kaydı
builder.Services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderSaga, OrderSagaState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Optimistic;
            r.AddDbContext<DbContext, AppDbContext>();
        });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(...);
        cfg.ConfigureEndpoints(context);
    });
});

İzleme ve Gözlemlenebilirlik

// Prometheus metrikleri
builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        // OpenTelemetry desteği
        cfg.UseOpenTelemetry();

        cfg.Host(...);
        cfg.ConfigureEndpoints(context);
    });
});

// Serilog ile mesaj logları
public class LoggingConsumerObserver : IConsumeObserver
{
    private readonly ILogger<LoggingConsumerObserver> _logger;

    public LoggingConsumerObserver(ILogger<LoggingConsumerObserver> logger)
    {
        _logger = logger;
    }

    public Task PreConsume<T>(ConsumeContext<T> context) where T : class
    {
        _logger.LogInformation(
            "Mesaj işleniyor: {MessageType} | MessageId: {MessageId}",
            typeof(T).Name,
            context.MessageId);
        return Task.CompletedTask;
    }

    public Task PostConsume<T>(ConsumeContext<T> context) where T : class
    {
        _logger.LogInformation(
            "Mesaj tamamlandı: {MessageType} | MessageId: {MessageId}",
            typeof(T).Name,
            context.MessageId);
        return Task.CompletedTask;
    }

    public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
    {
        _logger.LogError(exception,
            "Mesaj işleme hatası: {MessageType} | MessageId: {MessageId}",
            typeof(T).Name,
            context.MessageId);
        return Task.CompletedTask;
    }
}

// Kaydı
builder.Services.AddSingleton<IConsumeObserver, LoggingConsumerObserver>();

Denetim Listesi

✅ RabbitMQ Docker ile ayağa kaldırıldı, Management UI erişilebilir
✅ Mesaj sözleşmeleri ayrı Shared projesinde tanımlandı
✅ MassTransit ile RabbitMQ entegrasyonu yapılandırıldı
✅ Retry politikası — exponential backoff ile tanımlandı
✅ Dead Letter Queue — işlenemeyen mesajlar için yapılandırıldı
✅ Outbox Pattern — veri tutarlılığı için implemente edildi
✅ Consumer'lar idempotent — aynı mesajı iki kez işlemiyor
✅ Saga — uzun süren iş akışları için tanımlandı
✅ Logging observer — tüm mesaj trafiği loglanıyor
✅ DLQ izleme ve yeniden işleme endpoint'i hazır
✅ Management UI üzerinden queue ve exchange'ler izleniyor

Temel Çıkarımlar

  • Event-driven architecture servisleri birbirinden gevşek bağlar — biri çöktüğünde diğerleri etkilenmez
  • RabbitMQ mesajları saklar — consumer hazır olmasa bile veri kaybolmaz
  • MassTransit retry, dead-letter ve outbox gibi production ihtiyaçlarını hazır sunar
  • Outbox Pattern olmadan "veritabanına kaydettim ama mesaj gitimedi" senaryosu kaçınılmazdır
  • Her consumer idempotent olmalıdır — aynı mesaj birden fazla gelebilir
  • Saga uzun süren iş akışlarında dağıtık state yönetimini çözer
  • Dead Letter Queue işlenemeyen mesajları çöpe atmaz, analiz ve recovery için saklar

Mesaj kuyruğu sisteminizi daha karmaşık yapmaz — daha dayanıklı yapar. Bir servisin çökmesi artık veri kaybı değil, geçici bir gecikme anlamına gelir.