import os
import sys
import time
import pprint
import signal
import random
import inspect
import traceback
import threading as mt
from .ids import generate_id
from .misc import ru_open
from .threads import get_thread_name
# ------------------------------------------------------------------------------
#
# Shell command used by the stacktrace printers to list processes.  The
# `--forest` option is appended on every platform except macOS ('darwin').
_ps_cmd = 'ps -efw' if sys.platform == 'darwin' else 'ps -efw' + ' --forest'
# ------------------------------------------------------------------------------
#
def get_trace():
    '''
    Return a formatted stack trace as one string.

    If an exception is currently being handled, the traceback of that
    exception is formatted.  Otherwise the current call stack is formatted,
    and the entry for this function itself is left out.
    '''

    active_tb = sys.exc_info()[2]

    if not active_tb:
        # no exception in flight: report the call stack, minus our own frame
        frames = traceback.extract_stack()
        lines  = traceback.format_list(frames)
        return ''.join(lines[:-1])

    frames = traceback.extract_tb(active_tb)
    return ''.join(traceback.format_list(frames))
# ------------------------------------------------------------------------------
#
# pylint: disable=unused-argument
def print_stacktraces(signum=None, sigframe=None):
    '''
    Print the stack traces of all threads of this process to stdout.

    The output starts with the `ps` line(s) matching this process id.  If a
    module level debug helper is active, its registered locks and rlocks are
    listed with their owner and waiting threads.  Then one section per
    thread follows, with the calling thread marked as `[active]`.

    If `RADICAL_DEBUG` is set in the environment, the same text is also
    written to `/tmp/ru.stacktrace.<pid>.log`.

    signum, sigframe exist to satisfy signal handler signature requirements
    and are not used.
    '''

    sep      = '---------------------------------------------------------\n'
    this_tid = mt.current_thread().ident

    # if multiple processes (ie. a process group) get the signal, then all
    # traces are mixed together.  Thus we wait 'pid%100' milliseconds, in
    # the hope that this will stagger the prints.
    pid = int(os.getpid())
    time.sleep((pid % 100) / 1000)

    parts = ['=========================================================\n',
             'RADICAL Utils -- Debug Helper -- Stacktraces\n',
             os.popen("%s | grep ' %s ' | grep -v grep"
                      % (_ps_cmd, pid)).read()]

    if _debug_helper:
        parts.append(sep)
        parts.append(_format_lock_table('Locks:\n',  _debug_helper.locks))
        parts.append(_format_lock_table('RLocks:\n', _debug_helper.rlocks))
        parts.append(sep)

    try:
        info = get_stacktraces()
    except Exception as e:
        # terminate the line so that the closing separator is not glued to it
        parts.append('skipping frame (%s)\n' % e)
        info = None

    if info:
        for (tid, tname), frames in info.items():

            marker = '[active]' if tid == this_tid else ''

            parts.append(sep)
            parts.append('Thread: %s %s\n' % (tname, marker))
            parts.append(' PID : %s \n' % pid)
            parts.append(' TID : %s \n' % tid)

            for fname, line, func, code in frames:

                code = code.strip() if code else '<no code>'

                # do not report the frames of the trace collection itself
                if func not in ['get_stacktraces', 'print_stacktraces']:
                    parts.append(' File: %s, line %d, in %s\n'
                                 % (fname, line, func))
                    parts.append(' %s\n' % code)

    parts.append('=========================================================')
    out = ''.join(parts)

    sys.stdout.write('%s\n' % out)
    sys.stdout.flush()

    if 'RADICAL_DEBUG' in os.environ:
        with ru_open('/tmp/ru.stacktrace.%s.log' % pid, 'w') as f:
            f.write('%s\n' % out)


def _format_lock_table(title, locks):
    '''
    Format one section of the lock overview: `title` followed by one line
    per entry of `locks` (name, owner or '-', waiting threads).  Returns an
    empty string if `locks` is empty.
    '''

    if not locks:
        return ''

    rows = [title]

    # iterate over a snapshot: other threads may register locks meanwhile
    for name, lock in list(locks.items()):
        rows.append(' %-60s: %s %s\n' % (name, lock.owner or '-', lock.waits))

    return ''.join(rows)
# ------------------------------------------------------------------------------
#
def get_stacktraces():
    '''
    Collect the current call stack of every thread of this process.

    Returns a dict which maps `(thread_id, thread_name)` tuples to the
    result of `traceback.extract_stack()` for that thread's current frame.
    Threads which are not listed by `threading.enumerate()` are reported
    under the name 'noname'.
    '''

    names  = {thread.ident: thread.name for thread in mt.enumerate()}
    frames = list(sys._current_frames().items())  # pylint: disable=W0212

    return {(tid, names.get(tid, 'noname')): traceback.extract_stack(frame)
            for tid, frame in frames}
# ------------------------------------------------------------------------------
#
def print_stacktrace(msg=None, _stack=None):
    '''
    Print a stack trace for the calling thread to stdout.

    msg   : optional message which is printed below the header line
    _stack: optional list of trace entries to print instead of the current
            call stack.  Entries are expected to be strings; other entries
            (such as the frame summaries returned by
            `traceback.extract_tb()`) are formatted via
            `traceback.format_list()` before printing.
    '''

    if not msg:
        msg = ''

    tname = mt.current_thread().name
    pid   = os.getpid()

    out  = '--------------\n'
    out += 'RADICAL Utils -- Stacktrace [%s] [%s]\n' % (pid, tname)
    out += '%s\n' % msg
    out += os.popen("%s | grep ' %s ' | grep -v grep" % (_ps_cmd, pid)).read()

    if not _stack:
        _stack = get_stacktrace()

    for line in _stack:
        if not isinstance(line, str):
            # frame summaries have no `strip()`: render them as text first
            line = ''.join(traceback.format_list([line]))
        out += line.strip()
        out += '\n'

    out += '--------------\n'

    sys.stdout.write(out)
# ------------------------------------------------------------------------------
#
def get_exception_trace(msg=None):
    '''
    Return the output of `traceback.format_exc()` as a list of lines.

    The text is split on newline characters, so the last list element is
    an empty string if the formatted text ends with a newline.  `msg` is
    accepted but not used.
    '''

    formatted = traceback.format_exc()
    return formatted.split('\n')
# ------------------------------------------------------------------------------
#
def print_exception_trace(msg=None, exc=None):
    '''
    Print the trace of an exception via `print_stacktrace()`.

    msg: optional message, passed on to `print_stacktrace()`
    exc: exception instance whose traceback is printed.  If not given, the
         exception which is currently being handled is used.
    '''

    if not exc:
        print_stacktrace(msg=msg, _stack=get_exception_trace())

    else:
        # `format_tb()` returns strings, which is what `print_stacktrace()`
        # expects; `extract_tb()` returns frame summary objects instead.
        print_stacktrace(msg=msg,
                         _stack=traceback.format_tb(exc.__traceback__))
# ------------------------------------------------------------------------------
#
def get_stacktrace():
    '''
    Return the current call stack as a list of formatted strings (see
    `traceback.format_stack()`), without the entry for this function.
    '''

    frames = traceback.format_stack()
    frames.pop()   # last entry is this function's own frame
    return frames
# ------------------------------------------------------------------------------
#
def get_caller_name(skip=2):
    '''
    Return the name of a caller in the format `module.class.method`.

    `skip` is the number of stack levels to skip: skip=1 means 'who calls
    me', skip=2 means 'who calls my caller', etc.  An empty string is
    returned if `skip` exceeds the height of the stack.

    The module part is omitted if no module can be determined for the
    frame, the class part is omitted if the frame has no local named
    `self`, and the function part is omitted for module level code.

    Kudos: http://stackoverflow.com/questions/2654113/
           python-how-to-get-the-callers-method-name-in-the-called-method
    '''

    frames = inspect.stack()

    if skip + 1 > len(frames):
        return ''

    frame = frames[skip][0]
    parts = []

    # the module can be None when the frame is executed directly in console
    mod = inspect.getmodule(frame)
    if mod:
        parts.append(mod.__name__)

    # a local named `self` is taken as indication for a method call
    frame_locals = frame.f_locals
    if 'self' in frame_locals:
        parts.append(frame_locals['self'].__class__.__name__)

    # '<module>' is the code name of top level code
    func = frame.f_code.co_name
    if func != '<module>':
        parts.append(func)

    del frame

    return '.'.join(parts)
# ------------------------------------------------------------------------------
#
# print `raise_on` diagnostics to stdout if no logger is passed
_verb = False
if 'RADICAL_DEBUG_VERBOSE' in os.environ:
    _verb = True

# per-tag counters for `raise_on`, guarded by `_raise_on_lock`
_raise_on_state = dict()
_raise_on_lock  = mt.Lock()


# ------------------------------------------------------------------------------
#
def raise_on(tag, log=None, msg=None):
    '''
    The purpose of this method is to artificially trigger error conditions for
    testing purposes, for example when handling the n'th unit, getting the n'th
    heartbeat signal, etc.

    The tag parameter is interpreted as follows: on the `n`'th invocation of
    this method with any given `tag`, an exception is raised, and the counter
    for that tag is reset.

    The limit `n` is set via an environment variable `RU_RAISE_ON_<tag>`, with
    `tag` in upper casing.  A value of the form `RANDOM_<rate>` checks on
    every invocation, and raises only if a random integer in [0, 100] does
    not exceed `rate`.  The environment will only be inspected during the
    first invocation of the method with any given tag.  The tag counter is
    process-local, but is shared amongst threads of that process.

    tag: name of the trigger
    log: optional logger used for diagnostics; if not given, diagnostics are
         printed only when `RADICAL_DEBUG_VERBOSE` was set at import time
    msg: optional text which is added to the 'checked' diagnostic

    Raises `RuntimeError` when triggered, and `ValueError` if the environment
    setting cannot be converted to an integer.
    '''

    with _raise_on_lock:

        state = _raise_on_state.get(tag)

        if state is None:

            env = os.environ.get('RU_RAISE_ON_%s' % tag.upper())

            if env and env.startswith('RANDOM_'):
                # env is rnd spec
                rate  = int(env[7:])
                limit = 1

            elif env:
                # env is int
                rate  = 1
                limit = int(env)

            else:
                # no env set
                rate  = 1
                limit = 0

            state = {'count': 0,
                     'rate' : rate,
                     'limit': limit}
            _raise_on_state[tag] = state

        state['count'] += 1

        count = state['count']
        limit = state['limit']
        rate  = state['rate']

        # always compose the info string: it is needed whenever a logger is
        # passed, independent of `msg` and of the verbosity setting
        info = '%s [%2d / %2d]' % (tag, count, limit)
        if msg:
            info = '%s [%s]' % (info, msg)

        if   log  : log.debug('raise_on checked %s', info)
        elif _verb: print('raise_on checked %s' % info)

        if not limit or count != limit:
            return

        # limit reached: reset counter
        state['count'] = 0

        # NOTE: a rate of 1 marks the deterministic case, so a spec of
        #       `RANDOM_1` is handled like a plain limit of 1.
        if rate == 1:
            val = limit

        else:
            val = random.randint(0, 100)
            if val > rate:
                if   log  : log.warning('raise_on ignored %s [%2d / %2d]',
                                        tag, val, rate)
                elif _verb: print('raise_on ignored %s [%2d / %2d]'
                                  % (tag, val, rate))
                return

        if   log  : log.warning('raise_on triggered %s [%2d / %2d]',
                                tag, val, rate)
        elif _verb: print('raise_on triggered %s [%2d / %2d]'
                          % (tag, val, rate))

        raise RuntimeError('raise_on for %s [%s]' % (tag, val))
# ------------------------------------------------------------------------------
#
def attach_pudb(log=None):
    '''
    Open a remote `pudb` debugger session for the calling thread.

    The debugger listens on 127.0.0.1, on port `gettid() + 10000`.  The
    telnet endpoint is reported before the debugger is started.  A failure
    to import or start `pudb` is reported but not raised.

    log: optional logger; if not given, messages are printed to stdout
    '''

    # need to move here to avoid circular import
    from .threads import gettid

    host = '127.0.0.1'
    tid  = gettid()
    port = tid + 10000

    if log:
        log.info('debugger open: telnet %s %d', host, port)
    else:
        print('debugger open: telnet %s %d' % (host, port))

    try:
        import pudb                                # pylint: disable=E0401
        from   pudb.remote import set_trace        # pylint: disable=E0401

        pudb.DEFAULT_SIGNAL = signal.SIGALRM
        set_trace(host=host, port=port, term_size=(200, 50))

    except Exception as e:
        # best effort: report the problem on the same channel as above
        if log:
            log.warning('failed to attach pudb (%s)', e)
        else:
            print('failed to attach pudb (%s)' % e)
# ------------------------------------------------------------------------------
#
# search path for python snippets, see `get_snippet()`
_SNIPPET_PATHS = [os.environ.get('RADICAL_BASE', '/tmp') + '/.radical/snippets/']


def add_snippet_path(path):
    '''
    Append `path` to the search path for dynamically loaded python snippets
    (see `ru.get_snippet()`).

    Nothing is done unless `RADICAL_DEBUG` is set in the environment, and a
    path which is already registered is not added again.
    '''

    if 'RADICAL_DEBUG' not in os.environ:
        return

    if path in _SNIPPET_PATHS:
        return

    _SNIPPET_PATHS.append(path)
# ------------------------------------------------------------------------------
#
def get_snippet(sid):
    '''
    RU exposes small python snippets for runtime code insertion.  The usage is
    intended as follows:

      * a programmer implements a class
      * for some experiment or test, that class's behavior must be controlled
        at runtime.
      * in all places where such an adaptation is expected to take place, the
        programmer inserts a hook like this:

            exec(ru.get_snippet('my_class.init_hook'))

      * this will trigger RU to search for python files of the name
        `my_class.init_hook.py` in `$RADICAL_BASE/.radical/snippets/` (default),
        and return their content for injection.

    The snippet search path can be extended by calling

        ru.add_snippet_path(path)

    The `RADICAL_DEBUG` environment variable needs to be set for this method to
    do anything.  A snippet can use the following literal strings which will be
    replaced by their actual values:

        '###SNIPPET_FILE###'  - filename from which snippet was loaded
        '###SNIPPET_PATH###'  - path in which the snippet file is located
        '###SNIPPET_ID###'    - the sid string used to identify the snippet

    The content of the first snippet file which can be read is returned.  If
    `RADICAL_DEBUG` is not set, or if no snippet file can be read, the string
    'None' is returned, which is a valid (no-op) argument to `exec()`.

    NOTE: the returned text is meant to be executed by the caller, so all
          directories on the search path need to be trusted.  The default
          base directory is `/tmp` when `RADICAL_BASE` is not set.
    '''

    if 'RADICAL_DEBUG' not in os.environ:
        return 'None'

    for path in _SNIPPET_PATHS:

        fname = '%s/%s.py' % (path, sid)

        try:
            with ru_open(fname, 'r') as fin:
                snippet = fin.read()

        except Exception:
            # missing or unreadable snippet file: try the next path
            continue

        snippet = snippet.replace('###SNIPPET_FILE###', fname)
        snippet = snippet.replace('###SNIPPET_PATH###', path)
        snippet = snippet.replace('###SNIPPET_ID###',   sid)

        return snippet

    return 'None'
# ------------------------------------------------------------------------------
#
class DebugHelper(object):
    '''
    When instantiated in the main thread, and when 'RADICAL_DEBUG' is set in
    the environment, this class will install `print_stacktraces` as signal
    handler for SIGUSR1 and SIGQUIT.  When such a signal is received, a
    stacktrace for all threads is printed to stdout.

    We also check if SIGINFO is available, which is generally bound to CTRL-T.

    Additionally, a call to 'dh.fs_block(info=None)' will create a file system
    based barrier: it will create a file in /tmp/ (named after 'name', the
    process id and the thread id), and dump the stack trace and any 'info'
    into it.  It then waits until that file has changed (touched or removed
    etc), and then returns.  The wait is a simple poll based 'os.stat()'
    (every 0.1 seconds).

    The helper also keeps a registry of named locks and rlocks, which is
    included in the output of `print_stacktraces`.
    '''

    # --------------------------------------------------------------------------
    #
    def __init__(self, name=None, info=None):
        '''
        name: string to identify fs barriers (default: `str(id(self))`)
        info: static info to dump into fs barriers
        '''

        self.name   = name
        self.info   = info
        self.locks  = dict()
        self.rlocks = dict()

        if not self.name:
            self.name = str(id(self))

        if 'MainThread' not in mt.current_thread().name:
            # python only supports signals in main threads :-/
            return

        if 'RADICAL_DEBUG' in os.environ:
            signal.signal(signal.SIGUSR1, print_stacktraces)  # signum 30
            signal.signal(signal.SIGQUIT, print_stacktraces)  # signum  3

            # SIGINFO is not defined on all platforms
            siginfo = getattr(signal, 'SIGINFO', None)
            if siginfo is not None:
                signal.signal(siginfo, print_stacktraces)     # signum 29


    # --------------------------------------------------------------------------
    #
    def register_lock(self, name, lock):
        '''
        Register `lock` under `name`; the name must not be registered yet.
        '''

        assert name not in self.locks, name
        self.locks[name] = lock


    # --------------------------------------------------------------------------
    #
    def register_rlock(self, name, rlock):
        '''
        Register `rlock` under `name`; the name must not be registered yet.
        '''

        assert name not in self.rlocks, name
        self.rlocks[name] = rlock


    # --------------------------------------------------------------------------
    #
    def unregister_lock(self, name):
        '''
        Remove the lock registered under `name`; the name must be registered.
        '''

        assert name in self.locks, name
        del self.locks[name]


    # --------------------------------------------------------------------------
    #
    def unregister_rlock(self, name):
        '''
        Remove the rlock registered under `name`; the name must be registered.
        '''

        assert name in self.rlocks, name
        del self.rlocks[name]


    # --------------------------------------------------------------------------
    #
    def fs_block(self, info=None):
        '''
        Dump state, info in barrier file, and wait for it to be touched or
        read or removed, then continue.  Leave no trace.

        This is a no-op unless `RADICAL_DEBUG` is set in the environment.
        Errors are ignored (best effort), and the barrier file is removed
        before the method returns.
        '''

        if 'RADICAL_DEBUG' not in os.environ:
            return

        pid = os.getpid()
        tid = mt.current_thread().ident
        fb  = '/tmp/ru.dh.%s.%s.%s' % (self.name, pid, tid)

        try:
            with ru_open(fb, 'w+') as fd:

                fd.seek(0, 0)
                fd.write('\nSTACK TRACE:\n%s\n%s\n' % (time.time(), get_trace()))
                fd.write('\nSTATIC INFO:\n%s\n\n' % pprint.pformat(self.info))
                fd.write('\nINFO:\n%s\n\n' % pprint.pformat(info))
                fd.flush()

                # poll until the stat result differs from the initial one.
                # A removed file makes `os.stat()` raise, which also ends
                # the wait via the handler below.
                new = os.stat(fb)
                old = new

                while old == new:
                    new = os.stat(fb)
                    time.sleep(0.1)

        except Exception:
            # we don't care (much)...
            pass

        finally:
            try:
                os.unlink(fb)
            except OSError:
                pass
# ------------------------------------------------------------------------------
#
# module level debug helper: only created if `RADICAL_DEBUG_HELPER` is set
# in the environment, `None` otherwise
_debug_helper = DebugHelper() if 'RADICAL_DEBUG_HELPER' in os.environ else None
# ------------------------------------------------------------------------------
#
class Lock(object):
    '''
    Wrapper around `threading.Lock` which records the name of the thread
    which holds the lock (`owner`) and the names of the threads which are
    waiting for it (`waits`).  If the module level debug helper is active,
    the lock registers itself there under its `name`.
    '''

    # --------------------------------------------------------------------------
    #
    def __init__(self, name=None):
        '''
        name: name under which the lock is registered (default: generated id)
        '''

        self.lock  = mt.Lock()
        self.owner = None
        self.waits = list()
        self.name  = name

        if not self.name:
            self.name = generate_id('lock')

        if _debug_helper:
            _debug_helper.register_lock(self.name, self)


    # --------------------------------------------------------------------------
    #
    def __enter__(self):
        self.acquire()
        return self


    # --------------------------------------------------------------------------
    #
    def __exit__(self, a, b, c):
        self.release()


    # --------------------------------------------------------------------------
    #
    def acquire(self, blocking=True, timeout=-1):
        '''
        Acquire the lock.  `blocking` and `timeout` are passed on to
        `threading.Lock.acquire()`, whose return value is returned.
        '''

        me = get_thread_name()
        self.waits.append(me)

        try:
            ret = self.lock.acquire(blocking=blocking, timeout=timeout)
        finally:
            # remove *our* entry: other threads may have queued up after us,
            # so the last list element is not necessarily ours
            self.waits.remove(me)

        if ret is not False:
            self.owner = me

        return ret


    # --------------------------------------------------------------------------
    #
    def release(self):
        '''
        Release the lock and clear the recorded owner.
        '''

        # clear the owner *before* the lock is released: afterwards another
        # thread may already have acquired the lock and recorded its name
        self.owner = None
        self.lock.release()
# ------------------------------------------------------------------------------
#
class RLock(object):
    '''
    Wrapper around `threading.RLock` which records the name of the thread
    which holds the lock (`owner`) and the names of the threads which are
    waiting for it (`waits`).  If the module level debug helper is active,
    the lock registers itself there under its `name`.
    '''

    # --------------------------------------------------------------------------
    #
    def __init__(self, name=None):
        '''
        name: name under which the lock is registered (default: generated id)
        '''

        self.lock   = mt.RLock()
        self.owner  = None
        self.waits  = list()
        self.name   = name

        self._tid   = None  # ident of the thread which holds the lock
        self._depth = 0     # number of acquisitions held by that thread

        if not self.name:
            self.name = generate_id('rlock')

        if _debug_helper:
            _debug_helper.register_rlock(self.name, self)


    # --------------------------------------------------------------------------
    #
    def __enter__(self):
        self.acquire()
        return self


    # --------------------------------------------------------------------------
    #
    def __exit__(self, a, b, c):
        self.release()


    # --------------------------------------------------------------------------
    #
    def acquire(self, blocking=True, timeout=-1):
        '''
        Acquire the lock.  `blocking` and `timeout` are passed on to
        `threading.RLock.acquire()`, whose return value is returned.
        '''

        me = get_thread_name()
        self.waits.append(me)

        try:
            ret = self.lock.acquire(blocking=blocking, timeout=timeout)
        finally:
            # remove *our* entry: other threads may have queued up after us,
            # so the last list element is not necessarily ours
            self.waits.remove(me)

        if ret is not False:
            # we hold the lock now, so these updates cannot race
            self.owner   = me
            self._tid    = mt.get_ident()
            self._depth += 1

        return ret


    # --------------------------------------------------------------------------
    #
    def release(self):
        '''
        Release the lock once.  The recorded owner is cleared only when the
        outermost acquisition is released.
        '''

        # update the bookkeeping *before* the lock is released: afterwards
        # another thread may already have acquired the lock.  If the caller
        # does not hold the lock, the bookkeeping is left alone and the
        # underlying `release()` reports the error.
        if self._tid == mt.get_ident():
            self._depth -= 1
            if self._depth == 0:
                self._tid  = None
                self.owner = None

        self.lock.release()
# ------------------------------------------------------------------------------