More cleanup.
[python_utils.git] / file_utils.py
index 905e23b16df017d3cab6c6ef27ce61ee8596c112..98e8c2670fd173fb11be51327fb6925a2b47f1eb 100644 (file)
@@ -2,32 +2,32 @@
 
 """Utilities for working with files."""
 
 
 """Utilities for working with files."""
 
+import contextlib
 import datetime
 import errno
 import glob
 import hashlib
 import datetime
 import errno
 import glob
 import hashlib
-import io
 import logging
 import os
 import pathlib
 import re
 import time
 from os.path import exists, isfile, join
 import logging
 import os
 import pathlib
 import re
 import time
 from os.path import exists, isfile, join
-from typing import List, Optional, TextIO
+from typing import Callable, List, Literal, Optional, TextIO
 from uuid import uuid4
 
 logger = logging.getLogger(__name__)
 
 
 from uuid import uuid4
 
 logger = logging.getLogger(__name__)
 
 
-def remove_newlines(x):
+def remove_newlines(x: str) -> str:
     return x.replace('\n', '')
 
 
     return x.replace('\n', '')
 
 
-def strip_whitespace(x):
+def strip_whitespace(x: str) -> str:
     return x.strip()
 
 
     return x.strip()
 
 
-def remove_hash_comments(x):
+def remove_hash_comments(x: str) -> str:
     return re.sub(r'#.*$', '', x)
 
 
     return re.sub(r'#.*$', '', x)
 
 
@@ -35,15 +35,16 @@ def slurp_file(
     filename: str,
     *,
     skip_blank_lines=False,
     filename: str,
     *,
     skip_blank_lines=False,
-    line_transformers=[],
+    line_transformers: Optional[List[Callable[[str], str]]] = None,
 ):
     ret = []
     if not file_is_readable(filename):
         raise Exception(f'{filename} can\'t be read.')
     with open(filename) as rf:
         for line in rf:
 ):
     ret = []
     if not file_is_readable(filename):
         raise Exception(f'{filename} can\'t be read.')
     with open(filename) as rf:
         for line in rf:
-            for transformation in line_transformers:
-                line = transformation(line)
+            if line_transformers is not None:
+                for transformation in line_transformers:
+                    line = transformation(line)
             if skip_blank_lines and line == '':
                 continue
             ret.append(line)
             if skip_blank_lines and line == '':
                 continue
             ret.append(line)
@@ -190,7 +191,7 @@ def create_path_if_not_exist(path, on_error=None):
     >>> os.path.exists(path)
     True
     """
     >>> os.path.exists(path)
     True
     """
-    logger.debug(f"Creating path {path}")
+    logger.debug("Creating path %s", path)
     previous_umask = os.umask(0)
     try:
         os.makedirs(path)
     previous_umask = os.umask(0)
     try:
         os.makedirs(path)
@@ -330,13 +331,13 @@ def get_file_md5(filename: str) -> str:
 
 def set_file_raw_atime(filename: str, atime: float):
     mtime = get_file_raw_mtime(filename)
 
 def set_file_raw_atime(filename: str, atime: float):
     mtime = get_file_raw_mtime(filename)
-    assert mtime
+    assert mtime is not None
     os.utime(filename, (atime, mtime))
 
 
 def set_file_raw_mtime(filename: str, mtime: float):
     atime = get_file_raw_atime(filename)
     os.utime(filename, (atime, mtime))
 
 
 def set_file_raw_mtime(filename: str, mtime: float):
     atime = get_file_raw_atime(filename)
-    assert atime
+    assert atime is not None
     os.utime(filename, (atime, mtime))
 
 
     os.utime(filename, (atime, mtime))
 
 
@@ -347,9 +348,7 @@ def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
         os.utime(filename, None)
 
 
         os.utime(filename, None)
 
 
-def convert_file_timestamp_to_datetime(
-    filename: str, producer
-) -> Optional[datetime.datetime]:
+def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
     ts = producer(filename)
     if ts is not None:
         return datetime.datetime.fromtimestamp(ts)
     ts = producer(filename)
     if ts is not None:
         return datetime.datetime.fromtimestamp(ts)
@@ -389,9 +388,7 @@ def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
 
 
     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
 
 
-def get_file_timestamp_timedelta(
-    filename: str, extractor
-) -> Optional[datetime.timedelta]:
+def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
     age = get_file_timestamp_age_seconds(filename, extractor)
     if age is not None:
         return datetime.timedelta(seconds=float(age))
     age = get_file_timestamp_age_seconds(filename, extractor)
     if age is not None:
         return datetime.timedelta(seconds=float(age))
@@ -465,7 +462,12 @@ def get_files_recursive(directory: str):
             yield file_or_directory
 
 
             yield file_or_directory
 
 
-class FileWriter(object):
+class FileWriter(contextlib.AbstractContextManager):
+    """A helper that writes a file to a temporary location and then moves
+    it atomically to its ultimate destination on close.
+
+    """
+
     def __init__(self, filename: str) -> None:
         self.filename = filename
         uuid = uuid4()
     def __init__(self, filename: str) -> None:
         self.filename = filename
         uuid = uuid4()
@@ -477,14 +479,14 @@ class FileWriter(object):
         self.handle = open(self.tempfile, mode="w")
         return self.handle
 
         self.handle = open(self.tempfile, mode="w")
         return self.handle
 
-    def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
+    def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
         if self.handle is not None:
             self.handle.close()
             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
             ret = os.system(cmd)
             if (ret >> 8) != 0:
         if self.handle is not None:
             self.handle.close()
             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
             ret = os.system(cmd)
             if (ret >> 8) != 0:
-                raise Exception(f'{cmd} failed, exit value {ret>>8}')
-        return None
+                raise Exception(f'{cmd} failed, exit value {ret>>8}!')
+        return False
 
 
 if __name__ == '__main__':
 
 
 if __name__ == '__main__':