# Source code for radical.utils.flux


# pylint: disable=cell-var-from-loop

import os
import sys
import time
import json
import shlex

from typing import Optional, List, Dict, Any, Callable

import threading       as mt
import subprocess      as sp


from .url     import Url
from .ids     import generate_id, ID_CUSTOM
from .shell   import sh_callout
from .logger  import Logger
from .profile import Profiler
from .modules import import_module
from .host    import get_hostname


# --------------------------------------------------------------------------
#
class _FluxService(object):
    '''
    Helper class to handle a private Flux instance, including configuration,
    start, monitoring and termination.

    The instance is started as a child process (`flux start`), its URI is
    parsed from the child's stdout, and a daemon thread pings the instance
    periodically to detect when it disappears.
    '''

    # seconds to wait for the flux process (and the watcher thread) to finish
    # gracefully on `close_service` before falling back to SIGKILL
    _CLOSE_TIMEOUT = 5.0

    # --------------------------------------------------------------------------
    #
    def __init__(self, uid  : str,
                       log  : 'Logger',
                       prof : 'Profiler') -> None:
        '''
        Prepare the service helper and import the Flux Python bindings.

        uid : ID used in profile events for this service
        log : logger to report service state to
        prof: profiler which records the `flux_started` event

        Raises whatever the flux lookup / import raises (a `RuntimeError` if
        the `flux python` call-out fails) after logging the failure.
        '''

        self._uid     = uid
        self._log     = log
        self._prof    = prof

        self._lock    = mt.RLock()
        self._term    = mt.Event()

        self._uri     = None
        self._env     = None
        self._proc    = None
        self._watcher = None

        try:
            # ask the flux-provided python where its `flux` module lives
            cmd = 'flux python -c "import flux; print(flux.__file__)"'
            out, err, ret = sh_callout(cmd)

            if ret:
                raise RuntimeError('flux not found: %s' % err)

            flux_path = os.path.dirname(out.strip())
            mod_path  = os.path.dirname(flux_path)

            # avoid growing `sys.path` with duplicates on repeated construction
            if mod_path not in sys.path:
                sys.path.append(mod_path)

            self._flux     = import_module('flux')
            self._flux_job = import_module('flux.job')

        except Exception:
            self._log.exception('flux import failed')
            raise


    # --------------------------------------------------------------------------
    #
    @property
    def uid(self):
        '''ID this service was created with'''
        return self._uid


    @property
    def uri(self):
        '''URI of the running Flux instance, `None` if not running'''
        return self._uri


    @property
    def env(self):
        '''environment dict (`FLUX_URI`) of the instance, `None` if not running'''
        return self._env


    # --------------------------------------------------------------------------
    #
    def _watch(self) -> None:
        '''
        Thread body: ping the Flux instance about once per second until either
        the ping fails or `self._term` is set (by `close_service`).  On exit
        `self._term` is always set so that `check_service` reports the
        instance as terminated.
        '''

        # FIXME: this thread will change `os.environ` for this *process* because
        #        we want to call `flux ping` via `sh_callout`.  We should
        #        instead use the Flux Python API to run the pings and pass the
        #        URI explicitly.
        self._log.info('starting flux watcher')

        try:
            # use a local reference: `close_service` may reset `self._env`
            env = self._env
            if env:
                for k,v in env.items():
                    os.environ[k] = v

            out, err, ret = sh_callout('flux resource list')
            self._log.info('flux resources [ %d %s]:\n%s', ret, err, out)

            # `Event.wait` doubles as an interruptible sleep: a termination
            # request ends the loop without waiting for the full interval
            while not self._term.wait(timeout=1.0):

                _, err, ret = sh_callout('flux ping -c 1 kvs')
                if ret:
                    self._log.error('flux watcher err: %s', err)
                    break

        finally:
            # we get here when the ping failed, when termination was requested
            # or when the watcher itself failed - flag the service as gone
            self._term.set()
            self._log.warn('flux stopped')


    # --------------------------------------------------------------------------
    #
    def start_service(self,
                      launcher: Optional[str]           = None,
                      env     : Optional[Dict[str,str]] = None
                     ) -> Optional[str]:
        '''
        Start the private Flux instance and return its URI.

        launcher: optional command prefix (split with `shlex`) used to launch
                  `flux start`
        env     : currently unused

        Raises `RuntimeError` if a service was already started by this object
        or if the instance could not be started.
        '''

        with self._lock:

            if self._proc is not None:
                raise RuntimeError('already started Flux: %s' % self._uri)

            self._term.clear()

            return self._locked_start_service(launcher, env)


    # --------------------------------------------------------------------------
    #
    def _locked_start_service(self,
                              launcher: Optional[str]           = None,
                              env     : Optional[Dict[str,str]] = None
                             ) -> Optional[str]:
        '''
        Implementation of `start_service`; the caller holds `self._lock`.
        '''

        cmd  = list()

        if launcher:
            cmd += shlex.split(launcher)

        # the instance reports its URI on stdout and then idles until killed
        cmd += ['flux', 'start', 'bash', '-c', 'echo URI:$FLUX_URI && sleep inf']

        # NOTE(review): stderr is piped but never drained - a very chatty
        #               flux could block on a full pipe.
        flux_proc = sp.Popen(cmd, encoding="utf-8",
                             stdin=sp.DEVNULL, stdout=sp.PIPE, stderr=sp.PIPE)

        flux_uri = None
        while flux_proc.poll() is None:

            try:
                line = flux_proc.stdout.readline()

            except Exception as e:
                self._log.exception('flux service failed to start')
                # do not leave a half-started instance behind
                flux_proc.kill()
                flux_proc.wait()
                raise RuntimeError('could not start flux') from e

            if not line:
                continue

            self._log.debug('flux output: %s', line)

            if line.startswith('URI:'):
                flux_uri = line.split(':', 1)[1].strip()
                break

        if flux_proc.poll() is not None:
            raise RuntimeError('could not execute `flux start`')

      # fr  = self._flux.uri.uri.FluxURIResolver()
      # ret = fr.resolve('pid:%d' % flux_proc.pid)
      # flux_env = {'FLUX_URI': ret}

        # an empty `URI:` line means flux did not export FLUX_URI
        if not flux_uri:
            flux_proc.kill()
            flux_proc.wait()
            raise RuntimeError('no FLUX_URI in env')

        # make sure that the flux url can be reached from other hosts
        # FIXME: this also routes local access via ssh which may slow comm
        flux_url        = Url(flux_uri)
        flux_url.host   = get_hostname()
        flux_url.schema = 'ssh'
        flux_uri        = str(flux_url)
        flux_env        = {'FLUX_URI': flux_uri}

        self._uri       = flux_uri
        self._env       = flux_env
        self._proc      = flux_proc

        self._prof.prof('flux_started', msg=self._uid)

        # start watcher thread to monitor the instance
        self._watcher = mt.Thread(target=self._watch)
        self._watcher.daemon = True
        self._watcher.start()

        self._log.info("flux startup successful: [%s]", flux_env['FLUX_URI'])

        return self._uri


    # --------------------------------------------------------------------------
    #
    def check_service(self) -> Optional[str]:
        '''
        Return the URI of the running instance.

        Raises `RuntimeError` if the service was not started or if the watcher
        flagged it as terminated.
        '''

        with self._lock:

            if not self._proc:
                raise RuntimeError('flux service was not yet started')

            if self._term.is_set():
                raise RuntimeError('flux service was terminated')

            return self._uri


    # --------------------------------------------------------------------------
    #
    def close_service(self) -> None:
        '''
        Stop the watcher, terminate the Flux process and reset the service
        state so that `start_service` can be called again.

        A service whose instance already died is still reaped.  Raises
        `RuntimeError` if no service was started.
        '''

        with self._lock:

            if not self._proc:
                raise RuntimeError('flux service was not yet started')

            # ask the watcher to stop - without this the join below would
            # block until flux dies on its own
            self._term.set()

            # terminate the service process: SIGTERM first so that flux can
            # clean up, SIGKILL only if it does not exit in time
            self._proc.terminate()
            try:
                self._proc.wait(timeout=self._CLOSE_TIMEOUT)
            except sp.TimeoutExpired:
                self._log.warn('flux did not terminate - kill')
                self._proc.kill()
                self._proc.wait()

            # the watcher is a daemon thread: do not hang on a stuck ping
            if self._watcher:
                self._watcher.join(timeout=self._CLOSE_TIMEOUT)

            self._proc    = None
            self._watcher = None
            self._uri     = None
            self._env     = None

# ------------------------------------------------------------------------------
#
class FluxHelper(object):
    '''
    Helper class to programmatically handle Flux instances and to obtain state
    update events for Flux jobs known in that instance.
    '''

    # job events for which application callbacks get registered ('free' and
    # 'clean' are intentionally not watched)
    _JOB_EVENTS = ('submit', 'alloc', 'start', 'finish', 'release',
                   'exception')

    # --------------------------------------------------------------------------
    #
    def __init__(self) -> None:
        '''
        The Flux Helper c'tor takes no arguments and will initially not be
        connected to a Flux instance.  After construction, the application can
        call either one of the following methods:

            FluxHelper.connect_flux(uri=None)
            FluxHelper.start_flux()

        The first will attempt to connect to the Flux instance referenced by
        that URI - a `RuntimeError` exception will be raised if that instance
        cannot be reached.  If no URI is provided, the environment variable
        `FLUX_URI` will be used.

        The second method will instantiate a new Flux instance in the current
        process environment.

        In both cases, the properties

            FluxHelper.uri
            FluxHelper.env

        will provide information about the connected Flux instance.  The `uri`
        is provided as a string, the `env` as a dictionary of environment
        settings (including `FLUX_URI` again).

        The method

            FluxHelper.reset()

        will disconnect from the Flux instance, and in the case where
        `start_flux` created a private instance, that instance will be killed.
        The `uri` and `env` properties will be reset to `None`.

        While connected to a Flux instance, the following methods can be used
        to interact with the instance:

            FluxHelper.get_executor() - return a FluxExecutor instance
            FluxHelper.get_handle()   - return a flux.Flux instance

        All provided executors and handles will be dropped upon `reset()`.
        '''

        self._service : Optional[_FluxService] = None

        self._uri       = None
        self._env       = None

        self._uid       = generate_id('flux.%(item_counter)04d', ID_CUSTOM)
        self._log       = Logger(self._uid, ns='radical.utils')
        self._prof      = Profiler(self._uid, ns='radical.utils')

        self._lock      = mt.RLock()

        self._exe       = None
        self._handle    = None
        self._handles   = list()  # TODO
        self._executors = list()  # TODO

        try:
            # ask the flux-provided python where its `flux` module lives
            cmd = 'flux python -c "import flux; print(flux.__file__)"'
            out, err, ret = sh_callout(cmd)

            if ret:
                raise RuntimeError('flux not found: %s' % err)

            flux_path = os.path.dirname(out.strip())
            mod_path  = os.path.dirname(flux_path)

            # avoid growing `sys.path` with duplicates on repeated construction
            if mod_path not in sys.path:
                sys.path.append(mod_path)

            self._flux     = import_module('flux')
            self._flux_job = import_module('flux.job')

        except Exception:
            self._log.exception('flux import failed')
            raise


    # --------------------------------------------------------------------------
    #
    def __del__(self):

        # FIXME: are handles / executors correctly garbage collected?
        # the object may be only partially constructed (failed c'tor), and
        # a finalizer must never raise
        try:
            self.reset()
        except Exception:
            pass


    # --------------------------------------------------------------------------
    #
    def reset(self):
        '''
        Close the connection to the Flux instance (if it exists), and terminate
        the Flux service if it was started by this instance.  All handles and
        executors created for this service will be dropped.
        '''

        with self._lock:

            # drop our references (index-wise deletion while the list
            # shrinks would raise an IndexError)
            self._handles.clear()
            self._executors.clear()

            self._exe    = None
            self._handle = None

            # only a service started via `start_flux` is ours to terminate
            if self._service is not None:
                try:
                    self._service.close_service()
                except Exception as e:
                    # best effort: the service may already be gone
                    self._log.warn('flux service close failed: %s', e)
                self._service = None

            self._uri = None
            self._env = None


    # --------------------------------------------------------------------------
    #
    @property
    def uid(self):
        '''
        unique ID for this FluxHelper instance
        '''

        with self._lock:
            return self._uid


    # --------------------------------------------------------------------------
    #
    @property
    def uri(self):
        '''
        uri for the connected Flux instance.  Returns `None` if no instance is
        connected.
        '''

        with self._lock:
            return self._uri


    # --------------------------------------------------------------------------
    #
    @property
    def env(self):
        '''
        environment dict for the connected Flux instance.  Returns `None` if no
        instance is connected.
        '''

        with self._lock:
            return self._env


    # --------------------------------------------------------------------------
    #
    def start_flux(self, launcher: Optional[str] = None) -> None:
        '''
        Start a private Flux instance and connect to it.

        launcher: optional command prefix used to launch `flux start`

        Raises `RuntimeError` if already connected or if the instance cannot
        be started; on failure the helper is reset to the disconnected state.

        FIXME: forward env
        '''

        with self._lock:

            if self._uri:
                raise RuntimeError('service already connected: %s' % self._uri)

            try:
                self._service = _FluxService(self._uid, self._log, self._prof)
                self._service.start_service(launcher=launcher)

                self._uri = self._service.check_service()
                self._env = self._service.env

                self._setup()

            except Exception:
                # do not stay half-connected or leak the service process
                self.reset()
                raise


    # --------------------------------------------------------------------------
    #
    def connect_flux(self, uri : Optional[str] = None) -> None:
        '''
        Connect to an existing Flux instance.

        uri: URI of the instance; defaults to the `FLUX_URI` environment
             variable

        Raises `RuntimeError` if already connected, if no URI can be
        determined or if the connection fails; on failure the helper is reset
        to the disconnected state.
        '''

        with self._lock:

            if self._uri:
                raise RuntimeError('service already connected: %s' % self._uri)

            if not uri:
                uri = os.environ.get('FLUX_URI')

            if not uri:
                raise RuntimeError('no Flux instance found via FLUX_URI')

            self._uri = uri
            self._env = {'FLUX_URI': uri}

            # FIXME: run a ping test to ensure the service is up

            try:
                self._setup()
            except Exception:
                # allow a later connection attempt
                self.reset()
                raise


    # --------------------------------------------------------------------------
    #
    def _setup(self):
        '''
        Once a service is connected, create a handle and executor
        '''

        with self._lock:

            assert self._uri, 'not initialized'

            # create a executor and handle for job management
            self._exe    = self.get_executor()
            self._handle = self.get_handle()


    # --------------------------------------------------------------------------
    #
    def _register_callbacks(self, fut, flux_id, cb) -> None:
        '''
        Register `cb` on future `fut` for all events in `_JOB_EVENTS`.

        The closure is created per call, so each callback reports the ID of
        its own job (a closure defined in the caller's loop would report the
        ID of the last job for all futures).  Exceptions raised by `cb` are
        logged and not propagated.
        '''

        def app_cb(_fut, event):
            try:
                cb(flux_id, event)
            except Exception:
                self._log.exception('app cb failed')

        for ev in self._JOB_EVENTS:
            fut.add_event_callback(ev, app_cb)


    # --------------------------------------------------------------------------
    #
    def submit_jobs(self,
                    specs: List[Dict[str, Any]],
                    cb   : Optional[Callable[[str, Any], None]] = None
                   ) -> Any:
        '''
        Submit one job per spec and return the list of Flux job IDs.

        specs: job specifications, each is serialized with `json.dumps`
        cb   : optional callback, invoked as `cb(flux_id, event)` for the job
               events listed in `_JOB_EVENTS`

        Raises `RuntimeError` if the helper is not connected.
        '''

        with self._lock:

            if not self._uri:
                raise RuntimeError('FluxHelper is not connected')

            assert self._exe, 'no executor'

            # submit all jobs first, then collect the IDs
            futures = [self._flux_job.submit_async(self._handle,
                                                   json.dumps(spec))
                       for spec in specs]

            ids = list()
            for fut in futures:

                flux_id = fut.get_id()
                ids.append(flux_id)
                self._log.debug('submit: %s', flux_id)

                if cb:
                    self._register_callbacks(fut, flux_id, cb)

            self._log.debug('submitted: %s', ids)

            return ids


    # --------------------------------------------------------------------------
    #
    def attach_jobs(self,
                    ids: List[int],
                    cb : Optional[Callable[[int, Any], None]] = None
                   ) -> Any:
        '''
        Attach to the jobs with the given IDs and, if given, register `cb`
        (invoked as `cb(flux_id, event)`) for their events.

        Raises `RuntimeError` if the helper is not connected.
        '''

        with self._lock:

            if not self._uri:
                raise RuntimeError('FluxHelper is not connected')

            assert self._exe, 'no executor'

            for flux_id in ids:

                fut = self._exe.attach(flux_id)
                self._log.debug('attach %s : %s', flux_id, fut)

                if cb:
                    self._register_callbacks(fut, flux_id, cb)


    # --------------------------------------------------------------------------
    #
    def cancel_jobs(self, flux_ids: List[int]) -> None:
        '''
        Attach to the jobs with the given IDs and call `cancel()` on the
        resulting futures.

        Raises `RuntimeError` if the helper is not connected.
        '''

        with self._lock:

            if not self._uri:
                raise RuntimeError('FluxHelper is not connected')

            assert self._exe, 'no executor'

            for flux_id in flux_ids:

                fut = self._exe.attach(flux_id)
                self._log.debug('cancel %s : %s', flux_id, fut)
                fut.cancel()


    # --------------------------------------------------------------------------
    #
    def get_handle(self) -> Any:
        '''
        Create, record and return a new `flux.Flux` handle for the connected
        instance.

        Raises `RuntimeError` if not connected or if the handle cannot be
        created.
        '''

        with self._lock:

            if not self._uri:
                raise RuntimeError('FluxHelper is not connected')

            try:
                handle = self._flux.Flux(url=self._uri)
                assert handle, 'no handle'

            except Exception as e:
                raise RuntimeError('failed to connect at %s' % self._uri) from e

            self._handles.append(handle)

            return handle


    # --------------------------------------------------------------------------
    #
    def get_executor(self) -> Any:
        '''
        Create, record and return a new `FluxExecutor` for the connected
        instance.

        Raises `RuntimeError` if not connected or if the executor cannot be
        created.
        '''

        with self._lock:

            if not self._uri:
                raise RuntimeError('FluxHelper is not connected')

            try:
                args = {'url': self._uri}
                exe  = self._flux_job.executor.FluxExecutor(handle_kwargs=args)
                assert exe, 'no executor'

            except Exception as e:
                raise RuntimeError('failed to connect at %s' % self._uri) from e

            self._executors.append(exe)

            return exe
# ------------------------------------------------------------------------------