RabbitMQ
Monitor RabbitMQ message broker logs including connection logs, queue operations, cluster events, and performance metrics using the File Stream plugin
RabbitMQ Integration
Monitor and analyze RabbitMQ message broker logs in real-time using LogFlux Agent’s File Stream plugin. This configuration-based approach provides comprehensive message queue monitoring, connection tracking, and cluster health analysis for RabbitMQ deployments.
Overview
The RabbitMQ integration leverages LogFlux Agent’s File Stream plugin to provide:
- Real-time monitoring of RabbitMQ server logs, connection logs, and audit logs
- Message flow tracking with queue operations, routing, and delivery monitoring
- Connection management with client connection and channel lifecycle monitoring
- Cluster monitoring for RabbitMQ cluster deployments and node health
- Performance analysis with memory usage, disk space, and throughput metrics
- Security monitoring with authentication events and access control logging
Installation
The File Stream plugin is included with LogFlux Agent. Enable it for RabbitMQ log monitoring:
```shell
# Enable File Stream plugin
sudo systemctl enable --now logflux-filestream

# Verify plugin status
sudo systemctl status logflux-filestream
```
RabbitMQ Configuration
Configure RabbitMQ logging in /etc/rabbitmq/rabbitmq.conf:
Basic Logging Configuration
```ini
# Log levels: debug, info, warning, error, critical, none
log.console.level = info
log.file.level = info
log.file = /var/log/rabbitmq/rabbit.log

# Connection logging
log.connection.level = info

# Channel logging
log.channel.level = info

# Queue logging
log.queue.level = info

# Mirroring logging (for HA queues)
log.mirroring.level = info

# Federation logging
log.federation.level = info

# LDAP logging (if using LDAP authentication)
log.ldap.level = info

# Default category logging
log.default.level = info
```
Enhanced Logging Configuration
```ini
# Detailed logging configuration
log.console = true
log.console.level = info
log.console.formatter = json

log.file = /var/log/rabbitmq/rabbit.log
log.file.level = info
log.file.formatter = json
log.file.rotation.date = $D0
log.file.rotation.size = 100MB
log.file.rotation.count = 10

# Access logging
log.file.access = /var/log/rabbitmq/access.log
log.file.access.level = info

# Error logging
log.file.error = /var/log/rabbitmq/error.log
log.file.error.level = error

# Connection lifecycle logging
log.connection.level = debug

# SASL (crash reports)
log.file.sasl = /var/log/rabbitmq/sasl.log

# Kernel messages
log.file.kernel = /var/log/rabbitmq/kernel.log
```
Audit Plugin Configuration
Enable the RabbitMQ audit plugin in /etc/rabbitmq/enabled_plugins:

```erlang
[rabbitmq_auth_backend_ldap,rabbitmq_management,rabbitmq_audit].
```
Configure audit logging in /etc/rabbitmq/rabbitmq.conf:
```ini
# Audit logging
audit.file = /var/log/rabbitmq/audit.log
audit.file.level = info
audit.file.rotation.date = $D0
audit.file.rotation.size = 50MB
audit.file.rotation.count = 5

# Audit categories
audit.categories.management = true
audit.categories.channel = true
audit.categories.connection = true
audit.categories.queue = true
audit.categories.exchange = true
audit.categories.binding = true
audit.categories.vhost = true
audit.categories.user = true
audit.categories.permission = true
```
Basic Configuration
Configure the File Stream plugin to monitor RabbitMQ logs by creating /etc/logflux-agent/plugins/filestream-rabbitmq.toml:
```toml
[filestream.rabbitmq_main]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "rabbitmq_log"
tags = ["rabbitmq", "message-broker", "amqp"]
fields = { service = "rabbitmq", log_type = "main" }

[filestream.rabbitmq_access]
paths = ["/var/log/rabbitmq/access.log"]
format = "rabbitmq_access"
tags = ["rabbitmq", "access", "connection"]
fields = { service = "rabbitmq", log_type = "access" }

[filestream.rabbitmq_error]
paths = ["/var/log/rabbitmq/error.log"]
format = "rabbitmq_error"
tags = ["rabbitmq", "error", "critical"]
fields = { service = "rabbitmq", log_type = "error" }

[filestream.rabbitmq_audit]
paths = ["/var/log/rabbitmq/audit.log"]
format = "json"
tags = ["rabbitmq", "audit", "security"]
fields = { service = "rabbitmq", log_type = "audit" }
```
Standard Log Format

```toml
[filestream.rabbitmq_standard]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "regex"
regex = '^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) \[(?P<level>\w+)\] <(?P<process_id>[^>]+)> (?P<message>.*)$'
parse_timestamp = true
timestamp_field = "timestamp"
timestamp_format = "2006-01-02 15:04:05.000"
```
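To sanity-check the capture groups before deploying the regex, the same pattern can be exercised with sed, using numbered groups in place of the named ones. The log line below is a fabricated sample in RabbitMQ's default file format:

```shell
# Fabricated sample line in RabbitMQ's default file format
line='2024-01-15 10:23:45.123 [info] <0.605.0> accepting AMQP connection'

# Numbered groups mirror timestamp, level, process_id, and message
echo "$line" | sed -E \
  's/^([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9:]{8}\.[0-9]{3}) \[([a-z]+)\] <([^>]+)> (.*)$/level=\2 pid=\3 msg=\4/'
```

If the line prints back unchanged, the pattern did not match and the agent would fall back to treating the line as unparsed text.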
JSON Log Format

```toml
[filestream.rabbitmq_json]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "json"
parse_timestamp = true
timestamp_field = "time"
timestamp_format = "2006-01-02T15:04:05.000Z"
tags = ["rabbitmq", "json"]
fields = { service = "rabbitmq", log_type = "json" }
```
Connection Event Parsing

```toml
[filestream.rabbitmq_connections]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "regex"
regex = '^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) \[(?P<level>\w+)\] <(?P<process_id>[^>]+)> (?P<connection_event>accepting|closing|connection) (?P<connection_details>.*)$'
parse_timestamp = true
timestamp_field = "timestamp"
timestamp_format = "2006-01-02 15:04:05.000"
tags = ["rabbitmq", "connections"]
```
Management API Audit Logs

```toml
[filestream.rabbitmq_management_audit]
paths = ["/var/log/rabbitmq/audit.log"]
format = "json"
parse_timestamp = true
timestamp_field = "timestamp"
timestamp_format = "2006-01-02T15:04:05.000Z"
tags = ["rabbitmq", "management", "api"]
fields = { service = "rabbitmq", component = "management", log_type = "api_audit" }
```
Advanced Configuration
Queue Operations Monitoring
```toml
[filestream.rabbitmq_queues]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "rabbitmq_log"
tags = ["rabbitmq", "queues", "operations"]
fields = { service = "rabbitmq", log_type = "queue_operations" }

# Filter for queue-related events
[filestream.rabbitmq_queues.processors.grep]
patterns = [
  "queue",
  "declare",
  "delete",
  "purge",
  "bind",
  "unbind",
  "consume",
  "ack",
  "nack",
  "reject"
]
```
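The grep processor keeps only lines matching one of the patterns. Its effect can be previewed with plain grep over fabricated sample lines, joining the same patterns with alternation:

```shell
# Two fabricated log lines: one queue event, one unrelated
printf '%s\n' \
  '2024-01-15 10:00:00.001 [info] <0.1.0> queue orders declared' \
  '2024-01-15 10:00:00.002 [info] <0.1.0> started TCP listener' | \
grep -E 'queue|declare|delete|purge|bind|unbind|consume|ack|nack|reject'
```

Only the queue-event line survives the filter; the listener line is dropped.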
Cluster Monitoring
```toml
[filestream.rabbitmq_cluster]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "rabbitmq_log"
tags = ["rabbitmq", "cluster", "ha"]
fields = { service = "rabbitmq", log_type = "cluster" }

# Filter for cluster events
[filestream.rabbitmq_cluster.processors.grep]
patterns = [
  "node",
  "cluster",
  "partition",
  "split-brain",
  "mnesia",
  "join",
  "leave",
  "sync"
]
```
Performance Monitoring

```toml
[filestream.rabbitmq_performance]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "rabbitmq_log"
tags = ["rabbitmq", "performance", "memory"]
fields = { service = "rabbitmq", log_type = "performance" }

# Filter for performance-related events
[filestream.rabbitmq_performance.processors.grep]
patterns = [
  "memory",
  "disk",
  "alarm",
  "flow",
  "blocked",
  "throttle",
  "gc",
  "load"
]
```
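As with the queue filter, the performance patterns can be previewed against fabricated lines before relying on them in production:

```shell
# One memory-alarm line, one unrelated line
printf '%s\n' \
  '2024-01-15 10:00:00.001 [warning] <0.1.0> memory resource limit alarm set on node rabbit@host' \
  '2024-01-15 10:00:00.002 [info] <0.1.0> queue orders declared' | \
grep -E 'memory|disk|alarm|flow|blocked|throttle|gc|load'
```

Only the alarm line matches; tighten the patterns if unrelated lines slip through on your own logs.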
Federation and Shovel Monitoring
```toml
[filestream.rabbitmq_federation]
paths = ["/var/log/rabbitmq/rabbit.log"]
format = "rabbitmq_log"
tags = ["rabbitmq", "federation", "shovel"]
fields = { service = "rabbitmq", log_type = "federation" }

# Filter for federation events
[filestream.rabbitmq_federation.processors.grep]
patterns = [
  "federation",
  "shovel",
  "upstream",
  "downstream",
  "link",
  "policy"
]
```
Usage Examples
Monitor RabbitMQ Operations
```shell
# Stream all RabbitMQ logs
logflux-cli stream --filter 'service:rabbitmq'

# Monitor connection events
logflux-cli stream --filter 'service:rabbitmq AND log_type:access'

# Track queue operations
logflux-cli stream --filter 'service:rabbitmq AND log_type:queue_operations'
```
Performance Analysis

```shell
# Monitor memory alarms
logflux-cli stream --filter 'service:rabbitmq AND message:memory AND message:alarm'

# Track blocked connections
logflux-cli stream --filter 'service:rabbitmq AND message:blocked'

# Monitor cluster events
logflux-cli stream --filter 'service:rabbitmq AND log_type:cluster'
```
Security Monitoring
```shell
# Track authentication events
logflux-cli stream --filter 'service:rabbitmq AND message:authentication'

# Monitor management API access
logflux-cli stream --filter 'service:rabbitmq AND component:management'

# Track permission changes
logflux-cli stream --filter 'service:rabbitmq AND message:permission'
```
Troubleshooting
```shell
# Monitor error logs
logflux-cli stream --filter 'service:rabbitmq AND log_type:error'

# Track connection failures
logflux-cli stream --filter 'service:rabbitmq AND message:connection AND level:ERROR'

# Monitor queue declaration issues
logflux-cli stream --filter 'service:rabbitmq AND message:declare AND level:ERROR'
```
RabbitMQ Metrics Collection
Management API Integration
Create a metrics collection script:
```shell
#!/bin/bash
# Collect RabbitMQ metrics via the Management API

RABBITMQ_HOST="localhost"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="password"

collect_metrics() {
    local timestamp
    timestamp=$(date -u +%Y-%m-%dT%H:%M:%S.000Z)

    # Overview metrics
    curl -s -u "$RABBITMQ_USER:$RABBITMQ_PASS" \
        "http://$RABBITMQ_HOST:$RABBITMQ_PORT/api/overview" | \
        jq --arg ts "$timestamp" '. + {"timestamp": $ts, "metric_type": "overview"}' \
        >> /var/log/rabbitmq/metrics.log

    # Node metrics
    curl -s -u "$RABBITMQ_USER:$RABBITMQ_PASS" \
        "http://$RABBITMQ_HOST:$RABBITMQ_PORT/api/nodes" | \
        jq --arg ts "$timestamp" '.[] + {"timestamp": $ts, "metric_type": "node"}' \
        >> /var/log/rabbitmq/metrics.log

    # Queue metrics
    curl -s -u "$RABBITMQ_USER:$RABBITMQ_PASS" \
        "http://$RABBITMQ_HOST:$RABBITMQ_PORT/api/queues" | \
        jq --arg ts "$timestamp" '.[] + {"timestamp": $ts, "metric_type": "queue"}' \
        >> /var/log/rabbitmq/metrics.log
}

while true; do
    collect_metrics
    sleep 60
done
```
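Rather than running the collection loop in a terminal, it can be supervised by systemd. The unit name and script path below are illustrative, not part of the product:

```ini
# /etc/systemd/system/rabbitmq-metrics.service (illustrative)
[Unit]
Description=RabbitMQ metrics collector for LogFlux
After=rabbitmq-server.service

[Service]
ExecStart=/usr/local/bin/rabbitmq-metrics.sh
Restart=always
User=rabbitmq

[Install]
WantedBy=multi-user.target
```

Enable it with `sudo systemctl enable --now rabbitmq-metrics`.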
Connection Statistics
```shell
#!/bin/bash
# Monitor connection statistics
# rabbitmqctl output is tab-separated; split on tabs because
# connection names contain spaces.
while true; do
    rabbitmqctl list_connections name peer_host peer_port state channels | \
    awk -F'\t' -v ts="$(date -u +%Y-%m-%dT%H:%M:%S.000Z)" '
    NR > 1 {
        printf "{\"timestamp\": \"%s\", \"connection_name\": \"%s\", \"peer_host\": \"%s\", \"peer_port\": %s, \"state\": \"%s\", \"channels\": %s, \"metric_type\": \"connection\"}\n",
            ts, $1, $2, $3, $4, $5
    }' >> /var/log/rabbitmq/connection-stats.log
    sleep 30
done
```
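Because connection names from rabbitmqctl contain spaces, field splitting is worth checking before trusting the output. Feeding a fabricated tab-separated line (with a header, as rabbitmqctl prints) through the same awk shape previews the JSON that lands in the log:

```shell
# Fabricated tab-separated output resembling `rabbitmqctl list_connections ...`
printf 'Listing connections ...\n127.0.0.1:52768 -> 127.0.0.1:5672\t10.0.0.5\t52768\trunning\t2\n' | \
awk -F'\t' -v ts="2024-01-15T10:00:00.000Z" '
NR > 1 {
    printf "{\"timestamp\": \"%s\", \"connection_name\": \"%s\", \"peer_host\": \"%s\", \"peer_port\": %s, \"state\": \"%s\", \"channels\": %s, \"metric_type\": \"connection\"}\n",
        ts, $1, $2, $3, $4, $5
}'
```

With default whitespace splitting instead of `-F'\t'`, the spaces inside the connection name would shift every subsequent field.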
Queue Statistics
```shell
#!/bin/bash
# Monitor queue statistics
# Split on tabs in case queue names contain spaces.
while true; do
    rabbitmqctl list_queues name messages consumers memory | \
    awk -F'\t' -v ts="$(date -u +%Y-%m-%dT%H:%M:%S.000Z)" '
    NR > 1 {
        printf "{\"timestamp\": \"%s\", \"queue_name\": \"%s\", \"messages\": %s, \"consumers\": %s, \"memory\": %s, \"metric_type\": \"queue_stats\"}\n",
            ts, $1, $2, $3, $4
    }' >> /var/log/rabbitmq/queue-stats.log
    sleep 60
done
```
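The stats files written by these scripts are only useful if the agent ingests them. A filestream block for the generated logs could look like the following (the file names match the scripts above; the multi-path array follows the same shape as the earlier examples):

```toml
[filestream.rabbitmq_metrics]
paths = [
  "/var/log/rabbitmq/metrics.log",
  "/var/log/rabbitmq/connection-stats.log",
  "/var/log/rabbitmq/queue-stats.log"
]
format = "json"
tags = ["rabbitmq", "metrics"]
fields = { service = "rabbitmq", log_type = "metrics" }
```

With this in place, alert queries on `metric_type:queue_stats` (as in the alerting section) have data to match against.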
Monitoring and Alerting
Key Metrics to Monitor
```toml
# Memory alarm alert
[alerts.rabbitmq_memory_alarm]
query = "service:rabbitmq AND message:memory AND message:alarm"
threshold = 1
window = "30s"
message = "RabbitMQ memory alarm triggered"

# Disk space alarm alert
[alerts.rabbitmq_disk_alarm]
query = "service:rabbitmq AND message:disk AND message:alarm"
threshold = 1
window = "30s"
message = "RabbitMQ disk space alarm triggered"

# Connection limit alert
[alerts.rabbitmq_connection_limit]
query = "service:rabbitmq AND message:connection_limit"
threshold = 1
window = "1m"
message = "RabbitMQ connection limit reached"

# Queue buildup alert
[alerts.rabbitmq_queue_buildup]
query = "service:rabbitmq AND metric_type:queue_stats AND messages:>10000"
threshold = 1
window = "2m"
message = "RabbitMQ queue buildup detected: {{ .queue_name }}"

# Cluster partition alert
[alerts.rabbitmq_cluster_partition]
query = "service:rabbitmq AND message:partition"
threshold = 1
window = "10s"
message = "RabbitMQ cluster partition detected"

# Authentication failure alert
[alerts.rabbitmq_auth_failure]
query = "service:rabbitmq AND message:authentication AND level:ERROR"
threshold = 5
window = "1m"
message = "High authentication failure rate in RabbitMQ"
```
Dashboard Metrics
Monitor these key RabbitMQ metrics:
- Message rates (publish rate, delivery rate, acknowledge rate)
- Queue metrics (queue depth, consumer count, memory usage)
- Connection metrics (active connections, connection rate, channels)
- Node health (memory usage, disk space, file descriptors)
- Cluster status (node status, network partitions, synchronization)
- Performance (message throughput, latency, GC frequency)
- Security (authentication events, management API access)
- Alarms (memory alarms, disk alarms, flow control)
Troubleshooting
Common Issues
RabbitMQ logs not appearing:
```shell
# Check RabbitMQ is running
sudo systemctl status rabbitmq-server

# Verify log configuration
sudo rabbitmqctl environment | grep log

# Check log file permissions
sudo ls -la /var/log/rabbitmq/
```
Log parsing errors:
```shell
# Check log format
sudo tail -n 10 /var/log/rabbitmq/rabbit.log

# Verify JSON logging (if enabled)
sudo tail -n 5 /var/log/rabbitmq/rabbit.log | jq .

# Check configuration
sudo rabbitmqctl environment | grep log
```
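When parsing misbehaves, a quick severity breakdown helps locate noisy or malformed lines. The pipeline below runs on fabricated samples; point it at `tail -n 1000 /var/log/rabbitmq/rabbit.log` for real data:

```shell
# Count log lines per severity level (fabricated sample input)
printf '%s\n' \
  '2024-01-15 10:00:00.001 [info] <0.1.0> started' \
  '2024-01-15 10:00:00.002 [error] <0.1.0> failed' \
  '2024-01-15 10:00:00.003 [info] <0.1.0> ok' | \
grep -oE '\[(debug|info|warning|error|critical)\]' | sort | uniq -c | sort -rn
```

Lines that produce no match here are the ones the agent's regex format would also fail to parse.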
Connection monitoring issues:
```shell
# List active connections
sudo rabbitmqctl list_connections

# Check connection limits
sudo rabbitmqctl environment | grep connection

# Monitor connection logs
sudo tail -f /var/log/rabbitmq/rabbit.log | grep connection
```
Cluster monitoring problems:
```shell
# Check cluster status
sudo rabbitmqctl cluster_status

# Monitor node health
sudo rabbitmqctl node_health_check

# Check cluster logs
sudo tail -f /var/log/rabbitmq/rabbit.log | grep cluster
```
Best Practices
- Monitor memory usage and set appropriate thresholds
- Configure flow control to prevent message buildup
- Use persistent messages judiciously to balance durability and performance
- Monitor queue lengths and implement alerts for buildup
Security
- Enable SSL/TLS for client connections
- Configure authentication with strong credentials
- Use virtual hosts for multi-tenant deployments
- Monitor management API access and restrict permissions
High Availability
- Configure clusters with an odd number of nodes to avoid partition ties
- Monitor network partitions and implement recovery procedures
- Use mirrored queues for critical message durability
- Implement proper backup strategies for definitions and messages
Log Management
```
# Optimize log rotation for RabbitMQ
/var/log/rabbitmq/*.log {
    daily
    rotate 30
    missingok
    notifempty
    compress
    delaycompress
    postrotate
        systemctl reload rabbitmq-server.service > /dev/null 2>&1 || true
        systemctl reload logflux-filestream.service > /dev/null 2>&1 || true
    endscript
}
```
Monitoring Configuration
Enable comprehensive logging for production:
```ini
# Production logging configuration
log.file.level = info
log.connection.level = info
log.channel.level = info
log.queue.level = info
log.mirroring.level = info

audit.categories.connection = true
audit.categories.channel = true
audit.categories.queue = true
```
Integration Examples
Docker Deployment
```yaml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.11-management-alpine
    hostname: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_DEFAULT_VHOST: /
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - rabbitmq_logs:/var/log/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./enabled_plugins:/etc/rabbitmq/enabled_plugins
    ports:
      - "5672:5672"
      - "15672:15672"

  logflux-agent:
    image: logflux/agent:latest
    volumes:
      - rabbitmq_logs:/var/log/rabbitmq:ro
      - ./logflux-config:/etc/logflux-agent/plugins
    depends_on:
      - rabbitmq

volumes:
  rabbitmq_data:
  rabbitmq_logs:
```
Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
spec:
  serviceName: rabbitmq
  replicas: 3
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
        - name: rabbitmq
          image: rabbitmq:3.11-management
          env:
            - name: RABBITMQ_DEFAULT_USER
              value: admin
            - name: RABBITMQ_DEFAULT_PASS
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: password
            - name: RABBITMQ_ERLANG_COOKIE
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: erlang-cookie
          volumeMounts:
            - name: rabbitmq-data
              mountPath: /var/lib/rabbitmq
            - name: rabbitmq-logs
              mountPath: /var/log/rabbitmq
            - name: rabbitmq-config
              mountPath: /etc/rabbitmq/rabbitmq.conf
              subPath: rabbitmq.conf
          ports:
            - containerPort: 5672
            - containerPort: 15672
      volumes:
        - name: rabbitmq-config
          configMap:
            name: rabbitmq-config
        - name: rabbitmq-logs
          emptyDir: {}
  volumeClaimTemplates:
    - metadata:
        name: rabbitmq-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi
```
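The StatefulSet references a `rabbitmq-config` ConfigMap that is not defined above. A minimal sketch, reusing the logging settings from earlier in this page, could look like:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
data:
  rabbitmq.conf: |
    log.file = /var/log/rabbitmq/rabbit.log
    log.file.level = info
    log.connection.level = info
    log.queue.level = info
```

Extend the embedded rabbitmq.conf with whichever logging and audit settings from the configuration sections above fit your deployment.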
Cluster Health Monitoring
```shell
#!/bin/bash
# Comprehensive RabbitMQ cluster health check

check_cluster_health() {
    local timestamp
    timestamp=$(date -u +%Y-%m-%dT%H:%M:%S.000Z)

    # Cluster status
    rabbitmqctl cluster_status --formatter json | \
        jq --arg ts "$timestamp" '. + {"timestamp": $ts, "check_type": "cluster_status"}' \
        >> /var/log/rabbitmq/health-check.log

    # Node health
    for node in $(rabbitmqctl cluster_status --formatter json | jq -r '.running_nodes[]'); do
        rabbitmqctl node_health_check -n "$node" --formatter json | \
            jq --arg ts "$timestamp" --arg node "$node" '. + {"timestamp": $ts, "node": $node, "check_type": "node_health"}' \
            >> /var/log/rabbitmq/health-check.log
    done
}

while true; do
    check_cluster_health
    sleep 300  # Every 5 minutes
done
```
This comprehensive RabbitMQ integration provides real-time message broker monitoring, connection tracking, and cluster health analysis using LogFlux Agent’s File Stream plugin. The configuration-based approach offers detailed insights into message flow, queue operations, and system performance across different RabbitMQ deployment scenarios.
Disclaimer
The RabbitMQ logo and trademarks are the property of VMware, Inc. and its subsidiaries. LogFlux is not affiliated with, endorsed by, or sponsored by VMware, Inc. The RabbitMQ logo is used solely for identification purposes to indicate compatibility and integration capabilities.