Cleanup config in preparation for zookeeper-based dynamic configs.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Utilities for working with files."""
6
7 import contextlib
8 import datetime
9 import errno
10 import glob
11 import hashlib
12 import logging
13 import os
14 import pathlib
15 import re
16 import time
17 from os.path import exists, isfile, join
18 from typing import Callable, List, Literal, Optional, TextIO
19 from uuid import uuid4
20
21 logger = logging.getLogger(__name__)
22
23
24 def remove_newlines(x: str) -> str:
25     """Trivial function to be used as a line_transformer in
26     :meth:`slurp_file` for no newlines in file contents"""
27     return x.replace('\n', '')
28
29
30 def strip_whitespace(x: str) -> str:
31     """Trivial function to be used as a line_transformer in
32     :meth:`slurp_file` for no leading / trailing whitespace in
33     file contents"""
34     return x.strip()
35
36
37 def remove_hash_comments(x: str) -> str:
38     """Trivial function to be used as a line_transformer in
39     :meth:`slurp_file` for no # comments in file contents"""
40     return re.sub(r'#.*$', '', x)
41
42
43 def slurp_file(
44     filename: str,
45     *,
46     skip_blank_lines=False,
47     line_transformers: Optional[List[Callable[[str], str]]] = None,
48 ):
49     """Reads in a file's contents line-by-line to a memory buffer applying
50     each line transformation in turn.
51
52     Args:
53         filename: file to be read
54         skip_blank_lines: should reading skip blank lines?
55         line_transformers: little string->string transformations
56     """
57
58     ret = []
59     xforms = []
60     if line_transformers is not None:
61         for x in line_transformers:
62             xforms.append(x)
63     if not file_is_readable(filename):
64         raise Exception(f'{filename} can\'t be read.')
65     with open(filename) as rf:
66         for line in rf:
67             for transformation in xforms:
68                 line = transformation(line)
69             if skip_blank_lines and line == '':
70                 continue
71             ret.append(line)
72     return ret
73
74
75 def remove(path: str) -> None:
76     """Deletes a file.  Raises if path refers to a directory or a file
77     that doesn't exist.
78
79     Args:
80         path: the path of the file to delete
81
82     >>> import os
83     >>> filename = '/tmp/file_utils_test_file'
84     >>> os.system(f'touch {filename}')
85     0
86     >>> does_file_exist(filename)
87     True
88     >>> remove(filename)
89     >>> does_file_exist(filename)
90     False
91     """
92     os.remove(path)
93
94
95 def fix_multiple_slashes(path: str) -> str:
96     """Fixes multi-slashes in paths or path-like strings
97
98     Args:
99         path: the path in which to remove multiple slashes
100
101     >>> p = '/usr/local//etc/rc.d///file.txt'
102     >>> fix_multiple_slashes(p)
103     '/usr/local/etc/rc.d/file.txt'
104
105     >>> p = 'this is a test'
106     >>> fix_multiple_slashes(p) == p
107     True
108     """
109     return re.sub(r'/+', '/', path)
110
111
112 def delete(path: str) -> None:
113     """This is a convenience for my dumb ass who can't remember os.remove
114     sometimes.
115     """
116     os.remove(path)
117
118
119 def without_extension(path: str) -> str:
120     """Remove one (the last) extension from a file or path.
121
122     Args:
123         path: the path from which to remove an extension
124
125     Returns:
126         the path with one extension removed.
127
128     >>> without_extension('foobar.txt')
129     'foobar'
130
131     >>> without_extension('/home/scott/frapp.py')
132     '/home/scott/frapp'
133
134     >>> f = 'a.b.c.tar.gz'
135     >>> while('.' in f):
136     ...     f = without_extension(f)
137     ...     print(f)
138     a.b.c.tar
139     a.b.c
140     a.b
141     a
142
143     >>> without_extension('foobar')
144     'foobar'
145
146     """
147     return os.path.splitext(path)[0]
148
149
150 def without_all_extensions(path: str) -> str:
151     """Removes all extensions from a path; handles multiple extensions
152     like foobar.tar.gz -> foobar.
153
154     Args:
155         path: the path from which to remove all extensions
156
157     Returns:
158         the path with all extensions removed.
159
160     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
161     '/home/scott/foobar'
162
163     """
164     while '.' in path:
165         path = without_extension(path)
166     return path
167
168
169 def get_extension(path: str) -> str:
170     """Extract and return one (the last) extension from a file or path.
171
172     Args:
173         path: the path from which to extract an extension
174
175     Returns:
176         The last extension from the file path.
177
178     >>> get_extension('this_is_a_test.txt')
179     '.txt'
180
181     >>> get_extension('/home/scott/test.py')
182     '.py'
183
184     >>> get_extension('foobar')
185     ''
186
187     """
188     return os.path.splitext(path)[1]
189
190
191 def get_all_extensions(path: str) -> List[str]:
192     """Return the extensions of a file or path in order.
193
194     Args:
195         path: the path from which to extract all extensions.
196
197     Returns:
198         a list containing each extension which may be empty.
199
200     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
201     ['.tar', '.gz', '.1']
202
203     >>> get_all_extensions('/home/scott/foobar')
204     []
205
206     """
207     ret = []
208     while True:
209         ext = get_extension(path)
210         path = without_extension(path)
211         if ext:
212             ret.append(ext)
213         else:
214             ret.reverse()
215             return ret
216
217
218 def without_path(filespec: str) -> str:
219     """Returns the base filename without any leading path.
220
221     Args:
222         filespec: path to remove leading directories from
223
224     Returns:
225         filespec without leading dir components.
226
227     >>> without_path('/home/scott/foo.py')
228     'foo.py'
229
230     >>> without_path('foo.py')
231     'foo.py'
232
233     """
234     return os.path.split(filespec)[1]
235
236
237 def get_path(filespec: str) -> str:
238     """Returns just the path of the filespec by removing the filename and
239     extension.
240
241     Args:
242         filespec: path to remove filename / extension(s) from
243
244     Returns:
245         filespec with just the leading directory components and no
246             filename or extension(s)
247
248     >>> get_path('/home/scott/foobar.py')
249     '/home/scott'
250
251     >>> get_path('/home/scott/test.1.2.3.gz')
252     '/home/scott'
253
254     >>> get_path('~scott/frapp.txt')
255     '~scott'
256
257     """
258     return os.path.split(filespec)[0]
259
260
261 def get_canonical_path(filespec: str) -> str:
262     """Returns a canonicalized absolute path.
263
264     Args:
265         filespec: the path to canonicalize
266
267     Returns:
268         the canonicalized path
269
270     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
271     '/usr/home/scott/foo.txt'
272
273     """
274     return os.path.realpath(filespec)
275
276
277 def create_path_if_not_exist(path, on_error=None) -> None:
278     """
279     Attempts to create path if it does not exist already.
280
281     .. warning::
282
283         Files are created with mode 0x0777 (i.e. world read/writeable).
284
285     Args:
286         path: the path to attempt to create
287         on_error: If True, it's invoked on error conditions.  Otherwise
288             any exceptions are raised.
289
290     >>> import uuid
291     >>> import os
292     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
293     >>> os.path.exists(path)
294     False
295     >>> create_path_if_not_exist(path)
296     >>> os.path.exists(path)
297     True
298     """
299     logger.debug("Creating path %s", path)
300     previous_umask = os.umask(0)
301     try:
302         os.makedirs(path)
303         os.chmod(path, 0o777)
304     except OSError as ex:
305         if ex.errno != errno.EEXIST and not os.path.isdir(path):
306             if on_error is not None:
307                 on_error(path, ex)
308             else:
309                 raise
310     finally:
311         os.umask(previous_umask)
312
313
314 def does_file_exist(filename: str) -> bool:
315     """Returns True if a file exists and is a normal file.
316
317     Args:
318         filename: filename to check
319
320     Returns:
321         True if filename exists and is a normal file.
322
323     >>> does_file_exist(__file__)
324     True
325     >>> does_file_exist('/tmp/2492043r9203r9230r9230r49230r42390r4230')
326     False
327     """
328     return os.path.exists(filename) and os.path.isfile(filename)
329
330
331 def file_is_readable(filename: str) -> bool:
332     """True if file exists, is a normal file and is readable by the
333     current process.  False otherwise.
334
335     Args:
336         filename: the filename to check for read access
337     """
338     return does_file_exist(filename) and os.access(filename, os.R_OK)
339
340
341 def file_is_writable(filename: str) -> bool:
342     """True if file exists, is a normal file and is writable by the
343     current process.  False otherwise.
344
345     Args:
346         filename: the file to check for write access.
347     """
348     return does_file_exist(filename) and os.access(filename, os.W_OK)
349
350
351 def file_is_executable(filename: str) -> bool:
352     """True if file exists, is a normal file and is executable by the
353     current process.  False otherwise.
354
355     Args:
356         filename: the file to check for execute access.
357     """
358     return does_file_exist(filename) and os.access(filename, os.X_OK)
359
360
361 def does_directory_exist(dirname: str) -> bool:
362     """Returns True if a file exists and is a directory.
363
364     >>> does_directory_exist('/tmp')
365     True
366     >>> does_directory_exist('/xyzq/21341')
367     False
368     """
369     return os.path.exists(dirname) and os.path.isdir(dirname)
370
371
372 def does_path_exist(pathname: str) -> bool:
373     """Just a more verbose wrapper around os.path.exists."""
374     return os.path.exists(pathname)
375
376
377 def get_file_size(filename: str) -> int:
378     """Returns the size of a file in bytes.
379
380     Args:
381         filename: the filename to size
382
383     Returns:
384         size of filename in bytes
385     """
386     return os.path.getsize(filename)
387
388
389 def is_normal_file(filename: str) -> bool:
390     """Returns True if filename is a normal file.
391
392     >>> is_normal_file(__file__)
393     True
394     """
395     return os.path.isfile(filename)
396
397
398 def is_directory(filename: str) -> bool:
399     """Returns True if filename is a directory.
400
401     >>> is_directory('/tmp')
402     True
403     """
404     return os.path.isdir(filename)
405
406
407 def is_symlink(filename: str) -> bool:
408     """True if filename is a symlink, False otherwise.
409
410     >>> is_symlink('/tmp')
411     False
412
413     >>> is_symlink('/home')
414     True
415
416     """
417     return os.path.islink(filename)
418
419
420 def is_same_file(file1: str, file2: str) -> bool:
421     """Returns True if the two files are the same inode.
422
423     >>> is_same_file('/tmp', '/tmp/../tmp')
424     True
425
426     >>> is_same_file('/tmp', '/home')
427     False
428
429     """
430     return os.path.samefile(file1, file2)
431
432
433 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
434     """Stats the file and returns an os.stat_result or None on error.
435
436     Args:
437         filename: the file whose timestamps to fetch
438
439     Returns:
440         the os.stat_result or None to indicate an error occurred
441     """
442     try:
443         return os.stat(filename)
444     except Exception as e:
445         logger.exception(e)
446         return None
447
448
449 def get_file_raw_timestamp(
450     filename: str, extractor: Callable[[os.stat_result], Optional[float]]
451 ) -> Optional[float]:
452     """Stat a file and, if successful, use extractor to fetch some
453     subset of the information in the os.stat_result.  See also
454     :meth:`get_file_raw_atime`, :meth:`get_file_raw_mtime`, and
455     :meth:`get_file_raw_ctime` which just call this with a lambda
456     extractor.
457
458     Args:
459         filename: the filename to stat
460         extractor: Callable that takes a os.stat_result and produces
461             something useful(?) with it.
462
463     Returns:
464         whatever the extractor produced or None on error.
465     """
466     tss = get_file_raw_timestamps(filename)
467     if tss is not None:
468         return extractor(tss)
469     return None
470
471
472 def get_file_raw_atime(filename: str) -> Optional[float]:
473     """Get a file's raw access time or None on error.
474
475     See also :meth:`get_file_atime_as_datetime`,
476     :meth:`get_file_atime_timedelta`,
477     and :meth:`get_file_atime_age_seconds`.
478     """
479     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
480
481
482 def get_file_raw_mtime(filename: str) -> Optional[float]:
483     """Get a file's raw modification time or None on error.
484
485     See also :meth:`get_file_mtime_as_datetime`,
486     :meth:`get_file_mtime_timedelta`,
487     and :meth:`get_file_mtime_age_seconds`.
488     """
489     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
490
491
492 def get_file_raw_ctime(filename: str) -> Optional[float]:
493     """Get a file's raw creation time or None on error.
494
495     See also :meth:`get_file_ctime_as_datetime`,
496     :meth:`get_file_ctime_timedelta`,
497     and :meth:`get_file_ctime_age_seconds`.
498     """
499     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
500
501
502 def get_file_md5(filename: str) -> str:
503     """Hashes filename's disk contents and returns the MD5 digest.
504
505     Args:
506         filename: the file whose contents to hash
507
508     Returns:
509         the MD5 digest of the file's contents.  Raises on errors.
510     """
511     file_hash = hashlib.md5()
512     with open(filename, "rb") as f:
513         chunk = f.read(8192)
514         while chunk:
515             file_hash.update(chunk)
516             chunk = f.read(8192)
517     return file_hash.hexdigest()
518
519
520 def set_file_raw_atime(filename: str, atime: float):
521     """Sets a file's raw access time.
522
523     See also :meth:`get_file_atime_as_datetime`,
524     :meth:`get_file_atime_timedelta`,
525     :meth:`get_file_atime_age_seconds`,
526     and :meth:`get_file_raw_atime`.
527     """
528     mtime = get_file_raw_mtime(filename)
529     assert mtime is not None
530     os.utime(filename, (atime, mtime))
531
532
533 def set_file_raw_mtime(filename: str, mtime: float):
534     """Sets a file's raw modification time.
535
536     See also :meth:`get_file_mtime_as_datetime`,
537     :meth:`get_file_mtime_timedelta`,
538     :meth:`get_file_mtime_age_seconds`,
539     and :meth:`get_file_raw_mtime`.
540     """
541     atime = get_file_raw_atime(filename)
542     assert atime is not None
543     os.utime(filename, (atime, mtime))
544
545
546 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
547     """Sets both a file's raw modification and access times
548
549     Args:
550         filename: the file whose times to set
551         ts: the raw time to set or None to indicate time should be
552             set to the current time.
553     """
554     if ts is not None:
555         os.utime(filename, (ts, ts))
556     else:
557         os.utime(filename, None)
558
559
560 def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
561     """Convert a raw file timestamp into a python datetime."""
562     ts = producer(filename)
563     if ts is not None:
564         return datetime.datetime.fromtimestamp(ts)
565     return None
566
567
568 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
569     """Fetch a file's access time as a python datetime.
570
571     See also :meth:`get_file_atime_as_datetime`,
572     :meth:`get_file_atime_timedelta`,
573     :meth:`get_file_atime_age_seconds`,
574     :meth:`describe_file_atime`,
575     and :meth:`get_file_raw_atime`.
576     """
577     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
578
579
580 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
581     """Fetches a file's modification time as a python datetime.
582
583     See also :meth:`get_file_mtime_as_datetime`,
584     :meth:`get_file_mtime_timedelta`,
585     :meth:`get_file_mtime_age_seconds`,
586     and :meth:`get_file_raw_mtime`.
587     """
588     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
589
590
591 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
592     """Fetches a file's creation time as a python datetime.
593
594     See also :meth:`get_file_ctime_as_datetime`,
595     :meth:`get_file_ctime_timedelta`,
596     :meth:`get_file_ctime_age_seconds`,
597     and :meth:`get_file_raw_ctime`.
598     """
599     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
600
601
602 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
603     """~Internal helper"""
604     now = time.time()
605     ts = get_file_raw_timestamps(filename)
606     if ts is None:
607         return None
608     result = extractor(ts)
609     return now - result
610
611
612 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
613     """Gets a file's access time as an age in seconds (ago).
614
615     See also :meth:`get_file_atime_as_datetime`,
616     :meth:`get_file_atime_timedelta`,
617     :meth:`get_file_atime_age_seconds`,
618     :meth:`describe_file_atime`,
619     and :meth:`get_file_raw_atime`.
620     """
621     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
622
623
624 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
625     """Gets a file's creation time as an age in seconds (ago).
626
627     See also :meth:`get_file_ctime_as_datetime`,
628     :meth:`get_file_ctime_timedelta`,
629     :meth:`get_file_ctime_age_seconds`,
630     and :meth:`get_file_raw_ctime`.
631     """
632     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
633
634
635 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
636     """Gets a file's modification time as seconds (ago).
637
638     See also :meth:`get_file_mtime_as_datetime`,
639     :meth:`get_file_mtime_timedelta`,
640     :meth:`get_file_mtime_age_seconds`,
641     and :meth:`get_file_raw_mtime`.
642     """
643     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
644
645
646 def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
647     """~Internal helper"""
648     age = get_file_timestamp_age_seconds(filename, extractor)
649     if age is not None:
650         return datetime.timedelta(seconds=float(age))
651     return None
652
653
654 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
655     """How long ago was a file accessed as a timedelta?
656
657     See also :meth:`get_file_atime_as_datetime`,
658     :meth:`get_file_atime_timedelta`,
659     :meth:`get_file_atime_age_seconds`,
660     :meth:`describe_file_atime`,
661     and :meth:`get_file_raw_atime`.
662     """
663     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
664
665
666 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
667     """How long ago was a file created as a timedelta?
668
669     See also :meth:`get_file_ctime_as_datetime`,
670     :meth:`get_file_ctime_timedelta`,
671     :meth:`get_file_ctime_age_seconds`,
672     and :meth:`get_file_raw_ctime`.
673     """
674     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
675
676
677 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
678     """
679     Gets a file's modification time as a python timedelta.
680
681     See also :meth:`get_file_mtime_as_datetime`,
682     :meth:`get_file_mtime_timedelta`,
683     :meth:`get_file_mtime_age_seconds`,
684     and :meth:`get_file_raw_mtime`.
685     """
686     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
687
688
689 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
690     """~Internal helper"""
691     from datetime_utils import describe_duration, describe_duration_briefly
692
693     age = get_file_timestamp_age_seconds(filename, extractor)
694     if age is None:
695         return None
696     if brief:
697         return describe_duration_briefly(age)
698     else:
699         return describe_duration(age)
700
701
702 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
703     """
704     Describe how long ago a file was accessed.
705
706     See also :meth:`get_file_atime_as_datetime`,
707     :meth:`get_file_atime_timedelta`,
708     :meth:`get_file_atime_age_seconds`,
709     :meth:`describe_file_atime`,
710     and :meth:`get_file_raw_atime`.
711     """
712     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
713
714
715 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
716     """Describes a file's creation time.
717
718     See also :meth:`get_file_ctime_as_datetime`,
719     :meth:`get_file_ctime_timedelta`,
720     :meth:`get_file_ctime_age_seconds`,
721     and :meth:`get_file_raw_ctime`.
722     """
723     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
724
725
726 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
727     """
728     Describes how long ago a file was modified.
729
730     See also :meth:`get_file_mtime_as_datetime`,
731     :meth:`get_file_mtime_timedelta`,
732     :meth:`get_file_mtime_age_seconds`,
733     and :meth:`get_file_raw_mtime`.
734     """
735     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
736
737
738 def touch_file(filename: str, *, mode: Optional[int] = 0o666):
739     """Like unix "touch" command's semantics: update the timestamp
740     of a file to the current time if the file exists.  Create the
741     file if it doesn't exist.
742
743     Args:
744         filename: the filename
745         mode: the mode to create the file with
746     """
747     pathlib.Path(filename, mode=mode).touch()
748
749
750 def expand_globs(in_filename: str):
751     """Expands shell globs (* and ? wildcards) to the matching files."""
752     for filename in glob.glob(in_filename):
753         yield filename
754
755
756 def get_files(directory: str):
757     """Returns the files in a directory as a generator."""
758     for filename in os.listdir(directory):
759         full_path = join(directory, filename)
760         if isfile(full_path) and exists(full_path):
761             yield full_path
762
763
764 def get_directories(directory: str):
765     """Returns the subdirectories in a directory as a generator."""
766     for d in os.listdir(directory):
767         full_path = join(directory, d)
768         if not isfile(full_path) and exists(full_path):
769             yield full_path
770
771
772 def get_files_recursive(directory: str):
773     """Find the files and directories under a root recursively."""
774     for filename in get_files(directory):
775         yield filename
776     for subdir in get_directories(directory):
777         for file_or_directory in get_files_recursive(subdir):
778             yield file_or_directory
779
780
781 class FileWriter(contextlib.AbstractContextManager):
782     """A helper that writes a file to a temporary location and then moves
783     it atomically to its ultimate destination on close.
784     """
785
786     def __init__(self, filename: str) -> None:
787         self.filename = filename
788         uuid = uuid4()
789         self.tempfile = f'{filename}-{uuid}.tmp'
790         self.handle: Optional[TextIO] = None
791
792     def __enter__(self) -> TextIO:
793         assert not does_path_exist(self.tempfile)
794         self.handle = open(self.tempfile, mode="w")
795         return self.handle
796
797     def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
798         if self.handle is not None:
799             self.handle.close()
800             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
801             ret = os.system(cmd)
802             if (ret >> 8) != 0:
803                 raise Exception(f'{cmd} failed, exit value {ret>>8}!')
804         return False
805
806
807 if __name__ == '__main__':
808     import doctest
809
810     doctest.testmod()