import os
import sys
import string
# import colorama as c
from .misc import get_env_ns as ru_get_env_ns
from .misc import ru_open
from .config import DefaultConfig
# ------------------------------------------------------------------------------
#
def _open(target):
try:
os.makedirs(os.path.abspath(os.path.dirname(target)))
except OSError:
pass # exists
return ru_open(target, 'w')
# ------------------------------------------------------------------------------
#
[docs]class Reporter(object):
# COLORS = {'white' : c.Style.BRIGHT + c.Fore.WHITE ,
# 'yellow' : c.Style.BRIGHT + c.Fore.YELLOW ,
# 'green' : c.Style.BRIGHT + c.Fore.GREEN ,
# 'blue' : c.Style.BRIGHT + c.Fore.BLUE ,
# 'cyan' : c.Style.BRIGHT + c.Fore.CYAN ,
# 'red' : c.Style.BRIGHT + c.Fore.RED ,
# 'magenta' : c.Style.BRIGHT + c.Fore.MAGENTA ,
# 'black' : c.Style.BRIGHT + c.Fore.BLACK ,
# 'darkwhite' : c.Style.DIM + c.Fore.WHITE ,
# 'darkyellow' : c.Style.DIM + c.Fore.YELLOW ,
# 'darkgreen' : c.Style.DIM + c.Fore.GREEN ,
# 'darkblue' : c.Style.DIM + c.Fore.BLUE ,
# 'darkcyan' : c.Style.DIM + c.Fore.CYAN ,
# 'darkred' : c.Style.DIM + c.Fore.RED ,
# 'darkmagenta' : c.Style.DIM + c.Fore.MAGENTA ,
# 'darkblack' : c.Style.DIM + c.Fore.BLACK ,
# 'reset' : c.Style.RESET_ALL + c.Fore.RESET
# }
COLORS = {'reset' : '\033[39m',
'black' : '\033[30m',
'red' : '\033[31m',
'green' : '\033[32m',
'yellow' : '\033[33m',
'blue' : '\033[34m',
'magenta' : '\033[35m',
'cyan' : '\033[36m',
'lightgray' : '\033[37m',
'darkgray' : '\033[90m',
'lightred' : '\033[91m',
'lightgreen' : '\033[92m',
'lightyellow' : '\033[93m',
'lightblue' : '\033[94m',
'lightmagenta' : '\033[95m',
'lightcyan' : '\033[96m',
'white' : '\033[97m'}
MODS = {'reset' : '\033[0m',
'bold' : '\033[1m',
'underline' : '\033[4m',
'blink' : '\033[5m',
'inverse' : '\033[7m',
'' : ''}
# define terminal colors and other output options
TITLE = 'bold lightblue'
HEADER = 'bold lightyellow'
INFO = 'lightblue'
IDLE = 'lightwhite'
PROGRESS = 'lightwhite'
OK = 'lightgreen'
WARN = 'lightyellow'
ERROR = 'lightred'
EMPTY = ''
DOTTED = '.'
SINGLE = '-'
DOUBLE = '='
HASHED = '#'
# --------------------------------------------------------------------------
#
def __init__(self, name, ns=None, path=None, targets=None, enabled=None):
'''
settings.style:
E : empty line
T : tabulator
L : line of line segments
M : text to report
'''
ru_def = DefaultConfig()
self._name = name
if not ns:
ns = name
# check if this profile is enabled via an env variable
if enabled is not None:
self._enabled = bool(enabled)
else:
self._enabled = str(ru_def['report']).lower()
if ru_get_env_ns('report', ns) is not None:
self._enabled = str(ru_get_env_ns('report', ns)).lower()
if self._enabled in ['0', 'false', 'off', 'none']:
self._enabled = False
else:
self._enabled = True
if not self._enabled:
# disabled
return
self._use_color = ru_get_env_ns('report_color', ns, default='True')
if self._use_color.lower() in ['0', 'false', 'off']:
self._use_color = False
else:
self._use_color = True
self._use_anime = ru_get_env_ns('report_anime', ns, default='True')
if self._use_anime.lower() in ['0', 'false', 'off']:
self._use_anime = False
else:
self._use_anime = True
self._line_len = int(ru_get_env_ns('report_llen', ns, default=80))
if not path:
path = os.getcwd()
if not targets:
targets = ru_get_env_ns('report_tgt', ns)
if not targets:
targets = ru_def['report_tgt']
if isinstance(targets, str):
targets = targets.split(',')
if not isinstance(targets, list):
targets = [targets]
if '/' in name:
try:
os.makedirs(os.path.normpath(os.path.dirname(name)))
except OSError:
pass # dir exists
self._prog_tgt = None
self._prog_cnt = 0
self._prog_pos = 0
self._prog_off = 0
C = 'color'
S = 'style'
G = 'segment'
self._pos = 0
self._settings = {
'title' : {C: self.TITLE , S: 'ELMLE' , G: self.DOUBLE},
'header' : {C: self.HEADER , S: 'ELME' , G: self.SINGLE},
'info' : {C: self.INFO , S: 'M' , G: self.EMPTY },
'idle' : {C: self.IDLE , S: 'M' , G: self.EMPTY },
'progress' : {C: self.PROGRESS , S: 'M' , G: self.EMPTY },
'ok' : {C: self.OK , S: 'M' , G: self.EMPTY },
'warn' : {C: self.WARN , S: 'M' , G: self.EMPTY },
'error' : {C: self.ERROR , S: 'M' , G: self.EMPTY },
'plain' : {C: '' , S: 'M' , G: self.EMPTY },
}
if not self._use_color:
for k in self._settings:
self._settings[k]['color'] = ''
self._idle_sequence = '/-\\|'
self._idle_pos = dict()
self._idle_count = 0
self._streams = list()
for t in targets:
if t in ['0', 'null'] : continue
elif t in ['-', '1', 'stdout']: h = sys.stdout
elif t in ['=', '2', 'stderr']: h = sys.stderr
elif t in ['.'] : h = _open("%s/%s.rep" % (path,name))
elif t.startswith('/') : h = _open(t)
else : h = _open("%s/%s" % (path,t))
self._streams.append(h)
# --------------------------------------------------------------------------
#
def _out(self, color, msg, count=None):
if not self._enabled:
return
if self._use_color:
color_mod = ''
if ' ' in color:
color_mod, color = color.split(' ', 2)
color = self.COLORS.get(color.lower(), '')
color_mod = self.MODS.get(color_mod.lower(), '')
color += color_mod
# make sure we count tab length on line start correctly
msg = msg.replace('\n\t', '\n ')
# make sure we don't extent a long line further
if self._pos >= (self._line_len) and msg and msg[0] != '\n':
while msg[0] == '\b':
msg = msg[1:]
if count:
msg = ' %6d\n %s' % (count, msg)
else:
msg = '\n %s' % msg
# special control characters:
#
# * '>>' will, at it's place, insert sufficient spaces to make the
# remainder of the string right-aligned. Only one >> is
# interpreted, linebreaks before it are ignored
#
# * '<<' will insert a line break if the position is not already on
# the beginning of a line
#
slash_f = msg.find('>>')
if slash_f >= 0:
copy = msg[slash_f + 1:].strip()
spaces = self._line_len - self._pos - len(copy) + 1 # '>>'
spaces = max(0, spaces)
msg = msg.replace('>>', spaces * ' ')
slash_cr = msg.find('<<')
if slash_cr >= 0:
if self._pos + slash_cr > 0:
spaces = self._line_len - self._pos - 1
msg = msg.replace('<<', '%s\\\n' % (spaces * ' '))
else:
msg = msg.replace('<<', '')
mlen = len([x for x in msg if x in string.printable])
mlen -= msg.count('\b')
# find the last \n and then count how many chars we are writing after it
slash_n = msg.rfind('\n')
if slash_n >= 0:
self._pos = mlen - slash_n - 1
else:
self._pos += mlen
for stream in self._streams:
if self._use_color:
stream.write(color)
stream.write(msg)
if self._use_color:
stream.write(self.COLORS['reset'])
stream.write(self.MODS['reset'])
try:
stream.flush()
except Exception:
pass
# --------------------------------------------------------------------------
#
def _format(self, msg, settings=None):
if not self._enabled:
return
if not msg:
msg = ''
if not settings:
settings = dict()
color = settings.get('color', '')
style = settings.get('style', 'M')
segment = settings.get('segment', '')
for c in style:
if c == 'M':
self._out(color, "%s" % msg)
if c == 'T':
self._out(color, "\t")
elif c == 'E':
self._out(color, "\n")
elif c == 'L':
if segment:
self._out(color, "%s\n" % (self._line_len * segment))
# --------------------------------------------------------------------------
#
[docs] def set_style(self, which, color=None, style=None, segment=None):
if not self._enabled:
return
if which not in self._settings:
raise LookupError('reporter does not support style "%s"' % which)
settings = self._settings[which]
if color : settings['color'] = color
if style : settings['style'] = style
if segment: settings['segment'] = segment
# --------------------------------------------------------------------------
#
[docs] def title(self, title):
if not self._enabled:
return
if not title:
title = self._name
if not title:
return
fmt = " %%-%ds\n" % (self._line_len - 1)
self._format(fmt % title, self._settings['title'])
# --------------------------------------------------------------------------
#
# --------------------------------------------------------------------------
#
[docs] def info(self, msg=''):
if not self._enabled:
return
self._format(msg, self._settings['info'])
# --------------------------------------------------------------------------
#
[docs] def idle(self, c=None, mode=None, color=None, idle_id=None):
if not self._enabled:
return
if not self._use_anime:
return
if not idle_id:
idle_id = 'default'
if color: col = self._settings[color]['color']
else : col = self._settings['idle']['color']
idx = 0
if mode == 'start':
self._out(col, 'O')
self._idle_count = 0
elif mode == 'stop':
self._out(col, '\b %6d' % self._idle_count)
else:
if not c:
idx = self._idle_pos.get(idle_id, 0)
c = self._idle_sequence[idx % len(self._idle_sequence)]
idx += 1
self._out(col, '\b%s' % c)
else:
idx += 1
self._idle_count += 1
self._out(col, '\b%s|' % c, count=self._idle_count)
self._idle_pos[idle_id] = idx
# --------------------------------------------------------------------------
#
[docs] def progress_tgt(self, tgt=None, label=None):
if not self._enabled:
return
if label:
front = '%-6s: ' % label
else:
front = '%-6d: ' % tgt
self._prog_len = self._line_len - len(front)
self._prog_tgt = tgt
self._prog_cnt = 0
self._prog_pos = 0
self._format(front, self._settings['progress'])
# --------------------------------------------------------------------------
#
[docs] def progress_done(self):
if not self._enabled:
return
self._prog_tgt = None
self._prog_cnt = 0
self._prog_pos = 0
self._format('\n', self._settings['progress'])
# --------------------------------------------------------------------------
#
[docs] def progress(self, msg=None):
if not self._enabled:
return
if self._prog_tgt:
self._prog_cnt += 1
val = int(self._prog_cnt * self._prog_len / self._prog_tgt)
while val > self._prog_pos:
self._prog_pos += 1
self._format('#', self._settings['progress'])
else:
if not msg:
msg = '.'
self._format(msg, self._settings['progress'])
# --------------------------------------------------------------------------
#
[docs] def ok(self, msg=''):
if not self._enabled:
return
self._format(msg, self._settings['ok'])
# --------------------------------------------------------------------------
#
[docs] def warn(self, msg=''):
if not self._enabled:
return
self._format(msg, self._settings['warn'])
# --------------------------------------------------------------------------
#
[docs] def error(self, msg=''):
if not self._enabled:
return
self._format(msg, self._settings['error'])
# --------------------------------------------------------------------------
#
[docs] def exit(self, msg='', exit_code=0):
if not self._enabled:
return
self.error(msg)
sys.exit(exit_code)
# --------------------------------------------------------------------------
#
[docs] def plain(self, msg=''):
if not self._enabled:
return
self._format(msg)
# ------------------------------------------------------------------------------