Source code for radical.utils.queue_network.network
__author__ = "Radical.Utils Development Team"
__copyright__ = "Copyright 2018, RADICAL"
__license__ = "GPL"
'''
Provide an abstraction for distributed queue network.
The network nodes are instances of queue_network::Node. A node can exist either
as thread, as a local process, or as a remote process, the latter spawned via
SAGA.
The communication or work items between nodes is always via queues, were groups
of nodes feed into the same queues, and/or are fed from the same queues. A node
can be fed from multiple queues, where queues are prioritized according to some
policy. Depending if two communicating nodes are co-threads, co-processes, or
remote processes, different queue implementations (with the same semantic) are
used.
A network is created according to a specific state model. That model describes
the state of entities passing through the queue network. Each active state is
enacted by a network node, and each pending state is represented by the entity
being passed through a queue. A queue connection thus exists for every valid
state transition.
Nodes also support the observer patter, ie. they can notify observers about
enacted state transitions.
'''
# ------------------------------------------------------------------------------
#
# http://stackoverflow.com/questions/9539052/python-dynamically-changing-base-classes-at-runtime-how-to
#
# Depending on agent architecture (which is specific to the resource type it
# runs on) can switch between different component types: using threaded (when
# running on the same node), multiprocessing (also for running on the same node,
# but avoiding python's threading problems, for the prices of slower queues),
# and remote processes (for running components on different nodes, using zeromq
# queues for communication).
#
# We do some trickery to keep the actual components independent from the actual
# schema:
#
# - wrap the different queue types into a rpu.Queue object
# - change the base class of the component dynamically to the respective type
#
# This requires components to adhere to the following restrictions:
#
# - *only* communicate over queues -- no shared data with other components or
# component instances. Note that this also holds for example for the
# scheduler!
# - no shared data between the component class and it's run() method. That
# includes no sharing of queues.
# - components inherit from base_component, and the constructor needs to
# register all required component-internal and -external queues with that
# base class -- the run() method can then transparently retrieve them from
# there.
#
# ------------------------------------------------------------------------------
#
[docs]
class Network(object):
# --------------------------------------------------------------------------
#
def __init__(self, state_model=None, network_description=None):
'''
span the network according to state model and network description
'''
if state_model is None: state_model = dict()
if network_description is None: network_description = dict()
self._state_model = state_model
self._network_description = network_description
self._callbacks = dict()
# --------------------------------------------------------------------------
#
[docs]
def feed(self, entities):
'''
feed entities into the network
'''
if not isinstance(entities, list):
entities = [entities]
for entity in entities:
assert entity.state == self._state_model['initial']
self.feeder_queue.push(entities)
# --------------------------------------------------------------------------
#
[docs]
def subscribe(self, states, callback):
'''
subscribe for callback notifications for state transitions *into* the
specified states. If callbacks return 'False' or 'None', they are
unregistered.
'''
if not isinstance(states, list):
states = [states]
for state in states:
if state not in self._calbacks:
self._callbacks[state] = list()
self._callbacks[state].append(callback)
# ------------------------------------------------------------------------------