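# Tests for library.python.filelock.FileLock: inter-process mutual exclusion,
# re-initialization of a lock that is already held, and concurrent
# acquisition from many threads.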
import os
import time
import logging
import multiprocessing
import tempfile
import threading

import library.python.filelock
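

# Helper executed in child processes: take the shared FileLock, append
# "<pid>:<timestamp>" to the output file, and hold the lock for two more
# seconds before releasing it.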
def _acquire_lock(lock_path, out_file_path):
    with library.python.filelock.FileLock(lock_path):
        with open(out_file_path, "a") as out:
            out.write("{}:{}\n".format(os.getpid(), time.time()))
        time.sleep(2)
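

# Five processes compete for the same lock file; because each holder sleeps
# for two seconds before releasing, the recorded timestamps must be spaced at
# least two seconds apart and every pid must appear exactly once.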
def test_filelock():
    temp_dir = tempfile.mkdtemp()
    lock_path = os.path.join(temp_dir, "file.lock")
    out_file_path = os.path.join(temp_dir, "out.txt")

    process_count = 5
    processes = []
    for i in range(process_count):
        process = multiprocessing.Process(target=_acquire_lock, args=(lock_path, out_file_path))
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

    pids = []
    times = []
    with open(out_file_path) as out:
        content = out.read()
        logging.info("Times:\n%s", content)
        for line in content.strip().split("\n"):
            pid, time_val = line.split(":")
            pids.append(pid)
            times.append(float(time_val))

    # Every child process wrote exactly one line.
    assert len(set(pids)) == process_count

    # Walk the timestamps from newest to oldest: holding the lock through the
    # sleep forces at least a two-second gap between consecutive writers.
    time1 = times.pop()
    while times:
        time2 = times.pop()
        assert int(time1) - int(time2) >= 2
        time1 = time2
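

# Constructing and discarding a second FileLock object for a path that is
# already locked by the outer context manager should not raise.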
def test_filelock_init_acquired():
    temp_dir = tempfile.mkdtemp()
    lock_path = os.path.join(temp_dir, "file.lock")

    with library.python.filelock.FileLock(lock_path):
        sublock = library.python.filelock.FileLock(lock_path)
        del sublock
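

# One hundred threads create their own FileLock objects for the same path and
# acquire/release them concurrently; the test passes as long as every thread
# runs to completion (no deadlock on concurrent acquire/release).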
def test_concurrent_lock():
    filename = 'con.lock'

    def lock():
        l = library.python.filelock.FileLock(filename)
        time.sleep(1)
        l.acquire()
        l.release()

    # Remove a leftover lock file from a previous run, if any.
    try:
        os.unlink(filename)
    except OSError:
        pass

    threads = []
    for i in range(100):
        t = threading.Thread(target=lock)
        t.daemon = True
        threads.append(t)

    for t in threads:
        t.start()

    for t in threads:
        t.join()