RabbitMQ

Monitor RabbitMQ message broker logs including connection logs, queue operations, cluster events, and performance metrics using the File Stream plugin

RabbitMQ Integration

Monitor and analyze RabbitMQ message broker logs in real-time using LogFlux Agent’s File Stream plugin. This configuration-based approach provides comprehensive message queue monitoring, connection tracking, and cluster health analysis for RabbitMQ deployments.

Overview

The RabbitMQ integration leverages LogFlux Agent’s File Stream plugin to provide:

  • Real-time monitoring of RabbitMQ server logs, connection logs, and audit logs
  • Message flow tracking with queue operations, routing, and delivery monitoring
  • Connection management with client connection and channel lifecycle monitoring
  • Cluster monitoring for RabbitMQ cluster deployments and node health
  • Performance analysis with memory usage, disk space, and throughput metrics
  • Security monitoring with authentication events and access control logging

Installation

The File Stream plugin is included with LogFlux Agent. Enable it for RabbitMQ log monitoring:

# Enable File Stream plugin
sudo systemctl enable --now logflux-filestream

# Verify plugin status
sudo systemctl status logflux-filestream

RabbitMQ Configuration

Configure RabbitMQ logging in /etc/rabbitmq/rabbitmq.conf:

Basic Logging Configuration

# Log levels: debug, info, warning, error, critical, none
log.console.level = info
log.file.level = info
log.file = /var/log/rabbitmq/rabbit.log

# Connection logging
log.connection.level = info

# Channel logging  
log.channel.level = info

# Queue logging
log.queue.level = info

# Mirroring logging (for HA queues)
log.mirroring.level = info

# Federation logging
log.federation.level = info

# LDAP logging (if using LDAP authentication)
log.ldap.level = info

# Default user logging
log.default.level = info

Enhanced Logging Configuration

# Detailed logging configuration
log.console = true
log.console.level = info
log.console.formatter = json

log.file = /var/log/rabbitmq/rabbit.log
log.file.level = info
log.file.formatter = json
log.file.rotation.date = $D0
log.file.rotation.size = 100MB
log.file.rotation.count = 10

# Access logging
log.file.access = /var/log/rabbitmq/access.log
log.file.access.level = info

# Error logging
log.file.error = /var/log/rabbitmq/error.log
log.file.error.level = error

# Connection lifecycle logging
log.connection.level = debug

# SASL (crash reports)
log.file.sasl = /var/log/rabbitmq/sasl.log

# Kernel messages
log.file.kernel = /var/log/rabbitmq/kernel.log

Audit Plugin Configuration

Enable the RabbitMQ audit plugin in /etc/rabbitmq/enabled_plugins:

[rabbitmq_auth_backend_ldap,rabbitmq_management,rabbitmq_audit].

Configure audit logging in /etc/rabbitmq/rabbitmq.conf:

# Audit logging
audit.file = /var/log/rabbitmq/audit.log
audit.file.level = info
audit.file.rotation.date = $D0
audit.file.rotation.size = 50MB
audit.file.rotation.count = 5

# Audit categories
audit.categories.management = true
audit.categories.channel = true  
audit.categories.connection = true
audit.categories.queue = true
audit.categories.exchange = true
audit.categories.binding = true
audit.categories.vhost = true
audit.categories.user = true
audit.categories.permission = true

Basic Configuration

Configure the File Stream plugin to monitor RabbitMQ logs by creating /etc/logflux-agent/plugins/filestream-rabbitmq.toml:

[filestream.rabbitmq_main]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "rabbitmq_log"
tags = ["rabbitmq", "message-broker", "amqp"]
fields = { service = "rabbitmq", log_type = "main" }

[filestream.rabbitmq_access]
paths = ["/var/log/rabbitmq/access.log"]
format = "rabbitmq_access"
tags = ["rabbitmq", "access", "connection"]
fields = { service = "rabbitmq", log_type = "access" }

[filestream.rabbitmq_error]
paths = ["/var/log/rabbitmq/error.log"]
format = "rabbitmq_error"
tags = ["rabbitmq", "error", "critical"]
fields = { service = "rabbitmq", log_type = "error" }

[filestream.rabbitmq_audit]
paths = ["/var/log/rabbitmq/audit.log"]
format = "json"
tags = ["rabbitmq", "audit", "security"]
fields = { service = "rabbitmq", log_type = "audit" }

RabbitMQ Log Formats

Standard RabbitMQ Log Format

[filestream.rabbitmq_standard]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "regex"
regex = '^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) \[(?P<level>\w+)\] <(?P<process_id>[^>]+)> (?P<message>.*)$'
parse_timestamp = true
timestamp_field = "timestamp"
timestamp_format = "2006-01-02 15:04:05.000"
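To verify the regex before deploying it, you can exercise it in Python against a sample line (the sample line below is illustrative, not captured from a live broker):

```python
import re

# Same pattern as the filestream config above.
pattern = re.compile(
    r'^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) '
    r'\[(?P<level>\w+)\] <(?P<process_id>[^>]+)> (?P<message>.*)$'
)

# Illustrative sample line in the standard RabbitMQ log layout.
line = ('2024-01-15 10:23:45.123 [info] <0.605.0> '
        'accepting AMQP connection <0.605.0> (127.0.0.1:52341 -> 127.0.0.1:5672)')

m = pattern.match(line)
print(m.group('level'), m.group('process_id'))
```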

JSON Log Format (RabbitMQ 3.9+)

[filestream.rabbitmq_json]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "json"
parse_timestamp = true
timestamp_field = "time"
timestamp_format = "2006-01-02T15:04:05.000Z"
tags = ["rabbitmq", "json"]
fields = { service = "rabbitmq", log_type = "json" }
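The `timestamp_format` above is a Go-style reference layout. A quick way to sanity-check it against a JSON log line is to parse the equivalent pattern in Python (the sample line and its field names are illustrative):

```python
import json
from datetime import datetime

# Hypothetical JSON log line in the shape RabbitMQ's JSON formatter emits.
line = '{"time":"2024-01-15T10:23:45.123Z","level":"info","msg":"Server startup complete"}'
event = json.loads(line)

# The Go layout "2006-01-02T15:04:05.000Z" corresponds to this strptime format.
ts = datetime.strptime(event['time'], '%Y-%m-%dT%H:%M:%S.%fZ')
print(event['level'], ts.isoformat())
```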

Connection Log Format

[filestream.rabbitmq_connections]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "regex"
regex = '^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) \[(?P<level>\w+)\] <(?P<process_id>[^>]+)> (?P<connection_event>accepting|closing|connection) (?P<connection_details>.*)$'
parse_timestamp = true
timestamp_field = "timestamp"
timestamp_format = "2006-01-02 15:04:05.000"
tags = ["rabbitmq", "connections"]

Management API Audit Format

[filestream.rabbitmq_management_audit]
paths = ["/var/log/rabbitmq/audit.log"]
format = "json"
parse_timestamp = true
timestamp_field = "timestamp"
timestamp_format = "2006-01-02T15:04:05.000Z"
tags = ["rabbitmq", "management", "api"]
fields = { service = "rabbitmq", component = "management", log_type = "api_audit" }

Advanced Configuration

Queue Operations Monitoring

[filestream.rabbitmq_queues]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "rabbitmq_log"
tags = ["rabbitmq", "queues", "operations"]
fields = { service = "rabbitmq", log_type = "queue_operations" }

# Filter for queue-related events
[filestream.rabbitmq_queues.processors.grep]
patterns = [
  "queue",
  "declare",
  "delete",
  "purge",
  "bind",
  "unbind",
  "consume",
  "ack",
  "nack",
  "reject"
]
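Conceptually, the grep processor keeps only lines containing one of the configured patterns. A minimal sketch of that filter (substring matching is assumed here):

```python
# Same pattern list as the grep processor above.
PATTERNS = ["queue", "declare", "delete", "purge", "bind", "unbind",
            "consume", "ack", "nack", "reject"]

def keep(line: str) -> bool:
    """Keep a log line when it contains any configured substring."""
    return any(p in line for p in PATTERNS)

print(keep("2024-01-15 10:23:45.123 [info] <0.741.0> queue 'orders' declared"))  # True
print(keep("2024-01-15 10:23:45.123 [info] <0.605.0> node startup complete"))    # False
```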

Cluster Monitoring

[filestream.rabbitmq_cluster]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "rabbitmq_log"
tags = ["rabbitmq", "cluster", "ha"]
fields = { service = "rabbitmq", log_type = "cluster" }

# Filter for cluster events
[filestream.rabbitmq_cluster.processors.grep]
patterns = [
  "node",
  "cluster",
  "partition",
  "split-brain",
  "mnesia",
  "join",
  "leave",
  "sync"
]

Performance and Memory Monitoring

[filestream.rabbitmq_performance]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "rabbitmq_log"
tags = ["rabbitmq", "performance", "memory"]
fields = { service = "rabbitmq", log_type = "performance" }

# Filter for performance-related events
[filestream.rabbitmq_performance.processors.grep]
patterns = [
  "memory",
  "disk",
  "alarm",
  "flow",
  "blocked",
  "throttle",
  "gc",
  "load"
]

Federation and Shovel Monitoring

[filestream.rabbitmq_federation]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "rabbitmq_log"
tags = ["rabbitmq", "federation", "shovel"]
fields = { service = "rabbitmq", log_type = "federation" }

# Filter for federation events
[filestream.rabbitmq_federation.processors.grep]
patterns = [
  "federation",
  "shovel",
  "upstream",
  "downstream",
  "link",
  "policy"
]

Usage Examples

Monitor RabbitMQ Operations

# Stream all RabbitMQ logs
logflux-cli stream --filter 'service:rabbitmq'

# Monitor connection events
logflux-cli stream --filter 'service:rabbitmq AND log_type:access'

# Track queue operations
logflux-cli stream --filter 'service:rabbitmq AND log_type:queue_operations'

Performance Analysis

# Monitor memory alarms
logflux-cli stream --filter 'service:rabbitmq AND message:memory AND message:alarm'

# Track blocked connections
logflux-cli stream --filter 'service:rabbitmq AND message:blocked'

# Monitor cluster events
logflux-cli stream --filter 'service:rabbitmq AND log_type:cluster'

Security Monitoring

# Track authentication events
logflux-cli stream --filter 'service:rabbitmq AND message:authentication'

# Monitor management API access
logflux-cli stream --filter 'service:rabbitmq AND component:management'

# Track permission changes
logflux-cli stream --filter 'service:rabbitmq AND message:permission'

Troubleshooting

# Monitor error logs
logflux-cli stream --filter 'service:rabbitmq AND log_type:error'

# Track connection failures
logflux-cli stream --filter 'service:rabbitmq AND message:connection AND level:ERROR'

# Monitor queue declaration issues
logflux-cli stream --filter 'service:rabbitmq AND message:declare AND level:ERROR'

RabbitMQ Metrics Collection

Management API Integration

Create a metrics collection script:

#!/bin/bash
# Collect RabbitMQ metrics via Management API
RABBITMQ_HOST="localhost"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="password"

collect_metrics() {
    local timestamp=$(date -u +%Y-%m-%dT%H:%M:%S.000Z)
    
    # Overview metrics
    curl -s -u "$RABBITMQ_USER:$RABBITMQ_PASS" \
        "http://$RABBITMQ_HOST:$RABBITMQ_PORT/api/overview" | \
    jq --arg ts "$timestamp" '. + {"timestamp": $ts, "metric_type": "overview"}' \
        >> /var/log/rabbitmq/metrics.log
    
    # Node metrics
    curl -s -u "$RABBITMQ_USER:$RABBITMQ_PASS" \
        "http://$RABBITMQ_HOST:$RABBITMQ_PORT/api/nodes" | \
    jq --arg ts "$timestamp" '.[] + {"timestamp": $ts, "metric_type": "node"}' \
        >> /var/log/rabbitmq/metrics.log
    
    # Queue metrics
    curl -s -u "$RABBITMQ_USER:$RABBITMQ_PASS" \
        "http://$RABBITMQ_HOST:$RABBITMQ_PORT/api/queues" | \
    jq --arg ts "$timestamp" '.[] + {"timestamp": $ts, "metric_type": "queue"}' \
        >> /var/log/rabbitmq/metrics.log
}

while true; do
    collect_metrics
    sleep 60
done
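The jq steps above simply stamp each API record with a timestamp and a metric_type before appending it to the log. The same enrichment can be sketched in Python (the sample record is hypothetical):

```python
import json
from datetime import datetime, timezone

def enrich(record: dict, metric_type: str) -> dict:
    """Mirror of the jq step above: stamp an API record before appending it."""
    out = dict(record)
    out['timestamp'] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.000Z')
    out['metric_type'] = metric_type
    return out

sample = {'name': 'orders', 'messages': 42}   # hypothetical /api/queues element
print(json.dumps(enrich(sample, 'queue')))
```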

Connection Statistics

#!/bin/bash
# Monitor connection statistics
while true; do
    rabbitmqctl list_connections name peer_host peer_port state channels | \
    awk -v ts="$(date -u +%Y-%m-%dT%H:%M:%S.000Z)" '
    NR > 1 {
        printf "{\"timestamp\": \"%s\", \"connection_name\": \"%s\", \"peer_host\": \"%s\", \"peer_port\": %s, \"state\": \"%s\", \"channels\": %s, \"metric_type\": \"connection\"}\n", 
               ts, $1, $2, $3, $4, $5
    }' >> /var/log/rabbitmq/connection-stats.log
    sleep 30
done
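The awk program above turns each tab-separated rabbitmqctl row into one JSON line. An equivalent Python sketch, with a hypothetical sample row, makes the field mapping explicit:

```python
import json
from datetime import datetime, timezone

def rows_to_json(tsv: str):
    """Convert `rabbitmqctl list_connections ...` tab-separated rows to the
    JSON lines the awk script above emits (sample input is hypothetical)."""
    ts = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.000Z')
    for row in tsv.strip().splitlines():
        name, host, port, state, channels = row.split('\t')
        yield json.dumps({'timestamp': ts, 'connection_name': name,
                          'peer_host': host, 'peer_port': int(port),
                          'state': state, 'channels': int(channels),
                          'metric_type': 'connection'})

sample = "conn-1\t10.0.0.5\t52341\trunning\t3"
print(next(rows_to_json(sample)))
```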

Queue Statistics

#!/bin/bash
# Monitor queue statistics
while true; do
    rabbitmqctl list_queues name messages consumers memory | \
    awk -v ts="$(date -u +%Y-%m-%dT%H:%M:%S.000Z)" '
    NR > 1 {
        printf "{\"timestamp\": \"%s\", \"queue_name\": \"%s\", \"messages\": %s, \"consumers\": %s, \"memory\": %s, \"metric_type\": \"queue_stats\"}\n", 
               ts, $1, $2, $3, $4
    }' >> /var/log/rabbitmq/queue-stats.log
    sleep 60
done

Monitoring and Alerting

Key Metrics to Monitor

# Memory alarm alert
[alerts.rabbitmq_memory_alarm]
query = "service:rabbitmq AND message:memory AND message:alarm"
threshold = 1
window = "30s"
message = "RabbitMQ memory alarm triggered"

# Disk space alarm alert
[alerts.rabbitmq_disk_alarm]
query = "service:rabbitmq AND message:disk AND message:alarm"
threshold = 1
window = "30s"
message = "RabbitMQ disk space alarm triggered"

# Connection limit alert
[alerts.rabbitmq_connection_limit]
query = "service:rabbitmq AND message:connection_limit"
threshold = 1
window = "1m"
message = "RabbitMQ connection limit reached"

# Queue buildup alert
[alerts.rabbitmq_queue_buildup]
query = "service:rabbitmq AND metric_type:queue_stats AND messages:>10000"
threshold = 1
window = "2m"
message = "RabbitMQ queue buildup detected: {{ .queue_name }}"

# Cluster partition alert
[alerts.rabbitmq_cluster_partition]
query = "service:rabbitmq AND message:partition"
threshold = 1
window = "10s"
message = "RabbitMQ cluster partition detected"

# Authentication failure alert
[alerts.rabbitmq_auth_failure]
query = "service:rabbitmq AND message:authentication AND level:ERROR"
threshold = 5
window = "1m"
message = "High authentication failure rate in RabbitMQ"
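The queue-buildup rule above fires when a queue_stats record reports more than 10,000 messages. As plain logic (the record shape is assumed from the statistics scripts in this guide):

```python
# Hypothetical sketch of the rabbitmq_queue_buildup alert condition above.
THRESHOLD = 10000

def queue_buildup(record: dict) -> bool:
    """True when a queue_stats record exceeds the message threshold."""
    return (record.get('metric_type') == 'queue_stats'
            and record.get('messages', 0) > THRESHOLD)

print(queue_buildup({'metric_type': 'queue_stats',
                     'queue_name': 'orders', 'messages': 15000}))
```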

Dashboard Metrics

Monitor these key RabbitMQ metrics:

  • Message rates (publish rate, delivery rate, acknowledge rate)
  • Queue metrics (queue depth, consumer count, memory usage)
  • Connection metrics (active connections, connection rate, channels)
  • Node health (memory usage, disk space, file descriptors)
  • Cluster status (node status, network partitions, synchronization)
  • Performance (message throughput, latency, GC frequency)
  • Security (authentication events, management API access)
  • Alarms (memory alarms, disk alarms, flow control)

Troubleshooting

Common Issues

RabbitMQ logs not appearing:

# Check RabbitMQ is running
sudo systemctl status rabbitmq-server

# Verify log configuration
sudo rabbitmqctl environment | grep log

# Check log file permissions
sudo ls -la /var/log/rabbitmq/

Log parsing errors:

# Check log format
sudo tail -n 10 /var/log/rabbitmq/rabbit.log

# Verify JSON logging (if enabled)
sudo tail -n 5 /var/log/rabbitmq/rabbit.log | jq .

# Check configuration
sudo rabbitmqctl environment | grep log

Connection monitoring issues:

# List active connections
sudo rabbitmqctl list_connections

# Check connection limits
sudo rabbitmqctl environment | grep connection

# Monitor connection logs
sudo tail -f /var/log/rabbitmq/rabbit.log | grep connection

Cluster monitoring problems:

# Check cluster status
sudo rabbitmqctl cluster_status

# Monitor node health
sudo rabbitmqctl node_health_check

# Check cluster logs
sudo tail -f /var/log/rabbitmq/rabbit.log | grep cluster

Best Practices

Performance

  • Monitor memory usage and set appropriate thresholds
  • Configure flow control to prevent message buildup
  • Use persistent messages judiciously to balance durability and performance
  • Monitor queue lengths and implement alerts for buildup

Security

  • Enable SSL/TLS for client connections
  • Configure authentication with strong credentials
  • Use virtual hosts for multi-tenant deployments
  • Monitor management API access and restrict permissions

High Availability

  • Configure the cluster with an odd number of nodes
  • Monitor network partitions and implement recovery procedures
  • Use mirrored queues for critical message durability
  • Implement proper backup strategies for definitions and messages

Log Management

# Optimize log rotation for RabbitMQ
/var/log/rabbitmq/*.log {
    daily
    rotate 30
    missingok
    notifempty
    compress
    delaycompress
    postrotate
        systemctl reload rabbitmq-server.service > /dev/null 2>&1 || true
        systemctl reload logflux-filestream.service > /dev/null 2>&1 || true
    endpostrotate
}

Monitoring Configuration

Enable comprehensive logging for production:

# Production logging configuration
log.file.level = info
log.connection.level = info
log.channel.level = info
log.queue.level = info
log.mirroring.level = info
audit.categories.connection = true
audit.categories.channel = true
audit.categories.queue = true

Integration Examples

Docker Deployment

version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3.11-management-alpine
    hostname: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_DEFAULT_VHOST: /
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - rabbitmq_logs:/var/log/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./enabled_plugins:/etc/rabbitmq/enabled_plugins
    ports:
      - "5672:5672"
      - "15672:15672"
      
  logflux-agent:
    image: logflux/agent:latest
    volumes:
      - rabbitmq_logs:/var/log/rabbitmq:ro
      - ./logflux-config:/etc/logflux-agent/plugins
    depends_on:
      - rabbitmq

volumes:
  rabbitmq_data:
  rabbitmq_logs:

Kubernetes Deployment

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
spec:
  serviceName: rabbitmq
  replicas: 3
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
      - name: rabbitmq
        image: rabbitmq:3.11-management
        env:
        - name: RABBITMQ_DEFAULT_USER
          value: admin
        - name: RABBITMQ_DEFAULT_PASS
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password
        - name: RABBITMQ_ERLANG_COOKIE
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: erlang-cookie
        volumeMounts:
        - name: rabbitmq-data
          mountPath: /var/lib/rabbitmq
        - name: rabbitmq-logs
          mountPath: /var/log/rabbitmq
        - name: rabbitmq-config
          mountPath: /etc/rabbitmq/rabbitmq.conf
          subPath: rabbitmq.conf
        ports:
        - containerPort: 5672
        - containerPort: 15672
      volumes:
      - name: rabbitmq-config
        configMap:
          name: rabbitmq-config
      - name: rabbitmq-logs
        emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: rabbitmq-data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi

Cluster Health Monitoring

#!/bin/bash
# Comprehensive RabbitMQ cluster health check
check_cluster_health() {
    local timestamp=$(date -u +%Y-%m-%dT%H:%M:%S.000Z)
    
    # Cluster status
    rabbitmqctl cluster_status --formatter json | \
    jq --arg ts "$timestamp" '. + {"timestamp": $ts, "check_type": "cluster_status"}' \
        >> /var/log/rabbitmq/health-check.log
    
    # Node health
    for node in $(rabbitmqctl cluster_status --formatter json | jq -r '.running_nodes[]'); do
        rabbitmqctl node_health_check -n $node --formatter json | \
        jq --arg ts "$timestamp" --arg node "$node" '. + {"timestamp": $ts, "node": $node, "check_type": "node_health"}' \
            >> /var/log/rabbitmq/health-check.log
    done
}

while true; do
    check_cluster_health
    sleep 300  # Every 5 minutes
done

This comprehensive RabbitMQ integration provides real-time message broker monitoring, connection tracking, and cluster health analysis using LogFlux Agent’s File Stream plugin. The configuration-based approach offers detailed insights into message flow, queue operations, and system performance across different RabbitMQ deployment scenarios.

Disclaimer

The RabbitMQ logo and trademarks are the property of VMware, Inc. and its subsidiaries. LogFlux is not affiliated with, endorsed by, or sponsored by VMware, Inc. The RabbitMQ logo is used solely for identification purposes to indicate compatibility and integration capabilities.