Source code for radical.utils.poll


__author__    = "Radical.Utils Development Team"
__copyright__ = "Copyright 2016, RADICAL@Rutgers"
__license__   = "MIT"

import os
import time
import select
import threading as mt


# `select.poll()` is a nice thing - alas, it is not implemented in MacOS'
# default python deployment.  Since RU process management relies on socket
# polling, we reimplement it here based on `select.select()`.
#
# The following import directive should enable any select based code to run
# unchanged on MacOS (for the parts we port, at least):
#
#     import radical.utils.poll as select
#
#     poller = select.poll()
#     ...
#
# If `RU_USE_PYPOLL` is set (to an arbitrary, non-empty value) in the
# environment, we fall back to the native Python implementation.
#
_use_pypoll = os.environ.get('RU_USE_PYPOLL', False)


# ------------------------------------------------------------------------------
# define the Poller factory.  No idea why the Poller object is not directly
# exposed in the `select` module... :/
def poll(log=None):
    '''
    Create and return a new `Poller` instance, mirroring `select.poll()`.

    `log` is passed to the `Poller` constructor unchanged.  Which `Poller`
    implementation is used is decided once, at import time, by `_use_pypoll`.
    '''

    return Poller(log)


# ------------------------------------------------------------------------------
#
if _use_pypoll:

    POLLIN   = select.POLLIN
    POLLPRI  = select.POLLPRI
    POLLOUT  = select.POLLOUT
    POLLERR  = select.POLLERR
    POLLHUP  = select.POLLHUP
    POLLNVAL = select.POLLNVAL

    # POLLALL is not defined by `select`.
    POLLALL  = POLLIN | POLLOUT | POLLERR | POLLPRI | POLLHUP | POLLNVAL

    # --------------------------------------------------------------------------
    #
    class Poller(object):
        '''
        This is a thin wrapper around the native poll object returned by
        `select.poll()`, to have it API compatible with our own, select based
        implementation.

        All methods other than `close()` assert that the poller has not been
        closed yet.
        '''

        # ----------------------------------------------------------------------
        #
        def __init__(self, log=None):
            '''
            Create the native poll object.  `log` is stored on the instance,
            but is not used by this class.
            '''

            self._log  = log
            self._poll = select.poll()

        # ----------------------------------------------------------------------
        #
        def close(self):
            '''
            Drop the native poll object.  Registered descriptors are *not*
            closed.
            '''

            self._poll = None

        # ----------------------------------------------------------------------
        #
        def register(self, fd, eventmask=None):
            '''
            Register `fd` for the events in `eventmask`.

            The native `register()` does not accept `None` as event mask, so
            an omitted mask is replaced by the default mask used by the select
            based `Poller.register()`.  Any explicitly given mask (including
            `0`) is passed on unchanged.
            '''

            assert self._poll

            if eventmask is None:
                eventmask = POLLIN | POLLOUT | POLLERR

            return self._poll.register(fd, eventmask)

        # ----------------------------------------------------------------------
        #
        def modify(self, fd, eventmask=None):
            '''
            Change the event mask of the already registered `fd`.

            An omitted mask is replaced by the default mask used by the select
            based `Poller.modify()`; any explicitly given mask is passed on
            unchanged.
            '''

            assert self._poll

            if eventmask is None:
                eventmask = POLLIN | POLLOUT | POLLERR | POLLHUP

            return self._poll.modify(fd, eventmask)

        # ----------------------------------------------------------------------
        #
        def unregister(self, fd):
            '''
            Remove `fd` from the native poll object.
            '''

            assert self._poll
            return self._poll.unregister(fd)

        # ----------------------------------------------------------------------
        #
        def poll(self, timeout=None):
            '''
            Return whatever the native `poll(timeout)` returns.  `timeout` is
            passed on unchanged (the native call expects milliseconds).
            '''

            assert self._poll

            # NOTE(review): every call is delayed by 200ms before the native
            #               poll is invoked.  The reason is not evident from
            #               this file - kept as is, confirm before removing.
            time.sleep(0.2)

            return self._poll.poll(timeout)


# ------------------------------------------------------------------------------
#
else:

    # these defines are magically compatible with the ones in Python's `select`
    # module...
    POLLIN   = 0b000001
    POLLPRI  = 0b000010
    POLLOUT  = 0b000100
    POLLERR  = 0b001000
    POLLHUP  = 0b010000
    POLLNVAL = 0b100000

    # POLLALL is not defined by `select`.
    POLLALL  = POLLIN | POLLOUT | POLLERR | POLLPRI | POLLHUP | POLLNVAL

    # we only really support the following 4 types - all others are silently
    # ignored, and will never be triggered.
    _POLLTYPES = [POLLIN, POLLOUT, POLLERR, POLLHUP]

    # --------------------------------------------------------------------------
    #
    class Poller(object):
        '''
        This object will accept a set of things we can call `select.select()`
        on, which is basically anything which has a file descriptor exposed
        via a `fileno()` member method, or any integers which directly
        represent an OS level file descriptor.

        This class implements most of the interface as defined by Python's
        native poll object, which is created by calling `select.poll()`.

        All access to the registry is serialized through a reentrant lock.
        Note that `poll()` holds that lock while it waits in `select.select()`.

        NOTE: `Poller.poll()` returns the original object handle instead of
              the polled file descriptor.

        NOTE: Support for `POLLPRI` and `POLLNVAL` selection is not
              implemented, and will never be returned on `poll()`.

        NOTE: `poll()` can only classify readable objects which expose
              a `closed` attribute, or a `read()` or a `recv()` method; for
              anything else (which includes plain integers) it raises
              a `TypeError`.  `close()` calls `close()` on every registered
              object.
        '''

        # ----------------------------------------------------------------------
        #
        def __init__(self, log=None):
            '''
            Set up the lock and an empty registry: one list of registered
            objects per supported event type.  `log` is stored on the
            instance, but is not used by this class.
            '''

            self._log        = log
            self._lock       = mt.RLock()
            self._registered = {POLLIN  : list(),
                                POLLOUT : list(),
                                POLLERR : list(),
                                POLLHUP : list()}

        # ----------------------------------------------------------------------
        #
        def register(self, fd, eventmask=None):
            '''
            Register `fd` for all supported event types set in `eventmask`.
            An empty mask (`None` or `0`) defaults to
            `POLLIN | POLLOUT | POLLERR`.  Any previous registration of `fd`
            is replaced.
            '''

            if not eventmask:
                eventmask = POLLIN | POLLOUT | POLLERR

            with self._lock:

                # start from a clean slate, so that `fd` is never listed twice
                # for the same event type
                self.unregister(fd, _assert_existence=False)

                for e in _POLLTYPES:
                    if eventmask & e:
                        self._registered[e].append(fd)

        # ----------------------------------------------------------------------
        #
        def close(self):
            '''
            Call `close()` on every registered object, and empty the registry.
            An object registered for several event types is closed once per
            event type.
            '''

            with self._lock:
                for e in _POLLTYPES:
                    for fd in self._registered[e]:
                        fd.close()
                    self._registered[e] = list()

        # ----------------------------------------------------------------------
        #
        def _exists(self, fd):
            '''
            Return `True` if `fd` is registered for any event type, `False`
            otherwise.
            '''

            with self._lock:
                for e in _POLLTYPES:
                    if fd in self._registered[e]:
                        return True

            return False

        # ----------------------------------------------------------------------
        #
        def modify(self, fd, eventmask=None):
            '''
            Replace the event mask of the registered `fd`.  An empty mask
            (`None` or `0`) defaults to `POLLIN | POLLOUT | POLLERR | POLLHUP`.
            The method asserts that `fd` is currently registered.
            '''

            if not eventmask:
                eventmask = POLLIN | POLLOUT | POLLERR | POLLHUP

            with self._lock:
                assert self._exists(fd)
                self.register(fd, eventmask)

        # ----------------------------------------------------------------------
        #
        def unregister(self, fd, _assert_existence=True):
            '''
            Remove `fd` from all event types it is registered for.

            With `_assert_existence` set (the default), the method asserts
            that `fd` is registered; otherwise an unknown `fd` is silently
            ignored.
            '''

            with self._lock:

                exists = self._exists(fd)

                if _assert_existence:
                    assert exists
                elif not exists:
                    return

                for e in _POLLTYPES:
                    if fd in self._registered[e]:
                        self._registered[e].remove(fd)

        # ----------------------------------------------------------------------
        #
        def poll(self, timeout=None):
            '''
            Wait for events on the registered objects and return them as
            a list of `[object, eventmask]` pairs.  The list is empty if
            nothing is registered, or if no event occurred.

            `timeout` is passed to `select.select()` unchanged, so it is
            interpreted in seconds (the native `poll()` uses milliseconds);
            `None` blocks until an event occurs.

            Raises a `TypeError` if a readable object is neither file-like nor
            socket-like (see the hang-up check below).
            '''

            ret = list()

            with self._lock:

                rlist = self._registered[POLLIN]
                wlist = self._registered[POLLOUT]
                xlist = self._registered[POLLERR]
                hlist = self._registered[POLLHUP]

                # only select if we have any FDs to watch.  Return the same
                # type as on the regular path, so that callers can always
                # iterate over `[fd, event]` pairs.
                if not rlist + wlist + xlist + hlist:
                    return ret

                # Des Pudel's Kern!
                #
                # Hang-ups are detected on readable objects, so objects
                # registered for POLLHUP are selected for readability, too.
                rret, wret, xret = select.select(rlist + hlist, wlist, xlist,
                                                 timeout)

                ret += [[fd, POLLOUT] for fd in set(wret)]
                ret += [[fd, POLLERR] for fd in set(xret)]

                # A socket being readable may also mean the endpoint has been
                # closed.  We do zero-length I/O to check for POLLHUP.
                #
                # NOTE: type inspection does not help to tell sockets from
                #       files here, so we guess the type from the `closed`
                #       attribute and from the `read` and `recv` methods being
                #       available.  It is a shame to do this on every `poll()`
                #       call, as this is likely in a performance critical
                #       path...
                #
                # NOTE: objects registered only for POLLHUP are reported here
                #       just like those registered for POLLIN.
                for fd in set(rret):

                    hup = False

                    # file object
                    if hasattr(fd, 'closed'):
                        if fd.closed:
                            hup = True

                    # anything with a `fileno()` and `read()`/`write()`
                    elif hasattr(fd, 'read'):
                        try:
                            fd.read(0)
                            fd.write(b'')
                        except Exception:
                            hup = True

                    # socket
                    elif hasattr(fd, 'recv'):
                        try:
                            fd.recv(0)
                            fd.send(b'')
                        except Exception:
                            hup = True

                    # we can't handle errors on other types
                    else:
                        raise TypeError('cannot check %s [%s]'
                                        % (fd, type(fd)))

                    # we always return the POLLIN event, too, as that is what
                    # Python natively does.  I don't think it makes sense,
                    # semantically, but whatever...
                    if hup:
                        ret.append([fd, POLLIN | POLLHUP])
                    else:
                        ret.append([fd, POLLIN])

            return ret
# ------------------------------------------------------------------------------