Added some helpers to file_utils and improved the docs/doctests.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 from dataclasses import dataclass
6 import datetime
7 import errno
8 import hashlib
9 import logging
10 import os
11 import io
12 import pathlib
13 import time
14 from typing import Optional
15 import glob
16 from os.path import isfile, join, exists
17 from typing import List
18 from uuid import uuid4
19
20
21 logger = logging.getLogger(__name__)
22
23
24 def remove(path: str) -> None:
25     """Deletes a file.  Raises if path refers to a directory or a file
26     that doesn't exist.
27
28     >>> import os
29     >>> filename = '/tmp/file_utils_test_file'
30     >>> os.system(f'touch {filename}')
31     0
32     >>> does_file_exist(filename)
33     True
34     >>> remove(filename)
35     >>> does_file_exist(filename)
36     False
37
38     """
39     os.remove(path)
40
41
42 def delete(path: str) -> None:
43     os.remove(path)
44
45
46 def without_extension(path: str) -> str:
47     """Remove one extension from a file or path.
48
49     >>> without_extension('foobar.txt')
50     'foobar'
51
52     >>> without_extension('/home/scott/frapp.py')
53     '/home/scott/frapp'
54
55     >>> without_extension('a.b.c.tar.gz')
56     'a.b.c.tar'
57
58     >>> without_extension('foobar')
59     'foobar'
60
61     """
62     return os.path.splitext(path)[0]
63
64
65 def without_all_extensions(path: str) -> str:
66     """Removes all extensions from a path; handles multiple extensions
67     like foobar.tar.gz -> foobar.
68
69     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
70     '/home/scott/foobar'
71
72     """
73     while '.' in path:
74         path = without_extension(path)
75     return path
76
77
78 def get_extension(path: str) -> str:
79     """Extract and return one extension from a file or path.
80
81     >>> get_extension('this_is_a_test.txt')
82     '.txt'
83
84     >>> get_extension('/home/scott/test.py')
85     '.py'
86
87     >>> get_extension('foobar')
88     ''
89
90     """
91     return os.path.splitext(path)[1]
92
93
94 def get_all_extensions(path: str) -> List[str]:
95     """Return the extensions of a file or path in order.
96
97     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
98     ['.tar', '.gz', '.1']
99
100     """
101     ret = []
102     while True:
103         ext = get_extension(path)
104         path = without_extension(path)
105         if ext:
106             ret.append(ext)
107         else:
108             ret.reverse()
109             return ret
110
111
112 def without_path(filespec: str) -> str:
113     """Returns the base filename without any leading path.
114
115     >>> without_path('/home/scott/foo.py')
116     'foo.py'
117
118     >>> without_path('foo.py')
119     'foo.py'
120
121     """
122     return os.path.split(filespec)[1]
123
124
125 def get_path(filespec: str) -> str:
126     """Returns just the path of the filespec by removing the filename and
127     extension.
128
129     >>> get_path('/home/scott/foobar.py')
130     '/home/scott'
131
132     >>> get_path('~scott/frapp.txt')
133     '~scott'
134
135     """
136     return os.path.split(filespec)[0]
137
138
139 def get_canonical_path(filespec: str) -> str:
140     """Returns a canonicalized absolute path.
141
142     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
143     '/usr/home/scott/foo.txt'
144
145     """
146     return os.path.realpath(filespec)
147
148
149 def create_path_if_not_exist(path, on_error=None):
150     """
151     Attempts to create path if it does not exist. If on_error is
152     specified, it is called with an exception if one occurs, otherwise
153     exception is rethrown.
154
155     >>> import uuid
156     >>> import os
157     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
158     >>> os.path.exists(path)
159     False
160     >>> create_path_if_not_exist(path)
161     >>> os.path.exists(path)
162     True
163     """
164     logger.debug(f"Creating path {path}")
165     previous_umask = os.umask(0)
166     try:
167         os.makedirs(path)
168         os.chmod(path, 0o777)
169     except OSError as ex:
170         if ex.errno != errno.EEXIST and not os.path.isdir(path):
171             if on_error is not None:
172                 on_error(path, ex)
173             else:
174                 raise
175     finally:
176         os.umask(previous_umask)
177
178
179 def does_file_exist(filename: str) -> bool:
180     """Returns True if a file exists and is a normal file.
181
182     >>> does_file_exist(__file__)
183     True
184     """
185     return os.path.exists(filename) and os.path.isfile(filename)
186
187
188 def does_directory_exist(dirname: str) -> bool:
189     """Returns True if a file exists and is a directory.
190
191     >>> does_directory_exist('/tmp')
192     True
193     """
194     return os.path.exists(dirname) and os.path.isdir(dirname)
195
196
197 def does_path_exist(pathname: str) -> bool:
198     """Just a more verbose wrapper around os.path.exists."""
199     return os.path.exists(pathname)
200
201
202 def get_file_size(filename: str) -> int:
203     """Returns the size of a file in bytes."""
204     return os.path.getsize(filename)
205
206
207 def is_normal_file(filename: str) -> bool:
208     """Returns True if filename is a normal file.
209
210     >>> is_normal_file(__file__)
211     True
212     """
213     return os.path.isfile(filename)
214
215
216 def is_directory(filename: str) -> bool:
217     """Returns True if filename is a directory.
218
219     >>> is_directory('/tmp')
220     True
221     """
222     return os.path.isdir(filename)
223
224
225 def is_symlink(filename: str) -> bool:
226     """True if filename is a symlink, False otherwise.
227
228     >>> is_symlink('/tmp')
229     False
230
231     >>> is_symlink('/home')
232     True
233
234     """
235     return os.path.islink(filename)
236
237
238 def is_same_file(file1: str, file2: str) -> bool:
239     """Returns True if the two files are the same inode.
240
241     >>> is_same_file('/tmp', '/tmp/../tmp')
242     True
243
244     >>> is_same_file('/tmp', '/home')
245     False
246
247     """
248     return os.path.samefile(file1, file2)
249
250
251 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
252     """Stats the file and returns an os.stat_result or None on error.
253
254     """
255     try:
256         return os.stat(filename)
257     except Exception as e:
258         logger.exception(e)
259         return None
260
261
262 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
263     tss = get_file_raw_timestamps(filename)
264     if tss is not None:
265         return extractor(tss)
266     return None
267
268
269 def get_file_raw_atime(filename: str) -> Optional[float]:
270     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
271
272
273 def get_file_raw_mtime(filename: str) -> Optional[float]:
274     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
275
276
277 def get_file_raw_ctime(filename: str) -> Optional[float]:
278     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
279
280
281 def get_file_md5(filename: str) -> str:
282     """Hashes filename's contents and returns an MD5.
283
284     """
285     file_hash = hashlib.md5()
286     with open(filename, "rb") as f:
287         chunk = f.read(8192)
288         while chunk:
289             file_hash.update(chunk)
290             chunk = f.read(8192)
291     return file_hash.hexdigest()
292
293
294 def set_file_raw_atime(filename: str, atime: float):
295     mtime = get_file_raw_mtime(filename)
296     os.utime(filename, (atime, mtime))
297
298
299 def set_file_raw_mtime(filename: str, mtime: float):
300     atime = get_file_raw_atime(filename)
301     os.utime(filename, (atime, mtime))
302
303
304 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
305     if ts is not None:
306         os.utime(filename, (ts, ts))
307     else:
308         os.utime(filename, None)
309
310
311 def convert_file_timestamp_to_datetime(
312     filename: str, producer
313 ) -> Optional[datetime.datetime]:
314     ts = producer(filename)
315     if ts is not None:
316         return datetime.datetime.fromtimestamp(ts)
317     return None
318
319
320 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
321     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
322
323
324 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
325     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
326
327
328 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
329     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
330
331
332 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
333     now = time.time()
334     ts = get_file_raw_timestamps(filename)
335     if ts is None:
336         return None
337     result = extractor(ts)
338     return now - result
339
340
341 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
342     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
343
344
345 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
346     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
347
348
349 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
350     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
351
352
353 def get_file_timestamp_timedelta(
354     filename: str, extractor
355 ) -> Optional[datetime.timedelta]:
356     age = get_file_timestamp_age_seconds(filename, extractor)
357     if age is not None:
358         return datetime.timedelta(seconds=float(age))
359     return None
360
361
362 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
363     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
364
365
366 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
367     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
368
369
370 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
371     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
372
373
374 def describe_file_timestamp(
375     filename: str, extractor, *, brief=False
376 ) -> Optional[str]:
377     from datetime_utils import describe_duration, describe_duration_briefly
378     age = get_file_timestamp_age_seconds(filename, extractor)
379     if age is None:
380         return None
381     if brief:
382         return describe_duration_briefly(age)
383     else:
384         return describe_duration(age)
385
386
387 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
388     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
389
390
391 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
392     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
393
394
395 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
396     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
397
398
399 def touch_file(filename: str, *, mode: Optional[int] = 0o666) -> bool:
400     return pathlib.Path(filename, mode=mode).touch()
401
402
403 def expand_globs(in_filename: str):
404     for filename in glob.glob(in_filename):
405         yield filename
406
407
408 def get_files(directory: str):
409     for filename in os.listdir(directory):
410         full_path = join(directory, filename)
411         if isfile(full_path) and exists(full_path):
412             yield full_path
413
414
415 def get_directories(directory: str):
416     for d in os.listdir(directory):
417         full_path = join(directory, d)
418         if not isfile(full_path) and exists(full_path):
419             yield full_path
420
421
422 def get_files_recursive(directory: str):
423     for filename in get_files(directory):
424         yield filename
425     for subdir in get_directories(directory):
426         for file_or_directory in get_files_recursive(subdir):
427             yield file_or_directory
428
429
430 class FileWriter(object):
431     def __init__(self, filename: str) -> None:
432         self.filename = filename
433         uuid = uuid4()
434         self.tempfile = f'{filename}-{uuid}.tmp'
435         self.handle = None
436
437     def __enter__(self) -> io.TextIOWrapper:
438         assert not does_path_exist(self.tempfile)
439         self.handle = open(self.tempfile, mode="w")
440         return self.handle
441
442     def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
443         if self.handle is not None:
444             self.handle.close()
445             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
446             ret = os.system(cmd)
447             if (ret >> 8) != 0:
448                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
449         return None
450
451
452 if __name__ == '__main__':
453     import doctest
454     doctest.testmod()