More type annotations.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import io
11 import pathlib
12 import re
13 import time
14 from typing import Optional
15 import glob
16 from os.path import isfile, join, exists
17 from typing import List, TextIO
18 from uuid import uuid4
19
20
21 logger = logging.getLogger(__name__)
22
23
24 def remove_newlines(x):
25     return x.replace('\n', '')
26
27
28 def strip_whitespace(x):
29     return x.strip()
30
31
32 def remove_hash_comments(x):
33     return re.sub(r'#.*$', '', x)
34
35
36 def slurp_file(
37     filename: str,
38     *,
39     skip_blank_lines=False,
40     line_transformers=[],
41 ):
42     ret = []
43     if not file_is_readable(filename):
44         raise Exception(f'{filename} can\'t be read.')
45     with open(filename) as rf:
46         for line in rf:
47             for transformation in line_transformers:
48                 line = transformation(line)
49             if skip_blank_lines and line == '':
50                 continue
51             ret.append(line)
52     return ret
53
54
55 def remove(path: str) -> None:
56     """Deletes a file.  Raises if path refers to a directory or a file
57     that doesn't exist.
58
59     >>> import os
60     >>> filename = '/tmp/file_utils_test_file'
61     >>> os.system(f'touch {filename}')
62     0
63     >>> does_file_exist(filename)
64     True
65     >>> remove(filename)
66     >>> does_file_exist(filename)
67     False
68
69     """
70     os.remove(path)
71
72
73 def delete(path: str) -> None:
74     os.remove(path)
75
76
77 def without_extension(path: str) -> str:
78     """Remove one extension from a file or path.
79
80     >>> without_extension('foobar.txt')
81     'foobar'
82
83     >>> without_extension('/home/scott/frapp.py')
84     '/home/scott/frapp'
85
86     >>> without_extension('a.b.c.tar.gz')
87     'a.b.c.tar'
88
89     >>> without_extension('foobar')
90     'foobar'
91
92     """
93     return os.path.splitext(path)[0]
94
95
96 def without_all_extensions(path: str) -> str:
97     """Removes all extensions from a path; handles multiple extensions
98     like foobar.tar.gz -> foobar.
99
100     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
101     '/home/scott/foobar'
102
103     """
104     while '.' in path:
105         path = without_extension(path)
106     return path
107
108
109 def get_extension(path: str) -> str:
110     """Extract and return one extension from a file or path.
111
112     >>> get_extension('this_is_a_test.txt')
113     '.txt'
114
115     >>> get_extension('/home/scott/test.py')
116     '.py'
117
118     >>> get_extension('foobar')
119     ''
120
121     """
122     return os.path.splitext(path)[1]
123
124
125 def get_all_extensions(path: str) -> List[str]:
126     """Return the extensions of a file or path in order.
127
128     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
129     ['.tar', '.gz', '.1']
130
131     """
132     ret = []
133     while True:
134         ext = get_extension(path)
135         path = without_extension(path)
136         if ext:
137             ret.append(ext)
138         else:
139             ret.reverse()
140             return ret
141
142
143 def without_path(filespec: str) -> str:
144     """Returns the base filename without any leading path.
145
146     >>> without_path('/home/scott/foo.py')
147     'foo.py'
148
149     >>> without_path('foo.py')
150     'foo.py'
151
152     """
153     return os.path.split(filespec)[1]
154
155
156 def get_path(filespec: str) -> str:
157     """Returns just the path of the filespec by removing the filename and
158     extension.
159
160     >>> get_path('/home/scott/foobar.py')
161     '/home/scott'
162
163     >>> get_path('~scott/frapp.txt')
164     '~scott'
165
166     """
167     return os.path.split(filespec)[0]
168
169
170 def get_canonical_path(filespec: str) -> str:
171     """Returns a canonicalized absolute path.
172
173     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
174     '/usr/home/scott/foo.txt'
175
176     """
177     return os.path.realpath(filespec)
178
179
180 def create_path_if_not_exist(path, on_error=None):
181     """
182     Attempts to create path if it does not exist. If on_error is
183     specified, it is called with an exception if one occurs, otherwise
184     exception is rethrown.
185
186     >>> import uuid
187     >>> import os
188     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
189     >>> os.path.exists(path)
190     False
191     >>> create_path_if_not_exist(path)
192     >>> os.path.exists(path)
193     True
194     """
195     logger.debug(f"Creating path {path}")
196     previous_umask = os.umask(0)
197     try:
198         os.makedirs(path)
199         os.chmod(path, 0o777)
200     except OSError as ex:
201         if ex.errno != errno.EEXIST and not os.path.isdir(path):
202             if on_error is not None:
203                 on_error(path, ex)
204             else:
205                 raise
206     finally:
207         os.umask(previous_umask)
208
209
210 def does_file_exist(filename: str) -> bool:
211     """Returns True if a file exists and is a normal file.
212
213     >>> does_file_exist(__file__)
214     True
215     """
216     return os.path.exists(filename) and os.path.isfile(filename)
217
218
219 def file_is_readable(filename: str) -> bool:
220     return does_file_exist(filename) and os.access(filename, os.R_OK)
221
222
223 def file_is_writable(filename: str) -> bool:
224     return does_file_exist(filename) and os.access(filename, os.W_OK)
225
226
227 def file_is_executable(filename: str) -> bool:
228     return does_file_exist(filename) and os.access(filename, os.X_OK)
229
230
231 def does_directory_exist(dirname: str) -> bool:
232     """Returns True if a file exists and is a directory.
233
234     >>> does_directory_exist('/tmp')
235     True
236     """
237     return os.path.exists(dirname) and os.path.isdir(dirname)
238
239
240 def does_path_exist(pathname: str) -> bool:
241     """Just a more verbose wrapper around os.path.exists."""
242     return os.path.exists(pathname)
243
244
245 def get_file_size(filename: str) -> int:
246     """Returns the size of a file in bytes."""
247     return os.path.getsize(filename)
248
249
250 def is_normal_file(filename: str) -> bool:
251     """Returns True if filename is a normal file.
252
253     >>> is_normal_file(__file__)
254     True
255     """
256     return os.path.isfile(filename)
257
258
259 def is_directory(filename: str) -> bool:
260     """Returns True if filename is a directory.
261
262     >>> is_directory('/tmp')
263     True
264     """
265     return os.path.isdir(filename)
266
267
268 def is_symlink(filename: str) -> bool:
269     """True if filename is a symlink, False otherwise.
270
271     >>> is_symlink('/tmp')
272     False
273
274     >>> is_symlink('/home')
275     True
276
277     """
278     return os.path.islink(filename)
279
280
281 def is_same_file(file1: str, file2: str) -> bool:
282     """Returns True if the two files are the same inode.
283
284     >>> is_same_file('/tmp', '/tmp/../tmp')
285     True
286
287     >>> is_same_file('/tmp', '/home')
288     False
289
290     """
291     return os.path.samefile(file1, file2)
292
293
294 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
295     """Stats the file and returns an os.stat_result or None on error."""
296     try:
297         return os.stat(filename)
298     except Exception as e:
299         logger.exception(e)
300         return None
301
302
303 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
304     tss = get_file_raw_timestamps(filename)
305     if tss is not None:
306         return extractor(tss)
307     return None
308
309
310 def get_file_raw_atime(filename: str) -> Optional[float]:
311     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
312
313
314 def get_file_raw_mtime(filename: str) -> Optional[float]:
315     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
316
317
318 def get_file_raw_ctime(filename: str) -> Optional[float]:
319     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
320
321
322 def get_file_md5(filename: str) -> str:
323     """Hashes filename's contents and returns an MD5."""
324     file_hash = hashlib.md5()
325     with open(filename, "rb") as f:
326         chunk = f.read(8192)
327         while chunk:
328             file_hash.update(chunk)
329             chunk = f.read(8192)
330     return file_hash.hexdigest()
331
332
333 def set_file_raw_atime(filename: str, atime: float):
334     mtime = get_file_raw_mtime(filename)
335     assert mtime
336     os.utime(filename, (atime, mtime))
337
338
339 def set_file_raw_mtime(filename: str, mtime: float):
340     atime = get_file_raw_atime(filename)
341     assert atime
342     os.utime(filename, (atime, mtime))
343
344
345 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
346     if ts is not None:
347         os.utime(filename, (ts, ts))
348     else:
349         os.utime(filename, None)
350
351
352 def convert_file_timestamp_to_datetime(
353     filename: str, producer
354 ) -> Optional[datetime.datetime]:
355     ts = producer(filename)
356     if ts is not None:
357         return datetime.datetime.fromtimestamp(ts)
358     return None
359
360
361 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
362     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
363
364
365 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
366     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
367
368
369 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
370     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
371
372
373 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
374     now = time.time()
375     ts = get_file_raw_timestamps(filename)
376     if ts is None:
377         return None
378     result = extractor(ts)
379     return now - result
380
381
382 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
383     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
384
385
386 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
387     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
388
389
390 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
391     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
392
393
394 def get_file_timestamp_timedelta(
395     filename: str, extractor
396 ) -> Optional[datetime.timedelta]:
397     age = get_file_timestamp_age_seconds(filename, extractor)
398     if age is not None:
399         return datetime.timedelta(seconds=float(age))
400     return None
401
402
403 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
404     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
405
406
407 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
408     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
409
410
411 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
412     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
413
414
415 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
416     from datetime_utils import describe_duration, describe_duration_briefly
417
418     age = get_file_timestamp_age_seconds(filename, extractor)
419     if age is None:
420         return None
421     if brief:
422         return describe_duration_briefly(age)
423     else:
424         return describe_duration(age)
425
426
427 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
428     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
429
430
431 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
432     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
433
434
435 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
436     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
437
438
439 def touch_file(filename: str, *, mode: Optional[int] = 0o666):
440     pathlib.Path(filename, mode=mode).touch()
441
442
443 def expand_globs(in_filename: str):
444     for filename in glob.glob(in_filename):
445         yield filename
446
447
448 def get_files(directory: str):
449     for filename in os.listdir(directory):
450         full_path = join(directory, filename)
451         if isfile(full_path) and exists(full_path):
452             yield full_path
453
454
455 def get_directories(directory: str):
456     for d in os.listdir(directory):
457         full_path = join(directory, d)
458         if not isfile(full_path) and exists(full_path):
459             yield full_path
460
461
462 def get_files_recursive(directory: str):
463     for filename in get_files(directory):
464         yield filename
465     for subdir in get_directories(directory):
466         for file_or_directory in get_files_recursive(subdir):
467             yield file_or_directory
468
469
470 class FileWriter(object):
471     def __init__(self, filename: str) -> None:
472         self.filename = filename
473         uuid = uuid4()
474         self.tempfile = f'{filename}-{uuid}.tmp'
475         self.handle: Optional[TextIO] = None
476
477     def __enter__(self) -> TextIO:
478         assert not does_path_exist(self.tempfile)
479         self.handle = open(self.tempfile, mode="w")
480         return self.handle
481
482     def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
483         if self.handle is not None:
484             self.handle.close()
485             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
486             ret = os.system(cmd)
487             if (ret >> 8) != 0:
488                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
489         return None
490
491
492 if __name__ == '__main__':
493     import doctest
494
495     doctest.testmod()