-import file_writer
+#!/usr/bin/env python3
+
+import logging
import random
-import renderer
-import profanity_filter
import re
-import secrets
-import tweepy
-
-class twitter_renderer(renderer.debuggable_abstaining_renderer):
- def __init__(self, name_to_timeout_dict):
- super(twitter_renderer, self).__init__(name_to_timeout_dict, False)
- self.debug = 1
- self.tweets_by_author = dict()
- self.handles_by_author = dict()
- self.filter = profanity_filter.profanity_filter()
+from typing import Dict, List, Optional
+
+import tweepy # type: ignore
+from scottutilz import profanity_filter
+
+import file_writer
+import renderer
+import kiosk_secrets as secrets
+
+
+logger = logging.getLogger(__name__)
+
+
+class twitter_renderer(renderer.abstaining_renderer):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
+ super().__init__(name_to_timeout_dict)
+ self.debug = True
+ self.tweets_by_author: Dict[str, List[tweepy.models.Status]] = {}
+ self.handles_by_author: Dict[str, str] = {}
+ self.filter = profanity_filter.ProfanityFilter()
self.urlfinder = re.compile(
- "((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)")
+ "((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)"
+ )
# == OAuth Authentication ==
#
# The consumer keys can be found on your application's Details
# page located at https://dev.twitter.com/apps (under "OAuth settings")
- consumer_key=secrets.twitter_consumer_key
- consumer_secret=secrets.twitter_consumer_secret
+ consumer_key = secrets.twitter_consumer_key
+ consumer_secret = secrets.twitter_consumer_secret
# The access tokens can be found on your applications's Details
# page located at https://dev.twitter.com/apps (located
# under "Your access token")
- access_token=secrets.twitter_access_token
- access_token_secret=secrets.twitter_access_token_secret
+ access_token = secrets.twitter_access_token
+ access_token_secret = secrets.twitter_access_token_secret
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(auth)
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "twitter"
- def linkify(self, value):
+ def linkify(self, value: str) -> str:
return self.urlfinder.sub(r'<a href="\1">\1</a>', value)
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
if key == "Fetch Tweets":
return self.fetch_tweets()
elif key == "Shuffle Tweets":
return self.shuffle_tweets()
else:
- raise error('Unexpected operation')
+ raise Exception("Unexpected operation")
- def fetch_tweets(self):
+ def fetch_tweets(self) -> bool:
try:
- tweets = self.api.home_timeline(tweet_mode='extended', count=200)
- except:
+ tweets = self.api.home_timeline(tweet_mode="extended", count=200)
+ except Exception as e:
+ logger.exception(e)
print("Exception while fetching tweets!")
return False
for tweet in tweets:
+ # j = tweet._json
+ # import json
+ # print(json.dumps(j, indent=4, sort_keys=True))
+ # print("------")
+
author = tweet.author.name
author_handle = tweet.author.screen_name
self.handles_by_author[author] = author_handle
if author not in self.tweets_by_author:
- self.tweets_by_author[author] = list()
- l = self.tweets_by_author[author]
- l.append(tweet)
+ self.tweets_by_author[author] = []
+ x = self.tweets_by_author[author]
+ x.append(tweet)
return True
- def shuffle_tweets(self):
+ def get_hashtags(self, tweet) -> str:
+ ret = " "
+ if "entities" in tweet._json:
+ entities = tweet._json["entities"]
+ if "hashtags" in entities:
+ for x in entities["hashtags"]:
+ ret += f'<B>#{x["text"]}</B>, '
+ ret = re.sub(", $", "", ret)
+ return ret
+
+ def get_media_url(self, tweet) -> Optional[str]:
+ if "entities" in tweet._json:
+ entities = tweet._json["entities"]
+ if "media" in entities:
+ media = entities["media"]
+ for x in media:
+ if "media_url_https" in x:
+ return x["media_url_https"]
+ return None
+
+ def shuffle_tweets(self) -> bool:
authors = list(self.tweets_by_author.keys())
author = random.choice(authors)
handle = self.handles_by_author[author]
tweets = self.tweets_by_author[author]
already_seen = set()
- f = file_writer.file_writer('twitter_10_3600.html')
- f.write('<TABLE WIDTH=96%><TR><TD WIDTH=86%>')
- f.write('<H2>%s (@%s)</H2></TD>\n' % (author, handle))
- f.write('<TD ALIGN="right" VALIGN="top">')
- f.write('<IMG SRC="twitter.png" WIDTH=42></TD></TR></TABLE>\n')
- f.write('<HR>\n<UL>\n')
- count = 0
- length = 0
- for tweet in tweets:
- text = tweet.full_text
- if ((text not in already_seen) and
- (not self.filter.contains_bad_words(text))):
- already_seen.add(text)
- text = self.linkify(text)
- f.write('<LI><B>%s</B>\n' % text)
- count += 1
- length += len(text)
- if count > 3 or length > 270:
- break
- f.write('</UL>\n')
- f.close()
+ with file_writer.file_writer("twitter_10_3600.html") as f:
+ f.write("<TABLE WIDTH=96%><TR><TD WIDTH=86%>")
+ f.write("<H2>%s (@%s)</H2></TD>\n" % (author, handle))
+ f.write('<TD ALIGN="right" VALIGN="top">')
+ f.write('<IMG SRC="twitter.png" WIDTH=42></TD></TR></TABLE>\n')
+ f.write("<HR>\n<UL>\n")
+ count = 0
+ length = 0
+ for tweet in tweets:
+ text = tweet.full_text
+ if (text not in already_seen) and (
+ not self.filter.contains_bad_word(text)
+ ):
+ already_seen.add(text)
+ text = self.linkify(text)
+ text = f"<B>{text}</B>"
+ text += self.get_hashtags(tweet)
+ media_url = self.get_media_url(tweet)
+ if media_url:
+ text = f"<TABLE WIDTH=100%><TD WITDH=70%>{text}</TD>"
+ text += f'<TD><IMG SRC="{media_url}" WIDTH=200></TD></TABLE>'
+ f.write(f"<LI>{text}")
+ count += 1
+ length += len(text)
+ if count > 3 or length > 270:
+ break
+ f.write("</UL>\n")
return True
+
# Test
-#t = twitter_renderer(
+# t = twitter_renderer(
# {"Fetch Tweets" : 1,
# "Shuffle Tweets" : 1})
-#x = "bla bla bla https://t.co/EjWnT3UA9U bla bla"
-#x = t.linkify(x)
-#print x
-#if t.fetch_tweets() == 0:
+# x = "bla bla bla https://t.co/EjWnT3UA9U bla bla"
+# x = t.linkify(x)
+# print(x)
+# if t.fetch_tweets() == 0:
# print("Error fetching tweets, none fetched.")
-#else:
+# else:
# t.shuffle_tweets()