LogFlux Python SDK

The official Python SDK for LogFlux. Send logs, metrics, traces, events, and audit entries directly to the LogFlux cloud with end-to-end encryption. The server never sees your plaintext data.

GitHub Repository · Release Notes

Key Features

  • End-to-end encryption – AES-256-GCM with RSA key exchange
  • 7 entry types – Log, Metric, Trace, Event, Audit, Telemetry, TelemetryManaged
  • Multipart binary transport – 33% less overhead than JSON + base64
  • Async by default – Background daemon threads with non-blocking queue
  • Breadcrumbs – Automatic trail of recent events attached to error captures
  • Distributed tracing – Spans, child spans, trace header propagation
  • Scopes – Per-request context isolation
  • Failsafe – SDK errors never crash your application
  • Single dependency – Only cryptography (for AES-256-GCM)
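
The 33% figure for the multipart transport matches how base64 works: every 3 raw bytes become 4 ASCII characters, so a base64-encoded payload is about one third larger than the same bytes sent as-is. The stdlib check below illustrates that arithmetic only. It is not the SDK's wire format, and the payload is arbitrary stand-in data.

```python
import base64

# Stand-in for an encrypted payload; the content does not affect base64 size.
payload = bytes(range(256)) * 12  # 3072 raw bytes

encoded = base64.b64encode(payload)

raw_len = len(payload)
b64_len = len(encoded)
overhead = (b64_len - raw_len) / raw_len  # extra bytes relative to the raw size

print(raw_len, b64_len, f"{overhead:.1%}")  # 3072 4096 33.3%
```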

Installation

pip install logflux-sdk

Quick Start

import logflux
from logflux import Options

logflux.init(Options(
    api_key="eu-lf_your_api_key",
    source="my-service",
    environment="production",
    release="1.2.3",
))

logflux.info("server started")

# Before process exit
logflux.flush(timeout=2.0)
logflux.close()

Entry Types

Log (Type 1)

Standard application logs with 8 severity levels.

from logflux import LogLevel  # assumed to be exported alongside Options

logflux.debug("cache miss for key users:123")
logflux.info("request processed")
logflux.warn("deprecated API called")
logflux.error("database connection failed")
logflux.critical("out of memory")

# With attributes
logflux.log(LogLevel.ERROR, "query timeout", {
    "db.host": "primary.db.internal",
    "duration_ms": "5023",
})

Metric (Type 2)

Counters, gauges, and distributions.

logflux.counter("http.requests.total", 1, {
    "method": "GET",
    "status": "200",
})

logflux.gauge("system.cpu.usage", 85.2, {
    "host": "web-01",
})

Event (Type 4)

Discrete application events.

logflux.event("user.signup", {
    "user_id": "usr_987",
    "plan": "starter",
})

Audit (Type 5)

Immutable audit trail with Object Lock storage (365-day retention).

logflux.audit("record.deleted", "usr_456", "invoice", "inv_789", {
    "reason": "customer_request",
})

Trace (Type 3)

Distributed tracing with span helpers.

span = logflux.start_span("http.server", "GET /api/users")

db_span = span.start_child("db.query", "SELECT * FROM users")
# ... query ...
db_span.end()

span.end()

Telemetry (Types 6 and 7)

Device and sensor data. Type 6 (Telemetry) is end-to-end encrypted; type 7 (TelemetryManaged) is encrypted server-side.

Error Capture

Capture Python exceptions with automatic stack traces and breadcrumb trail.

try:
    database.query(sql)
except Exception as err:
    logflux.capture_error(err)

    # Or, with extra context (use one call or the other, not both)
    logflux.capture_error(err, {
        "sql": sql,
        "db.host": "primary",
    })

Breadcrumbs record a trail of events leading up to an error. The SDK adds them automatically for log and event calls and attaches the trail to each capture_error report.

logflux.info("loading config")          # auto breadcrumb
logflux.event("user.login")             # auto breadcrumb

logflux.add_breadcrumb("http", "GET /api/users", {
    "status": "200",
})

logflux.capture_error(err)  # includes breadcrumb trail
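
The max_breadcrumbs option is described as a ring buffer size, so the trail can be pictured as a bounded buffer that discards its oldest item when full. Below is a minimal sketch of that idea using collections.deque. The `BreadcrumbTrail` class and its field names are hypothetical and say nothing about how the SDK stores breadcrumbs internally.

```python
from collections import deque


class BreadcrumbTrail:
    """Hypothetical bounded trail: keeps only the newest `max_breadcrumbs` items."""

    def __init__(self, max_breadcrumbs=100):
        self._items = deque(maxlen=max_breadcrumbs)

    def add(self, category, message, data=None):
        # When the deque is full, appending discards the oldest entry.
        self._items.append(
            {"category": category, "message": message, "data": data or {}}
        )

    def snapshot(self):
        # Copy, so a captured error keeps its trail even if more crumbs arrive.
        return list(self._items)


trail = BreadcrumbTrail(max_breadcrumbs=3)
for i in range(5):
    trail.add("log", f"step {i}")

print([c["message"] for c in trail.snapshot()])  # ['step 2', 'step 3', 'step 4']
```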

Scopes

Per-request context isolation. Attributes set on a scope are merged into every entry.

def handle_scope(scope):
    scope.set_user("usr_456")
    scope.set_request("GET", "/api/users", "req_abc123")
    scope.set_attribute("tenant", "acme-corp")

    scope.info("processing request")

    try:
        process_request()  # hypothetical application handler
    except Exception as err:
        scope.capture_error(err)

logflux.with_scope(handle_scope)
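
To make "merged into every entry" concrete, here is a minimal sketch of attribute merging. `MiniScope` and `build_entry` are hypothetical stand-ins, not the SDK's classes, and the rule that per-call attributes win over scope attributes on a key conflict is an assumption.

```python
class MiniScope:
    """Hypothetical stand-in for a scope; not the SDK's class."""

    def __init__(self):
        self._attributes = {}

    def set_attribute(self, key, value):
        self._attributes[key] = value

    def build_entry(self, message, attributes=None):
        # Assumption: per-call attributes override scope attributes on conflict.
        merged = {**self._attributes, **(attributes or {})}
        return {"message": message, "attributes": merged}


scope = MiniScope()
scope.set_attribute("tenant", "acme-corp")
scope.set_attribute("route", "/api/users")

entry = scope.build_entry("processing request", {"route": "/api/users/42"})
print(entry["attributes"])  # {'tenant': 'acme-corp', 'route': '/api/users/42'}

# A second scope starts empty, which is the isolation part.
other = MiniScope()
print(other.build_entry("unrelated request")["attributes"])  # {}
```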

Trace Context Propagation

Propagate trace context across services via HTTP headers.

# Client: inject trace header
span = logflux.start_span("http.client", "GET /api")
headers = {"X-LogFlux-Trace": span.trace_header}
requests.get(url, headers=headers)

# Server: continue from incoming headers
span = logflux.continue_from_headers(request.headers, "http.server", "GET /api")
# ... handle request ...
span.end()

Framework Middleware

Django

# middleware.py
import logflux

class LogFluxMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        span = logflux.start_span("http.server", f"{request.method} {request.path}")
        span.set_attribute("http.method", request.method)
        span.set_attribute("http.url", request.get_full_path())

        try:
            response = self.get_response(request)
            span.set_attribute("http.status_code", str(response.status_code))
            return response
        finally:
            # End the span even if an exception propagates out of the view
            span.end()

Flask

from flask import Flask, request
import logflux

app = Flask(__name__)

@app.before_request
def before_request():
    span = logflux.start_span("http.server", f"{request.method} {request.path}")
    request._logflux_span = span

@app.after_request
def after_request(response):
    span = getattr(request, "_logflux_span", None)
    if span:
        span.set_attribute("http.status_code", str(response.status_code))
        span.end()
    return response

FastAPI

from fastapi import FastAPI, Request
import logflux

app = FastAPI()

@app.middleware("http")
async def logflux_middleware(request: Request, call_next):
    span = logflux.start_span("http.server", f"{request.method} {request.url.path}")
    span.set_attribute("http.method", request.method)

    try:
        response = await call_next(request)
        span.set_attribute("http.status_code", str(response.status_code))
        return response
    finally:
        # End the span even if the downstream handler raises
        span.end()

Configuration

Options

| Option | Type | Default | Description |
|---|---|---|---|
| api_key | str | required | API key (<region>-lf_<key>) |
| source | str | | Service name |
| environment | str | | Attached to meta.environment |
| release | str | | Attached to meta.release |
| queue_size | int | 1000 | In-memory buffer capacity |
| batch_size | int | 100 | Entries per HTTP request |
| flush_interval | float | 5.0 | Auto-flush interval (seconds) |
| worker_count | int | 2 | Background threads |
| max_retries | int | 3 | Max retry attempts |
| sample_rate | float | 1.0 | Send probability, from 0.0 to 1.0 |
| max_breadcrumbs | int | 100 | Ring buffer size |
| failsafe | bool | True | Never crash host app |
| enable_compression | bool | True | Gzip before encryption |
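
The queue_size and batch_size options suggest a bounded in-memory buffer that background threads drain in batches. The sketch below shows one way such a buffer can work with the standard library. `EntryBuffer` is a hypothetical name, and dropping entries when the buffer is full is an assumption: the page does not say whether the SDK drops, blocks, or evicts in that case.

```python
import queue


class EntryBuffer:
    """Hypothetical bounded buffer with non-blocking enqueue and batch draining."""

    def __init__(self, queue_size=1000, batch_size=100):
        self._queue = queue.Queue(maxsize=queue_size)
        self._batch_size = batch_size
        self.dropped = 0

    def enqueue(self, entry):
        """Never blocks the caller: a full buffer drops the entry instead."""
        try:
            self._queue.put_nowait(entry)
            return True
        except queue.Full:
            self.dropped += 1
            return False

    def next_batch(self, timeout=5.0):
        """Wait up to `timeout` seconds for one entry, then take up to batch_size."""
        batch = []
        try:
            batch.append(self._queue.get(timeout=timeout))
            while len(batch) < self._batch_size:
                batch.append(self._queue.get_nowait())
        except queue.Empty:
            pass
        return batch


buffer = EntryBuffer(queue_size=3, batch_size=2)
accepted = [buffer.enqueue(n) for n in range(4)]  # the fourth entry does not fit

first = buffer.next_batch(timeout=0.01)
second = buffer.next_batch(timeout=0.01)
print(accepted, first, second, buffer.dropped)
# [True, True, True, False] [0, 1] [2] 1
```

In a real worker, a daemon thread would call next_batch in a loop, using the flush interval as the timeout, and upload each non-empty batch.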

Environment Variables

logflux.init_from_env(node="my-node")

Reads LOGFLUX_API_KEY, LOGFLUX_ENVIRONMENT, LOGFLUX_NODE, LOGFLUX_QUEUE_SIZE, LOGFLUX_BATCH_SIZE, LOGFLUX_KEY_PERSISTENCE, LOGFLUX_KEY_PERSISTENCE_PATH, LOGFLUX_KEY_ROTATION_DAYS, etc.
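
For illustration, this is roughly how numeric settings can be read from environment variables with safe fallbacks. The variable names are taken from the list above and the defaults from the Options table. The helper names, and the choice to fall back to the default on an unparsable value, are assumptions and not the behavior of init_from_env.

```python
import os


def read_int(env, name, default):
    """Return env[name] as an int, or `default` when unset or not a valid integer."""
    raw = env.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        return default


def settings_from_env(env=os.environ):
    # Hypothetical helper: collects a few of the documented variables into a dict.
    return {
        "api_key": env.get("LOGFLUX_API_KEY"),
        "environment": env.get("LOGFLUX_ENVIRONMENT"),
        "queue_size": read_int(env, "LOGFLUX_QUEUE_SIZE", 1000),
        "batch_size": read_int(env, "LOGFLUX_BATCH_SIZE", 100),
    }


example = settings_from_env({
    "LOGFLUX_API_KEY": "eu-lf_your_api_key",
    "LOGFLUX_QUEUE_SIZE": "5000",
    "LOGFLUX_BATCH_SIZE": "many",  # invalid, so the default is used
})
print(example["queue_size"], example["batch_size"])  # 5000 100
```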

Key Persistence

The SDK persists AES encryption keys to disk by default, reusing them across restarts instead of performing a new handshake each time. This reduces key proliferation in the database and speeds up initialization.

| Option | Type | Default | Description |
|---|---|---|---|
| key_persistence | bool | True | Enable/disable key persistence |
| key_persistence_path | str or None | ~/.logflux/sessions/ | Custom directory for session files |
| key_rotation_days | int | 0 (never) | Rotate key after N days |

Session files are stored at ~/.logflux/sessions/<hash>.json with 0600 permissions. If the cached key is rejected by the server (401/403), the SDK automatically re-handshakes and saves the new key.

# Disable persistence (for ephemeral containers)
logflux.init(Options(
    api_key="eu-lf_your_key",
    key_persistence=False,
))

# Custom path and 30-day rotation
logflux.init(Options(
    api_key="eu-lf_your_key",
    key_persistence_path="/var/lib/myapp/logflux/",
    key_rotation_days=30,
))
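
The two behaviors described in this section, owner-only session files and age-based rotation, can be sketched with the standard library on a POSIX system. This is an illustration only: `save_session` and `needs_rotation` are hypothetical helpers, `example.json` stands in for `<hash>.json`, and the JSON field names are invented for the demo.

```python
import json
import os
import stat
import tempfile
import time


def save_session(directory, filename, session):
    """Write `session` as JSON, creating the file with owner-only (0600) permissions."""
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, filename)
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        json.dump(session, handle)
    os.chmod(path, 0o600)  # also tighten a file that already existed with wider permissions
    return path


def needs_rotation(created_at, rotation_days, now=None):
    """rotation_days of 0 means never rotate, matching the documented default."""
    if rotation_days <= 0:
        return False
    now = time.time() if now is None else now
    return now - created_at >= rotation_days * 86400


with tempfile.TemporaryDirectory() as tmp:
    # Invented fields; a real session file holds whatever the SDK chooses to store.
    path = save_session(tmp, "example.json", {"created_at": 0, "key_id": "demo"})
    mode = stat.S_IMODE(os.stat(path).st_mode)

print(oct(mode))
print(needs_rotation(created_at=0, rotation_days=30, now=31 * 86400))
```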

BeforeSend Hooks

Filter or modify entries before they are sent. Return None to drop.

def filter_debug(log):
    if log["level"] == 8:  # debug
        return None
    return log

def scrub_audit(audit):
    audit.get("attributes", {}).pop("ip", None)
    return audit

logflux.init(Options(
    api_key="eu-lf_...",
    before_send_log=filter_debug,
    before_send_audit=scrub_audit,
))
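
Because hooks are plain functions, they can be exercised in a unit test without initializing the SDK. The sketch below repeats the two hooks from the example and runs them on hand-built dictionaries. The sample entries are assumptions about the entry shape, based only on the keys the hooks themselves touch; real entries may carry more fields.

```python
def filter_debug(log):
    if log["level"] == 8:  # debug
        return None
    return log


def scrub_audit(audit):
    audit.get("attributes", {}).pop("ip", None)
    return audit


# Hand-built sample entries (hypothetical shapes).
debug_entry = {"level": 8, "message": "cache miss"}
other_entry = {"level": 3, "message": "query timeout"}  # any level other than 8
audit_entry = {
    "action": "record.deleted",
    "attributes": {"ip": "203.0.113.7", "reason": "customer_request"},
}

dropped = filter_debug(debug_entry)
kept = filter_debug(other_entry)
scrubbed = scrub_audit(audit_entry)

print(dropped)                 # None
print(kept is other_entry)     # True
print(scrubbed["attributes"])  # {'reason': 'customer_request'}
```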

Sampling

logflux.init(Options(api_key="...", sample_rate=0.1))  # send 10%

Audit entries (type 5) are never sampled.
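
As a rough model of this rule, the sketch below keeps audit entries unconditionally and keeps everything else with probability sample_rate. `should_send` is a hypothetical function, and the SDK's actual sampling logic may differ (for example, it could sample per trace rather than per entry).

```python
import random

AUDIT_TYPE = 5  # entry type number for Audit, per the Entry Types section


def should_send(entry_type, sample_rate, rng=random):
    """Keep audit entries unconditionally; keep others with probability sample_rate."""
    if entry_type == AUDIT_TYPE:
        return True
    # random() returns a float in [0.0, 1.0), so a rate of 1.0 keeps everything
    # and a rate of 0.0 keeps nothing.
    return rng.random() < sample_rate


rng = random.Random(42)  # seeded so the demo is repeatable
kept_logs = sum(should_send(1, 0.1, rng) for _ in range(10_000))
kept_audits = sum(should_send(AUDIT_TYPE, 0.1, rng) for _ in range(10_000))

print(kept_logs)    # roughly 1000 of 10000
print(kept_audits)  # 10000
```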

Security

  • Zero-knowledge: All payloads encrypted client-side with AES-256-GCM
  • RSA key exchange: AES keys negotiated via RSA-2048 OAEP handshake
  • Key zeroing: AES keys cleared from memory on close()
  • Bounded reads: All HTTP responses size-limited
  • Failsafe: SDK errors never crash the host application
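
The failsafe guarantee amounts to catching every exception at the SDK boundary instead of letting it reach application code. A minimal sketch of that pattern as a decorator follows. The `failsafe` decorator and the `send` function are hypothetical and only illustrate the idea, not the SDK's internals.

```python
import functools


def failsafe(func):
    """Swallow any exception raised by func and return None instead."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # A real SDK would likely record the failure internally; here it is dropped.
            return None

    return wrapper


@failsafe
def send(entry):
    raise ConnectionError("collector unreachable")


result = send({"message": "server started"})
print(result)  # None
```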

Requirements

  • Python 3.10 or later
  • LogFlux account with API key

License

Elastic License 2.0 (ELv2) – free for all use except offering as a hosted or managed service to third parties.

Disclaimer

The Python logo and trademarks are the property of the Python Software Foundation. LogFlux is not affiliated with, endorsed by, or sponsored by the Python Software Foundation. The Python logo is used solely for identification purposes to indicate compatibility and integration capabilities.