OpenTelemetry Integration
Receive traces, metrics, and logs via OpenTelemetry Protocol (OTLP) with LogFlux Agent
The LogFlux OpenTelemetry plugin provides an OTLP (OpenTelemetry Protocol) receiver that accepts traces, metrics, and logs from OpenTelemetry SDKs and collectors. This enables seamless integration with the OpenTelemetry ecosystem while forwarding all observability data to LogFlux for centralized analysis.
Overview
The OpenTelemetry plugin provides:
- OTLP Protocol Support: Full OTLP/gRPC and OTLP/HTTP receiver capabilities
- Multi-Signal Reception: Accept traces, metrics, and logs in a single endpoint
- Batch Processing: Efficient batching of telemetry data
- Resource Attribution: Preserve OpenTelemetry resource attributes and metadata
- Compression Support: gzip and zstd compression for efficient data transfer
- Authentication: Bearer token authentication for secure data ingestion
- Format Conversion: Convert OTLP data to LogFlux’s structured log format
Installation
The OpenTelemetry plugin is included with the LogFlux Agent but disabled by default.
Prerequisites
- LogFlux Agent installed (see Installation Guide)
- Network access to OTLP receiver ports (default: 4317 for gRPC, 4318 for HTTP)
Enable the Plugin
1
2
3
4
5
|
# Enable and start the OpenTelemetry plugin
sudo systemctl enable --now logflux-opentelemetry
# Check status
sudo systemctl status logflux-opentelemetry
|
Configuration
Basic Configuration
Create or edit the OpenTelemetry plugin configuration:
1
|
sudo nano /etc/logflux-agent/plugins/opentelemetry.yaml
|
Basic configuration:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# OpenTelemetry Plugin Configuration
receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
http:
endpoint: "0.0.0.0:4318"
# Processing
processors:
batch:
timeout: 10s
send_batch_size: 1024
# Output configuration
exporters:
logflux:
# Convert all signals to structured logs
endpoint: "unix:///tmp/logflux-agent.sock"
# Preserve OpenTelemetry semantics
preserve_resource_attributes: true
preserve_instrumentation_scope: true
|
Advanced Configuration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# Advanced OpenTelemetry Plugin Configuration
receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
max_recv_msg_size: 4194304 # 4MB
max_concurrent_streams: 16
compression: "gzip"
# TLS configuration
tls:
cert_file: "/etc/ssl/certs/otlp.crt"
key_file: "/etc/ssl/private/otlp.key"
client_ca_file: "/etc/ssl/certs/ca.crt"
# Authentication
auth:
bearer_token_file: "/etc/logflux-agent/otlp-token"
http:
endpoint: "0.0.0.0:4318"
max_request_body_size: 4194304 # 4MB
include_metadata: true
compression: ["gzip", "zstd"]
# CORS configuration
cors:
allowed_origins: ["*"]
allowed_headers: ["*"]
allowed_methods: ["POST", "OPTIONS"]
# Processing pipeline
processors:
# Batch processor for efficiency
batch:
timeout: 5s
send_batch_size: 512
send_batch_max_size: 1024
# Resource processor for metadata enrichment
resource:
attributes:
- key: "deployment.environment"
value: "production"
action: "upsert"
- key: "service.version"
from_attribute: "service_version"
action: "update"
# Memory limiter to prevent OOM
memory_limiter:
limit_mib: 512
spike_limit_mib: 128
check_interval: 5s
# Filtering and routing
processors:
filter:
# Filter traces
traces:
span:
- 'attributes["http.status_code"] >= 400'
- 'name == "health_check"'
- 'resource.attributes["service.name"] == "critical-service"'
# Filter metrics
metrics:
metric:
- 'name == "up"'
- 'type == METRIC_DATA_TYPE_HISTOGRAM'
# Filter logs
logs:
log_record:
- 'severity_number >= SEVERITY_NUMBER_WARN'
- 'attributes["environment"] == "production"'
# Export configuration
exporters:
logflux:
endpoint: "unix:///tmp/logflux-agent.sock"
timeout: 30s
retry_on_failure:
enabled: true
initial_interval: 1s
max_interval: 30s
max_elapsed_time: 300s
# Data transformation
preserve_resource_attributes: true
preserve_instrumentation_scope: true
flatten_attributes: false
# Compression
compression: "gzip"
# Service configuration
service:
pipelines:
traces:
receivers: [otlp]
processors: [memory_limiter, resource, batch]
exporters: [logflux]
metrics:
receivers: [otlp]
processors: [memory_limiter, resource, batch]
exporters: [logflux]
logs:
receivers: [otlp]
processors: [memory_limiter, resource, batch]
exporters: [logflux]
extensions: [health_check, pprof]
# Extensions
extensions:
health_check:
endpoint: "0.0.0.0:13133"
pprof:
endpoint: "localhost:1777"
|
OpenTelemetry SDK Integration
Application Configuration
Go Application:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
package main
import (
"context"
"log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)
func main() {
ctx := context.Background()
// Create resource
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceName("my-service"),
semconv.ServiceVersion("v1.0.0"),
semconv.DeploymentEnvironment("production"),
),
)
if err != nil {
log.Fatal(err)
}
// Create trace exporter
traceExporter, err := otlptracegrpc.New(ctx,
otlptracegrpc.WithEndpoint("http://localhost:4317"),
otlptracegrpc.WithInsecure(),
)
if err != nil {
log.Fatal(err)
}
// Create trace provider
tracerProvider := trace.NewTracerProvider(
trace.WithResource(res),
trace.WithBatcher(traceExporter),
)
otel.SetTracerProvider(tracerProvider)
// Create metric exporter
metricExporter, err := otlpmetricgrpc.New(ctx,
otlpmetricgrpc.WithEndpoint("http://localhost:4317"),
otlpmetricgrpc.WithInsecure(),
)
if err != nil {
log.Fatal(err)
}
// Create metric provider
meterProvider := metric.NewMeterProvider(
metric.WithResource(res),
metric.WithReader(metric.NewPeriodicReader(metricExporter)),
)
otel.SetMeterProvider(meterProvider)
// Your application code here
tracer := otel.Tracer("my-service")
meter := otel.Meter("my-service")
// Create a span
_, span := tracer.Start(ctx, "main-operation")
defer span.End()
// Create a counter
counter, _ := meter.Int64Counter("requests_total")
counter.Add(ctx, 1)
}
|
Python Application:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
from opentelemetry import trace, metrics
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
# Create resource
resource = Resource.create({
"service.name": "my-python-service",
"service.version": "1.0.0",
"deployment.environment": "production"
})
# Configure tracing
trace_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
trace.set_tracer_provider(tracer_provider)
# Configure metrics
metric_exporter = OTLPMetricExporter(endpoint="http://localhost:4317")
metric_reader = PeriodicExportingMetricReader(metric_exporter)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
# Use the instrumentation
tracer = trace.get_tracer("my-python-service")
meter = metrics.get_meter("my-python-service")
# Create metrics
request_counter = meter.create_counter(
name="requests_total",
description="Total number of requests"
)
# Create spans and record metrics
with tracer.start_as_current_span("main-operation") as span:
request_counter.add(1, {"endpoint": "/api/users"})
# Your application code here
|
Node.js Application:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
const { NodeSDK } = require('@opentelemetry/sdk-node');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-otlp-grpc');
const { OTLPMetricExporter } = require('@opentelemetry/exporter-otlp-grpc');
const { Resource } = require('@opentelemetry/resources');
const { SemanticResourceAttributes } = require('@opentelemetry/semantic-conventions');
// Create the SDK
const sdk = new NodeSDK({
resource: new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: 'my-node-service',
[SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
[SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]: 'production',
}),
traceExporter: new OTLPTraceExporter({
url: 'http://localhost:4317',
}),
metricExporter: new OTLPMetricExporter({
url: 'http://localhost:4317',
}),
});
// Initialize the SDK
sdk.start();
// Use OpenTelemetry API
const { trace, metrics } = require('@opentelemetry/api');
const tracer = trace.getTracer('my-node-service');
const meter = metrics.getMeter('my-node-service');
// Create metrics
const requestCounter = meter.createCounter('requests_total', {
description: 'Total number of requests',
});
// Create spans and record metrics
const span = tracer.startSpan('main-operation');
requestCounter.add(1, { endpoint: '/api/users' });
span.end();
|
OpenTelemetry Collector Configuration
Forward data from an OpenTelemetry Collector to LogFlux:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# otel-collector.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
# Add custom attributes
resource:
attributes:
- key: collector.name
value: "production-collector"
action: upsert
exporters:
otlp:
endpoint: "http://logflux-agent:4317"
tls:
insecure: true
service:
pipelines:
traces:
receivers: [otlp]
processors: [resource, batch]
exporters: [otlp]
metrics:
receivers: [otlp]
processors: [resource, batch]
exporters: [otlp]
logs:
receivers: [otlp]
processors: [resource, batch]
exporters: [otlp]
|
Trace to Log Conversion
OTLP traces are converted to structured log entries:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
{
"timestamp": "2024-01-20T14:30:45.123Z",
"level": "info",
"message": "OpenTelemetry trace",
"otel": {
"signal_type": "trace",
"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
"span_id": "00f067aa0ba902b7",
"parent_span_id": "0102040800000000",
"span_name": "GET /api/users",
"span_kind": "server",
"status": {
"code": "OK",
"message": ""
},
"attributes": {
"http.method": "GET",
"http.url": "/api/users",
"http.status_code": 200,
"http.response_size": 1024
},
"resource_attributes": {
"service.name": "user-service",
"service.version": "1.2.3",
"deployment.environment": "production"
},
"instrumentation_scope": {
"name": "github.com/user-service/instrumentation",
"version": "1.0.0"
}
}
}
|
Metric to Log Conversion
OTLP metrics are converted with full metric metadata:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
{
"timestamp": "2024-01-20T14:30:45.123Z",
"level": "info",
"message": "OpenTelemetry metric",
"otel": {
"signal_type": "metric",
"metric_name": "http_requests_total",
"metric_type": "counter",
"value": 42,
"attributes": {
"method": "GET",
"status": "200",
"endpoint": "/api/users"
},
"resource_attributes": {
"service.name": "user-service",
"service.version": "1.2.3"
},
"instrumentation_scope": {
"name": "github.com/user-service/metrics",
"version": "1.0.0"
}
}
}
|
High-Volume Configuration
For high-volume telemetry data:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# High-throughput configuration
receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
max_recv_msg_size: 8388608 # 8MB
max_concurrent_streams: 32
processors:
batch:
timeout: 1s
send_batch_size: 2048
send_batch_max_size: 4096
memory_limiter:
limit_mib: 1024
spike_limit_mib: 256
# Resource limits
extensions:
memory_ballast:
size_mib: 512
|
Kubernetes Deployment
Deploy as a DaemonSet in Kubernetes:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# otel-logflux.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: logflux-otel-agent
namespace: observability
spec:
selector:
matchLabels:
name: logflux-otel-agent
template:
metadata:
labels:
name: logflux-otel-agent
spec:
containers:
- name: logflux-agent
image: logflux/agent:latest
ports:
- containerPort: 4317
name: otlp-grpc
- containerPort: 4318
name: otlp-http
env:
- name: LOGFLUX_API_KEY
valueFrom:
secretKeyRef:
name: logflux-secret
key: api-key
volumeMounts:
- name: config
mountPath: /etc/logflux-agent/plugins
resources:
requests:
cpu: 200m
memory: 256Mi
limits:
cpu: 1
memory: 1Gi
volumes:
- name: config
configMap:
name: otel-config
---
apiVersion: v1
kind: Service
metadata:
name: logflux-otel-service
namespace: observability
spec:
selector:
name: logflux-otel-agent
ports:
- name: otlp-grpc
port: 4317
targetPort: 4317
protocol: TCP
- name: otlp-http
port: 4318
targetPort: 4318
protocol: TCP
|
Security Considerations
TLS Configuration
Enable TLS for production deployments:
1
2
3
4
5
6
7
8
9
10
|
receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
tls:
cert_file: "/etc/ssl/certs/otlp-server.crt"
key_file: "/etc/ssl/private/otlp-server.key"
client_ca_file: "/etc/ssl/certs/otlp-ca.crt"
client_auth_type: "RequireAndVerifyClientCert"
|
Authentication
Use bearer token authentication:
1
2
3
4
5
6
7
|
receivers:
otlp:
protocols:
http:
endpoint: "0.0.0.0:4318"
auth:
bearer_token_file: "/etc/logflux-agent/otlp-bearer-token"
|
Client configuration:
1
|
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer your-token-here"
|
Troubleshooting
Common Issues
Connection Refused:
1
2
3
4
5
6
7
|
# Check if plugin is listening
sudo netstat -tlnp | grep :4317
sudo netstat -tlnp | grep :4318
# Check plugin status
sudo systemctl status logflux-opentelemetry
sudo journalctl -u logflux-opentelemetry -f
|
Certificate Issues:
1
2
3
4
5
|
# Verify certificates
openssl x509 -in /etc/ssl/certs/otlp-server.crt -text -noout
# Test TLS connection
openssl s_client -connect localhost:4317 -servername localhost
|
High Memory Usage:
1
2
3
4
5
6
7
8
9
10
|
# Enable memory limiting
processors:
memory_limiter:
limit_mib: 512
spike_limit_mib: 128
# Reduce batch sizes
batch:
send_batch_size: 256
timeout: 5s
|
Data Loss:
1
2
3
4
5
6
7
8
|
# Enable retries
exporters:
logflux:
retry_on_failure:
enabled: true
initial_interval: 1s
max_interval: 30s
max_elapsed_time: 300s
|
Monitoring and Alerting
Health Checks
Monitor OpenTelemetry plugin health:
1
2
3
4
5
|
# Health check endpoint
curl http://localhost:13133/
# Plugin metrics
curl http://localhost:1777/debug/pprof/
|
Integration Validation
Validate data flow:
1
2
3
4
5
6
7
8
|
# Send test trace
grpcurl -plaintext \
-d '{"resource_spans":[{"spans":[{"name":"test"}]}]}' \
localhost:4317 \
opentelemetry.proto.collector.trace.v1.TraceService/Export
# Check LogFlux logs
sudo journalctl -u logflux-agent | grep "OpenTelemetry"
|
Best Practices
Resource Attribution
- Consistent Naming: Use consistent service names across all signals
- Version Tracking: Always include service version in resource attributes
- Environment Labels: Tag with deployment environment
- Instance Identification: Include unique instance identifiers
- Batch Processing: Use appropriate batch sizes for your volume
- Memory Limits: Set memory limits to prevent OOM issues
- Compression: Enable compression for high-volume data
- Connection Pooling: Use connection pooling for high-throughput scenarios
Security
- TLS Everywhere: Use TLS for all production communications
- Authentication: Implement bearer token authentication
- Network Policies: Use Kubernetes network policies to restrict access
- Certificate Management: Implement proper certificate lifecycle management
Disclaimer
OpenTelemetry and the OpenTelemetry logo are trademarks of The Linux Foundation. LogFlux is not affiliated with, endorsed by, or sponsored by The Linux Foundation or the OpenTelemetry project. The OpenTelemetry logo is used solely for identification purposes to indicate compatibility with the OpenTelemetry Protocol (OTLP).
Next Steps