#!/usr/bin/env python3

"""Utilities for working with files."""

import contextlib
import datetime
import errno
import glob
import hashlib
import logging
import os
import pathlib
import re
import time
from os.path import exists, isfile, join
from typing import Callable, List, Literal, Optional, TextIO
from uuid import uuid4

logger = logging.getLogger(__name__)


def remove_newlines(x: str) -> str:
    """Return x with every newline character removed."""
    return x.replace('\n', '')


def strip_whitespace(x: str) -> str:
    """Return x with leading and trailing whitespace removed."""
    return x.strip()


def remove_hash_comments(x: str) -> str:
    """Return x with everything from the first '#' to end of line removed."""
    return re.sub(r'#.*$', '', x)


def slurp_file(
    filename: str,
    *,
    skip_blank_lines=False,
    line_transformers: Optional[List[Callable[[str], str]]] = None,
) -> List[str]:
    """Read filename and return its lines as a list.

    Args:
        filename: the file to read.
        skip_blank_lines: if True, drop lines that are the empty string
            *after* the transformers have run.  Raw lines keep their
            trailing newline, so this only has an effect when a
            transformer (e.g. strip_whitespace) removes it.
        line_transformers: optional functions applied, in order, to each
            line before it is stored.

    Raises:
        Exception: if filename is not a readable regular file.
    """
    ret = []
    if not file_is_readable(filename):
        raise Exception(f'{filename} can\'t be read.')
    with open(filename) as rf:
        for line in rf:
            if line_transformers is not None:
                for transformation in line_transformers:
                    line = transformation(line)
            if skip_blank_lines and line == '':
                continue
            ret.append(line)
    return ret


def remove(path: str) -> None:
    """Deletes a file.  Raises if path refers to a directory or a file
    that doesn't exist.

    >>> import os
    >>> filename = '/tmp/file_utils_test_file'
    >>> os.system(f'touch {filename}')
    0
    >>> does_file_exist(filename)
    True
    >>> remove(filename)
    >>> does_file_exist(filename)
    False

    """
    os.remove(path)


def delete(path: str) -> None:
    """Deletes a file; same behavior as remove()."""
    os.remove(path)


def without_extension(path: str) -> str:
    """Remove one extension from a file or path.

    >>> without_extension('foobar.txt')
    'foobar'

    >>> without_extension('/home/scott/frapp.py')
    '/home/scott/frapp'

    >>> without_extension('a.b.c.tar.gz')
    'a.b.c.tar'

    >>> without_extension('foobar')
    'foobar'

    """
    return os.path.splitext(path)[0]


def without_all_extensions(path: str) -> str:
    """Removes all extensions from a path; handles multiple extensions
    like foobar.tar.gz -> foobar.

    Dots that are not part of an extension (a leading dot in the final
    component, or dots in directory names) are left alone.

    >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
    '/home/scott/foobar'

    >>> without_all_extensions('/home/scott.d/foobar.tar.gz')
    '/home/scott.d/foobar'

    >>> without_all_extensions('.bashrc')
    '.bashrc'

    """
    # Stop when splitext finds nothing more to strip.  Testing for '.'
    # anywhere in the path would loop forever on inputs such as '.bashrc'
    # or '/a.d/file', where a dot remains but is not an extension.
    while True:
        root, ext = os.path.splitext(path)
        if not ext:
            return path
        path = root


def get_extension(path: str) -> str:
    """Extract and return one extension from a file or path.

    >>> get_extension('this_is_a_test.txt')
    '.txt'

    >>> get_extension('/home/scott/test.py')
    '.py'

    >>> get_extension('foobar')
    ''

    """
    return os.path.splitext(path)[1]


def get_all_extensions(path: str) -> List[str]:
    """Return the extensions of a file or path in order.

    >>> get_all_extensions('/home/scott/foo.tar.gz.1')
    ['.tar', '.gz', '.1']

    """
    ret = []
    while True:
        ext = get_extension(path)
        path = without_extension(path)
        if ext:
            ret.append(ext)
        else:
            # Extensions were collected right-to-left; flip to file order.
            ret.reverse()
            return ret


def without_path(filespec: str) -> str:
    """Returns the base filename without any leading path.

    >>> without_path('/home/scott/foo.py')
    'foo.py'

    >>> without_path('foo.py')
    'foo.py'

    """
    return os.path.split(filespec)[1]


def get_path(filespec: str) -> str:
    """Returns just the path of the filespec by removing the filename and
    extension.

    >>> get_path('/home/scott/foobar.py')
    '/home/scott'

    >>> get_path('~scott/frapp.txt')
    '~scott'

    """
    return os.path.split(filespec)[0]


def get_canonical_path(filespec: str) -> str:
    """Returns a canonicalized absolute path.

    >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
    '/usr/home/scott/foo.txt'

    """
    return os.path.realpath(filespec)


def create_path_if_not_exist(path, on_error=None):
    """
    Attempts to create path if it does not exist. If on_error is
    specified, it is called with an exception if one occurs, otherwise
    exception is rethrown.

    The directory (and any missing parents) is created with the process
    umask temporarily cleared, and the leaf is then chmod'ed to 0o777.
    The previous umask is always restored.

    An OSError is only reported (via on_error(path, ex) or re-raise) when
    its errno is not EEXIST *and* path is not a directory afterwards; any
    other OSError is silently ignored.

    >>> import uuid
    >>> import os
    >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
    >>> os.path.exists(path)
    False
    >>> create_path_if_not_exist(path)
    >>> os.path.exists(path)
    True
    """
    logger.debug("Creating path %s", path)
    previous_umask = os.umask(0)
    try:
        os.makedirs(path)
        os.chmod(path, 0o777)
    except OSError as ex:
        if ex.errno != errno.EEXIST and not os.path.isdir(path):
            if on_error is not None:
                on_error(path, ex)
            else:
                raise
    finally:
        os.umask(previous_umask)


def does_file_exist(filename: str) -> bool:
    """Returns True if a file exists and is a normal file.

    >>> does_file_exist(__file__)
    True
    """
    return os.path.exists(filename) and os.path.isfile(filename)


def file_is_readable(filename: str) -> bool:
    """Returns True if filename is an existing normal file with read access."""
    return does_file_exist(filename) and os.access(filename, os.R_OK)


def file_is_writable(filename: str) -> bool:
    """Returns True if filename is an existing normal file with write access."""
    return does_file_exist(filename) and os.access(filename, os.W_OK)


def file_is_executable(filename: str) -> bool:
    """Returns True if filename is an existing normal file with execute access."""
    return does_file_exist(filename) and os.access(filename, os.X_OK)


def does_directory_exist(dirname: str) -> bool:
    """Returns True if a file exists and is a directory.

    >>> does_directory_exist('/tmp')
    True
    """
    return os.path.exists(dirname) and os.path.isdir(dirname)


def does_path_exist(pathname: str) -> bool:
    """Just a more verbose wrapper around os.path.exists."""
    return os.path.exists(pathname)


def get_file_size(filename: str) -> int:
    """Returns the size of a file in bytes."""
    return os.path.getsize(filename)


def is_normal_file(filename: str) -> bool:
    """Returns True if filename is a normal file.

    >>> is_normal_file(__file__)
    True
    """
    return os.path.isfile(filename)


def is_directory(filename: str) -> bool:
    """Returns True if filename is a directory.

    >>> is_directory('/tmp')
    True
    """
    return os.path.isdir(filename)


def is_symlink(filename: str) -> bool:
    """True if filename is a symlink, False otherwise.

    >>> is_symlink('/tmp')
    False

    >>> is_symlink('/home')
    True

    """
    return os.path.islink(filename)


def is_same_file(file1: str, file2: str) -> bool:
    """Returns True if the two files are the same inode.

    >>> is_same_file('/tmp', '/tmp/../tmp')
    True

    >>> is_same_file('/tmp', '/home')
    False

    """
    return os.path.samefile(file1, file2)


def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
    """Stats the file and returns an os.stat_result or None on error."""
    try:
        return os.stat(filename)
    except Exception as e:
        # Best effort: the failure is logged and reported as None.
        logger.exception(e)
        return None


def get_file_raw_timestamp(
    filename: str, extractor: Callable[[os.stat_result], float]
) -> Optional[float]:
    """Stats filename and returns extractor(stat_result), or None if the
    stat failed."""
    tss = get_file_raw_timestamps(filename)
    if tss is not None:
        return extractor(tss)
    return None


def get_file_raw_atime(filename: str) -> Optional[float]:
    """Returns filename's st_atime, or None if it cannot be stat'ed."""
    return get_file_raw_timestamp(filename, lambda x: x.st_atime)


def get_file_raw_mtime(filename: str) -> Optional[float]:
    """Returns filename's st_mtime, or None if it cannot be stat'ed."""
    return get_file_raw_timestamp(filename, lambda x: x.st_mtime)


def get_file_raw_ctime(filename: str) -> Optional[float]:
    """Returns filename's st_ctime, or None if it cannot be stat'ed."""
    return get_file_raw_timestamp(filename, lambda x: x.st_ctime)


def get_file_md5(filename: str) -> str:
    """Hashes filename's contents and returns an MD5."""
    file_hash = hashlib.md5()
    with open(filename, "rb") as f:
        # Read in fixed-size chunks so large files are never fully in memory.
        chunk = f.read(8192)
        while chunk:
            file_hash.update(chunk)
            chunk = f.read(8192)
    return file_hash.hexdigest()


def set_file_raw_atime(filename: str, atime: float):
    """Sets filename's access time to atime, keeping its current mtime."""
    mtime = get_file_raw_mtime(filename)
    assert mtime is not None
    os.utime(filename, (atime, mtime))


def set_file_raw_mtime(filename: str, mtime: float):
    """Sets filename's modification time to mtime, keeping its current atime."""
    atime = get_file_raw_atime(filename)
    assert atime is not None
    os.utime(filename, (atime, mtime))


def set_file_raw_atime_and_mtime(filename: str, ts: Optional[float] = None):
    """Sets both atime and mtime of filename to ts, or to the current time
    when ts is None."""
    if ts is not None:
        os.utime(filename, (ts, ts))
    else:
        os.utime(filename, None)


def convert_file_timestamp_to_datetime(
    filename: str, producer: Callable[[str], Optional[float]]
) -> Optional[datetime.datetime]:
    """Calls producer(filename) to get a timestamp and converts it with
    datetime.datetime.fromtimestamp; returns None if producer does."""
    ts = producer(filename)
    if ts is not None:
        return datetime.datetime.fromtimestamp(ts)
    return None


def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Returns filename's atime as a datetime, or None on stat failure."""
    return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)


def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Returns filename's mtime as a datetime, or None on stat failure."""
    return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)


def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Returns filename's ctime as a datetime, or None on stat failure."""
    return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)


def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[float]:
    """Returns time.time() minus the timestamp extractor pulls out of
    filename's stat result, or None if the stat failed."""
    now = time.time()
    ts = get_file_raw_timestamps(filename)
    if ts is None:
        return None
    result = extractor(ts)
    return now - result


def get_file_atime_age_seconds(filename: str) -> Optional[float]:
    """Returns seconds elapsed since filename's atime, or None on failure."""
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)


def get_file_ctime_age_seconds(filename: str) -> Optional[float]:
    """Returns seconds elapsed since filename's ctime, or None on failure."""
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)


def get_file_mtime_age_seconds(filename: str) -> Optional[float]:
    """Returns seconds elapsed since filename's mtime, or None on failure."""
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)


def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
    """Like get_file_timestamp_age_seconds but returns a timedelta."""
    age = get_file_timestamp_age_seconds(filename, extractor)
    if age is not None:
        return datetime.timedelta(seconds=float(age))
    return None


def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Returns the time elapsed since filename's atime, or None on failure."""
    return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)


def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Returns the time elapsed since filename's ctime, or None on failure."""
    return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)


def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Returns the time elapsed since filename's mtime, or None on failure."""
    return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)


def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
    """Returns the age of one of filename's timestamps as text produced by
    datetime_utils.describe_duration (or describe_duration_briefly when
    brief is True); None if the file cannot be stat'ed."""
    # Imported here rather than at module scope, as in the original code.
    from datetime_utils import describe_duration, describe_duration_briefly

    age = get_file_timestamp_age_seconds(filename, extractor)
    if age is None:
        return None
    if brief:
        return describe_duration_briefly(age)
    else:
        return describe_duration(age)


def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
    """Describes how long ago filename was last accessed."""
    return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)


def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
    """Describes the age of filename's st_ctime."""
    return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)


def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
    """Describes how long ago filename was last modified."""
    return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)


def touch_file(filename: str, *, mode: Optional[int] = 0o666):
    """Creates filename if needed and updates its modification time.

    Args:
        filename: the file to touch.
        mode: permission bits used if the file has to be created (subject
            to the process umask); None means pathlib's default.
    """
    path = pathlib.Path(filename)
    # mode belongs to touch(), not to the Path constructor.
    if mode is None:
        path.touch()
    else:
        path.touch(mode=mode)


def expand_globs(in_filename: str):
    """Yields each path matching the glob pattern in_filename."""
    yield from glob.glob(in_filename)


def get_files(directory: str):
    """Yields the full path of every regular file directly in directory."""
    for filename in os.listdir(directory):
        full_path = join(directory, filename)
        if isfile(full_path) and exists(full_path):
            yield full_path


def get_directories(directory: str):
    """Yields the full path of every subdirectory directly in directory."""
    for d in os.listdir(directory):
        full_path = join(directory, d)
        # Test for a directory explicitly: "not a regular file" would also
        # match FIFOs, sockets and device nodes, which cannot be listed.
        if os.path.isdir(full_path):
            yield full_path


def get_files_recursive(directory: str):
    """Yields every regular file under directory, descending into
    subdirectories; files of a directory come before those of its children."""
    yield from get_files(directory)
    for subdir in get_directories(directory):
        yield from get_files_recursive(subdir)


class FileWriter(contextlib.AbstractContextManager):
    """A helper that writes a file to a temporary location and then moves
    it atomically to its ultimate destination on close.

    The temporary file is named '<filename>-<uuid4>.tmp', so it lives next
    to the destination.  Usage:

        with FileWriter('/some/path.txt') as w:
            w.write('contents')
    """

    def __init__(self, filename: str) -> None:
        self.filename = filename
        uuid = uuid4()
        self.tempfile = f'{filename}-{uuid}.tmp'
        # Set by __enter__; stays None until the context is entered.
        self.handle: Optional[TextIO] = None

    def __enter__(self) -> TextIO:
        """Opens the temporary file for writing and returns its handle."""
        assert not does_path_exist(self.tempfile)
        self.handle = open(self.tempfile, mode="w")
        return self.handle

    def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        """Closes the temporary file and renames it over the destination.

        The rename happens whether or not the with-body raised.  Returns
        False so an exception from the body is never suppressed.

        Raises:
            OSError: if the temporary file cannot be moved into place.
        """
        if self.handle is not None:
            self.handle.close()
            # os.replace renames without going through a shell, so paths
            # containing spaces or shell metacharacters are handled safely.
            os.replace(self.tempfile, self.filename)
        return False


if __name__ == '__main__':
    import doctest

    doctest.testmod()