If you use heroku and AWS and want to customize your heroku application logging, you can hook Logplex up to AWS Lambda.
Background
When a heroku application emits anything to stdout or stderr, it gets shuttled to the magical world of Logplex. The logs enter as syslog messages, carrying information like facility, priority, and so on. That includes not only logs from your application but also logs from heroku's build and deploy systems, postgresql, and other add-ons. Shortly after arrival these logs are dispatched to whatever sinks your heroku app has configured, which can be add-ons like PaperTrail or custom log sink URLs. The sink destinations can speak syslog(+TLS) or syslog-over-HTTPS using octet counting framing.
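For a sense of what that framing looks like, here is a minimal sketch (the priority, hostname, and message are invented, not real heroku output): each frame is the byte length of a syslog message, a space, and the message itself, with frames simply concatenated.

# Sketch of octet counting framing (RFC 6587 section 3.4.1), using a made-up syslog line.
msg = b"<190>1 2018-01-01T00:00:00+00:00 host app web.1 - hello from my dyno"
frame = str(len(msg)).encode("ascii") + b" " + msg   # "<byte length> <syslog message>"
body = frame + frame                                 # a drain request body carrying two messages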
One advantage of this setup is that your application can emit logs with a minimum of blocking. At one point I had my application sending logs to Slack directly, but that added latency to the application every time I logged anything. By sending to Logplex instead, I can process the application messages asynchronously without doing anything remotely fancy in my application. Another benefit is that you can handle your application, database, build, and deploy logs all in the same unified fashion.
Using AWS API Gateway and Lambda you can set up your own Logplex sink and do whatever you like with the logs coming out of Logplex, including your application's output as well as add-on and heroku platform messages. You can then send them to CloudWatch Logs, or even Slack, as in this example:
"""Sample handler for parsing Heroku logplex drain events (https://devcenter.heroku.com/articles/log-drains#https-drains). | |
Expects messages to be framed with the syslog TCP octet counting method (https://tools.ietf.org/html/rfc6587#section-3.4.1). | |
This is designed to be run as a Python3.6 lambda. | |
""" | |
import json | |
import boto3 | |
import logging | |
import iso8601 | |
import requests | |
from base64 import b64decode | |
from pyparsing import Word, Suppress, nums, Optional, Regex, pyparsing_common, alphanums | |
from syslog import LOG_DEBUG, LOG_WARNING, LOG_INFO, LOG_NOTICE | |
from collections import defaultdict | |
HOOK_URL = "https://" + boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_HOOK_URL))['Plaintext'].decode('ascii') | |
CHANNEL = "#alerts" | |
log = logging.getLogger('myapp.heroku.drain') | |
class Parser(object):
    def __init__(self):
        ints = Word(nums)

        # priority
        priority = Suppress("<") + ints + Suppress(">")

        # version
        version = ints

        # timestamp
        timestamp = pyparsing_common.iso8601_datetime

        # hostname
        hostname = Word(alphanums + "_" + "-" + ".")

        # source
        source = Word(alphanums + "_" + "-" + ".")

        # appname
        appname = Word(alphanums + "(" + ")" + "/" + "-" + "_" + ".") + Optional(Suppress("[") + ints + Suppress("]")) + Suppress("-")

        # message
        message = Regex(".*")

        # pattern build
        self.__pattern = priority + version + timestamp + hostname + source + appname + message

    def parse(self, line):
        parsed = self.__pattern.parseString(line)

        # https://tools.ietf.org/html/rfc5424#section-6
        # get priority/severity
        priority = int(parsed[0])
        severity = priority & 0x07
        facility = priority >> 3

        payload = {}
        payload["priority"] = priority
        payload["severity"] = severity
        payload["facility"] = facility
        payload["version"] = parsed[1]
        payload["timestamp"] = iso8601.parse_date(parsed[2])
        payload["hostname"] = parsed[3]
        payload["source"] = parsed[4]
        payload["appname"] = parsed[5]
        payload["message"] = parsed[6]
        return payload
parser = Parser()


def lambda_handler(event, context):
    handle_lambda_proxy_event(event)
    return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Length": 0},
    }
def handle_lambda_proxy_event(event):
    body = event['body']
    headers = event['headers']

    # sanity-check source
    assert headers['X-Forwarded-Proto'] == 'https'
    assert headers['Content-Type'] == 'application/logplex-1'

    # split the octet-counted payload into individual syslog messages
    def get_chunk(payload: bytes):
        # each frame is "<length> <syslog message>"; the length is the byte count of the message
        # payload = payload.lstrip()
        msg_len, syslog_msg_payload = payload.split(b' ', maxsplit=1)
        if msg_len == b'':
            raise Exception(f"failed to parse heroku logplex payload: '{payload}'")
        try:
            msg_len = int(msg_len)
        except Exception as ex:
            raise Exception(f"failed to parse {msg_len} as int, payload: {payload}") from ex

        # only grab msg_len bytes of syslog_msg
        syslog_msg = syslog_msg_payload[0:msg_len]
        next_payload = syslog_msg_payload[msg_len:]

        yield syslog_msg.decode('utf-8')

        if next_payload:
            yield from get_chunk(next_payload)

    # group messages by source/app and severity, formatted for slack
    srcapp_msgs = defaultdict(dict)

    chunk_count = 0
    for chunk in get_chunk(bytes(body, 'utf-8')):
        chunk_count += 1
        evt = parser.parse(chunk)
        if not filter_slack_msg(evt):
            # skip stuff filtered out
            continue

        # add to group
        sev = evt['severity']
        group_name = f"SEV:{sev} {evt['source']} {evt['appname']}"
        if sev not in srcapp_msgs[group_name]:
            srcapp_msgs[group_name][sev] = list()
        srcapp_msgs[group_name][sev].append(str(evt["timestamp"]) + ': ' + evt["message"])

    for group_name, sevs in srcapp_msgs.items():
        for severity, lines in sevs.items():
            if not lines:
                continue
            title = group_name

            # format the syslog event as a slack message attachment
            # (uses the last event parsed above for the attachment metadata)
            slack_att = slack_format_attachment(log_msg=None, log_rec=evt)
            text = "\n" + "\n".join(lines)

            slack(text=text, title=title, attachments=[slack_att], channel=CHANNEL, severity=severity)

    # sanity-check number of parsed messages
    assert int(headers['Logplex-Msg-Count']) == chunk_count

    return ""
def slack_format_attachment(log_msg=None, log_rec=None, title=None):
    """Format as slack attachment."""
    severity = int(log_rec['severity'])

    # color
    color = None
    if severity == LOG_DEBUG:
        color = "#aaaaaa"
    elif severity == LOG_INFO:
        color = "good"
    elif severity == LOG_NOTICE:
        color = "#439FE0"
    elif severity == LOG_WARNING:
        color = "warning"
    elif severity < LOG_WARNING:
        # error!
        color = "danger"

    attachment = {
        # 'text': "`" + log_msg + "`",
        # 'parse': 'none',
        'author_name': title,
        'color': color,
        'mrkdwn_in': ['text'],
        'text': log_msg,
        # 'fields': [
        #     # {
        #     #     'title': "Facility",
        #     #     'value': log_rec["facility"],
        #     #     'short': True,
        #     # },
        #     # {
        #     #     'title': "Severity",
        #     #     'value': severity,
        #     #     'short': True,
        #     # },
        #     {
        #         'title': "App",
        #         'value': log_rec["appname"],
        #         'short': True,
        #     },
        #     # {
        #     #     'title': "Source",
        #     #     'value': log_rec["source"],
        #     #     'short': True,
        #     # },
        #     {
        #         'title': "Timestamp",
        #         'value': str(log_rec["timestamp"]),
        #         'short': True,
        #     }
        # ]
    }
    return attachment
def filter_slack_msg(msg):
    """Return true if we should send to slack."""
    sev = msg["severity"]  # e.g. LOG_DEBUG
    source = msg["source"]  # e.g. 'app'
    appname = msg["appname"]  # e.g. 'heroku-postgres'
    body = msg["message"]

    if sev >= LOG_DEBUG:
        return False
    if body.startswith('DEBUG '):
        return False
    # if source == 'app' and sev > LOG_WARNING:
    #     return False
    if appname == 'router':
        return False
    if appname == 'heroku-postgres' and sev >= LOG_INFO:
        return False
    if 'sql_error_code = 00000 LOG: checkpoint complete' in body:
        # ignore checkpoint
        return False
    if 'sql_error_code = 00000 NOTICE: pg_stop_backup complete, all required WAL segments have been archived' in body:
        # ignore pg_stop_backup completion notices
        return False
    if 'sql_error_code = 00000 LOG: checkpoint starting: ' in body:
        # ignore checkpoint
        return False
    if appname == 'logplex' and body.startswith('Error L10'):
        # "NN messages dropped since..."
        return False
    return True
def slack(text=None, title=None, attachments=[], icon=None, channel='#alerts', severity=LOG_WARNING):
    if not attachments:
        return

    # emoji icon
    icon = 'mega'
    if severity == LOG_DEBUG:
        icon = 'information_source'
    elif severity == LOG_INFO:
        icon = 'information_desk_person'
    elif severity == LOG_NOTICE:
        icon = 'scroll'
    elif severity == LOG_WARNING:
        icon = 'warning'
    elif severity < LOG_WARNING:
        # error!
        icon = 'boom'

    message = {
        "username": title,
        "channel": channel,
        "icon_emoji": f":{icon}:",
        "attachments": attachments,
        "text": text,
    }
    print(message)
    slack_raw(message)


def slack_raw(payload):
    response = requests.post(
        HOOK_URL, data=json.dumps(payload),
        headers={'Content-Type': 'application/json'}
    )
    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text)
        )
Drawbacks
There is one major deficiency in this system worth noting: your application has no way to set the syslog fields on its log messages. Even if your application logger knows a particular message is debug, or warn, or error, it all arrives at severity level 6 (info). Logs from other components such as postgresql keep their severities, but your application is a second-class citizen: there is no mechanism for it to send actual syslog messages to Logplex, even though add-ons and heroku's internal machinery clearly do. I filed a ticket about this and complained at length, and they told me they have no plans to let users send syslog-formatted messages to Logplex; everyone is stuck with stdout/stderr only. This means that if you want your Logplex sink to treat messages of differing severities differently, you can't, at least not using the out-of-band syslog metadata the sink receives. As far as the sink can tell, your application's debug logs and error logs all look the same, which is frankly an impossible situation when it comes to logging. Hopefully they fix this some day.
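One partial workaround, and it is admittedly a hack, is to have the application prefix each line it writes with its level name and then sniff the severity back out of the message body in the sink, in the same spirit as the startswith('DEBUG ') check in filter_slack_msg above. A minimal sketch, assuming the app emits lines like "WARNING something went sideways":

from syslog import LOG_DEBUG, LOG_INFO, LOG_WARNING, LOG_ERR

# Map a level name at the start of the message body back to a syslog severity.
LEVEL_PREFIXES = {
    "DEBUG": LOG_DEBUG,
    "INFO": LOG_INFO,
    "WARNING": LOG_WARNING,
    "ERROR": LOG_ERR,
}

def sniff_severity(evt):
    """Guess a severity from the message text, falling back to what Logplex reported."""
    prefix = evt["message"].split(" ", 1)[0]
    return LEVEL_PREFIXES.get(prefix, evt["severity"])

This only helps for your own application's output, and only if you control the log format, but it at least lets the sink tell errors apart from chatter.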