# Source code for radical.utils.lease_manager


__author__    = 'Radical.Utils Development Team (Andre Merzky)'
__copyright__ = 'Copyright 2013, RADICAL@Rutgers'
__license__   = 'MIT'


import time
import threading

from .lockable import Lockable
from .logger   import Logger

# default settings for lease manager
MAX_POOL_SIZE = 15       # max objects per pool; a value <= 0 disables the cap
MAX_POOL_WAIT = 60       # seconds to wait on a full pool before giving up
MAX_OBJ_AGE   = 60 * 10  # seconds (10 minutes); age limit for reusing objects


# ------------------------------------------------------------------------------
#
class _LeaseObject(object):

    _uid = 0

    # --------------------------------------------------------------------------
    def __init__(self, lm, log, creator, args):

        self.lm         = lm
        self.used       = False
        self.log        = log
        self.obj        = creator(*args)
        self.uid        = 'lo.%04d' % _LeaseObject._uid
        self.t_created  = time.time()
        self.t_leased   = None
        self.t_released = time.time()  # we take control *now*

        _LeaseObject._uid += 1


    # --------------------------------------------------------------------------
    #
    def __cmp__(self, other):

        return bool(other == self.obj)


    # --------------------------------------------------------------------------
    #
    def __enter__(self):

        return self.obj


    # --------------------------------------------------------------------------
    #
    def __exit__(self, *args):                                      # noqa W0613

        self.lm.release(self)


    # --------------------------------------------------------------------------
    #
    def lease(self):

        if self.used:
            raise RuntimeError('LeaseObject is already leased: %s' % self)

        self.used     = True
        self.t_leased = time.time()


    # --------------------------------------------------------------------------
    #
    def release(self):

        if not self.used:
            raise RuntimeError('LeaseObject is not leased: %s' % self)

        self.used       = False
        self.t_released = time.time()


    # --------------------------------------------------------------------------
    def is_leased(self):

        return self.used


# ------------------------------------------------------------------------------
#
@Lockable
class LeaseManager(object):
    '''
    This is a lease manager -- it creates resource instances on demand and
    hands out leases to them.  If for a given ID no object instance exists, one
    is created, locked and a lease is returned.  If for an ID an object instance
    exists and is not leased, it is locked and returned.  If one or more
    instances exist but all are in use already (leased), a new instance is
    created (up to MAX_POOL_SIZE, which can be overwritten via the
    `max_pool_size` constructor argument).  If that limit is reached, no
    objects are returned, and instead the lease call blocks until one of the
    existing objects gets released.

    All pool state is guarded by `with self:`, which is provided by the
    `Lockable` decorator.  NOTE(review): methods nest `with self:` blocks, so
    that lock is presumably re-entrant -- confirm against `Lockable`.
    '''

    # --------------------------------------------------------------------------
    #
    def __init__(self, max_pool_size=None, max_pool_wait=None,
                 max_obj_age=None):
        '''
        Make sure the object dict is initialized, exactly once.

        max_pool_size : maximal number of objects per pool; a value <= 0
                        disables the cap (default: MAX_POOL_SIZE)
        max_pool_wait : seconds `lease()` waits on a full pool before it
                        raises (default: MAX_POOL_WAIT)
        max_obj_age   : age in seconds after which an unleased object is
                        dropped by `lease()` instead of being handed out
                        again (default: MAX_OBJ_AGE)

        `None` selects the respective module level default.
        '''

        self._log = Logger('radical.utils')
        self._log.debug('lm new manager')

        self._pools = dict()

        self._max_pool_size = max_pool_size
        self._max_pool_wait = max_pool_wait
        self._max_obj_age   = max_obj_age

        if self._max_pool_size is None:
            self._max_pool_size = MAX_POOL_SIZE

        if self._max_pool_wait is None:
            self._max_pool_wait = MAX_POOL_WAIT

        if self._max_obj_age is None:
            self._max_obj_age = MAX_OBJ_AGE


    # --------------------------------------------------------------------------
    #
    def _initialize_pool(self, pool_id):
        '''
        Set up a new pool, but do not create any instances, yet.  If the pool
        exists already it is left untouched.

        Returns the pool, a dict with the keys:

          'objects' : list of lease objects managed in this pool
          'freed'   : most recently released lease object (or `None`)
          'event'   : `threading.Event`, set on every release in this pool
        '''

        with self:

            if pool_id not in self._pools:

                self._log.debug('lm create pool for %s (%s) (%s)'
                                % (pool_id, type(pool_id), self))

                self._pools[pool_id] = dict()
                self._pools[pool_id]['objects'] = list()
                self._pools[pool_id]['freed']   = None
                self._pools[pool_id]['event']   = threading.Event()

            return self._pools[pool_id]


    # --------------------------------------------------------------------------
    #
    def _create_object(self, pool_id, creator, args):
        '''
        A new instance is needed -- create one, unless max_pool_size is
        reached.  If that is the case, return `None`, otherwise return the
        created object (which is leased before return).

        Raises `RuntimeError` if the pool does not exist.  Exceptions raised
        during object creation are logged and re-raised.
        '''

        with self:

            self._log.debug('lm create object for %s' % pool_id)

            if pool_id not in self._pools:
                raise RuntimeError('internal error: no pool for %s' % pool_id)

            pool = self._pools[pool_id]

            # check if a poolsize cap is set and reached
            if  self._max_pool_size > 0 and \
                self._max_pool_size <= len(pool['objects']):

                # no more space...
                return None

            # poolsize cap not reached -- increase pool
            try:
                obj = _LeaseObject(self, self._log, creator, args)
                obj.lease()
                pool['objects'].append(obj)

            except Exception as e:
                # this exception needs to fall through -- we can't wait
                # for object creation problems to fix themselves over time...
                self._log.exception('Could not create lease object: %s', e)
                raise

            return obj


    # --------------------------------------------------------------------------
    #
    def _remove_object(self, pool_id, obj):
        '''
        Remove the given lease object from its pool, for example because it
        exceeded the maximal object age.  This is best effort: if the object
        is not in the pool a warning is logged and `None` is returned.

        Returns the removed lease object on success.
        Raises `RuntimeError` if the pool does not exist.
        '''

        with self:

            if pool_id not in self._pools:
                raise RuntimeError('internal error: no pool for %s' % pool_id)

            pool = self._pools[pool_id]

            try:
                pool['objects'].remove(obj)

            except ValueError as e:
                self._log.warning('Could not remove lease object: %s', e)
                return None

            # a removed object must not be handed to a waiting `lease()` call
            # anymore: it could never be released again, as `release()` only
            # looks at objects which are still part of a pool.
            if pool['freed'] is obj:
                pool['freed'] = None

            return obj


    # --------------------------------------------------------------------------
    #
    def lease(self, pool_id, creator, args=None):
        '''
        For a given pool ID, attempt to retrieve an existing object from the
        pool.  If a free (released) object is found, lock and return it.  If
        objects exist but all are in use, create a new one (up to
        `max_pool_size`).  If the pool is full and all objects are in use,
        block until an object is freed.  If the pool does not exist, create it
        and proceed as above.

        pool_id : id of the pool to lease from (converted via `str()`).  The
                  pool ID essentially determines the scope of validity for
                  the managed objects -- they form a namespace, where objects
                  are shared when living under the same name space entry
                  (the pool_id).

        creator : method to use to create a new object instance

        args    : arguments for `creator`: a list is passed on as
                  `creator(*args)`, any other non-empty value is passed as
                  single argument.

        Returns the lease object wrapping the instance.  It is usable as
        context manager which yields the instance and releases the lease on
        exit, or it can be passed to `release()` explicitly.

        Raises `LookupError` if no object became available within
        `max_pool_wait` seconds.  Exceptions raised by `creator` fall through.

        Example:

            def creator():
                return Logger(name)

            ret = lease_manager.lease(name, creator)
        '''

        pool_id = str(pool_id)

        if not args:
            args = []
        elif not isinstance(args, list):
            args = [args]

        with self:

            # make sure the pool exists
            pool = self._initialize_pool(pool_id)

            # find an unlocked object instance in the pool
            # NOTE: we iterate over a copy of the list, as an eventual object
            #       removal would screw up the index...
            for obj in pool['objects'][:]:

                if not obj.is_leased():

                    # check age
                    age = time.time() - obj.t_created
                    if age > self._max_obj_age:
                        # too old -- remove and continue to search for
                        # a younger unleased object
                        self._remove_object(pool_id, obj)
                        continue

                    # found one -- lease/lock and return it
                    obj.lease()
                    return obj

            # no unlocked object found -- create a new one
            obj = self._create_object(pool_id, creator, args)

            # FIXME: we could try/catch the above error, and then check if the
            #        pool has anything worth to wait on.  That might be useful
            #        for creation errors which are transient.  Alas, we don't
            #        have any means to distinguish them from non-transient
            #        errors, so we don't do that at this point...

            # check if we got an object
            if obj is not None:
                # we got a locked object -- return
                return obj

        # pool is full, nothing is free -- we need to wait for an event on
        # the pool to free an instance.
        # Now, this is where deadlocks will happen: any application leasing
        # too many instances in the same thread (with 'too many' meaning
        # more than max_pool_size) will lock that thread here, thus having no
        # chance to release other instances.  We thus will print a log
        # warning here, and will raise a timeout exception after
        # max_pool_wait seconds.
        # Note that we released our lock here, to give other threads the
        # chance to release objects.
        self._log.warning('lm lease object: pool is full')

        timer_start = time.time()
        timer_now   = time.time()

        while (timer_now - timer_start) < self._max_pool_wait:

            # wait for any release activity on the pool
            timer_left = self._max_pool_wait - (timer_now - timer_start)
            pool['event'].wait(timer_left)

            with self:

                if pool['event'].is_set():

                    # make sure we don't lock up
                    pool['event'].clear()

                    # we won the race!  now get the freed object
                    obj = pool['freed']
                    pool['freed'] = None

                    # the freed object is only usable if nobody else leased
                    # it in the meantime: a concurrent `lease()` call may have
                    # picked it up via the scan for unleased objects above.
                    if obj is not None and not obj.is_leased():
                        obj.lease()
                        return obj

                    # object was deleted or is taken -- there may be space
                    # to create a new one
                    obj = self._create_object(pool_id, creator, args)

                    if obj is not None:
                        # we got a locked object -- return
                        return obj

                    # did not find a freed object, did not get to create a
                    # new one -- is there a free one in the pool?
                    for obj in pool['objects']:
                        if not obj.is_leased():
                            # found one -- lease/lock and return it
                            obj.lease()
                            return obj

                    # apparently not -- so we give up for now...

            # none free, none created -- or we lost the race for handling the
            # event.  Wait again.
            timer_now = time.time()

        # at this point we give up: we can't create a new object, can't find
        # a free one, and we are running out of wait time...
        raise LookupError('stop waiting on object lease')


    # --------------------------------------------------------------------------
    #
    def release(self, instance, delete=False):
        '''
        The given object is not needed right now -- unlock it so that somebody
        else can lease it.  This will not delete the object, unless `delete`
        is set: then the object is also removed from its pool, which makes
        room for a new instance.

        instance : the lease object as returned by `lease()`; it is matched
                   by identity against the pooled lease objects
        delete   : also remove the object from its pool

        Raises `RuntimeError` if the object is managed but not currently
        leased.  Objects which are not managed by any pool are ignored.
        '''

        with self:

            for pool in self._pools.values():

                for obj in pool['objects']:

                    if instance is not obj:
                        # this is not the object you are looking for.
                        continue

                    # remove the lease lock on the object
                    obj.release()

                    if delete:
                        # remove the object from the pool (decreasing its
                        # ref counter and thus making it eligible for
                        # garbage collection).
                        pool['objects'].remove(obj)
                        pool['freed'] = None

                    else:
                        # mark the object as freed for lease.
                        pool['freed'] = obj

                    # notify waiting threads about the lease or creation
                    # opportunity.
                    pool['event'].set()

                    # object has been released
                    return

            # FIXME: log warning
            # for now we ignore double-frees
            # raise RuntimeError('cannot release object -- not managed')


# ------------------------------------------------------------------------------