Fluentd Integration
Receive logs via the Fluentd Forward protocol with LogFlux Agent
The LogFlux Fluentd plugin provides a Fluentd Forward protocol-compatible receiver that accepts logs from existing Fluentd infrastructure and forwards them to LogFlux. This enables seamless migration from Fluentd deployments while maintaining compatibility with existing log collection pipelines.
Overview
The Fluentd plugin provides:
- Forward Protocol Compatibility: Full support for Fluentd Forward protocol v1
- TLS/SSL Support: Secure communication with existing Fluentd forwarders
- Tag-based Routing: Route logs based on Fluentd tags and labels
- Metadata Preservation: Preserve Fluentd metadata and timestamps
- Buffer Management: Handle high-volume log streams with built-in buffering
- Compression Support: gzip and lz4 compression for network efficiency
- Multi-worker Support: Concurrent processing for high-throughput scenarios
Installation
The Fluentd plugin is included with the LogFlux Agent but disabled by default.
Prerequisites
- LogFlux Agent installed (see Installation Guide)
- Network access to Fluentd Forward port (default: 24224)
- Existing Fluentd forwarders configured to send to LogFlux Agent
Enable the Plugin
1
2
3
4
5
|
# Enable and start the Fluentd plugin
sudo systemctl enable --now logflux-fluentd
# Check status
sudo systemctl status logflux-fluentd
|
Configuration
Basic Configuration
Create or edit the Fluentd plugin configuration:
1
|
sudo nano /etc/logflux-agent/plugins/fluentd.yaml
|
Basic configuration:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# Fluentd Plugin Configuration
fluentd:
# Bind address and port (standard Fluentd Forward port)
bind_address: "0.0.0.0:24224"
# Enable keepalive for persistent connections
keepalive: true
# Chunk size limit (bytes)
chunk_size_limit: 67108864 # 64MB
# Enable compression
compression: gzip
# Security settings
security:
# Shared key authentication (optional)
shared_key: ""
# TLS configuration
tls:
enabled: false
cert_file: "/etc/ssl/certs/fluentd.crt"
key_file: "/etc/ssl/private/fluentd.key"
# Log processing
processing:
# Add LogFlux metadata
add_metadata: true
# Preserve original timestamps
preserve_timestamp: true
# Tag-based log level mapping
tag_mapping:
"app.error": "error"
"app.warn": "warn"
"app.info": "info"
"app.debug": "debug"
|
Advanced Configuration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# Advanced Fluentd Plugin Configuration
fluentd:
# Network settings
bind_address: "0.0.0.0:24224"
keepalive: true
keepalive_timeout: 600
# Buffer settings
chunk_size_limit: 134217728 # 128MB
chunk_size_warn_limit: 67108864 # 64MB
# Worker settings
workers: 4
backlog: 1024
# Compression
compression: gzip
compression_level: 6
# Security configuration
security:
# Shared key for authentication
shared_key: "secure-shared-key-here"
# TLS settings
tls:
enabled: true
cert_file: "/etc/logflux-agent/certs/fluentd.crt"
key_file: "/etc/logflux-agent/certs/fluentd.key"
# Client certificate verification
verify_mode: "peer"
ca_file: "/etc/logflux-agent/certs/ca.crt"
# Tag-based routing and processing
routing:
# Tag patterns and their log levels
tag_rules:
- pattern: "^app\\.error.*"
level: "error"
node: "application"
- pattern: "^app\\.warn.*"
level: "warn"
node: "application"
- pattern: "^system\\..*"
level: "info"
node: "system"
- pattern: "^nginx\\.access.*"
level: "info"
node: "webserver"
- pattern: "^nginx\\.error.*"
level: "error"
node: "webserver"
# Default routing for unmatched tags
default_level: "info"
default_node: "fluentd"
# Message processing
processing:
# Metadata handling
add_metadata: true
preserve_timestamp: true
add_hostname: true
add_tag_as_field: true
tag_field_name: "fluentd_tag"
# Message transformation
json_parsing: true
flatten_nested: false
max_nested_depth: 10
# Field mapping
field_mapping:
"log": "message"
"level": "log_level"
"time": "timestamp"
# Performance tuning
performance:
# Buffer sizes
receive_buffer_size: 1048576 # 1MB
send_buffer_size: 1048576 # 1MB
# Queue settings
queue_size: 10000
flush_interval: 1s
# Connection limits
max_connections: 1000
connection_timeout: 30s
# Monitoring
monitoring:
# Health check endpoint
health_check_port: 24225
# Metrics collection
collect_metrics: true
metrics_interval: 30s
|
Usage Examples
Migrating from Existing Fluentd
Existing Fluentd Configuration:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# fluent.conf
<source>
@type tail
path /var/log/app/*.log
pos_file /var/log/fluentd/app.log.pos
tag app.logs
format json
</source>
<match app.**>
@type forward
<server>
host logflux-agent-host
port 24224
</server>
<buffer>
flush_interval 1s
</buffer>
</match>
|
LogFlux Agent Configuration:
1
2
3
4
5
6
7
8
9
10
|
# fluentd.yaml
fluentd:
bind_address: "0.0.0.0:24224"
compression: gzip
routing:
tag_rules:
- pattern: "^app\\..*"
level: "info"
node: "application"
|
Multi-environment Setup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# Multi-environment Fluentd configuration
routing:
tag_rules:
# Production logs
- pattern: "^prod\\.app\\.error.*"
level: "error"
node: "prod-app"
- pattern: "^prod\\.app\\..*"
level: "info"
node: "prod-app"
# Staging logs
- pattern: "^staging\\.app\\..*"
level: "debug"
node: "staging-app"
# Development logs
- pattern: "^dev\\..*"
level: "debug"
node: "dev-app"
processing:
# Extract environment from tag
add_metadata: true
field_mapping:
"environment": "env"
|
Container Integration
Docker with Fluentd Logging Driver:
1
2
3
4
5
6
|
# Run container with Fluentd logging driver
docker run -d \
--log-driver=fluentd \
--log-opt fluentd-address=localhost:24224 \
--log-opt tag="docker.{{.Name}}" \
nginx
|
LogFlux Configuration:
1
2
3
4
5
6
7
8
9
10
11
|
routing:
tag_rules:
- pattern: "^docker\\..*"
level: "info"
node: "containers"
processing:
add_metadata: true
field_mapping:
"container_name": "container"
"container_id": "container_id"
|
Fluentd Forward Protocol
The plugin accepts standard Fluentd Forward protocol messages:
1
2
3
4
5
|
[
"tag.name",
[[timestamp, {"key": "value"}], [timestamp, {"key": "value"}]],
{"option": "value"}
]
|
Messages are transformed and sent to LogFlux:
1
2
3
4
5
6
7
8
9
10
11
|
{
"timestamp": "2024-01-20T14:30:50.000Z",
"level": "info",
"message": "User login successful",
"node": "application",
"fluentd_tag": "app.auth",
"metadata": {
"source": "fluentd",
"hostname": "app-server-01"
}
}
|
Security Configuration
Shared Key Authentication
1
2
3
|
# Enable shared key authentication
security:
shared_key: "your-secure-shared-key"
|
Fluentd Forwarder Configuration:
1
2
3
4
5
6
7
8
|
<match **>
@type forward
<server>
host logflux-agent
port 24224
shared_key your-secure-shared-key
</server>
</match>
|
TLS/SSL Configuration
1
2
3
4
5
6
7
8
|
# Enable TLS
security:
tls:
enabled: true
cert_file: "/etc/logflux-agent/certs/server.crt"
key_file: "/etc/logflux-agent/certs/server.key"
verify_mode: "peer"
ca_file: "/etc/logflux-agent/certs/ca.crt"
|
Generate certificates:
1
2
3
4
5
6
7
|
# Generate CA and server certificates
openssl genrsa -out ca.key 4096
openssl req -new -x509 -key ca.key -out ca.crt -days 365
openssl genrsa -out server.key 4096
openssl req -new -key server.key -out server.csr
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -out server.crt
|
High-Volume Configuration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# High-throughput configuration
fluentd:
workers: 8
backlog: 2048
chunk_size_limit: 268435456 # 256MB
performance:
receive_buffer_size: 2097152 # 2MB
queue_size: 50000
max_connections: 2000
processing:
json_parsing: false # Disable if not needed
add_metadata: false # Reduce overhead
|
Memory Management
1
2
3
4
5
6
7
8
9
10
11
|
# Memory-optimized settings
fluentd:
chunk_size_limit: 67108864 # 64MB
chunk_size_warn_limit: 33554432 # 32MB
performance:
queue_size: 5000
flush_interval: 5s
processing:
max_nested_depth: 5 # Limit JSON parsing depth
|
Monitoring and Health Checks
Health Check Endpoint
1
2
3
4
5
6
7
8
9
10
|
# Check plugin health
curl http://localhost:24225/health
# Expected response
{
"status": "ok",
"connections": 25,
"messages_received": 150420,
"messages_forwarded": 150420
}
|
Log Analysis
1
2
3
4
5
6
7
8
|
# Monitor plugin logs
sudo journalctl -u logflux-fluentd -f
# Check connection statistics
sudo ss -tulnp | grep :24224
# Monitor buffer usage
sudo du -h /var/lib/logflux-agent/buffers/fluentd/
|
Troubleshooting
Common Issues
Connection Refused:
1
2
3
4
5
6
7
8
|
# Check if plugin is listening
sudo ss -tulnp | grep 24224
# Verify firewall rules
sudo iptables -L | grep 24224
# Check plugin status
sudo systemctl status logflux-fluentd
|
Authentication Failures:
1
2
3
4
5
6
7
|
# Verify shared key configuration
sudo journalctl -u logflux-fluentd | grep -i auth
# Test without authentication first
# fluentd.yaml
security:
shared_key: ""
|
High Memory Usage:
1
2
3
4
5
6
7
|
# Reduce buffer sizes
fluentd:
chunk_size_limit: 16777216 # 16MB
performance:
queue_size: 1000
flush_interval: 1s
|
Message Loss:
1
2
3
4
5
6
7
8
9
|
# Enable acknowledgements
fluentd:
require_ack_response: true
ack_response_timeout: 30s
# Check buffer persistence
performance:
buffer_persistent: true
buffer_path: "/var/lib/logflux-agent/buffers/fluentd"
|
Debugging
1
2
3
4
5
6
7
8
9
10
11
12
|
# Enable verbose logging
sudo systemctl edit logflux-fluentd
# Add:
[Service]
Environment="LOGFLUX_LOG_LEVEL=debug"
# Test Fluentd connectivity
echo '["test.tag", [[1640995850, {"message": "test"}]]]' | \
nc localhost 24224
# Monitor real-time messages
sudo journalctl -u logflux-fluentd -f | grep -E "(received|forwarded)"
|
Migration Best Practices
Gradual Migration
- Parallel Setup: Run LogFlux Agent alongside existing Fluentd aggregators
- Tag-based Routing: Route specific tags to LogFlux for testing
- Data Validation: Compare log volumes and content between systems
- Performance Testing: Validate performance under production load
- Complete Migration: Switch all forwarders to LogFlux Agent
Configuration Migration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#!/bin/bash
# migrate-fluentd-config.sh
# Helper script to migrate Fluentd match patterns to LogFlux routing rules
input_file="fluent.conf"
output_file="fluentd.yaml"
echo "routing:" > $output_file
echo " tag_rules:" >> $output_file
grep -E "^<match" $input_file | while read line; do
pattern=$(echo $line | sed 's/<match \(.*\)>/\1/')
echo " - pattern: \"^${pattern}\"" >> $output_file
echo " level: \"info\"" >> $output_file
echo " node: \"fluentd\"" >> $output_file
done
|
Integration Examples
ELK Stack Migration
Replace Fluentd → Elasticsearch with Fluentd → LogFlux:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# Original: Fluentd → Elasticsearch
# New: Fluentd → LogFlux Agent → LogFlux
routing:
tag_rules:
- pattern: "^elasticsearch\\..*"
level: "info"
node: "search"
- pattern: "^kibana\\..*"
level: "info"
node: "visualization"
- pattern: "^logstash\\..*"
level: "info"
node: "processing"
|
Kubernetes Integration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# K8s DaemonSet with LogFlux Agent
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: logflux-agent
spec:
template:
spec:
containers:
- name: logflux-agent
image: logflux/agent:latest
ports:
- containerPort: 24224
name: fluentd-forward
volumeMounts:
- name: config
mountPath: /etc/logflux-agent/plugins/
volumes:
- name: config
configMap:
name: logflux-fluentd-config
|
Disclaimer
Fluentd and the Fluentd logo are trademarks of The Linux Foundation. LogFlux is not affiliated with, endorsed by, or sponsored by The Linux Foundation or the Fluentd project. The Fluentd logo is used solely for identification purposes to indicate compatibility with the Fluentd Forward protocol.
Next Steps