Start using warnings from stdlib.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import io
11 import pathlib
12 import time
13 from typing import Optional
14 import glob
15 from os.path import isfile, join, exists
16 from uuid import uuid4
17
18
19 logger = logging.getLogger(__name__)
20
21
22 # os.remove(file) you fuckwit.
23 # os.path.basename too.
24
25
26 def create_path_if_not_exist(path, on_error=None):
27     """
28     Attempts to create path if it does not exist. If on_error is
29     specified, it is called with an exception if one occurs, otherwise
30     exception is rethrown.
31
32     >>> import uuid
33     >>> import os
34     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
35     >>> os.path.exists(path)
36     False
37     >>> create_path_if_not_exist(path)
38     >>> os.path.exists(path)
39     True
40     """
41     logger.debug(f"Creating path {path}")
42     previous_umask = os.umask(0)
43     try:
44         os.makedirs(path)
45         os.chmod(path, 0o777)
46     except OSError as ex:
47         if ex.errno != errno.EEXIST and not os.path.isdir(path):
48             if on_error is not None:
49                 on_error(path, ex)
50             else:
51                 raise
52     finally:
53         os.umask(previous_umask)
54
55
56 def does_file_exist(filename: str) -> bool:
57     """Returns True if a file exists and is a normal file.
58
59     >>> does_file_exist(__file__)
60     True
61     """
62     return os.path.exists(filename) and os.path.isfile(filename)
63
64
65 def does_directory_exist(dirname: str) -> bool:
66     """Returns True if a file exists and is a directory.
67
68     >>> does_directory_exist('/tmp')
69     True
70     """
71     return os.path.exists(dirname) and os.path.isdir(dirname)
72
73
74 def does_path_exist(pathname: str) -> bool:
75     """Just a more verbose wrapper around os.path.exists."""
76     return os.path.exists(pathname)
77
78
79 def get_file_size(filename: str) -> int:
80     """Returns the size of a file in bytes."""
81     return os.path.getsize(filename)
82
83
84 def is_normal_file(filename: str) -> bool:
85     """Returns True if filename is a normal file.
86
87     >>> is_normal_file(__file__)
88     True
89     """
90     return os.path.isfile(filename)
91
92
93 def is_directory(filename: str) -> bool:
94     """Returns True if filename is a directory.
95
96     >>> is_directory('/tmp')
97     True
98     """
99     return os.path.isdir(filename)
100
101
102 def is_symlink(filename: str) -> bool:
103     return os.path.islink(filename)
104
105
106 def is_same_file(file1: str, file2: str) -> bool:
107     return os.path.samefile(file1, file2)
108
109
110 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
111     try:
112         return os.stat(filename)
113     except Exception as e:
114         logger.exception(e)
115         return None
116
117
118 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
119     tss = get_file_raw_timestamps(filename)
120     if tss is not None:
121         return extractor(tss)
122     return None
123
124
125 def get_file_raw_atime(filename: str) -> Optional[float]:
126     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
127
128
129 def get_file_raw_mtime(filename: str) -> Optional[float]:
130     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
131
132
133 def get_file_raw_ctime(filename: str) -> Optional[float]:
134     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
135
136
137 def get_file_md5(filename: str) -> str:
138     file_hash = hashlib.md5()
139     with open(filename, "rb") as f:
140         chunk = f.read(8192)
141         while chunk:
142             file_hash.update(chunk)
143             chunk = f.read(8192)
144     return file_hash.hexdigest()
145
146
147 def set_file_raw_atime(filename: str, atime: float):
148     mtime = get_file_raw_mtime(filename)
149     os.utime(filename, (atime, mtime))
150
151
152 def set_file_raw_mtime(filename: str, mtime: float):
153     atime = get_file_raw_atime(filename)
154     os.utime(filename, (atime, mtime))
155
156
157 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
158     if ts is not None:
159         os.utime(filename, (ts, ts))
160     else:
161         os.utime(filename, None)
162
163
164 def convert_file_timestamp_to_datetime(
165     filename: str, producer
166 ) -> Optional[datetime.datetime]:
167     ts = producer(filename)
168     if ts is not None:
169         return datetime.datetime.fromtimestamp(ts)
170     return None
171
172
173 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
174     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
175
176
177 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
178     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
179
180
181 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
182     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
183
184
185 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
186     now = time.time()
187     ts = get_file_raw_timestamps(filename)
188     if ts is None:
189         return None
190     result = extractor(ts)
191     return now - result
192
193
194 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
195     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
196
197
198 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
199     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
200
201
202 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
203     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
204
205
206 def get_file_timestamp_timedelta(
207     filename: str, extractor
208 ) -> Optional[datetime.timedelta]:
209     age = get_file_timestamp_age_seconds(filename, extractor)
210     if age is not None:
211         return datetime.timedelta(seconds=float(age))
212     return None
213
214
215 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
216     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
217
218
219 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
220     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
221
222
223 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
224     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
225
226
227 def describe_file_timestamp(
228     filename: str, extractor, *, brief=False
229 ) -> Optional[str]:
230     from datetime_utils import describe_duration, describe_duration_briefly
231     age = get_file_timestamp_age_seconds(filename, extractor)
232     if age is None:
233         return None
234     if brief:
235         return describe_duration_briefly(age)
236     else:
237         return describe_duration(age)
238
239
240 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
241     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
242
243
244 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
245     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
246
247
248 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
249     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
250
251
252 def touch_file(filename: str) -> bool:
253     return pathlib.Path(filename).touch()
254
255
256 def expand_globs(in_filename: str):
257     for filename in glob.glob(in_filename):
258         yield filename
259
260
261 def get_files(directory: str):
262     for filename in os.listdir(directory):
263         full_path = join(directory, filename)
264         if isfile(full_path) and exists(full_path):
265             yield full_path
266
267
268 def get_directories(directory: str):
269     for d in os.listdir(directory):
270         full_path = join(directory, d)
271         if not isfile(full_path) and exists(full_path):
272             yield full_path
273
274
275 def get_files_recursive(directory: str):
276     for filename in get_files(directory):
277         yield filename
278     for subdir in get_directories(directory):
279         for file_or_directory in get_files_recursive(subdir):
280             yield file_or_directory
281
282
283 class FileWriter(object):
284     def __init__(self, filename: str) -> None:
285         self.filename = filename
286         uuid = uuid4()
287         self.tempfile = f'{filename}-{uuid}.tmp'
288         self.handle = None
289
290     def __enter__(self) -> io.TextIOWrapper:
291         assert not does_path_exist(self.tempfile)
292         self.handle = open(self.tempfile, mode="w")
293         return self.handle
294
295     def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
296         if self.handle is not None:
297             self.handle.close()
298             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
299             ret = os.system(cmd)
300             if (ret >> 8) != 0:
301                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
302         return None
303
304
305 if __name__ == '__main__':
306     import doctest
307     doctest.testmod()