# coding=utf-8 import errno import os import pytest import shutil import six import library.python.fs import library.python.strings import library.python.tmp import library.python.windows import yatest.common def in_env(case): def wrapped_case(*args, **kwargs): with library.python.tmp.temp_dir() as temp_dir: case(lambda path: os.path.join(temp_dir, path)) return wrapped_case def mkfile(path, data=''): with open(path, 'wb') as f: if data: f.write(data) if isinstance(data, six.binary_type) else f.write( data.encode(library.python.strings.fs_encoding()) ) def mktree_example(path, name): os.mkdir(path(name)) mkfile(path(name + '/file1'), 'FILE1') os.mkdir(path(name + '/dir1')) os.mkdir(path(name + '/dir2')) mkfile(path(name + '/dir2/file2'), 'FILE2') mkfile(path(name + '/dir2/file3'), 'FILE3') def file_data(path): with open(path, 'rb') as f: return f.read().decode('utf-8') def serialize_tree(path): if os.path.isfile(path): return file_data(path) data = {'dirs': set(), 'files': {}} for dirpath, dirnames, filenames in os.walk(path): dirpath_rel = os.path.relpath(dirpath, path) if dirpath_rel == '.': dirpath_rel = '' data['dirs'].update(set(os.path.join(dirpath_rel, x) for x in dirnames)) data['files'].update({os.path.join(dirpath_rel, x): file_data(os.path.join(dirpath, x)) for x in filenames}) return data def trees_equal(dir1, dir2): return serialize_tree(dir1) == serialize_tree(dir2) def inodes_unsupported(): return library.python.windows.on_win() def inodes_equal(path1, path2): return os.stat(path1).st_ino == os.stat(path2).st_ino def gen_error_access_denied(): if library.python.windows.on_win(): err = WindowsError() err.errno = errno.EACCES err.strerror = '' err.winerror = library.python.windows.ERRORS['ACCESS_DENIED'] else: err = OSError() err.errno = errno.EACCES err.strerror = os.strerror(err.errno) err.filename = 'unknown/file' raise err def test_errorfix_win(): @library.python.fs.errorfix_win def erroneous_func(): gen_error_access_denied() with pytest.raises(OSError) as errinfo: erroneous_func() assert errinfo.value.errno == errno.EACCES assert errinfo.value.filename == 'unknown/file' # See transcode_error, which encodes strerror, in library/python/windows/__init__.py assert isinstance(errinfo.value.strerror, (six.binary_type, six.text_type)) assert errinfo.value.strerror def test_custom_fs_error(): with pytest.raises(OSError) as errinfo: raise library.python.fs.CustomFsError(errno.EACCES, filename='some/file') assert errinfo.value.errno == errno.EACCES # See transcode_error, which encodes strerror, in library/python/windows/__init__.py assert isinstance(errinfo.value.strerror, (six.binary_type, six.text_type)) assert errinfo.value.filename == 'some/file' @in_env def test_ensure_dir(path): library.python.fs.ensure_dir(path('dir/subdir')) assert os.path.isdir(path('dir')) assert os.path.isdir(path('dir/subdir')) @in_env def test_ensure_dir_exists(path): os.makedirs(path('dir/subdir')) library.python.fs.ensure_dir(path('dir/subdir')) assert os.path.isdir(path('dir')) assert os.path.isdir(path('dir/subdir')) @in_env def test_ensure_dir_exists_partly(path): os.mkdir(path('dir')) library.python.fs.ensure_dir(path('dir/subdir')) assert os.path.isdir(path('dir')) assert os.path.isdir(path('dir/subdir')) @in_env def test_ensure_dir_exists_file(path): mkfile(path('dir')) with pytest.raises(OSError) as errinfo: library.python.fs.ensure_dir(path('dir/subdir')) # ENOENT on Windows! assert errinfo.value.errno in (errno.ENOTDIR, errno.ENOENT) assert os.path.isfile(path('dir')) @in_env def test_create_dirs(path): assert library.python.fs.create_dirs(path('dir/subdir')) == path('dir/subdir') assert os.path.isdir(path('dir')) assert os.path.isdir(path('dir/subdir')) @in_env def test_move_file(path): mkfile(path('src'), 'SRC') library.python.fs.move(path('src'), path('dst')) assert not os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'SRC' @in_env def test_move_file_no_src(path): with pytest.raises(OSError) as errinfo: library.python.fs.move(path('src'), path('dst')) assert errinfo.value.errno == errno.ENOENT @in_env def test_move_file_exists(path): mkfile(path('src'), 'SRC') mkfile(path('dst'), 'DST') if library.python.windows.on_win(): # move is platform-dependent, use replace_file for dst replacement on all platforms with pytest.raises(OSError) as errinfo: library.python.fs.move(path('src'), path('dst')) assert errinfo.value.errno == errno.EEXIST assert os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'DST' else: library.python.fs.move(path('src'), path('dst')) assert not os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'SRC' @in_env def test_move_file_exists_dir_empty(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst')) with pytest.raises(OSError) as errinfo: library.python.fs.move(path('src'), path('dst')) assert errinfo.value.errno in (errno.EEXIST, errno.EISDIR) assert os.path.isfile(path('src')) assert os.path.isdir(path('dst')) assert not os.path.isfile(path('dst/src')) @in_env def test_move_file_exists_dir_nonempty(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst')) mkfile(path('dst/dst_file')) with pytest.raises(OSError) as errinfo: library.python.fs.move(path('src'), path('dst')) assert errinfo.value.errno in (errno.EEXIST, errno.EISDIR) assert os.path.isfile(path('src')) assert os.path.isdir(path('dst')) assert os.path.isfile(path('dst/dst_file')) assert not os.path.isfile(path('dst/src')) @in_env def test_move_dir(path): os.mkdir(path('src')) mkfile(path('src/src_file')) library.python.fs.move(path('src'), path('dst')) assert not os.path.isdir(path('src')) assert os.path.isdir(path('dst')) assert os.path.isfile(path('dst/src_file')) @in_env def test_move_dir_exists_empty(path): os.mkdir(path('src')) mkfile(path('src/src_file')) os.mkdir(path('dst')) if library.python.windows.on_win(): # move is platform-dependent, use non-atomic replace for directory replacement with pytest.raises(OSError) as errinfo: library.python.fs.move(path('src'), path('dst')) assert errinfo.value.errno == errno.EEXIST assert os.path.isdir(path('src')) assert os.path.isdir(path('dst')) assert not os.path.isfile(path('dst/src_file')) else: library.python.fs.move(path('src'), path('dst')) assert not os.path.isdir(path('src')) assert os.path.isdir(path('dst')) assert os.path.isfile(path('dst/src_file')) @in_env def test_move_dir_exists_nonempty(path): os.mkdir(path('src')) mkfile(path('src/src_file')) os.mkdir(path('dst')) mkfile(path('dst/dst_file')) with pytest.raises(OSError) as errinfo: library.python.fs.move(path('src'), path('dst')) assert errinfo.value.errno in (errno.EEXIST, errno.ENOTEMPTY) assert os.path.isdir(path('src')) assert os.path.isfile(path('src/src_file')) assert os.path.isdir(path('dst')) assert not os.path.isfile(path('dst/src_file')) assert os.path.isfile(path('dst/dst_file')) @in_env def test_move_dir_exists_file(path): os.mkdir(path('src')) mkfile(path('src/src_file')) mkfile(path('dst'), 'DST') with pytest.raises(OSError) as errinfo: library.python.fs.move(path('src'), path('dst')) assert errinfo.value.errno in (errno.EEXIST, errno.ENOTDIR) assert os.path.isdir(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'DST' @in_env def test_replace_file(path): mkfile(path('src'), 'SRC') library.python.fs.replace_file(path('src'), path('dst')) assert not os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'SRC' mkfile(path('src'), 'SRC') library.python.fs.replace(path('src'), path('dst2')) assert not os.path.isfile(path('src')) assert os.path.isfile(path('dst2')) assert file_data(path('dst2')) == 'SRC' @in_env def test_replace_file_no_src(path): with pytest.raises(OSError) as errinfo: library.python.fs.replace_file(path('src'), path('dst')) assert errinfo.value.errno == errno.ENOENT with pytest.raises(OSError) as errinfo2: library.python.fs.replace(path('src'), path('dst2')) assert errinfo2.value.errno == errno.ENOENT @in_env def test_replace_file_exists(path): mkfile(path('src'), 'SRC') mkfile(path('dst'), 'DST') library.python.fs.replace_file(path('src'), path('dst')) assert not os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'SRC' mkfile(path('src'), 'SRC') mkfile(path('dst2'), 'DST') library.python.fs.replace(path('src'), path('dst2')) assert not os.path.isfile(path('src')) assert os.path.isfile(path('dst2')) assert file_data(path('dst2')) == 'SRC' @in_env def test_replace_file_exists_dir_empty(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst')) with pytest.raises(OSError) as errinfo: library.python.fs.replace_file(path('src'), path('dst')) assert errinfo.value.errno in (errno.EISDIR, errno.EACCES) assert os.path.isfile(path('src')) assert os.path.isdir(path('dst')) assert not os.path.isfile(path('dst/src')) @in_env def test_replace_file_exists_dir_empty_overwrite(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst')) library.python.fs.replace(path('src'), path('dst')) assert not os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'SRC' @in_env def test_replace_file_exists_dir_nonempty(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst')) mkfile(path('dst/dst_file')) with pytest.raises(OSError) as errinfo: library.python.fs.replace_file(path('src'), path('dst')) assert errinfo.value.errno in (errno.EISDIR, errno.EACCES) assert os.path.isfile(path('src')) assert os.path.isdir(path('dst')) assert os.path.isfile(path('dst/dst_file')) assert not os.path.isfile(path('dst/src')) @in_env def test_replace_file_exists_dir_nonempty_overwrite(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst')) mkfile(path('dst/dst_file')) library.python.fs.replace(path('src'), path('dst')) assert not os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'SRC' @in_env def test_replace_dir(path): os.mkdir(path('src')) mkfile(path('src/src_file')) library.python.fs.replace(path('src'), path('dst')) assert not os.path.isdir(path('src')) assert os.path.isdir(path('dst')) assert os.path.isfile(path('dst/src_file')) @in_env def test_replace_dir_exists_empty(path): os.mkdir(path('src')) mkfile(path('src/src_file')) os.mkdir(path('dst')) library.python.fs.replace(path('src'), path('dst')) assert not os.path.isdir(path('src')) assert os.path.isdir(path('dst')) assert os.path.isfile(path('dst/src_file')) @in_env def test_replace_dir_exists_nonempty(path): os.mkdir(path('src')) mkfile(path('src/src_file')) os.mkdir(path('dst')) mkfile(path('dst/dst_file')) library.python.fs.replace(path('src'), path('dst')) assert not os.path.isdir(path('src')) assert os.path.isdir(path('dst')) assert os.path.isfile(path('dst/src_file')) assert not os.path.isfile(path('dst/dst_file')) @in_env def test_replace_dir_exists_file(path): os.mkdir(path('src')) mkfile(path('src/src_file')) mkfile(path('dst'), 'DST') library.python.fs.replace(path('src'), path('dst')) assert not os.path.isdir(path('src')) assert os.path.isdir(path('dst')) assert os.path.isfile(path('dst/src_file')) @in_env def test_remove_file(path): mkfile(path('path')) library.python.fs.remove_file(path('path')) assert not os.path.exists(path('path')) @in_env def test_remove_file_no(path): with pytest.raises(OSError) as errinfo: library.python.fs.remove_file(path('path')) assert errinfo.value.errno == errno.ENOENT @in_env def test_remove_file_exists_dir(path): os.mkdir(path('path')) with pytest.raises(OSError) as errinfo: library.python.fs.remove_file(path('path')) assert errinfo.value.errno in (errno.EISDIR, errno.EACCES) assert os.path.isdir(path('path')) @in_env def test_remove_dir(path): os.mkdir(path('path')) library.python.fs.remove_dir(path('path')) assert not os.path.exists(path('path')) @in_env def test_remove_dir_no(path): with pytest.raises(OSError) as errinfo: library.python.fs.remove_dir(path('path')) assert errinfo.value.errno == errno.ENOENT @in_env def test_remove_dir_exists_file(path): mkfile(path('path')) with pytest.raises(OSError) as errinfo: library.python.fs.remove_dir(path('path')) assert errinfo.value.errno in (errno.ENOTDIR, errno.EINVAL) assert os.path.isfile(path('path')) @in_env def test_remove_tree(path): mktree_example(path, 'path') library.python.fs.remove_tree(path('path')) assert not os.path.exists(path('path')) @in_env def test_remove_tree_empty(path): os.mkdir(path('path')) library.python.fs.remove_tree(path('path')) assert not os.path.exists(path('path')) @in_env def test_remove_tree_file(path): mkfile(path('path')) library.python.fs.remove_tree(path('path')) assert not os.path.exists(path('path')) @in_env def test_remove_tree_no(path): with pytest.raises(OSError) as errinfo: library.python.fs.remove_tree(path('path')) assert errinfo.value.errno == errno.ENOENT @in_env def test_remove_tree_safe(path): library.python.fs.remove_tree_safe(path('path')) @in_env def test_ensure_removed(path): library.python.fs.ensure_removed(path('path')) @in_env def test_ensure_removed_exists(path): os.makedirs(path('dir/subdir')) library.python.fs.ensure_removed(path('dir')) assert not os.path.exists(path('dir')) @in_env def test_ensure_removed_exists_precise(path): os.makedirs(path('dir/subdir')) library.python.fs.ensure_removed(path('dir/subdir')) assert os.path.exists(path('dir')) assert not os.path.exists(path('dir/subdir')) @in_env def test_hardlink_file(path): mkfile(path('src'), 'SRC') library.python.fs.hardlink(path('src'), path('dst')) assert os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'SRC' assert inodes_unsupported() or inodes_equal(path('src'), path('dst')) @in_env def test_hardlink_file_no_src(path): with pytest.raises(OSError) as errinfo: library.python.fs.hardlink(path('src'), path('dst')) assert errinfo.value.errno == errno.ENOENT @in_env def test_hardlink_file_exists(path): mkfile(path('src'), 'SRC') mkfile(path('dst'), 'DST') with pytest.raises(OSError) as errinfo: library.python.fs.hardlink(path('src'), path('dst')) assert errinfo.value.errno == errno.EEXIST assert os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'DST' assert inodes_unsupported() or not inodes_equal(path('src'), path('dst')) @in_env def test_hardlink_file_exists_dir(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst')) with pytest.raises(OSError) as errinfo: library.python.fs.hardlink(path('src'), path('dst')) assert errinfo.value.errno == errno.EEXIST assert os.path.isfile(path('src')) assert os.path.isdir(path('dst')) assert not os.path.isfile(path('dst/src')) @in_env def test_hardlink_dir(path): os.mkdir(path('src')) mkfile(path('src/src_file')) with pytest.raises(OSError) as errinfo: library.python.fs.hardlink(path('src'), path('dst')) assert errinfo.value.errno in (errno.EPERM, errno.EACCES) assert os.path.isdir(path('src')) assert not os.path.isdir(path('dst')) @pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') @in_env def test_symlink_file(path): mkfile(path('src'), 'SRC') library.python.fs.symlink(path('src'), path('dst')) assert os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert os.path.islink(path('dst')) assert file_data(path('dst')) == 'SRC' @pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') @in_env def test_symlink_file_no_src(path): library.python.fs.symlink(path('src'), path('dst')) assert not os.path.isfile(path('src')) assert not os.path.isfile(path('dst')) assert os.path.islink(path('dst')) @pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') @in_env def test_symlink_file_exists(path): mkfile(path('src'), 'SRC') mkfile(path('dst'), 'DST') with pytest.raises(OSError) as errinfo: library.python.fs.symlink(path('src'), path('dst')) assert errinfo.value.errno == errno.EEXIST assert os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert not os.path.islink(path('dst')) assert file_data(path('dst')) == 'DST' @pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') @in_env def test_symlink_file_exists_dir(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst')) with pytest.raises(OSError) as errinfo: library.python.fs.symlink(path('src'), path('dst')) assert errinfo.value.errno == errno.EEXIST assert os.path.isfile(path('src')) assert os.path.isdir(path('dst')) assert not os.path.islink(path('dst')) assert not os.path.isfile(path('dst/src')) @pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') @in_env def test_symlink_dir(path): os.mkdir(path('src')) mkfile(path('src/src_file')) library.python.fs.symlink(path('src'), path('dst')) assert os.path.isdir(path('src')) assert os.path.isdir(path('dst')) assert os.path.islink(path('dst')) assert os.path.isfile(path('dst/src_file')) @pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') @in_env def test_symlink_dir_no_src(path): library.python.fs.symlink(path('src'), path('dst')) assert not os.path.isdir(path('src')) assert not os.path.isdir(path('dst')) assert os.path.islink(path('dst')) @pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') @in_env def test_symlink_dir_exists(path): os.mkdir(path('src')) mkfile(path('src/src_file')) os.mkdir(path('dst')) with pytest.raises(OSError) as errinfo: library.python.fs.symlink(path('src'), path('dst')) assert errinfo.value.errno == errno.EEXIST assert os.path.isdir(path('src')) assert os.path.isdir(path('dst')) assert not os.path.islink(path('dst')) assert not os.path.isfile(path('dst/src_file')) @pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') @in_env def test_symlink_dir_exists_file(path): os.mkdir(path('src')) mkfile(path('src/src_file')) mkfile(path('dst'), 'DST') with pytest.raises(OSError) as errinfo: library.python.fs.symlink(path('src'), path('dst')) assert errinfo.value.errno == errno.EEXIST assert os.path.isdir(path('src')) assert os.path.isfile(path('dst')) assert not os.path.islink(path('dst')) @in_env def test_hardlink_tree(path): mktree_example(path, 'src') library.python.fs.hardlink_tree(path('src'), path('dst')) assert trees_equal(path('src'), path('dst')) @in_env def test_hardlink_tree_empty(path): os.mkdir(path('src')) library.python.fs.hardlink_tree(path('src'), path('dst')) assert trees_equal(path('src'), path('dst')) @in_env def test_hardlink_tree_file(path): mkfile(path('src'), 'SRC') library.python.fs.hardlink_tree(path('src'), path('dst')) assert trees_equal(path('src'), path('dst')) @in_env def test_hardlink_tree_no_src(path): with pytest.raises(OSError) as errinfo: library.python.fs.hardlink_tree(path('src'), path('dst')) assert errinfo.value.errno == errno.ENOENT @in_env def test_hardlink_tree_exists(path): mktree_example(path, 'src') os.mkdir(path('dst_dir')) with pytest.raises(OSError) as errinfo: library.python.fs.hardlink_tree(path('src'), path('dst_dir')) assert errinfo.value.errno == errno.EEXIST mkfile(path('dst_file'), 'DST') with pytest.raises(OSError) as errinfo: library.python.fs.hardlink_tree(path('src'), path('dst_file')) assert errinfo.value.errno == errno.EEXIST @in_env def test_hardlink_tree_file_exists(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst_dir')) with pytest.raises(OSError) as errinfo: library.python.fs.hardlink_tree(path('src'), path('dst_dir')) assert errinfo.value.errno == errno.EEXIST mkfile(path('dst_file'), 'DST') with pytest.raises(OSError) as errinfo: library.python.fs.hardlink_tree(path('src'), path('dst_file')) assert errinfo.value.errno == errno.EEXIST @in_env def test_copy_file(path): mkfile(path('src'), 'SRC') library.python.fs.copy_file(path('src'), path('dst')) assert os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'SRC' @in_env def test_copy_file_no_src(path): with pytest.raises(EnvironmentError): library.python.fs.copy_file(path('src'), path('dst')) @in_env def test_copy_file_exists(path): mkfile(path('src'), 'SRC') mkfile(path('dst'), 'DST') library.python.fs.copy_file(path('src'), path('dst')) assert os.path.isfile(path('src')) assert os.path.isfile(path('dst')) assert file_data(path('dst')) == 'SRC' @in_env def test_copy_file_exists_dir_empty(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst')) with pytest.raises(EnvironmentError): library.python.fs.copy_file(path('src'), path('dst')) assert os.path.isfile(path('src')) assert os.path.isdir(path('dst')) assert not os.path.isfile(path('dst/src')) @in_env def test_copy_file_exists_dir_nonempty(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst')) mkfile(path('dst/dst_file')) with pytest.raises(EnvironmentError): library.python.fs.copy_file(path('src'), path('dst')) assert os.path.isfile(path('src')) assert os.path.isdir(path('dst')) assert os.path.isfile(path('dst/dst_file')) assert not os.path.isfile(path('dst/src')) @in_env def test_copy_tree(path): mktree_example(path, 'src') library.python.fs.copy_tree(path('src'), path('dst')) assert trees_equal(path('src'), path('dst')) @in_env def test_copy_tree_empty(path): os.mkdir(path('src')) library.python.fs.copy_tree(path('src'), path('dst')) assert trees_equal(path('src'), path('dst')) @in_env def test_copy_tree_file(path): mkfile(path('src'), 'SRC') library.python.fs.copy_tree(path('src'), path('dst')) assert trees_equal(path('src'), path('dst')) @in_env def test_copy_tree_no_src(path): with pytest.raises(EnvironmentError): library.python.fs.copy_tree(path('src'), path('dst')) @in_env def test_copy_tree_exists(path): mktree_example(path, 'src') os.mkdir(path('dst_dir')) with pytest.raises(EnvironmentError): library.python.fs.copy_tree(path('src'), path('dst_dir')) mkfile(path('dst_file'), 'DST') with pytest.raises(EnvironmentError): library.python.fs.copy_tree(path('src'), path('dst_file')) @in_env def test_copy_tree_file_exists(path): mkfile(path('src'), 'SRC') os.mkdir(path('dst_dir')) with pytest.raises(EnvironmentError): library.python.fs.copy_tree(path('src'), path('dst_dir')) mkfile(path('dst_file'), 'DST') library.python.fs.copy_tree(path('src'), path('dst_file')) assert trees_equal(path('src'), path('dst_file')) @in_env def test_read_file(path): mkfile(path('src'), 'SRC') assert library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) == 'SRC' assert library.python.fs.read_file(path('src'), binary=False) == 'SRC' @in_env def test_read_file_empty(path): mkfile(path('src')) assert library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) == '' assert library.python.fs.read_file(path('src'), binary=False) == '' @in_env def test_read_file_multiline(path): mkfile(path('src'), 'SRC line 1\nSRC line 2\n') assert ( library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) == 'SRC line 1\nSRC line 2\n' ) assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\nSRC line 2\n' @in_env def test_read_file_multiline_crlf(path): mkfile(path('src'), 'SRC line 1\r\nSRC line 2\r\n') assert ( library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) == 'SRC line 1\r\nSRC line 2\r\n' ) if library.python.windows.on_win() or six.PY3: # universal newlines are by default in text mode in python3 assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\nSRC line 2\n' else: assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\r\nSRC line 2\r\n' @in_env def test_read_file_unicode(path): s = u'АБВ' mkfile(path('src'), s.encode('utf-8')) mkfile(path('src_cp1251'), s.encode('cp1251')) assert library.python.fs.read_file_unicode(path('src')) == s assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s assert library.python.fs.read_file_unicode(path('src'), binary=False) == s assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s @in_env def test_read_file_unicode_empty(path): mkfile(path('src')) mkfile(path('src_cp1251')) assert library.python.fs.read_file_unicode(path('src')) == '' assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == '' assert library.python.fs.read_file_unicode(path('src'), binary=False) == '' assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == '' @in_env def test_read_file_unicode_multiline(path): s = u'АБВ\nИ еще\n' mkfile(path('src'), s.encode('utf-8')) mkfile(path('src_cp1251'), s.encode('cp1251')) assert library.python.fs.read_file_unicode(path('src')) == s assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s assert library.python.fs.read_file_unicode(path('src'), binary=False) == s assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s @in_env def test_read_file_unicode_multiline_crlf(path): s = u'АБВ\r\nИ еще\r\n' mkfile(path('src'), s.encode('utf-8')) mkfile(path('src_cp1251'), s.encode('cp1251')) assert library.python.fs.read_file_unicode(path('src')) == s assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s if library.python.windows.on_win() or six.PY3: # universal newlines are by default in text mode in python3 assert library.python.fs.read_file_unicode(path('src'), binary=False) == u'АБВ\nИ еще\n' assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == u'АБВ\nИ еще\n' else: assert library.python.fs.read_file_unicode(path('src'), binary=False) == s assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s @in_env def test_write_file(path): library.python.fs.write_file(path('src'), 'SRC') assert file_data(path('src')) == 'SRC' library.python.fs.write_file(path('src2'), 'SRC', binary=False) assert file_data(path('src2')) == 'SRC' @in_env def test_write_file_empty(path): library.python.fs.write_file(path('src'), '') assert file_data(path('src')) == '' library.python.fs.write_file(path('src2'), '', binary=False) assert file_data(path('src2')) == '' @in_env def test_write_file_multiline(path): library.python.fs.write_file(path('src'), 'SRC line 1\nSRC line 2\n') assert file_data(path('src')) == 'SRC line 1\nSRC line 2\n' library.python.fs.write_file(path('src2'), 'SRC line 1\nSRC line 2\n', binary=False) if library.python.windows.on_win(): assert file_data(path('src2')) == 'SRC line 1\r\nSRC line 2\r\n' else: assert file_data(path('src2')) == 'SRC line 1\nSRC line 2\n' @in_env def test_write_file_multiline_crlf(path): library.python.fs.write_file(path('src'), 'SRC line 1\r\nSRC line 2\r\n') assert file_data(path('src')) == 'SRC line 1\r\nSRC line 2\r\n' library.python.fs.write_file(path('src2'), 'SRC line 1\r\nSRC line 2\r\n', binary=False) if library.python.windows.on_win(): assert file_data(path('src2')) == 'SRC line 1\r\r\nSRC line 2\r\r\n' else: assert file_data(path('src2')) == 'SRC line 1\r\nSRC line 2\r\n' @in_env def test_get_file_size(path): mkfile(path('one.txt'), '22') assert library.python.fs.get_file_size(path('one.txt')) == 2 @in_env def test_get_file_size_empty(path): mkfile(path('one.txt')) assert library.python.fs.get_file_size(path('one.txt')) == 0 @in_env def test_get_tree_size(path): os.makedirs(path('deeper')) mkfile(path('one.txt'), '1') mkfile(path('deeper/two.txt'), '22') assert library.python.fs.get_tree_size(path('one.txt')) == 1 assert library.python.fs.get_tree_size(path('')) == 1 assert library.python.fs.get_tree_size(path(''), recursive=True) == 3 @pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') @in_env def test_get_tree_size_dangling_symlink(path): os.makedirs(path('deeper')) mkfile(path('one.txt'), '1') mkfile(path('deeper/two.txt'), '22') os.symlink(path('deeper/two.txt'), path("deeper/link.txt")) os.remove(path('deeper/two.txt')) # does not fail assert library.python.fs.get_tree_size(path(''), recursive=True) == 1 @pytest.mark.skipif(not library.python.windows.on_win(), reason='Test hardlinks on windows') def test_hardlink_or_copy(): max_allowed_hard_links = 1023 def run(hardlink_function, dir): src = r"test.txt" with open(src, "w") as f: f.write("test") for i in range(max_allowed_hard_links + 1): hardlink_function(src, os.path.join(dir, "{}.txt".format(i))) dir1 = library.python.fs.create_dirs("one") with pytest.raises(WindowsError) as e: run(library.python.fs.hardlink, dir1) assert e.value.winerror == 1142 assert len(os.listdir(dir1)) == max_allowed_hard_links dir2 = library.python.fs.create_dirs("two") run(library.python.fs.hardlink_or_copy, dir2) assert len(os.listdir(dir2)) == max_allowed_hard_links + 1 def test_remove_tree_unicode(): path = u"test_remove_tree_unicode/русский".encode("utf-8") os.makedirs(path) library.python.fs.remove_tree(six.text_type("test_remove_tree_unicode")) assert not os.path.exists("test_remove_tree_unicode") def test_remove_tree_safe_unicode(): path = u"test_remove_tree_safe_unicode/русский".encode("utf-8") os.makedirs(path) library.python.fs.remove_tree_safe(six.text_type("test_remove_tree_safe_unicode")) assert not os.path.exists("test_remove_tree_safe_unicode") def test_copy_tree_custom_copy_function(): library.python.fs.create_dirs("test_copy_tree_src/deepper/inner") library.python.fs.write_file("test_copy_tree_src/deepper/deepper.txt", "deepper.txt") library.python.fs.write_file("test_copy_tree_src/deepper/inner/inner.txt", "inner.txt") copied = [] def copy_function(src, dst): shutil.copy2(src, dst) copied.append(dst) library.python.fs.copy_tree( "test_copy_tree_src", yatest.common.work_path("test_copy_tree_dst"), copy_function=copy_function ) assert len(copied) == 2 assert yatest.common.work_path("test_copy_tree_dst/deepper/deepper.txt") in copied assert yatest.common.work_path("test_copy_tree_dst/deepper/inner/inner.txt") in copied def test_copy2(): library.python.fs.symlink("non-existent", "link") library.python.fs.copy2("link", "link2", follow_symlinks=False) assert os.path.islink("link2") assert os.readlink("link2") == "non-existent" def test_commonpath(): pj = os.path.join pja = lambda *x: os.path.abspath(pj(*x)) assert library.python.fs.commonpath(['a', 'b']) == '' assert library.python.fs.commonpath([pj('t', '1')]) == pj('t', '1') assert library.python.fs.commonpath([pj('t', '1'), pj('t', '2')]) == pj('t') assert library.python.fs.commonpath([pj('t', '1', '2'), pj('t', '1', '2')]) == pj('t', '1', '2') assert library.python.fs.commonpath([pj('t', '1', '1'), pj('t', '1', '2')]) == pj('t', '1') assert library.python.fs.commonpath([pj('t', '1', '1'), pj('t', '1', '2'), pj('t', '1', '3')]) == pj('t', '1') assert library.python.fs.commonpath([pja('t', '1', '1'), pja('t', '1', '2')]) == pja('t', '1') assert library.python.fs.commonpath({pj('t', '1'), pj('t', '2')}) == pj('t')