__author__ = 'Radical.Utils Development Team (Andre Merzky)'
__copyright__ = 'Copyright 2013, RADICAL@Rutgers'
__license__ = 'MIT'
import sys
import os
from functools import reduce
# ------------------------------------------------------------------------------
# This file (ab)uses a number of python features which are disliked by pylint
# pylint: disable=E1120,W1505,W0613
# ------------------------------------------------------------------------------
#
# Method call parameters/return value type checking decorators.
# (c) 2006-2007, Dmitry Dvoinikov <dmitry@targeted.org>
# Distributed under BSD license.
#
# code adjustements by 'The SAGA Project', 2013
# See also
# http://code.activestate.com/recipes/\
# 577065-type-checking-function-overloading-decorator/
#
#
# Samples:
#
# from typecheck import *
#
# @takes(int, str) # takes int, str, upon a problem throws TypeError
# @returns(int) # returns int, upon a problem throws TypeError
# def foo(i, s):
# return i + len(s)
#
# @takes((int, long), by_regex('^[0-9]+$')) # int or long, numerical string
# def foo(i, s, anything): # third parameter is not checked
# ...
#
# @takes(int, int, foo = int, bar = optional(int)) # keyword foo must be int
# def foo(a, b, **kwargs): # bar may be int or missing
# ...
#
# Note: @takes for positional arguments, @takes for keyword arguments and
# @returns all support the same checker syntax, for example for the following
# declaration
#
# @takes(C)
# def foo(x):
# ...
#
# then C may be one of the simple checkers:
#
# --------- C --------- ------------- semantics -------------
# typename ==> ok if x is is an instance of typename
# 'typename' ==> ok if x is is an instance of typename
# with_attr('a', 'b') ==> ok if x has specific attributes
# some_callable ==> ok if some_callable(x) is True
# one_of(1, '2') ==> ok if x is one of the literal values
# by_regex('^foo$') ==> ok if x is a matching basestring
# nothing ==> ok if x is None
# anything ==> always ok
#
# simple checkers can further be combined with OR semantics using tuples:
#
# --------- C --------- ------------- semantics -------------
# (checker1, checker2) ==> ok if x conforms with either checker
#
# be optional:
#
# --------- C --------- ------------- semantics -------------
# optional (checker) ==> ok if x is checker-conformant or None
#
# or nested recursively into one of the following checkers
#
# --------- C --------- ------------- semantics -------------
# list_of (checker) ==> ok if x is a list of checker-conformant values
# tuple_of (checker) ==> ok if x is a tuple of checker-conformant values
# set_of (checker) ==> ok if x is a set of checker-conformant values
# dict_of (key_checker,
# value_checker) ==> ok if x is a dict mapping key_checker-
# conformant keys to value_checker-conformant values
#
# More samples:
#
# class foo(object):
# @takes('foo', optional(int)) # foo, maybe int, but foo is yet incomplete
# def __init__(self, i = None): # and is thus specified by name
# ...
# @takes('foo', int) # foo, and int if presents in args,
# def bar(self, *args): # if args is empty, the check passes ok
# ...
# @takes('foo')
# @returns(object) # returns foo which is fine, because
# def biz(self): # foo is an object
# return self
# @classmethod # classmethod's and staticmethod's
# @takes(type) # go same way
# def baz(cls):
# ...
#
# @takes(int)
# @returns(optional('int', foo)) # returns either int, foo or NoneType
# def bar(i): # 'int' (rather than just int) is for fun
# if i > 0:
# return i
# elif i == 0:
# return foo() # otherwise returns NoneType
#
# @takes(callable) # built-in funcs are treated as predicates
# @returns(lambda x: x == 123) # and so do user-defined funcs or lambdas
# def exec(f, *args, **kwargs):
# return f(*args, **kwargs)
#
# assert execute(execute, execute, execute, lambda x: x, 123) == 123
#
# def readable(x): # user-defined type-checking predicate
# return hasattr(x, 'read')
#
# anything is an alias for predicate lambda: True,
# nothing is an alias for NoneType, as in:
#
# @takes(callable, readable,
# optional(anything),
# optional(int))
# @returns(nothing)
# def foo(f, r, x = None, i = None):
# ...
#
# # another way of protocol checking
# @takes(with_attr('read', 'write'))
# def foo(pipe):
# ...
#
# @takes(list_of(int)) # list of ints
# def foo(x):
# print x[0]
#
# @takes(tuple_of(callable)) # tuple of callables
# def foo(x):
# print x[0]()
#
# # dict mapping strs to lists of int
# @takes(dict_of(str, list_of(int)))
# def foo(x):
# print sum(x['foo'])
#
# @takes(by_regex('^[0-9]{1,8}$')) # integer-as-a-string regex
# def foo(x):
# i = int(x)
#
# @takes(one_of(1, 2)) # must be equal to either one
# def set_version(version):
# ...
#
# The(3 times longer) source code with self-tests is available from:
# http://www.targeted.org/python/recipes/typecheck.py
#
# ------------------------------------------------------------------------------
__all__ = ['takes', 'returns', 'optional', 'nothing',
'anything', 'list_of', 'tuple_of', 'dict_of',
'by_regex', 'with_attr', 'one_of', 'set_of' ]
no_check = True # set this to True to turn all checks off
no_return_check = True # set this to True to turn return value cchecks off
if 'RADICAL_DEBUG_SIG' in os.environ:
no_check = False # set this to True to turn all checks off
# ------------------------------------------------------------------------------
from traceback import extract_stack
from re import compile as regex
from functools import wraps
from inspect import getfullargspec, isclass
# NOTE: Python 3: type NoneType does not exist anymore. Test against None.
# from types import NoneType
NoneType = type(None)
# ------------------------------------------------------------------------------
#
def base_names(C):
'''
Returns list of base class names for a given class
'''
return [x.__name__ for x in C.__mro__]
# ------------------------------------------------------------------------------
#
def type_name(v):
'''
Returns the name of the passed value's type
'''
return type(v).__name__
# ------------------------------------------------------------------------------
#
class Checker(object):
registered = list() # a list of registered descendant class factories
def __init__(self, reference):
self.reference = reference
self.spectype = reference
def check(self, value): # abstract
pass
@staticmethod
def create(value): # static factory method
for f, t in Checker.registered:
if f(value):
return t(value)
# ------------------------------------------------------------------------------
#
class TypeChecker(Checker):
def check(self, value):
return isinstance(value, self.reference)
Checker.registered.append((isclass, TypeChecker))
nothing = NoneType
# ------------------------------------------------------------------------------
#
class StrChecker(Checker):
def check(self, value):
value_base_names = base_names(type(value))
return self.reference in value_base_names or \
'instance' in value_base_names
Checker.registered.append((lambda x: isinstance(x, str), StrChecker))
# ------------------------------------------------------------------------------
#
class TupleChecker(Checker):
def __init__(self, reference):
self.reference = list(map(Checker.create, reference))
self.spectype = reference
def check(self, value):
return reduce(lambda r, c: r or c.check(value), self.reference, False)
Checker.registered.append((lambda x: isinstance(x, tuple) and not
[y for y in x if Checker.create(y) is None],
TupleChecker))
[docs]def optional(*args):
return args + (NoneType, )
# ------------------------------------------------------------------------------
#
class CallableChecker(Checker):
def check(self, value):
return self.reference(value)
# note that the callable check is the most relaxed of all, therefore it should
# be registered last, after all the more specific cases have been registered
Checker.registered.append((callable, CallableChecker))
[docs]def anything(*args):
return True
# ------------------------------------------------------------------------------
#
class ListOfChecker(Checker):
def __init__(self, reference):
self.reference = Checker.create(reference)
def check(self, value):
return isinstance(value, list) and \
not [e for e in value if not self.reference.check(e)]
[docs]def list_of(*args):
return ListOfChecker(*args).check
# ------------------------------------------------------------------------------
#
class TupleOfChecker(Checker):
def __init__(self, reference):
self.reference = Checker.create(reference)
def check(self, value):
return isinstance(value, tuple) and \
not [e for e in value if not self.reference.check(e)]
[docs]def tuple_of(*args):
return TupleOfChecker(*args).check
# ------------------------------------------------------------------------------
#
class SetOfChecker(Checker):
def __init__(self, reference):
self.reference = Checker.create(reference)
def check(self, value):
return isinstance(value, set) and \
not [e for e in value if not self.reference.check(e)]
[docs]def set_of(*args):
return SetOfChecker(*args).check
# ------------------------------------------------------------------------------
#
class DictOfChecker(Checker):
def __init__(self, key_reference, value_reference):
self.key_reference = Checker.create(key_reference)
self.value_reference = Checker.create(value_reference)
def check(self, value):
return isinstance(value, dict) and \
not [e for e in value.keys()
if not self.key_reference.check(e)] and \
not [e for e in value.values()
if not self.value_reference.check(e)]
[docs]def dict_of(*args):
return DictOfChecker(*args).check
# ------------------------------------------------------------------------------
#
class RegexChecker(Checker):
def __init__(self, reference):
self.reference = regex(reference)
def check(self, value):
return isinstance(value, str) and self.reference.match(value)
[docs]def by_regex(*args):
return RegexChecker(*args).check
# ------------------------------------------------------------------------------
#
class AttrChecker(Checker):
def __init__(self, *attrs):
self.attrs = attrs
def check(self, value):
return reduce(lambda r, c: r and c,
[hasattr(value, a) for a in self.attrs], True)
[docs]def with_attr(*args):
return AttrChecker(*args).check
# ------------------------------------------------------------------------------
#
class OneOfChecker(Checker):
def __init__(self, *values):
self.values = values
def check(self, value):
return value in self.values
[docs]def one_of(*args):
return OneOfChecker(*args).check
# ------------------------------------------------------------------------------
#
def create_return_exception(method, spectype, result):
stack = extract_stack()
for f in stack:
if 'utils/signatures.py' in f[0]:
break
frame = f
msg = '\nSignature Mismatch\n'
msg += ' in function : %s\n' % (frame[2])
msg += ' in file : %s +%s\n' % (frame[0], frame[1])
msg += ' on line : %s\n' % (frame[3])
msg += ' method : %s\n' % (method.__name__)
msg += ' returned type : %s\n' % (type(result).__name__)
msg += ' instead of : %s\n' % (spectype.__name__)
msg += ' This is an internal error!'
return TypeError(msg)
# ------------------------------------------------------------------------------
#
def create_type_exception(method, arg0, i, arg, spectype, kwname=''):
narg = 0
if arg0 and isinstance(arg0, object):
narg = i
else:
narg = i + 1
stack = extract_stack()
for f in stack:
if 'utils/signatures.py' in f[0]:
break
frame = f
if isinstance(spectype,tuple) or \
isinstance(spectype,list ):
expected = list()
for t in spectype:
if isinstance(t, type):
expected.append(t.__name__)
else:
expected.append(str(t))
elif isinstance(spectype,type):
expected = spectype.__name__
else:
expected = str(spectype)
msg = '\nSignature Mismatch\n'
msg += ' in function : %s\n' % (frame[2])
msg += ' in file : %s +%s\n' % (frame[0], frame[1])
msg += ' on line : %s\n' % (frame[3])
msg += ' method : %s\n' % (method.__name__)
if not kwname :
msg += ' argument : #%s\n' % (narg)
else :
msg += ' parameter : %s' % (kwname)
msg += ' has incorrect type : %s\n' % (type(arg).__name__)
msg += ' instead of : %s' % (str(expected))
return TypeError(msg)
# ------------------------------------------------------------------------------
#
[docs]def takes(*args, **kwargs):
'''
Method signature checking decorator
'''
if no_check: # no type check is performed, return decorated method itself
def takes_proxy(method):
return method
else:
# convert decorator arguments into a list of checkers
checkers = list()
for i, arg in enumerate(args):
checker = Checker.create(arg)
if checker is None:
raise TypeError('@takes got parameter %d of unsupported '
'type %s' % (i + 1, type_name(arg)))
checkers.append(checker)
kwcheckers = dict()
for kwname, kwarg in kwargs.items():
checker = Checker.create(kwarg)
if checker is None:
raise TypeError('@takes got parameter %s of unsupported '
'type %s' % (kwname, type_name(kwarg)))
kwcheckers[kwname] = checker
def takes_proxy(method):
method_args, method_defaults = getfullargspec(method)[0::3]
@wraps(method)
def signature_check(*pargs, **pkwargs):
# append the default parameters
if method_defaults is not None and len(method_defaults) > 0 \
and len(method_args) - len(method_defaults) <= \
len(pargs) < len(method_args):
pargs += method_defaults[len(pargs) - len(method_args):]
# check the types of the actual call parameters
for i,(arg, checker) in enumerate(zip(pargs, checkers)):
if not checker.check(arg):
# print 'Checker.spectype %s' % checker.spectype
excpt = create_type_exception(method, pargs[0], i,
arg, checker.spectype)
raise excpt
for kwname, checker in kwcheckers.items():
if not checker.check(pkwargs.get(kwname, None)):
# print 'checker.spectype %s' % checker.spectype
# pylint: disable=W0631
excpt = create_type_exception(method, pargs[0], i,
arg, checker.spectype,
kwname)
raise excpt
try:
return method(*pargs, **pkwargs)
except Exception as e:
# remove signature decorator from exception call stack
et, ei, tb = sys.exc_info()
raise et(ei).with_traceback(tb.tb_next) from e
signature_check.__name__ = method.__name__
return signature_check
return takes_proxy
# ------------------------------------------------------------------------------
#
[docs]def returns(sometype):
'''
Return type checking decorator
'''
if no_check:
def returns_proxy(method):
return method
else:
# convert decorator argument into a checker
checker = Checker.create(sometype)
if checker is None:
raise TypeError('@returns decorator got parameter of unsupported '
'type %s' % type_name(sometype))
def returns_proxy(method):
@wraps(method)
def signature_check(*args, **kwargs):
try:
result = method(*args, **kwargs)
except Exception as e:
# remove signature decorator from exception call stack
et, ei, tb = sys.exc_info()
raise et(ei).with_traceback(tb.tb_next) from e
if not checker.check(result):
if not no_return_check:
# print 'Checker.spectype %s' % checker.spectype
excpt = create_return_exception(method,
checker.spectype, result)
raise excpt
return result
signature_check.__name__ = method.__name__
return signature_check
return returns_proxy
# ------------------------------------------------------------------------------