← Voltar para Home
#Go#RabbitMQ#Mensageria#System Design#Microservices#Backend

RabbitMQ com Go: Dominando Mensageria em Sistemas Distribuidos

0
0

Trabalhar com sistemas distribuidos sem entender mensageria e como tentar fazer um bolo sem forno. Voce ate consegue algo comestivel, mas o resultado vai ser bem diferente do esperado. Nos ultimos anos, RabbitMQ se tornou uma das minhas ferramentas favoritas para desacoplar servicos e garantir resiliencia em sistemas criticos.

Neste post, vou compartilhar tudo que aprendi sobre a integracao entre RabbitMQ e Go, desde os conceitos fundamentais ate patterns avancados que uso em producao.


Por que RabbitMQ?

Antes de mergulhar no codigo, vale entender por que o RabbitMQ ainda e relevante em 2026, mesmo com tantas alternativas no mercado.

Comparativo Rapido

| Caracteristica | RabbitMQ | Kafka | Redis Streams | Amazon SQS | |---------------|----------|-------|---------------|------------| | Modelo | Message Broker | Event Log | Stream | Queue | | Garantia | At-least-once | At-least-once | At-least-once | At-least-once | | Latencia | Baixa | Media | Muito baixa | Alta | | Persistencia | Opcional | Sempre | Opcional | Sempre | | Complexidade | Media | Alta | Baixa | Baixa | | Caso de Uso | Task Queues | Event Sourcing | Cache + Stream | Serverless |

O RabbitMQ brilha quando voce precisa de:


Conceitos Fundamentais

Antes de escrever qualquer linha de codigo, precisamos entender a anatomia do RabbitMQ.

A Arquitetura Basica

Glossario Essencial

Tipos de Exchange

Este e um ponto crucial que muita gente confunde. Existem quatro tipos de exchange:

  1. Direct: Entrega para filas com routing key exata.
  2. Fanout: Broadcast para todas as filas conectadas.
  3. Topic: Pattern matching com wildcards (* e #).
  4. Headers: Roteamento por headers da mensagem (menos comum).

Configurando o Ambiente

Antes de codar, vamos subir um RabbitMQ local. Eu sempre uso Docker para desenvolvimento.

# docker-compose.yml minimalista
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin123 \
  rabbitmq:3-management

Acesse http://localhost:15672 com as credenciais admin/admin123 para ver o Management UI.

Dependencias Go

Vamos usar a biblioteca oficial amqp091-go, que e a versao mantida pela comunidade apos a descontinuacao da streadway/amqp.

go get github.com/rabbitmq/amqp091-go

Codigo: Producer Basico

Vamos comecar com o essencial: publicar uma mensagem.

package main

import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 1. Conectar ao RabbitMQ
	conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
	failOnError(err, "Falha ao conectar ao RabbitMQ")
	defer conn.Close()

	// 2. Criar um canal (multiplexacao sobre a conexao TCP)
	ch, err := conn.Channel()
	failOnError(err, "Falha ao abrir canal")
	defer ch.Close()

	// 3. Declarar a fila (idempotente - cria se nao existir)
	q, err := ch.QueueDeclare(
		"pedidos", // nome
		true,      // durable: sobrevive restart do broker
		false,     // autoDelete: deletar quando nao ha consumers
		false,     // exclusive: uso exclusivo desta conexao
		false,     // noWait: nao esperar confirmacao do servidor
		nil,       // arguments: configuracoes extras
	)
	failOnError(err, "Falha ao declarar fila")

	// 4. Publicar mensagem
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	body := `{"pedido_id": 12345, "valor": 299.90, "cliente": "Matheus"}`

	err = ch.PublishWithContext(ctx,
		"",     // exchange (vazio = default exchange)
		q.Name, // routing key (nome da fila no default exchange)
		false,  // mandatory: retornar erro se nao houver fila
		false,  // immediate: deprecado no RabbitMQ 3.x
		amqp.Publishing{
			DeliveryMode: amqp.Persistent, // mensagem persistente
			ContentType:  "application/json",
			Body:         []byte(body),
		},
	)
	failOnError(err, "Falha ao publicar mensagem")

	log.Printf(" [x] Mensagem enviada: %s", body)
}

Pontos Importantes

  1. Conexao vs Canal: Uma conexao TCP pode ter multiplos canais. Isso evita o overhead de criar varias conexoes.
  2. Durable Queue: Se durable: true, a fila sobrevive ao restart do RabbitMQ.
  3. DeliveryMode Persistent: A mensagem e gravada em disco. Essencial para dados criticos.

Codigo: Consumer Robusto

Agora vamos criar um consumer que processa mensagens de forma resiliente.

package main

import (
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

type Pedido struct {
	PedidoID int     `json:"pedido_id"`
	Valor    float64 `json:"valor"`
	Cliente  string  `json:"cliente"`
}

func main() {
	conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
	if err != nil {
		log.Fatalf("Falha ao conectar: %s", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Falha ao abrir canal: %s", err)
	}
	defer ch.Close()

	// Declarar a mesma fila (idempotente)
	q, err := ch.QueueDeclare(
		"pedidos",
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("Falha ao declarar fila: %s", err)
	}

	// Prefetch: quantas mensagens o consumer pega por vez
	// Isso e CRUCIAL para balanceamento de carga
	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size (0 = sem limite)
		false, // global (false = por consumer)
	)
	if err != nil {
		log.Fatalf("Falha ao configurar QoS: %s", err)
	}

	// Registrar consumer
	msgs, err := ch.Consume(
		q.Name,
		"",    // consumer tag (vazio = gerado automaticamente)
		false, // autoAck: false = precisamos confirmar manualmente
		false, // exclusive
		false, // noLocal
		false, // noWait
		nil,
	)
	if err != nil {
		log.Fatalf("Falha ao registrar consumer: %s", err)
	}

	// Graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		for d := range msgs {
			log.Printf(" [>] Recebido: %s", d.Body)

			var pedido Pedido
			if err := json.Unmarshal(d.Body, &pedido); err != nil {
				log.Printf(" [!] Erro ao parsear JSON: %s", err)
				// Rejeitar e nao requeue (mensagem invalida)
				d.Nack(false, false)
				continue
			}

			// Simular processamento
			if err := processarPedido(pedido); err != nil {
				log.Printf(" [!] Erro ao processar pedido %d: %s", pedido.PedidoID, err)
				// Rejeitar e requeue para tentar novamente
				d.Nack(false, true)
				continue
			}

			// Confirmar processamento com sucesso
			d.Ack(false)
			log.Printf(" [v] Pedido %d processado com sucesso", pedido.PedidoID)
		}
	}()

	log.Printf(" [*] Aguardando mensagens. CTRL+C para sair.")
	<-sigChan
	log.Println(" [*] Encerrando consumer...")
}

func processarPedido(p Pedido) error {
	// Simular processamento demorado
	time.Sleep(2 * time.Second)
	log.Printf("    -> Processando pedido #%d de %s (R$ %.2f)",
		p.PedidoID, p.Cliente, p.Valor)
	return nil
}

O Poder do Prefetch (QoS)

O Qos com prefetch: 1 e uma das configuracoes mais importantes. Ele garante que cada worker so recebe uma nova mensagem apos confirmar a anterior. Isso permite balanceamento de carga real entre multiplos consumers.

Sem prefetch, o RabbitMQ distribui mensagens em round-robin, ignorando se o worker esta ocupado.


Pattern: Dead Letter Queue (DLQ)

Este e um dos patterns mais uteis em producao. Quando uma mensagem falha apos N tentativas, ela vai para uma fila de "mensagens mortas" para analise posterior.

Implementacao

package main

import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func setupQueuesWithDLQ(ch *amqp.Channel) error {
	// 1. Declarar a Dead Letter Exchange
	err := ch.ExchangeDeclare(
		"dlx.pedidos", // nome
		"direct",      // tipo
		true,          // durable
		false,         // autoDelete
		false,         // internal
		false,         // noWait
		nil,
	)
	if err != nil {
		return err
	}

	// 2. Declarar a Dead Letter Queue
	_, err = ch.QueueDeclare(
		"pedidos.dlq",
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return err
	}

	// 3. Bind da DLQ com a DLX
	err = ch.QueueBind(
		"pedidos.dlq",
		"pedidos.failed", // routing key
		"dlx.pedidos",
		false,
		nil,
	)
	if err != nil {
		return err
	}

	// 4. Declarar a fila principal com DLQ configurada
	args := amqp.Table{
		"x-dead-letter-exchange":    "dlx.pedidos",
		"x-dead-letter-routing-key": "pedidos.failed",
		"x-message-ttl":             int32(60000), // 60 segundos TTL
	}

	_, err = ch.QueueDeclare(
		"pedidos.main",
		true,
		false,
		false,
		false,
		args,
	)
	if err != nil {
		return err
	}

	log.Println(" [*] Filas com DLQ configuradas com sucesso")
	return nil
}

func main() {
	conn, _ := amqp.Dial("amqp://admin:admin123@localhost:5672/")
	defer conn.Close()

	ch, _ := conn.Channel()
	defer ch.Close()

	setupQueuesWithDLQ(ch)

	// Publicar mensagem de teste
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	ch.PublishWithContext(ctx,
		"",
		"pedidos.main",
		false,
		false,
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "application/json",
			Body:         []byte(`{"pedido_id": 999, "valor": 100.00}`),
		},
	)

	log.Println(" [x] Mensagem enviada para fila principal")
}

Consumer com Retry Logic

func consumeWithRetry(ch *amqp.Channel) {
	msgs, _ := ch.Consume("pedidos.main", "", false, false, false, false, nil)

	for d := range msgs {
		// Verificar quantas vezes ja foi tentado
		retryCount := int32(0)
		if val, ok := d.Headers["x-retry-count"]; ok {
			retryCount = val.(int32)
		}

		err := processarMensagem(d.Body)
		if err != nil {
			if retryCount >= 3 {
				log.Printf(" [!] Max retries atingido. Enviando para DLQ.")
				d.Nack(false, false) // Vai para DLQ
			} else {
				// Republicar com contador incrementado
				republishWithRetry(ch, d, retryCount+1)
				d.Ack(false)
			}
			continue
		}

		d.Ack(false)
	}
}

func republishWithRetry(ch *amqp.Channel, original amqp.Delivery, retryCount int32) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	headers := original.Headers
	if headers == nil {
		headers = amqp.Table{}
	}
	headers["x-retry-count"] = retryCount

	ch.PublishWithContext(ctx,
		"",
		"pedidos.main",
		false,
		false,
		amqp.Publishing{
			Headers:      headers,
			DeliveryMode: amqp.Persistent,
			ContentType:  original.ContentType,
			Body:         original.Body,
		},
	)
}

Pattern: Publisher Confirms

Em sistemas criticos, voce precisa ter certeza de que a mensagem chegou ao broker. O RabbitMQ oferece Publisher Confirms para isso.

func publishWithConfirm(ch *amqp.Channel, body []byte) error {
	// Habilitar modo de confirmacao
	err := ch.Confirm(false)
	if err != nil {
		return err
	}

	// Canal para receber confirmacoes
	confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err = ch.PublishWithContext(ctx,
		"",
		"pedidos.main",
		true, // mandatory: true para garantir que chegou em uma fila
		false,
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "application/json",
			Body:         body,
		},
	)
	if err != nil {
		return err
	}

	// Aguardar confirmacao
	select {
	case confirm := <-confirms:
		if confirm.Ack {
			log.Println(" [v] Mensagem confirmada pelo broker")
			return nil
		}
		return fmt.Errorf("mensagem rejeitada pelo broker")
	case <-time.After(5 * time.Second):
		return fmt.Errorf("timeout aguardando confirmacao")
	}
}

Novidade 2025: RabbitMQ Streams

Uma das adicoes mais empolgantes do RabbitMQ 3.9+ sao os Streams. Eles funcionam de forma similar ao Kafka: um log persistente onde multiplos consumers podem ler do mesmo ponto.

Quando usar Streams vs Queues?

| Cenario | Queue Classica | Stream | |---------|----------------|--------| | Task queue com ack | Sim | Nao | | Event sourcing | Nao | Sim | | Replay de eventos | Nao | Sim | | Multiplos consumers independentes | Nao | Sim | | Ordenacao garantida | Sim | Sim |

Exemplo com Stream

func setupStream(ch *amqp.Channel) error {
	args := amqp.Table{
		"x-queue-type":             "stream",
		"x-max-length-bytes":       int64(1_000_000_000), // 1GB
		"x-stream-max-segment-size-bytes": int32(100_000_000), // 100MB por segmento
	}

	_, err := ch.QueueDeclare(
		"eventos.stream",
		true,
		false,
		false,
		false,
		args,
	)
	return err
}

func consumeStream(ch *amqp.Channel) {
	// Consumer com offset especifico
	args := amqp.Table{
		"x-stream-offset": "first", // ou "last", "next", timestamp, offset numerico
	}

	msgs, _ := ch.Consume(
		"eventos.stream",
		"consumer-analytics",
		false,
		false,
		false,
		false,
		args,
	)

	for msg := range msgs {
		log.Printf(" [stream] Offset %d: %s", msg.DeliveryTag, msg.Body)
		msg.Ack(false)
	}
}

Estrutura de Projeto Recomendada

Para projetos maiores, recomendo organizar o codigo assim:

/cmd
  /producer
    main.go
  /consumer
    main.go
/internal
  /rabbitmq
    connection.go    # Pool de conexoes
    publisher.go     # Logica de publicacao
    consumer.go      # Logica de consumo
    config.go        # Configuracoes
  /domain
    pedido.go        # Entidades de dominio
  /handlers
    pedido_handler.go # Processamento de mensagens
/pkg
  /retry
    backoff.go       # Estrategias de retry
/configs
  rabbitmq.yaml

Connection Pool

// internal/rabbitmq/connection.go
package rabbitmq

import (
	"log"
	"sync"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

type ConnectionPool struct {
	url        string
	conn       *amqp.Connection
	mu         sync.RWMutex
	notifyClose chan *amqp.Error
}

func NewConnectionPool(url string) *ConnectionPool {
	pool := &ConnectionPool{url: url}
	pool.connect()
	go pool.handleReconnect()
	return pool
}

func (p *ConnectionPool) connect() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	conn, err := amqp.Dial(p.url)
	if err != nil {
		return err
	}

	p.conn = conn
	p.notifyClose = make(chan *amqp.Error)
	p.conn.NotifyClose(p.notifyClose)

	log.Println(" [*] Conectado ao RabbitMQ")
	return nil
}

func (p *ConnectionPool) handleReconnect() {
	for {
		err := <-p.notifyClose
		if err != nil {
			log.Printf(" [!] Conexao perdida: %s. Reconectando...", err)
		}

		for {
			time.Sleep(5 * time.Second)
			if err := p.connect(); err == nil {
				break
			}
			log.Println(" [!] Falha ao reconectar. Tentando novamente...")
		}
	}
}

func (p *ConnectionPool) GetChannel() (*amqp.Channel, error) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.conn.Channel()
}

Metricas e Observabilidade

Em producao, voce precisa monitorar o RabbitMQ. As metricas mais importantes sao:

  1. Queue Depth: Quantas mensagens estao na fila
  2. Consumer Utilization: Porcentagem de tempo que consumers estao ocupados
  3. Publish Rate: Mensagens publicadas por segundo
  4. Deliver Rate: Mensagens entregues por segundo
  5. Ack Rate: Confirmacoes por segundo

O RabbitMQ expoe essas metricas via API HTTP e Prometheus endpoint (/metrics).

// Exemplo de health check
func healthCheck(conn *amqp.Connection) bool {
	if conn.IsClosed() {
		return false
	}
	ch, err := conn.Channel()
	if err != nil {
		return false
	}
	ch.Close()
	return true
}

Erros Comuns e Como Evitar

1. Conexao por Request

Errado:

func handleRequest() {
    conn, _ := amqp.Dial(url) // Nova conexao a cada request!
    defer conn.Close()
    // ...
}

Certo:

var pool *ConnectionPool // Singleton

func handleRequest() {
    ch, _ := pool.GetChannel()
    defer ch.Close()
    // ...
}

2. AutoAck sem Controle

Errado:

msgs, _ := ch.Consume(queue, "", true, ...) // autoAck: true

Certo:

msgs, _ := ch.Consume(queue, "", false, ...) // autoAck: false
// ...
msg.Ack(false) // Ack manual apos processamento

3. Ignorar Erros de Publicacao

Errado:

ch.PublishWithContext(ctx, ...) // Ignora erro

Certo:

if err := ch.PublishWithContext(ctx, ...); err != nil {
    log.Printf("Falha ao publicar: %s", err)
    // Retry ou fallback
}

Conclusao

RabbitMQ continua sendo uma escolha solida para mensageria em 2026. A combinacao com Go e natural: ambos sao eficientes, simples e robustos.

Os pontos-chave que voce deve lembrar:

  1. Prefetch (QoS) e essencial para balanceamento de carga
  2. Publisher Confirms garantem entrega em sistemas criticos
  3. Dead Letter Queues sao obrigatorias em producao
  4. Connection Pooling evita overhead de conexoes
  5. Streams sao a novidade para event sourcing

Se voce esta construindo microservicos, considere fortemente adicionar RabbitMQ ao seu toolkit. A curva de aprendizado e suave, e os beneficios em resiliencia e desacoplamento sao enormes.


Referencias

Comentarios (0)

Carregando comentarios...