Apache Kafka Integration

Consume log messages from Kafka topics in real time with the LogFlux Agent

The LogFlux Kafka integration consumes log messages from Apache Kafka topics in real time, enabling centralized log analysis across your distributed systems and microservices. The plugin provides full Kafka consumer functionality with enterprise security, performance optimization, and intelligent message processing.

Overview

The Kafka plugin provides:

  • Consumer Group Support: Full Kafka consumer group implementation with automatic partition rebalancing
  • Multi-Topic Consumption: Consume from multiple Kafka topics simultaneously
  • Intelligent Message Processing: Automatic JSON parsing and log level detection
  • Enterprise Security: SASL authentication and TLS encryption support
  • Batch Processing: Efficient message batching for high-throughput scenarios
  • Resilient Architecture: Automatic reconnection and error recovery
  • Rich Metadata: Kafka-specific metadata extraction (topic, partition, offset, headers)
  • Performance Optimization: Configurable memory limits, rate limiting, and resource management

Installation

The Kafka plugin is included with the LogFlux Agent but disabled by default.

Prerequisites

  • LogFlux Agent installed (see Installation Guide)
  • Apache Kafka cluster accessible from the agent
  • Network connectivity to Kafka brokers (default port 9092)
  • Optional: SASL/TLS credentials for secured Kafka clusters

Enable the Plugin

# Enable and start the Kafka plugin
sudo systemctl enable --now logflux-kafka

# Check status
sudo systemctl status logflux-kafka

Configuration

Basic Configuration

Create or edit the Kafka plugin configuration:

sudo nano /etc/logflux-agent/plugins/kafka.yaml

Basic configuration:

# Kafka Plugin Configuration
name: kafka
version: 1.0.0
source: kafka-plugin

# Agent connection
agent:
  socket_path: /tmp/logflux-agent.sock

# Kafka broker configuration
kafka:
  # Kafka broker addresses
  brokers:
    - "localhost:9092"
    - "kafka-2:9092"
    - "kafka-3:9092"
  
  # Topics to consume from
  topics:
    - "application-logs"
    - "system-logs"
    - "error-logs"
  
  # Consumer group settings
  consumer_group: "logflux-agent"
  
  # Offset mode: "oldest" or "newest"
  offset_mode: "newest"

# Message processing
processing:
  # Automatically parse JSON payloads
  parse_json: true
  
  # Extract log levels from headers/content
  detect_log_levels: true
  
  # Default log level if not detected
  default_level: "info"

# Metadata and labeling
metadata:
  labels:
    plugin: kafka
    source: kafka_consumer
  
  # Include Kafka metadata
  include_kafka_metadata: true

# Batching for efficiency
batch:
  enabled: true
  size: 100
  flush_interval: 5s

Advanced Configuration

# Advanced Kafka Configuration
name: kafka
version: 1.0.0
source: kafka-plugin

# Enhanced agent settings
agent:
  socket_path: /tmp/logflux-agent.sock
  connect_timeout: 30s
  max_retries: 5
  retry_delay: 10s

# Comprehensive Kafka settings
kafka:
  # Broker configuration
  brokers:
    - "kafka-1.cluster.local:9092"
    - "kafka-2.cluster.local:9092"
    - "kafka-3.cluster.local:9092"
  
  # Topic configuration
  topics:
    - "microservice-logs"
    - "api-logs"
    - "worker-logs"
    - "audit-logs"
  
  # Consumer group settings
  consumer_group: "logflux-prod-consumers"
  offset_mode: "oldest"
  
  # Advanced consumer settings
  session_timeout: 30s
  heartbeat_interval: 3s
  max_processing_time: 30s
  
  # Message size limits
  max_message_size: "10MB"
  fetch_min_bytes: 1024
  fetch_max_wait: 500ms

# Security configuration
security:
  # SASL authentication
  sasl:
    enabled: true
    mechanism: "SCRAM-SHA-512"  # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
    username: "logflux-consumer"
    password: "secure-password"
  
  # TLS encryption
  tls:
    enabled: true
    cert_file: "/etc/ssl/kafka/client.crt"
    key_file: "/etc/ssl/kafka/client.key"
    ca_file: "/etc/ssl/kafka/ca.crt"
    skip_verify: false
    server_name: "kafka.cluster.local"

# Advanced message processing
processing:
  # JSON parsing settings
  parse_json: true
  json_flatten: false
  json_max_depth: 10
  
  # Log level detection
  detect_log_levels: true
  level_field_names:
    - "level"
    - "severity"
    - "log_level"
    - "priority"
  
  level_keywords:
    error: ["error", "err", "fatal", "critical"]
    warning: ["warn", "warning"]
    info: ["info", "information"]
    debug: ["debug", "trace", "verbose"]
  
  # Content filtering
  content_filters:
    # Include only messages matching patterns
    include_patterns:
      - "ERROR"
      - "WARN"
      - "FATAL"
    
    # Exclude messages matching patterns  
    exclude_patterns:
      - "healthcheck"
      - "ping"

# Topic filtering
filtering:
  # Topic allowlist (empty = allow all)
  allowed_topics: []
  
  # Topic denylist
  denied_topics:
    - "kafka-connect-offsets"
    - "__consumer_offsets"
  
  # Topic regex patterns
  topic_patterns:
    - "^app-.*-logs$"
    - "^service-.*-events$"

# Enhanced metadata
metadata:
  verbose: true
  labels:
    plugin: kafka
    source: kafka_consumer
    environment: production
    cluster: main
  
  # Kafka metadata inclusion
  include_kafka_metadata: true
  kafka_metadata_prefix: "kafka_"
  
  # Custom field mapping
  field_mapping:
    topic: "kafka_topic"
    partition: "kafka_partition"
    offset: "kafka_offset"
    timestamp: "kafka_timestamp"
    key: "kafka_key"

# Performance optimization
batch:
  enabled: true
  size: 200
  flush_interval: 10s
  max_wait_time: 30s
  
  # Memory management
  max_memory: "256MB"
  queue_size: 10000
  
  # Rate limiting
  max_messages_per_second: 5000

# Resource limits
limits:
  max_goroutines: 100
  memory_limit: "512MB"
  cpu_limit: "2"
  
  # Consumer lag monitoring
  lag_threshold: 10000
  lag_alert: true

# Health monitoring
health:
  check_interval: 60s
  max_consumer_errors: 10
  alert_on_broker_disconnect: true
  stats_collection: true

Usage Examples

Application Log Collection

# Microservices log aggregation
kafka:
  brokers:
    - "kafka.internal:9092"
  topics:
    - "user-service-logs"
    - "payment-service-logs"
    - "notification-service-logs"
  consumer_group: "logflux-microservices"

processing:
  parse_json: true
  detect_log_levels: true

metadata:
  labels:
    architecture: microservices
    environment: production

Security Event Monitoring

# Security audit log collection
kafka:
  brokers:
    - "secure-kafka.internal:9092"
  topics:
    - "security-audit-logs"
    - "authentication-events"
    - "authorization-events"
  consumer_group: "logflux-security"

security:
  sasl:
    enabled: true
    mechanism: "SCRAM-SHA-512"
    username: "security-consumer"
    password: "secure-password"
  
  tls:
    enabled: true
    cert_file: "/etc/ssl/security.crt"
    key_file: "/etc/ssl/security.key"
    ca_file: "/etc/ssl/ca.crt"

metadata:
  labels:
    log_type: security
    compliance: required
    monitoring: critical

High-Volume Processing

# High-throughput log processing
kafka:
  brokers:
    - "kafka-1:9092"
    - "kafka-2:9092" 
    - "kafka-3:9092"
  topics:
    - "high-volume-logs"
  consumer_group: "logflux-high-volume"
  
  # Optimize for throughput
  fetch_min_bytes: 10240
  fetch_max_wait: 100ms
  max_message_size: "1MB"

batch:
  size: 1000
  flush_interval: 5s
  max_memory: "1GB"

limits:
  max_messages_per_second: 10000
  memory_limit: "2GB"

metadata:
  labels:
    volume: high
    optimization: throughput

Command Line Usage

Basic Commands

# Consume from specific topics
logflux-kafka -brokers="localhost:9092" -topics="logs,events"

# Specify consumer group
logflux-kafka -brokers="kafka:9092" -consumer-group="logflux-dev"

# Start from oldest messages
logflux-kafka -brokers="kafka:9092" -offset-mode="oldest"

# Enable JSON parsing
logflux-kafka -brokers="kafka:9092" -parse-json

Advanced Options

# SASL authentication
logflux-kafka -brokers="kafka:9092" \
  -sasl -sasl-username="user" -sasl-password="pass" \
  -sasl-mechanism="SCRAM-SHA-512"

# TLS encryption
logflux-kafka -brokers="kafka:9092" \
  -tls -tls-cert="/path/to/cert.pem" \
  -tls-key="/path/to/key.pem" \
  -tls-ca="/path/to/ca.pem"

# Performance tuning
logflux-kafka -brokers="kafka:9092" \
  -batch-size=500 -flush-interval=10s \
  -max-message-size="5MB"

# Verbose logging
logflux-kafka -brokers="kafka:9092" \
  -verbose -stats

# Configuration file
logflux-kafka -config="/etc/logflux-agent/plugins/kafka.yaml"

Message Processing

JSON Message Handling

Input Kafka Message:

{
  "timestamp": "2024-01-20T14:30:50.123Z",
  "level": "ERROR",
  "service": "payment-service",
  "message": "Payment processing failed",
  "user_id": "user123",
  "transaction_id": "txn456",
  "error_code": "INSUFFICIENT_FUNDS"
}

Output LogFlux Log:

{
  "timestamp": "2024-01-20T14:30:50.123Z",
  "level": "error",
  "message": "Payment processing failed",
  "node": "kafka-consumer",
  "metadata": {
    "source_type": "plugin",
    "source_name": "kafka",
    "kafka_topic": "payment-service-logs",
    "kafka_partition": 2,
    "kafka_offset": 12345,
    "kafka_timestamp": "2024-01-20T14:30:50.123Z",
    "kafka_key": "user123",
    "service": "payment-service",
    "user_id": "user123",
    "transaction_id": "txn456",
    "error_code": "INSUFFICIENT_FUNDS",
    "plugin": "kafka",
    "environment": "production"
  }
}

Log Level Detection

The plugin automatically detects log levels from:

  • Message headers: field names level, severity, log_level
  • JSON fields: level, severity, log_level, priority
  • Content analysis: keywords error, warn, info, debug, trace, fatal, critical
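
For example, one quick way to exercise header-based detection is to publish a test message whose level is carried only in a Kafka header. This is a sketch that assumes a local broker, the application-logs topic from the examples above, and a kafkacat build recent enough to support message headers (-H):

# Produce a message with a "level" header; the plugin should classify it
# as an error without inspecting the payload
echo 'payment gateway timeout' | \
  kafkacat -P -b localhost:9092 -t application-logs -H "level=error"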

Metadata Enrichment

When include_kafka_metadata is enabled, the following fields are added to each log entry:

  • kafka_topic: source Kafka topic (e.g. application-logs)
  • kafka_partition: partition number (e.g. 2)
  • kafka_offset: message offset (e.g. 12345)
  • kafka_timestamp: Kafka message timestamp (e.g. 2024-01-20T14:30:50.123Z)
  • kafka_key: message key (e.g. user123)
  • kafka_headers: Kafka headers as JSON (e.g. {"correlation-id":"abc123"})
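
To cross-check these values against what is actually stored in Kafka, you can print the topic, partition, offset, and key of recent messages with kafkacat format tokens (a sketch; assumes kafkacat is installed and uses the topic name from the examples above):

# Show Kafka metadata for the last five messages, then exit
kafkacat -C -b localhost:9092 -t application-logs -o -5 -e \
  -f 'topic=%t partition=%p offset=%o key=%k payload=%s\n'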

Security Configuration

SASL Authentication

# SASL PLAIN (username/password)
security:
  sasl:
    enabled: true
    mechanism: "PLAIN"
    username: "kafka-user"
    password: "kafka-password"
1
2
3
4
5
6
7
# SASL SCRAM-SHA-512 (recommended)
security:
  sasl:
    enabled: true
    mechanism: "SCRAM-SHA-512"
    username: "logflux-consumer"
    password: "secure-password"

TLS Configuration

# TLS with client certificates
security:
  tls:
    enabled: true
    cert_file: "/etc/ssl/kafka/client.crt"
    key_file: "/etc/ssl/kafka/client.key"
    ca_file: "/etc/ssl/kafka/ca.crt"
    skip_verify: false
    server_name: "kafka.cluster.local"

Environment Variables

# SASL credentials
export KAFKA_SASL_USERNAME="logflux-consumer"
export KAFKA_SASL_PASSWORD="secure-password"

# TLS certificates
export KAFKA_TLS_CERT="/etc/ssl/kafka/client.crt"
export KAFKA_TLS_KEY="/etc/ssl/kafka/client.key"
export KAFKA_TLS_CA="/etc/ssl/kafka/ca.crt"
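
If the plugin runs under systemd, one way to supply these variables is a drop-in for the logflux-kafka unit. This is a sketch only: the drop-in path and file name are assumptions, and the file should be readable by root only:

# Create a systemd drop-in that sets SASL credentials for the unit
sudo mkdir -p /etc/systemd/system/logflux-kafka.service.d
sudo tee /etc/systemd/system/logflux-kafka.service.d/credentials.conf > /dev/null <<'EOF'
[Service]
Environment=KAFKA_SASL_USERNAME=logflux-consumer
Environment=KAFKA_SASL_PASSWORD=secure-password
EOF

# Apply the change
sudo systemctl daemon-reload
sudo systemctl restart logflux-kafka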

Performance Optimization

High-Throughput Configuration

# Optimize for high message volume
kafka:
  fetch_min_bytes: 10240      # Larger fetch batches
  fetch_max_wait: 100ms       # Shorter wait times
  max_message_size: "1MB"     # Larger message limit

batch:
  size: 1000                  # Larger batches
  flush_interval: 5s          # Frequent flushing
  max_memory: "1GB"           # More memory

limits:
  max_messages_per_second: 10000
  memory_limit: "2GB"
  max_goroutines: 50

Low-Latency Configuration

# Optimize for low latency
kafka:
  fetch_min_bytes: 1          # Minimal fetch size
  fetch_max_wait: 10ms        # Very short waits
  heartbeat_interval: 1s      # Frequent heartbeats

batch:
  size: 10                    # Small batches
  flush_interval: 1s          # Frequent flushing
  max_wait_time: 5s           # Quick timeouts

Resource-Constrained Configuration

# Optimize for limited resources
kafka:
  max_message_size: "100KB"   # Smaller messages only

batch:
  size: 50                    # Small batches
  flush_interval: 30s         # Less frequent flushing
  max_memory: "64MB"          # Limited memory

limits:
  max_messages_per_second: 1000
  memory_limit: "256MB"
  max_goroutines: 10

Monitoring and Alerting

Plugin Health Monitoring

#!/bin/bash
# check-kafka-plugin.sh

if ! systemctl is-active --quiet logflux-kafka; then
    echo "CRITICAL: LogFlux Kafka plugin is not running"
    exit 2
fi

# Check Kafka connectivity
if ! kafkacat -L -b localhost:9092 &>/dev/null; then
    echo "CRITICAL: Cannot connect to Kafka brokers"
    exit 2
fi

# Check recent message processing
if ! journalctl -u logflux-kafka --since="10 minutes ago" | grep -q "messages processed"; then
    echo "WARNING: No messages processed in last 10 minutes"
    exit 1
fi

echo "OK: LogFlux Kafka plugin is healthy"
exit 0

Consumer Lag Monitoring

# Monitor consumer group lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group logflux-agent \
  --describe

# Check partition assignments
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group logflux-agent \
  --members --verbose

Performance Metrics

# Enable statistics collection
health:
  stats_collection: true
  check_interval: 60s
  
  # Metrics to collect
  collect_metrics:
    - "messages_per_second"
    - "bytes_per_second"
    - "consumer_lag"
    - "processing_time"
    - "error_rate"

Common Use Cases

Microservices Log Aggregation

# Collect logs from multiple microservices
kafka:
  topics:
    - "user-service-logs"
    - "order-service-logs"
    - "payment-service-logs"
    - "inventory-service-logs"
  consumer_group: "logflux-microservices"

processing:
  parse_json: true
  detect_log_levels: true

metadata:
  labels:
    architecture: microservices
    domain: ecommerce

Event-Driven Architecture

# Process domain events as logs
kafka:
  topics:
    - "order-events"
    - "payment-events"
    - "user-events"
  consumer_group: "logflux-events"

processing:
  parse_json: true
  content_filters:
    include_patterns:
      - "OrderCreated"
      - "PaymentProcessed"
      - "UserRegistered"

metadata:
  labels:
    event_sourcing: true
    domain: business_events

Error Tracking and Alerting

# Focus on error events
kafka:
  topics:
    - "application-errors"
    - "system-alerts"
  consumer_group: "logflux-errors"

processing:
  detect_log_levels: true
  content_filters:
    include_patterns:
      - "ERROR"
      - "FATAL"
      - "CRITICAL"

metadata:
  labels:
    log_type: errors
    alerting: enabled

Audit Log Collection

# Compliance and audit logs
kafka:
  topics:
    - "audit-logs"
    - "security-events"
    - "compliance-logs"
  consumer_group: "logflux-audit"

security:
  sasl:
    enabled: true
    mechanism: "SCRAM-SHA-512"
  tls:
    enabled: true

metadata:
  labels:
    log_type: audit
    compliance: sox_pci
    retention: "7_years"

Troubleshooting

Common Issues

Connection Issues:

# Test Kafka connectivity
kafkacat -L -b localhost:9092

# Check DNS resolution
nslookup kafka.cluster.local

# Test port connectivity
telnet kafka.cluster.local 9092

Authentication Failures:

# Test SASL authentication
kafkacat -b localhost:9092 -X security.protocol=SASL_PLAINTEXT \
  -X sasl.mechanism=PLAIN \
  -X sasl.username=user \
  -X sasl.password=pass \
  -L

# Check certificates
openssl x509 -in /path/to/cert.crt -text -noout

Consumer Group Issues:

# Check consumer group status
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group logflux-agent --describe

# Reset consumer group offsets
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group logflux-agent --reset-offsets --to-earliest \
  --topic application-logs --execute

High Memory Usage:

# Reduce memory usage
batch:
  size: 50
  max_memory: "64MB"

limits:
  memory_limit: "256MB"
  max_messages_per_second: 1000

kafka:
  max_message_size: "100KB"

Debugging

# Enable verbose logging
export LOGFLUX_LOG_LEVEL=debug
logflux-kafka -config /etc/logflux-agent/plugins/kafka.yaml -verbose

# Monitor plugin logs
journalctl -u logflux-kafka -f

# Check Kafka logs
sudo journalctl -u kafka -f

# Test message production
echo '{"level":"info","message":"test"}' | \
  kafkacat -P -b localhost:9092 -t application-logs

Best Practices

Configuration Management

  1. Use consumer groups for scalable message consumption
  2. Configure appropriate batch sizes based on message volume
  3. Enable security for production Kafka clusters
  4. Monitor consumer lag to ensure timely message processing

Security

  1. Use SASL SCRAM-SHA-512 for authentication when possible
  2. Enable TLS encryption for data in transit
  3. Regularly rotate credentials and certificates
  4. Restrict topic access using ACLs (see the example below)
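
A sketch of a read-only ACL for the consumer principal used above, assuming ACL authorization is enabled on the cluster (admin credentials may need to be supplied via --command-config):

# Allow the logflux-consumer user to read the log topic and use its consumer group
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:logflux-consumer \
  --operation Read \
  --topic application-logs \
  --group logflux-agent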

Performance

  1. Tune fetch parameters based on message size and volume
  2. Use appropriate batch sizes to balance latency and throughput
  3. Monitor memory usage and adjust limits accordingly
  4. Scale consumers horizontally by adjusting partition counts (see the sketch below)
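
A consumer group runs at most one consumer per partition, so adding partitions is what lets additional plugin instances share a topic. A sketch using standard Kafka tooling (topic name assumed; note that increasing partitions is irreversible and changes key-to-partition mapping):

# Raise the partition count so up to 12 consumers can share the topic
kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic application-logs --partitions 12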

Operational

  1. Monitor consumer lag to detect processing issues
  2. Set up alerting for connection failures and errors
  3. Use appropriate offset modes based on requirements
  4. Test disaster recovery procedures regularly

Disclaimer

Apache Kafka and the Apache Kafka logo are trademarks of The Apache Software Foundation. LogFlux is not affiliated with, endorsed by, or sponsored by The Apache Software Foundation or the Apache Kafka project. The Apache Kafka logo is used solely for identification purposes to indicate compatibility with Apache Kafka message streaming.

Next Steps