Ahem. Still running black?
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import io
11 import pathlib
12 import time
13 from typing import Optional
14 import glob
15 from os.path import isfile, join, exists
16 from typing import List
17 from uuid import uuid4
18
19
20 logger = logging.getLogger(__name__)
21
22
23 def remove(path: str) -> None:
24     """Deletes a file.  Raises if path refers to a directory or a file
25     that doesn't exist.
26
27     >>> import os
28     >>> filename = '/tmp/file_utils_test_file'
29     >>> os.system(f'touch {filename}')
30     0
31     >>> does_file_exist(filename)
32     True
33     >>> remove(filename)
34     >>> does_file_exist(filename)
35     False
36
37     """
38     os.remove(path)
39
40
41 def delete(path: str) -> None:
42     os.remove(path)
43
44
45 def without_extension(path: str) -> str:
46     """Remove one extension from a file or path.
47
48     >>> without_extension('foobar.txt')
49     'foobar'
50
51     >>> without_extension('/home/scott/frapp.py')
52     '/home/scott/frapp'
53
54     >>> without_extension('a.b.c.tar.gz')
55     'a.b.c.tar'
56
57     >>> without_extension('foobar')
58     'foobar'
59
60     """
61     return os.path.splitext(path)[0]
62
63
64 def without_all_extensions(path: str) -> str:
65     """Removes all extensions from a path; handles multiple extensions
66     like foobar.tar.gz -> foobar.
67
68     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
69     '/home/scott/foobar'
70
71     """
72     while '.' in path:
73         path = without_extension(path)
74     return path
75
76
77 def get_extension(path: str) -> str:
78     """Extract and return one extension from a file or path.
79
80     >>> get_extension('this_is_a_test.txt')
81     '.txt'
82
83     >>> get_extension('/home/scott/test.py')
84     '.py'
85
86     >>> get_extension('foobar')
87     ''
88
89     """
90     return os.path.splitext(path)[1]
91
92
93 def get_all_extensions(path: str) -> List[str]:
94     """Return the extensions of a file or path in order.
95
96     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
97     ['.tar', '.gz', '.1']
98
99     """
100     ret = []
101     while True:
102         ext = get_extension(path)
103         path = without_extension(path)
104         if ext:
105             ret.append(ext)
106         else:
107             ret.reverse()
108             return ret
109
110
111 def without_path(filespec: str) -> str:
112     """Returns the base filename without any leading path.
113
114     >>> without_path('/home/scott/foo.py')
115     'foo.py'
116
117     >>> without_path('foo.py')
118     'foo.py'
119
120     """
121     return os.path.split(filespec)[1]
122
123
124 def get_path(filespec: str) -> str:
125     """Returns just the path of the filespec by removing the filename and
126     extension.
127
128     >>> get_path('/home/scott/foobar.py')
129     '/home/scott'
130
131     >>> get_path('~scott/frapp.txt')
132     '~scott'
133
134     """
135     return os.path.split(filespec)[0]
136
137
138 def get_canonical_path(filespec: str) -> str:
139     """Returns a canonicalized absolute path.
140
141     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
142     '/usr/home/scott/foo.txt'
143
144     """
145     return os.path.realpath(filespec)
146
147
148 def create_path_if_not_exist(path, on_error=None):
149     """
150     Attempts to create path if it does not exist. If on_error is
151     specified, it is called with an exception if one occurs, otherwise
152     exception is rethrown.
153
154     >>> import uuid
155     >>> import os
156     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
157     >>> os.path.exists(path)
158     False
159     >>> create_path_if_not_exist(path)
160     >>> os.path.exists(path)
161     True
162     """
163     logger.debug(f"Creating path {path}")
164     previous_umask = os.umask(0)
165     try:
166         os.makedirs(path)
167         os.chmod(path, 0o777)
168     except OSError as ex:
169         if ex.errno != errno.EEXIST and not os.path.isdir(path):
170             if on_error is not None:
171                 on_error(path, ex)
172             else:
173                 raise
174     finally:
175         os.umask(previous_umask)
176
177
178 def does_file_exist(filename: str) -> bool:
179     """Returns True if a file exists and is a normal file.
180
181     >>> does_file_exist(__file__)
182     True
183     """
184     return os.path.exists(filename) and os.path.isfile(filename)
185
186
187 def does_directory_exist(dirname: str) -> bool:
188     """Returns True if a file exists and is a directory.
189
190     >>> does_directory_exist('/tmp')
191     True
192     """
193     return os.path.exists(dirname) and os.path.isdir(dirname)
194
195
196 def does_path_exist(pathname: str) -> bool:
197     """Just a more verbose wrapper around os.path.exists."""
198     return os.path.exists(pathname)
199
200
201 def get_file_size(filename: str) -> int:
202     """Returns the size of a file in bytes."""
203     return os.path.getsize(filename)
204
205
206 def is_normal_file(filename: str) -> bool:
207     """Returns True if filename is a normal file.
208
209     >>> is_normal_file(__file__)
210     True
211     """
212     return os.path.isfile(filename)
213
214
215 def is_directory(filename: str) -> bool:
216     """Returns True if filename is a directory.
217
218     >>> is_directory('/tmp')
219     True
220     """
221     return os.path.isdir(filename)
222
223
224 def is_symlink(filename: str) -> bool:
225     """True if filename is a symlink, False otherwise.
226
227     >>> is_symlink('/tmp')
228     False
229
230     >>> is_symlink('/home')
231     True
232
233     """
234     return os.path.islink(filename)
235
236
237 def is_same_file(file1: str, file2: str) -> bool:
238     """Returns True if the two files are the same inode.
239
240     >>> is_same_file('/tmp', '/tmp/../tmp')
241     True
242
243     >>> is_same_file('/tmp', '/home')
244     False
245
246     """
247     return os.path.samefile(file1, file2)
248
249
250 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
251     """Stats the file and returns an os.stat_result or None on error."""
252     try:
253         return os.stat(filename)
254     except Exception as e:
255         logger.exception(e)
256         return None
257
258
259 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
260     tss = get_file_raw_timestamps(filename)
261     if tss is not None:
262         return extractor(tss)
263     return None
264
265
266 def get_file_raw_atime(filename: str) -> Optional[float]:
267     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
268
269
270 def get_file_raw_mtime(filename: str) -> Optional[float]:
271     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
272
273
274 def get_file_raw_ctime(filename: str) -> Optional[float]:
275     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
276
277
278 def get_file_md5(filename: str) -> str:
279     """Hashes filename's contents and returns an MD5."""
280     file_hash = hashlib.md5()
281     with open(filename, "rb") as f:
282         chunk = f.read(8192)
283         while chunk:
284             file_hash.update(chunk)
285             chunk = f.read(8192)
286     return file_hash.hexdigest()
287
288
289 def set_file_raw_atime(filename: str, atime: float):
290     mtime = get_file_raw_mtime(filename)
291     os.utime(filename, (atime, mtime))
292
293
294 def set_file_raw_mtime(filename: str, mtime: float):
295     atime = get_file_raw_atime(filename)
296     os.utime(filename, (atime, mtime))
297
298
299 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
300     if ts is not None:
301         os.utime(filename, (ts, ts))
302     else:
303         os.utime(filename, None)
304
305
306 def convert_file_timestamp_to_datetime(
307     filename: str, producer
308 ) -> Optional[datetime.datetime]:
309     ts = producer(filename)
310     if ts is not None:
311         return datetime.datetime.fromtimestamp(ts)
312     return None
313
314
315 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
316     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
317
318
319 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
320     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
321
322
323 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
324     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
325
326
327 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
328     now = time.time()
329     ts = get_file_raw_timestamps(filename)
330     if ts is None:
331         return None
332     result = extractor(ts)
333     return now - result
334
335
336 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
337     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
338
339
340 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
341     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
342
343
344 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
345     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
346
347
348 def get_file_timestamp_timedelta(
349     filename: str, extractor
350 ) -> Optional[datetime.timedelta]:
351     age = get_file_timestamp_age_seconds(filename, extractor)
352     if age is not None:
353         return datetime.timedelta(seconds=float(age))
354     return None
355
356
357 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
358     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
359
360
361 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
362     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
363
364
365 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
366     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
367
368
369 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
370     from datetime_utils import describe_duration, describe_duration_briefly
371
372     age = get_file_timestamp_age_seconds(filename, extractor)
373     if age is None:
374         return None
375     if brief:
376         return describe_duration_briefly(age)
377     else:
378         return describe_duration(age)
379
380
381 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
382     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
383
384
385 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
386     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
387
388
389 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
390     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
391
392
393 def touch_file(filename: str, *, mode: Optional[int] = 0o666) -> bool:
394     return pathlib.Path(filename, mode=mode).touch()
395
396
397 def expand_globs(in_filename: str):
398     for filename in glob.glob(in_filename):
399         yield filename
400
401
402 def get_files(directory: str):
403     for filename in os.listdir(directory):
404         full_path = join(directory, filename)
405         if isfile(full_path) and exists(full_path):
406             yield full_path
407
408
409 def get_directories(directory: str):
410     for d in os.listdir(directory):
411         full_path = join(directory, d)
412         if not isfile(full_path) and exists(full_path):
413             yield full_path
414
415
416 def get_files_recursive(directory: str):
417     for filename in get_files(directory):
418         yield filename
419     for subdir in get_directories(directory):
420         for file_or_directory in get_files_recursive(subdir):
421             yield file_or_directory
422
423
424 class FileWriter(object):
425     def __init__(self, filename: str) -> None:
426         self.filename = filename
427         uuid = uuid4()
428         self.tempfile = f'{filename}-{uuid}.tmp'
429         self.handle = None
430
431     def __enter__(self) -> io.TextIOWrapper:
432         assert not does_path_exist(self.tempfile)
433         self.handle = open(self.tempfile, mode="w")
434         return self.handle
435
436     def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
437         if self.handle is not None:
438             self.handle.close()
439             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
440             ret = os.system(cmd)
441             if (ret >> 8) != 0:
442                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
443         return None
444
445
446 if __name__ == '__main__':
447     import doctest
448
449     doctest.testmod()