3 """Utilities for working with files."""
12 from typing import Optional
14 from os.path import isfile, join, exists
16 logger = logging.getLogger(__name__)
19 def create_path_if_not_exist(path, on_error=None):
21 Attempts to create path if it does not exist. If on_error is
22 specified, it is called with an exception if one occurs, otherwise
23 exception is rethrown.
27 >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
28 >>> os.path.exists(path)
30 >>> create_path_if_not_exist(path)
31 >>> os.path.exists(path)
34 logger.debug(f"Creating path {path}")
35 previous_umask = os.umask(0)
40 if ex.errno != errno.EEXIST and not os.path.isdir(path):
41 if on_error is not None:
46 os.umask(previous_umask)
49 def does_file_exist(filename: str) -> bool:
50 return os.path.exists(filename) and os.path.isfile(filename)
53 def does_directory_exist(dirname: str) -> bool:
54 return os.path.exists(dirname) and os.path.isdir(dirname)
57 def does_path_exist(pathname: str) -> bool:
58 return os.path.exists(pathname)
61 def get_file_size(filename: str) -> int:
62 return os.path.getsize(filename)
65 def is_normal_file(filename: str) -> bool:
66 return os.path.isfile(filename)
69 def is_directory(filename: str) -> bool:
70 return os.path.isdir(filename)
73 def is_symlink(filename: str) -> bool:
74 return os.path.islink(filename)
77 def is_same_file(file1: str, file2: str) -> bool:
78 return os.path.samefile(file1, file2)
81 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
83 return os.stat(filename)
84 except Exception as e:
89 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
90 tss = get_file_raw_timestamps(filename)
96 def get_file_raw_atime(filename: str) -> Optional[float]:
97 return get_file_raw_timestamp(filename, lambda x: x.st_atime)
100 def get_file_raw_mtime(filename: str) -> Optional[float]:
101 return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
104 def get_file_raw_ctime(filename: str) -> Optional[float]:
105 return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
108 def get_file_md5(filename: str) -> str:
109 file_hash = hashlib.md5()
110 with open(filename, "rb") as f:
113 file_hash.update(chunk)
115 return file_hash.hexdigest()
118 def set_file_raw_atime(filename: str, atime: float):
119 mtime = get_file_raw_mtime(filename)
120 os.utime(filename, (atime, mtime))
123 def set_file_raw_mtime(filename: str, mtime: float):
124 atime = get_file_raw_atime(filename)
125 os.utime(filename, (atime, mtime))
128 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
130 os.utime(filename, (ts, ts))
132 os.utime(filename, None)
135 def convert_file_timestamp_to_datetime(
136 filename: str, producer
137 ) -> Optional[datetime.datetime]:
138 ts = producer(filename)
140 return datetime.datetime.fromtimestamp(ts)
144 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
145 return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
148 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
149 return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
152 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
153 return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
156 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
158 ts = get_file_raw_timestamps(filename)
161 result = extractor(ts)
165 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
166 return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
169 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
170 return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
173 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
174 return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
177 def get_file_timestamp_timedelta(
178 filename: str, extractor
179 ) -> Optional[datetime.timedelta]:
180 age = get_file_timestamp_age_seconds(filename, extractor)
182 return datetime.timedelta(seconds=float(age))
186 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
187 return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
190 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
191 return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
194 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
195 return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
198 def describe_file_timestamp(
199 filename: str, extractor, *, brief=False
201 from datetime_utils import describe_duration, describe_duration_briefly
202 age = get_file_timestamp_age_seconds(filename, extractor)
206 return describe_duration_briefly(age)
208 return describe_duration(age)
211 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
212 return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
215 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
216 return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
219 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
220 return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
223 def touch_file(filename: str) -> bool:
224 return pathlib.Path(filename).touch()
227 def expand_globs(in_filename: str):
228 for filename in glob.glob(in_filename):
232 def get_files(directory: str):
233 for filename in os.listdir(directory):
234 full_path = join(directory, filename)
235 if isfile(full_path) and exists(full_path):
239 def get_directories(directory: str):
240 for d in os.listdir(directory):
241 full_path = join(directory, d)
242 if not isfile(full_path) and exists(full_path):
246 def get_files_recursive(directory: str):
247 for filename in get_files(directory):
249 for subdir in get_directories(directory):
250 for file_or_directory in get_files_recursive(subdir):
251 yield file_or_directory