Source code for radical.utils.zmq.registry


import atexit
import shelve

from typing import List, Optional, Any

from ..json_io    import write_json
from ..dict_mixin import DictMixin

from .server import Server
from .client import Client

_registries = list()


# ------------------------------------------------------------------------------
#
def _flush_registries():
    for _reg in _registries:
        _reg.stop()


atexit.register(_flush_registries)


# ------------------------------------------------------------------------------
#
[docs] class Registry(Server): ''' The `ru.zmq.Registry` is a ZMQ service which provides a hierarchical persistent data store. ''' # -------------------------------------------------------------------------- # def __init__(self, url : Optional[str] = None, uid : Optional[str] = None, path : Optional[str] = None, persistent: bool = False) -> None: super().__init__(url=url, uid=uid, path=path) if persistent: path = '%s/%s.db' % (self._path, self._uid) self._log.debug('use shelve %s', path) self._data = shelve.open(path, writeback=True) else: self._log.debug('use in-memory dict') self._data = dict() self.register_request('put', self.put) self.register_request('get', self.get) self.register_request('keys', self.keys) self.register_request('del', self.delitem) self.register_request('dump', self.dump) # -------------------------------------------------------------------------- #
[docs] def dump(self, name: str = None) -> None: if not isinstance(self._data, dict): self._log.debug('ignore dump for non-dict %s', name) else: if name: fname = '%s/%s.%s.json' % (self._path, self._uid, name) else : fname = '%s/%s.json' % (self._path, self._uid) self._log.debug('dumo to %s', fname) write_json(self._data, fname)
# -------------------------------------------------------------------------- #
[docs] def stop(self) -> None: # self.dump() self._log.debug('stop') if isinstance(self._data, shelve.Shelf): self._data.close() super().stop()
# -------------------------------------------------------------------------- #
[docs] def put(self, key: str, val: Any) -> None: this = self._data elems = key.split('.') path = elems[:-1] leaf = elems[-1] self._log.debug_9('put %s: %s', str(key), str(val)) for elem in path: if elem not in this or this[elem] is None: this[elem] = dict() this = this[elem] this[leaf] = val if not isinstance(self._data, dict): self._data.sync()
# -------------------------------------------------------------------------- #
[docs] def get(self, key: str) -> Optional[str]: this = self._data elems = key.split('.') path = elems[:-1] leaf = elems[-1] for elem in path: this = this.get(elem, {}) if not this: break if this is None: this = dict() val = this.get(leaf) self._log.debug_9('get %s: %s', str(key), str(val)) return val
# -------------------------------------------------------------------------- #
[docs] def keys(self, pwd: Optional[str] = None) -> List[str]: this = self._data if pwd: path = pwd.split('.') for elem in path: this = this.get(elem, {}) if not this: break if this is None: this = dict() keys = list(this.keys()) self._log.debug_9('keys: %s', keys) return keys
# -------------------------------------------------------------------------- #
[docs] def delitem(self, key: str) -> None: self._log.debug_9('del: %s', key) this = self._data if key: path = key.split('.') for elem in path[:-1]: this = this.get(elem, {}) if not this: break if this: del this[path[-1]]
# ------------------------------------------------------------------------------ #
[docs] class RegistryClient(Client, DictMixin): ''' The `ru.zmq.RegistryClient` class provides a simple dict-like interface to a remote `ru.zmq.Registry` service. Note that only top-level dict-actions on the `RegistryClient` instance are synced with the remote service storage. ''' # -------------------------------------------------------------------------- # def __init__(self, url: str, pwd: Optional[str] = None) -> None: self._url = url self._pwd = pwd super().__init__(url=url) # -------------------------------------------------------------------------- #
[docs] def dump(self, name: str = None) -> None: return self.request(cmd='dump', name=name)
# -------------------------------------------------------------------------- # verbose API
[docs] def get(self, key : str, default: Optional[Any] = None) -> Optional[Any]: if self._pwd: key = self._pwd + '.' + key try: return self.request(cmd='get', key=key) except: return default
[docs] def put(self, key: str, val: Any) -> None: if self._pwd: key = self._pwd + '.' + key ret = self.request(cmd='put', key=key, val=val) assert ret is None return ret
# -------------------------------------------------------------------------- # dict mixin API def __getitem__(self, key: str) -> Optional[Any]: return self.get(key) def __setitem__(self, key: str, val: Any) -> None: return self.put(key, val) def __delitem__(self, key: str) -> None: if self._pwd: key = self._pwd + '.' + key ret = self.request(cmd='del', key=key) assert ret is None
[docs] def keys(self) -> List[str]: ret = self.request(cmd='keys', pwd=self._pwd) assert isinstance(ret, list) return ret
# ------------------------------------------------------------------------------