Apache Kafka Integration
Consume log messages from Kafka topics in real-time with LogFlux Agent
The LogFlux Kafka integration consumes log messages from Apache Kafka topics in real-time, enabling centralized log analysis from your distributed systems and microservices. This plugin provides comprehensive Kafka consumer functionality with enterprise security, performance optimization, and intelligent message processing.
Overview
The Kafka plugin provides:
- Consumer Group Support: Full Kafka consumer group implementation with automatic partition rebalancing
- Multi-Topic Consumption: Consume from multiple Kafka topics simultaneously
- Intelligent Message Processing: Automatic JSON parsing and log level detection
- Enterprise Security: SASL authentication and TLS encryption support
- Batch Processing: Efficient message batching for high-throughput scenarios
- Resilient Architecture: Automatic reconnection and error recovery
- Rich Metadata: Kafka-specific metadata extraction (topic, partition, offset, headers)
- Performance Optimization: Configurable memory limits, rate limiting, and resource management
Installation
The Kafka plugin is included with the LogFlux Agent but disabled by default.
Prerequisites
- LogFlux Agent installed (see Installation Guide)
- Apache Kafka cluster accessible from the agent
- Network connectivity to Kafka brokers (default port 9092); a quick reachability check is shown after this list
- Optional: SASL/TLS credentials for secured Kafka clusters
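A quick way to verify broker reachability before enabling the plugin; the hostname below is an example, and `nc` plus `kcat`/`kafkacat` are assumed to be installed:
|
# TCP-level check against a broker
nc -zv kafka-1.cluster.local 9092
# Protocol-level check: list cluster metadata
kafkacat -L -b kafka-1.cluster.local:9092
|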
Enable the Plugin
|
# Enable and start the Kafka plugin
sudo systemctl enable --now logflux-kafka
# Check status
sudo systemctl status logflux-kafka
|
Configuration
Basic Configuration
Create or edit the Kafka plugin configuration:
|
sudo nano /etc/logflux-agent/plugins/kafka.yaml
|
Basic configuration:
|
# Kafka Plugin Configuration
name: kafka
version: 1.0.0
source: kafka-plugin

# Agent connection
agent:
  socket_path: /tmp/logflux-agent.sock

# Kafka broker configuration
kafka:
  # Kafka broker addresses
  brokers:
    - "localhost:9092"
    - "kafka-2:9092"
    - "kafka-3:9092"

  # Topics to consume from
  topics:
    - "application-logs"
    - "system-logs"
    - "error-logs"

  # Consumer group settings
  consumer_group: "logflux-agent"

  # Offset mode: "oldest" or "newest"
  offset_mode: "newest"

# Message processing
processing:
  # Automatically parse JSON payloads
  parse_json: true

  # Extract log levels from headers/content
  detect_log_levels: true

  # Default log level if not detected
  default_level: "info"

# Metadata and labeling
metadata:
  labels:
    plugin: kafka
    source: kafka_consumer

  # Include Kafka metadata
  include_kafka_metadata: true

# Batching for efficiency
batch:
  enabled: true
  size: 100
  flush_interval: 5s
|
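After editing the file, restart the plugin so the new settings take effect and confirm from the journal that it connects to the brokers:
|
# Apply the new configuration
sudo systemctl restart logflux-kafka
# Watch the plugin join its consumer group
sudo journalctl -u logflux-kafka -n 50 --no-pager
|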
Advanced Configuration
|
# Advanced Kafka Configuration
name: kafka
version: 1.0.0
source: kafka-plugin

# Enhanced agent settings
agent:
  socket_path: /tmp/logflux-agent.sock
  connect_timeout: 30s
  max_retries: 5
  retry_delay: 10s

# Comprehensive Kafka settings
kafka:
  # Broker configuration
  brokers:
    - "kafka-1.cluster.local:9092"
    - "kafka-2.cluster.local:9092"
    - "kafka-3.cluster.local:9092"

  # Topic configuration
  topics:
    - "microservice-logs"
    - "api-logs"
    - "worker-logs"
    - "audit-logs"

  # Consumer group settings
  consumer_group: "logflux-prod-consumers"
  offset_mode: "oldest"

  # Advanced consumer settings
  session_timeout: 30s
  heartbeat_interval: 3s
  max_processing_time: 30s

  # Message size limits
  max_message_size: "10MB"
  fetch_min_bytes: 1024
  fetch_max_wait: 500ms

# Security configuration
security:
  # SASL authentication
  sasl:
    enabled: true
    mechanism: "SCRAM-SHA-512"  # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
    username: "logflux-consumer"
    password: "secure-password"

  # TLS encryption
  tls:
    enabled: true
    cert_file: "/etc/ssl/kafka/client.crt"
    key_file: "/etc/ssl/kafka/client.key"
    ca_file: "/etc/ssl/kafka/ca.crt"
    skip_verify: false
    server_name: "kafka.cluster.local"

# Advanced message processing
processing:
  # JSON parsing settings
  parse_json: true
  json_flatten: false
  json_max_depth: 10

  # Log level detection
  detect_log_levels: true
  level_field_names:
    - "level"
    - "severity"
    - "log_level"
    - "priority"
  level_keywords:
    error: ["error", "err", "fatal", "critical"]
    warning: ["warn", "warning"]
    info: ["info", "information"]
    debug: ["debug", "trace", "verbose"]

  # Content filtering
  content_filters:
    # Include only messages matching patterns
    include_patterns:
      - "ERROR"
      - "WARN"
      - "FATAL"
    # Exclude messages matching patterns
    exclude_patterns:
      - "healthcheck"
      - "ping"

# Topic filtering
filtering:
  # Topic allowlist (empty = allow all)
  allowed_topics: []
  # Topic denylist
  denied_topics:
    - "kafka-connect-offsets"
    - "__consumer_offsets"
  # Topic regex patterns
  topic_patterns:
    - "^app-.*-logs$"
    - "^service-.*-events$"

# Enhanced metadata
metadata:
  verbose: true
  labels:
    plugin: kafka
    source: kafka_consumer
    environment: production
    cluster: main

  # Kafka metadata inclusion
  include_kafka_metadata: true
  kafka_metadata_prefix: "kafka_"

  # Custom field mapping
  field_mapping:
    topic: "kafka_topic"
    partition: "kafka_partition"
    offset: "kafka_offset"
    timestamp: "kafka_timestamp"
    key: "kafka_key"

# Performance optimization
batch:
  enabled: true
  size: 200
  flush_interval: 10s
  max_wait_time: 30s

  # Memory management
  max_memory: "256MB"
  queue_size: 10000

# Resource limits
limits:
  # Rate limiting
  max_messages_per_second: 5000
  max_goroutines: 100
  memory_limit: "512MB"
  cpu_limit: "2"

  # Consumer lag monitoring
  lag_threshold: 10000
  lag_alert: true

# Health monitoring
health:
  check_interval: 60s
  max_consumer_errors: 10
  alert_on_broker_disconnect: true
  stats_collection: true
|
Usage Examples
Application Log Collection
|
# Microservices log aggregation
kafka:
  brokers:
    - "kafka.internal:9092"
  topics:
    - "user-service-logs"
    - "payment-service-logs"
    - "notification-service-logs"
  consumer_group: "logflux-microservices"

processing:
  parse_json: true
  detect_log_levels: true

metadata:
  labels:
    architecture: microservices
    environment: production
|
Security Event Monitoring
|
# Security audit log collection
kafka:
  brokers:
    - "secure-kafka.internal:9092"
  topics:
    - "security-audit-logs"
    - "authentication-events"
    - "authorization-events"
  consumer_group: "logflux-security"

security:
  sasl:
    enabled: true
    mechanism: "SCRAM-SHA-512"
    username: "security-consumer"
    password: "secure-password"
  tls:
    enabled: true
    cert_file: "/etc/ssl/security.crt"
    key_file: "/etc/ssl/security.key"
    ca_file: "/etc/ssl/ca.crt"

metadata:
  labels:
    log_type: security
    compliance: required
    monitoring: critical
|
High-Volume Processing
|
# High-throughput log processing
kafka:
  brokers:
    - "kafka-1:9092"
    - "kafka-2:9092"
    - "kafka-3:9092"
  topics:
    - "high-volume-logs"
  consumer_group: "logflux-high-volume"

  # Optimize for throughput
  fetch_min_bytes: 10240
  fetch_max_wait: 100ms
  max_message_size: "1MB"

batch:
  size: 1000
  flush_interval: 5s
  max_memory: "1GB"

limits:
  max_messages_per_second: 10000
  memory_limit: "2GB"

metadata:
  labels:
    volume: high
    optimization: throughput
|
Command Line Usage
Basic Commands
|
# Consume from specific topics
logflux-kafka -brokers="localhost:9092" -topics="logs,events"
# Specify consumer group
logflux-kafka -brokers="kafka:9092" -consumer-group="logflux-dev"
# Start from oldest messages
logflux-kafka -brokers="kafka:9092" -offset-mode="oldest"
# Enable JSON parsing
logflux-kafka -brokers="kafka:9092" -parse-json
|
Advanced Options
|
# SASL authentication
logflux-kafka -brokers="kafka:9092" \
    -sasl -sasl-username="user" -sasl-password="pass" \
    -sasl-mechanism="SCRAM-SHA-512"

# TLS encryption
logflux-kafka -brokers="kafka:9092" \
    -tls -tls-cert="/path/to/cert.pem" \
    -tls-key="/path/to/key.pem" \
    -tls-ca="/path/to/ca.pem"

# Performance tuning
logflux-kafka -brokers="kafka:9092" \
    -batch-size=500 -flush-interval=10s \
    -max-message-size="5MB"

# Verbose logging
logflux-kafka -brokers="kafka:9092" \
    -verbose -stats

# Configuration file
logflux-kafka -config="/etc/logflux-agent/plugins/kafka.yaml"
|
Message Processing
JSON Message Handling
Input Kafka Message:
|
{
  "timestamp": "2024-01-20T14:30:50.123Z",
  "level": "ERROR",
  "service": "payment-service",
  "message": "Payment processing failed",
  "user_id": "user123",
  "transaction_id": "txn456",
  "error_code": "INSUFFICIENT_FUNDS"
}
|
Output LogFlux Log:
|
{
  "timestamp": "2024-01-20T14:30:50.123Z",
  "level": "error",
  "message": "Payment processing failed",
  "node": "kafka-consumer",
  "metadata": {
    "source_type": "plugin",
    "source_name": "kafka",
    "kafka_topic": "payment-service-logs",
    "kafka_partition": 2,
    "kafka_offset": 12345,
    "kafka_timestamp": "2024-01-20T14:30:50.123Z",
    "kafka_key": "user123",
    "service": "payment-service",
    "user_id": "user123",
    "transaction_id": "txn456",
    "error_code": "INSUFFICIENT_FUNDS",
    "plugin": "kafka",
    "environment": "production"
  }
}
|
Log Level Detection
The plugin automatically detects log levels from:
| Source           | Field/Header Names                    | Keywords                                         |
|------------------|---------------------------------------|--------------------------------------------------|
| Headers          | level, severity, log_level            | -                                                |
| JSON Fields      | level, severity, log_level, priority  | -                                                |
| Content Analysis | -                                     | error, warn, info, debug, trace, fatal, critical |
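As an illustration of the three sources, each of the following test messages would yield a detected level, assuming the field names and keywords from the advanced configuration above and a reasonably recent kcat/kafkacat:
|
# Level taken from a Kafka header
echo 'service started' | kafkacat -P -b localhost:9092 -t application-logs -H level=warning
# Level taken from a JSON field
echo '{"severity":"debug","message":"cache warmed"}' | kafkacat -P -b localhost:9092 -t application-logs
# Level inferred from content keywords
echo 'FATAL: out of disk space' | kafkacat -P -b localhost:9092 -t application-logs
|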
Kafka Metadata Fields
When include_kafka_metadata is enabled, the following fields are attached to each log entry (shown with the kafka_ prefix used throughout this page):

| Field           | Description             | Example                     |
|-----------------|-------------------------|-----------------------------|
| kafka_topic     | Source Kafka topic      | application-logs            |
| kafka_partition | Partition number        | 2                           |
| kafka_offset    | Message offset          | 12345                       |
| kafka_timestamp | Kafka message timestamp | 2024-01-20T14:30:50.123Z    |
| kafka_key       | Message key             | user123                     |
| kafka_headers   | Kafka headers as JSON   | {"correlation-id":"abc123"} |
Security Configuration
SASL Authentication
|
# SASL PLAIN (username/password)
security:
  sasl:
    enabled: true
    mechanism: "PLAIN"
    username: "kafka-user"
    password: "kafka-password"
|
|
# SASL SCRAM-SHA-512 (recommended)
security:
  sasl:
    enabled: true
    mechanism: "SCRAM-SHA-512"
    username: "logflux-consumer"
    password: "secure-password"
|
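The matching SCRAM credential must also exist on the broker side. On recent Kafka versions it can be created with kafka-configs.sh; the broker address and credentials below are the example values used on this page:
|
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --add-config 'SCRAM-SHA-512=[password=secure-password]' \
    --entity-type users --entity-name logflux-consumer
|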
TLS Configuration
|
# TLS with client certificates
security:
  tls:
    enabled: true
    cert_file: "/etc/ssl/kafka/client.crt"
    key_file: "/etc/ssl/kafka/client.key"
    ca_file: "/etc/ssl/kafka/ca.crt"
    skip_verify: false
    server_name: "kafka.cluster.local"
|
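Before enabling TLS in the plugin, it can help to confirm that the client certificate chains to the configured CA and that the broker presents a valid certificate. The paths match the example above; the 9093 TLS listener port is an assumption, so adjust it to your cluster:
|
# Verify the client certificate against the CA bundle
openssl verify -CAfile /etc/ssl/kafka/ca.crt /etc/ssl/kafka/client.crt
# Inspect the certificate presented by the broker's TLS listener
openssl s_client -connect kafka.cluster.local:9093 -CAfile /etc/ssl/kafka/ca.crt </dev/null
|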
Environment Variables
|
# SASL credentials
export KAFKA_SASL_USERNAME="logflux-consumer"
export KAFKA_SASL_PASSWORD="secure-password"
# TLS certificates
export KAFKA_TLS_CERT="/etc/ssl/kafka/client.crt"
export KAFKA_TLS_KEY="/etc/ssl/kafka/client.key"
export KAFKA_TLS_CA="/etc/ssl/kafka/ca.crt"
|
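When the plugin runs as a systemd service, variables exported in a shell profile will not reach it. One common pattern, sketched here with an example path, is to keep the credentials in a root-only environment file and load it from the unit with an EnvironmentFile= drop-in (sudo systemctl edit logflux-kafka):
|
# Create a root-only environment file holding the credentials
sudo tee /etc/logflux-agent/kafka.env > /dev/null <<'EOF'
KAFKA_SASL_USERNAME=logflux-consumer
KAFKA_SASL_PASSWORD=secure-password
EOF
sudo chmod 600 /etc/logflux-agent/kafka.env
|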
High-Throughput Configuration
|
# Optimize for high message volume
kafka:
  fetch_min_bytes: 10240      # Larger fetch batches
  fetch_max_wait: 100ms       # Shorter wait times
  max_message_size: "1MB"     # Larger message limit

batch:
  size: 1000                  # Larger batches
  flush_interval: 5s          # Frequent flushing
  max_memory: "1GB"           # More memory

limits:
  max_messages_per_second: 10000
  memory_limit: "2GB"
  max_goroutines: 50
|
Low-Latency Configuration
|
# Optimize for low latency
kafka:
  fetch_min_bytes: 1          # Minimal fetch size
  fetch_max_wait: 10ms        # Very short waits
  heartbeat_interval: 1s      # Frequent heartbeats

batch:
  size: 10                    # Small batches
  flush_interval: 1s          # Frequent flushing
  max_wait_time: 5s           # Quick timeouts
|
Resource-Constrained Configuration
|
# Optimize for limited resources
kafka:
  max_message_size: "100KB"   # Smaller messages only

batch:
  size: 50                    # Small batches
  flush_interval: 30s         # Less frequent flushing
  max_memory: "64MB"          # Limited memory

limits:
  max_messages_per_second: 1000
  memory_limit: "256MB"
  max_goroutines: 10
|
Monitoring and Alerting
Plugin Health Monitoring
|
#!/bin/bash
# check-kafka-plugin.sh

if ! systemctl is-active --quiet logflux-kafka; then
    echo "CRITICAL: LogFlux Kafka plugin is not running"
    exit 2
fi

# Check Kafka connectivity
if ! kafkacat -L -b localhost:9092 &>/dev/null; then
    echo "CRITICAL: Cannot connect to Kafka brokers"
    exit 2
fi

# Check recent message processing
if ! journalctl -u logflux-kafka --since="10 minutes ago" | grep -q "messages processed"; then
    echo "WARNING: No messages processed in last 10 minutes"
    exit 1
fi

echo "OK: LogFlux Kafka plugin is healthy"
exit 0
|
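The script above can be scheduled so that failures surface in your alerting pipeline; the path and interval here are examples:
|
# Run the health check every 5 minutes (add via: sudo crontab -e)
*/5 * * * * /usr/local/bin/check-kafka-plugin.sh || logger -t logflux-kafka-check "health check failed"
|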
Consumer Lag Monitoring
|
# Monitor consumer group lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group logflux-agent \
    --describe

# Check partition assignments
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group logflux-agent \
    --members --verbose
|
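For a simple threshold alert, the LAG column of the describe output can be summed and compared against a limit. Treat this as a sketch: the LAG column position (6) matches current Kafka tooling but can vary between versions:
|
# Warn when total lag for the group exceeds 10000 messages
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group logflux-agent --describe 2>/dev/null | \
    awk 'NR>1 && $6 ~ /^[0-9]+$/ {lag+=$6} END {if (lag > 10000) print "WARNING: consumer lag is", lag}'
|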
Throughput and lag statistics can also be collected by the plugin itself through its health settings:
|
# Enable statistics collection
health:
  stats_collection: true
  check_interval: 60s

  # Metrics to collect
  collect_metrics:
    - "messages_per_second"
    - "bytes_per_second"
    - "consumer_lag"
    - "processing_time"
    - "error_rate"
|
Common Use Cases
Microservices Log Aggregation
|
# Collect logs from multiple microservices
kafka:
  topics:
    - "user-service-logs"
    - "order-service-logs"
    - "payment-service-logs"
    - "inventory-service-logs"
  consumer_group: "logflux-microservices"

processing:
  parse_json: true
  detect_log_levels: true

metadata:
  labels:
    architecture: microservices
    domain: ecommerce
Event-Driven Architecture
|
# Process domain events as logs
kafka:
  topics:
    - "order-events"
    - "payment-events"
    - "user-events"
  consumer_group: "logflux-events"

processing:
  parse_json: true
  content_filters:
    include_patterns:
      - "OrderCreated"
      - "PaymentProcessed"
      - "UserRegistered"

metadata:
  labels:
    event_sourcing: true
    domain: business_events
Error Tracking and Alerting
|
# Focus on error events
kafka:
  topics:
    - "application-errors"
    - "system-alerts"
  consumer_group: "logflux-errors"

processing:
  detect_log_levels: true
  content_filters:
    include_patterns:
      - "ERROR"
      - "FATAL"
      - "CRITICAL"

metadata:
  labels:
    log_type: errors
    alerting: enabled
Audit Log Collection
|
# Compliance and audit logs
kafka:
  topics:
    - "audit-logs"
    - "security-events"
    - "compliance-logs"
  consumer_group: "logflux-audit"

security:
  sasl:
    enabled: true
    mechanism: "SCRAM-SHA-512"
  tls:
    enabled: true

metadata:
  labels:
    log_type: audit
    compliance: sox_pci
    retention: "7_years"
Troubleshooting
Common Issues
Connection Issues:
|
# Test Kafka connectivity
kafkacat -L -b localhost:9092
# Check DNS resolution
nslookup kafka.cluster.local
# Test port connectivity
telnet kafka.cluster.local 9092
|
Authentication Failures:
|
# Test SASL authentication
kafkacat -b localhost:9092 -X security.protocol=SASL_PLAINTEXT \
    -X sasl.mechanism=PLAIN \
    -X sasl.username=user \
    -X sasl.password=pass \
    -L

# Check certificates
openssl x509 -in /path/to/cert.crt -text -noout
|
Consumer Group Issues:
|
# Check consumer group status
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group logflux-agent --describe

# Reset consumer group offsets
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group logflux-agent --reset-offsets --to-earliest \
    --topic application-logs --execute
|
High Memory Usage:
|
# Reduce memory usage
batch:
  size: 50
  max_memory: "64MB"

limits:
  memory_limit: "256MB"
  max_messages_per_second: 1000

kafka:
  max_message_size: "100KB"
|
Debugging
|
# Enable verbose logging
export LOGFLUX_LOG_LEVEL=debug
logflux-kafka -config /etc/logflux-agent/plugins/kafka.yaml -verbose

# Monitor plugin logs
journalctl -u logflux-kafka -f

# Check Kafka logs
sudo journalctl -u kafka -f

# Test message production
echo '{"level":"info","message":"test"}' | \
    kafkacat -P -b localhost:9092 -t application-logs
|
Best Practices
Configuration Management
- Use consumer groups for scalable message consumption
- Configure appropriate batch sizes based on message volume
- Enable security for production Kafka clusters
- Monitor consumer lag to ensure timely message processing
Security
- Use SASL SCRAM-SHA-512 for authentication when possible
- Enable TLS encryption for data in transit
- Regularly rotate credentials and certificates
- Restrict topic access using ACLs
Performance
- Tune fetch parameters based on message size and volume
- Use appropriate batch sizes to balance latency and throughput
- Monitor memory usage and adjust limits accordingly
- Scale consumption horizontally by running more consumers in the same group; Kafka assigns at most one consumer per partition, so topics need enough partitions
Operational
- Monitor consumer lag to detect processing issues
- Set up alerting for connection failures and errors
- Use appropriate offset modes based on requirements
- Test disaster recovery procedures regularly
Disclaimer
Apache Kafka and the Apache Kafka logo are trademarks of The Apache Software Foundation. LogFlux is not affiliated with, endorsed by, or sponsored by The Apache Software Foundation or the Apache Kafka project. The Apache Kafka logo is used solely for identification purposes to indicate compatibility with Apache Kafka message streaming.
Next Steps