#!/usr/bin/env python3
import logging
import random
import re
from typing import Dict, List, Optional
import tweepy # type: ignore
from scottutilz import profanity_filter
import file_writer
import renderer
import kiosk_secrets as secrets
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class twitter_renderer(renderer.abstaining_renderer):
    """Render tweets from the authenticated account's home timeline.

    Two periodic operations are handled (see ``periodic_render``):

    * "Fetch Tweets" pulls the home timeline and groups the statuses by
      author name.
    * "Shuffle Tweets" picks one author at random and writes a handful of
      that author's tweets to ``twitter_10_3600.html`` through
      ``file_writer``.
    """

    def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
        """Create the tweet stores, the URL matcher and the API client.

        Args:
            name_to_timeout_dict: forwarded unchanged to the base renderer.
                Presumably maps the operation names understood by
                ``periodic_render`` to their intervals -- confirm against
                ``renderer.abstaining_renderer``.
        """
        super().__init__(name_to_timeout_dict)
        self.debug = True
        # Author display name -> statuses collected for that author.
        self.tweets_by_author: Dict[str, List[tweepy.models.Status]] = {}
        # Author display name -> screen name (the @handle).
        self.handles_by_author: Dict[str, str] = {}
        self.filter = profanity_filter.ProfanityFilter()
        # Raw string so the regex escapes reach `re` verbatim instead of
        # depending on Python passing unknown string escapes through (which
        # emits a SyntaxWarning on recent interpreters).
        self.urlfinder = re.compile(
            r"((http|https)://[\-A-Za-z0-9\.]+/[\?\&\-A-Za-z0-9_\.]+)"
        )

        # == OAuth Authentication ==
        #
        # This mode of authentication is the new preferred way
        # of authenticating with Twitter.

        # The consumer keys can be found on your application's Details
        # page located at https://dev.twitter.com/apps (under "OAuth settings")
        consumer_key = secrets.twitter_consumer_key
        consumer_secret = secrets.twitter_consumer_secret

        # The access tokens can be found on your application's Details
        # page located at https://dev.twitter.com/apps (located
        # under "Your access token")
        access_token = secrets.twitter_access_token
        access_token_secret = secrets.twitter_access_token_secret

        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        self.api = tweepy.API(auth)

    def debug_prefix(self) -> str:
        """Return the fixed tag ``"twitter"`` identifying this renderer."""
        return "twitter"

    def linkify(self, value: str) -> str:
        """Run the URL matcher's substitution over ``value``.

        NOTE(review): the replacement is just the matched URL (``\\1``), so
        the returned text equals the input. Presumably this was meant to
        wrap each URL in link markup -- confirm the intended replacement.
        """
        return self.urlfinder.sub(r'\1', value)

    def periodic_render(self, key: str) -> bool:
        """Dispatch a periodic operation by name.

        Args:
            key: ``"Fetch Tweets"`` or ``"Shuffle Tweets"``.

        Returns:
            The boolean result of the selected operation.

        Raises:
            Exception: if ``key`` names neither operation.
        """
        if key == "Fetch Tweets":
            return self.fetch_tweets()
        if key == "Shuffle Tweets":
            return self.shuffle_tweets()
        raise Exception("Unexpected operation")

    def fetch_tweets(self) -> bool:
        """Fetch the home timeline and file each status under its author.

        Statuses are appended to whatever is already stored; nothing is
        removed here, so repeated fetches accumulate entries.

        Returns:
            True once the timeline has been stored, False if the API call
            raised (the exception is logged, not propagated).
        """
        try:
            tweets = self.api.home_timeline(tweet_mode="extended", count=200)
        except Exception as e:
            # Deliberately broad: any API failure is logged and reported to
            # the caller as False rather than raised.
            logger.exception(e)
            print("Exception while fetching tweets!")
            return False

        # Debugging aid: json.dumps(tweet._json, indent=4, sort_keys=True)
        # shows the raw payload of a status.
        for tweet in tweets:
            author = tweet.author.name
            self.handles_by_author[author] = tweet.author.screen_name
            self.tweets_by_author.setdefault(author, []).append(tweet)
        return True

    def get_hashtags(self, tweet) -> str:
        """Return the tweet's hashtags as ``" #a, #b"``.

        Args:
            tweet: object whose ``_json`` dict may hold
                ``entities["hashtags"]``, a list of dicts with a ``"text"``
                key.

        Returns:
            A single space followed by the comma-separated ``#tag`` items;
            just ``" "`` when the tweet carries no hashtags.
        """
        hashtags = tweet._json.get("entities", {}).get("hashtags", [])
        return " " + ", ".join(f'#{tag["text"]}' for tag in hashtags)

    def get_media_url(self, tweet) -> Optional[str]:
        """Return the first ``media_url_https`` attached to the tweet.

        Args:
            tweet: object whose ``_json`` dict may hold
                ``entities["media"]``, a list of dicts.

        Returns:
            The URL of the first media entry that has one, else None.
        """
        media = tweet._json.get("entities", {}).get("media", [])
        return next(
            (
                item["media_url_https"]
                for item in media
                if "media_url_https" in item
            ),
            None,
        )

    def shuffle_tweets(self) -> bool:
        """Write a few tweets from one randomly chosen author to the page.

        Tweets whose text was already written in this call, or that the
        profanity filter rejects, are skipped. Output stops after the
        fourth tweet or once more than 270 characters have been written,
        whichever comes first.

        Returns:
            True after the page has been written; False (page untouched)
            when no tweets have been stored yet.
        """
        authors = list(self.tweets_by_author)
        if not authors:
            # random.choice raises IndexError on an empty sequence; report
            # "nothing rendered" instead of crashing the caller.
            logger.warning("No tweets available to shuffle yet.")
            return False

        author = random.choice(authors)
        handle = self.handles_by_author[author]
        tweets = self.tweets_by_author[author]
        already_seen = set()

        # NOTE(review): the literals written below contain no HTML even
        # though the target is an .html file, and media_url is looked up
        # but never written out. The markup appears to have been lost --
        # confirm against the authoritative copy of this file.
        with file_writer.file_writer("twitter_10_3600.html") as f:
            f.write("\n")
            f.write("%s (@%s) | \n" % (author, handle))
            f.write('')
            f.write(' |\n\n')
            f.write("\n\n\n")
            count = 0
            length = 0
            for tweet in tweets:
                text = tweet.full_text
                if text in already_seen:
                    continue
                if self.filter.contains_bad_word(text):
                    continue
                already_seen.add(text)
                text = self.linkify(text)
                # No-op as written; presumably the spot where the tweet
                # text was wrapped in markup -- TODO confirm.
                text = f"{text}"
                text += self.get_hashtags(tweet)
                media_url = self.get_media_url(tweet)
                if media_url:
                    text = f"{text} | "
                    text += ' |\n'
                f.write(f"- {text}")
                count += 1
                length += len(text)
                if count > 3 or length > 270:
                    break
            f.write("\n\n")
        return True
# Test
# t = twitter_renderer(
# {"Fetch Tweets" : 1,
# "Shuffle Tweets" : 1})
# x = "bla bla bla https://t.co/EjWnT3UA9U bla bla"
# x = t.linkify(x)
# print(x)
# if t.fetch_tweets() == 0:
# print("Error fetching tweets, none fetched.")
# else:
# t.shuffle_tweets()