path.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # coding=utf-8
  2. import errno
  3. import os
  4. import shutil
  5. import contextlib
  6. import library.python.fs as lpf
  7. def replace_in_file(path, old, new):
  8. """
  9. Replace text occurrences in a file
  10. :param path: path to the file
  11. :param old: text to replace
  12. :param new: replacement
  13. """
  14. with open(path) as fp:
  15. content = fp.read()
  16. lpf.ensure_removed(path)
  17. with open(path, 'w') as fp:
  18. fp.write(content.replace(old, new))
  19. @contextlib.contextmanager
  20. def change_dir(path):
  21. old = os.getcwd()
  22. try:
  23. os.chdir(path)
  24. yield path
  25. finally:
  26. os.chdir(old)
  27. def copytree(src, dst, symlinks=False, ignore=None, postprocessing=None):
  28. '''
  29. Copy an entire directory of files into an existing directory
  30. instead of raising Exception what shtuil.copytree does
  31. '''
  32. if not os.path.exists(dst) and os.path.isdir(src):
  33. os.makedirs(dst)
  34. for item in os.listdir(src):
  35. s = os.path.join(src, item)
  36. d = os.path.join(dst, item)
  37. if os.path.isdir(s):
  38. shutil.copytree(s, d, symlinks, ignore)
  39. else:
  40. shutil.copy2(s, d)
  41. if postprocessing:
  42. postprocessing(dst, False)
  43. for root, dirs, files in os.walk(dst):
  44. for path in dirs:
  45. postprocessing(os.path.join(root, path), False)
  46. for path in files:
  47. postprocessing(os.path.join(root, path), True)
  48. def get_unique_file_path(dir_path, file_pattern, create_file=True, max_suffix=10000):
  49. def atomic_file_create(path):
  50. try:
  51. fd = os.open(path, os.O_CREAT | os.O_EXCL, 0o644)
  52. os.close(fd)
  53. return True
  54. except OSError as e:
  55. if e.errno in [errno.EEXIST, errno.EISDIR, errno.ETXTBSY]:
  56. return False
  57. # Access issue with file itself, not parent directory.
  58. if e.errno == errno.EACCES and os.path.exists(path):
  59. return False
  60. raise e
  61. def atomic_dir_create(path):
  62. try:
  63. os.mkdir(path)
  64. return True
  65. except OSError as e:
  66. if e.errno == errno.EEXIST:
  67. return False
  68. raise e
  69. file_path = os.path.join(dir_path, file_pattern)
  70. lpf.ensure_dir(os.path.dirname(file_path))
  71. file_counter = 0
  72. handler = atomic_file_create if create_file else atomic_dir_create
  73. while os.path.exists(file_path) or not handler(file_path):
  74. file_path = os.path.join(dir_path, file_pattern + ".{}".format(file_counter))
  75. file_counter += 1
  76. assert file_counter < max_suffix
  77. return file_path