X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=stock_renderer.py;h=60e70432ccbbbfee620119dc32b718d407222da8;hb=6cd5b068127501d2b48e8ac67b7432bffc5fce53;hp=7b34455eb610283946eeb37a9db571449b9e3c22;hpb=5e241dc47e497c547463cecc07946ea6882835a7;p=kiosk.git
diff --git a/stock_renderer.py b/stock_renderer.py
index 7b34455..60e7043 100644
--- a/stock_renderer.py
+++ b/stock_renderer.py
@@ -1,136 +1,188 @@
-from bs4 import BeautifulSoup
-from threading import Thread
-import datetime
+#!/usr/bin/env python3
+
+import logging
+import os
+from typing import Any, Dict, List, Optional, Tuple
+
+import yfinance as yf # type: ignore
+import plotly.graph_objects as go
+
import file_writer
-import json
-import re
+import kiosk_constants
import renderer
-import random
-import secrets
-import time
-import urllib.request, urllib.error, urllib.parse
-class stock_quote_renderer(renderer.debuggable_abstaining_renderer):
- # format exchange:symbol
- def __init__(self, name_to_timeout_dict, symbols):
- super(stock_quote_renderer, self).__init__(name_to_timeout_dict, False)
- self.symbols = symbols
- self.prefix = "https://www.alphavantage.co/query?"
- self.thread = None
-
- def debug_prefix(self):
- return "stock"
-
- def get_random_key(self):
- return random.choice(secrets.alphavantage_keys)
-
- def periodic_render(self, key):
- now = datetime.datetime.now()
- if (
- now.hour < (9 - 3)
- or now.hour >= (17 - 3)
- or datetime.datetime.today().weekday() > 4
- ):
- self.debug_print("The stock market is closed so not re-rendering")
- return True
-
- if self.thread is None or not self.thread.is_alive():
- self.debug_print("Spinning up a background thread...")
- self.thread = Thread(target=self.thread_internal_render, args=())
- self.thread.start()
- return True
+logger = logging.getLogger(__file__)
- def thread_internal_render(self):
- symbols_finished = 0
- f = file_writer.file_writer("stock_3_86400.html")
- f.write("
Stock Quotes
")
- f.write("")
- for symbol in self.symbols:
- # print "---------- Working on %s\n" % symbol
-
- # https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&apikey=
-
- # https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol=MSFT&apikey=
-
- attempts = 0
- cooked = ""
- while True:
- key = self.get_random_key()
- url = self.prefix + "function=GLOBAL_QUOTE&symbol=%s&apikey=%s" % (
- symbol,
- key,
+
+class stock_quote_renderer(renderer.abstaining_renderer):
+ """Render the stock prices page."""
+
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ symbols: List[str],
+ display_subs: Dict[str, str] = None,
+ ) -> None:
+ super().__init__(name_to_timeout_dict)
+ self.symbols = symbols
+ self.display_subs = display_subs
+ self.info_cache: Dict[yf.ticker.Ticker, Dict] = {}
+
+ def cache_info(self, ticker: yf.ticker.Ticker) -> Dict:
+ if ticker in self.info_cache:
+ return self.info_cache[ticker]
+ i = ticker.get_info()
+ self.info_cache[ticker] = i
+ return i
+
+ def get_ticker_name(self, ticker: yf.Ticker) -> str:
+ """Get friendly name of a ticker."""
+ info = self.cache_info(ticker)
+ if "shortName" in info:
+ return info["shortName"]
+ return ticker
+
+ @staticmethod
+ def get_item_from_dict(keys: List[str], dictionary: Dict[str, Any]) -> Optional[Any]:
+ result = None
+ for key in keys:
+ result = dictionary.get(key, None)
+ if result:
+ return result
+ return None
+
+ def get_price(self, ticker: yf.Ticker) -> Optional[float]:
+ """Get most recent price of a ticker."""
+
+ # First try fast_info
+ price = stock_quote_renderer.get_item_from_dict(
+ ['last_price',
+ 'open',
+ 'previous_close'],
+ ticker.fast_info)
+ if price:
+ return price
+
+ # Next try info
+ price = stock_quote_renderer.get_item_from_dict(
+ ['bid',
+ 'ask',
+ 'lastMarket'],
+ self.cache_info(ticker))
+ if price:
+ return price
+
+ # Finally, fall back on history
+ hist = ticker.history(period="1d").to_dict()['Close']
+ latest = None
+ latest_price = None
+ for k, v in hist.items():
+ if latest is None or k > latest:
+ price = hist[k]
+ if price is not None:
+ latest = k
+ latest_price = price
+ print(f"Price: fell back on latest close {latest_price} at {latest}")
+ return latest_price
+
+ @staticmethod
+ def make_chart(symbol: str, ticker: yf.Ticker, period: str) -> str:
+ base_filename = f'stock_chart_{symbol}.png'
+ output_filename = os.path.join(kiosk_constants.pages_dir, base_filename)
+ transparent = go.Layout(
+ paper_bgcolor='rgba(0,0,0,0)',
+ plot_bgcolor='rgba(0,0,0,0)',
+ xaxis_rangeslider_visible=False,
+ )
+ hist = ticker.history(period=period, interval="1wk")
+ chart = go.Figure(
+ data=go.Candlestick(
+ open=hist['Open'],
+ high=hist['High'],
+ low=hist['Low'],
+ close=hist['Close'],
+ ),
+ layout=transparent,
+ )
+ chart.update_xaxes(visible=False, showticklabels=False)
+ chart.update_yaxes(side="right")
+ chart.write_image(output_filename, format="png", width=600, height=350)
+ print(f"Write {output_filename}...")
+ return base_filename
+
+ def get_last_close(
+ self,
+ ticker: yf.Ticker
+ ) -> float:
+ last_close = stock_quote_renderer.get_item_from_dict(
+ ['previous_close',
+ 'open'],
+ ticker.fast_info)
+ if last_close:
+ return last_close
+
+ last_close = stock_quote_renderer.get_item_from_dict(
+ ['preMarketPrice'],
+ self.cache_info(ticker))
+ if last_close:
+ return last_close
+ return self.get_price(ticker)
+
+ def get_change_and_delta(
+ self,
+ ticker: yf.Ticker,
+ price: float
+ ) -> Tuple[float, float]:
+ """Given the current price, look up opening price and compute delta."""
+ last_price = self.get_last_close(ticker)
+ delta = price - last_price
+ return (delta / last_price * 100.0, delta)
+
+ def periodic_render(self, key: str) -> bool:
+ """Write an up-to-date stock page."""
+ with file_writer.file_writer("stock_3_86400.html") as f:
+ f.write("Stock Quotes
")
+ f.write("")
+ symbols_finished = 0
+ for symbol in self.symbols:
+ ticker = yf.Ticker(symbol)
+ if ticker is None:
+ logger.debug(f"Unknown symbol {symbol} -- ignored.")
+ continue
+ name = self.get_ticker_name(ticker)
+ if name is None:
+ logger.debug(f'Bad name for {symbol} -- skipped.')
+ continue
+ price = self.get_price(ticker)
+ if price is None:
+ logger.debug(f"No price information for {symbol} -- skipped.")
+ continue
+ (percent_change, delta) = self.get_change_and_delta(
+ ticker, price
)
- raw = urllib.request.urlopen(url).read()
- cooked = json.loads(raw)
- if "Global Quote" not in cooked:
- # print "%s\n" % cooked
- print(
- "Failure %d, sleep %d sec...\n" % (attempts + 1, 2 ** attempts)
- )
- time.sleep(2 ** attempts)
- attempts += 1
- if attempts > 10: # we'll wait up to 512 seconds per symbol
- break
- else:
- break
-
- # These fuckers...
- if "Global Quote" not in cooked:
- print("Can't get data for symbol %s: %s\n" % (symbol, raw))
- continue
- cooked = cooked["Global Quote"]
-
- # {
- # u'Global Quote':
- # {
- # u'01. symbol': u'MSFT',
- # u'02. open': u'151.2900',
- # u'03. high': u'151.8900',
- # u'04. low': u'150.7650',
- # u'05. price': u'151.1300',
- # u'06. volume': u'16443559',
- # u'07. latest trading day': u'2019-12-10',
- # u'08. previous close': u'151.3600',
- # u'09. change': u'-0.2300'
- # u'10. change percent': u'-0.1520%',
- # }
- # }
-
- price = "?????"
- if "05. price" in cooked:
- price = cooked["05. price"]
- price = price[:-2]
-
- percent_change = "?????"
- if "10. change percent" in cooked:
- percent_change = cooked["10. change percent"]
- if not "-" in percent_change:
- percent_change = "+" + percent_change
-
- change = "?????"
- cell_color = "#bbbbbb"
- if "09. change" in cooked:
- change = cooked["09. change"]
- if "-" in change:
+ chart_filename = stock_quote_renderer.make_chart(symbol, ticker, "1y")
+ print(f"delta: {delta}, change: {percent_change}")
+ if percent_change < 0:
cell_color = "#b00000"
- else:
+ elif percent_change > 0:
cell_color = "#009000"
- change = change[:-2]
-
- if symbols_finished % 4 == 0:
- if symbols_finished > 0:
- f.write("")
- f.write("")
- symbols_finished += 1
-
- f.write(
- """
-
+ else:
+ cell_color = "#a0a0a0"
+ if symbols_finished % 4 == 0:
+ if symbols_finished > 0:
+ f.write(" |
")
+ f.write("")
+ symbols_finished += 1
+ if self.display_subs is not None and symbol in self.display_subs:
+ symbol = self.display_subs[symbol]
+ f.write(
+ f"""
+
-
+ height:175px;">
+
- %s
+ {symbol}
-
+
- $%s
- (%s)
- $%s
+ font-size:24pt;
+ font-family:helvetica, arial, sans-serif;
+ font-weight:900;
+ width:80%">
+ ${price:.2f}
+ ({percent_change:.1f}%)
+ ${delta:.2f}
+
| """
- % (cell_color, symbol, price, percent_change, change)
- )
- f.write("
")
- f.close()
+ )
+ f.write("
")
return True
-
-# x = stock_quote_renderer({}, ["MSFT", "GOOG", "GOOGL", "OPTAX", "VNQ"])
-# x.periodic_render(None)
-# x.periodic_render(None)
+# Test
+x = stock_quote_renderer({}, ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"], { "BTC-USD": "BTC", "GC=F": "GOLD" })
+x.periodic_render(None)