Python SDK
Official Python SDK for LogFlux with encrypted logging, resilient architecture, and native Python integration
The LogFlux Python SDK provides a secure, high-performance way to send encrypted logs from your Python applications, with native features such as async support and standard logging library integration.
View full documentation and examples on GitHub
Installation
```bash
pip install logflux-python-sdk
```
Quick Start
Ultra-Simple Usage (2 lines!)
```python
import logflux

# Initialize LogFlux (one-time setup)
logflux.init("https://<customer-id>.ingest.<region>.logflux.io", "my-app", "lf_your_api_key", "your-secret")

# Use it with proper log levels
logflux.info("User login successful")
logflux.warn("Rate limit approaching")
logflux.error("Database connection failed")

# Clean shutdown
logflux.close()
```
Production-Ready Setup
```bash
export LOGFLUX_API_KEY="lf_your_api_key"
export LOGFLUX_SERVER_URL="https://<customer-id>.ingest.<region>.logflux.io"
```
```python
from logflux.client import ResilientClient

# Create resilient client from environment
client = ResilientClient.from_env("production-server", "your-secret")

# Send logs - they're queued and sent asynchronously with retry
client.info("Application started successfully")
client.error("Database connection failed - will retry")

# Check statistics
stats = client.get_stats()
print(f"Sent: {stats.total_sent}, Failed: {stats.total_failed}")

# Clean shutdown
client.close()
```
Key Features
- Strong Encryption: AES-256-GCM encryption via RSA handshake
- Resilient Architecture: In-memory queuing, automatic retries, failsafe mode
- High Performance: Async processing with configurable worker pools
- Python Native: Context managers, type hints, async support
- Built-in Statistics: Monitor performance and queue health
- Logger Adapters: Drop-in replacements for standard library and structlog
Configuration
Environment Variables
```bash
# Required
export LOGFLUX_API_KEY="lf_your_api_key"
export LOGFLUX_SERVER_URL="https://<customer-id>.ingest.<region>.logflux.io"

# Optional
export LOGFLUX_QUEUE_SIZE="1000"
export LOGFLUX_FLUSH_INTERVAL="5.0"
export LOGFLUX_WORKER_COUNT="2"
export LOGFLUX_FAILSAFE_MODE="true"
```
From Environment
```python
from logflux.client import ResilientClient

# Create client from environment variables
client = ResilientClient.from_env("my-app", "your-secret")
```
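If you prefer explicit configuration, the optional environment variables above map to fields on ResilientClientConfig (used again under Best Practices below). A minimal sketch; the import path and the flush_interval field name are assumptions, so check the SDK source:

```python
from logflux.client import ResilientClientConfig  # import path assumed

# Programmatic equivalent of the optional env vars above.
# NOTE: flush_interval is assumed to mirror LOGFLUX_FLUSH_INTERVAL.
config = ResilientClientConfig(
    queue_size=1000,
    flush_interval=5.0,
    worker_count=2,
    failsafe_mode=True,
)
```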
Logger Interface
Basic Logger
```python
from logflux.logger import Logger

# Create logger with prefix
logger = Logger.from_env("my-app", "your-secret", prefix="API")

# Use logger interface
user_id = "12345"  # example value
logger.info("Request started")
logger.errorf("Failed to process user %s", user_id)
logger.debug("Processing completed")

logger.close()
```
Async Logger
```python
from logflux.logger import AsyncLogger, Logger

# High-performance async logging
base_logger = Logger.from_env("my-app", "your-secret", prefix="ASYNC")
async_logger = AsyncLogger(base_logger, buffer_size=1000)

# Send logs asynchronously (non-blocking)
for i in range(1000):
    async_logger.info(f"Processing item {i}")

# Clean shutdown
async_logger.close()
```
Logger Adapters
Standard Library Integration
```python
import logging

from logflux.adapters.stdlib_adapter import replace_root_logger
from logflux.client import ResilientClient

# Replace root logger
client = ResilientClient.from_env("my-app", "your-secret")
replace_root_logger(client, prefix="APP")

# Now all standard library logging goes to LogFlux
logging.info("This goes to LogFlux")
logging.error("This also goes to LogFlux")
```
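Because Python's logging module propagates records from named loggers up to the root logger by default, module-level loggers are captured as well (assuming replace_root_logger attaches its handler at the root, as the name suggests):

```python
log = logging.getLogger(__name__)
log.info("Named loggers propagate to the replaced root logger")
```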
Structured Logging
```python
from logflux.adapters.structlog_adapter import get_logger
from logflux.client import ResilientClient

client = ResilientClient.from_env("my-app", "your-secret")
logger = get_logger(client, prefix="STRUCT")

# Structured logging with context
logger.info("User login", user_id="12345", ip="192.168.1.1", success=True)
logger.error("Database error", table="users", error_code=1054)
```
Log Levels
| Level | Value | Method | Description |
|-------|-------|--------|-------------|
| Debug | 0 | `.debug()` | Detailed diagnostic information |
| Info | 1 | `.info()` | General informational messages |
| Warn | 2 | `.warn()` | Warning messages |
| Error | 3 | `.error()` | Error events |
| Fatal | 4 | `.fatal()` | Very severe errors |
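Each level is a method on the logger interface (the module-level API shown in Quick Start exposes the same names). A quick sketch, assuming the Logger exposes all five methods from the table:

```python
from logflux.logger import Logger

logger = Logger.from_env("my-app", "your-secret")

# One call per level, from lowest to highest severity
logger.debug("Cache lookup took 12ms")
logger.info("Request completed")
logger.warn("Disk usage at 85%")
logger.error("Upstream request timed out")
logger.fatal("Unrecoverable state, shutting down")

logger.close()
```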
Best Practices
Use Context Managers
```python
from logflux.client import ResilientClient

# Automatic cleanup with context managers
with ResilientClient.from_env("my-app", "your-secret") as client:
    client.info("Application started")
# Client automatically closed on exit
```
Enable Failsafe Mode
```python
from logflux.client import ResilientClientConfig  # import path assumed

# Never crash your application on logging failures
config = ResilientClientConfig(
    failsafe_mode=True,
    queue_size=5000,
    worker_count=4,
)
```
Monitor Queue Health

```python
# Regular health checks
stats = client.get_stats()
if stats.total_dropped > 0:
    print(f"Warning: {stats.total_dropped} messages dropped")
```
Common Use Cases
Web Applications
```python
# Flask/Django integration
from logflux.adapters.stdlib_adapter import replace_root_logger
from logflux.client import ResilientClient

client = ResilientClient.from_env("web-app", "your-secret")
replace_root_logger(client, prefix="WEB")

# Framework logging now goes to LogFlux
# (app is your framework application instance)
app.logger.info("User logged in")
```
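For instance, a minimal Flask app wired up this way (the route is purely illustrative; Flask's app.logger propagates to the root logger by default, so the replaced root logger receives its records):

```python
from flask import Flask
from logflux.adapters.stdlib_adapter import replace_root_logger
from logflux.client import ResilientClient

app = Flask(__name__)
client = ResilientClient.from_env("web-app", "your-secret")
replace_root_logger(client, prefix="WEB")

@app.route("/login", methods=["POST"])
def login():
    # Routed through the replaced root logger to LogFlux
    app.logger.info("User logged in")
    return {"status": "ok"}
```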
Background Workers
```python
from logflux.logger import AsyncLogger, Logger

# High-performance async logging for workers
base_logger = Logger.from_env("worker", "your-secret")
async_logger = AsyncLogger(base_logger, buffer_size=5000)

@celery.task  # celery: your Celery application instance
def process_data(data):
    async_logger.info(f"Processing {len(data)} items")
```
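When the worker exits, close the async logger so its buffer is drained. With Celery you can hook the worker_shutdown signal; a sketch, assuming the async_logger above lives at module level:

```python
from celery.signals import worker_shutdown

@worker_shutdown.connect
def flush_logs(**kwargs):
    # Drain buffered log entries before the worker process exits
    async_logger.close()
```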
Requirements
- Python: 3.7 or later
- Dependencies: requests, cryptography
- Optional: structlog (for the structured logging adapter)
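The structured logging adapter requires structlog to be installed separately:

```bash
pip install structlog
```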
Support