# Python SDK

Production-grade logging SDK with automatic batching, retry logic, graceful shutdown, and zero dependencies. Built for Flask, FastAPI, Django, and any Python application.
## Installation

```bash
pip install luminalog
# or
poetry add luminalog
# or
pipenv install luminalog
```

## Quick Start
```python
import os
import sys

from luminalog import LuminaLog

# Initialize the logger
logger = LuminaLog(
    api_key=os.getenv('LUMINALOG_API_KEY'),
    environment=os.getenv('ENVIRONMENT', 'production'),
    batch_size=100,      # Auto-flush after 100 logs
    flush_interval=5.0,  # Auto-flush every 5 seconds
)

# Start logging
logger.info('Application started', {
    'version': '1.0.0',
    'python_version': sys.version
})

# Automatic error tracking with stack traces
try:
    process_payment(order)
except Exception as e:
    logger.capture_error(e, {'user_id': 'user_123'})
```

## Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| api_key | str | Required | Your LuminaLog API key |
| environment | str | 'default' | Label for filtering logs (stored as metadata). Actual environment is set by your API key. |
| project_id | str | None | Optional project identifier for multi-project apps |
| batch_size | int | 100 | Logs to buffer before auto-flush. Min: 50, Max: 500. Automatically capped. |
| flush_interval | float | 5.0 | Seconds between auto-flushes (real-time balance) |
| min_level | str | None | Filter logs by minimum level (debug|info|warn|error|fatal|panic) |
| privacy_mode | bool | False | Skip automatic PII scrubbing (you handle sensitive data) |
| endpoint | str | api.luminalog.cloud | Custom API endpoint for self-hosted deployments |
| debug | bool | False | Enable SDK debug logging to console |
## Environment Handling
The environment parameter is stored as metadata for filtering within your logs. The actual environment (production, staging, etc.) is determined by your API key's environment setting in the dashboard.
Why? This prevents quota abuse and ensures clear project boundaries. Create separate API keys for each environment in your project settings.
## SDK Efficiency & Batching
The Python SDK implements a high-performance batching strategy to minimize network overhead:
- Minimum (50): Logs are held until at least 50 are ready to avoid high-frequency small requests.
- Maximum (500): Logs are flushed immediately upon reaching 500 to stay within ingestion buffer limits.
Note: Values provided outside this range will be automatically clamped to the nearest threshold.
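The clamping rule above can be sketched in a few lines (illustrative only; the SDK applies this internally when you construct the logger):

```python
# Sketch of the batch_size clamping rule (illustrative, not LuminaLog's source).
BATCH_SIZE_MIN = 50
BATCH_SIZE_MAX = 500

def clamp_batch_size(requested: int) -> int:
    """Clamp a requested batch_size into the supported 50-500 range."""
    return max(BATCH_SIZE_MIN, min(BATCH_SIZE_MAX, requested))
```

For example, constructing the logger with `batch_size=1000` behaves as if `batch_size=500` had been passed.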
## Logging Methods

### `logger.debug(message, metadata)`
Detailed diagnostics for development. Filter these out in production with `min_level`.

### `logger.info(message, metadata)`
General operational events: user actions, system events.

### `logger.warn(message, metadata)`
Warning conditions: slow queries, deprecated API usage, high memory.

### `logger.error(message, metadata)`
Error conditions requiring attention: failed operations, exceptions.

### `logger.fatal(message, metadata)`
Critical errors causing service degradation.

### `logger.panic(message, metadata)`
System-wide failures. Flushes immediately, bypassing batching.
## Child Loggers
Create child loggers with inherited context metadata. Perfect for request-scoped logging where you want to automatically include request ID, user ID, trace ID, or other contextual information in every log.
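Conceptually, a child logger overlays new keys onto its parent's context, and every log entry carries the merged result. A minimal sketch of that merging behavior (class and method names here are illustrative, not the SDK's internals):

```python
class ContextLogger:
    """Toy model of child-logger context inheritance (not LuminaLog's source)."""

    def __init__(self, context=None):
        self.context = dict(context or {})

    def child(self, extra):
        # Child context = parent context overlaid with the new keys
        return ContextLogger({**self.context, **extra})

    def entry(self, message, metadata=None):
        # Every entry carries inherited context plus per-call metadata
        return {**self.context, **(metadata or {}), 'message': message}
```

Note that `child()` returns a new object, so the parent's context is never mutated; sibling child loggers stay independent.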
```python
import os
import uuid

from flask import Flask, g, request
from luminalog import LuminaLog

app = Flask(__name__)
logger = LuminaLog(api_key=os.getenv('LUMINALOG_API_KEY'))

@app.before_request
def before_request():
    # Create child logger with request context
    g.logger = logger.child({
        'request_id': str(uuid.uuid4()),
        'user_id': getattr(request, 'user_id', None),
        'path': request.path,
        'method': request.method,
        'ip': request.remote_addr,
        'user_agent': request.user_agent.string
    })

@app.route('/api/payment', methods=['POST'])
def process_payment_route():
    g.logger.info('Processing payment', {
        'amount': request.json.get('amount'),
        'currency': request.json.get('currency')
    })
    # Log automatically includes: request_id, user_id, path, method, ip, user_agent
    try:
        result = process_payment(request.json)
        g.logger.info('Payment successful', {'transaction_id': result['id']})
        return result
    except Exception as e:
        g.logger.capture_error(e, {
            'amount': request.json.get('amount'),
            'payment_method': request.json.get('method')
        })
        return {'error': 'Payment failed'}, 500
```

### Nested Child Loggers
```python
request_logger = logger.child({'request_id': 'req_123'})
user_logger = request_logger.child({'user_id': 'user_456'})
transaction_logger = user_logger.child({'transaction_id': 'txn_789'})

transaction_logger.info('Payment processed')
# Includes: request_id, user_id, transaction_id
```

## Level Filtering
Filter logs by minimum level before sending to reduce costs and noise in production. Logs below the minimum level are discarded in the SDK, saving bandwidth and ingestion costs.
```python
import os
from luminalog import LuminaLog

# Environment-based filtering
logger = LuminaLog(
    api_key=os.getenv('LUMINALOG_API_KEY'),
    environment=os.getenv('ENVIRONMENT', 'development'),
    min_level='info' if os.getenv('ENVIRONMENT') == 'production' else 'debug'
)

# Development: all logs sent (debug, info, warn, error, fatal, panic)
logger.debug('Detailed variable state', {'vars': local_vars})  # Sent in dev
logger.info('User logged in')                                  # Sent in dev

# Production: only info and above (info, warn, error, fatal, panic)
logger.debug('Detailed variable state', {'vars': local_vars})  # Filtered in prod
logger.info('User logged in')                                  # Sent in prod
logger.error('Payment failed')                                 # Always sent
```

### Log Level Hierarchy
Setting min_level='info' sends info, warn, error, fatal, and panic logs. Debug logs are filtered out.
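The hierarchy reduces to a simple ordinal comparison; a sketch of the filtering decision (illustrative, not the SDK's source):

```python
# Level ordering implied by the hierarchy above (sketch only).
LEVELS = {'debug': 0, 'info': 1, 'warn': 2, 'error': 3, 'fatal': 4, 'panic': 5}

def should_send(level: str, min_level: str) -> bool:
    """A log is sent when its level is at or above the configured minimum."""
    return LEVELS[level] >= LEVELS[min_level]
```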
## Error Tracking
Automatic error tracking with stack traces, fingerprinting, and context. Errors are automatically deduplicated and grouped in your dashboard.
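To make the deduplication idea concrete, here is one plausible fingerprinting scheme: hash the error type together with the call sites in its traceback, so identical failures map to the same group. This is an assumption for illustration; LuminaLog's actual algorithm may differ.

```python
import hashlib
import traceback

def fingerprint(exc: BaseException) -> str:
    """Hash the error type plus its call sites (illustrative scheme only)."""
    frames = traceback.extract_tb(exc.__traceback__)
    key = type(exc).__name__ + '|' + '|'.join(
        f'{f.filename}:{f.name}' for f in frames
    )
    return hashlib.sha1(key.encode()).hexdigest()
```

Two exceptions of the same type raised from the same code path produce the same fingerprint, which is what allows grouping in a dashboard.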
```python
try:
    process_payment(order)
except Exception as e:
    # Automatically extracts:
    # - Error type and message
    # - Full stack trace
    # - Error fingerprint for deduplication
    logger.capture_error(e, {
        'order_id': order['id'],
        'user_id': order['user_id'],
        'amount': order['amount']
    })

# Also available as capture_exception (alias)
logger.capture_exception(e, context)
```
## Privacy Mode
Enable privacy mode to skip automatic PII scrubbing when you handle sensitive data yourself. This tells LuminaLog to store your logs as-is without processing.
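To illustrate what default scrubbing guards against, here is a toy scrubber for a single PII pattern (email addresses). This is purely an assumption for illustration and not LuminaLog's actual rule set:

```python
import re

# One example PII pattern; real scrubbing would cover many more.
EMAIL_RE = re.compile(r'[\w.+-]+@[\w-]+\.[\w.-]+')

def scrub(text: str) -> str:
    """Redact email addresses from a log string."""
    return EMAIL_RE.sub('[REDACTED]', text)
```

With `privacy_mode=True`, no such processing happens and your payloads are stored verbatim.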
```python
logger = LuminaLog(
    api_key=os.getenv('LUMINALOG_API_KEY'),
    privacy_mode=True  # Skip automatic PII scrubbing
)

# You're responsible for not sending PII
logger.info('User action', {
    'user_id': hash_user_id(user.id),  # Hashed ID
    'action': 'login',                 # No PII
    'timestamp': time.time()           # Safe
})
```

> **Use with caution:** with privacy mode enabled, you are responsible for ensuring no PII reaches your logs.
## Distributed Tracing

Built-in helpers for managing trace and span identifiers, compatible with W3C Trace Context.

### Tracing Helpers

| Helper | Description |
|---|---|
| generate_trace_id() | Generates a unique UUID v4 trace ID. |
| generate_span_id() | Generates a unique span ID. |
| get_trace_id_from_request(req) | Extracts the trace ID from Flask, FastAPI, or Django request headers. |
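For reference, W3C Trace Context uses a 32-hex-character trace ID and a 16-hex-character span ID. A compatible sketch of what these helpers could produce (an assumption for illustration, not the SDK's source):

```python
import secrets
import uuid

def generate_trace_id() -> str:
    # 32 hex chars, as in the W3C traceparent header's trace-id field
    return uuid.uuid4().hex

def generate_span_id() -> str:
    # 16 hex chars (8 random bytes), as in the parent-id field
    return secrets.token_hex(8)
```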
### Implementation

```python
from luminalog import generate_trace_id, generate_span_id

# Track manual operations
trace_id = generate_trace_id()
span_id = generate_span_id()

logger.info("Internal task", {
    "trace_id": trace_id,
    "span_id": span_id
})
```

## Framework Integrations
### Flask

```python
import os
import time
import uuid

from flask import Flask, g, request
from luminalog import LuminaLog

app = Flask(__name__)
logger = LuminaLog(api_key=os.getenv('LUMINALOG_API_KEY'))

@app.before_request
def before_request():
    g.start_time = time.time()
    g.logger = logger.child({
        'request_id': str(uuid.uuid4()),
        'method': request.method,
        'path': request.path
    })

@app.after_request
def after_request(response):
    duration = (time.time() - g.start_time) * 1000
    g.logger.info('Request completed', {
        'status_code': response.status_code,
        'duration_ms': round(duration, 2)
    })
    return response

@app.errorhandler(Exception)
def handle_error(error):
    g.logger.capture_error(error, {'path': request.path})
    return {'error': 'Internal server error'}, 500
```

### FastAPI
```python
import os
import time
import uuid

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from luminalog import LuminaLog

app = FastAPI()
logger = LuminaLog(api_key=os.getenv('LUMINALOG_API_KEY'))

@app.middleware('http')
async def log_requests(request: Request, call_next):
    request_id = str(uuid.uuid4())
    request_logger = logger.child({
        'request_id': request_id,
        'method': request.method,
        'path': request.url.path
    })
    start_time = time.time()
    response = await call_next(request)
    duration = (time.time() - start_time) * 1000
    request_logger.info('Request completed', {
        'status_code': response.status_code,
        'duration_ms': round(duration, 2)
    })
    return response

@app.exception_handler(Exception)
async def exception_handler(request: Request, exc: Exception):
    logger.capture_error(exc, {
        'method': request.method,
        'path': request.url.path
    })
    return JSONResponse(status_code=500, content={'error': 'Internal server error'})
```

### Django
```python
import os
import time
import uuid

from luminalog import LuminaLog

logger = LuminaLog(api_key=os.getenv('LUMINALOG_API_KEY'))

class LuminaLogMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        request.request_id = str(uuid.uuid4())
        request.logger = logger.child({
            'request_id': request.request_id,
            'method': request.method,
            'path': request.path
        })
        start_time = time.time()
        response = self.get_response(request)
        duration = (time.time() - start_time) * 1000
        request.logger.info('Request completed', {
            'status_code': response.status_code,
            'duration_ms': round(duration, 2)
        })
        return response

    def process_exception(self, request, exception):
        if hasattr(request, 'logger'):
            request.logger.capture_error(exception, {'path': request.path})
        return None
```

## Advanced Features
### Automatic Batching
Logs are automatically batched and sent when batch_size is reached or flush_interval expires, whichever comes first. This reduces network overhead and improves performance.
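The "size or interval, whichever comes first" rule can be sketched as below. The real SDK runs a background timer; this sketch checks the interval on each `add` for simplicity, and the class name is illustrative:

```python
import time

class Batcher:
    """Sketch of size- or time-triggered flushing (not LuminaLog's source)."""

    def __init__(self, batch_size, flush_interval, send):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.send = send  # callable that ships a list of logs
        self.buffer = []
        self.last_flush = time.monotonic()

    def add(self, log):
        self.buffer.append(log)
        interval_due = time.monotonic() - self.last_flush >= self.flush_interval
        if len(self.buffer) >= self.batch_size or interval_due:
            self.flush()

    def flush(self):
        if self.buffer:
            self.send(self.buffer)
            self.buffer = []
        self.last_flush = time.monotonic()
```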
```python
logger = LuminaLog(
    api_key=os.getenv('LUMINALOG_API_KEY'),
    batch_size=50,      # Send every 50 logs
    flush_interval=3.0  # Or every 3 seconds
)
```

### Retry Logic
Failed requests are automatically retried with exponential backoff (3 attempts: immediate, 1s delay, 2s delay). This ensures logs aren't lost during temporary network issues.
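The retry schedule (immediate, then 1s, then 2s) can be sketched as a loop over delays; this is an illustration of the behavior, which the SDK handles internally:

```python
import time

def send_with_retry(send, payload, delays=(0, 1.0, 2.0)):
    """Try `send` once per delay; re-raise the last error if all attempts fail."""
    last_exc = None
    for delay in delays:
        time.sleep(delay)
        try:
            return send(payload)
        except Exception as exc:
            last_exc = exc
    raise last_exc
```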
### Graceful Shutdown
The SDK automatically registers an atexit handler to flush remaining logs on process exit. No manual cleanup needed!
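The atexit pattern looks roughly like this (a sketch with illustrative names, not the SDK's source). Registering at construction means a normal interpreter exit triggers the same flush path as a manual `shutdown()` call:

```python
import atexit

class ShutdownFlushLogger:
    """Sketch of atexit-based log flushing."""

    def __init__(self):
        self.buffer = []
        self.flushed = []
        # Runs automatically on normal interpreter exit
        atexit.register(self.shutdown)

    def log(self, message):
        self.buffer.append(message)

    def shutdown(self):
        # Idempotent: safe to call manually and again at exit
        if self.buffer:
            self.flushed.extend(self.buffer)
            self.buffer = []
```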
```python
logger = LuminaLog(api_key=os.getenv('LUMINALOG_API_KEY'))

# Your application code...

# Logs are automatically flushed when the process exits.
# Or manually trigger shutdown:
logger.shutdown()  # Stops the flush timer and flushes remaining logs
```

### Manual Flushing
Force immediate flush of all queued logs before critical operations.
```python
logger.info('Starting database migration')
logger.flush()  # Ensure log is sent before migration

run_migration()

logger.info('Migration completed')
logger.flush()  # Ensure completion is logged
```

## Production Example
Complete production-ready Flask application with all features enabled:
```python
import os
import time
import uuid

from flask import Flask, g, request
from luminalog import LuminaLog

app = Flask(__name__)

# Initialize with production settings
logger = LuminaLog(
    api_key=os.getenv('LUMINALOG_API_KEY'),
    environment=os.getenv('ENVIRONMENT', 'production'),
    project_id='ecommerce-api',
    min_level='info',     # Skip debug logs in production
    privacy_mode=False,   # Keep automatic PII scrubbing enabled
    batch_size=50,        # Optimize for throughput
    flush_interval=3.0,   # Flush every 3 seconds
    debug=False           # Disable SDK debug output
)

@app.before_request
def before_request():
    g.start_time = time.time()
    g.request_id = str(uuid.uuid4())
    # Create request-scoped logger
    g.logger = logger.child({
        'request_id': g.request_id,
        'method': request.method,
        'path': request.path,
        'user_agent': request.user_agent.string,
        'ip': request.remote_addr
    })
    g.logger.info('Request started')

@app.after_request
def after_request(response):
    duration = (time.time() - g.start_time) * 1000
    g.logger.info('Request completed', {
        'status_code': response.status_code,
        'duration_ms': round(duration, 2),
        'content_length': response.content_length
    })
    return response

@app.errorhandler(Exception)
def handle_error(error):
    g.logger.capture_error(error, {
        'path': request.path,
        'method': request.method
    })
    return {'error': 'Internal server error'}, 500

@app.route('/api/checkout', methods=['POST'])
def checkout():
    try:
        order_data = request.get_json()
        g.logger.info('Processing checkout', {
            'cart_items': len(order_data.get('items', [])),
            'total': order_data.get('total')
        })
        # Process payment (process_payment and PaymentError are app-defined)
        result = process_payment(order_data)
        g.logger.info('Checkout successful', {
            'order_id': result['order_id'],
            'amount': result['amount']
        })
        return result
    except PaymentError as e:
        g.logger.error('Payment failed', {
            'error': str(e),
            'order_data': order_data
        })
        return {'error': 'Payment failed'}, 400

if __name__ == '__main__':
    app.run()
```