Fix wakeword.
[kiosk.git] / generic_news_rss_renderer.py
1 #!/usr/bin/env python3
2
3 from abc import abstractmethod
4 import datetime
5 from dateutil.parser import parse
6 import http.client
7 import logging
8 import re
9 from typing import Dict, List, Optional, Union
10 import xml.etree.ElementTree as ET
11
12 from scottutilz import profanity_filter
13
14 import file_writer
15 import grab_bag
16 import renderer
17 import page_builder
18
19
20 logger = logging.getLogger(__name__)
21
22
class generic_news_rss_renderer(renderer.abstaining_renderer):
    """Base class for renderers that fetch one or more RSS feeds from a
    single site and render two kinds of kiosk pages: a four-item
    headlines page and a one-item article-details page.

    Subclasses must implement the abstract methods below and may
    override the ``find_*`` / ``munge_*`` hooks to adapt to a
    particular feed's quirks.
    """

    # CSS shared by the headlines and details pages: render anchors as
    # plain bold black text so links don't visually stand out on the kiosk.
    LINK_STYLE = """
    <STYLE>
    a:link {
      color: black;
      text-decoration: none;
      font-weight: bold;
    }
    a:visited {
      color: black;
      text-decoration: none;
      font-weight: bold;
    }
    a:active {
      color: black;
      text-decoration: none;
      font-weight: bold;
    }
    </STYLE>"""

    def __init__(
        self,
        name_to_timeout_dict: Dict[str, int],
        feed_site: str,
        feed_uris: List[str],
        page_title: str,
    ):
        """Args:
            name_to_timeout_dict: maps periodic_render keys to refresh
                intervals; passed through to the base renderer.
            feed_site: hostname serving the feeds, e.g. "www.site.com".
            feed_uris: URI paths on feed_site, one per feed to fetch.
            page_title: human-readable title for the rendered pages.
        """
        super().__init__(name_to_timeout_dict)
        self.feed_site = feed_site
        self.feed_uris = feed_uris
        self.page_title = page_title
        self.news = grab_bag.grab_bag()  # short headline blurbs
        self.details = grab_bag.grab_bag()  # longer article blurbs
        self.filter = profanity_filter.ProfanityFilter()

    @abstractmethod
    def get_headlines_page_prefix(self) -> str:
        """Filename prefix for the rendered headlines page."""
        pass

    @abstractmethod
    def get_details_page_prefix(self) -> str:
        """Filename prefix for the rendered article-details page."""
        pass

    def get_headlines_page_priority(self) -> str:
        """Display priority embedded in the headlines page filename."""
        return "4"

    def get_details_page_priority(self) -> str:
        """Display priority embedded in the details page filename."""
        return "6"

    @abstractmethod
    def should_use_https(self) -> bool:
        """True if the feed site should be fetched over HTTPS."""
        pass

    def should_profanity_filter(self) -> bool:
        """True if items containing profanity should be dropped."""
        return True

    def find_title(self, item: ET.Element) -> Optional[str]:
        """Extract the title text from a feed item, or None."""
        return item.findtext("title")

    def munge_title(self, title: str, item: ET.Element) -> str:
        """Hook: clean up a raw title.  Default is a no-op."""
        return title

    def find_description(self, item: ET.Element) -> Optional[str]:
        """Extract the description text from a feed item, or None."""
        return item.findtext("description")

    def munge_description(self, description: str, item: ET.Element) -> str:
        """Hook: clean up a raw description.  Default strips HTML tags."""
        description = re.sub("<[^>]+>", "", description)
        return description

    def find_link(self, item: ET.Element) -> Optional[str]:
        """Extract the article link from a feed item, or None."""
        return item.findtext("link")

    def munge_link(self, link: str) -> str:
        """Hook: clean up a raw link.  Default is a no-op."""
        return link

    def find_image(self, item: ET.Element) -> Optional[str]:
        """Extract an image URL from a feed item, or None."""
        return item.findtext("image")

    def munge_image(self, image: str) -> str:
        """Hook: clean up a raw image URL.  Default is a no-op."""
        return image

    def find_pubdate(self, item: ET.Element) -> Optional[str]:
        """Extract the publication date string from a feed item, or None."""
        return item.findtext("pubDate")

    def munge_pubdate(self, pubdate: str) -> str:
        """Hook: clean up a raw pubDate string.  Default is a no-op."""
        return pubdate

    def item_is_interesting_for_headlines(
        self, title: str, description: str, item: ET.Element
    ) -> bool:
        """Hook: should this item appear on the headlines page?"""
        return True

    def do_headlines(self) -> bool:
        """Hook: should a headlines page be rendered at all?"""
        return True

    def do_details(self) -> bool:
        """Hook: should a details page be rendered at all?"""
        return True

    def is_item_older_than_n_days(
        self, item: ET.Element, n: int, default: bool = False
    ) -> bool:
        """True if the item's pubDate is more than n days in the past.

        Returns `default` when the item has no pubDate or the pubDate
        cannot be parsed.  Comparison is done in the pubDate's own
        timezone so aware and naive datetimes are never mixed.
        """
        pubdate = self.find_pubdate(item)
        if pubdate is None:
            return default
        try:
            pubdatetime = parse(pubdate)
        except (ValueError, OverflowError):
            # Malformed date in the feed; treat like a missing pubDate
            # rather than crashing the whole fetch.
            logger.warning("Could not parse pubdate %r; using default.", pubdate)
            return default
        now = datetime.datetime.now(pubdatetime.tzinfo)
        delta_days = (now - pubdatetime).total_seconds() / (60 * 60 * 24)
        return delta_days > n

    def item_is_interesting_for_article(
        self, title: str, description: str, item: ET.Element
    ) -> bool:
        """Hook: should this item get an article-details page?"""
        return True

    def periodic_render(self, key: str) -> bool:
        """Dispatch a periodic task by key; see name_to_timeout_dict."""
        if key == "Fetch News":
            return self.fetch_news()
        elif key == "Shuffle News":
            return self.shuffle_news()
        raise Exception(f"Unexpected periodic_render key: {key}")

    def shuffle_news(self) -> bool:
        """Re-render the headlines and details pages from a random
        subset of previously fetched items.

        Returns False when there are not enough stored items to fill a
        page, True otherwise.
        """
        if self.do_headlines():
            headlines = page_builder.page_builder()
            headlines.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
            headlines.set_title(self.page_title)
            subset = self.news.subset(4)
            if subset is None:
                logger.warning("Not enough messages to select from in shuffle_news?!")
                return False
            for msg in subset:
                headlines.add_item(msg)
            headlines.set_custom_html(self.LINK_STYLE)
            filename = (
                f"{self.get_headlines_page_prefix()}_"
                f"{self.get_headlines_page_priority()}_25900.html"
            )
            with file_writer.file_writer(filename) as f:
                headlines.render_html(f)

        if self.do_details():
            details = page_builder.page_builder()
            details.set_layout(page_builder.page_builder.LAYOUT_ONE_ITEM)
            details.set_custom_html(self.LINK_STYLE)
            details.set_title(self.page_title)
            subset = self.details.subset(1)
            if subset is None:
                logger.warning("Not enough details to choose from in do_details")
                return False
            for msg in subset:
                # Close out the table cell the details blurb was built in.
                details.add_item(msg + "</TD>")
            filename = (
                f"{self.get_details_page_prefix()}_"
                f"{self.get_details_page_priority()}_86400.html"
            )
            with file_writer.file_writer(filename) as g:
                details.render_html(g)
        return True

    def fetch_news(self) -> bool:
        """Fetch every configured feed and repopulate the news/details
        grab bags with rendered HTML blurbs.

        Returns True if at least one item was collected; False on any
        HTTP or parse failure, or if nothing interesting was found.
        """
        count = 0
        self.news.clear()
        self.details.clear()
        self.conn: Optional[
            Union[http.client.HTTPConnection, http.client.HTTPSConnection]
        ] = None

        # Dedup titles across ALL feeds, not just within one feed.
        title_filter: set = set()

        for uri in self.feed_uris:
            if self.should_use_https():
                url = f"https://{self.feed_site}{uri}"
                logger.info("Fetching: %s", url)
                self.conn = http.client.HTTPSConnection(self.feed_site, timeout=10)
            else:
                url = f"http://{self.feed_site}{uri}"
                logger.info("Fetching: %s", url)
                self.conn = http.client.HTTPConnection(self.feed_site, timeout=10)
            try:
                self.conn.request(
                    "GET",
                    uri,
                    None,
                    {
                        "Accept": "*/*",
                        "Cache-control": "max-age=50",
                    },
                )
                try:
                    response = self.conn.getresponse()
                except Exception:
                    logger.exception(
                        f"Exception in generic RSS renderer HTTP connection fetching {url}; giving up."
                    )
                    return False

                if response.status != 200:
                    logger.error(
                        f"Unexpected status {response.status} while fetching {url}; giving up."
                    )
                    return False
                raw = response.read()
            finally:
                # Don't leak a socket per feed URI.
                self.conn.close()

            logger.info("Status 200: got %d bytes back from %s", len(raw), url)
            try:
                rss = ET.fromstring(raw)
            except ET.ParseError:
                logger.exception("Could not parse RSS XML from %s; giving up.", url)
                return False
            # Standard RSS 2.0: the root's first child is <channel>.
            channel = rss[0]
            for item in list(channel):
                title = self.find_title(item)
                if title is None:
                    logger.info("Skipping RSS feed item with no title.")
                    continue
                title = self.munge_title(title, item)
                logger.debug("Considering RSS item %s...", title)

                # Use the subclass hooks consistently (the old code
                # called item.findtext() directly here, silently
                # ignoring find_description/find_link overrides).
                description = self.find_description(item)
                if description is not None:
                    description = self.munge_description(description, item)
                else:
                    description = ""
                image = self.find_image(item)
                if image is not None:
                    image = self.munge_image(image)
                link = self.find_link(item)
                if link is not None:
                    link = self.munge_link(link)

                if not self.item_is_interesting_for_headlines(title, description, item):
                    logger.info("Skipping %s because it's not interesting.", title)
                    continue

                if self.should_profanity_filter() and (
                    self.filter.contains_bad_word(title)
                    or self.filter.contains_bad_word(description)
                ):
                    logger.info("Skipping %s because it contains profanity.", title)
                    continue

                if title in title_filter:
                    logger.info(
                        "Skipping %s because we already saw an item with the same title.",
                        title,
                    )
                    continue
                title_filter.add(title)

                blurb = """<DIV style="padding:8px;
                                font-size:34pt;
                                -webkit-column-break-inside:avoid;">"""
                if image is not None:
                    blurb += f'<IMG SRC="{image}" ALIGN=LEFT HEIGHT=115 '
                    blurb += 'style="padding:8px;">'

                if link is None:
                    blurb += f"<P><B>{title}</B>"
                else:
                    blurb += f'<P><B><A HREF="{link}">{title}</A></B>'

                pubdate = self.find_pubdate(item)
                if pubdate is not None:
                    logger.debug("Raw pubdate=%s", pubdate)
                    pubdate = self.munge_pubdate(pubdate)
                    ts = parse(pubdate)
                    logger.debug("Translated pubdate into: %s", ts)
                    blurb += f'  <FONT COLOR=#cccccc>{ts.strftime("%b&nbsp;%d")}</FONT>'

                if self.item_is_interesting_for_article(title, description, item):
                    logger.info(
                        "Item %s is also interesting as an article details page; creating...",
                        title,
                    )
                    # Details pages append the description and bump the
                    # font size up from the headline version.
                    longblurb = f"{blurb}<BR>{description}</DIV>"
                    longblurb = longblurb.replace("font-size:34pt", "font-size:44pt")
                    self.details.add(longblurb)
                else:
                    logger.info(
                        "Item %s isn't interesting for article details page; skipped.",
                        title,
                    )
                blurb += "</DIV>"
                self.news.add(blurb)
                count += 1
                logger.debug("Added %d items so far...", count)
        return count > 0