Source code for radical.utils.misc


import os
import sys
import time
import errno
import tarfile
import datetime
import tempfile
import itertools

from typing import List, Tuple, Union, Any

from .         import url       as ruu
from .ru_regex import ReString

# ------------------------------------------------------------------------------
#
# globals
#
_RU_stdout = None
_RU_stderr = None
_RU_except = None
_RU_exit   = None


# ------------------------------------------------------------------------------
#
_RU_stdout = None
_RU_stderr = None
_RU_except = None
_RU_exit   = None


# ------------------------------------------------------------------------------
#
[docs]def parse_file_staging_directives(directives): ''' staging directives [local_path] [operator] [remote_path] local path: * interpreted as relative to the application's working directory * must point to local storage (localhost) remote path * interpreted as relative to the job's working directory operator : * > : stage to remote target, overwrite if exists * >> : stage to remote target, append if exists * < : stage to local target, overwrite if exists * << : stage to local target, append if exists This method returns a tuple [src, tgt, op] for each given directive. This parsing is backward compatible with the simple staging directives used previously -- any strings which do not contain staging operators will be interpreted as simple paths (identical for src and tgt), operation is set to '=', which must be interpreted in the caller context. ''' bulk = True if not isinstance(directives, list): bulk = False directives = [directives] ret = list() for directive in directives: if not is_string(directive): raise TypeError("file staging directives muct by of type string, " "not %s" % type(directive)) rs = ReString(directive) if rs // r'^(?P<one>.+?)\s*(?P<op><|<<|>|>>)\s*(?P<two>.+)$': res = rs.get() ret.append([res['one'], res['two'], res['op']]) else: ret.append([directive, directive, '=']) if bulk: return ret else : return ret[0]
# ------------------------------------------------------------------------------ #
[docs]def time_stamp(spec): if isinstance(spec, int) or \ isinstance(spec, float) : return datetime.datetime.utcfromtimestamp(spec) return spec
# ------------------------------------------------------------------------------ #
[docs]def time_diff(dt_abs, dt_stamp): ''' return the time difference bewteen two datetime objects in seconds (incl. fractions). Exceptions (like on improper data types) fall through. ''' delta = dt_stamp - dt_abs # make it easy to use seconds since epoch instead of datetime objects if isinstance(delta, int) or \ isinstance(delta, float) : return delta if not isinstance(delta, datetime.timedelta): raise TypeError("difference between '%s' and '%s' is not a .timedelta" % (type(dt_abs), type(dt_stamp))) # get seconds as float seconds = delta.seconds + delta.microseconds / 1E6 return seconds
# ------------------------------------------------------------------------------ #
[docs]def all_pairs(iterable, n): ''' [ABCD] -> [AB], [AC], [AD], [BC], [BD], [CD] ''' return list(itertools.combinations(iterable, n))
# ------------------------------------------------------------------------------ #
[docs]def cluster_list(iterable, n): ''' s -> [ s0, s1, s2, ... sn-1 ], [ sn, sn+1, sn+2, ... s2n-1 ], [ s2n, s2n+1, s2n+2, ... s3n-1 ], ... ''' return zip(*[iter(iterable)] * n)
# ------------------------------------------------------------------------------ # From https://docs.python.org/release/2.3.5/lib/itertools-example.html #
[docs]def window(seq, n=2): ''' Returns a sliding window (of width n) over data from the iterable" s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... ''' it = iter(seq) result = tuple(itertools.islice(it, n)) if len(result) == n: yield result for elem in it: result = result[1:] + (elem,) yield result
# ------------------------------------------------------------------------------ #
[docs]def round_to_base(value: float, base: int = 1) -> int: ''' This method expects an integer or float value, and will round it to any given integer base. For example: 1.5, 2 -> 2 3.5, 2 -> 4 4.5, 2 -> 4 11.5, 20 -> 20 23.5, 20 -> 20 34.5, 20 -> 40 The default base is '1'. ''' return int(base * round(float(value) / base))
# ------------------------------------------------------------------------------ #
[docs]def round_upper_bound(value: Union[int, float]) -> int: ''' This method expects an integer or float value, and will return an integer upper bound suitable for example to define plot ranges. The upper bound is the smallest value larger than the input value which is a multiple of 1, 2 or 5 times the order of magnitude (10**x) of the value. ''' bound = 0 order = 0 check = [1, 2, 5] while True: for c in check: bound = c * (10**order) if value < bound: return bound order += 1
# ------------------------------------------------------------------------------ #
[docs]def is_tuple(data: Any) -> bool: ''' return True if given data are a `tuple`, `False` otherwise ''' return isinstance(data, tuple)
# ------------------------------------------------------------------------------ #
[docs]def as_tuple(data: Any) -> Tuple[Any]: ''' return non-tuple data into a tuple. ''' if data is None : return tuple() elif is_tuple(data): return data else : return (data, )
# ------------------------------------------------------------------------------ #
[docs]def is_list(data: Any) -> bool: ''' return True if given data are a `list`, `False` otherwise ''' return isinstance(data, list)
# ------------------------------------------------------------------------------ #
[docs]def as_list(data: Any) -> List[Any]: ''' return non-list data into a list. ''' if data is None : return [] elif is_list(data): return data else : return [data]
# ------------------------------------------------------------------------------ #
[docs]def to_type(data: Any) -> Union[str, int, float, Any]: if not isinstance(data, str): return data try : return(int(data)) except: pass try : return(float(data)) except: pass return data
# ------------------------------------------------------------------------------ #
[docs]def is_seq(data: Any) -> bool: ''' tests if the given data is a sequence (but not a string) ''' return hasattr(data, '__iter__') and not is_string(data)
# ------------------------------------------------------------------------------ #
[docs]def is_string(data: Any) -> bool: ''' tests if the given data are a `string` type ''' return isinstance(data, str)
# ------------------------------------------------------------------------------ #
[docs]def as_string(data: Any) -> Any: ''' Make a best-effort attempt to convert bytes to strings. Iterate through lists and dicts, but leave all other datatypes alone. ''' if isinstance(data, dict): return {as_string(k): as_string(v) for k,v in data.items()} elif isinstance(data, list): return [as_string(e) for e in data] elif isinstance(data, bytes): return bytes.decode(data, 'utf-8') elif isinstance(data, ruu.Url): return str(data) else: return data
# ------------------------------------------------------------------------------ #
[docs]def is_bytes(data: Any) -> bool: ''' checks if the given data are of types `bytes` or `bytearray` ''' return isinstance(data, (bytes, bytearray))
# ------------------------------------------------------------------------------ # thanks to # http://stackoverflow.com/questions/956867/#13105359
[docs]def as_bytes(data: Any) -> Any: ''' Make a best-effort attempt to convert strings to bytes. Iterate through lists and dicts, but leave all other datatypes alone. ''' if isinstance(data, dict): return {as_bytes(k): as_bytes(v) for k,v in data.items()} elif isinstance(data, list): return [as_bytes(e) for e in data] elif isinstance(data, str): return str.encode(data, 'utf-8') else: return data
# ------------------------------------------------------------------------------ #
[docs]def watch_condition(cond, target=None, timeout=None, interval=0.1): ''' Watch a given condition (a callable) until it returns the target value, and return that value. Stop watching on timeout, in that case return None. The condition is tested approximately every 'interval' seconds. ''' start = time.time() while True: ret = cond() if ret == target: return ret if timeout and time.time() > start + timeout: return None time.sleep(interval)
# ------------------------------------------------------------------------------ #
[docs]def name2env(name): ''' convert a name of the for 'radical.pilot' to an env vare base named 'RADICAL_PILOT'. ''' return name.replace('.', '_').upper()
# ------------------------------------------------------------------------------ #
[docs]def get_env_ns(key, ns, default=None): ''' get an environment setting within a namespace. For example. get_env_ns('verbose', 'radical.pilot.umgr'), will return the value of the first found env variable from the following sequence: RADICAL_PILOT_UMGR_LOG_LVL RADICAL_PILOT_LOG_LVL RADICAL_LOG_LVL or 'None' if none of the above is set. The given `name` and `key` are converted to upper case, dots are replaced by underscores. Note that an environment variable set with export RADICAL_LOG_LVL= (ie. without an explicit, non-empty value) will be returned as an empty string. ''' ns = name2env(ns) key = name2env(key) base = '' checks = list() for elem in ns.split('_'): base += elem + '_' check = base + key checks.append(check) for check in reversed(checks): if check in os.environ: val = os.environ[check] return val return default
# ------------------------------------------------------------------------------ #
[docs]def expand_env(data, env=None, ignore_missing=True): ''' Expand the given data with environment variables from `os.environ`. If `env` is provided, use that dictionary for expansion instead. `data` can be one of three types: - dictionary: `expand_env` is applied to all *values* of the dictionary - sequence : `expand_env` is applied to all elements of the sequence - string : `expand_env` is applied to the string itself The method will alter dictionaries and iterables in place, but will return a copy of scalar strings, as it seems to be custom in Python. Other data types are silently ignored and not altered. The replacement in strings is performed for the following variable specs: assume `export BAR=bar BIZ=biz`: ${BAR} : foo_${BAR}_baz -> foo_bar_baz ${BAR:buz} : foo_${BAR:buz}_baz -> foo_bar_baz ${BAR:$BUZ}: foo_${BAR:$BIZ}_baz -> foo_biz_baz assume `unset BAR; export BIZ=biz`, `ignore_missing=True` ${BAR} : foo_${BAR}_baz -> foo__baz ${BAR:buz} : foo_${BAR:buz}_baz -> foo_buz_baz ${BAR:$BUZ}: foo_${BAR:$BIZ}_baz -> foo_biz_baz assume `unset BAR; export BIZ=biz`, `ignore_missing=False` ${BAR} : foo_${BAR}_baz -> ValueError('cannot expand $BAR') ${BAR:buz} : foo_${BAR:buz}_baz -> foo_buz_baz ${BAR:$BIZ}: foo_${BAR:$BIZ}_baz -> foo_biz_baz The method will also opportunistically convert strings to integers or floats if they are formatted that way and contain no other characters. ''' # no data: None, empty dict / sequence / string if not data: return data # dict type elif isinstance(data, dict): for k,v in data.items(): data[k] = expand_env(v, env, ignore_missing) return data # sequence types: list, set, tuple - but not string elif is_seq(data): for idx, elem in enumerate(data): data[idx] = expand_env(elem, env, ignore_missing) return data # all other non-string types are left alone elif not is_string(data): return data if '$' not in data: # nothing to expand return data # fall back to process env if no other expansion dict is specified if not env: env = os.environ # strings are not expanded in place - create a new one to fill. # iterate over the orginial string as long as there is something to expand ret = '' while data: data = ReString(data) # idea : pre ${ Vari_ABLE : val } post # captures : ( )(? ( )(? ( )) )( ) # indexes : 1 2 3 4 with data // r'(.*?)(?:\${([a-zA-Z][a-zA-Z0-9_-]+)(?::([^}]+))?})(.*)' \ as res: if not res: ret += data break pre = res[0] key = res[1] val = res[2] post = res[3] if not ignore_missing and key not in env: raise ValueError('cannot expand $%s' % key) if pre is None: pre = '' if val is None: val = '' if post is None: post = '' # support env expansion of val, as in # LOGDIR : "${RCT_LOGDIR:$PWD}" if val.startswith('$'): val = env.get(val[1:], '') val = env.get(key, val) if key and not pre and not post and not val: # we had something to expand, and that expansion is all there is # in the key, and the expand failed - then the result it not an # empty string but None return None ret += pre ret += val data = ReString(post) # attempt string-to-type conversion (int and float detection only) return to_type(ret)
# ------------------------------------------------------------------------------ #
[docs]def get_size(obj, seen=None, strict=False): size = sys.getsizeof(obj) obj_id = id(obj) if strict: # perform recursion checks if seen is None: seen = set() if obj_id in seen: return 0 seen.add(obj_id) if isinstance(obj, dict): size += sum([get_size(v, seen, strict) for v in obj.values()]) size += sum([get_size(k, seen, strict) for k in obj.keys()]) elif hasattr(obj, '__dict__'): size += get_size(obj.__dict__, seen, strict) elif hasattr(obj, '__iter__') and \ not isinstance(obj, (str, bytes, bytearray)): size += sum([get_size(i, seen, strict) for i in obj]) return size
# ------------------------------------------------------------------------------ #
[docs]def dockerized(): if os.path.exists('/.dockerenv'): return True return False
# ------------------------------------------------------------------------------ #
[docs]def get_radical_base(module=None): ''' Several parts of the RCT stack store state on the file system. This should usually be under `$HOME/.radical` - but that location is not always available or desireable. We interpret the env variable `RADICAL_BASE`, and fall back to `pwd` if neither that nor `$HOME` exists. The optional `module` parameter will result in the respective subdir name to be appended. The resulting dir is created (if it does not exist), and the name is returned. Any `.` (dot) characters in `module` are replaced by slashes. Leading `radical/` element is removed. ''' return get_base(ns='radical', module=module)
# ------------------------------------------------------------------------------ #
[docs]def get_base(ns, module=None): ''' A generic version of `get_radical_base` which queries the base for any namespace `ns` ''' ns_low = ns.lower() ns_up = ns.upper() if module: module = module.replace('.', '/') if module.startswith('%s/' % ns_low): module = module[len(ns_low) + 1:] base = os.environ.get("%s_BASE" % ns_up) if not base: base = os.environ.get("HOME") if not base: base = os.environ.get("PWD") if not base: base = os.getcwd() if module: base += '/.%s/%s/' % (ns_low, module) else : base += '/.%s/' % ns_low rec_makedir(base) return base
# ------------------------------------------------------------------------------ #
[docs]def rec_makedir(target): ''' recursive makedir which ignores errors if dir already exists ''' try: os.makedirs(target) except OSError as e: # ignore failure on existing directory - otherwise raise if e.errno == errno.EEXIST and os.path.isdir(os.path.dirname(target)): return raise
# ------------------------------------------------------------------------------ #
[docs]def mktar(tarname, fnames=None, data=None): ''' Create a tarfile at the given `tarname`, and pack all files given in `fnames` into it, and also pack any `data` blobs. `fnames` is expected to be list, where each element is either a string pointing to a file to be added under that name, or a tuple where the first element points again to the file to be packed, and the second element specifies the name under which the file should be packed into the archive. And OSError will be raised if the file does not exist. `data` is expected to be a list of tuples, where the first element is a set of bytes comprising the data to be written into the archive, and the second element again specifies the name of the tarred file. Note that this method always create bzip'ed tarfiles, but will never change the `tarname` to reflect that. ''' tar = tarfile.open(tarname, "w:bz2") if fnames: for element in fnames: if isinstance(str, element): tar.add(element) else: src, tgt = element tar.add(src, arcname=tgt) if data: for fname, fdata in data: tmp_name, tmp_fd = tempfile.mkstemp() tmp_fd.write(fdata) tmp_fd.close() tar.add(tmp_name, fname) os.unlink(tmp_name) tar.close()
# ------------------------------------------------------------------------------ #
[docs]def noop(*args, **kwargs): return None
# ------------------------------------------------------------------------------ #
[docs]def script_2_func(fpath): ''' This method accepts a single parameter `fpath` which is expected to point to a file containing a self-sufficient Python script. The script will be read and stored, and a function handle will be returned which, upon calling, will run that script in the currect Python interpreter`. It will be ensured that `__name__` is set to `__main__`, and that any arguments passed to the callable are passed on as `sys.argv`. A single list argument is also allowed which is interpreted as argument list. Example: my_func = ru.script_2_func('/tmp/my_script.py') my_func('-f', 'foo', '-b', 'bar') my_func('-f foo -b bar'.split()) # equivalent NOTE: calling the returned function handle will change `sys.argv` for the current Python interpreter. ''' prefix = [] postfix = [] with ru_open(fpath, 'r') as fin: code_lines = fin.readlines() # ensure that 'if __name__ == '__main__' works prefix += '__name__ = "__main__"\n\n' # capture globals prefix.append('global _RU_stdout\n') prefix.append('global _RU_stderr\n') prefix.append('global _RU_except\n') prefix.append('global _RU_exit\n') # redirect stdout and stderr to be captured in global string vars prefix.append('from io import StringIO\n') prefix.append('_RU_oldout = sys.stdout\n') prefix.append('_RU_olderr = sys.stderr\n') prefix.append('sys.stdout = _RU_stdout = StringIO()\n\n') prefix.append('sys.stderr = _RU_stderr = StringIO()\n\n') # wrap the code in a try/exec to capture exit codes and failures prefix.append('_RU_except = None\n') prefix.append('try:\n') postfix.append('except Exception as e:\n') postfix.append(' _RU_except = str(e)\n') postfix.append(' _RU_exit = 1\n') postfix.append('except SystemExit as e:\n') postfix.append(' _RU_except = "SystemExit"\n') postfix.append(' _RU_exit = e.code\n') postfix.append('sys.stdout = _RU_oldout\n') postfix.append('sys.stderr = _RU_olderr\n') # examine _RU_stdout.getvalue() # indentation of all code lines which go into the try/except code_lines = [' ' + line for line in code_lines] # create closure which can be used to call the code def ret(*argv): if len(argv) == 1 and isinstance(argv[0], list): argv = list(argv[0]) # if args are given, ensure that sys.argv gets set if argv: # indent - this is within try/catch prefix.append(' import sys\n') prefix.append(' sys.argv = ["dummy"] + %s\n' % str(list(argv))) global _RU_stdout global _RU_stderr global _RU_except global _RU_exit _RU_stdout = None _RU_stderr = None _RU_except = None _RU_exit = None tmp_code = ''.join(prefix) \ + ''.join(code_lines) \ + ''.join(postfix) # exec the resulting code, ensure to pass globals exec(tmp_code, globals()) # pylint: disable=W0122 return _RU_stdout.getvalue(), _RU_stderr.getvalue(), \ _RU_except, _RU_exit # return that new callable return ret
# # -------------------------------------------------------------------------- # # import importlib # # loader = importlib.machinery.SourceFileLoader('__main__', fpath) # spec = importlib.util.spec_from_loader(loader.name, loader) # mod = importlib.util.module_from_spec(spec) # code = loader.get_code(mod.__name__) # # def ret(*argv): # args = list(argv) # if len(args) == 1 and isinstance(argv[0], list): # args = list(argv[0]) # sys.argv = ['dummy'] + args # loader.exec_module(mod) # # return ret # ------------------------------------------------------------------------------ #
[docs]def ru_open(*args, **kwargs): ''' ensure that we use UTF8 consistently throughout the stack ''' if 'encoding' not in kwargs: kwargs['encoding'] = 'utf8' return open(*args, **kwargs)
# ------------------------------------------------------------------------------