blob: e59440b349dfccea94c96e415c278df5cb1a5015 [file]
#!/usr/bin/env python3
import copy
import email
import email.header
import email.message
import email.policy
import email.quoprimime
import email.utils
import json
import logging
import logging.handlers
import os
import re
import smtplib
import sys
import textwrap
from configparser import ConfigParser, ExtendedInterpolation
from email import charset, utils
from string import Template
from typing import List, Mapping, Optional, Sequence, Tuple, Union
import ezpi
import falcon
import sqlalchemy as sa
from sqlalchemy.engine import Connection, Engine
from sqlalchemy.sql import tuple_
import patatt
charset.add_charset('utf-8', None)
emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None)
qspecials = re.compile(r'[()<>@,:;.\"\[\]]')
DB_VERSION = 1
logger = logging.getLogger('b4-send-receive')
logger.setLevel(logging.DEBUG)
JSON = Union[str, int, float, bool, Sequence['JSON'], Mapping[str, 'JSON']]
class SendReceiveListener(object):
def __init__(self, _engine: Engine, _config: ConfigParser) -> None:
self._engine = _engine
self._config = _config
# You shouldn't use this in production
if self._engine.driver == 'pysqlite':
self._init_sa_db()
logfile = _config['main'].get('logfile')
loglevel = _config['main'].get('loglevel', 'info')
if logfile:
self._init_logger(logfile, loglevel)
def _init_logger(self, logfile: str, loglevel: str) -> None:
global logger
lch = logging.handlers.WatchedFileHandler(os.path.expanduser(logfile))
lfmt = logging.Formatter(
'[%(process)d] %(asctime)s - %(levelname)s - %(message)s'
)
lch.setFormatter(lfmt)
if loglevel == 'critical':
lch.setLevel(logging.CRITICAL)
elif loglevel == 'debug':
lch.setLevel(logging.DEBUG)
else:
lch.setLevel(logging.INFO)
logger.addHandler(lch)
def _init_sa_db(self) -> None:
logger.info('Setting up SQLite database')
conn = self._engine.connect()
md = sa.MetaData()
meta = sa.Table('meta', md, sa.Column('version', sa.Integer()))
auth = sa.Table(
'auth',
md,
sa.Column('auth_id', sa.Integer(), primary_key=True),
sa.Column(
'created',
sa.DateTime(),
nullable=False,
server_default=sa.sql.func.now(),
),
sa.Column('identity', sa.Text(), nullable=False),
sa.Column('selector', sa.Text(), nullable=False),
sa.Column('pubkey', sa.Text(), nullable=False),
sa.Column('challenge', sa.Text(), nullable=True),
sa.Column('verified', sa.Integer(), nullable=False),
)
sa.Index('idx_identity_selector', auth.c.identity, auth.c.selector, unique=True)
md.create_all(self._engine)
q = sa.insert(meta).values(version=DB_VERSION)
conn.execute(q)
conn.close()
def on_get(self, req: falcon.Request, resp: falcon.Response) -> None:
resp.status = falcon.HTTP_200
resp.content_type = falcon.MEDIA_TEXT
resp.text = "We don't serve GETs here\n"
def send_error(self, resp: falcon.Response, message: str) -> None:
resp.status = falcon.HTTP_500
logger.critical('Returning error: %s', message)
resp.text = json.dumps({'result': 'error', 'message': message})
def send_success(self, resp: falcon.Response, message: str) -> None:
resp.status = falcon.HTTP_200
logger.debug('Returning success: %s', message)
resp.text = json.dumps({'result': 'success', 'message': message})
def get_smtp(self) -> Tuple[smtplib.SMTP, Tuple[str, str]]:
sconfig = self._config['sendemail']
server = sconfig.get('smtpserver', 'localhost')
port = sconfig.getint('smtpserverport', 0)
encryption = sconfig.get('smtpencryption')
logger.debug('Connecting to %s:%s', server, port)
# We only authenticate if we have encryption
if encryption:
if encryption in ('tls', 'starttls'):
# We do startssl
smtp = smtplib.SMTP(server, port)
# Introduce ourselves
smtp.ehlo()
# Start encryption
smtp.starttls()
# Introduce ourselves again to get new criteria
smtp.ehlo()
elif encryption in ('ssl', 'smtps'):
# We do TLS from the get-go
smtp = smtplib.SMTP_SSL(server, port)
else:
raise smtplib.SMTPException(
'Unclear what to do with smtpencryption=%s' % encryption
)
# If we got to this point, we should do authentication.
auser = sconfig.get('smtpuser')
apass = sconfig.get('smtppass')
if auser and apass:
# Let any exceptions bubble up
smtp.login(auser, apass)
else:
# We assume you know what you're doing if you don't need encryption
smtp = smtplib.SMTP(server, port)
afrom = sconfig.get('from')
assert afrom is not None
frompair = utils.getaddresses([afrom])[0]
return smtp, frompair
def auth_new(self, jdata: Mapping[str, JSON], resp: falcon.Response) -> None:
# Is it already authorized?
conn = self._engine.connect()
md = sa.MetaData()
identity: Optional[JSON] = jdata.get('identity')
selector: Optional[JSON] = jdata.get('selector')
pubkey: Optional[JSON] = jdata.get('pubkey')
if (
not isinstance(identity, str)
or not isinstance(selector, str)
or not isinstance(pubkey, str)
):
self.send_error(resp, message='Invalid authentication request')
return
logger.info('New authentication request for %s/%s', identity, selector)
t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine)
select_auth = sa.select(t_auth.c.auth_id).where(
t_auth.c.identity == identity,
t_auth.c.selector == selector,
t_auth.c.verified == 1,
)
rp = conn.execute(select_auth)
if len(rp.fetchall()):
self.send_error(
resp, message='i=%s;s=%s is already authorized' % (identity, selector)
)
return
# delete any existing challenges for this and create a new one
delete_auth = sa.delete(t_auth).where(
t_auth.c.identity == identity,
t_auth.c.selector == selector,
t_auth.c.verified == 0,
)
conn.execute(delete_auth)
# create new challenge
import uuid
cstr = str(uuid.uuid4())
insert_auth = sa.insert(t_auth).values(
identity=identity,
selector=selector,
pubkey=pubkey,
challenge=cstr,
verified=0,
)
conn.execute(insert_auth)
logger.info('Created new challenge for %s/%s: %s', identity, selector, cstr)
conn.close()
smtp, frompair = self.get_smtp()
cmsg = email.message.EmailMessage()
fromname, fromaddr = frompair
if len(fromname):
cmsg.add_header('From', f'{fromname} <{fromaddr}>')
else:
cmsg.add_header('From', fromaddr)
tpt_subject = self._config['templates']['verify-subject'].strip()
tpt_body = self._config['templates']['verify-body'].strip()
signature = self._config['templates']['signature'].strip()
subject = Template(tpt_subject).safe_substitute(
{'identity': jdata.get('identity')}
)
cmsg.add_header('Subject', subject)
name = jdata.get('name', 'Anonymous Llama')
cmsg.add_header('To', f'{name} <{identity}>')
cmsg.add_header('Message-Id', utils.make_msgid('b4-verify'))
vals = {
'name': name,
'myurl': self._config['main'].get('myurl'),
'challenge': cstr,
}
body = Template(tpt_body).safe_substitute(vals)
body += '\n-- \n'
body += Template(signature).safe_substitute(vals)
body += '\n'
cmsg.set_payload(body, charset='utf-8')
cmsg.set_charset('utf-8')
bdata = self.get_msg_as_bytes(cmsg, headers='encode')
destaddrs = [identity]
alwaysbcc = self._config['main'].get('alwayscc')
if alwaysbcc:
destaddrs += [x[1] for x in utils.getaddresses([alwaysbcc])]
logger.info('Sending challenge to %s', identity)
smtp.sendmail(fromaddr, [identity], bdata)
smtp.close()
self.send_success(resp, message=f'Challenge generated and sent to {identity}')
def validate_message(
self,
conn: Connection,
t_auth: sa.Table,
bdata: bytes,
verified: int = 1,
) -> Tuple[str, str, int]:
# Returns auth_id of the matching record
pm = patatt.PatattMessage(bdata)
if not pm.signed:
raise patatt.ValidationError('Message is not signed')
identity_selector_pairs = [
(
''
if (i := ds.get_field('i')) is None
else i.decode()
if isinstance(i, bytes)
else i,
'default'
if (s := ds.get_field('s')) is None
else s.decode()
if isinstance(s, bytes)
else s,
)
for ds in pm.get_sigs()
]
logger.debug('is_pairs=%s', identity_selector_pairs)
q = sa.select(
t_auth.c.identity, t_auth.c.selector, t_auth.c.auth_id, t_auth.c.pubkey
).where(
tuple_(t_auth.c.identity, t_auth.c.selector).in_(identity_selector_pairs),
t_auth.c.verified == verified,
)
rp = conn.execute(q)
rows = rp.fetchall()
if not rows:
logger.debug('Did not find a matching identity!')
raise patatt.NoKeyError('No match for this identity')
identity, selector, auth_id, pubkey = rows[0]
logger.debug(
'Found matching %s/%s with auth_id=%s', identity, selector, auth_id
)
pm.validate(identity, pubkey.encode())
return identity, selector, auth_id
def auth_verify(self, jdata: Mapping[str, JSON], resp: falcon.Response) -> None:
msg = jdata.get('msg')
if not isinstance(msg, str) or msg.find('\nverify:') < 0:
self.send_error(resp, message='Invalid verification message')
return
conn = self._engine.connect()
md = sa.MetaData()
t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine)
bdata = msg.encode()
try:
identity, selector, auth_id = self.validate_message(
conn, t_auth, bdata, verified=0
)
except Exception as ex:
self.send_error(resp, message='Signature validation failed: %s' % ex)
return
logger.debug(
'Message validation passed for %s/%s with auth_id=%s',
identity,
selector,
auth_id,
)
# Now compare the challenge to what we received
select_challenge = sa.select(t_auth.c.challenge).where(
t_auth.c.auth_id == auth_id
)
rp = conn.execute(select_challenge)
res = rp.fetchall()
challenge = res[0][0]
if msg.find(f'\nverify:{challenge}') < 0:
self.send_error(
resp,
message='Challenge verification for %s/%s did not match'
% (identity, selector),
)
return
logger.info(
'Successfully verified challenge for %s/%s with auth_id=%s',
identity,
selector,
auth_id,
)
update_auth = (
sa.update(t_auth)
.where(t_auth.c.auth_id == auth_id)
.values(challenge=None, verified=1)
)
conn.execute(update_auth)
conn.close()
self.send_success(
resp, message='Challenge verified for %s/%s' % (identity, selector)
)
def auth_delete(self, jdata: Mapping[str, JSON], resp: falcon.Response) -> None:
msg = jdata.get('msg')
if not isinstance(msg, str) or msg.find('\nauth-delete') < 0:
self.send_error(resp, message='Invalid key delete message')
return
conn = self._engine.connect()
md = sa.MetaData()
t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine)
bdata = msg.encode()
try:
identity, selector, auth_id = self.validate_message(conn, t_auth, bdata)
except Exception as ex:
self.send_error(resp, message='Signature validation failed: %s' % ex)
return
logger.info(
'Deleting record for %s/%s with auth_id=%s', identity, selector, auth_id
)
delete_auth = sa.delete(t_auth).where(t_auth.c.auth_id == auth_id)
conn.execute(delete_auth)
conn.close()
self.send_success(
resp, message='Record deleted for %s/%s' % (identity, selector)
)
def clean_header(self, hdrval: Optional[str]) -> str:
if hdrval is None:
return ''
decoded = ''
for hstr, hcs in email.header.decode_header(hdrval):
if hcs is None:
hcs = 'utf-8'
try:
decoded += hstr.decode(hcs, errors='replace')
except LookupError:
# Try as utf-u
decoded += hstr.decode('utf-8', errors='replace')
except (UnicodeDecodeError, AttributeError):
decoded += hstr
new_hdrval = re.sub(r'\n?\s+', ' ', decoded)
return new_hdrval.strip()
def format_addrs(self, pairs: List[Tuple[str, str]], clean: bool = True) -> str:
addrs = list()
for pair in pairs:
if pair[0] == pair[1]:
addrs.append(pair[1])
continue
if clean:
# Remove any quoted-printable header junk from the name
pair = (self.clean_header(pair[0]), pair[1])
# Work around https://github.com/python/cpython/issues/100900
if (
not pair[0].startswith('=?')
and not pair[0].startswith('"')
and qspecials.search(pair[0])
):
quoted = email.utils.quote(pair[0])
addrs.append(f'"{quoted}" <{pair[1]}>')
continue
addrs.append(email.utils.formataddr(pair))
return ', '.join(addrs)
def isascii(self, strval: str) -> bool:
try:
return strval.isascii()
except AttributeError:
return all([ord(c) < 128 for c in strval])
def wrap_header(
self,
hdr: Tuple[str, str],
width: int = 75,
nl: str = '\r\n',
transform: str = 'preserve',
) -> bytes:
hname, hval = hdr
if hname.lower() in ('to', 'cc', 'from', 'x-original-from'):
_parts = [f'{hname}: ']
first = True
for addr in email.utils.getaddresses([hval]):
if transform == 'encode' and not self.isascii(addr[0]):
addr = (
email.quoprimime.header_encode(
addr[0].encode(), charset='utf-8'
),
addr[1],
)
qp = self.format_addrs([addr], clean=False)
elif transform == 'decode':
qp = self.format_addrs([addr], clean=True)
else:
qp = self.format_addrs([addr], clean=False)
# See if there is enough room on the existing line
if first:
_parts[-1] += qp
first = False
continue
if len(_parts[-1] + ', ' + qp) > width:
_parts[-1] += ', '
_parts.append(qp)
continue
_parts[-1] += ', ' + qp
else:
if transform == 'decode' and hval.find('?=') >= 0:
hdata = f'{hname}: ' + self.clean_header(hval)
else:
hdata = f'{hname}: {hval}'
if transform != 'encode' or self.isascii(hval):
if len(hdata) <= width:
return hdata.encode()
# Use simple textwrap, with a small trick that ensures that long non-breakable
# strings don't show up on the next line from the bare header
hdata = hdata.replace(': ', ':_', 1)
wrapped = textwrap.wrap(
hdata,
break_long_words=False,
break_on_hyphens=False,
subsequent_indent=' ',
width=width,
)
return nl.join(wrapped).replace(':_', ': ', 1).encode()
qp = f'{hname}: ' + email.quoprimime.header_encode(
hval.encode(), charset='utf-8'
)
# is it longer than width?
if len(qp) <= width:
return qp.encode()
_parts = list()
while len(qp) > width:
wrapat = width - 2
if len(_parts):
# Also allow for the ' ' at the front on continuation lines
wrapat -= 1
# Make sure we don't break on a =XX escape sequence
while '=' in qp[wrapat - 2 : wrapat]:
wrapat -= 1
_parts.append(qp[:wrapat] + '?=')
qp = '=?utf-8?q?' + qp[wrapat:]
_parts.append(qp)
return f'{nl} '.join(_parts).encode()
def get_msg_as_bytes(
self, msg: email.message.Message, nl: str = '\r\n', headers: str = 'preserve'
) -> bytes:
bdata = b''
for hname, hval in msg.items():
bdata += (
self.wrap_header((hname, str(hval)), nl=nl, transform=headers)
+ nl.encode()
)
bdata += nl.encode()
payload = msg.get_payload(decode=True)
assert isinstance(payload, bytes)
for bline in payload.split(b'\n'):
bdata += re.sub(rb'[\r\n]*$', b'', bline) + nl.encode()
return bdata
def receive(
self,
jdata: Mapping[str, JSON],
resp: falcon.Response,
reflect: bool = False,
) -> None:
servicename = self._config['main'].get('myname')
if not servicename:
servicename = 'Web Endpoint'
umsgs = jdata.get('messages')
if not umsgs:
self.send_error(resp, message='Missing the messages array')
return
if not isinstance(umsgs, Sequence):
self.send_error(resp, message='Invalid messages array')
return
logger.debug('Received a request for %s messages', len(umsgs))
diffre = re.compile(
rb'^(---.*\n\+\+\+|GIT binary patch|diff --git \w/\S+ \w/\S+)',
flags=re.M | re.I,
)
diffstatre = re.compile(
rb'^\s*\d+ file.*\d+ (insertion|deletion)', flags=re.M | re.I
)
msgs = list()
conn = self._engine.connect()
md = sa.MetaData()
t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine)
mustdest = self._config['main'].get('mustdest')
# First, validate all messages
seenid = identity = selector = validfrom = None
for umsg in umsgs:
if not isinstance(umsg, str):
self.send_error(resp, message='Invalid message payload')
return
bdata = umsg.encode()
try:
identity, selector, auth_id = self.validate_message(conn, t_auth, bdata)
except patatt.NoKeyError:
self.send_error(
resp, message='No matching key, please complete web auth first.'
)
return
except Exception as ex:
self.send_error(resp, message='Signature validation failed: %s' % ex)
return
# Make sure only a single auth_id is used within a receive session
if seenid is None:
seenid = auth_id
elif seenid != auth_id:
self.send_error(
resp,
message='We only support a single signing identity across patch series.',
)
return
msg = email.message_from_bytes(bdata, policy=emlpolicy)
logger.debug('Checking sanity on message: %s', msg.get('Subject'))
# Some quick sanity checking:
# - Subject must include [PATCH and have only bracketed prefixes before
# - Content-type may ONLY be text/plain
# - Has to include a diff or a diffstat
passes = True
subject = self.clean_header(msg.get('Subject', ''))
if not re.match(r'(\[[^]]+\])*\[PATCH', subject):
passes = False
if passes:
cte = msg.get_content_type()
if cte.lower() != 'text/plain':
passes = False
if passes:
payload = msg.get_payload(decode=True)
assert isinstance(payload, bytes)
if not (diffre.search(payload) or diffstatre.search(payload)):
passes = False
if not passes:
self.send_error(resp, message='This service only accepts patches')
return
# Make sure that From, Date, Subject, and Message-Id headers exist
if (
not msg.get('From')
or not msg.get('Date')
or not msg.get('Subject')
or not msg.get('Message-Id')
):
self.send_error(
resp, message='Message is missing some required headers.'
)
return
# Make sure that From: matches the validated identity. We allow + expansion,
# such that foo+listname@example.com is allowed for foo@example.com
froms = msg.get_all('from')
assert froms is not None
allfroms = utils.getaddresses(froms)
# Allow only a single From: address
if len(allfroms) > 1:
self.send_error(
resp, message='Message may only contain a single From: address.'
)
return
fromaddr = allfroms[0][1]
if validfrom != fromaddr:
ldparts = fromaddr.split('@')
if len(ldparts) != 2:
self.send_error(
resp, message=f'Invalid address in From: {fromaddr}'
)
return
lparts = ldparts[0].split('+', maxsplit=1)
toval = f'{lparts[0]}@{ldparts[1]}'
if toval != identity:
self.send_error(
resp,
message=f'From header invalid for identity {identity}: {fromaddr}',
)
return
# usually, all From: addresses will be the same, so use validfrom as a quick bypass
if validfrom is None:
validfrom = fromaddr
# Check that To/Cc have a mailing list we recognize
alldests = utils.getaddresses([str(x) for x in msg.get_all('to', [])])
alldests += utils.getaddresses([str(x) for x in msg.get_all('cc', [])])
destaddrs = {x[1] for x in alldests}
if mustdest:
matched = False
for destaddr in destaddrs:
if re.search(mustdest, destaddr, flags=re.I):
matched = True
break
if not matched:
self.send_error(
resp,
message='Destinations must include a mailing list we recognize.',
)
return
msg.add_header(
'X-Endpoint-Received',
f'by {servicename} for {identity}/{selector} with auth_id={auth_id}',
)
msgs.append((msg, destaddrs))
# Must be the case if the loop above runs at least once, and we check
# that umsgs is truthy (not empty).
assert identity is not None
conn.close()
# All signatures verified. Prepare messages for sending.
cfgdomains = self._config['main'].get('mydomains')
if cfgdomains is not None:
mydomains = [x.strip() for x in cfgdomains.split(',')]
else:
mydomains = list()
smtp, frompair = self.get_smtp()
bccaddrs = set()
_bcc = self._config['main'].get('alwaysbcc')
if _bcc:
bccaddrs.update([x[1] for x in utils.getaddresses([_bcc])])
repo_and_listid = None
if 'public-inbox' in self._config and not reflect:
public_inbox = self._config['public-inbox']
if (repo := public_inbox.get('repo')) is not None and os.path.isdir(repo):
if (listid := public_inbox.get('listid')) is not None:
repo_and_listid = (repo, listid)
if reflect:
logger.info('Reflecting %s messages back to %s', len(msgs), identity)
sentaction = 'Reflected'
else:
logger.info('Sending %s messages for %s/%s', len(msgs), identity, selector)
sentaction = 'Sent'
for msg, destaddrs in msgs:
subject = self.clean_header(msg.get('Subject'))
if repo_and_listid is not None:
repo, listid = repo_and_listid
pmsg = copy.deepcopy(msg)
if pmsg.get('List-Id'):
pmsg.replace_header('List-Id', listid)
else:
pmsg.add_header('List-Id', listid)
ezpi.add_rfc822(repo, pmsg)
logger.debug('Wrote %s to public-inbox at %s', subject, repo)
origfrom = msg.get('From')
assert origfrom is not None
origpair = utils.getaddresses([origfrom])[0]
assert origpair is not None
origaddr = origpair[1]
# Does it match one of our domains
mydomain = False
for _domain in mydomains:
if origaddr.endswith(f'@{_domain}'):
mydomain = True
break
if mydomain:
logger.debug('%s matches mydomain, no substitution required', origaddr)
fromaddr = origaddr
else:
logger.debug(
'%s does not match mydomain, substitution required', origaddr
)
# We can't just send this as-is due to DMARC policies. Therefore, we set
# Reply-To and X-Original-From.
fromaddr = frompair[1]
origname = origpair[0]
if not origname:
origname = origpair[1]
delim = self._config['main'].get('from-recipient-delimiter', '+')
if delim and '@' in fromaddr:
_flocal, _fdomain = fromaddr.split('@', maxsplit=1)
_forig = origaddr.replace('@', '.')
fromaddr = f'{_flocal}{delim}{_forig}@{_fdomain}'
msg.replace_header('From', f'{origname} via {servicename} <{fromaddr}>')
if msg.get('X-Original-From'):
msg.replace_header('X-Original-From', origfrom)
else:
msg.add_header('X-Original-From', origfrom)
if msg.get('Reply-To'):
msg.replace_header('Reply-To', f'<{origpair[1]}>')
else:
msg.add_header('Reply-To', f'<{origpair[1]}>')
body = msg.get_payload(decode=True)
assert isinstance(body, bytes)
# Add a From: header (if there isn't already one), but only if it's a patch
if diffre.search(body):
# Parse it as a message and see if we get a From: header
cmsg = email.message_from_bytes(body, policy=emlpolicy)
if cmsg.get('From') is None:
newbody = 'From: ' + self.clean_header(origfrom) + '\n'
if cmsg.get('Subject'):
newbody += (
'Subject: '
+ self.clean_header(cmsg.get('Subject'))
+ '\n'
)
if cmsg.get('Date'):
newbody += (
'Date: ' + self.clean_header(cmsg.get('Date')) + '\n'
)
newbody += '\n' + body.decode()
msg.set_payload(newbody, charset='utf-8')
# If we have non-ascii content in the new body, force CTE to 8bit
if msg['Content-Transfer-Encoding'] == '7bit' and not all(
ord(char) < 128 for char in newbody
):
msg.set_charset('utf-8')
msg.replace_header('Content-Transfer-Encoding', '8bit')
if bccaddrs:
destaddrs.update(bccaddrs)
if not self._config['main'].getboolean('dryrun'):
bdata = self.get_msg_as_bytes(msg, headers='encode')
if reflect:
smtp.sendmail(fromaddr, [identity], bdata)
else:
smtp.sendmail(fromaddr, list(destaddrs), bdata)
logger.info('%s: %s', sentaction, subject)
else:
logger.info('---DRYRUN MSG START---')
logger.info(msg)
logger.info('---DRYRUN MSG END---')
smtp.close()
if repo_and_listid is not None:
repo, _ = repo_and_listid
# run it once after writing all messages
logger.debug('Running public-inbox repo hook (if present)')
ezpi.run_hook(repo)
logger.info(
'%s %s messages for %s/%s', sentaction, len(msgs), identity, selector
)
self.send_success(
resp, message=f'{sentaction} {len(msgs)} messages for {identity}/{selector}'
)
def on_post(self, req: falcon.Request, resp: falcon.Response) -> None:
if not req.content_length:
resp.status = falcon.HTTP_500
resp.content_type = falcon.MEDIA_TEXT
resp.text = 'Payload required\n'
return
raw = req.bounded_stream.read()
try:
jdata: JSON = json.loads(raw)
except Exception:
resp.status = falcon.HTTP_500
resp.content_type = falcon.MEDIA_TEXT
resp.text = 'Failed to parse the request\n'
return
# TODO(https://github.com/astral-sh/ruff/pull/24458): remove this when ty understands conditional walrus.
action = None
if not isinstance(jdata, Mapping) or (action := jdata.get('action')) is None:
logger.critical('Action not set from %s', req.remote_addr)
return
logger.info('Action: %s; from: %s', action, req.remote_addr)
if action == 'auth-new':
self.auth_new(jdata, resp)
return
if action == 'auth-verify':
self.auth_verify(jdata, resp)
return
if action == 'auth-delete':
self.auth_delete(jdata, resp)
return
if action == 'receive':
self.receive(jdata, resp)
return
if action == 'reflect':
self.receive(jdata, resp, reflect=True)
return
resp.status = falcon.HTTP_500
resp.content_type = falcon.MEDIA_TEXT
resp.text = 'Unknown action: %s\n' % action
parser = ConfigParser(interpolation=ExtendedInterpolation())
cfgfile = os.getenv('CONFIG')
if not cfgfile or not os.path.exists(cfgfile):
sys.stderr.write('CONFIG env var is not set or is not valid')
sys.exit(1)
parser.read(cfgfile)
gpgbin = parser['main'].get('gpgbin')
if gpgbin:
patatt.GPGBIN = gpgbin
dburl = parser['main'].get('dburl')
if not dburl:
sys.stderr.write('main.dburl is not set in CONFIG')
sys.exit(1)
# By default, recycle db connections after 5 min
db_pool_recycle = parser['main'].getint('dbpoolrecycle', 300)
engine = sa.create_engine(dburl, pool_recycle=db_pool_recycle)
srl = SendReceiveListener(engine, parser)
app = falcon.App()
mp = os.getenv('MOUNTPOINT', '/_b4_submit')
app.add_route(mp, srl)
if __name__ == '__main__':
from wsgiref.simple_server import make_server
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(message)s')
ch.setFormatter(formatter)
ch.setLevel(logging.DEBUG)
logger.addHandler(ch)
with make_server('', 8000, app) as httpd:
logger.info('Serving on port 8000...')
# Serve until process is killed
httpd.serve_forever()