Change settings in flake8 and black.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import glob
8 import hashlib
9 import io
10 import logging
11 import os
12 import pathlib
13 import re
14 import time
15 from os.path import exists, isfile, join
16 from typing import List, Optional, TextIO
17 from uuid import uuid4
18
19 logger = logging.getLogger(__name__)
20
21
22 def remove_newlines(x):
23     return x.replace('\n', '')
24
25
26 def strip_whitespace(x):
27     return x.strip()
28
29
30 def remove_hash_comments(x):
31     return re.sub(r'#.*$', '', x)
32
33
34 def slurp_file(
35     filename: str,
36     *,
37     skip_blank_lines=False,
38     line_transformers=[],
39 ):
40     ret = []
41     if not file_is_readable(filename):
42         raise Exception(f'{filename} can\'t be read.')
43     with open(filename) as rf:
44         for line in rf:
45             for transformation in line_transformers:
46                 line = transformation(line)
47             if skip_blank_lines and line == '':
48                 continue
49             ret.append(line)
50     return ret
51
52
53 def remove(path: str) -> None:
54     """Deletes a file.  Raises if path refers to a directory or a file
55     that doesn't exist.
56
57     >>> import os
58     >>> filename = '/tmp/file_utils_test_file'
59     >>> os.system(f'touch {filename}')
60     0
61     >>> does_file_exist(filename)
62     True
63     >>> remove(filename)
64     >>> does_file_exist(filename)
65     False
66
67     """
68     os.remove(path)
69
70
71 def delete(path: str) -> None:
72     os.remove(path)
73
74
75 def without_extension(path: str) -> str:
76     """Remove one extension from a file or path.
77
78     >>> without_extension('foobar.txt')
79     'foobar'
80
81     >>> without_extension('/home/scott/frapp.py')
82     '/home/scott/frapp'
83
84     >>> without_extension('a.b.c.tar.gz')
85     'a.b.c.tar'
86
87     >>> without_extension('foobar')
88     'foobar'
89
90     """
91     return os.path.splitext(path)[0]
92
93
94 def without_all_extensions(path: str) -> str:
95     """Removes all extensions from a path; handles multiple extensions
96     like foobar.tar.gz -> foobar.
97
98     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
99     '/home/scott/foobar'
100
101     """
102     while '.' in path:
103         path = without_extension(path)
104     return path
105
106
107 def get_extension(path: str) -> str:
108     """Extract and return one extension from a file or path.
109
110     >>> get_extension('this_is_a_test.txt')
111     '.txt'
112
113     >>> get_extension('/home/scott/test.py')
114     '.py'
115
116     >>> get_extension('foobar')
117     ''
118
119     """
120     return os.path.splitext(path)[1]
121
122
123 def get_all_extensions(path: str) -> List[str]:
124     """Return the extensions of a file or path in order.
125
126     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
127     ['.tar', '.gz', '.1']
128
129     """
130     ret = []
131     while True:
132         ext = get_extension(path)
133         path = without_extension(path)
134         if ext:
135             ret.append(ext)
136         else:
137             ret.reverse()
138             return ret
139
140
141 def without_path(filespec: str) -> str:
142     """Returns the base filename without any leading path.
143
144     >>> without_path('/home/scott/foo.py')
145     'foo.py'
146
147     >>> without_path('foo.py')
148     'foo.py'
149
150     """
151     return os.path.split(filespec)[1]
152
153
154 def get_path(filespec: str) -> str:
155     """Returns just the path of the filespec by removing the filename and
156     extension.
157
158     >>> get_path('/home/scott/foobar.py')
159     '/home/scott'
160
161     >>> get_path('~scott/frapp.txt')
162     '~scott'
163
164     """
165     return os.path.split(filespec)[0]
166
167
168 def get_canonical_path(filespec: str) -> str:
169     """Returns a canonicalized absolute path.
170
171     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
172     '/usr/home/scott/foo.txt'
173
174     """
175     return os.path.realpath(filespec)
176
177
178 def create_path_if_not_exist(path, on_error=None):
179     """
180     Attempts to create path if it does not exist. If on_error is
181     specified, it is called with an exception if one occurs, otherwise
182     exception is rethrown.
183
184     >>> import uuid
185     >>> import os
186     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
187     >>> os.path.exists(path)
188     False
189     >>> create_path_if_not_exist(path)
190     >>> os.path.exists(path)
191     True
192     """
193     logger.debug(f"Creating path {path}")
194     previous_umask = os.umask(0)
195     try:
196         os.makedirs(path)
197         os.chmod(path, 0o777)
198     except OSError as ex:
199         if ex.errno != errno.EEXIST and not os.path.isdir(path):
200             if on_error is not None:
201                 on_error(path, ex)
202             else:
203                 raise
204     finally:
205         os.umask(previous_umask)
206
207
208 def does_file_exist(filename: str) -> bool:
209     """Returns True if a file exists and is a normal file.
210
211     >>> does_file_exist(__file__)
212     True
213     """
214     return os.path.exists(filename) and os.path.isfile(filename)
215
216
217 def file_is_readable(filename: str) -> bool:
218     return does_file_exist(filename) and os.access(filename, os.R_OK)
219
220
221 def file_is_writable(filename: str) -> bool:
222     return does_file_exist(filename) and os.access(filename, os.W_OK)
223
224
225 def file_is_executable(filename: str) -> bool:
226     return does_file_exist(filename) and os.access(filename, os.X_OK)
227
228
229 def does_directory_exist(dirname: str) -> bool:
230     """Returns True if a file exists and is a directory.
231
232     >>> does_directory_exist('/tmp')
233     True
234     """
235     return os.path.exists(dirname) and os.path.isdir(dirname)
236
237
238 def does_path_exist(pathname: str) -> bool:
239     """Just a more verbose wrapper around os.path.exists."""
240     return os.path.exists(pathname)
241
242
243 def get_file_size(filename: str) -> int:
244     """Returns the size of a file in bytes."""
245     return os.path.getsize(filename)
246
247
248 def is_normal_file(filename: str) -> bool:
249     """Returns True if filename is a normal file.
250
251     >>> is_normal_file(__file__)
252     True
253     """
254     return os.path.isfile(filename)
255
256
257 def is_directory(filename: str) -> bool:
258     """Returns True if filename is a directory.
259
260     >>> is_directory('/tmp')
261     True
262     """
263     return os.path.isdir(filename)
264
265
266 def is_symlink(filename: str) -> bool:
267     """True if filename is a symlink, False otherwise.
268
269     >>> is_symlink('/tmp')
270     False
271
272     >>> is_symlink('/home')
273     True
274
275     """
276     return os.path.islink(filename)
277
278
279 def is_same_file(file1: str, file2: str) -> bool:
280     """Returns True if the two files are the same inode.
281
282     >>> is_same_file('/tmp', '/tmp/../tmp')
283     True
284
285     >>> is_same_file('/tmp', '/home')
286     False
287
288     """
289     return os.path.samefile(file1, file2)
290
291
292 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
293     """Stats the file and returns an os.stat_result or None on error."""
294     try:
295         return os.stat(filename)
296     except Exception as e:
297         logger.exception(e)
298         return None
299
300
301 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
302     tss = get_file_raw_timestamps(filename)
303     if tss is not None:
304         return extractor(tss)
305     return None
306
307
308 def get_file_raw_atime(filename: str) -> Optional[float]:
309     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
310
311
312 def get_file_raw_mtime(filename: str) -> Optional[float]:
313     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
314
315
316 def get_file_raw_ctime(filename: str) -> Optional[float]:
317     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
318
319
320 def get_file_md5(filename: str) -> str:
321     """Hashes filename's contents and returns an MD5."""
322     file_hash = hashlib.md5()
323     with open(filename, "rb") as f:
324         chunk = f.read(8192)
325         while chunk:
326             file_hash.update(chunk)
327             chunk = f.read(8192)
328     return file_hash.hexdigest()
329
330
331 def set_file_raw_atime(filename: str, atime: float):
332     mtime = get_file_raw_mtime(filename)
333     assert mtime is not None
334     os.utime(filename, (atime, mtime))
335
336
337 def set_file_raw_mtime(filename: str, mtime: float):
338     atime = get_file_raw_atime(filename)
339     assert atime is not None
340     os.utime(filename, (atime, mtime))
341
342
343 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
344     if ts is not None:
345         os.utime(filename, (ts, ts))
346     else:
347         os.utime(filename, None)
348
349
350 def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
351     ts = producer(filename)
352     if ts is not None:
353         return datetime.datetime.fromtimestamp(ts)
354     return None
355
356
357 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
358     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
359
360
361 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
362     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
363
364
365 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
366     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
367
368
369 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
370     now = time.time()
371     ts = get_file_raw_timestamps(filename)
372     if ts is None:
373         return None
374     result = extractor(ts)
375     return now - result
376
377
378 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
379     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
380
381
382 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
383     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
384
385
386 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
387     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
388
389
390 def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
391     age = get_file_timestamp_age_seconds(filename, extractor)
392     if age is not None:
393         return datetime.timedelta(seconds=float(age))
394     return None
395
396
397 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
398     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
399
400
401 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
402     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
403
404
405 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
406     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
407
408
409 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
410     from datetime_utils import describe_duration, describe_duration_briefly
411
412     age = get_file_timestamp_age_seconds(filename, extractor)
413     if age is None:
414         return None
415     if brief:
416         return describe_duration_briefly(age)
417     else:
418         return describe_duration(age)
419
420
421 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
422     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
423
424
425 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
426     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
427
428
429 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
430     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
431
432
433 def touch_file(filename: str, *, mode: Optional[int] = 0o666):
434     pathlib.Path(filename, mode=mode).touch()
435
436
437 def expand_globs(in_filename: str):
438     for filename in glob.glob(in_filename):
439         yield filename
440
441
442 def get_files(directory: str):
443     for filename in os.listdir(directory):
444         full_path = join(directory, filename)
445         if isfile(full_path) and exists(full_path):
446             yield full_path
447
448
449 def get_directories(directory: str):
450     for d in os.listdir(directory):
451         full_path = join(directory, d)
452         if not isfile(full_path) and exists(full_path):
453             yield full_path
454
455
456 def get_files_recursive(directory: str):
457     for filename in get_files(directory):
458         yield filename
459     for subdir in get_directories(directory):
460         for file_or_directory in get_files_recursive(subdir):
461             yield file_or_directory
462
463
464 class FileWriter(object):
465     def __init__(self, filename: str) -> None:
466         self.filename = filename
467         uuid = uuid4()
468         self.tempfile = f'{filename}-{uuid}.tmp'
469         self.handle: Optional[TextIO] = None
470
471     def __enter__(self) -> TextIO:
472         assert not does_path_exist(self.tempfile)
473         self.handle = open(self.tempfile, mode="w")
474         return self.handle
475
476     def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
477         if self.handle is not None:
478             self.handle.close()
479             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
480             ret = os.system(cmd)
481             if (ret >> 8) != 0:
482                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
483         return None
484
485
486 if __name__ == '__main__':
487     import doctest
488
489     doctest.testmod()