X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=twitter_renderer.py;h=23c36f87131d64a12591f84f5c2e7b012a2479c8;hb=addd4980077f6e3857c5c035b49784dc3ceca49a;hp=173842ef0ea2801182760ff7a1d0111f1739bff7;hpb=d6990436e08a57ce211b10058dc61fb223cb94ec;p=kiosk.git diff --git a/twitter_renderer.py b/twitter_renderer.py index 173842e..23c36f8 100644 --- a/twitter_renderer.py +++ b/twitter_renderer.py @@ -1,20 +1,31 @@ -import file_writer +#!/usr/bin/env python3 + +import logging import random -import renderer -import profanity_filter import re -import secrets -import tweepy - -class twitter_renderer(renderer.debuggable_abstaining_renderer): - def __init__(self, name_to_timeout_dict): - super(twitter_renderer, self).__init__(name_to_timeout_dict, False) - self.debug = 1 - self.tweets_by_author = dict() - self.handles_by_author = dict() - self.filter = profanity_filter.profanity_filter() +from typing import Dict, List, Optional + +import tweepy # type: ignore +from scottutilz import profanity_filter + +import file_writer +import renderer +import kiosk_secrets as secrets + + +logger = logging.getLogger(__name__) + + +class twitter_renderer(renderer.abstaining_renderer): + def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None: + super().__init__(name_to_timeout_dict) + self.debug = True + self.tweets_by_author: Dict[str, List[tweepy.models.Status]] = {} + self.handles_by_author: Dict[str, str] = {} + self.filter = profanity_filter.ProfanityFilter() self.urlfinder = re.compile( - "((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)") + "((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)" + ) # == OAuth Authentication == # @@ -23,86 +34,119 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer): # The consumer keys can be found on your application's Details # page located at https://dev.twitter.com/apps (under "OAuth settings") - consumer_key=secrets.twitter_consumer_key - consumer_secret=secrets.twitter_consumer_secret + consumer_key = secrets.twitter_consumer_key + consumer_secret = secrets.twitter_consumer_secret # The access tokens can be found on your applications's Details # page located at https://dev.twitter.com/apps (located # under "Your access token") - access_token=secrets.twitter_access_token - access_token_secret=secrets.twitter_access_token_secret + access_token = secrets.twitter_access_token + access_token_secret = secrets.twitter_access_token_secret auth = tweepy.OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) self.api = tweepy.API(auth) - def debug_prefix(self): + def debug_prefix(self) -> str: return "twitter" - def linkify(self, value): + def linkify(self, value: str) -> str: return self.urlfinder.sub(r'\1', value) - def periodic_render(self, key): + def periodic_render(self, key: str) -> bool: if key == "Fetch Tweets": return self.fetch_tweets() elif key == "Shuffle Tweets": return self.shuffle_tweets() else: - raise error('Unexpected operation') + raise Exception("Unexpected operation") - def fetch_tweets(self): + def fetch_tweets(self) -> bool: try: - tweets = self.api.home_timeline(tweet_mode='extended', count=200) - except: + tweets = self.api.home_timeline(tweet_mode="extended", count=200) + except Exception as e: + logger.exception(e) print("Exception while fetching tweets!") return False for tweet in tweets: + # j = tweet._json + # import json + # print(json.dumps(j, indent=4, sort_keys=True)) + # print("------") + author = tweet.author.name author_handle = tweet.author.screen_name self.handles_by_author[author] = author_handle if author not in self.tweets_by_author: - self.tweets_by_author[author] = list() - l = self.tweets_by_author[author] - l.append(tweet) + self.tweets_by_author[author] = [] + x = self.tweets_by_author[author] + x.append(tweet) return True - def shuffle_tweets(self): + def get_hashtags(self, tweet) -> str: + ret = " " + if "entities" in tweet._json: + entities = tweet._json["entities"] + if "hashtags" in entities: + for x in entities["hashtags"]: + ret += f'#{x["text"]}, ' + ret = re.sub(", $", "", ret) + return ret + + def get_media_url(self, tweet) -> Optional[str]: + if "entities" in tweet._json: + entities = tweet._json["entities"] + if "media" in entities: + media = entities["media"] + for x in media: + if "media_url_https" in x: + return x["media_url_https"] + return None + + def shuffle_tweets(self) -> bool: authors = list(self.tweets_by_author.keys()) author = random.choice(authors) handle = self.handles_by_author[author] tweets = self.tweets_by_author[author] already_seen = set() - f = file_writer.file_writer('twitter_10_3600.html') - f.write('\n' % (author, handle)) - f.write('
') - f.write('

%s (@%s)

') - f.write('
\n') - f.write('
\n\n') - f.close() + with file_writer.file_writer("twitter_10_3600.html") as f: + f.write("\n" % (author, handle)) + f.write('
") + f.write("

%s (@%s)

') + f.write('
\n') + f.write("
\n\n") return True + # Test -#t = twitter_renderer( +# t = twitter_renderer( # {"Fetch Tweets" : 1, # "Shuffle Tweets" : 1}) -#x = "bla bla bla https://t.co/EjWnT3UA9U bla bla" -#x = t.linkify(x) -#print x -#if t.fetch_tweets() == 0: +# x = "bla bla bla https://t.co/EjWnT3UA9U bla bla" +# x = t.linkify(x) +# print(x) +# if t.fetch_tweets() == 0: # print("Error fetching tweets, none fetched.") -#else: +# else: # t.shuffle_tweets()