Unscrambler.py is no longer a utility; see ~bin/unscramble.py.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 from dataclasses import dataclass
6 import datetime
7 import errno
8 import hashlib
9 import logging
10 import os
11 import io
12 import pathlib
13 import time
14 from typing import Optional
15 import glob
16 from os.path import isfile, join, exists
17 from typing import List
18 from uuid import uuid4
19
20
21 logger = logging.getLogger(__name__)
22
23
24 def remove(path: str) -> None:
25     """Deletes a file.  Raises if path refers to a directory or a file
26     that doesn't exist.
27
28     >>> import os
29     >>> filename = '/tmp/file_utils_test_file'
30     >>> os.system(f'touch {filename}')
31     0
32     >>> does_file_exist(filename)
33     True
34     >>> remove(filename)
35     >>> does_file_exist(filename)
36     False
37
38     """
39     os.remove(path)
40
41
42 def delete(path: str) -> None:
43     os.remove(path)
44
45
46 def without_extension(path: str) -> str:
47     """Remove one extension from a file or path.
48
49     >>> without_extension('foobar.txt')
50     'foobar'
51
52     >>> without_extension('/home/scott/frapp.py')
53     '/home/scott/frapp'
54
55     >>> without_extension('a.b.c.tar.gz')
56     'a.b.c.tar'
57
58     >>> without_extension('foobar')
59     'foobar'
60
61     """
62     return os.path.splitext(path)[0]
63
64
65 def without_all_extensions(path: str) -> str:
66     """Removes all extensions from a path; handles multiple extensions
67     like foobar.tar.gz -> foobar.
68
69     >>> without_all_extensions('/home/scott/foobar.1.tar.gz')
70     '/home/scott/foobar'
71
72     """
73     while '.' in path:
74         path = without_extension(path)
75     return path
76
77
78 def get_extension(path: str) -> str:
79     """Extract and return one extension from a file or path.
80
81     >>> get_extension('this_is_a_test.txt')
82     '.txt'
83
84     >>> get_extension('/home/scott/test.py')
85     '.py'
86
87     >>> get_extension('foobar')
88     ''
89
90     """
91     return os.path.splitext(path)[1]
92
93
94 def get_all_extensions(path: str) -> List[str]:
95     """Return the extensions of a file or path in order.
96
97     >>> get_all_extensions('/home/scott/foo.tar.gz.1')
98     ['.tar', '.gz', '.1']
99
100     """
101     ret = []
102     while True:
103         ext = get_extension(path)
104         path = without_extension(path)
105         if ext:
106             ret.append(ext)
107         else:
108             ret.reverse()
109             return ret
110
111
112 def without_path(filespec: str) -> str:
113     """Returns the base filename without any leading path.
114
115     >>> without_path('/home/scott/foo.py')
116     'foo.py'
117
118     >>> without_path('foo.py')
119     'foo.py'
120
121     """
122     return os.path.split(filespec)[1]
123
124
125 def get_path(filespec: str) -> str:
126     """Returns just the path of the filespec by removing the filename and
127     extension.
128
129     >>> get_path('/home/scott/foobar.py')
130     '/home/scott'
131
132     >>> get_path('~scott/frapp.txt')
133     '~scott'
134
135     """
136     return os.path.split(filespec)[0]
137
138
139 def get_canonical_path(filespec: str) -> str:
140     """Returns a canonicalized absolute path.
141
142     >>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
143     '/usr/home/scott/foo.txt'
144
145     """
146     return os.path.realpath(filespec)
147
148
149 def create_path_if_not_exist(path, on_error=None):
150     """
151     Attempts to create path if it does not exist. If on_error is
152     specified, it is called with an exception if one occurs, otherwise
153     exception is rethrown.
154
155     >>> import uuid
156     >>> import os
157     >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
158     >>> os.path.exists(path)
159     False
160     >>> create_path_if_not_exist(path)
161     >>> os.path.exists(path)
162     True
163     """
164     logger.debug(f"Creating path {path}")
165     previous_umask = os.umask(0)
166     try:
167         os.makedirs(path)
168         os.chmod(path, 0o777)
169     except OSError as ex:
170         if ex.errno != errno.EEXIST and not os.path.isdir(path):
171             if on_error is not None:
172                 on_error(path, ex)
173             else:
174                 raise
175     finally:
176         os.umask(previous_umask)
177
178
179 def does_file_exist(filename: str) -> bool:
180     """Returns True if a file exists and is a normal file.
181
182     >>> does_file_exist(__file__)
183     True
184     """
185     return os.path.exists(filename) and os.path.isfile(filename)
186
187
188 def does_directory_exist(dirname: str) -> bool:
189     """Returns True if a file exists and is a directory.
190
191     >>> does_directory_exist('/tmp')
192     True
193     """
194     return os.path.exists(dirname) and os.path.isdir(dirname)
195
196
197 def does_path_exist(pathname: str) -> bool:
198     """Just a more verbose wrapper around os.path.exists."""
199     return os.path.exists(pathname)
200
201
202 def get_file_size(filename: str) -> int:
203     """Returns the size of a file in bytes."""
204     return os.path.getsize(filename)
205
206
207 def is_normal_file(filename: str) -> bool:
208     """Returns True if filename is a normal file.
209
210     >>> is_normal_file(__file__)
211     True
212     """
213     return os.path.isfile(filename)
214
215
216 def is_directory(filename: str) -> bool:
217     """Returns True if filename is a directory.
218
219     >>> is_directory('/tmp')
220     True
221     """
222     return os.path.isdir(filename)
223
224
225 def is_symlink(filename: str) -> bool:
226     """True if filename is a symlink, False otherwise.
227
228     >>> is_symlink('/tmp')
229     False
230
231     >>> is_symlink('/home')
232     True
233
234     """
235     return os.path.islink(filename)
236
237
238 def is_same_file(file1: str, file2: str) -> bool:
239     """Returns True if the two files are the same inode.
240
241     >>> is_same_file('/tmp', '/tmp/../tmp')
242     True
243
244     >>> is_same_file('/tmp', '/home')
245     False
246
247     """
248     return os.path.samefile(file1, file2)
249
250
251 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
252     """Stats the file and returns an os.stat_result or None on error."""
253     try:
254         return os.stat(filename)
255     except Exception as e:
256         logger.exception(e)
257         return None
258
259
260 def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
261     tss = get_file_raw_timestamps(filename)
262     if tss is not None:
263         return extractor(tss)
264     return None
265
266
267 def get_file_raw_atime(filename: str) -> Optional[float]:
268     return get_file_raw_timestamp(filename, lambda x: x.st_atime)
269
270
271 def get_file_raw_mtime(filename: str) -> Optional[float]:
272     return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
273
274
275 def get_file_raw_ctime(filename: str) -> Optional[float]:
276     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
277
278
279 def get_file_md5(filename: str) -> str:
280     """Hashes filename's contents and returns an MD5."""
281     file_hash = hashlib.md5()
282     with open(filename, "rb") as f:
283         chunk = f.read(8192)
284         while chunk:
285             file_hash.update(chunk)
286             chunk = f.read(8192)
287     return file_hash.hexdigest()
288
289
290 def set_file_raw_atime(filename: str, atime: float):
291     mtime = get_file_raw_mtime(filename)
292     os.utime(filename, (atime, mtime))
293
294
295 def set_file_raw_mtime(filename: str, mtime: float):
296     atime = get_file_raw_atime(filename)
297     os.utime(filename, (atime, mtime))
298
299
300 def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
301     if ts is not None:
302         os.utime(filename, (ts, ts))
303     else:
304         os.utime(filename, None)
305
306
307 def convert_file_timestamp_to_datetime(
308     filename: str, producer
309 ) -> Optional[datetime.datetime]:
310     ts = producer(filename)
311     if ts is not None:
312         return datetime.datetime.fromtimestamp(ts)
313     return None
314
315
316 def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
317     return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
318
319
320 def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
321     return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
322
323
324 def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
325     return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
326
327
328 def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
329     now = time.time()
330     ts = get_file_raw_timestamps(filename)
331     if ts is None:
332         return None
333     result = extractor(ts)
334     return now - result
335
336
337 def get_file_atime_age_seconds(filename: str) -> Optional[int]:
338     return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
339
340
341 def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
342     return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
343
344
345 def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
346     return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
347
348
349 def get_file_timestamp_timedelta(
350     filename: str, extractor
351 ) -> Optional[datetime.timedelta]:
352     age = get_file_timestamp_age_seconds(filename, extractor)
353     if age is not None:
354         return datetime.timedelta(seconds=float(age))
355     return None
356
357
358 def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
359     return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
360
361
362 def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
363     return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
364
365
366 def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
367     return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
368
369
370 def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
371     from datetime_utils import describe_duration, describe_duration_briefly
372
373     age = get_file_timestamp_age_seconds(filename, extractor)
374     if age is None:
375         return None
376     if brief:
377         return describe_duration_briefly(age)
378     else:
379         return describe_duration(age)
380
381
382 def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
383     return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
384
385
386 def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
387     return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
388
389
390 def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
391     return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
392
393
394 def touch_file(filename: str, *, mode: Optional[int] = 0o666) -> bool:
395     return pathlib.Path(filename, mode=mode).touch()
396
397
398 def expand_globs(in_filename: str):
399     for filename in glob.glob(in_filename):
400         yield filename
401
402
403 def get_files(directory: str):
404     for filename in os.listdir(directory):
405         full_path = join(directory, filename)
406         if isfile(full_path) and exists(full_path):
407             yield full_path
408
409
410 def get_directories(directory: str):
411     for d in os.listdir(directory):
412         full_path = join(directory, d)
413         if not isfile(full_path) and exists(full_path):
414             yield full_path
415
416
417 def get_files_recursive(directory: str):
418     for filename in get_files(directory):
419         yield filename
420     for subdir in get_directories(directory):
421         for file_or_directory in get_files_recursive(subdir):
422             yield file_or_directory
423
424
425 class FileWriter(object):
426     def __init__(self, filename: str) -> None:
427         self.filename = filename
428         uuid = uuid4()
429         self.tempfile = f'{filename}-{uuid}.tmp'
430         self.handle = None
431
432     def __enter__(self) -> io.TextIOWrapper:
433         assert not does_path_exist(self.tempfile)
434         self.handle = open(self.tempfile, mode="w")
435         return self.handle
436
437     def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
438         if self.handle is not None:
439             self.handle.close()
440             cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
441             ret = os.system(cmd)
442             if (ret >> 8) != 0:
443                 raise Exception(f'{cmd} failed, exit value {ret>>8}')
444         return None
445
446
447 if __name__ == '__main__':
448     import doctest
449
450     doctest.testmod()