import os
import time
import logging
import multiprocessing
import tempfile
import threading

import library.python.filelock


def _acquire_lock(lock_path, out_file_path):
    # Worker: take the shared file lock, record this process' pid and the
    # acquisition time, then keep holding the lock for two more seconds.
    with library.python.filelock.FileLock(lock_path):
        with open(out_file_path, "a") as out:
            out.write("{}:{}\n".format(os.getpid(), time.time()))
        time.sleep(2)


def test_filelock():
    temp_dir = tempfile.mkdtemp()
    lock_path = os.path.join(temp_dir, "file.lock")
    out_file_path = os.path.join(temp_dir, "out.txt")

    process_count = 5
    processes = []
    for i in range(process_count):
        process = multiprocessing.Process(target=_acquire_lock, args=(lock_path, out_file_path))
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

    pids = []
    times = []
    with open(out_file_path) as out:
        content = out.read()
        logging.info("Times:\n%s", content)
        for line in content.strip().split("\n"):
            pid, time_val = line.split(":")
            pids.append(pid)
            times.append(float(time_val))

    # every process acquired the lock exactly once
    assert len(set(pids)) == process_count

    # the lock is held for 2 seconds after each write, so consecutive
    # timestamps must be at least 2 seconds apart
    time1 = times.pop()
    while times:
        time2 = times.pop()
        assert int(time1) - int(time2) >= 2
        time1 = time2


def test_filelock_init_acquired():
    temp_dir = tempfile.mkdtemp()
    lock_path = os.path.join(temp_dir, "file.lock")

    # constructing a second FileLock on an already held path must be safe
    with library.python.filelock.FileLock(lock_path):
        sublock = library.python.filelock.FileLock(lock_path)
        del sublock


def test_concurrent_lock():
    filename = 'thread.lock'

    def lock():
        lock = library.python.filelock.FileLock(filename)
        time.sleep(1)
        assert lock.acquire()
        lock.release()
        # another thread may have removed the lock file already
        try:
            os.unlink(filename)
        except OSError:
            pass

    threads = []
    for i in range(100):
        t = threading.Thread(target=lock)
        threads.append(t)

    for t in threads:
        t.start()

    for t in threads:
        t.join()


def test_pidfilelock():
    lock_file = 'pidfile.lock'
    # there should be no info
    lock = library.python.filelock.PidFileLock(lock_file)
    assert lock.info.pid == 0
    assert lock.info.time == 0

    with library.python.filelock.PidFileLock(lock_file) as lock:
        assert lock.info.pid == os.getpid()
        assert lock.info.time <= time.time()
        assert lock.info.time > time.time() - 2

        newlock = library.python.filelock.PidFileLock(lock_file)
        # info shouldn't require locking
        assert newlock.info.pid == os.getpid()
        assert not newlock.acquire(blocking=False)

    newlock = library.python.filelock.PidFileLock(lock_file)
    # info is still accessible
    assert newlock.info.pid == os.getpid()
    t = newlock.info.time

    # info is updated
    time.sleep(1)
    with newlock as lock:
        assert lock.info.time > t


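# PidFileLock exposes the last holder's pid and acquisition time without
# taking the lock, as the assertions above rely on. Below is a minimal sketch
# of how a caller might inspect a lock file before waiting on it; the helper
# name and the returned strings are illustrative, not part of the library.
def _describe_pidlock(lock_file):
    info = library.python.filelock.PidFileLock(lock_file).info
    if info.pid == 0:
        # the lock file carries no holder info yet
        return "never acquired"
    return "last acquired by pid {} at {}".format(info.pid, time.ctime(info.time))

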
def _try_acquire_pidlock(lock_file, out_file, lock_pid=None):
    # Worker: report whether the lock could be taken without blocking and,
    # if a pid is given, check that the lock info points at that holder.
    lock = library.python.filelock.PidFileLock(lock_file)
    with open(out_file, "w") as afile:
        afile.write("1" if lock.acquire(blocking=False) else "0")

    if lock_pid is not None:
        assert lock.info.pid == lock_pid


def test_pidfilelock_multiprocessing():
    lock_file = 'mp_pidfile.lock'
    out_file = lock_file + ".out"

    # subprocess can acquire the lock
    proc = multiprocessing.Process(target=_try_acquire_pidlock, args=(lock_file, out_file))
    proc.start()
    proc.join()
    with open(out_file) as afile:
        assert "1" == afile.read()

    # subprocess can't acquire the lock while the parent holds it
    with library.python.filelock.PidFileLock(lock_file) as lock:
        proc = multiprocessing.Process(target=_try_acquire_pidlock, args=(lock_file, out_file, lock.info.pid))
        proc.start()
        proc.join()

        with open(out_file) as afile:
            assert "0" == afile.read()