67e6f561f394e1a99c4ace7bb2f6ddbe733ea011
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import io
11 import pathlib
12 import time
13 from typing import Optional
14 import glob
15 from os.path import isfile, join, exists
16 from uuid import uuid4
17
18
19 logger = logging.getLogger(__name__)
20
21
22 # os.remove(file) you fuckwit.
23
24
25 def create_path_if_not_exist(path, on_error=None):
26     """
27     Attempts to create path if it does not exist. If on_error is
28     specified, it is called with an exception if one occurs, otherwise
29     exception is rethrown.
30
31     >>> import uuid
32     >>> import os
33     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
34     >>> os.path.exists(path)
35     False
36     >>> create_path_if_not_exist(path)
37     >>> os.path.exists(path)
38     True
39     """
40     logger.debug(f"Creating path {path}")
41     previous_umask = os.umask(0)
42     try:
43         os.makedirs(path)
44         os.chmod(path, 0o777)
45     except OSError as ex:
46         if ex.errno != errno.EEXIST and not os.path.isdir(path):
47             if on_error is not None:
48                 on_error(path, ex)
49             else:
50                 raise
51     finally:
52         os.umask(previous_umask)
53
54
55 def does_file_exist(filename: str) -> bool:
56     """Returns True if a file exists and is a normal file.
57
58     >>> does_file_exist(__file__)
59     True
60     """
61     return os.path.exists(filename) and os.path.isfile(filename)
62
63
64 def does_directory_exist(dirname: str) -> bool:
65     """Returns True if a file exists and is a directory.
66
67     >>> does_directory_exist('/tmp')
68     True
69     """
70     return os.path.exists(dirname) and os.path.isdir(dirname)
71
72
73 def does_path_exist(pathname: str) -> bool:
74     """Just a more verbose wrapper around os.path.exists."""
75     return os.path.exists(pathname)
76
77
78 def get_file_size(filename: str) -> int:
79     """Returns the size of a file in bytes."""
80     return os.path.getsize(filename)
81
82
83 def is_normal_file(filename: str) -> bool:
84     """Returns True if filename is a normal file.
85
86     >>> is_normal_file(__file__)
87     True
88     """
89     return os.path.isfile(filename)
90
91
92 def is_directory(filename: str) -> bool:
93     """Returns True if filename is a directory.
94
95     >>> is_directory('/tmp')
96     True
97     """
98     return os.path.isdir(filename)
99
100
101 def is_symlink(filename: str) -> bool:
102     return os.path.islink(filename)
103
104
105 def is_same_file(file1: str, file2: str) -> bool:
106     return os.path.samefile(file1, file2)
107
108
109 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
110     try:
111         return os.stat(filename)
112     except Exception as e:
113         logger.exception(e)
114         return None
115
116
117 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
118     tss = get_file_raw_timestamps(filename)
119     if tss is not None:
120         return extractor(tss)
121     return None
122
123
124 def get_file_raw_atime(filename: str) -> Optional[float]:
125     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
126
127
128 def get_file_raw_mtime(filename: str) -> Optional[float]:
129     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
130
131
132 def get_file_raw_ctime(filename: str) -> Optional[float]:
133     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
134
135
136 def get_file_md5(filename: str) -> str:
137     file_hash = hashlib.md5()
138     with open(filename, "rb") as f:
139         chunk = f.read(8192)
140         while chunk:
141             file_hash.update(chunk)
142             chunk = f.read(8192)
143     return file_hash.hexdigest()
144
145
146 def set_file_raw_atime(filename: str, atime: float):
147     mtime = get_file_raw_mtime(filename)
148     os.utime(filename, (atime, mtime))
149
150
151 def set_file_raw_mtime(filename: str, mtime: float):
152     atime = get_file_raw_atime(filename)
153     os.utime(filename, (atime, mtime))
154
155
156 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
157     if ts is not None:
158         os.utime(filename, (ts, ts))
159     else:
160         os.utime(filename, None)
161
162
163 def convert_file_timestamp_to_datetime(
164     filename: str, producer
165 ) -> Optional[datetime.datetime]:
166     ts = producer(filename)
167     if ts is not None:
168         return datetime.datetime.fromtimestamp(ts)
169     return None
170
171
172 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
173     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
174
175
176 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
177     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
178
179
180 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
181     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
182
183
184 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
185     now = time.time()
186     ts = get_file_raw_timestamps(filename)
187     if ts is None:
188         return None
189     result = extractor(ts)
190     return now - result
191
192
193 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
194     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
195
196
197 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
198     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
199
200
201 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
202     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
203
204
205 def get_file_timestamp_timedelta(
206     filename: str, extractor
207 ) -> Optional[datetime.timedelta]:
208     age = get_file_timestamp_age_seconds(filename, extractor)
209     if age is not None:
210         return datetime.timedelta(seconds=float(age))
211     return None
212
213
214 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
215     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
216
217
218 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
219     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
220
221
222 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
223     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
224
225
226 def describe_file_timestamp(
227     filename: str, extractor, *, brief=False
228 ) -> Optional[str]:
229     from datetime_utils import describe_duration, describe_duration_briefly
230     age = get_file_timestamp_age_seconds(filename, extractor)
231     if age is None:
232         return None
233     if brief:
234         return describe_duration_briefly(age)
235     else:
236         return describe_duration(age)
237
238
239 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
240     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
241
242
243 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
244     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
245
246
247 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
248     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
249
250
251 def touch_file(filename: str) -> bool:
252     return pathlib.Path(filename).touch()
253
254
255 def expand_globs(in_filename: str):
256     for filename in glob.glob(in_filename):
257         yield filename
258
259
260 def get_files(directory: str):
261     for filename in os.listdir(directory):
262         full_path = join(directory, filename)
263         if isfile(full_path) and exists(full_path):
264             yield full_path
265
266
267 def get_directories(directory: str):
268     for d in os.listdir(directory):
269         full_path = join(directory, d)
270         if not isfile(full_path) and exists(full_path):
271             yield full_path
272
273
274 def get_files_recursive(directory: str):
275     for filename in get_files(directory):
276         yield filename
277     for subdir in get_directories(directory):
278         for file_or_directory in get_files_recursive(subdir):
279             yield file_or_directory
280
281
282 class FileWriter(object):
283     def __init__(self, filename: str) -> None:
284         self.filename = filename
285         uuid = uuid4()
286         self.tempfile = f'{filename}-{uuid}.tmp'
287         self.handle = None
288
289     def __enter__(self) -> io.TextIOWrapper:
290         assert not does_path_exist(self.tempfile)
291         self.handle = open(self.tempfile, mode="w")
292         return self.handle
293
294     def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
295         if self.handle is not None:
296             self.handle.close()
297             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
298             ret = os.system(cmd)
299             if (ret >> 8) != 0:
300                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
301         return None
302
303
304 if __name__ == '__main__':
305     import doctest
306     doctest.testmod()