# Source code for radical.utils.futures
__author__ = "Radical.Utils Development Team"
__copyright__ = "Copyright 2013, RADICAL@Rutgers"
__license__ = "MIT"
import traceback
import threading as mt
# Module-level re-entrant lock.  It is not referenced in the visible part of
# this module -- presumably it serializes shared output; TODO confirm against
# the users of this name.
_out_lock = mt.RLock()
# ------------------------------------------------------------------------------
#
# our futures have state, the states are defined here
#
# NOTE: these strings are carefully chosen to match the state specifiers of
# `radical.saga.Task` and `radical.saga.Job` instances.
#
# individual state names
NEW      = 'New'
RUNNING  = 'Running'
DONE     = 'Done'
FAILED   = 'Failed'
CANCELED = 'Canceled'

# state groups: where a future starts out, and the states which are terminal
INITIAL  = [NEW]
FINAL    = [DONE, FAILED, CANCELED]
# ------------------------------------------------------------------------------
#
class Future(mt.Thread):
    """
    This `Future` class is a thin wrapper around Python's native `mt.Thread`
    class.  It is expected to wrap a callable, and to watch its execution.

    The outcome of the call is exposed via the read-only properties `state`,
    `result`, `exception` and `traceback`.
    """

    # FIXME: we may want to use a thread pool to limit the number of threads

    # --------------------------------------------------------------------------
    #
    def __init__(self, call, *args, **kwargs):
        """
        Construct the Future.  The first argument is expected to be a Python
        `callable` which is executed in its own thread.  All other arguments
        are passed blindly to that callable when `self.start()` is called.

        Raises `ValueError` if `call` is not callable.
        """

        if not callable(call):
            raise ValueError("Thread requires a callable to function, not %s"
                             % (str(call)))

        mt.Thread.__init__(self)

        # NOTE: we use daemon threads to avoid termination issues
        self.daemon = True

        self._call      = call
        self._args      = args
        self._kwargs    = kwargs

        self._state     = NEW
        self._result    = None
        self._exception = None
        self._traceback = None

    # --------------------------------------------------------------------------
    #
    @classmethod
    def Run(cls, call, *args, **kwargs):
        """
        This is a shortcut to

            f = ru.Future(callable); f.start()

        and returns the started future.
        """

        future = cls(call, *args, **kwargs)
        future.start()

        return future

    # --------------------------------------------------------------------------
    #
    def run(self):
        """
        Thread body: invoke the wrapped callable and record its outcome.

        On success the return value is stored as `result` and the state
        becomes `DONE`.  If the callable raises, the exception and its
        formatted traceback are stored and the state becomes `FAILED`.
        """

        try:
            self._state  = RUNNING
            self._result = self._call(*self._args, **self._kwargs)
            self._state  = DONE

        except BaseException as e:
            # NOTE: these attributes are not updated atomically.  `state` is
            #       assigned last, so that `exception` and `traceback` are
            #       already set once `FAILED` becomes visible.
            self._traceback = traceback.format_exc()
            self._exception = e
            self._state     = FAILED

            # Only regular exceptions are absorbed.  Anything else (for
            # example `SystemExit`) is recorded above, so the future does not
            # stay in `RUNNING` forever, and then propagates as before.
            if not isinstance(e, Exception):
                raise

    # --------------------------------------------------------------------------
    #
    def wait(self, timeout=None):
        """
        Block until the thread finishes, or until `timeout` (passed on to
        `join()`) expires.  This is a no-op if the thread is not alive.
        """

        # the `is_alive()` guard avoids calling `join()` on a thread which
        # was never started
        if self.is_alive():
            self.join(timeout=timeout)

    # --------------------------------------------------------------------------
    #
    def cancel(self):
        """
        Request cancellation.  This is currently a no-op.
        """

        # FIXME: this is not really implementable generically, so we ignore
        #        cancel requests for now.  This *should* be handled on the SAGA
        #        layer where cancel requests are forwarded to the callable's
        #        adaptor.
        pass

    # --------------------------------------------------------------------------
    #
    @property
    def state(self):
        """Current state of the future."""
        return self._state

    @property
    def result(self):
        """Return value of the callable (`None` unless it completed)."""
        return self._result

    @property
    def exception(self):
        """Exception raised by the callable (`None` if none was recorded)."""
        return self._exception

    @property
    def traceback(self):
        """Formatted traceback of the recorded exception, or `None`."""
        return self._traceback
# ------------------------------------------------------------------------------