# Source code for radical.utils.algorithms


import math as m

from .logger import Logger


# ------------------------------------------------------------------------------
#
def collapse_ranges(ranges):
    '''
    Given a set of ranges (as a set of pairs of floats [start, end] with
    'start <= end'), this algorithm will collapse that set into the smallest
    possible set of ranges which cover the same, but not more nor less, of
    the domain (floats).

    We first sort the ranges by their starting point.  We then start with
    the range with the smallest starting point [start_1, end_1], and compare
    it to the next following range [start_2, end_2], where we now know that
    start_1 <= start_2.  We have two cases:

    a) when start_2 <= end_1, then the ranges overlap, and we collapse them
       into range_1: range_1 = [start_1, max(end_1, end_2)]

    b) when start_2 > end_1, then the ranges don't overlap.  Importantly,
       none of the other, later ranges can ever overlap range_1, because
       their start points are even larger.  So we move range_1 to the set of
       final ranges, and restart the algorithm with range_2 being the
       smallest one.

    The termination condition is if only one range is left -- it is also
    moved to the list of final ranges then, and that list is returned.
    '''

    # return an empty list if given an empty list
    if not ranges:
        return []

    START = 0
    END   = 1

    # normalize on *copies* so that the caller's data are never mutated:
    # ensure start <= end for each range
    _ranges = [[r[END], r[START]] if r[START] > r[END] else
               [r[START], r[END]] for r in ranges]

    # sort the copies by their start point
    _ranges.sort(key=lambda x: x[START])

    # ranges must be unique: we do not count timings when they start and end
    # at exactly the same time.  By using a set, we do not repeat ranges.
    final = set()

    # set 'base' to the earliest range (smallest starting point)
    base = _ranges[0]

    for _range in _ranges[1:]:

        # if the range is empty, skip it
        if _range[START] == _range[END]:
            continue

        if _range[START] <= base[END]:
            # ranges overlap -- extend the base
            base[END] = max(base[END], _range[END])

        else:
            # ranges don't overlap -- move the base to final, and the
            # current _range becomes the new base
            final.add(tuple(base))
            base = _range

    # termination: push the last base to final
    final.add(tuple(base))

    # return final as a sorted list of lists, in case a mutable type is
    # needed by the caller
    return sorted([list(b) for b in final])
# ------------------------------------------------------------------------------ #
def range_concurrency(ranges):
    '''
    Given a set of uncollapsed ranges, return a series which describes the
    range-concurrency at any point.

    Example:

      Ranges:

        [----]  [---------]     [--------]
            [--------]  [-]         [------------]   [--]

      Concurrency:

        1   2  1   2    1 2 1 0 1   2    1       0   1  0

    Returned is a sorted list of tuples where the first entry defines at
    what range value the concurrency changed, and the second value defines
    to what the concurrency count changed at that point.

    You could consider the ranges to be of type `[time_start, time_end]`,
    and the return would be a list of `[timestamp, concurrency]`, if that
    helps -- but the algorithm does not make any assumption on the data
    type, really, only that the values can be sorted.
    '''

    if not ranges:
        return list()

    START = 0
    END   = 1

    # normalize on *copies* (ensure start <= end per range) so that the
    # caller's data remain unmodified
    _ranges = [[r[END], r[START]] if r[START] > r[END] else
               [r[START], r[END]] for r in ranges]

    # we want to sort all range boundaries by value, but also want to
    # remember if they were start or end boundaries.  On value ties, START
    # (0) sorts before END (1), so starts are counted first.
    times = list()
    for r in _ranges:
        times.append([r[START], START])
        times.append([r[END],   END  ])
    times.sort()

    # the earliest boundary must be the start of some range.  NOTE: per the
    # docstring we make no assumption about the boundary values themselves,
    # so negative values (e.g. timestamps) are explicitly allowed.
    assert times[0][1] == START, 'inconsistent ranges %s' % ranges

    # go through the sorted list of times, and increase concurrency on
    # `START`, decrease it on `END`.  Make sure we add up all entries for
    # the same value at once.
    #
    # return a sequence of [timestamp, concurrency] tuples
    ret = list()

    last_time   = times[0][0]
    concurrency = 1

    # set zero marker before the first range begins
    ret.append([last_time, 0])

    # for all further range boundaries:
    #   - if the boundary value changed: store the previous tuple
    #   - if a range STARTed at this value, increase concurrency
    #   - if a range ENDed   at this value, decrease concurrency
    # finally store the trailing tuple
    for time_, mode in times[1:]:

        if time_ != last_time:
            ret.append([last_time, concurrency])
            last_time = time_

        if mode == START: concurrency += 1
        else            : concurrency -= 1

    ret.append([last_time, concurrency])

    # every range which started must also have ended
    assert concurrency == 0, \
           'inconsistent range structure? %s : %s : %s' % (ranges, ret, times)

    return ret
# ------------------------------------------------------------------------------ #
def partition(space, nparts):
    '''
    Create `nparts` balanced partitions from the iterable `space`, while
    preserving contiguous order.

    kudos:
    http://code.activestate.com/recipes/425397-split-a-list-into-roughly-equal-sized-pieces/
    '''

    total  = len(space)
    chunks = list()
    lo     = 0

    for i in range(nparts):

        # each remaining chunk gets an equal share; the first chunks absorb
        # the remainder, one element each
        size, rest = divmod(total - i, nparts)
        hi         = lo + size + (1 if rest else 0)

        chunks.append(space[lo:hi])
        lo = hi

    return chunks
# ------------------------------------------------------------------------------ #
def in_range(value, ranges):
    '''
    Check if the float `value` lies in any of the given `ranges`, which are
    assumed to be a list of tuples (or a single tuple) of start and end
    points (floats).

    Returns `True` or `False`.
    '''

    # is there anything to check?
    if not ranges or not len(ranges):
        return False

    # accept a single [start, end] pair by wrapping it into a list.
    # NOTE: checking for `list` only (as before) mistakenly wrapped a list
    #       of *tuples* -- the documented input format -- and then compared
    #       `value` against whole tuples.  Accept both sequence types.
    if not isinstance(ranges[0], (list, tuple)):
        ranges = [ranges]

    for r in ranges:
        if r[0] <= value <= r[1]:
            return True

    return False
# ------------------------------------------------------------------------------ #
def remove_common_prefix(data, extend=0):
    '''
    For the given list of strings, remove the part which appears to be
    a common prefix to all of them.  If `extend` is given, the last `n`
    letters of the prefix are preserved.

    See also https://stackoverflow.com/questions/46128615/ .
    '''

    if not data:
        return data

    # sort the list, get the prefix from comparing the lexicographically
    # smallest and largest elements -- their common prefix is the common
    # prefix of all elements
    s1 = min(data)
    s2 = max(data)

    prefix = s1
    for i, c in enumerate(s1):
        if c != s2[i]:
            prefix = s1[:i]
            break

    if extend > 0 and len(prefix) < len(s1):
        # preserve the last `extend` letters of the prefix -- but clamp at
        # zero so a large `extend` cannot produce a negative slice index
        # (which would wrap around and eat into the string's tail)
        prefix = prefix[:max(len(prefix) - extend, 0)]

    # remove the found prefix from all elements.  `prefix` may be empty
    # (no common prefix), so slice instead of `str.split`, which raises
    # ValueError on an empty separator.
    return [elem[len(prefix):] for elem in data]
# ------------------------------------------------------------------------------ # # Assume we want to schedule a set of tasks over a set of resources. To # maximize utilization, we want to place the largest tasks first and fill the # gap with small tasks, thus we sort the list of tasks by size and schedule # beginning with the largest. # # For large number of tasks this can become expensive, specifically once the set # of resources is near exhaustion: we need to search through the whole task list # before being able to, possibly, place some small tasks. # # The algorithm below attempts to use the knowledge that, once a task of # a certain size can *not* be scheduled, no larger task can. It performs as # follows: # # 1 attempt to schedule the largest task (n) # 2 if success # 3 schedule the next smaller task (n = n - 1) # 4 if success # 5 goto (2) # 7 bisect distance to smallest task: l = bisect(m, 0) # 8 schedule task l # 9 if success # task l is small enough # 10 bisect distance toward larger task: l = bisect(m, l) # 11 goto 9 # 12 else # task l is still too large # 13 bisect distance towards smaller tasks: l = bisect(l, 0) # 14 goto 9 # # This algorithm stops once (m - l == 1), which is the boundary between 'too # large' and 'small enough' tasks. From here, we mark all tasks > m as too # large, set n = l, and begin from scratch. # # The implementation below contains some further optimizations. Specifically, # we remember the schedule results of all bisections, so that we don't schedule # or test those tasks twice. Further, we assume that scheduling a task may # alter the result for all further schedule attempts (apart from tasks we know # are too large). # # Example: # # # -------------------------------------------------------------------------- # # # # Tasks larger 65 cannot be scheduled. Once 65 is scheduled, 64 is too # # large and fails. 63, 62, 61 fit, but then space is tight and all tasks up # # to 42 fail. 
#   42 to 22 get scheduled ok, but then space is out again for everything
#   larger than 7 - the last 8 tasks (very small) can be scheduled.
#
#   Out of the 128 tasks, 32 will get scheduled, so 96 will not.  Instead of
#   trying to schedule those 96 tasks, the algorithm will only test 15 tasks
#   unsuccessfully (~15%).  The effect improves with larger numbers of tasks
#   and less available resources, which is usually the case when the RP
#   scheduler sees high contention between scheduling and unscheduling
#   operations.  For example, the same setup for 128k tasks results in 25
#   failed scheduling attempts (0.01%).
#
#   failed = list()
#   def schedule(n):
#       if n in [61, 62, 63, 65]    or \
#          n in list(range(22, 42)) or \
#          n < 8:
#           scheduled.append(n)
#           return True              # could be scheduled
#       else:
#           failed.append(n)
#           return False             # schedule failed
#
#   tasks = list(range(128 * 1024))
#   good, bad, fail = lazy_bisect(tasks, schedule)
#
#   assert len(failed) == 25
#   for task in good:
#       assert schedule(task) is True    # task not in bad
#   for task in bad:
#       assert schedule(task) is False   # task not in good
#
# ------------------------------------------------------------------------------
#
def lazy_bisect(data, check, on_ok=None, on_nok=None, on_fail=None,
                on_skip=None, ratio=0.5, log=None):
    '''
    Find the next potentially good candidate element in a presumably ordered
    list `data` (from smallest to largest).  The given callable `check`
    should accept a list element for good/bad checks, and return `True` for
    good (small enough), `False` otherwise (too large).

    Note that this is only a bisect if `ratio` is set at `0.5` - otherwise
    the search will back off through the list faster (1.0 >= ratio > 0.5) or
    slower (0.5 > ratio >= 0.0) than bisect.

    The method returns three lists: one with the elements of `data` which
    were checked `good`, and one with the elements checked as `bad`.  Checks
    which raise an exception are also handled as `bad`; any element on which
    this happens will be returned in the third list (`fail`).  Note that
    this last list (`fail`) consists of tuples `[element, error]` where
    `error` is the exception's error message.

    `on_ok` will be called on elements where `check()` returned good,
    `on_nok` where `check()` returned bad, `on_fail` where `check()` raised
    an exception, and `on_skip` where the bisect decided that no `check()`
    invocation is needed.  The respective data item is the only argument to
    those calls.
    '''

    if not data:
        return [], [], []

    if not log:
        # NOTE(review): `log` is only referenced by the commented-out debug
        # helpers below -- it is otherwise unused.
        log = Logger('radical.utils.alg')

    # clamp the backoff ratio into [0.0, 1.0]
    if ratio > 1.0: ratio = 1.0
    if ratio < 0.0: ratio = 0.0

    last_good = None     # last known good index
    last_bad  = None     # last known bad  index

    check_good = list()  # indexes found good
    check_bad  = list()  # indexes found bad
    check_fail = list()  # indexes whose check raised ([idx, error])

    # --------------------------------------------------------------------------
    def wrapcheck(thing, skip=False):
        # Run `check(thing)` and fire the matching callback; with
        # `skip=True` only call `on_skip` and return `None` (no check).
        # NOTE(review): `on_fail` is accepted by `lazy_bisect` but never
        # invoked here (nor anywhere else) -- exceptions from `check()` are
        # caught by the callers of this helper.  Confirm whether `on_fail`
        # support is intended.
        ret = None
        if skip:
            if on_skip:
                on_skip(thing)
        else:
            ret = check(thing)
            if ret is True  and on_ok : on_ok(thing)
            if ret is False and on_nok: on_nok(thing)
        return ret
    # --------------------------------------------------------------------------

    # # --------------------------------------------------------------------------
    # # debug helper: render ground truth for the full `data` list
    # def state_hay():
    #     hay = ''
    #     for i, x in enumerate(data):
    #         if not i % 10       : hay += '|'
    #         if check(x) is True : hay += '#'
    #         else                : hay += '.'
    #     if not hay.endswith('|'):
    #         hay += '|'
    #     log.debug('%30s %s', '', hay)
    # # --------------------------------------------------------------------------

    # # --------------------------------------------------------------------------
    # # debug helper: render what the algorithm knows so far
    # def state_needle(msg=''):
    #     needle = ''
    #     for i, x in enumerate(data):
    #         if not i % 10       : needle += '|'
    #         if   x in check_good: needle += '#'
    #         elif x in check_bad : needle += '.'
    #         else                : needle += ' '
    #     if not needle.endswith('|'):
    #         needle += '|'
    #     g = last_good
    #     b = last_bad
    #     if g is None: g = '?'
    #     if b is None: b = '?'
    #     log.debug('%3s - %3s %20s %s %s', g, b, ' ', needle, msg)
    # # --------------------------------------------------------------------------

    # state_hay()

    first = True
    while True:

        # If we don't know anything yet, check the *last* element -- i.e.
        # the largest candidate, since `data` is ordered from smallest to
        # largest.
        if last_good is None and \
           last_bad  is None:

            # this branch can only ever run on the very first iteration
            assert first
            first = False

            idx = len(data) - 1
            try:
                ret = wrapcheck(data[idx])
                if ret is True: check_good.append(idx)
                else          : check_bad .append(idx)
            except Exception as e:
                ret = False
                check_fail.append([idx, str(e)])

            # state_needle('start %s %s' % (idx, ret))

            if ret is True: last_good = idx
            else          : last_bad  = idx

        # If we only know a good one, just try the next smaller element in
        # the list.  This can happen repeatedly, and we'll traverse the list
        # for as long as we can.
        elif last_good is not None and \
             last_bad  is None:

            # make sure we still have something to check
            if last_good == 0:
                break

            # check the next element
            idx = last_good - 1

            # reuse earlier results where available
            if idx in check_good:
                last_good = idx
                # state_needle('known %3d True' % idx)
                continue

            if idx in check_bad:
                last_bad = idx
                # state_needle('known %3d False' % idx)
                continue

            try:
                ret = wrapcheck(data[idx])
                if ret is True: check_good.append(idx)
                else          : check_bad .append(idx)
            except Exception as e:
                ret = False
                check_fail.append([idx, str(e)])

            if ret is True: last_good = idx
            else          : last_bad  = idx   # break out of this branch

            # state_needle('good %3d %s' % (idx, ret))

        # If we know a bad one, we bisect the remaining list and check the
        # found candidate.  If it is also bad, we assume all intermediate
        # elements as bad.  If it is good, we set `last_good` and try again.
        #
        # If last_good is not known here, we bisect to begin of list (0),
        # otherwise we bisect to last_good.
        elif last_bad is not None:

            # make sure we still have something to check
            if last_bad == 0:
                break

            # a `last_good` above `last_bad` carries no information for the
            # downward search anymore
            if last_good is not None:
                if last_good > last_bad:
                    last_good = None

            # bisect for next candidate
            if last_good is not None:
                # bisect the difference, using the given ratio
                idx = int(m.ceil((last_bad - last_good + 1) * ratio + last_good))
                idx = min(idx, last_bad - 1)
            else:
                # bisect to begin of data
                idx = int(m.ceil((last_bad + 1) * ratio))
                idx = min(idx, last_bad - 1)

            # state_needle('range %3d?' % idx)

            # make sure we make progress: if space is too small for bisect,
            # then increase last_good or decrease last_bad
            if idx == last_good: idx = last_good + 1
            if idx == last_bad : idx = last_bad  - 1
            if idx < 0         : idx = 0

            # check this bisected index (if we don't know it yet)
            if   idx in check_good: ret = True
            elif idx in check_bad : ret = False
            else:
                try:
                    ret = wrapcheck(data[idx])
                    if ret is True: check_good.append(idx)
                    else          : check_bad .append(idx)
                except Exception as e:
                    ret = False
                    check_fail.append([idx, str(e)])

            # state_needle('range %3d %s' % (idx, ret))

            if ret is True:
                # found a new good one closer to the last bad one - bisect
                # again until we neighbor last_bad.  In that case, reset the
                # last_bad and restart searching (and bisecting) from that
                # meeting point.
                last_good = idx
                if last_bad is not None:
                    if last_bad < last_good:
                        last_good = None
                        # state_needle('range A')
                    elif last_bad - last_good == 1:
                        last_bad = None
                        # state_needle('range a')

            else:
                # found a new, smaller bad one - consider all indexes
                # between `idx` and the previous last_bad as bad (they are
                # at least as large, so presumably cannot fit either)
                for i in range((last_bad - idx - 1)):
                    this = last_bad - i - 1
                    if this not in check_bad and \
                       this not in check_good:
                        # record as skipped: no `check()` call is made
                        wrapcheck(data[this], skip=True)
                        check_bad.append(this)

                # state_needle('Range')

                last_bad = idx
                if last_good is not None and last_good > last_bad:
                    # this last_good is not interesting anymore, we search
                    # downwards
                    last_good = None

    # state_hay()

    # every index must have been classified exactly once
    assert len(data) == len(check_good) + len(check_bad) + len(check_fail)

    # map indexes back to data elements
    return [ data[i]     for i in check_good], \
           [ data[i]     for i in check_bad ], \
           [[data[i], e] for i,e in check_fail]
# ------------------------------------------------------------------------------
#
if __name__ == '__main__':

    # demo for `collapse_ranges` and `partition`
    test = [[ 0, 10], [20, 30], [40, 50], [60, 70], [80, 90],
            [ 5, 15], [35, 55]]

    import pprint
    pprint.pprint(test)
    pprint.pprint(collapse_ranges(test))

    test_space = list(range(75))
    parts      = partition(test_space, 8)

    for part in parts:
        print('%3d: %s' % (len(part), part))

    # --------------------------------------------------------------------------
    # demo / self-test for `lazy_bisect`
    import time

    scheduled = list()

    def schedule(n, check=False):
        # toy scheduler: only some task sizes fit (see module comment above)
        if not check:
            time.sleep(0.1)   # scheduling is slow (but not checks)
            print(n)

        if n in [61, 62, 63, 65]    or \
           n in list(range(22, 42)) or \
           n < 8:
            scheduled.append(n)
            if not check:
                print('ok')
            return True
        else:
            if not check:
                print('--')
            return False

    tasks = list(range(128 * 1024))

    # `lazy_bisect` returns *three* lists (good, bad, fail).
    # NOTE: the original bound the whole 3-tuple to one name and iterated
    #       over it, which ended up comparing lists against ints and raised
    #       a TypeError.
    good, bad, fail = lazy_bisect(tasks, schedule)

    # everything reported bad must indeed fail to schedule ...
    for task in bad:
        assert schedule(task, check=True) is False

    # ... and everything actually scheduled must not be in the bad list
    for task in scheduled:
        assert task not in bad


# ------------------------------------------------------------------------------