Reduce import scopes, remove cycles.
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import hashlib
8 import logging
9 import os
10 import time
11 from typing import Optional
12 import glob
13 from os.path import isfile, join, exists
14
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
16
17
def create_path_if_not_exist(path, on_error=None):
    """
    Attempts to create path (including missing parents) if it does not
    exist. If on_error is specified, it is called as
    on_error(path, exception) when creation fails, otherwise the
    exception is rethrown.

    A failure is only ignored when path is a directory afterwards; a
    path that already exists as something else (e.g. a regular file)
    is reported like any other error.

    The process umask is set to 0 for the duration of the call and the
    leaf directory is chmod'ed to 0o777.

    >>> import uuid
    >>> import os
    >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
    >>> os.path.exists(path)
    False
    >>> create_path_if_not_exist(path)
    >>> os.path.exists(path)
    True
    """
    logger.debug(f"Creating path {path}")
    previous_umask = os.umask(0)
    try:
        os.makedirs(path)
        os.chmod(path, 0o777)
    except OSError as ex:
        # The goal is "path is a directory".  EEXIST alone does not prove
        # that (the path may be a regular file), so decide on the actual
        # state of the filesystem rather than on the errno.
        if not os.path.isdir(path):
            if on_error is not None:
                on_error(path, ex)
            else:
                raise
    finally:
        # Always restore the caller's umask, even when re-raising.
        os.umask(previous_umask)
46
47
def does_file_exist(filename: str) -> bool:
    """Return True if filename exists and is a regular file."""
    # os.path.isfile() is already False for a missing path, so a separate
    # os.path.exists() call would only add a second stat.
    return os.path.isfile(filename)
50
51
def does_directory_exist(dirname: str) -> bool:
    """Return True if dirname exists and is a directory."""
    # os.path.isdir() is already False for a missing path, so a separate
    # os.path.exists() call would only add a second stat.
    return os.path.isdir(dirname)
54
55
def does_path_exist(pathname: str) -> bool:
    """Return whatever os.path.exists reports for pathname."""
    path_is_present = os.path.exists(pathname)
    return path_is_present
58
59
def get_file_size(filename: str) -> int:
    """Return the size of filename in bytes, taken from its stat record."""
    stat_record = os.stat(filename)
    return stat_record.st_size
62
63
def is_normal_file(filename: str) -> bool:
    """Return whatever os.path.isfile reports for filename."""
    looks_regular = os.path.isfile(filename)
    return looks_regular
66
67
def is_directory(filename: str) -> bool:
    """Return whatever os.path.isdir reports for filename."""
    looks_like_dir = os.path.isdir(filename)
    return looks_like_dir
70
71
def is_symlink(filename: str) -> bool:
    """Return whatever os.path.islink reports for filename."""
    looks_like_link = os.path.islink(filename)
    return looks_like_link
74
75
def is_same_file(file1: str, file2: str) -> bool:
    """Return True if both paths stat to the same underlying file."""
    first_stat = os.stat(file1)
    second_stat = os.stat(file2)
    return os.path.samestat(first_stat, second_stat)
78
79
def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
    """
    Return the os.stat() result for filename.  If os.stat raises, the
    exception is logged via logger.exception and None is returned.
    """
    try:
        stat_result = os.stat(filename)
    except Exception as e:
        logger.exception(e)
        return None
    else:
        return stat_result
86
87
def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
    """
    Stat filename via get_file_raw_timestamps and return
    extractor(stat_result), or None if no stat result is available.
    """
    stat_result = get_file_raw_timestamps(filename)
    if stat_result is None:
        return None
    return extractor(stat_result)
93
94
def get_file_raw_atime(filename: str) -> Optional[float]:
    """Return the st_atime of filename, or None if it cannot be stat'ed."""

    def _pick_atime(stat_result):
        return stat_result.st_atime

    return get_file_raw_timestamp(filename, _pick_atime)
97
98
def get_file_raw_mtime(filename: str) -> Optional[float]:
    """Return the st_mtime of filename, or None if it cannot be stat'ed."""

    def _pick_mtime(stat_result):
        return stat_result.st_mtime

    return get_file_raw_timestamp(filename, _pick_mtime)
101
102
def get_file_raw_ctime(filename: str) -> Optional[float]:
    """Return the st_ctime of filename, or None if it cannot be stat'ed."""

    def _pick_ctime(stat_result):
        return stat_result.st_ctime

    return get_file_raw_timestamp(filename, _pick_ctime)
105
106
def get_file_md5(filename: str) -> str:
    """
    Return the hexadecimal MD5 digest of the contents of filename.
    The file is read in binary mode, 8192 bytes at a time.
    """
    digest = hashlib.md5()
    with open(filename, "rb") as stream:
        # iter() with a sentinel stops on the first empty read (EOF).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
115
116
def set_file_raw_atime(filename: str, atime: float):
    """
    Set the access time of filename to atime (seconds since the epoch)
    while keeping its current modification time.

    Raises OSError (e.g. FileNotFoundError) if filename cannot be
    stat'ed or updated.
    """
    # Stat directly so that a failure surfaces as the real OSError rather
    # than a None mtime being handed to os.utime (which raises TypeError).
    mtime = os.stat(filename).st_mtime
    os.utime(filename, (atime, mtime))
120
121
def set_file_raw_mtime(filename: str, mtime: float):
    """
    Set the modification time of filename to mtime (seconds since the
    epoch) while keeping its current access time.

    Raises OSError (e.g. FileNotFoundError) if filename cannot be
    stat'ed or updated.
    """
    # Stat directly so that a failure surfaces as the real OSError rather
    # than a None atime being handed to os.utime (which raises TypeError).
    atime = os.stat(filename).st_atime
    os.utime(filename, (atime, mtime))
125
126
def set_file_raw_atime_and_mtime(filename: str, ts: Optional[float] = None):
    """
    Set both the access and modification time of filename to ts
    (seconds since the epoch).  When ts is None, os.utime is called with
    times=None, which stamps the file with the current time.
    """
    times = None if ts is None else (ts, ts)
    os.utime(filename, times)
132
133
def convert_file_timestamp_to_datetime(
    filename: str, producer
) -> Optional[datetime.datetime]:
    """
    Call producer(filename) to obtain a timestamp and convert it with
    datetime.datetime.fromtimestamp.  Returns None when the producer
    returns None.
    """
    timestamp = producer(filename)
    if timestamp is None:
        return None
    return datetime.datetime.fromtimestamp(timestamp)
141
142
def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's atime as a datetime, or None if unavailable."""
    accessed_at = convert_file_timestamp_to_datetime(
        filename, get_file_raw_atime
    )
    return accessed_at
145
146
def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's mtime as a datetime, or None if unavailable."""
    modified_at = convert_file_timestamp_to_datetime(
        filename, get_file_raw_mtime
    )
    return modified_at
149
150
def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's ctime as a datetime, or None if unavailable."""
    changed_at = convert_file_timestamp_to_datetime(
        filename, get_file_raw_ctime
    )
    return changed_at
153
154
def get_file_timestamp_age_seconds(
    filename: str, extractor
) -> Optional[float]:
    """
    Return how many seconds ago the timestamp selected by extractor was
    set on filename, measured against time.time().

    extractor receives the stat result from get_file_raw_timestamps and
    returns the timestamp of interest.  Returns None when no stat result
    is available.  The result is the difference of time.time() and the
    extracted timestamp, i.e. a float rather than an int.
    """
    # Sample the clock before the stat, as the original ordering did.
    now = time.time()
    ts = get_file_raw_timestamps(filename)
    if ts is None:
        return None
    return now - extractor(ts)
162
163
def get_file_atime_age_seconds(filename: str) -> Optional[float]:
    """
    Return the age in seconds of filename's st_atime, or None if the file
    cannot be stat'ed.  The value is a float (time.time() minus st_atime).
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
166
167
def get_file_ctime_age_seconds(filename: str) -> Optional[float]:
    """
    Return the age in seconds of filename's st_ctime, or None if the file
    cannot be stat'ed.  The value is a float (time.time() minus st_ctime).
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
170
171
def get_file_mtime_age_seconds(filename: str) -> Optional[float]:
    """
    Return the age in seconds of filename's st_mtime, or None if the file
    cannot be stat'ed.  The value is a float (time.time() minus st_mtime).
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
174
175
def get_file_timestamp_timedelta(
    filename: str, extractor
) -> Optional[datetime.timedelta]:
    """
    Return the age of the timestamp selected by extractor as a
    datetime.timedelta, or None when get_file_timestamp_age_seconds
    returns None.
    """
    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    return datetime.timedelta(seconds=float(age_seconds))
183
184
def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's st_atime as a timedelta, or None."""

    def _pick_atime(stat_result):
        return stat_result.st_atime

    return get_file_timestamp_timedelta(filename, _pick_atime)
187
188
def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's st_ctime as a timedelta, or None."""

    def _pick_ctime(stat_result):
        return stat_result.st_ctime

    return get_file_timestamp_timedelta(filename, _pick_ctime)
191
192
def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's st_mtime as a timedelta, or None."""

    def _pick_mtime(stat_result):
        return stat_result.st_mtime

    return get_file_timestamp_timedelta(filename, _pick_mtime)
195
196
def describe_file_timestamp(
    filename: str, extractor, *, brief=False
) -> Optional[str]:
    """
    Describe the age of the timestamp selected by extractor as text.

    The age in seconds is passed to datetime_utils.describe_duration, or
    to datetime_utils.describe_duration_briefly when brief is true.
    Returns None when the age cannot be determined.
    """
    # Imported at call time rather than at module import time.
    from datetime_utils import describe_duration, describe_duration_briefly

    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    formatter = describe_duration_briefly if brief else describe_duration
    return formatter(age_seconds)
208
209
def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
    """Describe the age of filename's st_atime as text, or return None."""

    def _pick_atime(stat_result):
        return stat_result.st_atime

    return describe_file_timestamp(filename, _pick_atime, brief=brief)
212
213
def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
    """Describe the age of filename's st_ctime as text, or return None."""

    def _pick_ctime(stat_result):
        return stat_result.st_ctime

    return describe_file_timestamp(filename, _pick_ctime, brief=brief)
216
217
def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
    """Describe the age of filename's st_mtime as text, or return None."""

    def _pick_mtime(stat_result):
        return stat_result.st_mtime

    return describe_file_timestamp(filename, _pick_mtime, brief=brief)
220
221
def expand_globs(in_filename: str):
    """Yield each path that glob.glob returns for the pattern in_filename."""
    yield from glob.glob(in_filename)
225
226
def get_files(directory: str):
    """
    Yield the full path (directory joined with the entry name) of every
    entry directly inside directory that os.path.isfile accepts.
    Subdirectories are not descended into.
    """
    for filename in os.listdir(directory):
        full_path = join(directory, filename)
        # isfile() is already False for a path that does not exist, so no
        # additional exists() check (and stat call) is needed.
        if isfile(full_path):
            yield full_path
232
233
def get_directories(directory: str):
    """
    Yield the full path (directory joined with the entry name) of every
    entry directly inside directory that os.path.isdir accepts.
    """
    for d in os.listdir(directory):
        full_path = join(directory, d)
        # Test for a directory explicitly: "exists and is not a regular
        # file" would also let through FIFOs, sockets and device nodes,
        # which callers cannot list or descend into.
        if os.path.isdir(full_path):
            yield full_path
239
240
def get_files_recursive(directory: str):
    """
    Yield the files found by get_files(directory), then recurse into
    each path yielded by get_directories(directory) and yield the files
    found there in the same manner.
    """
    yield from get_files(directory)
    for subdir in get_directories(directory):
        yield from get_files_recursive(subdir)