Add some new helper methods to file utils.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import io
11 import pathlib
12 import re
13 import time
14 from typing import Optional
15 import glob
16 from os.path import isfile, join, exists
17 from typing import List
18 from uuid import uuid4
19
20
21 logger = logging.getLogger(__name__)
22
23
24 def remove_newlines(x):
25     return x.replace('\n', '')
26
27
28 def strip_whitespace(x):
29     return x.strip()
30
31
32 def remove_hash_comments(x):
33     return re.sub(r'#.*$', '', x)
34
35
36 def read_file_to_list(
37     filename: str, *, skip_blank_lines=False, line_transformations=[]
38 ):
39     ret = []
40     if not file_is_readable(filename):
41         raise Exception(f'{filename} can\'t be read.')
42     with open(filename) as rf:
43         for line in rf:
44             for transformation in line_transformations:
45                 line = transformation(line)
46             if skip_blank_lines and line == '':
47                 continue
48             ret.append(line)
49     return ret
50
51
52 def remove(path: str) -> None:
53     """Deletes a file.  Raises if path refers to a directory or a file
54     that doesn't exist.
55
56     >>> import os
57     >>> filename = '/tmp/file_utils_test_file'
58     >>> os.system(f'touch {filename}')
59     0
60     >>> does_file_exist(filename)
61     True
62     >>> remove(filename)
63     >>> does_file_exist(filename)
64     False
65
66     """
67     os.remove(path)
68
69
70 def delete(path: str) -> None:
71     os.remove(path)
72
73
74 def without_extension(path: str) -> str:
75     """Remove one extension from a file or path.
76
77     >>> without_extension('foobar.txt')
78     'foobar'
79
80     >>> without_extension('/home/scott/frapp.py')
81     '/home/scott/frapp'
82
83     >>> without_extension('a.b.c.tar.gz')
84     'a.b.c.tar'
85
86     >>> without_extension('foobar')
87     'foobar'
88
89     """
90     return os.path.splitext(path)[0]
91
92
93 def without_all_extensions(path: str) -> str:
94     """Removes all extensions from a path; handles multiple extensions
95     like foobar.tar.gz -> foobar.
96
97     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
98     '/home/scott/foobar'
99
100     """
101     while '.' in path:
102         path = without_extension(path)
103     return path
104
105
106 def get_extension(path: str) -> str:
107     """Extract and return one extension from a file or path.
108
109     >>> get_extension('this_is_a_test.txt')
110     '.txt'
111
112     >>> get_extension('/home/scott/test.py')
113     '.py'
114
115     >>> get_extension('foobar')
116     ''
117
118     """
119     return os.path.splitext(path)[1]
120
121
122 def get_all_extensions(path: str) -> List[str]:
123     """Return the extensions of a file or path in order.
124
125     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
126     ['.tar', '.gz', '.1']
127
128     """
129     ret = []
130     while True:
131         ext = get_extension(path)
132         path = without_extension(path)
133         if ext:
134             ret.append(ext)
135         else:
136             ret.reverse()
137             return ret
138
139
140 def without_path(filespec: str) -> str:
141     """Returns the base filename without any leading path.
142
143     >>> without_path('/home/scott/foo.py')
144     'foo.py'
145
146     >>> without_path('foo.py')
147     'foo.py'
148
149     """
150     return os.path.split(filespec)[1]
151
152
153 def get_path(filespec: str) -> str:
154     """Returns just the path of the filespec by removing the filename and
155     extension.
156
157     >>> get_path('/home/scott/foobar.py')
158     '/home/scott'
159
160     >>> get_path('~scott/frapp.txt')
161     '~scott'
162
163     """
164     return os.path.split(filespec)[0]
165
166
167 def get_canonical_path(filespec: str) -> str:
168     """Returns a canonicalized absolute path.
169
170     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
171     '/usr/home/scott/foo.txt'
172
173     """
174     return os.path.realpath(filespec)
175
176
177 def create_path_if_not_exist(path, on_error=None):
178     """
179     Attempts to create path if it does not exist. If on_error is
180     specified, it is called with an exception if one occurs, otherwise
181     exception is rethrown.
182
183     >>> import uuid
184     >>> import os
185     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
186     >>> os.path.exists(path)
187     False
188     >>> create_path_if_not_exist(path)
189     >>> os.path.exists(path)
190     True
191     """
192     logger.debug(f"Creating path {path}")
193     previous_umask = os.umask(0)
194     try:
195         os.makedirs(path)
196         os.chmod(path, 0o777)
197     except OSError as ex:
198         if ex.errno != errno.EEXIST and not os.path.isdir(path):
199             if on_error is not None:
200                 on_error(path, ex)
201             else:
202                 raise
203     finally:
204         os.umask(previous_umask)
205
206
207 def does_file_exist(filename: str) -> bool:
208     """Returns True if a file exists and is a normal file.
209
210     >>> does_file_exist(__file__)
211     True
212     """
213     return os.path.exists(filename) and os.path.isfile(filename)
214
215
216 def file_is_readable(filename: str) -> bool:
217     return does_file_exist(filename) and os.access(filename, os.R_OK)
218
219
220 def file_is_writable(filename: str) -> bool:
221     return does_file_exist(filename) and os.access(filename, os.W_OK)
222
223
224 def file_is_executable(filename: str) -> bool:
225     return does_file_exist(filename) and os.access(filename, os.X_OK)
226
227
228 def does_directory_exist(dirname: str) -> bool:
229     """Returns True if a file exists and is a directory.
230
231     >>> does_directory_exist('/tmp')
232     True
233     """
234     return os.path.exists(dirname) and os.path.isdir(dirname)
235
236
237 def does_path_exist(pathname: str) -> bool:
238     """Just a more verbose wrapper around os.path.exists."""
239     return os.path.exists(pathname)
240
241
242 def get_file_size(filename: str) -> int:
243     """Returns the size of a file in bytes."""
244     return os.path.getsize(filename)
245
246
247 def is_normal_file(filename: str) -> bool:
248     """Returns True if filename is a normal file.
249
250     >>> is_normal_file(__file__)
251     True
252     """
253     return os.path.isfile(filename)
254
255
256 def is_directory(filename: str) -> bool:
257     """Returns True if filename is a directory.
258
259     >>> is_directory('/tmp')
260     True
261     """
262     return os.path.isdir(filename)
263
264
265 def is_symlink(filename: str) -> bool:
266     """True if filename is a symlink, False otherwise.
267
268     >>> is_symlink('/tmp')
269     False
270
271     >>> is_symlink('/home')
272     True
273
274     """
275     return os.path.islink(filename)
276
277
278 def is_same_file(file1: str, file2: str) -> bool:
279     """Returns True if the two files are the same inode.
280
281     >>> is_same_file('/tmp', '/tmp/../tmp')
282     True
283
284     >>> is_same_file('/tmp', '/home')
285     False
286
287     """
288     return os.path.samefile(file1, file2)
289
290
291 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
292     """Stats the file and returns an os.stat_result or None on error."""
293     try:
294         return os.stat(filename)
295     except Exception as e:
296         logger.exception(e)
297         return None
298
299
300 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
301     tss = get_file_raw_timestamps(filename)
302     if tss is not None:
303         return extractor(tss)
304     return None
305
306
307 def get_file_raw_atime(filename: str) -> Optional[float]:
308     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
309
310
311 def get_file_raw_mtime(filename: str) -> Optional[float]:
312     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
313
314
315 def get_file_raw_ctime(filename: str) -> Optional[float]:
316     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
317
318
319 def get_file_md5(filename: str) -> str:
320     """Hashes filename's contents and returns an MD5."""
321     file_hash = hashlib.md5()
322     with open(filename, "rb") as f:
323         chunk = f.read(8192)
324         while chunk:
325             file_hash.update(chunk)
326             chunk = f.read(8192)
327     return file_hash.hexdigest()
328
329
330 def set_file_raw_atime(filename: str, atime: float):
331     mtime = get_file_raw_mtime(filename)
332     os.utime(filename, (atime, mtime))
333
334
335 def set_file_raw_mtime(filename: str, mtime: float):
336     atime = get_file_raw_atime(filename)
337     os.utime(filename, (atime, mtime))
338
339
340 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
341     if ts is not None:
342         os.utime(filename, (ts, ts))
343     else:
344         os.utime(filename, None)
345
346
347 def convert_file_timestamp_to_datetime(
348     filename: str, producer
349 ) -> Optional[datetime.datetime]:
350     ts = producer(filename)
351     if ts is not None:
352         return datetime.datetime.fromtimestamp(ts)
353     return None
354
355
356 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
357     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
358
359
360 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
361     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
362
363
364 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
365     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
366
367
368 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
369     now = time.time()
370     ts = get_file_raw_timestamps(filename)
371     if ts is None:
372         return None
373     result = extractor(ts)
374     return now - result
375
376
377 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
378     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
379
380
381 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
382     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
383
384
385 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
386     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
387
388
389 def get_file_timestamp_timedelta(
390     filename: str, extractor
391 ) -> Optional[datetime.timedelta]:
392     age = get_file_timestamp_age_seconds(filename, extractor)
393     if age is not None:
394         return datetime.timedelta(seconds=float(age))
395     return None
396
397
398 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
399     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
400
401
402 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
403     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
404
405
406 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
407     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
408
409
410 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
411     from datetime_utils import describe_duration, describe_duration_briefly
412
413     age = get_file_timestamp_age_seconds(filename, extractor)
414     if age is None:
415         return None
416     if brief:
417         return describe_duration_briefly(age)
418     else:
419         return describe_duration(age)
420
421
422 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
423     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
424
425
426 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
427     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
428
429
430 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
431     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
432
433
434 def touch_file(filename: str, *, mode: Optional[int] = 0o666) -> bool:
435     return pathlib.Path(filename, mode=mode).touch()
436
437
438 def expand_globs(in_filename: str):
439     for filename in glob.glob(in_filename):
440         yield filename
441
442
443 def get_files(directory: str):
444     for filename in os.listdir(directory):
445         full_path = join(directory, filename)
446         if isfile(full_path) and exists(full_path):
447             yield full_path
448
449
450 def get_directories(directory: str):
451     for d in os.listdir(directory):
452         full_path = join(directory, d)
453         if not isfile(full_path) and exists(full_path):
454             yield full_path
455
456
457 def get_files_recursive(directory: str):
458     for filename in get_files(directory):
459         yield filename
460     for subdir in get_directories(directory):
461         for file_or_directory in get_files_recursive(subdir):
462             yield file_or_directory
463
464
465 class FileWriter(object):
466     def __init__(self, filename: str) -> None:
467         self.filename = filename
468         uuid = uuid4()
469         self.tempfile = f'{filename}-{uuid}.tmp'
470         self.handle = None
471
472     def __enter__(self) -> io.TextIOWrapper:
473         assert not does_path_exist(self.tempfile)
474         self.handle = open(self.tempfile, mode="w")
475         return self.handle
476
477     def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
478         if self.handle is not None:
479             self.handle.close()
480             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
481             ret = os.system(cmd)
482             if (ret >> 8) != 0:
483                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
484         return None
485
486
487 if __name__ == '__main__':
488     import doctest
489
490     doctest.testmod()