RabbitMQ com Go: Dominando Mensageria em Sistemas Distribuidos
Trabalhar com sistemas distribuidos sem entender mensageria e como tentar fazer um bolo sem forno. Voce ate consegue algo comestivel, mas o resultado vai ser bem diferente do esperado. Nos ultimos anos, RabbitMQ se tornou uma das minhas ferramentas favoritas para desacoplar servicos e garantir resiliencia em sistemas criticos.
Neste post, vou compartilhar tudo que aprendi sobre a integracao entre RabbitMQ e Go, desde os conceitos fundamentais ate patterns avancados que uso em producao.
Por que RabbitMQ?
Antes de mergulhar no codigo, vale entender por que o RabbitMQ ainda e relevante em 2026, mesmo com tantas alternativas no mercado.
Comparativo Rapido
| Caracteristica | RabbitMQ | Kafka | Redis Streams | Amazon SQS | |---------------|----------|-------|---------------|------------| | Modelo | Message Broker | Event Log | Stream | Queue | | Garantia | At-least-once | At-least-once | At-least-once | At-least-once | | Latencia | Baixa | Media | Muito baixa | Alta | | Persistencia | Opcional | Sempre | Opcional | Sempre | | Complexidade | Media | Alta | Baixa | Baixa | | Caso de Uso | Task Queues | Event Sourcing | Cache + Stream | Serverless |
O RabbitMQ brilha quando voce precisa de:
- Roteamento flexivel com exchanges
- Garantias de entrega configuraveis
- Dead Letter Queues para tratamento de erros
- Prioridade de mensagens
- TTL (Time-to-Live) granular
Conceitos Fundamentais
Antes de escrever qualquer linha de codigo, precisamos entender a anatomia do RabbitMQ.
A Arquitetura Basica
Glossario Essencial
- Producer: Quem envia mensagens. Pode ser qualquer servico da sua aplicacao.
- Exchange: O "roteador". Recebe mensagens e decide para quais filas enviar.
- Queue: A fila onde as mensagens aguardam processamento.
- Consumer: Quem processa as mensagens da fila.
- Binding: A regra que conecta Exchange a Queue.
- Routing Key: A "etiqueta" que determina o destino da mensagem.
Tipos de Exchange
Este e um ponto crucial que muita gente confunde. Existem quatro tipos de exchange:
- Direct: Entrega para filas com routing key exata.
- Fanout: Broadcast para todas as filas conectadas.
- Topic: Pattern matching com wildcards (
*e#). - Headers: Roteamento por headers da mensagem (menos comum).
Configurando o Ambiente
Antes de codar, vamos subir um RabbitMQ local. Eu sempre uso Docker para desenvolvimento.
# docker-compose.yml minimalista
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123 \
rabbitmq:3-management
Acesse http://localhost:15672 com as credenciais admin/admin123 para ver o Management UI.
Dependencias Go
Vamos usar a biblioteca oficial amqp091-go, que e a versao mantida pela comunidade apos a descontinuacao da streadway/amqp.
go get github.com/rabbitmq/amqp091-go
Codigo: Producer Basico
Vamos comecar com o essencial: publicar uma mensagem.
package main
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 1. Conectar ao RabbitMQ
conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
failOnError(err, "Falha ao conectar ao RabbitMQ")
defer conn.Close()
// 2. Criar um canal (multiplexacao sobre a conexao TCP)
ch, err := conn.Channel()
failOnError(err, "Falha ao abrir canal")
defer ch.Close()
// 3. Declarar a fila (idempotente - cria se nao existir)
q, err := ch.QueueDeclare(
"pedidos", // nome
true, // durable: sobrevive restart do broker
false, // autoDelete: deletar quando nao ha consumers
false, // exclusive: uso exclusivo desta conexao
false, // noWait: nao esperar confirmacao do servidor
nil, // arguments: configuracoes extras
)
failOnError(err, "Falha ao declarar fila")
// 4. Publicar mensagem
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := `{"pedido_id": 12345, "valor": 299.90, "cliente": "Matheus"}`
err = ch.PublishWithContext(ctx,
"", // exchange (vazio = default exchange)
q.Name, // routing key (nome da fila no default exchange)
false, // mandatory: retornar erro se nao houver fila
false, // immediate: deprecado no RabbitMQ 3.x
amqp.Publishing{
DeliveryMode: amqp.Persistent, // mensagem persistente
ContentType: "application/json",
Body: []byte(body),
},
)
failOnError(err, "Falha ao publicar mensagem")
log.Printf(" [x] Mensagem enviada: %s", body)
}
Pontos Importantes
- Conexao vs Canal: Uma conexao TCP pode ter multiplos canais. Isso evita o overhead de criar varias conexoes.
- Durable Queue: Se
durable: true, a fila sobrevive ao restart do RabbitMQ. - DeliveryMode Persistent: A mensagem e gravada em disco. Essencial para dados criticos.
Codigo: Consumer Robusto
Agora vamos criar um consumer que processa mensagens de forma resiliente.
package main
import (
"encoding/json"
"log"
"os"
"os/signal"
"syscall"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type Pedido struct {
PedidoID int `json:"pedido_id"`
Valor float64 `json:"valor"`
Cliente string `json:"cliente"`
}
func main() {
conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
if err != nil {
log.Fatalf("Falha ao conectar: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
// Declarar a mesma fila (idempotente)
q, err := ch.QueueDeclare(
"pedidos",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
// Prefetch: quantas mensagens o consumer pega por vez
// Isso e CRUCIAL para balanceamento de carga
err = ch.Qos(
1, // prefetch count
0, // prefetch size (0 = sem limite)
false, // global (false = por consumer)
)
if err != nil {
log.Fatalf("Falha ao configurar QoS: %s", err)
}
// Registrar consumer
msgs, err := ch.Consume(
q.Name,
"", // consumer tag (vazio = gerado automaticamente)
false, // autoAck: false = precisamos confirmar manualmente
false, // exclusive
false, // noLocal
false, // noWait
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumer: %s", err)
}
// Graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
for d := range msgs {
log.Printf(" [>] Recebido: %s", d.Body)
var pedido Pedido
if err := json.Unmarshal(d.Body, &pedido); err != nil {
log.Printf(" [!] Erro ao parsear JSON: %s", err)
// Rejeitar e nao requeue (mensagem invalida)
d.Nack(false, false)
continue
}
// Simular processamento
if err := processarPedido(pedido); err != nil {
log.Printf(" [!] Erro ao processar pedido %d: %s", pedido.PedidoID, err)
// Rejeitar e requeue para tentar novamente
d.Nack(false, true)
continue
}
// Confirmar processamento com sucesso
d.Ack(false)
log.Printf(" [v] Pedido %d processado com sucesso", pedido.PedidoID)
}
}()
log.Printf(" [*] Aguardando mensagens. CTRL+C para sair.")
<-sigChan
log.Println(" [*] Encerrando consumer...")
}
func processarPedido(p Pedido) error {
// Simular processamento demorado
time.Sleep(2 * time.Second)
log.Printf(" -> Processando pedido #%d de %s (R$ %.2f)",
p.PedidoID, p.Cliente, p.Valor)
return nil
}
O Poder do Prefetch (QoS)
O Qos com prefetch: 1 e uma das configuracoes mais importantes. Ele garante que cada worker so recebe uma nova mensagem apos confirmar a anterior. Isso permite balanceamento de carga real entre multiplos consumers.
Sem prefetch, o RabbitMQ distribui mensagens em round-robin, ignorando se o worker esta ocupado.
Pattern: Dead Letter Queue (DLQ)
Este e um dos patterns mais uteis em producao. Quando uma mensagem falha apos N tentativas, ela vai para uma fila de "mensagens mortas" para analise posterior.
Implementacao
package main
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func setupQueuesWithDLQ(ch *amqp.Channel) error {
// 1. Declarar a Dead Letter Exchange
err := ch.ExchangeDeclare(
"dlx.pedidos", // nome
"direct", // tipo
true, // durable
false, // autoDelete
false, // internal
false, // noWait
nil,
)
if err != nil {
return err
}
// 2. Declarar a Dead Letter Queue
_, err = ch.QueueDeclare(
"pedidos.dlq",
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
// 3. Bind da DLQ com a DLX
err = ch.QueueBind(
"pedidos.dlq",
"pedidos.failed", // routing key
"dlx.pedidos",
false,
nil,
)
if err != nil {
return err
}
// 4. Declarar a fila principal com DLQ configurada
args := amqp.Table{
"x-dead-letter-exchange": "dlx.pedidos",
"x-dead-letter-routing-key": "pedidos.failed",
"x-message-ttl": int32(60000), // 60 segundos TTL
}
_, err = ch.QueueDeclare(
"pedidos.main",
true,
false,
false,
false,
args,
)
if err != nil {
return err
}
log.Println(" [*] Filas com DLQ configuradas com sucesso")
return nil
}
func main() {
conn, _ := amqp.Dial("amqp://admin:admin123@localhost:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
setupQueuesWithDLQ(ch)
// Publicar mensagem de teste
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ch.PublishWithContext(ctx,
"",
"pedidos.main",
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: []byte(`{"pedido_id": 999, "valor": 100.00}`),
},
)
log.Println(" [x] Mensagem enviada para fila principal")
}
Consumer com Retry Logic
func consumeWithRetry(ch *amqp.Channel) {
msgs, _ := ch.Consume("pedidos.main", "", false, false, false, false, nil)
for d := range msgs {
// Verificar quantas vezes ja foi tentado
retryCount := int32(0)
if val, ok := d.Headers["x-retry-count"]; ok {
retryCount = val.(int32)
}
err := processarMensagem(d.Body)
if err != nil {
if retryCount >= 3 {
log.Printf(" [!] Max retries atingido. Enviando para DLQ.")
d.Nack(false, false) // Vai para DLQ
} else {
// Republicar com contador incrementado
republishWithRetry(ch, d, retryCount+1)
d.Ack(false)
}
continue
}
d.Ack(false)
}
}
func republishWithRetry(ch *amqp.Channel, original amqp.Delivery, retryCount int32) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
headers := original.Headers
if headers == nil {
headers = amqp.Table{}
}
headers["x-retry-count"] = retryCount
ch.PublishWithContext(ctx,
"",
"pedidos.main",
false,
false,
amqp.Publishing{
Headers: headers,
DeliveryMode: amqp.Persistent,
ContentType: original.ContentType,
Body: original.Body,
},
)
}
Pattern: Publisher Confirms
Em sistemas criticos, voce precisa ter certeza de que a mensagem chegou ao broker. O RabbitMQ oferece Publisher Confirms para isso.
func publishWithConfirm(ch *amqp.Channel, body []byte) error {
// Habilitar modo de confirmacao
err := ch.Confirm(false)
if err != nil {
return err
}
// Canal para receber confirmacoes
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = ch.PublishWithContext(ctx,
"",
"pedidos.main",
true, // mandatory: true para garantir que chegou em uma fila
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
},
)
if err != nil {
return err
}
// Aguardar confirmacao
select {
case confirm := <-confirms:
if confirm.Ack {
log.Println(" [v] Mensagem confirmada pelo broker")
return nil
}
return fmt.Errorf("mensagem rejeitada pelo broker")
case <-time.After(5 * time.Second):
return fmt.Errorf("timeout aguardando confirmacao")
}
}
Novidade 2025: RabbitMQ Streams
Uma das adicoes mais empolgantes do RabbitMQ 3.9+ sao os Streams. Eles funcionam de forma similar ao Kafka: um log persistente onde multiplos consumers podem ler do mesmo ponto.
Quando usar Streams vs Queues?
| Cenario | Queue Classica | Stream | |---------|----------------|--------| | Task queue com ack | Sim | Nao | | Event sourcing | Nao | Sim | | Replay de eventos | Nao | Sim | | Multiplos consumers independentes | Nao | Sim | | Ordenacao garantida | Sim | Sim |
Exemplo com Stream
func setupStream(ch *amqp.Channel) error {
args := amqp.Table{
"x-queue-type": "stream",
"x-max-length-bytes": int64(1_000_000_000), // 1GB
"x-stream-max-segment-size-bytes": int32(100_000_000), // 100MB por segmento
}
_, err := ch.QueueDeclare(
"eventos.stream",
true,
false,
false,
false,
args,
)
return err
}
func consumeStream(ch *amqp.Channel) {
// Consumer com offset especifico
args := amqp.Table{
"x-stream-offset": "first", // ou "last", "next", timestamp, offset numerico
}
msgs, _ := ch.Consume(
"eventos.stream",
"consumer-analytics",
false,
false,
false,
false,
args,
)
for msg := range msgs {
log.Printf(" [stream] Offset %d: %s", msg.DeliveryTag, msg.Body)
msg.Ack(false)
}
}
Estrutura de Projeto Recomendada
Para projetos maiores, recomendo organizar o codigo assim:
/cmd
/producer
main.go
/consumer
main.go
/internal
/rabbitmq
connection.go # Pool de conexoes
publisher.go # Logica de publicacao
consumer.go # Logica de consumo
config.go # Configuracoes
/domain
pedido.go # Entidades de dominio
/handlers
pedido_handler.go # Processamento de mensagens
/pkg
/retry
backoff.go # Estrategias de retry
/configs
rabbitmq.yaml
Connection Pool
// internal/rabbitmq/connection.go
package rabbitmq
import (
"log"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type ConnectionPool struct {
url string
conn *amqp.Connection
mu sync.RWMutex
notifyClose chan *amqp.Error
}
func NewConnectionPool(url string) *ConnectionPool {
pool := &ConnectionPool{url: url}
pool.connect()
go pool.handleReconnect()
return pool
}
func (p *ConnectionPool) connect() error {
p.mu.Lock()
defer p.mu.Unlock()
conn, err := amqp.Dial(p.url)
if err != nil {
return err
}
p.conn = conn
p.notifyClose = make(chan *amqp.Error)
p.conn.NotifyClose(p.notifyClose)
log.Println(" [*] Conectado ao RabbitMQ")
return nil
}
func (p *ConnectionPool) handleReconnect() {
for {
err := <-p.notifyClose
if err != nil {
log.Printf(" [!] Conexao perdida: %s. Reconectando...", err)
}
for {
time.Sleep(5 * time.Second)
if err := p.connect(); err == nil {
break
}
log.Println(" [!] Falha ao reconectar. Tentando novamente...")
}
}
}
func (p *ConnectionPool) GetChannel() (*amqp.Channel, error) {
p.mu.RLock()
defer p.mu.RUnlock()
return p.conn.Channel()
}
Metricas e Observabilidade
Em producao, voce precisa monitorar o RabbitMQ. As metricas mais importantes sao:
- Queue Depth: Quantas mensagens estao na fila
- Consumer Utilization: Porcentagem de tempo que consumers estao ocupados
- Publish Rate: Mensagens publicadas por segundo
- Deliver Rate: Mensagens entregues por segundo
- Ack Rate: Confirmacoes por segundo
O RabbitMQ expoe essas metricas via API HTTP e Prometheus endpoint (/metrics).
// Exemplo de health check
func healthCheck(conn *amqp.Connection) bool {
if conn.IsClosed() {
return false
}
ch, err := conn.Channel()
if err != nil {
return false
}
ch.Close()
return true
}
Erros Comuns e Como Evitar
1. Conexao por Request
Errado:
func handleRequest() {
conn, _ := amqp.Dial(url) // Nova conexao a cada request!
defer conn.Close()
// ...
}
Certo:
var pool *ConnectionPool // Singleton
func handleRequest() {
ch, _ := pool.GetChannel()
defer ch.Close()
// ...
}
2. AutoAck sem Controle
Errado:
msgs, _ := ch.Consume(queue, "", true, ...) // autoAck: true
Certo:
msgs, _ := ch.Consume(queue, "", false, ...) // autoAck: false
// ...
msg.Ack(false) // Ack manual apos processamento
3. Ignorar Erros de Publicacao
Errado:
ch.PublishWithContext(ctx, ...) // Ignora erro
Certo:
if err := ch.PublishWithContext(ctx, ...); err != nil {
log.Printf("Falha ao publicar: %s", err)
// Retry ou fallback
}
Conclusao
RabbitMQ continua sendo uma escolha solida para mensageria em 2026. A combinacao com Go e natural: ambos sao eficientes, simples e robustos.
Os pontos-chave que voce deve lembrar:
- Prefetch (QoS) e essencial para balanceamento de carga
- Publisher Confirms garantem entrega em sistemas criticos
- Dead Letter Queues sao obrigatorias em producao
- Connection Pooling evita overhead de conexoes
- Streams sao a novidade para event sourcing
Se voce esta construindo microservicos, considere fortemente adicionar RabbitMQ ao seu toolkit. A curva de aprendizado e suave, e os beneficios em resiliencia e desacoplamento sao enormes.

Comentarios (0)