Fluentd Integration

Receive logs via the Fluentd Forward protocol with LogFlux Agent

The LogFlux Fluentd plugin provides a Fluentd Forward protocol-compatible receiver that accepts logs from existing Fluentd infrastructure and forwards them to LogFlux. This enables seamless migration from Fluentd deployments while maintaining compatibility with existing log collection pipelines.

Overview

The Fluentd plugin provides:

  • Forward Protocol Compatibility: Full support for Fluentd Forward protocol v1
  • TLS/SSL Support: Secure communication with existing Fluentd forwarders
  • Tag-based Routing: Route logs based on Fluentd tags and labels
  • Metadata Preservation: Preserve Fluentd metadata and timestamps
  • Buffer Management: Handle high-volume log streams with built-in buffering
  • Compression Support: gzip and lz4 compression for network efficiency
  • Multi-worker Support: Concurrent processing for high-throughput scenarios

Installation

The Fluentd plugin is included with the LogFlux Agent but disabled by default.

Prerequisites

  • LogFlux Agent installed (see Installation Guide)
  • Network access to Fluentd Forward port (default: 24224)
  • Existing Fluentd forwarders configured to send to LogFlux Agent

Enable the Plugin

# Enable and start the Fluentd plugin
sudo systemctl enable --now logflux-fluentd

# Check status
sudo systemctl status logflux-fluentd

Configuration

Basic Configuration

Create or edit the Fluentd plugin configuration:

sudo nano /etc/logflux-agent/plugins/fluentd.yaml

Basic configuration:

# Fluentd Plugin Configuration
fluentd:
  # Bind address and port (standard Fluentd Forward port)
  bind_address: "0.0.0.0:24224"
  
  # Enable keepalive for persistent connections
  keepalive: true
  
  # Chunk size limit (bytes)
  chunk_size_limit: 67108864  # 64MB
  
  # Enable compression
  compression: gzip

# Security settings
security:
  # Shared key authentication (optional)
  shared_key: ""
  
  # TLS configuration
  tls:
    enabled: false
    cert_file: "/etc/ssl/certs/fluentd.crt"
    key_file: "/etc/ssl/private/fluentd.key"

# Log processing
processing:
  # Add LogFlux metadata
  add_metadata: true
  
  # Preserve original timestamps
  preserve_timestamp: true
  
  # Tag-based log level mapping
  tag_mapping:
    "app.error": "error"
    "app.warn": "warn" 
    "app.info": "info"
    "app.debug": "debug"
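
The `tag_mapping` table above is an exact-match lookup from Fluentd tag to LogFlux log level. As an illustration only (the helper function is hypothetical, not LogFlux source), its behavior can be sketched as:

```python
# Mapping keys and the "info" fallback mirror the configuration above;
# the lookup helper itself is an illustrative sketch.
TAG_MAPPING = {
    "app.error": "error",
    "app.warn": "warn",
    "app.info": "info",
    "app.debug": "debug",
}

def level_for_tag(tag: str, default: str = "info") -> str:
    """Return the log level configured for an exact Fluentd tag."""
    return TAG_MAPPING.get(tag, default)
```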

Advanced Configuration

# Advanced Fluentd Plugin Configuration
fluentd:
  # Network settings
  bind_address: "0.0.0.0:24224"
  keepalive: true
  keepalive_timeout: 600
  
  # Buffer settings
  chunk_size_limit: 134217728  # 128MB
  chunk_size_warn_limit: 67108864  # 64MB
  
  # Worker settings
  workers: 4
  backlog: 1024
  
  # Compression
  compression: gzip
  compression_level: 6

# Security configuration
security:
  # Shared key for authentication
  shared_key: "secure-shared-key-here"
  
  # TLS settings
  tls:
    enabled: true
    cert_file: "/etc/logflux-agent/certs/fluentd.crt"
    key_file: "/etc/logflux-agent/certs/fluentd.key"
    
    # Client certificate verification
    verify_mode: "peer"
    ca_file: "/etc/logflux-agent/certs/ca.crt"

# Tag-based routing and processing
routing:
  # Tag patterns and their log levels
  tag_rules:
    - pattern: "^app\\.error.*"
      level: "error"
      node: "application"
    - pattern: "^app\\.warn.*"
      level: "warn"
      node: "application"
    - pattern: "^system\\..*"
      level: "info" 
      node: "system"
    - pattern: "^nginx\\.access.*"
      level: "info"
      node: "webserver"
    - pattern: "^nginx\\.error.*"
      level: "error"
      node: "webserver"
  
  # Default routing for unmatched tags
  default_level: "info"
  default_node: "fluentd"

# Message processing
processing:
  # Metadata handling
  add_metadata: true
  preserve_timestamp: true
  add_hostname: true
  add_tag_as_field: true
  tag_field_name: "fluentd_tag"
  
  # Message transformation
  json_parsing: true
  flatten_nested: false
  max_nested_depth: 10
  
  # Field mapping
  field_mapping:
    "log": "message"
    "level": "log_level"
    "time": "timestamp"

# Performance tuning
performance:
  # Buffer sizes
  receive_buffer_size: 1048576  # 1MB
  send_buffer_size: 1048576     # 1MB
  
  # Queue settings
  queue_size: 10000
  flush_interval: 1s
  
  # Connection limits
  max_connections: 1000
  connection_timeout: 30s

# Monitoring
monitoring:
  # Health check endpoint
  health_check_port: 24225
  
  # Metrics collection
  collect_metrics: true
  metrics_interval: 30s
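
The `tag_rules` list is evaluated in order, and the first matching pattern decides the level and node; unmatched tags fall through to `default_level`/`default_node`. A minimal sketch of that first-match behavior, assuming the rules from the configuration above (the helper is illustrative, not LogFlux internals):

```python
import re

# Patterns and level/node values mirror the advanced configuration above.
TAG_RULES = [
    (r"^app\.error.*", "error", "application"),
    (r"^app\.warn.*", "warn", "application"),
    (r"^system\..*", "info", "system"),
    (r"^nginx\.access.*", "info", "webserver"),
    (r"^nginx\.error.*", "error", "webserver"),
]

def route(tag: str):
    """Return (level, node) for the first matching rule, else the defaults."""
    for pattern, level, node in TAG_RULES:
        if re.match(pattern, tag):
            return level, node
    return "info", "fluentd"  # default_level / default_node
```

Because evaluation stops at the first hit, more specific patterns (like `^app\.error.*`) should be listed before broader ones.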

Usage Examples

Migrating from Existing Fluentd

Existing Fluentd Configuration:

# fluent.conf
<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/fluentd/app.log.pos
  tag app.logs
  format json
</source>

<match app.**>
  @type forward
  <server>
    host logflux-agent-host
    port 24224
  </server>
  <buffer>
    flush_interval 1s
  </buffer>
</match>

LogFlux Agent Configuration:

# fluentd.yaml
fluentd:
  bind_address: "0.0.0.0:24224"
  compression: gzip

routing:
  tag_rules:
    - pattern: "^app\\..*"
      level: "info"
      node: "application"

Multi-environment Setup

# Multi-environment Fluentd configuration
routing:
  tag_rules:
    # Production logs
    - pattern: "^prod\\.app\\.error.*"
      level: "error"
      node: "prod-app"
    - pattern: "^prod\\.app\\..*"
      level: "info"
      node: "prod-app"
    
    # Staging logs  
    - pattern: "^staging\\.app\\..*"
      level: "debug"
      node: "staging-app"
    
    # Development logs
    - pattern: "^dev\\..*"
      level: "debug"
      node: "dev-app"

processing:
  # Extract environment from tag
  add_metadata: true
  field_mapping:
    "environment": "env"

Container Integration

Docker with Fluentd Logging Driver:

# Run container with Fluentd logging driver
docker run -d \
  --log-driver=fluentd \
  --log-opt fluentd-address=localhost:24224 \
  --log-opt tag="docker.{{.Name}}" \
  nginx

LogFlux Configuration:

routing:
  tag_rules:
    - pattern: "^docker\\..*"
      level: "info"
      node: "containers"

processing:
  add_metadata: true
  field_mapping:
    "container_name": "container"
    "container_id": "container_id"
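
The `field_mapping` option in these configurations renames keys in the incoming Fluentd record before forwarding (e.g. `log` becomes `message`). A sketch of that renaming step, using the mapping from the advanced configuration (the helper is hypothetical, for illustration only):

```python
# Key names come from the field_mapping examples on this page.
FIELD_MAPPING = {"log": "message", "level": "log_level", "time": "timestamp"}

def remap_fields(record: dict) -> dict:
    """Rename mapped keys; unmapped keys pass through unchanged."""
    return {FIELD_MAPPING.get(key, key): value for key, value in record.items()}
```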

Message Format

Fluentd Forward Protocol

The plugin accepts standard Fluentd Forward protocol messages:

[
  "tag.name",
  [[timestamp, {"key": "value"}], [timestamp, {"key": "value"}]],
  {"option": "value"}
]
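
Fluentd normally ships these messages as msgpack; the JSON form shown here mirrors the same structure (tag, a list of `[timestamp, record]` entries, optional options map) and is also what the `nc`-based connectivity test later on this page uses. A minimal sketch for building such a message:

```python
import json
import time

# Illustrative helper, not part of LogFlux or Fluentd: builds the
# Forward-mode array [tag, [[ts, record], ...], options] as JSON.
def forward_message(tag, records, options=None):
    ts = int(time.time())
    entries = [[ts, record] for record in records]
    message = [tag, entries]
    if options:
        message.append(options)
    return json.dumps(message)
```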

LogFlux Output Format

Messages are transformed and sent to LogFlux:

{
  "timestamp": "2024-01-20T14:30:50.000Z",
  "level": "info",
  "message": "User login successful",
  "node": "application",
  "fluentd_tag": "app.auth",
  "metadata": {
    "source": "fluentd",
    "hostname": "app-server-01"
  }
}

Security Configuration

Shared Key Authentication

# Enable shared key authentication
security:
  shared_key: "your-secure-shared-key"

Fluentd Forwarder Configuration:

<match **>
  @type forward
  <server>
    host logflux-agent
    port 24224
    shared_key your-secure-shared-key
  </server>
</match>
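
Under the Forward protocol v1 handshake, the shared key is never sent in the clear: the client's PING message carries a SHA-512 digest of the salt, hostname, server nonce, and shared key. A sketch of that digest computation, based on the Forward protocol specification (shown for understanding, not as LogFlux internals):

```python
import hashlib

def shared_key_digest(salt: bytes, hostname: str, nonce: bytes,
                      shared_key: str) -> str:
    """Hex digest of SHA-512(salt + hostname + nonce + shared_key)."""
    h = hashlib.sha512()
    h.update(salt)
    h.update(hostname.encode())
    h.update(nonce)
    h.update(shared_key.encode())
    return h.hexdigest()
```

Both sides compute the same digest, so a mismatched `shared_key` shows up as an authentication failure rather than a connection error.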

TLS/SSL Configuration

# Enable TLS
security:
  tls:
    enabled: true
    cert_file: "/etc/logflux-agent/certs/server.crt"
    key_file: "/etc/logflux-agent/certs/server.key"
    verify_mode: "peer"
    ca_file: "/etc/logflux-agent/certs/ca.crt"

Generate certificates:

# Generate CA and server certificates
openssl genrsa -out ca.key 4096
openssl req -new -x509 -key ca.key -out ca.crt -days 365

openssl genrsa -out server.key 4096
openssl req -new -key server.key -out server.csr
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key \
  -CAcreateserial -out server.crt -days 365

Performance Optimization

High-Volume Configuration

# High-throughput configuration
fluentd:
  workers: 8
  backlog: 2048
  chunk_size_limit: 268435456  # 256MB

performance:
  receive_buffer_size: 2097152  # 2MB
  queue_size: 50000
  max_connections: 2000

processing:
  json_parsing: false  # Disable if not needed
  add_metadata: false  # Reduce overhead

Memory Management

# Memory-optimized settings
fluentd:
  chunk_size_limit: 67108864    # 64MB
  chunk_size_warn_limit: 33554432  # 32MB

performance:
  queue_size: 5000
  flush_interval: 5s

processing:
  max_nested_depth: 5  # Limit JSON parsing depth

Monitoring and Health Checks

Health Check Endpoint

# Check plugin health
curl http://localhost:24225/health

# Expected response
{
  "status": "ok",
  "connections": 25,
  "messages_received": 150420,
  "messages_forwarded": 150420
}
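
For automated checks, the sample response above can be evaluated programmatically. A hypothetical helper (field names follow the sample response; fetch the JSON with `curl` or `urllib` and pass in the parsed dict):

```python
def is_healthy(stats: dict) -> bool:
    """Healthy when status is ok and no messages are stuck in the plugin."""
    return (stats.get("status") == "ok"
            and stats.get("messages_received") == stats.get("messages_forwarded"))
```

A growing gap between `messages_received` and `messages_forwarded` usually indicates backpressure toward LogFlux rather than a Fluentd-side problem.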

Log Analysis

# Monitor plugin logs
sudo journalctl -u logflux-fluentd -f

# Check connection statistics
sudo ss -tulnp | grep :24224

# Monitor buffer usage
sudo du -h /var/lib/logflux-agent/buffers/fluentd/

Troubleshooting

Common Issues

Connection Refused:

# Check if plugin is listening
sudo ss -tulnp | grep 24224

# Verify firewall rules
sudo iptables -L | grep 24224

# Check plugin status
sudo systemctl status logflux-fluentd

Authentication Failures:

# Verify shared key configuration
sudo journalctl -u logflux-fluentd | grep -i auth

# Test without authentication first
# fluentd.yaml
security:
  shared_key: ""

High Memory Usage:

# Reduce buffer sizes
fluentd:
  chunk_size_limit: 16777216  # 16MB
  
performance:
  queue_size: 1000
  flush_interval: 1s

Message Loss:

# Enable acknowledgements
fluentd:
  require_ack_response: true
  ack_response_timeout: 30s

# Check buffer persistence
performance:
  buffer_persistent: true
  buffer_path: "/var/lib/logflux-agent/buffers/fluentd"

Debugging

# Enable verbose logging
sudo systemctl edit logflux-fluentd
# Add:
[Service]
Environment="LOGFLUX_LOG_LEVEL=debug"

# Test Fluentd connectivity
echo '["test.tag", [[1640995850, {"message": "test"}]]]' | \
  nc localhost 24224

# Monitor real-time messages
sudo journalctl -u logflux-fluentd -f | grep -E "(received|forwarded)"

Migration Best Practices

Gradual Migration

  1. Parallel Setup: Run LogFlux Agent alongside existing Fluentd aggregators
  2. Tag-based Routing: Route specific tags to LogFlux for testing
  3. Data Validation: Compare log volumes and content between systems
  4. Performance Testing: Validate performance under production load
  5. Complete Migration: Switch all forwarders to LogFlux Agent

Configuration Migration

#!/bin/bash
# migrate-fluentd-config.sh
# Helper script to migrate Fluentd match patterns to LogFlux routing rules.
# Note: Fluentd match patterns are globs, not regexes; this conversion only
# anchors them, so review the generated patterns by hand.

input_file="fluent.conf"
output_file="fluentd.yaml"

{
  echo "routing:"
  echo "  tag_rules:"
  grep -E "^<match" "$input_file" | while read -r line; do
    pattern=$(echo "$line" | sed 's/<match \(.*\)>/\1/')
    echo "    - pattern: \"^${pattern}\""
    echo "      level: \"info\""
    echo "      node: \"fluentd\""
  done
} > "$output_file"

Integration Examples

ELK Stack Migration

Replace Fluentd → Elasticsearch with Fluentd → LogFlux:

# Original: Fluentd → Elasticsearch
# New: Fluentd → LogFlux Agent → LogFlux

routing:
  tag_rules:
    - pattern: "^elasticsearch\\..*"
      level: "info"
      node: "search"
    - pattern: "^kibana\\..*" 
      level: "info"
      node: "visualization"
    - pattern: "^logstash\\..*"
      level: "info"
      node: "processing"

Kubernetes Integration

# K8s DaemonSet with LogFlux Agent
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: logflux-agent
spec:
  template:
    spec:
      containers:
      - name: logflux-agent
        image: logflux/agent:latest
        ports:
        - containerPort: 24224
          name: fluentd-forward
        volumeMounts:
        - name: config
          mountPath: /etc/logflux-agent/plugins/
      volumes:
      - name: config
        configMap:
          name: logflux-fluentd-config

Disclaimer

Fluentd and the Fluentd logo are trademarks of The Linux Foundation. LogFlux is not affiliated with, endorsed by, or sponsored by The Linux Foundation or the Fluentd project. The Fluentd logo is used solely for identification purposes to indicate compatibility with the Fluentd Forward protocol.

Next Steps