More cleanup.
[python_utils.git] / file_utils.py
index 6bcfc75db47efa6d2da2327cdd42284c589901d4..98e8c2670fd173fb11be51327fb6925a2b47f1eb 100644 (file)
@@ -2,32 +2,32 @@
 
 """Utilities for working with files."""
 
 
 """Utilities for working with files."""
 
+import contextlib
 import datetime
 import errno
 import glob
 import hashlib
 import datetime
 import errno
 import glob
 import hashlib
-import io
 import logging
 import os
 import pathlib
 import re
 import time
 from os.path import exists, isfile, join
 import logging
 import os
 import pathlib
 import re
 import time
 from os.path import exists, isfile, join
-from typing import List, Optional, TextIO
+from typing import Callable, List, Literal, Optional, TextIO
 from uuid import uuid4
 
 logger = logging.getLogger(__name__)
 
 
 from uuid import uuid4
 
 logger = logging.getLogger(__name__)
 
 
-def remove_newlines(x):
+def remove_newlines(x: str) -> str:
     return x.replace('\n', '')
 
 
     return x.replace('\n', '')
 
 
-def strip_whitespace(x):
+def strip_whitespace(x: str) -> str:
     return x.strip()
 
 
     return x.strip()
 
 
-def remove_hash_comments(x):
+def remove_hash_comments(x: str) -> str:
     return re.sub(r'#.*$', '', x)
 
 
     return re.sub(r'#.*$', '', x)
 
 
@@ -35,15 +35,16 @@ def slurp_file(
     filename: str,
     *,
     skip_blank_lines=False,
     filename: str,
     *,
     skip_blank_lines=False,
-    line_transformers=[],
+    line_transformers: Optional[List[Callable[[str], str]]] = None,
 ):
     ret = []
     if not file_is_readable(filename):
         raise Exception(f'{filename} can\'t be read.')
     with open(filename) as rf:
         for line in rf:
 ):
     ret = []
     if not file_is_readable(filename):
         raise Exception(f'{filename} can\'t be read.')
     with open(filename) as rf:
         for line in rf:
-            for transformation in line_transformers:
-                line = transformation(line)
+            if line_transformers is not None:
+                for transformation in line_transformers:
+                    line = transformation(line)
             if skip_blank_lines and line == '':
                 continue
             ret.append(line)
             if skip_blank_lines and line == '':
                 continue
             ret.append(line)
@@ -190,7 +191,7 @@ def create_path_if_not_exist(path, on_error=None):
     >>> os.path.exists(path)
     True
     """
     >>> os.path.exists(path)
     True
     """
-    logger.debug(f"Creating path {path}")
+    logger.debug("Creating path %s", path)
     previous_umask = os.umask(0)
     try:
         os.makedirs(path)
     previous_umask = os.umask(0)
     try:
         os.makedirs(path)
@@ -461,7 +462,12 @@ def get_files_recursive(directory: str):
             yield file_or_directory
 
 
             yield file_or_directory
 
 
-class FileWriter(object):
+class FileWriter(contextlib.AbstractContextManager):
+    """A helper that writes a file to a temporary location and then moves
+    it atomically to its ultimate destination on close.
+
+    """
+
     def __init__(self, filename: str) -> None:
         self.filename = filename
         uuid = uuid4()
     def __init__(self, filename: str) -> None:
         self.filename = filename
         uuid = uuid4()
@@ -473,14 +479,14 @@ class FileWriter(object):
         self.handle = open(self.tempfile, mode="w")
         return self.handle
 
         self.handle = open(self.tempfile, mode="w")
         return self.handle
 
-    def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
+    def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
         if self.handle is not None:
             self.handle.close()
             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
             ret = os.system(cmd)
             if (ret >> 8) != 0:
         if self.handle is not None:
             self.handle.close()
             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
             ret = os.system(cmd)
             if (ret >> 8) != 0:
-                raise Exception(f'{cmd} failed, exit value {ret>>8}')
-        return None
+                raise Exception(f'{cmd} failed, exit value {ret>>8}!')
+        return False
 
 
 if __name__ == '__main__':
 
 
 if __name__ == '__main__':