Improve docs in ANSI.py
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Utilities for working with files."""
6
7 import contextlib
8 import datetime
9 import errno
10 import glob
11 import hashlib
12 import logging
13 import os
14 import pathlib
15 import re
16 import time
17 from os.path import exists, isfile, join
18 from typing import Callable, List, Literal, Optional, TextIO
19 from uuid import uuid4
20
21 logger = logging.getLogger(__name__)
22
23
24 def remove_newlines(x: str) -> str:
25     """Trivial function to be used as a line_transformer in
26     :meth:`slurp_file` for no newlines in file contents"""
27     return x.replace('\n', '')
28
29
30 def strip_whitespace(x: str) -> str:
31     """Trivial function to be used as a line_transformer in
32     :meth:`slurp_file` for no leading / trailing whitespace in
33     file contents"""
34     return x.strip()
35
36
37 def remove_hash_comments(x: str) -> str:
38     """Trivial function to be used as a line_transformer in
39     :meth:`slurp_file` for no # comments in file contents"""
40     return re.sub(r'#.*$', '', x)
41
42
43 def slurp_file(
44     filename: str,
45     *,
46     skip_blank_lines=False,
47     line_transformers: Optional[List[Callable[[str], str]]] = None,
48 ):
49     """Reads in a file's contents line-by-line to a memory buffer applying
50     each line transformation in turn.
51
52     Args:
53         filename: file to be read
54         skip_blank_lines: should reading skip blank lines?
55         line_transformers: little string->string transformations
56     """
57
58     ret = []
59     xforms = []
60     if line_transformers is not None:
61         for x in line_transformers:
62             xforms.append(x)
63     if not file_is_readable(filename):
64         raise Exception(f'{filename} can\'t be read.')
65     with open(filename) as rf:
66         for line in rf:
67             for transformation in xforms:
68                 line = transformation(line)
69             if skip_blank_lines and line == '':
70                 continue
71             ret.append(line)
72     return ret
73
74
75 def remove(path: str) -> None:
76     """Deletes a file.  Raises if path refers to a directory or a file
77     that doesn't exist.
78
79     Args:
80         path: the path of the file to delete
81
82     >>> import os
83     >>> filename = '/tmp/file_utils_test_file'
84     >>> os.system(f'touch {filename}')
85     0
86     >>> does_file_exist(filename)
87     True
88     >>> remove(filename)
89     >>> does_file_exist(filename)
90     False
91     """
92     os.remove(path)
93
94
95 def delete(path: str) -> None:
96     """This is a convenience for my dumb ass who can't remember os.remove
97     sometimes.
98     """
99     os.remove(path)
100
101
102 def without_extension(path: str) -> str:
103     """Remove one (the last) extension from a file or path.
104
105     Args:
106         path: the path from which to remove an extension
107
108     Returns:
109         the path with one extension removed.
110
111     >>> without_extension('foobar.txt')
112     'foobar'
113
114     >>> without_extension('/home/scott/frapp.py')
115     '/home/scott/frapp'
116
117     >>> f = 'a.b.c.tar.gz'
118     >>> while('.' in f):
119     ...     f = without_extension(f)
120     ...     print(f)
121     a.b.c.tar
122     a.b.c
123     a.b
124     a
125
126     >>> without_extension('foobar')
127     'foobar'
128
129     """
130     return os.path.splitext(path)[0]
131
132
133 def without_all_extensions(path: str) -> str:
134     """Removes all extensions from a path; handles multiple extensions
135     like foobar.tar.gz -> foobar.
136
137     Args:
138         path: the path from which to remove all extensions
139
140     Returns:
141         the path with all extensions removed.
142
143     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
144     '/home/scott/foobar'
145
146     """
147     while '.' in path:
148         path = without_extension(path)
149     return path
150
151
152 def get_extension(path: str) -> str:
153     """Extract and return one (the last) extension from a file or path.
154
155     Args:
156         path: the path from which to extract an extension
157
158     Returns:
159         The last extension from the file path.
160
161     >>> get_extension('this_is_a_test.txt')
162     '.txt'
163
164     >>> get_extension('/home/scott/test.py')
165     '.py'
166
167     >>> get_extension('foobar')
168     ''
169
170     """
171     return os.path.splitext(path)[1]
172
173
174 def get_all_extensions(path: str) -> List[str]:
175     """Return the extensions of a file or path in order.
176
177     Args:
178         path: the path from which to extract all extensions.
179
180     Returns:
181         a list containing each extension which may be empty.
182
183     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
184     ['.tar', '.gz', '.1']
185
186     >>> get_all_extensions('/home/scott/foobar')
187     []
188
189     """
190     ret = []
191     while True:
192         ext = get_extension(path)
193         path = without_extension(path)
194         if ext:
195             ret.append(ext)
196         else:
197             ret.reverse()
198             return ret
199
200
201 def without_path(filespec: str) -> str:
202     """Returns the base filename without any leading path.
203
204     Args:
205         filespec: path to remove leading directories from
206
207     Returns:
208         filespec without leading dir components.
209
210     >>> without_path('/home/scott/foo.py')
211     'foo.py'
212
213     >>> without_path('foo.py')
214     'foo.py'
215
216     """
217     return os.path.split(filespec)[1]
218
219
220 def get_path(filespec: str) -> str:
221     """Returns just the path of the filespec by removing the filename and
222     extension.
223
224     Args:
225         filespec: path to remove filename / extension(s) from
226
227     Returns:
228         filespec with just the leading directory components and no
229             filename or extension(s)
230
231     >>> get_path('/home/scott/foobar.py')
232     '/home/scott'
233
234     >>> get_path('/home/scott/test.1.2.3.gz')
235     '/home/scott'
236
237     >>> get_path('~scott/frapp.txt')
238     '~scott'
239
240     """
241     return os.path.split(filespec)[0]
242
243
244 def get_canonical_path(filespec: str) -> str:
245     """Returns a canonicalized absolute path.
246
247     Args:
248         filespec: the path to canonicalize
249
250     Returns:
251         the canonicalized path
252
253     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
254     '/usr/home/scott/foo.txt'
255
256     """
257     return os.path.realpath(filespec)
258
259
260 def create_path_if_not_exist(path, on_error=None) -> None:
261     """
262     Attempts to create path if it does not exist already.
263
264     .. warning::
265
266         Files are created with mode 0x0777 (i.e. world read/writeable).
267
268     Args:
269         path: the path to attempt to create
270         on_error: If True, it's invoked on error conditions.  Otherwise
271             any exceptions are raised.
272
273     >>> import uuid
274     >>> import os
275     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
276     >>> os.path.exists(path)
277     False
278     >>> create_path_if_not_exist(path)
279     >>> os.path.exists(path)
280     True
281     """
282     logger.debug("Creating path %s", path)
283     previous_umask = os.umask(0)
284     try:
285         os.makedirs(path)
286         os.chmod(path, 0o777)
287     except OSError as ex:
288         if ex.errno != errno.EEXIST and not os.path.isdir(path):
289             if on_error is not None:
290                 on_error(path, ex)
291             else:
292                 raise
293     finally:
294         os.umask(previous_umask)
295
296
297 def does_file_exist(filename: str) -> bool:
298     """Returns True if a file exists and is a normal file.
299
300     Args:
301         filename: filename to check
302
303     Returns:
304         True if filename exists and is a normal file.
305
306     >>> does_file_exist(__file__)
307     True
308     >>> does_file_exist('/tmp/2492043r9203r9230r9230r49230r42390r4230')
309     False
310     """
311     return os.path.exists(filename) and os.path.isfile(filename)
312
313
314 def file_is_readable(filename: str) -> bool:
315     """True if file exists, is a normal file and is readable by the
316     current process.  False otherwise.
317
318     Args:
319         filename: the filename to check for read access
320     """
321     return does_file_exist(filename) and os.access(filename, os.R_OK)
322
323
324 def file_is_writable(filename: str) -> bool:
325     """True if file exists, is a normal file and is writable by the
326     current process.  False otherwise.
327
328     Args:
329         filename: the file to check for write access.
330     """
331     return does_file_exist(filename) and os.access(filename, os.W_OK)
332
333
334 def file_is_executable(filename: str) -> bool:
335     """True if file exists, is a normal file and is executable by the
336     current process.  False otherwise.
337
338     Args:
339         filename: the file to check for execute access.
340     """
341     return does_file_exist(filename) and os.access(filename, os.X_OK)
342
343
344 def does_directory_exist(dirname: str) -> bool:
345     """Returns True if a file exists and is a directory.
346
347     >>> does_directory_exist('/tmp')
348     True
349     >>> does_directory_exist('/xyzq/21341')
350     False
351     """
352     return os.path.exists(dirname) and os.path.isdir(dirname)
353
354
355 def does_path_exist(pathname: str) -> bool:
356     """Just a more verbose wrapper around os.path.exists."""
357     return os.path.exists(pathname)
358
359
360 def get_file_size(filename: str) -> int:
361     """Returns the size of a file in bytes.
362
363     Args:
364         filename: the filename to size
365
366     Returns:
367         size of filename in bytes
368     """
369     return os.path.getsize(filename)
370
371
372 def is_normal_file(filename: str) -> bool:
373     """Returns True if filename is a normal file.
374
375     >>> is_normal_file(__file__)
376     True
377     """
378     return os.path.isfile(filename)
379
380
381 def is_directory(filename: str) -> bool:
382     """Returns True if filename is a directory.
383
384     >>> is_directory('/tmp')
385     True
386     """
387     return os.path.isdir(filename)
388
389
390 def is_symlink(filename: str) -> bool:
391     """True if filename is a symlink, False otherwise.
392
393     >>> is_symlink('/tmp')
394     False
395
396     >>> is_symlink('/home')
397     True
398
399     """
400     return os.path.islink(filename)
401
402
403 def is_same_file(file1: str, file2: str) -> bool:
404     """Returns True if the two files are the same inode.
405
406     >>> is_same_file('/tmp', '/tmp/../tmp')
407     True
408
409     >>> is_same_file('/tmp', '/home')
410     False
411
412     """
413     return os.path.samefile(file1, file2)
414
415
416 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
417     """Stats the file and returns an os.stat_result or None on error.
418
419     Args:
420         filename: the file whose timestamps to fetch
421
422     Returns:
423         the os.stat_result or None to indicate an error occurred
424     """
425     try:
426         return os.stat(filename)
427     except Exception as e:
428         logger.exception(e)
429         return None
430
431
432 def get_file_raw_timestamp(
433     filename: str, extractor: Callable[[os.stat_result], Optional[float]]
434 ) -> Optional[float]:
435     """Stat a file and, if successful, use extractor to fetch some
436     subset of the information in the os.stat_result.  See also
437     :meth:`get_file_raw_atime`, :meth:`get_file_raw_mtime`, and
438     :meth:`get_file_raw_ctime` which just call this with a lambda
439     extractor.
440
441     Args:
442         filename: the filename to stat
443         extractor: Callable that takes a os.stat_result and produces
444             something useful(?) with it.
445
446     Returns:
447         whatever the extractor produced or None on error.
448     """
449     tss = get_file_raw_timestamps(filename)
450     if tss is not None:
451         return extractor(tss)
452     return None
453
454
455 def get_file_raw_atime(filename: str) -> Optional[float]:
456     """Get a file's raw access time or None on error.
457
458     See also :meth:`get_file_atime_as_datetime`,
459     :meth:`get_file_atime_timedelta`,
460     and :meth:`get_file_atime_age_seconds`.
461     """
462     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
463
464
465 def get_file_raw_mtime(filename: str) -> Optional[float]:
466     """Get a file's raw modification time or None on error.
467
468     See also :meth:`get_file_mtime_as_datetime`,
469     :meth:`get_file_mtime_timedelta`,
470     and :meth:`get_file_mtime_age_seconds`.
471     """
472     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
473
474
475 def get_file_raw_ctime(filename: str) -> Optional[float]:
476     """Get a file's raw creation time or None on error.
477
478     See also :meth:`get_file_ctime_as_datetime`,
479     :meth:`get_file_ctime_timedelta`,
480     and :meth:`get_file_ctime_age_seconds`.
481     """
482     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
483
484
485 def get_file_md5(filename: str) -> str:
486     """Hashes filename's disk contents and returns the MD5 digest.
487
488     Args:
489         filename: the file whose contents to hash
490
491     Returns:
492         the MD5 digest of the file's contents.  Raises on errors.
493     """
494     file_hash = hashlib.md5()
495     with open(filename, "rb") as f:
496         chunk = f.read(8192)
497         while chunk:
498             file_hash.update(chunk)
499             chunk = f.read(8192)
500     return file_hash.hexdigest()
501
502
503 def set_file_raw_atime(filename: str, atime: float):
504     """Sets a file's raw access time.
505
506     See also :meth:`get_file_atime_as_datetime`,
507     :meth:`get_file_atime_timedelta`,
508     :meth:`get_file_atime_age_seconds`,
509     and :meth:`get_file_raw_atime`.
510     """
511     mtime = get_file_raw_mtime(filename)
512     assert mtime is not None
513     os.utime(filename, (atime, mtime))
514
515
516 def set_file_raw_mtime(filename: str, mtime: float):
517     """Sets a file's raw modification time.
518
519     See also :meth:`get_file_mtime_as_datetime`,
520     :meth:`get_file_mtime_timedelta`,
521     :meth:`get_file_mtime_age_seconds`,
522     and :meth:`get_file_raw_mtime`.
523     """
524     atime = get_file_raw_atime(filename)
525     assert atime is not None
526     os.utime(filename, (atime, mtime))
527
528
529 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
530     """Sets both a file's raw modification and access times
531
532     Args:
533         filename: the file whose times to set
534         ts: the raw time to set or None to indicate time should be
535             set to the current time.
536     """
537     if ts is not None:
538         os.utime(filename, (ts, ts))
539     else:
540         os.utime(filename, None)
541
542
543 def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
544     """Convert a raw file timestamp into a python datetime."""
545     ts = producer(filename)
546     if ts is not None:
547         return datetime.datetime.fromtimestamp(ts)
548     return None
549
550
551 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
552     """Fetch a file's access time as a python datetime.
553
554     See also :meth:`get_file_atime_as_datetime`,
555     :meth:`get_file_atime_timedelta`,
556     :meth:`get_file_atime_age_seconds`,
557     :meth:`describe_file_atime`,
558     and :meth:`get_file_raw_atime`.
559     """
560     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
561
562
563 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
564     """Fetches a file's modification time as a python datetime.
565
566     See also :meth:`get_file_mtime_as_datetime`,
567     :meth:`get_file_mtime_timedelta`,
568     :meth:`get_file_mtime_age_seconds`,
569     and :meth:`get_file_raw_mtime`.
570     """
571     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
572
573
574 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
575     """Fetches a file's creation time as a python datetime.
576
577     See also :meth:`get_file_ctime_as_datetime`,
578     :meth:`get_file_ctime_timedelta`,
579     :meth:`get_file_ctime_age_seconds`,
580     and :meth:`get_file_raw_ctime`.
581     """
582     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
583
584
585 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
586     """~Internal helper"""
587     now = time.time()
588     ts = get_file_raw_timestamps(filename)
589     if ts is None:
590         return None
591     result = extractor(ts)
592     return now - result
593
594
595 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
596     """Gets a file's access time as an age in seconds (ago).
597
598     See also :meth:`get_file_atime_as_datetime`,
599     :meth:`get_file_atime_timedelta`,
600     :meth:`get_file_atime_age_seconds`,
601     :meth:`describe_file_atime`,
602     and :meth:`get_file_raw_atime`.
603     """
604     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
605
606
607 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
608     """Gets a file's creation time as an age in seconds (ago).
609
610     See also :meth:`get_file_ctime_as_datetime`,
611     :meth:`get_file_ctime_timedelta`,
612     :meth:`get_file_ctime_age_seconds`,
613     and :meth:`get_file_raw_ctime`.
614     """
615     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
616
617
618 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
619     """Gets a file's modification time as seconds (ago).
620
621     See also :meth:`get_file_mtime_as_datetime`,
622     :meth:`get_file_mtime_timedelta`,
623     :meth:`get_file_mtime_age_seconds`,
624     and :meth:`get_file_raw_mtime`.
625     """
626     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
627
628
629 def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
630     """~Internal helper"""
631     age = get_file_timestamp_age_seconds(filename, extractor)
632     if age is not None:
633         return datetime.timedelta(seconds=float(age))
634     return None
635
636
637 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
638     """How long ago was a file accessed as a timedelta?
639
640     See also :meth:`get_file_atime_as_datetime`,
641     :meth:`get_file_atime_timedelta`,
642     :meth:`get_file_atime_age_seconds`,
643     :meth:`describe_file_atime`,
644     and :meth:`get_file_raw_atime`.
645     """
646     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
647
648
649 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
650     """How long ago was a file created as a timedelta?
651
652     See also :meth:`get_file_ctime_as_datetime`,
653     :meth:`get_file_ctime_timedelta`,
654     :meth:`get_file_ctime_age_seconds`,
655     and :meth:`get_file_raw_ctime`.
656     """
657     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
658
659
660 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
661     """
662     Gets a file's modification time as a python timedelta.
663
664     See also :meth:`get_file_mtime_as_datetime`,
665     :meth:`get_file_mtime_timedelta`,
666     :meth:`get_file_mtime_age_seconds`,
667     and :meth:`get_file_raw_mtime`.
668     """
669     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
670
671
672 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
673     """~Internal helper"""
674     from datetime_utils import describe_duration, describe_duration_briefly
675
676     age = get_file_timestamp_age_seconds(filename, extractor)
677     if age is None:
678         return None
679     if brief:
680         return describe_duration_briefly(age)
681     else:
682         return describe_duration(age)
683
684
685 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
686     """
687     Describe how long ago a file was accessed.
688
689     See also :meth:`get_file_atime_as_datetime`,
690     :meth:`get_file_atime_timedelta`,
691     :meth:`get_file_atime_age_seconds`,
692     :meth:`describe_file_atime`,
693     and :meth:`get_file_raw_atime`.
694     """
695     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
696
697
698 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
699     """Describes a file's creation time.
700
701     See also :meth:`get_file_ctime_as_datetime`,
702     :meth:`get_file_ctime_timedelta`,
703     :meth:`get_file_ctime_age_seconds`,
704     and :meth:`get_file_raw_ctime`.
705     """
706     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
707
708
709 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
710     """
711     Describes how long ago a file was modified.
712
713     See also :meth:`get_file_mtime_as_datetime`,
714     :meth:`get_file_mtime_timedelta`,
715     :meth:`get_file_mtime_age_seconds`,
716     and :meth:`get_file_raw_mtime`.
717     """
718     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
719
720
721 def touch_file(filename: str, *, mode: Optional[int] = 0o666):
722     """Like unix "touch" command's semantics: update the timestamp
723     of a file to the current time if the file exists.  Create the
724     file if it doesn't exist.
725
726     Args:
727         filename: the filename
728         mode: the mode to create the file with
729     """
730     pathlib.Path(filename, mode=mode).touch()
731
732
733 def expand_globs(in_filename: str):
734     """Expands shell globs (* and ? wildcards) to the matching files."""
735     for filename in glob.glob(in_filename):
736         yield filename
737
738
739 def get_files(directory: str):
740     """Returns the files in a directory as a generator."""
741     for filename in os.listdir(directory):
742         full_path = join(directory, filename)
743         if isfile(full_path) and exists(full_path):
744             yield full_path
745
746
747 def get_directories(directory: str):
748     """Returns the subdirectories in a directory as a generator."""
749     for d in os.listdir(directory):
750         full_path = join(directory, d)
751         if not isfile(full_path) and exists(full_path):
752             yield full_path
753
754
755 def get_files_recursive(directory: str):
756     """Find the files and directories under a root recursively."""
757     for filename in get_files(directory):
758         yield filename
759     for subdir in get_directories(directory):
760         for file_or_directory in get_files_recursive(subdir):
761             yield file_or_directory
762
763
764 class FileWriter(contextlib.AbstractContextManager):
765     """A helper that writes a file to a temporary location and then moves
766     it atomically to its ultimate destination on close.
767     """
768
769     def __init__(self, filename: str) -> None:
770         self.filename = filename
771         uuid = uuid4()
772         self.tempfile = f'{filename}-{uuid}.tmp'
773         self.handle: Optional[TextIO] = None
774
775     def __enter__(self) -> TextIO:
776         assert not does_path_exist(self.tempfile)
777         self.handle = open(self.tempfile, mode="w")
778         return self.handle
779
780     def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
781         if self.handle is not None:
782             self.handle.close()
783             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
784             ret = os.system(cmd)
785             if (ret >> 8) != 0:
786                 raise Exception(f'{cmd} failed, exit value {ret>>8}!')
787         return False
788
789
790 if __name__ == '__main__':
791     import doctest
792
793     doctest.testmod()