__author__ = "Radical.Utils Development Team"
__copyright__ = "Copyright 2016, RADICAL@Rutgers"
__license__ = "MIT"
import os
import sys
import signal
import ctypes
import _thread
import threading as mt
# ------------------------------------------------------------------------------
#
[docs]def get_thread_name():
if not mt.current_thread():
return None
return mt.current_thread().name
# ------------------------------------------------------------------------------
#
[docs]def get_thread_id():
return mt.current_thread().ident
# ------------------------------------------------------------------------------
#
[docs]def gettid():
"""
Python is not able to give us the native thread ID. We thus use a syscall
to do so. Since this is not portable, we fall back to None in case of error
(Hi MacOS).
"""
try:
SYS_gettid = 186
libc = ctypes.cdll.LoadLibrary('libc.so.6')
return int(libc.syscall(SYS_gettid))
except:
return None
# ------------------------------------------------------------------------------
#
[docs]def is_main_thread(t=None):
if t:
assert isinstance(t, mt.Thread)
else:
t = this_thread()
return isinstance(t, mt._MainThread) # pylint: disable=W0212
# ------------------------------------------------------------------------------
#
[docs]def is_this_thread(t):
'''
check if the given thread (type: threading.Thread) is the same as the
current thread of execution.
'''
assert isinstance(t, mt.Thread)
return bool(t == this_thread())
# ------------------------------------------------------------------------------
#
[docs]def main_thread():
'''
return a handle to the main thread of execution in this process
'''
for t in mt.enumerate():
if isinstance(t, mt._MainThread): # pylint: disable=W0212
return t
assert False, 'main thread not found'
# ------------------------------------------------------------------------------
#
[docs]def this_thread():
'''
return a handle to the current thread of execution
'''
return mt.current_thread()
# ------------------------------------------------------------------------------
#
_signal_lock = mt.Lock()
_signal_sent = dict()
[docs]def cancel_main_thread(signame=None, once=False):
"""
This method will call thread.interrupt_main from any calling subthread.
That will cause a 'KeyboardInterrupt' exception in the main thread. This
can be excepted via `except KeyboardInterrupt`
The main thread MUST NOT have a SIGINT signal handler installed (other than
the default handler or SIGIGN), otherwise this call will cause an exception
in the core python signal handling (see http://bugs.python.org/issue23395).
The thread will exit after this, via sys.exit(0), and can then be joined
from the main thread.
When being called *from* the main thread, no interrupt will be generated,
but sys.exit() will still be called. This can be excepted in the code via
`except SystemExit`.
Another way to avoid the SIGINT problem is to send a different signal to the
main thread. We do so if `signal` is specified.
After sending the signal, any sub-thread will call sys.exit(), and thus
finish. We leave it to the main thread though to decide if it will exit at
this point or not. Either way, it will have to handle the signal first.
If `once` is set to `True`, we will send the given signal at most once.
This will mitigate races between multiple error causes, specifically during
finalization.
"""
if signame: signum = get_signal_by_name(signame)
else : signum = None
with _signal_lock:
if once:
if signum in _signal_sent:
# don't signal again
return
if signum:
# send the given signal to the process to which this thread belongs
os.kill(os.getpid(), signum)
else:
# this sends a SIGINT, resulting in a KeyboardInterrupt.
# NOTE: see http://bugs.python.org/issue23395 for problems on using
# SIGINT in combination with signal handlers!
try:
_thread.interrupt_main()
except TypeError:
# this is known to be a side effect of `thread.interrup_main()`
pass
# record the signal sending
_signal_sent[signum] = True
# the sub thread will at this point also exit.
if not is_main_thread():
sys.exit()
# ------------------------------------------------------------------------------
# this is the counterpart for the `cancel_main_thread` method above: any main
# thread should call `set_cancellation_handler`, which will set a signal handler
# for `SIGUSR2`, and upon catching it will raise a `KeyboardInterrupt`
# exception, which can be caught by any interested library or application.
#
# RU claims ownership of `SIGUSR2` -- it will complain if any other signal
# handler is installed for that signal.
#
# This method can safely be called multiple times. This method can be called
# from threads, but it will have no effect then (Python allows signal handlers
# to only be installed in the main thread)
#
# FIXME: `cancel_main_thread()` supports arbitrary signals --
# `set_cancellation_handler()` should, too.
#
def _sigusr2_handler(signum, frame):
print('caught sigusr2 (%s)' % os.getpid())
# we only want to get this exception once, so we unset the signal handler
# before we raise it
signal.signal(signal.SIGUSR2, signal.SIG_IGN)
raise KeyboardInterrupt('sigusr2')
[docs]def set_cancellation_handler():
# this method compares function pointers
# pylint: disable=W0143
# check if any handler exists
old_handler = signal.getsignal(signal.SIGUSR2)
if old_handler not in [signal.SIG_DFL, signal.SIG_IGN, None] and \
old_handler != _sigusr2_handler:
raise RuntimeError('handler for SIGUSR2 is already present')
try:
signal.signal(signal.SIGUSR2, _sigusr2_handler)
except ValueError:
# this fails in subthreads - ignore
pass
[docs]def unset_cancellation_handler():
try:
signal.signal(signal.SIGUSR2, signal.SIG_IGN)
except ValueError:
# this fails in subthreads - ignore
pass
# ------------------------------------------------------------------------------
#
[docs]def get_signal_by_name(signame):
"""
Translate a signal name into the respective signal number. If `signame` is
not found to be a valid signal name, this method will raise a `KeyError`
exception. Lookup is case insensitive.
"""
table = {'abrt' : signal.SIGABRT,
'alrm' : signal.SIGALRM,
'bus' : signal.SIGBUS,
'chld' : signal.SIGCHLD,
# 'cld' : signal.SIGCLD,
'cont' : signal.SIGCONT,
'fpe' : signal.SIGFPE,
'hup' : signal.SIGHUP,
'ill' : signal.SIGILL,
'int' : signal.SIGINT,
'io' : signal.SIGIO,
'iot' : signal.SIGIOT,
'kill' : signal.SIGKILL,
'pipe' : signal.SIGPIPE,
# 'poll' : signal.SIGPOLL,
'prof' : signal.SIGPROF,
# 'pwr' : signal.SIGPWR,
'quit' : signal.SIGQUIT,
# 'rtmax' : signal.SIGRTMAX,
# 'rtmin' : signal.SIGRTMIN,
'segv' : signal.SIGSEGV,
'stop' : signal.SIGSTOP,
'sys' : signal.SIGSYS,
'term' : signal.SIGTERM,
'trap' : signal.SIGTRAP,
'tstp' : signal.SIGTSTP,
'ttin' : signal.SIGTTIN,
'ttou' : signal.SIGTTOU,
'urg' : signal.SIGURG,
'usr1' : signal.SIGUSR1,
'usr2' : signal.SIGUSR2,
'vtalrm' : signal.SIGVTALRM,
'winch' : signal.SIGWINCH,
'xcpu' : signal.SIGXCPU,
'xfsz' : signal.SIGXFSZ,
}
return table[signame.lower()]
# ------------------------------------------------------------------------------
#
[docs]class ThreadExit(SystemExit):
pass
[docs]class SignalRaised(SystemExit):
def __init__(self, msg, signum=None):
if signum:
msg = '%s [signal: %s]' % (msg, signum)
SystemExit.__init__(self, msg)
# ------------------------------------------------------------------------------
#
[docs]def raise_in_thread(e=None, tname=None, tident=None):
"""
This method uses an internal Python function to inject an exception 'e'
into any given thread. That thread can be specified by its name ('tname')
or thread id ('tid'). If not specified, the exception is sent to the
MainThread.
The target thread will receive the exception with some delay. More
specifically, it needs to call up to 100 op codes before the exception
is evaluated and raised. The thread interruption can thus be delayed
significantly, like when the thread sleeps.
The default exception raised is 'radical.utils.ThreadExit' which inherits
from 'SystemExit'.
NOTE: this is not reliable: the exception is not raised immediately, but is
*scheduled* for raising at some point, ie. in general after about 100
opcodes (`sys.getcheckinterval()`). Depending on when exactly the
exception is finally raised, the interpreter might silently swallow
it, if that happens in a generic try/except clause. Those exist in
the Python core, even if discouraged by some PEP or the other.
See https://bugs.python.org/issue1779233
NOTE: we can only raise exception *types*, not exception *instances*
See https://bugs.python.org/issue1538556
Example::
def sub():
time.sleep(0.1)
ru.raise_in_thread()
try:
t = mt.Thread(target=sub)
t.start()
while True:
time.sleep(0.01)
except ru.ThreadExit: print 'thread exit'
except Exception as e: print 'except: %s' % e
except SystemExit: print 'exit'
else: print 'unexcepted'
finally: t.join()
"""
if not tident:
if not tname:
tname = 'MainThread'
for th in mt.enumerate():
if tname == th.name:
tident = th.ident
break
if not tident:
raise ValueError('no target thread given/found')
if not e:
e = ThreadExit
self_thread = mt.current_thread()
if self_thread.ident == tident:
# if we are in the target thread, we simply raise the exception.
# This specifically also applies to the main thread.
# Alas, we don't have a decent message to use...
raise e('raise_in_thread')
else:
# otherwise we inject the exception into the main thread's async
# exception scheduler
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tident),
ctypes.py_object(e))
# ------------------------------------------------------------------------------