#!/usr/bin/env python3

"""Utilities for working with files."""

import datetime
import errno
import glob
import hashlib
import io
import logging
import os
import pathlib
import time
from os.path import exists, isfile, join
from typing import Callable, Iterator, Optional
from uuid import uuid4

logger = logging.getLogger(__name__)

# Reminder: the standard library already provides os.remove(file) and
# os.path.basename(path); there is no need to wrap them here.


def create_path_if_not_exist(path, on_error=None):
    """
    Attempts to create path if it does not exist.  If on_error is
    specified, it is called with an exception if one occurs, otherwise
    exception is rethrown.

    An "already exists" error is ignored only when path really is a
    directory; if path exists but is something else (e.g. a regular
    file) the error is reported like any other.

    Args:
        path: the directory path to create (intermediate directories
            are created as needed).
        on_error: optional callable invoked as on_error(path, exception)
            instead of re-raising.

    >>> import uuid
    >>> import os
    >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
    >>> os.path.exists(path)
    False
    >>> create_path_if_not_exist(path)
    >>> os.path.exists(path)
    True
    """
    logger.debug("Creating path %s", path)
    # Clear the umask so the directories are created with the full
    # requested permissions; it is restored in the finally clause.
    previous_umask = os.umask(0)
    try:
        os.makedirs(path)
        os.chmod(path, 0o777)
    except OSError as ex:
        already_a_directory = ex.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            if on_error is not None:
                on_error(path, ex)
            else:
                raise
    finally:
        os.umask(previous_umask)


def does_file_exist(filename: str) -> bool:
    """Returns True if a file exists and is a normal file.

    >>> does_file_exist(__file__)
    True
    """
    # os.path.isfile is already False for paths that do not exist.
    return os.path.isfile(filename)


def does_directory_exist(dirname: str) -> bool:
    """Returns True if a file exists and is a directory.

    >>> does_directory_exist('/tmp')
    True
    """
    # os.path.isdir is already False for paths that do not exist.
    return os.path.isdir(dirname)


def does_path_exist(pathname: str) -> bool:
    """Just a more verbose wrapper around os.path.exists."""
    return os.path.exists(pathname)


def get_file_size(filename: str) -> int:
    """Returns the size of a file in bytes."""
    return os.path.getsize(filename)


def is_normal_file(filename: str) -> bool:
    """Returns True if filename is a normal file.

    >>> is_normal_file(__file__)
    True
    """
    return os.path.isfile(filename)


def is_directory(filename: str) -> bool:
    """Returns True if filename is a directory.

    >>> is_directory('/tmp')
    True
    """
    return os.path.isdir(filename)


def is_symlink(filename: str) -> bool:
    """Returns True if filename is a symbolic link."""
    return os.path.islink(filename)


def is_same_file(file1: str, file2: str) -> bool:
    """Returns True if both paths refer to the same file (os.path.samefile)."""
    return os.path.samefile(file1, file2)


def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
    """Returns the os.stat() result for filename, or None on failure.

    Any exception raised by os.stat is logged and swallowed.
    """
    try:
        return os.stat(filename)
    except Exception as e:
        logger.exception(e)
        return None


def get_file_raw_timestamp(
    filename: str, extractor: Callable[[os.stat_result], float]
) -> Optional[float]:
    """Stats filename and returns extractor(stat_result), or None if the
    stat failed."""
    tss = get_file_raw_timestamps(filename)
    if tss is None:
        return None
    return extractor(tss)


def get_file_raw_atime(filename: str) -> Optional[float]:
    """Returns filename's st_atime, or None if it cannot be stat'ed."""
    return get_file_raw_timestamp(filename, lambda x: x.st_atime)


def get_file_raw_mtime(filename: str) -> Optional[float]:
    """Returns filename's st_mtime, or None if it cannot be stat'ed."""
    return get_file_raw_timestamp(filename, lambda x: x.st_mtime)


def get_file_raw_ctime(filename: str) -> Optional[float]:
    """Returns filename's st_ctime, or None if it cannot be stat'ed."""
    return get_file_raw_timestamp(filename, lambda x: x.st_ctime)


def get_file_md5(filename: str) -> str:
    """Returns the hex MD5 digest of the contents of filename.

    The file is read in 8 KiB chunks so large files are not loaded into
    memory at once.
    """
    file_hash = hashlib.md5()
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            file_hash.update(chunk)
    return file_hash.hexdigest()


def set_file_raw_atime(filename: str, atime: float):
    """Sets filename's access time to atime, keeping its current mtime."""
    mtime = get_file_raw_mtime(filename)
    os.utime(filename, (atime, mtime))


def set_file_raw_mtime(filename: str, mtime: float):
    """Sets filename's modification time to mtime, keeping its current
    atime."""
    atime = get_file_raw_atime(filename)
    os.utime(filename, (atime, mtime))


def set_file_raw_atime_and_mtime(filename: str, ts: Optional[float] = None):
    """Sets both atime and mtime of filename to ts.

    When ts is None, os.utime is called with None, which sets both
    timestamps to the current time.
    """
    if ts is not None:
        os.utime(filename, (ts, ts))
    else:
        os.utime(filename, None)


def convert_file_timestamp_to_datetime(
    filename: str, producer: Callable[[str], Optional[float]]
) -> Optional[datetime.datetime]:
    """Returns producer(filename) converted with
    datetime.datetime.fromtimestamp, or None if producer returned None."""
    ts = producer(filename)
    if ts is None:
        return None
    return datetime.datetime.fromtimestamp(ts)


def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Returns filename's atime as a datetime, or None on stat failure."""
    return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)


def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Returns filename's mtime as a datetime, or None on stat failure."""
    return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)


def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Returns filename's ctime as a datetime, or None on stat failure."""
    return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)


def get_file_timestamp_age_seconds(
    filename: str, extractor: Callable[[os.stat_result], float]
) -> Optional[float]:
    """Returns time.time() minus the timestamp extractor pulls out of
    filename's stat result, or None if the file cannot be stat'ed."""
    now = time.time()
    ts = get_file_raw_timestamps(filename)
    if ts is None:
        return None
    return now - extractor(ts)


def get_file_atime_age_seconds(filename: str) -> Optional[float]:
    """Returns seconds elapsed since filename's atime, or None."""
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)


def get_file_ctime_age_seconds(filename: str) -> Optional[float]:
    """Returns seconds elapsed since filename's ctime, or None."""
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)


def get_file_mtime_age_seconds(filename: str) -> Optional[float]:
    """Returns seconds elapsed since filename's mtime, or None."""
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)


def get_file_timestamp_timedelta(
    filename: str, extractor: Callable[[os.stat_result], float]
) -> Optional[datetime.timedelta]:
    """Returns the age computed by get_file_timestamp_age_seconds as a
    timedelta, or None if the file cannot be stat'ed."""
    age = get_file_timestamp_age_seconds(filename, extractor)
    if age is None:
        return None
    return datetime.timedelta(seconds=float(age))


def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Returns the time elapsed since filename's atime, or None."""
    return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)


def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Returns the time elapsed since filename's ctime, or None."""
    return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)


def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Returns the time elapsed since filename's mtime, or None."""
    return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)


def describe_file_timestamp(
    filename: str, extractor, *, brief=False
) -> Optional[str]:
    """Returns the age of one of filename's timestamps as text.

    The age in seconds is passed to datetime_utils.describe_duration, or
    to describe_duration_briefly when brief is True.  Returns None if
    the file cannot be stat'ed.
    """
    # Imported here rather than at module scope, as in the original code.
    from datetime_utils import describe_duration, describe_duration_briefly

    age = get_file_timestamp_age_seconds(filename, extractor)
    if age is None:
        return None
    if brief:
        return describe_duration_briefly(age)
    return describe_duration(age)


def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
    """Describes the age of filename's atime; see describe_file_timestamp."""
    return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)


def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
    """Describes the age of filename's ctime; see describe_file_timestamp."""
    return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)


def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
    """Describes the age of filename's mtime; see describe_file_timestamp."""
    return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)


def touch_file(filename: str) -> bool:
    """Creates filename if needed and updates its timestamps.

    Returns:
        True if the path exists after the touch.
    """
    pathlib.Path(filename).touch()
    return exists(filename)


def expand_globs(in_filename: str) -> Iterator[str]:
    """Yields every path matching the glob pattern in_filename."""
    yield from glob.glob(in_filename)


def get_files(directory: str) -> Iterator[str]:
    """Yields the full path of every regular file directly inside
    directory."""
    for filename in os.listdir(directory):
        full_path = join(directory, filename)
        if isfile(full_path):
            yield full_path


def get_directories(directory: str) -> Iterator[str]:
    """Yields the full path of every subdirectory directly inside
    directory.

    Entries that are neither regular files nor directories (FIFOs,
    sockets, device nodes, dangling symlinks) are skipped.
    """
    for d in os.listdir(directory):
        full_path = join(directory, d)
        if os.path.isdir(full_path):
            yield full_path


def get_files_recursive(directory: str) -> Iterator[str]:
    """Yields the full path of every regular file under directory,
    descending into subdirectories."""
    yield from get_files(directory)
    for subdir in get_directories(directory):
        yield from get_files_recursive(subdir)


class FileWriter(object):
    """Context manager that writes to a temporary sibling file and moves
    it over filename when the with-block exits.

    The temporary file is named '<filename>-<uuid>.tmp', so it lives in
    the same directory as the destination.
    """

    def __init__(self, filename: str) -> None:
        self.filename = filename
        uuid = uuid4()
        self.tempfile = f'{filename}-{uuid}.tmp'
        self.handle = None

    def __enter__(self) -> io.TextIOWrapper:
        """Opens the temporary file for text writing and returns it."""
        assert not does_path_exist(self.tempfile)
        self.handle = open(self.tempfile, mode="w")
        return self.handle

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Closes the temporary file and moves it over the destination.

        NOTE: the move happens even when the with-block raised.

        Returns:
            False, so exceptions from the with-block are never suppressed.

        Raises:
            OSError: if the temporary file cannot be moved into place.
        """
        if self.handle is not None:
            self.handle.close()
            # os.replace avoids building a shell command, so filenames
            # containing spaces or shell metacharacters are handled safely.
            os.replace(self.tempfile, self.filename)
        return False


if __name__ == '__main__':
    import doctest

    doctest.testmod()