The official Python SDK for LogFlux. Send logs, metrics, traces, events, and audit entries directly to the LogFlux cloud with end-to-end encryption. The server never sees your plaintext data.
GitHub Repository · Release Notes
Key Features
- End-to-end encryption – AES-256-GCM with RSA key exchange
- 7 entry types – Log, Metric, Trace, Event, Audit, Telemetry, TelemetryManaged
- Multipart binary transport – 33% less overhead than JSON + base64
- Async by default – Background daemon threads with non-blocking queue
- Breadcrumbs – Automatic trail of recent events attached to error captures
- Distributed tracing – Spans, child spans, trace header propagation
- Scopes – Per-request context isolation
- Failsafe – SDK errors never crash your application
- Single dependency – Only `cryptography` (for AES-256-GCM)
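The "33% less overhead" figure for the binary transport follows from base64's 3-bytes-to-4 expansion: a base64-encoded payload is one third larger than the raw bytes it carries. A quick stdlib check (illustrative arithmetic, not SDK code):

```python
import base64

payload = bytes(range(256)) * 12  # 3072 bytes of sample binary data

encoded = base64.b64encode(payload)

# base64 maps every 3 input bytes to 4 output bytes, so the encoded
# form is 4/3 the size of the raw binary it carries.
print(f"encoded is {len(encoded) / len(payload):.2%} of raw size")  # 133.33%
```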
Installation
```bash
pip install logflux-sdk
```
Quick Start
```python
import logflux
from logflux import Options

logflux.init(Options(
    api_key="eu-lf_your_api_key",
    source="my-service",
    environment="production",
    release="1.2.3",
))

logflux.info("server started")

# Before process exit
logflux.flush(timeout=2.0)
logflux.close()
```
Entry Types
Log (Type 1)
Standard application logs with 8 severity levels.
```python
from logflux import LogLevel

logflux.debug("cache miss for key users:123")
logflux.info("request processed")
logflux.warn("deprecated API called")
logflux.error("database connection failed")
logflux.critical("out of memory")

# With attributes
logflux.log(LogLevel.ERROR, "query timeout", {
    "db.host": "primary.db.internal",
    "duration_ms": "5023",
})
```
Metric (Type 2)
Counters, gauges, and distributions.
```python
logflux.counter("http.requests.total", 1, {
    "method": "GET",
    "status": "200",
})

logflux.gauge("system.cpu.usage", 85.2, {
    "host": "web-01",
})
```
Event (Type 4)
Discrete application events.
```python
logflux.event("user.signup", {
    "user_id": "usr_987",
    "plan": "starter",
})
```
Audit (Type 5)
Immutable audit trail with Object Lock storage (365-day retention).
```python
logflux.audit("record.deleted", "usr_456", "invoice", "inv_789", {
    "reason": "customer_request",
})
```
Trace (Type 3)
Distributed tracing with span helpers.
```python
span = logflux.start_span("http.server", "GET /api/users")

db_span = span.start_child("db.query", "SELECT * FROM users")
# ... query ...
db_span.end()

span.end()
```
Telemetry (Types 6 and 7)
Device and sensor data. Type 6 is end-to-end encrypted, type 7 is server-side encrypted.
Error Capture
Capture Python exceptions with automatic stack traces and breadcrumb trail.
```python
try:
    database.query(sql)
except Exception as err:
    logflux.capture_error(err)

    # With extra context
    logflux.capture_error(err, {
        "sql": sql,
        "db.host": "primary",
    })
```
Breadcrumbs
Breadcrumbs record a trail of events leading up to an error. They are automatically added for log and event calls, and attached to capture_error.
```python
logflux.info("loading config")  # auto breadcrumb
logflux.event("user.login")     # auto breadcrumb

logflux.add_breadcrumb("http", "GET /api/users", {
    "status": "200",
})

logflux.capture_error(err)  # includes breadcrumb trail
```
Scopes
Per-request context isolation. Attributes set on a scope are merged into every entry.
```python
def handle_scope(scope):
    scope.set_user("usr_456")
    scope.set_request("GET", "/api/users", "req_abc123")
    scope.set_attribute("tenant", "acme-corp")

    scope.info("processing request")

    if err:
        scope.capture_error(err)

logflux.with_scope(handle_scope)
```
Trace Context Propagation
Propagate trace context across services via HTTP headers.
```python
# Client: inject trace header
span = logflux.start_span("http.client", "GET /api")
headers = {"X-LogFlux-Trace": span.trace_header}
requests.get(url, headers=headers)

# Server: continue from incoming headers
span = logflux.continue_from_headers(request.headers, "http.server", "GET /api")
# ... handle request ...
span.end()
```
Framework Middleware
Django
```python
# middleware.py
import logflux

class LogFluxMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        span = logflux.start_span("http.server", f"{request.method} {request.path}")
        span.set_attribute("http.method", request.method)
        span.set_attribute("http.url", request.get_full_path())

        response = self.get_response(request)

        span.set_attribute("http.status_code", str(response.status_code))
        span.end()
        return response
```
Flask
```python
from flask import Flask, request

import logflux

app = Flask(__name__)

@app.before_request
def before_request():
    span = logflux.start_span("http.server", f"{request.method} {request.path}")
    request._logflux_span = span

@app.after_request
def after_request(response):
    span = getattr(request, "_logflux_span", None)
    if span:
        span.set_attribute("http.status_code", str(response.status_code))
        span.end()
    return response
```
FastAPI
```python
from fastapi import FastAPI, Request

import logflux

app = FastAPI()

@app.middleware("http")
async def logflux_middleware(request: Request, call_next):
    span = logflux.start_span("http.server", f"{request.method} {request.url.path}")
    span.set_attribute("http.method", request.method)

    response = await call_next(request)

    span.set_attribute("http.status_code", str(response.status_code))
    span.end()
    return response
```
Configuration
Options
| Option | Type | Default | Description |
|---|---|---|---|
| api_key | str | required | API key (`<region>-lf_<key>`) |
| source | str | | Service name |
| environment | str | | Attached to meta.environment |
| release | str | | Attached to meta.release |
| queue_size | int | 1000 | In-memory buffer capacity |
| batch_size | int | 100 | Entries per HTTP request |
| flush_interval | float | 5.0 | Auto-flush interval (seconds) |
| worker_count | int | 2 | Background threads |
| max_retries | int | 3 | Max retry attempts |
| sample_rate | float | 1.0 | Send probability (0.0–1.0) |
| max_breadcrumbs | int | 100 | Ring buffer size |
| failsafe | bool | True | Never crash host app |
| enable_compression | bool | True | Gzip before encryption |
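With enable_compression on, batches are compressed before client-side encryption. The order matters: encrypted output is indistinguishable from random bytes and does not compress, so gzip must run first. A stdlib illustration of the payoff on a repetitive batch (illustrative only, not the SDK's actual pipeline):

```python
import gzip
import json

# A batch of similar log entries is highly repetitive and compresses well.
batch = [{"level": "info", "msg": "request processed", "source": "my-service"}
         for _ in range(100)]
raw = json.dumps(batch).encode()

compressed = gzip.compress(raw)
print(len(compressed) < len(raw))  # True: compress first, then encrypt
```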
Environment Variables
```python
logflux.init_from_env(node="my-node")
Reads LOGFLUX_API_KEY, LOGFLUX_ENVIRONMENT, LOGFLUX_NODE, LOGFLUX_QUEUE_SIZE, LOGFLUX_BATCH_SIZE, LOGFLUX_KEY_PERSISTENCE, LOGFLUX_KEY_PERSISTENCE_PATH, LOGFLUX_KEY_ROTATION_DAYS, etc.
Key Persistence
The SDK persists AES encryption keys to disk by default, reusing them across restarts instead of performing a new handshake each time. This reduces key proliferation in the database and speeds up initialization.
| Option | Type | Default | Description |
|---|---|---|---|
| key_persistence | bool | True | Enable/disable key persistence |
| key_persistence_path | str \| None | ~/.logflux/sessions/ | Custom directory for session files |
| key_rotation_days | int | 0 (never) | Rotate key after N days |
Session files are stored at ~/.logflux/sessions/<hash>.json with 0600 permissions. If the cached key is rejected by the server (401/403), the SDK automatically re-handshakes and saves the new key.
```python
# Disable persistence (for ephemeral containers)
logflux.init(Options(
    api_key="eu-lf_your_key",
    key_persistence=False,
))

# Custom path and 30-day rotation
logflux.init(Options(
    api_key="eu-lf_your_key",
    key_persistence_path="/var/lib/myapp/logflux/",
    key_rotation_days=30,
))
```
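The 0600 permission on session files is the standard pattern for key material: create the file with an explicit mode so there is no window where group or world can read it. An illustrative stdlib sketch (the directory, file name, and contents are placeholders, not the SDK's real session format):

```python
import json
import os
import stat
import tempfile

session_dir = tempfile.mkdtemp()  # stand-in for ~/.logflux/sessions/
path = os.path.join(session_dir, "example.json")

# os.open with mode 0o600 creates the file owner-read/write only,
# so the cached key is never readable by group or other.
fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
with os.fdopen(fd, "w") as f:
    json.dump({"session": "placeholder"}, f)

print(oct(stat.S_IMODE(os.stat(path).st_mode)))
```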
BeforeSend Hooks
Filter or modify entries before they are sent. Return None to drop.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| def filter_debug(log):
if log["level"] == 8: # debug
return None
return log
def scrub_audit(audit):
audit.get("attributes", {}).pop("ip", None)
return audit
logflux.init(Options(
api_key="eu-lf_...",
before_send_log=filter_debug,
before_send_audit=scrub_audit,
))
|
Sampling
```python
logflux.init(Options(api_key="...", sample_rate=0.1))  # send 10%
```
Audit entries (type 5) are never sampled.
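Conceptually, sample_rate gates each non-audit entry with an independent coin flip, while audit entries always pass. A stdlib sketch of that decision (illustrative; the `should_send` helper is an assumption, not the SDK's actual mechanism):

```python
import random

AUDIT_TYPE = 5  # audit entries bypass sampling entirely

def should_send(entry_type: int, sample_rate: float, rng: random.Random) -> bool:
    """Return True if this entry should be sent upstream."""
    if entry_type == AUDIT_TYPE:
        return True  # audits are never sampled out
    return rng.random() < sample_rate

rng = random.Random(42)  # seeded for a reproducible demo
sent = sum(should_send(1, 0.1, rng) for _ in range(10_000))
print(f"sent ~{sent / 10_000:.0%} of log entries")  # roughly 10%
```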
Security
- Zero-knowledge: All payloads encrypted client-side with AES-256-GCM
- RSA key exchange: AES keys negotiated via RSA-2048 OAEP handshake
- Key zeroing: AES keys cleared from memory on close()
- Bounded reads: All HTTP responses size-limited
- Failsafe: SDK errors never crash the host application
Requirements
- Python 3.10 or later
- LogFlux account with API key
License
Elastic License 2.0 (ELv2) – free for all use except offering as a hosted or managed service to third parties.
Disclaimer
The Python logo and trademarks are the property of the Python Software Foundation. LogFlux is not affiliated with, endorsed by, or sponsored by the Python Software Foundation. The Python logo is used solely for identification purposes to indicate compatibility and integration capabilities.