#!/usr/bin/env python3 """Utilities for working with files.""" import datetime import errno import logging import os import time from typing import Optional import glob from os.path import isfile, join, exists import datetime_utils logger = logging.getLogger(__name__) def create_path_if_not_exist(path, on_error=None): """ Attempts to create path if it does not exist. If on_error is specified, it is called with an exception if one occurs, otherwise exception is rethrown. >>> import uuid >>> import os >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4())) >>> os.path.exists(path) False >>> create_path_if_not_exist(path) >>> os.path.exists(path) True """ logger.debug(f"Creating path {path}") previous_umask = os.umask(0) try: os.makedirs(path) os.chmod(path, 0o777) except OSError as ex: if ex.errno != errno.EEXIST and not os.path.isdir(path): if on_error is not None: on_error(path, ex) else: raise finally: os.umask(previous_umask) def does_file_exist(filename: str) -> bool: return os.path.exists(filename) def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]: try: return os.stat(filename) except Exception as e: logger.exception(e) return None def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]: tss = get_file_raw_timestamps(filename) if tss is not None: return extractor(tss) return None def get_file_raw_atime(filename: str) -> Optional[float]: return get_file_raw_timestamp(filename, lambda x: x.st_atime) def get_file_raw_mtime(filename: str) -> Optional[float]: return get_file_raw_timestamp(filename, lambda x: x.st_mtime) def get_file_raw_ctime(filename: str) -> Optional[float]: return get_file_raw_timestamp(filename, lambda x: x.st_ctime) def convert_file_timestamp_to_datetime( filename: str, producer ) -> Optional[datetime.datetime]: ts = producer(filename) if ts is not None: return datetime.datetime.fromtimestamp(ts) return None def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]: return convert_file_timestamp_to_datetime(filename, get_file_raw_atime) def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]: return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime) def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]: return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime) def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]: now = time.time() ts = get_file_raw_timestamps(filename) if ts is None: return None result = extractor(ts) return now - result def get_file_atime_age_seconds(filename: str) -> Optional[int]: return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime) def get_file_ctime_age_seconds(filename: str) -> Optional[int]: return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime) def get_file_mtime_age_seconds(filename: str) -> Optional[int]: return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime) def get_file_timestamp_timedelta( filename: str, extractor ) -> Optional[datetime.timedelta]: age = get_file_timestamp_age_seconds(filename, extractor) if age is not None: return datetime.timedelta(seconds=float(age)) return None def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]: return get_file_timestamp_timedelta(filename, lambda x: x.st_atime) def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]: return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime) def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]: return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime) def describe_file_timestamp( filename: str, extractor, *, brief=False ) -> Optional[str]: age = get_file_timestamp_age_seconds(filename, extractor) if age is None: return None if brief: return datetime_utils.describe_duration_briefly(age) else: return datetime_utils.describe_duration(age) def describe_file_atime(filename: str, *, brief=False) -> Optional[str]: return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief) def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]: return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief) def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]: return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief) def expand_globs(in_filename: str): for filename in glob.glob(in_filename): yield filename def get_files(directory: str): for filename in os.listdir(directory): full_path = join(directory, filename) if isfile(full_path) and exists(full_path): yield full_path def get_directories(directory: str): for d in os.listdir(directory): full_path = join(directory, d) if not isfile(full_path) and exists(full_path): yield full_path def get_files_recursive(directory: str): for filename in get_files(directory): yield filename for subdir in get_directories(directory): for filename in get_files_recursive(subdir): yield filename