#!/usr/bin/env python3

"""Kiosk renderer that writes a stock quote page plus one chart image per symbol."""

import logging
import os
from typing import Any, Dict, List, Optional, Tuple

import yfinance as yf  # type: ignore
import yahooquery as yq  # type: ignore
import plotly.graph_objects as go

import file_writer
import kiosk_constants
import renderer

logger = logging.getLogger(__name__)


class stock_quote_renderer(renderer.abstaining_renderer):
    """Render the stock prices page."""

    def __init__(
        self,
        name_to_timeout_dict: Dict[str, int],
        symbols: List[str],
        display_subs: Optional[Dict[str, str]] = None,
    ) -> None:
        """Set up the renderer.

        Args:
            name_to_timeout_dict: passed through unchanged to the base renderer.
            symbols: ticker symbols to look up, in display order.
            display_subs: optional mapping from a symbol to the label that
                should be shown in its place on the page.
        """
        super().__init__(name_to_timeout_dict)
        self.backend = "yahooquery"
        self.symbols = symbols
        self.display_subs = display_subs
        # Ticker objects keyed by symbol; only successful lookups are stored.
        self.info_cache: Dict[str, Any] = {}

    def get_financial_data(self, symbol: str) -> Optional[Any]:
        """Return a ticker object for ``symbol`` using the configured backend.

        Successful lookups are cached in ``self.info_cache``.

        Returns:
            The backend's ticker object, or None when the backend has no
            quote for the symbol.

        Raises:
            ValueError: if ``self.backend`` names an unsupported backend.
        """
        if symbol in self.info_cache:
            return self.info_cache[symbol]

        if self.backend == "yahooquery":
            ticker = yq.Ticker(symbol)
            # The symbol is treated as unknown when its price entry contains
            # this marker text.
            if "Quote not found" in ticker.price[symbol]:
                return None
        elif self.backend == "yfinance":
            ticker = yf.Ticker(symbol)
            if not ticker.fast_info["last_price"]:
                return None
        else:
            raise ValueError(f"Unknown backend: {self.backend}")

        self.info_cache[symbol] = ticker
        return ticker

    def get_ticker_name(self, ticker: Any) -> Optional[str]:
        """Get friendly name of a ticker, or None for an unrecognized type."""
        if isinstance(ticker, yf.Ticker):
            return ticker.ticker
        if isinstance(ticker, yq.Ticker):
            return ticker.symbols[0]
        return None

    @staticmethod
    def prioritized_get_item_from_dict(
        keys: List[str], dictionary: Dict[str, Any]
    ) -> Optional[Any]:
        """Return the first truthy value found under ``keys``, tried in order.

        Missing keys and falsy values (None, 0, empty) are skipped; None is
        returned when no key yields a truthy value.
        """
        for key in keys:
            value = dictionary.get(key)
            if value:
                return value
        return None

    def get_price(self, ticker: Any) -> Optional[float]:
        """Get most recent price of a ticker, or None if none is available."""
        lookup = stock_quote_renderer.prioritized_get_item_from_dict

        if isinstance(ticker, yf.Ticker):
            price = lookup(
                ["last_price", "open", "previous_close"], ticker.fast_info
            )
            if price:
                return price
            # Fall back to the secondary info mapping only when the first
            # source yields nothing.
            price = lookup(["bid", "ask", "lastMarket"], ticker.info)
            return price if price else None

        if isinstance(ticker, yq.Ticker):
            price = lookup(
                [
                    "regularMarketPrice",
                    "preMarketPrice",
                    "regularMarketPreviousClose",
                ],
                ticker.price[ticker.symbols[0]],
            )
            return price if price else None

        return None

    def get_last_close(self, ticker: Any) -> Optional[float]:
        """Get the previous close of a ticker.

        Falls back to ``get_price`` when no close/open figure is available,
        so the result is None only when no price is known at all.
        """
        lookup = stock_quote_renderer.prioritized_get_item_from_dict

        if isinstance(ticker, yf.Ticker):
            last_close = lookup(["previous_close", "open"], ticker.fast_info)
            if last_close:
                return last_close
            last_close = lookup(["preMarketPrice"], ticker.info)
            if last_close:
                return last_close
        elif isinstance(ticker, yq.Ticker):
            last_close = lookup(
                ["regularMarketPreviousClose", "regularMarketOpen"],
                ticker.price[ticker.symbols[0]],
            )
            if last_close:
                return last_close

        return self.get_price(ticker)

    def get_change_and_delta(
        self, ticker: Any, current_price: float
    ) -> Tuple[float, float]:
        """Compute the change of ``current_price`` relative to the last close.

        Returns:
            ``(percent_change, delta)`` where ``delta`` is
            ``current_price - last_close``. When no reference price can be
            determined, ``(0.0, 0.0)`` is returned instead of failing.
        """
        last_price = self.get_last_close(ticker)
        if not last_price:
            # Without a reference price the division below is undefined.
            logger.debug("No reference price available; reporting no change.")
            return (0.0, 0.0)
        delta = current_price - last_price
        return (delta / last_price * 100.0, delta)

    @staticmethod
    def make_chart(symbol: str, ticker: Any, period: str) -> str:
        """Write a weekly candlestick chart PNG for ``ticker``.

        The image is written to ``kiosk_constants.pages_dir``.

        Args:
            symbol: used to build the output filename.
            ticker: a yahooquery or yfinance ticker object.
            period: history period handed to the ticker's ``history`` call.

        Returns:
            The base filename (no directory) of the image written.

        Raises:
            TypeError: if ``ticker`` is not one of the supported ticker types.
        """
        # Column names differ between the two backends' history frames.
        if isinstance(ticker, yq.Ticker):
            _open, _high, _low, _close = "open", "high", "low", "adjclose"
        elif isinstance(ticker, yf.Ticker):
            _open, _high, _low, _close = "Open", "High", "Low", "Close"
        else:
            raise TypeError("Bad Ticker type")

        base_filename = f"stock_chart_{symbol}.png"
        output_filename = os.path.join(kiosk_constants.pages_dir, base_filename)

        # Fully transparent backgrounds; no range slider under the chart.
        transparent = go.Layout(
            paper_bgcolor="rgba(0,0,0,0)",
            plot_bgcolor="rgba(0,0,0,0)",
            xaxis_rangeslider_visible=False,
        )
        hist = ticker.history(period=period, interval="1wk")
        chart = go.Figure(
            data=go.Candlestick(
                open=hist[_open],
                high=hist[_high],
                low=hist[_low],
                close=hist[_close],
            ),
            layout=transparent,
        )
        chart.update_xaxes(visible=False, showticklabels=False)
        chart.update_yaxes(side="right")
        chart.write_image(output_filename, format="png", width=600, height=350)
        logger.debug("Write %s...", output_filename)
        return base_filename

    def periodic_render(self, key: str) -> bool:
        """Write an up-to-date stock page.

        Symbols with no ticker, name or price are skipped. ``key`` is not
        used by this renderer. Always returns True.
        """
        # NOTE(review): the target is an .html file, yet several literals
        # below are empty and none contain markup -- presumably tags were
        # lost from this copy of the file. Confirm against the deployed
        # version and restore them.
        with file_writer.file_writer("stock_3_86400.html") as f:
            f.write("\n\nStock Quotes\n\n\n")
            f.write("")
            symbols_finished = 0
            for symbol in self.symbols:
                ticker = self.get_financial_data(symbol)
                if ticker is None:
                    logger.debug("Unknown symbol %s -- ignored.", symbol)
                    continue
                name = self.get_ticker_name(ticker)
                if name is None:
                    logger.debug("Bad name for %s -- skipped.", symbol)
                    continue
                price = self.get_price(ticker)
                if price is None:
                    logger.debug(
                        "No price information for %s -- skipped.", symbol
                    )
                    continue
                (percent_change, delta) = self.get_change_and_delta(ticker, price)
                # The call also writes the chart image to disk.
                # NOTE(review): chart_filename and cell_color are not
                # referenced by any output literal visible here; presumably
                # the cell markup that used them is missing -- confirm.
                chart_filename = stock_quote_renderer.make_chart(
                    symbol, ticker, "1y"
                )
                logger.debug("delta: %s, change: %s", delta, percent_change)
                if percent_change < 0:
                    cell_color = "#b00000"
                elif percent_change > 0:
                    cell_color = "#009000"
                else:
                    cell_color = "#a0a0a0"

                # Row boundary every four symbols: close the previous row
                # (if any) and open a new one.
                if symbols_finished % 4 == 0:
                    if symbols_finished > 0:
                        f.write("")
                    f.write("")
                symbols_finished += 1

                # Show the substituted label, if one is configured, without
                # clobbering the loop variable.
                label = symbol
                if self.display_subs is not None and symbol in self.display_subs:
                    label = self.display_subs[symbol]

                # Interpolate the quote values per symbol; they were
                # previously emitted as uninterpolated placeholder text.
                f.write(
                    f"\n{label}\n${price:.2f}\n"
                    f"({percent_change:.1f}%)\n${delta:.2f}\n"
                )
        return True


# Test
# x = stock_quote_renderer(
#     {},
#     ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"],
#     {"BTC-USD": "BTC", "GC=F": "GOLD"},
# )
# x.periodic_render(None)