Source code for radical.utils.profile

# pylint: disable=protected-access

import os
import csv
import time

from .ids     import get_radical_base
from .misc    import as_string, as_list, ru_open
from .misc    import get_env_ns      as ru_get_env_ns
from .host    import get_hostname    as ru_get_hostname
from .host    import get_hostip      as ru_get_hostip
from .threads import get_thread_name as ru_get_thread_name
from .config  import DefaultConfig
from .atfork  import atfork


# ------------------------------------------------------------------------------
#
# We store profiles in CSV formatted files.
# The CSV field names are defined here:
#
TIME         = 0  # time of event (float, seconds since epoch)  mandatory
EVENT        = 1  # event ID (string)                           mandatory
COMP         = 2  # component which recorded the event          mandatory
TID          = 3  # uid of thread involved                      optional
UID          = 4  # uid of entity involved                      optional
STATE        = 5  # state of entity involved                    optional
MSG          = 6  # message describing the event                optional
ENTITY       = 7  # type of entity involved                     optional
PROF_KEY_MAX = 8  # iteration helper: `for _ in range(PROF_KEY_MAX):`

# Note that `ENTITY` is not written to the profile, but rather derived from the
# UID when reading the profiles.

# A previous incarnation of this class stored CSVs with the following column
# layout (cf. the legacy handling in `read_profiles()` below):
#
# TIME       = 0  # time of event (float, seconds since epoch)  mandatory
# NAME       = 1  # recording component and thread (`comp:tid`) mandatory
# UID        = 2  # uid of entity involved                      optional
# STATE      = 3  # state of entity involved                    optional
# EVENT      = 4  # event ID (string)                           mandatory
# MSG        = 5  # message describing the event                optional
# ------------------------------------------------------------------------------
#
# when recombining profiles, we will get one NTP sync offset per profile, and
# thus potentially multiple such offsets per host.  If those offsets differ
# from each other by more than a certain value (float, in seconds), we print
# a warning:
#
NTP_DIFF_WARN_LIMIT = 1.0

# syncing with the NTP host is expensive, so we only do it once in a while and
# cache the result.  We use a disk cache which is valid for 1 minute
NTP_CACHE_TIMEOUT = 60  # disk cache is valid for 60 seconds

# maximum field size allowed by the csv parser.  The larger the number of
# entities in the profile, the larger the field size required by the csv
# parser.  We assume a 64 bit C long.
CSV_FIELD_SIZE_LIMIT = 9223372036854775807


# ------------------------------------------------------------------------------
#
def _sync_ntp():

    # read from disk cache
    try:
        with ru_open('%s/ntp.cache' % get_radical_base('utils'), 'r') as fin:
            data  = as_string(fin.read()).split()
            t_sys = float(data[0])
            t_ntp = float(data[1])

    except:
        t_sys = None
        t_ntp = None


    # if the disk cache is empty or too old
    t_now = time.time()
    if t_sys is None or t_now - t_sys > NTP_CACHE_TIMEOUT:

        # refresh data
        import ntplib                                    # pylint: disable=E0401

        ntp_host = os.environ.get('RADICAL_UTILS_NTPHOST', '0.pool.ntp.org')

        t_one = time.time()
        response = ntplib.NTPClient().request(ntp_host, timeout=1)
        t_two = time.time()

        t_sys = (t_one + t_two) / 2.0
        t_ntp = response.tx_time

        with ru_open('%s/ntp.cache' % get_radical_base('utils'), 'w') as fout:
            fout.write('%f\n%f\n' % (t_sys, t_ntp))

    # advance both timestamps by the time elapsed since they were recorded
    t_cor  = time.time() - t_sys
    t_sys += t_cor
    t_ntp += t_cor

    return t_sys, t_ntp
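

# ------------------------------------------------------------------------------
#
# Illustrative sketch - not part of the original module: the pair returned by
# `_sync_ntp()` is later used (see `combine_profiles()` below) to derive a
# per-host clock correction as the difference between system and NTP time.
#
def _example_ntp_offset():

    t_sys, t_ntp = _sync_ntp()   # may raise if no NTP server is reachable
    return t_sys - t_ntp         # positive if the system clock runs ahead
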


# ------------------------------------------------------------------------------
#
# the profiler does not use threads and is generally threadsafe (all write ops
# should be atomic) - but alas Python thread-locks I/O streams, and those locks
# can still deadlock after fork:
#
#   - https://bugs.python.org/issue6721
#   - https://bugs.python.org/issue40399
#
# We thus have to close/reopen the prof file handle after fork.  This creates
# a bit of a mess as we now have to maintain a global list of profiler instances
# to clean up after fork... :-/
#
_profilers = list()


def _atfork_prepare():
    pass


def _atfork_parent():
    pass


def _atfork_child():
    for prof, fname in _profilers:
        prof._handle = ru_open(fname, 'a', buffering=1024)


atfork(_atfork_prepare, _atfork_parent, _atfork_child)
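

# ------------------------------------------------------------------------------
#
# Illustrative sketch - not part of the original module: after a `fork()`, the
# child must not reuse the parent's (potentially locked) file handle, which is
# why `_atfork_child()` above reopens it.  `prof` is assumed to be an enabled
# `Profiler` instance (defined below); POSIX only.
#
def _example_fork_safety(prof):

    pid = os.fork()

    if pid == 0:
        # child: `_atfork_child()` has reopened `prof._handle`, so it is safe
        # to write profile events again
        prof.prof('child_event')
        os._exit(0)

    else:
        # parent: keep using the original handle
        prof.prof('parent_event')
        os.waitpid(pid, 0)
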


# ------------------------------------------------------------------------------
#
class Profiler(object):
    '''
    This class is really just a persistent file handle with a convenience call
    (`prof()`) to write lines with timestamped events.  Any profiling
    intelligence must be applied when reading and evaluating the created
    profiles.

    The following fields are defined for each event:

        time : mandatory, float,  time in seconds since epoch
        event: mandatory, string, short, unique name of the event to record
        comp : optional,  string, name of the component recording the event
        tid  : optional,  string, current thread id (name)
        uid  : optional,  string, ID of the entity involved (when available)
        state: optional,  string, state of the entity involved, if applicable
        msg  : optional,  string, free-form message describing the event

    Strings MUST NOT contain commas.  Otherwise they are encouraged to be
    formed as `[a-z][0-9a-z_.]*`.  `msg` entries are free-form, but the ban on
    commas still holds.  We propose to limit the sum of all strings to about
    256 characters - this will guarantee atomic writes on most OS's, without
    additional locking overheads.  Fewer than 100 characters keeps the
    profiles almost human-readable.

    The profiler is enabled by setting environment variables.  For a profiler
    named `radical.utils`, the following env variables will be evaluated:

        RADICAL_UTILS_PROFILE
        RADICAL_PROFILE

    If either is present in the environment, the profile is enabled (the value
    of the setting is ignored).
    '''

    fields = ['time', 'event', 'comp', 'thread', 'uid', 'state', 'msg']


    # --------------------------------------------------------------------------
    #
    def __init__(self, name, ns=None, path=None):
        '''
        Open the file handle, sync the clock, and write the zero timestamp.
        '''

        ru_def = DefaultConfig()

        if not ns:
            ns = name

        # check if this profile is enabled via an env variable
        self._enabled = ru_get_env_ns('profile', ns)

        if self._enabled is None:
            self._enabled = ru_def.get('profile')

        if self._enabled is None:
            self._enabled = 'False'

        if self._enabled.lower() in ['0', 'false', 'off']:
            self._enabled = False

        # don't open the file on disabled profilers
        if not self._enabled:
            self._handle = None
            return

        # profiler is enabled - set properties, sync time, open handle
        self._enabled = True
        self._path    = path
        self._name    = name

        if not self._path:
            self._path = ru_def['profile_dir']

        self._ts_zero, self._ts_abs, self._ts_mode = self._timestamp_init()

        try:
            os.makedirs(self._path)
        except OSError:
            pass  # already exists

        # we use a small `buffering` value so that writes reach the OS quickly.
        # That is not ideal performance wise - but we do not `fsync()` after
        # every write, so OS level buffering still applies.  This is supposed
        # to shield against incomplete profiles.
        fname = '%s/%s.prof' % (self._path, self._name)
        self._handle = ru_open(fname, 'a', buffering=1024)

        # register for cleanup after fork
        _profilers.append([self, fname])

        # write header and time normalization info
        if self._handle:
            self._handle.write('#%s\n' % (','.join(Profiler.fields)))
            self._handle.write('%.7f,%s,%s,%s,%s,%s,%s\n'
                              % (self.timestamp(), 'sync_abs', self._name,
                                 ru_get_thread_name(), '', '',
                                 '%s:%s:%s:%s:%s' % (
                                     ru_get_hostname(), ru_get_hostip(),
                                     self._ts_zero, self._ts_abs,
                                     self._ts_mode)))


    # --------------------------------------------------------------------------
    #
    def __del__(self):

      # self.close()
        pass


    # --------------------------------------------------------------------------
    #
    @property
    def enabled(self):
        return self._enabled

    @property
    def path(self):
        return self._path


    # --------------------------------------------------------------------------
    #
    def enable(self) : self._enabled = True
    def disable(self): self._enabled = False


    # --------------------------------------------------------------------------
    #
    def close(self):

        try:
            if not self._enabled:
                return

            if self._enabled and self._handle:
                self.prof('END')
                self.flush()
                self._handle.close()
                self._handle = None

        except:
            pass


    # --------------------------------------------------------------------------
    #
    def flush(self, verbose=False):

        if not self._enabled: return
        if not self._handle : return

        if verbose:
            self.prof('flush')

        # see https://docs.python.org/2/library/stdtypes.html#file.flush
        self._handle.flush()
        os.fsync(self._handle.fileno())


    # --------------------------------------------------------------------------
    #
    # FIXME: reorder args to reflect tuple order (breaks API)
    #
    def prof(self, event, uid=None, state=None, msg=None, ts=None, comp=None,
                   tid=None):

        if not self._enabled: return
        if not self._handle : return

        if ts    is None: ts    = self.timestamp()
        if comp  is None: comp  = self._name
        if tid   is None: tid   = ru_get_thread_name()
        if uid   is None: uid   = ''
        if state is None: state = ''
        if msg   is None: msg   = ''

        # if uid is a list, then write one event line per given uid
        for _uid in as_list(uid):
            data = '%.7f,%s,%s,%s,%s,%s,%s\n' \
                 % (ts, event, comp, tid, _uid, state, msg)
            self._handle.write(data)

        self._handle.flush()


    # --------------------------------------------------------------------------
    #
    def _timestamp_init(self):
        '''
        return a tuple of [system time, absolute time, sync mode]
        '''

        # retrieve an absolute timestamp from an external source
        #
        # We first try to contact a network time service for a timestamp; if
        # that fails, we use the current system time.
        try:
            ts_sys, ts_ntp = _sync_ntp()
            return [ts_sys, ts_ntp, 'ntp']

        except:
            pass

        # on any error, we fall back to system time
        t = time.time()
        return [t, t, 'sys']


    # --------------------------------------------------------------------------
    #
    def timestamp(self):

        return time.time()
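

# ------------------------------------------------------------------------------
#
# Illustrative usage sketch - not part of the original module API.  It assumes
# that profiling is enabled in the environment (e.g. `RADICAL_PROFILE=TRUE`)
# and that `/tmp/profiles` is an acceptable output location.
#
def _example_profiler_usage():

    prof = Profiler(name='radical.utils.example', path='/tmp/profiles')

    if prof.enabled:
        # strings must not contain commas (see class docstring)
        prof.prof('example_start', uid='task.000000', state='NEW')
        prof.prof('example_stop',  uid='task.000000', state='DONE',
                  msg='free-form message without commas')

    prof.close()
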


# ------------------------------------------------------------------------------
#
def timestamp():

    return time.time()


# ------------------------------------------------------------------------------
#
def read_profiles(profiles, sid=None, efilter=None):
    '''
    We read all profiles as CSV files and parse them.  For each profile, we
    back-calculate global time (epoch) from the sync timestamps.

    The caller can provide a filter of the following structure::

        filter = {ru.EVENT: ['event 1', 'event 2', ...],
                  ru.MSG  : ['msg 1',   'msg 2',   ...],
                  ...
                 }

    Filters apply on *substring* matches: rows whose field value matches one
    of the given patterns are skipped.
    '''

    legacy = os.environ.get('RADICAL_ANALYTICS_LEGACY_PROFILES', False)
    if legacy and legacy.lower() not in ['no', 'false']:
        legacy = True
    else:
        legacy = False

    # set the maximum field size allowed by the csv parser
    csv.field_size_limit(CSV_FIELD_SIZE_LIMIT)

  # import resource
  # print('max RSS : %20d MB' % (resource.getrusage(1)[2] / 1024))

    # FIXME: we correct one pesky profile entry which is exactly 1.000 in an
    #        otherwise ntp-aligned profile - see [1].  In this case we use the
    #        previous timestamp (if available).
    #
    # [1] https://github.com/radical-cybertools/radical.pilot/issues/1117

    if not efilter:
        efilter = dict()

    ret     = dict()
    last    = list()
    skipped = 0

    for prof in profiles:

        with ru_open(prof, 'r') as csvfile:

            ret[prof] = list()
            reader    = csv.reader(csvfile)

            try:
                for raw in reader:

                    # we keep the raw data around for error checks
                    row = list(raw)

                  # if 'bootstrap_1' in row:
                  #     print()
                  #     print(row)

                    # skip header
                    if row[TIME].startswith('#'):
                        skipped += 1
                        continue

                    # make room in the row for entity type etc.
                    row.extend([None] * (PROF_KEY_MAX - len(row)))

                    row[TIME] = float(row[TIME])

                    # we derive the entity type from the uid - but funnel
                    # some cases into 'session' as a catch-all type
                    uid = row[UID]
                    if uid:
                        row[ENTITY] = uid.split('.', 1)[0]
                    else:
                        row[ENTITY] = 'session'
                        row[UID]    = sid

                    # we should have no unset (ie. None) fields left -
                    # otherwise the profile was likely not correctly closed.
                    if None in row:

                        if legacy:
                            # map the legacy column layout onto the current one
                            comp, tid = row[1].split(':', 1)

                            new_row         = [None] * PROF_KEY_MAX
                            new_row[TIME  ] = row[0]
                            new_row[EVENT ] = row[4]
                            new_row[COMP  ] = comp
                            new_row[TID   ] = tid
                            new_row[UID   ] = row[2]
                            new_row[STATE ] = row[3]
                            new_row[MSG   ] = row[5]

                            uid = new_row[UID]
                            if uid:
                                new_row[ENTITY] = uid.split('.', 1)[0]
                            else:
                                new_row[ENTITY] = 'session'
                                new_row[UID]    = sid

                            row = new_row

                    if None in row:
                        print('row invalid [%s]: %s' % (prof, raw))
                        continue
                      # raise ValueError('row invalid [%s]: %s' % (prof, row))

                    # apply the filter.  We do that after adding the entity
                    # field above, as the filter might also apply to that.
                    skip = False
                    for field, pats in efilter.items():
                        for pattern in pats:
                            if row[field] in pattern:
                                skip = True
                                break

                    if skip:
                        continue

                    # fix rp issue 1117 (see FIXME above)
                    if row[TIME] == 1.0 and last:
                        row[TIME] = last[TIME]

                    if not skip:
                        ret[prof].append(row)
                        last = row

                  # print(' --- %-30s -- %-30s ' % (row[STATE], row[MSG]))
                  # if 'bootstrap_1' in row:
                  #     print(row)
                  #     print()
                  #     print('TIME   : %s' % row[TIME  ])
                  #     print('EVENT  : %s' % row[EVENT ])
                  #     print('COMP   : %s' % row[COMP  ])
                  #     print('TID    : %s' % row[TID   ])
                  #     print('UID    : %s' % row[UID   ])
                  #     print('STATE  : %s' % row[STATE ])
                  #     print('ENTITY : %s' % row[ENTITY])
                  #     print('MSG    : %s' % row[MSG   ])

            except:
                raise
              # print('skip remainder of %s' % prof)
              # continue

    return ret
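

# ------------------------------------------------------------------------------
#
# Illustrative sketch - not part of the original module API.  The file names
# and event names are hypothetical placeholders; note that rows matching the
# filter are *excluded* from the result.
#
def _example_read_profiles():

    efilter = {EVENT: ['bootstrap_0_start', 'bootstrap_0_stop']}

    return read_profiles(['/tmp/profiles/agent.0000.prof',
                          '/tmp/profiles/agent.0001.prof'],
                         sid='session.0000', efilter=efilter)
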


# ------------------------------------------------------------------------------
#
def combine_profiles(profs):
    '''
    We merge all profiles and sort by time.

    This routine expects all profiles to have a synchronization timestamp.
    Two kinds of sync timestamps are supported: absolute (`sync_abs`) and
    relative (`sync_rel`).

    Time syncing is done based on `sync_abs` timestamps.  We expect one such
    absolute timestamp to be available per host (the first profile entry will
    contain host information).  All timestamps from the same host will be
    corrected by the respective NTP offset.  We define an 'accuracy' measure
    which is the maximum difference of clock correction offsets across all
    hosts.

    The `sync_rel` timestamps are expected to occur in pairs: one in a profile
    with no other sync timestamp, and one in a profile which has a `sync_abs`
    timestamp.  In that case, the time correction from the latter is
    transferred to the former (the two timestamps are considered to have been
    written at the exact same time).

    The method returns the combined profile and the accuracy, as a tuple.
    '''

    syncs    = dict()  # profiles which have relative time refs
    t_host   = dict()  # time offset per host
    p_glob   = list()  # global profile
    t_min    = None    # absolute starting point of profiled session
    c_end    = 0       # counter for profile closing tag
    accuracy = 0       # max uncorrected clock deviation

    if len(profs) == 1:
        return list(profs.values())[0], accuracy

    # first get all absolute and relative timestamp syncs from the profiles,
    # for all hosts
    for pname, prof in profs.items():

        sync_abs = list()
        sync_rel = list()

        syncs[pname] = {'rel': sync_rel, 'abs': sync_abs}

        if not len(prof):
          # print('empty %s' % pname)
            continue

        for entry in prof:
            if entry[EVENT] == 'sync_abs': sync_abs.append(entry)
            if entry[EVENT] == 'sync_rel': sync_rel.append(entry)

        # we can have any number of sync_rel's - but if we find none, we
        # expect a sync_abs
        if not sync_rel and not sync_abs:
          # print('unsynced %s' % pname)
            continue

        syncs[pname] = {'rel': sync_rel, 'abs': sync_abs}

  # for pname, prof in profs.items():
  #     if prof:
  #         print('check %-100s: %s' % (pname, prof[0][TIME:EVENT]))

    for pname, prof in profs.items():

        if not len(prof):
          # print('empty %s' % pname)
            continue

        # if we only have sync_rel(s), then find the corresponding sync_rel in
        # the other profiles, and determine the offset to use.  Use the first
        # sync_rel that results in an offset, and only complain if none is
        # found.
        offset       = None
        offset_event = None

        if syncs[pname]['abs']:
            offset = 0.0

        else:
            for sync_rel in syncs[pname]['rel']:
                for _pname in syncs:
                    if _pname == pname:
                        continue
                    for _sync_rel in syncs[_pname]['rel']:
                        if _sync_rel[MSG] == sync_rel[MSG]:
                            offset       = _sync_rel[TIME] - sync_rel[TIME]
                            offset_event = syncs[_pname]['abs'][0]
                    if offset:
                        break
                if offset:
                    break

        if offset is None:
          # print('no rel sync %s' % pname)
            continue

      # print('sync profile %-100s : %20.3fs' % (pname, offset))
        for event in prof:
            event[TIME] += offset

        # if we have an offset event, we append it to the profile.  This
        # basically transplants a sync_abs event into a sync_rel profile.
        if offset_event:
          # print('transplant sync_abs to %s: %s' % (pname, offset_event))
            prof.append(offset_event)
            syncs[pname]['abs'].append(offset_event)

    # all profiles are rel-synced here.  Now we look at sync_abs values to
    # align across hosts and to determine accuracy.
    for pname in syncs:

        for sync_abs in syncs[pname]['abs']:

            # https://github.com/radical-cybertools/radical.analytics/issues/20
            if not sync_abs[MSG] or ':' not in sync_abs[MSG]:
              # print('unsynced profile %s [%s]' % (pname, sync_abs))
                continue

            t_prof = sync_abs[TIME]

            host, ip, t_sys, t_ntp, t_mode = sync_abs[MSG].split(':')
            host_id = '%s:%s' % (host, ip)

            if t_min: t_min = min(t_min, t_prof)
            else    : t_min = t_prof

            if t_mode == 'sys':
              # print('sys synced profile (%s)' % t_mode)
                continue

            # determine the correction for the given host
            t_sys = float(t_sys)
            t_ntp = float(t_ntp)
            t_off = t_sys - t_ntp

            if host_id in t_host and \
                t_host[host_id] != t_off:

                diff     = t_off - t_host[host_id]
                accuracy = max(accuracy, diff)

                # we allow for *some* amount of inconsistency before warning
                if diff > NTP_DIFF_WARN_LIMIT:
                    print('conflicting time sync for %-45s (%15s): '
                          '%10.2f - %10.2f = %5.2f'
                          % (pname.split('/')[-1], host_id,
                             t_off, t_host[host_id], diff))
                continue

            t_host[host_id] = t_off

    unsynced = set()

    # now that we can align clocks for all hosts, apply that correction to
    # all profiles
    for pname, prof in profs.items():

        if not len(prof):
          # print('empty prof: %s' % pname)
            continue

        if not syncs[pname]['abs']:
          # print('no sync_abs event: %s' % prof[0])
            continue

        sync_abs = syncs[pname]['abs'][0]

      # print(MSG)
      # print(sync_abs)
      # print(sync_abs[MSG])
      # print(sync_abs[MSG].split(':'))

        host, ip, _, _, _ = sync_abs[MSG].split(':')
        host_id = '%s:%s' % (host, ip)

        if host_id in t_host:
            t_off = t_host[host_id]
        else:
            unsynced.add(host_id)
            t_off = 0.0

        t_0  = sync_abs[TIME]
        t_0 -= t_min

        # correct profile timestamps
        for row in prof:

            row[TIME] -= t_min
            row[TIME] -= t_off

          # print(row[EVENT],)

            # count closing entries
            if row[EVENT] == 'END':
                c_end += 1

        # add profile to global one
        p_glob += prof

      # if prof:
      #     print('check %-100s: %s' % (pname, prof[0][TIME:EVENT]))

  # # Check for proper closure of profiling files
  # if c_end == 0:
  #     print('WARNING: profile "%s" not correctly closed.' % pname)
  # elif c_end > 1:
  #     print('WARNING: profile "%s" closed %d times.' % (pname, c_end))

    # sort by time and return
    p_glob = sorted(p_glob[:], key=lambda k: k[TIME])

  # print('check %-100s: %s' % ('t_min', p_glob[0][TIME]))
  # print('check %-100s: %s' % ('t_max', p_glob[-1][TIME]))

    return p_glob, accuracy
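

# ------------------------------------------------------------------------------
#
# Illustrative sketch - not part of the original module API: chain
# `read_profiles()` and `combine_profiles()` to obtain a single, time-sorted
# event list (file names are hypothetical).
#
def _example_combine_profiles():

    profs = read_profiles(['/tmp/profiles/agent.0000.prof',
                           '/tmp/profiles/agent.0001.prof'],
                          sid='session.0000')

    p_glob, accuracy = combine_profiles(profs)

    if accuracy > NTP_DIFF_WARN_LIMIT:
        print('low clock sync accuracy: %.2fs' % accuracy)

    return p_glob
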


# ------------------------------------------------------------------------------
#
def clean_profile(profile, sid, state_final=None, state_canceled=None):
    '''
    This method will prepare a profile for consumption in radical.analytics.
    It performs the following actions:

      - makes sure all events have an `ename` entry
      - removes all state transitions to `CANCELED` if a different final
        state is encountered for the same uid
      - assigns the session uid to all events without uid
      - makes sure that state transitions have an `ename` set to `state`
    '''

    entities = dict()  # things which have a uid

    if not state_final:
        state_final = []
    elif not isinstance(state_final, list):
        state_final = [state_final]

    for event in profile:

        uid   = event[UID  ]
        state = event[STATE]
        name  = event[EVENT]

        # we derive the entity type from the uid - but funnel
        # some cases into the session
        if uid:
            event[ENTITY] = uid.split('.', 1)[0]
        else:
            event[ENTITY] = 'session'
            event[UID]    = sid
            uid           = sid

        if uid not in entities:
            entities[uid] = dict()
            entities[uid]['states'] = dict()
            entities[uid]['events'] = list()

        if name == 'advance':

            # this is a state progression
            assert state, 'cannot advance w/o state'
            assert uid,   'cannot advance w/o uid'

            # this is a state transition event
            event[EVENT] = 'state'

            skip = False
            if state in state_final and state != state_canceled:
                # a final state other than CANCELED will cancel any previous
                # CANCELED state.
                if state_canceled and \
                    state_canceled in entities[uid]['states']:
                    del entities[uid]['states'][state_canceled]

            # vice versa, we will not add CANCELED if a final
            # state already exists:
            if state_canceled and \
                state_canceled == state:
                if any([s in entities[uid]['states']
                           for s in state_final]):
                    skip = True
                    continue

            if state in entities[uid]['states']:
                # ignore duplicate recordings of state transitions
                skip = True
                continue
              # raise ValueError('double state (%s) for %s' % (state, uid))

            if not skip:
                entities[uid]['states'][state] = event

        entities[uid]['events'].append(event)

    # we have evaluated, cleaned and sorted all events - now we recreate
    # a clean profile out of them
    ret = list()
    for entity in list(entities.values()):
        ret += entity['events']

    # sort by time and return
    ret = sorted(ret[:], key=lambda k: k[TIME])

    return ret
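

# ------------------------------------------------------------------------------
#
# Illustrative sketch - not part of the original module API: clean a combined
# profile for consumption in radical.analytics.  The state names are
# hypothetical placeholders - use the final / canceled states of the consuming
# application.
#
def _example_clean_profile(p_glob):

    return clean_profile(p_glob, sid='session.0000',
                         state_final=['DONE', 'FAILED', 'CANCELED'],
                         state_canceled='CANCELED')
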


# ------------------------------------------------------------------------------
#
def event_to_label(event):

    if event[EVENT] == 'state':
        return event[STATE]
    else:
        return event[EVENT]
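

# ------------------------------------------------------------------------------
#
# Illustrative sketch - not part of the original module API: state transition
# events are labeled by their target state, all other events by their event
# name.
#
def _example_event_labels(cleaned_profile):

    return [event_to_label(event) for event in cleaned_profile]
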
# ------------------------------------------------------------------------------