3 # © Copyright 2021-2022, Scott Gasch
5 """Utilities for working with files."""
18 from os.path import exists, isfile, join
19 from typing import Callable, List, Literal, Optional, TextIO
20 from uuid import uuid4
22 logger = logging.getLogger(__name__)
def remove_newlines(x: str) -> str:
    """Line transformer for :meth:`slurp_file` that deletes every
    newline character from the given text.

    Args:
        x: the text to process

    Returns:
        x with all newline characters removed.
    """
    pieces = x.split('\n')
    return ''.join(pieces)
def strip_whitespace(x: str) -> str:
    """Trivial function to be used as a line_transformer in
    :meth:`slurp_file` for no leading / trailing whitespace in
    file contents.

    Args:
        x: the text to process

    Returns:
        x with leading and trailing whitespace removed.
    """
    return x.strip()
def remove_hash_comments(x: str) -> str:
    """Line transformer for :meth:`slurp_file` that removes a ``#``
    comment (the first ``#`` and everything after it up to the end of
    the text) from the given text.

    Args:
        x: the text to process

    Returns:
        x without its trailing ``#`` comment, if it had one.
    """
    hash_comment = re.compile(r'#.*$')
    return hash_comment.sub('', x)
def slurp_file(
    filename: str,
    *,
    skip_blank_lines=False,
    line_transformers: Optional[List[Callable[[str], str]]] = None,
):
    """Reads in a file's contents line-by-line to a memory buffer applying
    each line transformation in turn.

    Args:
        filename: file to be read
        skip_blank_lines: should reading skip blank lines?  A line is
            considered blank when it is the empty string *after* all
            transformations have been applied.
        line_transformers: little string->string transformations

    Returns:
        A list of the (transformed) lines of the file.

    Raises:
        Exception: if filename is not a readable, normal file.
    """
    ret = []
    xforms = []
    if line_transformers is not None:
        for x in line_transformers:
            xforms.append(x)
    if not file_is_readable(filename):
        raise Exception(f'{filename} can\'t be read.')
    with open(filename) as rf:
        for line in rf:
            # Transformations are chained: each one sees the output of
            # the one before it.
            for transformation in xforms:
                line = transformation(line)
            if skip_blank_lines and line == '':
                continue
            ret.append(line)
    return ret
def remove(path: str) -> None:
    """Deletes a file.  Raises if path refers to a directory or a file
    that doesn't exist.

    Args:
        path: the path of the file to delete

    >>> import os
    >>> filename = '/tmp/file_utils_test_file'
    >>> os.system(f'touch {filename}')
    0
    >>> does_file_exist(filename)
    True
    >>> remove(filename)
    >>> does_file_exist(filename)
    False
    """
    os.remove(path)
def fix_multiple_slashes(path: str) -> str:
    """Collapses every run of consecutive slashes in a path (or
    path-like string) into a single slash.

    Args:
        path: the path in which to remove multiple slashes

    Returns:
        path with each run of '/' characters replaced by one '/'.

    >>> p = '/usr/local//etc/rc.d///file.txt'
    >>> fix_multiple_slashes(p)
    '/usr/local/etc/rc.d/file.txt'

    >>> p = 'this is a test'
    >>> fix_multiple_slashes(p) == p
    True
    """
    slash_run = re.compile(r'/+')
    return slash_run.sub('/', path)
def delete(path: str) -> None:
    """Convenience alias with os.remove semantics: deletes the file
    at path.

    Args:
        path: the path of the file to delete
    """
    os.remove(path)
def without_extension(path: str) -> str:
    """Strips one extension (the last one) off of a file or path.

    Args:
        path: the path from which to remove an extension

    Returns:
        the path with one extension removed.

    >>> without_extension('foobar.txt')
    'foobar'

    >>> without_extension('/home/scott/frapp.py')
    '/home/scott/frapp'

    >>> f = 'a.b.c.tar.gz'
    >>> while '.' in f:
    ...     f = without_extension(f)
    ...     print(f)
    a.b.c.tar
    a.b.c
    a.b
    a

    >>> without_extension('foobar')
    'foobar'
    """
    root, _extension = os.path.splitext(path)
    return root
def without_all_extensions(path: str) -> str:
    """Removes all extensions from a path; handles multiple extensions
    like foobar.tar.gz -> foobar.

    Args:
        path: the path from which to remove all extensions

    Returns:
        the path with all extensions removed.

    >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
    '/home/scott/foobar'

    >>> without_all_extensions('./foobar.tar.gz')
    './foobar'
    """
    # Keep stripping until splitext finds no further extension.  The
    # loop must be driven by whether an extension was actually removed:
    # a '.' may legitimately remain in the path (dotted directory names,
    # './relative' paths, dotfiles), and looping on the mere presence of
    # a '.' would never terminate for those inputs.
    while True:
        root, ext = os.path.splitext(path)
        if not ext:
            return path
        path = root
def get_extension(path: str) -> str:
    """Pulls one extension (the last one) out of a file or path.

    Args:
        path: the path from which to extract an extension

    Returns:
        The last extension from the file path, including its leading
        dot, or the empty string if there is none.

    >>> get_extension('this_is_a_test.txt')
    '.txt'

    >>> get_extension('/home/scott/test.py')
    '.py'

    >>> get_extension('foobar')
    ''
    """
    _root, extension = os.path.splitext(path)
    return extension
def get_all_extensions(path: str) -> List[str]:
    """Return the extensions of a file or path in order.

    Args:
        path: the path from which to extract all extensions.

    Returns:
        a list containing each extension which may be empty.

    >>> get_all_extensions('/home/scott/foo.tar.gz.1')
    ['.tar', '.gz', '.1']

    >>> get_all_extensions('/home/scott/foobar')
    []
    """
    ret: List[str] = []
    while True:
        # Peel the last extension off; stop once none is left.
        path, ext = os.path.splitext(path)
        if not ext:
            break
        ret.append(ext)
    # Extensions were collected right-to-left; callers want them in the
    # order they appear in the path.
    ret.reverse()
    return ret
def without_path(filespec: str) -> str:
    """Drops any leading directory components and returns only the
    base filename.

    Args:
        filespec: path to remove leading directories from

    Returns:
        filespec without leading dir components.

    >>> without_path('/home/scott/foo.py')
    'foo.py'

    >>> without_path('foo.py')
    'foo.py'
    """
    _directory, base_name = os.path.split(filespec)
    return base_name
def get_path(filespec: str) -> str:
    """Returns only the directory portion of a filespec, dropping the
    filename and its extension(s).

    Args:
        filespec: path to remove filename / extension(s) from

    Returns:
        filespec with just the leading directory components and no
        filename or extension(s)

    >>> get_path('/home/scott/foobar.py')
    '/home/scott'

    >>> get_path('/home/scott/test.1.2.3.gz')
    '/home/scott'

    >>> get_path('~scott/frapp.txt')
    '~scott'
    """
    directory, _base_name = os.path.split(filespec)
    return directory
def get_canonical_path(filespec: str) -> str:
    """Canonicalizes a path via os.path.realpath and returns the
    resulting absolute path.

    Args:
        filespec: the path to canonicalize

    Returns:
        the canonicalized path

    >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
    '/usr/home/scott/foo.txt'
    """
    canonical = os.path.realpath(filespec)
    return canonical
def create_path_if_not_exist(path, on_error=None) -> None:
    """
    Attempts to create path if it does not exist already.

    .. warning::

        Directories are created with mode 0o777 (i.e. world
        read/writeable); the process umask is cleared for the duration
        of the call and restored afterwards.

    Args:
        path: the path to attempt to create
        on_error: If not None, it's invoked on error conditions.
            Otherwise any exceptions are raised.

    >>> import uuid
    >>> import os
    >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
    >>> os.path.exists(path)
    False
    >>> create_path_if_not_exist(path)
    >>> os.path.exists(path)
    True
    """
    logger.debug("Creating path %s", path)
    # Clear the umask so the requested 0o777 mode is not masked off.
    previous_umask = os.umask(0)
    try:
        os.makedirs(path)
        os.chmod(path, 0o777)
    except OSError as ex:
        # An already-existing directory is not an error; anything else
        # is reported to on_error if given, otherwise re-raised.
        if ex.errno != errno.EEXIST and not os.path.isdir(path):
            if on_error is not None:
                # NOTE(review): callback argument list restored as
                # (path, exception) -- confirm against callers.
                on_error(path, ex)
            else:
                raise
    finally:
        os.umask(previous_umask)
def does_file_exist(filename: str) -> bool:
    """Tells whether filename exists and is a normal file.

    Args:
        filename: filename to check

    Returns:
        True if filename exists and is a normal file.

    >>> does_file_exist(__file__)
    True
    >>> does_file_exist('/tmp/2492043r9203r9230r9230r49230r42390r4230')
    False
    """
    if not os.path.exists(filename):
        return False
    return os.path.isfile(filename)
def file_is_readable(filename: str) -> bool:
    """Tells whether filename exists, is a normal file and can be read
    by the current process.

    Args:
        filename: the filename to check for read access

    Returns:
        True if all of the above hold, False otherwise.
    """
    is_file = os.path.exists(filename) and os.path.isfile(filename)
    return is_file and os.access(filename, os.R_OK)
def file_is_writable(filename: str) -> bool:
    """Tells whether filename exists, is a normal file and can be
    written by the current process.

    Args:
        filename: the file to check for write access.

    Returns:
        True if all of the above hold, False otherwise.
    """
    is_file = os.path.exists(filename) and os.path.isfile(filename)
    return is_file and os.access(filename, os.W_OK)
def file_is_executable(filename: str) -> bool:
    """Tells whether filename exists, is a normal file and can be
    executed by the current process.

    Args:
        filename: the file to check for execute access.

    Returns:
        True if all of the above hold, False otherwise.
    """
    is_file = os.path.exists(filename) and os.path.isfile(filename)
    return is_file and os.access(filename, os.X_OK)
def does_directory_exist(dirname: str) -> bool:
    """Tells whether dirname exists and is a directory.

    Args:
        dirname: the name to check

    Returns:
        True if dirname exists and is a directory.

    >>> does_directory_exist('/tmp')
    True
    >>> does_directory_exist('/xyzq/21341')
    False
    """
    if not os.path.exists(dirname):
        return False
    return os.path.isdir(dirname)
def does_path_exist(pathname: str) -> bool:
    """A more descriptively named wrapper around os.path.exists.

    Args:
        pathname: the path to check

    Returns:
        whatever os.path.exists reports for pathname.
    """
    found = os.path.exists(pathname)
    return found
def get_file_size(filename: str) -> int:
    """Reports how many bytes a file occupies.

    Args:
        filename: the filename to size

    Returns:
        size of filename in bytes
    """
    size_in_bytes = os.path.getsize(filename)
    return size_in_bytes
def is_normal_file(filename: str) -> bool:
    """Tells whether filename refers to a normal file.

    Args:
        filename: the name to check

    >>> is_normal_file(__file__)
    True
    """
    normal = os.path.isfile(filename)
    return normal
def is_directory(filename: str) -> bool:
    """Tells whether filename refers to a directory.

    Args:
        filename: the name to check

    >>> is_directory('/tmp')
    True
    """
    directory = os.path.isdir(filename)
    return directory
def is_symlink(filename: str) -> bool:
    """Tells whether filename is a symbolic link.

    Args:
        filename: the name to check

    Returns:
        True if filename is a symlink, False otherwise.

    >>> is_symlink('/tmp')
    False

    >>> is_symlink('/home')
    True
    """
    link = os.path.islink(filename)
    return link
def is_same_file(file1: str, file2: str) -> bool:
    """Tells whether two paths refer to the same underlying file
    (i.e. the same inode).

    Args:
        file1: the first path
        file2: the second path

    >>> is_same_file('/tmp', '/tmp/../tmp')
    True

    >>> is_same_file('/tmp', '/home')
    False
    """
    same = os.path.samefile(file1, file2)
    return same
def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
    """Stats the file and returns an os.stat_result or None on error.

    Args:
        filename: the file whose timestamps to fetch

    Returns:
        the os.stat_result or None to indicate an error occurred
    """
    try:
        return os.stat(filename)
    except Exception as e:
        # Best effort by design: the failure is logged and reported to
        # the caller as None rather than raised.
        logger.exception(e)
        return None
def get_file_raw_timestamp(
    filename: str, extractor: Callable[[os.stat_result], Optional[float]]
) -> Optional[float]:
    """Stat a file and, if successful, use extractor to fetch some
    subset of the information in the os.stat_result.  See also
    :meth:`get_file_raw_atime`, :meth:`get_file_raw_mtime`, and
    :meth:`get_file_raw_ctime` which just call this with a lambda
    extractor.

    Args:
        filename: the filename to stat
        extractor: Callable that takes a os.stat_result and produces
            something useful(?) with it.

    Returns:
        whatever the extractor produced or None on error.
    """
    tss = get_file_raw_timestamps(filename)
    if tss is None:
        # The stat failed; there is nothing to hand to the extractor.
        return None
    return extractor(tss)
def get_file_raw_atime(filename: str) -> Optional[float]:
    """Fetches the raw access time (st_atime) of a file, or None on
    error.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_timedelta`,
    and :meth:`get_file_atime_age_seconds`.
    """

    def _pick_atime(stat_result):
        return stat_result.st_atime

    return get_file_raw_timestamp(filename, _pick_atime)
def get_file_raw_mtime(filename: str) -> Optional[float]:
    """Fetches the raw modification time (st_mtime) of a file, or None
    on error.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_timedelta`,
    and :meth:`get_file_mtime_age_seconds`.
    """

    def _pick_mtime(stat_result):
        return stat_result.st_mtime

    return get_file_raw_timestamp(filename, _pick_mtime)
def get_file_raw_ctime(filename: str) -> Optional[float]:
    """Fetches the raw st_ctime of a file, or None on error.

    Note that st_ctime is platform dependent: it is the creation time
    on Windows but the time of the most recent metadata change on Unix.

    See also :meth:`get_file_ctime_as_datetime`,
    :meth:`get_file_ctime_timedelta`,
    and :meth:`get_file_ctime_age_seconds`.
    """

    def _pick_ctime(stat_result):
        return stat_result.st_ctime

    return get_file_raw_timestamp(filename, _pick_ctime)
def get_file_md5(filename: str) -> str:
    """Hashes filename's disk contents and returns the MD5 digest.

    Args:
        filename: the file whose contents to hash

    Returns:
        the MD5 digest of the file's contents as a hex string.
        Raises on errors.
    """
    file_hash = hashlib.md5()
    with open(filename, "rb") as f:
        # Read in fixed-size chunks so that large files are never held
        # in memory all at once; read() returns b'' at end of file.
        for chunk in iter(lambda: f.read(8192), b''):
            file_hash.update(chunk)
    return file_hash.hexdigest()
def set_file_raw_atime(filename: str, atime: float):
    """Changes a file's raw access time, leaving its modification time
    as it was.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_timedelta`,
    :meth:`get_file_atime_age_seconds`,
    and :meth:`get_file_raw_atime`.
    """
    # os.utime always sets both times, so the current mtime has to be
    # read back and passed through unchanged.
    current_mtime = get_file_raw_mtime(filename)
    assert current_mtime is not None
    times = (atime, current_mtime)
    os.utime(filename, times)
def set_file_raw_mtime(filename: str, mtime: float):
    """Changes a file's raw modification time, leaving its access time
    as it was.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_timedelta`,
    :meth:`get_file_mtime_age_seconds`,
    and :meth:`get_file_raw_mtime`.
    """
    # os.utime always sets both times, so the current atime has to be
    # read back and passed through unchanged.
    current_atime = get_file_raw_atime(filename)
    assert current_atime is not None
    times = (current_atime, mtime)
    os.utime(filename, times)
def set_file_raw_atime_and_mtime(filename: str, ts: Optional[float] = None):
    """Sets both a file's raw modification and access times

    Args:
        filename: the file whose times to set
        ts: the raw time to set or None to indicate time should be
            set to the current time.
    """
    if ts is not None:
        os.utime(filename, (ts, ts))
    else:
        # Passing None as the times makes os.utime use the current time.
        os.utime(filename, None)
def convert_file_timestamp_to_datetime(
    filename: str, producer
) -> Optional[datetime.datetime]:
    """Convert a raw file timestamp into a python datetime.

    Args:
        filename: the file whose timestamp to convert
        producer: a callable taking filename and returning a raw
            timestamp, or None on error

    Returns:
        the timestamp as a (naive, local time) datetime or None if the
        producer returned None.
    """
    ts = producer(filename)
    if ts is None:
        return None
    return datetime.datetime.fromtimestamp(ts)
def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Returns a file's access time as a python datetime.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_timedelta`,
    :meth:`get_file_atime_age_seconds`,
    :meth:`describe_file_atime`,
    and :meth:`get_file_raw_atime`.
    """
    return convert_file_timestamp_to_datetime(
        filename,
        producer=get_file_raw_atime,
    )
def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Returns a file's modification time as a python datetime.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_timedelta`,
    :meth:`get_file_mtime_age_seconds`,
    and :meth:`get_file_raw_mtime`.
    """
    return convert_file_timestamp_to_datetime(
        filename,
        producer=get_file_raw_mtime,
    )
def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Returns a file's st_ctime as a python datetime.

    See also :meth:`get_file_ctime_as_datetime`,
    :meth:`get_file_ctime_timedelta`,
    :meth:`get_file_ctime_age_seconds`,
    and :meth:`get_file_raw_ctime`.
    """
    return convert_file_timestamp_to_datetime(
        filename,
        producer=get_file_raw_ctime,
    )
def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
    """~Internal helper

    Args:
        filename: the file to stat
        extractor: a callable taking the file's os.stat_result and
            returning the raw timestamp of interest

    Returns:
        how many seconds ago the extracted timestamp was, or None if
        the file could not be stat'ed.
    """
    now = time.time()
    ts = get_file_raw_timestamps(filename)
    if ts is None:
        return None
    result = extractor(ts)
    # NOTE(review): annotated as int, but a difference of raw float
    # timestamps is a float -- confirm what callers expect.
    return now - result
def get_file_atime_age_seconds(filename: str) -> Optional[int]:
    """Returns how many seconds ago a file was last accessed.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_timedelta`,
    :meth:`get_file_atime_age_seconds`,
    :meth:`describe_file_atime`,
    and :meth:`get_file_raw_atime`.
    """

    def _pick_atime(stat_result):
        return stat_result.st_atime

    return get_file_timestamp_age_seconds(filename, _pick_atime)
def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
    """Returns a file's st_ctime as an age in seconds (ago).

    See also :meth:`get_file_ctime_as_datetime`,
    :meth:`get_file_ctime_timedelta`,
    :meth:`get_file_ctime_age_seconds`,
    and :meth:`get_file_raw_ctime`.
    """

    def _pick_ctime(stat_result):
        return stat_result.st_ctime

    return get_file_timestamp_age_seconds(filename, _pick_ctime)
def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
    """Returns how many seconds ago a file was last modified.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_timedelta`,
    :meth:`get_file_mtime_age_seconds`,
    and :meth:`get_file_raw_mtime`.
    """

    def _pick_mtime(stat_result):
        return stat_result.st_mtime

    return get_file_timestamp_age_seconds(filename, _pick_mtime)
def get_file_timestamp_timedelta(
    filename: str, extractor
) -> Optional[datetime.timedelta]:
    """~Internal helper

    Args:
        filename: the file to stat
        extractor: a callable taking the file's os.stat_result and
            returning the raw timestamp of interest

    Returns:
        the age of the extracted timestamp as a timedelta, or None if
        the age could not be determined.
    """
    age = get_file_timestamp_age_seconds(filename, extractor)
    if age is None:
        return None
    return datetime.timedelta(seconds=float(age))
def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Returns how long ago a file was accessed, as a timedelta.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_timedelta`,
    :meth:`get_file_atime_age_seconds`,
    :meth:`describe_file_atime`,
    and :meth:`get_file_raw_atime`.
    """

    def _pick_atime(stat_result):
        return stat_result.st_atime

    return get_file_timestamp_timedelta(filename, _pick_atime)
def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Returns how long ago a file's st_ctime was, as a timedelta.

    See also :meth:`get_file_ctime_as_datetime`,
    :meth:`get_file_ctime_timedelta`,
    :meth:`get_file_ctime_age_seconds`,
    and :meth:`get_file_raw_ctime`.
    """

    def _pick_ctime(stat_result):
        return stat_result.st_ctime

    return get_file_timestamp_timedelta(filename, _pick_ctime)
def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Returns how long ago a file was modified, as a python timedelta.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_timedelta`,
    :meth:`get_file_mtime_age_seconds`,
    and :meth:`get_file_raw_mtime`.
    """

    def _pick_mtime(stat_result):
        return stat_result.st_mtime

    return get_file_timestamp_timedelta(filename, _pick_mtime)
def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
    """~Internal helper

    Args:
        filename: the file to stat
        extractor: a callable taking the file's os.stat_result and
            returning the raw timestamp of interest
        brief: if True use describe_duration_briefly, otherwise
            describe_duration, to render the age

    Returns:
        a description of the timestamp's age or None if the age could
        not be determined.
    """
    # Imported here rather than at module scope, as in the original.
    from pyutils.datetimez.datetime_utils import (
        describe_duration,
        describe_duration_briefly,
    )

    age = get_file_timestamp_age_seconds(filename, extractor)
    if age is None:
        return None
    if brief:
        return describe_duration_briefly(age)
    return describe_duration(age)
def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
    """Describes how long ago a file was accessed.

    See also :meth:`get_file_atime_as_datetime`,
    :meth:`get_file_atime_timedelta`,
    :meth:`get_file_atime_age_seconds`,
    :meth:`describe_file_atime`,
    and :meth:`get_file_raw_atime`.
    """

    def _pick_atime(stat_result):
        return stat_result.st_atime

    return describe_file_timestamp(filename, _pick_atime, brief=brief)
def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
    """Describes how long ago a file's st_ctime was.

    See also :meth:`get_file_ctime_as_datetime`,
    :meth:`get_file_ctime_timedelta`,
    :meth:`get_file_ctime_age_seconds`,
    and :meth:`get_file_raw_ctime`.
    """

    def _pick_ctime(stat_result):
        return stat_result.st_ctime

    return describe_file_timestamp(filename, _pick_ctime, brief=brief)
def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
    """Describes how long ago a file was modified.

    See also :meth:`get_file_mtime_as_datetime`,
    :meth:`get_file_mtime_timedelta`,
    :meth:`get_file_mtime_age_seconds`,
    and :meth:`get_file_raw_mtime`.
    """

    def _pick_mtime(stat_result):
        return stat_result.st_mtime

    return describe_file_timestamp(filename, _pick_mtime, brief=brief)
def touch_file(filename: str, *, mode: Optional[int] = 0o666):
    """Like unix "touch" command's semantics: update the timestamp
    of a file to the current time if the file exists.  Create the
    file if it doesn't exist.

    Args:
        filename: the filename
        mode: the mode to create the file with (subject to the process
            umask); None means use pathlib's default.
    """
    path = pathlib.Path(filename)
    # The mode belongs to touch(), not to the Path constructor: older
    # Pythons silently ignored an extra constructor keyword and Python
    # 3.12+ rejects it with a TypeError.
    if mode is None:
        path.touch()
    else:
        path.touch(mode=mode)
def expand_globs(in_filename: str):
    """Expands shell globs (* and ? wildcards) to the matching files.

    Args:
        in_filename: the pattern to expand

    Returns:
        a generator over the paths glob.glob matched for the pattern.
    """
    yield from glob.glob(in_filename)
def get_files(directory: str):
    """Returns the files in a directory as a generator.

    Args:
        directory: the directory to list

    Returns:
        a generator over the full paths (directory joined with each
        entry name) of the normal files directly inside directory.
    """
    for filename in os.listdir(directory):
        full_path = join(directory, filename)
        if isfile(full_path) and exists(full_path):
            yield full_path
def get_matching_files(directory: str, glob: str):
    """Returns the subset of files whose name matches a glob.

    Args:
        directory: the directory whose files to consider
        glob: the fnmatch-style pattern; it is matched against each
            path produced by :meth:`get_files`.

    Returns:
        a generator over the matching paths.
    """
    # Note: the glob parameter shadows the glob module within this
    # function; only fnmatch is needed here.
    for filename in get_files(directory):
        if fnmatch.fnmatch(filename, glob):
            yield filename
def get_directories(directory: str):
    """Returns the subdirectories in a directory as a generator.

    Args:
        directory: the directory to list

    Returns:
        a generator over the full paths of the entries directly inside
        directory that exist and are not normal files.
    """
    for d in os.listdir(directory):
        full_path = join(directory, d)
        # Anything that exists and is not a normal file is treated as a
        # subdirectory here.
        if not isfile(full_path) and exists(full_path):
            yield full_path
def get_files_recursive(directory: str):
    """Find the files under a root recursively.

    Args:
        directory: the root directory at which to start

    Returns:
        a generator over the files directly in directory followed by
        the files found, recursively, under each of its subdirectories.
    """
    yield from get_files(directory)
    for subdir in get_directories(directory):
        yield from get_files_recursive(subdir)
def get_matching_files_recursive(directory: str, glob: str):
    """Returns the subset of files whose name matches a glob under a
    root recursively.

    Args:
        directory: the root directory at which to start
        glob: the fnmatch-style pattern; it is matched against each
            path produced by :meth:`get_files_recursive`.

    Returns:
        a generator over the matching paths.
    """
    # Note: the glob parameter shadows the glob module within this
    # function; only fnmatch is needed here.
    for filename in get_files_recursive(directory):
        if fnmatch.fnmatch(filename, glob):
            yield filename
class FileWriter(contextlib.AbstractContextManager):
    """A helper that writes a file to a temporary location and then moves
    it atomically to its ultimate destination on close.

    The temporary file is created next to the destination (same
    directory, uniquified with a UUID) and renamed over the destination
    when the context exits.
    """

    def __init__(self, filename: str) -> None:
        """
        Args:
            filename: the ultimate destination of the written data
        """
        self.filename = filename
        uuid = uuid4()
        self.tempfile = f'{filename}-{uuid}.tmp'
        self.handle: Optional[TextIO] = None

    def __enter__(self) -> TextIO:
        """Opens the temporary file for writing and returns its handle."""
        assert not os.path.exists(self.tempfile)
        self.handle = open(self.tempfile, mode="w")
        return self.handle

    def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        """Closes the temporary file and moves it over the destination.

        Raises:
            OSError: if the temporary file cannot be moved into place.

        Returns:
            False, so that any exception raised inside the with block
            propagates to the caller.
        """
        if self.handle is not None:
            self.handle.close()
            # os.replace instead of shelling out to /bin/mv: a
            # shell-built command line breaks on filenames containing
            # spaces or shell metacharacters.
            os.replace(self.tempfile, self.filename)
        return False
829 if __name__ == '__main__':