Fix wakeword.
[kiosk.git] / twitter_renderer.py
index 8a82e5e645731ba43eacb517db5c10891c138ca3..23c36f87131d64a12591f84f5c2e7b012a2479c8 100644 (file)
@@ -1,20 +1,31 @@
-import file_writer
+#!/usr/bin/env python3
+
+import logging
 import random
-import renderer
-import profanity_filter
 import re
-import secrets
-import tweepy
-
-class twitter_renderer(renderer.debuggable_abstaining_renderer):
-    def __init__(self, name_to_timeout_dict):
-        super(twitter_renderer, self).__init__(name_to_timeout_dict, False)
-        self.debug = 1
-        self.tweets_by_author = dict()
-        self.handles_by_author = dict()
-        self.filter = profanity_filter.profanity_filter()
+from typing import Dict, List, Optional
+
+import tweepy  # type: ignore
+from scottutilz import profanity_filter
+
+import file_writer
+import renderer
+import kiosk_secrets as secrets
+
+
+logger = logging.getLogger(__name__)
+
+
+class twitter_renderer(renderer.abstaining_renderer):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
+        super().__init__(name_to_timeout_dict)
+        self.debug = True
+        self.tweets_by_author: Dict[str, List[tweepy.models.Status]] = {}
+        self.handles_by_author: Dict[str, str] = {}
+        self.filter = profanity_filter.ProfanityFilter()
         self.urlfinder = re.compile(
-            "((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)")
+            "((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)"
+        )
 
         # == OAuth Authentication ==
         #
@@ -23,86 +34,119 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer):
 
         # The consumer keys can be found on your application's Details
         # page located at https://dev.twitter.com/apps (under "OAuth settings")
-        consumer_key=secrets.twitter_consumer_key
-        consumer_secret=secrets.twitter_consumer_secret
+        consumer_key = secrets.twitter_consumer_key
+        consumer_secret = secrets.twitter_consumer_secret
 
         # The access tokens can be found on your applications's Details
         # page located at https://dev.twitter.com/apps (located
         # under "Your access token")
-        access_token=secrets.twitter_access_token
-        access_token_secret=secrets.twitter_access_token_secret
+        access_token = secrets.twitter_access_token
+        access_token_secret = secrets.twitter_access_token_secret
 
         auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
         auth.set_access_token(access_token, access_token_secret)
         self.api = tweepy.API(auth)
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "twitter"
 
-    def linkify(self, value):
+    def linkify(self, value: str) -> str:
         return self.urlfinder.sub(r'<a href="\1">\1</a>', value)
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         if key == "Fetch Tweets":
             return self.fetch_tweets()
         elif key == "Shuffle Tweets":
             return self.shuffle_tweets()
         else:
-            raise error('Unexpected operation')
+            raise Exception("Unexpected operation")
 
-    def fetch_tweets(self):
+    def fetch_tweets(self) -> bool:
         try:
-            tweets = self.api.home_timeline(tweet_mode='extended', count=200)
-        except:
+            tweets = self.api.home_timeline(tweet_mode="extended", count=200)
+        except Exception as e:
+            logger.exception(e)
             print("Exception while fetching tweets!")
             return False
         for tweet in tweets:
+            # j = tweet._json
+            # import json
+            # print(json.dumps(j, indent=4, sort_keys=True))
+            # print("------")
+
             author = tweet.author.name
             author_handle = tweet.author.screen_name
             self.handles_by_author[author] = author_handle
             if author not in self.tweets_by_author:
-                self.tweets_by_author[author] = list()
-            l = self.tweets_by_author[author]
-            l.append(tweet)
+                self.tweets_by_author[author] = []
+            x = self.tweets_by_author[author]
+            x.append(tweet)
         return True
 
-    def shuffle_tweets(self):
+    def get_hashtags(self, tweet) -> str:
+        ret = " "
+        if "entities" in tweet._json:
+            entities = tweet._json["entities"]
+            if "hashtags" in entities:
+                for x in entities["hashtags"]:
+                    ret += f'<B>#{x["text"]}</B>, '
+        ret = re.sub(", $", "", ret)
+        return ret
+
+    def get_media_url(self, tweet) -> Optional[str]:
+        if "entities" in tweet._json:
+            entities = tweet._json["entities"]
+            if "media" in entities:
+                media = entities["media"]
+                for x in media:
+                    if "media_url_https" in x:
+                        return x["media_url_https"]
+        return None
+
+    def shuffle_tweets(self) -> bool:
         authors = list(self.tweets_by_author.keys())
         author = random.choice(authors)
         handle = self.handles_by_author[author]
         tweets = self.tweets_by_author[author]
         already_seen = set()
-        f = file_writer.file_writer('twitter_10_none.html')
-        f.write('<TABLE WIDTH=96%><TR><TD WIDTH=86%>')
-        f.write('<H2>%s (@%s)</H2></TD>\n' % (author, handle))
-        f.write('<TD ALIGN="right" VALIGN="top">')
-        f.write('<IMG SRC="twitter.png" WIDTH=42></TD></TR></TABLE>\n')
-        f.write('<HR>\n<UL>\n')
-        count = 0
-        length = 0
-        for tweet in tweets:
-            text = tweet.full_text
-            if ((text not in already_seen) and
-                (not self.filter.contains_bad_words(text))):
-                already_seen.add(text)
-                text = self.linkify(text)
-                f.write('<LI><B>%s</B>\n' % text)
-                count += 1
-                length += len(text)
-                if count > 3 or length > 270:
-                    break
-        f.write('</UL>\n')
-        f.close()
+        with file_writer.file_writer("twitter_10_3600.html") as f:
+            f.write("<TABLE WIDTH=96%><TR><TD WIDTH=86%>")
+            f.write("<H2>%s (@%s)</H2></TD>\n" % (author, handle))
+            f.write('<TD ALIGN="right" VALIGN="top">')
+            f.write('<IMG SRC="twitter.png" WIDTH=42></TD></TR></TABLE>\n')
+            f.write("<HR>\n<UL>\n")
+            count = 0
+            length = 0
+            for tweet in tweets:
+                text = tweet.full_text
+                if (text not in already_seen) and (
+                    not self.filter.contains_bad_word(text)
+                ):
+                    already_seen.add(text)
+                    text = self.linkify(text)
+                    text = f"<B>{text}</B>"
+                    text += self.get_hashtags(tweet)
+                    media_url = self.get_media_url(tweet)
+                    if media_url:
+                        text = f"<TABLE WIDTH=100%><TD WITDH=70%>{text}</TD>"
+                        text += f'<TD><IMG SRC="{media_url}" WIDTH=200></TD></TABLE>'
+                    f.write(f"<LI>{text}")
+                    count += 1
+                    length += len(text)
+                    if count > 3 or length > 270:
+                        break
+            f.write("</UL>\n")
         return True
 
+
 # Test
-t = twitter_renderer(
-    {"Fetch Tweets" : 1,
-     "Shuffle Tweets" : 1})
-#x = "bla bla bla https://t.co/EjWnT3UA9U bla bla"
-#x = t.linkify(x)
-#print x
-if t.fetch_tweets() == 0:
-    print("Error fetching tweets, none fetched.")
-else:
-    t.shuffle_tweets()
+t = twitter_renderer(
+#    {"Fetch Tweets" : 1,
+#     "Shuffle Tweets" : 1})
+# x = "bla bla bla https://t.co/EjWnT3UA9U bla bla"
+# x = t.linkify(x)
+# print(x)
+if t.fetch_tweets() == 0:
+#    print("Error fetching tweets, none fetched.")
+else:
+#    t.shuffle_tweets()