Add doctests to some of this stuff.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import io
11 import pathlib
12 import time
13 from typing import Optional
14 import glob
15 from os.path import isfile, join, exists
16 from uuid import uuid4
17
18
19 logger = logging.getLogger(__name__)
20
21
def create_path_if_not_exist(path, on_error=None):
    """
    Attempts to create path (including missing parents) if it does not
    already exist as a directory.  The umask is cleared while creating
    and the new leaf directory is chmod'ed to 0o777.

    Args:
        path: the directory path to create.
        on_error: optional callable invoked as on_error(path, exception)
            when creation fails; if omitted the exception is rethrown.

    Raises:
        OSError: if path cannot be created (this includes the case where
            path already exists but is not a directory) and on_error is
            not specified.

    >>> import uuid
    >>> import os
    >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
    >>> os.path.exists(path)
    False
    >>> create_path_if_not_exist(path)
    >>> os.path.exists(path)
    True
    """
    logger.debug(f"Creating path {path}")
    # Clear the umask so the requested modes are not masked out; the
    # previous value is restored in the finally clause.
    previous_umask = os.umask(0)
    try:
        os.makedirs(path)
        os.chmod(path, 0o777)
    except OSError as ex:
        # The only benign failure is "the directory is already there".
        # Do not key on EEXIST alone: makedirs also raises EEXIST when
        # path is an existing regular file, which must be reported.
        if not os.path.isdir(path):
            if on_error is not None:
                on_error(path, ex)
            else:
                raise
    finally:
        os.umask(previous_umask)
50
51
def does_file_exist(filename: str) -> bool:
    """Report whether filename exists and is a regular file.

    >>> does_file_exist(__file__)
    True
    """
    if not os.path.exists(filename):
        return False
    return os.path.isfile(filename)
59
60
def does_directory_exist(dirname: str) -> bool:
    """Report whether dirname exists and is a directory.

    >>> does_directory_exist('/tmp')
    True
    """
    if not os.path.exists(dirname):
        return False
    return os.path.isdir(dirname)
68
69
def does_path_exist(pathname: str) -> bool:
    """Report whether pathname exists; delegates to os.path.exists."""
    path_present = os.path.exists(pathname)
    return path_present
73
74
def get_file_size(filename: str) -> int:
    """Return the size of filename in bytes, as reported by stat."""
    stat_info = os.stat(filename)
    return stat_info.st_size
78
79
def is_normal_file(filename: str) -> bool:
    """Report whether filename is a regular file (os.path.isfile).

    >>> is_normal_file(__file__)
    True
    """
    regular = os.path.isfile(filename)
    return regular
87
88
def is_directory(filename: str) -> bool:
    """Report whether filename is a directory (os.path.isdir).

    >>> is_directory('/tmp')
    True
    """
    directory = os.path.isdir(filename)
    return directory
96
97
def is_symlink(filename: str) -> bool:
    """Report whether filename is a symbolic link (os.path.islink)."""
    link = os.path.islink(filename)
    return link
100
101
def is_same_file(file1: str, file2: str) -> bool:
    """Report whether file1 and file2 refer to the same file.

    Delegates to os.path.samefile, which raises if either path cannot
    be stat'ed.
    """
    same = os.path.samefile(file1, file2)
    return same
104
105
def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
    """Stat filename and return the resulting os.stat_result.

    Any exception raised by os.stat is logged with logger.exception and
    swallowed; None is returned in that case.
    """
    try:
        stat_info = os.stat(filename)
    except Exception as e:
        logger.exception(e)
        return None
    else:
        return stat_info
112
113
def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
    """Apply extractor to the stat result of filename.

    Args:
        filename: path to stat.
        extractor: callable that receives the os.stat_result and returns
            the value of interest.

    Returns:
        Whatever extractor returns, or None when
        get_file_raw_timestamps returned None.
    """
    stat_info = get_file_raw_timestamps(filename)
    if stat_info is None:
        return None
    return extractor(stat_info)
119
120
def get_file_raw_atime(filename: str) -> Optional[float]:
    """Return the st_atime of filename, or None if the stat lookup fails."""

    def _pick_atime(stat_info):
        return stat_info.st_atime

    return get_file_raw_timestamp(filename, _pick_atime)
123
124
def get_file_raw_mtime(filename: str) -> Optional[float]:
    """Return the st_mtime of filename, or None if the stat lookup fails."""

    def _pick_mtime(stat_info):
        return stat_info.st_mtime

    return get_file_raw_timestamp(filename, _pick_mtime)
127
128
def get_file_raw_ctime(filename: str) -> Optional[float]:
    """Return the st_ctime of filename, or None if the stat lookup fails."""

    def _pick_ctime(stat_info):
        return stat_info.st_ctime

    return get_file_raw_timestamp(filename, _pick_ctime)
131
132
def get_file_md5(filename: str) -> str:
    """Return the hexadecimal MD5 digest of the contents of filename.

    The file is opened in binary mode and read in 8192-byte chunks so
    that it never has to be loaded into memory all at once.
    """
    digest = hashlib.md5()
    with open(filename, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (EOF).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
141
142
def set_file_raw_atime(filename: str, atime: float):
    """Set the access time of filename to atime, keeping its mtime.

    Args:
        filename: path of the file to update.
        atime: new access time, in seconds since the epoch.

    Raises:
        OSError: if filename cannot be stat'ed or its times cannot be set.
    """
    # Stat directly rather than through a helper that returns None on
    # failure: a None mtime would reach os.utime and fail with an
    # unrelated TypeError instead of the real OSError.
    mtime = os.stat(filename).st_mtime
    os.utime(filename, (atime, mtime))
146
147
def set_file_raw_mtime(filename: str, mtime: float):
    """Set the modification time of filename to mtime, keeping its atime.

    Args:
        filename: path of the file to update.
        mtime: new modification time, in seconds since the epoch.

    Raises:
        OSError: if filename cannot be stat'ed or its times cannot be set.
    """
    # Stat directly rather than through a helper that returns None on
    # failure: a None atime would reach os.utime and fail with an
    # unrelated TypeError instead of the real OSError.
    atime = os.stat(filename).st_atime
    os.utime(filename, (atime, mtime))
151
152
def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
    """Set both the atime and the mtime of filename to ts.

    When ts is None, os.utime is called with times=None, which sets
    both timestamps to the current time.
    """
    times = None if ts is None else (ts, ts)
    os.utime(filename, times)
158
159
def convert_file_timestamp_to_datetime(
    filename: str, producer
) -> Optional[datetime.datetime]:
    """Turn a timestamp obtained for filename into a datetime.

    Args:
        filename: path handed to producer.
        producer: callable taking filename and returning a timestamp in
            seconds since the epoch, or None.

    Returns:
        datetime.datetime.fromtimestamp() of the produced value, or
        None when producer returned None.
    """
    raw = producer(filename)
    if raw is None:
        return None
    return datetime.datetime.fromtimestamp(raw)
167
168
def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return the access time of filename as a datetime, or None."""
    producer = get_file_raw_atime
    return convert_file_timestamp_to_datetime(filename, producer)
171
172
def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return the modification time of filename as a datetime, or None."""
    producer = get_file_raw_mtime
    return convert_file_timestamp_to_datetime(filename, producer)
175
176
def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return the st_ctime of filename as a datetime, or None."""
    producer = get_file_raw_ctime
    return convert_file_timestamp_to_datetime(filename, producer)
179
180
def get_file_timestamp_age_seconds(
    filename: str, extractor
) -> Optional[float]:
    """Return how many seconds ago a timestamp of filename was set.

    Args:
        filename: path to stat.
        extractor: callable that picks a timestamp (seconds since the
            epoch) out of the os.stat_result.

    Returns:
        time.time() minus the extracted timestamp.  With the float
        st_*time fields this is a float, not an int, which is why the
        return annotation is Optional[float].  None is returned when
        get_file_raw_timestamps returns None.
    """
    # Sample the clock before stat'ing, as the original code did.
    now = time.time()
    ts = get_file_raw_timestamps(filename)
    if ts is None:
        return None
    result = extractor(ts)
    return now - result
188
189
def get_file_atime_age_seconds(filename: str) -> Optional[float]:
    """Return the seconds elapsed since filename's st_atime, or None.

    The value comes from get_file_timestamp_age_seconds and is a float
    (current time minus st_atime), hence the Optional[float] annotation.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
192
193
def get_file_ctime_age_seconds(filename: str) -> Optional[float]:
    """Return the seconds elapsed since filename's st_ctime, or None.

    The value comes from get_file_timestamp_age_seconds and is a float
    (current time minus st_ctime), hence the Optional[float] annotation.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
196
197
def get_file_mtime_age_seconds(filename: str) -> Optional[float]:
    """Return the seconds elapsed since filename's st_mtime, or None.

    The value comes from get_file_timestamp_age_seconds and is a float
    (current time minus st_mtime), hence the Optional[float] annotation.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
200
201
def get_file_timestamp_timedelta(
    filename: str, extractor
) -> Optional[datetime.timedelta]:
    """Return the age of one of filename's timestamps as a timedelta.

    Args:
        filename: path to inspect.
        extractor: callable that picks a timestamp out of the
            os.stat_result.

    Returns:
        A timedelta built from the age in seconds, or None when
        get_file_timestamp_age_seconds returns None.
    """
    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    return datetime.timedelta(seconds=float(age_seconds))
209
210
def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the time elapsed since filename's st_atime, or None."""

    def _pick_atime(stat_info):
        return stat_info.st_atime

    return get_file_timestamp_timedelta(filename, _pick_atime)
213
214
def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the time elapsed since filename's st_ctime, or None."""

    def _pick_ctime(stat_info):
        return stat_info.st_ctime

    return get_file_timestamp_timedelta(filename, _pick_ctime)
217
218
def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the time elapsed since filename's st_mtime, or None."""

    def _pick_mtime(stat_info):
        return stat_info.st_mtime

    return get_file_timestamp_timedelta(filename, _pick_mtime)
221
222
def describe_file_timestamp(
    filename: str, extractor, *, brief=False
) -> Optional[str]:
    """Describe the age of one of filename's timestamps as text.

    Args:
        filename: path to inspect.
        extractor: callable that picks a timestamp out of the
            os.stat_result.
        brief: when true, format with describe_duration_briefly instead
            of describe_duration (both from datetime_utils).

    Returns:
        The formatted age, or None when get_file_timestamp_age_seconds
        returns None.
    """
    # Imported at call time, exactly as the original code did.
    from datetime_utils import describe_duration, describe_duration_briefly

    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    formatter = describe_duration_briefly if brief else describe_duration
    return formatter(age_seconds)
234
235
def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
    """Describe how long ago filename's st_atime was, or return None."""

    def _pick_atime(stat_info):
        return stat_info.st_atime

    return describe_file_timestamp(filename, _pick_atime, brief=brief)
238
239
def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
    """Describe how long ago filename's st_ctime was, or return None."""

    def _pick_ctime(stat_info):
        return stat_info.st_ctime

    return describe_file_timestamp(filename, _pick_ctime, brief=brief)
242
243
def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
    """Describe how long ago filename's st_mtime was, or return None."""

    def _pick_mtime(stat_info):
        return stat_info.st_mtime

    return describe_file_timestamp(filename, _pick_mtime, brief=brief)
246
247
def touch_file(filename: str) -> bool:
    """Touch filename via pathlib.Path.touch().

    Returns:
        True if filename exists after the touch.  The previous code
        returned the result of Path.touch(), which is always None,
        contradicting the declared bool return type.

    Raises:
        OSError: propagated from Path.touch() if the file cannot be
            touched (e.g. the parent directory is missing).
    """
    target = pathlib.Path(filename)
    target.touch()
    return target.exists()
250
251
def expand_globs(in_filename: str):
    """Yield each path matching the glob pattern in_filename."""
    yield from glob.glob(in_filename)
255
256
def get_files(directory: str):
    """Yield the full path of every regular file directly in directory.

    Entries come from os.listdir, so subdirectories are not descended
    into; each yielded path is directory joined with the entry name.
    """
    for entry in os.listdir(directory):
        candidate = join(directory, entry)
        if not (isfile(candidate) and exists(candidate)):
            continue
        yield candidate
262
263
def get_directories(directory: str):
    """Yield the full path of every subdirectory directly in directory.

    Entries come from os.listdir; each yielded path is directory joined
    with the entry name.  Only entries for which os.path.isdir is true
    are yielded.  The previous "exists and is not a regular file" test
    also matched FIFOs, sockets and device nodes, which callers would
    then try to list as directories.
    """
    for entry in os.listdir(directory):
        full_path = os.path.join(directory, entry)
        if os.path.isdir(full_path):
            yield full_path
269
270
def get_files_recursive(directory: str):
    """Yield every file under directory, descending into subdirectories.

    Files directly in directory (per get_files) are yielded first, then
    the files of each subdirectory reported by get_directories, in turn.
    """
    yield from get_files(directory)
    for child_dir in get_directories(directory):
        yield from get_files_recursive(child_dir)
277
278
class FileWriter:
    """Context manager that writes to a temporary file and then renames
    it over the destination when the with-block exits.

    The temporary file is named '<filename>-<uuid4>.tmp', i.e. it sits
    next to the destination.  __enter__ returns the open text handle to
    write to.  On exit the handle is closed and the temporary file is
    moved onto filename with os.replace.

    Note: as in the original implementation, the rename happens even
    when the with-block raised, and exceptions from the with-block are
    never suppressed.

    Raises:
        FileExistsError: from __enter__ if the temporary file already
            exists.
        OSError: from __exit__ if the rename fails.
    """

    def __init__(self, filename: str) -> None:
        self.filename = filename
        # Random suffix keeps the temporary name from colliding with
        # existing files.
        self.tempfile = f'{filename}-{uuid4()}.tmp'
        self.handle = None

    def __enter__(self) -> io.TextIOWrapper:
        # Exclusive-create mode refuses to reuse an existing temp file.
        # Unlike an assert, this check is not stripped under python -O.
        self.handle = open(self.tempfile, mode="x")
        return self.handle

    def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
        if self.handle is not None:
            self.handle.close()
            # Rename in-process instead of spawning '/bin/mv' through a
            # shell: the old command string was unquoted, so paths with
            # spaces or shell metacharacters broke it (or were executed).
            os.replace(self.tempfile, self.filename)
        # Returning None lets any exception from the with-block propagate.
        return None
299
300
if __name__ == '__main__':
    # When executed as a script, run the doctests embedded in this
    # module's docstrings.
    from doctest import testmod
    testmod()