Source code for radical.utils.zmq.bridge
import os
import threading as mt
from typing import Optional
from ..logger import Logger
from ..profile import Profiler
from ..config import Config
from ..json_io import read_json, write_json
from .utils import LOG_ENABLED
QUEUE = 'QUEUE'
PUBSUB = 'PUBSUB'
UNKNOWN = 'UNKNOWN'
# ------------------------------------------------------------------------------
#
[docs]
class Bridge(object):
'''
A bridge can be configured to have a finite lifetime: when no messages are
received in `timeout` seconds, the bridge process will terminate.
'''
# --------------------------------------------------------------------------
#
[docs]
@staticmethod
def get_config(name, pwd=None):
if not pwd:
pwd = '.'
fname = '%s/%s.cfg' % (pwd, name)
cfg = dict()
if os.path.isfile(fname):
cfg = read_json(fname)
return Config(from_dict=cfg)
# --------------------------------------------------------------------------
#
def __init__(self, cfg, log=None):
self._cfg = cfg
self._log = log
self._channel = self._cfg.channel
self._uid = self._cfg.uid
self._pwd = self._cfg.path
if not self._pwd:
self._pwd = os.getcwd()
if not self._log:
if LOG_ENABLED: level = self._cfg.log_lvl
else : level = 'ERROR'
self._log = Logger(name=self._uid, ns='radical.utils.zmq',
level=level, path=self._pwd)
self._prof = Profiler(name=self._uid, path=self._pwd)
if 'hb' in self._uid or 'heartbeat' in self._uid:
self._prof.disable()
else:
self._prof.disable()
self._prof.prof('init', uid=self._uid, msg=self._pwd)
self._log.debug('bridge %s init', self._uid)
self._bridge_initialize()
# --------------------------------------------------------------------------
#
[docs]
def write_config(self, fname=None):
if not fname:
fname = '%s/%s.cfg' % (self._pwd, self._cfg.uid)
write_json(fname, {'uid' : self._cfg.uid,
self.type_in : str(self.addr_in),
self.type_out: str(self.addr_out)})
# --------------------------------------------------------------------------
#
@property
def name(self):
return self._uid
@property
def uid(self):
return self._uid
@property
def channel(self):
return self._channel
# protocol independent addr query
@property
def type_in(self):
raise NotImplementedError()
@property
def type_out(self):
raise NotImplementedError()
@property
def addr_in(self):
raise NotImplementedError()
@property
def addr_out(self):
raise NotImplementedError()
def _bridge_initialize(self):
raise NotImplementedError()
def _bridge_work(self):
raise NotImplementedError()
# --------------------------------------------------------------------------
#
[docs]
def start(self):
# the bridge runs in a thread. It is the bridge's owner process'
# responsibility to ensure the thread is seeing suffient time to perform
# as needed. Given Python's thread performance (or lack thereof), this
# basically means that the user of this class should create a separate
# process instance to host the bridge thread.
self._term = mt.Event()
self._bridge_thread = mt.Thread(target=self._bridge_work)
self._bridge_thread.daemon = True
self._bridge_thread.start()
self._log.info('started bridge %s', self._uid)
# --------------------------------------------------------------------------
#
[docs]
def wait(self, timeout=None):
'''
wait for the bridge to terminate. If `timeout` is set, the call will
return after that many seconds, with a return value indicating whether
the bridge is still alive.
'''
self._bridge_thread.join(timeout=timeout)
if timeout is not None:
return not self._bridge_thread.is_alive()
# --------------------------------------------------------------------------
#
[docs]
@staticmethod
def create(channel : str,
kind : Optional[str] = None,
cfg : Optional[dict] = None):
# FIXME: add other config parameters: batch size, log level, etc.
# NOTE: I'd rather have this as class data than as stack data, but
# python stumbles over circular imports at that point :/
# Another option though is to discover and dynamically load
# components.
from .pubsub import PubSub
from .queue import Queue
_btypemap = {PUBSUB: PubSub,
QUEUE : Queue}
if not kind:
if 'queue' in channel.lower(): kind = QUEUE
elif 'pubsub' in channel.lower(): kind = PUBSUB
else : kind = UNKNOWN
if kind not in _btypemap:
raise ValueError('unknown bridge type (%s)' % kind)
btype = _btypemap[kind]
bridge = btype(channel, cfg=cfg)
return bridge
# --------------------------------------------------------------------------
#
[docs]
def stop(self):
self._term.set()
# self._bridge_thread.join(timeout=timeout)
self._prof.prof('term', uid=self._uid)
# if timeout is not None:
# return not self._bridge_thread.is_alive()
# --------------------------------------------------------------------------
#
@property
def alive(self):
return self._bridge_thread.is_alive()
# ------------------------------------------------------------------------------