X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=generic_news_rss_renderer.py;h=1ffe024a7e9d3d798e6432804badcb2c430a18f4;hb=bfde32ea8f021da27fb2cdf535efb0e9c465d6a2;hp=4db466957456095148d1793a772e0069b02ae32b;hpb=78b904e30bc0f9a05a96da1b597ad11795afbd46;p=kiosk.git

diff --git a/generic_news_rss_renderer.py b/generic_news_rss_renderer.py
index 4db4669..1ffe024 100644
--- a/generic_news_rss_renderer.py
+++ b/generic_news_rss_renderer.py
@@ -1,241 +1,321 @@
+#!/usr/bin/env python3
+
+from abc import abstractmethod
 import datetime
 from dateutil.parser import parse
+import http.client
+import logging
+import re
+from typing import Dict, List, Optional, Union
+import xml.etree.ElementTree as ET
+
+from scottutilz import profanity_filter
+
 import file_writer
 import grab_bag
 import renderer
-import http.client
 import page_builder
-import profanity_filter
-import random
-import re
-import xml.etree.ElementTree as ET
 
-class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
-    def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
-        super(generic_news_rss_renderer, self).__init__(name_to_timeout_dict,
-                                                        False)
-        self.debug = 1
+
+logger = logging.getLogger(__file__)
+
+
+class generic_news_rss_renderer(renderer.abstaining_renderer):
+    def __init__(
+        self,
+        name_to_timeout_dict: Dict[str, int],
+        feed_site: str,
+        feed_uris: List[str],
+        page_title: str,
+    ):
+        super().__init__(name_to_timeout_dict)
         self.feed_site = feed_site
         self.feed_uris = feed_uris
         self.page_title = page_title
         self.news = grab_bag.grab_bag()
         self.details = grab_bag.grab_bag()
-        self.filter = profanity_filter.profanity_filter()
+        self.filter = profanity_filter.ProfanityFilter()
 
-    def debug_prefix(self):
+    @abstractmethod
+    def get_headlines_page_prefix(self) -> str:
         pass
 
-    def get_headlines_page_prefix(self):
+    @abstractmethod
+    def get_details_page_prefix(self) -> str:
         pass
 
-    def get_details_page_prefix(self):
-        pass
+    def get_headlines_page_priority(self) -> str:
+        return "4"
 
-    def should_use_https(self):
+    def get_details_page_priority(self) -> str:
+        return "6"
+
+    @abstractmethod
+    def should_use_https(self) -> bool:
         pass
 
-    def should_profanity_filter(self):
+    def should_profanity_filter(self) -> bool:
         return False
 
-    def find_title(self, item):
-        return item.findtext('title')
+    def find_title(self, item: ET.Element) -> Optional[str]:
+        return item.findtext("title")
 
-    def munge_title(self, title):
+    def munge_title(self, title: str, item: ET.Element) -> str:
         return title
 
-    def find_description(self, item):
-        return item.findtext('description')
+    def find_description(self, item: ET.Element) -> Optional[str]:
+        return item.findtext("description")
 
-    def munge_description(self, description):
-        description = re.sub('<[^>]+>', '', description)
+    def munge_description(
+        self,
+        description: str,
+        item: ET.Element
+    ) -> str:
+        description = re.sub("<[^>]+>", "", description)
         return description
 
-    def find_link(self, item):
-        return item.findtext('link')
+    def find_link(self, item: ET.Element) -> Optional[str]:
+        return item.findtext("link")
 
-    def munge_link(self, link):
+    def munge_link(self, link: str) -> str:
         return link
 
-    def find_image(self, item):
-        return item.findtext('image')
+    def find_image(self, item: ET.Element) -> Optional[str]:
+        return item.findtext("image")
 
-    def munge_image(self, image):
+    def munge_image(self, image: str) -> str:
         return image
 
-    def item_is_interesting_for_headlines(self, title, description, item):
+    def find_pubdate(self, item: ET.Element) -> Optional[str]:
+        return item.findtext("pubDate")
+
+    def munge_pubdate(self, pubdate: str) -> str:
+        return pubdate
+
+    def item_is_interesting_for_headlines(
+        self, title: str, description: str, item: ET.Element
+    ) -> bool:
         return True
 
-    def is_item_older_than_n_days(self, item, n):
-        pubdate = item.findtext('pubDate')
-        if pubdate is not None:
-            pubdate = parse(pubdate)
-            tzinfo = pubdate.tzinfo
-            now = datetime.datetime.now(tzinfo)
-            delta = (now - pubdate).total_seconds() / (60 * 60 * 24)
-            if (delta > n):
-                return True
-        return False
+    def do_headlines(self) -> bool:
+        return True
+
+    def do_details(self) -> bool:
+        return True
+
+    def is_item_older_than_n_days(self, item: ET.Element, n: int) -> bool:
+        pubdate = self.find_pubdate(item)
+        if pubdate is None:
+            return False
+        pubdatetime = parse(pubdate)
+        tzinfo = pubdatetime.tzinfo
+        now = datetime.datetime.now(tzinfo)
+        delta = (now - pubdatetime).total_seconds() / (60 * 60 * 24)
+        return delta > n
 
-    def item_is_interesting_for_article(self, title, description, item):
+    def item_is_interesting_for_article(
+        self, title: str, description: str, item: ET.Element
+    ) -> bool:
         return True
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         if key == "Fetch News":
             return self.fetch_news()
         elif key == "Shuffle News":
             return self.shuffle_news()
         else:
-            raise error('Unexpected operation')
-
-    def shuffle_news(self):
-        headlines = page_builder.page_builder()
-        headlines.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
-        headlines.set_title("%s" % self.page_title)
-        subset = self.news.subset(4)
-        if subset is None:
-            self.debug_print("Not enough messages to choose from.")
-            return False
-        for msg in subset:
-            headlines.add_item(msg)
-        headlines.set_custom_html("""
-""")
-        f = file_writer.file_writer('%s_4_none.html' % (
-            self.get_headlines_page_prefix()))
-        headlines.render_html(f)
-        f.close()
-
-        details = page_builder.page_builder()
-        details.set_layout(page_builder.page_builder.LAYOUT_ONE_ITEM)
-        details.set_custom_html("""
-""")
-        details.set_title("%s" % self.page_title)
-        subset = self.details.subset(1)
-        if subset is None:
-            self.debug_print("Not enough details to choose from.");
-            return False
-        for msg in subset:
-            blurb = msg
-            blurb += u''
-            details.add_item(blurb)
-        g = file_writer.file_writer('%s_6_none.html' % (
-            self.get_details_page_prefix()))
-        details.render_html(g)
-        g.close()
+            raise Exception(f'Unexpected operation: {key}')
+
+    def shuffle_news(self) -> bool:
+        if self.do_headlines():
+            headlines = page_builder.page_builder()
+            headlines.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
+            headlines.set_title(self.page_title)
+            subset = self.news.subset(4)
+            if subset is None:
+                logger.warning('Not enough messages to select from in shuffle_news?!')
+                return False
+            for msg in subset:
+                headlines.add_item(msg)
+            headlines.set_custom_html(
+                """
+                """
+            )
+            filename = f"{self.get_headlines_page_prefix()}_{self.get_headlines_page_priority()}_25900.html"
+            with file_writer.file_writer(filename) as f:
+                headlines.render_html(f)
+
+        if self.do_details():
+            details = page_builder.page_builder()
+            details.set_layout(page_builder.page_builder.LAYOUT_ONE_ITEM)
+            details.set_custom_html(
+                """
+                """
+            )
+            details.set_title(self.page_title)
+            subset = self.details.subset(1)
+            if subset is None:
+                logger.warning('Not enough details to choose from in do_details')
+                return False
+            for msg in subset:
+                blurb = msg
+                blurb += ""
+                details.add_item(blurb)
+            filename = f"{self.get_details_page_prefix()}_{self.get_details_page_priority()}_86400.html"
+            with file_writer.file_writer(filename) as g:
+                details.render_html(g)
         return True
 
-    def fetch_news(self):
+    def fetch_news(self) -> bool:
         count = 0
         self.news.clear()
         self.details.clear()
+        self.conn: Optional[Union[http.client.HTTPConnection,
+                                  http.client.HTTPSConnection]] = None
 
         for uri in self.feed_uris:
+            url = None
             if self.should_use_https():
-                self.debug_print("Fetching: https://%s%s" % (self.feed_site, uri))
-                self.conn = http.client.HTTPSConnection(self.feed_site)
+                url = f'https://{self.feed_site}{uri}'
+                logger.info(f'Fetching: {url}')
+                self.conn = http.client.HTTPSConnection(self.feed_site, timeout=10)
             else:
-                self.debug_print("Fetching: http://%s%s" % (self.feed_site, uri))
-                self.conn = http.client.HTTPConnection(self.feed_site)
+                url = f'http://{self.feed_site}{uri}'
+                logger.info(f'Fetching: {url}')
+                self.conn = http.client.HTTPConnection(self.feed_site, timeout=10)
+            assert self.conn is not None
+            assert url is not None
             self.conn.request(
                 "GET",
                 uri,
                 None,
-                {"Accept-Charset": "utf-8"})
-            response = self.conn.getresponse()
+                {
+                    "Accept": "*/*",
+                    "Cache-control": "max-age=50",
+                },
+            )
+            try:
+                response = self.conn.getresponse()
+            except Exception as e:
+                logger.exception(e)
+                logger.error(
+                    f"Exception in generic RSS renderer HTTP connection fetching {url}; giving up."
+                )
+                return False
+
             if response.status != 200:
-                print(("%s: RSS fetch_news error, response: %d" % (self.page_title,
-                                                                   response.status)))
-                self.debug_print(response.read())
+                logger.error(
+                    f'Unexpected status {response.status} while fetching {url}; giving up.'
+                )
                 return False
 
-            rss = ET.fromstring(response.read())
+            raw = response.read()
+            logger.info(f'Status 200: got {len(raw)} bytes back from {url}')
+            rss = ET.fromstring(raw)
             channel = rss[0]
-            for item in channel.getchildren():
+            title_filter = set()
+            for item in list(channel):
                 title = self.find_title(item)
+                description = item.findtext("description")
                 if title is not None:
-                    title = self.munge_title(title)
-                description = item.findtext('description')
+                    title = self.munge_title(title, item)
+                else:
+                    logger.info('Skipping RSS feed item with no title.')
+                    continue
+                logger.debug(f'Considering RSS item {title}...')
                 if description is not None:
-                    description = self.munge_description(description)
+                    description = self.munge_description(description, item)
+                else:
+                    description = ""
                 image = self.find_image(item)
                 if image is not None:
                     image = self.munge_image(image)
-                link = item.findtext('link')
+                link = item.findtext("link")
                 if link is not None:
                     link = self.munge_link(link)
-
-                if (title is None or
-                    not self.item_is_interesting_for_headlines(title,
-                                                               description,
-                                                               item)):
-                    self.debug_print('Item "%s" is not interesting' % title)
+                if not self.item_is_interesting_for_headlines(
+                    title, description, item
+                ):
+                    logger.info(f'Skipping {title} because it\'s not interesting.')
                     continue
 
-                if (self.should_profanity_filter() and
-                    (self.filter.contains_bad_words(title) or
-                     self.filter.contains_bad_words(description))):
-                    self.debug_print('Found bad words in item "%s"' % title)
+                if self.should_profanity_filter() and (
+                    self.filter.contains_bad_word(title)
+                    or self.filter.contains_bad_word(description)
+                ):
+                    logger.info(f'Skipping {title} because it contains profanity.')
                     continue
 
-                #print u"Title: %s\nDescription: %s\nLink: %s\nImage: %s\n" % (
-                #    title, description, link, image)
+                if title in title_filter:
+                    logger.info(f'Skipping {title} because we already saw an item with the same title.')
+                    continue
+                title_filter.add(title)
 
-                blurb = u"""<DIV style="padding:8px;
-                     font-size:34pt;
-                     -webkit-column-break-inside:avoid;">"""
+                blurb = """<DIV style="padding:8px;
+                                font-size:34pt;
+                                -webkit-column-break-inside:avoid;">"""
                 if image is not None:
-                    blurb += u'<IMG SRC=' + image + u' ALIGN=LEFT HEIGHT=115>'
+                    blurb += f'<IMG SRC="{image}" ALIGN=LEFT HEIGHT=115>'
+                    blurb += f"<P><B>{title}</B>"
                 else:
-                    blurb += u'<P><B><A HREF="%s">%s</A></B>' % (link, title)
+                    blurb += f'<P><B><A HREF="{link}">{title}</A></B>'
+
+                pubdate = self.find_pubdate(item)
+                if pubdate is not None:
+                    logger.debug(f'Raw pubdate={pubdate}')
+                    pubdate = self.munge_pubdate(pubdate)
+                    ts = parse(pubdate)
+                    logger.debug(f'Translated pubdate into: {ts}')
+                    blurb += f' {ts.strftime("%b %d")}'
 
-                if (description is not None and
-                    self.item_is_interesting_for_article(title,
-                                                         description,
-                                                         item)):
+                if self.item_is_interesting_for_article(title, description, item):
+                    logger.info(f'Item {title} is also interesting as an article details page; creating...')
                     longblurb = blurb
-                    longblurb += u"<BR>"
+                    longblurb += "<BR>"
                     longblurb += description
-                    longblurb += u"</DIV>"
-                    longblurb = longblurb.replace("font-size:34pt",
-                                                  "font-size:44pt")
+                    longblurb += "</DIV>"
+                    longblurb = longblurb.replace("font-size:34pt", "font-size:44pt")
                     self.details.add(longblurb)
-
-                blurb += u"</DIV>"
+                else:
+                    logger.info(f'Item {title} isn\'t interesting for article details page; skipped.')
+                blurb += "</DIV>"
                 self.news.add(blurb)
                 count += 1
+            logger.debug(f'Added {count} items so far...')
         return count > 0