+#!/usr/bin/env python3
+
+from abc import abstractmethod
import datetime
from dateutil.parser import parse
+import http.client
+import logging
+import re
+from typing import Dict, List, Optional, Union
+import xml.etree.ElementTree as ET
+
+from scottutilz import profanity_filter
+
import file_writer
import grab_bag
import renderer
-import http.client
import page_builder
-import profanity_filter
-import random
-import re
-import xml.etree.ElementTree as ET
-class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
- def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
- super(generic_news_rss_renderer, self).__init__(name_to_timeout_dict,
- False)
- self.debug = 1
+
+logger = logging.getLogger(__file__)
+
+
+class generic_news_rss_renderer(renderer.abstaining_renderer):
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ feed_site: str,
+ feed_uris: List[str],
+ page_title: str,
+ ):
+ super().__init__(name_to_timeout_dict)
self.feed_site = feed_site
self.feed_uris = feed_uris
self.page_title = page_title
self.news = grab_bag.grab_bag()
self.details = grab_bag.grab_bag()
- self.filter = profanity_filter.profanity_filter()
+ self.filter = profanity_filter.ProfanityFilter()
- def debug_prefix(self):
+ @abstractmethod
+ def get_headlines_page_prefix(self) -> str:
pass
- def get_headlines_page_prefix(self):
+ @abstractmethod
+ def get_details_page_prefix(self) -> str:
pass
- def get_details_page_prefix(self):
- pass
+ def get_headlines_page_priority(self) -> str:
+ return "4"
- def should_use_https(self):
+ def get_details_page_priority(self) -> str:
+ return "6"
+
+ @abstractmethod
+ def should_use_https(self) -> bool:
pass
- def should_profanity_filter(self):
+ def should_profanity_filter(self) -> bool:
return False
- def find_title(self, item):
- return item.findtext('title')
+ def find_title(self, item: ET.Element) -> Optional[str]:
+ return item.findtext("title")
- def munge_title(self, title):
+ def munge_title(self, title: str, item: ET.Element) -> str:
return title
- def find_description(self, item):
- return item.findtext('description')
+ def find_description(self, item: ET.Element) -> Optional[str]:
+ return item.findtext("description")
- def munge_description(self, description):
- description = re.sub('<[^>]+>', '', description)
+ def munge_description(
+ self,
+ description: str,
+ item: ET.Element
+ ) -> str:
+ description = re.sub("<[^>]+>", "", description)
return description
- def find_link(self, item):
- return item.findtext('link')
+ def find_link(self, item: ET.Element) -> Optional[str]:
+ return item.findtext("link")
- def munge_link(self, link):
+ def munge_link(self, link: str) -> str:
return link
- def find_image(self, item):
- return item.findtext('image')
+ def find_image(self, item: ET.Element) -> Optional[str]:
+ return item.findtext("image")
- def munge_image(self, image):
+ def munge_image(self, image: str) -> str:
return image
- def item_is_interesting_for_headlines(self, title, description, item):
+ def find_pubdate(self, item: ET.Element) -> Optional[str]:
+ return item.findtext("pubDate")
+
+ def munge_pubdate(self, pubdate: str) -> str:
+ return pubdate
+
+ def item_is_interesting_for_headlines(
+ self, title: str, description: str, item: ET.Element
+ ) -> bool:
return True
- def is_item_older_than_n_days(self, item, n):
- pubdate = item.findtext('pubDate')
- if pubdate is not None:
- pubdate = parse(pubdate)
- tzinfo = pubdate.tzinfo
- now = datetime.datetime.now(tzinfo)
- delta = (now - pubdate).total_seconds() / (60 * 60 * 24)
- if (delta > n):
- return True
- return False
+ def do_headlines(self) -> bool:
+ return True
+
+ def do_details(self) -> bool:
+ return True
+
+ def is_item_older_than_n_days(self, item: ET.Element, n: int) -> bool:
+ pubdate = self.find_pubdate(item)
+ if pubdate is None:
+ return False
+ pubdatetime = parse(pubdate)
+ tzinfo = pubdatetime.tzinfo
+ now = datetime.datetime.now(tzinfo)
+ delta = (now - pubdatetime).total_seconds() / (60 * 60 * 24)
+ return delta > n
- def item_is_interesting_for_article(self, title, description, item):
+ def item_is_interesting_for_article(
+ self, title: str, description: str, item: ET.Element
+ ) -> bool:
return True
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
if key == "Fetch News":
return self.fetch_news()
elif key == "Shuffle News":
return self.shuffle_news()
else:
- raise error('Unexpected operation')
-
- def shuffle_news(self):
- headlines = page_builder.page_builder()
- headlines.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
- headlines.set_title("%s" % self.page_title)
- subset = self.news.subset(4)
- if subset is None:
- self.debug_print("Not enough messages to choose from.")
- return False
- for msg in subset:
- headlines.add_item(msg)
- f = file_writer.file_writer('%s_4_none.html' % (
- self.get_headlines_page_prefix()))
- headlines.render_html(f)
- f.close()
-
- details = page_builder.page_builder()
- details.set_layout(page_builder.page_builder.LAYOUT_ONE_ITEM)
- details.set_title("%s" % self.page_title)
- subset = self.details.subset(1)
- if subset is None:
- self.debug_print("Not enough details to choose from.");
- return False
- for msg in subset:
- blurb = msg
- blurb += u'</TD>'
- details.add_item(blurb)
- g = file_writer.file_writer('%s_6_none.html' % (
- self.get_details_page_prefix()))
- details.render_html(g)
- g.close()
+ raise Exception
+
+ def shuffle_news(self) -> bool:
+ if self.do_headlines():
+ headlines = page_builder.page_builder()
+ headlines.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
+ headlines.set_title("%s" % self.page_title)
+ subset = self.news.subset(4)
+ if subset is None:
+ logger.warning('Not enough messages to select from in shuffle_news?!')
+ return False
+ for msg in subset:
+ headlines.add_item(msg)
+ headlines.set_custom_html(
+ """
+ <STYLE>
+ a:link {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ a:visited {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ a:active {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ </STYLE>"""
+ )
+ _ = f"{self.get_headlines_page_prefix()}_{self.get_headlines_page_priority()}_25900.html"
+ with file_writer.file_writer(_) as f:
+ headlines.render_html(f)
+
+ if self.do_details():
+ details = page_builder.page_builder()
+ details.set_layout(page_builder.page_builder.LAYOUT_ONE_ITEM)
+ details.set_custom_html(
+ """
+ <STYLE>
+ a:link {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ a:visited {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ a:active {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ </STYLE>"""
+ )
+ details.set_title(self.page_title)
+ subset = self.details.subset(1)
+ if subset is None:
+ logger.warning('Not enough details to choose from in do_details')
+ logger.debug("Not enough details to choose from.")
+ return False
+ for msg in subset:
+ blurb = msg
+ blurb += "</TD>"
+ details.add_item(blurb)
+ _ = f"{self.get_details_page_prefix()}_{self.get_details_page_priority()}_86400.html"
+ with file_writer.file_writer(_) as g:
+ details.render_html(g)
return True
- def fetch_news(self):
+ def fetch_news(self) -> bool:
count = 0
self.news.clear()
self.details.clear()
+ self.conn: Optional[Union[http.client.HTTPConnection,
+ http.client.HTTPSConnection]] = None
for uri in self.feed_uris:
+ url = None
if self.should_use_https():
- self.debug_print("Fetching: https://%s%s" % (self.feed_site, uri))
- self.conn = http.client.HTTPSConnection(self.feed_site)
+ url = f'https://{self.feed_site}{uri}'
+ logger.info(f'Fetching: {url}')
+ self.conn = http.client.HTTPSConnection(self.feed_site, timeout=10)
else:
- self.debug_print("Fetching: http://%s%s" % (self.feed_site, uri))
- self.conn = http.client.HTTPConnection(self.feed_site)
+ url = f'http://{self.feed_site}{uri}'
+ logger.info(f'Fetching: {url}')
+ self.conn = http.client.HTTPConnection(self.feed_site, timeout=10)
+ assert self.conn is not None
+ assert url is not None
self.conn.request(
"GET",
uri,
None,
- {"Accept-Charset": "utf-8"})
- response = self.conn.getresponse()
+ {
+ "Accept": "*/*",
+ "Cache-control": "max-age=50",
+ },
+ )
+ try:
+ response = self.conn.getresponse()
+ except Exception as e:
+ logger.exception(e)
+ logger.error(
+ f"Exception in generic RSS renderer HTTP connection fetching {url}; giving up."
+ )
+ return False
+
if response.status != 200:
- print(("%s: RSS fetch_news error, response: %d" % (self.page_title,
- response.status)))
- self.debug_print(response.read())
+ logger.error(
+ f'Unexpected status {response.status} while fetching {url}; giving up.'
+ )
return False
- rss = ET.fromstring(response.read())
+ raw = response.read()
+ logger.info(f'Status 200: got {len(raw)} bytes back from {url}')
+ rss = ET.fromstring(raw)
channel = rss[0]
- for item in channel.getchildren():
+ title_filter = set()
+ for item in list(channel):
title = self.find_title(item)
+ description = item.findtext("description")
if title is not None:
- title = self.munge_title(title)
- description = item.findtext('description')
+ title = self.munge_title(title, item)
+ else:
+ logger.info('Skipping RSS feed item with no title.')
+ continue
+ logger.debug(f'Considering RSS item {title}...')
if description is not None:
- description = self.munge_description(description)
+ description = self.munge_description(description, item)
+ else:
+ description = ""
image = self.find_image(item)
if image is not None:
image = self.munge_image(image)
- link = item.findtext('link')
+ link = item.findtext("link")
if link is not None:
link = self.munge_link(link)
-
- if (title is None or
- not self.item_is_interesting_for_headlines(title,
- description,
- item)):
- self.debug_print('Item "%s" is not interesting' % title)
+ if not self.item_is_interesting_for_headlines(
+ title, description, item
+ ):
+ logger.info(f'Skipping {title} because it\'s not interesting.')
continue
- if (self.should_profanity_filter() and
- (self.filter.contains_bad_words(title) or
- self.filter.contains_bad_words(description))):
- self.debug_print('Found bad words in item "%s"' % title)
+ if self.should_profanity_filter() and (
+ self.filter.contains_bad_word(title)
+ or self.filter.contains_bad_word(description)
+ ):
+ logger.info(f'Skipping {title} because it contains profanity.')
continue
- #print u"Title: %s\nDescription: %s\nLink: %s\nImage: %s\n" % (
- # title, description, link, image)
+ if title in title_filter:
+ logger.info(f'Skipping {title} because we already saw an item with the same title.')
+ continue
+ title_filter.add(title)
- blurb = u"""<DIV style="padding:8px;
- font-size:34pt;
- -webkit-column-break-inside:avoid;">"""
+ blurb = """<DIV style="padding:8px;
+ font-size:34pt;
+ -webkit-column-break-inside:avoid;">"""
if image is not None:
- blurb += u'<IMG SRC="%s" ALIGN=LEFT HEIGHT=115 ' % image
- blurb += u'style="padding:8px;">'
+ blurb += f'<IMG SRC="{image}" ALIGN=LEFT HEIGHT=115 '
+ blurb += 'style="padding:8px;">'
if link is None:
- blurb += u'<P><B>%s</B>' % title
+ blurb += f"<P><B>{title}</B>"
else:
- blurb += u'<P><B><A HREF="%s">%s</A></B>' % (link, title)
+ blurb += f'<P><B><A HREF="{link}">{title}</A></B>'
+
+ pubdate = self.find_pubdate(item)
+ if pubdate is not None:
+ logger.debug(f'Raw pubdate={pubdate}')
+ pubdate = self.munge_pubdate(pubdate)
+ ts = parse(pubdate)
+ logger.debug(f'Translated pubdate into: {ts}')
+ blurb += f' <FONT COLOR=#cccccc>{ts.strftime("%b %d")}</FONT>'
- if (description is not None and
- self.item_is_interesting_for_article(title,
- description,
- item)):
+ if self.item_is_interesting_for_article(title, description, item):
+ logger.info(f'Item {title} is also interesting as an article details page; creating...')
longblurb = blurb
- longblurb += u"<BR>"
+ longblurb += "<BR>"
longblurb += description
- longblurb += u"</DIV>"
- longblurb = longblurb.replace("font-size:34pt",
- "font-size:44pt")
+ longblurb += "</DIV>"
+ longblurb = longblurb.replace("font-size:34pt", "font-size:44pt")
self.details.add(longblurb)
-
- blurb += u"</DIV>"
+ else:
+ logger.info(f'Item {title} isn\'t interesting for article details page; skipped.')
+ blurb += "</DIV>"
self.news.add(blurb)
count += 1
+ logger.debug(f'Added {count} items so far...')
return count > 0