Used isort to sort imports. Also added to the git pre-commit hook.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import glob
8 import hashlib
9 import io
10 import logging
11 import os
12 import pathlib
13 import re
14 import time
15 from os.path import exists, isfile, join
16 from typing import List, Optional, TextIO
17 from uuid import uuid4
18
19 logger = logging.getLogger(__name__)
20
21
22 def remove_newlines(x):
23     return x.replace('\n', '')
24
25
26 def strip_whitespace(x):
27     return x.strip()
28
29
30 def remove_hash_comments(x):
31     return re.sub(r'#.*$', '', x)
32
33
34 def slurp_file(
35     filename: str,
36     *,
37     skip_blank_lines=False,
38     line_transformers=[],
39 ):
40     ret = []
41     if not file_is_readable(filename):
42         raise Exception(f'{filename} can\'t be read.')
43     with open(filename) as rf:
44         for line in rf:
45             for transformation in line_transformers:
46                 line = transformation(line)
47             if skip_blank_lines and line == '':
48                 continue
49             ret.append(line)
50     return ret
51
52
53 def remove(path: str) -> None:
54     """Deletes a file.  Raises if path refers to a directory or a file
55     that doesn't exist.
56
57     >>> import os
58     >>> filename = '/tmp/file_utils_test_file'
59     >>> os.system(f'touch {filename}')
60     0
61     >>> does_file_exist(filename)
62     True
63     >>> remove(filename)
64     >>> does_file_exist(filename)
65     False
66
67     """
68     os.remove(path)
69
70
71 def delete(path: str) -> None:
72     os.remove(path)
73
74
75 def without_extension(path: str) -> str:
76     """Remove one extension from a file or path.
77
78     >>> without_extension('foobar.txt')
79     'foobar'
80
81     >>> without_extension('/home/scott/frapp.py')
82     '/home/scott/frapp'
83
84     >>> without_extension('a.b.c.tar.gz')
85     'a.b.c.tar'
86
87     >>> without_extension('foobar')
88     'foobar'
89
90     """
91     return os.path.splitext(path)[0]
92
93
94 def without_all_extensions(path: str) -> str:
95     """Removes all extensions from a path; handles multiple extensions
96     like foobar.tar.gz -> foobar.
97
98     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
99     '/home/scott/foobar'
100
101     """
102     while '.' in path:
103         path = without_extension(path)
104     return path
105
106
107 def get_extension(path: str) -> str:
108     """Extract and return one extension from a file or path.
109
110     >>> get_extension('this_is_a_test.txt')
111     '.txt'
112
113     >>> get_extension('/home/scott/test.py')
114     '.py'
115
116     >>> get_extension('foobar')
117     ''
118
119     """
120     return os.path.splitext(path)[1]
121
122
123 def get_all_extensions(path: str) -> List[str]:
124     """Return the extensions of a file or path in order.
125
126     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
127     ['.tar', '.gz', '.1']
128
129     """
130     ret = []
131     while True:
132         ext = get_extension(path)
133         path = without_extension(path)
134         if ext:
135             ret.append(ext)
136         else:
137             ret.reverse()
138             return ret
139
140
141 def without_path(filespec: str) -> str:
142     """Returns the base filename without any leading path.
143
144     >>> without_path('/home/scott/foo.py')
145     'foo.py'
146
147     >>> without_path('foo.py')
148     'foo.py'
149
150     """
151     return os.path.split(filespec)[1]
152
153
154 def get_path(filespec: str) -> str:
155     """Returns just the path of the filespec by removing the filename and
156     extension.
157
158     >>> get_path('/home/scott/foobar.py')
159     '/home/scott'
160
161     >>> get_path('~scott/frapp.txt')
162     '~scott'
163
164     """
165     return os.path.split(filespec)[0]
166
167
168 def get_canonical_path(filespec: str) -> str:
169     """Returns a canonicalized absolute path.
170
171     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
172     '/usr/home/scott/foo.txt'
173
174     """
175     return os.path.realpath(filespec)
176
177
178 def create_path_if_not_exist(path, on_error=None):
179     """
180     Attempts to create path if it does not exist. If on_error is
181     specified, it is called with an exception if one occurs, otherwise
182     exception is rethrown.
183
184     >>> import uuid
185     >>> import os
186     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
187     >>> os.path.exists(path)
188     False
189     >>> create_path_if_not_exist(path)
190     >>> os.path.exists(path)
191     True
192     """
193     logger.debug(f"Creating path {path}")
194     previous_umask = os.umask(0)
195     try:
196         os.makedirs(path)
197         os.chmod(path, 0o777)
198     except OSError as ex:
199         if ex.errno != errno.EEXIST and not os.path.isdir(path):
200             if on_error is not None:
201                 on_error(path, ex)
202             else:
203                 raise
204     finally:
205         os.umask(previous_umask)
206
207
208 def does_file_exist(filename: str) -> bool:
209     """Returns True if a file exists and is a normal file.
210
211     >>> does_file_exist(__file__)
212     True
213     """
214     return os.path.exists(filename) and os.path.isfile(filename)
215
216
217 def file_is_readable(filename: str) -> bool:
218     return does_file_exist(filename) and os.access(filename, os.R_OK)
219
220
221 def file_is_writable(filename: str) -> bool:
222     return does_file_exist(filename) and os.access(filename, os.W_OK)
223
224
225 def file_is_executable(filename: str) -> bool:
226     return does_file_exist(filename) and os.access(filename, os.X_OK)
227
228
229 def does_directory_exist(dirname: str) -> bool:
230     """Returns True if a file exists and is a directory.
231
232     >>> does_directory_exist('/tmp')
233     True
234     """
235     return os.path.exists(dirname) and os.path.isdir(dirname)
236
237
238 def does_path_exist(pathname: str) -> bool:
239     """Just a more verbose wrapper around os.path.exists."""
240     return os.path.exists(pathname)
241
242
243 def get_file_size(filename: str) -> int:
244     """Returns the size of a file in bytes."""
245     return os.path.getsize(filename)
246
247
248 def is_normal_file(filename: str) -> bool:
249     """Returns True if filename is a normal file.
250
251     >>> is_normal_file(__file__)
252     True
253     """
254     return os.path.isfile(filename)
255
256
257 def is_directory(filename: str) -> bool:
258     """Returns True if filename is a directory.
259
260     >>> is_directory('/tmp')
261     True
262     """
263     return os.path.isdir(filename)
264
265
266 def is_symlink(filename: str) -> bool:
267     """True if filename is a symlink, False otherwise.
268
269     >>> is_symlink('/tmp')
270     False
271
272     >>> is_symlink('/home')
273     True
274
275     """
276     return os.path.islink(filename)
277
278
279 def is_same_file(file1: str, file2: str) -> bool:
280     """Returns True if the two files are the same inode.
281
282     >>> is_same_file('/tmp', '/tmp/../tmp')
283     True
284
285     >>> is_same_file('/tmp', '/home')
286     False
287
288     """
289     return os.path.samefile(file1, file2)
290
291
292 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
293     """Stats the file and returns an os.stat_result or None on error."""
294     try:
295         return os.stat(filename)
296     except Exception as e:
297         logger.exception(e)
298         return None
299
300
301 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
302     tss = get_file_raw_timestamps(filename)
303     if tss is not None:
304         return extractor(tss)
305     return None
306
307
308 def get_file_raw_atime(filename: str) -> Optional[float]:
309     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
310
311
312 def get_file_raw_mtime(filename: str) -> Optional[float]:
313     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
314
315
316 def get_file_raw_ctime(filename: str) -> Optional[float]:
317     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
318
319
320 def get_file_md5(filename: str) -> str:
321     """Hashes filename's contents and returns an MD5."""
322     file_hash = hashlib.md5()
323     with open(filename, "rb") as f:
324         chunk = f.read(8192)
325         while chunk:
326             file_hash.update(chunk)
327             chunk = f.read(8192)
328     return file_hash.hexdigest()
329
330
331 def set_file_raw_atime(filename: str, atime: float):
332     mtime = get_file_raw_mtime(filename)
333     assert mtime
334     os.utime(filename, (atime, mtime))
335
336
337 def set_file_raw_mtime(filename: str, mtime: float):
338     atime = get_file_raw_atime(filename)
339     assert atime
340     os.utime(filename, (atime, mtime))
341
342
343 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
344     if ts is not None:
345         os.utime(filename, (ts, ts))
346     else:
347         os.utime(filename, None)
348
349
350 def convert_file_timestamp_to_datetime(
351     filename: str, producer
352 ) -> Optional[datetime.datetime]:
353     ts = producer(filename)
354     if ts is not None:
355         return datetime.datetime.fromtimestamp(ts)
356     return None
357
358
359 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
360     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
361
362
363 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
364     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
365
366
367 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
368     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
369
370
371 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
372     now = time.time()
373     ts = get_file_raw_timestamps(filename)
374     if ts is None:
375         return None
376     result = extractor(ts)
377     return now - result
378
379
380 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
381     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
382
383
384 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
385     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
386
387
388 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
389     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
390
391
392 def get_file_timestamp_timedelta(
393     filename: str, extractor
394 ) -> Optional[datetime.timedelta]:
395     age = get_file_timestamp_age_seconds(filename, extractor)
396     if age is not None:
397         return datetime.timedelta(seconds=float(age))
398     return None
399
400
401 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
402     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
403
404
405 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
406     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
407
408
409 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
410     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
411
412
413 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
414     from datetime_utils import describe_duration, describe_duration_briefly
415
416     age = get_file_timestamp_age_seconds(filename, extractor)
417     if age is None:
418         return None
419     if brief:
420         return describe_duration_briefly(age)
421     else:
422         return describe_duration(age)
423
424
425 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
426     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
427
428
429 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
430     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
431
432
433 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
434     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
435
436
437 def touch_file(filename: str, *, mode: Optional[int] = 0o666):
438     pathlib.Path(filename, mode=mode).touch()
439
440
441 def expand_globs(in_filename: str):
442     for filename in glob.glob(in_filename):
443         yield filename
444
445
446 def get_files(directory: str):
447     for filename in os.listdir(directory):
448         full_path = join(directory, filename)
449         if isfile(full_path) and exists(full_path):
450             yield full_path
451
452
453 def get_directories(directory: str):
454     for d in os.listdir(directory):
455         full_path = join(directory, d)
456         if not isfile(full_path) and exists(full_path):
457             yield full_path
458
459
460 def get_files_recursive(directory: str):
461     for filename in get_files(directory):
462         yield filename
463     for subdir in get_directories(directory):
464         for file_or_directory in get_files_recursive(subdir):
465             yield file_or_directory
466
467
468 class FileWriter(object):
469     def __init__(self, filename: str) -> None:
470         self.filename = filename
471         uuid = uuid4()
472         self.tempfile = f'{filename}-{uuid}.tmp'
473         self.handle: Optional[TextIO] = None
474
475     def __enter__(self) -> TextIO:
476         assert not does_path_exist(self.tempfile)
477         self.handle = open(self.tempfile, mode="w")
478         return self.handle
479
480     def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
481         if self.handle is not None:
482             self.handle.close()
483             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
484             ret = os.system(cmd)
485             if (ret >> 8) != 0:
486                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
487         return None
488
489
490 if __name__ == '__main__':
491     import doctest
492
493     doctest.testmod()