Getting this up-to-date.
@staticmethod
def looks_like_spam(title: str, description: str) -> bool:
return (
- 'marketplace' in description
- or 'national-marketplace' in description
- or re.search('[Ww]eed', title) is not None
- or re.search('[Cc]annabis', title) is not None
- or re.search('[Cc]annabis', description) is not None
- or 'THC' in title
- or re.search('[Tt]op.[Rr]ated', title) is not None
- or re.search('[Ll]ose [Ww]eight', title) is not None
- or re.search('[Ll]ose [Ww]eight', description) is not None
+ description is not None
+ and title is not None
+ and (
+ 'marketplace' in description
+ or 'national-marketplace' in description
+ or re.search('[Ww]eed', title) is not None
+ or re.search('[Cc]annabis', title) is not None
+ or re.search('[Cc]annabis', description) is not None
+ or 'THC' in title
+ or re.search('[Tt]op.[Rr]ated', title) is not None
+ or re.search('[Ll]ose [Ww]eight', title) is not None
+ or re.search('[Ll]ose [Ww]eight', description) is not None
+ )
)
def item_is_interesting_for_headlines(
import time
from typing import Any, Callable, List, Optional, Set, Tuple
-from pyutilz.datetimez import datetime_utils
+from pyutils.datetimes import datetime_utils
import kiosk_constants
import trigger
import kiosk_constants
+
class file_writer:
"""Helper context to write a pages file."""
continue
delta = eventstamp - now
x = int(delta.total_seconds())
- if x > -120 and x < 4 * kiosk_constants.seconds_per_minute:
+ if x > 0 and x < 4 * kiosk_constants.seconds_per_minute:
days = divmod(x, kiosk_constants.seconds_per_day)
hours = divmod(days[1], kiosk_constants.seconds_per_hour)
minutes = divmod(hours[1], kiosk_constants.seconds_per_minute)
from typing import List, Optional, Tuple
-import constants
+import kiosk_constants as constants
import globals
import trigger
--- /dev/null
+dynamic weights for pages
+make the renderers declare what pages they wrote
+make the renderers have tags and use search in chooser to filter by tag
+split triggers away from page chooser; main chooser thread in kiosk.py gets triggers
+ ..from the page queue just like voice commands and therefore doesn't have to poll
+ ..the chooser as often; can probably be a blocking read with a longer timeout.
import pvporcupine
import pytz
-from pyutilz import (
+from pyutils import (
bootstrap,
config,
)
-from pyutilz.datetimez import datetime_utils
-from pyutilz.files import file_utils
+from pyutils.datetimes import datetime_utils
+from pyutils.files import file_utils
import kiosk_constants
import file_writer
myq_pagename = "myq_4_300.html"
render_stats_pagename = 'internal/render-stats_1_1000.html'
gcal_imminent_pagename = "hidden/gcal-imminent_0_none.html"
+
+static_content_url_filepath = "/home/pi/kiosk_static_url.txt"
album_whitelist = frozenset(
[
- "8-Mile Lake Hike",
+ "Autumn at Kubota",
"Bangkok and Phuket, 2003",
"Barn",
"Blue Angels... Seafair",
"Portland, ME 2021",
"Prague and Munich 2019",
"Random",
+ "SFO 2014",
"Scott and Lynn",
"Sculpture Place",
- "SFO 2014",
"Skiing with Alex",
"Sonoma",
"Trip to California, '16",
"Trip to East Coast '16",
"Turkey 2022",
"Tuscany 2008",
- "Yosemite 2010",
"WA Roadtrip, 2021",
+ "Yosemite 2010",
"Zoo",
]
)
import datetime_utils
import file_utils
-import constants
+import kiosk_constants as constants
import renderer_catalog
import chooser
import listen
import pymyq # type: ignore
from typing import Dict, Optional
-from pyutilz.datetimez import datetime_utils
+from pyutils.datetimes import datetime_utils
import kiosk_constants
import file_writer
import time
from typing import Dict, Optional, Set
-from pyutilz.decorator_utils import invocation_logged
+from pyutils.decorator_utils import invocation_logged
logger = logging.getLogger(__file__)
"ABHYX",
"SPAB",
"SPHD",
+ "SCHD",
+ "BCD",
"GC=F",
- "VDC",
+ "VYM",
"VYMI",
+ "VDC",
"VNQ",
"VNQI",
],
- { "BTC-USD": "BTC",
- "GC=F": "GOLD" },
+ {
+ "BTC-USD": "BTC",
+ "GC=F": "GOLD"
+ },
),
seattletimes_rss_renderer.seattletimes_rss_renderer(
{"Fetch News": (hours * 4), "Shuffle News": (always)},
#!/usr/bin/env python3
import logging
-from typing import Dict, List, Optional, Tuple
+import os
+from typing import Any, Dict, List, Optional, Tuple
import yfinance as yf # type: ignore
+import plotly.graph_objects as go
import file_writer
+import kiosk_constants
import renderer
super().__init__(name_to_timeout_dict)
self.symbols = symbols
self.display_subs = display_subs
+ self.info_cache: Dict[yf.ticker.Ticker, Dict] = {}
- @staticmethod
- def get_ticker_name(ticker: yf.ticker.Ticker) -> str:
+ def cache_info(self, ticker: yf.ticker.Ticker) -> Dict:
+ if ticker in self.info_cache:
+ return self.info_cache[ticker]
+ i = ticker.get_info()
+ self.info_cache[ticker] = i
+ return i
+
+ def get_ticker_name(self, ticker: yf.Ticker) -> str:
"""Get friendly name of a ticker."""
- info = ticker.get_info()
+ info = self.cache_info(ticker)
if "shortName" in info:
return info["shortName"]
return ticker
@staticmethod
- def get_price(ticker: yf.ticker.Ticker) -> Optional[float]:
- """Get most recent price of a ticker."""
- keys = [
- "bid",
- "ask",
- "regularMarketPrice",
- "lastMarket",
- "open",
- "previousClose",
- ]
- info = ticker.get_info()
+ def get_item_from_dict(keys: List[str], dictionary: Dict[str, Any]) -> Optional[Any]:
+ result = None
for key in keys:
- if key in info and info[key] is not None and info[key] != 0.0:
- print(f"Price: picked {key}, ${info[key]}.")
- return float(info[key])
+ result = dictionary.get(key, None)
+ if result:
+ return result
return None
+ def get_price(self, ticker: yf.Ticker) -> Optional[float]:
+ """Get most recent price of a ticker."""
+
+ # First try fast_info
+ price = stock_quote_renderer.get_item_from_dict(
+ ['last_price',
+ 'open',
+ 'previous_close'],
+ ticker.fast_info)
+ if price:
+ return price
+
+ # Next try info
+ price = stock_quote_renderer.get_item_from_dict(
+ ['bid',
+ 'ask',
+ 'lastMarket'],
+ self.cache_info(ticker))
+ if price:
+ return price
+
+ # Finally, fall back on history
+ hist = ticker.history(period="1d").to_dict()['Close']
+ latest = None
+ latest_price = None
+ for k, v in hist.items():
+ if latest is None or k > latest:
+ price = hist[k]
+ if price is not None:
+ latest = k
+ latest_price = price
+ print(f"Price: fell back on latest close {latest_price} at {latest}")
+ return latest_price
+
@staticmethod
+ def make_chart(symbol: str, ticker: yf.Ticker, period: str) -> str:
+ base_filename = f'stock_chart_{symbol}.png'
+ output_filename = os.path.join(kiosk_constants.pages_dir, base_filename)
+ transparent = go.Layout(
+ paper_bgcolor='rgba(0,0,0,0)',
+ plot_bgcolor='rgba(0,0,0,0)',
+ xaxis_rangeslider_visible=False,
+ )
+ hist = ticker.history(period=period, interval="1wk")
+ chart = go.Figure(
+ data=go.Candlestick(
+ open=hist['Open'],
+ high=hist['High'],
+ low=hist['Low'],
+ close=hist['Close'],
+ ),
+ layout=transparent,
+ )
+ chart.update_xaxes(visible=False, showticklabels=False)
+ chart.update_yaxes(side="right")
+ chart.write_image(output_filename, format="png", width=600, height=350)
+ print(f"Write {output_filename}...")
+ return base_filename
+
+ def get_last_close(
+ self,
+ ticker: yf.Ticker
+ ) -> float:
+ last_close = stock_quote_renderer.get_item_from_dict(
+ ['previous_close',
+ 'open'],
+ ticker.fast_info)
+ if last_close:
+ return last_close
+
+ last_close = stock_quote_renderer.get_item_from_dict(
+ ['preMarketPrice'],
+ self.cache_info(ticker))
+ if last_close:
+ return last_close
+ return self.get_price(ticker)
+
def get_change_and_delta(
- ticker: yf.ticker.Ticker, price: float
+ self,
+ ticker: yf.Ticker,
+ price: float
) -> Tuple[float, float]:
"""Given the current price, look up opening price and compute delta."""
- keys = [
- "previousClose",
- "open",
- ]
- info = ticker.get_info()
- for key in keys:
- if key in info and info[key] is not None:
- print(f"Change: picked {key}, ${info[key]}.")
- old_price = float(info[key])
- delta = price - old_price
- return (delta / old_price * 100.0, delta)
- return (0.0, 0.0)
+ last_price = self.get_last_close(ticker)
+ delta = price - last_price
+ return (delta / last_price * 100.0, delta)
def periodic_render(self, key: str) -> bool:
"""Write an up-to-date stock page."""
symbols_finished = 0
for symbol in self.symbols:
ticker = yf.Ticker(symbol)
- # print(ticker.get_info())
if ticker is None:
logger.debug(f"Unknown symbol {symbol} -- ignored.")
continue
- name = stock_quote_renderer.get_ticker_name(ticker)
- price = stock_quote_renderer.get_price(ticker)
+ name = self.get_ticker_name(ticker)
+ if name is None:
+ logger.debug(f'Bad name for {symbol} -- skipped.')
+ continue
+ price = self.get_price(ticker)
if price is None:
logger.debug(f"No price information for {symbol} -- skipped.")
continue
- (percent_change, delta) = stock_quote_renderer.get_change_and_delta(
+ (percent_change, delta) = self.get_change_and_delta(
ticker, price
)
- # print(f"delta: {delta}, change: {percent_change}")
- cell_color = "#b00000" if percent_change < 0 else "#009000"
+ chart_filename = stock_quote_renderer.make_chart(symbol, ticker, "1y")
+ print(f"delta: {delta}, change: {percent_change}")
+ if percent_change < 0:
+ cell_color = "#b00000"
+ elif percent_change > 0:
+ cell_color = "#009000"
+ else:
+ cell_color = "#a0a0a0"
if symbols_finished % 4 == 0:
if symbols_finished > 0:
f.write("</TR>")
<TD WIDTH=20% HEIGHT=150 BGCOLOR="{cell_color}">
<!-- Container -->
<DIV style="position:relative;
- height:150px;">
+ height:175px;">
<!-- Symbol {symbol} -->
<DIV style="position:absolute;
bottom:50;
<DIV style="position:absolute;
left:10;
top:20;
- font-size:23pt;
- font-family: helvetica, arial, sans-serif;
- width:70%">
+ font-size:24pt;
+ font-family:helvetica, arial, sans-serif;
+ font-weight:900;
+ width:80%">
${price:.2f}<BR>
<I>({percent_change:.1f}%)</I><BR>
<B>${delta:.2f}</B>
</DIV>
+ <IMG SRC="{chart_filename}" WIDTH=100%>
</DIV>
</TD>"""
)
return True
# Test
-#x = stock_quote_renderer({}, ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"], { "BTC-USD": "BTC", "GC=F": "GOLD" })
-#x.periodic_render(None)
+x = stock_quote_renderer({}, ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"], { "BTC-USD": "BTC", "GC=F": "GOLD" })
+x.periodic_render(None)
from typing import Dict
from bs4 import BeautifulSoup # type: ignore
-from pyutilz import profanity_filter
+from scottutilz import profanity_filter
import file_writer
import grab_bag
#!/usr/bin/env python3
+import logging
import random
import re
-from typing import Dict, List
+from typing import Dict, List, Optional
import tweepy # type: ignore
-from pyutilz import profanity_filter
+from scottutilz import profanity_filter
import file_writer
import renderer
import kiosk_secrets as secrets
+logger = logging.getLogger(__name__)
+
+
class twitter_renderer(renderer.abstaining_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
super().__init__(name_to_timeout_dict)
def fetch_tweets(self) -> bool:
try:
tweets = self.api.home_timeline(tweet_mode="extended", count=200)
- except:
+ except Exception as e:
+ logger.exception(e)
print("Exception while fetching tweets!")
return False
for tweet in tweets:
+ #j = tweet._json
+ #import json
+ #print(json.dumps(j, indent=4, sort_keys=True))
+ #print("------")
+
author = tweet.author.name
author_handle = tweet.author.screen_name
self.handles_by_author[author] = author_handle
if author not in self.tweets_by_author:
self.tweets_by_author[author] = []
- l = self.tweets_by_author[author]
- l.append(tweet)
+ x = self.tweets_by_author[author]
+ x.append(tweet)
return True
+ def get_hashtags(self, tweet) -> str:
+ ret = ' '
+ if 'entities' in tweet._json:
+ entities = tweet._json['entities']
+ if 'hashtags' in entities:
+ for x in entities['hashtags']:
+ ret += f'<B>#{x["text"]}</B>, '
+ ret = re.sub(', $', '', ret)
+ return ret
+
+ def get_media_url(self, tweet) -> Optional[str]:
+ if 'entities' in tweet._json:
+ entities = tweet._json['entities']
+ if 'media' in entities:
+ media = entities['media']
+ for x in media:
+ if 'media_url_https' in x:
+ return x['media_url_https']
+ return None
+
def shuffle_tweets(self) -> bool:
authors = list(self.tweets_by_author.keys())
author = random.choice(authors)
):
already_seen.add(text)
text = self.linkify(text)
- f.write("<LI><B>%s</B>\n" % text)
+ text = f'<B>{text}</B>'
+ text += self.get_hashtags(tweet)
+ media_url = self.get_media_url(tweet)
+ if media_url:
+ text = f'<TABLE WIDTH=100%><TD WITDH=70%>{text}</TD>'
+ text += f'<TD><IMG SRC="{media_url}" WIDTH=200></TD></TABLE>'
+ f.write(f"<LI>{text}")
count += 1
length += len(text)
if count > 3 or length > 270:
# Test
-# t = twitter_renderer(
-# {"Fetch Tweets" : 1,
-# "Shuffle Tweets" : 1})
-# x = "bla bla bla https://t.co/EjWnT3UA9U bla bla"
-# x = t.linkify(x)
-# print(x)
-# if t.fetch_tweets() == 0:
-# print("Error fetching tweets, none fetched.")
-# else:
-# t.shuffle_tweets()
+t = twitter_renderer(
+ {"Fetch Tweets" : 1,
+ "Shuffle Tweets" : 1})
+#x = "bla bla bla https://t.co/EjWnT3UA9U bla bla"
+#x = t.linkify(x)
+#print(x)
+if t.fetch_tweets() == 0:
+ print("Error fetching tweets, none fetched.")
+else:
+ t.shuffle_tweets()
return True
-x = weather_renderer({"Stevens": 1000}, "stevens")
-x.periodic_render("Stevens")
+#x = weather_renderer({"Stevens": 1000}, "stevens")
+#x.periodic_render("Stevens")