X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=stock_renderer.py;h=b13af908235908428ea1e0dc747fca0424624058;hb=HEAD;hp=2ff6895cbd30d69bb1151c7e99e5d03471fd6aed;hpb=c06bfef53f70551e7920bc4facce27f47b89e2ba;p=kiosk.git diff --git a/stock_renderer.py b/stock_renderer.py index 2ff6895..b13af90 100644 --- a/stock_renderer.py +++ b/stock_renderer.py @@ -1,65 +1,165 @@ #!/usr/bin/env python3 -from typing import Dict, List, Tuple -import yfinance as yf +import logging +import os +from typing import Any, Dict, List, Optional, Tuple + +import yfinance as yf # type: ignore +import yahooquery as yq # type: ignore +import plotly.graph_objects as go import file_writer +import kiosk_constants import renderer -class stock_quote_renderer(renderer.debuggable_abstaining_renderer): +logger = logging.getLogger(__name__) + + +class stock_quote_renderer(renderer.abstaining_renderer): """Render the stock prices page.""" def __init__( - self, name_to_timeout_dict: Dict[str, int], symbols: List[str] + self, + name_to_timeout_dict: Dict[str, int], + symbols: List[str], + display_subs: Dict[str, str] = None, ) -> None: - super(stock_quote_renderer, self).__init__(name_to_timeout_dict, False) + super().__init__(name_to_timeout_dict) + self.backend = "yahooquery" self.symbols = symbols + self.display_subs = display_subs + self.info_cache: Dict[str, Any] = {} - def debug_prefix(self) -> str: - return "stock" + def get_financial_data(self, symbol: str) -> Optional[Any]: + if symbol in self.info_cache: + return self.info_cache[symbol] + if self.backend == "yahooquery": + ticker = yq.Ticker(symbol) + if "Quote not found" in ticker.price[symbol]: + return None + self.info_cache[symbol] = ticker + return ticker + elif self.backend == "yfinance": + ticker = yf.Ticker(symbol) + if not ticker.fast_info["last_price"]: + return None + self.info_cache[symbol] = ticker + return ticker + else: + raise Exception(f"Unknown backend: {self.backend}") - @staticmethod - def get_ticker_name(ticker: yf.ticker.Ticker) -> str: + def get_ticker_name(self, ticker: Any) -> Optional[str]: """Get friendly name of a ticker.""" - info = ticker.get_info() - return info["shortName"] + if isinstance(ticker, yf.Ticker): + return ticker.ticker + elif isinstance(ticker, yq.Ticker): + return ticker.symbols[0] + return None @staticmethod - def get_price(ticker: yf.ticker.Ticker) -> float: - """Get most recent price of a ticker.""" - keys = [ - "bid", - "ask", - "regularMarketPrice", - "lastMarket", - "open", - "previousClose", - ] - info = ticker.get_info() + def prioritized_get_item_from_dict( + keys: List[str], dictionary: Dict[str, Any] + ) -> Optional[Any]: + result = None for key in keys: - if key in info and info[key] is not None and info[key] != 0.0: - print(f"Price: picked {key}, ${info[key]}.") - return float(info[key]) + result = dictionary.get(key, None) + if result: + return result return None - @staticmethod + def get_price(self, ticker: Any) -> Optional[float]: + """Get most recent price of a ticker.""" + if isinstance(ticker, yf.Ticker): + price = stock_quote_renderer.prioritized_get_item_from_dict( + ["last_price", "open", "previous_close"], ticker.fast_info + ) + if price: + return price + + price = stock_quote_renderer.prioritized_get_item_from_dict( + ["bid", "ask", "lastMarket"], + ticker.info, + ) + if price: + return price + return None + elif isinstance(ticker, yq.Ticker): + price = stock_quote_renderer.prioritized_get_item_from_dict( + ["regularMarketPrice", "preMarketPrice", "regularMarketPreviousClose"], + ticker.price[ticker.symbols[0]], + ) + if price: + return price + return None + + def get_last_close(self, ticker: Any) -> Optional[float]: + if isinstance(ticker, yf.Ticker): + last_close = stock_quote_renderer.prioritized_get_item_from_dict( + ["previous_close", "open"], ticker.fast_info + ) + if last_close: + return last_close + + last_close = stock_quote_renderer.prioritized_get_item_from_dict( + ["preMarketPrice"], ticker.info + ) + if last_close: + return last_close + elif isinstance(ticker, yq.Ticker): + last_close = stock_quote_renderer.prioritized_get_item_from_dict( + ["regularMarketPreviousClose", "regularMarketOpen"], + ticker.price[ticker.symbols[0]], + ) + if last_close: + return last_close + return self.get_price(ticker) + def get_change_and_delta( - ticker: yf.ticker.Ticker, price: float + self, ticker: Any, current_price: float ) -> Tuple[float, float]: """Given the current price, look up opening price and compute delta.""" - keys = [ - "open", - "previousClose", - ] - info = ticker.get_info() - for key in keys: - if key in info and info[key] is not None: - print(f"Change: picked {key}, ${info[key]}.") - old_price = float(info[key]) - delta = price - old_price - return (delta / old_price * 100.0, delta) - return (0.0, 0.0) + last_price = self.get_last_close(ticker) + delta = current_price - last_price + return (delta / last_price * 100.0, delta) + + @staticmethod + def make_chart(symbol: str, ticker: Any, period: str) -> str: + base_filename = f"stock_chart_{symbol}.png" + output_filename = os.path.join(kiosk_constants.pages_dir, base_filename) + transparent = go.Layout( + paper_bgcolor="rgba(0,0,0,0)", + plot_bgcolor="rgba(0,0,0,0)", + xaxis_rangeslider_visible=False, + ) + + hist = ticker.history(period=period, interval="1wk") + if isinstance(ticker, yq.Ticker): + _open = "open" + _high = "high" + _low = "low" + _close = "adjclose" + elif isinstance(ticker, yf.Ticker): + _open = "Open" + _high = "High" + _low = "Low" + _close = "Close" + else: + raise Exception("Bad Ticker type") + chart = go.Figure( + data=go.Candlestick( + open=hist[_open], + high=hist[_high], + low=hist[_low], + close=hist[_close], + ), + layout=transparent, + ) + chart.update_xaxes(visible=False, showticklabels=False) + chart.update_yaxes(side="right") + chart.write_image(output_filename, format="png", width=600, height=350) + print(f"Write {output_filename}...") + return base_filename def periodic_render(self, key: str) -> bool: """Write an up-to-date stock page.""" @@ -67,36 +167,43 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer): f.write("

Stock Quotes


") f.write("") symbols_finished = 0 + for symbol in self.symbols: - # print(f"--- Symbol: {symbol} ---") - ticker = yf.Ticker(symbol) - print(type(ticker)) - # print(ticker.get_info()) + ticker = self.get_financial_data(symbol) if ticker is None: - self.debug_print(f"Unknown symbol {symbol} -- ignored.") + logger.debug(f"Unknown symbol {symbol} -- ignored.") + continue + name = self.get_ticker_name(ticker) + if name is None: + logger.debug(f"Bad name for {symbol} -- skipped.") continue - name = stock_quote_renderer.get_ticker_name(ticker) - price = stock_quote_renderer.get_price(ticker) + price = self.get_price(ticker) if price is None: - self.debug_print(f"No price information for {symbol} -- skipped.") + logger.debug(f"No price information for {symbol} -- skipped.") continue - (percent_change, delta) = stock_quote_renderer.get_change_and_delta( - ticker, price - ) - # print(f"delta: {delta}, change: {percent_change}") - cell_color = "#b00000" if percent_change < 0 else "#009000" + (percent_change, delta) = self.get_change_and_delta(ticker, price) + chart_filename = stock_quote_renderer.make_chart(symbol, ticker, "1y") + print(f"delta: {delta}, change: {percent_change}") + if percent_change < 0: + cell_color = "#b00000" + elif percent_change > 0: + cell_color = "#009000" + else: + cell_color = "#a0a0a0" if symbols_finished % 4 == 0: if symbols_finished > 0: f.write("") f.write("") symbols_finished += 1 + if self.display_subs is not None and symbol in self.display_subs: + symbol = self.display_subs[symbol] f.write( f""" """ ) @@ -127,5 +236,9 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer): # Test -# x = stock_quote_renderer({}, ["MSFT", "GOOG", "GBTC", "OPTAX", "VNQ"]) +# x = stock_quote_renderer( +# {}, +# ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"], +# {"BTC-USD": "BTC", "GC=F": "GOLD"}, +# ) # x.periodic_render(None)
- + height:175px;"> +
+ font-size:24pt; + font-family:helvetica, arial, sans-serif; + font-weight:900; + width:80%"> ${price:.2f}
({percent_change:.1f}%)
${delta:.2f}
+