Initial revision
[python_utils.git] / file_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for working with files."""
4
5 import datetime
6 import errno
7 import logging
8 import os
9 import time
10 from typing import Optional
11 import glob
12 from os.path import isfile, join, exists
13
14 import datetime_utils
15
16
# Module-level logger, named after this module via __name__; used by the
# functions below for debug and error reporting.
logger = logging.getLogger(__name__)
18
19
def create_path_if_not_exist(path, on_error=None):
    """
    Create the directory path (including missing parents) if needed.

    The process umask is set to 0 for the duration of the call and
    restored afterwards; after a successful creation, path itself is
    chmod'ed to 0o777.

    If path already exists as a directory, nothing happens.  Any other
    OSError -- including the case where path exists but is NOT a
    directory -- is handed to on_error(path, exception) when on_error
    is given, and re-raised otherwise.

    >>> import uuid
    >>> import os
    >>> path = os.path.join("/tmp", str(uuid.uuid4()), str(uuid.uuid4()))
    >>> os.path.exists(path)
    False
    >>> create_path_if_not_exist(path)
    >>> os.path.exists(path)
    True
    """
    logger.debug(f"Creating path {path}")
    previous_umask = os.umask(0)
    try:
        os.makedirs(path)
        os.chmod(path, 0o777)
    except OSError as ex:
        # The only benign failure is "it is already there, and it is a
        # directory".  Everything else (permission problems, a regular
        # file sitting at path, ...) must be reported to the caller.
        already_a_directory = (
            ex.errno == errno.EEXIST and os.path.isdir(path)
        )
        if not already_a_directory:
            if on_error is not None:
                on_error(path, ex)
            else:
                raise
    finally:
        # Always restore the caller's umask, even on failure.
        os.umask(previous_umask)
48
49
def does_file_exist(filename: str) -> bool:
    """Return True if filename refers to an existing path."""
    path_is_present = os.path.exists(filename)
    return path_is_present
52
53
def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
    """
    Return the os.stat() result for filename.

    Any exception raised by the stat call is logged (with traceback)
    and None is returned instead.
    """
    try:
        stat_info = os.stat(filename)
    except Exception as e:
        logger.exception(e)
        return None
    else:
        return stat_info
60
61
def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
    """
    Apply extractor to the stat result of filename.

    Returns whatever extractor returns for the os.stat_result obtained
    from get_file_raw_timestamps, or None if no stat result is
    available.
    """
    stat_info = get_file_raw_timestamps(filename)
    if stat_info is None:
        return None
    return extractor(stat_info)
67
68
def get_file_raw_atime(filename: str) -> Optional[float]:
    """Return the st_atime field of filename's stat result, or None."""

    def _pick_atime(stat_info):
        return stat_info.st_atime

    return get_file_raw_timestamp(filename, _pick_atime)
71
72
def get_file_raw_mtime(filename: str) -> Optional[float]:
    """Return the st_mtime field of filename's stat result, or None."""

    def _pick_mtime(stat_info):
        return stat_info.st_mtime

    return get_file_raw_timestamp(filename, _pick_mtime)
75
76
def get_file_raw_ctime(filename: str) -> Optional[float]:
    """Return the st_ctime field of filename's stat result, or None."""

    def _pick_ctime(stat_info):
        return stat_info.st_ctime

    return get_file_raw_timestamp(filename, _pick_ctime)
79
80
def convert_file_timestamp_to_datetime(
    filename: str, producer
) -> Optional[datetime.datetime]:
    """
    Turn a file timestamp into a datetime.

    producer is called with filename and is expected to return a
    timestamp (or None).  A non-None timestamp is converted with
    datetime.datetime.fromtimestamp; None is passed through as None.
    """
    raw_timestamp = producer(filename)
    if raw_timestamp is None:
        return None
    return datetime.datetime.fromtimestamp(raw_timestamp)
88
89
def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's st_atime as a datetime, or None if unavailable."""
    atime_as_datetime = convert_file_timestamp_to_datetime(
        filename, get_file_raw_atime
    )
    return atime_as_datetime
92
93
def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's st_mtime as a datetime, or None if unavailable."""
    mtime_as_datetime = convert_file_timestamp_to_datetime(
        filename, get_file_raw_mtime
    )
    return mtime_as_datetime
96
97
def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
    """Return filename's st_ctime as a datetime, or None if unavailable."""
    ctime_as_datetime = convert_file_timestamp_to_datetime(
        filename, get_file_raw_ctime
    )
    return ctime_as_datetime
100
101
def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[float]:
    """
    Return how many seconds ago a timestamp of filename occurred.

    extractor receives the os.stat_result for filename and returns the
    timestamp of interest (e.g. lambda x: x.st_mtime).  The result is
    time.time() minus that timestamp, so it is a float whenever the
    extracted timestamp is one (as st_atime/st_mtime/st_ctime are).

    Returns None if no stat result is available for filename.
    """
    # Sample the clock before stat'ing, as the original code did.
    now = time.time()
    stat_info = get_file_raw_timestamps(filename)
    if stat_info is None:
        return None
    return now - extractor(stat_info)
109
110
def get_file_atime_age_seconds(filename: str) -> Optional[float]:
    """
    Return the seconds elapsed since filename's st_atime.

    The value is a float (current time minus st_atime); None is returned
    if no stat result is available for filename.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
113
114
def get_file_ctime_age_seconds(filename: str) -> Optional[float]:
    """
    Return the seconds elapsed since filename's st_ctime.

    The value is a float (current time minus st_ctime); None is returned
    if no stat result is available for filename.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
117
118
def get_file_mtime_age_seconds(filename: str) -> Optional[float]:
    """
    Return the seconds elapsed since filename's st_mtime.

    The value is a float (current time minus st_mtime); None is returned
    if no stat result is available for filename.
    """
    return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
121
122
def get_file_timestamp_timedelta(
    filename: str, extractor
) -> Optional[datetime.timedelta]:
    """
    Return the age of one of filename's timestamps as a timedelta.

    extractor selects the timestamp from the file's stat result; the
    age in seconds comes from get_file_timestamp_age_seconds.  Returns
    None when that age is None.
    """
    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    return datetime.timedelta(seconds=float(age_seconds))
130
131
def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's st_atime as a timedelta, or None."""

    def _pick_atime(stat_info):
        return stat_info.st_atime

    return get_file_timestamp_timedelta(filename, _pick_atime)
134
135
def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's st_ctime as a timedelta, or None."""

    def _pick_ctime(stat_info):
        return stat_info.st_ctime

    return get_file_timestamp_timedelta(filename, _pick_ctime)
138
139
def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
    """Return the age of filename's st_mtime as a timedelta, or None."""

    def _pick_mtime(stat_info):
        return stat_info.st_mtime

    return get_file_timestamp_timedelta(filename, _pick_mtime)
142
143
def describe_file_timestamp(
    filename: str, extractor, *, brief=False
) -> Optional[str]:
    """
    Describe the age of one of filename's timestamps as text.

    extractor selects the timestamp from the file's stat result.  The
    age in seconds is passed to datetime_utils.describe_duration, or to
    datetime_utils.describe_duration_briefly when brief is true.
    Returns None when the age cannot be determined.
    """
    age_seconds = get_file_timestamp_age_seconds(filename, extractor)
    if age_seconds is None:
        return None
    describe = (
        datetime_utils.describe_duration_briefly
        if brief
        else datetime_utils.describe_duration
    )
    return describe(age_seconds)
154
155
def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
    """Describe the age of filename's st_atime as text, or return None."""

    def _pick_atime(stat_info):
        return stat_info.st_atime

    return describe_file_timestamp(filename, _pick_atime, brief=brief)
158
159
def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
    """Describe the age of filename's st_ctime as text, or return None."""

    def _pick_ctime(stat_info):
        return stat_info.st_ctime

    return describe_file_timestamp(filename, _pick_ctime, brief=brief)
162
163
def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
    """Describe the age of filename's st_mtime as text, or return None."""

    def _pick_mtime(stat_info):
        return stat_info.st_mtime

    return describe_file_timestamp(filename, _pick_mtime, brief=brief)
166
167
def expand_globs(in_filename: str):
    """Yield each path that glob.glob returns for the pattern in_filename."""
    yield from glob.glob(in_filename)
171
172
def get_files(directory: str):
    """
    Yield the full path of every regular file directly inside directory.

    Entries come from os.listdir(directory) and are joined onto
    directory; subdirectories are not descended into.
    """
    for entry_name in os.listdir(directory):
        candidate = join(directory, entry_name)
        if not (isfile(candidate) and exists(candidate)):
            continue
        yield candidate
178
179
def get_directories(directory: str):
    """
    Yield the full path of every subdirectory directly inside directory.

    Entries come from os.listdir(directory) and are joined onto
    directory.  Only entries for which os.path.isdir is true are
    yielded, so FIFOs, sockets, device nodes and broken symlinks are
    skipped rather than being reported as directories.
    """
    for entry_name in os.listdir(directory):
        full_path = join(directory, entry_name)
        # "Exists and is not a regular file" is not the same thing as
        # "is a directory"; test for a directory explicitly.
        if os.path.isdir(full_path):
            yield full_path
185
186
def get_files_recursive(directory: str):
    """
    Yield the full path of every file under directory, recursively.

    Files directly inside directory (per get_files) are yielded first,
    then each subdirectory reported by get_directories is walked in
    turn.
    """
    yield from get_files(directory)
    for child_directory in get_directories(directory):
        yield from get_files_recursive(child_directory)