Try multiple strategies to determine the console size.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Utilities for working with files."""
6
7 import contextlib
8 import datetime
9 import errno
10 import glob
11 import hashlib
12 import logging
13 import os
14 import pathlib
15 import re
16 import time
17 from os.path import exists, isfile, join
18 from typing import Callable, List, Literal, Optional, TextIO
19 from uuid import uuid4
20
21 logger = logging.getLogger(__name__)
22
23
24 def remove_newlines(x: str) -> str:
25     return x.replace('\n', '')
26
27
28 def strip_whitespace(x: str) -> str:
29     return x.strip()
30
31
32 def remove_hash_comments(x: str) -> str:
33     return re.sub(r'#.*$', '', x)
34
35
36 def slurp_file(
37     filename: str,
38     *,
39     skip_blank_lines=False,
40     line_transformers: Optional[List[Callable[[str], str]]] = None,
41 ):
42     ret = []
43     if not file_is_readable(filename):
44         raise Exception(f'{filename} can\'t be read.')
45     with open(filename) as rf:
46         for line in rf:
47             if line_transformers is not None:
48                 for transformation in line_transformers:
49                     line = transformation(line)
50             if skip_blank_lines and line == '':
51                 continue
52             ret.append(line)
53     return ret
54
55
56 def remove(path: str) -> None:
57     """Deletes a file.  Raises if path refers to a directory or a file
58     that doesn't exist.
59
60     >>> import os
61     >>> filename = '/tmp/file_utils_test_file'
62     >>> os.system(f'touch {filename}')
63     0
64     >>> does_file_exist(filename)
65     True
66     >>> remove(filename)
67     >>> does_file_exist(filename)
68     False
69
70     """
71     os.remove(path)
72
73
74 def delete(path: str) -> None:
75     os.remove(path)
76
77
78 def without_extension(path: str) -> str:
79     """Remove one extension from a file or path.
80
81     >>> without_extension('foobar.txt')
82     'foobar'
83
84     >>> without_extension('/home/scott/frapp.py')
85     '/home/scott/frapp'
86
87     >>> without_extension('a.b.c.tar.gz')
88     'a.b.c.tar'
89
90     >>> without_extension('foobar')
91     'foobar'
92
93     """
94     return os.path.splitext(path)[0]
95
96
97 def without_all_extensions(path: str) -> str:
98     """Removes all extensions from a path; handles multiple extensions
99     like foobar.tar.gz -> foobar.
100
101     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
102     '/home/scott/foobar'
103
104     """
105     while '.' in path:
106         path = without_extension(path)
107     return path
108
109
110 def get_extension(path: str) -> str:
111     """Extract and return one extension from a file or path.
112
113     >>> get_extension('this_is_a_test.txt')
114     '.txt'
115
116     >>> get_extension('/home/scott/test.py')
117     '.py'
118
119     >>> get_extension('foobar')
120     ''
121
122     """
123     return os.path.splitext(path)[1]
124
125
126 def get_all_extensions(path: str) -> List[str]:
127     """Return the extensions of a file or path in order.
128
129     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
130     ['.tar', '.gz', '.1']
131
132     """
133     ret = []
134     while True:
135         ext = get_extension(path)
136         path = without_extension(path)
137         if ext:
138             ret.append(ext)
139         else:
140             ret.reverse()
141             return ret
142
143
144 def without_path(filespec: str) -> str:
145     """Returns the base filename without any leading path.
146
147     >>> without_path('/home/scott/foo.py')
148     'foo.py'
149
150     >>> without_path('foo.py')
151     'foo.py'
152
153     """
154     return os.path.split(filespec)[1]
155
156
157 def get_path(filespec: str) -> str:
158     """Returns just the path of the filespec by removing the filename and
159     extension.
160
161     >>> get_path('/home/scott/foobar.py')
162     '/home/scott'
163
164     >>> get_path('~scott/frapp.txt')
165     '~scott'
166
167     """
168     return os.path.split(filespec)[0]
169
170
171 def get_canonical_path(filespec: str) -> str:
172     """Returns a canonicalized absolute path.
173
174     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
175     '/usr/home/scott/foo.txt'
176
177     """
178     return os.path.realpath(filespec)
179
180
181 def create_path_if_not_exist(path, on_error=None):
182     """
183     Attempts to create path if it does not exist. If on_error is
184     specified, it is called with an exception if one occurs, otherwise
185     exception is rethrown.
186
187     >>> import uuid
188     >>> import os
189     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
190     >>> os.path.exists(path)
191     False
192     >>> create_path_if_not_exist(path)
193     >>> os.path.exists(path)
194     True
195     """
196     logger.debug("Creating path %s", path)
197     previous_umask = os.umask(0)
198     try:
199         os.makedirs(path)
200         os.chmod(path, 0o777)
201     except OSError as ex:
202         if ex.errno != errno.EEXIST and not os.path.isdir(path):
203             if on_error is not None:
204                 on_error(path, ex)
205             else:
206                 raise
207     finally:
208         os.umask(previous_umask)
209
210
211 def does_file_exist(filename: str) -> bool:
212     """Returns True if a file exists and is a normal file.
213
214     >>> does_file_exist(__file__)
215     True
216     """
217     return os.path.exists(filename) and os.path.isfile(filename)
218
219
220 def file_is_readable(filename: str) -> bool:
221     return does_file_exist(filename) and os.access(filename, os.R_OK)
222
223
224 def file_is_writable(filename: str) -> bool:
225     return does_file_exist(filename) and os.access(filename, os.W_OK)
226
227
228 def file_is_executable(filename: str) -> bool:
229     return does_file_exist(filename) and os.access(filename, os.X_OK)
230
231
232 def does_directory_exist(dirname: str) -> bool:
233     """Returns True if a file exists and is a directory.
234
235     >>> does_directory_exist('/tmp')
236     True
237     """
238     return os.path.exists(dirname) and os.path.isdir(dirname)
239
240
241 def does_path_exist(pathname: str) -> bool:
242     """Just a more verbose wrapper around os.path.exists."""
243     return os.path.exists(pathname)
244
245
246 def get_file_size(filename: str) -> int:
247     """Returns the size of a file in bytes."""
248     return os.path.getsize(filename)
249
250
251 def is_normal_file(filename: str) -> bool:
252     """Returns True if filename is a normal file.
253
254     >>> is_normal_file(__file__)
255     True
256     """
257     return os.path.isfile(filename)
258
259
260 def is_directory(filename: str) -> bool:
261     """Returns True if filename is a directory.
262
263     >>> is_directory('/tmp')
264     True
265     """
266     return os.path.isdir(filename)
267
268
269 def is_symlink(filename: str) -> bool:
270     """True if filename is a symlink, False otherwise.
271
272     >>> is_symlink('/tmp')
273     False
274
275     >>> is_symlink('/home')
276     True
277
278     """
279     return os.path.islink(filename)
280
281
282 def is_same_file(file1: str, file2: str) -> bool:
283     """Returns True if the two files are the same inode.
284
285     >>> is_same_file('/tmp', '/tmp/../tmp')
286     True
287
288     >>> is_same_file('/tmp', '/home')
289     False
290
291     """
292     return os.path.samefile(file1, file2)
293
294
295 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
296     """Stats the file and returns an os.stat_result or None on error."""
297     try:
298         return os.stat(filename)
299     except Exception as e:
300         logger.exception(e)
301         return None
302
303
304 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
305     tss = get_file_raw_timestamps(filename)
306     if tss is not None:
307         return extractor(tss)
308     return None
309
310
311 def get_file_raw_atime(filename: str) -> Optional[float]:
312     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
313
314
315 def get_file_raw_mtime(filename: str) -> Optional[float]:
316     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
317
318
319 def get_file_raw_ctime(filename: str) -> Optional[float]:
320     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
321
322
323 def get_file_md5(filename: str) -> str:
324     """Hashes filename's contents and returns an MD5."""
325     file_hash = hashlib.md5()
326     with open(filename, "rb") as f:
327         chunk = f.read(8192)
328         while chunk:
329             file_hash.update(chunk)
330             chunk = f.read(8192)
331     return file_hash.hexdigest()
332
333
334 def set_file_raw_atime(filename: str, atime: float):
335     mtime = get_file_raw_mtime(filename)
336     assert mtime is not None
337     os.utime(filename, (atime, mtime))
338
339
340 def set_file_raw_mtime(filename: str, mtime: float):
341     atime = get_file_raw_atime(filename)
342     assert atime is not None
343     os.utime(filename, (atime, mtime))
344
345
346 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
347     if ts is not None:
348         os.utime(filename, (ts, ts))
349     else:
350         os.utime(filename, None)
351
352
353 def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
354     ts = producer(filename)
355     if ts is not None:
356         return datetime.datetime.fromtimestamp(ts)
357     return None
358
359
360 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
361     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
362
363
364 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
365     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
366
367
368 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
369     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
370
371
372 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
373     now = time.time()
374     ts = get_file_raw_timestamps(filename)
375     if ts is None:
376         return None
377     result = extractor(ts)
378     return now - result
379
380
381 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
382     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
383
384
385 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
386     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
387
388
389 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
390     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
391
392
393 def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
394     age = get_file_timestamp_age_seconds(filename, extractor)
395     if age is not None:
396         return datetime.timedelta(seconds=float(age))
397     return None
398
399
400 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
401     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
402
403
404 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
405     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
406
407
408 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
409     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
410
411
412 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
413     from datetime_utils import describe_duration, describe_duration_briefly
414
415     age = get_file_timestamp_age_seconds(filename, extractor)
416     if age is None:
417         return None
418     if brief:
419         return describe_duration_briefly(age)
420     else:
421         return describe_duration(age)
422
423
424 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
425     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
426
427
428 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
429     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
430
431
432 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
433     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
434
435
436 def touch_file(filename: str, *, mode: Optional[int] = 0o666):
437     pathlib.Path(filename, mode=mode).touch()
438
439
440 def expand_globs(in_filename: str):
441     for filename in glob.glob(in_filename):
442         yield filename
443
444
445 def get_files(directory: str):
446     for filename in os.listdir(directory):
447         full_path = join(directory, filename)
448         if isfile(full_path) and exists(full_path):
449             yield full_path
450
451
452 def get_directories(directory: str):
453     for d in os.listdir(directory):
454         full_path = join(directory, d)
455         if not isfile(full_path) and exists(full_path):
456             yield full_path
457
458
459 def get_files_recursive(directory: str):
460     for filename in get_files(directory):
461         yield filename
462     for subdir in get_directories(directory):
463         for file_or_directory in get_files_recursive(subdir):
464             yield file_or_directory
465
466
467 class FileWriter(contextlib.AbstractContextManager):
468     """A helper that writes a file to a temporary location and then moves
469     it atomically to its ultimate destination on close.
470
471     """
472
473     def __init__(self, filename: str) -> None:
474         self.filename = filename
475         uuid = uuid4()
476         self.tempfile = f'{filename}-{uuid}.tmp'
477         self.handle: Optional[TextIO] = None
478
479     def __enter__(self) -> TextIO:
480         assert not does_path_exist(self.tempfile)
481         self.handle = open(self.tempfile, mode="w")
482         return self.handle
483
484     def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
485         if self.handle is not None:
486             self.handle.close()
487             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
488             ret = os.system(cmd)
489             if (ret >> 8) != 0:
490                 raise Exception(f'{cmd} failed, exit value {ret>>8}!')
491         return False
492
493
494 if __name__ == '__main__':
495     import doctest
496
497     doctest.testmod()