The complete Python implementation for signature verification of Amazon Alexa

Our customer Veriset and its partners wanted to use Alexa to control a digital recipe book dizmo. No problem, we thought, integration is one of our strengths. Unfortunately, it soon turned out that independently implementing Amazon's signature verification is very cumbersome, because the feedback process is very slow. It was a tough nut to crack, but now that everything has come together, the code is actually pretty self-explanatory to read.

To spare other developers the same ordeal, we have decided to make the code open source. So, finally, here is a complete Python implementation of signature verification for Amazon Alexa service requests. We hope that it will serve all the other companies and people out there who are integrating their services with Amazon Alexa. Happy coding!

The implementation is realized as a hook for the Falcon framework, which is essentially just a function. Hence, integrating it with other Python-based frameworks should be pretty straightforward. Furthermore, a local Redis instance has been used for demonstration purposes, but in general any caching mechanism should do.

__author__ = 'Hasan Karahan <hkarahan@dizmo.com>'
from OpenSSL import crypto

import base64
import falcon
import logging
import os
import os.path
import pem
import re
import requests
import urllib.parse
class SignatureHook(object):
    """Falcon hook that verifies the signature of an incoming request.

    The hook reads the ``SignatureCertChainUrl`` and ``Signature`` request
    headers, validates the certificate URL, loads the PEM certificate chain
    (cached through the module-level ``rdb``), verifies every certificate of
    the chain against a local CA store and finally verifies the signature of
    ``req.body`` with the first certificate of the chain.

    Every failed check is reported as ``falcon.HTTPBadRequest``.

    NOTE(review): the subject alternative names of the signing certificate
    are not inspected here -- confirm against the Alexa documentation whether
    an additional check is required for your deployment.
    """

    # Seconds to wait for the certificate download before giving up; without
    # a timeout ``requests.get`` may block the worker indefinitely.
    fetch_timeout = 10

    def __call__(self, req, res, *args, **kwargs):
        """Run all verification steps for ``req``.

        Each step either returns a usable value or raises
        ``falcon.HTTPBadRequest``, so no additional checks are needed here.
        """
        scc_url = self.certificate_url(req=req)
        self.verify_url(scc_url)
        scc_pem = self.cached_pem(scc_url)

        ca_cert = self.verify_pem(scc_pem, req=req)
        self.verify_sig(ca_cert, req=req)

    def certificate_url(self, req):
        """Return the ``SignatureCertChainUrl`` header of ``req``.

        Raises:
            falcon.HTTPBadRequest: if the header is missing or empty.
        """
        certificate_url = req.get_header('SignatureCertChainUrl')
        if not certificate_url:
            raise falcon.HTTPBadRequest(
                description='no certificate URL header',
                headers={
                    # A missing header yields None, which is not a valid
                    # header value; echo an empty string instead.
                    'SignatureCertChainUrl': certificate_url or ''
                })

        return certificate_url

    def signature(self, req):
        """Return the ``Signature`` header of ``req``.

        Raises:
            falcon.HTTPBadRequest: if the header is missing or empty.
        """
        signature = req.get_header('Signature')
        if not signature:
            raise falcon.HTTPBadRequest(
                description='no signature header',
                headers={
                    # See certificate_url(): never emit None as a value.
                    'Signature': signature or ''
                })

        return signature

    @staticmethod
    def _is_echo_api_path(path):
        """Tell whether ``path`` lies below the ``/echo.api/`` directory.

        Both the raw path and its normalized form (dot segments resolved)
        have to start with ``/echo.api/``. A bare ``startswith('/echo.api')``
        would also accept sibling names such as ``/echo.api-evil/...`` and
        paths escaping the directory such as ``/echo.api/../evil/...``.
        Checking both forms keeps the result valid whether or not the HTTP
        client normalizes the path before sending the request.
        """
        import posixpath  # URL paths always use forward slashes

        prefix = '/echo.api/'
        return path.startswith(prefix) \
            and posixpath.normpath(path).startswith(prefix)

    def verify_url(self, url):
        """Validate the certificate chain URL.

        For non-local URLs (see ``URI.local``) the scheme has to be
        ``https`` and the network location ``s3.amazonaws.com`` (optionally
        with port 443, compared case-insensitively). For all URLs the path
        has to lie below ``/echo.api/``.

        Returns:
            True if the URL passed all checks.

        Raises:
            falcon.HTTPBadRequest: if any check fails.
        """
        try:
            uri = urllib.parse.urlparse(url)
        except ValueError:
            # urlparse rejects some malformed URLs (e.g. broken IPv6 hosts).
            raise falcon.HTTPBadRequest(
                description='invalid URL',
                headers={
                    'SignatureCertChainUrl': url
                })

        is_local = URI.local(uri)

        if not is_local and uri.scheme not in ['https']:

            raise falcon.HTTPBadRequest(
                description='invalid protocol for URL',
                headers={
                    'SignatureCertChainUrl': url
                })

        if not is_local and uri.netloc.lower() not in [
                's3.amazonaws.com', 's3.amazonaws.com:443']:

            raise falcon.HTTPBadRequest(
                description='invalid netloc for URL', headers={
                    'SignatureCertChainUrl': url
                })

        if not self._is_echo_api_path(uri.path):

            raise falcon.HTTPBadRequest(
                description='invalid path for URL', headers={
                    'SignatureCertChainUrl': url
                })

        return True

    def cached_pem(self, url):
        """Return the PEM text for ``url``, fetching and caching it if needed.

        The text is looked up in ``rdb.connection`` under the key ``url``.
        On a cache miss it is read from ``./static`` + path for local URLs
        or downloaded with ``requests`` otherwise, and stored in the cache.
        The returned value is always the one read back from the cache.

        Raises:
            falcon.HTTPBadRequest: if the download does not answer with
                status 200 or if the certificate text is empty.
        """
        if not rdb.connection.exists(url):

            uri = urllib.parse.urlparse(url)
            if URI.local(uri):
                with open('./static' + uri.path) as file:
                    pem_text = file.read()
            else:
                res = requests.get(url, timeout=self.fetch_timeout)
                if res.status_code != 200:
                    raise falcon.HTTPBadRequest(
                        description='invalid status for URL',
                        headers={
                            'SignatureCertChainUrl': url
                        })

                pem_text = res.text

            # Never cache an empty document: it would be served (and fail)
            # for every later request with the same URL.
            if not pem_text:
                raise falcon.HTTPBadRequest(
                    description='empty certificate chain for URL',
                    headers={
                        'SignatureCertChainUrl': url
                    })

            rdb.connection.set(url, pem_text)

        cached_pem = rdb.connection.get(url)
        if not cached_pem:
            # Explicit check instead of ``assert``, which is stripped
            # when Python runs with -O.
            raise falcon.HTTPBadRequest(
                description='empty certificate chain for URL',
                headers={
                    'SignatureCertChainUrl': url
                })

        return cached_pem

    def verify_pem(self, pem_text, req, ca_store=None):
        """Verify every certificate in ``pem_text`` against ``ca_store``.

        Args:
            pem_text: PEM encoded certificate chain.
            req: current request; only used to fill the error headers.
            ca_store: ``crypto.X509Store`` to verify against; defaults to
                the store returned by ``ca_store()``.

        Returns:
            The first certificate of the chain as loaded by
            ``crypto.load_certificate``.

        Raises:
            falcon.HTTPBadRequest: if the chain is empty, an entry cannot be
                loaded as a certificate or a certificate fails verification.
        """
        if ca_store is None:
            ca_store = self.ca_store()

        certs = pem.parse(pem_text)
        ca_certs = []
        for cert in certs:
            try:
                x509 = crypto.load_certificate(
                    crypto.FILETYPE_PEM, cert.as_bytes())
            except crypto.Error as ex:
                # Entry is not a loadable certificate: treat as a failure.
                logging.error(ex)
                break

            try:
                crypto.X509StoreContext(
                    ca_store, x509
                ).verify_certificate()

            except crypto.X509StoreContextError as ex:
                logging.error(
                    'issuer:{} and subject:{}'.format(
                        x509.get_issuer(),
                        x509.get_subject()
                    )
                )
                logging.error(ex)
                break

            ca_certs.append(x509)

        # An empty chain must fail as well; otherwise ca_certs[0] below
        # would raise an IndexError.
        if not certs or len(ca_certs) != len(certs):
            raise falcon.HTTPBadRequest(
                description='certificate verification failed', headers={
                    'SignatureCertChainUrl':
                         self.certificate_url(req),
                    'Signature':
                         self.signature(req)
                })

        return ca_certs[0]

    def verify_sig(self, ca_cert, req):
        """Verify the request signature with ``ca_cert``.

        The base64 encoded ``Signature`` header is decoded and checked
        against ``req.body`` using the ``sha1`` digest.

        Returns:
            The (still base64 encoded) signature header value.

        Raises:
            falcon.HTTPBadRequest: if the header is missing, is not valid
                base64 or does not match the request body.
        """
        signature = self.signature(req)

        try:
            # binascii.Error (invalid base64) is a subclass of ValueError.
            signature_bin = base64.b64decode(signature)
            crypto.verify(ca_cert, signature_bin, req.body, 'sha1')

        except (ValueError, crypto.Error) as ex:
            logging.error(ex)

            raise falcon.HTTPBadRequest(
                description='signature verification failed', headers={
                    'SignatureCertChainUrl': self.certificate_url(req),
                    'Signature': signature
                })

        return signature

    @classmethod
    def ca_store(cls):
        """Return the CA store, building it on first use.

        The environment variable ``CRT_PATH`` (default ``/etc/ssl``) holds a
        colon separated list of directories that are searched recursively
        for ``*.crt`` and ``*.pem`` files. The store is kept in ``cls.store``
        and reused unless the environment variable ``CRT_FLUSH`` is set to a
        non-empty value, in which case it is rebuilt on every call.
        """
        crt_paths = os.environ.get('CRT_PATH', '/etc/ssl')
        crt_flush = os.environ.get('CRT_FLUSH')

        if hasattr(cls, 'store') and not crt_flush:
            return cls.store

        # Fill a local store and publish it only once it is complete, so
        # that callers never observe a half populated ``cls.store``.
        store = crypto.X509Store()
        suffix = re.compile(r'(.+)\.(crt|pem)$')

        for crt_path in crt_paths.split(':'):
            for root, _dirs, files in os.walk(crt_path):
                for name in files:
                    if not suffix.match(name):
                        continue

                    path = os.path.join(root, name)
                    try:
                        parsed = pem.parse_file(path)
                    except OSError as ex:
                        # Best effort: an unreadable file must not prevent
                        # the remaining certificates from being loaded.
                        logging.warning(
                            'skipping certificate file %s: %s', path, ex)
                        continue

                    for cert in parsed:
                        try:
                            store.add_cert(
                                crypto.load_certificate(
                                    crypto.FILETYPE_PEM,
                                    cert.as_bytes()))

                        except crypto.Error as ex:
                            # Best effort: skip anything that cannot be
                            # loaded or added as a certificate.
                            logging.debug(
                                'skipping entry of %s: %s', path, ex)

        cls.store = store
        return cls.store
class URI(object):
    """Helpers for inspecting parsed URIs."""

    @staticmethod
    def local(uri):
        """Tell whether the host part of ``uri.netloc`` is the local machine.

        Everything before the first ``:`` of ``uri.netloc`` is compared with
        ``localhost`` and ``127.0.0.1``.
        """
        host, _sep, _rest = uri.netloc.partition(':')
        return host in ('localhost', '127.0.0.1')
import redis

class RDB(object):
    """Holder for a Redis client created from a connection URL."""

    def __init__(self, url, db='0'):
        """Create the client for ``url`` and database ``db``.

        The client is exposed as the ``connection`` attribute.
        """
        client = redis.StrictRedis.from_url(url, db=db)
        self.connection = client

# Module-wide cache handle, bound to a Redis instance on localhost.
rdb = RDB('redis://localhost:6379')

See also repository at: github.com/amazon-alexa-python

A version of this blog post appeared in calaganne.blogspot.com on March 29, 2018 with the headline ‘Amazon Alexa: request signature verification’

Leave a Reply

Your email address will not be published.