Ran black code formatter on everything.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import io
11 import pathlib
12 import time
13 from typing import Optional
14 import glob
15 from os.path import isfile, join, exists
16 from typing import List
17 from uuid import uuid4
18
19
20 logger = logging.getLogger(__name__)
21
22
23 def remove(path: str) -> None:
24     """Deletes a file.  Raises if path refers to a directory or a file
25     that doesn't exist.
26
27     >>> import os
28     >>> filename = '/tmp/file_utils_test_file'
29     >>> os.system(f'touch {filename}')
30     0
31     >>> does_file_exist(filename)
32     True
33     >>> remove(filename)
34     >>> does_file_exist(filename)
35     False
36
37     """
38     os.remove(path)
39
40
41 def delete(path: str) -> None:
42     os.remove(path)
43
44
45 def without_extension(path: str) -> str:
46     """Remove one extension from a file or path.
47
48     >>> without_extension('foobar.txt')
49     'foobar'
50
51     >>> without_extension('/home/scott/frapp.py')
52     '/home/scott/frapp'
53
54     >>> without_extension('a.b.c.tar.gz')
55     'a.b.c.tar'
56
57     >>> without_extension('foobar')
58     'foobar'
59
60     """
61     return os.path.splitext(path)[0]
62
63
64 def without_all_extensions(path: str) -> str:
65     """Removes all extensions from a path; handles multiple extensions
66     like foobar.tar.gz -> foobar.
67
68     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
69     '/home/scott/foobar'
70
71     """
72     while '.' in path:
73         path = without_extension(path)
74     return path
75
76
77 def get_extension(path: str) -> str:
78     """Extract and return one extension from a file or path.
79
80     >>> get_extension('this_is_a_test.txt')
81     '.txt'
82
83     >>> get_extension('/home/scott/test.py')
84     '.py'
85
86     >>> get_extension('foobar')
87     ''
88
89     """
90     return os.path.splitext(path)[1]
91
92
93 def get_all_extensions(path: str) -> List[str]:
94     """Return the extensions of a file or path in order.
95
96     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
97     ['.tar', '.gz', '.1']
98
99     """
100     ret = []
101     while True:
102         ext = get_extension(path)
103         path = without_extension(path)
104         if ext:
105             ret.append(ext)
106         else:
107             ret.reverse()
108             return ret
109
110
111 def without_path(filespec: str) -> str:
112     """Returns the base filename without any leading path.
113
114     >>> without_path('/home/scott/foo.py')
115     'foo.py'
116
117     >>> without_path('foo.py')
118     'foo.py'
119
120     """
121     return os.path.split(filespec)[1]
122
123
124 def get_path(filespec: str) -> str:
125     """Returns just the path of the filespec by removing the filename and
126     extension.
127
128     >>> get_path('/home/scott/foobar.py')
129     '/home/scott'
130
131     >>> get_path('~scott/frapp.txt')
132     '~scott'
133
134     """
135     return os.path.split(filespec)[0]
136
137
138 def get_canonical_path(filespec: str) -> str:
139     """Returns a canonicalized absolute path.
140
141     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
142     '/usr/home/scott/foo.txt'
143
144     """
145     return os.path.realpath(filespec)
146
147
148 def create_path_if_not_exist(path, on_error=None):
149     """
150     Attempts to create path if it does not exist. If on_error is
151     specified, it is called with an exception if one occurs, otherwise
152     exception is rethrown.
153
154     >>> import uuid
155     >>> import os
156     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
157     >>> os.path.exists(path)
158     False
159     >>> create_path_if_not_exist(path)
160     >>> os.path.exists(path)
161     True
162     """
163     logger.debug(f"Creating path {path}")
164     previous_umask = os.umask(0)
165     try:
166         os.makedirs(path)
167         os.chmod(path, 0o777)
168     except OSError as ex:
169         if ex.errno != errno.EEXIST and not os.path.isdir(path):
170             if on_error is not None:
171                 on_error(path, ex)
172             else:
173                 raise
174     finally:
175         os.umask(previous_umask)
176
177
178 def does_file_exist(filename: str) -> bool:
179     """Returns True if a file exists and is a normal file.
180
181     >>> does_file_exist(__file__)
182     True
183     """
184     return os.path.exists(filename) and os.path.isfile(filename)
185
186
187 def does_directory_exist(dirname: str) -> bool:
188     """Returns True if a file exists and is a directory.
189
190     >>> does_directory_exist('/tmp')
191     True
192     """
193     return os.path.exists(dirname) and os.path.isdir(dirname)
194
195
196 def does_path_exist(pathname: str) -> bool:
197     """Just a more verbose wrapper around os.path.exists."""
198     return os.path.exists(pathname)
199
200
201 def get_file_size(filename: str) -> int:
202     """Returns the size of a file in bytes."""
203     return os.path.getsize(filename)
204
205
206 def is_normal_file(filename: str) -> bool:
207     """Returns True if filename is a normal file.
208
209     >>> is_normal_file(__file__)
210     True
211     """
212     return os.path.isfile(filename)
213
214
215 def is_directory(filename: str) -> bool:
216     """Returns True if filename is a directory.
217
218     >>> is_directory('/tmp')
219     True
220     """
221     return os.path.isdir(filename)
222
223
224 def is_symlink(filename: str) -> bool:
225     """True if filename is a symlink, False otherwise.
226
227     >>> is_symlink('/tmp')
228     False
229
230     >>> is_symlink('/home')
231     True
232
233     """
234     return os.path.islink(filename)
235
236
237 def is_same_file(file1: str, file2: str) -> bool:
238     """Returns True if the two files are the same inode.
239
240     >>> is_same_file('/tmp', '/tmp/../tmp')
241     True
242
243     >>> is_same_file('/tmp', '/home')
244     False
245
246     """
247     return os.path.samefile(file1, file2)
248
249
250 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
251     """Stats the file and returns an os.stat_result or None on error."""
252     try:
253         return os.stat(filename)
254     except Exception as e:
255         logger.exception(e)
256         return None
257
258
259 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
260     tss = get_file_raw_timestamps(filename)
261     if tss is not None:
262         return extractor(tss)
263     return None
264
265
266 def get_file_raw_atime(filename: str) -> Optional[float]:
267     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
268
269
270 def get_file_raw_mtime(filename: str) -> Optional[float]:
271     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
272
273
274 def get_file_raw_ctime(filename: str) -> Optional[float]:
275     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
276
277
278 def get_file_md5(filename: str) -> str:
279     """Hashes filename's contents and returns an MD5."""
280     file_hash = hashlib.md5()
281     with open(filename, "rb") as f:
282         chunk = f.read(8192)
283         while chunk:
284             file_hash.update(chunk)
285             chunk = f.read(8192)
286     return file_hash.hexdigest()
287
288
289 def set_file_raw_atime(filename: str, atime: float):
290     mtime = get_file_raw_mtime(filename)
291     os.utime(filename, (atime, mtime))
292
293
294 def set_file_raw_mtime(filename: str, mtime: float):
295     atime = get_file_raw_atime(filename)
296     os.utime(filename, (atime, mtime))
297
298
299 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
300     if ts is not None:
301         os.utime(filename, (ts, ts))
302     else:
303         os.utime(filename, None)
304
305
306 def convert_file_timestamp_to_datetime(
307     filename: str, producer
308 ) -> Optional[datetime.datetime]:
309     ts = producer(filename)
310     if ts is not None:
311         return datetime.datetime.fromtimestamp(ts)
312     return None
313
314
315 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
316     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
317
318
319 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
320     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
321
322
323 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
324     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
325
326
327 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
328     now = time.time()
329     ts = get_file_raw_timestamps(filename)
330     if ts is None:
331         return None
332     result = extractor(ts)
333     return now - result
334
335
336 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
337     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
338
339
340 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
341     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
342
343
344 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
345     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
346
347
348 def get_file_timestamp_timedelta(
349     filename: str, extractor
350 ) -> Optional[datetime.timedelta]:
351     age = get_file_timestamp_age_seconds(filename, extractor)
352     if age is not None:
353         return datetime.timedelta(seconds=float(age))
354     return None
355
356
357 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
358     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
359
360
361 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
362     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
363
364
365 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
366     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
367
368
369 def describe_file_timestamp(
370     filename: str, extractor, *, brief=False
371 ) -> Optional[str]:
372     from datetime_utils import describe_duration, describe_duration_briefly
373
374     age = get_file_timestamp_age_seconds(filename, extractor)
375     if age is None:
376         return None
377     if brief:
378         return describe_duration_briefly(age)
379     else:
380         return describe_duration(age)
381
382
383 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
384     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
385
386
387 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
388     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
389
390
391 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
392     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
393
394
395 def touch_file(filename: str, *, mode: Optional[int] = 0o666) -> bool:
396     return pathlib.Path(filename, mode=mode).touch()
397
398
399 def expand_globs(in_filename: str):
400     for filename in glob.glob(in_filename):
401         yield filename
402
403
404 def get_files(directory: str):
405     for filename in os.listdir(directory):
406         full_path = join(directory, filename)
407         if isfile(full_path) and exists(full_path):
408             yield full_path
409
410
411 def get_directories(directory: str):
412     for d in os.listdir(directory):
413         full_path = join(directory, d)
414         if not isfile(full_path) and exists(full_path):
415             yield full_path
416
417
418 def get_files_recursive(directory: str):
419     for filename in get_files(directory):
420         yield filename
421     for subdir in get_directories(directory):
422         for file_or_directory in get_files_recursive(subdir):
423             yield file_or_directory
424
425
426 class FileWriter(object):
427     def __init__(self, filename: str) -> None:
428         self.filename = filename
429         uuid = uuid4()
430         self.tempfile = f'{filename}-{uuid}.tmp'
431         self.handle = None
432
433     def __enter__(self) -> io.TextIOWrapper:
434         assert not does_path_exist(self.tempfile)
435         self.handle = open(self.tempfile, mode="w")
436         return self.handle
437
438     def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
439         if self.handle is not None:
440             self.handle.close()
441             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
442             ret = os.system(cmd)
443             if (ret >> 8) != 0:
444                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
445         return None
446
447
448 if __name__ == '__main__':
449     import doctest
450
451     doctest.testmod()