ArquiteturaDesenvolvimento

Event-Driven Architecture com C#, .NET e Kafka: Guia Prático

há 2 diasPor Kaique Yamamoto

Aprenda a construir sistemas escaláveis usando Event-Driven Architecture com C#, .NET e Apache Kafka. Guia completo com exemplos práticos, integração multi-linguagem e melhores práticas de produção.

Introdução

Sistemas modernos exigem escalabilidade, resiliência e desacoplamento. A arquitetura tradicional baseada em requisição-resposta (request-response) mostra suas limitações quando precisamos processar milhões de eventos, integrar dezenas de microservices ou garantir consistência eventual em sistemas distribuídos.

Event-Driven Architecture (EDA) surge como solução para esses desafios, transformando a forma como aplicações se comunicam. Ao invés de chamadas síncronas diretas, componentes publicam e consomem eventos assíncronos através de um broker de mensagens.

Neste guia completo, você aprenderá a construir sistemas EDA de produção usando C#, .NET e Apache Kafka, o backbone de mensageria mais usado no mundo. Vamos cobrir desde os fundamentos até padrões avançados, com código real e exemplos práticos.

⏱️ Tempo de leitura: ~20 minutos


O Que é Event-Driven Architecture?

Event-Driven Architecture (EDA) é um padrão arquitetural onde componentes de software reagem a eventos ao invés de executar chamadas diretas entre si. Um evento representa uma mudança de estado significativa no sistema.

Componentes Principais

1. Produtor (Producer) Componente que detecta mudanças de estado e publica eventos.

2. Broker de Mensagens Infraestrutura que recebe, armazena e distribui eventos (ex: Kafka, RabbitMQ).

3. Consumidor (Consumer) Componente que se inscreve em eventos e executa ações baseadas neles.

4. Evento (Event) Mensagem imutável que descreve algo que aconteceu no passado.

Exemplo de Evento

{
  "eventId": "550e8400-e29b-41d4-a716-446655440000",
  "eventType": "OrderCreated",
  "timestamp": "2026-01-28T10:30:00Z",
  "aggregateId": "order-12345",
  "version": 1,
  "payload": {
    "orderId": "order-12345",
    "customerId": "customer-789",
    "totalAmount": 299.90,
    "items": [
      {
        "productId": "product-456",
        "quantity": 2,
        "unitPrice": 149.95
      }
    ]
  }
}

Por Que Usar Event-Driven Architecture?

Vantagens

1. Desacoplamento Produtores não conhecem consumidores. Adicione novos consumidores sem modificar produtores.

2. Escalabilidade Processe eventos em paralelo, escale consumidores independentemente.

3. Resiliência Falhas em consumidores não afetam produtores. Eventos são persistidos para reprocessamento.

4. Auditoria Event log serve como fonte única da verdade (Event Sourcing).

5. Integração Multi-Sistema Conecte sistemas heterogêneos (C#, Java, Python, Node.js) através de eventos.

6. Processamento Assíncrono Operações lentas (envio de email, processamento de imagem) não bloqueiam o fluxo principal.

Quando Usar EDA?

Use quando:

  • Sistema distribuído com múltiplos microservices
  • Necessidade de escalabilidade horizontal
  • Processamento assíncrono de tarefas
  • Integração entre sistemas heterogêneos
  • Auditoria completa de operações (Event Sourcing)
  • Real-time analytics e streaming de dados

Evite quando:

  • Sistema monolítico simples
  • Requisitos de consistência imediata
  • Operações síncronas críticas (ex: pagamentos em tempo real)
  • Equipe sem experiência em sistemas distribuídos

Apache Kafka: O Backbone do EDA

Apache Kafka é uma plataforma distribuída de streaming de eventos, desenvolvida originalmente pelo LinkedIn e open-source desde 2011.

Por Que Kafka?

FeatureKafkaRabbitMQAWS SQS
ThroughputMilhões/segMilhares/segMilhares/seg
PersistênciaDurável (log)OpcionalLimitada
RetençãoConfigurável (dias/TB)Até consumo14 dias max
OrdemGarantida por partiçãoPor queueFIFO limitado
ReplaySimNãoNão
EscalabilidadeHorizontalVertical/HorizontalGerenciado

Conceitos do Kafka

Topic: Canal lógico onde eventos são publicados (ex: orders, payments, notifications).

Partition: Divisão física de um tópico para paralelização. Cada partição é ordenada.

Offset: ID sequencial único de cada mensagem dentro de uma partição.

Consumer Group: Grupo de consumidores que divide o trabalho de processar mensagens.

Replication Factor: Número de réplicas de cada partição para resiliência.


Configurando o Ambiente

Passo 1: Kafka com Docker

Crie um arquivo docker-compose.yml:

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

Inicie o ambiente:

docker-compose up -d

Acesse o Kafka UI em http://localhost:8080.

Passo 2: Criar Projeto .NET

# Criar solution
dotnet new sln -n EventDrivenDemo

# Criar projeto Producer
dotnet new webapi -n Producer.Api
dotnet sln add Producer.Api/Producer.Api.csproj

# Criar projeto Consumer
dotnet new worker -n Consumer.Service
dotnet sln add Consumer.Service/Consumer.Service.csproj

# Criar biblioteca compartilhada
dotnet new classlib -n Shared.Events
dotnet sln add Shared.Events/Shared.Events.csproj

Passo 3: Instalar Dependências

# No Producer.Api
cd Producer.Api
dotnet add package Confluent.Kafka
dotnet add package Newtonsoft.Json

# No Consumer.Service
cd ../Consumer.Service
dotnet add package Confluent.Kafka
dotnet add package Newtonsoft.Json
dotnet add package Microsoft.Extensions.Hosting

# No Shared.Events (referenciado por ambos)
cd ../Shared.Events

Implementação: Eventos Compartilhados

Defina os eventos em Shared.Events para serem reutilizados por produtores e consumidores.

Shared.Events/BaseEvent.cs

using System;

namespace Shared.Events
{
    public abstract class BaseEvent
    {
        public Guid EventId { get; set; } = Guid.NewGuid();
        public DateTime Timestamp { get; set; } = DateTime.UtcNow;
        public string EventType { get; set; }
        public int Version { get; set; } = 1;

        protected BaseEvent()
        {
            EventType = GetType().Name;
        }
    }
}

Shared.Events/OrderCreatedEvent.cs

using System.Collections.Generic;

namespace Shared.Events
{
    public class OrderCreatedEvent : BaseEvent
    {
        public string OrderId { get; set; }
        public string CustomerId { get; set; }
        public decimal TotalAmount { get; set; }
        public List<OrderItem> Items { get; set; }
        public string Status { get; set; } = "Pending";
    }

    public class OrderItem
    {
        public string ProductId { get; set; }
        public string ProductName { get; set; }
        public int Quantity { get; set; }
        public decimal UnitPrice { get; set; }
    }
}

Shared.Events/PaymentProcessedEvent.cs

namespace Shared.Events
{
    public class PaymentProcessedEvent : BaseEvent
    {
        public string PaymentId { get; set; }
        public string OrderId { get; set; }
        public decimal Amount { get; set; }
        public string PaymentMethod { get; set; }
        public bool Success { get; set; }
        public string TransactionId { get; set; }
    }
}

Implementação: Producer (C#/.NET)

Producer.Api/Services/KafkaProducerService.cs

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Shared.Events;

namespace Producer.Api.Services
{
    public interface IKafkaProducerService
    {
        Task<bool> PublishEventAsync<T>(string topic, T eventData, string key = null) where T : BaseEvent;
    }

    public class KafkaProducerService : IKafkaProducerService, IDisposable
    {
        private readonly IProducer<string, string> _producer;
        private readonly ILogger<KafkaProducerService> _logger;

        public KafkaProducerService(
            IConfiguration configuration,
            ILogger<KafkaProducerService> logger)
        {
            _logger = logger;

            var config = new ProducerConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
                ClientId = "producer-api",
                Acks = Acks.All, // Garante que todas as réplicas receberam
                EnableIdempotence = true, // Evita duplicação
                MaxInFlight = 5,
                MessageSendMaxRetries = 10,
                RetryBackoffMs = 100,
                CompressionType = CompressionType.Snappy // Compressão
            };

            _producer = new ProducerBuilder<string, string>(config)
                .SetErrorHandler((_, error) =>
                {
                    _logger.LogError($"Erro no producer: {error.Reason}");
                })
                .Build();

            _logger.LogInformation("Kafka Producer inicializado");
        }

        public async Task<bool> PublishEventAsync<T>(string topic, T eventData, string key = null)
            where T : BaseEvent
        {
            try
            {
                var eventKey = key ?? eventData.EventId.ToString();
                var eventValue = JsonConvert.SerializeObject(eventData, new JsonSerializerSettings
                {
                    ReferenceLoopHandling = ReferenceLoopHandling.Ignore,
                    NullValueHandling = NullValueHandling.Ignore
                });

                var message = new Message<string, string>
                {
                    Key = eventKey,
                    Value = eventValue,
                    Headers = new Headers
                    {
                        { "event-type", System.Text.Encoding.UTF8.GetBytes(eventData.EventType) },
                        { "timestamp", System.Text.Encoding.UTF8.GetBytes(eventData.Timestamp.ToString("o")) }
                    }
                };

                var result = await _producer.ProduceAsync(topic, message);

                _logger.LogInformation(
                    $"Evento publicado: {eventData.EventType} | Topic: {topic} | Partition: {result.Partition} | Offset: {result.Offset}"
                );

                return result.Status == PersistenceStatus.Persisted;
            }
            catch (ProduceException<string, string> ex)
            {
                _logger.LogError(ex, $"Erro ao publicar evento: {ex.Error.Reason}");
                return false;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Erro inesperado ao publicar evento");
                return false;
            }
        }

        public void Dispose()
        {
            _producer?.Flush(TimeSpan.FromSeconds(10));
            _producer?.Dispose();
        }
    }
}

Producer.Api/Controllers/OrdersController.cs

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Producer.Api.Services;
using Shared.Events;

namespace Producer.Api.Controllers
{
    [ApiController]
    [Route("api/[controller]")]
    public class OrdersController : ControllerBase
    {
        private readonly IKafkaProducerService _producerService;

        public OrdersController(IKafkaProducerService producerService)
        {
            _producerService = producerService;
        }

        [HttpPost]
        public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
        {
            var orderId = $"order-{Guid.NewGuid().ToString().Substring(0, 8)}";

            var orderEvent = new OrderCreatedEvent
            {
                OrderId = orderId,
                CustomerId = request.CustomerId,
                TotalAmount = request.TotalAmount,
                Items = request.Items,
                Status = "Pending"
            };

            var published = await _producerService.PublishEventAsync("orders", orderEvent, orderId);

            if (published)
            {
                return Accepted(new { OrderId = orderId, Message = "Order created successfully" });
            }

            return StatusCode(500, new { Message = "Failed to publish order event" });
        }
    }

    public class CreateOrderRequest
    {
        public string CustomerId { get; set; }
        public decimal TotalAmount { get; set; }
        public List<OrderItem> Items { get; set; }
    }
}

Producer.Api/Program.cs

using Producer.Api.Services;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Registrar Kafka Producer como Singleton
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();

app.Run();

Producer.Api/appsettings.json

{
  "Kafka": {
    "BootstrapServers": "localhost:9092"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  }
}

Implementação: Consumer (C#/.NET)

Consumer.Service/Workers/OrderConsumerWorker.cs

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Shared.Events;

namespace Consumer.Service.Workers
{
    public class OrderConsumerWorker : BackgroundService
    {
        private readonly ILogger<OrderConsumerWorker> _logger;
        private readonly IConsumer<string, string> _consumer;
        private readonly string _topic = "orders";

        public OrderConsumerWorker(
            IConfiguration configuration,
            ILogger<OrderConsumerWorker> logger)
        {
            _logger = logger;

            var config = new ConsumerConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
                GroupId = "order-processor-group",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false, // Commit manual para controle
                EnableAutoOffsetStore = false,
                MaxPollIntervalMs = 300000, // 5 minutos
                SessionTimeoutMs = 45000,
                HeartbeatIntervalMs = 3000
            };

            _consumer = new ConsumerBuilder<string, string>(config)
                .SetErrorHandler((_, error) =>
                {
                    _logger.LogError($"Erro no consumer: {error.Reason}");
                })
                .SetPartitionsAssignedHandler((c, partitions) =>
                {
                    _logger.LogInformation($"Partições atribuídas: {string.Join(", ", partitions)}");
                })
                .Build();

            _consumer.Subscribe(_topic);
            _logger.LogInformation($"Consumer inscrito no tópico: {_topic}");
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Order Consumer Worker iniciado");

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = _consumer.Consume(TimeSpan.FromSeconds(1));

                        if (consumeResult != null)
                        {
                            await ProcessMessageAsync(consumeResult, stoppingToken);

                            // Commit manual após processamento bem-sucedido
                            _consumer.Commit(consumeResult);
                            _consumer.StoreOffset(consumeResult);
                        }
                    }
                    catch (ConsumeException ex)
                    {
                        _logger.LogError(ex, $"Erro ao consumir mensagem: {ex.Error.Reason}");
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Erro inesperado no processamento");
                        // Não commitamos em caso de erro - mensagem será reprocessada
                    }
                }
            }
            finally
            {
                _consumer.Close();
            }
        }

        private async Task ProcessMessageAsync(ConsumeResult<string, string> result, CancellationToken cancellationToken)
        {
            _logger.LogInformation(
                $"Mensagem recebida | Partition: {result.Partition} | Offset: {result.Offset} | Key: {result.Message.Key}"
            );

            try
            {
                var orderEvent = JsonConvert.DeserializeObject<OrderCreatedEvent>(result.Message.Value);

                // Simulação de processamento
                _logger.LogInformation($"Processando pedido: {orderEvent.OrderId}");
                _logger.LogInformation($"Cliente: {orderEvent.CustomerId} | Total: R$ {orderEvent.TotalAmount:F2}");
                _logger.LogInformation($"Itens: {orderEvent.Items.Count}");

                // Aqui você implementaria a lógica de negócio:
                // - Validar estoque
                // - Reservar produtos
                // - Calcular frete
                // - Criar registro no banco de dados
                // - Publicar novos eventos (OrderValidated, InventoryReserved, etc.)

                await Task.Delay(100, cancellationToken); // Simula processamento

                _logger.LogInformation($"Pedido {orderEvent.OrderId} processado com sucesso");
            }
            catch (JsonException ex)
            {
                _logger.LogError(ex, "Erro ao deserializar evento");
                throw; // Rejeitamos a mensagem - será reprocessada
            }
        }

        public override void Dispose()
        {
            _consumer?.Close();
            _consumer?.Dispose();
            base.Dispose();
        }
    }
}

Consumer.Service/Program.cs

using Consumer.Service.Workers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Registrar Workers
builder.Services.AddHostedService<OrderConsumerWorker>();

var host = builder.Build();
host.Run();

Consumer.Service/appsettings.json

{
  "Kafka": {
    "BootstrapServers": "localhost:9092"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  }
}

Testando o Sistema

Passo 1: Iniciar os Serviços

# Terminal 1: Producer API
cd Producer.Api
dotnet run

# Terminal 2: Consumer Service
cd Consumer.Service
dotnet run

Passo 2: Criar um Pedido

curl -X POST http://localhost:5000/api/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": "customer-123",
    "totalAmount": 599.90,
    "items": [
      {
        "productId": "product-001",
        "productName": "Notebook",
        "quantity": 1,
        "unitPrice": 599.90
      }
    ]
  }'

Passo 3: Verificar Logs

Producer.Api:

info: Producer.Api.Services.KafkaProducerService[0]
      Evento publicado: OrderCreatedEvent | Topic: orders | Partition: 0 | Offset: 15

Consumer.Service:

info: Consumer.Service.Workers.OrderConsumerWorker[0]
      Mensagem recebida | Partition: 0 | Offset: 15 | Key: order-a3f2b8c1
info: Consumer.Service.Workers.OrderConsumerWorker[0]
      Processando pedido: order-a3f2b8c1
info: Consumer.Service.Workers.OrderConsumerWorker[0]
      Cliente: customer-123 | Total: R$ 599.90
info: Consumer.Service.Workers.OrderConsumerWorker[0]
      Pedido order-a3f2b8c1 processado com sucesso

Integração Multi-Linguagem

Uma das grandes vantagens do Kafka é a capacidade de integrar sistemas escritos em diferentes linguagens.

Consumer em Python (FastAPI)

# requirements.txt
confluent-kafka==2.3.0
fastapi==0.109.0
uvicorn==0.27.0

# consumer.py
import json
import logging
from confluent_kafka import Consumer, KafkaError
from typing import Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class OrderConsumerPython:
    def __init__(self):
        self.config = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'python-analytics-group',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False
        }
        self.consumer = Consumer(self.config)
        self.consumer.subscribe(['orders'])
        logger.info("Python consumer iniciado")

    def process_order(self, order_data: Dict):
        """Processamento de analytics do pedido"""
        logger.info(f"Analytics: Pedido {order_data['orderId']}")
        logger.info(f"Valor total: R$ {order_data['totalAmount']}")

        # Aqui você implementaria:
        # - Salvar em data warehouse (BigQuery, Snowflake)
        # - Atualizar métricas em tempo real (Redis)
        # - Enviar para sistema de BI
        # - Treinar modelos de ML

    def start_consuming(self):
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        logger.info(f'Fim da partição {msg.partition()}')
                    else:
                        logger.error(f'Erro: {msg.error()}')
                    continue

                # Processar mensagem
                order_data = json.loads(msg.value().decode('utf-8'))
                self.process_order(order_data)

                # Commit manual
                self.consumer.commit(msg)

        except KeyboardInterrupt:
            logger.info("Consumer interrompido")
        finally:
            self.consumer.close()

if __name__ == "__main__":
    consumer = OrderConsumerPython()
    consumer.start_consuming()

Consumer em Node.js (TypeScript)

// package.json
{
  "dependencies": {
    "kafkajs": "^2.2.4",
    "typescript": "^5.3.3",
    "@types/node": "^20.11.5"
  }
}

// consumer.ts
import { Kafka, EachMessagePayload } from 'kafkajs';

interface OrderEvent {
  orderId: string;
  customerId: string;
  totalAmount: number;
  items: Array<{
    productId: string;
    quantity: number;
    unitPrice: number;
  }>;
}

class NotificationConsumer {
  private kafka: Kafka;
  private consumer;

  constructor() {
    this.kafka = new Kafka({
      clientId: 'notification-service',
      brokers: ['localhost:9092']
    });

    this.consumer = this.kafka.consumer({
      groupId: 'notification-group',
      sessionTimeout: 30000,
      heartbeatInterval: 3000
    });
  }

  async start() {
    await this.consumer.connect();
    await this.consumer.subscribe({
      topic: 'orders',
      fromBeginning: false
    });

    await this.consumer.run({
      eachMessage: async (payload: EachMessagePayload) => {
        await this.handleMessage(payload);
      }
    });

    console.log('Node.js consumer iniciado');
  }

  private async handleMessage({ topic, partition, message }: EachMessagePayload) {
    const orderEvent: OrderEvent = JSON.parse(message.value!.toString());

    console.log(`Notificação: Novo pedido ${orderEvent.orderId}`);

    // Implementação de notificações:
    // - Enviar email de confirmação
    // - Push notification mobile
    // - SMS de confirmação
    // - Webhook para sistemas externos

    await this.sendEmailNotification(orderEvent);
    await this.sendPushNotification(orderEvent);
  }

  private async sendEmailNotification(order: OrderEvent) {
    console.log(`Email enviado para pedido ${order.orderId}`);
    // Integração com SendGrid, AWS SES, etc.
  }

  private async sendPushNotification(order: OrderEvent) {
    console.log(`Push notification enviado para pedido ${order.orderId}`);
    // Integração com Firebase Cloud Messaging, OneSignal, etc.
  }
}

const consumer = new NotificationConsumer();
consumer.start().catch(console.error);

Consumer em Java (Spring Boot)

// pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

// application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: inventory-service-group
      auto-offset-reset: earliest
      enable-auto-commit: false

// OrderEvent.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private String customerId;
    private BigDecimal totalAmount;
    private List<OrderItem> items;

    @Data
    public static class OrderItem {
        private String productId;
        private Integer quantity;
        private BigDecimal unitPrice;
    }
}

// InventoryConsumer.java
@Service
@Slf4j
public class InventoryConsumer {

    @Autowired
    private InventoryService inventoryService;

    @KafkaListener(
        topics = "orders",
        groupId = "inventory-service-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void consume(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) long offset,
        Acknowledgment acknowledgment
    ) {
        try {
            log.info("Mensagem recebida | Partition: {} | Offset: {}", partition, offset);

            ObjectMapper mapper = new ObjectMapper();
            OrderEvent orderEvent = mapper.readValue(message, OrderEvent.class);

            // Processar reserva de estoque
            boolean reserved = inventoryService.reserveInventory(orderEvent);

            if (reserved) {
                log.info("Estoque reservado para pedido: {}", orderEvent.getOrderId());
                acknowledgment.acknowledge(); // Commit manual
            } else {
                log.warn("Estoque insuficiente para pedido: {}", orderEvent.getOrderId());
                // Publicar evento de falha
                // NÃO commitamos - mensagem será reprocessada
            }

        } catch (Exception e) {
            log.error("Erro ao processar mensagem", e);
            // Não commitar - dead letter queue
        }
    }
}

Padrões Avançados

1. Event Sourcing

Persistir todos os eventos como fonte da verdade ao invés de apenas o estado atual.

public class EventStore
{
    private readonly IKafkaProducerService _producer;
    private readonly string _eventStoreTopic = "event-store";

    public async Task<bool> AppendEventAsync<T>(T @event) where T : BaseEvent
    {
        // Cada agregado tem sua própria partição (garantia de ordem)
        var partitionKey = GetAggregateId(@event);

        return await _producer.PublishEventAsync(_eventStoreTopic, @event, partitionKey);
    }

    public async Task<List<BaseEvent>> GetEventsAsync(string aggregateId)
    {
        // Ler todos os eventos de uma partição específica
        // Reconstruir o estado a partir dos eventos
        var events = new List<BaseEvent>();

        // Implementação de leitura do Kafka
        // usando Consumer com seek para offset específico

        return events;
    }

    private string GetAggregateId(BaseEvent @event)
    {
        return @event switch
        {
            OrderCreatedEvent order => order.OrderId,
            PaymentProcessedEvent payment => payment.OrderId,
            _ => @event.EventId.ToString()
        };
    }
}

2. CQRS (Command Query Responsibility Segregation)

Separar comandos (write) de queries (read).

// Write Model (Command)
public class CreateOrderCommand
{
    public string CustomerId { get; set; }
    public List<OrderItem> Items { get; set; }
}

public class OrderCommandHandler
{
    private readonly IKafkaProducerService _producer;

    public async Task<string> HandleAsync(CreateOrderCommand command)
    {
        var orderId = Guid.NewGuid().ToString();

        var @event = new OrderCreatedEvent
        {
            OrderId = orderId,
            CustomerId = command.CustomerId,
            Items = command.Items,
            TotalAmount = command.Items.Sum(i => i.Quantity * i.UnitPrice)
        };

        await _producer.PublishEventAsync("orders", @event);

        return orderId;
    }
}

// Read Model (Query)
public class OrderQueryService
{
    private readonly IMongoDatabase _readDatabase;

    public async Task<OrderReadModel> GetOrderAsync(string orderId)
    {
        var collection = _readDatabase.GetCollection<OrderReadModel>("orders");
        return await collection.Find(o => o.OrderId == orderId).FirstOrDefaultAsync();
    }

    public async Task<List<OrderReadModel>> GetCustomerOrdersAsync(string customerId)
    {
        var collection = _readDatabase.GetCollection<OrderReadModel>("orders");
        return await collection.Find(o => o.CustomerId == customerId)
            .SortByDescending(o => o.CreatedAt)
            .Limit(50)
            .ToListAsync();
    }
}

// Consumer atualiza o Read Model
public class OrderReadModelUpdater
{
    private readonly IMongoDatabase _readDatabase;

    public async Task UpdateAsync(OrderCreatedEvent @event)
    {
        var collection = _readDatabase.GetCollection<OrderReadModel>("orders");

        var readModel = new OrderReadModel
        {
            OrderId = @event.OrderId,
            CustomerId = @event.CustomerId,
            TotalAmount = @event.TotalAmount,
            Status = @event.Status,
            CreatedAt = @event.Timestamp
        };

        await collection.InsertOneAsync(readModel);
    }
}

3. Saga Pattern (Transações Distribuídas)

Coordenar transações entre múltiplos microservices.

// Orquestração de Saga
public class OrderSagaOrchestrator
{
    private readonly IKafkaProducerService _producer;
    private readonly ISagaStateStore _stateStore;

    public async Task StartSagaAsync(OrderCreatedEvent orderEvent)
    {
        var sagaId = Guid.NewGuid().ToString();

        // 1. Salvar estado inicial
        await _stateStore.SaveStateAsync(sagaId, new SagaState
        {
            SagaId = sagaId,
            OrderId = orderEvent.OrderId,
            CurrentStep = SagaStep.ReserveInventory,
            Status = SagaStatus.InProgress
        });

        // 2. Iniciar primeiro passo: reservar estoque
        await _producer.PublishEventAsync("inventory-commands", new ReserveInventoryCommand
        {
            OrderId = orderEvent.OrderId,
            Items = orderEvent.Items,
            SagaId = sagaId
        });
    }

    // Consumer de eventos de resposta
    public async Task HandleInventoryReservedAsync(InventoryReservedEvent @event)
    {
        var state = await _stateStore.GetStateAsync(@event.SagaId);

        if (@event.Success)
        {
            // 3. Próximo passo: processar pagamento
            state.CurrentStep = SagaStep.ProcessPayment;
            await _stateStore.UpdateStateAsync(state);

            await _producer.PublishEventAsync("payment-commands", new ProcessPaymentCommand
            {
                OrderId = @event.OrderId,
                Amount = state.TotalAmount,
                SagaId = @event.SagaId
            });
        }
        else
        {
            // Compensação: cancelar pedido
            await CompensateAsync(state);
        }
    }

    public async Task HandlePaymentProcessedAsync(PaymentProcessedEvent @event)
    {
        var state = await _stateStore.GetStateAsync(@event.SagaId);

        if (@event.Success)
        {
            // 4. Finalizar: confirmar pedido
            state.Status = SagaStatus.Completed;
            await _stateStore.UpdateStateAsync(state);

            await _producer.PublishEventAsync("orders", new OrderConfirmedEvent
            {
                OrderId = @event.OrderId,
                SagaId = @event.SagaId
            });
        }
        else
        {
            // Compensação: liberar estoque e cancelar pedido
            await CompensateAsync(state);
        }
    }

    private async Task CompensateAsync(SagaState state)
    {
        // Ações compensatórias na ordem inversa
        if (state.CurrentStep >= SagaStep.ReserveInventory)
        {
            await _producer.PublishEventAsync("inventory-commands", new ReleaseInventoryCommand
            {
                OrderId = state.OrderId,
                SagaId = state.SagaId
            });
        }

        state.Status = SagaStatus.Failed;
        await _stateStore.UpdateStateAsync(state);
    }
}

public enum SagaStep
{
    ReserveInventory = 1,
    ProcessPayment = 2,
    ShipOrder = 3,
    Completed = 4
}

public enum SagaStatus
{
    InProgress,
    Completed,
    Failed
}

4. Dead Letter Queue (DLQ)

Tratar mensagens que falharam no processamento.

public class ResilientConsumerWorker : BackgroundService
{
    private const int MaxRetries = 3;
    private readonly string _dlqTopic = "orders-dlq";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var result = _consumer.Consume(TimeSpan.FromSeconds(1));

            if (result != null)
            {
                var retryCount = GetRetryCount(result.Message.Headers);

                try
                {
                    await ProcessMessageAsync(result.Message.Value);
                    _consumer.Commit(result);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, $"Erro no processamento (tentativa {retryCount + 1})");

                    if (retryCount >= MaxRetries)
                    {
                        // Enviar para DLQ após máximo de tentativas
                        await SendToDLQAsync(result.Message, ex.Message);
                        _consumer.Commit(result); // Commit para não reprocessar
                    }
                    else
                    {
                        // Incrementar contador e reprocessar
                        await RetryWithBackoffAsync(result.Message, retryCount);
                    }
                }
            }
        }
    }

    private int GetRetryCount(Headers headers)
    {
        var retryHeader = headers.FirstOrDefault(h => h.Key == "retry-count");
        if (retryHeader != null)
        {
            return int.Parse(Encoding.UTF8.GetString(retryHeader.GetValueBytes()));
        }
        return 0;
    }

    private async Task RetryWithBackoffAsync(Message<string, string> message, int retryCount)
    {
        // Exponential backoff: 1s, 2s, 4s, 8s...
        var delayMs = (int)Math.Pow(2, retryCount) * 1000;
        await Task.Delay(delayMs);

        // Republicar com contador incrementado
        var newHeaders = new Headers(message.Headers);
        newHeaders.Add("retry-count", Encoding.UTF8.GetBytes((retryCount + 1).ToString()));

        await _producer.ProduceAsync("orders-retry", new Message<string, string>
        {
            Key = message.Key,
            Value = message.Value,
            Headers = newHeaders
        });
    }

    private async Task SendToDLQAsync(Message<string, string> message, string errorMessage)
    {
        _logger.LogWarning($"Enviando mensagem para DLQ: {message.Key}");

        var dlqHeaders = new Headers(message.Headers);
        dlqHeaders.Add("error-message", Encoding.UTF8.GetBytes(errorMessage));
        dlqHeaders.Add("dlq-timestamp", Encoding.UTF8.GetBytes(DateTime.UtcNow.ToString("o")));

        await _producer.ProduceAsync(_dlqTopic, new Message<string, string>
        {
            Key = message.Key,
            Value = message.Value,
            Headers = dlqHeaders
        });
    }
}

Monitoramento e Observabilidade

Métricas com Prometheus

// Install: dotnet add package prometheus-net.AspNetCore

// Program.cs
using Prometheus;

var builder = WebApplication.CreateBuilder(args);

// ... outros serviços

var app = builder.Build();

// Endpoint de métricas
app.UseMetricServer(); // /metrics
app.UseHttpMetrics();

// Métricas customizadas
public class KafkaMetrics
{
    private static readonly Counter MessagesProduced = Metrics
        .CreateCounter("kafka_messages_produced_total", "Total de mensagens produzidas",
            new CounterConfiguration
            {
                LabelNames = new[] { "topic", "status" }
            });

    private static readonly Histogram MessageProcessingDuration = Metrics
        .CreateHistogram("kafka_message_processing_duration_seconds", "Duração do processamento",
            new HistogramConfiguration
            {
                LabelNames = new[] { "topic", "consumer_group" }
            });

    public static void RecordMessageProduced(string topic, bool success)
    {
        MessagesProduced.WithLabels(topic, success ? "success" : "failure").Inc();
    }

    public static IDisposable MeasureProcessingTime(string topic, string consumerGroup)
    {
        return MessageProcessingDuration.WithLabels(topic, consumerGroup).NewTimer();
    }
}

// Uso no Producer
public async Task<bool> PublishEventAsync<T>(string topic, T eventData) where T : BaseEvent
{
    var success = false;
    try
    {
        var result = await _producer.ProduceAsync(topic, message);
        success = result.Status == PersistenceStatus.Persisted;
        return success;
    }
    finally
    {
        KafkaMetrics.RecordMessageProduced(topic, success);
    }
}

// Uso no Consumer
private async Task ProcessMessageAsync(ConsumeResult<string, string> result)
{
    using (KafkaMetrics.MeasureProcessingTime(result.Topic, "order-processor-group"))
    {
        // Processar mensagem
        await DoWorkAsync(result);
    }
}

Distributed Tracing com OpenTelemetry

// Install: dotnet add package OpenTelemetry.Extensions.Hosting
// Install: dotnet add package OpenTelemetry.Instrumentation.AspNetCore

// Program.cs
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

builder.Services.AddOpenTelemetry()
    .WithTracing(tracerProviderBuilder =>
    {
        tracerProviderBuilder
            .AddSource("Producer.Api")
            .SetResourceBuilder(ResourceBuilder.CreateDefault()
                .AddService("producer-api"))
            .AddAspNetCoreInstrumentation()
            .AddHttpClientInstrumentation()
            .AddJaegerExporter(options =>
            {
                options.AgentHost = "localhost";
                options.AgentPort = 6831;
            });
    });

// Producer com tracing
public class KafkaProducerService
{
    private static readonly ActivitySource ActivitySource = new("Producer.Api");

    public async Task<bool> PublishEventAsync<T>(string topic, T eventData) where T : BaseEvent
    {
        using var activity = ActivitySource.StartActivity("PublishEvent", ActivityKind.Producer);
        activity?.SetTag("messaging.system", "kafka");
        activity?.SetTag("messaging.destination", topic);
        activity?.SetTag("event.type", eventData.EventType);

        // Injetar trace context nos headers
        var headers = new Headers();
        Propagators.DefaultTextMapPropagator.Inject(
            new PropagationContext(activity?.Context ?? default, Baggage.Current),
            headers,
            (h, key, value) => h.Add(key, Encoding.UTF8.GetBytes(value))
        );

        var message = new Message<string, string>
        {
            Key = eventData.EventId.ToString(),
            Value = JsonConvert.SerializeObject(eventData),
            Headers = headers
        };

        var result = await _producer.ProduceAsync(topic, message);

        activity?.SetTag("messaging.kafka.partition", result.Partition.Value);
        activity?.SetTag("messaging.kafka.offset", result.Offset.Value);

        return result.Status == PersistenceStatus.Persisted;
    }
}

Deploy em Produção

Kubernetes com Helm

values.yaml (Producer)

replicaCount: 3

image:
  repository: myregistry.azurecr.io/producer-api
  tag: "1.0.0"
  pullPolicy: IfNotPresent

service:
  type: ClusterIP
  port: 80

ingress:
  enabled: true
  className: nginx
  hosts:
    - host: api.example.com
      paths:
        - path: /
          pathType: Prefix

resources:
  limits:
    cpu: 500m
    memory: 512Mi
  requests:
    cpu: 250m
    memory: 256Mi

autoscaling:
  enabled: true
  minReplicas: 3
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70

env:
  - name: ASPNETCORE_ENVIRONMENT
    value: "Production"
  - name: Kafka__BootstrapServers
    value: "kafka-cluster.kafka.svc.cluster.local:9092"

livenessProbe:
  httpGet:
    path: /health
    port: 80
  initialDelaySeconds: 30
  periodSeconds: 10

readinessProbe:
  httpGet:
    path: /health/ready
    port: 80
  initialDelaySeconds: 10
  periodSeconds: 5

deployment.yaml (Consumer)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-consumer
spec:
  replicas: 5
  selector:
    matchLabels:
      app: order-consumer
  template:
    metadata:
      labels:
        app: order-consumer
    spec:
      containers:
      - name: consumer
        image: myregistry.azurecr.io/consumer-service:1.0.0
        resources:
          limits:
            cpu: 1000m
            memory: 1Gi
          requests:
            cpu: 500m
            memory: 512Mi
        env:
        - name: Kafka__BootstrapServers
          value: "kafka-cluster.kafka.svc.cluster.local:9092"
        - name: Kafka__GroupId
          value: "order-processor-group"
        livenessProbe:
          exec:
            command:
            - /bin/sh
            - -c
            - ps aux | grep dotnet
          initialDelaySeconds: 30
          periodSeconds: 10

Kafka em Produção (Confluent Cloud)

// appsettings.Production.json
{
  "Kafka": {
    "BootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "SecurityProtocol": "SASL_SSL",
    "SaslMechanism": "PLAIN",
    "SaslUsername": "{{ KAFKA_API_KEY }}",
    "SaslPassword": "{{ KAFKA_API_SECRET }}",
    "SslCaLocation": "/etc/ssl/certs/ca-certificates.crt"
  }
}

// Producer com autenticação
var config = new ProducerConfig
{
    BootstrapServers = configuration["Kafka:BootstrapServers"],
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = configuration["Kafka:SaslUsername"],
    SaslPassword = configuration["Kafka:SaslPassword"],
    SslCaLocation = configuration["Kafka:SslCaLocation"],

    // Configurações de produção
    Acks = Acks.All,
    EnableIdempotence = true,
    MaxInFlight = 5,
    MessageSendMaxRetries = int.MaxValue,
    RequestTimeoutMs = 30000,
    CompressionType = CompressionType.Snappy
};

Melhores Práticas

1. Design de Eventos

Boas práticas:

  • Eventos imutáveis (passado: "OrderCreated", não "CreateOrder")
  • Payload completo (evitar joins posteriores)
  • Schema versionado (compatibilidade retroativa)
  • Metadata rico (eventId, timestamp, version, correlationId)

Evitar:

  • Eventos muito grandes (> 1MB)
  • Informações sensíveis não criptografadas
  • Acoplamento temporal

2. Particionamento

// Estratégia: Particionar por aggregate ID
public async Task<bool> PublishEventAsync<T>(string topic, T eventData) where T : BaseEvent
{
    var partitionKey = GetPartitionKey(eventData);

    // Todos os eventos do mesmo agregado vão para a mesma partição
    // Garantia de ordem
    return await _producer.PublishEventAsync(topic, eventData, partitionKey);
}

private string GetPartitionKey<T>(T eventData) where T : BaseEvent
{
    return eventData switch
    {
        OrderCreatedEvent order => order.OrderId,
        PaymentProcessedEvent payment => payment.OrderId,
        _ => eventData.EventId.ToString()
    };
}

3. Idempotência

public class IdempotentConsumer
{
    private readonly IDistributedCache _cache;

    private async Task<bool> IsProcessedAsync(string eventId)
    {
        var key = $"processed:{eventId}";
        var value = await _cache.GetStringAsync(key);
        return value != null;
    }

    private async Task MarkAsProcessedAsync(string eventId)
    {
        var key = $"processed:{eventId}";
        var options = new DistributedCacheEntryOptions
        {
            AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(7)
        };
        await _cache.SetStringAsync(key, "1", options);
    }

    protected async Task ProcessMessageAsync(ConsumeResult<string, string> result)
    {
        var @event = JsonConvert.DeserializeObject<BaseEvent>(result.Message.Value);

        if (await IsProcessedAsync(@event.EventId.ToString()))
        {
            _logger.LogInformation($"Evento {event.EventId} já foi processado (idempotência)");
            return; // Já processado, ignorar
        }

        try
        {
            await DoWorkAsync(@event);
            await MarkAsProcessedAsync(@event.EventId.ToString());
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Erro no processamento");
            throw;
        }
    }
}

4. Schema Evolution

// V1 do evento
public class OrderCreatedEventV1 : BaseEvent
{
    public string OrderId { get; set; }
    public decimal TotalAmount { get; set; }
}

// V2 do evento (adiciona campo opcional - compatível com V1)
public class OrderCreatedEventV2 : BaseEvent
{
    public string OrderId { get; set; }
    public decimal TotalAmount { get; set; }
    public string Currency { get; set; } = "BRL"; // Novo campo com default
}

// Consumer que aceita ambas as versões
public async Task ProcessMessageAsync(ConsumeResult<string, string> result)
{
    var headers = result.Message.Headers;
    var version = GetEventVersion(headers);

    BaseEvent @event = version switch
    {
        1 => JsonConvert.DeserializeObject<OrderCreatedEventV1>(result.Message.Value),
        2 => JsonConvert.DeserializeObject<OrderCreatedEventV2>(result.Message.Value),
        _ => throw new NotSupportedException($"Versão {version} não suportada")
    };

    await ProcessOrderAsync(@event);
}

5. Configuração de Tópicos

# Criar tópico com configurações otimizadas
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --partitions 10 \
  --replication-factor 3 \
  --config retention.ms=604800000 \      # 7 dias
  --config segment.ms=86400000 \          # 1 dia
  --config compression.type=snappy \
  --config max.message.bytes=1048576      # 1MB

Troubleshooting

Problema 1: Lag de Consumo

Sintoma: Consumidores não conseguem acompanhar produção de eventos.

Diagnóstico:

# Verificar lag por consumer group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-processor-group

# Output:
# TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# orders    0          1000            5000            4000  <- PROBLEMA!

Soluções:

  1. Escalar consumidores (adicionar réplicas até número de partições)
  2. Otimizar processamento (batch, cache, async)
  3. Aumentar partições (rebalanceamento automático)
// Otimização: Batch processing
public async Task ProcessBatchAsync(List<ConsumeResult<string, string>> batch)
{
    var tasks = batch.Select(async result =>
    {
        await ProcessMessageAsync(result.Message.Value);
    });

    await Task.WhenAll(tasks);

    // Commit em lote
    _consumer.Commit(batch.Last());
}

Problema 2: Rebalancing Frequente

Sintoma: Logs mostram "Revoking partitions" constantemente.

Causa: Session timeout muito curto ou processamento muito lento.

Solução:

var config = new ConsumerConfig
{
    // Aumentar timeouts
    SessionTimeoutMs = 60000,        // 60 segundos (padrão: 10s)
    MaxPollIntervalMs = 300000,      // 5 minutos (padrão: 5min)
    HeartbeatIntervalMs = 3000,      // 3 segundos

    // Processar menos mensagens por poll
    MaxPartitionFetchBytes = 1048576, // 1MB
    FetchMinBytes = 1
};

Problema 3: Mensagens Duplicadas

Sintoma: Mesmo evento processado múltiplas vezes.

Causa: Commit falhou mas processamento foi bem-sucedido.

Solução: Implementar idempotência (ver seção de melhores práticas).

Problema 4: Dead Lock de Consumidor

Sintoma: Consumer para de processar sem erros.

Diagnóstico:

// Adicionar logging detalhado
.SetLogHandler((_, message) =>
{
    _logger.LogInformation($"Kafka Log: {message.Level} | {message.Message}");
})

Solução: Configurar health checks e reiniciar consumer automaticamente.


Conclusão

Event-Driven Architecture com C#, .NET e Kafka oferece uma base sólida para construir sistemas modernos, escaláveis e resilientes. Ao longo deste guia, cobrimos:

✅ Fundamentos de EDA e seus benefícios ✅ Apache Kafka como backbone de mensageria ✅ Implementação completa de Producer e Consumer em C#/.NET ✅ Integração multi-linguagem (Python, Node.js, Java) ✅ Padrões avançados (Event Sourcing, CQRS, Saga) ✅ Monitoramento com Prometheus e OpenTelemetry ✅ Deploy em Kubernetes e Kafka Cloud ✅ Melhores práticas e troubleshooting

Próximos Passos

  1. Experimentar: Implemente o código deste guia localmente
  2. Aprofundar: Estude Event Sourcing e CQRS em produção
  3. Escalar: Configure Kafka cluster em produção (Confluent, MSK)
  4. Monitorar: Integre Prometheus, Grafana e distributed tracing
  5. Automatizar: CI/CD com testes de integração Kafka

Recursos Adicionais


Autor: Kaique Yamamoto Data: 28 de janeiro de 2026

Sobre o Autor: Full Stack Developer e AI Engineer com 10+ anos de experiência em sistemas distribuídos, mensageria (Kafka, RabbitMQ), DevOps e arquitetura de microservices. Especialista em C#/.NET, Java, Python e Node.js.

Tags: #event-driven-architecture #csharp #dotnet #kafka #microservices #mensageria #arquitetura #sistemas-distribuidos

Tags:Event-Driven ArchitectureC#.NETKafkaMicroservicesMensageriaArquitetura de Software

Artigos Relacionados

Vamos conversar sobre seu projeto?

Entre em contato para discutir como posso ajudar a transformar suas ideias em soluções tecnológicas de alta qualidade.

WhatsApp