# Source code for radical.utils.dict_mixin


__author__    = 'Radical.Utils Development Team (Andre Merzky)'
__copyright__ = 'Copyright 2013, RADICAL@Rutgers'
__license__   = 'MIT'

import re
import fnmatch

# Merge policies accepted by the `policy` argument of `dict_merge`; they
# decide what happens when both dicts hold different values for a key:
#   PRESERVE  - keep the value which is already present in the first dict
#   OVERWRITE - replace it with the value from the second dict
PRESERVE   = 'preserve'
OVERWRITE  = 'overwrite'


# see http://code.activestate.com/recipes/117236-dictionary-mixin-framework/

# ------------------------------------------------------------------------------
#
class DictMixin(dict):
    '''
    Mixin defining all dictionary methods for classes that already have
    a minimum dictionary interface including getitem, setitem, delitem,
    and keys.  Based on those methods, the mixin provides the remaining
    interface functionality to make the class look like a fully compliant
    dictionary.

    Sub-classes must implement `__getitem__`, `__setitem__`, `__delitem__`
    and `keys`.  `keys` may return any iterable (list, view or iterator):
    the mixin always materializes it via `list()` before use.
    '''

    # NOTE: the class derives from `dict`, so `isinstance(obj, dict)` holds
    #       for instances of all sub-classes.

    # --------------------------------------------------------------------------
    #
    # first level definitions should be implemented by the sub-class
    #
    def __getitem__(self, key):
        raise NotImplementedError

    def __setitem__(self, key, value):
        raise NotImplementedError

    def __delitem__(self, key):
        raise NotImplementedError

    def keys(self):
        raise NotImplementedError

    # --------------------------------------------------------------------------
    #
    # second level definitions which assume only getitem and keys
    #
    def has_key(self, key):
        '''
        return `True` if `key` is among the keys reported by `keys()`
        '''
        return key in list(self.keys())

    def __iter__(self):
        # iterate over a snapshot of the keys, so that the mapping can be
        # changed while iterating
        for k in list(self.keys()):
            yield k

    # --------------------------------------------------------------------------
    #
    # third level uses second level instead of first
    #
    def __contains__(self, key):
        return self.has_key(key)  # noqa: W601

    def items(self):
        '''
        generator which yields `(key, value)` pairs
        '''
        for k in self:
            yield k, self[k]

    # --------------------------------------------------------------------------
    #
    # fourth level uses second and third levels instead of first
    #
    def iterkeys(self):
        '''
        return an iterator over the keys
        '''
        return self.__iter__()

    def itervalues(self):
        '''
        generator which yields all values
        '''
        for _, v in self.items():
            yield v

    def clear(self):
        '''
        delete all entries, one key at a time
        '''
        for key in list(self.keys()):
            del self[key]

    def setdefault(self, key, default=None):
        '''
        return the value for `key`; if `key` is not set, store `default`
        (`None` if not given, like `dict.setdefault`) and return it.
        '''
        if key not in self:
            self[key] = default
            return default
        return self[key]

    def popitem(self):
        '''
        remove the first key reported by `keys()` and return it together
        with its value as `(key, value)`.

        NOTE: on an empty mapping this raises `IndexError` (from indexing
              the empty key list), not `KeyError` as `dict.popitem` does.
        '''
        key   = list(self.keys())[0]
        value = self[key]
        del self[key]
        return key, value

    def update(self, other=None, **kwargs):
        '''
        set entries from `other` and from keyword arguments.

        `other` can be an object with a `keys()` method (then `other[key]`
        is used for each key), or an iterable of `(key, value)` pairs.
        Keyword arguments are applied last.
        '''
        if other is not None:
            if hasattr(other, 'keys'):
                for key in list(other.keys()):
                    self[key] = other[key]
            else:
                for key, value in other:
                    self[key] = value

        for key, value in kwargs.items():
            self[key] = value

    def get(self, key, default=None):
        '''
        return the value for `key`, or `default` if `key` is not set
        '''
        if key in self:
            return self[key]
        return default

    def __repr__(self):
        return repr(dict(list(self.items())))

    def __len__(self):
        # `keys()` may return an iterator which has no `len()`
        return len(list(self.keys()))
# ------------------------------------------------------------------------------ #
def dict_merge(a, b, policy=None, wildcards=False, log=None, _path=None):
    '''
    This merges two dict in place, modifying the original dict in a.

    Merge Policies:
        - None (default) : raise an exception on conflicts
        - OVERWRITE      : values in a are overwritten by new values from b
        - PRESERVE       : original value in a are preserved, new values
                           from b are only added where the original value
                           is not set.

    Nested dicts are merged recursively.  Values taken over from `b` are
    not copied, so `a` may afterwards share nested objects with `b`.

    Arguments:
        a         : dict to merge into (changed in place)
        b         : dict to merge from
        policy    : conflict resolution policy, see above
        wildcards : if set, string keys in `b` which contain a `*` are
                    additionally interpreted as `fnmatch` patterns and
                    merged into all matching string keys of `a`
        log       : optional object with a `debug()` method, used to report
                    preserved / overwritten keys
        _path     : internal, list of parent keys used in messages

    Returns:
        `a` - unchanged if `a` or `b` is `None`

    Raises:
        TypeError  : `a` or `b` is neither `None` nor a dict
        ValueError : conflicting values found and no policy given
    '''

    # thanks to
    # http://stackoverflow.com/questions/7204805/
    #                          python-dictionaries-of-dictionaries-merge

    if a is None: return a
    if b is None: return a

    if _path is None:
        _path = list()

    if not isinstance(a, dict):
        raise TypeError('*dict*_merge expects dicts, not %s' % type(a))

    if not isinstance(b, dict):
        raise TypeError('*dict*_merge expects dicts, not %s' % type(b))

    # --------------------------------------------------------------------------
    def merge_key(a, key_a, b, key_b):
        # merge the value b[key_b] into a[key_a].  Membership is checked
        # before any value is accessed.

        if key_b not in b:
            return                   # nothing to merge: keep a as it is

        if key_a not in a:
            a[key_a] = b[key_b]      # use b value
            return

        val_a = a[key_a]
        val_b = b[key_b]

        if isinstance(val_a, dict) and isinstance(val_b, dict):
            dict_merge(val_a, val_b,
                       policy    = policy,
                       wildcards = wildcards,
                       log       = log,
                       _path     = _path + [str(key_a)])

        elif val_a == val_b:
            pass                     # same leaf value

        # need to resolve conflict
        elif policy == PRESERVE:
            if log:
                log.debug('preserving key %s:%s \t(%s)'
                         % (':'.join(_path), key_b, val_b))

        elif policy == OVERWRITE:
            if log:
                log.debug('overwriting key %s:%s \t(%s)'
                         % (':'.join(_path), key_b, val_b))
            a[key_a] = val_b         # use new value

        else:
            raise ValueError('Conflict at %s (%s : %s)'
                            % ('.'.join(_path + [str(key_a)]), val_a, val_b))
    # --------------------------------------------------------------------------

    # first a clean merge, i.e. no interpretation of wildcards
    for key in sorted(b.keys()):

        if key in a:
            # need to resolve conflict
            merge_key(a, key, b, key)

        else:
            # no conflict - simply add.  Note that this is a potential shallow
            # copy if b[key] is a complex type.
            a[key] = b[key]

    # optionally, check if other merge options are also valid
    if wildcards:

        for key_b in sorted(b.keys()):

            # only string keys can be patterns
            if not isinstance(key_b, str) or '*' not in key_b:
                continue

            pat = re.compile(fnmatch.translate(key_b))

            # only string keys can match a pattern.  The wildcard key itself
            # was added to `a` above and matches its own pattern.
            for key_a in sorted(k for k in a.keys() if isinstance(k, str)):
                if pat.match(key_a):
                    merge_key(a, key_a, b, key_b)

    return a
# ------------------------------------------------------------------------------ #
def dict_stringexpand(target, sources=None):
    '''
    This expands dict entries (strings only) with keys from a second dict.  For
    example, the dicts::

        target  = {'workdir'  : '/home/%(user)s/',
                   'resource' : '%(resource)s'}
        sources = {'user'     : 'peer_gynt',
                   'protocol' : 'ssh',
                   'host'     : 'localhost',
                   'resource' : '%(protocol)s://%(host)s/'}

    would result in::

        target  = {'workdir'  : '/home/peer_gynt/',
                   'resource' : 'ssh://localhost/'}

    Note that expansion happened twice, for the `resource` tag to be fully
    specified.

    Arguments:
        target  : dict whose string values (also those nested in lists and
                  dicts) are expanded in place
        sources : a dict, or a list of dicts, providing replacement values
                  in addition to `target` itself.  A given list is not
                  changed.

    Returns:
        the expanded `target`

    Raises:
        AssertionError : `target` is not a dict
        TypeError      : `sources` is neither empty, a dict nor a list

    NOTE: expansion is repeated until no string changes anymore.  A value
          which references its own key (such as `{'a': '%(a)s/x'}`) grows on
          each pass, so that the expansion never terminates.
    '''

    assert isinstance(target, dict)

    # expand from self, and all given dicts, but only use
    # first-level primitive types (string, int, float)
    if not sources:
        sources = list()
    elif isinstance(sources, dict):
        sources = [sources]

    if not isinstance(sources, list):
        raise TypeError('Need dict as expansion source, not %s' % type(sources))

    # target must be first source, to avoid cycles (other sources are likely to
    # have *other* info).  Build a new list: the caller's list stays untouched.
    sources = [target] + sources

    # later sources replace values of earlier ones
    repl_source = dict()
    for source in sources:
        for key, val in source.items():
            if isinstance(val, (str, int, float)):
                repl_source[key] = val

    # the replacement values can contain placeholders themselves, so expand
    # until a pass does not change anything anymore
    again = True
    while again:
        target, again = _generic_stringexpand(target, repl_source)

    return target
# ------------------------------------------------------------------------------ # def _generic_stringexpand(target, source): if isinstance(target, str): return _string_stringexpand(target, source) elif isinstance(target, list): return _list_stringexpand(target, source) elif isinstance(target, dict): return _dict_stringexpand(target, source) else: # ignore other types for now return target, False # ------------------------------------------------------------------------------ # def _list_stringexpand(target, source): assert isinstance(target, list) assert isinstance(source, dict) all_again = 0 for idx, elem in enumerate(target): target[idx], again = _generic_stringexpand(elem, source) all_again += again return target, all_again # ------------------------------------------------------------------------------ # def _dict_stringexpand(target, source): assert (isinstance(target, dict)) assert (isinstance(source, dict)) all_again = 0 for key, val in target.items(): target[key], again = _generic_stringexpand(val, source) all_again += again return target, all_again # ------------------------------------------------------------------------------ # def _string_stringexpand(target, source): assert (isinstance(target, str)) assert (isinstance(source, dict)) orig = str(target) try: expanded = target % source except KeyError: # ignore incomplete expands return orig, False except ValueError: # ignore incomplete expands return orig, False # only check for success after success. Duh! if orig == expanded: return expanded, False else : return expanded, True # ------------------------------------------------------------------------------ #
def iter_diff(a, b):
    '''
    Recursively compare `a` and `b` and fail with an `AssertionError` on the
    first difference found.

    Lists are compared element by element and must have the same length.
    Dicts must have the same number of keys and are compared value by value
    for all keys of `a`; a key of `a` which is missing in `b` results in
    a `KeyError`.  All other values are compared with `==`, and both values
    are printed before the assertion fails.

    Returns nothing.
    '''

    if isinstance(a, list) and isinstance(b, list):

        # `zip` stops at the shorter list, so check the lengths first
        assert len(a) == len(b), (len(a), len(b))

        for ea, eb in zip(a, b):
            iter_diff(ea, eb)

    elif isinstance(a, dict) and isinstance(b, dict):

        assert len(list(a.keys())) == len(list(b.keys())), \
              (len(a), sorted(list(a.keys())), len(b), sorted(list(b.keys())))

        for k in a:
            iter_diff(a[k], b[k])

    else:

        if a != b:
            # wrap the values into a tuple: a tuple value would otherwise be
            # unpacked as format arguments
            print('elem diff: %s' % (a,))
            print('elem diff: %s' % (b,))
            print()

        assert a == b, [a, b]
# ------------------------------------------------------------------------------ #
def dict_diff(a, b):
    '''
    return a dict of the form::

        {
            'k1': {'a': 'foo', 'b': 'bar'},
            'k2': {'a': 'foo'},
            'k3': {'b': 'bar'},
        }

    which contains only those keys which are different in the two given
    dicts.  If a key is missing in one of the dicts, the respective `a` or
    `b` entry is not included (to distinguish from `None` values).

    This methods operates recursively over the given dicts: for values which
    are dicts in both `a` and `b`, the entry holds the nested diff dict.  For
    values which are lists in both, the entry holds a list with the diffs of
    those elements which differ, or with a single `len(N) != len(M)` string
    (lengths in ascending order) if the lists differ in length.
    '''

    # --------------------------------------------------------------------------
    def _value_diff(va, vb):
        # diff of two values: a falsy result means 'no difference'
        if isinstance(va, dict) and isinstance(vb, dict):
            return _dict_diff(va, vb)

        if isinstance(va, list) and isinstance(vb, list):
            return _list_diff(va, vb)

        if va != vb:
            return {'a': va, 'b': vb}

        return None

    # --------------------------------------------------------------------------
    def _list_diff(a, b):

        if len(a) != len(b):
            la, lb = sorted([len(a), len(b)])
            return ['len(%d) != len(%d)' % (la, lb)]

        ret = list()
        for va, vb in zip(a, b):
            tmp = _value_diff(va, vb)
            if tmp:
                ret.append(tmp)

        return ret

    # --------------------------------------------------------------------------
    def _dict_diff(a, b):

        ret = dict()

        # keys of `a`: either missing in `b`, or to be compared by value
        for k in sorted(a.keys()):

            if k not in b:
                ret[k] = {'a': a[k]}

            else:
                tmp = _value_diff(a[k], b[k])
                if tmp:
                    ret[k] = tmp

        # keys which only exist in `b`.  Shared keys are handled above
        # already - diffing them again in reverse order would swap the
        # `a` and `b` labels of nested results.
        for k in sorted(b.keys()):
            if k not in a:
                ret[k] = {'b': b[k]}

        return ret
    # --------------------------------------------------------------------------

    return _dict_diff(a, b)
# ------------------------------------------------------------------------------