3 # © Copyright 2021-2022, Scott Gasch
5 """Utilities for working with files."""
17 from os.path import exists, isfile, join
18 from typing import Callable, List, Literal, Optional, TextIO
19 from uuid import uuid4
21 logger = logging.getLogger(__name__)
def remove_newlines(x: str) -> str:
    """Drop every newline character from a string.

    Intended for use as a ``line_transformer`` with :meth:`slurp_file`.

    Args:
        x: the text to process

    Returns:
        x with all newline characters removed.
    """
    pieces = x.split('\n')
    return ''.join(pieces)
def strip_whitespace(x: str) -> str:
    """Remove leading and trailing whitespace from a string.

    Intended for use as a ``line_transformer`` with :meth:`slurp_file`.

    Args:
        x: the text to process

    Returns:
        x without leading / trailing whitespace.
    """
    return x.strip()
def remove_hash_comments(x: str) -> str:
    """Delete a trailing ``#`` comment from a string.

    Intended for use as a ``line_transformer`` with :meth:`slurp_file`.
    Everything from the first ``#`` up to the end of the string (or up to
    a final newline, which is kept) is removed.

    Args:
        x: the text to process

    Returns:
        x with the comment portion removed.
    """
    hash_comment = re.compile(r'#.*$')
    return hash_comment.sub('', x)
def slurp_file(
    filename: str,
    *,
    skip_blank_lines=False,
    line_transformers: Optional[List[Callable[[str], str]]] = None,
):
    """Read a file line-by-line into memory, applying each line
    transformation in turn.

    Args:
        filename: file to be read
        skip_blank_lines: should reading skip blank lines?  A line only
            counts as blank when it is the empty string *after* the
            transformers have run; a raw blank line still carries its
            newline, so pair this with a transformer such as
            :meth:`strip_whitespace` or :meth:`remove_newlines`.
        line_transformers: little string->string transformations applied
            to every line, in order.

    Returns:
        A list of the (transformed) lines that were kept.

    Raises:
        Exception: if filename is not a readable regular file.
    """
    # NOTE(review): the head of this signature was not visible in the
    # reviewed excerpt; the keyword-only marker is assumed -- confirm.
    # Copy so the caller's list is never aliased or mutated.
    xforms = list(line_transformers) if line_transformers is not None else []
    if not file_is_readable(filename):
        raise Exception(f'{filename} can\'t be read.')
    ret = []
    with open(filename) as rf:
        for line in rf:
            for transformation in xforms:
                line = transformation(line)
            if skip_blank_lines and line == '':
                continue
            ret.append(line)
    return ret
def remove(path: str) -> None:
    """Delete a file.

    Args:
        path: the path of the file to delete

    Raises:
        OSError: if path refers to a directory or to a file that does
            not exist.

    >>> filename = '/tmp/file_utils_test_file'
    >>> os.system(f'touch {filename}')
    0
    >>> does_file_exist(filename)
    True
    >>> remove(filename)
    >>> does_file_exist(filename)
    False
    """
    os.remove(path)
def fix_multiple_slashes(path: str) -> str:
    """Collapse every run of consecutive slashes into a single slash.

    Args:
        path: the path (or path-like string) to clean up

    Returns:
        path with each run of ``/`` characters replaced by one ``/``.

    >>> p = '/usr/local//etc/rc.d///file.txt'
    >>> fix_multiple_slashes(p)
    '/usr/local/etc/rc.d/file.txt'

    >>> p = 'this is a test'
    >>> fix_multiple_slashes(p) == p
    True
    """
    slash_run = re.compile(r'/+')
    return slash_run.sub('/', path)
def delete(path: str) -> None:
    """Delete a file; a convenience alias for ``os.remove``.

    Args:
        path: the path of the file to delete

    Raises:
        OSError: if path is a directory or does not exist.
    """
    os.remove(path)
def without_extension(path: str) -> str:
    """Strip one (the last) extension from a file name or path.

    Args:
        path: the path from which to remove an extension

    Returns:
        the path with its final extension removed; unchanged when there
        is no extension.

    >>> without_extension('foobar.txt')
    'foobar'

    >>> without_extension('/home/scott/frapp.py')
    '/home/scott/frapp'

    >>> without_extension('foobar')
    'foobar'
    """
    root, _extension = os.path.splitext(path)
    return root
def without_all_extensions(path: str) -> str:
    """Remove all extensions from a path; handles multiple extensions
    like foobar.tar.gz -> foobar.

    Args:
        path: the path from which to remove all extensions

    Returns:
        the path with all extensions removed.

    >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
    '/home/scott/foobar'
    """
    # Stop as soon as splitext finds no extension.  Looping on "is there
    # a dot anywhere in the string" would never end for dotted directory
    # names ('/etc/rc.d/foo') or dotfiles ('.bashrc').
    while True:
        root, ext = os.path.splitext(path)
        if not ext:
            return path
        path = root
def get_extension(path: str) -> str:
    """Return one (the last) extension of a file name or path.

    Args:
        path: the path from which to extract an extension

    Returns:
        The last extension including its leading dot, or the empty
        string when there is none.

    >>> get_extension('this_is_a_test.txt')
    '.txt'

    >>> get_extension('/home/scott/test.py')
    '.py'

    >>> get_extension('foobar')
    ''
    """
    _root, extension = os.path.splitext(path)
    return extension
def get_all_extensions(path: str) -> List[str]:
    """Return the extensions of a file or path in order.

    Args:
        path: the path from which to extract all extensions.

    Returns:
        a list containing each extension (with its leading dot), left to
        right; the list may be empty.

    >>> get_all_extensions('/home/scott/foo.tar.gz.1')
    ['.tar', '.gz', '.1']

    >>> get_all_extensions('/home/scott/foobar')
    []
    """
    ret: List[str] = []
    root, ext = os.path.splitext(path)
    while ext:
        ret.append(ext)
        root, ext = os.path.splitext(root)
    # Extensions were peeled off right-to-left; report them left-to-right.
    ret.reverse()
    return ret
def without_path(filespec: str) -> str:
    """Return the base file name with any leading directories removed.

    Args:
        filespec: path to remove leading directories from

    Returns:
        the final path component of filespec.

    >>> without_path('/home/scott/foo.py')
    'foo.py'

    >>> without_path('foo.py')
    'foo.py'
    """
    _head, tail = os.path.split(filespec)
    return tail
def get_path(filespec: str) -> str:
    """Return only the directory part of a filespec, dropping the file
    name and its extension(s).

    Args:
        filespec: path to remove filename / extension(s) from

    Returns:
        the leading directory components of filespec, without the final
        component.

    >>> get_path('/home/scott/foobar.py')
    '/home/scott'

    >>> get_path('/home/scott/test.1.2.3.gz')
    '/home/scott'

    >>> get_path('~scott/frapp.txt')
    '~scott'
    """
    head, _tail = os.path.split(filespec)
    return head
def get_canonical_path(filespec: str) -> str:
    """Return a canonicalized absolute path.

    Delegates to ``os.path.realpath``, so symbolic links are resolved
    and ``.`` / ``..`` components are eliminated; the exact result
    therefore depends on the symlinks present on the host.

    Args:
        filespec: the path to canonicalize

    Returns:
        the canonicalized path
    """
    canonical = os.path.realpath(filespec)
    return canonical
def create_path_if_not_exist(path, on_error=None) -> None:
    """
    Attempts to create path if it does not exist already.

    .. warning::

        The umask is cleared while the directory is created and the
        directory is then chmod-ed to mode 0o777 (i.e. world
        read/writeable).

    Args:
        path: the path to attempt to create
        on_error: optional callable.  If given, it is invoked on error
            conditions instead of raising.  Otherwise any exceptions
            are raised.

    >>> import uuid
    >>> import os
    >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
    >>> os.path.exists(path)
    False
    >>> create_path_if_not_exist(path)
    >>> os.path.exists(path)
    True
    """
    logger.debug("Creating path %s", path)
    # Clear the umask so the requested permissive mode is not masked off;
    # it is always restored in the finally clause.
    previous_umask = os.umask(0)
    try:
        os.makedirs(path)
        os.chmod(path, 0o777)
    except OSError as ex:
        # "Already exists as a directory" is the success case, not an error.
        if ex.errno != errno.EEXIST and not os.path.isdir(path):
            if on_error is not None:
                # NOTE(review): the callback's argument list was not
                # visible in the reviewed excerpt; (path, ex) is assumed.
                on_error(path, ex)
            else:
                raise
    finally:
        os.umask(previous_umask)
def does_file_exist(filename: str) -> bool:
    """Tell whether filename exists and is a normal file.

    Args:
        filename: filename to check

    Returns:
        True if filename exists and is a normal file.

    >>> does_file_exist('/tmp/2492043r9203r9230r9230r49230r42390r4230')
    False
    """
    if not os.path.exists(filename):
        return False
    return os.path.isfile(filename)
def file_is_readable(filename: str) -> bool:
    """Tell whether filename is an existing normal file that the current
    process may read.

    Args:
        filename: the filename to check for read access

    Returns:
        True if the file exists, is a normal file and is readable;
        False otherwise.
    """
    if not does_file_exist(filename):
        return False
    return os.access(filename, os.R_OK)
def file_is_writable(filename: str) -> bool:
    """Tell whether filename is an existing normal file that the current
    process may write.

    Args:
        filename: the file to check for write access.

    Returns:
        True if the file exists, is a normal file and is writable;
        False otherwise.
    """
    if not does_file_exist(filename):
        return False
    return os.access(filename, os.W_OK)
def file_is_executable(filename: str) -> bool:
    """Tell whether filename is an existing normal file that the current
    process may execute.

    Args:
        filename: the file to check for execute access.

    Returns:
        True if the file exists, is a normal file and is executable;
        False otherwise.
    """
    if not does_file_exist(filename):
        return False
    return os.access(filename, os.X_OK)
def does_directory_exist(dirname: str) -> bool:
    """Tell whether dirname exists and is a directory.

    Args:
        dirname: the path to check

    Returns:
        True if dirname exists and is a directory.

    >>> does_directory_exist('/xyzq/21341')
    False
    """
    if not os.path.exists(dirname):
        return False
    return os.path.isdir(dirname)
def does_path_exist(pathname: str) -> bool:
    """Tell whether pathname exists; a more verbose spelling of
    ``os.path.exists``.

    Args:
        pathname: the path to check

    Returns:
        whatever ``os.path.exists`` reports for pathname.
    """
    found = os.path.exists(pathname)
    return found
def get_file_size(filename: str) -> int:
    """Report how many bytes a file occupies.

    Args:
        filename: the filename to size

    Returns:
        size of filename in bytes

    Raises:
        OSError: if the file cannot be stat-ed.
    """
    return os.stat(filename).st_size
def is_normal_file(filename: str) -> bool:
    """Tell whether filename is a normal (regular) file.

    Args:
        filename: the path to check

    Returns:
        True if filename is a normal file.
    """
    regular = os.path.isfile(filename)
    return regular
def is_directory(filename: str) -> bool:
    """Tell whether filename is a directory.

    Args:
        filename: the path to check

    Returns:
        True if filename is a directory.
    """
    directory = os.path.isdir(filename)
    return directory
def is_symlink(filename: str) -> bool:
    """Tell whether filename is a symbolic link.

    Args:
        filename: the path to check

    Returns:
        True if filename is a symlink, False otherwise.
    """
    link = os.path.islink(filename)
    return link
def is_same_file(file1: str, file2: str) -> bool:
    """Tell whether two paths refer to the same file (same inode).

    Args:
        file1: the first path
        file2: the second path

    Returns:
        True if both paths resolve to the same file.

    Raises:
        OSError: if either path cannot be stat-ed.
    """
    same = os.path.samefile(file1, file2)
    return same
def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
    """Stats the file and returns an os.stat_result or None on error.

    Args:
        filename: the file whose timestamps to fetch

    Returns:
        the os.stat_result or None to indicate an error occurred
    """
    try:
        return os.stat(filename)
    except Exception as e:
        # Deliberately best-effort: report the failure and signal it to
        # the caller with None rather than propagating.
        # NOTE(review): the handler body was not visible in the reviewed
        # excerpt; logging via the module logger is assumed.
        logger.exception(e)
        return None
def get_file_raw_timestamp(
    filename: str, extractor: Callable[[os.stat_result], Optional[float]]
) -> Optional[float]:
    """Stat a file and, if successful, use extractor to fetch some
    subset of the information in the os.stat_result.  See also
    :meth:`get_file_raw_atime`, :meth:`get_file_raw_mtime`, and
    :meth:`get_file_raw_ctime` which just call this with a lambda
    extractor.

    Args:
        filename: the filename to stat
        extractor: Callable that takes a os.stat_result and produces
            something useful(?) with it.

    Returns:
        whatever the extractor produced or None on error.
    """
    tss = get_file_raw_timestamps(filename)
    if tss is None:
        return None
    return extractor(tss)
def get_file_raw_atime(filename: str) -> Optional[float]:
    """Return a file's raw access time (``st_atime``), or None on error.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_timedelta`,
    and :meth:`get_file_atime_age_seconds`.

    Args:
        filename: the file to inspect
    """

    def _atime(stat_result):
        return stat_result.st_atime

    return get_file_raw_timestamp(filename, _atime)
def get_file_raw_mtime(filename: str) -> Optional[float]:
    """Return a file's raw modification time (``st_mtime``), or None on
    error.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_timedelta`,
    and :meth:`get_file_mtime_age_seconds`.

    Args:
        filename: the file to inspect
    """

    def _mtime(stat_result):
        return stat_result.st_mtime

    return get_file_raw_timestamp(filename, _mtime)
def get_file_raw_ctime(filename: str) -> Optional[float]:
    """Return a file's raw ``st_ctime``, or None on error.

    ``st_ctime`` is the creation time on Windows but the time of the
    last metadata change on Unix.

    See also :meth:`get_file_ctime_as_datetime`,
    :meth:`get_file_ctime_timedelta`,
    and :meth:`get_file_ctime_age_seconds`.

    Args:
        filename: the file to inspect
    """

    def _ctime(stat_result):
        return stat_result.st_ctime

    return get_file_raw_timestamp(filename, _ctime)
def get_file_md5(filename: str) -> str:
    """Hashes filename's disk contents and returns the MD5 digest.

    Args:
        filename: the file whose contents to hash

    Returns:
        the MD5 digest of the file's contents, as a hex string.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    file_hash = hashlib.md5()
    with open(filename, "rb") as f:
        # Read in fixed-size chunks so large files are never loaded
        # into memory whole; read() returns b"" at end of file.
        for chunk in iter(lambda: f.read(8192), b""):
            file_hash.update(chunk)
    return file_hash.hexdigest()
def set_file_raw_atime(filename: str, atime: float):
    """Set a file's raw access time, leaving its modification time as is.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_timedelta`,
    :meth:`get_file_atime_age_seconds`,
    and :meth:`get_file_raw_atime`.

    Args:
        filename: the file whose access time to set
        atime: the new raw access time
    """
    # os.utime needs both values, so read back the current mtime first.
    current_mtime = get_file_raw_mtime(filename)
    assert current_mtime is not None
    times = (atime, current_mtime)
    os.utime(filename, times)
def set_file_raw_mtime(filename: str, mtime: float):
    """Set a file's raw modification time, leaving its access time as is.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_timedelta`,
    :meth:`get_file_mtime_age_seconds`,
    and :meth:`get_file_raw_mtime`.

    Args:
        filename: the file whose modification time to set
        mtime: the new raw modification time
    """
    # os.utime needs both values, so read back the current atime first.
    current_atime = get_file_raw_atime(filename)
    assert current_atime is not None
    times = (current_atime, mtime)
    os.utime(filename, times)
def set_file_raw_atime_and_mtime(filename: str, ts: Optional[float] = None):
    """Sets both a file's raw modification and access times

    Args:
        filename: the file whose times to set
        ts: the raw time to set or None to indicate time should be
            set to the current time.

    Raises:
        OSError: if the file's times cannot be updated.
    """
    if ts is not None:
        os.utime(filename, (ts, ts))
    else:
        # Passing None asks the OS to stamp the file with "now".
        os.utime(filename, None)
def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
    """Convert a raw file timestamp into a python datetime.

    Args:
        filename: the file whose timestamp to convert
        producer: callable invoked with filename that returns a raw
            timestamp, or None on error.

    Returns:
        the timestamp as a naive local datetime, or None when producer
        returned None.
    """
    ts = producer(filename)
    # Compare against None explicitly: 0.0 is a valid timestamp.
    if ts is None:
        return None
    return datetime.datetime.fromtimestamp(ts)
def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return a file's access time as a python datetime, or None on error.

    See also :meth:`get_file_atime_timedelta`,
    :meth:`get_file_atime_age_seconds`,
    :meth:`describe_file_atime`,
    and :meth:`get_file_raw_atime`.

    Args:
        filename: the file to inspect
    """
    producer = get_file_raw_atime
    return convert_file_timestamp_to_datetime(filename, producer)
def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return a file's modification time as a python datetime, or None
    on error.

    See also :meth:`get_file_mtime_timedelta`,
    :meth:`get_file_mtime_age_seconds`,
    and :meth:`get_file_raw_mtime`.

    Args:
        filename: the file to inspect
    """
    producer = get_file_raw_mtime
    return convert_file_timestamp_to_datetime(filename, producer)
def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return a file's ctime as a python datetime, or None on error.

    See also :meth:`get_file_ctime_timedelta`,
    :meth:`get_file_ctime_age_seconds`,
    and :meth:`get_file_raw_ctime`.

    Args:
        filename: the file to inspect
    """
    producer = get_file_raw_ctime
    return convert_file_timestamp_to_datetime(filename, producer)
def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
    """~Internal helper: how many seconds ago was one of a file's
    timestamps?

    Args:
        filename: the file to stat
        extractor: callable that picks the timestamp of interest out of
            the os.stat_result.

    Returns:
        the current time minus the extracted timestamp, or None if the
        file could not be stat-ed.
    """
    # Local import keeps this block self-contained; harmless if the
    # module already imports time at the top.
    import time

    # NOTE(review): the clock read and the return expression were not
    # visible in the reviewed excerpt; "now - timestamp" is assumed, and
    # it yields a float even though the annotation says int -- confirm.
    now = time.time()
    ts = get_file_raw_timestamps(filename)
    if ts is None:
        return None
    result = extractor(ts)
    return now - result
def get_file_atime_age_seconds(filename: str) -> Optional[int]:
    """Return how long ago a file was accessed, in seconds, or None on
    error.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_timedelta`,
    :meth:`describe_file_atime`,
    and :meth:`get_file_raw_atime`.

    Args:
        filename: the file to inspect
    """

    def _atime(stat_result):
        return stat_result.st_atime

    return get_file_timestamp_age_seconds(filename, _atime)
def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
    """Return the age of a file's ctime in seconds, or None on error.

    See also :meth:`get_file_ctime_as_datetime`,
    :meth:`get_file_ctime_timedelta`,
    and :meth:`get_file_raw_ctime`.

    Args:
        filename: the file to inspect
    """

    def _ctime(stat_result):
        return stat_result.st_ctime

    return get_file_timestamp_age_seconds(filename, _ctime)
def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
    """Return how long ago a file was modified, in seconds, or None on
    error.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_timedelta`,
    and :meth:`get_file_raw_mtime`.

    Args:
        filename: the file to inspect
    """

    def _mtime(stat_result):
        return stat_result.st_mtime

    return get_file_timestamp_age_seconds(filename, _mtime)
def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
    """~Internal helper: the age of one of a file's timestamps as a
    timedelta.

    Args:
        filename: the file to stat
        extractor: callable that picks the timestamp of interest out of
            the os.stat_result.

    Returns:
        the age as a datetime.timedelta, or None if the age could not
        be determined.
    """
    age = get_file_timestamp_age_seconds(filename, extractor)
    # Compare against None explicitly: an age of 0 is valid.
    if age is None:
        return None
    return datetime.timedelta(seconds=float(age))
def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return how long ago a file was accessed as a timedelta, or None
    on error.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_age_seconds`,
    :meth:`describe_file_atime`,
    and :meth:`get_file_raw_atime`.

    Args:
        filename: the file to inspect
    """

    def _atime(stat_result):
        return stat_result.st_atime

    return get_file_timestamp_timedelta(filename, _atime)
def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of a file's ctime as a timedelta, or None on error.

    See also :meth:`get_file_ctime_as_datetime`,
    :meth:`get_file_ctime_age_seconds`,
    and :meth:`get_file_raw_ctime`.

    Args:
        filename: the file to inspect
    """

    def _ctime(stat_result):
        return stat_result.st_ctime

    return get_file_timestamp_timedelta(filename, _ctime)
def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return how long ago a file was modified as a python timedelta, or
    None on error.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_age_seconds`,
    and :meth:`get_file_raw_mtime`.

    Args:
        filename: the file to inspect
    """

    def _mtime(stat_result):
        return stat_result.st_mtime

    return get_file_timestamp_timedelta(filename, _mtime)
def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
    """~Internal helper: describe the age of one of a file's timestamps
    in words.

    Args:
        filename: the file to stat
        extractor: callable that picks the timestamp of interest out of
            the os.stat_result.
        brief: use the brief duration description instead of the long
            one.

    Returns:
        the description produced by datetime_utils, or None if the age
        could not be determined.
    """
    # Imported here rather than at module scope, as in the original.
    from datetime_utils import describe_duration, describe_duration_briefly

    age = get_file_timestamp_age_seconds(filename, extractor)
    if age is None:
        return None
    if brief:
        return describe_duration_briefly(age)
    return describe_duration(age)
def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
    """Describe in words how long ago a file was accessed.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_timedelta`,
    :meth:`get_file_atime_age_seconds`,
    and :meth:`get_file_raw_atime`.

    Args:
        filename: the file to inspect
        brief: request the brief form of the description
    """

    def _atime(stat_result):
        return stat_result.st_atime

    return describe_file_timestamp(filename, _atime, brief=brief)
def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
    """Describe in words the age of a file's ctime.

    See also :meth:`get_file_ctime_as_datetime`,
    :meth:`get_file_ctime_timedelta`,
    :meth:`get_file_ctime_age_seconds`,
    and :meth:`get_file_raw_ctime`.

    Args:
        filename: the file to inspect
        brief: request the brief form of the description
    """

    def _ctime(stat_result):
        return stat_result.st_ctime

    return describe_file_timestamp(filename, _ctime, brief=brief)
def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
    """Describe in words how long ago a file was modified.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_timedelta`,
    :meth:`get_file_mtime_age_seconds`,
    and :meth:`get_file_raw_mtime`.

    Args:
        filename: the file to inspect
        brief: request the brief form of the description
    """

    def _mtime(stat_result):
        return stat_result.st_mtime

    return describe_file_timestamp(filename, _mtime, brief=brief)
def touch_file(filename: str, *, mode: Optional[int] = 0o666):
    """Like unix "touch" command's semantics: update the timestamp
    of a file to the current time if the file exists.  Create the
    file if it doesn't exist.

    Args:
        filename: the filename
        mode: the mode to create the file with (subject to the process
            umask); None means use pathlib's default.
    """
    # mode belongs to touch(), not to the Path constructor: handing it
    # to Path() never applied it to the created file.
    target = pathlib.Path(filename)
    if mode is None:
        target.touch()
    else:
        target.touch(mode=mode)
def expand_globs(in_filename: str):
    """Expands shell globs (* and ? wildcards) to the matching files.

    Args:
        in_filename: a path that may contain glob wildcards

    Yields:
        each path that matches the pattern; nothing if none match.
    """
    yield from glob.glob(in_filename)
def get_files(directory: str):
    """Returns the files in a directory as a generator.

    Args:
        directory: the directory to list (not recursive)

    Yields:
        the path (directory joined with the entry name) of every entry
        that is a regular file.
    """
    for filename in os.listdir(directory):
        full_path = join(directory, filename)
        # isfile() is already False for paths that do not exist, so no
        # separate existence check is needed.
        if isfile(full_path):
            yield full_path
def get_directories(directory: str):
    """Returns the subdirectories in a directory as a generator.

    Args:
        directory: the directory to list (not recursive)

    Yields:
        the path (directory joined with the entry name) of every entry
        that is a directory.
    """
    for d in os.listdir(directory):
        full_path = join(directory, d)
        # Test for a directory directly: "exists and is not a regular
        # file" would also let FIFOs, sockets and device nodes through.
        if os.path.isdir(full_path):
            yield full_path
def get_files_recursive(directory: str):
    """Find the files under a root directory recursively.

    Args:
        directory: the root directory to walk

    Yields:
        the path of every file found by :meth:`get_files` in directory
        and, recursively, in each subdirectory reported by
        :meth:`get_directories`.  The directories themselves are not
        yielded.
    """
    yield from get_files(directory)
    for subdir in get_directories(directory):
        yield from get_files_recursive(subdir)
class FileWriter(contextlib.AbstractContextManager):
    """A helper that writes a file to a temporary location and then moves
    it atomically to its ultimate destination on close.

    The temporary file lives next to the destination and is named
    ``<filename>-<uuid>.tmp``.  On exit the handle is closed and the
    temporary file is renamed over the destination, whether or not the
    with-body raised; exceptions from the body are never suppressed.
    """

    def __init__(self, filename: str) -> None:
        """
        Args:
            filename: the ultimate destination path
        """
        self.filename = filename
        uuid = uuid4()
        self.tempfile = f'{filename}-{uuid}.tmp'
        self.handle: Optional[TextIO] = None

    def __enter__(self) -> TextIO:
        """Open the temporary file for writing and return its handle."""
        assert not os.path.exists(self.tempfile)
        self.handle = open(self.tempfile, mode="w")
        return self.handle

    def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        """Close the temporary file and move it over the destination.

        Raises:
            OSError: if the rename fails.
        """
        if self.handle is not None:
            self.handle.close()
            # Rename in-process rather than shelling out to /bin/mv:
            # a command string built from raw paths breaks on spaces
            # and shell metacharacters.
            os.replace(self.tempfile, self.filename)
        return False
807 if __name__ == '__main__':