Money, Rate, CentCount and a bunch of bugfixes.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import io
11 import pathlib
12 import time
13 from typing import Optional
14 import glob
15 from os.path import isfile, join, exists
16 from uuid import uuid4
17
18
19 logger = logging.getLogger(__name__)
20
21
def create_path_if_not_exist(path, on_error=None):
    """
    Attempts to create path (including any missing parent directories)
    if it does not exist.  A freshly created path is chmod'ed to 0o777;
    the process umask is cleared for the duration of the call and
    restored afterwards.  A path that already exists as a directory is
    left untouched.

    Args:
        path: the directory to create.
        on_error: optional callable invoked as on_error(path, exception)
            when creation fails.  If not specified the exception is
            rethrown.

    Raises:
        OSError: creation failed and no on_error was given.  This
            includes the case where path already exists but is not a
            directory.

    >>> import uuid
    >>> import os
    >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
    >>> os.path.exists(path)
    False
    >>> create_path_if_not_exist(path)
    >>> os.path.exists(path)
    True
    """
    logger.debug(f"Creating path {path}")
    previous_umask = os.umask(0)
    try:
        os.makedirs(path)
        os.chmod(path, 0o777)
    except OSError as ex:
        # The only benign failure is "already exists AND is a directory".
        # Anything else (e.g. a regular file squatting on the path, or a
        # permission error) must be reported rather than swallowed.
        already_a_directory = (
            ex.errno == errno.EEXIST and os.path.isdir(path)
        )
        if not already_a_directory:
            if on_error is not None:
                on_error(path, ex)
            else:
                raise
    finally:
        # Always restore the caller's umask, even on failure.
        os.umask(previous_umask)
50
51
def does_file_exist(filename: str) -> bool:
    """Return True iff filename exists and is a regular file."""
    if not os.path.exists(filename):
        return False
    return os.path.isfile(filename)
54
55
def does_directory_exist(dirname: str) -> bool:
    """Return True iff dirname exists and is a directory."""
    if not os.path.exists(dirname):
        return False
    return os.path.isdir(dirname)
58
59
def does_path_exist(pathname: str) -> bool:
    """Return True iff pathname exists (file, directory or otherwise)."""
    path_is_present = exists(pathname)
    return path_is_present
62
63
def get_file_size(filename: str) -> int:
    """Return the size of filename in bytes; raises OSError if it cannot
    be stat'ed."""
    return os.stat(filename).st_size
66
67
def is_normal_file(filename: str) -> bool:
    """Return True iff filename is a regular file."""
    return isfile(filename)
70
71
def is_directory(filename: str) -> bool:
    """Return True iff filename is a directory."""
    names_a_directory = os.path.isdir(filename)
    return names_a_directory
74
75
def is_symlink(filename: str) -> bool:
    """Return True iff filename is a symbolic link."""
    names_a_link = os.path.islink(filename)
    return names_a_link
78
79
def is_same_file(file1: str, file2: str) -> bool:
    """Return True iff file1 and file2 refer to the same file, as decided
    by os.path.samefile.  Raises OSError if either cannot be stat'ed."""
    refers_to_same = os.path.samefile(file1, file2)
    return refers_to_same
82
83
def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
    """Return the os.stat() result for filename, or None if stat'ing it
    raised; the exception is logged rather than propagated."""
    stat_result = None
    try:
        stat_result = os.stat(filename)
    except Exception as e:
        logger.exception(e)
    return stat_result
90
91
def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
    """Stat filename and return extractor(stat_result), or None when the
    stat result is unavailable."""
    stat_result = get_file_raw_timestamps(filename)
    if stat_result is None:
        return None
    return extractor(stat_result)
97
98
def get_file_raw_atime(filename: str) -> Optional[float]:
    """Return filename's st_atime, or None if it cannot be stat'ed."""

    def _atime(stat_result):
        return stat_result.st_atime

    return get_file_raw_timestamp(filename, _atime)
101
102
def get_file_raw_mtime(filename: str) -> Optional[float]:
    """Return filename's st_mtime, or None if it cannot be stat'ed."""

    def _mtime(stat_result):
        return stat_result.st_mtime

    return get_file_raw_timestamp(filename, _mtime)
105
106
def get_file_raw_ctime(filename: str) -> Optional[float]:
    """Return filename's st_ctime, or None if it cannot be stat'ed."""

    def _ctime(stat_result):
        return stat_result.st_ctime

    return get_file_raw_timestamp(filename, _ctime)
109
110
def get_file_md5(filename: str) -> str:
    """Return the hex MD5 digest of filename's contents, reading the file
    in 8192-byte chunks so it never has to fit in memory at once."""
    digest = hashlib.md5()
    with open(filename, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (EOF).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
119
120
def set_file_raw_atime(filename: str, atime: float):
    """Set filename's access time to atime (seconds since the epoch),
    leaving its modification time as it currently is.

    Raises:
        OSError: filename cannot be stat'ed or updated (e.g. it does not
            exist).
    """
    # Stat directly rather than via a None-on-failure helper: passing a
    # None mtime to os.utime would surface as a confusing TypeError
    # instead of the real OSError.
    current_mtime = os.stat(filename).st_mtime
    os.utime(filename, (atime, current_mtime))
124
125
def set_file_raw_mtime(filename: str, mtime: float):
    """Set filename's modification time to mtime (seconds since the
    epoch), leaving its access time as it currently is.

    Raises:
        OSError: filename cannot be stat'ed or updated (e.g. it does not
            exist).
    """
    # Stat directly rather than via a None-on-failure helper: passing a
    # None atime to os.utime would surface as a confusing TypeError
    # instead of the real OSError.
    current_atime = os.stat(filename).st_atime
    os.utime(filename, (current_atime, mtime))
129
130
def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
    """Set both the access and modification time of filename to ts
    (seconds since the epoch).  When ts is None, os.utime is called with
    no explicit times, which stamps the file with the current time."""
    times = (ts, ts) if ts is not None else None
    os.utime(filename, times)
136
137
def convert_file_timestamp_to_datetime(
    filename: str, producer
) -> Optional[datetime.datetime]:
    """Call producer(filename) to obtain an epoch timestamp and convert
    it with datetime.datetime.fromtimestamp (naive local time).  Returns
    None when producer returns None."""
    raw_timestamp = producer(filename)
    if raw_timestamp is None:
        return None
    return datetime.datetime.fromtimestamp(raw_timestamp)
145
146
def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's access time as a datetime, or None if the raw
    timestamp is unavailable."""
    return convert_file_timestamp_to_datetime(
        filename, producer=get_file_raw_atime
    )
149
150
def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's modification time as a datetime, or None if the
    raw timestamp is unavailable."""
    return convert_file_timestamp_to_datetime(
        filename, producer=get_file_raw_mtime
    )
153
154
def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's st_ctime as a datetime, or None if the raw
    timestamp is unavailable."""
    return convert_file_timestamp_to_datetime(
        filename, producer=get_file_raw_ctime
    )
157
158
def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[float]:
    """Return how many seconds ago the timestamp selected by extractor
    occurred, measured against time.time().

    Args:
        filename: the file to stat.
        extractor: callable taking the stat result and returning an epoch
            timestamp (e.g. lambda x: x.st_mtime).

    Returns:
        The age in (fractional) seconds, or None if the file's stat
        result is unavailable.
    """
    # Sample the clock before stat'ing, as the original did.
    now = time.time()
    stat_result = get_file_raw_timestamps(filename)
    if stat_result is None:
        return None
    return now - extractor(stat_result)
166
167
def get_file_atime_age_seconds(filename: str) -> Optional[float]:
    """Return the age of filename's access time in (fractional) seconds,
    or None if the file's stat result is unavailable."""
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
170
171
def get_file_ctime_age_seconds(filename: str) -> Optional[float]:
    """Return the age of filename's st_ctime in (fractional) seconds, or
    None if the file's stat result is unavailable."""
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
174
175
def get_file_mtime_age_seconds(filename: str) -> Optional[float]:
    """Return the age of filename's modification time in (fractional)
    seconds, or None if the file's stat result is unavailable."""
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
178
179
def get_file_timestamp_timedelta(
    filename: str, extractor
) -> Optional[datetime.timedelta]:
    """Return the age of the timestamp selected by extractor as a
    datetime.timedelta, or None when the age cannot be determined."""
    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    return datetime.timedelta(seconds=float(age_seconds))
187
188
def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's access time as a timedelta, or None
    when the age cannot be determined."""

    def _atime(stat_result):
        return stat_result.st_atime

    return get_file_timestamp_timedelta(filename, _atime)
191
192
def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's st_ctime as a timedelta, or None when
    the age cannot be determined."""

    def _ctime(stat_result):
        return stat_result.st_ctime

    return get_file_timestamp_timedelta(filename, _ctime)
195
196
def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's modification time as a timedelta, or
    None when the age cannot be determined."""

    def _mtime(stat_result):
        return stat_result.st_mtime

    return get_file_timestamp_timedelta(filename, _mtime)
199
200
def describe_file_timestamp(
    filename: str, extractor, *, brief=False
) -> Optional[str]:
    """Return a description of the age of the timestamp selected by
    extractor, as rendered by datetime_utils.describe_duration (or
    describe_duration_briefly when brief is true).  Returns None when the
    age cannot be determined."""
    # Imported here, at call time, rather than at module import.
    from datetime_utils import describe_duration, describe_duration_briefly

    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    describe = describe_duration_briefly if brief else describe_duration
    return describe(age_seconds)
212
213
def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
    """Describe the age of filename's access time; see
    describe_file_timestamp for the meaning of brief and of None."""

    def _atime(stat_result):
        return stat_result.st_atime

    return describe_file_timestamp(filename, _atime, brief=brief)
216
217
def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
    """Describe the age of filename's st_ctime; see
    describe_file_timestamp for the meaning of brief and of None."""

    def _ctime(stat_result):
        return stat_result.st_ctime

    return describe_file_timestamp(filename, _ctime, brief=brief)
220
221
def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
    """Describe the age of filename's modification time; see
    describe_file_timestamp for the meaning of brief and of None."""

    def _mtime(stat_result):
        return stat_result.st_mtime

    return describe_file_timestamp(filename, _mtime, brief=brief)
224
225
def touch_file(filename: str) -> bool:
    """Create filename if it does not exist, otherwise update its
    modification time to now (pathlib.Path.touch semantics).

    Returns:
        True once the touch has succeeded.

    Raises:
        OSError: the file could not be created or updated.
    """
    # Path.touch() returns None, so returning its result directly would
    # make this function always look like it failed to callers relying on
    # the declared bool.  Failure is signalled by the exception instead.
    pathlib.Path(filename).touch()
    return True
228
229
def expand_globs(in_filename: str):
    """Yield each path matching the glob pattern in_filename, in the
    order glob.glob returns them."""
    yield from glob.glob(in_filename)
233
234
def get_files(directory: str):
    """Yield the full path of every regular file directly inside
    directory (non-recursive), in os.listdir order."""
    for entry in os.listdir(directory):
        candidate = join(directory, entry)
        if not isfile(candidate):
            continue
        if exists(candidate):
            yield candidate
240
241
def get_directories(directory: str):
    """Yield the full path of every subdirectory directly inside
    directory (non-recursive), in os.listdir order.

    Only entries that really are directories (per os.path.isdir) are
    yielded; FIFOs, sockets, device nodes and dangling symlinks are
    skipped so callers can safely list each result.
    """
    for entry in os.listdir(directory):
        full_path = join(directory, entry)
        # "Exists and is not a regular file" is not the same thing as
        # "is a directory": test for a directory explicitly.
        if os.path.isdir(full_path):
            yield full_path
247
248
def get_files_recursive(directory: str):
    """Yield every file under directory: first the files directly inside
    it, then, for each subdirectory in turn, that subdirectory's files
    recursively."""
    yield from get_files(directory)
    for subdir in get_directories(directory):
        yield from get_files_recursive(subdir)
255
256
class FileWriter(object):
    """
    Context manager that writes to a uniquely named temporary file next
    to the destination and renames it over the destination when the
    with-block exits, so readers never observe a half-written file.

    Note that the rename happens whenever the temporary file was opened,
    including when the with-block exits because of an exception.

    Usage:
        with FileWriter("/path/to/output.txt") as f:
            f.write("contents")
    """

    def __init__(self, filename: str) -> None:
        self.filename = filename
        uuid = uuid4()
        # Sibling of the destination, so the final rename stays within
        # one directory.
        self.tempfile = f'{filename}-{uuid}.tmp'
        self.handle: Optional[io.TextIOWrapper] = None

    def __enter__(self) -> io.TextIOWrapper:
        # "x" is exclusive creation: the open itself fails with
        # FileExistsError if the temporary file already exists, which is
        # atomic and, unlike an assert, not stripped under python -O.
        self.handle = open(self.tempfile, mode="x")
        return self.handle

    def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
        if self.handle is not None:
            self.handle.close()
            # os.replace overwrites the destination and needs no shell,
            # so filenames containing spaces or shell metacharacters are
            # handled correctly.  Failure raises OSError.
            os.replace(self.tempfile, self.filename)
        # Returning None lets any exception from the with-block propagate.
        return None