Source code for radical.utils.scheduler.scheduler_bitarray


__author__    = "Radical.Utils Development Team"
__copyright__ = "Copyright 2013, RADICAL@Rutgers"
__license__   = "MIT"

import math

from .scheduler_base import SchedulerBase


# ------------------------------------------------------------------------------
#
[docs] class BitarrayScheduler(SchedulerBase): try: from bitarray import bitarray as _ba # pylint: disable=E0401 except: # fake with a do-nothing implementation so that initialization works class _ba(object): def __init__(self, i): pass def setall(self, x): pass _one = _ba(1) _one.setall(True) # -------------------------------------------------------------------------- # def __init__(self, resources=0): if 'cores' not in resources: raise ValueError('no cores to schedule over') self._pos = 0 self._size = resources.get('cores') self._ppn = resources.get('ppn', self._size) self._cores = self._ba(self._size) self._cores.setall(True) # True: free self._flag_align = resources.get('align', True) self._flag_scatter = resources.get('scatter', True) # -------------------------------------------------------------------------- #
[docs] def get_layout(self): return {'cores' : self._size, 'rows' : math.sqrt(self._size), 'cols' : math.sqrt(self._size)}
# -------------------------------------------------------------------------- #
[docs] def get_map(self): return self._cores
# -------------------------------------------------------------------------- # def _align(self, pat, req, loc): """ For a given continuous set of cores, try to move the set so that it resides within a single node. This obviously only works for sets smaller than self.ppn. """ if not self._flag_align: # alignment is disabled return loc, False if req > self._ppn: # we can't align allocations spanning nodes return loc, False # we check if the allocation is on a single node. If that is not the # case, we search again from the point of allocation, and check again. # We do that until we cycle around the full core list once, reaching # the original location again orig_pos = loc pos = loc start_node = pos / self._ppn end_node = (pos + req - 1) / self._ppn if start_node == end_node: # already aligned return loc, False # we search from pos=loc to the end of cores, then rewind to pos=0 # and search again from the beginning until again pos=loc. rewound = False while (pos <= self._size and not rewound) or \ (pos < orig_pos and rewound) : if pos > self._size: pos = 0 rewound = True # we need to renew the allocation -- we search further from pos # until we find something loc = self._cores.search(pat, 1, pos + 1) if not loc: pos = 0 rewound = True continue pos = loc[0] start_node = pos / self._ppn end_node = (pos + req - 1) / self._ppn if start_node == end_node: # found an alignment return pos, True # found new loc, but no dice wrt. alignment -- try again raise RuntimeError('alignment error (%s cores requested)' % req) # ------------------------------------------------------------------------------ #
[docs] def alloc(self, req): scattered = False aligned = False orig_pos = self._pos loc = list() pat = self._ba(req) # keep dict of patterns? pat.setall(True) # simple search loc = self._cores.search(pat, 1, self._pos) if not loc: # FIXME: assume we first search from pos 100 to 1000, then rewind, # then search from 0 to 1000, 900 cores being searched twice. # We should add an 'end_pos' parameter to bitarray.search. # But then again, rewind should be rare if cores are on # average much larger than requests -- and if that is not the # case, we won't be able to allocate many requests anyway... self._pos = 0 loc = self._cores.search(pat, 1, self._pos) if not loc and self._flag_scatter: scattered = True # search for non-continuous free cores loc = self._cores.search(self._one, req, orig_pos) if not loc: # try again from start loc = self._cores.search(self._one, req, 0) if not loc: # with ru_open('ba_cores.bin', 'w') as f: # self._cores.tofile(f) # with ru_open('ba_pat.bin', 'w') as f: # pat.tofile(f) # with ru_open('ba_req.bin', 'w') as f: # f.write('%d\n' % req) self._pos = 0 raise RuntimeError('out of cores (%s cores requested)' % req) # found a match - update pos if scattered: # we got a scattered list of cores self._pos = loc[-1] + 1 self._cores.setlist(loc, False) # for i in loc: # # FIXME: bulk op? # self._cores[i] = False else: # we got a continuous block loc = loc[0] if self._flag_align: loc, aligned = self._align(pat, req, loc) self._pos = loc + req self._cores.setrange(loc, self._pos - 1, False) return [req, loc, scattered, aligned]
# -------------------------------------------------------------------------- #
[docs] def dealloc(self, res): req, loc, scattered, _ = res if scattered: # we got a scattered list of cores self._cores.setlist(loc, True) # for i in loc: # # FIXME: bulk op? # self._cores[i] = True else: # we got a continuous block self._cores.setrange(loc, loc + req - 1, True)
# -------------------------------------------------------------------------- #
[docs] def get_stats(self): # determine frequency and length of stretches of free and busy cores free_dict = dict() free_cnt = 0 busy_dict = dict() busy_cnt = 0 for c in self._cores: if c: free_cnt += 1 if busy_cnt: if busy_cnt not in busy_dict: busy_dict[busy_cnt] = 0 busy_dict[busy_cnt] += 1 busy_cnt = 0 else: busy_cnt += 1 if free_cnt: if free_cnt not in free_dict: free_dict[free_cnt] = 0 free_dict[free_cnt] += 1 free_cnt = 0 # determine frequency of free core counts per node node = 0 node_free = dict() for i in range(self._ppn + 1): node_free[i] = 0 while node * (self._ppn + 1) < self._size: free_count = 0 for i in range(self._ppn): if self._cores[node * self._ppn + i]: free_count += 1 node_free[free_count] += 1 node += 1 return {'total' : self._size, 'free' : self._cores.count(), 'busy' : self._size - self._cores.count(), 'free_dist' : free_dict, 'busy_dist' : busy_dict, 'node_free' : node_free}
# ------------------------------------------------------------------------------