Merge branch 'master' of ssh://wwwjail.house/usr/local/git/base/kiosk
[kiosk.git] / twitter_renderer.py
1 #!/usr/bin/env python3
2
3 import logging
4 import random
5 import re
6 from typing import Dict, List, Optional
7
8 import tweepy  # type: ignore
9 from scottutilz import profanity_filter
10
11 import file_writer
12 import renderer
13 import kiosk_secrets as secrets
14
15
16 logger = logging.getLogger(__name__)
17
18
19 class twitter_renderer(renderer.abstaining_renderer):
20     def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
21         super().__init__(name_to_timeout_dict)
22         self.debug = True
23         self.tweets_by_author: Dict[str, List[tweepy.models.Status]] = {}
24         self.handles_by_author: Dict[str, str] = {}
25         self.filter = profanity_filter.ProfanityFilter()
26         self.urlfinder = re.compile(
27             "((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)"
28         )
29
30         # == OAuth Authentication ==
31         #
32         # This mode of authentication is the new preferred way
33         # of authenticating with Twitter.
34
35         # The consumer keys can be found on your application's Details
36         # page located at https://dev.twitter.com/apps (under "OAuth settings")
37         consumer_key = secrets.twitter_consumer_key
38         consumer_secret = secrets.twitter_consumer_secret
39
40         # The access tokens can be found on your applications's Details
41         # page located at https://dev.twitter.com/apps (located
42         # under "Your access token")
43         access_token = secrets.twitter_access_token
44         access_token_secret = secrets.twitter_access_token_secret
45
46         auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
47         auth.set_access_token(access_token, access_token_secret)
48         self.api = tweepy.API(auth)
49
50     def debug_prefix(self) -> str:
51         return "twitter"
52
53     def linkify(self, value: str) -> str:
54         return self.urlfinder.sub(r'<a href="\1">\1</a>', value)
55
56     def periodic_render(self, key: str) -> bool:
57         if key == "Fetch Tweets":
58             return self.fetch_tweets()
59         elif key == "Shuffle Tweets":
60             return self.shuffle_tweets()
61         else:
62             raise Exception("Unexpected operation")
63
64     def fetch_tweets(self) -> bool:
65         try:
66             tweets = self.api.home_timeline(tweet_mode="extended", count=200)
67         except Exception as e:
68             logger.exception(e)
69             print("Exception while fetching tweets!")
70             return False
71         for tweet in tweets:
72             # j = tweet._json
73             # import json
74             # print(json.dumps(j, indent=4, sort_keys=True))
75             # print("------")
76
77             author = tweet.author.name
78             author_handle = tweet.author.screen_name
79             self.handles_by_author[author] = author_handle
80             if author not in self.tweets_by_author:
81                 self.tweets_by_author[author] = []
82             x = self.tweets_by_author[author]
83             x.append(tweet)
84         return True
85
86     def get_hashtags(self, tweet) -> str:
87         ret = " "
88         if "entities" in tweet._json:
89             entities = tweet._json["entities"]
90             if "hashtags" in entities:
91                 for x in entities["hashtags"]:
92                     ret += f'<B>#{x["text"]}</B>, '
93         ret = re.sub(", $", "", ret)
94         return ret
95
96     def get_media_url(self, tweet) -> Optional[str]:
97         if "entities" in tweet._json:
98             entities = tweet._json["entities"]
99             if "media" in entities:
100                 media = entities["media"]
101                 for x in media:
102                     if "media_url_https" in x:
103                         return x["media_url_https"]
104         return None
105
106     def shuffle_tweets(self) -> bool:
107         authors = list(self.tweets_by_author.keys())
108         author = random.choice(authors)
109         handle = self.handles_by_author[author]
110         tweets = self.tweets_by_author[author]
111         already_seen = set()
112         with file_writer.file_writer("twitter_10_3600.html") as f:
113             f.write("<TABLE WIDTH=96%><TR><TD WIDTH=86%>")
114             f.write("<H2>%s (@%s)</H2></TD>\n" % (author, handle))
115             f.write('<TD ALIGN="right" VALIGN="top">')
116             f.write('<IMG SRC="twitter.png" WIDTH=42></TD></TR></TABLE>\n')
117             f.write("<HR>\n<UL>\n")
118             count = 0
119             length = 0
120             for tweet in tweets:
121                 text = tweet.full_text
122                 if (text not in already_seen) and (
123                     not self.filter.contains_bad_word(text)
124                 ):
125                     already_seen.add(text)
126                     text = self.linkify(text)
127                     text = f"<B>{text}</B>"
128                     text += self.get_hashtags(tweet)
129                     media_url = self.get_media_url(tweet)
130                     if media_url:
131                         text = f"<TABLE WIDTH=100%><TD WITDH=70%>{text}</TD>"
132                         text += f'<TD><IMG SRC="{media_url}" WIDTH=200></TD></TABLE>'
133                     f.write(f"<LI>{text}")
134                     count += 1
135                     length += len(text)
136                     if count > 3 or length > 270:
137                         break
138             f.write("</UL>\n")
139         return True
140
141
142 # Test
143 # t = twitter_renderer(
144 #    {"Fetch Tweets" : 1,
145 #     "Shuffle Tweets" : 1})
146 # x = "bla bla bla https://t.co/EjWnT3UA9U bla bla"
147 # x = t.linkify(x)
148 # print(x)
149 # if t.fetch_tweets() == 0:
150 #    print("Error fetching tweets, none fetched.")
151 # else:
152 #    t.shuffle_tweets()