1. radical.utils package

1.1. Subpackages

1.2. Submodules

1.2.1. radical.utils.algorithms module

radical.utils.algorithms.collapse_ranges(ranges)[source]

Given a set of ranges (as a set of pairs of floats [start, end] with ‘start <= end’), this algorithm collapses that set into the smallest possible set of ranges which cover the same, but not more nor less, of the domain (floats).

We first sort the ranges by their starting point. We then start with the range with the smallest starting point [start_1, end_1], and compare to the next following range [start_2, end_2], where we now know that start_1 <= start_2. We have now two cases:

a) when start_2 <= end_1, then the ranges overlap, and we collapse them into range_1: range_1 = [start_1, max(end_1, end_2)]

b) when start_2 > end_1, then the ranges don’t overlap. Importantly, none of the later ranges can ever overlap range_1, because their start points are even larger. So we move range_1 to the set of final ranges, and restart the algorithm with range_2 being the smallest one.

The algorithm terminates when only one range is left: that range is also moved to the list of final ranges, and that list is returned.
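The sort-and-merge procedure described above can be sketched in a few lines. This is a minimal illustration and not the library's actual implementation; the name collapse_ranges_sketch is hypothetical, and ranges are assumed to be given as two-element lists or tuples.

```python
def collapse_ranges_sketch(ranges):
    """Collapse overlapping [start, end] ranges into a minimal sorted list."""
    final = []
    for start, end in sorted(ranges):
        if final and start <= final[-1][1]:
            # case a: overlap with the current range, extend it if needed
            final[-1][1] = max(final[-1][1], end)
        else:
            # case b: no overlap, the current range is final, start a new one
            final.append([start, end])
    return final


print(collapse_ranges_sketch([[5, 8], [0, 2], [1, 3], [8, 9]]))
```

Keeping the range under construction as the last element of the result list avoids a separate "move to final set" step.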

radical.utils.algorithms.in_range(value, ranges)[source]

Checks if a float value is in any of the given ranges, which are assumed to be a list of tuples (or a single tuple) of start and end points (floats). Returns True or False.
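A possible reading of this check is sketched below. The name in_range_sketch is hypothetical, and treating both end points as inclusive is an assumption which the text above does not confirm.

```python
def in_range_sketch(value, ranges):
    """Return True if value lies in any (start, end) range (inclusive)."""
    # accept a single (start, end) tuple as well as a list of tuples
    if isinstance(ranges, tuple) and len(ranges) == 2 \
            and not isinstance(ranges[0], (tuple, list)):
        ranges = [ranges]
    return any(start <= value <= end for start, end in ranges)


print(in_range_sketch(1.5, [(0.0, 1.0), (1.2, 2.0)]))
```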

radical.utils.algorithms.lazy_bisect(data, check, on_ok=None, on_nok=None, on_fail=None, on_skip=None, ratio=0.5, log=None)[source]

Find the next potentially good candidate element in a presumably ordered list data (from smallest to largest). The given callable check should accept a list element for good/bad checks, and return True for good (small enough), False otherwise (too large).

Note that this is only a bisect if ratio is set at 0.5 - otherwise the search will back off through the list faster (1.0 >= ratio > 0.5) or slower (0.5 > ratio >= 0.0) than bisect.

The method returns three lists, one list with elements of data which were checked good, and one list of elements checked as bad. Checks which raise an exception are also handled as bad, any element on which this happens will be returned in the third list (fail). Note that this last list (fail) will consist of tuples: [element, error] where error is the exception’s error message.

on_ok will be called on tasks where check() returned good, on_nok where check() returned bad, on_fail where check() raised an exception, and on_skip where bisect decided that no check() invocation is needed. The respective data item is the only argument to the calls.

radical.utils.algorithms.partition(space, nparts)[source]

create balanced partitions from an iterable space. This method preserves contiguous order.

kudos: http://code.activestate.com/recipes/425397-split-a-list-into-roughly-equal-sized-pieces/
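One way to obtain balanced, order-preserving partitions is sketched below. It is an illustration only: the name partition_sketch is hypothetical, and the choice to give the extra elements to the first partitions is an assumption, so the library may distribute the remainder differently.

```python
def partition_sketch(space, nparts):
    """Split an iterable into nparts contiguous, nearly equal-sized lists."""
    items = list(space)
    base, extra = divmod(len(items), nparts)
    parts, start = [], 0
    for i in range(nparts):
        # the first `extra` partitions get one additional element
        size = base + (1 if i < extra else 0)
        parts.append(items[start:start + size])
        start += size
    return parts


print(partition_sketch(range(7), 3))
```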

radical.utils.algorithms.range_concurrency(ranges)[source]

given a set of uncollapsed ranges, return a series which describes the range-concurrency at any point.

Example

Ranges:
[----]    [---------]        [--------]
   [--------]         [-]  [------------]    [--]

Concurrency:
1  2 1    2 1       0 1 0  1 2        1 0    1  0

Returned is a sorted list of tuples where the first entry defines at what range value the concurrency changed, and the second value defines to what the concurrency count changed at that point.

You could consider the ranges to be of type [time_start, time_end], and the return would be a list of [timestamp, concurrency], if that helps – but the algorithm does not make any assumption on the data type, really, only that the values can be sorted.
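The idea can be sketched as a sweep over sorted start and end events. This is an assumption about one possible approach, not the library's code; the name range_concurrency_sketch is hypothetical.

```python
def range_concurrency_sketch(ranges):
    """Return a sorted list of (point, concurrency) change events."""
    events = []
    for start, end in ranges:
        events.append((start, +1))   # a range opens
        events.append((end, -1))     # a range closes
    events.sort()

    series, level = [], 0
    for point, delta in events:
        level += delta
        if series and series[-1][0] == point:
            # several changes at the same point: keep only the final level
            series[-1] = (point, level)
        else:
            series.append((point, level))
    return series


print(range_concurrency_sketch([(0, 5), (3, 12), (10, 20)]))
```

Only comparison and sorting are applied to the range values, which matches the remark that no particular data type is assumed.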

radical.utils.algorithms.remove_common_prefix(data, extend=0)[source]

For the given list of strings, remove the part which appears to be a common prefix to all of them. If extend is given, the last extend letters of the prefix are preserved.

See also https://stackoverflow.com/questions/46128615/ .
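A minimal sketch of this behavior, using the standard library's os.path.commonprefix, is shown below. The name remove_common_prefix_sketch is hypothetical and the library may determine the prefix differently.

```python
import os


def remove_common_prefix_sketch(data, extend=0):
    """Strip the common prefix, keeping its last `extend` characters."""
    prefix = os.path.commonprefix(list(data))
    # index at which the retained part of each string begins
    cut = max(len(prefix) - extend, 0)
    return [item[cut:] for item in data]


names = ['task.0001', 'task.0002', 'task.0013']
print(remove_common_prefix_sketch(names))
print(remove_common_prefix_sketch(names, extend=2))
```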

1.2.2. radical.utils.config module

Config file parsing.

We provide a JSON-based config file parser with the following properties:

  • system config files will be merged with user configs (if those exist)

  • python style comments are filtered out before parsing

  • after parsing, ${ABC:default}-style values are set or expanded via os.environ

  • the returned class exposes settings as dicts or attributes cfg[‘foo’][‘bar’] == cfg.foo.bar

1.2.2.1. Config Names and Locations

We assume two basic locations for config files: one is installed within the scope of a Python module, the other one is under user control, and usually in the user’s home directory. The config reader accepts the following parameters to derive the exact locations:

  • module: name of module under which the config is installed

  • path : config file path relative to the module home

  • name : config file name relative to the path

The module string is interpreted as follows:

module = 'module'
module_path = radical.utils.debug.find_module(module)
sys_config_dir = '%s/configs' % module_path
usr_base_dir = os.getenv('RADICAL_CONFIG_USER_DIR') or os.getenv('HOME')
if usr_base_dir:
    usr_config_dir = '%s/.%s/configs' % (usr_base_dir, module.replace('.', '/'))

so the location of the module’s __init__.py is used to derive the location of the installed system config files, and the module name is used to derive the location of the user provided config files.

For example, the module radical.utils will have the following config dirs:

sys_config_dir = /tmp/ve/lib/python3.7/site-packages/radical/utils/configs
usr_config_dir = /home/merzky/.radical/utils/configs

After loading the system level config file, any existing user level config file is merged into it, via

radical.utils.dict_merge(system_cfg, user_cfg, policy='overwrite')

so that the user config settings supersede the system config settings.

Both path and name specifiers can contain * as wildcard, which is then interpreted as by glob(). If that wildcard exists, then all matching config files are read into one configuration dict, where each root key is set to the value the ‘*’ expands to (minus the .json extension).

For example, the name radical.pilot.resource_* with the following config files:

/tmp/ve/[…]/radical/pilot/configs/resource_xsede.json
/tmp/ve/[…]/radical/pilot/configs/resource_ncsa.json

will result in a config dict like:

{'xsede' : { 'foo' : 'bar' },
 'ncsa'  : { 'fiz' : 'baz' }}

1.2.2.2. Queries

We support two types of queries on the resulting parsed configs:

  • dict like queries

  • the _query(key) method returns a single value, or ‘None’ if not found.

In the latter _query() case, the key can be specified as dot-separated path, so that the following two snippets are equivalent (assuming that a foo.bar section exists):

val = cfg['foo']['bar'].get('baz')
val = cfg._query('foo.bar.baz')
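How a dot-separated key can be resolved against a nested dict is sketched below. The helper query_sketch is hypothetical and operates on a plain dict; it only illustrates the lookup and is not the Config._query implementation.

```python
def query_sketch(cfg, key, default=None):
    """Resolve a dot-separated key path in a nested dict."""
    node = cfg
    for part in key.split('.'):
        # stop as soon as a path element is missing
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node


cfg = {'foo': {'bar': {'baz': 42}}}
print(query_sketch(cfg, 'foo.bar.baz'))
print(query_sketch(cfg, 'foo.nope.baz'))
```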

1.2.2.3. Environment

To expand values from os.environ, we support the following syntax in all string values (not keys):

${RADICAL_UTILS_ENV:default_value}

which will be replaced by

os.environ.get('RADICAL_UTILS_ENV', 'default_value')

The default value is optional; an empty string is used if no default value is given. Env evaluation is only performed at time of parsing, not at time of query. RU attempts to convert env variables to float and int. If that fails, values are stored as strings.
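The expansion rule can be illustrated with a regular expression. This is a sketch under assumptions, not RU's parser: the name expand_env_sketch and the exact pattern for variable names are hypothetical, and the numeric conversion is applied to the whole expanded string.

```python
import os
import re

# ${NAME} or ${NAME:default}; the accepted name characters are an assumption
_ENV_PATTERN = re.compile(r'\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}')


def expand_env_sketch(value, env=None):
    """Expand ${NAME:default} placeholders, then try int and float."""
    env = os.environ if env is None else env

    def _sub(match):
        name, default = match.group(1), match.group(2) or ''
        return env.get(name, default)

    expanded = _ENV_PATTERN.sub(_sub, value)
    for cast in (int, float):
        try:
            return cast(expanded)
        except ValueError:
            pass
    return expanded  # conversion failed, keep the string


print(expand_env_sketch('/tmp/${DEMO_USER:nobody}/', env={'DEMO_USER': 'alice'}))
print(expand_env_sketch('${DEMO_PORT:8080}', env={}))
```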

1.2.2.4. Validation

It probably makes sense to switch to a json schema validator at some point, see for example https://pypi.python.org/pypi/json-schema-validator. For now this implementation remains schema-less, and will thus, in a very pythonesque way, only fail once values are queried or used.

1.2.2.5. Implementation

This implementation is based on typed dictionaries, which are accessed as a TypedDict’ed object hierarchy.

class radical.utils.config.Config(module=None, category=None, name=None, cfg=None, from_dict=None, path=None, expand=True, env=None, _internal=False)[source]

Bases: TypedDict

Contents of a config (json) file from a module’s config tree.

module

used to determine the module’s config file location (default: radical.utils)

category

name of config to be loaded from module’s config path

name

specify a specific configuration to be used

path

path to app config json to be used for initialization

cfg

application config dict to be used for initialization

from_dict

alias for cfg, to satisfy base class constructor

expand

enable / disable environment var expansion (default: True)

env

environment dictionary to be used for expansion (default: os.environ)

The naming of config files follows this rule:

<category>_<name>.json

For example, if the following is used in a system python installation:

ru.Config('radical.pilot', category='session', name='minimal')

it would attempt to load (depending on system details):

/usr/lib/python3/site-packages/radical/pilot/configs/session_minimal.json

NOTE: Keys containing an underscore are not exposed via the API.

Keys containing dots are split and interpreted as paths in the configuration hierarchy.

write(fname)[source]
class radical.utils.config.DefaultConfig(*args, **kwargs)[source]

Bases: Config

The settings in this default config are, unsurprisingly, used as default values for various RU classes and methods, for example for log file locations, log levels, profile locations, etc.

class radical.utils.config.DefaultConfigMeta(name, bases, namespace)[source]

Bases: TypedDictMeta, Singleton

Metaclass inherited from TypedDict’s metaclass along with Singleton to avoid a metaclass conflict (the metaclass of a derived class must be a subclass of the metaclasses of all its bases). The Singleton metaclass ensures that only one instance of the corresponding class exists.

1.2.3. radical.utils.constants module

1.2.4. radical.utils.daemon module

class radical.utils.daemon.Daemon(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null', target=None, args=None)[source]

Bases: object

A generic daemon class.

Usage: subclass the Daemon class and override the run() method

restart(pid=None)[source]

Stop the daemon and restart it. If a pid is passed, then stop the daemon process with that pid and replace it with a daemon process represented by this class instance. It is the caller’s responsibility to ensure that this makes semantic sense.

This method returns the pid of the new daemon process (see start()).

run()[source]

You should override this method when you subclass Daemon and do not pass target to the class constructor. This method will be called after the daemon process has been created by start() or restart().

start()[source]
stop(pid=None)[source]

Stop the daemon. If a pid is passed, then stop the daemon process with that pid.

radical.utils.daemon.daemonize(main=None, args=None, stdout=None, stderr=None, stdin=None, timeout=None)[source]

Create a daemon process and run the given method in it. For that, do the UNIX double-fork magic, see Stevens’ “Advanced Programming in the UNIX Environment” for details (ISBN 0201563177)

The method will return the PID of the spawned daemon process, or None on failure to create it. stdout, stderr, stdin file names are interpreted by the daemon process, and are expected to be path names which can be opened and read / written in their respective capabilities.

1.2.5. radical.utils.debug module

class radical.utils.debug.DebugHelper(name=None, info=None)[source]

Bases: object

When instantiated, and when ‘RADICAL_DEBUG’ is set in the environment, this class will install a signal handler for SIGUSR1. When that signal is received, a stacktrace for all threads is printed to stdout. We also check if SIGINFO is available, which is generally bound to CTRL-T.

Additionally, a call to ‘dh.fs_block(info=None)’ will create a file system based barrier: it will create a unique file in /tmp/ (based on ‘name’ if given), and dump the stack trace and any ‘info’ into it. It then waits until that file has changed (touched or removed etc), and then returns. The wait is a simple poll based on ‘os.stat()’ (once per second).

fs_block(info=None)[source]

Dump state, info in barrier file, and wait for it to be touched or read or removed, then continue. Leave no trace.

register_lock(name, lock)[source]
register_rlock(name, rlock)[source]
unregister_lock(name)[source]
unregister_rlock(name)[source]
class radical.utils.debug.Lock(name=None)[source]

Bases: object

acquire(blocking=True)[source]
release()[source]
class radical.utils.debug.RLock(name=None)[source]

Bases: object

acquire(blocking=True)[source]
release()[source]
radical.utils.debug.add_snippet_path(path)[source]

add a path to the search path for dynamically loaded python snippets (see ru.get_snippet()).

radical.utils.debug.attach_pudb(log=None)[source]
radical.utils.debug.get_caller_name(skip=2)[source]

Get a name of a caller in the format module.class.method

skip specifies how many levels of stack to skip while getting caller name. skip=1 means ‘who calls me’, skip=2 ‘who calls my caller’ etc.

An empty string is returned if skipped levels exceed stack height.

Kudos: http://stackoverflow.com/questions/2654113/python-how-to-get-the-callers-method-name-in-the-called-method

radical.utils.debug.get_exception_trace(msg=None)[source]
radical.utils.debug.get_snippet(sid)[source]

RU exposes small python snippets for runtime code insertion. The usage is intended as follows:

  • a programmer implements a class

  • for some experiment or test, that class’s behavior must be controlled at runtime.

  • in all places where such an adaptation is expected to take place, the programmer inserts a hook like this:

    exec(ru.get_snippet('my_class.init_hook'))

  • this will trigger RU to search for python files of the name my_class.init_hook.py in $RADICAL_BASE/.radical/snippets/ (default), and return their content for injection.

The snippet search path can be extended by calling:

ru.add_snippet_path(path)

The RADICAL_DEBUG environment variable needs to be set for this method to do anything. A snippet can use the following literal strings which will be replaced by their actual values:

‘###SNIPPET_FILE###’ - filename from which snippet was loaded
‘###SNIPPET_PATH###’ - path in which the snippet file is located
‘###SNIPPET_ID###’ - the sid string used to identify the snippet

radical.utils.debug.get_stacktrace()[source]
radical.utils.debug.get_stacktraces()[source]
radical.utils.debug.get_trace()[source]
radical.utils.debug.print_exception_trace(msg=None, exc=None)[source]
radical.utils.debug.print_stacktrace(msg=None, _stack=None)[source]
radical.utils.debug.print_stacktraces(signum=None, sigframe=None)[source]

signum, sigframe exist to satisfy signal handler signature requirements

radical.utils.debug.raise_on(tag, log=None, msg=None)[source]

The purpose of this method is to artificially trigger error conditions for testing purposes, for example when handling the n’th unit, getting the n’th heartbeat signal, etc.

The tag parameter is interpreted as follows: on the n’th invocation of this method with any given tag, an exception is raised, and the counter for that tag is reset.

The limit n is set via an environment variable RU_RAISE_ON_<tag>, with tag in upper case. The environment will only be inspected during the first invocation of the method with any given tag. The tag counter is process-local, but is shared amongst threads of that process.

1.2.6. radical.utils.dict_mixin module

class radical.utils.dict_mixin.DictMixin[source]

Bases: dict

Mixin defining all dictionary methods for classes that already have a minimum dictionary interface including getitem, setitem, delitem, and keys. Based on those methods, the mixin provides the remaining interface functionality to make the class look like a fully compliant dictionary.

clear() None.  Remove all items from D.[source]
get(key, default=None)[source]

Return the value for key if key is in the dictionary, else default.

has_key(key)[source]
items() a set-like object providing a view on D's items[source]
iterkeys()[source]
itervalues()[source]
keys() a set-like object providing a view on D's keys[source]
popitem() (k, v), remove and return some (key, value) pair as a[source]

2-tuple; but raise KeyError if D is empty.

setdefault(key, default)[source]

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) None.  Update D from dict/iterable E and F.[source]

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

radical.utils.dict_mixin.dict_diff(a, b)[source]

return a dict of the form:

{
   'k1': {'a': 'foo',
          'b': 'bar'},
   'k2': {'a': 'foo'},
   'k3': {'b': 'bar'},
}

which contains only those keys which are different in the two given dicts. Keys which are missing in either one are not included (to distinguish from None values). This method operates recursively over the given dicts.
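A recursive diff producing the format shown above might look like the following sketch. The name dict_diff_sketch is hypothetical, and nesting the diff of sub-dicts under the shared key is an assumption about how recursion is represented.

```python
def dict_diff_sketch(a, b):
    """Return {key: {'a': ..., 'b': ...}} for keys which differ."""
    diff = {}
    for key in set(a) | set(b):
        in_a, in_b = key in a, key in b
        if in_a and in_b:
            if isinstance(a[key], dict) and isinstance(b[key], dict):
                nested = dict_diff_sketch(a[key], b[key])
                if nested:
                    diff[key] = nested
            elif a[key] != b[key]:
                diff[key] = {'a': a[key], 'b': b[key]}
        elif in_a:
            diff[key] = {'a': a[key]}    # key missing in b
        else:
            diff[key] = {'b': b[key]}    # key missing in a
    return diff


print(dict_diff_sketch({'k1': 'foo', 'k2': 'foo', 'k4': 1},
                       {'k1': 'bar', 'k3': 'bar', 'k4': 1}))
```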

radical.utils.dict_mixin.dict_merge(a, b, policy=None, wildcards=False, log=None, _path=None)[source]

This merges two dicts in place, modifying the original dict in a.

Merge Policies:
  • None (default) : raise an exception on conflicts

  • OVERWRITE : values in a are overwritten by new values from b

  • PRESERVE : original values in a are preserved, new values from b are only added where the original value is not set.
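The three policies can be illustrated with a small recursive merge. This is a sketch, not the library's implementation: the name dict_merge_sketch, the policy strings 'overwrite' and 'preserve', the ValueError on conflict, and the reading of "not set" as "key missing" are all assumptions.

```python
def dict_merge_sketch(a, b, policy=None):
    """Merge b into a in place, resolving conflicts according to policy."""
    for key, val in b.items():
        if key not in a:
            a[key] = val                               # no conflict
        elif isinstance(a[key], dict) and isinstance(val, dict):
            dict_merge_sketch(a[key], val, policy=policy)
        elif a[key] == val:
            continue                                   # identical values
        elif policy == 'overwrite':
            a[key] = val                               # b wins
        elif policy == 'preserve':
            continue                                   # a wins
        else:
            raise ValueError('conflict on key %r' % key)
    return a


system_cfg = {'log': {'lvl': 'INFO', 'dir': '/tmp'}, 'x': 1}
user_cfg = {'log': {'lvl': 'DEBUG'}, 'y': 2}
print(dict_merge_sketch(system_cfg, user_cfg, policy='overwrite'))
```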

radical.utils.dict_mixin.dict_stringexpand(target, sources=None)[source]

This expands dict entries (strings only) with keys from a second dict. For example, the dicts:

target  = {'workdir'  : '/home/%(user)s/',
           'resource' : '%(resource)s'}
sources = {'user'     : 'peer_gynt',
           'protocol' : 'ssh',
           'host'     : 'localhost',
           'resource' : '%(protocol)s://%(host)s/'}
would result in:

target = {'workdir'  : '/home/peer_gynt/',
          'resource' : 'ssh://localhost/'}

Note that expansion happened twice, for the resource tag to be fully specified.
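The repeated expansion can be sketched with Python's printf-style formatting against a mapping. The name dict_stringexpand_sketch is hypothetical, and the cap of ten passes as well as the handling of unknown keys (the value is left untouched) are assumptions.

```python
def dict_stringexpand_sketch(target, sources=None):
    """Expand %(key)s placeholders in string values of target, in place."""
    sources = sources or {}
    for key, val in target.items():
        if not isinstance(val, str):
            continue
        # expand repeatedly until the value no longer changes
        for _ in range(10):
            try:
                expanded = val % sources
            except (KeyError, ValueError, TypeError):
                break            # unknown key or malformed placeholder
            if expanded == val:
                break
            val = expanded
        target[key] = val
    return target


target = {'workdir': '/home/%(user)s/', 'resource': '%(resource)s'}
sources = {'user': 'peer_gynt', 'protocol': 'ssh', 'host': 'localhost',
           'resource': '%(protocol)s://%(host)s/'}
print(dict_stringexpand_sketch(target, sources))
```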

radical.utils.dict_mixin.iter_diff(a, b)[source]

1.2.7. radical.utils.env module

class radical.utils.env.EnvProcess(env: Dict[str, str])[source]

Bases: object

run a code segment in a different os.environ:

env = {'foo': 'buz'}
with ru.EnvProcess(env=env) as p:
    if p:
        p.put(os.environ['foo'])

print('-->', p.get())
get() Any[source]
put(data: str) None[source]
radical.utils.env.env_diff(env_1: Dict[str, str], env_2: Dict[str, str]) Tuple[Dict[str, str], Dict[str, str], Dict[str, str]][source]

This method serves debug purposes: it compares two environments and returns those elements which appear in only one or the other env, and those which changed from one env to the other.

It will ignore any keys in the BLACKLIST and will also ignore BASH_FUNC_* keys which point to bash function definitions.
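The comparison can be sketched with three dict comprehensions. This is an illustration only: the name env_diff_sketch and the ignore parameter (standing in for the module's BLACKLIST) are hypothetical, and reporting the new value for changed keys is an assumption.

```python
def env_diff_sketch(env_1, env_2, ignore=()):
    """Return (only in env_1, only in env_2, changed) as three dicts."""
    def skip(key):
        # ignored keys and exported bash function definitions are skipped
        return key in ignore or key.startswith('BASH_FUNC_')

    only_1 = {k: v for k, v in env_1.items()
              if k not in env_2 and not skip(k)}
    only_2 = {k: v for k, v in env_2.items()
              if k not in env_1 and not skip(k)}
    changed = {k: env_2[k] for k in env_1
               if k in env_2 and env_1[k] != env_2[k] and not skip(k)}
    return only_1, only_2, changed


print(env_diff_sketch({'A': '1', 'B': '2', 'BASH_FUNC_x%%': '() { :; }'},
                      {'B': '3', 'C': '4'}))
```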

radical.utils.env.env_dump(environment: Optional[Dict[str, str]] = None, script_path: Optional[str] = None) None[source]
radical.utils.env.env_eval(fname: str) Dict[str, str][source]

helper to create a dictionary with the env settings in the specified file which contains unset and export directives, or simple ‘key=val’ lines

radical.utils.env.env_prep(environment: Optional[Dict[str, str]] = None, unset: Optional[List[str]] = None, pre_exec: Optional[List[str]] = None, pre_exec_cached: Optional[List[str]] = None, script_path: Optional[str] = None) Dict[str, str][source]

Create a shell script which restores the environment specified in environment (dict). While doing so, ensure that all env variables not defined in environment but defined in unset (list) are unset. Also ensure that all commands provided in pre_exec_cached (list) are executed after these settings.

Once the shell script is created, run it and dump the resulting env, then read it back via env_read() and return the resulting env dict. That dict can then be used for process fork/execve to run a process in the thus defined environment.

The resulting environment will be cached: a subsequent call with the same set of parameters will simply return a previously cached environment if it exists.

If script_path is given, a shell script will be created in the given location so that shell commands can source it and restore the specified environment.

Any commands given in ‘pre_exec’ will be part of the cached script, and will thus not be executed when preparing the env, but will be executed whenever the prepared shell script is sourced. The returned env dictionary will thus not include the effects of those injected commands.

radical.utils.env.env_read(fname: str) Dict[str, str][source]

helper to parse environment from a file: this method parses the output of env and returns a dict with the found environment settings.

radical.utils.env.env_read_lines(lines: List[str]) Dict[str, str][source]

read lines which are the result of an env shell call, sort the resulting keys into an environment dict and a shell function dict, and return both

radical.utils.env.env_write(script_path, env, unset=None, pre_exec=None)[source]

1.2.8. radical.utils.flux module

class radical.utils.flux.FluxHelper[source]

Bases: object

Helper class to programmatically handle flux instances and to obtain state update events for flux jobs known in that instance.

attach_jobs(ids: List[int], cb: Optional[Callable[[int, Any], None]] = None) Any[source]
cancel_jobs(flux_ids: List[int]) None[source]
connect_flux(uri: Optional[str] = None) None[source]

Connect to an existing Flux instance

property env

environment dict for the connected Flux instance. Returns None if no instance is connected.

get_executor() Any[source]
get_handle() Any[source]
reset()[source]

Close the connection to the Flux instance (if it exists), and terminate the Flux service if it was started by this instance. All handles and executors created for this service will be invalidated.

start_flux() None[source]

Start a private Flux instance

FIXME: forward env

submit_jobs(specs: List[Dict[str, Any]], cb: Optional[Callable[[str, Any], None]] = None) Any[source]
property uid

unique ID for this FluxHelper instance

property uri

uri for the connected Flux instance. Returns None if no instance is connected.

1.2.9. radical.utils.futures module

class radical.utils.futures.Future(call, *args, **kwargs)[source]

Bases: Thread

This Future class is a thin wrapper around Python’s native threading.Thread class. It is expected to wrap a callable, and to watch its execution.

classmethod Run(call, *args, **kwargs)[source]

This is a shortcut to

f = ru.Future(callable); f.start()

cancel()[source]
property exception
property result
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

property state
property traceback
wait(timeout=None)[source]

1.2.10. radical.utils.get_version module

radical.utils.get_version.get_version(paths=None)[source]
paths:

a VERSION file containing the detailed version is checked for in every directory listed in paths. When we find a VERSION file, we also look for an SDIST file, and return the found name and location as absolute path to the sdist.

1.2.11. radical.utils.heartbeat module

class radical.utils.heartbeat.Heartbeat(uid, timeout, interval=1, beat_cb=None, term_cb=None, log=None)[source]

Bases: object

beat(uid=None, timestamp=None)[source]
dump(log)[source]
start()[source]
stop()[source]
property uid
wait_startup(uids=None, timeout=None)[source]

Wait for the first heartbeat of the given UIDs to appear. This returns the list of UIDs which have not been found, or None otherwise.

watch(uid)[source]

1.2.12. radical.utils.host module

radical.utils.host.compress_hostlist(hostlist)[source]
radical.utils.host.create_hostfile(sandbox, name, hostlist, sep=' ', impaired=False)[source]
radical.utils.host.get_hostip(req=None, log=None)[source]

Look up the IP address for a given requested interface name. If interface is not given, do some magic.

radical.utils.host.get_hostlist(hoststring)[source]

Convert string with hosts (IDs within brackets) into list of hosts.

Example: 'node-b1-[1-3,5],node-c1-4,node-d3-3,node-k[10-12,15]' =>
['node-b1-1', 'node-b1-2', 'node-b1-3', 'node-b1-5',
 'node-c1-4', 'node-d3-3', 'node-k10', 'node-k11', 'node-k12', 'node-k15']
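The expansion shown in this example can be sketched with two regular expressions. The name get_hostlist_sketch is hypothetical; the sketch assumes at most one trailing bracket group per host entry and does not preserve zero padding, so it is not a substitute for the library function.

```python
import re


def get_hostlist_sketch(hoststring):
    """Expand 'name[1-3,5],other' style host strings into a host list."""
    hosts = []
    # each item is a name, optionally followed by one [...] group
    for item in re.findall(r'[^,\[]+(?:\[[^\]]*\])?', hoststring):
        match = re.fullmatch(r'(.*)\[(.*)\]', item)
        if not match:
            hosts.append(item)
            continue
        prefix, spec = match.groups()
        for part in spec.split(','):
            if '-' in part:
                lo, hi = part.split('-')
                hosts.extend(prefix + str(i)
                             for i in range(int(lo), int(hi) + 1))
            else:
                hosts.append(prefix + part)
    return hosts


print(get_hostlist_sketch('node-b1-[1-3,5],node-c1-4,node-k[10-12,15]'))
```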

radical.utils.host.get_hostlist_by_range(hoststring, prefix='', width=0)[source]

Convert string with host IDs into list of hosts.

Example: Cobalt RM would have host template as ‘nid%05d’

get_hostlist_by_range('1-3,5', prefix='nid', width=5) => ['nid00001', 'nid00002', 'nid00003', 'nid00005']
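A compact sketch of this conversion is given below. The name get_hostlist_by_range_sketch is hypothetical and the code only illustrates the documented example; error handling for malformed range strings is omitted.

```python
def get_hostlist_by_range_sketch(hoststring, prefix='', width=0):
    """Expand '1-3,5' into zero-padded host names with the given prefix."""
    hosts = []
    for part in hoststring.split(','):
        lo, _, hi = part.partition('-')
        hi = hi or lo                      # a single ID is a range of one
        for i in range(int(lo), int(hi) + 1):
            hosts.append(prefix + str(i).zfill(width))
    return hosts


print(get_hostlist_by_range_sketch('1-3,5', prefix='nid', width=5))
```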

radical.utils.host.get_hostname()[source]

Look up the hostname.

radical.utils.host.is_localhost(host: str) bool[source]

Returns True if given hostname is localhost, False otherwise.

1.2.13. radical.utils.ids module

radical.utils.ids.generate_id(prefix: str, mode='simple', ns=None)[source]

Generate a human readable, sequential ID for the given prefix.

The ID is by default very simple and thus very readable, but cannot be assumed to be globally unique – simple ID uniqueness is only guaranteed within the scope of one python instance.

If mode is set to the non-default type ID_UNIQUE, an attempt is made to generate readable but globally unique IDs – although the level of confidence for uniqueness is significantly smaller than for, say, UUIDs.

The ID format per mode is:

ID_SIMPLE  = "%(prefix)s.%(counter)04d"
ID_UNIQUE  = "%(prefix)s.%(date)s.%(time)s.%(pid)06d.%(counter)04d"
ID_PRIVATE = "%(prefix)s.%(host)s.%(user)s.%(days)06d.%(day_counter)04d"
ID_UUID    = "%(prefix)s.%(uuid)s"

Examples:

print(radical.utils.generate_id('item'))
print(radical.utils.generate_id('item'))
print(radical.utils.generate_id('item', mode=radical.utils.ID_SIMPLE))
print(radical.utils.generate_id('item', mode=radical.utils.ID_SIMPLE))
print(radical.utils.generate_id('item', mode=radical.utils.ID_UNIQUE))
print(radical.utils.generate_id('item', mode=radical.utils.ID_UNIQUE))
print(radical.utils.generate_id('item', mode=radical.utils.ID_PRIVATE))
print(radical.utils.generate_id('item', mode=radical.utils.ID_PRIVATE))
print(radical.utils.generate_id('item', mode=radical.utils.ID_UUID))

The above will generate the IDs:

item.0000
item.0001
item.0002
item.0003
item.2014.07.30.13.13.44.0000
item.2014.07.30.13.13.44.0001
item.cameo.merzky.018375.0000
item.cameo.merzky.018375.0001
item.23cacb7e-0b08-11e5-9f0f-08002716eaa9

where ‘cameo’ is the (short) hostname, ‘merzky’ is the username, and ‘18375’ is ‘days since epoch’. The last element, the counter is unique for each id type and item type, and restarts for each session (application process). In the last case though (ID_PRIVATE), the counter is reset for every new day, and can thus span multiple applications.

The ‘ns’ argument can be set to a value such that unique IDs are created local to that namespace. For example, you can create a session and use the session ID as a namespace for all the IDs of the objects of that execution.

Example:

sid  = generate_id('re.session', ID_PRIVATE)
uid1 = generate_id('task.%(item_counter)04d', ID_CUSTOM, ns=sid)
uid2 = generate_id('task.%(item_counter)04d', ID_CUSTOM, ns=sid)
...

This will generate the following ids:

re.session.rivendell.vivek.017548.0001
task.0000
task.0001

where the task.* IDs are unique for the used sid namespace.

The namespaces are stored under `$RADICAL_BASE/.radical/utils/`. If RADICAL_BASE is not set, then $HOME is used.

Note that for docker containers, we try to avoid hostname / username clashes and will, for ID_PRIVATE, revert to ID_UUID.

radical.utils.ids.reset_id_counters(prefix=None, reset_all_others=False)[source]

1.2.14. radical.utils.json_io module

radical.utils.json_io.metric_expand(data)[source]

iterate through the given dictionary, and when encountering a key string of the form ru.XYZ or rp.ABC, expand them to their actually defined values. Thus the following dict:

{
    "ru.EVENT" : "foo"
}

becomes:

{
    2 : "foo"
}
radical.utils.json_io.parse_json(json_str, filter_comments=True)[source]

Comment lines in the form of

# some json data or text

are stripped from json before parsing
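The comment filtering can be sketched with a multi-line regular expression followed by the standard json parser. The name parse_json_sketch is hypothetical, and the sketch assumes that only whole lines starting with '#' count as comments.

```python
import json
import re


def parse_json_sketch(json_str, filter_comments=True):
    """Parse JSON after removing lines which start with '#'."""
    if filter_comments:
        # drop lines whose first non-blank character is '#'
        json_str = re.sub(r'^\s*#.*$', '', json_str, flags=re.MULTILINE)
    return json.loads(json_str)


text = '# header\n{\n  # note\n  "a": 1,\n  "b": "x#y"\n}\n'
print(parse_json_sketch(text))
```

A '#' inside a string value, as in "x#y" above, is left alone because it does not start the line.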

radical.utils.json_io.parse_json_str(json_str)[source]

same as parse_json, but converts unicode strings to simple strings

radical.utils.json_io.read_json(fname, filter_comments=True)[source]

Comment lines in the form of

# some json data or text

are stripped from json before parsing:

import pprint
pprint.pprint(read_json("my_file.json"))

radical.utils.json_io.read_json_str(filename, filter_comments=True)[source]

same as read_json, but converts unicode strings and byte arrays to ASCII strings.

radical.utils.json_io.write_json(data, fname)[source]

thin wrapper around python’s json write, for consistency of interface

1.2.15. radical.utils.lease_manager module

class radical.utils.lease_manager.LeaseManager(max_pool_size=None, max_pool_wait=None, max_obj_age=None)[source]

Bases: object

This is a lease manager – it creates resource instances on demand and hands out leases to them. If for a given ID no object instance exists, one is created, locked and a lease is returned. If for an ID an object instance exists and is not leased, it is locked and returned. If one or more instances exist but all are in use already (leased), a new instance is created (up to MAX_POOL_SIZE which can be overwritten in the lease call). If that limit is reached, no objects are returned, and instead the lease call blocks until one of the existing objects gets released.

is_locked()
lease(pool_id, creator, args=None)[source]

For a given object identifier, attempt to retrieve an existing object from the pool. If such a free (released) object is found, lock and return it. If such objects exist but all are in use, create a new one, up to max_pool_size (default: 10). If that limit is reached, block until an object is freed. If no such object exists, create it and proceed as above. Return the object thus obtained.

pool_id : id of the pool to lease from. The pool ID essentially determines the scope of validity for the managed objects: they form a namespace, where objects are shared when living under the same namespace entry (the pool_id).

creator : method to use to create a new object instance

Example:

def creator():
    return Logger(name)

ret = lease_manager.lease(name, creator)

lock()
locked()
release(instance, delete=False)[source]

the given object is not needed right now – unlock it so that somebody else can lease it. This will not delete the object,

unlock(*args)

1.2.16. radical.utils.lockable module

radical.utils.lockable.Lockable(cls)[source]

This class decorator will add lock/unlock methods to the thusly decorated classes, which will be enacted via an also added threading.RLock member (self._rlock):

@Lockable
class A(object):

    def call(self):
        print('locked: %s' % self._locked)

The class instance can then be used like this:

a = A()
a.call()
a.lock()
a.call()
a.lock()
a.call()
a.unlock()
a.call()
with a:
  a.call()
a.call()
a.unlock()
a.call()

which will result in:

locked: 0
locked: 1
locked: 2
locked: 1
locked: 2
locked: 1
locked: 0

The class A can also internally use the lock, and can, for example, use:

@Lockable
class A(object):
    ...
    def work(self):
        with self:
            # locked code section
            ...

1.2.17. radical.utils.lockfile module

class radical.utils.lockfile.Lockfile(fname, *args, **kwargs)[source]

Bases: object

This class represents a lockfile. On calling open() (or entering a resource context with), the lockfile is exclusively opened and locked. The returned object can be used for read(), write() and seek() operations, and the lock is only released on close() (or when leaving the resource context).

If delete=True is specified on construction, then the lockfile is removed upon release(), and the content is lost. Only for this mode is it possible to obtain the current lock owner when using get_owner() while waiting for a lock owned by another process or thread - the call will return ‘unknown’ otherwise.

Note that the file offset is not shared between owners of the lock file - a newly acquired lock on the file should prompt a seek to the desired file location (the initial offset is ‘0’).

Example:

with ru.Lockfile(fname) as lf0:
    lf0.write('test 0\n')

with ru.Lockfile(fname) as lf1:
    lf1.lseek(0, os.SEEK_SET)
    lf1.write('test 1\n')

with ru.Lockfile(fname) as lf2:
    lf2.lseek(0, os.SEEK_END)
    lf2.write('test 2\n')

    # raises a RuntimeError, as we won't be able to acquire the lock
    with ru.Lockfile(fname) as lf3:
        lf3.lseek(0, os.SEEK_END)
        lf3.write('test 3\n')

with open(fname, 'r') as fin:
    data = fin.read()
    assert data == 'test 1\ntest 2\n'

Example:

lf1 = ru.Lockfile(fname)
lf2 = ru.Lockfile(fname)

with lf1:
    lf1.write('test 0')

    with lf2(timeout=3):
        # we won't reach this, the `with` above will time out and raise
        lf2.write('test 0')

Implementation:

The implementation relies on cooperative locking: a process or thread which uses this API to lock a file will create a temporary file in the same directory (filename: .<filename>.<pid>.<counter> where counter is a process singleton). On lock acquisition, we will attempt to create a symbolic link to the file <filename>.lck, which will only succeed if no other process or thread holds that lock. On release, that symbolic link gets removed; on object destruction, the temporary file gets removed.

The private file will contain the name of the owner. Any other process or thread can follow the .<filename>.lck symlink, open the link’s target file, and read the name of the owner. No attempt is made to avoid races between failure to acquire the lockfile and querying the current owner, so that information is not reliable, but is intended as an aid for debugging and tracing.
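
The symlink-based scheme can be illustrated with a small, self-contained sketch. It is not the Lockfile implementation: the helper names try_lock, get_owner and unlock are hypothetical, the link name is assumed to be <filename>.lck, and the private file is removed on unlock rather than on object destruction to keep the example short.

```python
import itertools
import os
import tempfile

_counter = itertools.count()      # per-process counter for private file names


def try_lock(fname, owner):
    """Try to lock `fname`; return the private file path, or None if locked."""
    dname, base = os.path.split(os.path.abspath(fname))
    pname = '.%s.%d.%d' % (base, os.getpid(), next(_counter))
    private = os.path.join(dname, pname)
    with open(private, 'w') as fout:
        fout.write(owner)             # the private file names the owner
    try:
        # symlink creation is atomic and fails if the link already exists
        os.symlink(private, fname + '.lck')
    except FileExistsError:
        os.unlink(private)
        return None
    return private


def get_owner(fname):
    """Follow the lock symlink and read the owner name (None if unlocked)."""
    try:
        with open(fname + '.lck') as fin:
            return fin.read()
    except OSError:
        return None


def unlock(fname, private):
    os.unlink(fname + '.lck')         # release the lock ...
    os.unlink(private)                # ... and clean up the private file


fname = os.path.join(tempfile.mkdtemp(), 'data.txt')
held = try_lock(fname, 'worker.0')
second = try_lock(fname, 'worker.1')  # fails while the lock is held
owner = get_owner(fname)
unlock(fname, held)
```

The sketch relies on os.symlink() failing when the link name already exists, so only one contender can create the link at any time.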

Example:

lf1 = ru.Lockfile(fname)
lf2 = ru.Lockfile(fname)

with lf1:
    lf1.write('test 0')

    try:
        lf2.acquire()  # timeout == None == 0.0
    except RuntimeError:
        print('lock is held by %s' % lf2.get_owner())

acquire(timeout=None, owner=None)[source]

When the lock could not be acquired after timeout seconds, a TimeoutError is raised (i.e. an OSError with errno ETIME). When owner is specified and the lockfile was created with delete=True, then that string will be passed to any other method which tries to acquire this lockfile while it is locked (see get_owner()). If not specified, the owner string is set to ru.get_caller_name().

timeout:

< 0 : wait forever
= 0 : try locking once and raise RuntimeError on failure
> 0 : try for that many seconds, and raise TimeoutError on failure
None: same as 0

get_owner()[source]

The name of the current owner of the lockfile can be retrieved by any other process or thread which attempts to lock the same file. This method returns None if the file is not currently locked.

Note: this method may race against lock acquisition / release, and the information returned may be outdated.

locked() bool[source]
lseek(pos, how)[source]

Change the offset at which the next read or write method is applied. The arguments are interpreted as documented by os.lseek().

read(length)[source]

Read from the locked file at the current offset. This method will raise a RuntimeError when called without the file being locked.

release()[source]

Release the lock on the file. If delete=True was specified on construction, then the file (and all owner information) are removed. Once released, all other threads/processes waiting in the acquire() method call will compete for the lock, and one of them will obtain it.

remove()[source]
seek(pos, how)[source]

Same as lseek()

write(data)[source]

Write to the locked file at the current offset. This method will raise a RuntimeError when called without the file being locked.

1.2.18. radical.utils.logger module

class radical.utils.logger.ColorStreamHandler(target)[source]

Bases: StreamHandler

A colorized output StreamHandler

emit(record)[source]

Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.

class radical.utils.logger.FSHandler(target)[source]

Bases: FileHandler

class radical.utils.logger.Logger(name, ns=None, path=None, targets=None, level=None, debug=None, verbose=False)[source]

Bases: object

Logger documentation

close()[source]
property debug_level
property level
property name
property ns
property num_level
property path
property targets

1.2.19. radical.utils.misc module

radical.utils.misc.all_pairs(iterable, n)[source]

[ABCD] -> [AB], [AC], [AD], [BC], [BD], [CD]

radical.utils.misc.as_bytes(data: Any) Any[source]

Make a best-effort attempt to convert strings to bytes. Iterate through lists and dicts, but leave all other datatypes alone.

radical.utils.misc.as_list(data: Any) List[Any][source]

Wrap non-list data into a list.

radical.utils.misc.as_string(data: Any) Any[source]

Make a best-effort attempt to convert bytes to strings. Iterate through lists and dicts, but leave all other datatypes alone.

radical.utils.misc.as_tuple(data: Any) Tuple[Any][source]

Wrap non-tuple data into a tuple.

radical.utils.misc.cluster_list(iterable, n)[source]

s -> [ s0, s1, s2, … sn-1 ],
     [ sn, sn+1, sn+2, … s2n-1 ],
     [ s2n, s2n+1, s2n+2, … s3n-1 ], …
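
The chunking pattern shown above can be sketched in a few lines. The function name cluster is hypothetical, and keeping a shorter trailing chunk is an assumption of this sketch; the description does not say how cluster_list treats an incomplete last cluster.

```python
def cluster(iterable, n):
    """Split `iterable` into consecutive lists of at most `n` elements."""
    items = list(iterable)
    # assumption: a shorter trailing chunk is kept rather than dropped
    return [items[i:i + n] for i in range(0, len(items), n)]


chunks = cluster(range(7), 3)
```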

radical.utils.misc.dockerized()[source]
radical.utils.misc.expand_env(data, env=None, ignore_missing=True)[source]

Expand the given data with environment variables from os.environ. If env is provided, use that dictionary for expansion instead.

data can be one of three types:

  • dictionary: expand_env is applied to all values of the dictionary

  • sequence : expand_env is applied to all elements of the sequence

  • string : expand_env is applied to the string itself

The method will alter dictionaries and sequences in place, but will return a copy of scalar strings, as is customary in Python. Other data types are silently ignored and not altered.

The replacement in strings is performed for the following variable specs:

assume export BAR=bar BIZ=biz:

${BAR}      : foo_${BAR}_baz      -> foo_bar_baz
${BAR:buz}  : foo_${BAR:buz}_baz  -> foo_bar_baz
${BAR:$BIZ} : foo_${BAR:$BIZ}_baz -> foo_biz_baz

assume unset BAR; export BIZ=biz, ignore_missing=True

${BAR}      : foo_${BAR}_baz      -> foo__baz
${BAR:buz}  : foo_${BAR:buz}_baz  -> foo_buz_baz
${BAR:$BIZ} : foo_${BAR:$BIZ}_baz -> foo_biz_baz

assume unset BAR; export BIZ=biz, ignore_missing=False

${BAR}      : foo_${BAR}_baz      -> ValueError('cannot expand $BAR')
${BAR:buz}  : foo_${BAR:buz}_baz  -> foo_buz_baz
${BAR:$BIZ} : foo_${BAR:$BIZ}_baz -> foo_biz_baz

The method will also opportunistically convert strings to integers or floats if they are formatted that way and contain no other characters.

radical.utils.misc.get_base(ns, module=None)[source]

A generic version of get_radical_base which queries the base for any namespace ns

radical.utils.misc.get_env_ns(key, ns, default=None)[source]

Get an environment setting within a namespace. For example,

get_env_ns('log_lvl', 'radical.pilot.umgr')

will return the value of the first found env variable from the following sequence:

RADICAL_PILOT_UMGR_LOG_LVL
RADICAL_PILOT_LOG_LVL
RADICAL_LOG_LVL

or ‘None’ if none of the above is set. The given namespace and key are converted to upper case, and dots are replaced by underscores.

Note that an environment variable set with

export RADICAL_LOG_LVL=

(ie. without an explicit, non-empty value) will be returned as an empty string.
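
The lookup order can be sketched as follows, using the key 'log_lvl' for illustration. The function lookup_env_ns is a hypothetical stand-in which takes the environment as a plain dict, so that the example does not depend on the real process environment.

```python
def lookup_env_ns(key, ns, env, default=None):
    """Hypothetical namespace lookup; `env` is a dict such as os.environ."""
    parts = ns.upper().split('.')
    suffix = key.upper().replace('.', '_')
    # check the most specific namespace first, then strip trailing elements
    while parts:
        name = '_'.join(parts + [suffix])
        if name in env:
            return env[name]
        parts.pop()
    return default


env = {'RADICAL_PILOT_LOG_LVL': 'DEBUG', 'RADICAL_LOG_LVL': ''}
found = lookup_env_ns('log_lvl', 'radical.pilot.umgr', env)
empty = lookup_env_ns('log_lvl', 'radical.saga', env)
missing = lookup_env_ns('profile', 'radical.pilot', env)
```

Here found is taken from RADICAL_PILOT_LOG_LVL, empty is the empty string set for RADICAL_LOG_LVL, and missing falls back to the default.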

radical.utils.misc.get_radical_base(module=None)[source]

Several parts of the RCT stack store state on the file system. This should usually be under $HOME/.radical - but that location is not always available or desirable. We interpret the env variable RADICAL_BASE, and fall back to pwd if neither that nor $HOME exists.

The optional module parameter will result in the respective subdir name to be appended. The resulting dir is created (if it does not exist), and the name is returned. Any . (dot) characters in module are replaced by slashes. Leading radical/ element is removed.

radical.utils.misc.get_size(obj, seen=None, strict=False)[source]
radical.utils.misc.is_bytes(data: Any) bool[source]

checks if the given data are of types bytes or bytearray

radical.utils.misc.is_list(data: Any) bool[source]

return True if given data are a list, False otherwise

radical.utils.misc.is_seq(data: Any) bool[source]

tests if the given data is a sequence (but not a string)

radical.utils.misc.is_string(data: Any) bool[source]

tests if the given data are a string type

radical.utils.misc.is_tuple(data: Any) bool[source]

return True if given data are a tuple, False otherwise

radical.utils.misc.mktar(tarname, fnames=None, data=None)[source]

Create a tarfile at the given tarname, and pack all files given in fnames into it, and also pack any data blobs.

fnames is expected to be a list, where each element is either a string pointing to a file to be added under that name, or a tuple where the first element points to the file to be packed, and the second element specifies the name under which the file should be packed into the archive. An OSError will be raised if the file does not exist.

data is expected to be a list of tuples, where the first element is a set of bytes comprising the data to be written into the archive, and the second element again specifies the name of the tarred file.

Note that this method always creates bzip’ed tarfiles, but will never change the tarname to reflect that.
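
The packing of files and data blobs can be sketched with the standard tarfile module. The function make_tar and the file names are hypothetical; the sketch only mirrors the argument layout described above and is not the mktar implementation.

```python
import io
import os
import tarfile
import tempfile


def make_tar(tarname, fnames=None, data=None):
    """Pack files and in-memory blobs into a bzip2-compressed tarfile."""
    with tarfile.open(tarname, 'w:bz2') as tar:
        for entry in fnames or []:
            if isinstance(entry, str):
                src, name = entry, entry      # pack under its own name
            else:
                src, name = entry             # (source, name in archive)
            tar.add(src, arcname=name)        # OSError if src is missing
        for blob, name in data or []:
            info = tarfile.TarInfo(name)
            info.size = len(blob)
            tar.addfile(info, io.BytesIO(blob))


workdir = tempfile.mkdtemp()
src = os.path.join(workdir, 'input.txt')
with open(src, 'w') as fout:
    fout.write('hello\n')

tarname = os.path.join(workdir, 'bundle.tar')
make_tar(tarname, fnames=[(src, 'renamed.txt')], data=[(b'blob', 'blob.bin')])

with tarfile.open(tarname, 'r:bz2') as tar:
    members = sorted(tar.getnames())
```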

radical.utils.misc.name2env(name)[source]

Convert a name of the form ‘radical.pilot’ to an env variable base name ‘RADICAL_PILOT’.

radical.utils.misc.noop(*args, **kwargs)[source]
radical.utils.misc.parse_file_staging_directives(directives)[source]

staging directives

[local_path] [operator] [remote_path]

local path:
  • interpreted as relative to the application’s working directory

  • must point to local storage (localhost)

remote path
  • interpreted as relative to the job’s working directory

operator :
  • > : stage to remote target, overwrite if exists

  • >> : stage to remote target, append if exists

  • < : stage to local target, overwrite if exists

  • << : stage to local target, append if exists

This method returns a tuple [src, tgt, op] for each given directive. This parsing is backward compatible with the simple staging directives used previously – any strings which do not contain staging operators will be interpreted as simple paths (identical for src and tgt), operation is set to ‘=’, which must be interpreted in the caller context.

radical.utils.misc.rec_makedir(target)[source]

recursive makedir which ignores errors if dir already exists

radical.utils.misc.round_to_base(value: float, base: int = 1) int[source]

This method expects an integer or float value, and will round it to any given integer base. For example:

1.5, 2 -> 2
3.5, 2 -> 4
4.5, 2 -> 4

11.5, 20 -> 20
23.5, 20 -> 20
34.5, 20 -> 40

The default base is ‘1’.
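
One way to obtain the values listed above is to divide by the base, round, and multiply back. The function round_to_base_sketch is a hypothetical illustration and may differ from the library in corner cases: Python's round() resolves exact ties to the nearest even integer.

```python
def round_to_base_sketch(value, base=1):
    """Round `value` to the nearest integer multiple of `base`."""
    return int(base * round(float(value) / base))


examples = [(1.5, 2), (3.5, 2), (4.5, 2), (11.5, 20), (23.5, 20), (34.5, 20)]
results = [round_to_base_sketch(v, b) for v, b in examples]
```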

radical.utils.misc.round_upper_bound(value: Union[int, float]) int[source]

This method expects an integer or float value, and will return an integer upper bound suitable for example to define plot ranges. The upper bound is the smallest value larger than the input value which is a multiple of 1, 2 or 5 times the order of magnitude (10**x) of the value.
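
The search for such a bound can be sketched as a loop over the candidates 1, 2 and 5 at increasing orders of magnitude. The function upper_bound_sketch is hypothetical, and starting at order 0 (so that any value below 1 maps to 1) is an assumption of this sketch.

```python
def upper_bound_sketch(value):
    """Return the smallest of 1, 2, 5, 10, 20, 50, ... larger than `value`."""
    order = 0
    while True:
        for factor in (1, 2, 5):
            bound = factor * 10 ** order
            if value < bound:
                return bound
        order += 1


bounds = [upper_bound_sketch(v) for v in (0.5, 3, 5, 7, 42, 180)]
```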

radical.utils.misc.ru_open(*args, **kwargs)[source]

ensure that we use UTF8 consistently throughout the stack

radical.utils.misc.script_2_func(fpath)[source]

This method accepts a single parameter fpath which is expected to point to a file containing a self-sufficient Python script. The script will be read and stored, and a function handle will be returned which, upon calling, will run that script in the current Python interpreter. It will be ensured that __name__ is set to __main__, and that any arguments passed to the callable are passed on as sys.argv. A single list argument is also allowed, which is interpreted as the argument list.

Example

my_func = ru.script_2_func('/tmp/my_script.py')
my_func('-f', 'foo', '-b', 'bar')
my_func('-f foo -b bar'.split())  # equivalent

NOTE: calling the returned function handle will change sys.argv for the current Python interpreter.

radical.utils.misc.time_diff(dt_abs, dt_stamp)[source]

Return the time difference between two datetime objects in seconds (incl. fractions). Exceptions (for example, on improper data types) are not caught.

radical.utils.misc.time_stamp(spec)[source]
radical.utils.misc.to_type(data: Any) Union[str, int, float, Any][source]
radical.utils.misc.watch_condition(cond, target=None, timeout=None, interval=0.1)[source]

Watch a given condition (a callable) until it returns the target value, and return that value. Stop watching on timeout, in that case return None. The condition is tested approximately every ‘interval’ seconds.
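
A polling loop of this kind could look as follows. The function watch is a hypothetical sketch which follows the parameter names of the signature above; details such as checking the condition once before the timeout is evaluated are assumptions.

```python
import time


def watch(cond, target=None, timeout=None, interval=0.1):
    """Poll `cond()` until it returns `target`, or give up after `timeout`."""
    start = time.time()
    while True:
        value = cond()
        if value == target:
            return value
        if timeout is not None and time.time() - start > timeout:
            return None
        time.sleep(interval)


calls = []

def counter():
    calls.append(1)
    return len(calls)

reached = watch(counter, target=3, interval=0.01)
expired = watch(lambda: False, target=True, timeout=0.05, interval=0.01)
```

Note that with the default target=None, a successful match and a timeout are indistinguishable by return value alone.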

radical.utils.misc.window(seq, n=2)[source]

Returns a sliding window (of width n) over data from the iterable:

s -> (s0,s1,…s[n-1]), (s1,s2,…,sn), …
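
A sliding window over an arbitrary iterable can be sketched with a bounded deque. The generator sliding_window is a hypothetical illustration; yielding nothing for inputs shorter than n is an assumption.

```python
from collections import deque
from itertools import islice


def sliding_window(seq, n=2):
    """Yield tuples of `n` consecutive elements from `seq`."""
    it = iter(seq)
    win = deque(islice(it, n), maxlen=n)   # fill the first window
    if len(win) == n:
        yield tuple(win)
    for item in it:
        win.append(item)                   # drops the oldest element
        yield tuple(win)


pairs = list(sliding_window('ABCD'))
triples = list(sliding_window(range(5), n=3))
short = list(sliding_window([1], n=2))
```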

1.2.20. radical.utils.modules module

radical.utils.modules.find_module(name)[source]
radical.utils.modules.get_type(type_name: str) Optional[type][source]

get a type object from a type name (str)

radical.utils.modules.import_file(path)[source]
radical.utils.modules.import_module(name)[source]
radical.utils.modules.load_class(fpath: str, cname: str, ctype: Optional[Union[type, str]] = None) Optional[Any][source]

load class cname from a source file at location fpath and return it (the class, not an instance).

1.2.21. radical.utils.object_cache module

class radical.utils.object_cache.ObjectCache(*args, **kwargs)[source]

Bases: object

This is a singleton object caching class – it maintains a reference counted registry of existing objects.

get_obj(oid, creator, ns='global')[source]

For a given object id, attempt to retrieve an existing object. If that object exists, increase the reference counter, as there is now one more user for that object.

If that object does not exist, call the given creator, then register and return the object thusly created.

oid    : id of the object to get from the cache.
creator: method to use to create a new object instance

Example:

def creator():
    return Logger(name)

ret = object_cache.get_obj(name, creator)

is_locked()
lock()
locked()
rem_obj(obj, ns='global')[source]

For a given object instance, decrease the refcounter as the caller stops using that object. Once the ref counter is ‘0’, remove all traces of the object – this should make that object eligible for Python’s garbage collection. Returns ‘True’ if the given object was indeed registered, ‘False’ otherwise.

The removal of the object is actually time-delayed. That way, we will keep the object around just a little longer, which provides caching semantics in the case of frequent creation/destruction cycles.
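
The reference counting described for get_obj() and rem_obj() can be sketched as follows. The class RefCountedCache is a hypothetical illustration: it is not a singleton, it is not thread-safe, and it removes objects immediately instead of with a delay.

```python
class RefCountedCache(object):
    """Hypothetical reference-counted object registry."""

    def __init__(self):
        self._registry = dict()      # ns -> oid -> [obj, refcount]

    def get_obj(self, oid, creator, ns='global'):
        entries = self._registry.setdefault(ns, dict())
        if oid not in entries:
            entries[oid] = [creator(), 0]    # create on first request
        entries[oid][1] += 1                 # one more user
        return entries[oid][0]

    def rem_obj(self, obj, ns='global'):
        entries = self._registry.get(ns, dict())
        for oid, entry in list(entries.items()):
            if entry[0] is obj:
                entry[1] -= 1                # one user less
                if entry[1] <= 0:
                    del entries[oid]         # no users left: forget it
                return True
        return False


created = []

def creator():
    created.append(1)
    return object()

cache = RefCountedCache()
first = cache.get_obj('log', creator)
second = cache.get_obj('log', creator)       # reuses the first instance
removed = [cache.rem_obj(first), cache.rem_obj(first), cache.rem_obj(first)]
```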

unlock(*args)

1.2.22. radical.utils.plugin_manager module

class radical.utils.plugin_manager.PluginBase(descr: dict)[source]

Bases: object

This class serves as base class for any plugin managed by the plugin handler

property plugin_class: str
property plugin_description: str
property plugin_name: str
property plugin_type: str
property plugin_version: str
class radical.utils.plugin_manager.PluginManager(*args, **kwargs)[source]

Bases: object

The PluginManager manages plugins of specific types: the manager can search for installed plugins, list and describe plugins found, load plugins, and instantiate the plugin for further use.

Example

# try to load the 'echo' plugin from the 'radical' namespace
plugin_type = 'echo'

pm = radical.utils.PluginManager('radical')

for plugin_name in pm.list(plugin_type):
    print(plugin_name)
    print(pm.describe(plugin_type, plugin_name))

default_plugin = pm.load('echo', 'default')

default_plugin.init_plugin('world')
default_plugin.run()  # prints 'hello default world'

The plugins are expected to follow a specific naming and coding schema to be recognized by the plugin manager. The naming schema is:

[namespace].plugins.[ptype].plugin_[ptype]_[pname].py

i.e. for the example above: radical.plugins.echo.plugin_echo_default.py

The plugin code consists of two parts: a plugin description, and a plugin class. The description is a module level dictionary named PLUGIN_DESCRIPTION, the plugin class must have a class constructor __init__(*args, **kwargs) to create plugin instances for further use.

At this point, we leave the definition of the exact plugin signatures open, but expect that to be more strictly defined per plugin type in the future.

Note that the PluginManager construction is, at this point, not considered thread safe.

describe(ptype, pname)[source]

return plugin details for a given plugin type / name pair

dump()[source]
dump_str()[source]
list(ptype)[source]

return a list of loaded plugins for a given plugin type

list_types()[source]

return a list of loaded plugin types

load(ptype, pname, *args, **kwargs)[source]

check if a plugin with given type and name was loaded, if so, instantiate its plugin class and return it.

load_plugins(namespace, log)[source]

Load all plugins for the given namespace. Previously loaded plugins are overloaded.

register(ptype, pname, pinfo)[source]
retrieve(ptype, pname)[source]
seen(pfile)[source]

1.2.23. radical.utils.poll module

class radical.utils.poll.Poller(log=None)[source]

Bases: object

This object will accept a set of things we can call select.select() on, which is basically anything which has a file descriptor exposed via a fileno() member method, or any integer which directly represents an OS level file descriptor. This class implements most of the interface as defined by Python’s select.Poller class, which is created by calling select.poll(). This implementation is thread-safe.

NOTE: Poller.poll() returns the original object handle instead of the polled file descriptor.

NOTE: Support for POLLPRI and POLLNVAL selection is not implemented, and will never be returned on poll().

close()[source]
modify(fd, eventmask=None)[source]
poll(timeout=None)[source]
register(fd, eventmask=None)[source]
unregister(fd, _assert_existence=True)[source]
radical.utils.poll.poll(log=None)[source]

1.2.24. radical.utils.profile module

class radical.utils.profile.Profiler(name, ns=None, path=None)[source]

Bases: object

This class is really just a persistent file handle with a convenience call (prof()) to write lines of timestamped events. Any profiling intelligence must be applied when reading and evaluating the created profiles. The following fields are defined for each event:

time : mandatory, float, time in seconds since epoch
event: mandatory, string, short, unique name of event to be recorded
comp : optional, string, name of component where the event originates
tid  : optional, string, current thread id (name)
uid  : optional, string, ID of entity involved (when available)
state: optional, string, state of entity involved, if applicable
msg  : optional, string, free-form message describing the event

Strings MUST NOT contain commas. Otherwise they are encouraged to be formed as [a-z][0-9a-z_.]*. msg strings are free-form, but the prohibition of commas still applies. We propose to limit the sum of strings to about 256 characters, which will guarantee atomic writes on most OS’s without additional locking overhead. Fewer than 100 characters makes the profiles almost human-readable.
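
The comma restriction follows from the line format: each event becomes one comma-separated line. A sketch of such a formatter is shown below. The function format_event and the timestamp precision are assumptions; only the field order is taken from the fields attribute of the class.

```python
import time

FIELDS = ['time', 'event', 'comp', 'thread', 'uid', 'state', 'msg']


def format_event(event, comp='', thread='', uid='', state='', msg='', ts=None):
    """Format one profile event as a single comma-separated line."""
    if ts is None:
        ts = time.time()
    strings = [event, comp, thread, uid, state, msg]
    for s in strings:
        if ',' in s:
            # a comma would shift all following columns
            raise ValueError('profile strings must not contain commas: %r' % s)
    return '%.7f,%s\n' % (ts, ','.join(strings))


line = format_event('exec_start', comp='agent', uid='task.0001', ts=1.5)
```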

The profile is enabled by setting environment variables. For a profiler named radical.utils, the following env variables will be evaluated:

RADICAL_UTILS_PROFILE
RADICAL_PROFILE

If either is present in the environment, the profile is enabled (the value of the setting is ignored).

close()[source]
disable()[source]
enable()[source]
property enabled
fields = ['time', 'event', 'comp', 'thread', 'uid', 'state', 'msg']
flush(verbose=False)[source]
property path
prof(event, uid=None, state=None, msg=None, ts=None, comp=None, tid=None)[source]
timestamp()[source]
radical.utils.profile.clean_profile(profile, sid, state_final=None, state_canceled=None)[source]

This method will prepare a profile for consumption in radical.analytics. It performs the following actions:

  • makes sure all events have an ename entry

  • removes all state transitions to CANCELLED if a different final state is encountered for the same uid

  • assigns the session uid to all events without uid

  • makes sure that state transitions have an ename set to state

radical.utils.profile.combine_profiles(profs)[source]

We merge all profiles and sort by time.

This routine expects all profiles to have a synchronization time stamp. Two kinds of sync timestamps are supported: absolute (sync_abs) and relative (sync_rel).

Time syncing is done based on ‘sync_abs’ timestamps. We expect one such absolute timestamp to be available per host (the first profile entry will contain host information). All timestamps from the same host will be corrected by the respectively determined NTP offset. We define an ‘accuracy’ measure which is the maximum difference of clock correction offsets across all hosts.

The sync_rel timestamps are expected to occur in pairs, one for a profile with no other sync timestamp, and one for a profile which has a sync_abs timestamp. In that case, the time correction from the latter is transferred to the former (the two time stamps are considered to have been written at the exact same time).

The method returns the combined profile and accuracy, as a tuple.

radical.utils.profile.event_to_label(event)[source]
radical.utils.profile.read_profiles(profiles, sid=None, efilter=None)[source]

We read all profiles as CSV files and parse them. For each profile, we back-calculate global time (epoch) from the sync timestamps.

The caller can provide a filter of the following structure:

filter = {ru.EVENT: ['event 1', 'event 2', ...],
          ru.MSG  : ['msg 1',   'msg 2',   ...],
          ...
         }

Filters apply on substring matches!

radical.utils.profile.timestamp()[source]

1.2.25. radical.utils.registry module

1.2.26. radical.utils.reporter module

class radical.utils.reporter.Reporter(name, ns=None, path=None, targets=None, enabled=None)[source]

Bases: object

COLORS = {'black': '\x1b[30m', 'blue': '\x1b[34m', 'cyan': '\x1b[36m', 'darkgray': '\x1b[90m', 'green': '\x1b[32m', 'lightblue': '\x1b[94m', 'lightcyan': '\x1b[96m', 'lightgray': '\x1b[37m', 'lightgreen': '\x1b[92m', 'lightmagenta': '\x1b[95m', 'lightred': '\x1b[91m', 'lightyellow': '\x1b[93m', 'magenta': '\x1b[35m', 'red': '\x1b[31m', 'reset': '\x1b[39m', 'white': '\x1b[97m', 'yellow': '\x1b[33m'}
DOTTED = '.'
DOUBLE = '='
EMPTY = ''
ERROR = 'lightred'
HASHED = '#'
HEADER = 'bold lightyellow'
IDLE = 'lightwhite'
INFO = 'lightblue'
MODS = {'': '', 'blink': '\x1b[5m', 'bold': '\x1b[1m', 'inverse': '\x1b[7m', 'reset': '\x1b[0m', 'underline': '\x1b[4m'}
OK = 'lightgreen'
PROGRESS = 'lightwhite'
SINGLE = '-'
TITLE = 'bold lightblue'
WARN = 'lightyellow'
error(msg='')[source]
exit(msg='', exit_code=0)[source]
header(msg='')[source]
idle(c=None, mode=None, color=None, idle_id=None)[source]
info(msg='')[source]
ok(msg='')[source]
plain(msg='')[source]
progress(msg=None)[source]
progress_done()[source]
progress_tgt(tgt=None, label=None)[source]
set_style(which, color=None, style=None, segment=None)[source]
title(title)[source]
warn(msg='')[source]

1.2.27. radical.utils.ru_regex module

class radical.utils.ru_regex.ReString(*args, **kw)[source]

Bases: str

This is a string class which supports simplified regular expression matching. The regex language and expressions themselves are not simplified; rather, the invocation of the matching is simple, as is the handling of the match results:

txt = ReString('The quick brown fox jumps over the lazy dog')

# the '//' operator is overloaded to match against a regular
# expression. The result is a `ReSult` instance, which allows simple
# access to the matches
with txt // r'(\s.u)(?P<x>.*?j\S+)' as res:
    if res: print('Matched!')                 # boolean check
    print("res     : '%s' " % res)            # list of results
    print("res[0]  : '%s' " % res[0])         # index by number ...
    print("res[1]  : '%s' " % res[1])         # ... for all matches
    print("res['x']: '%s' " % res['x'])       # index by match name
    print("res.x   : '%s' " % res.x)          # ...   as properties
    for i, r in enumerate(res):
        print("res %d   : '%s' " % (i, r))    # matches as iterable

    assert len(res) == 2                     # number of matches
    assert res == [' qu', 'ick brown fox jumps']  # compare to list

if txt // r'(rabbit)':                        # simple use in if / elif
    res = txt.get()                           # get ReSult of last match

elif txt // r'((?:\s).{12,15}?(\S+))':      # full Python regex slang
    res = txt.get()

else:
    print('no match')
get(key=None, default=None)[source]
class radical.utils.ru_regex.ReSult(result=None)[source]

Bases: object

This class is a container around a regular expression match, which provides some more convenient access methods, boolean tests, etc.

We only handle base strings, not unicode strings!

get(key, default=None)[source]

get is supported for default based dict access,

start(idx)[source]

1.2.28. radical.utils.shell module

radical.utils.shell.sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None, cwd=None)[source]

call a shell command, return [stdout, stderr, retval].

radical.utils.shell.sh_callout_async(cmd, stdin=True, stdout=True, stderr=True, shell=False, env=None, cwd=None)[source]

Run a command, and capture stdout/stderr if so flagged. The call will return a PROC object instance on which the captured output can be retrieved line by line (I/O is line buffered). When the process is done, a None will be returned on the I/O queues.

Line breaks are stripped.

stdout/stderr: True [default], False, string
  • False : discard I/O

  • True : capture I/O as queue [default]

  • string: capture I/O as queue, also write to named file

shell: True, False [default]
  • pass to popen

cwd: string
  • working directory for command to run in

PROC:
  • PROC.stdout : queue.Queue instance delivering stdout lines

  • PROC.stderr : queue.Queue instance delivering stderr lines

  • PROC.state : ru.RUNNING, ru.DONE, ru.FAILED

  • PROC.rc : returncode (None while ru.RUNNING)

  • PROC.stdout_filename: name of stdout file (when available)

  • PROC.stderr_filename: name of stderr file (when available)

radical.utils.shell.sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None, cwd=None)[source]

call a shell command in the background. Do not attempt to pipe STDOUT/ERR, but only support writing to named files.

radical.utils.shell.sh_quote(data)[source]

Quote a string and wrap it in double quotes so that it can be passed to a POSIX shell.

Examples

foo       -> "foo"
foo"bar   -> "foo"bar"
foo"bar   -> "foo\"bar"
$FOO'$BAR -> "$FOO'$BAR"

NOTE: this method does not attempt to strip out backticks or other code execution mechanisms from the string.

1.2.29. radical.utils.signatures module

radical.utils.signatures.anything(*args)[source]
radical.utils.signatures.by_regex(*args)[source]
radical.utils.signatures.dict_of(*args)[source]
radical.utils.signatures.list_of(*args)[source]
radical.utils.signatures.nothing

alias of None

radical.utils.signatures.one_of(*args)[source]
radical.utils.signatures.optional(*args)[source]
radical.utils.signatures.returns(sometype)[source]

Return type checking decorator

radical.utils.signatures.set_of(*args)[source]
radical.utils.signatures.takes(*args, **kwargs)[source]

Method signature checking decorator

radical.utils.signatures.tuple_of(*args)[source]
radical.utils.signatures.with_attr(*args)[source]

1.2.30. radical.utils.singleton module

class radical.utils.singleton.Singleton[source]

Bases: type

A metaclass to ‘tag’ other classes as singleton:

class MyClass(BaseClass, metaclass=Singleton):
    ...
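
A singleton metaclass of this kind can be sketched as follows. SingletonSketch and Config are hypothetical names and the code is not the radical.utils implementation. In Python 3 a metaclass is attached with the metaclass= keyword in the class statement.

```python
import threading


class SingletonSketch(type):
    """Hypothetical metaclass: at most one instance per tagged class."""

    _instances = dict()
    _lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        # intercept instantiation: create on first call, reuse afterwards
        with SingletonSketch._lock:
            if cls not in SingletonSketch._instances:
                instance = super().__call__(*args, **kwargs)
                SingletonSketch._instances[cls] = instance
        return SingletonSketch._instances[cls]


class Config(metaclass=SingletonSketch):

    def __init__(self, value=None):
        self.value = value


first = Config(value=1)
second = Config(value=2)     # constructor arguments are ignored here
```

Since the second call returns the existing instance without running __init__ again, arguments passed after the first instantiation have no effect in this sketch.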

1.2.31. radical.utils.stack module

radical.utils.stack.stack(ns='radical')[source]

returns a dict with information about the currently active python interpreter and all radical modules (incl. version details)

1.2.32. radical.utils.testing module

class radical.utils.testing.TestConfig(ns, cfg_name=None, cfg_section=None)[source]

Bases: dict

This class represents a set of configurations for a test suite. It usually contains parameters which tweak the tests toward specific environments and test cases, such as specific remote systems to interact with, specific backends to use, etc.

The class expects test configurations to be deployed in the current Python installation (virtualenv, condaenv, etc.). The namespace passed on construction is used to search for configurations under $namespace/configs/tests/, configuration files are expected to follow the naming scheme test_$cfg_name.json. The json file is read and this object then behaves like a dictionary for those json data, but all top level keys in the dictionary are also exposed as object attributes.

add_config(ns, cfg_name, cfg_section)[source]

To the current config dict contents, add the content of the specified additional config file

radical.utils.testing.add_test_config(ns, cfg_name, cfg_section=None)[source]

To an existing active config, add the contents of an additional test configuration.

radical.utils.testing.get_test_config()[source]

If a test config is currently set, return it. If not, attempt to load a test config for the given namespace, set it as active, and return it.

radical.utils.testing.set_test_config(ns, cfg_name=None, cfg_section=None)[source]

Set a test config. All subsequent calls to get_test_config() will retrieve the same configuration, until a new config is explicitly set with this method again.

radical.utils.testing.sys_exit(ret)[source]

call pytest.exit(ret) when running under pytest, sys.exit(ret) otherwise

1.2.33. radical.utils.threads module

exception radical.utils.threads.SignalRaised(msg, signum=None)[source]

Bases: SystemExit

exception radical.utils.threads.ThreadExit[source]

Bases: SystemExit

radical.utils.threads.cancel_main_thread(signame=None, once=False)[source]

This method will call thread.interrupt_main from any calling subthread. That will cause a ‘KeyboardInterrupt’ exception in the main thread, which can be caught via except KeyboardInterrupt.

The main thread MUST NOT have a SIGINT signal handler installed (other than the default handler or SIG_IGN), otherwise this call will cause an exception in the core python signal handling (see http://bugs.python.org/issue23395).

The thread will exit after this, via sys.exit(0), and can then be joined from the main thread.

When being called from the main thread, no interrupt will be generated, but sys.exit() will still be called. This can be caught in the code via except SystemExit.

Another way to avoid the SIGINT problem is to send a different signal to the main thread. We do so if signame is specified.

After sending the signal, any sub-thread will call sys.exit(), and thus finish. We leave it to the main thread though to decide if it will exit at this point or not. Either way, it will have to handle the signal first.

If once is set to True, we will send the given signal at most once. This will mitigate races between multiple error causes, specifically during finalization.

radical.utils.threads.get_signal_by_name(signame)[source]

Translate a signal name into the respective signal number. If signame is not found to be a valid signal name, this method will raise a KeyError exception. Lookup is case insensitive.
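
A case-insensitive lookup of this kind can be sketched with the standard signal module. The function signal_by_name is a hypothetical illustration which expects full names such as 'SIGINT'; whether the library also accepts other spellings is not stated here.

```python
import signal


def signal_by_name(signame):
    """Map a signal name such as 'sigint' to its signal number."""
    table = {sig.name: sig for sig in signal.Signals}
    return table[signame.upper()]     # KeyError for unknown names


sigint = signal_by_name('sigint')
```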

radical.utils.threads.get_thread_id()[source]
radical.utils.threads.get_thread_name()[source]
radical.utils.threads.gettid()[source]

Python is not able to give us the native thread ID. We thus use a syscall to do so. Since this is not portable, we fall back to None in case of error (Hi MacOS).

radical.utils.threads.is_main_thread(t=None)[source]
radical.utils.threads.is_this_thread(t)[source]

check if the given thread (type: threading.Thread) is the same as the current thread of execution.

radical.utils.threads.main_thread()[source]

return a handle to the main thread of execution in this process

radical.utils.threads.raise_in_thread(e=None, tname=None, tident=None)[source]

This method uses an internal Python function to inject an exception ‘e’ into any given thread. That thread can be specified by its name (‘tname’) or thread id (‘tident’). If not specified, the exception is sent to the MainThread.

The target thread will receive the exception with some delay. More specifically, it needs to execute up to 100 opcodes before the exception is evaluated and raised. The thread interruption can thus be delayed significantly, for example when the thread sleeps.

The default exception raised is ‘radical.utils.ThreadExit’ which inherits from ‘SystemExit’.

NOTE: this is not reliable: the exception is not raised immediately, but is scheduled for raising at some point, i.e., in general after about 100 opcodes (sys.getcheckinterval()). Depending on when exactly the exception is finally raised, the interpreter might silently swallow it if that happens inside a generic try/except clause. Such clauses exist in the Python core, even if discouraged by some PEP or other.

See https://bugs.python.org/issue1779233

NOTE: we can only raise exception types, not exception instances

See https://bugs.python.org/issue1538556

Example:

import time
import threading as mt
import radical.utils as ru

def sub():
    time.sleep(0.1)
    ru.raise_in_thread()

try:
    t = mt.Thread(target=sub)
    t.start()

    while True:
        time.sleep(0.01)

except ru.ThreadExit:  print('thread exit')
except Exception as e: print('except: %s' % e)
except SystemExit:     print('exit')
else:                  print('unexcepted')
finally:               t.join()
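
The "internal Python function" is not named in this documentation. In CPython, the usual way to inject an exception into another thread is the C API call PyThreadState_SetAsyncExc, reachable through ctypes. The sketch below assumes that mechanism; ThreadExit here is a local stand-in class and raise_in is a hypothetical helper, neither taken from radical.utils. Note that an exception type, not an instance, is passed.

```python
import ctypes
import threading
import time


class ThreadExit(SystemExit):
    """Local stand-in for an exit exception (assumption, not ru.ThreadExit)."""


def raise_in(thread, exc_type=ThreadExit):
    # CPython-only: schedule exc_type to be raised in the given thread.
    # The call returns the number of thread states it modified.
    modified = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread.ident), ctypes.py_object(exc_type))
    return modified == 1


seen = []


def worker():
    try:
        while True:
            time.sleep(0.01)
    except ThreadExit:
        seen.append('thread exit')


t = threading.Thread(target=worker)
t.start()
time.sleep(0.05)            # let the worker enter its loop
delivered = raise_in(t)
t.join(timeout=5)
```

The exception only surfaces once the worker returns from time.sleep() and executes Python bytecode again, which illustrates the delay mentioned above.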
radical.utils.threads.set_cancellation_handler()[source]
radical.utils.threads.this_thread()[source]

return a handle to the current thread of execution

radical.utils.threads.unset_cancellation_handler()[source]

1.2.34. radical.utils.timing module

class radical.utils.timing.Time(tick=0.01, speed=1.0)[source]

Bases: object

This is a timing class that allows simulating different types of clocks.

Parameters: tick: This is the resolution of the clock. speed: This is the speed of the clock.

advance(amount)[source]

Advance the clock by a specific amount of time units.

sleep(amount)[source]

Sleeps for a specific amount of time units. Actual time spent is equal to amount divided by speed.

stop()[source]

Stops the clock

time()[source]

Returns the current value of the clock
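
The relation between clock units and real time ("amount divided by speed") can be illustrated with a much simplified stand-in. ScaledClock is hypothetical and makes assumptions the documentation does not confirm: it has no tick resolution, runs no background thread, and lets sleep() advance the clock itself.

```python
import time


class ScaledClock:
    """Simplified simulated clock (sketch, not radical.utils.timing.Time)."""

    def __init__(self, speed=1.0):
        self._speed = speed
        self._now = 0.0

    def time(self):
        # Current clock value in clock units.
        return self._now

    def advance(self, amount):
        # Move the clock forward without waiting.
        self._now += amount

    def sleep(self, amount):
        # Real time spent is amount / speed; the clock moves by amount.
        time.sleep(amount / self._speed)
        self.advance(amount)


clock = ScaledClock(speed=50.0)
clock.sleep(0.5)        # blocks for roughly 0.01 real seconds
clock.advance(1.5)
```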

radical.utils.timing.dt_epoch(dt)[source]

convert a given datetime instance into seconds since EPOCH.

radical.utils.timing.epoch(data, pattern)[source]

convert a given datetime string into seconds since EPOCH. The string is parsed as defined by POSIX’s strptime().
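
Both conversions can be sketched with the standard datetime module. The helper names below are hypothetical, and the sketch assumes naive datetime values that are interpreted as UTC; how radical.utils treats time zones is not stated here.

```python
from datetime import datetime

EPOCH = datetime(1970, 1, 1)


def dt_to_epoch(dt):
    # Seconds between the (naive, assumed UTC) datetime and the epoch.
    return (dt - EPOCH).total_seconds()


def str_to_epoch(data, pattern):
    # Parse the string with a strptime() pattern, then convert.
    return dt_to_epoch(datetime.strptime(data, pattern))


seconds = str_to_epoch('1970-01-02 00:00:00', '%Y-%m-%d %H:%M:%S')
```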

radical.utils.timing.timed_method(func)[source]

This class decorator will decorate all public class methods with a timing function. That will time the call invocation, and store the respective data in a private class dict ‘__timing’. Additionally, it will add the class method ‘_timing_last()’, which will return the tuple [method name, method timer] for the last timed invocation of a class method.
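
The general idea of such a class decorator can be sketched as follows. timed_class, the _timing attribute layout and the Job class are illustrative assumptions; the real decorator stores its data in a private ‘__timing’ dict and may differ in detail.

```python
import functools
import inspect
import time


def timed_class(cls):
    """Wrap every public method of cls with a wall-clock timer (sketch)."""
    cls._timing = {}            # method name -> list of durations (seconds)
    cls._timing_latest = None   # (method name, duration) of the last call

    def wrap(name, func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            start = time.perf_counter()
            try:
                return func(self, *args, **kwargs)
            finally:
                elapsed = time.perf_counter() - start
                cls._timing.setdefault(name, []).append(elapsed)
                cls._timing_latest = (name, elapsed)
        return wrapper

    # Only plain functions whose names do not start with an underscore.
    for name, attr in list(vars(cls).items()):
        if inspect.isfunction(attr) and not name.startswith('_'):
            setattr(cls, name, wrap(name, attr))

    cls._timing_last = lambda self: cls._timing_latest
    return cls


@timed_class
class Job:
    def run(self):
        time.sleep(0.01)
        return 42


job = Job()
answer = job.run()
name, seconds = job._timing_last()
```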

1.2.35. radical.utils.tracer module

radical.utils.tracer.trace(namespace='radical', log=None)[source]
radical.utils.tracer.untrace()[source]

1.2.36. radical.utils.typeddict module

exception radical.utils.typeddict.TDError(msg=None, level=1)[source]

Bases: TDErrorMixin, Exception

class radical.utils.typeddict.TDErrorMixin(msg=None, level=1)[source]

Bases: object

exception radical.utils.typeddict.TDKeyError(msg=None, level=1)[source]

Bases: TDErrorMixin, KeyError

exception radical.utils.typeddict.TDTypeError(msg=None, level=1)[source]

Bases: TDErrorMixin, TypeError

exception radical.utils.typeddict.TDValueError(msg=None, level=1)[source]

Bases: TDErrorMixin, ValueError

class radical.utils.typeddict.TypedDict(from_dict=None, **kwargs)[source]

Bases: dict

as_dict()[source]
clear()[source]

Remove all items from D.

get(key, default=None)[source]

Return the value for key if key is in the dictionary, else default.

items()[source]

Return a set-like object providing a view on D's items.

keys()[source]

Return a set-like object providing a view on D's keys.

pop(k[, d])[source]

Remove the specified key and return the corresponding value. If the key is not found, d is returned if given, otherwise KeyError is raised.

popitem()[source]

Remove and return some (key, value) pair as a 2-tuple; raise KeyError if D is empty.

setdefault(key, default=None)[source]

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update(other)[source]

Overload dict.update(): the call is used to ensure that sub-dicts are instantiated as their respective TypedDict-inheriting class types, if so specified by the respective schema.

So, if the schema contains:

{
  ...
  'foo' : BarTypedDict,
  'foos': [BarTypedDict],
  ...
}

where BarTypedDict is a valid type in the scope of the schema definition and inherits from ru.TypedDict, then update() will ensure that the value for key foo is indeed of type BarTypedDict (and thus a ru.TypedDict). An error will be raised if (a) BarTypedDict does not have a single-parameter constructor like ru.TypedDict, or (b) the data value for foo cannot be used as the from_dict parameter of the BarTypedDict constructor.
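
The coercion rule described above can be illustrated with a small self-contained stand-in. MiniTypedDict, its _schema attribute and the Bar and Foo classes are hypothetical and only mimic the behavior described here; they are not the ru.TypedDict implementation and skip all type checking of plain values.

```python
class MiniTypedDict(dict):
    """Toy dict that instantiates sub-dicts according to a schema (sketch)."""

    _schema = {}

    def __init__(self, from_dict=None):
        super().__init__()
        if from_dict:
            self.update(from_dict)

    @staticmethod
    def _is_typed(target):
        return isinstance(target, type) and issubclass(target, MiniTypedDict)

    def update(self, other):
        for key, value in other.items():
            target = self._schema.get(key)
            if self._is_typed(target) and not isinstance(value, target):
                # 'foo': BarTypedDict -> build the typed sub-dict
                value = target(value)
            elif (isinstance(target, list) and target
                    and self._is_typed(target[0])):
                # 'foos': [BarTypedDict] -> build every list element
                value = [v if isinstance(v, target[0]) else target[0](v)
                         for v in value]
            self[key] = value


class Bar(MiniTypedDict):
    _schema = {'x': int}


class Foo(MiniTypedDict):
    _schema = {'foo': Bar, 'foos': [Bar]}


f = Foo({'foo': {'x': 1}, 'foos': [{'x': 2}, {'x': 3}]})
```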

values()[source]

Return an object providing a view on D's values.

verify()[source]

class radical.utils.typeddict.TypedDictMeta(name, bases, namespace)[source]

Bases: type

radical.utils.typeddict.as_dict(src)[source]

Iterate given object, apply as_dict() to all typed values, and return the result (effectively a shallow copy).

1.2.37. radical.utils.url module

class radical.utils.url.Url(url_in=None)[source]

Bases: object

The RADICAL Url class.

URLs are used in several places in the RADICAL software projects: to specify service endpoints for job submission or resource management, for file or directory locations, etc.

The URL class is designed to simplify URL management for these purposes: it allows manipulating individual URL elements while ensuring that the resulting URL is well formed. Example:

# create a URL from a string
location = radical.utils.Url('file://localhost/tmp/file.dat')
d = radical.utils.filesystem.Directory(location)

A URL consists of the following components (where one or more can be ‘None’):

<scheme>://<user>:<pass>@<host>:<port>/<path>?<query>#<fragment>

Each of these components can be accessed via its property or alternatively, via getter / setter methods. Example:

url = ru.Url('scheme://user:pass@host:port/path?query#fragment')

# modify the scheme
url.scheme = 'anotherscheme'

# the above is equivalent to
url.set_scheme('anotherscheme')
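
For comparison, the same component layout can be inspected with the standard library's urllib.parse. This sketch does not use the Url class; the port 8080 and the other component values are placeholders chosen for illustration.

```python
from urllib.parse import urlsplit

parts = urlsplit('scheme://user:pass@host:8080/path?query#fragment')

components = {
    'scheme'  : parts.scheme,
    'username': parts.username,
    'password': parts.password,
    'host'    : parts.hostname,
    'port'    : parts.port,
    'path'    : parts.path,
    'query'   : parts.query,
    'fragment': parts.fragment,
}

# SplitResult is immutable, so changing a component creates a new value.
changed = parts._replace(scheme='anotherscheme').geturl()
```

Unlike this immutable result type, the Url class lets each component be changed in place through its properties and setters.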
property fragment

The fragment component.

get_fragment()[source]

Return the URL ‘fragment’ component.

get_host()[source]

Return the URL ‘hostname’ component.

get_password()[source]

Return the URL ‘password’ component.

get_path()[source]

Return the URL ‘path’ component.

get_port()[source]

Return the URL ‘port’ component.

get_query()[source]

Return the URL ‘query’ component.

get_scheme()[source]

Return the URL ‘scheme’ component.

get_username()[source]

Return the URL ‘username’ component.

property host

The hostname component.

normpath(path)[source]
property password

The password component.

property path

The path component.

property port

The port component.

property query

The query component.

property schema

The scheme component.

property scheme

Return the URL ‘scheme’ component.

set_fragment(fragment)[source]

Set the URL ‘fragment’ component.

Parameters:

fragment (str) – The new fragment

set_host(hostname)[source]

Set the URL ‘hostname’ component.

Parameters:

hostname (str) – The new hostname

set_password(password)[source]

Set the URL ‘password’ component.

Parameters:

password (str) – The new password

set_path(path)[source]

Set the URL ‘path’ component.

Parameters:

path (str) – The new path

set_port(port)[source]

Set the URL ‘port’ component.

Parameters:

port (int) – The new port

set_query(query)[source]

Set the URL ‘query’ component.

Parameters:

query (str) – The new query

set_scheme(scheme)[source]

Set the URL ‘scheme’ component.

Parameters:

scheme (str) – The new scheme

set_username(username)[source]

Set the URL ‘username’ component.

Parameters:

username (str) – The new username

property username

The username component.

1.2.38. radical.utils.which module

radical.utils.which.which(names)[source]

Takes a (list of) name(s) and looks for an executable in the path. It will return the first match found, or None if none of the given names is found.
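
A comparable first-match lookup can be sketched with shutil.which from the standard library. first_executable is a hypothetical name, and the real function may search the path differently.

```python
import shutil


def first_executable(names):
    # Accept a single name or a list of names.
    if isinstance(names, str):
        names = [names]
    for name in names:
        path = shutil.which(name)
        if path:
            return path
    return None


missing = first_executable(['no-such-command-a1b2c3', 'nor-this-one-d4e5f6'])
```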