Cleanup more contextlib.AbstractContextManagers and Literal[False]s.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import contextlib
6 import datetime
7 import errno
8 import glob
9 import hashlib
10 import logging
11 import os
12 import pathlib
13 import re
14 import time
15 from os.path import exists, isfile, join
16 from typing import Callable, List, Literal, Optional, TextIO
17 from uuid import uuid4
18
19 logger = logging.getLogger(__name__)
20
21
22 def remove_newlines(x: str) -> str:
23     return x.replace('\n', '')
24
25
26 def strip_whitespace(x: str) -> str:
27     return x.strip()
28
29
30 def remove_hash_comments(x: str) -> str:
31     return re.sub(r'#.*$', '', x)
32
33
34 def slurp_file(
35     filename: str,
36     *,
37     skip_blank_lines=False,
38     line_transformers: Optional[List[Callable[[str], str]]] = None,
39 ):
40     ret = []
41     if not file_is_readable(filename):
42         raise Exception(f'{filename} can\'t be read.')
43     with open(filename) as rf:
44         for line in rf:
45             if line_transformers is not None:
46                 for transformation in line_transformers:
47                     line = transformation(line)
48             if skip_blank_lines and line == '':
49                 continue
50             ret.append(line)
51     return ret
52
53
54 def remove(path: str) -> None:
55     """Deletes a file.  Raises if path refers to a directory or a file
56     that doesn't exist.
57
58     >>> import os
59     >>> filename = '/tmp/file_utils_test_file'
60     >>> os.system(f'touch {filename}')
61     0
62     >>> does_file_exist(filename)
63     True
64     >>> remove(filename)
65     >>> does_file_exist(filename)
66     False
67
68     """
69     os.remove(path)
70
71
72 def delete(path: str) -> None:
73     os.remove(path)
74
75
76 def without_extension(path: str) -> str:
77     """Remove one extension from a file or path.
78
79     >>> without_extension('foobar.txt')
80     'foobar'
81
82     >>> without_extension('/home/scott/frapp.py')
83     '/home/scott/frapp'
84
85     >>> without_extension('a.b.c.tar.gz')
86     'a.b.c.tar'
87
88     >>> without_extension('foobar')
89     'foobar'
90
91     """
92     return os.path.splitext(path)[0]
93
94
95 def without_all_extensions(path: str) -> str:
96     """Removes all extensions from a path; handles multiple extensions
97     like foobar.tar.gz -> foobar.
98
99     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
100     '/home/scott/foobar'
101
102     """
103     while '.' in path:
104         path = without_extension(path)
105     return path
106
107
108 def get_extension(path: str) -> str:
109     """Extract and return one extension from a file or path.
110
111     >>> get_extension('this_is_a_test.txt')
112     '.txt'
113
114     >>> get_extension('/home/scott/test.py')
115     '.py'
116
117     >>> get_extension('foobar')
118     ''
119
120     """
121     return os.path.splitext(path)[1]
122
123
124 def get_all_extensions(path: str) -> List[str]:
125     """Return the extensions of a file or path in order.
126
127     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
128     ['.tar', '.gz', '.1']
129
130     """
131     ret = []
132     while True:
133         ext = get_extension(path)
134         path = without_extension(path)
135         if ext:
136             ret.append(ext)
137         else:
138             ret.reverse()
139             return ret
140
141
142 def without_path(filespec: str) -> str:
143     """Returns the base filename without any leading path.
144
145     >>> without_path('/home/scott/foo.py')
146     'foo.py'
147
148     >>> without_path('foo.py')
149     'foo.py'
150
151     """
152     return os.path.split(filespec)[1]
153
154
155 def get_path(filespec: str) -> str:
156     """Returns just the path of the filespec by removing the filename and
157     extension.
158
159     >>> get_path('/home/scott/foobar.py')
160     '/home/scott'
161
162     >>> get_path('~scott/frapp.txt')
163     '~scott'
164
165     """
166     return os.path.split(filespec)[0]
167
168
169 def get_canonical_path(filespec: str) -> str:
170     """Returns a canonicalized absolute path.
171
172     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
173     '/usr/home/scott/foo.txt'
174
175     """
176     return os.path.realpath(filespec)
177
178
179 def create_path_if_not_exist(path, on_error=None):
180     """
181     Attempts to create path if it does not exist. If on_error is
182     specified, it is called with an exception if one occurs, otherwise
183     exception is rethrown.
184
185     >>> import uuid
186     >>> import os
187     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
188     >>> os.path.exists(path)
189     False
190     >>> create_path_if_not_exist(path)
191     >>> os.path.exists(path)
192     True
193     """
194     logger.debug("Creating path %s", path)
195     previous_umask = os.umask(0)
196     try:
197         os.makedirs(path)
198         os.chmod(path, 0o777)
199     except OSError as ex:
200         if ex.errno != errno.EEXIST and not os.path.isdir(path):
201             if on_error is not None:
202                 on_error(path, ex)
203             else:
204                 raise
205     finally:
206         os.umask(previous_umask)
207
208
209 def does_file_exist(filename: str) -> bool:
210     """Returns True if a file exists and is a normal file.
211
212     >>> does_file_exist(__file__)
213     True
214     """
215     return os.path.exists(filename) and os.path.isfile(filename)
216
217
218 def file_is_readable(filename: str) -> bool:
219     return does_file_exist(filename) and os.access(filename, os.R_OK)
220
221
222 def file_is_writable(filename: str) -> bool:
223     return does_file_exist(filename) and os.access(filename, os.W_OK)
224
225
226 def file_is_executable(filename: str) -> bool:
227     return does_file_exist(filename) and os.access(filename, os.X_OK)
228
229
230 def does_directory_exist(dirname: str) -> bool:
231     """Returns True if a file exists and is a directory.
232
233     >>> does_directory_exist('/tmp')
234     True
235     """
236     return os.path.exists(dirname) and os.path.isdir(dirname)
237
238
239 def does_path_exist(pathname: str) -> bool:
240     """Just a more verbose wrapper around os.path.exists."""
241     return os.path.exists(pathname)
242
243
244 def get_file_size(filename: str) -> int:
245     """Returns the size of a file in bytes."""
246     return os.path.getsize(filename)
247
248
249 def is_normal_file(filename: str) -> bool:
250     """Returns True if filename is a normal file.
251
252     >>> is_normal_file(__file__)
253     True
254     """
255     return os.path.isfile(filename)
256
257
258 def is_directory(filename: str) -> bool:
259     """Returns True if filename is a directory.
260
261     >>> is_directory('/tmp')
262     True
263     """
264     return os.path.isdir(filename)
265
266
267 def is_symlink(filename: str) -> bool:
268     """True if filename is a symlink, False otherwise.
269
270     >>> is_symlink('/tmp')
271     False
272
273     >>> is_symlink('/home')
274     True
275
276     """
277     return os.path.islink(filename)
278
279
280 def is_same_file(file1: str, file2: str) -> bool:
281     """Returns True if the two files are the same inode.
282
283     >>> is_same_file('/tmp', '/tmp/../tmp')
284     True
285
286     >>> is_same_file('/tmp', '/home')
287     False
288
289     """
290     return os.path.samefile(file1, file2)
291
292
293 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
294     """Stats the file and returns an os.stat_result or None on error."""
295     try:
296         return os.stat(filename)
297     except Exception as e:
298         logger.exception(e)
299         return None
300
301
302 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
303     tss = get_file_raw_timestamps(filename)
304     if tss is not None:
305         return extractor(tss)
306     return None
307
308
309 def get_file_raw_atime(filename: str) -> Optional[float]:
310     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
311
312
313 def get_file_raw_mtime(filename: str) -> Optional[float]:
314     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
315
316
317 def get_file_raw_ctime(filename: str) -> Optional[float]:
318     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
319
320
321 def get_file_md5(filename: str) -> str:
322     """Hashes filename's contents and returns an MD5."""
323     file_hash = hashlib.md5()
324     with open(filename, "rb") as f:
325         chunk = f.read(8192)
326         while chunk:
327             file_hash.update(chunk)
328             chunk = f.read(8192)
329     return file_hash.hexdigest()
330
331
332 def set_file_raw_atime(filename: str, atime: float):
333     mtime = get_file_raw_mtime(filename)
334     assert mtime is not None
335     os.utime(filename, (atime, mtime))
336
337
338 def set_file_raw_mtime(filename: str, mtime: float):
339     atime = get_file_raw_atime(filename)
340     assert atime is not None
341     os.utime(filename, (atime, mtime))
342
343
344 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
345     if ts is not None:
346         os.utime(filename, (ts, ts))
347     else:
348         os.utime(filename, None)
349
350
351 def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
352     ts = producer(filename)
353     if ts is not None:
354         return datetime.datetime.fromtimestamp(ts)
355     return None
356
357
358 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
359     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
360
361
362 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
363     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
364
365
366 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
367     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
368
369
370 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
371     now = time.time()
372     ts = get_file_raw_timestamps(filename)
373     if ts is None:
374         return None
375     result = extractor(ts)
376     return now - result
377
378
379 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
380     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
381
382
383 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
384     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
385
386
387 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
388     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
389
390
391 def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
392     age = get_file_timestamp_age_seconds(filename, extractor)
393     if age is not None:
394         return datetime.timedelta(seconds=float(age))
395     return None
396
397
398 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
399     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
400
401
402 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
403     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
404
405
406 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
407     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
408
409
410 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
411     from datetime_utils import describe_duration, describe_duration_briefly
412
413     age = get_file_timestamp_age_seconds(filename, extractor)
414     if age is None:
415         return None
416     if brief:
417         return describe_duration_briefly(age)
418     else:
419         return describe_duration(age)
420
421
422 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
423     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
424
425
426 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
427     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
428
429
430 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
431     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
432
433
434 def touch_file(filename: str, *, mode: Optional[int] = 0o666):
435     pathlib.Path(filename, mode=mode).touch()
436
437
438 def expand_globs(in_filename: str):
439     for filename in glob.glob(in_filename):
440         yield filename
441
442
443 def get_files(directory: str):
444     for filename in os.listdir(directory):
445         full_path = join(directory, filename)
446         if isfile(full_path) and exists(full_path):
447             yield full_path
448
449
450 def get_directories(directory: str):
451     for d in os.listdir(directory):
452         full_path = join(directory, d)
453         if not isfile(full_path) and exists(full_path):
454             yield full_path
455
456
457 def get_files_recursive(directory: str):
458     for filename in get_files(directory):
459         yield filename
460     for subdir in get_directories(directory):
461         for file_or_directory in get_files_recursive(subdir):
462             yield file_or_directory
463
464
465 class FileWriter(contextlib.AbstractContextManager):
466     """A helper that writes a file to a temporary location and then moves
467     it atomically to its ultimate destination on close.
468
469     """
470
471     def __init__(self, filename: str) -> None:
472         self.filename = filename
473         uuid = uuid4()
474         self.tempfile = f'{filename}-{uuid}.tmp'
475         self.handle: Optional[TextIO] = None
476
477     def __enter__(self) -> TextIO:
478         assert not does_path_exist(self.tempfile)
479         self.handle = open(self.tempfile, mode="w")
480         return self.handle
481
482     def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
483         if self.handle is not None:
484             self.handle.close()
485             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
486             ret = os.system(cmd)
487             if (ret >> 8) != 0:
488                 raise Exception(f'{cmd} failed, exit value {ret>>8}!')
489         return False
490
491
492 if __name__ == '__main__':
493     import doctest
494
495     doctest.testmod()