1. radical.utils package¶
1.1. Subpackages¶
- 1.1.1. radical.utils.atfork package
- 1.1.2. radical.utils.contrib package
- 1.1.3. radical.utils.plugins package
- 1.1.4. radical.utils.queue_network package
- 1.1.5. radical.utils.scheduler package
- 1.1.6. radical.utils.zmq package
- 1.1.6.1. Submodules
- 1.1.6.1.1. radical.utils.zmq.bridge module
- 1.1.6.1.2. radical.utils.zmq.client module
- 1.1.6.1.3. radical.utils.zmq.pubsub module
- 1.1.6.1.4. radical.utils.zmq.queue module
- 1.1.6.1.5. radical.utils.zmq.registry module
- 1.1.6.1.6. radical.utils.zmq.server module
- 1.1.6.1.7. radical.utils.zmq.utils module
1.2. Submodules¶
1.2.1. radical.utils.algorithms module¶
- radical.utils.algorithms.collapse_ranges(ranges)[source]¶
Given a set of ranges (as a set of pairs of floats [start, end] with start <= end), this algorithm collapses that set into the smallest possible set of ranges which cover the same part of the domain (floats), but neither more nor less.
We first sort the ranges by their starting point. We then start with the range with the smallest starting point [start_1, end_1] and compare it to the next following range [start_2, end_2], where we now know that start_1 <= start_2. There are now two cases:
a) when start_2 <= end_1, the ranges overlap, and we collapse them into range_1: range_1 = [start_1, max(end_1, end_2)]
b) when start_2 > end_1, the ranges don't overlap. Importantly, none of the later ranges can ever overlap range_1, because their start points are even larger. So we move range_1 to the set of final ranges, and restart the algorithm with range_2 as the smallest one.
The algorithm terminates when only one range is left – it is then also moved to the list of final ranges, and that list is returned.
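The steps above can be condensed into a short sketch (an illustrative re-implementation written for this documentation, not the packaged source):

```python
def collapse_ranges(ranges):
    '''Collapse overlapping [start, end] ranges into a minimal set.'''
    final = []
    # sort by start point, then sweep once over the sorted ranges
    for start, end in sorted(ranges):
        if final and start <= final[-1][1]:
            # case a) ranges overlap: extend the current final range
            final[-1][1] = max(final[-1][1], end)
        else:
            # case b) disjoint: the current range is final, start a new one
            final.append([start, end])
    return final
```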
- radical.utils.algorithms.in_range(value, ranges)[source]¶
Check if a float value is in any of the given ranges, which are assumed to be a list of tuples (or a single tuple) of start and end points (floats). Returns True or False.
- radical.utils.algorithms.lazy_bisect(data, check, on_ok=None, on_nok=None, on_fail=None, on_skip=None, ratio=0.5, log=None)[source]¶
Find the next potentially good candidate element in a presumably ordered list data (from smallest to largest). The given callable check should accept a list element for good/bad checks, and return True for good (small enough), False otherwise (too large).
Note that this is only a bisect if ratio is set at 0.5 - otherwise the search will back off through the list faster (1.0 >= ratio > 0.5) or slower (0.5 > ratio >= 0.0) than bisect.
The method returns three lists: one with the elements of data which were checked as good, one with the elements checked as bad, and a third list (fail) with the elements for which check() raised an exception (such checks are also handled as bad). Note that this last list consists of tuples [element, error], where error is the exception's error message.
on_ok will be called on tasks where check() returned good, on_nok where check() returned bad, on_fail where check() raised an exception, and on_skip where bisect decided that no check() invocation is needed. The respective data item is the only argument to the calls.
- radical.utils.algorithms.partition(space, nparts)[source]¶
Create balanced partitions from an iterable space. This method preserves contiguous order.
kudos: http://code.activestate.com/recipes/425397-split-a-list-into-roughly-equal-sized-pieces/
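Following the recipe referenced above, the partitioning can be sketched as (a hedged illustration, not the packaged code):

```python
def partition(space, nparts):
    '''Split an iterable into nparts contiguous, balanced chunks.'''
    space = list(space)
    base, rest = divmod(len(space), nparts)
    parts, idx = [], 0
    for i in range(nparts):
        # the first `rest` chunks receive one extra element
        size = base + (1 if i < rest else 0)
        parts.append(space[idx:idx + size])
        idx += size
    return parts
```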
- radical.utils.algorithms.range_concurrency(ranges)[source]¶
given a set of uncollapsed ranges, return a series which describes the range-concurrency at any point.
Example
- Ranges:

      [----]        [---------]       [--------]
      [--------]        [-]       [------------]  [--]

- Concurrency:

      1 2 1 2 1 0 1 0 1 2 1 0 1 0
Returned is a sorted list of tuples where the first entry defines at what range value the concurrency changed, and the second value defines to what the concurrency count changed at that point.
You could consider the ranges to be of type [time_start, time_end], and the return would be a list of [timestamp, concurrency], if that helps – but the algorithm does not make any assumption on the data type, really, only that the values can be sorted.
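A sweep-line sketch of that computation (illustrative only; the packaged implementation may differ, e.g. in how it breaks ties between coinciding range ends and starts):

```python
def range_concurrency(ranges):
    '''Return [value, concurrency] pairs for a set of [start, end] ranges.'''
    events = []
    for start, end in ranges:
        events.append((start,  1))   # a range opens:  concurrency +1
        events.append((end,   -1))   # a range closes: concurrency -1
    result, count = [], 0
    for value, delta in sorted(events):
        count += delta
        if result and result[-1][0] == value:
            result[-1][1] = count    # merge events at the same point
        else:
            result.append([value, count])
    return result
```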
1.2.2. radical.utils.config module¶
Config file parsing.
We provide a json based config file parser with the following properties:
- system config files will be merged with user configs (if those exist)
- python style comments are filtered out before parsing
- after parsing, ${ABC:default}-style values are set or expanded via os.environ
- the returned class exposes settings as dicts or attributes: cfg['foo']['bar'] == cfg.foo.bar
1.2.2.1. Config Names and Locations¶
We assume two basic locations for config files: one is installed within the scope of a Python module, the other one is under user control, and usually in the users home directory. The config reader accepts the following parameters to derive the exact locations:
module: name of module under which the config is installed
path : config file path relative to the module home
name : config file name relative to the path
The module string is interpreted as follows:
    module         = 'module'
    module_path    = radical.utils.debug.find_module(module)
    sys_config_dir = '%s/configs' % module_path
    usr_base_dir   = os.getenv('RADICAL_CONFIG_USER_DIR') or os.getenv('HOME')
    if usr_base_dir:
        usr_config_dir = '%s/.%s/configs' % (usr_base_dir, module.replace('.', '/'))
so the location of the module’s __init__.py is used to derive the location of the installed system config files, and the module name is used to derive the location of the user provided config files.
For example, the module radical.utils will have the following config dirs:
    sys_config_dir = /tmp/ve/lib/python3.7/site-packages/radical/utils/configs
    usr_config_dir = /home/merzky/.radical/utils/configs
After loading the system level config file, any existing user level config file is merged into it, via
    radical.utils.dict_merge(user_cfg, system_cfg, mode='overwrite')
so that the user config settings supersede the system config settings.
Both path and name specifiers can contain * as wildcard, which is then interpreted as by glob(). If that wildcard exists, then all matching config files are read into one configuration dict, where each root key is set to the value the ‘*’ expands to (minus the .json extension).
For example, the name radical.pilot.resource_* with the following config files:
    /tmp/ve/[...]/radical/pilot/configs/resource_xsede.json
    /tmp/ve/[...]/radical/pilot/configs/resource_ncsa.json
will result in a config dict like:
{'xsede' : { 'foo' : 'bar' },
'ncsa' : { 'fiz' : 'baz' }}
1.2.2.2. Queries¶
We support two types of queries on the resulting parsed configs:
- dict like queries
- the _query(key) method, which returns a single value, or None if not found
In the latter _query() case, the key can be specified as a dot-separated path, so that the following two snippets are equivalent (assuming that a foo.bar section exists):
    val = cfg['foo']['bar'].get('baz')
    val = cfg._query('foo.bar.baz')
1.2.2.3. Environment¶
For expansion via os.environ, we support the following syntax in all string values (not keys):
${RADICAL_UTILS_ENV:default_value}
which will be replaced by
    os.environ.get('RADICAL_UTILS_ENV', 'default_value')
The default value is optional, an empty string is used if no default value is given. Env evaluation is only performed at time of parsing, not at time of query. RU attempts to convert env variables to float and int - if that fails, values are stored as strings.
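The expansion rule can be illustrated with a small helper (a sketch written for this documentation; the function name and regex are assumptions, not the module's API):

```python
import os
import re

def expand_config_value(value, env=None):
    '''Expand '${KEY:default}' style placeholders from an env dict.'''
    env = os.environ if env is None else env

    def repl(match):
        key, _, default = match.group(1).partition(':')
        return env.get(key, default)     # default is '' if not given

    expanded = re.sub(r'\$\{([^}]+)\}', repl, value)

    # opportunistically convert to int or float, as described above
    for cast in (int, float):
        try:
            return cast(expanded)
        except ValueError:
            pass
    return expanded
```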
1.2.2.4. Validation¶
It probably makes sense to switch to a json schema validator at some point, see for example https://pypi.python.org/pypi/json-schema-validator. For now this implementation remains schema-less, and will thus, in a very pythonesque way, only fail once values are queried or used.
1.2.2.5. Implementation¶
This implementation is based on typed dictionaries which are accessed as TypedDict’ed object hierarchy.
- class radical.utils.config.Config(module=None, category=None, name=None, cfg=None, from_dict=None, path=None, expand=True, env=None, _internal=False)[source]¶
Bases:
TypedDict
Contents of a config (json) file from a module’s config tree.
- module¶
used to determine the module’s config file location (default: radical.utils)
- category¶
name of config to be loaded from module’s config path
- name¶
specify a specific configuration to be used
- path¶
path to app config json to be used for initialization
- cfg¶
application config dict to be used for initialization
- from_dict¶
alias for cfg, to satisfy base class constructor
- expand¶
enable / disable environment var expansion (default: True)
- env¶
environment dictionary to be used for expansion (default: os.environ)
The naming of config files follows this rule:
<category>_<name>.json
For example, if the following is used in a system python installation:
    ru.Config('radical.pilot', category='session', name='minimal')
it would attempt to load (depending on system details):
    /usr/lib/python3/site-packages/radical/pilot/configs/session_minimal.json
- NOTE: Keys containing an underscore are not exposed via the API.
Keys containing dots are split and interpreted as paths in the configuration hierarchy.
- class radical.utils.config.DefaultConfig(*args, **kwargs)[source]¶
Bases:
Config
The settings in this default config are, unsurprisingly, used as default values for various RU classes and methods, as for example for log file locations, log levels, profile locations, etc.
- class radical.utils.config.DefaultConfigMeta(name, bases, namespace)[source]¶
Bases:
TypedDictMeta
,Singleton
Metaclass inherited from TypedDict's metaclass along with Singleton, to avoid a metaclass conflict (the metaclass of a derived class must be a subclass of the metaclasses of all its bases). The Singleton metaclass ensures that only one instance of the corresponding class exists.
1.2.3. radical.utils.constants module¶
1.2.4. radical.utils.daemon module¶
- class radical.utils.daemon.Daemon(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null', target=None, args=None)[source]¶
Bases:
object
A generic daemon class.
Usage: subclass the Daemon class and override the run() method
- restart(pid=None)[source]¶
Stop the daemon and restart it. If a pid is passed, then stop the daemon process with that pid and replace it with a daemon process represented by this class instance. It is the caller's responsibility to ensure that this makes semantic sense.
This method returns the pid of the new daemon process (see start()).
- radical.utils.daemon.daemonize(main=None, args=None, stdout=None, stderr=None, stdin=None, timeout=None)[source]¶
Create a daemon process and run the given method in it. For that, do the UNIX double-fork magic, see Stevens’ “Advanced Programming in the UNIX Environment” for details (ISBN 0201563177)
The method will return the PID of the spawned daemon process, or None on failure to create it. The stdout, stderr and stdin file names are interpreted by the daemon process, and are expected to be path names which can be opened and read / written in their respective capabilities.
1.2.5. radical.utils.debug module¶
- class radical.utils.debug.DebugHelper(name=None, info=None)[source]¶
Bases:
object
When instantiated, and when ‘RADICAL_DEBUG’ is set in the environment, this class will install a signal handler for SIGUSR1. When that signal is received, a stacktrace for all threads is printed to stdout. We also check if SIGINFO is available, which is generally bound to CTRL-T.
Additionally, a call to 'dh.fs_block(info=None)' will create a file system based barrier: it will create a unique file in /tmp/ (based on 'name' if given), and dump the stack trace and any 'info' into it. It then waits until that file has changed (touched or removed etc.), and then returns. The wait is a simple poll on 'os.stat()' (once per second).
- radical.utils.debug.add_snippet_path(path)[source]¶
add a path to the search path for dynamically loaded python snippets (see ru.get_snippet()).
- radical.utils.debug.get_caller_name(skip=2)[source]¶
Get a name of a caller in the format module.class.method
skip specifies how many levels of stack to skip while getting caller name. skip=1 means ‘who calls me’, skip=2 ‘who calls my caller’ etc.
An empty string is returned if skipped levels exceed stack height
Kudos: http://stackoverflow.com/questions/2654113/python-how-to-get-the-callers-method-name-in-the-called-method
- radical.utils.debug.get_snippet(sid)[source]¶
RU exposes small python snippets for runtime code insertion. The usage is intended as follows:
- a programmer implements a class
- for some experiment or test, that class's behavior must be controlled at runtime
- in all places where such an adaptation is expected to take place, the programmer inserts a hook like this:
    exec(ru.get_snippet('my_class.init_hook'))
this will trigger RU to search for python files of the name my_class.init_hook.py in $RADICAL_BASE/.radical/snippets/ (default), and return their content for injection.
The snippet search path can be extended by calling:
ru.add_snippet_path(path)
The RADICAL_DEBUG environment variable needs to be set for this method to do anything. A snippet can use the following literal strings, which will be replaced by their actual values:
    '###SNIPPET_FILE###' - filename from which snippet was loaded
    '###SNIPPET_PATH###' - path in which the snippet file is located
    '###SNIPPET_ID###'   - the sid string used to identify the snippet
- radical.utils.debug.print_stacktraces(signum=None, sigframe=None)[source]¶
signum, sigframe exist to satisfy signal handler signature requirements
- radical.utils.debug.raise_on(tag, log=None, msg=None)[source]¶
The purpose of this method is to artificially trigger error conditions for testing purposes, for example when handling the n’th unit, getting the n’th heartbeat signal, etc.
The tag parameter is interpreted as follows: on the n’th invocation of this method with any given tag, an exception is raised, and the counter for that tag is reset.
The limit n is set via an environment variable RU_RAISE_ON_<tag>, with tag in upper casing. The environment will only be inspected during the first invocation of the method with any given tag. The tag counter is process-local, but is shared amongst threads of that process.
1.2.6. radical.utils.dict_mixin module¶
- class radical.utils.dict_mixin.DictMixin[source]¶
Bases:
dict
Mixin defining all dictionary methods for classes that already have a minimum dictionary interface including getitem, setitem, delitem, and keys. Based on those methods, the mixin provides the remaining interface functionality to make the class look like a fully compliant dictionary.
- popitem()[source]¶
Remove and return some (key, value) pair as a 2-tuple; raise KeyError if D is empty.
- radical.utils.dict_mixin.dict_diff(a, b)[source]¶
return a dict of the form:
{ 'k1': {'a': 'foo', 'b': 'bar'}, 'k2': {'a': 'foo'}, 'k3': {'b': 'bar'}, }
which contains only those keys which are different in the two given dicts. Keys which are missing in either one are not included (to distinguish from None values). This method operates recursively over the given dicts.
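Based on the example output above, dict_diff can be sketched as follows (a hedged re-implementation for illustration):

```python
def dict_diff(a, b):
    '''Return keys whose values differ; one-sided keys report one side only.'''
    diff = {}
    for key in set(a) | set(b):
        if key not in b:
            diff[key] = {'a': a[key]}          # only 'a' has this key
        elif key not in a:
            diff[key] = {'b': b[key]}          # only 'b' has this key
        elif isinstance(a[key], dict) and isinstance(b[key], dict):
            sub = dict_diff(a[key], b[key])    # recurse into sub-dicts
            if sub:
                diff[key] = sub
        elif a[key] != b[key]:
            diff[key] = {'a': a[key], 'b': b[key]}
    return diff
```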
- radical.utils.dict_mixin.dict_merge(a, b, policy=None, wildcards=False, log=None, _path=None)[source]¶
This merges two dict in place, modifying the original dict in a.
- Merge Policies:
  - None (default) : raise an exception on conflicts
  - OVERWRITE : values in a are overwritten by new values from b
  - PRESERVE : original values in a are preserved; new values from b are only added where the original value is not set
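The merge policies can be illustrated with a sketch (the lowercase policy strings used here are illustrative only; the module defines its own policy constants):

```python
def dict_merge(a, b, policy=None):
    '''Merge dict b into dict a in place, recursing into nested dicts.'''
    for key, vb in b.items():
        va = a.get(key)
        if isinstance(va, dict) and isinstance(vb, dict):
            dict_merge(va, vb, policy)         # recurse on nested dicts
        elif key not in a:
            a[key] = vb                        # no conflict: just add
        elif va != vb:
            if policy == 'overwrite':
                a[key] = vb                    # b supersedes a
            elif policy == 'preserve':
                pass                           # keep the original value
            else:
                raise ValueError('merge conflict on %r' % key)
    return a
```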
- radical.utils.dict_mixin.dict_stringexpand(target, sources=None)[source]¶
This expands dict entries (strings only) with keys from a second dict. For example, the dicts:
    target  = {'workdir'  : '/home/%(user)s/',
               'resource' : '%(resource)s'}
    sources = {'user'     : 'peer_gynt',
               'protocol' : 'ssh',
               'host'     : 'localhost',
               'resource' : '%(protocol)s://%(host)s/'}
would result in:

    target = {'workdir'  : '/home/peer_gynt/',
              'resource' : 'ssh://localhost'}
Note that expansion happened twice, for the resource tag to be fully specified.
1.2.7. radical.utils.env module¶
- class radical.utils.env.EnvProcess(env: Dict[str, str])[source]¶
Bases:
object
run a code segment in a different os.environ:
    env = {'foo': 'buz'}
    with ru.EnvProcess(env=env) as p:
        if p:
            p.put(os.environ['foo'])
    print('-->', p.get())
- radical.utils.env.env_diff(env_1: Dict[str, str], env_2: Dict[str, str]) Tuple[Dict[str, str], Dict[str, str], Dict[str, str]] [source]¶
This method serves debug purposes: it compares two environments and returns those elements which appear in only one or the other env, and those which changed from one env to the other.
It will ignore any keys in the BLACKLIST and will also ignore BASH_FUNC_* keys which point to bash function definitions.
- radical.utils.env.env_dump(environment: Optional[Dict[str, str]] = None, script_path: Optional[str] = None) None [source]¶
- radical.utils.env.env_eval(fname: str) Dict[str, str] [source]¶
helper to create a dictionary with the env settings in the specified file which contains unset and export directives, or simple ‘key=val’ lines
- radical.utils.env.env_prep(environment: Optional[Dict[str, str]] = None, unset: Optional[List[str]] = None, pre_exec: Optional[List[str]] = None, pre_exec_cached: Optional[List[str]] = None, script_path: Optional[str] = None) Dict[str, str] [source]¶
Create a shell script which restores the environment specified in environment (dict). While doing so, ensure that all env variables not defined in environment but listed in unset (list) are unset. Also ensure that all commands provided in pre_exec_cached (list) are executed after these settings.
Once the shell script is created, run it and dump the resulting env, then read it back via env_read() and return the resulting env dict - that dict can then be used for process fork/execve to run a process in the thus defined environment.
The resulting environment will be cached: a subsequent call with the same set of parameters will simply return a previously cached environment if it exists.
If script_path is given, a shell script will be created in the given location so that shell commands can source it and restore the specified environment.
Any commands given in pre_exec will be part of the cached script, and will thus not be executed when preparing the env, but will be executed whenever the prepared shell script is sourced. The returned env dictionary will thus not include the effects of those injected commands.
- radical.utils.env.env_read(fname: str) Dict[str, str] [source]¶
helper to parse environment from a file: this method parses the output of env and returns a dict with the found environment settings.
1.2.8. radical.utils.flux module¶
- class radical.utils.flux.FluxHelper[source]¶
Bases:
object
Helper class to programmatically handle Flux instances and to obtain state update events for Flux jobs known in that instance.
- property env¶
environment dict for the connected Flux instance. Returns None if no instance is connected.
- reset()[source]¶
Close the connection to the Flux instance (if it exists), and terminate the Flux service if it was started by this instance. All handles and executors created for this service will be invalidated.
- submit_jobs(specs: List[Dict[str, Any]], cb: Optional[Callable[[str, Any], None]] = None) Any [source]¶
- property uid¶
unique ID for this FluxHelper instance
- property uri¶
uri for the connected Flux instance. Returns None if no instance is connected.
1.2.9. radical.utils.futures module¶
- class radical.utils.futures.Future(call, *args, **kwargs)[source]¶
Bases:
Thread
This Future class is a thin wrapper around Python’s native mt.Thread class. It is expected to wrap a callable, and to watch its execution.
- classmethod Run(call, *args, **kwargs)[source]¶
This is a shortcut to
    f = ru.Future(callable)
    f.start()
- property exception¶
- property result¶
- run()[source]¶
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- property state¶
- property traceback¶
1.2.10. radical.utils.get_version module¶
1.2.11. radical.utils.heartbeat module¶
- class radical.utils.heartbeat.Heartbeat(uid, timeout, interval=1, beat_cb=None, term_cb=None, log=None)[source]¶
Bases:
object
- property uid¶
1.2.12. radical.utils.host module¶
- radical.utils.host.get_hostip(req=None, log=None)[source]¶
Look up the IP address for a given requested interface name. If interface is not given, do some magic.
- radical.utils.host.get_hostlist(hoststring)[source]¶
Convert string with hosts (IDs within brackets) into list of hosts.
- Example:

      'node-b1-[1-3,5],node-c1-4,node-d3-3,node-k[10-12,15]'
      =>
      ['node-b1-1', 'node-b1-2', 'node-b1-3', 'node-b1-5',
       'node-c1-4', 'node-d3-3', 'node-k10', 'node-k11',
       'node-k12', 'node-k15']
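The bracket expansion can be sketched as follows (an illustrative parser written for this documentation, not the packaged code):

```python
import re

def get_hostlist(hoststring):
    '''Expand bracketed ID ranges like 'node-[1-3,5]' into a host list.'''
    hosts = []
    # split on commas which are not inside a bracket pair
    for part in re.split(r',(?![^\[]*\])', hoststring):
        match = re.match(r'(.*)\[([\d,-]+)\](.*)', part)
        if not match:
            hosts.append(part)                 # plain host name
            continue
        head, ids, tail = match.groups()
        for chunk in ids.split(','):
            if '-' in chunk:
                lo, hi = chunk.split('-')      # numeric range, inclusive
                hosts.extend('%s%d%s' % (head, i, tail)
                             for i in range(int(lo), int(hi) + 1))
            else:
                hosts.append('%s%s%s' % (head, chunk, tail))
    return hosts
```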
1.2.13. radical.utils.ids module¶
- radical.utils.ids.generate_id(prefix: str, mode='simple', ns=None)[source]¶
Generate a human readable, sequential ID for the given prefix.
The ID is by default very simple and thus very readable, but cannot be assumed to be globally unique – simple ID uniqueness is only guaranteed within the scope of one python instance.
If mode is set to the non-default type ID_UNIQUE, an attempt is made to generate readable but globally unique IDs – although the level of confidence for uniqueness is significantly smaller than for, say UUIDs.
The ID format per mode is:

    ID_SIMPLE  = "%(prefix)s.%(counter)04d"
    ID_UNIQUE  = "%(prefix)s.%(date)s.%(time)s.%(pid)06d.%(counter)04d"
    ID_PRIVATE = "%(prefix)s.%(host)s.%(user)s.%(days)06d.%(day_counter)04d"
    ID_UUID    = "%(prefix)s.%(uuid)s"
Examples:
    print(radical.utils.generate_id('item'))
    print(radical.utils.generate_id('item'))
    print(radical.utils.generate_id('item', mode=radical.utils.ID_SIMPLE))
    print(radical.utils.generate_id('item', mode=radical.utils.ID_SIMPLE))
    print(radical.utils.generate_id('item', mode=radical.utils.ID_UNIQUE))
    print(radical.utils.generate_id('item', mode=radical.utils.ID_UNIQUE))
    print(radical.utils.generate_id('item', mode=radical.utils.ID_PRIVATE))
    print(radical.utils.generate_id('item', mode=radical.utils.ID_PRIVATE))
    print(radical.utils.generate_id('item', mode=radical.utils.ID_UUID))
The above will generate the IDs:
    item.0000
    item.0001
    item.0002
    item.0003
    item.2014.07.30.13.13.44.0000
    item.2014.07.30.13.13.44.0001
    item.cameo.merzky.018375.0000
    item.cameo.merzky.018375.0001
    item.23cacb7e-0b08-11e5-9f0f-08002716eaa9
where 'cameo' is the (short) hostname, 'merzky' is the username, and '18375' is days since epoch. The last element, the counter, is unique for each id type and item type, and restarts for each session (application process). In the ID_PRIVATE case though, the counter is reset for every new day, and can thus span multiple applications.
The ns argument can be set to a value such that unique IDs are created local to that namespace. For example, you can create a session and use the session ID as a namespace for all the IDs of the objects of that execution.
Example:
    sid  = generate_id('re.session', ID_PRIVATE)
    uid1 = generate_id('task.%(item_counter)04d', ID_CUSTOM, ns=sid)
    uid2 = generate_id('task.%(item_counter)04d', ID_CUSTOM, ns=sid)
    ...
This will generate the following ids:
    re.session.rivendell.vivek.017548.0001
    task.0000
    task.0001
where the task.* IDs are unique for the used sid namespace.
The namespaces are stored under `$RADICAL_BASE/.radical/utils/`. If RADICAL_BASE is not set, then $HOME is used. Note that for docker containers, we try to avoid hostname / username clashes and will, for ID_PRIVATE, revert to ID_UUID.
1.2.14. radical.utils.json_io module¶
- radical.utils.json_io.metric_expand(data)[source]¶
Iterate through the given dictionary, and when encountering a key string of the form ru.XYZ or rp.ABC, expand it to its actually defined value. Thus the following dict:
{ "ru.EVENT" : "foo" }
becomes:
{ 2 : "foo" }
- radical.utils.json_io.parse_json(json_str, filter_comments=True)[source]¶
Comment lines in the form of
# some json data or text
are stripped from json before parsing
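The comment filtering can be sketched like this (a minimal illustration written for this documentation; the packaged parser may filter differently):

```python
import json
import re

def parse_json(json_str, filter_comments=True):
    '''Strip '#' comment lines from a JSON string, then parse it.'''
    if filter_comments:
        # drop every line whose first non-blank character is '#'
        json_str = re.sub(r'^\s*#.*$', '', json_str, flags=re.M)
    return json.loads(json_str)
```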
- radical.utils.json_io.parse_json_str(json_str)[source]¶
same as parse_json, but converts unicode strings to simple strings
- radical.utils.json_io.read_json(fname, filter_comments=True)[source]¶
Comment lines in the form of
# some json data or text
are stripped from json before parsing:
    import pprint
    pprint.pprint(read_json('my_file.json'))
1.2.15. radical.utils.lease_manager module¶
- class radical.utils.lease_manager.LeaseManager(max_pool_size=None, max_pool_wait=None, max_obj_age=None)[source]¶
Bases:
object
This is a lease manager – it creates resource instances on demand and hands out leases to them. If for a given ID no object instance exists, one is created, locked and a lease is returned. If for an ID an object instance exists and is not leased, it is locked and returned. If one or more instances exist but all are in use already (leased), a new instance is created (up to MAX_POOL_SIZE which can be overwritten in the lease call). If that limit is reached, no objects are returned, and instead the lease call blocks until one of the existing objects gets released.
- is_locked()¶
- lease(pool_id, creator, args=None)[source]¶
For a given object identifier, attempt to retrieve an existing object from the pool. If such a free (released) object is found, lock and return it. If such objects exist but all are in use, create a new one, up to max_pool_size instances (default: 10). If that limit is reached, block until an instance is freed. If no such object exists at all, create one and proceed as above. Return the object thus leased.
- pool_id : id of the pool to lease from. The pool ID essentially determines the scope of validity for the managed objects – they form a namespace, where objects are shared when living under the same name space entry (the pool_id).
creator : method to use to create a new object instance
- Example:

      def creator():
          return Logger(name)

      ret = lease_manager.lease(name, creator)
- lock()¶
- locked()¶
- release(instance, delete=False)[source]¶
The given object is not needed right now – unlock it so that somebody else can lease it. This will not delete the object unless delete is set to True.
- unlock(*args)¶
1.2.16. radical.utils.lockable module¶
- radical.utils.lockable.Lockable(cls)[source]¶
This class decorator will add lock/unlock methods to the thusly decorated classes, which will be enacted via an also added threading.RLock member (self._rlock):
    @Lockable
    class A(object):

        def call(self):
            print('locked: %s' % self._locked)
The class instance can then be used like this:
    a = A()
    a.call()
    a.lock()
    a.call()
    a.lock()
    a.call()
    a.unlock()
    a.call()
    with a:
        a.call()
    a.call()
    a.unlock()
    a.call()
which will result in:
    locked: 0
    locked: 1
    locked: 2
    locked: 1
    locked: 2
    locked: 1
    locked: 0
The class A can also internally use the lock, and can, for example, use:
    @Lockable
    class A(object):
        ...
        def work(self):
            with self:
                # locked code section
                ...
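A minimal version of such a decorator could look like this (an illustrative sketch of the documented semantics, not the packaged implementation):

```python
import threading

def Lockable(cls):
    '''Class decorator adding lock/unlock and `with` support via an RLock.'''

    def lock(self):
        if not hasattr(self, '_rlock'):
            self._rlock = threading.RLock()    # created on first use
        self._rlock.acquire()
        self._locked += 1

    def unlock(self):
        self._locked -= 1
        self._rlock.release()

    cls._locked   = 0                          # lock nesting counter
    cls.lock      = lock
    cls.unlock    = unlock
    cls.__enter__ = lambda self: self.lock()
    cls.__exit__  = lambda self, *exc: self.unlock()
    return cls
```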
1.2.17. radical.utils.lockfile module¶
- class radical.utils.lockfile.Lockfile(fname, *args, **kwargs)[source]¶
Bases:
object
This class represents a lockfile. On calling open() (or entering a resource context via with), the lockfile is exclusively opened and locked. The returned object can be used for read(), write() and seek() operations, and the lock is only released on close() (or when leaving the resource context).
If delete=True is specified on construction, then the lockfile is removed upon release(), and the content is lost. Only for this mode is it possible to obtain the current lock owner via get_owner() while waiting for a lock owned by another process or thread - the call will return 'unknown' otherwise.
Note that the file offset is not shared between owners of the lock file - a newly acquired lock on the file should prompt a seek to the desired file location (the initial offset is ‘0’).
Example:
    with ru.Lockfile(fname) as lf0:
        lf0.write('test 0\n')

    with ru.Lockfile(fname) as lf1:
        lf1.lseek(0, os.SEEK_SET)
        lf1.write('test 1\n')

    with ru.Lockfile(fname) as lf2:
        lf2.lseek(0, os.SEEK_END)
        lf2.write('test 2\n')

        # raises a RuntimeError, as we won't be able to acquire the lock
        with ru.Lockfile(fname) as lf3:
            lf3.lseek(0, os.SEEK_END)
            lf3.write('test 3\n')

    with open(fname, 'r') as fin:
        data = fin.read()
        assert data == 'test 1\ntest 2\n'
Example:
    lf1 = ru.Lockfile(fname)
    lf2 = ru.Lockfile(fname)

    with lf1:
        lf1.write('test 0')

        with lf2(timeout=3):
            # we won't reach this, the `with` above will time out and raise
            lf2.write('test 0')
Implementation:
The implementation relies on cooperative locking: a process or thread which uses this API to lock a file will create a temporary file in the same directory (filename: .<filename>.<pid>.<counter> where counter is a process singleton). On lock acquisition, we will attempt to create a symbolic link to the file <filename>.lck, which will only succeed if no other process or thread holds that lock. On release, that symbolic link gets removed; on object destruction, the temporary file gets removed.
The private file will contain the name of the owner. Any other process or thread can follow the .<filename>.lck symlink, open the link's target file, and read the name of the owner. No attempt is made to avoid races between failure to acquire the lockfile and querying the current owner, so that information is not reliable, but intended to be an informative help for debugging and tracing purposes.
- Example:

      lf1 = ru.Lockfile(fname)
      lf2 = ru.Lockfile(fname)

      with lf1:
          lf1.write('test 0')
          try:
              lf2.acquire()  # timeout == None == 0.0
          except RuntimeError:
              print('lock is held by %s' % lf2.get_owner())
- acquire(timeout=None, owner=None)[source]¶
When the lock could not be acquired after timeout seconds, a TimeoutError is raised (i.e. an OSError with errno ETIME). When owner is specified and the lockfile was created with delete=True, then that string will be passed to any other method which tries to acquire this lockfile while it is locked (see get_owner()). If not specified, the owner string is set to ru.get_caller_name().
- timeout:
    < 0 : wait forever
    = 0 : try locking once and raise RuntimeError on failure
    > 0 : try for that many seconds, and raise TimeoutError on failure
    None: same as 0
- get_owner()[source]¶
The name of the current owner of the lockfile can be retrieved by any other process or thread which attempts to lock the same file. This method returns None if the file is not currently locked.
- Note: this method may race against lock acquisition / release, and the
information returned may be outdated.
- lseek(pos, how)[source]¶
Change the offset at which the next read or write method is applied. The arguments are interpreted as documented by os.lseek().
- read(length)[source]¶
Read from the locked file at the current offset. This method will raise a RuntimeError when being called without the file being locked.
1.2.18. radical.utils.logger module¶
- class radical.utils.logger.ColorStreamHandler(target)[source]¶
Bases:
StreamHandler
A colorized output StreamHandler
- emit(record)[source]¶
Emit a record.
If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.
1.2.19. radical.utils.misc module¶
- radical.utils.misc.as_bytes(data: Any) Any [source]¶
Make a best-effort attempt to convert strings to bytes. Iterate through lists and dicts, but leave all other datatypes alone.
- radical.utils.misc.as_string(data: Any) Any [source]¶
Make a best-effort attempt to convert bytes to strings. Iterate through lists and dicts, but leave all other datatypes alone.
- radical.utils.misc.cluster_list(iterable, n)[source]¶
- s -> [ s0, s1, s2, … sn-1 ],
[ sn, sn+1, sn+2, … s2n-1 ], [ s2n, s2n+1, s2n+2, … s3n-1 ], …
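The clustering shown above can be sketched as a generator. The handling of a trailing partial cluster is an assumption here, not confirmed by the source:

```python
def cluster_list(iterable, n):
    # sketch: group the iterable into successive clusters of size n
    chunk = []
    for element in iterable:
        chunk.append(element)
        if len(chunk) == n:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # assumed: trailing partial cluster is yielded
```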
- radical.utils.misc.expand_env(data, env=None, ignore_missing=True)[source]¶
Expand the given data with environment variables from os.environ. If env is provided, use that dictionary for expansion instead.
data can be one of three types:
dictionary: expand_env is applied to all values of the dictionary
sequence : expand_env is applied to all elements of the sequence
string : expand_env is applied to the string itself
The method will alter dictionaries and iterables in place, but will return a copy of scalar strings, as is customary in Python. Other data types are silently ignored and not altered.
The replacement in strings is performed for the following variable specs:
assume export BAR=bar BIZ=biz:
${BAR}     : foo_${BAR}_baz      -> foo_bar_baz
${BAR:buz} : foo_${BAR:buz}_baz  -> foo_bar_baz
${BAR:$BIZ}: foo_${BAR:$BIZ}_baz -> foo_biz_baz
assume unset BAR; export BIZ=biz, ignore_missing=True
${BAR}     : foo_${BAR}_baz      -> foo__baz
${BAR:buz} : foo_${BAR:buz}_baz  -> foo_buz_baz
${BAR:$BIZ}: foo_${BAR:$BIZ}_baz -> foo_biz_baz
assume unset BAR; export BIZ=biz, ignore_missing=False
${BAR}     : foo_${BAR}_baz      -> ValueError('cannot expand $BAR')
${BAR:buz} : foo_${BAR:buz}_baz  -> foo_buz_baz
${BAR:$BIZ}: foo_${BAR:$BIZ}_baz -> foo_biz_baz
The method will also opportunistically convert strings to integers or floats if they are formatted that way and contain no other characters.
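The string-expansion rules above can be sketched with a small regex-based helper. The helper name and the regex are assumptions for illustration; the actual implementation may differ:

```python
import re

def expand_env_str(s, env):
    # minimal sketch of the ${VAR} / ${VAR:default} / ${VAR:$OTHER}
    # expansion rules described above (hypothetical helper)
    def repl(m):
        var, default = m.group(1), m.group(2)
        if default and default.startswith('$'):
            # default refers to another variable
            default = env.get(default[1:], '')
        return env.get(var, default or '')
    return re.sub(r'\$\{(\w+)(?::([^}]*))?\}', repl, s)
```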
- radical.utils.misc.get_base(ns, module=None)[source]¶
A generic version of get_radical_base which queries the base for any namespace ns.
- radical.utils.misc.get_env_ns(key, ns, default=None)[source]¶
get an environment setting within a namespace. For example,
get_env_ns('log_lvl', 'radical.pilot.umgr')
will return the value of the first env variable found in the following sequence:
RADICAL_PILOT_UMGR_LOG_LVL
RADICAL_PILOT_LOG_LVL
RADICAL_LOG_LVL
or None if none of the above is set. The given name and key are converted to upper case, and dots are replaced by underscores.
Note that an environment variable set with
export RADICAL_LOG_LVL=
(i.e. without an explicit, non-empty value) will be returned as an empty string.
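The lookup sequence described above can be sketched by progressively dropping the innermost namespace element (a hypothetical reimplementation, for illustration only):

```python
import os

def get_env_ns(key, ns, default=None):
    # sketch of the namespaced env lookup described above
    elems = ns.replace('.', '_').upper().split('_')
    key   = key.replace('.', '_').upper()
    while elems:
        name = '_'.join(elems) + '_' + key
        if name in os.environ:
            return os.environ[name]
        elems.pop()  # drop the innermost namespace element and retry
    return default
```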
- radical.utils.misc.get_radical_base(module=None)[source]¶
Several parts of the RCT stack store state on the file system. This should usually be under $HOME/.radical - but that location is not always available or desirable. We interpret the env variable RADICAL_BASE, and fall back to pwd if neither that nor $HOME exists.
The optional module parameter will result in the respective subdir name being appended. The resulting dir is created (if it does not exist), and the name is returned. Any . (dot) characters in module are replaced by slashes. A leading radical/ element is removed.
- radical.utils.misc.is_bytes(data: Any) bool [source]¶
checks if the given data are of types bytes or bytearray
- radical.utils.misc.is_list(data: Any) bool [source]¶
return True if given data are a list, False otherwise
- radical.utils.misc.is_seq(data: Any) bool [source]¶
tests if the given data is a sequence (but not a string)
- radical.utils.misc.is_tuple(data: Any) bool [source]¶
return True if given data are a tuple, False otherwise
- radical.utils.misc.mktar(tarname, fnames=None, data=None)[source]¶
Create a tarfile at the given tarname, and pack all files given in fnames into it, and also pack any data blobs.
fnames is expected to be a list, where each element is either a string pointing to a file to be added under that name, or a tuple whose first element points to the file to be packed and whose second element specifies the name under which the file should be packed into the archive. An OSError will be raised if the file does not exist.
data is expected to be a list of tuples, where the first element is the bytes comprising the data to be written into the archive, and the second element specifies the name of the tarred file.
Note that this method always creates bzip'ed tarfiles, but will never change the tarname to reflect that.
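The packing described above can be sketched with the stdlib tarfile module (a hypothetical reimplementation, not the library's own code):

```python
import io
import tarfile

def mktar(tarname, fnames=None, data=None):
    # sketch of the packing described above: always bzip2-compressed,
    # regardless of the file name extension
    with tarfile.open(tarname, 'w:bz2') as tar:
        for fname in fnames or []:
            if isinstance(fname, tuple):
                # (source path, name inside the archive)
                tar.add(fname[0], arcname=fname[1])
            else:
                tar.add(fname)
        for blob, name in data or []:
            # pack an in-memory data blob under the given name
            info = tarfile.TarInfo(name=name)
            info.size = len(blob)
            tar.addfile(info, io.BytesIO(blob))
```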
- radical.utils.misc.name2env(name)[source]¶
convert a name of the form 'radical.pilot' to an env var base name 'RADICAL_PILOT'.
- radical.utils.misc.parse_file_staging_directives(directives)[source]¶
staging directives
[local_path] [operator] [remote_path]
- local path:
interpreted as relative to the application’s working directory
must point to local storage (localhost)
- remote path
interpreted as relative to the job’s working directory
- operator :
> : stage to remote target, overwrite if exists
>> : stage to remote target, append if exists
< : stage to local target, overwrite if exists
<< : stage to local target, append if exists
This method returns a tuple [src, tgt, op] for each given directive. This parsing is backward compatible with the simple staging directives used previously – any strings which do not contain staging operators will be interpreted as simple paths (identical for src and tgt), operation is set to ‘=’, which must be interpreted in the caller context.
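The parsing described above can be sketched for a single directive string. The helper name and whitespace handling are assumptions for illustration:

```python
def parse_directive(directive):
    # sketch of the staging-directive parsing described above;
    # note: '>>' and '<<' must be tested before '>' and '<'
    for op in ('>>', '<<', '>', '<'):
        if op in directive:
            src, tgt = [x.strip() for x in directive.split(op, 1)]
            return [src, tgt, op]
    # no staging operator: same path for src and tgt; op '=' is
    # interpreted in the caller context
    return [directive.strip(), directive.strip(), '=']
```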
- radical.utils.misc.rec_makedir(target)[source]¶
recursive makedir which ignores errors if dir already exists
- radical.utils.misc.round_to_base(value: float, base: int = 1) int [source]¶
This method expects an integer or float value, and will round it to any given integer base. For example:
 1.5,  2 ->  2
 3.5,  2 ->  4
 4.5,  2 ->  4
11.5, 20 -> 20
23.5, 20 -> 20
34.5, 20 -> 40
The default base is ‘1’.
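A minimal sketch reproducing the examples above (rounding to the nearest multiple of base):

```python
def round_to_base(value, base=1):
    # sketch: round value to the nearest multiple of base
    return int(round(value / base)) * base
```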
- radical.utils.misc.round_upper_bound(value: Union[int, float]) int [source]¶
This method expects an integer or float value, and will return an integer upper bound suitable for example to define plot ranges. The upper bound is the smallest value larger than the input value which is a multiple of 1, 2 or 5 times the order of magnitude (10**x) of the value.
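The upper-bound rule above can be sketched as follows; the handling of edge cases (non-positive input, exact powers of ten) is an assumption here:

```python
import math

def round_upper_bound(value):
    # sketch: smallest number larger than value which is 1, 2 or 5
    # times a power of ten
    exp = math.floor(math.log10(value))
    for e in (exp, exp + 1):
        for mult in (1, 2, 5):
            bound = mult * 10 ** e
            if bound > value:
                return bound
```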
- radical.utils.misc.ru_open(*args, **kwargs)[source]¶
ensure that we use UTF8 consistently throughout the stack
- radical.utils.misc.script_2_func(fpath)[source]¶
This method accepts a single parameter fpath which is expected to point to a file containing a self-sufficient Python script. The script will be read and stored, and a function handle will be returned which, upon calling, will run that script in the current Python interpreter. It is ensured that __name__ is set to __main__, and that any arguments passed to the callable are passed on as sys.argv. A single list argument is also allowed, which is interpreted as the argument list.
Example
my_func = ru.script_2_func('/tmp/my_script.py')
my_func('-f', 'foo', '-b', 'bar')
my_func('-f foo -b bar'.split())  # equivalent
- NOTE: calling the returned function handle will change sys.argv for the
current Python interpreter.
- radical.utils.misc.time_diff(dt_abs, dt_stamp)[source]¶
return the time difference between two datetime objects in seconds (incl. fractions). Exceptions (like on improper data types) fall through.
- radical.utils.misc.watch_condition(cond, target=None, timeout=None, interval=0.1)[source]¶
Watch a given condition (a callable) until it returns the target value, and return that value. Stop watching on timeout, in that case return None. The condition is tested approximately every ‘interval’ seconds.
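The polling loop described above can be sketched as follows (a hypothetical reimplementation of the described behavior):

```python
import time

def watch_condition(cond, target=None, timeout=None, interval=0.1):
    # sketch: poll cond() every interval seconds until it returns
    # target, or until timeout seconds have passed
    start = time.time()
    while True:
        ret = cond()
        if ret == target:
            return ret
        if timeout is not None and time.time() - start > timeout:
            return None
        time.sleep(interval)
```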
1.2.20. radical.utils.modules module¶
1.2.21. radical.utils.object_cache module¶
- class radical.utils.object_cache.ObjectCache(*args, **kwargs)[source]¶
Bases:
object
This is a singleton object caching class – it maintains a reference counted registry of existing objects.
- get_obj(oid, creator, ns='global')[source]¶
For a given object id, attempt to retrieve an existing object. If that object exists, increase the reference counter, as there is now one more user for that object.
If that object does not exist, call the given creator, then register and return the object thusly created.
oid : id of the object to get from the cache.
creator: method used to create a new object instance
- Example:
def creator():
    return Logger(name)

ret = object_cache.get_obj(name, creator)
- is_locked()¶
- lock()¶
- locked()¶
- rem_obj(obj, ns='global')[source]¶
For a given object instance, decrease the refcounter as the caller stops using that object. Once the ref counter is '0', remove all traces of the object -- this should make that object eligible for Python's garbage collection. Returns 'True' if the given object was indeed registered, 'False' otherwise.
The removal of the object is actually time-delayed. That way, we will keep the object around just a little longer, which provides caching semantics in the case of frequent creation/destruction cycles.
- unlock(*args)¶
1.2.22. radical.utils.plugin_manager module¶
- class radical.utils.plugin_manager.PluginBase(descr: dict)[source]¶
Bases:
object
This class serves as base class for any plugin managed by the plugin handler
- property plugin_class: str¶
- property plugin_description: str¶
- property plugin_name: str¶
- property plugin_type: str¶
- property plugin_version: str¶
- class radical.utils.plugin_manager.PluginManager(*args, **kwargs)[source]¶
Bases:
object
The PluginManager manages plugins of specific types: the manager can search for installed plugins, list and describe plugins found, load plugins, and instantiate the plugin for further use.
Example
# try to load the 'echo' plugin from the 'radical' namespace
plugin_type = 'echo'

pm = radical.utils.PluginManager('radical')

for plugin_name in pm.list(plugin_type):
    print(plugin_name)
    print(pm.describe(plugin_type, plugin_name))

default_plugin = pm.load('echo', 'default')

default_plugin.init_plugin('world')
default_plugin.run()  # prints 'hello default world'
The plugins are expected to follow a specific naming and coding schema to be recognized by the plugin manager. The naming schema is:
[namespace].plugins.[ptype].plugin_[ptype]_[pname].py
i.e. for the example above: radical.plugins.echo.plugin_echo_default.py
The plugin code consists of two parts: a plugin description, and a plugin class. The description is a module level dictionary named PLUGIN_DESCRIPTION, the plugin class must have a class constructor __init__(*args, **kwargs) to create plugin instances for further use.
At this point, we leave the definition of the exact plugin signatures open, but expect that to be more strictly defined per plugin type in the future.
Note that the PluginManager construction is, at this point, not considered thread safe.
- load(ptype, pname, *args, **kwargs)[source]¶
check if a plugin with given type and name was loaded, if so, instantiate its plugin class and return it.
1.2.23. radical.utils.poll module¶
- class radical.utils.poll.Poller(log=None)[source]¶
Bases:
object
This object will accept a set of things we can call select.select() on, which is basically anything which has a file descriptor exposed via a fileno() member method, or any integer which directly represents an OS-level file descriptor. This class implements most of the interface as defined by Python's select.Poller class, which is created by calling select.poll(). This implementation is thread-safe.
- NOTE: Poller.poll() returns the original object handle instead of the
polled file descriptor.
- NOTE: Support for POLLPRI and POLLNVAL selection is not implemented,
and will never be returned on poll().
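The kind of objects the Poller accepts can be illustrated with a plain select.select() call on a pipe file descriptor (stdlib only, no radical.utils involved):

```python
import os
import select

# a pipe yields two OS-level file descriptors, the kind of objects
# the Poller (and select.select()) can watch
r_fd, w_fd = os.pipe()
os.write(w_fd, b'x')

# wait up to one second for the read end to become readable
readable, _, _ = select.select([r_fd], [], [], 1.0)
assert r_fd in readable
assert os.read(r_fd, 1) == b'x'

os.close(r_fd)
os.close(w_fd)
```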
1.2.24. radical.utils.profile module¶
- class radical.utils.profile.Profiler(name, ns=None, path=None)[source]¶
Bases:
object
This class is really just a persistent file handle with a convenience call (prof()) to write timestamped event lines. Any profiling intelligence must be applied when reading and evaluating the created profiles. The following fields are defined for each event:
time : mandatory, float, time in seconds since epoch
event: mandatory, string, short, unique name of the event to be recorded
comp : optional, string, name of the component where the event originates
tid  : optional, string, current thread id (name)
uid  : optional, string, ID of the entity involved (when available)
state: optional, string, state of the entity involved, if applicable
msg  : optional, string, free-form message describing the event
Strings MUST NOT contain commas; otherwise they are encouraged to be of the form [a-z][0-9a-z_.]*. msg entries are free-form, but the comma inhibition holds. We propose to limit the sum of the strings to about 256 characters - this will guarantee atomic writes on most OS's, without additional locking overheads. Less than 100 characters makes the profiles almost human-readable.
The profile is enabled by setting environment variables. For a profiler named radical.utils, the following env variables will be evaluated:
RADICAL_UTILS_PROFILE RADICAL_PROFILE
If either is present in the environment, the profile is enabled (the value of the setting is ignored).
- property enabled¶
- fields = ['time', 'event', 'comp', 'thread', 'uid', 'state', 'msg']¶
- property path¶
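The event record format above can be sketched with a small helper that composes one CSV line. This is a hypothetical helper for illustration, not the actual Profiler.prof() implementation:

```python
import threading
import time

def prof_line(event, comp='', uid='', state='', msg=''):
    # sketch: compose one CSV event record with the seven fields
    # listed above (time, event, comp, tid, uid, state, msg)
    tid = threading.current_thread().name
    return '%.7f,%s,%s,%s,%s,%s,%s' \
         % (time.time(), event, comp, tid, uid, state, msg)
```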
- radical.utils.profile.clean_profile(profile, sid, state_final=None, state_canceled=None)[source]¶
This method will prepare a profile for consumption in radical.analytics. It performs the following actions:
makes sure all events have an ename entry
removes all state transitions to CANCELLED if a different final state is encountered for the same uid
assigns the session uid to all events without a uid
makes sure that state transitions have an ename set to state
- radical.utils.profile.combine_profiles(profs)[source]¶
We merge all profiles and sort by time.
This routine expects all profiles to have a synchronization time stamp. Two kinds of sync timestamps are supported: absolute (sync_abs) and relative (sync_rel).
Time syncing is done based on ‘sync_abs’ timestamps. We expect one such absolute timestamp to be available per host (the first profile entry will contain host information). All timestamps from the same host will be corrected by the respectively determined NTP offset. We define an ‘accuracy’ measure which is the maximum difference of clock correction offsets across all hosts.
The sync_rel timestamps are expected to occur in pairs: one for a profile with no other sync timestamp, and one for a profile which has a sync_abs timestamp. In that case, the time correction from the latter is transferred to the former (the two timestamps are considered to have been written at the exact same time).
The method returns the combined profile and accuracy as a tuple.
- radical.utils.profile.read_profiles(profiles, sid=None, efilter=None)[source]¶
We read all profiles as CSV files and parse them. For each profile, we back-calculate global time (epoch) from the sync timestamps.
The caller can provide a filter of the following structure:
filter = {ru.EVENT: ['event 1', 'event 2', ...], ru.MSG : ['msg 1', 'msg 2', ...], ... }
Filters apply on substring matches!
1.2.25. radical.utils.registry module¶
1.2.26. radical.utils.reporter module¶
- class radical.utils.reporter.Reporter(name, ns=None, path=None, targets=None, enabled=None)[source]¶
Bases:
object
- COLORS = {'black': '\x1b[30m', 'blue': '\x1b[34m', 'cyan': '\x1b[36m', 'darkgray': '\x1b[90m', 'green': '\x1b[32m', 'lightblue': '\x1b[94m', 'lightcyan': '\x1b[96m', 'lightgray': '\x1b[37m', 'lightgreen': '\x1b[92m', 'lightmagenta': '\x1b[95m', 'lightred': '\x1b[91m', 'lightyellow': '\x1b[93m', 'magenta': '\x1b[35m', 'red': '\x1b[31m', 'reset': '\x1b[39m', 'white': '\x1b[97m', 'yellow': '\x1b[33m'}¶
- DOTTED = '.'¶
- DOUBLE = '='¶
- EMPTY = ''¶
- ERROR = 'lightred'¶
- HASHED = '#'¶
- HEADER = 'bold lightyellow'¶
- IDLE = 'lightwhite'¶
- INFO = 'lightblue'¶
- MODS = {'': '', 'blink': '\x1b[5m', 'bold': '\x1b[1m', 'inverse': '\x1b[7m', 'reset': '\x1b[0m', 'underline': '\x1b[4m'}¶
- OK = 'lightgreen'¶
- PROGRESS = 'lightwhite'¶
- SINGLE = '-'¶
- TITLE = 'bold lightblue'¶
- WARN = 'lightyellow'¶
1.2.27. radical.utils.ru_regex module¶
- class radical.utils.ru_regex.ReString(*args, **kw)[source]¶
Bases:
str
This is a string class which supports simplified regular expression matching. The point is not that the regex language or expressions are simplified, but rather that the invocation of the matching is simple, as is the handling of the match results:
txt = ReString('The quick brown fox jumps over the lazy dog')

# the '//' operator is overloaded to match against a regular
# expression. The result is a `ReSult` instance, which allows simple
# access to the matches
with txt // r'(\s.u)(?P<x>.*?j\S+)' as res:
    if res:
        print('Matched!')                  # boolean check
        print('res     : %s' % res)        # list of results
        print('res[0]  : %s' % res[0])     # index by number
        print('res[1]  : %s' % res[1])     # ... for all matches
        print("res['x']: %s" % res['x'])   # index by match name
        print('res.x   : %s' % res.x)      # ... as properties
        for i, r in enumerate(res):
            print('res %d  : %s' % (i, r)) # matches as iterable
        assert len(res) == 2               # number of matches
        assert res == [' qu', 'ick brown fox jumps']  # compare to list

if txt // r'(rabbit)':                     # simple use in if / elif
    res = txt.get()                        # get ReSult of last match
elif txt // r'((?:\s).{12,15}?(\S+))':     # full Python regex slang
    res = txt.get()
else:
    print('no match')
1.2.28. radical.utils.shell module¶
- radical.utils.shell.sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None, cwd=None)[source]¶
call a shell command, return [stdout, stderr, retval].
- radical.utils.shell.sh_callout_async(cmd, stdin=True, stdout=True, stderr=True, shell=False, env=None, cwd=None)[source]¶
Run a command, and capture stdout/stderr if so flagged. The call will return a PROC object instance on which the captured output can be retrieved line by line (I/O is line buffered). When the process is done, a None will be returned on the I/O queues.
Line breaks are stripped.
- stdout/stderr: True [default], False, string
False : discard I/O
True : capture I/O as queue [default]
string: capture I/O as queue, also write to named file
- shell: True, False [default]
pass to popen
- cwd: string
working directory for command to run in
- PROC:
PROC.stdout : queue.Queue instance delivering stdout lines
PROC.stderr : queue.Queue instance delivering stderr lines
PROC.state : ru.RUNNING, ru.DONE, ru.FAILED
PROC.rc : returncode (None while ru.RUNNING)
PROC.stdout_filename: name of stdout file (when available)
PROC.stderr_filename: name of stderr file (when available)
- radical.utils.shell.sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None, cwd=None)[source]¶
call a shell command in the background. Do not attempt to pipe STDOUT/ERR, but only support writing to named files.
- radical.utils.shell.sh_quote(data)[source]¶
quote a string and wrap it in double quotes so that it can be passed to a POSIX shell.
Examples
foo       -> "foo"
foo"bar   -> "foo\"bar"
$FOO'$BAR -> "$FOO'$BAR"
- NOTE: this method does not attempt to strip out backticks or other code
execution mechanisms from the string.
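A minimal sketch matching the examples above (escaping only embedded double quotes is an assumption; the actual implementation may escape more):

```python
def sh_quote(data):
    # sketch: escape embedded double quotes and wrap the result in
    # double quotes; backticks, '$' etc. are deliberately left alone
    return '"%s"' % data.replace('"', '\\"')
```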
1.2.29. radical.utils.signatures module¶
- radical.utils.signatures.nothing¶
alias of
None
1.2.30. radical.utils.singleton module¶
1.2.31. radical.utils.stack module¶
1.2.32. radical.utils.testing module¶
- class radical.utils.testing.TestConfig(ns, cfg_name=None, cfg_section=None)[source]¶
Bases:
dict
This class represents a set of configurations for a test suite. It usually contains parameters which tweak the tests toward specific environments and test cases, such as specific remote systems to interact with, specific backends to use, etc.
The class expects test configurations to be deployed in the current Python installation (virtualenv, condaenv, etc.). The namespace passed on construction is used to search for configurations under $namespace/configs/tests/, configuration files are expected to follow the naming scheme test_$cfg_name.json. The json file is read and this object then behaves like a dictionary for those json data, but all top level keys in the dictionary are also exposed as object attributes.
- radical.utils.testing.add_test_config(ns, cfg_name, cfg_section=None)[source]¶
To an existing active config, add the contents of an additional test configuration.
- radical.utils.testing.get_test_config()[source]¶
If a test config is currently set, return it. If not, attempt to load a test config for the given namespace, set it as active, and return it.
1.2.33. radical.utils.threads module¶
- radical.utils.threads.cancel_main_thread(signame=None, once=False)[source]¶
This method will call thread.interrupt_main from any calling sub-thread. That will cause a KeyboardInterrupt exception in the main thread, which can be caught via except KeyboardInterrupt.
The main thread MUST NOT have a SIGINT signal handler installed (other than the default handler or SIG_IGN), otherwise this call will cause an exception in the core Python signal handling (see http://bugs.python.org/issue23395).
The thread will exit after this, via sys.exit(0), and can then be joined from the main thread.
When being called from the main thread, no interrupt will be generated, but sys.exit() will still be called. This can be caught in the code via except SystemExit.
Another way to avoid the SIGINT problem is to send a different signal to the main thread. We do so if signame is specified.
After sending the signal, any sub-thread will call sys.exit(), and thus finish. We leave it to the main thread though to decide if it will exit at this point or not. Either way, it will have to handle the signal first.
If once is set to True, we will send the given signal at most once. This will mitigate races between multiple error causes, specifically during finalization.
- radical.utils.threads.get_signal_by_name(signame)[source]¶
Translate a signal name into the respective signal number. If signame is not found to be a valid signal name, this method will raise a KeyError exception. Lookup is case insensitive.
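The lookup described above can be sketched against the stdlib signal module (a hypothetical reimplementation; the validity check is an assumption):

```python
import signal

def get_signal_by_name(signame):
    # sketch: case-insensitive signal name -> signal number lookup
    name = signame.upper()
    entry = getattr(signal, name, None)
    if not name.startswith('SIG') or name.startswith('SIG_') \
            or not isinstance(entry, signal.Signals):
        raise KeyError('%s: no such signal' % signame)
    return int(entry)
```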
- radical.utils.threads.gettid()[source]¶
Python is not able to give us the native thread ID. We thus use a syscall to do so. Since this is not portable, we fall back to None in case of error (Hi MacOS).
- radical.utils.threads.is_this_thread(t)[source]¶
check if the given thread (type: threading.Thread) is the same as the current thread of execution.
- radical.utils.threads.main_thread()[source]¶
return a handle to the main thread of execution in this process
- radical.utils.threads.raise_in_thread(e=None, tname=None, tident=None)[source]¶
This method uses an internal Python function to inject an exception 'e' into any given thread. That thread can be specified by its name ('tname') or thread id ('tident'). If neither is specified, the exception is sent to the MainThread.
The target thread will receive the exception with some delay. More specifically, it needs to call up to 100 op codes before the exception is evaluated and raised. The thread interruption can thus be delayed significantly, like when the thread sleeps.
The default exception raised is ‘radical.utils.ThreadExit’ which inherits from ‘SystemExit’.
- NOTE: this is not reliable: the exception is not raised immediately, but is
scheduled for raising at some point, i.e. in general after about 100 opcodes (sys.getcheckinterval()). Depending on when exactly the exception is finally raised, the interpreter might silently swallow it if that happens in a generic try/except clause. Those exist in the Python core, even if discouraged by some PEP or the other.
- NOTE: we can only raise exception types, not exception instances
Example:
def sub():
    time.sleep(0.1)
    ru.raise_in_thread()

try:
    t = mt.Thread(target=sub)
    t.start()
    while True:
        time.sleep(0.01)
except ru.ThreadExit:
    print('thread exit')
except Exception as e:
    print('except: %s' % e)
except SystemExit:
    print('exit')
else:
    print('unexcepted')
finally:
    t.join()
1.2.34. radical.utils.timing module¶
- class radical.utils.timing.Time(tick=0.01, speed=1.0)[source]¶
Bases:
object
This is a timing class which allows the simulation of different clock types.
Parameters:
tick : the resolution of the clock
speed: the speed of the clock
- radical.utils.timing.dt_epoch(dt)[source]¶
convert a given datetime instance into seconds since EPOCH.
- radical.utils.timing.epoch(data, pattern)[source]¶
convert a given datetime string into seconds since EPOCH. The string is parsed as defined by POSIX’s strptime().
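The conversion described above can be sketched with the stdlib; interpreting the parsed timestamp as UTC is an assumption here:

```python
import calendar
import time

def epoch(data, pattern):
    # sketch: parse with strptime() and convert the resulting struct
    # to seconds since EPOCH (UTC interpretation assumed)
    return calendar.timegm(time.strptime(data, pattern))
```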
- radical.utils.timing.timed_method(func)[source]¶
This class decorator will decorate all public class methods with a timing function. That will time the call invocation, and store the respective data in a private class dict ‘__timing’. Additionally, it will add the class method ‘_timing_last()’, which will return the tuple [method name, method timer] for the last timed invocation of a class method.
1.2.35. radical.utils.tracer module¶
1.2.36. radical.utils.typeddict module¶
- exception radical.utils.typeddict.TDError(msg=None, level=1)[source]¶
Bases:
TDErrorMixin, Exception
- exception radical.utils.typeddict.TDKeyError(msg=None, level=1)[source]¶
Bases:
TDErrorMixin, KeyError
- exception radical.utils.typeddict.TDTypeError(msg=None, level=1)[source]¶
Bases:
TDErrorMixin, TypeError
- exception radical.utils.typeddict.TDValueError(msg=None, level=1)[source]¶
Bases:
TDErrorMixin, ValueError
- class radical.utils.typeddict.TypedDict(from_dict=None, **kwargs)[source]¶
Bases:
dict
- pop(k[, d]) → v, remove specified key and return the corresponding value. [source]¶
If key is not found, d is returned if given, otherwise KeyError is raised
- popitem() → (k, v) [source]¶
Remove and return some (key, value) pair as a 2-tuple; raise KeyError if D is empty.
- setdefault(key, default=None)[source]¶
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- update(other)[source]¶
Overload dict.update(): the call is used to ensure that sub-dicts are instantiated as their respective TypedDict-inheriting class types, if so specified by the respective schema.
So, if the schema contains:
{
    ...
    'foo' : BarTypedDict,
    'foos': [BarTypedDict],
    ...
}
where BarTypedDict is a valid type in the scope of the schema definition which inherits from ru.TypedDict, then update() will ensure that the value for key foo is indeed of type BarTypedDict. An error will be raised if (a) BarTypedDict does not have a single-parameter constructor like ru.TypedDict, or (b) the data value for foo cannot be used as the from_dict parameter to the BarTypedDict constructor.
1.2.37. radical.utils.url module¶
- class radical.utils.url.Url(url_in=None)[source]¶
Bases:
object
The RADICAL Url class.
URLs are used in several places in the RADICAL software projects: to specify service endpoints for job submission or resource management, for file or directory locations, etc.
The URL class is designed to simplify URL management for these purposes – it allows to manipulate individual URL elements, while ensuring that the resulting URL is well formatted. Example:
# create a URL from a string
location = radical.utils.Url('file://localhost/tmp/file.dat')
d = radical.utils.filesystem.Directory(location)
A URL consists of the following components (where one or more can be 'None'):
<scheme>://<user>:<pass>@<host>:<port>/<path>?<query>#<fragment>
Each of these components can be accessed via its property or alternatively, via getter / setter methods. Example:
url = ru.Url('scheme://user:pass@host:port/path?query#fragment')

# modify the scheme
url.scheme = 'anotherscheme'

# above is equivalent to
url.set_scheme('anotherscheme')
- property fragment¶
The fragment component.
- property host¶
The hostname component.
- property password¶
The password component.
- property path¶
The path component.
- property port¶
The port component.
- property query¶
The query component.
- property schema¶
The scheme component.
- property scheme¶
Return the URL ‘scheme’ component.
- set_fragment(fragment)[source]¶
Set the URL ‘fragment’ component.
- Parameters:
fragment (str) – The new fragment
- set_host(hostname)[source]¶
Set the ‘hostname’ component.
- Parameters:
hostname (str) – The new hostname
- set_password(password)[source]¶
Set the URL ‘password’ component.
- Parameters:
password (str) – The new password
- set_scheme(scheme)[source]¶
Set the URL ‘scheme’ component.
- Parameters:
scheme (str) – The new scheme
- set_username(username)[source]¶
Set the URL ‘username’ component.
- Parameters:
username (str) – The new username
- property username¶
The username component.