projects
/
python_utils.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Let text_utils work with strings that contain ansi escape sequences.
[python_utils.git]
/
file_utils.py
diff --git
a/file_utils.py
b/file_utils.py
index 905e23b16df017d3cab6c6ef27ce61ee8596c112..98e8c2670fd173fb11be51327fb6925a2b47f1eb 100644
(file)
--- a/
file_utils.py
+++ b/
file_utils.py
@@
-2,32
+2,32
@@
"""Utilities for working with files."""
"""Utilities for working with files."""
+import contextlib
import datetime
import errno
import glob
import hashlib
import datetime
import errno
import glob
import hashlib
-import io
import logging
import os
import pathlib
import re
import time
from os.path import exists, isfile, join
import logging
import os
import pathlib
import re
import time
from os.path import exists, isfile, join
-from typing import
List
, Optional, TextIO
+from typing import
Callable, List, Literal
, Optional, TextIO
from uuid import uuid4
logger = logging.getLogger(__name__)
from uuid import uuid4
logger = logging.getLogger(__name__)
-def remove_newlines(x
)
:
+def remove_newlines(x
: str) -> str
:
return x.replace('\n', '')
return x.replace('\n', '')
-def strip_whitespace(x
)
:
+def strip_whitespace(x
: str) -> str
:
return x.strip()
return x.strip()
-def remove_hash_comments(x
)
:
+def remove_hash_comments(x
: str) -> str
:
return re.sub(r'#.*$', '', x)
return re.sub(r'#.*$', '', x)
@@
-35,15
+35,16
@@
def slurp_file(
filename: str,
*,
skip_blank_lines=False,
filename: str,
*,
skip_blank_lines=False,
- line_transformers
=[]
,
+ line_transformers
: Optional[List[Callable[[str], str]]] = None
,
):
ret = []
if not file_is_readable(filename):
raise Exception(f'{filename} can\'t be read.')
with open(filename) as rf:
for line in rf:
):
ret = []
if not file_is_readable(filename):
raise Exception(f'{filename} can\'t be read.')
with open(filename) as rf:
for line in rf:
- for transformation in line_transformers:
- line = transformation(line)
+ if line_transformers is not None:
+ for transformation in line_transformers:
+ line = transformation(line)
if skip_blank_lines and line == '':
continue
ret.append(line)
if skip_blank_lines and line == '':
continue
ret.append(line)
@@
-190,7
+191,7
@@
def create_path_if_not_exist(path, on_error=None):
>>> os.path.exists(path)
True
"""
>>> os.path.exists(path)
True
"""
- logger.debug(
f"Creating path {path}"
)
+ logger.debug(
"Creating path %s", path
)
previous_umask = os.umask(0)
try:
os.makedirs(path)
previous_umask = os.umask(0)
try:
os.makedirs(path)
@@
-330,13
+331,13
@@
def get_file_md5(filename: str) -> str:
def set_file_raw_atime(filename: str, atime: float):
mtime = get_file_raw_mtime(filename)
def set_file_raw_atime(filename: str, atime: float):
mtime = get_file_raw_mtime(filename)
- assert mtime
+ assert mtime
is not None
os.utime(filename, (atime, mtime))
def set_file_raw_mtime(filename: str, mtime: float):
atime = get_file_raw_atime(filename)
os.utime(filename, (atime, mtime))
def set_file_raw_mtime(filename: str, mtime: float):
atime = get_file_raw_atime(filename)
- assert atime
+ assert atime
is not None
os.utime(filename, (atime, mtime))
os.utime(filename, (atime, mtime))
@@
-347,9
+348,7
@@
def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
os.utime(filename, None)
os.utime(filename, None)
-def convert_file_timestamp_to_datetime(
- filename: str, producer
-) -> Optional[datetime.datetime]:
+def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
ts = producer(filename)
if ts is not None:
return datetime.datetime.fromtimestamp(ts)
ts = producer(filename)
if ts is not None:
return datetime.datetime.fromtimestamp(ts)
@@
-389,9
+388,7
@@
def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
-def get_file_timestamp_timedelta(
- filename: str, extractor
-) -> Optional[datetime.timedelta]:
+def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
age = get_file_timestamp_age_seconds(filename, extractor)
if age is not None:
return datetime.timedelta(seconds=float(age))
age = get_file_timestamp_age_seconds(filename, extractor)
if age is not None:
return datetime.timedelta(seconds=float(age))
@@
-465,7
+462,12
@@
def get_files_recursive(directory: str):
yield file_or_directory
yield file_or_directory
-class FileWriter(object):
+class FileWriter(contextlib.AbstractContextManager):
+ """A helper that writes a file to a temporary location and then moves
+ it atomically to its ultimate destination on close.
+
+ """
+
def __init__(self, filename: str) -> None:
self.filename = filename
uuid = uuid4()
def __init__(self, filename: str) -> None:
self.filename = filename
uuid = uuid4()
@@
-477,14
+479,14
@@
class FileWriter(object):
self.handle = open(self.tempfile, mode="w")
return self.handle
self.handle = open(self.tempfile, mode="w")
return self.handle
- def __exit__(self, exc_type, exc_val, exc_tb) ->
Optional[bool
]:
+ def __exit__(self, exc_type, exc_val, exc_tb) ->
Literal[False
]:
if self.handle is not None:
self.handle.close()
cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
ret = os.system(cmd)
if (ret >> 8) != 0:
if self.handle is not None:
self.handle.close()
cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
ret = os.system(cmd)
if (ret >> 8) != 0:
- raise Exception(f'{cmd} failed, exit value {ret>>8}')
- return
Non
e
+ raise Exception(f'{cmd} failed, exit value {ret>>8}
!
')
+ return
Fals
e
if __name__ == '__main__':
if __name__ == '__main__':