More spring cleaning.
[pyutils.git] / src / pyutils / files / file_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """
6 This is a grab bag of file-related utilities.  It has code to, for example,
7 read files transforming the text as its read, normalize pathnames, strip
8 extensions, read and manipulate atimes/mtimes/ctimes, compute a signature
9 based on a file's contents, traverse the file system recursively, etc...
10 """
11
12 import contextlib
13 import datetime
14 import errno
15 import fnmatch
16 import glob
17 import hashlib
18 import logging
19 import os
20 import pathlib
21 import re
22 import time
23 from os.path import exists, isfile, join
24 from typing import Callable, List, Literal, Optional, TextIO
25 from uuid import uuid4
26
27 logger = logging.getLogger(__name__)
28
29
30 def remove_newlines(x: str) -> str:
31     """Trivial function to be used as a line_transformer in
32     :meth:`slurp_file` for no newlines in file contents"""
33     return x.replace("\n", "")
34
35
36 def strip_whitespace(x: str) -> str:
37     """Trivial function to be used as a line_transformer in
38     :meth:`slurp_file` for no leading / trailing whitespace in
39     file contents"""
40     return x.strip()
41
42
43 def remove_hash_comments(x: str) -> str:
44     """Trivial function to be used as a line_transformer in
45     :meth:`slurp_file` for no # comments in file contents"""
46     return re.sub(r"#.*$", "", x)
47
48
49 def slurp_file(
50     filename: str,
51     *,
52     skip_blank_lines: bool = False,
53     line_transformers: Optional[List[Callable[[str], str]]] = None,
54 ):
55     """Reads in a file's contents line-by-line to a memory buffer applying
56     each line transformation in turn.
57
58     Args:
59         filename: file to be read
60         skip_blank_lines: should reading skip blank lines?
61         line_transformers: little string->string transformations
62
63     Returns:
64         A list of lines from the read and transformed file contents.
65     """
66
67     ret = []
68     xforms = []
69     if line_transformers is not None:
70         for x in line_transformers:
71             xforms.append(x)
72     if not is_readable(filename):
73         raise Exception(f"{filename} can't be read.")
74     with open(filename) as rf:
75         for line in rf:
76             for transformation in xforms:
77                 line = transformation(line)
78             if skip_blank_lines and line == "":
79                 continue
80             ret.append(line)
81     return ret
82
83
84 def remove(path: str) -> None:
85     """Deletes a file.  Raises if path refers to a directory or a file
86     that doesn't exist.
87
88     Args:
89         path: the path of the file to delete
90
91     >>> import os
92     >>> filename = '/tmp/file_utils_test_file'
93     >>> os.system(f'touch {filename}')
94     0
95     >>> does_file_exist(filename)
96     True
97     >>> remove(filename)
98     >>> does_file_exist(filename)
99     False
100     """
101     os.remove(path)
102
103
104 def fix_multiple_slashes(path: str) -> str:
105     """Fixes multi-slashes in paths or path-like strings
106
107     Args:
108         path: the path in which to remove multiple slashes
109
110     >>> p = '/usr/local//etc/rc.d///file.txt'
111     >>> fix_multiple_slashes(p)
112     '/usr/local/etc/rc.d/file.txt'
113
114     >>> p = 'this is a test'
115     >>> fix_multiple_slashes(p) == p
116     True
117     """
118     return re.sub(r"/+", "/", path)
119
120
121 def delete(path: str) -> None:
122     """This is a convenience for my dumb ass who can't remember os.remove
123     sometimes.
124     """
125     os.remove(path)
126
127
128 def without_extension(path: str) -> str:
129     """Remove one (the last) extension from a file or path.
130
131     Args:
132         path: the path from which to remove an extension
133
134     Returns:
135         the path with one extension removed.
136
137     See also :meth:`without_all_extensions`.
138
139     >>> without_extension('foobar.txt')
140     'foobar'
141
142     >>> without_extension('/home/scott/frapp.py')
143     '/home/scott/frapp'
144
145     >>> f = 'a.b.c.tar.gz'
146     >>> while('.' in f):
147     ...     f = without_extension(f)
148     ...     print(f)
149     a.b.c.tar
150     a.b.c
151     a.b
152     a
153
154     >>> without_extension('foobar')
155     'foobar'
156
157     """
158     return os.path.splitext(path)[0]
159
160
161 def without_all_extensions(path: str) -> str:
162     """Removes all extensions from a path; handles multiple extensions
163     like foobar.tar.gz -> foobar.
164
165     Args:
166         path: the path from which to remove all extensions
167
168     Returns:
169         the path with all extensions removed.
170
171     See also :meth:`without_extension`
172
173     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
174     '/home/scott/foobar'
175
176     """
177     while "." in path:
178         path = without_extension(path)
179     return path
180
181
182 def get_extension(path: str) -> str:
183     """Extract and return one (the last) extension from a file or path.
184
185     Args:
186         path: the path from which to extract an extension
187
188     Returns:
189         The last extension from the file path.
190
191     See also :meth:`without_extension`, :meth:`without_all_extensions`,
192     :meth:`get_all_extensions`.
193
194     >>> get_extension('this_is_a_test.txt')
195     '.txt'
196
197     >>> get_extension('/home/scott/test.py')
198     '.py'
199
200     >>> get_extension('foobar')
201     ''
202
203     """
204     return os.path.splitext(path)[1]
205
206
207 def get_all_extensions(path: str) -> List[str]:
208     """Return the extensions of a file or path in order.
209
210     Args:
211         path: the path from which to extract all extensions.
212
213     Returns:
214         a list containing each extension which may be empty.
215
216     See also :meth:`without_extension`, :meth:`without_all_extensions`,
217     :meth:`get_extension`.
218
219     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
220     ['.tar', '.gz', '.1']
221
222     >>> get_all_extensions('/home/scott/foobar')
223     []
224
225     """
226     ret = []
227     while True:
228         ext = get_extension(path)
229         path = without_extension(path)
230         if ext:
231             ret.append(ext)
232         else:
233             ret.reverse()
234             return ret
235
236
237 def without_path(filespec: str) -> str:
238     """Returns the base filename without any leading path.
239
240     Args:
241         filespec: path to remove leading directories from
242
243     Returns:
244         filespec without leading dir components.
245
246     See also :meth:`get_path`, :meth:`get_canonical_path`.
247
248     >>> without_path('/home/scott/foo.py')
249     'foo.py'
250
251     >>> without_path('foo.py')
252     'foo.py'
253
254     """
255     return os.path.split(filespec)[1]
256
257
258 def get_path(filespec: str) -> str:
259     """Returns just the path of the filespec by removing the filename and
260     extension.
261
262     Args:
263         filespec: path to remove filename / extension(s) from
264
265     Returns:
266         filespec with just the leading directory components and no
267             filename or extension(s)
268
269     See also :meth:`without_path`, :meth:`get_canonical_path`.
270
271     >>> get_path('/home/scott/foobar.py')
272     '/home/scott'
273
274     >>> get_path('/home/scott/test.1.2.3.gz')
275     '/home/scott'
276
277     >>> get_path('~scott/frapp.txt')
278     '~scott'
279
280     """
281     return os.path.split(filespec)[0]
282
283
284 def get_canonical_path(filespec: str) -> str:
285     """Returns a canonicalized absolute path.
286
287     Args:
288         filespec: the path to canonicalize
289
290     Returns:
291         the canonicalized path
292
293     See also :meth:`get_path`, :meth:`without_path`.
294
295     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
296     '/usr/home/scott/foo.txt'
297
298     """
299     return os.path.realpath(filespec)
300
301
302 def create_path_if_not_exist(
303     path: str, on_error: Callable[[str, OSError], None] = None
304 ) -> None:
305     """
306     Attempts to create path if it does not exist already.
307
308     Args:
309         path: the path to attempt to create
310         on_error: If set, it's invoked on error conditions and passed then
311             path and OSError that it caused.
312
313     See also :meth:`does_file_exist`.
314
315     .. warning::
316
317         Files are created with mode 0o0777 (i.e. world read/writeable).
318
319     >>> import uuid
320     >>> import os
321     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
322     >>> os.path.exists(path)
323     False
324     >>> create_path_if_not_exist(path)
325     >>> os.path.exists(path)
326     True
327     """
328     logger.debug("Creating path %s", path)
329     previous_umask = os.umask(0)
330     try:
331         os.makedirs(path)
332         os.chmod(path, 0o777)
333     except OSError as ex:
334         if ex.errno != errno.EEXIST and not os.path.isdir(path):
335             if on_error is not None:
336                 on_error(path, ex)
337             else:
338                 raise
339     finally:
340         os.umask(previous_umask)
341
342
343 def does_file_exist(filename: str) -> bool:
344     """Returns True if a file exists and is a normal file.
345
346     Args:
347         filename: filename to check
348
349     Returns:
350         True if filename exists and is a normal file.
351
352     See also :meth:`create_path_if_not_exist`, :meth:`is_readable`.
353
354     >>> does_file_exist(__file__)
355     True
356     >>> does_file_exist('/tmp/2492043r9203r9230r9230r49230r42390r4230')
357     False
358     """
359     return os.path.exists(filename) and os.path.isfile(filename)
360
361
362 def is_readable(filename: str) -> bool:
363     """Is the file readable?
364
365     Args:
366         filename: the filename to check for read access
367
368     Returns:
369         True if the file exists, is a normal file, and is readable
370         by the current process.  False otherwise.
371
372     See also :meth:`does_file_exist`, :meth:`is_writable`,
373     :meth:`is_executable`.
374     """
375     return os.access(filename, os.R_OK)
376
377
378 def is_writable(filename: str) -> bool:
379     """Is the file writable?
380
381     Args:
382         filename: the file to check for write access.
383
384     Returns:
385         True if file exists, is a normal file and is writable by the
386         current process.  False otherwise.
387
388     See also :meth:`is_readable`, :meth:`does_file_exist`.
389     """
390     return os.access(filename, os.W_OK)
391
392
393 def is_executable(filename: str) -> bool:
394     """Is the file executable?
395
396     Args:
397         filename: the file to check for execute access.
398
399     Returns:
400         True if file exists, is a normal file and is executable by the
401         current process.  False otherwise.
402
403     See also :meth:`does_file_exist`, :meth:`is_readable`,
404     :meth:`is_writable`.
405     """
406     return os.access(filename, os.X_OK)
407
408
409 def does_directory_exist(dirname: str) -> bool:
410     """Does the given directory exist?
411
412     Args:
413         dirname: the name of the directory to check
414
415     Returns:
416         True if a path exists and is a directory, not a regular file.
417
418     See also :meth:`does_file_exist`.
419
420     >>> does_directory_exist('/tmp')
421     True
422     >>> does_directory_exist('/xyzq/21341')
423     False
424     """
425     return os.path.exists(dirname) and os.path.isdir(dirname)
426
427
428 def does_path_exist(pathname: str) -> bool:
429     """Just a more verbose wrapper around os.path.exists."""
430     return os.path.exists(pathname)
431
432
433 def get_file_size(filename: str) -> int:
434     """Returns the size of a file in bytes.
435
436     Args:
437         filename: the filename to size
438
439     Returns:
440         size of filename in bytes
441     """
442     return os.path.getsize(filename)
443
444
445 def is_normal_file(filename: str) -> bool:
446     """Is that file normal (not a directory or some special file?)
447
448     Args:
449         filename: the path of the file to check
450
451     Returns:
452         True if filename is a normal file.
453
454     See also :meth:`is_directory`, :meth:`does_file_exist`, :meth:`is_symlink`.
455
456     >>> is_normal_file(__file__)
457     True
458     """
459     return os.path.isfile(filename)
460
461
462 def is_directory(filename: str) -> bool:
463     """Is that path a directory (not a normal file?)
464
465     Args:
466         filename: the path of the file to check
467
468     Returns:
469         True if filename is a directory
470
471     See also :meth:`does_directory_exist`, :meth:`is_normal_file`,
472     :meth:`is_symlink`.
473
474     >>> is_directory('/tmp')
475     True
476     """
477     return os.path.isdir(filename)
478
479
480 def is_symlink(filename: str) -> bool:
481     """Is that path a symlink?
482
483     Args:
484         filename: the path of the file to check
485
486     Returns:
487         True if filename is a symlink, False otherwise.
488
489     See also :meth:`is_directory`, :meth:`is_normal_file`.
490
491     >>> is_symlink('/tmp')
492     False
493
494     >>> is_symlink('/home')
495     True
496     """
497     return os.path.islink(filename)
498
499
500 def is_same_file(file1: str, file2: str) -> bool:
501     """Determine if two paths reference the same inode.
502
503     Args:
504         file1: the first file
505         file2: the second file
506
507     Returns:
508         True if the two files are the same file.
509
510     See also :meth:`is_symlink`, :meth:`is_normal_file`.
511
512     >>> is_same_file('/tmp', '/tmp/../tmp')
513     True
514
515     >>> is_same_file('/tmp', '/home')
516     False
517     """
518     return os.path.samefile(file1, file2)
519
520
521 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
522     """Stats the file and returns an `os.stat_result` or None on error.
523
524     Args:
525         filename: the file whose timestamps to fetch
526
527     Returns:
528         the os.stat_result or None to indicate an error occurred
529
530     See also
531     :meth:`get_file_raw_atime`,
532     :meth:`get_file_raw_ctime`,
533     :meth:`get_file_raw_mtime`,
534     :meth:`get_file_raw_timestamp`
535     """
536     try:
537         return os.stat(filename)
538     except Exception:
539         logger.exception("Failed to stat path %s; returning None", filename)
540         return None
541
542
543 def get_file_raw_timestamp(
544     filename: str, extractor: Callable[[os.stat_result], Optional[float]]
545 ) -> Optional[float]:
546     """Stat a file and, if successful, use extractor to fetch some
547     subset of the information in the `os.stat_result`.
548
549     Args:
550         filename: the filename to stat
551         extractor: Callable that takes a os.stat_result and produces
552             something useful(?) with it.
553
554     Returns:
555         whatever the extractor produced or None on error.
556
557     See also
558     :meth:`get_file_raw_atime`,
559     :meth:`get_file_raw_ctime`,
560     :meth:`get_file_raw_mtime`,
561     :meth:`get_file_raw_timestamps`
562     """
563     tss = get_file_raw_timestamps(filename)
564     if tss is not None:
565         return extractor(tss)
566     return None
567
568
569 def get_file_raw_atime(filename: str) -> Optional[float]:
570     """Get a file's raw access time.
571
572     Args:
573         filename: the path to the file to stat
574
575     Returns:
576         The file's raw atime (seconds since the Epoch) or
577         None on error.
578
579     See also
580     :meth:`get_file_atime_age_seconds`,
581     :meth:`get_file_atime_as_datetime`,
582     :meth:`get_file_atime_timedelta`,
583     :meth:`get_file_raw_ctime`,
584     :meth:`get_file_raw_mtime`,
585     :meth:`get_file_raw_timestamps`
586     """
587     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
588
589
590 def get_file_raw_mtime(filename: str) -> Optional[float]:
591     """Get a file's raw modification time.
592
593     Args:
594         filename: the path to the file to stat
595
596     Returns:
597         The file's raw mtime (seconds since the Epoch) or
598         None on error.
599
600     See also
601     :meth:`get_file_raw_atime`,
602     :meth:`get_file_raw_ctime`,
603     :meth:`get_file_mtime_age_seconds`,
604     :meth:`get_file_mtime_as_datetime`,
605     :meth:`get_file_mtime_timedelta`,
606     :meth:`get_file_raw_timestamps`
607     """
608     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
609
610
611 def get_file_raw_ctime(filename: str) -> Optional[float]:
612     """Get a file's raw creation time.
613
614     Args:
615         filename: the path to the file to stat
616
617     Returns:
618         The file's raw ctime (seconds since the Epoch) or
619         None on error.
620
621     See also
622     :meth:`get_file_raw_atime`,
623     :meth:`get_file_ctime_age_seconds`,
624     :meth:`get_file_ctime_as_datetime`,
625     :meth:`get_file_ctime_timedelta`,
626     :meth:`get_file_raw_mtime`,
627     :meth:`get_file_raw_timestamps`
628     """
629     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
630
631
632 def get_file_md5(filename: str) -> str:
633     """Hashes filename's disk contents and returns the MD5 digest.
634
635     Args:
636         filename: the file whose contents to hash
637
638     Returns:
639         the MD5 digest of the file's contents.  Raises on error.
640     """
641     file_hash = hashlib.md5()
642     with open(filename, "rb") as f:
643         chunk = f.read(8192)
644         while chunk:
645             file_hash.update(chunk)
646             chunk = f.read(8192)
647     return file_hash.hexdigest()
648
649
650 def set_file_raw_atime(filename: str, atime: float) -> None:
651     """Sets a file's raw access time.
652
653     Args:
654         filename: the file whose atime should be set
655         atime: raw atime as number of seconds since the Epoch to set
656
657     See also
658     :meth:`get_file_raw_atime`,
659     :meth:`get_file_atime_age_seconds`,
660     :meth:`get_file_atime_as_datetime`,
661     :meth:`get_file_atime_timedelta`,
662     :meth:`get_file_raw_timestamps`,
663     :meth:`set_file_raw_mtime`,
664     :meth:`set_file_raw_atime_and_mtime`,
665     :meth:`touch_file`
666     """
667     mtime = get_file_raw_mtime(filename)
668     assert mtime is not None
669     os.utime(filename, (atime, mtime))
670
671
672 def set_file_raw_mtime(filename: str, mtime: float):
673     """Sets a file's raw modification time.
674
675     Args:
676         filename: the file whose mtime should be set
677         mtime: the raw mtime as number of seconds since the Epoch to set
678
679     See also
680     :meth:`get_file_raw_mtime`,
681     :meth:`get_file_mtime_age_seconds`,
682     :meth:`get_file_mtime_as_datetime`,
683     :meth:`get_file_mtime_timedelta`,
684     :meth:`get_file_raw_timestamps`,
685     :meth:`set_file_raw_atime`,
686     :meth:`set_file_raw_atime_and_mtime`,
687     :meth:`touch_file`
688     """
689     atime = get_file_raw_atime(filename)
690     assert atime is not None
691     os.utime(filename, (atime, mtime))
692
693
694 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
695     """Sets both a file's raw modification and access times.
696
697     Args:
698         filename: the file whose times to set
699         ts: the raw time to set or None to indicate time should be
700             set to the current time.
701
702     See also
703     :meth:`get_file_raw_atime`,
704     :meth:`get_file_raw_mtime`,
705     :meth:`get_file_raw_timestamps`,
706     :meth:`set_file_raw_atime`,
707     :meth:`set_file_raw_mtime`
708     """
709     if ts is not None:
710         os.utime(filename, (ts, ts))
711     else:
712         os.utime(filename, None)
713
714
715 def _convert_file_timestamp_to_datetime(
716     filename: str,
717     producer: Callable[[str], Optional[float]],
718 ) -> Optional[datetime.datetime]:
719     """
720     Converts a raw file timestamp into a Python datetime.
721
722     Args:
723         filename: file whose timestamps should be converted.
724         producer: source of the timestamp.
725     Returns:
726         The datetime.
727     """
728     ts = producer(filename)
729     if ts is not None:
730         return datetime.datetime.fromtimestamp(ts)
731     return None
732
733
734 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
735     """Fetch a file's access time as a Python datetime.
736
737     Args:
738         filename: the file whose atime should be fetched.
739
740     Returns:
741         The file's atime as a Python :class:`datetime.datetime`.
742
743     See also
744     :meth:`get_file_raw_atime`,
745     :meth:`get_file_atime_age_seconds`,
746     :meth:`get_file_atime_timedelta`,
747     :meth:`get_file_raw_ctime`,
748     :meth:`get_file_raw_mtime`,
749     :meth:`get_file_raw_timestamps`,
750     :meth:`set_file_raw_atime`,
751     :meth:`set_file_raw_atime_and_mtime`
752     """
753     return _convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
754
755
756 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
757     """Fetch a file's modification time as a Python datetime.
758
759     Args:
760         filename: the file whose mtime should be fetched.
761
762     Returns:
763         The file's mtime as a Python :class:`datetime.datetime`.
764
765     See also
766     :meth:`get_file_raw_mtime`,
767     :meth:`get_file_mtime_age_seconds`,
768     :meth:`get_file_mtime_timedelta`,
769     :meth:`get_file_raw_ctime`,
770     :meth:`get_file_raw_atime`,
771     :meth:`get_file_raw_timestamps`,
772     :meth:`set_file_raw_atime`,
773     :meth:`set_file_raw_atime_and_mtime`
774     """
775     return _convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
776
777
778 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
779     """Fetches a file's creation time as a Python datetime.
780
781     Args:
782         filename: the file whose ctime should be fetched.
783
784     Returns:
785         The file's ctime as a Python :class:`datetime.datetime`.
786
787     See also
788     :meth:`get_file_raw_ctime`,
789     :meth:`get_file_ctime_age_seconds`,
790     :meth:`get_file_ctime_timedelta`,
791     :meth:`get_file_raw_atime`,
792     :meth:`get_file_raw_mtime`,
793     :meth:`get_file_raw_timestamps`
794     """
795     return _convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
796
797
798 def _get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
799     """~Internal helper"""
800     now = time.time()
801     ts = get_file_raw_timestamps(filename)
802     if ts is None:
803         return None
804     result = extractor(ts)
805     return now - result
806
807
808 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
809     """Gets a file's access time as an age in seconds (ago).
810
811     Args:
812         filename: file whose atime should be checked.
813
814     Returns:
815         The number of seconds ago that filename was last accessed.
816
817     See also
818     :meth:`get_file_raw_atime`,
819     :meth:`get_file_atime_as_datetime`,
820     :meth:`get_file_atime_timedelta`,
821     :meth:`get_file_raw_ctime`,
822     :meth:`get_file_raw_mtime`,
823     :meth:`get_file_raw_timestamps`,
824     :meth:`set_file_raw_atime`,
825     :meth:`set_file_raw_atime_and_mtime`
826     """
827     return _get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
828
829
830 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
831     """Gets a file's creation time as an age in seconds (ago).
832
833     Args:
834         filename: file whose ctime should be checked.
835
836     Returns:
837         The number of seconds ago that filename was created.
838
839     See also
840     :meth:`get_file_raw_ctime`,
841     :meth:`get_file_ctime_age_seconds`,
842     :meth:`get_file_ctime_as_datetime`,
843     :meth:`get_file_ctime_timedelta`,
844     :meth:`get_file_raw_mtime`,
845     :meth:`get_file_raw_atime`,
846     :meth:`get_file_raw_timestamps`
847     """
848     return _get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
849
850
851 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
852     """Gets a file's modification time as seconds (ago).
853
854     Args:
855         filename: file whose mtime should be checked.
856
857     Returns:
858         The number of seconds ago that filename was last modified.
859
860     See also
861     :meth:`get_file_raw_atime`,
862     :meth:`get_file_raw_ctime`,
863     :meth:`get_file_raw_mtime`,
864     :meth:`get_file_mtime_as_datetime`,
865     :meth:`get_file_mtime_timedelta`,
866     :meth:`get_file_raw_timestamps`,
867     :meth:`set_file_raw_atime`,
868     :meth:`set_file_raw_atime_and_mtime`
869     """
870     return _get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
871
872
873 def _get_file_timestamp_timedelta(
874     filename: str, extractor
875 ) -> Optional[datetime.timedelta]:
876     """~Internal helper"""
877     age = _get_file_timestamp_age_seconds(filename, extractor)
878     if age is not None:
879         return datetime.timedelta(seconds=float(age))
880     return None
881
882
883 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
884     """How long ago was a file accessed as a timedelta?
885
886     Args:
887         filename: the file whose atime should be checked.
888
889     Returns:
890         A Python :class:`datetime.timedelta` representing how long
891         ago filename was last accessed.
892
893     See also
894     :meth:`get_file_raw_atime`,
895     :meth:`get_file_atime_age_seconds`,
896     :meth:`get_file_atime_as_datetime`,
897     :meth:`get_file_raw_ctime`,
898     :meth:`get_file_raw_mtime`,
899     :meth:`get_file_raw_timestamps`,
900     :meth:`set_file_raw_atime`,
901     :meth:`set_file_raw_atime_and_mtime`
902     """
903     return _get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
904
905
906 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
907     """How long ago was a file created as a timedelta?
908
909     Args:
910         filename: the file whose ctime should be checked.
911
912     Returns:
913         A Python :class:`datetime.timedelta` representing how long
914         ago filename was created.
915
916     See also
917     :meth:`get_file_raw_atime`,
918     :meth:`get_file_raw_ctime`,
919     :meth:`get_file_ctime_age_seconds`,
920     :meth:`get_file_ctime_as_datetime`,
921     :meth:`get_file_raw_mtime`,
922     :meth:`get_file_raw_timestamps`
923     """
924     return _get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
925
926
927 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
928     """
929     Gets a file's modification time as a Python timedelta.
930
931     Args:
932         filename: the file whose mtime should be checked.
933
934     Returns:
935         A Python :class:`datetime.timedelta` representing how long
936         ago filename was last modified.
937
938     See also
939     :meth:`get_file_raw_atime`,
940     :meth:`get_file_raw_ctime`,
941     :meth:`get_file_raw_mtime`,
942     :meth:`get_file_mtime_age_seconds`,
943     :meth:`get_file_mtime_as_datetime`,
944     :meth:`get_file_raw_timestamps`,
945     :meth:`set_file_raw_atime`,
946     :meth:`set_file_raw_atime_and_mtime`
947     """
948     return _get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
949
950
951 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
952     """~Internal helper"""
953     from pyutils.datetimes.datetime_utils import (
954         describe_duration,
955         describe_duration_briefly,
956     )
957
958     age = _get_file_timestamp_age_seconds(filename, extractor)
959     if age is None:
960         return None
961     if brief:
962         return describe_duration_briefly(age)
963     else:
964         return describe_duration(age)
965
966
967 def describe_file_atime(filename: str, *, brief: bool = False) -> Optional[str]:
968     """
969     Describe how long ago a file was accessed.
970
971     Args:
972         filename: the file whose atime should be described.
973         brief: if True, describe atime briefly.
974
975     Returns:
976         A string that represents how long ago filename was last
977         accessed.  The description will be verbose or brief depending
978         on the brief argument.
979
980     See also
981     :meth:`get_file_raw_atime`,
982     :meth:`get_file_atime_age_seconds`,
983     :meth:`get_file_atime_as_datetime`,
984     :meth:`get_file_atime_timedelta`,
985     :meth:`get_file_raw_ctime`,
986     :meth:`get_file_raw_mtime`,
987     :meth:`get_file_raw_timestamps`
988     :meth:`set_file_raw_atime`,
989     :meth:`set_file_raw_atime_and_mtime`
990     """
991     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
992
993
994 def describe_file_ctime(filename: str, *, brief: bool = False) -> Optional[str]:
995     """Describes a file's creation time.
996
997     Args:
998         filename: the file whose ctime should be described.
999         brief: if True, describe ctime briefly.
1000
1001     Returns:
1002         A string that represents how long ago filename was created.
1003         The description will be verbose or brief depending
1004         on the brief argument.
1005
1006     See also
1007     :meth:`get_file_raw_atime`,
1008     :meth:`get_file_raw_ctime`,
1009     :meth:`get_file_ctime_age_seconds`,
1010     :meth:`get_file_ctime_as_datetime`,
1011     :meth:`get_file_ctime_timedelta`,
1012     :meth:`get_file_raw_mtime`,
1013     :meth:`get_file_raw_timestamps`
1014     """
1015     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
1016
1017
1018 def describe_file_mtime(filename: str, *, brief: bool = False) -> Optional[str]:
1019     """Describes how long ago a file was modified.
1020
1021     Args:
1022         filename: the file whose mtime should be described.
1023         brief: if True, describe mtime briefly.
1024
1025     Returns:
1026         A string that represents how long ago filename was last
1027         modified.  The description will be verbose or brief depending
1028         on the brief argument.
1029
1030     See also
1031     :meth:`get_file_raw_atime`,
1032     :meth:`get_file_raw_ctime`,
1033     :meth:`get_file_raw_mtime`,
1034     :meth:`get_file_mtime_age_seconds`,
1035     :meth:`get_file_mtime_as_datetime`,
1036     :meth:`get_file_mtime_timedelta`,
1037     :meth:`get_file_raw_timestamps`,
1038     :meth:`set_file_raw_atime`,
1039     :meth:`set_file_raw_atime_and_mtime`
1040     """
1041     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
1042
1043
1044 def touch_file(filename: str, *, mode: Optional[int] = 0o666):
1045     """Like unix "touch" command's semantics: update the timestamp
1046     of a file to the current time if the file exists.  Create the
1047     file if it doesn't exist.
1048
1049     Args:
1050         filename: the filename
1051         mode: the mode to create the file with
1052
1053     .. warning::
1054
1055         The default creation mode is 0o666 which is world readable
1056         and writable.  Override this by passing in your own mode
1057         parameter if desired.
1058
1059     See also :meth:`set_file_raw_atime`, :meth:`set_file_raw_atime_and_mtime`,
1060     :meth:`set_file_raw_mtime`, :meth:`create_path_if_not_exist`
1061     """
1062     pathlib.Path(filename, mode=mode).touch()
1063
1064
1065 def expand_globs(in_filename: str):
1066     """
1067     Expands shell globs (* and ? wildcards) to the matching files.
1068
1069     Args:
1070         in_filename: the filepath to be expanded.  May contain '*' and '?'
1071             globbing characters.
1072
1073     Returns:
1074         A Generator that yields filenames that match the input pattern.
1075
1076     See also :meth:`get_files`, :meth:`get_files_recursive`.
1077     """
1078     for filename in glob.glob(in_filename):
1079         yield filename
1080
1081
1082 def get_files(directory: str):
1083     """Returns the files in a directory as a generator.
1084
1085     Args:
1086         directory: the directory to list files under.
1087
1088     Returns:
1089         A generator that yields all files in the input directory.
1090
1091     See also :meth:`expand_globs`, :meth:`get_files_recursive`,
1092     :meth:`get_matching_files`.
1093     """
1094     for filename in os.listdir(directory):
1095         full_path = join(directory, filename)
1096         if isfile(full_path) and exists(full_path):
1097             yield full_path
1098
1099
1100 def get_matching_files(directory: str, glob_string: str):
1101     """
1102     Returns the subset of files whose name matches a glob.
1103
1104     Args:
1105         directory: the directory to match files within.
1106         glob_string: the globbing pattern (may include '*' and '?') to
1107             use when matching files.
1108
1109     Returns:
1110         A generator that yields filenames in directory that match
1111         the given glob pattern.
1112
1113     See also :meth:`get_files`, :meth:`expand_globs`.
1114     """
1115     for filename in get_files(directory):
1116         if fnmatch.fnmatch(filename, glob_string):
1117             yield filename
1118
1119
1120 def get_directories(directory: str):
1121     """
1122     Returns the subdirectories in a directory as a generator.
1123
1124     Args:
1125         directory: the directory to list subdirectories within.
1126
1127     Returns:
1128         A generator that yields all subdirectories within the given
1129         input directory.
1130
1131     See also :meth:`get_files`, :meth:`get_files_recursive`.
1132     """
1133     for d in os.listdir(directory):
1134         full_path = join(directory, d)
1135         if not isfile(full_path) and exists(full_path):
1136             yield full_path
1137
1138
1139 def get_files_recursive(directory: str):
1140     """
1141     Find the files and directories under a root recursively.
1142
1143     Args:
1144         directory: the root directory under which to list subdirectories
1145             and file contents.
1146
1147     Returns:
1148         A generator that yields all directories and files beneath the input
1149         root directory.
1150
1151     See also :meth:`get_files`, :meth:`get_matching_files`,
1152     :meth:`get_matching_files_recursive`
1153     """
1154     for filename in get_files(directory):
1155         yield filename
1156     for subdir in get_directories(directory):
1157         for file_or_directory in get_files_recursive(subdir):
1158             yield file_or_directory
1159
1160
1161 def get_matching_files_recursive(directory: str, glob_string: str):
1162     """Returns the subset of files whose name matches a glob under a root recursively.
1163
1164     Args:
1165         directory: the root under which to search
1166         glob_string: a globbing pattern that describes the subset of
1167             files and directories to return.  May contain '?' and '*'.
1168
1169     Returns:
1170         A generator that yields all files and directories under the given root
1171         directory that match the given globbing pattern.
1172
1173     See also :meth:`get_files_recursive`.
1174
1175     """
1176     for filename in get_files_recursive(directory):
1177         if fnmatch.fnmatch(filename, glob_string):
1178             yield filename
1179
1180
1181 class FileWriter(contextlib.AbstractContextManager):
1182     """A helper that writes a file to a temporary location and then
1183     moves it atomically to its ultimate destination on close.
1184
1185     Example usage.  Creates a temporary file that is populated by the
1186     print statements within the context.  Until the context is exited,
1187     the true destination file does not exist so no reader of it can
1188     see partial writes due to buffering or code timing.  Once the
1189     context is exited, the file is moved from its temporary location
1190     to its permanent location by a call to `/bin/mv` which should be
1191     atomic::
1192
1193         with FileWriter('/home/bob/foobar.txt') as w:
1194             print("This is a test!", file=w)
1195             time.sleep(2)
1196             print("This is only a test...", file=w)
1197     """
1198
1199     def __init__(self, filename: str) -> None:
1200         """
1201         Args:
1202             filename: the ultimate destination file we want to populate.
1203                 On exit, the file will be atomically created.
1204         """
1205         self.filename = filename
1206         uuid = uuid4()
1207         self.tempfile = f"{filename}-{uuid}.tmp"
1208         self.handle: Optional[TextIO] = None
1209
1210     def __enter__(self) -> TextIO:
1211         assert not does_path_exist(self.tempfile)
1212         self.handle = open(self.tempfile, mode="w")
1213         return self.handle
1214
1215     def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
1216         if self.handle is not None:
1217             self.handle.close()
1218             cmd = f"/bin/mv -f {self.tempfile} {self.filename}"
1219             ret = os.system(cmd)
1220             if (ret >> 8) != 0:
1221                 raise Exception(f"{cmd} failed, exit value {ret>>8}!")
1222         return False
1223
1224
1225 class CreateFileWithMode(contextlib.AbstractContextManager):
1226     """This helper context manager can be used instead of the typical
1227     pattern for creating a file if you want to ensure that the file
1228     created is a particular permission mode upon creation.
1229
1230     Python's open doesn't support this; you need to set the os.umask
1231     and then create a descriptor to open via os.open, see below.
1232
1233         >>> import os
1234         >>> filename = f'/tmp/CreateFileWithModeTest.{os.getpid()}'
1235         >>> with CreateFileWithMode(filename, mode=0o600) as wf:
1236         ...     print('This is a test', file=wf)
1237         >>> result = os.stat(filename)
1238
1239         Note: there is a high order bit set in this that is S_IFREG indicating
1240         that the file is a "normal file".  Clear it with the mask.
1241
1242         >>> print(f'{result.st_mode & 0o7777:o}')
1243         600
1244         >>> with open(filename, 'r') as rf:
1245         ...     contents = rf.read()
1246         >>> contents
1247         'This is a test\\n'
1248         >>> remove(filename)
1249     """
1250
1251     def __init__(self, filename: str, mode: Optional[int] = 0o600) -> None:
1252         """
1253         Args:
1254             filename: path of the file to create.
1255             mode: the UNIX-style octal mode with which to create the
1256                 filename.  Defaults to 0o600.
1257
1258         .. warning::
1259
1260             If the file already exists it will be overwritten!
1261
1262         """
1263         self.filename = filename
1264         if mode is not None:
1265             self.mode = mode & 0o7777
1266         else:
1267             self.mode = 0o666
1268         self.handle: Optional[TextIO] = None
1269         self.old_umask = os.umask(0)
1270
1271     def __enter__(self) -> TextIO:
1272         descriptor = os.open(
1273             path=self.filename,
1274             flags=(os.O_WRONLY | os.O_CREAT | os.O_TRUNC),
1275             mode=self.mode,
1276         )
1277         self.handle = open(descriptor, "w")
1278         return self.handle
1279
1280     def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
1281         os.umask(self.old_umask)
1282         if self.handle is not None:
1283             self.handle.close()
1284         return False
1285
1286
1287 if __name__ == "__main__":
1288     import doctest
1289
1290     doctest.testmod()