Change locking boundaries for shared dict. Add a unit test.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import io
11 import pathlib
12 import re
13 import time
14 from typing import Optional
15 import glob
16 from os.path import isfile, join, exists
17 from typing import List
18 from uuid import uuid4
19
20
21 logger = logging.getLogger(__name__)
22
23
24 def remove_newlines(x):
25     return x.replace('\n', '')
26
27
28 def strip_whitespace(x):
29     return x.strip()
30
31
32 def remove_hash_comments(x):
33     return re.sub(r'#.*$', '', x)
34
35
36 def slurp_file(
37     filename: str,
38     *,
39     skip_blank_lines=False,
40     line_transformers=[],
41 ):
42     ret = []
43     if not file_is_readable(filename):
44         raise Exception(f'{filename} can\'t be read.')
45     with open(filename) as rf:
46         for line in rf:
47             for transformation in line_transformers:
48                 line = transformation(line)
49             if skip_blank_lines and line == '':
50                 continue
51             ret.append(line)
52     return ret
53
54
55 def remove(path: str) -> None:
56     """Deletes a file.  Raises if path refers to a directory or a file
57     that doesn't exist.
58
59     >>> import os
60     >>> filename = '/tmp/file_utils_test_file'
61     >>> os.system(f'touch {filename}')
62     0
63     >>> does_file_exist(filename)
64     True
65     >>> remove(filename)
66     >>> does_file_exist(filename)
67     False
68
69     """
70     os.remove(path)
71
72
73 def delete(path: str) -> None:
74     os.remove(path)
75
76
77 def without_extension(path: str) -> str:
78     """Remove one extension from a file or path.
79
80     >>> without_extension('foobar.txt')
81     'foobar'
82
83     >>> without_extension('/home/scott/frapp.py')
84     '/home/scott/frapp'
85
86     >>> without_extension('a.b.c.tar.gz')
87     'a.b.c.tar'
88
89     >>> without_extension('foobar')
90     'foobar'
91
92     """
93     return os.path.splitext(path)[0]
94
95
96 def without_all_extensions(path: str) -> str:
97     """Removes all extensions from a path; handles multiple extensions
98     like foobar.tar.gz -> foobar.
99
100     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
101     '/home/scott/foobar'
102
103     """
104     while '.' in path:
105         path = without_extension(path)
106     return path
107
108
109 def get_extension(path: str) -> str:
110     """Extract and return one extension from a file or path.
111
112     >>> get_extension('this_is_a_test.txt')
113     '.txt'
114
115     >>> get_extension('/home/scott/test.py')
116     '.py'
117
118     >>> get_extension('foobar')
119     ''
120
121     """
122     return os.path.splitext(path)[1]
123
124
125 def get_all_extensions(path: str) -> List[str]:
126     """Return the extensions of a file or path in order.
127
128     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
129     ['.tar', '.gz', '.1']
130
131     """
132     ret = []
133     while True:
134         ext = get_extension(path)
135         path = without_extension(path)
136         if ext:
137             ret.append(ext)
138         else:
139             ret.reverse()
140             return ret
141
142
143 def without_path(filespec: str) -> str:
144     """Returns the base filename without any leading path.
145
146     >>> without_path('/home/scott/foo.py')
147     'foo.py'
148
149     >>> without_path('foo.py')
150     'foo.py'
151
152     """
153     return os.path.split(filespec)[1]
154
155
156 def get_path(filespec: str) -> str:
157     """Returns just the path of the filespec by removing the filename and
158     extension.
159
160     >>> get_path('/home/scott/foobar.py')
161     '/home/scott'
162
163     >>> get_path('~scott/frapp.txt')
164     '~scott'
165
166     """
167     return os.path.split(filespec)[0]
168
169
170 def get_canonical_path(filespec: str) -> str:
171     """Returns a canonicalized absolute path.
172
173     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
174     '/usr/home/scott/foo.txt'
175
176     """
177     return os.path.realpath(filespec)
178
179
180 def create_path_if_not_exist(path, on_error=None):
181     """
182     Attempts to create path if it does not exist. If on_error is
183     specified, it is called with an exception if one occurs, otherwise
184     exception is rethrown.
185
186     >>> import uuid
187     >>> import os
188     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
189     >>> os.path.exists(path)
190     False
191     >>> create_path_if_not_exist(path)
192     >>> os.path.exists(path)
193     True
194     """
195     logger.debug(f"Creating path {path}")
196     previous_umask = os.umask(0)
197     try:
198         os.makedirs(path)
199         os.chmod(path, 0o777)
200     except OSError as ex:
201         if ex.errno != errno.EEXIST and not os.path.isdir(path):
202             if on_error is not None:
203                 on_error(path, ex)
204             else:
205                 raise
206     finally:
207         os.umask(previous_umask)
208
209
210 def does_file_exist(filename: str) -> bool:
211     """Returns True if a file exists and is a normal file.
212
213     >>> does_file_exist(__file__)
214     True
215     """
216     return os.path.exists(filename) and os.path.isfile(filename)
217
218
219 def file_is_readable(filename: str) -> bool:
220     return does_file_exist(filename) and os.access(filename, os.R_OK)
221
222
223 def file_is_writable(filename: str) -> bool:
224     return does_file_exist(filename) and os.access(filename, os.W_OK)
225
226
227 def file_is_executable(filename: str) -> bool:
228     return does_file_exist(filename) and os.access(filename, os.X_OK)
229
230
231 def does_directory_exist(dirname: str) -> bool:
232     """Returns True if a file exists and is a directory.
233
234     >>> does_directory_exist('/tmp')
235     True
236     """
237     return os.path.exists(dirname) and os.path.isdir(dirname)
238
239
240 def does_path_exist(pathname: str) -> bool:
241     """Just a more verbose wrapper around os.path.exists."""
242     return os.path.exists(pathname)
243
244
245 def get_file_size(filename: str) -> int:
246     """Returns the size of a file in bytes."""
247     return os.path.getsize(filename)
248
249
250 def is_normal_file(filename: str) -> bool:
251     """Returns True if filename is a normal file.
252
253     >>> is_normal_file(__file__)
254     True
255     """
256     return os.path.isfile(filename)
257
258
259 def is_directory(filename: str) -> bool:
260     """Returns True if filename is a directory.
261
262     >>> is_directory('/tmp')
263     True
264     """
265     return os.path.isdir(filename)
266
267
268 def is_symlink(filename: str) -> bool:
269     """True if filename is a symlink, False otherwise.
270
271     >>> is_symlink('/tmp')
272     False
273
274     >>> is_symlink('/home')
275     True
276
277     """
278     return os.path.islink(filename)
279
280
281 def is_same_file(file1: str, file2: str) -> bool:
282     """Returns True if the two files are the same inode.
283
284     >>> is_same_file('/tmp', '/tmp/../tmp')
285     True
286
287     >>> is_same_file('/tmp', '/home')
288     False
289
290     """
291     return os.path.samefile(file1, file2)
292
293
294 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
295     """Stats the file and returns an os.stat_result or None on error."""
296     try:
297         return os.stat(filename)
298     except Exception as e:
299         logger.exception(e)
300         return None
301
302
303 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
304     tss = get_file_raw_timestamps(filename)
305     if tss is not None:
306         return extractor(tss)
307     return None
308
309
310 def get_file_raw_atime(filename: str) -> Optional[float]:
311     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
312
313
314 def get_file_raw_mtime(filename: str) -> Optional[float]:
315     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
316
317
318 def get_file_raw_ctime(filename: str) -> Optional[float]:
319     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
320
321
322 def get_file_md5(filename: str) -> str:
323     """Hashes filename's contents and returns an MD5."""
324     file_hash = hashlib.md5()
325     with open(filename, "rb") as f:
326         chunk = f.read(8192)
327         while chunk:
328             file_hash.update(chunk)
329             chunk = f.read(8192)
330     return file_hash.hexdigest()
331
332
333 def set_file_raw_atime(filename: str, atime: float):
334     mtime = get_file_raw_mtime(filename)
335     os.utime(filename, (atime, mtime))
336
337
338 def set_file_raw_mtime(filename: str, mtime: float):
339     atime = get_file_raw_atime(filename)
340     os.utime(filename, (atime, mtime))
341
342
343 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
344     if ts is not None:
345         os.utime(filename, (ts, ts))
346     else:
347         os.utime(filename, None)
348
349
350 def convert_file_timestamp_to_datetime(
351     filename: str, producer
352 ) -> Optional[datetime.datetime]:
353     ts = producer(filename)
354     if ts is not None:
355         return datetime.datetime.fromtimestamp(ts)
356     return None
357
358
359 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
360     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
361
362
363 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
364     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
365
366
367 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
368     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
369
370
371 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
372     now = time.time()
373     ts = get_file_raw_timestamps(filename)
374     if ts is None:
375         return None
376     result = extractor(ts)
377     return now - result
378
379
380 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
381     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
382
383
384 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
385     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
386
387
388 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
389     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
390
391
392 def get_file_timestamp_timedelta(
393     filename: str, extractor
394 ) -> Optional[datetime.timedelta]:
395     age = get_file_timestamp_age_seconds(filename, extractor)
396     if age is not None:
397         return datetime.timedelta(seconds=float(age))
398     return None
399
400
401 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
402     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
403
404
405 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
406     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
407
408
409 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
410     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
411
412
413 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
414     from datetime_utils import describe_duration, describe_duration_briefly
415
416     age = get_file_timestamp_age_seconds(filename, extractor)
417     if age is None:
418         return None
419     if brief:
420         return describe_duration_briefly(age)
421     else:
422         return describe_duration(age)
423
424
425 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
426     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
427
428
429 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
430     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
431
432
433 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
434     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
435
436
437 def touch_file(filename: str, *, mode: Optional[int] = 0o666) -> bool:
438     return pathlib.Path(filename, mode=mode).touch()
439
440
441 def expand_globs(in_filename: str):
442     for filename in glob.glob(in_filename):
443         yield filename
444
445
446 def get_files(directory: str):
447     for filename in os.listdir(directory):
448         full_path = join(directory, filename)
449         if isfile(full_path) and exists(full_path):
450             yield full_path
451
452
453 def get_directories(directory: str):
454     for d in os.listdir(directory):
455         full_path = join(directory, d)
456         if not isfile(full_path) and exists(full_path):
457             yield full_path
458
459
460 def get_files_recursive(directory: str):
461     for filename in get_files(directory):
462         yield filename
463     for subdir in get_directories(directory):
464         for file_or_directory in get_files_recursive(subdir):
465             yield file_or_directory
466
467
468 class FileWriter(object):
469     def __init__(self, filename: str) -> None:
470         self.filename = filename
471         uuid = uuid4()
472         self.tempfile = f'{filename}-{uuid}.tmp'
473         self.handle = None
474
475     def __enter__(self) -> io.TextIOWrapper:
476         assert not does_path_exist(self.tempfile)
477         self.handle = open(self.tempfile, mode="w")
478         return self.handle
479
480     def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
481         if self.handle is not None:
482             self.handle.close()
483             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
484             ret = os.system(cmd)
485             if (ret >> 8) != 0:
486                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
487         return None
488
489
490 if __name__ == '__main__':
491     import doctest
492
493     doctest.testmod()