Here's a scenario. Your app throws a 500 error. You SSH into the server and run grep "ERROR" app.log. You get 847 lines. You grep harder. You add more pipes. Thirty minutes later you've reconstructed roughly what happened from scattered log fragments across three files and you're not entirely sure the timeline is right.
The problem isn't that you have too many logs. The problem is that your logs are unstructured strings that computers can't efficiently parse. The obvious solution is LET'S AUTOMATE IT — specifically, let's emit structured JSON logs that log aggregation tools can actually query.
Before: The Plaintext Log Problem
Here's typical Python logging:
```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_order(order_id, user_id):
    logger.info(f"Processing order {order_id} for user {user_id}")
    # ... do stuff ...
    logger.error(f"Failed to process order {order_id}: database timeout")
```
Output:
```
INFO:myapp.orders:Processing order 1234 for user 9876
ERROR:myapp.orders:Failed to process order 1234: database timeout
```
Now try to answer: how many orders failed in the last hour? What's the average processing time? Which users are seeing errors? You can't. Not without writing janky regex that breaks the moment someone changes a log message.
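For illustration, here's roughly what that regex approach looks like. The pattern and helper are hypothetical, and the fragility is the point: reword the log message and the pattern silently stops matching.

```python
import re

# Hypothetical pattern for the plaintext format above. One wording change
# to the log message and this silently matches nothing.
FAILED_ORDER = re.compile(
    r"ERROR:[\w.]+:Failed to process order (?P<order_id>\d+): (?P<reason>.+)"
)

def count_failures(lines):
    # Count lines that look like order failures -- as long as nobody
    # touches the message wording.
    return sum(1 for line in lines if FAILED_ORDER.match(line))

lines = [
    "INFO:myapp.orders:Processing order 1234 for user 9876",
    "ERROR:myapp.orders:Failed to process order 1234: database timeout",
]
print(count_failures(lines))  # 1
```

A rephrased message like `"Order 1234 failed: database timeout"` counts as zero failures, with no error to tell you your dashboard just went blind.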
After: JSON Logs
Install python-json-logger:
```
$ pip install python-json-logger
```
Set up a JSON formatter:
```python
import logging
from pythonjsonlogger import jsonlogger

def setup_logging():
    logger = logging.getLogger()
    handler = logging.StreamHandler()
    formatter = jsonlogger.JsonFormatter(
        fmt='%(asctime)s %(name)s %(levelname)s %(message)s'
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```
Same log calls now produce:
{"asctime": "2017-09-28 14:32:11,423", "name": "myapp.orders", "levelname": "INFO", "message": "Processing order 1234 for user 9876"}
{"asctime": "2017-09-28 14:32:11,891", "name": "myapp.orders", "levelname": "ERROR", "message": "Failed to process order 1234: database timeout"}
Better, but we can do more. The real power is adding structured fields:
logger.info("Processing order", extra={
"order_id": order_id,
"user_id": user_id,
"amount": 49.99
})
logger.error("Order processing failed", extra={
"order_id": order_id,
"error_type": "DatabaseTimeout",
"duration_ms": 5023
})
Output:
{"asctime": "2017-09-28 14:32:11,423", "levelname": "INFO", "message": "Processing order", "order_id": 1234, "user_id": 9876, "amount": 49.99}
{"asctime": "2017-09-28 14:32:11,891", "levelname": "ERROR", "message": "Order processing failed", "order_id": 1234, "error_type": "DatabaseTimeout", "duration_ms": 5023}
Now that's queryable. In CloudWatch Logs Insights:
```
fields @timestamp, order_id, error_type, duration_ms
| filter levelname = "ERROR"
| stats count(*) as error_count by error_type
| sort error_count desc
```
Or in Kibana:
```
levelname:ERROR AND error_type:DatabaseTimeout
```
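And if your logs are still just a local file with no aggregator wired up yet, the same "errors by type" question is a few lines of Python. A sketch, assuming one JSON object per line:

```python
import json
from collections import Counter

def errors_by_type(lines):
    # Count ERROR records per error_type, mirroring the stats query above.
    counts = Counter()
    for line in lines:
        record = json.loads(line)
        if record.get("levelname") == "ERROR":
            counts[record.get("error_type", "unknown")] += 1
    return counts.most_common()

lines = [
    '{"levelname": "INFO", "message": "Processing order", "order_id": 1234}',
    '{"levelname": "ERROR", "error_type": "DatabaseTimeout", "order_id": 1234}',
]
print(errors_by_type(lines))  # [('DatabaseTimeout', 1)]
```

Two-minute script instead of regex archaeology — that's the payoff of structured fields.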
Adding Correlation IDs
When a request touches multiple services, you need a way to trace a single user action across all the logs it produces. The answer is a correlation ID — generate one per request, inject it into every log line.
For Flask:
```python
import uuid
import logging

from flask import Flask, g, request, has_request_context
from pythonjsonlogger import jsonlogger

app = Flask(__name__)

class RequestIdFilter(logging.Filter):
    def filter(self, record):
        # Outside a request (startup, background jobs) there is no g,
        # so fall back to a placeholder instead of raising.
        if has_request_context():
            record.request_id = g.get('request_id', 'no-request-id')
        else:
            record.request_id = 'no-request-id'
        return True

def setup_logging():
    handler = logging.StreamHandler()
    formatter = jsonlogger.JsonFormatter(
        fmt='%(asctime)s %(levelname)s %(name)s %(request_id)s %(message)s'
    )
    handler.setFormatter(formatter)
    handler.addFilter(RequestIdFilter())
    logging.root.addHandler(handler)
    logging.root.setLevel(logging.INFO)

setup_logging()

@app.before_request
def set_request_id():
    # Reuse the caller's ID if one was passed in; otherwise mint a new one.
    g.request_id = request.headers.get('X-Request-ID', str(uuid.uuid4()))

@app.route('/orders/<int:order_id>')
def get_order(order_id):
    logger = logging.getLogger(__name__)
    logger.info("Fetching order", extra={"order_id": order_id})
    # ...
    return {"order_id": order_id}
```
Every log line for a request now includes the same request_id. In a distributed system, pass this ID in the X-Request-ID header to downstream services, and they should propagate it the same way. Now you can do:
```
filter request_id = "550e8400-e29b-41d4-a716-446655440000"
```
And see every log line from every service that touched that request. This is the thing that makes on-call bearable.
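Propagation on the outgoing side matters just as much: every downstream HTTP call should carry the same header. A minimal sketch — `outgoing_headers` is a helper I'm inventing here, not part of Flask or requests:

```python
import uuid

def outgoing_headers(request_id=None):
    # Reuse the current request's correlation ID if we have one;
    # otherwise mint a fresh one so the chain is never broken.
    return {"X-Request-ID": request_id or str(uuid.uuid4())}

# In a Flask view you'd pass it to the HTTP client, e.g. (URL is made up):
#   requests.get("https://inventory.internal/stock/1",
#                headers=outgoing_headers(g.request_id), timeout=5)
```

Centralizing this in one helper beats sprinkling `headers={...}` at every call site — it's the single place where "never drop the ID" is enforced.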
Logging in Lambda
Lambda is slightly different — you don't set up a stream handler because Lambda captures stdout automatically. You do want structured JSON though. The simplest approach:
```python
import json

def log(level, message, **kwargs):
    # Lambda ships anything printed to stdout to CloudWatch Logs,
    # so a JSON-encoded print is all the "handler" you need.
    record = {"level": level, "message": message}
    record.update(kwargs)
    print(json.dumps(record))

def handler(event, context):
    request_id = context.aws_request_id
    log("INFO", "Lambda invoked",
        request_id=request_id,
        function_name=context.function_name,
        remaining_ms=context.get_remaining_time_in_millis())
    # ...
```
Lambda already provides context.aws_request_id, which is your correlation ID for tracing through CloudWatch.
Wrapping Up
The before/after here is stark. Plaintext logs are for reading by humans in a terminal. Structured JSON logs are for reading by humans via a query tool — and the query tool wins every time there's actual volume. Add python-json-logger, add correlation IDs from the start (retrofitting them is painful), and make every field you'd ever want to filter on a first-class log attribute rather than embedded in a string. Future-you at 2am will be grateful.