Random cleanups and type safety. Created ml subdir.
[python_utils.git] / datetime_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities related to dates and times and datetimes."""
4
5 import datetime
6 import enum
7 import logging
8 import re
9 from typing import Any, NewType, Tuple
10
11 import holidays  # type: ignore
12 import pytz
13
14 import constants
15
16 logger = logging.getLogger(__name__)
17
18
19 def replace_timezone(dt: datetime.datetime,
20                      tz: datetime.tzinfo) -> datetime.datetime:
21     return dt.replace(tzinfo=None).astimezone(tz=tz)
22
23
24 def now() -> datetime.datetime:
25     return datetime.datetime.now()
26
27
28 def now_pst() -> datetime.datetime:
29     return replace_timezone(now(), pytz.timezone("US/Pacific"))
30
31
32 def date_to_datetime(date: datetime.date) -> datetime.datetime:
33     return datetime.datetime(
34         date.year,
35         date.month,
36         date.day,
37         0, 0, 0, 0
38     )
39
40
41 def date_and_time_to_datetime(date: datetime.date,
42                               time: datetime.time) -> datetime.datetime:
43     return datetime.datetime(
44         date.year,
45         date.month,
46         date.day,
47         time.hour,
48         time.minute,
49         time.second,
50         time.millisecond
51     )
52
53
54 def datetime_to_date(date: datetime.datetime) -> datetime.date:
55     return datetime.date(
56         date.year,
57         date.month,
58         date.day
59     )
60
61
62 # An enum to represent units with which we can compute deltas.
63 class TimeUnit(enum.Enum):
64     MONDAYS = 0
65     TUESDAYS = 1
66     WEDNESDAYS = 2
67     THURSDAYS = 3
68     FRIDAYS = 4
69     SATURDAYS = 5
70     SUNDAYS = 6
71     SECONDS = 7
72     MINUTES = 8
73     HOURS = 9
74     DAYS = 10
75     WORKDAYS = 11
76     WEEKS = 12
77     MONTHS = 13
78     YEARS = 14
79
80     @classmethod
81     def is_valid(cls, value: Any):
82         if type(value) is int:
83             return value in cls._value2member_map_
84         elif type(value) is TimeUnit:
85             return value.value in cls._value2member_map_
86         elif type(value) is str:
87             return value in cls._member_names_
88         else:
89             print(type(value))
90             return False
91
92
93 def n_timeunits_from_base(
94     count: int,
95     unit: TimeUnit,
96     base: datetime.datetime
97 ) -> datetime.datetime:
98     assert TimeUnit.is_valid(unit)
99     if count == 0:
100         return base
101
102     # N days from base
103     if unit == TimeUnit.DAYS:
104         timedelta = datetime.timedelta(days=count)
105         return base + timedelta
106
107     # N workdays from base
108     elif unit == TimeUnit.WORKDAYS:
109         if count < 0:
110             count = abs(count)
111             timedelta = datetime.timedelta(days=-1)
112         else:
113             timedelta = datetime.timedelta(days=1)
114         skips = holidays.US(years=base.year).keys()
115         while count > 0:
116             old_year = base.year
117             base += timedelta
118             if base.year != old_year:
119                 skips = holidays.US(years=base.year).keys()
120             if (
121                     base.weekday() < 5 and
122                     datetime.date(base.year,
123                                   base.month,
124                                   base.day) not in skips
125             ):
126                 count -= 1
127         return base
128
129     # N weeks from base
130     elif unit == TimeUnit.WEEKS:
131         timedelta = datetime.timedelta(weeks=count)
132         base = base + timedelta
133         return base
134
135     # N months from base
136     elif unit == TimeUnit.MONTHS:
137         month_term = count % 12
138         year_term = count // 12
139         new_month = base.month + month_term
140         if new_month > 12:
141             new_month %= 12
142             year_term += 1
143         new_year = base.year + year_term
144         return datetime.datetime(
145             new_year,
146             new_month,
147             base.day,
148             base.hour,
149             base.minute,
150             base.second,
151             base.microsecond,
152         )
153
154     # N years from base
155     elif unit == TimeUnit.YEARS:
156         new_year = base.year + count
157         return datetime.datetime(
158             new_year,
159             base.month,
160             base.day,
161             base.hour,
162             base.minute,
163             base.second,
164             base.microsecond,
165         )
166
167     # N weekdays from base (e.g. 4 wednesdays from today)
168     direction = 1 if count > 0 else -1
169     count = abs(count)
170     timedelta = datetime.timedelta(days=direction)
171     start = base
172     while True:
173         dow = base.weekday()
174         if dow == unit and start != base:
175             count -= 1
176             if count == 0:
177                 return base
178         base = base + timedelta
179
180
181 def get_format_string(
182         *,
183         date_time_separator=" ",
184         include_timezone=True,
185         include_dayname=False,
186         use_month_abbrevs=False,
187         include_seconds=True,
188         include_fractional=False,
189         twelve_hour=True,
190 ) -> str:
191     fstring = ""
192     if include_dayname:
193         fstring += "%a/"
194
195     if use_month_abbrevs:
196         fstring = f"%Y/%b/%d{date_time_separator}"
197     else:
198         fstring = f"%Y/%m/%d{date_time_separator}"
199     if twelve_hour:
200         fstring += "%I:%M"
201         if include_seconds:
202             fstring += ":%S"
203         fstring += "%p"
204     else:
205         fstring += "%H:%M"
206         if include_seconds:
207             fstring += ":%S"
208     if include_fractional:
209         fstring += ".%f"
210     if include_timezone:
211         fstring += "%z"
212     return fstring
213
214
215 def datetime_to_string(
216     dt: datetime.datetime,
217     *,
218     date_time_separator=" ",
219     include_timezone=True,
220     include_dayname=False,
221     use_month_abbrevs=False,
222     include_seconds=True,
223     include_fractional=False,
224     twelve_hour=True,
225 ) -> str:
226     """A nice way to convert a datetime into a string."""
227     fstring = get_format_string(
228         date_time_separator=date_time_separator,
229         include_timezone=include_timezone,
230         include_dayname=include_dayname,
231         include_seconds=include_seconds,
232         include_fractional=include_fractional,
233         twelve_hour=twelve_hour)
234     return dt.strftime(fstring).strip()
235
236
237 def string_to_datetime(
238         txt: str,
239         *,
240         date_time_separator=" ",
241         include_timezone=True,
242         include_dayname=False,
243         use_month_abbrevs=False,
244         include_seconds=True,
245         include_fractional=False,
246         twelve_hour=True,
247 ) -> Tuple[datetime.datetime, str]:
248     """A nice way to convert a string into a datetime.  Also consider
249     dateparse.dateparse_utils for a full parser.
250     """
251     fstring = get_format_string(
252         date_time_separator=date_time_separator,
253         include_timezone=include_timezone,
254         include_dayname=include_dayname,
255         include_seconds=include_seconds,
256         include_fractional=include_fractional,
257         twelve_hour=twelve_hour)
258     return (
259         datetime.datetime.strptime(txt, fstring),
260         fstring
261     )
262
263
264 def timestamp() -> str:
265     """Return a timestamp for now in Pacific timezone."""
266     ts = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
267     return datetime_to_string(ts, include_timezone=True)
268
269
270 def time_to_string(
271     dt: datetime.datetime,
272     *,
273     include_seconds=True,
274     include_fractional=False,
275     include_timezone=False,
276     twelve_hour=True,
277 ) -> str:
278     """A nice way to convert a datetime into a time (only) string."""
279     fstring = ""
280     if twelve_hour:
281         fstring += "%l:%M"
282         if include_seconds:
283             fstring += ":%S"
284         fstring += "%p"
285     else:
286         fstring += "%H:%M"
287         if include_seconds:
288             fstring += ":%S"
289     if include_fractional:
290         fstring += ".%f"
291     if include_timezone:
292         fstring += "%z"
293     return dt.strftime(fstring).strip()
294
295
296 def seconds_to_timedelta(seconds: int) -> datetime.timedelta:
297     """Convert a delta in seconds into a timedelta."""
298     return datetime.timedelta(seconds=seconds)
299
300
301 MinuteOfDay = NewType("MinuteOfDay", int)
302
303
304 def minute_number(hour: int, minute: int) -> MinuteOfDay:
305     """Convert hour:minute into minute number from start of day."""
306     return MinuteOfDay(hour * 60 + minute)
307
308
309 def datetime_to_minute_number(dt: datetime.datetime) -> MinuteOfDay:
310     """Convert a datetime into a minute number (of the day)"""
311     return minute_number(dt.hour, dt.minute)
312
313
314 def minute_number_to_time_string(minute_num: MinuteOfDay) -> str:
315     """Convert minute number from start of day into hour:minute am/pm
316     string.
317     """
318     hour = minute_num // 60
319     minute = minute_num % 60
320     ampm = "a"
321     if hour > 12:
322         hour -= 12
323         ampm = "p"
324     if hour == 12:
325         ampm = "p"
326     if hour == 0:
327         hour = 12
328     return f"{hour:2}:{minute:02}{ampm}"
329
330
331 def parse_duration(duration: str) -> int:
332     """Parse a duration in string form."""
333     seconds = 0
334     m = re.search(r'(\d+) *d[ays]*', duration)
335     if m is not None:
336         seconds += int(m.group(1)) * 60 * 60 * 24
337     m = re.search(r'(\d+) *h[ours]*', duration)
338     if m is not None:
339         seconds += int(m.group(1)) * 60 * 60
340     m = re.search(r'(\d+) *m[inutes]*', duration)
341     if m is not None:
342         seconds += int(m.group(1)) * 60
343     m = re.search(r'(\d+) *s[econds]*', duration)
344     if m is not None:
345         seconds += int(m.group(1))
346     return seconds
347
348
349 def describe_duration(age: int) -> str:
350     """Describe a duration."""
351     days = divmod(age, constants.SECONDS_PER_DAY)
352     hours = divmod(days[1], constants.SECONDS_PER_HOUR)
353     minutes = divmod(hours[1], constants.SECONDS_PER_MINUTE)
354
355     descr = ""
356     if days[0] > 1:
357         descr = f"{int(days[0])} days, "
358     elif days[0] == 1:
359         descr = "1 day, "
360     if hours[0] > 1:
361         descr = descr + f"{int(hours[0])} hours, "
362     elif hours[0] == 1:
363         descr = descr + "1 hour, "
364     if len(descr) > 0:
365         descr = descr + "and "
366     if minutes[0] == 1:
367         descr = descr + "1 minute"
368     else:
369         descr = descr + f"{int(minutes[0])} minutes"
370     return descr
371
372
373 def describe_duration_briefly(age: int) -> str:
374     """Describe a duration briefly."""
375     days = divmod(age, constants.SECONDS_PER_DAY)
376     hours = divmod(days[1], constants.SECONDS_PER_HOUR)
377     minutes = divmod(hours[1], constants.SECONDS_PER_MINUTE)
378
379     descr = ""
380     if days[0] > 0:
381         descr = f"{int(days[0])}d "
382     if hours[0] > 0:
383         descr = descr + f"{int(hours[0])}h "
384     if minutes[0] > 0 or len(descr) == 0:
385         descr = descr + f"{int(minutes[0])}m"
386     return descr.strip()