|
@@ -225,17 +225,20 @@ class DirectoryBasedExampleDatabase(ExampleDatabase):
|
|
|
# Note: we attempt to create the dir in question now. We
|
|
|
# already checked for permissions, but there can still be other issues,
|
|
|
# e.g. the disk is full, or permissions might have been changed.
|
|
|
- self._key_path(key).mkdir(exist_ok=True, parents=True)
|
|
|
- path = self._value_path(key, value)
|
|
|
- if not path.exists():
|
|
|
- suffix = binascii.hexlify(os.urandom(16)).decode("ascii")
|
|
|
- tmpname = path.with_suffix(f"{path.suffix}.{suffix}")
|
|
|
- tmpname.write_bytes(value)
|
|
|
- try:
|
|
|
- tmpname.rename(path)
|
|
|
- except OSError: # pragma: no cover
|
|
|
- tmpname.unlink()
|
|
|
- assert not tmpname.exists()
|
|
|
+ try:
|
|
|
+ self._key_path(key).mkdir(exist_ok=True, parents=True)
|
|
|
+ path = self._value_path(key, value)
|
|
|
+ if not path.exists():
|
|
|
+ suffix = binascii.hexlify(os.urandom(16)).decode("ascii")
|
|
|
+ tmpname = path.with_suffix(f"{path.suffix}.{suffix}")
|
|
|
+ tmpname.write_bytes(value)
|
|
|
+ try:
|
|
|
+ tmpname.rename(path)
|
|
|
+ except OSError: # pragma: no cover
|
|
|
+ tmpname.unlink()
|
|
|
+ assert not tmpname.exists()
|
|
|
+ except OSError: # pragma: no cover
|
|
|
+ pass
|
|
|
|
|
|
def move(self, src: bytes, dest: bytes, value: bytes) -> None:
|
|
|
if src == dest:
|