Heroku logging to AWS Lambda

If you use Heroku and AWS and want to customize your Heroku application logging, you can hook Logplex up to AWS Lambda.

Background

When a Heroku application writes to stdout or stderr, the output gets shuttled to the magical world of Logplex. The logs enter as syslog messages carrying fields such as facility and priority. Logplex collects not only logs from your application but also logs from Heroku’s build and deploy systems, PostgreSQL, and other add-ons. Shortly after arrival, these logs are dispatched to whatever sinks your Heroku app has configured: add-ons like PaperTrail, or custom log sink URLs. A custom sink can receive syslog (optionally over TLS) or syslog over HTTPS with octet-counting framing.
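
To make the wire format concrete, here is a minimal sketch of how an octet-counted body breaks down into individual syslog messages, and how a priority value splits into facility and severity. The sample messages are synthetic, and the names `split_frames` and `decode_priority` are made up for illustration; they are not part of any Heroku library.

```python
def split_frames(body: bytes):
    """Yield each syslog message from an octet-counted body (RFC 6587, section 3.4.1)."""
    pos = 0
    while pos < len(body):
        space = body.index(b" ", pos)   # the length prefix ends at the first space
        length = int(body[pos:space])   # number of bytes in the message that follows
        start = space + 1
        yield body[start:start + length].decode("utf-8")
        pos = start + length


def decode_priority(priority: int):
    """Split a syslog priority into (facility, severity) as described in RFC 5424."""
    return priority >> 3, priority & 0x07


# Synthetic messages for illustration; real ones come from Logplex.
messages = [
    "<40>1 2012-11-30T06:45:29+00:00 host app web.3 - State changed from starting to up\n",
    "<134>1 2012-11-30T06:45:26+00:00 host heroku router - at=info method=GET path=/\n",
]
# Frame each message as "<byte length> <message>" and concatenate.
body = b"".join(f"{len(m.encode('utf-8'))} {m}".encode("utf-8") for m in messages)

frames = list(split_frames(body))
print(frames == messages)    # True
print(decode_priority(134))  # (16, 6): facility 16, severity 6 (info)
```

The length prefix counts bytes, not characters, which is why the sketch slices the raw bytes before decoding.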

One advantage of this setup is that your application can emit logs with a minimum of blocking. At one point I had my application sending logs to Slack directly, but this added latency any time I logged anything. By sending to Logplex instead, I can process the application messages asynchronously without doing anything remotely fancy in my application. Another benefit is that you can handle your application, database, build, and deploy logs all in the same unified fashion.

Using AWS API Gateway and Lambda, you can set up your own Logplex sink and do whatever you desire with the logs coming out of Logplex. This includes your application’s output as well as add-on and Heroku platform messages. You can then send them into CloudWatch Logs, or even Slack as in this example:

"""Sample handler for parsing Heroku logplex drain events (https://devcenter.heroku.com/articles/log-drains#https-drains).
Expects messages to be framed with the syslog TCP octet counting method (https://tools.ietf.org/html/rfc6587#section-3.4.1).
This is designed to be run as a Python3.6 lambda.
"""
import json
import logging
import os
from base64 import b64decode
from collections import defaultdict
from syslog import LOG_DEBUG, LOG_WARNING, LOG_INFO, LOG_NOTICE

import boto3
import iso8601
import requests
from pyparsing import Word, Suppress, nums, Optional, Regex, pyparsing_common, alphanums

# Assumption: the KMS-encrypted Slack webhook URL (without the "https://" prefix)
# is supplied through a Lambda environment variable named ENCRYPTED_HOOK_URL.
ENCRYPTED_HOOK_URL = os.environ['ENCRYPTED_HOOK_URL']
HOOK_URL = "https://" + boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_HOOK_URL))['Plaintext'].decode('ascii')
CHANNEL = "#alerts"

log = logging.getLogger('myapp.heroku.drain')

class Parser(object):
    """Parse one syslog-formatted Logplex line into a dict of fields."""

    def __init__(self):
        ints = Word(nums)
        # priority
        priority = Suppress("<") + ints + Suppress(">")
        # version
        version = ints
        # timestamp
        timestamp = pyparsing_common.iso8601_datetime
        # hostname
        hostname = Word(alphanums + "_" + "-" + ".")
        # source
        source = Word(alphanums + "_" + "-" + ".")
        # appname
        appname = Word(alphanums + "(" + ")" + "/" + "-" + "_" + ".") + Optional(Suppress("[") + ints + Suppress("]")) + Suppress("-")
        # message
        message = Regex(".*")
        # pattern build
        self.__pattern = priority + version + timestamp + hostname + source + appname + message

    def parse(self, line):
        parsed = self.__pattern.parseString(line)
        # https://tools.ietf.org/html/rfc5424#section-6
        # get priority/severity
        priority = int(parsed[0])
        severity = priority & 0x07
        facility = priority >> 3
        payload = {}
        payload["priority"] = priority
        payload["severity"] = severity
        payload["facility"] = facility
        payload["version"] = parsed[1]
        payload["timestamp"] = iso8601.parse_date(parsed[2])
        payload["hostname"] = parsed[3]
        payload["source"] = parsed[4]
        payload["appname"] = parsed[5]
        # the message is the last token, even when an optional "[pid]" token precedes it
        payload["message"] = parsed[-1]
        return payload


parser = Parser()

def lambda_handler(event, context):
    handle_lambda_proxy_event(event)
    return {
        "isBase64Encoded": False,
        "statusCode": 200,
        # header values are sent as strings
        "headers": {"Content-Length": "0"},
    }

def handle_lambda_proxy_event(event):
    body = event['body']
    headers = event['headers']

    # sanity-check source
    assert headers['X-Forwarded-Proto'] == 'https'
    assert headers['Content-Type'] == 'application/logplex-1'

    # split into chunks
    def get_chunk(payload: bytes):
        # loop rather than recurse so a long batch cannot exhaust the recursion limit
        while payload:
            msg_len, syslog_msg_payload = payload.split(b' ', maxsplit=1)
            if msg_len == b'':
                raise Exception(f"failed to parse heroku logplex payload: '{payload}'")
            try:
                msg_len = int(msg_len)
            except Exception as ex:
                raise Exception(f"failed to parse {msg_len} as int, payload: {payload}") from ex
            # only grab msg_len bytes of syslog_msg
            yield syslog_msg_payload[0:msg_len].decode('utf-8')
            payload = syslog_msg_payload[msg_len:]

    # group events by source, app, and severity
    srcapp_msgs = defaultdict(dict)
    chunk_count = 0
    for chunk in get_chunk(bytes(body, 'utf-8')):
        chunk_count += 1
        evt = parser.parse(chunk)
        if not filter_slack_msg(evt):
            # skip stuff filtered out
            continue
        # add to group
        sev = evt['severity']
        group_name = f"SEV:{sev} {evt['source']} {evt['appname']}"
        srcapp_msgs[group_name].setdefault(sev, []).append(evt)

    # format for slack
    for group_name, sevs in srcapp_msgs.items():
        for severity, events in sevs.items():
            if not events:
                continue
            title = group_name
            lines = [str(e["timestamp"]) + ': ' + e["message"] for e in events]
            # format the attachment from the most recent event in this group
            slack_att = slack_format_attachment(log_msg=None, log_rec=events[-1])
            text = "\n" + "\n".join(lines)
            slack(text=text, title=title, attachments=[slack_att], channel=CHANNEL, severity=severity)

    # sanity-check number of parsed messages
    assert int(headers['Logplex-Msg-Count']) == chunk_count
    return ""

def slack_format_attachment(log_msg=None, log_rec=None, title=None):
    """Format as slack attachment."""
    severity = int(log_rec['severity'])
    # color
    color = None
    if severity == LOG_DEBUG:
        color = "#aaaaaa"
    elif severity == LOG_INFO:
        color = "good"
    elif severity == LOG_NOTICE:
        color = "#439FE0"
    elif severity == LOG_WARNING:
        color = "warning"
    elif severity < LOG_WARNING:
        # error!
        color = "danger"
    attachment = {
        # 'text': "`" + log_msg + "`",
        # 'parse': 'none',
        'author_name': title,
        'color': color,
        'mrkdwn_in': ['text'],
        'text': log_msg,
        # 'fields': [
        #     # {
        #     #     'title': "Facility",
        #     #     'value': log_rec["facility"],
        #     #     'short': True,
        #     # },
        #     # {
        #     #     'title': "Severity",
        #     #     'value': severity,
        #     #     'short': True,
        #     # },
        #     {
        #         'title': "App",
        #         'value': log_rec["appname"],
        #         'short': True,
        #     },
        #     # {
        #     #     'title': "Source",
        #     #     'value': log_rec["source"],
        #     #     'short': True,
        #     # },
        #     {
        #         'title': "Timestamp",
        #         'value': str(log_rec["timestamp"]),
        #         'short': True,
        #     }
        # ]
    }
    return attachment

def filter_slack_msg(msg):
    """Return true if we should send to slack."""
    sev = msg["severity"]  # e.g. LOG_DEBUG
    source = msg["source"]  # e.g. 'app'
    appname = msg["appname"]  # e.g. 'heroku-postgres'
    body = msg["message"]
    if sev >= LOG_DEBUG:
        return False
    if body.startswith('DEBUG '):
        return False
    # if source == 'app' and sev > LOG_WARNING:
    #     return False
    if appname == 'router':
        return False
    if appname == 'heroku-postgres' and sev >= LOG_INFO:
        return False
    if 'sql_error_code = 00000 LOG: checkpoint complete' in body:
        # ignore checkpoint
        return False
    if 'sql_error_code = 00000 NOTICE: pg_stop_backup complete, all required WAL segments have been archived' in body:
        # ignore backup completion notices
        return False
    if 'sql_error_code = 00000 LOG: checkpoint starting: ' in body:
        # ignore checkpoint
        return False
    if appname == 'logplex' and body.startswith('Error L10'):
        # NN messages dropped since...
        return False
    return True

def slack(text=None, title=None, attachments=None, icon=None, channel='#alerts', severity=LOG_WARNING):
    # attachments defaults to None to avoid a shared mutable default list
    if not attachments:
        return
    # emoji icon
    icon = 'mega'
    if severity == LOG_DEBUG:
        icon = 'information_source'
    elif severity == LOG_INFO:
        icon = 'information_desk_person'
    elif severity == LOG_NOTICE:
        icon = 'scroll'
    elif severity == LOG_WARNING:
        icon = 'warning'
    elif severity < LOG_WARNING:
        # error!
        icon = 'boom'
    message = {
        "username": title,
        "channel": channel,
        "icon_emoji": f":{icon}:",
        "attachments": attachments,
        "text": text,
    }
    print(message)
    slack_raw(message)

def slack_raw(payload):
    response = requests.post(
        HOOK_URL, data=json.dumps(payload),
        headers={'Content-Type': 'application/json'}
    )
    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text)
        )

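For trying the handler locally, it can help to build an event by hand. This is a rough sketch that fills in only the fields the handler above reads (`body` plus the three headers it checks). A real API Gateway proxy event carries many more keys, and `make_proxy_event` is a hypothetical helper name.

```python
def make_proxy_event(messages):
    """Build a minimal stand-in for an API Gateway Lambda proxy event.

    Only the keys read by handle_lambda_proxy_event are filled in.
    """
    # Octet-counting framing: "<byte length> <message>", concatenated.
    body = "".join(f"{len(m.encode('utf-8'))} {m}" for m in messages)
    return {
        "body": body,
        "headers": {
            "X-Forwarded-Proto": "https",
            "Content-Type": "application/logplex-1",
            "Logplex-Msg-Count": str(len(messages)),
        },
    }


# A synthetic application log line, for illustration only.
event = make_proxy_event([
    "<134>1 2018-01-01T00:00:00+00:00 host app web.1 - hello from my app\n",
])
print(event["headers"]["Logplex-Msg-Count"])  # 1
```

Passing `event` to `handle_lambda_proxy_event` exercises the parsing and grouping path, though it will also post to Slack unless `slack_raw` is stubbed out.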

Drawbacks

There is one major deficiency in this system worth noting: your application has no way to set the syslog fields of its log messages. Even if your application logger knows a particular message is debug, warn, or error, everything comes across as severity level 6 (info). Logs from other components such as PostgreSQL preserve their severities, but your application is a second-class citizen: there is no mechanism for it to send actual syslog messages to Logplex, even though add-ons and internal Heroku machinery clearly do. I filed a ticket about this and complained at length, and they told me they have no plans to let users send syslog-formatted messages to Logplex; everyone is stuck with only stdout/stderr. This means that if you wish to treat messages of differing severities differently in your Logplex sink, you can’t, at least not with the out-of-band syslog data that your sink receives. As far as the sink can tell, your application’s debug logs and error logs all look the same, which is frankly an impossible situation when it comes to logging. Hopefully they fix this some day.
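
One possible workaround is to have the sink recover the severity from the message text. The sketch below assumes your application prefixes each line with its log level, which is the same assumption the `DEBUG ` check in the filter above makes. The prefix names and the `infer_severity` helper are illustrative, not something Heroku provides.

```python
# Syslog severity numbers from RFC 5424, keyed by an assumed level prefix.
PREFIX_TO_SEVERITY = {
    "CRITICAL": 2,
    "ERROR": 3,
    "WARNING": 4,
    "INFO": 6,
    "DEBUG": 7,
}


def infer_severity(message, default=6):
    """Return a syslog severity guessed from a leading level word, else the default."""
    first_word = message.split(" ", 1)[0].rstrip(":")
    return PREFIX_TO_SEVERITY.get(first_word, default)


print(infer_severity("ERROR could not reach database"))  # 3
print(infer_severity("plain line with no level"))        # 6
```

A sink could apply this only to events whose source is the application, leaving the severities reported by add-ons and the platform untouched.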
