More cleanup, yey!
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import glob
8 import hashlib
9 import logging
10 import os
11 import pathlib
12 import re
13 import time
14 from os.path import exists, isfile, join
15 from typing import List, Optional, TextIO
16 from uuid import uuid4
17
18 logger = logging.getLogger(__name__)
19
20
21 def remove_newlines(x):
22     return x.replace('\n', '')
23
24
25 def strip_whitespace(x):
26     return x.strip()
27
28
29 def remove_hash_comments(x):
30     return re.sub(r'#.*$', '', x)
31
32
33 def slurp_file(
34     filename: str,
35     *,
36     skip_blank_lines=False,
37     line_transformers=[],
38 ):
39     ret = []
40     if not file_is_readable(filename):
41         raise Exception(f'{filename} can\'t be read.')
42     with open(filename) as rf:
43         for line in rf:
44             for transformation in line_transformers:
45                 line = transformation(line)
46             if skip_blank_lines and line == '':
47                 continue
48             ret.append(line)
49     return ret
50
51
52 def remove(path: str) -> None:
53     """Deletes a file.  Raises if path refers to a directory or a file
54     that doesn't exist.
55
56     >>> import os
57     >>> filename = '/tmp/file_utils_test_file'
58     >>> os.system(f'touch {filename}')
59     0
60     >>> does_file_exist(filename)
61     True
62     >>> remove(filename)
63     >>> does_file_exist(filename)
64     False
65
66     """
67     os.remove(path)
68
69
70 def delete(path: str) -> None:
71     os.remove(path)
72
73
74 def without_extension(path: str) -> str:
75     """Remove one extension from a file or path.
76
77     >>> without_extension('foobar.txt')
78     'foobar'
79
80     >>> without_extension('/home/scott/frapp.py')
81     '/home/scott/frapp'
82
83     >>> without_extension('a.b.c.tar.gz')
84     'a.b.c.tar'
85
86     >>> without_extension('foobar')
87     'foobar'
88
89     """
90     return os.path.splitext(path)[0]
91
92
93 def without_all_extensions(path: str) -> str:
94     """Removes all extensions from a path; handles multiple extensions
95     like foobar.tar.gz -> foobar.
96
97     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
98     '/home/scott/foobar'
99
100     """
101     while '.' in path:
102         path = without_extension(path)
103     return path
104
105
106 def get_extension(path: str) -> str:
107     """Extract and return one extension from a file or path.
108
109     >>> get_extension('this_is_a_test.txt')
110     '.txt'
111
112     >>> get_extension('/home/scott/test.py')
113     '.py'
114
115     >>> get_extension('foobar')
116     ''
117
118     """
119     return os.path.splitext(path)[1]
120
121
122 def get_all_extensions(path: str) -> List[str]:
123     """Return the extensions of a file or path in order.
124
125     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
126     ['.tar', '.gz', '.1']
127
128     """
129     ret = []
130     while True:
131         ext = get_extension(path)
132         path = without_extension(path)
133         if ext:
134             ret.append(ext)
135         else:
136             ret.reverse()
137             return ret
138
139
140 def without_path(filespec: str) -> str:
141     """Returns the base filename without any leading path.
142
143     >>> without_path('/home/scott/foo.py')
144     'foo.py'
145
146     >>> without_path('foo.py')
147     'foo.py'
148
149     """
150     return os.path.split(filespec)[1]
151
152
153 def get_path(filespec: str) -> str:
154     """Returns just the path of the filespec by removing the filename and
155     extension.
156
157     >>> get_path('/home/scott/foobar.py')
158     '/home/scott'
159
160     >>> get_path('~scott/frapp.txt')
161     '~scott'
162
163     """
164     return os.path.split(filespec)[0]
165
166
167 def get_canonical_path(filespec: str) -> str:
168     """Returns a canonicalized absolute path.
169
170     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
171     '/usr/home/scott/foo.txt'
172
173     """
174     return os.path.realpath(filespec)
175
176
177 def create_path_if_not_exist(path, on_error=None):
178     """
179     Attempts to create path if it does not exist. If on_error is
180     specified, it is called with an exception if one occurs, otherwise
181     exception is rethrown.
182
183     >>> import uuid
184     >>> import os
185     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
186     >>> os.path.exists(path)
187     False
188     >>> create_path_if_not_exist(path)
189     >>> os.path.exists(path)
190     True
191     """
192     logger.debug("Creating path %s", path)
193     previous_umask = os.umask(0)
194     try:
195         os.makedirs(path)
196         os.chmod(path, 0o777)
197     except OSError as ex:
198         if ex.errno != errno.EEXIST and not os.path.isdir(path):
199             if on_error is not None:
200                 on_error(path, ex)
201             else:
202                 raise
203     finally:
204         os.umask(previous_umask)
205
206
207 def does_file_exist(filename: str) -> bool:
208     """Returns True if a file exists and is a normal file.
209
210     >>> does_file_exist(__file__)
211     True
212     """
213     return os.path.exists(filename) and os.path.isfile(filename)
214
215
216 def file_is_readable(filename: str) -> bool:
217     return does_file_exist(filename) and os.access(filename, os.R_OK)
218
219
220 def file_is_writable(filename: str) -> bool:
221     return does_file_exist(filename) and os.access(filename, os.W_OK)
222
223
224 def file_is_executable(filename: str) -> bool:
225     return does_file_exist(filename) and os.access(filename, os.X_OK)
226
227
228 def does_directory_exist(dirname: str) -> bool:
229     """Returns True if a file exists and is a directory.
230
231     >>> does_directory_exist('/tmp')
232     True
233     """
234     return os.path.exists(dirname) and os.path.isdir(dirname)
235
236
237 def does_path_exist(pathname: str) -> bool:
238     """Just a more verbose wrapper around os.path.exists."""
239     return os.path.exists(pathname)
240
241
242 def get_file_size(filename: str) -> int:
243     """Returns the size of a file in bytes."""
244     return os.path.getsize(filename)
245
246
247 def is_normal_file(filename: str) -> bool:
248     """Returns True if filename is a normal file.
249
250     >>> is_normal_file(__file__)
251     True
252     """
253     return os.path.isfile(filename)
254
255
256 def is_directory(filename: str) -> bool:
257     """Returns True if filename is a directory.
258
259     >>> is_directory('/tmp')
260     True
261     """
262     return os.path.isdir(filename)
263
264
265 def is_symlink(filename: str) -> bool:
266     """True if filename is a symlink, False otherwise.
267
268     >>> is_symlink('/tmp')
269     False
270
271     >>> is_symlink('/home')
272     True
273
274     """
275     return os.path.islink(filename)
276
277
278 def is_same_file(file1: str, file2: str) -> bool:
279     """Returns True if the two files are the same inode.
280
281     >>> is_same_file('/tmp', '/tmp/../tmp')
282     True
283
284     >>> is_same_file('/tmp', '/home')
285     False
286
287     """
288     return os.path.samefile(file1, file2)
289
290
291 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
292     """Stats the file and returns an os.stat_result or None on error."""
293     try:
294         return os.stat(filename)
295     except Exception as e:
296         logger.exception(e)
297         return None
298
299
300 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
301     tss = get_file_raw_timestamps(filename)
302     if tss is not None:
303         return extractor(tss)
304     return None
305
306
307 def get_file_raw_atime(filename: str) -> Optional[float]:
308     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
309
310
311 def get_file_raw_mtime(filename: str) -> Optional[float]:
312     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
313
314
315 def get_file_raw_ctime(filename: str) -> Optional[float]:
316     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
317
318
319 def get_file_md5(filename: str) -> str:
320     """Hashes filename's contents and returns an MD5."""
321     file_hash = hashlib.md5()
322     with open(filename, "rb") as f:
323         chunk = f.read(8192)
324         while chunk:
325             file_hash.update(chunk)
326             chunk = f.read(8192)
327     return file_hash.hexdigest()
328
329
330 def set_file_raw_atime(filename: str, atime: float):
331     mtime = get_file_raw_mtime(filename)
332     assert mtime is not None
333     os.utime(filename, (atime, mtime))
334
335
336 def set_file_raw_mtime(filename: str, mtime: float):
337     atime = get_file_raw_atime(filename)
338     assert atime is not None
339     os.utime(filename, (atime, mtime))
340
341
342 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
343     if ts is not None:
344         os.utime(filename, (ts, ts))
345     else:
346         os.utime(filename, None)
347
348
349 def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
350     ts = producer(filename)
351     if ts is not None:
352         return datetime.datetime.fromtimestamp(ts)
353     return None
354
355
356 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
357     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
358
359
360 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
361     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
362
363
364 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
365     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
366
367
368 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
369     now = time.time()
370     ts = get_file_raw_timestamps(filename)
371     if ts is None:
372         return None
373     result = extractor(ts)
374     return now - result
375
376
377 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
378     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
379
380
381 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
382     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
383
384
385 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
386     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
387
388
389 def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
390     age = get_file_timestamp_age_seconds(filename, extractor)
391     if age is not None:
392         return datetime.timedelta(seconds=float(age))
393     return None
394
395
396 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
397     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
398
399
400 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
401     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
402
403
404 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
405     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
406
407
408 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
409     from datetime_utils import describe_duration, describe_duration_briefly
410
411     age = get_file_timestamp_age_seconds(filename, extractor)
412     if age is None:
413         return None
414     if brief:
415         return describe_duration_briefly(age)
416     else:
417         return describe_duration(age)
418
419
420 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
421     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
422
423
424 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
425     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
426
427
428 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
429     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
430
431
432 def touch_file(filename: str, *, mode: Optional[int] = 0o666):
433     pathlib.Path(filename, mode=mode).touch()
434
435
436 def expand_globs(in_filename: str):
437     for filename in glob.glob(in_filename):
438         yield filename
439
440
441 def get_files(directory: str):
442     for filename in os.listdir(directory):
443         full_path = join(directory, filename)
444         if isfile(full_path) and exists(full_path):
445             yield full_path
446
447
448 def get_directories(directory: str):
449     for d in os.listdir(directory):
450         full_path = join(directory, d)
451         if not isfile(full_path) and exists(full_path):
452             yield full_path
453
454
455 def get_files_recursive(directory: str):
456     for filename in get_files(directory):
457         yield filename
458     for subdir in get_directories(directory):
459         for file_or_directory in get_files_recursive(subdir):
460             yield file_or_directory
461
462
463 class FileWriter(object):
464     """A helper that writes a file to a temporary location and then moves
465     it atomically to its ultimate destination on close.
466
467     """
468
469     def __init__(self, filename: str) -> None:
470         self.filename = filename
471         uuid = uuid4()
472         self.tempfile = f'{filename}-{uuid}.tmp'
473         self.handle: Optional[TextIO] = None
474
475     def __enter__(self) -> TextIO:
476         assert not does_path_exist(self.tempfile)
477         self.handle = open(self.tempfile, mode="w")
478         return self.handle
479
480     def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
481         if self.handle is not None:
482             self.handle.close()
483             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
484             ret = os.system(cmd)
485             if (ret >> 8) != 0:
486                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
487         return None
488
489
490 if __name__ == '__main__':
491     import doctest
492
493     doctest.testmod()