# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
import abc
from collections import namedtuple
from ._error import (
ThirdPartyCaveatCheckFailed,
CaveatNotRecognizedError,
VerificationError,
)
from ._codec import decode_caveat
from ._macaroon import (
Macaroon,
ThirdPartyLocator,
)
from ._versions import VERSION_2
from ._third_party import ThirdPartyCaveatInfo
import macaroonbakery.checkers as checkers
emptyContext = checkers.AuthContext()
def discharge_all(m, get_discharge, local_key=None):
'''Gathers discharge macaroons for all the third party caveats in m
(and any subsequent caveats required by those) using get_discharge to
acquire each discharge macaroon.
The local_key parameter may optionally hold the key of the client, in
which case it will be used to discharge any third party caveats with the
special location "local". In this case, the caveat itself must be "true".
This can be used be a server to ask a client to prove ownership of the
private key.
It returns a list of macaroon with m as the first element, followed by all
the discharge macaroons.
All the discharge macaroons will be bound to the primary macaroon.
The get_discharge function is passed a context (AuthContext),
the caveat(pymacaroons.Caveat) to be discharged and encrypted_caveat (bytes) will be
passed the external caveat payload found in m, if any.
It should return a bakery.Macaroon object holding the discharge
macaroon for the third party caveat.
'''
primary = m.macaroon
discharges = [primary]
# cav holds the macaroon caveat that needs discharge.
# encrypted_caveat (bytes) holds encrypted caveat if it was held
# externally.
_NeedCaveat = namedtuple('_NeedCaveat', 'cav encrypted_caveat')
need = []
def add_caveats(m):
for cav in m.macaroon.caveats:
if cav.location is None or cav.location == '':
continue
encrypted_caveat = m.caveat_data.get(cav.caveat_id, None)
need.append(
_NeedCaveat(cav=cav,
encrypted_caveat=encrypted_caveat))
add_caveats(m)
while len(need) > 0:
cav = need[0]
need = need[1:]
if cav.cav.location == 'local':
if local_key is None:
raise ThirdPartyCaveatCheckFailed(
'found local third party caveat but no private key provided',
)
# TODO use a small caveat id.
dm = discharge(ctx=emptyContext,
key=local_key,
checker=_LocalDischargeChecker(),
caveat=cav.encrypted_caveat,
id=cav.cav.caveat_id_bytes,
locator=_EmptyLocator())
else:
dm = get_discharge(cav.cav, cav.encrypted_caveat)
# It doesn't matter that we're invalidating dm here because we're
# about to throw it away.
discharge_m = dm.macaroon
m = primary.prepare_for_request(discharge_m)
discharges.append(m)
add_caveats(dm)
return discharges
class ThirdPartyCaveatChecker(object):
''' Defines an abstract class that's used to check third party caveats.
'''
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def check_third_party_caveat(self, ctx, info):
''' If the caveat is valid, it returns optionally a slice of
extra caveats that will be added to the discharge macaroon.
If the caveat kind was not recognised, the checker should
raise a CaveatNotRecognized exception; if the check failed,
it should raise a ThirdPartyCaveatCheckFailed exception.
:param ctx (AuthContext)
:param info (ThirdPartyCaveatInfo) holds the information decoded from
a third party caveat id
:return: An array of extra caveats to be added to the discharge
macaroon.
'''
raise NotImplementedError('check_third_party_caveat method must be '
'defined in subclass')
class _LocalDischargeChecker(ThirdPartyCaveatChecker):
def check_third_party_caveat(self, ctx, info):
if info.condition != 'true':
raise CaveatNotRecognizedError()
return []
def discharge(ctx, id, caveat, key, checker, locator):
''' Creates a macaroon to discharge a third party caveat.
The given parameters specify the caveat and how it should be checked.
The condition implicit in the caveat is checked for validity using checker.
If it is valid, a new macaroon is returned which discharges the caveat.
The macaroon is created with a version derived from the version that was
used to encode the id.
:param id: (bytes) holds the id to give to the discharge macaroon.
If Caveat is empty, then the id also holds the encrypted third party
caveat.
:param caveat: (bytes) holds the encrypted third party caveat.
If this is None, id will be used.
:param key: holds the key to use to decrypt the third party caveat
information and to encrypt any additional third party caveats returned by
the caveat checker.
:param checker: used to check the third party caveat, and may also return
further caveats to be added to the discharge macaroon.
:param locator: used to information on third parties referred to by third
party caveats returned by the Checker.
'''
caveat_id_prefix = []
if caveat is None:
# The caveat information is encoded in the id itself.
caveat = id
else:
# We've been given an explicit id, so when extra third party
# caveats are added, use that id as the prefix
# for any more ids.
caveat_id_prefix = id
cav_info = decode_caveat(key, caveat)
cav_info = ThirdPartyCaveatInfo(
condition=cav_info.condition,
first_party_public_key=cav_info.first_party_public_key,
third_party_key_pair=cav_info.third_party_key_pair,
root_key=cav_info.root_key,
caveat=cav_info.caveat,
version=cav_info.version,
id=id,
namespace=cav_info.namespace
)
# Note that we don't check the error - we allow the
# third party checker to see even caveats that we can't
# understand.
try:
cond, arg = checkers.parse_caveat(cav_info.condition)
except ValueError as exc:
raise VerificationError(exc.args[0])
if cond == checkers.COND_NEED_DECLARED:
cav_info = cav_info._replace(condition=arg)
caveats = _check_need_declared(ctx, cav_info, checker)
else:
caveats = checker.check_third_party_caveat(ctx, cav_info)
# Note that the discharge macaroon does not need to
# be stored persistently. Indeed, it would be a problem if
# we did, because then the macaroon could potentially be used
# for normal authorization with the third party.
m = Macaroon(
cav_info.root_key,
id,
'',
cav_info.version,
cav_info.namespace,
)
m._caveat_id_prefix = caveat_id_prefix
if caveats is not None:
for cav in caveats:
m.add_caveat(cav, key, locator)
return m
def _check_need_declared(ctx, cav_info, checker):
arg = cav_info.condition
i = arg.find(' ')
if i <= 0:
raise VerificationError(
'need-declared caveat requires an argument, got %q'.format(arg),
)
need_declared = arg[0:i].split(',')
for d in need_declared:
if d == '':
raise VerificationError('need-declared caveat with empty required attribute')
if len(need_declared) == 0:
raise VerificationError('need-declared caveat with no required attributes')
cav_info = cav_info._replace(condition=arg[i + 1:])
caveats = checker.check_third_party_caveat(ctx, cav_info)
declared = {}
for cav in caveats:
if cav.location is not None and cav.location != '':
continue
# Note that we ignore the error. We allow the service to
# generate caveats that we don't understand here.
try:
cond, arg = checkers.parse_caveat(cav.condition)
except ValueError:
continue
if cond != checkers.COND_DECLARED:
continue
parts = arg.split()
if len(parts) != 2:
raise VerificationError('declared caveat has no value')
declared[parts[0]] = True
# Add empty declarations for everything mentioned in need-declared
# that was not actually declared.
for d in need_declared:
if not declared.get(d, False):
caveats.append(checkers.declared_caveat(d, ''))
return caveats
class _EmptyLocator(ThirdPartyLocator):
def third_party_info(self, loc):
return None
def local_third_party_caveat(key, version):
''' Returns a third-party caveat that, when added to a macaroon with
add_caveat, results in a caveat with the location "local", encrypted with
the given PublicKey.
This can be automatically discharged by discharge_all passing a local key.
'''
if version >= VERSION_2:
loc = 'local {} {}'.format(version, key)
else:
loc = 'local {}'.format(key)
return checkers.Caveat(location=loc, condition='')