# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
import abc
from collections import namedtuple
from datetime import datetime
import pyrfc3339
from ._caveat import parse_caveat
from ._conditions import (
COND_ALLOW,
COND_DECLARED,
COND_DENY,
COND_ERROR,
COND_TIME_BEFORE,
STD_NAMESPACE,
)
from ._declared import DECLARED_KEY
from ._namespace import Namespace
from ._operation import OP_KEY
from ._time import TIME_KEY
from ._utils import condition_with_prefix
class RegisterError(Exception):
'''Raised when a condition cannot be registered with a Checker.'''
pass
class FirstPartyCaveatChecker(object):
'''Used to check first party caveats for validity with respect to
information in the provided context.
If the caveat kind was not recognised, the checker should return
ErrCaveatNotRecognized.
'''
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def check_first_party_caveat(self, ctx, caveat):
''' Checks that the given caveat condition is valid with respect to
the given context information.
:param ctx: an Auth context
:param caveat a string
'''
raise NotImplementedError('check_first_party_caveat method must be '
'defined in subclass')
def namespace(self):
''' Returns the namespace associated with the caveat checker.
'''
raise NotImplementedError('namespace method must be '
'defined in subclass')
class Checker(FirstPartyCaveatChecker):
''' Holds a set of checkers for first party caveats.
'''
def __init__(self, namespace=None, include_std_checkers=True):
if namespace is None:
namespace = Namespace()
self._namespace = namespace
self._checkers = {}
if include_std_checkers:
self.register_std()
def check_first_party_caveat(self, ctx, cav):
''' Checks the caveat against all registered caveat conditions.
:return: error message string if any or None
'''
try:
cond, arg = parse_caveat(cav)
except ValueError as ex:
# If we can't parse it, perhaps it's in some other format,
# return a not-recognised error.
return 'cannot parse caveat "{}": {}'.format(cav, ex.args[0])
checker = self._checkers.get(cond)
if checker is None:
return 'caveat "{}" not satisfied: caveat not recognized'.format(
cav)
err = checker.check(ctx, cond, arg)
if err is not None:
return 'caveat "{}" not satisfied: {}'.format(cav, err)
def namespace(self):
''' Returns the namespace associated with the Checker.
'''
return self._namespace
def info(self):
''' Returns information on all the registered checkers.
Sorted by namespace and then name
:returns a list of CheckerInfo
'''
return sorted(self._checkers.values(), key=lambda x: (x.ns, x.name))
def register(self, cond, uri, check):
''' Registers the given condition(string) in the given namespace
uri (string) to be checked with the given check function.
The check function checks a caveat by passing an auth context, a cond
parameter(string) that holds the caveat condition including any
namespace prefix and an arg parameter(string) that hold any additional
caveat argument text. It will return any error as string otherwise
None.
It will raise a ValueError if the namespace is not registered or
if the condition has already been registered.
'''
if check is None:
raise RegisterError(
'no check function registered for namespace {} when '
'registering condition {}'.format(uri, cond))
prefix = self._namespace.resolve(uri)
if prefix is None:
raise RegisterError('no prefix registered for namespace {} when '
'registering condition {}'.format(uri, cond))
if prefix == '' and cond.find(':') >= 0:
raise RegisterError(
'caveat condition {} in namespace {} contains a colon but its'
' prefix is empty'.format(cond, uri))
full_cond = condition_with_prefix(prefix, cond)
info = self._checkers.get(full_cond)
if info is not None:
raise RegisterError(
'checker for {} (namespace {}) already registered in '
'namespace {}'.format(full_cond, uri, info.ns))
self._checkers[full_cond] = CheckerInfo(
check=check,
ns=uri,
name=cond,
prefix=prefix)
def register_std(self):
''' Registers all the standard checkers in the given checker.
If not present already, the standard checkers schema (STD_NAMESPACE) is
added to the checker's namespace with an empty prefix.
'''
self._namespace.register(STD_NAMESPACE, '')
for cond in _ALL_CHECKERS:
self.register(cond, STD_NAMESPACE, _ALL_CHECKERS[cond])
class CheckerInfo(namedtuple('CheckInfo', 'prefix name ns check')):
'''CheckerInfo holds information on a registered checker.
'''
__slots__ = ()
def __new__(cls, prefix, name, ns, check=None):
'''
:param check holds the actual checker function which takes an auth
context and a condition and arg string as arguments.
:param prefix holds the prefix for the checker condition as string.
:param name holds the name of the checker condition as string.
:param ns holds the namespace URI for the checker's schema as
Namespace.
'''
return super(CheckerInfo, cls).__new__(cls, prefix, name, ns, check)
def _check_time_before(ctx, cond, arg):
clock = ctx.get(TIME_KEY)
if clock is None:
now = datetime.utcnow()
else:
now = clock.utcnow()
try:
# Note: pyrfc3339 returns a datetime with a timezone, which
# we need to remove before we can compare it with the naive
# datetime object returned by datetime.utcnow.
expiry = pyrfc3339.parse(arg, utc=True).replace(tzinfo=None)
if now >= expiry:
return 'macaroon has expired'
except ValueError:
return 'cannot parse "{}" as RFC 3339'.format(arg)
return None
def _check_declared(ctx, cond, arg):
parts = arg.split(' ', 1)
if len(parts) != 2:
return 'declared caveat has no value'
attrs = ctx.get(DECLARED_KEY, {})
val = attrs.get(parts[0])
if val is None:
return 'got {}=null, expected "{}"'.format(parts[0], parts[1])
if val != parts[1]:
return 'got {}="{}", expected "{}"'.format(parts[0], val, parts[1])
return None
def _check_error(ctx, cond, arg):
return 'bad caveat'
def _check_allow(ctx, cond, arg):
return _check_operations(ctx, True, arg)
def _check_deny(ctx, cond, arg):
return _check_operations(ctx, False, arg)
def _check_operations(ctx, need_ops, arg):
''' Checks an allow or a deny caveat. The need_ops parameter specifies
whether we require all the operations in the caveat to be declared in
the context.
'''
ctx_ops = ctx.get(OP_KEY, [])
if len(ctx_ops) == 0:
if need_ops:
f = arg.split()
if len(f) == 0:
return 'no operations allowed'
return '{} not allowed'.format(f[0])
return None
fields = arg.split()
for op in ctx_ops:
err = _check_op(op, need_ops, fields)
if err is not None:
return err
return None
def _check_op(ctx_op, need_op, fields):
found = False
for op in fields:
if op == ctx_op:
found = True
break
if found != need_op:
return '{} not allowed'.format(ctx_op)
return None
_ALL_CHECKERS = {
COND_TIME_BEFORE: _check_time_before,
COND_DECLARED: _check_declared,
COND_ERROR: _check_error,
COND_ALLOW: _check_allow,
COND_DENY: _check_deny,
}