import os
import time
import errno
import threading as mt
from .misc import as_bytes, ru_open
from .debug import get_caller_name
# ------------------------------------------------------------------------------
#
class Lockfile(object):
    '''
    This class represents a lockfile.  On calling `acquire()` (or entering
    a resource context `with`), the lockfile is exclusively locked and opened.
    The object can then be used for `read()`, `write()` and `seek()`
    operations, and the lock is only released on `release()` (or when leaving
    the resource context).

    If `delete=True` is specified on construction, then the locked file is
    removed upon `release()`, and its content is lost.

    While waiting for a lock which is held by another process or thread,
    `get_owner()` can be used to query the name of the current lock owner.
    The call returns 'unknown' if that name cannot be determined.

    Note that the file offset is not shared between owners of the lock file
    - a newly acquired lock on the file should prompt a seek to the desired
    file location (the initial offset is '0').

    Example::

        with ru.Lockfile(fname) as lf0:
            lf0.write('test 0\\n')

        with ru.Lockfile(fname) as lf1:
            lf1.lseek(0, os.SEEK_SET)
            lf1.write('test 1\\n')

        with ru.Lockfile(fname) as lf2:
            lf2.lseek(0, os.SEEK_END)
            lf2.write('test 2\\n')

            # raises a RuntimeError, as we won't be able to acquire the lock
            with ru.Lockfile(fname) as lf3:
                lf3.lseek(0, os.SEEK_END)
                lf3.write('test 3\\n')

        with open(fname, 'r') as fin:
            data = fin.read()
            assert data == 'test 1\\ntest 2\\n'

    Example::

        lf1 = ru.Lockfile(fname)
        lf2 = ru.Lockfile(fname)

        with lf1:
            lf1.write('test 0')

            with lf2(timeout=3):
                # we won't reach this, the `with` above will time out and raise
                lf2.write('test 0')

    Implementation:

    The implementation relies on cooperative locking: a process or thread which
    uses this API to lock a file will create a private temporary file in the
    same directory (filename: `.<filename>.<pid>.<counter>` where `counter` is
    a per-process counter).  On lock acquisition, we attempt to create
    a symbolic link `<filename>.lck` which points to that private file.  That
    will only succeed if no other process or thread holds the lock.  On
    `release()`, that symbolic link gets removed.

    NOTE: the private temporary file is not removed by this class.

    The private file contains the name of the owner.  Any other process or
    thread can follow the `<filename>.lck` symlink, open the link's target
    file, and read the name of the owner.  No attempt is made to avoid races
    between failure to acquire the lockfile and querying the current owner, so
    that information is not reliable, but intended to be an informative help
    for debugging and tracing purposes.

    Example::

        lf1 = ru.Lockfile(fname)
        lf2 = ru.Lockfile(fname)

        with lf1:
            lf1.write('test 0')

            try:
                lf2.acquire()  # timeout == None == 0.0
            except RuntimeError:
                print('lock is held by %s' % lf2.get_owner())
    '''

    # per-process counter (guarded by `_lock`) to make tmp file names unique
    _counter = 0
    _lock    = mt.Lock()


    # --------------------------------------------------------------------------
    #
    def __init__(self, fname, *args, **kwargs):
        '''
        Prepare a lock for the file `fname`.  The file itself is only opened
        (and created, if needed) once the lock is acquired.

        The keyword argument `delete` (default: `False`) is consumed here.  All
        other `args` and `kwargs` are passed to `acquire()` when the instance
        is used in a `with Lockfile():` clause:

            with ru.Lockfile(fname, timeout=3) as lockfile:
                lockfile.write(data)
        '''

        self._fname  = fname
        self._fd     = None

        # `delete` is a constructor option - do not forward it to `acquire()`
        self._delete = kwargs.pop('delete', False)
        self._args   = args
        self._kwargs = kwargs

        # create a tempfile to be used for lock acquisition
        with Lockfile._lock:
            cnt = Lockfile._counter
            Lockfile._counter += 1

        # Use absolute paths for lock and tmp file: a symlink target is
        # resolved relative to the link's directory, so a relative target
        # would dangle.  This also handles file names without directory part.
        fname_abs  = os.path.abspath(self._fname)
        fname_base = os.path.basename(fname_abs)
        fname_dir  = os.path.dirname(fname_abs)

        self._lck = os.path.join(fname_dir, '%s.lck' % fname_base)
        self._tmp = os.path.join(fname_dir, '.%s.%d.%d'
                                          % (fname_base, os.getpid(), cnt))

        # make sure our tmp file exists
        with ru_open(self._tmp, 'w') as fout:
            fout.write('%s\n' % get_caller_name())


    # --------------------------------------------------------------------------
    #
    def __call__(self, *args, **kwargs):
        '''
        helper method to pass arguments while using the `with lock` clause:

            with lock(timeout=2, owner=foo):
                lock.write(data)

        The given arguments replace any arguments stored on construction.
        '''

        self._args   = args
        self._kwargs = kwargs

        return self


    # --------------------------------------------------------------------------
    #
    def __enter__(self):
        '''
        Acquire the lock with the stored arguments and return this instance.
        '''

        self.acquire(*self._args, **self._kwargs)
        return self


    # --------------------------------------------------------------------------
    #
    def __exit__(self, foo, bar, baz):
        '''
        Release the lock.  Exceptions raised in the `with` body are not
        suppressed (`release()` returns `None`).
        '''

        return self.release()


    # --------------------------------------------------------------------------
    #
    def __bool__(self) -> bool:
        '''
        `True` while this instance holds the lock (has an open descriptor).
        '''

        # compare against `None`: file descriptor `0` is a valid descriptor
        return self._fd is not None


    # --------------------------------------------------------------------------
    #
    def locked(self) -> bool:
        '''
        Return `True` if this instance currently holds the lock.
        '''

        return bool(self)


    # --------------------------------------------------------------------------
    #
    def acquire(self, timeout=None, owner=None):
        '''
        Acquire the lock and open the locked file for reading and writing (the
        file is created if it does not exist).  Returns `True` on success.

        `owner` is a string which is written to this instance's private file,
        so that others who fail to obtain the lock can look it up via
        `get_owner()`.  If not specified, the `owner` string is set to
        `ru.get_caller_name()`.

        timeout:
            < 0 : wait forever
            = 0 : try locking once and raise `RuntimeError` on failure
            > 0 : try for that many seconds, and raise `TimeoutError` (an
                  `OSError` with errno `ETIME`) on failure
            None: same as 0

        Raises `RuntimeError` if this instance already holds the lock.
        '''

        if timeout is None:
            timeout = 0.0

        if self._fd is not None:
            raise RuntimeError('cannot call open twice')

        # pre-emptively record who wants to acquire the lock
        if not owner:
            owner = get_caller_name()

        with ru_open(self._tmp, 'w') as fout:
            fout.write('%s\n' % owner)

        # monotonic clock: the timeout must not be affected by clock changes
        start = time.monotonic()
        while True:

            # attempt to link self._lck to self._tmp.  Once that succeeds, open
            # self._fname for read/write.
            try:
                os.symlink(self._tmp, self._lck)
                break  # link succeeded, we have the lock

            # if self._lck exists the above `symlink` will raise, and we try
            # again after a bit.  All other errors are fatal
            except OSError as e:

                if e.errno != errno.EEXIST:
                    raise

                if timeout == 0.0:
                    raise RuntimeError('failed to lock %s (%s)'
                                 % (self._fname, self.get_owner())) from e

                if timeout > 0 and time.monotonic() - start > timeout:
                    raise TimeoutError(errno.ETIME, 'lock timeout for %s (%s)'
                                 % (self._fname, self.get_owner())) from e

            # try again
            time.sleep(0.1)  # FIXME: granularity?

        # the link succeeded, so open the file.  If that fails we must not
        # keep the lock, otherwise nobody could ever acquire it again.
        try:
            self._fd = os.open(self._fname, os.O_RDWR | os.O_CREAT)
        except Exception:
            os.unlink(self._lck)
            raise

        return self.locked()


    # --------------------------------------------------------------------------
    #
    def get_owner(self):
        '''
        The name of the current owner of the lockfile can be retrieved by any
        other process or thread which attempts to lock the same file.  This
        method returns `None` if the file is not currently locked, and
        'unknown' if the file is locked but the owner cannot be determined.

        Note: this method may race against lock acquisition / release, and the
              information returned may be outdated.
        '''

        try:
            with ru_open(self._lck, 'r') as fin:
                # strip the trailing newline (if any)
                return fin.readline().rstrip('\n')

        except OSError as e:

            # no lock link at all: the file is not locked.  A dangling link
            # also yields ENOENT but means 'locked' - thus the `lexists` check
            if e.errno == errno.ENOENT and not os.path.lexists(self._lck):
                return None

            return 'unknown'


    # --------------------------------------------------------------------------
    #
    def release(self):
        '''
        Release the lock on the file.  If `delete=True` was specified on
        construction, then the locked file is removed.  The lock itself is
        released even if closing or removing the file fails (the respective
        exception is still raised).

        Once released, all other threads/processes waiting in the `acquire()`
        method call will compete for the lock, and one of them will obtain it.

        Raises `RuntimeError` if this instance does not hold the lock.
        '''

        if self._fd is None:
            raise RuntimeError('lockfile is not open')

        fd, self._fd = self._fd, None

        try:
            # release the file handle
            os.close(fd)

            if self._delete:
                self.remove()

        finally:
            # release the lock
            os.unlink(self._lck)


    # --------------------------------------------------------------------------
    #
    def remove(self):
        '''
        Remove the file this lock guards (not the lock itself).
        '''

        os.unlink(self._fname)


    # --------------------------------------------------------------------------
    #
    def read(self, length):
        '''
        Read up to `length` bytes from the locked file at the current offset.
        This method will raise a `RuntimeError` when being called without the
        file being locked.
        '''

        if self._fd is None:
            raise RuntimeError('lockfile is not open')

        return os.read(self._fd, length)


    # --------------------------------------------------------------------------
    #
    def write(self, data):
        '''
        Write to the locked file at the current offset and return the value
        of `os.write()`.  This method will raise a `RuntimeError` when being
        called without the file being locked.
        '''

        if self._fd is None:
            raise RuntimeError('lockfile is not open')

        return os.write(self._fd, as_bytes(data))


    # --------------------------------------------------------------------------
    #
    def seek(self, pos, how):
        '''
        Same as `lseek()`
        '''

        return self.lseek(pos, how)


    # --------------------------------------------------------------------------
    #
    def lseek(self, pos, how):
        '''
        Change the offset at which the next read or write method is applied.
        The arguments are interpreted as documented by `os.lseek()`.  This
        method will raise a `RuntimeError` when being called without the file
        being locked.
        '''

        if self._fd is None:
            raise RuntimeError('lockfile is not open')

        return os.lseek(self._fd, pos, how)
# ------------------------------------------------------------------------------