#!/usr/bin/env python3 import logging import random import re from typing import Dict, List, Optional import tweepy # type: ignore from scottutilz import profanity_filter import file_writer import renderer import kiosk_secrets as secrets logger = logging.getLogger(__name__) class twitter_renderer(renderer.abstaining_renderer): def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None: super().__init__(name_to_timeout_dict) self.debug = True self.tweets_by_author: Dict[str, List[tweepy.models.Status]] = {} self.handles_by_author: Dict[str, str] = {} self.filter = profanity_filter.ProfanityFilter() self.urlfinder = re.compile( "((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)" ) # == OAuth Authentication == # # This mode of authentication is the new preferred way # of authenticating with Twitter. # The consumer keys can be found on your application's Details # page located at https://dev.twitter.com/apps (under "OAuth settings") consumer_key = secrets.twitter_consumer_key consumer_secret = secrets.twitter_consumer_secret # The access tokens can be found on your applications's Details # page located at https://dev.twitter.com/apps (located # under "Your access token") access_token = secrets.twitter_access_token access_token_secret = secrets.twitter_access_token_secret auth = tweepy.OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) self.api = tweepy.API(auth) def debug_prefix(self) -> str: return "twitter" def linkify(self, value: str) -> str: return self.urlfinder.sub(r'\1', value) def periodic_render(self, key: str) -> bool: if key == "Fetch Tweets": return self.fetch_tweets() elif key == "Shuffle Tweets": return self.shuffle_tweets() else: raise Exception("Unexpected operation") def fetch_tweets(self) -> bool: try: tweets = self.api.home_timeline(tweet_mode="extended", count=200) except Exception as e: logger.exception(e) print("Exception while fetching tweets!") return False for tweet in tweets: # j = tweet._json # import json # print(json.dumps(j, indent=4, sort_keys=True)) # print("------") author = tweet.author.name author_handle = tweet.author.screen_name self.handles_by_author[author] = author_handle if author not in self.tweets_by_author: self.tweets_by_author[author] = [] x = self.tweets_by_author[author] x.append(tweet) return True def get_hashtags(self, tweet) -> str: ret = " " if "entities" in tweet._json: entities = tweet._json["entities"] if "hashtags" in entities: for x in entities["hashtags"]: ret += f'#{x["text"]}, ' ret = re.sub(", $", "", ret) return ret def get_media_url(self, tweet) -> Optional[str]: if "entities" in tweet._json: entities = tweet._json["entities"] if "media" in entities: media = entities["media"] for x in media: if "media_url_https" in x: return x["media_url_https"] return None def shuffle_tweets(self) -> bool: authors = list(self.tweets_by_author.keys()) author = random.choice(authors) handle = self.handles_by_author[author] tweets = self.tweets_by_author[author] already_seen = set() with file_writer.file_writer("twitter_10_3600.html") as f: f.write("\n" % (author, handle)) f.write('
") f.write("

%s (@%s)

') f.write('
\n') f.write("
\n\n") return True # Test # t = twitter_renderer( # {"Fetch Tweets" : 1, # "Shuffle Tweets" : 1}) # x = "bla bla bla https://t.co/EjWnT3UA9U bla bla" # x = t.linkify(x) # print(x) # if t.fetch_tweets() == 0: # print("Error fetching tweets, none fetched.") # else: # t.shuffle_tweets()