Various sundry changes.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import time
11 from typing import Optional
12 import glob
13 from os.path import isfile, join, exists
14
15 import datetime_utils
16
17
# Module-level logger named after this module; used by the helpers below.
logger = logging.getLogger(__name__)
19
20
def create_path_if_not_exist(path, on_error=None):
    """
    Attempts to create path (including any missing parent directories)
    if it does not exist.  The umask is cleared while the directories
    are created and the leaf directory is chmod-ed to 0o777.

    An OSError raised along the way is ignored when path is a directory
    afterwards (e.g. because it already existed).  Otherwise, if
    on_error is specified it is called as on_error(path, exception);
    if it is not specified the exception is rethrown.  In particular, a
    path that already exists but is not a directory (such as a regular
    file) is reported as an error rather than silently ignored.

    >>> import uuid
    >>> import os
    >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
    >>> os.path.exists(path)
    False
    >>> create_path_if_not_exist(path)
    >>> os.path.exists(path)
    True
    """
    logger.debug(f"Creating path {path}")
    # Clear the umask so the requested modes are applied verbatim; the
    # previous value is restored in the finally clause however we leave.
    previous_umask = os.umask(0)
    try:
        os.makedirs(path)
        os.chmod(path, 0o777)
    except OSError as ex:
        # The goal is "path is a directory".  Checking isdir() rather
        # than trusting errno.EEXIST makes sure an existing non-directory
        # at path is not mistaken for success.
        if not os.path.isdir(path):
            if on_error is not None:
                on_error(path, ex)
            else:
                raise
    finally:
        os.umask(previous_umask)
49
50
def does_file_exist(filename: str) -> bool:
    """
    Returns True if filename exists and is a regular file, False
    otherwise.
    """
    if not os.path.exists(filename):
        return False
    return os.path.isfile(filename)
53
54
def does_directory_exist(dirname: str) -> bool:
    """
    Returns True if dirname exists and is a directory, False otherwise.
    """
    if not os.path.exists(dirname):
        return False
    return os.path.isdir(dirname)
57
58
def does_path_exist(pathname: str) -> bool:
    """
    Returns what os.path.exists reports for pathname: True if it exists,
    False otherwise.
    """
    found = os.path.exists(pathname)
    return found
61
62
def get_file_size(filename: str) -> int:
    """
    Returns the size of filename in bytes.  Raises OSError if the file
    cannot be stat-ed (e.g. it does not exist).
    """
    stat_result = os.stat(filename)
    return stat_result.st_size
65
66
def is_normal_file(filename: str) -> bool:
    """
    Returns True if filename is an existing regular file, False
    otherwise.
    """
    regular = os.path.isfile(filename)
    return regular
69
70
def is_directory(filename: str) -> bool:
    """
    Returns True if filename is an existing directory, False otherwise.
    """
    directory = os.path.isdir(filename)
    return directory
73
74
def is_symlink(filename: str) -> bool:
    """
    Returns True if filename is a symbolic link, False otherwise.
    """
    link = os.path.islink(filename)
    return link
77
78
def is_same_file(file1: str, file2: str) -> bool:
    """
    Returns True if file1 and file2 refer to the same file or directory
    according to os.path.samefile.  Raises OSError if either path
    cannot be stat-ed.
    """
    identical = os.path.samefile(file1, file2)
    return identical
81
82
def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
    """
    Returns the os.stat result for filename, or None if the stat call
    raised.  Any exception is logged via logger.exception and
    swallowed rather than propagated.
    """
    stat_result = None
    try:
        stat_result = os.stat(filename)
    except Exception as e:
        logger.exception(e)
    return stat_result
89
90
def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
    """
    Stats filename and returns extractor(stat_result), where extractor
    is a callable picking one value out of the stat result.  Returns
    None if the file could not be stat-ed.
    """
    stat_result = get_file_raw_timestamps(filename)
    if stat_result is None:
        return None
    return extractor(stat_result)
96
97
def get_file_raw_atime(filename: str) -> Optional[float]:
    """
    Returns the st_atime (last access time, seconds since the epoch) of
    filename, or None if the file could not be stat-ed.
    """

    def pick_atime(stat_result):
        return stat_result.st_atime

    return get_file_raw_timestamp(filename, pick_atime)
100
101
def get_file_raw_mtime(filename: str) -> Optional[float]:
    """
    Returns the st_mtime (last modification time, seconds since the
    epoch) of filename, or None if the file could not be stat-ed.
    """

    def pick_mtime(stat_result):
        return stat_result.st_mtime

    return get_file_raw_timestamp(filename, pick_mtime)
104
105
def get_file_raw_ctime(filename: str) -> Optional[float]:
    """
    Returns the st_ctime of filename as reported by os.stat (seconds
    since the epoch), or None if the file could not be stat-ed.
    """

    def pick_ctime(stat_result):
        return stat_result.st_ctime

    return get_file_raw_timestamp(filename, pick_ctime)
108
109
def get_file_md5(filename: str) -> str:
    """
    Returns the hexadecimal MD5 digest of the contents of filename.
    The file is read in binary mode, 8192 bytes at a time, so it never
    has to fit in memory at once.
    """
    digest = hashlib.md5()
    with open(filename, "rb") as stream:
        # iter() with a sentinel keeps reading until read() returns b"".
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
118
119
def set_file_raw_atime(filename: str, atime: float):
    """
    Sets the access time of filename to atime (seconds since the epoch)
    while leaving its modification time as it is.

    Raises OSError (e.g. FileNotFoundError) if filename cannot be
    stat-ed or updated.
    """
    # os.utime always writes both timestamps, so read the current mtime
    # and write it back.  Stat the file directly: a lookup that yields
    # None on failure would hand None to os.utime and surface as a
    # confusing TypeError instead of the underlying OSError.
    mtime = os.stat(filename).st_mtime
    os.utime(filename, (atime, mtime))
123
124
def set_file_raw_mtime(filename: str, mtime: float):
    """
    Sets the modification time of filename to mtime (seconds since the
    epoch) while leaving its access time as it is.

    Raises OSError (e.g. FileNotFoundError) if filename cannot be
    stat-ed or updated.
    """
    # os.utime always writes both timestamps, so read the current atime
    # and write it back.  Stat the file directly: a lookup that yields
    # None on failure would hand None to os.utime and surface as a
    # confusing TypeError instead of the underlying OSError.
    atime = os.stat(filename).st_atime
    os.utime(filename, (atime, mtime))
128
129
def set_file_raw_atime_and_mtime(filename: str, ts: Optional[float] = None):
    """
    Sets both the access and the modification time of filename to ts
    (seconds since the epoch).  When ts is None, both are set to the
    current time.

    Raises OSError if filename cannot be updated.
    """
    # Passing None as the times argument makes os.utime use "now".
    times = None if ts is None else (ts, ts)
    os.utime(filename, times)
135
136
def convert_file_timestamp_to_datetime(
    filename: str, producer
) -> Optional[datetime.datetime]:
    """
    Calls producer(filename) to obtain a timestamp in seconds since the
    epoch and converts it to a datetime via
    datetime.datetime.fromtimestamp.  Returns None if producer returned
    None.
    """
    raw_timestamp = producer(filename)
    if raw_timestamp is None:
        return None
    return datetime.datetime.fromtimestamp(raw_timestamp)
144
145
def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """
    Returns the access time of filename as a datetime, or None if the
    raw timestamp could not be obtained.
    """
    atime_producer = get_file_raw_atime
    return convert_file_timestamp_to_datetime(filename, atime_producer)
148
149
def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """
    Returns the modification time of filename as a datetime, or None if
    the raw timestamp could not be obtained.
    """
    mtime_producer = get_file_raw_mtime
    return convert_file_timestamp_to_datetime(filename, mtime_producer)
152
153
def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """
    Returns the st_ctime of filename as a datetime, or None if the raw
    timestamp could not be obtained.
    """
    ctime_producer = get_file_raw_ctime
    return convert_file_timestamp_to_datetime(filename, ctime_producer)
156
157
def get_file_timestamp_age_seconds(
    filename: str, extractor
) -> Optional[float]:
    """
    Returns how many seconds ago the timestamp selected by extractor
    was set on filename, i.e. time.time() minus extractor(stat_result).
    extractor is a callable that picks one timestamp out of the stat
    result.  The result is a float (fractional seconds).  Returns None
    if the file could not be stat-ed.
    """
    now = time.time()
    stat_result = get_file_raw_timestamps(filename)
    if stat_result is None:
        return None
    return now - extractor(stat_result)
165
166
def get_file_atime_age_seconds(filename: str) -> Optional[float]:
    """
    Returns the number of (fractional) seconds since filename was last
    accessed, or None if the file could not be stat-ed.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
169
170
def get_file_ctime_age_seconds(filename: str) -> Optional[float]:
    """
    Returns the number of (fractional) seconds since the st_ctime of
    filename, or None if the file could not be stat-ed.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
173
174
def get_file_mtime_age_seconds(filename: str) -> Optional[float]:
    """
    Returns the number of (fractional) seconds since filename was last
    modified, or None if the file could not be stat-ed.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
177
178
def get_file_timestamp_timedelta(
    filename: str, extractor
) -> Optional[datetime.timedelta]:
    """
    Returns the age of the timestamp selected by extractor on filename
    as a datetime.timedelta, or None if that age could not be
    determined.
    """
    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    return datetime.timedelta(seconds=float(age_seconds))
186
187
def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """
    Returns the time elapsed since filename was last accessed as a
    timedelta, or None if it could not be determined.
    """

    def pick_atime(stat_result):
        return stat_result.st_atime

    return get_file_timestamp_timedelta(filename, pick_atime)
190
191
def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """
    Returns the time elapsed since the st_ctime of filename as a
    timedelta, or None if it could not be determined.
    """

    def pick_ctime(stat_result):
        return stat_result.st_ctime

    return get_file_timestamp_timedelta(filename, pick_ctime)
194
195
def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """
    Returns the time elapsed since filename was last modified as a
    timedelta, or None if it could not be determined.
    """

    def pick_mtime(stat_result):
        return stat_result.st_mtime

    return get_file_timestamp_timedelta(filename, pick_mtime)
198
199
def describe_file_timestamp(
    filename: str, extractor, *, brief=False
) -> Optional[str]:
    """
    Returns a textual description of the age of the timestamp selected
    by extractor on filename.  The text is produced by
    datetime_utils.describe_duration_briefly when brief is true and by
    datetime_utils.describe_duration otherwise.  Returns None if the
    age could not be determined.
    """
    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    describe = (
        datetime_utils.describe_duration_briefly
        if brief
        else datetime_utils.describe_duration
    )
    return describe(age_seconds)
210
211
def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
    """
    Returns a textual description of how long ago filename was last
    accessed (a shorter form when brief is true), or None if that could
    not be determined.
    """

    def pick_atime(stat_result):
        return stat_result.st_atime

    return describe_file_timestamp(filename, pick_atime, brief=brief)
214
215
def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
    """
    Returns a textual description of the age of the st_ctime of
    filename (a shorter form when brief is true), or None if that could
    not be determined.
    """

    def pick_ctime(stat_result):
        return stat_result.st_ctime

    return describe_file_timestamp(filename, pick_ctime, brief=brief)
218
219
def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
    """
    Returns a textual description of how long ago filename was last
    modified (a shorter form when brief is true), or None if that could
    not be determined.
    """

    def pick_mtime(stat_result):
        return stat_result.st_mtime

    return describe_file_timestamp(filename, pick_mtime, brief=brief)
222
223
def expand_globs(in_filename: str):
    """
    Generator yielding every path that matches the glob pattern
    in_filename, in the order glob.glob returns them.
    """
    yield from glob.glob(in_filename)
227
228
def get_files(directory: str):
    """
    Generator yielding the full path (directory joined with the entry
    name) of every regular file directly inside directory.
    Subdirectories are not descended into.
    """
    for entry in os.listdir(directory):
        candidate = join(directory, entry)
        if not isfile(candidate):
            continue
        if exists(candidate):
            yield candidate
234
235
def get_directories(directory: str):
    """
    Generator yielding the full path (directory joined with the entry
    name) of every subdirectory directly inside directory.

    Only entries that really are directories are yielded.  Other
    non-regular entries such as FIFOs, sockets or device nodes are
    skipped, so callers can safely list or recurse into every result.
    """
    for d in os.listdir(directory):
        full_path = join(directory, d)
        # "Not a regular file" is not the same as "is a directory":
        # test for a directory explicitly.
        if os.path.isdir(full_path):
            yield full_path
241
242
def get_files_recursive(directory: str):
    """
    Generator yielding the full path of every regular file found under
    directory: first the files directly inside it, then, for each
    subdirectory in turn, the files found recursively beneath that
    subdirectory.
    """
    yield from get_files(directory)
    for child in get_directories(directory):
        yield from get_files_recursive(child)