Elasticsearch Integration
Query and collect logs from Elasticsearch and OpenSearch clusters with LogFlux Agent
The LogFlux Elasticsearch integration queries and collects logs from Elasticsearch and OpenSearch clusters, enabling centralized log analysis and migration to LogFlux. This plugin provides comprehensive query capabilities, smart data processing, and efficient log forwarding from existing Elasticsearch installations to the LogFlux platform.
Overview
The Elasticsearch plugin provides:
- Multi-Platform Support: Works with both Elasticsearch and OpenSearch clusters
- Flexible Authentication: API keys, basic auth, and TLS certificate authentication
- Advanced Querying: Lucene query syntax with time range filtering and custom queries
- Efficient Data Processing: Scroll API support for large datasets with deduplication
- Follow Mode: Continuous polling for new logs, similar to `tail -f`
- Smart Field Detection: Automatic timestamp, message, and log level field recognition
- Batch Processing: Configurable batching for optimal performance
- TLS Security: Full TLS support with custom certificates and CA validation
- Migration Ready: Perfect for migrating from Elasticsearch-based logging to LogFlux
Installation
The Elasticsearch plugin is included with the LogFlux Agent but disabled by default.
Prerequisites
- LogFlux Agent installed (see Installation Guide)
- Access to Elasticsearch or OpenSearch cluster
- Network connectivity to the cluster (typically port 9200; a quick check is shown below)
- Optional: Authentication credentials (API keys, certificates)
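Connectivity can be verified up front with a plain HTTP request; the URL below assumes a local cluster on the default port:

```bash
# Quick connectivity check from the agent host (adjust the URL as needed)
curl -s "http://localhost:9200/_cluster/health?pretty"
```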
Enable the Plugin
```bash
# Enable and start the Elasticsearch plugin
sudo systemctl enable --now logflux-elasticsearch

# Check status
sudo systemctl status logflux-elasticsearch
```
Configuration
Basic Configuration
Create or edit the Elasticsearch plugin configuration:
```bash
sudo nano /etc/logflux-agent/plugins/elasticsearch.yaml
```
Basic configuration:
```yaml
# Elasticsearch Plugin Configuration
name: elasticsearch
version: 1.0.0
source: elasticsearch-plugin

# Agent connection
agent:
  socket_path: /tmp/logflux-agent.sock

# Elasticsearch connection
elasticsearch:
  # Cluster URL (supports multiple hosts)
  url: "http://localhost:9200"

  # Indices to query (supports wildcards)
  indices:
    - "logstash-*"
    - "filebeat-*"
    - "logs-*"

# Query configuration
query:
  # Lucene query string (empty = match all)
  query_string: ""

  # Time range
  from: "now-1h"
  to: "now"

  # Scroll settings for large datasets
  scroll_size: 1000
  scroll_timeout: "5m"

# Authentication (choose one)
auth:
  # No authentication (development only)
  type: "none"

# Metadata and labeling
metadata:
  labels:
    plugin: elasticsearch
    source: elasticsearch_query

  # Include Elasticsearch metadata
  include_es_metadata: true

# Batching for efficiency
batch:
  enabled: true
  max_size: 100
  flush_interval: 5s
```
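Before enabling the plugin, it is worth confirming that the configured index patterns actually match indices on the cluster; a minimal check with the `_cat` API, using the example URL and patterns above:

```bash
# List indices matching the configured patterns
curl -s "http://localhost:9200/_cat/indices/logstash-*,filebeat-*,logs-*?v"
```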
Advanced Configuration
```yaml
# Advanced Elasticsearch Configuration
name: elasticsearch
version: 1.0.0
source: elasticsearch-plugin

# Enhanced agent settings
agent:
  socket_path: /tmp/logflux-agent.sock
  connect_timeout: 30s
  max_retries: 5
  retry_delay: 10s

# Comprehensive Elasticsearch settings
elasticsearch:
  # Multiple cluster URLs for HA
  urls:
    - "https://es-1.cluster.local:9200"
    - "https://es-2.cluster.local:9200"
    - "https://es-3.cluster.local:9200"

  # Index patterns with time-based routing
  indices:
    - "logstash-*"
    - "filebeat-*"
    - "logs-application-*"
    - "logs-system-*"
    - "audit-logs-*"

  # Connection settings
  timeout: "30s"
  max_retries: 3
  retry_delay: "5s"

  # Health check
  health_check: true
  health_check_interval: "60s"

# Advanced authentication
auth:
  # API Key authentication (recommended)
  type: "api_key"
  api_key: "your-api-key-here"

  # OR Basic authentication
  # type: "basic"
  # username: "elastic"
  # password: "password"

  # OR Certificate-based authentication
  # type: "certificate"
  # cert_file: "/etc/ssl/client.crt"
  # key_file: "/etc/ssl/client.key"

# TLS configuration
tls:
  enabled: true
  ca_file: "/etc/ssl/ca.crt"
  cert_file: "/etc/ssl/client.crt"
  key_file: "/etc/ssl/client.key"
  skip_verify: false
  server_name: "elasticsearch.cluster.local"

# Advanced query settings
query:
  # Complex Lucene queries
  query_string: 'level:(ERROR OR WARN) AND host:production-*'

  # Time range configuration
  from: "now-4h"
  to: "now"
  # OR absolute time range
  # from: "2024-01-20T10:00:00Z"
  # to: "2024-01-20T14:00:00Z"

  # Scroll API optimization
  scroll_size: 2000
  scroll_timeout: "10m"
  scroll_parallel: false

  # Follow mode for continuous monitoring
  follow: true
  poll_interval: "30s"

  # Field selection (improve performance)
  source_includes:
    - "@timestamp"
    - "message"
    - "level"
    - "host"
    - "service"
  source_excludes:
    - "internal.*"
    - "debug.*"

  # Query filters
  must_match:
    - term:
        environment: "production"
  must_not_match:
    - term:
        logger_name: "health.check"

# Data processing configuration
processing:
  # Field mapping and detection
  timestamp_fields:
    - "@timestamp"
    - "timestamp"
    - "time"
    - "ts"
  message_fields:
    - "message"
    - "msg"
    - "log"
    - "content"
  level_fields:
    - "level"
    - "severity"
    - "log_level"
    - "priority"

  # Log level detection from content
  level_detection:
    enabled: true
    patterns:
      error: ["ERROR", "FATAL", "CRITICAL"]
      warning: ["WARN", "WARNING"]
      info: ["INFO", "INFORMATION"]
      debug: ["DEBUG", "TRACE"]

  # Content filtering
  content_filters:
    include_patterns:
      - "ERROR"
      - "Exception"
      - "Failed"
    exclude_patterns:
      - "health check"
      - "ping"
      - "keepalive"

  # Message size limits
  max_message_size: "64KB"
  truncate_large_messages: true

# Enhanced metadata
metadata:
  verbose: true
  labels:
    plugin: elasticsearch
    source: elasticsearch_query
    environment: production
    cluster: main

  # Elasticsearch metadata inclusion
  include_es_metadata: true
  es_metadata_prefix: "es_"

  # Custom field mapping
  field_mapping:
    index: "es_index"
    document_id: "es_doc_id"
    document_type: "es_doc_type"
    routing: "es_routing"
    version: "es_version"

# Performance optimization
performance:
  # Batch processing
  batch_size: 500
  flush_interval: 10s
  max_batch_wait: "30s"

  # Memory management
  max_memory: "512MB"
  document_buffer_size: 10000

  # Concurrent processing
  worker_threads: 5
  queue_size: 50000

  # Rate limiting
  max_documents_per_second: 5000
  burst_limit: 7500

  # Deduplication
  enable_deduplication: true
  dedup_cache_size: 100000
  dedup_cache_ttl: "1h"

# Resource limits
limits:
  max_concurrent_queries: 10
  memory_limit: "1GB"
  cpu_limit: "2"

  # Per-index limits
  index_limits:
    "high-volume-*": 20000
    "normal-*": 5000
    "low-priority-*": 1000

# Health monitoring
health:
  check_interval: 60s
  max_query_errors: 50
  alert_on_cluster_down: true
  stats_collection: true
```
Usage Examples
Log Migration
```yaml
# Migrate historical logs from Elasticsearch to LogFlux
query:
  query_string: '*'  # All documents
  from: "2024-01-01T00:00:00Z"
  to: "2024-12-31T23:59:59Z"
  scroll_size: 5000
  follow: false  # One-time migration

indices:
  - "logstash-2024.*"
  - "filebeat-2024.*"

metadata:
  labels:
    migration: historical_data
    source_system: elasticsearch
```
Error Log Collection
```yaml
# Collect error logs continuously
query:
  query_string: 'level:(ERROR OR FATAL) AND NOT logger_name:health'
  from: "now-1h"
  to: "now"
  follow: true
  poll_interval: "15s"

processing:
  level_detection:
    enabled: true
  content_filters:
    include_patterns:
      - "Exception"
      - "Error"
      - "Failed"

metadata:
  labels:
    log_type: errors
    monitoring: critical
```
Application Monitoring
```yaml
# Monitor specific applications
query:
  query_string: 'service:(user-service OR payment-service OR order-service)'
  from: "now-2h"
  to: "now"
  follow: true
  poll_interval: "10s"

indices:
  - "application-logs-*"

processing:
  level_fields:
    - "level"
    - "severity"
  message_fields:
    - "message"
    - "msg"

metadata:
  labels:
    architecture: microservices
    monitoring: application
```
Command Line Usage
Basic Commands
```bash
# Query specific indices
logflux-elasticsearch -url "http://localhost:9200" -indices "logstash-*"

# Time range query
logflux-elasticsearch -url "http://localhost:9200" \
    -from "now-1h" -to "now" -indices "logs-*"

# Custom query
logflux-elasticsearch -url "http://localhost:9200" \
    -query "level:ERROR AND host:production-*"

# Follow mode for real-time collection
logflux-elasticsearch -url "http://localhost:9200" \
    -indices "logs-*" -follow -poll-interval 30s
```
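Conceptually, follow mode behaves like the illustrative loop below: re-running a bounded time-range query on every poll interval (the plugin additionally deduplicates documents across overlapping windows). This is a sketch of the idea, not what the binary executes:

```bash
# Rough curl equivalent of follow mode with a 30s poll interval
while true; do
    curl -s "localhost:9200/logs-*/_search" \
        -H "Content-Type: application/json" \
        -d '{"query": {"range": {"@timestamp": {"gte": "now-30s"}}},
             "sort": [{"@timestamp": "asc"}], "size": 100}'
    sleep 30
done
```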
Advanced Options
```bash
# API Key authentication
logflux-elasticsearch -url "https://elasticsearch.cluster.local:9200" \
    -api-key "your-api-key" \
    -indices "secure-logs-*"

# Basic authentication
logflux-elasticsearch -url "https://elasticsearch.cluster.local:9200" \
    -username "elastic" -password "password" \
    -indices "logs-*"

# TLS with certificates
logflux-elasticsearch -url "https://elasticsearch.cluster.local:9200" \
    -tls -tls-cert "/etc/ssl/client.crt" \
    -tls-key "/etc/ssl/client.key" \
    -tls-ca "/etc/ssl/ca.crt"

# Performance tuning
logflux-elasticsearch -url "http://localhost:9200" \
    -indices "logs-*" \
    -scroll-size 2000 \
    -batch-size 500 \
    -flush-interval 10s

# Configuration file
logflux-elasticsearch -config "/etc/logflux-agent/plugins/elasticsearch.yaml"
```
Authentication Methods
API Key Authentication (Recommended)
```yaml
# Generate an API key in Elasticsearch (see the curl example below)
auth:
  type: "api_key"
  api_key: "VnVhQ2ZHY0JDZGJrUW0tZTVhT3g6dWkybHAyYXhUTm1zeWFrdzl0dk5udw=="
```
```bash
# Create API key via Elasticsearch API
curl -X POST "localhost:9200/_security/api_key" \
  -H "Content-Type: application/json" \
  -u elastic:password \
  -d '{
    "name": "logflux-reader",
    "role_descriptors": {
      "log_reader": {
        "cluster": ["monitor"],
        "indices": [
          {
            "names": ["logs-*", "logstash-*"],
            "privileges": ["read"]
          }
        ]
      }
    }
  }'
```
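The create-key response contains an `id` and an `api_key`; the value used in the plugin's `api_key` setting (and in the `ApiKey` header) is the Base64 encoding of `id:api_key`. Recent Elasticsearch versions also return a ready-made `encoded` field. A quick way to build and verify the credential, with `KEY_ID` and `KEY_SECRET` standing in for the returned values:

```bash
# Encode id:api_key as required by the ApiKey authorization scheme
API_KEY=$(printf '%s:%s' "$KEY_ID" "$KEY_SECRET" | base64)

# Verify the key works before putting it in the plugin config
curl -s "localhost:9200/_security/_authenticate" \
    -H "Authorization: ApiKey $API_KEY"
```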
Basic Authentication
```yaml
auth:
  type: "basic"
  username: "logflux_reader"
  password: "secure_password"
```
Certificate Authentication
```yaml
auth:
  type: "certificate"
  cert_file: "/etc/ssl/elasticsearch/client.crt"
  key_file: "/etc/ssl/elasticsearch/client.key"

tls:
  enabled: true
  ca_file: "/etc/ssl/elasticsearch/ca.crt"
```
Environment Variables
```bash
# API Key
export ELASTICSEARCH_API_KEY="your-api-key"

# Basic Auth
export ELASTICSEARCH_USERNAME="elastic"
export ELASTICSEARCH_PASSWORD="password"

# Certificates
export ELASTICSEARCH_CERT_FILE="/etc/ssl/client.crt"
export ELASTICSEARCH_KEY_FILE="/etc/ssl/client.key"
export ELASTICSEARCH_CA_FILE="/etc/ssl/ca.crt"
```
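Assuming the plugin reads these variables at startup, one way to supply them to the systemd service is a drop-in override rather than a shell profile:

```bash
# Open a drop-in override for the plugin unit
sudo systemctl edit logflux-elasticsearch

# Add the credentials in the editor that opens:
#   [Service]
#   Environment=ELASTICSEARCH_API_KEY=your-api-key

# Restart so the unit picks up the new environment
sudo systemctl restart logflux-elasticsearch
```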
Query Examples
Lucene Query Syntax
```yaml
# Simple field queries
query_string: 'level:ERROR'
query_string: 'host:production-* AND service:api'
query_string: 'message:"Connection failed"'

# Range queries
query_string: 'response_time:[500 TO *]'
query_string: '@timestamp:[2024-01-20T10:00:00 TO 2024-01-20T11:00:00]'

# Boolean queries
query_string: 'level:(ERROR OR WARN) AND NOT logger_name:health'
query_string: '+service:payment +level:ERROR -environment:test'

# Wildcard and regex
query_string: 'host:prod-* AND message:/.*exception.*/i'
query_string: 'user_id:user_???? AND action:login'
```
Complex Queries
```yaml
# Application error monitoring
query_string: |
  (level:ERROR OR level:FATAL) AND
  service:(user-service OR payment-service) AND
  NOT logger_name:(health OR metrics)

# Security event detection
query_string: |
  (message:"authentication failed" OR
   message:"unauthorized access" OR
   http_status:[400 TO 499]) AND
  NOT user_agent:monitor

# Performance issue detection
query_string: |
  response_time:[1000 TO *] OR
  (message:"timeout" AND service:database) OR
  (level:WARN AND message:"slow query")
```
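Multi-line queries are easy to get wrong; they can be validated against the cluster before being placed in the plugin configuration (the index pattern here is an example):

```bash
# Validate a complex query string; explain=true reports parse errors
curl -s "localhost:9200/logs-*/_validate/query?explain=true&pretty" \
    -H "Content-Type: application/json" \
    -d '{"query": {"query_string": {"query":
        "(level:ERROR OR level:FATAL) AND NOT logger_name:(health OR metrics)"}}}'
```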
Data Processing
Field Detection and Mapping
Input Elasticsearch Document:
```json
{
  "@timestamp": "2024-01-20T14:30:50.123Z",
  "level": "ERROR",
  "message": "Database connection failed",
  "host": "web-server-01",
  "service": "user-service",
  "exception": "SQLException: Connection timeout",
  "request_id": "req_12345",
  "user_id": "user_67890"
}
```
Output LogFlux Entry:
```json
{
  "timestamp": "2024-01-20T14:30:50.123Z",
  "level": "error",
  "message": "Database connection failed",
  "node": "web-server-01",
  "metadata": {
    "source_type": "plugin",
    "source_name": "elasticsearch",
    "es_index": "application-logs-2024.01.20",
    "es_doc_id": "abc123def456",
    "service": "user-service",
    "exception": "SQLException: Connection timeout",
    "request_id": "req_12345",
    "user_id": "user_67890",
    "plugin": "elasticsearch",
    "environment": "production"
  }
}
```
Smart Field Detection
| Purpose | Auto-Detected Fields | Priority |
|---------|----------------------|----------|
| Timestamp | `@timestamp`, `timestamp`, `time`, `ts` | Elasticsearch `@timestamp` preferred |
| Message | `message`, `msg`, `log`, `content`, `text` | `message` field preferred |
| Level | `level`, `severity`, `log_level`, `priority` | Case-insensitive matching |
| Host | `host`, `hostname`, `server`, `node` | Used as LogFlux `node` field |
Level Detection
| Elasticsearch Value | LogFlux Level | Detection Method |
|---------------------|---------------|------------------|
| `ERROR`, `FATAL`, `CRITICAL` | Error | Field value or content pattern |
| `WARN`, `WARNING` | Warning | Field value or content pattern |
| `INFO`, `INFORMATION` | Info | Field value or content pattern |
| `DEBUG`, `TRACE` | Debug | Field value or content pattern |
| HTTP 5xx status | Error | Status code field analysis |
| HTTP 4xx status | Warning | Status code field analysis |
High-Volume Configuration
```yaml
# Optimize for large datasets
query:
  scroll_size: 5000
  scroll_timeout: "15m"
  scroll_parallel: true

performance:
  batch_size: 1000
  flush_interval: 5s
  worker_threads: 10
  max_documents_per_second: 20000

limits:
  memory_limit: "2GB"
  cpu_limit: "4"
```
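For reference, `scroll_size` and `scroll_timeout` map onto the standard Elasticsearch scroll flow sketched below (requires `jq`; the URL and index pattern are placeholders):

```bash
# 1) Open a scroll context sized like the config above
SCROLL_ID=$(curl -s "localhost:9200/logs-*/_search?scroll=15m" \
    -H "Content-Type: application/json" \
    -d '{"size": 5000, "query": {"match_all": {}}}' | jq -r '._scroll_id')

# 2) Fetch subsequent pages until no hits remain
curl -s "localhost:9200/_search/scroll" \
    -H "Content-Type: application/json" \
    -d "{\"scroll\": \"15m\", \"scroll_id\": \"$SCROLL_ID\"}"

# 3) Release the context when finished
curl -s -X DELETE "localhost:9200/_search/scroll" \
    -H "Content-Type: application/json" \
    -d "{\"scroll_id\": \"$SCROLL_ID\"}"
```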
Memory-Efficient Configuration
```yaml
# Optimize for limited memory
query:
  scroll_size: 500
  scroll_timeout: "5m"
  source_includes:
    - "@timestamp"
    - "message"
    - "level"

performance:
  batch_size: 100
  max_memory: "256MB"
  document_buffer_size: 2000

processing:
  max_message_size: "16KB"
  truncate_large_messages: true
```
Network-Optimized Configuration
```yaml
# Optimize for slow network connections
elasticsearch:
  timeout: "60s"
  max_retries: 5
  retry_delay: "10s"

query:
  scroll_size: 1000
  scroll_timeout: "2m"

performance:
  batch_size: 200
  flush_interval: 30s
```
Monitoring and Alerting
Plugin Health Monitoring
```bash
#!/bin/bash
# check-elasticsearch-plugin.sh

if ! systemctl is-active --quiet logflux-elasticsearch; then
    echo "CRITICAL: LogFlux Elasticsearch plugin is not running"
    exit 2
fi

# Check Elasticsearch connectivity (accept green or yellow cluster status)
if ! curl -s http://localhost:9200/_cluster/health | grep -qE '"status":"(green|yellow)"'; then
    echo "CRITICAL: Cannot connect to Elasticsearch cluster"
    exit 2
fi

# Check recent document processing
if ! journalctl -u logflux-elasticsearch --since "10 minutes ago" | grep -q "documents processed"; then
    echo "WARNING: No documents processed in last 10 minutes"
    exit 1
fi

echo "OK: LogFlux Elasticsearch plugin is healthy"
exit 0
```
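The script exits with Nagios-style codes (0 OK, 1 WARNING, 2 CRITICAL), so it plugs into most monitoring systems; a simple cron schedule works too (the install path below is an assumption):

```bash
# Run the health check every 5 minutes from crontab
*/5 * * * * /usr/local/bin/check-elasticsearch-plugin.sh
```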
Elasticsearch Cluster Monitoring
```bash
# Check cluster health
curl -s "http://localhost:9200/_cluster/health?pretty"

# Monitor index sizes (quote the URL so the shell does not interpret &)
curl -s "http://localhost:9200/_cat/indices/logs-*?v&s=store.size:desc"

# Check node status
curl -s "http://localhost:9200/_cat/nodes?v"
```
Common Use Cases
ELK Stack Migration
```yaml
# Migrate from ELK to LogFlux
elasticsearch:
  url: "http://elasticsearch.cluster.local:9200"
  indices:
    - "logstash-*"
    - "filebeat-*"
    - "metricbeat-*"

query:
  from: "now-30d"  # Last 30 days
  to: "now"
  scroll_size: 2000
  follow: false  # One-time migration

metadata:
  labels:
    migration: elk_to_logflux
    source_system: elasticsearch
```
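Before starting a bulk migration, it helps to size the job; a `_count` request over the same index patterns and time range gives a quick estimate:

```bash
# Count documents from the last 30 days across the migration indices
curl -s "localhost:9200/logstash-*,filebeat-*,metricbeat-*/_count?pretty" \
    -H "Content-Type: application/json" \
    -d '{"query": {"range": {"@timestamp": {"gte": "now-30d"}}}}'
```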
Security Log Analysis
```yaml
# Extract security events
query:
  query_string: |
    (level:ERROR OR level:WARN) AND
    (message:"authentication" OR message:"authorization" OR
     message:"security" OR message:"breach")
  follow: true
  poll_interval: "60s"

indices:
  - "security-logs-*"
  - "audit-logs-*"

metadata:
  labels:
    log_type: security
    compliance: required
```
Performance Monitoring
```yaml
# Monitor application performance
query:
  query_string: |
    response_time:[1000 TO *] OR
    level:ERROR OR
    message:"timeout" OR
    message:"slow"
  follow: true
  poll_interval: "30s"

indices:
  - "application-*"
  - "performance-*"

processing:
  content_filters:
    include_patterns:
      - "slow"
      - "timeout"
      - "performance"

metadata:
  labels:
    monitoring_type: performance
    focus: application_health
```
Troubleshooting
Common Issues
Connection Problems:
```bash
# Test Elasticsearch connectivity
curl -X GET "localhost:9200/_cluster/health?pretty"

# Check authentication
curl -X GET "localhost:9200/" -H "Authorization: ApiKey YOUR_API_KEY"

# Test SSL connection (-k skips certificate verification, for debugging only)
curl -k -X GET "https://elasticsearch.cluster.local:9200/_cluster/health"
```
Query Issues:
```bash
# Validate query syntax
curl -X GET "localhost:9200/logs-*/_validate/query?pretty" \
    -H "Content-Type: application/json" \
    -d '{"query": {"query_string": {"query": "level:ERROR"}}}'

# Test query execution
curl -X GET "localhost:9200/logs-*/_search?pretty" \
    -H "Content-Type: application/json" \
    -d '{"query": {"query_string": {"query": "level:ERROR"}}, "size": 5}'
```
Performance Issues:
```yaml
# Reduce resource usage
query:
  scroll_size: 500
  scroll_timeout: "2m"

performance:
  batch_size: 100
  max_memory: "256MB"
  worker_threads: 2

processing:
  max_message_size: "32KB"
  truncate_large_messages: true
```
Debugging
```bash
# Enable verbose logging
export LOGFLUX_LOG_LEVEL=debug
logflux-elasticsearch -config /etc/logflux-agent/plugins/elasticsearch.yaml -verbose

# Monitor plugin logs
journalctl -u logflux-elasticsearch -f

# Test a specific query
logflux-elasticsearch -url "http://localhost:9200" \
    -indices "logs-*" -query "level:ERROR" -verbose

# Check scroll performance
curl -X GET "localhost:9200/logs-*/_search?scroll=5m&size=100&pretty" \
    -H "Content-Type: application/json" \
    -d '{"query": {"match_all": {}}}'
```
Best Practices
Configuration Management
- Use API keys instead of basic authentication for better security
- Configure appropriate scroll sizes based on cluster performance
- Set memory limits to prevent resource exhaustion
- Use specific index patterns rather than wildcard queries when possible
- Limit query scope with time ranges and specific indices
- Use field filtering to reduce data transfer (`source_includes`)
- Enable deduplication for overlapping time ranges
- Monitor cluster health during high-volume operations
Security
- Use TLS encryption for production clusters
- Implement proper RBAC with minimal required permissions
- Rotate API keys regularly (see the invalidation sketch after this list)
- Monitor access patterns for unusual query activity
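A sketch of the rotation step, assuming keys were created with the `logflux-reader` name used earlier: create the replacement key, update the plugin configuration, then invalidate the old one.

```bash
# Invalidate every API key created under the given name
curl -s -X DELETE "localhost:9200/_security/api_key" \
    -H "Content-Type: application/json" \
    -u elastic:password \
    -d '{"name": "logflux-reader"}'
```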
Operational
- Test queries in Elasticsearch first before using in plugin
- Monitor memory usage during large migrations
- Use follow mode for continuous log collection
- Set up proper alerting for plugin and cluster health
Migration from Elasticsearch to LogFlux
Planning the Migration
```yaml
# Phase 1: Historical data migration
query:
  from: "2024-01-01T00:00:00Z"
  to: "2024-06-30T23:59:59Z"
  follow: false
  scroll_size: 2000
metadata:
  labels:
    migration_phase: historical

---
# Phase 2: Recent data and follow mode (run as a separate configuration)
query:
  from: "2024-07-01T00:00:00Z"
  to: "now"
  follow: true
  poll_interval: "60s"
metadata:
  labels:
    migration_phase: current_and_live
```
Index Strategy
```bash
# Migrate by time period to manage load; compute each month's end with
# GNU date so short months don't produce invalid timestamps
for month in {01..12}; do
    next=$(date -d "2024-$month-01 +1 month" +%Y-%m-%dT00:00:00Z)
    logflux-elasticsearch \
        -indices "logstash-2024.$month.*" \
        -from "2024-$month-01T00:00:00Z" \
        -to "$next"
done
```
Disclaimer
Elasticsearch and the Elasticsearch logo are trademarks of Elasticsearch N.V. LogFlux is not affiliated with, endorsed by, or sponsored by Elasticsearch N.V. The Elasticsearch logo is used solely for identification purposes to indicate compatibility with Elasticsearch and OpenSearch clusters.
Next Steps