Random changes.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import pathlib
11 import time
12 from typing import Optional
13 import glob
14 from os.path import isfile, join, exists
15
16 logger = logging.getLogger(__name__)
17
18
def create_path_if_not_exist(path, on_error=None):
    """
    Attempts to create the directory path (including any missing
    parents) if it does not exist.

    The process umask is cleared while the directories are created and
    the leaf directory is chmod'ed to 0o777; the previous umask is
    always restored before returning.

    Args:
        path: the directory path to create.
        on_error: optional callable invoked as on_error(path, exception)
            when creation fails.  When it is None the exception is
            rethrown instead.

    An OSError is ignored whenever path is a directory afterwards
    (e.g. because it already existed).  A path that exists but is not
    a directory is reported as an error rather than silently ignored.

    >>> import uuid
    >>> import os
    >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
    >>> os.path.exists(path)
    False
    >>> create_path_if_not_exist(path)
    >>> os.path.exists(path)
    True
    """
    logger.debug(f"Creating path {path}")
    previous_umask = os.umask(0)
    try:
        os.makedirs(path)
        os.chmod(path, 0o777)
    except OSError as ex:
        # Only swallow the error when the directory is really there; an
        # EEXIST caused by a non-directory occupying the path is a failure.
        if not os.path.isdir(path):
            if on_error is not None:
                on_error(path, ex)
            else:
                raise
    finally:
        # Restore the caller's umask no matter how we leave.
        os.umask(previous_umask)
47
48
def does_file_exist(filename: str) -> bool:
    """Return True if filename exists and is a regular file."""
    if not os.path.exists(filename):
        return False
    return os.path.isfile(filename)
51
52
def does_directory_exist(dirname: str) -> bool:
    """Return True if dirname exists and is a directory."""
    if not os.path.exists(dirname):
        return False
    return os.path.isdir(dirname)
55
56
def does_path_exist(pathname: str) -> bool:
    """Return True if pathname exists, as reported by os.path.exists."""
    path_exists = os.path.exists(pathname)
    return path_exists
59
60
def get_file_size(filename: str) -> int:
    """Return the size of filename in bytes, as reported by os.path.getsize."""
    size_in_bytes = os.path.getsize(filename)
    return size_in_bytes
63
64
def is_normal_file(filename: str) -> bool:
    """Return True if filename is a regular file (per os.path.isfile)."""
    regular = os.path.isfile(filename)
    return regular
67
68
def is_directory(filename: str) -> bool:
    """Return True if filename is a directory (per os.path.isdir)."""
    directory = os.path.isdir(filename)
    return directory
71
72
def is_symlink(filename: str) -> bool:
    """Return True if filename is a symbolic link (per os.path.islink)."""
    symlink = os.path.islink(filename)
    return symlink
75
76
def is_same_file(file1: str, file2: str) -> bool:
    """Return True if file1 and file2 refer to the same file (per os.path.samefile)."""
    same = os.path.samefile(file1, file2)
    return same
79
80
def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
    """
    Return the os.stat result for filename.

    Any exception raised by os.stat is logged via logger.exception and
    None is returned instead.
    """
    try:
        stat_result = os.stat(filename)
    except Exception as e:
        logger.exception(e)
        stat_result = None
    return stat_result
87
88
def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
    """
    Return extractor applied to the stat result of filename.

    Args:
        filename: the path to look up.
        extractor: callable that receives the os.stat_result and
            returns the value of interest.

    Returns:
        Whatever extractor returns, or None when
        get_file_raw_timestamps could not stat the file.
    """
    stat_result = get_file_raw_timestamps(filename)
    if stat_result is None:
        return None
    return extractor(stat_result)
94
95
def get_file_raw_atime(filename: str) -> Optional[float]:
    """Return the st_atime of filename's stat result, or None if the lookup failed."""

    def _atime(stat_result):
        return stat_result.st_atime

    return get_file_raw_timestamp(filename, _atime)
98
99
def get_file_raw_mtime(filename: str) -> Optional[float]:
    """Return the st_mtime of filename's stat result, or None if the lookup failed."""

    def _mtime(stat_result):
        return stat_result.st_mtime

    return get_file_raw_timestamp(filename, _mtime)
102
103
def get_file_raw_ctime(filename: str) -> Optional[float]:
    """Return the st_ctime of filename's stat result, or None if the lookup failed."""

    def _ctime(stat_result):
        return stat_result.st_ctime

    return get_file_raw_timestamp(filename, _ctime)
106
107
def get_file_md5(filename: str) -> str:
    """
    Return the hexadecimal MD5 digest of the contents of filename.

    The file is read in binary mode in 8192-byte chunks, so the whole
    file is never held in memory at once.
    """
    digest = hashlib.md5()
    with open(filename, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (EOF).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
116
117
def set_file_raw_atime(filename: str, atime: float):
    """
    Set the access time of filename to atime while preserving its
    modification time exactly.

    Args:
        filename: path of the file to update.
        atime: new access time, in seconds since the epoch.

    Raises:
        OSError: if filename cannot be stat'ed or its times updated.
    """
    # Read the current mtime as integer nanoseconds so that writing it
    # back does not lose precision through a float round trip.
    mtime_ns = os.stat(filename).st_mtime_ns
    # os.utime takes either float seconds or integer nanoseconds, not a
    # mix, so convert the requested atime to nanoseconds as well.
    whole, frac = divmod(atime, 1.0)
    atime_ns = int(whole) * 1_000_000_000 + int(frac * 1_000_000_000)
    os.utime(filename, ns=(atime_ns, mtime_ns))
121
122
def set_file_raw_mtime(filename: str, mtime: float):
    """
    Set the modification time of filename to mtime while preserving
    its access time exactly.

    Args:
        filename: path of the file to update.
        mtime: new modification time, in seconds since the epoch.

    Raises:
        OSError: if filename cannot be stat'ed or its times updated.
    """
    # Read the current atime as integer nanoseconds so that writing it
    # back does not lose precision through a float round trip.
    atime_ns = os.stat(filename).st_atime_ns
    # os.utime takes either float seconds or integer nanoseconds, not a
    # mix, so convert the requested mtime to nanoseconds as well.
    whole, frac = divmod(mtime, 1.0)
    mtime_ns = int(whole) * 1_000_000_000 + int(frac * 1_000_000_000)
    os.utime(filename, ns=(atime_ns, mtime_ns))
126
127
def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
    """
    Set both the access and modification time of filename to ts.

    When ts is None, os.utime is called with None, which sets both
    times to the current time.
    """
    times = None if ts is None else (ts, ts)
    os.utime(filename, times)
133
134
def convert_file_timestamp_to_datetime(
    filename: str, producer
) -> Optional[datetime.datetime]:
    """
    Convert a timestamp obtained for filename into a datetime.

    Args:
        filename: path handed to producer.
        producer: callable taking filename and returning a timestamp
            in seconds since the epoch, or None.

    Returns:
        datetime.datetime.fromtimestamp() of the produced value, or
        None when producer returned None.
    """
    timestamp = producer(filename)
    if timestamp is None:
        return None
    return datetime.datetime.fromtimestamp(timestamp)
142
143
def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's raw atime converted to a datetime, or None if unavailable."""
    atime_as_datetime = convert_file_timestamp_to_datetime(
        filename, get_file_raw_atime
    )
    return atime_as_datetime
146
147
def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's raw mtime converted to a datetime, or None if unavailable."""
    mtime_as_datetime = convert_file_timestamp_to_datetime(
        filename, get_file_raw_mtime
    )
    return mtime_as_datetime
150
151
def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's raw ctime converted to a datetime, or None if unavailable."""
    ctime_as_datetime = convert_file_timestamp_to_datetime(
        filename, get_file_raw_ctime
    )
    return ctime_as_datetime
154
155
def get_file_timestamp_age_seconds(
    filename: str, extractor
) -> Optional[float]:
    """
    Return how many seconds ago the selected timestamp of filename was.

    Args:
        filename: the path to look up.
        extractor: callable that receives the os.stat_result and
            returns the timestamp of interest in seconds since the
            epoch (e.g. lambda x: x.st_mtime).

    Returns:
        time.time() minus the extracted timestamp (a float, so
        fractional seconds are included), or None when the file could
        not be stat'ed.
    """
    # Sample the clock before the stat call, as the original code does.
    now = time.time()
    ts = get_file_raw_timestamps(filename)
    if ts is None:
        return None
    return now - extractor(ts)
163
164
def get_file_atime_age_seconds(filename: str) -> Optional[float]:
    """
    Return the number of seconds since filename's st_atime, or None
    when the file could not be stat'ed.

    The value comes from get_file_timestamp_age_seconds, which subtracts
    the timestamp from time.time(), so it is a float rather than an int.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
167
168
def get_file_ctime_age_seconds(filename: str) -> Optional[float]:
    """
    Return the number of seconds since filename's st_ctime, or None
    when the file could not be stat'ed.

    The value comes from get_file_timestamp_age_seconds, which subtracts
    the timestamp from time.time(), so it is a float rather than an int.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
171
172
def get_file_mtime_age_seconds(filename: str) -> Optional[float]:
    """
    Return the number of seconds since filename's st_mtime, or None
    when the file could not be stat'ed.

    The value comes from get_file_timestamp_age_seconds, which subtracts
    the timestamp from time.time(), so it is a float rather than an int.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
175
176
def get_file_timestamp_timedelta(
    filename: str, extractor
) -> Optional[datetime.timedelta]:
    """
    Return the age of the selected timestamp of filename as a timedelta.

    Args:
        filename: the path to look up.
        extractor: callable passed through to
            get_file_timestamp_age_seconds to pick the timestamp.

    Returns:
        A datetime.timedelta built from the age in seconds, or None
        when no age could be determined.
    """
    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    return datetime.timedelta(seconds=float(age_seconds))
184
185
def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's st_atime as a timedelta, or None if unavailable."""

    def _atime(stat_result):
        return stat_result.st_atime

    return get_file_timestamp_timedelta(filename, _atime)
188
189
def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's st_ctime as a timedelta, or None if unavailable."""

    def _ctime(stat_result):
        return stat_result.st_ctime

    return get_file_timestamp_timedelta(filename, _ctime)
192
193
def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's st_mtime as a timedelta, or None if unavailable."""

    def _mtime(stat_result):
        return stat_result.st_mtime

    return get_file_timestamp_timedelta(filename, _mtime)
196
197
def describe_file_timestamp(
    filename: str, extractor, *, brief=False
) -> Optional[str]:
    """
    Describe the age of the selected timestamp of filename as text.

    Args:
        filename: the path to look up.
        extractor: callable passed through to
            get_file_timestamp_age_seconds to pick the timestamp.
        brief: when true, format with describe_duration_briefly
            instead of describe_duration.

    Returns:
        The result of the chosen datetime_utils formatter applied to
        the age in seconds, or None when no age could be determined.
    """
    # Imported here rather than at module level, as in the original.
    from datetime_utils import describe_duration, describe_duration_briefly
    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    formatter = describe_duration_briefly if brief else describe_duration
    return formatter(age_seconds)
209
210
def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
    """Return a text description of the age of filename's st_atime, or None if unavailable."""

    def _atime(stat_result):
        return stat_result.st_atime

    return describe_file_timestamp(filename, _atime, brief=brief)
213
214
def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
    """Return a text description of the age of filename's st_ctime, or None if unavailable."""

    def _ctime(stat_result):
        return stat_result.st_ctime

    return describe_file_timestamp(filename, _ctime, brief=brief)
217
218
def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
    """Return a text description of the age of filename's st_mtime, or None if unavailable."""

    def _mtime(stat_result):
        return stat_result.st_mtime

    return describe_file_timestamp(filename, _mtime, brief=brief)
221
222
def touch_file(filename: str) -> bool:
    """
    Create filename if it does not exist, otherwise update its
    modification time, via pathlib.Path.touch().

    Returns:
        True if filename exists after the touch.  (Path.touch() itself
        returns None, so its result cannot serve as the bool this
        function is annotated to return.)

    Raises:
        OSError: if the file cannot be created or updated.
    """
    pathlib.Path(filename).touch()
    return os.path.exists(filename)
225
226
def expand_globs(in_filename: str):
    """Yield each path that glob.glob returns for the pattern in_filename."""
    yield from glob.glob(in_filename)
230
231
def get_files(directory: str):
    """
    Yield the path (directory joined with the entry name) of every
    regular file directly inside directory.
    """
    for entry in os.listdir(directory):
        candidate = join(directory, entry)
        if not isfile(candidate):
            continue
        if exists(candidate):
            yield candidate
237
238
def get_directories(directory: str):
    """
    Yield the path (directory joined with the entry name) of every
    subdirectory directly inside directory.

    Only entries for which os.path.isdir is true are yielded, so
    FIFOs, sockets and device nodes are skipped; treating "anything
    that is not a regular file" as a directory would make callers
    that list the results (e.g. a recursive walk) fail on them.
    """
    for entry in os.listdir(directory):
        full_path = join(directory, entry)
        if os.path.isdir(full_path):
            yield full_path
244
245
def get_files_recursive(directory: str):
    """
    Yield every file under directory, descending into subdirectories.

    Files directly inside directory (from get_files) are yielded first,
    followed by the results for each entry from get_directories in turn.
    """
    yield from get_files(directory)
    for child_directory in get_directories(directory):
        yield from get_files_recursive(child_directory)