Event-Driven Architecture com C#, .NET e Kafka: Guia Prático
Aprenda a construir sistemas escaláveis usando Event-Driven Architecture com C#, .NET e Apache Kafka. Guia completo com exemplos práticos, integração multi-linguagem e melhores práticas de produção.
Introdução
Sistemas modernos exigem escalabilidade, resiliência e desacoplamento. A arquitetura tradicional baseada em requisição-resposta (request-response) mostra suas limitações quando precisamos processar milhões de eventos, integrar dezenas de microservices ou garantir consistência eventual em sistemas distribuídos.
Event-Driven Architecture (EDA) surge como solução para esses desafios, transformando a forma como aplicações se comunicam. Ao invés de chamadas síncronas diretas, componentes publicam e consomem eventos assíncronos através de um broker de mensagens.
Neste guia completo, você aprenderá a construir sistemas EDA de produção usando C#, .NET e Apache Kafka, o backbone de mensageria mais usado no mundo. Vamos cobrir desde os fundamentos até padrões avançados, com código real e exemplos práticos.
⏱️ Tempo de leitura: ~20 minutos
O Que é Event-Driven Architecture?
Event-Driven Architecture (EDA) é um padrão arquitetural onde componentes de software reagem a eventos ao invés de executar chamadas diretas entre si. Um evento representa uma mudança de estado significativa no sistema.
Componentes Principais
1. Produtor (Producer) Componente que detecta mudanças de estado e publica eventos.
2. Broker de Mensagens Infraestrutura que recebe, armazena e distribui eventos (ex: Kafka, RabbitMQ).
3. Consumidor (Consumer) Componente que se inscreve em eventos e executa ações baseadas neles.
4. Evento (Event) Mensagem imutável que descreve algo que aconteceu no passado.
Exemplo de Evento
{
"eventId": "550e8400-e29b-41d4-a716-446655440000",
"eventType": "OrderCreated",
"timestamp": "2026-01-28T10:30:00Z",
"aggregateId": "order-12345",
"version": 1,
"payload": {
"orderId": "order-12345",
"customerId": "customer-789",
"totalAmount": 299.90,
"items": [
{
"productId": "product-456",
"quantity": 2,
"unitPrice": 149.95
}
]
}
}
Por Que Usar Event-Driven Architecture?
Vantagens
1. Desacoplamento Produtores não conhecem consumidores. Adicione novos consumidores sem modificar produtores.
2. Escalabilidade Processe eventos em paralelo, escale consumidores independentemente.
3. Resiliência Falhas em consumidores não afetam produtores. Eventos são persistidos para reprocessamento.
4. Auditoria Event log serve como fonte única da verdade (Event Sourcing).
5. Integração Multi-Sistema Conecte sistemas heterogêneos (C#, Java, Python, Node.js) através de eventos.
6. Processamento Assíncrono Operações lentas (envio de email, processamento de imagem) não bloqueiam o fluxo principal.
Quando Usar EDA?
✅ Use quando:
- Sistema distribuído com múltiplos microservices
- Necessidade de escalabilidade horizontal
- Processamento assíncrono de tarefas
- Integração entre sistemas heterogêneos
- Auditoria completa de operações (Event Sourcing)
- Real-time analytics e streaming de dados
❌ Evite quando:
- Sistema monolítico simples
- Requisitos de consistência imediata
- Operações síncronas críticas (ex: pagamentos em tempo real)
- Equipe sem experiência em sistemas distribuídos
Apache Kafka: O Backbone do EDA
Apache Kafka é uma plataforma distribuída de streaming de eventos, desenvolvida originalmente pelo LinkedIn e open-source desde 2011.
Por Que Kafka?
| Feature | Kafka | RabbitMQ | AWS SQS |
|---|---|---|---|
| Throughput | Milhões/seg | Milhares/seg | Milhares/seg |
| Persistência | Durável (log) | Opcional | Limitada |
| Retenção | Configurável (dias/TB) | Até consumo | 14 dias max |
| Ordem | Garantida por partição | Por queue | FIFO limitado |
| Replay | Sim | Não | Não |
| Escalabilidade | Horizontal | Vertical/Horizontal | Gerenciado |
Conceitos do Kafka
Topic: Canal lógico onde eventos são publicados (ex: orders, payments, notifications).
Partition: Divisão física de um tópico para paralelização. Cada partição é ordenada.
Offset: ID sequencial único de cada mensagem dentro de uma partição.
Consumer Group: Grupo de consumidores que divide o trabalho de processar mensagens.
Replication Factor: Número de réplicas de cada partição para resiliência.
Configurando o Ambiente
Passo 1: Kafka com Docker
Crie um arquivo docker-compose.yml:
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9094:9094"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
Inicie o ambiente:
docker-compose up -d
Acesse o Kafka UI em http://localhost:8080.
Passo 2: Criar Projeto .NET
# Criar solution
dotnet new sln -n EventDrivenDemo
# Criar projeto Producer
dotnet new webapi -n Producer.Api
dotnet sln add Producer.Api/Producer.Api.csproj
# Criar projeto Consumer
dotnet new worker -n Consumer.Service
dotnet sln add Consumer.Service/Consumer.Service.csproj
# Criar biblioteca compartilhada
dotnet new classlib -n Shared.Events
dotnet sln add Shared.Events/Shared.Events.csproj
Passo 3: Instalar Dependências
# No Producer.Api
cd Producer.Api
dotnet add package Confluent.Kafka
dotnet add package Newtonsoft.Json
# No Consumer.Service
cd ../Consumer.Service
dotnet add package Confluent.Kafka
dotnet add package Newtonsoft.Json
dotnet add package Microsoft.Extensions.Hosting
# No Shared.Events (referenciado por ambos)
cd ../Shared.Events
Implementação: Eventos Compartilhados
Defina os eventos em Shared.Events para serem reutilizados por produtores e consumidores.
Shared.Events/BaseEvent.cs
using System;
namespace Shared.Events
{
public abstract class BaseEvent
{
public Guid EventId { get; set; } = Guid.NewGuid();
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
public string EventType { get; set; }
public int Version { get; set; } = 1;
protected BaseEvent()
{
EventType = GetType().Name;
}
}
}
Shared.Events/OrderCreatedEvent.cs
using System.Collections.Generic;
namespace Shared.Events
{
public class OrderCreatedEvent : BaseEvent
{
public string OrderId { get; set; }
public string CustomerId { get; set; }
public decimal TotalAmount { get; set; }
public List<OrderItem> Items { get; set; }
public string Status { get; set; } = "Pending";
}
public class OrderItem
{
public string ProductId { get; set; }
public string ProductName { get; set; }
public int Quantity { get; set; }
public decimal UnitPrice { get; set; }
}
}
Shared.Events/PaymentProcessedEvent.cs
namespace Shared.Events
{
public class PaymentProcessedEvent : BaseEvent
{
public string PaymentId { get; set; }
public string OrderId { get; set; }
public decimal Amount { get; set; }
public string PaymentMethod { get; set; }
public bool Success { get; set; }
public string TransactionId { get; set; }
}
}
Implementação: Producer (C#/.NET)
Producer.Api/Services/KafkaProducerService.cs
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Shared.Events;
namespace Producer.Api.Services
{
public interface IKafkaProducerService
{
Task<bool> PublishEventAsync<T>(string topic, T eventData, string key = null) where T : BaseEvent;
}
public class KafkaProducerService : IKafkaProducerService, IDisposable
{
private readonly IProducer<string, string> _producer;
private readonly ILogger<KafkaProducerService> _logger;
public KafkaProducerService(
IConfiguration configuration,
ILogger<KafkaProducerService> logger)
{
_logger = logger;
var config = new ProducerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
ClientId = "producer-api",
Acks = Acks.All, // Garante que todas as réplicas receberam
EnableIdempotence = true, // Evita duplicação
MaxInFlight = 5,
MessageSendMaxRetries = 10,
RetryBackoffMs = 100,
CompressionType = CompressionType.Snappy // Compressão
};
_producer = new ProducerBuilder<string, string>(config)
.SetErrorHandler((_, error) =>
{
_logger.LogError($"Erro no producer: {error.Reason}");
})
.Build();
_logger.LogInformation("Kafka Producer inicializado");
}
public async Task<bool> PublishEventAsync<T>(string topic, T eventData, string key = null)
where T : BaseEvent
{
try
{
var eventKey = key ?? eventData.EventId.ToString();
var eventValue = JsonConvert.SerializeObject(eventData, new JsonSerializerSettings
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore,
NullValueHandling = NullValueHandling.Ignore
});
var message = new Message<string, string>
{
Key = eventKey,
Value = eventValue,
Headers = new Headers
{
{ "event-type", System.Text.Encoding.UTF8.GetBytes(eventData.EventType) },
{ "timestamp", System.Text.Encoding.UTF8.GetBytes(eventData.Timestamp.ToString("o")) }
}
};
var result = await _producer.ProduceAsync(topic, message);
_logger.LogInformation(
$"Evento publicado: {eventData.EventType} | Topic: {topic} | Partition: {result.Partition} | Offset: {result.Offset}"
);
return result.Status == PersistenceStatus.Persisted;
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex, $"Erro ao publicar evento: {ex.Error.Reason}");
return false;
}
catch (Exception ex)
{
_logger.LogError(ex, "Erro inesperado ao publicar evento");
return false;
}
}
public void Dispose()
{
_producer?.Flush(TimeSpan.FromSeconds(10));
_producer?.Dispose();
}
}
}
Producer.Api/Controllers/OrdersController.cs
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Producer.Api.Services;
using Shared.Events;
namespace Producer.Api.Controllers
{
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IKafkaProducerService _producerService;
public OrdersController(IKafkaProducerService producerService)
{
_producerService = producerService;
}
[HttpPost]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
{
var orderId = $"order-{Guid.NewGuid().ToString().Substring(0, 8)}";
var orderEvent = new OrderCreatedEvent
{
OrderId = orderId,
CustomerId = request.CustomerId,
TotalAmount = request.TotalAmount,
Items = request.Items,
Status = "Pending"
};
var published = await _producerService.PublishEventAsync("orders", orderEvent, orderId);
if (published)
{
return Accepted(new { OrderId = orderId, Message = "Order created successfully" });
}
return StatusCode(500, new { Message = "Failed to publish order event" });
}
}
public class CreateOrderRequest
{
public string CustomerId { get; set; }
public decimal TotalAmount { get; set; }
public List<OrderItem> Items { get; set; }
}
}
Producer.Api/Program.cs
using Producer.Api.Services;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
// Registrar Kafka Producer como Singleton
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
Producer.Api/appsettings.json
{
"Kafka": {
"BootstrapServers": "localhost:9092"
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
Implementação: Consumer (C#/.NET)
Consumer.Service/Workers/OrderConsumerWorker.cs
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Shared.Events;
namespace Consumer.Service.Workers
{
public class OrderConsumerWorker : BackgroundService
{
private readonly ILogger<OrderConsumerWorker> _logger;
private readonly IConsumer<string, string> _consumer;
private readonly string _topic = "orders";
public OrderConsumerWorker(
IConfiguration configuration,
ILogger<OrderConsumerWorker> logger)
{
_logger = logger;
var config = new ConsumerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
GroupId = "order-processor-group",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // Commit manual para controle
EnableAutoOffsetStore = false,
MaxPollIntervalMs = 300000, // 5 minutos
SessionTimeoutMs = 45000,
HeartbeatIntervalMs = 3000
};
_consumer = new ConsumerBuilder<string, string>(config)
.SetErrorHandler((_, error) =>
{
_logger.LogError($"Erro no consumer: {error.Reason}");
})
.SetPartitionsAssignedHandler((c, partitions) =>
{
_logger.LogInformation($"Partições atribuídas: {string.Join(", ", partitions)}");
})
.Build();
_consumer.Subscribe(_topic);
_logger.LogInformation($"Consumer inscrito no tópico: {_topic}");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Order Consumer Worker iniciado");
try
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var consumeResult = _consumer.Consume(TimeSpan.FromSeconds(1));
if (consumeResult != null)
{
await ProcessMessageAsync(consumeResult, stoppingToken);
// Commit manual após processamento bem-sucedido
_consumer.Commit(consumeResult);
_consumer.StoreOffset(consumeResult);
}
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"Erro ao consumir mensagem: {ex.Error.Reason}");
}
catch (Exception ex)
{
_logger.LogError(ex, "Erro inesperado no processamento");
// Não commitamos em caso de erro - mensagem será reprocessada
}
}
}
finally
{
_consumer.Close();
}
}
private async Task ProcessMessageAsync(ConsumeResult<string, string> result, CancellationToken cancellationToken)
{
_logger.LogInformation(
$"Mensagem recebida | Partition: {result.Partition} | Offset: {result.Offset} | Key: {result.Message.Key}"
);
try
{
var orderEvent = JsonConvert.DeserializeObject<OrderCreatedEvent>(result.Message.Value);
// Simulação de processamento
_logger.LogInformation($"Processando pedido: {orderEvent.OrderId}");
_logger.LogInformation($"Cliente: {orderEvent.CustomerId} | Total: R$ {orderEvent.TotalAmount:F2}");
_logger.LogInformation($"Itens: {orderEvent.Items.Count}");
// Aqui você implementaria a lógica de negócio:
// - Validar estoque
// - Reservar produtos
// - Calcular frete
// - Criar registro no banco de dados
// - Publicar novos eventos (OrderValidated, InventoryReserved, etc.)
await Task.Delay(100, cancellationToken); // Simula processamento
_logger.LogInformation($"Pedido {orderEvent.OrderId} processado com sucesso");
}
catch (JsonException ex)
{
_logger.LogError(ex, "Erro ao deserializar evento");
throw; // Rejeitamos a mensagem - será reprocessada
}
}
public override void Dispose()
{
_consumer?.Close();
_consumer?.Dispose();
base.Dispose();
}
}
}
Consumer.Service/Program.cs
using Consumer.Service.Workers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
var builder = Host.CreateApplicationBuilder(args);
// Registrar Workers
builder.Services.AddHostedService<OrderConsumerWorker>();
var host = builder.Build();
host.Run();
Consumer.Service/appsettings.json
{
"Kafka": {
"BootstrapServers": "localhost:9092"
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}
Testando o Sistema
Passo 1: Iniciar os Serviços
# Terminal 1: Producer API
cd Producer.Api
dotnet run
# Terminal 2: Consumer Service
cd Consumer.Service
dotnet run
Passo 2: Criar um Pedido
curl -X POST http://localhost:5000/api/orders \
-H "Content-Type: application/json" \
-d '{
"customerId": "customer-123",
"totalAmount": 599.90,
"items": [
{
"productId": "product-001",
"productName": "Notebook",
"quantity": 1,
"unitPrice": 599.90
}
]
}'
Passo 3: Verificar Logs
Producer.Api:
info: Producer.Api.Services.KafkaProducerService[0]
Evento publicado: OrderCreatedEvent | Topic: orders | Partition: 0 | Offset: 15
Consumer.Service:
info: Consumer.Service.Workers.OrderConsumerWorker[0]
Mensagem recebida | Partition: 0 | Offset: 15 | Key: order-a3f2b8c1
info: Consumer.Service.Workers.OrderConsumerWorker[0]
Processando pedido: order-a3f2b8c1
info: Consumer.Service.Workers.OrderConsumerWorker[0]
Cliente: customer-123 | Total: R$ 599.90
info: Consumer.Service.Workers.OrderConsumerWorker[0]
Pedido order-a3f2b8c1 processado com sucesso
Integração Multi-Linguagem
Uma das grandes vantagens do Kafka é a capacidade de integrar sistemas escritos em diferentes linguagens.
Consumer em Python (FastAPI)
# requirements.txt
confluent-kafka==2.3.0
fastapi==0.109.0
uvicorn==0.27.0
# consumer.py
import json
import logging
from confluent_kafka import Consumer, KafkaError
from typing import Dict
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class OrderConsumerPython:
def __init__(self):
self.config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'python-analytics-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
}
self.consumer = Consumer(self.config)
self.consumer.subscribe(['orders'])
logger.info("Python consumer iniciado")
def process_order(self, order_data: Dict):
"""Processamento de analytics do pedido"""
logger.info(f"Analytics: Pedido {order_data['orderId']}")
logger.info(f"Valor total: R$ {order_data['totalAmount']}")
# Aqui você implementaria:
# - Salvar em data warehouse (BigQuery, Snowflake)
# - Atualizar métricas em tempo real (Redis)
# - Enviar para sistema de BI
# - Treinar modelos de ML
def start_consuming(self):
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
logger.info(f'Fim da partição {msg.partition()}')
else:
logger.error(f'Erro: {msg.error()}')
continue
# Processar mensagem
order_data = json.loads(msg.value().decode('utf-8'))
self.process_order(order_data)
# Commit manual
self.consumer.commit(msg)
except KeyboardInterrupt:
logger.info("Consumer interrompido")
finally:
self.consumer.close()
if __name__ == "__main__":
consumer = OrderConsumerPython()
consumer.start_consuming()
Consumer em Node.js (TypeScript)
// package.json
{
"dependencies": {
"kafkajs": "^2.2.4",
"typescript": "^5.3.3",
"@types/node": "^20.11.5"
}
}
// consumer.ts
import { Kafka, EachMessagePayload } from 'kafkajs';
interface OrderEvent {
orderId: string;
customerId: string;
totalAmount: number;
items: Array<{
productId: string;
quantity: number;
unitPrice: number;
}>;
}
class NotificationConsumer {
private kafka: Kafka;
private consumer;
constructor() {
this.kafka = new Kafka({
clientId: 'notification-service',
brokers: ['localhost:9092']
});
this.consumer = this.kafka.consumer({
groupId: 'notification-group',
sessionTimeout: 30000,
heartbeatInterval: 3000
});
}
async start() {
await this.consumer.connect();
await this.consumer.subscribe({
topic: 'orders',
fromBeginning: false
});
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
}
});
console.log('Node.js consumer iniciado');
}
private async handleMessage({ topic, partition, message }: EachMessagePayload) {
const orderEvent: OrderEvent = JSON.parse(message.value!.toString());
console.log(`Notificação: Novo pedido ${orderEvent.orderId}`);
// Implementação de notificações:
// - Enviar email de confirmação
// - Push notification mobile
// - SMS de confirmação
// - Webhook para sistemas externos
await this.sendEmailNotification(orderEvent);
await this.sendPushNotification(orderEvent);
}
private async sendEmailNotification(order: OrderEvent) {
console.log(`Email enviado para pedido ${order.orderId}`);
// Integração com SendGrid, AWS SES, etc.
}
private async sendPushNotification(order: OrderEvent) {
console.log(`Push notification enviado para pedido ${order.orderId}`);
// Integração com Firebase Cloud Messaging, OneSignal, etc.
}
}
const consumer = new NotificationConsumer();
consumer.start().catch(console.error);
Consumer em Java (Spring Boot)
// pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
// application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: inventory-service-group
auto-offset-reset: earliest
enable-auto-commit: false
// OrderEvent.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
private String orderId;
private String customerId;
private BigDecimal totalAmount;
private List<OrderItem> items;
@Data
public static class OrderItem {
private String productId;
private Integer quantity;
private BigDecimal unitPrice;
}
}
// InventoryConsumer.java
@Service
@Slf4j
public class InventoryConsumer {
@Autowired
private InventoryService inventoryService;
@KafkaListener(
topics = "orders",
groupId = "inventory-service-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void consume(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment acknowledgment
) {
try {
log.info("Mensagem recebida | Partition: {} | Offset: {}", partition, offset);
ObjectMapper mapper = new ObjectMapper();
OrderEvent orderEvent = mapper.readValue(message, OrderEvent.class);
// Processar reserva de estoque
boolean reserved = inventoryService.reserveInventory(orderEvent);
if (reserved) {
log.info("Estoque reservado para pedido: {}", orderEvent.getOrderId());
acknowledgment.acknowledge(); // Commit manual
} else {
log.warn("Estoque insuficiente para pedido: {}", orderEvent.getOrderId());
// Publicar evento de falha
// NÃO commitamos - mensagem será reprocessada
}
} catch (Exception e) {
log.error("Erro ao processar mensagem", e);
// Não commitar - dead letter queue
}
}
}
Padrões Avançados
1. Event Sourcing
Persistir todos os eventos como fonte da verdade ao invés de apenas o estado atual.
public class EventStore
{
private readonly IKafkaProducerService _producer;
private readonly string _eventStoreTopic = "event-store";
public async Task<bool> AppendEventAsync<T>(T @event) where T : BaseEvent
{
// Cada agregado tem sua própria partição (garantia de ordem)
var partitionKey = GetAggregateId(@event);
return await _producer.PublishEventAsync(_eventStoreTopic, @event, partitionKey);
}
public async Task<List<BaseEvent>> GetEventsAsync(string aggregateId)
{
// Ler todos os eventos de uma partição específica
// Reconstruir o estado a partir dos eventos
var events = new List<BaseEvent>();
// Implementação de leitura do Kafka
// usando Consumer com seek para offset específico
return events;
}
private string GetAggregateId(BaseEvent @event)
{
return @event switch
{
OrderCreatedEvent order => order.OrderId,
PaymentProcessedEvent payment => payment.OrderId,
_ => @event.EventId.ToString()
};
}
}
2. CQRS (Command Query Responsibility Segregation)
Separar comandos (write) de queries (read).
// Write Model (Command)
public class CreateOrderCommand
{
public string CustomerId { get; set; }
public List<OrderItem> Items { get; set; }
}
public class OrderCommandHandler
{
private readonly IKafkaProducerService _producer;
public async Task<string> HandleAsync(CreateOrderCommand command)
{
var orderId = Guid.NewGuid().ToString();
var @event = new OrderCreatedEvent
{
OrderId = orderId,
CustomerId = command.CustomerId,
Items = command.Items,
TotalAmount = command.Items.Sum(i => i.Quantity * i.UnitPrice)
};
await _producer.PublishEventAsync("orders", @event);
return orderId;
}
}
// Read Model (Query)
public class OrderQueryService
{
private readonly IMongoDatabase _readDatabase;
public async Task<OrderReadModel> GetOrderAsync(string orderId)
{
var collection = _readDatabase.GetCollection<OrderReadModel>("orders");
return await collection.Find(o => o.OrderId == orderId).FirstOrDefaultAsync();
}
public async Task<List<OrderReadModel>> GetCustomerOrdersAsync(string customerId)
{
var collection = _readDatabase.GetCollection<OrderReadModel>("orders");
return await collection.Find(o => o.CustomerId == customerId)
.SortByDescending(o => o.CreatedAt)
.Limit(50)
.ToListAsync();
}
}
// Consumer atualiza o Read Model
public class OrderReadModelUpdater
{
private readonly IMongoDatabase _readDatabase;
public async Task UpdateAsync(OrderCreatedEvent @event)
{
var collection = _readDatabase.GetCollection<OrderReadModel>("orders");
var readModel = new OrderReadModel
{
OrderId = @event.OrderId,
CustomerId = @event.CustomerId,
TotalAmount = @event.TotalAmount,
Status = @event.Status,
CreatedAt = @event.Timestamp
};
await collection.InsertOneAsync(readModel);
}
}
3. Saga Pattern (Transações Distribuídas)
Coordenar transações entre múltiplos microservices.
// Orquestração de Saga
public class OrderSagaOrchestrator
{
private readonly IKafkaProducerService _producer;
private readonly ISagaStateStore _stateStore;
public async Task StartSagaAsync(OrderCreatedEvent orderEvent)
{
var sagaId = Guid.NewGuid().ToString();
// 1. Salvar estado inicial
await _stateStore.SaveStateAsync(sagaId, new SagaState
{
SagaId = sagaId,
OrderId = orderEvent.OrderId,
CurrentStep = SagaStep.ReserveInventory,
Status = SagaStatus.InProgress
});
// 2. Iniciar primeiro passo: reservar estoque
await _producer.PublishEventAsync("inventory-commands", new ReserveInventoryCommand
{
OrderId = orderEvent.OrderId,
Items = orderEvent.Items,
SagaId = sagaId
});
}
// Consumer de eventos de resposta
public async Task HandleInventoryReservedAsync(InventoryReservedEvent @event)
{
var state = await _stateStore.GetStateAsync(@event.SagaId);
if (@event.Success)
{
// 3. Próximo passo: processar pagamento
state.CurrentStep = SagaStep.ProcessPayment;
await _stateStore.UpdateStateAsync(state);
await _producer.PublishEventAsync("payment-commands", new ProcessPaymentCommand
{
OrderId = @event.OrderId,
Amount = state.TotalAmount,
SagaId = @event.SagaId
});
}
else
{
// Compensação: cancelar pedido
await CompensateAsync(state);
}
}
public async Task HandlePaymentProcessedAsync(PaymentProcessedEvent @event)
{
var state = await _stateStore.GetStateAsync(@event.SagaId);
if (@event.Success)
{
// 4. Finalizar: confirmar pedido
state.Status = SagaStatus.Completed;
await _stateStore.UpdateStateAsync(state);
await _producer.PublishEventAsync("orders", new OrderConfirmedEvent
{
OrderId = @event.OrderId,
SagaId = @event.SagaId
});
}
else
{
// Compensação: liberar estoque e cancelar pedido
await CompensateAsync(state);
}
}
private async Task CompensateAsync(SagaState state)
{
// Ações compensatórias na ordem inversa
if (state.CurrentStep >= SagaStep.ReserveInventory)
{
await _producer.PublishEventAsync("inventory-commands", new ReleaseInventoryCommand
{
OrderId = state.OrderId,
SagaId = state.SagaId
});
}
state.Status = SagaStatus.Failed;
await _stateStore.UpdateStateAsync(state);
}
}
public enum SagaStep
{
ReserveInventory = 1,
ProcessPayment = 2,
ShipOrder = 3,
Completed = 4
}
public enum SagaStatus
{
InProgress,
Completed,
Failed
}
4. Dead Letter Queue (DLQ)
Tratar mensagens que falharam no processamento.
public class ResilientConsumerWorker : BackgroundService
{
private const int MaxRetries = 3;
private readonly string _dlqTopic = "orders-dlq";
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var result = _consumer.Consume(TimeSpan.FromSeconds(1));
if (result != null)
{
var retryCount = GetRetryCount(result.Message.Headers);
try
{
await ProcessMessageAsync(result.Message.Value);
_consumer.Commit(result);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Erro no processamento (tentativa {retryCount + 1})");
if (retryCount >= MaxRetries)
{
// Enviar para DLQ após máximo de tentativas
await SendToDLQAsync(result.Message, ex.Message);
_consumer.Commit(result); // Commit para não reprocessar
}
else
{
// Incrementar contador e reprocessar
await RetryWithBackoffAsync(result.Message, retryCount);
}
}
}
}
}
private int GetRetryCount(Headers headers)
{
var retryHeader = headers.FirstOrDefault(h => h.Key == "retry-count");
if (retryHeader != null)
{
return int.Parse(Encoding.UTF8.GetString(retryHeader.GetValueBytes()));
}
return 0;
}
private async Task RetryWithBackoffAsync(Message<string, string> message, int retryCount)
{
// Exponential backoff: 1s, 2s, 4s, 8s...
var delayMs = (int)Math.Pow(2, retryCount) * 1000;
await Task.Delay(delayMs);
// Republicar com contador incrementado
var newHeaders = new Headers(message.Headers);
newHeaders.Add("retry-count", Encoding.UTF8.GetBytes((retryCount + 1).ToString()));
await _producer.ProduceAsync("orders-retry", new Message<string, string>
{
Key = message.Key,
Value = message.Value,
Headers = newHeaders
});
}
private async Task SendToDLQAsync(Message<string, string> message, string errorMessage)
{
_logger.LogWarning($"Enviando mensagem para DLQ: {message.Key}");
var dlqHeaders = new Headers(message.Headers);
dlqHeaders.Add("error-message", Encoding.UTF8.GetBytes(errorMessage));
dlqHeaders.Add("dlq-timestamp", Encoding.UTF8.GetBytes(DateTime.UtcNow.ToString("o")));
await _producer.ProduceAsync(_dlqTopic, new Message<string, string>
{
Key = message.Key,
Value = message.Value,
Headers = dlqHeaders
});
}
}
Monitoramento e Observabilidade
Métricas com Prometheus
// Install: dotnet add package prometheus-net.AspNetCore
// Program.cs
using Prometheus;
var builder = WebApplication.CreateBuilder(args);
// ... outros serviços
var app = builder.Build();
// Endpoint de métricas
app.UseMetricServer(); // /metrics
app.UseHttpMetrics();
// Métricas customizadas
public class KafkaMetrics
{
private static readonly Counter MessagesProduced = Metrics
.CreateCounter("kafka_messages_produced_total", "Total de mensagens produzidas",
new CounterConfiguration
{
LabelNames = new[] { "topic", "status" }
});
private static readonly Histogram MessageProcessingDuration = Metrics
.CreateHistogram("kafka_message_processing_duration_seconds", "Duração do processamento",
new HistogramConfiguration
{
LabelNames = new[] { "topic", "consumer_group" }
});
public static void RecordMessageProduced(string topic, bool success)
{
MessagesProduced.WithLabels(topic, success ? "success" : "failure").Inc();
}
public static IDisposable MeasureProcessingTime(string topic, string consumerGroup)
{
return MessageProcessingDuration.WithLabels(topic, consumerGroup).NewTimer();
}
}
// Uso no Producer
public async Task<bool> PublishEventAsync<T>(string topic, T eventData) where T : BaseEvent
{
var success = false;
try
{
var result = await _producer.ProduceAsync(topic, message);
success = result.Status == PersistenceStatus.Persisted;
return success;
}
finally
{
KafkaMetrics.RecordMessageProduced(topic, success);
}
}
// Uso no Consumer
private async Task ProcessMessageAsync(ConsumeResult<string, string> result)
{
using (KafkaMetrics.MeasureProcessingTime(result.Topic, "order-processor-group"))
{
// Processar mensagem
await DoWorkAsync(result);
}
}
Distributed Tracing com OpenTelemetry
// Install: dotnet add package OpenTelemetry.Extensions.Hosting
// Install: dotnet add package OpenTelemetry.Instrumentation.AspNetCore
// Program.cs
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
builder.Services.AddOpenTelemetry()
.WithTracing(tracerProviderBuilder =>
{
tracerProviderBuilder
.AddSource("Producer.Api")
.SetResourceBuilder(ResourceBuilder.CreateDefault()
.AddService("producer-api"))
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddJaegerExporter(options =>
{
options.AgentHost = "localhost";
options.AgentPort = 6831;
});
});
// Producer com tracing
public class KafkaProducerService
{
private static readonly ActivitySource ActivitySource = new("Producer.Api");
public async Task<bool> PublishEventAsync<T>(string topic, T eventData) where T : BaseEvent
{
using var activity = ActivitySource.StartActivity("PublishEvent", ActivityKind.Producer);
activity?.SetTag("messaging.system", "kafka");
activity?.SetTag("messaging.destination", topic);
activity?.SetTag("event.type", eventData.EventType);
// Injetar trace context nos headers
var headers = new Headers();
Propagators.DefaultTextMapPropagator.Inject(
new PropagationContext(activity?.Context ?? default, Baggage.Current),
headers,
(h, key, value) => h.Add(key, Encoding.UTF8.GetBytes(value))
);
var message = new Message<string, string>
{
Key = eventData.EventId.ToString(),
Value = JsonConvert.SerializeObject(eventData),
Headers = headers
};
var result = await _producer.ProduceAsync(topic, message);
activity?.SetTag("messaging.kafka.partition", result.Partition.Value);
activity?.SetTag("messaging.kafka.offset", result.Offset.Value);
return result.Status == PersistenceStatus.Persisted;
}
}
Deploy em Produção
Kubernetes com Helm
values.yaml (Producer)
replicaCount: 3
image:
repository: myregistry.azurecr.io/producer-api
tag: "1.0.0"
pullPolicy: IfNotPresent
service:
type: ClusterIP
port: 80
ingress:
enabled: true
className: nginx
hosts:
- host: api.example.com
paths:
- path: /
pathType: Prefix
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 250m
memory: 256Mi
autoscaling:
enabled: true
minReplicas: 3
maxReplicas: 10
targetCPUUtilizationPercentage: 70
env:
- name: ASPNETCORE_ENVIRONMENT
value: "Production"
- name: Kafka__BootstrapServers
value: "kafka-cluster.kafka.svc.cluster.local:9092"
livenessProbe:
httpGet:
path: /health
port: 80
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health/ready
port: 80
initialDelaySeconds: 10
periodSeconds: 5
deployment.yaml (Consumer)
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-consumer
spec:
replicas: 5
selector:
matchLabels:
app: order-consumer
template:
metadata:
labels:
app: order-consumer
spec:
containers:
- name: consumer
image: myregistry.azurecr.io/consumer-service:1.0.0
resources:
limits:
cpu: 1000m
memory: 1Gi
requests:
cpu: 500m
memory: 512Mi
env:
- name: Kafka__BootstrapServers
value: "kafka-cluster.kafka.svc.cluster.local:9092"
- name: Kafka__GroupId
value: "order-processor-group"
livenessProbe:
exec:
command:
- /bin/sh
- -c
- ps aux | grep dotnet
initialDelaySeconds: 30
periodSeconds: 10
Kafka em Produção (Confluent Cloud)
// appsettings.Production.json
{
"Kafka": {
"BootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
"SecurityProtocol": "SASL_SSL",
"SaslMechanism": "PLAIN",
"SaslUsername": "{{ KAFKA_API_KEY }}",
"SaslPassword": "{{ KAFKA_API_SECRET }}",
"SslCaLocation": "/etc/ssl/certs/ca-certificates.crt"
}
}
// Producer com autenticação
var config = new ProducerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = configuration["Kafka:SaslUsername"],
SaslPassword = configuration["Kafka:SaslPassword"],
SslCaLocation = configuration["Kafka:SslCaLocation"],
// Configurações de produção
Acks = Acks.All,
EnableIdempotence = true,
MaxInFlight = 5,
MessageSendMaxRetries = int.MaxValue,
RequestTimeoutMs = 30000,
CompressionType = CompressionType.Snappy
};
Melhores Práticas
1. Design de Eventos
✅ Boas práticas:
- Eventos imutáveis (passado: "OrderCreated", não "CreateOrder")
- Payload completo (evitar joins posteriores)
- Schema versionado (compatibilidade retroativa)
- Metadata rico (eventId, timestamp, version, correlationId)
❌ Evitar:
- Eventos muito grandes (> 1MB)
- Informações sensíveis não criptografadas
- Acoplamento temporal
2. Particionamento
// Estratégia: Particionar por aggregate ID
public async Task<bool> PublishEventAsync<T>(string topic, T eventData) where T : BaseEvent
{
var partitionKey = GetPartitionKey(eventData);
// Todos os eventos do mesmo agregado vão para a mesma partição
// Garantia de ordem
return await _producer.PublishEventAsync(topic, eventData, partitionKey);
}
private string GetPartitionKey<T>(T eventData) where T : BaseEvent
{
return eventData switch
{
OrderCreatedEvent order => order.OrderId,
PaymentProcessedEvent payment => payment.OrderId,
_ => eventData.EventId.ToString()
};
}
3. Idempotência
public class IdempotentConsumer
{
private readonly IDistributedCache _cache;
private async Task<bool> IsProcessedAsync(string eventId)
{
var key = $"processed:{eventId}";
var value = await _cache.GetStringAsync(key);
return value != null;
}
private async Task MarkAsProcessedAsync(string eventId)
{
var key = $"processed:{eventId}";
var options = new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(7)
};
await _cache.SetStringAsync(key, "1", options);
}
protected async Task ProcessMessageAsync(ConsumeResult<string, string> result)
{
var @event = JsonConvert.DeserializeObject<BaseEvent>(result.Message.Value);
if (await IsProcessedAsync(@event.EventId.ToString()))
{
_logger.LogInformation($"Evento {event.EventId} já foi processado (idempotência)");
return; // Já processado, ignorar
}
try
{
await DoWorkAsync(@event);
await MarkAsProcessedAsync(@event.EventId.ToString());
}
catch (Exception ex)
{
_logger.LogError(ex, "Erro no processamento");
throw;
}
}
}
4. Schema Evolution
// V1 do evento
public class OrderCreatedEventV1 : BaseEvent
{
public string OrderId { get; set; }
public decimal TotalAmount { get; set; }
}
// V2 do evento (adiciona campo opcional - compatível com V1)
public class OrderCreatedEventV2 : BaseEvent
{
public string OrderId { get; set; }
public decimal TotalAmount { get; set; }
public string Currency { get; set; } = "BRL"; // Novo campo com default
}
// Consumer que aceita ambas as versões
public async Task ProcessMessageAsync(ConsumeResult<string, string> result)
{
var headers = result.Message.Headers;
var version = GetEventVersion(headers);
BaseEvent @event = version switch
{
1 => JsonConvert.DeserializeObject<OrderCreatedEventV1>(result.Message.Value),
2 => JsonConvert.DeserializeObject<OrderCreatedEventV2>(result.Message.Value),
_ => throw new NotSupportedException($"Versão {version} não suportada")
};
await ProcessOrderAsync(@event);
}
5. Configuração de Tópicos
# Criar tópico com configurações otimizadas
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic orders \
--partitions 10 \
--replication-factor 3 \
--config retention.ms=604800000 \ # 7 dias
--config segment.ms=86400000 \ # 1 dia
--config compression.type=snappy \
--config max.message.bytes=1048576 # 1MB
Troubleshooting
Problema 1: Lag de Consumo
Sintoma: Consumidores não conseguem acompanhar produção de eventos.
Diagnóstico:
# Verificar lag por consumer group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-processor-group
# Output:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# orders 0 1000 5000 4000 <- PROBLEMA!
Soluções:
- Escalar consumidores (adicionar réplicas até número de partições)
- Otimizar processamento (batch, cache, async)
- Aumentar partições (rebalanceamento automático)
// Otimização: Batch processing
public async Task ProcessBatchAsync(List<ConsumeResult<string, string>> batch)
{
var tasks = batch.Select(async result =>
{
await ProcessMessageAsync(result.Message.Value);
});
await Task.WhenAll(tasks);
// Commit em lote
_consumer.Commit(batch.Last());
}
Problema 2: Rebalancing Frequente
Sintoma: Logs mostram "Revoking partitions" constantemente.
Causa: Session timeout muito curto ou processamento muito lento.
Solução:
var config = new ConsumerConfig
{
// Aumentar timeouts
SessionTimeoutMs = 60000, // 60 segundos (padrão: 10s)
MaxPollIntervalMs = 300000, // 5 minutos (padrão: 5min)
HeartbeatIntervalMs = 3000, // 3 segundos
// Processar menos mensagens por poll
MaxPartitionFetchBytes = 1048576, // 1MB
FetchMinBytes = 1
};
Problema 3: Mensagens Duplicadas
Sintoma: Mesmo evento processado múltiplas vezes.
Causa: Commit falhou mas processamento foi bem-sucedido.
Solução: Implementar idempotência (ver seção de melhores práticas).
Problema 4: Dead Lock de Consumidor
Sintoma: Consumer para de processar sem erros.
Diagnóstico:
// Adicionar logging detalhado
.SetLogHandler((_, message) =>
{
_logger.LogInformation($"Kafka Log: {message.Level} | {message.Message}");
})
Solução: Configurar health checks e reiniciar consumer automaticamente.
Conclusão
Event-Driven Architecture com C#, .NET e Kafka oferece uma base sólida para construir sistemas modernos, escaláveis e resilientes. Ao longo deste guia, cobrimos:
✅ Fundamentos de EDA e seus benefícios ✅ Apache Kafka como backbone de mensageria ✅ Implementação completa de Producer e Consumer em C#/.NET ✅ Integração multi-linguagem (Python, Node.js, Java) ✅ Padrões avançados (Event Sourcing, CQRS, Saga) ✅ Monitoramento com Prometheus e OpenTelemetry ✅ Deploy em Kubernetes e Kafka Cloud ✅ Melhores práticas e troubleshooting
Próximos Passos
- Experimentar: Implemente o código deste guia localmente
- Aprofundar: Estude Event Sourcing e CQRS em produção
- Escalar: Configure Kafka cluster em produção (Confluent, MSK)
- Monitorar: Integre Prometheus, Grafana e distributed tracing
- Automatizar: CI/CD com testes de integração Kafka
Recursos Adicionais
- Confluent Kafka .NET Client
- Apache Kafka Documentation
- Event-Driven Architecture Patterns
- CQRS Pattern
- Saga Pattern
Autor: Kaique Yamamoto Data: 28 de janeiro de 2026
Sobre o Autor: Full Stack Developer e AI Engineer com 10+ anos de experiência em sistemas distribuídos, mensageria (Kafka, RabbitMQ), DevOps e arquitetura de microservices. Especialista em C#/.NET, Java, Python e Node.js.
Tags: #event-driven-architecture #csharp #dotnet #kafka #microservices #mensageria #arquitetura #sistemas-distribuidos
Artigos Relacionados
Arquitetura de Monorepo Distribuída: Baseada no Livro Migrando Monolitos para Microserviços
Monorepo distribuído une um único repositório à execução em serviços independentes. Este artigo conecta as ideias do livro Migrando Monolitos para Microserviços com práticas de monorepo, bounded contexts e evolução gradual.
Guia Completo: Estruturando Monorepos com Turbopack
Aprenda a criar e gerenciar monorepos modernos usando Turbopack, o successor do Webpack criado pela Vercel. Guia prático com exemplos reais de configuração, otimizações e melhores práticas para escalar suas aplicações.
Observabilidade Escalável: Zabbix, Prometheus e Grafana em Infraestruturas de Todos os Portes
Guia prático sobre como implementar monitoramento com Zabbix, Prometheus e Grafana, adaptado para infraestruturas de pequeno, médio e grande porte.