Source code for radical.utils.shell


import os
import queue
import shlex
import select

import threading  as mt
import subprocess as sp

from .constants import RUNNING, DONE, FAILED
from .misc      import is_string
from .misc      import ru_open


# ------------------------------------------------------------------------------
#
[docs]def sh_quote(data): ''' quote a string and wrap it in double quotes so that it can passed to a POSIX shell. Examples: foo -> "foo" foo"bar -> "foo\\"bar" foo\\"bar -> "foo\\\\\\"bar" $FOO'$BAR -> "$FOO'$BAR" NOTE: this method does not attempt to strip out backticks or other code execution mechanisms from the string. ''' if '\\' in data: data = data.replace('\\', '\\\\') if '"' in data: data = data.replace('"', '\\"') return '"%s"' % data
# ------------------------------------------------------------------------------ #
[docs]def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None, cwd=None): ''' call a shell command, return `[stdout, stderr, retval]`. ''' # convert string into arg list if needed if is_string(cmd) and \ not shell: cmd = shlex.split(cmd) if stdout : stdout = sp.PIPE else : stdout = None if stderr : stderr = sp.PIPE else : stderr = None p = sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env, cwd=cwd) if not stdout and not stderr: ret = p.wait() else: stdout, stderr = p.communicate() ret = p.returncode return stdout.decode("utf-8"), stderr.decode("utf-8"), ret
# ------------------------------------------------------------------------------ #
[docs]def sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None, cwd=None): ''' call a shell command in the background. Do not attempt to pipe STDOUT/ERR, but only support writing to named files. ''' # pipes won't work - see sh_callout_async if stdout == sp.PIPE: raise ValueError('stdout pipe unsupported') if stderr == sp.PIPE: raise ValueError('stderr pipe unsupported') # openfile descriptors for I/O, if needed if is_string(stdout): stdout = ru_open(stdout, 'w') if is_string(stderr): stderr = ru_open(stderr, 'w') # convert string into arg list if needed if not shell and is_string(cmd): cmd = shlex.split(cmd) sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env, cwd=cwd) return
# ------------------------------------------------------------------------------ #
[docs]def sh_callout_async(cmd, stdin=True, stdout=True, stderr=True, shell=False, env=None, cwd=None): ''' Run a command, and capture stdout/stderr if so flagged. The call will return an PROC object instance on which the captured output can be retrieved line by line (I/O is line buffered). When the process is done, a `None` will be returned on the I/O queues. Line breaks are stripped. stdout/stderr: True [default], False, string - False : discard I/O - True : capture I/O as queue [default] - string: capture I/O as queue, also write to named file shell: True, False [default] - pass to popen cwd: string - working directory for command to run in PROC: - PROC.stdout : `queue.Queue` instance delivering stdout lines - PROC.stderr : `queue.Queue` instance delivering stderr lines - PROC.state : ru.RUNNING, ru.DONE, ru.FAILED - PROC.rc : returncode (None while ru.RUNNING) - PROC.stdout_filename: name of stdout file (when available) - PROC.stderr_filename: name of stderr file (when available) ''' # NOTE: Fucking python screws up stdio buffering when threads are used, # *even if the treads do not perform stdio*. Its possible that the # logging module interfers, too. Either way, I am fed up debugging # this shit, and give up. This method does not work for threaded # python applications. assert False, 'this is broken for python apps' # -------------------------------------------------------------------------- # class _P(object): ''' internal representation of a process ''' # ---------------------------------------------------------------------- def __init__(self, cmd, stdin, stdout, stderr, shell, env, cwd): cmd = cmd.strip() self._in_c = bool(stdin) # flag stdin capture self._out_c = bool(stdout) # flag stdout capture self._err_c = bool(stderr) # flag stderr capture self._in_r , self._in_w = os.pipe() # put stdin to child self._out_r, self._out_w = os.pipe() # get stdout from child self._err_r, self._err_w = os.pipe() # get stderr from child self._in_o = os.fdopen(self._in_r) # file object for in ep self._out_o = os.fdopen(self._out_r) # file object for out ep self._err_o = os.fdopen(self._err_r) # file object for err ep self._in_q = queue.Queue() # get stdin from parent self._out_q = queue.Queue() # put stdout to parent self._err_q = queue.Queue() # put stderr to parent if is_string(stdout): self._out_f = ru_open(stdout, 'w') else : self._out_f = None if is_string(stderr): self._err_f = ru_open(stderr, 'w') else : self._err_f = None self.state = RUNNING self._proc = sp.Popen(cmd, stdin=self._in_r, stdout=self._out_w, stderr=self._err_w, shell=shell, env=env, cwd=cwd, bufsize=1) t = mt.Thread(target=self._watch) t.daemon = True t.start() self.rc = None # return code @property def stdin(self): if not self._in_c: raise RuntimeError('stdin not captured') return self._in_q @property def stdout(self): if not self._out_c: raise RuntimeError('stdout not captured') return self._out_q @property def stderr(self): if not self._err_c: raise RuntimeError('stderr not captured') return self._err_q @property def stdout_filename(self): if not self._out_f: raise RuntimeError('stdout not recorded') return self._out_f.name @property def stderr_filename(self): if not self._err_f: raise RuntimeError('stderr not recorded') return self._err_f.name def kill(self): self._proc.terminate() # ---------------------------------------------------------------------- # def _watch(self): poller = select.poll() poller.register(self._out_r, select.POLLIN | select.POLLHUP) poller.register(self._err_r, select.POLLIN | select.POLLHUP) # try forever to read stdin, stdout and stderr, stop only when # either signals that process (parent or child) died while True: # check for input data = self._in_q.get_nowait() if data: self._out_o.write(data) self._out_f.write(data) active = False fds = poller.poll(100) # timeout configurable (ms) for fd,mode in fds: if mode & select.POLLHUP: # fd died - grab data from other fds continue if fd == self._out_r: o_in = self._out_o q_out = self._out_q f_out = self._out_f elif fd == self._err_r: o_in = self._err_o q_out = self._err_q f_out = self._err_f line = o_in.readline() # `bufsize=1` in `popen` if line: # found valid data (active) active = True if q_out: q_out.put(line.rstrip('\n')) if f_out: f_out.write(line) # no data received - check process health if not active and self._proc.poll() is not None: # process is dead self.rc = self._proc.returncode if self.rc == 0: self.state = DONE else : self.state = FAILED if self._out_q: self._out_q.put(None) # signal EOF if self._err_q: self._err_q.put(None) # signal EOF if self._out_q: self._out_q.join() # ensure reads if self._err_q: self._err_q.join() # ensure reads return # finishes thread # -------------------------------------------------------------------------- return _P(cmd=cmd, stdin=stdin, stdout=stdout, stderr=stderr, shell=shell, env=env, cwd=cwd)
# ------------------------------------------------------------------------------