Source code for radical.utils.zmq.client


import zmq

from typing import Any

import threading as mt

from ..json_io   import read_json
from ..misc      import as_string
from ..serialize import to_msgpack, from_msgpack
from ..logger    import Logger
from .utils      import no_intr, sock_connect


# ------------------------------------------------------------------------------
#
_LINGER_TIMEOUT    = 1000  # ms to linger after close
_HIGH_WATER_MARK   = 1024  # number of messages to buffer before dropping
_DEFAULT_BULK_SIZE =    1  # number of messages to put in a bulk


# ------------------------------------------------------------------------------
#
[docs] class Client(object): # -------------------------------------------------------------------------- # def __init__(self, server: str = None, url: str = None, log: Logger = None) -> None: if server: self._url = read_json('%s.cfg' % server)['addr'] elif url: self._url = url else: raise ValueError('need server name/cfg or Url') self._log = log self._cb = None self._ctx = zmq.Context() self._sock = self._ctx.socket(zmq.REQ) self._sock.linger = _LINGER_TIMEOUT self._sock.hwm = _HIGH_WATER_MARK sock_connect(self._sock, self._url) self._term = mt.Event() self._active = False # -------------------------------------------------------------------------- # @property def url(self) -> str: return self._url # -------------------------------------------------------------------------- #
[docs] def request(self, cmd: str, *args: Any, **kwargs: Any) -> Any: msg = {'cmd' : cmd, 'args' : args, 'kwargs': kwargs} if self._log: self._log.debug('request: %s', msg) req = to_msgpack(msg) no_intr(self._sock.send, req) msg = no_intr(self._sock.recv) res = as_string(from_msgpack(msg)) # FIXME: assert proper res structure if res.get('err'): err_msg = 'ERROR: %s' % res['err'] if res['exc']: err_msg += '\n%s' % ''.join(res['exc']) raise RuntimeError(err_msg) return res['res']
# -------------------------------------------------------------------------- #
[docs] def close(self) -> None: self._sock.close()
# ------------------------------------------------------------------------------