From 6e31b94aea2ed2ba12de6b03f529ea2a018eb0a0 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Tue, 21 Feb 2023 10:58:59 -0800 Subject: [PATCH] Make stock renderer optionally use yahooquery because yfinace has been screwed up for so long. --- stock_renderer.py | 155 ++++++++++++++++++++++++++-------------------- 1 file changed, 89 insertions(+), 66 deletions(-) diff --git a/stock_renderer.py b/stock_renderer.py index 41a56ba..2897edb 100644 --- a/stock_renderer.py +++ b/stock_renderer.py @@ -5,6 +5,7 @@ import os from typing import Any, Dict, List, Optional, Tuple import yfinance as yf # type: ignore +import yahooquery as yq # type: ignore import plotly.graph_objects as go import file_writer @@ -25,26 +26,39 @@ class stock_quote_renderer(renderer.abstaining_renderer): display_subs: Dict[str, str] = None, ) -> None: super().__init__(name_to_timeout_dict) + self.backend = "yahooquery" self.symbols = symbols self.display_subs = display_subs - self.info_cache: Dict[yf.ticker.Ticker, Dict] = {} - - def cache_info(self, ticker: yf.ticker.Ticker) -> Dict: - if ticker in self.info_cache: - return self.info_cache[ticker] - i = ticker.get_info() - self.info_cache[ticker] = i - return i - - def get_ticker_name(self, ticker: yf.Ticker) -> str: + self.info_cache: Dict[str, Any] = {} + + def get_financial_data(self, symbol: str) -> Optional[Any]: + if symbol in self.info_cache: + return self.info_cache[symbol] + if self.backend == "yahooquery": + ticker = yq.Ticker(symbol) + if "Quote not found" in ticker.price[symbol]: + return None + self.info_cache[symbol] = ticker + return ticker + elif self.backend == "yfinance": + ticker = yf.Ticker(symbol) + if not ticker.fast_info["last_price"]: + return None + self.info_cache[symbol] = ticker + return ticker + else: + raise Exception(f"Unknown backend: {self.backend}") + + def get_ticker_name(self, ticker: Any) -> Optional[str]: """Get friendly name of a ticker.""" - info = self.cache_info(ticker) - if "shortName" in info: - return info["shortName"] - return ticker + if isinstance(ticker, yf.Ticker): + return ticker.ticker + elif isinstance(ticker, yq.Ticker): + return ticker.symbols[0] + return None @staticmethod - def get_item_from_dict( + def prioritized_get_item_from_dict( keys: List[str], dictionary: Dict[str, Any] ) -> Optional[Any]: result = None @@ -54,38 +68,63 @@ class stock_quote_renderer(renderer.abstaining_renderer): return result return None - def get_price(self, ticker: yf.Ticker) -> Optional[float]: + def get_price(self, ticker: Any) -> Optional[float]: """Get most recent price of a ticker.""" + if isinstance(ticker, yf.Ticker): + price = stock_quote_renderer.prioritized_get_item_from_dict( + ["last_price", "open", "previous_close"], ticker.fast_info + ) + if price: + return price + + price = stock_quote_renderer.prioritized_get_item_from_dict( + ["bid", "ask", "lastMarket"], + ticker.info, + ) + if price: + return price + return None + elif isinstance(ticker, yq.Ticker): + price = stock_quote_renderer.prioritized_get_item_from_dict( + ["regularMarketPrice", "preMarketPrice", "regularMarketPreviousClose"], + ticker.price[ticker.symbols[0]], + ) + if price: + return price + return None - # First try fast_info - price = stock_quote_renderer.get_item_from_dict( - ["last_price", "open", "previous_close"], ticker.fast_info - ) - if price: - return price + def get_last_close(self, ticker: Any) -> Optional[float]: + if isinstance(ticker, yf.Ticker): + last_close = stock_quote_renderer.prioritized_get_item_from_dict( + ["previous_close", "open"], ticker.fast_info + ) + if last_close: + return last_close + + last_close = stock_quote_renderer.prioritized_get_item_from_dict( + ["preMarketPrice"], ticker.info + ) + if last_close: + return last_close + elif isinstance(ticker, yq.Ticker): + last_close = stock_quote_renderer.prioritized_get_item_from_dict( + ["regularMarketPreviousClose", "regularMarketOpen"], + ticker.price[ticker.symbols[0]], + ) + if last_close: + return last_close + return self.get_price(ticker) - # Next try info - price = stock_quote_renderer.get_item_from_dict( - ["bid", "ask", "lastMarket"], self.cache_info(ticker) - ) - if price: - return price - - # Finally, fall back on history - hist = ticker.history(period="1d").to_dict()["Close"] - latest = None - latest_price = None - for k, v in hist.items(): - if latest is None or k > latest: - price = hist[k] - if price is not None: - latest = k - latest_price = price - print(f"Price: fell back on latest close {latest_price} at {latest}") - return latest_price + def get_change_and_delta( + self, ticker: Any, current_price: float + ) -> Tuple[float, float]: + """Given the current price, look up opening price and compute delta.""" + last_price = self.get_last_close(ticker) + delta = current_price - last_price + return (delta / last_price * 100.0, delta) @staticmethod - def make_chart(symbol: str, ticker: yf.Ticker, period: str) -> str: + def make_chart(symbol: str, ticker: Any, period: str) -> str: base_filename = f"stock_chart_{symbol}.png" output_filename = os.path.join(kiosk_constants.pages_dir, base_filename) transparent = go.Layout( @@ -93,6 +132,7 @@ class stock_quote_renderer(renderer.abstaining_renderer): plot_bgcolor="rgba(0,0,0,0)", xaxis_rangeslider_visible=False, ) + hist = ticker.history(period=period, interval="1wk") chart = go.Figure( data=go.Candlestick( @@ -109,36 +149,15 @@ class stock_quote_renderer(renderer.abstaining_renderer): print(f"Write {output_filename}...") return base_filename - def get_last_close(self, ticker: yf.Ticker) -> float: - last_close = stock_quote_renderer.get_item_from_dict( - ["previous_close", "open"], ticker.fast_info - ) - if last_close: - return last_close - - last_close = stock_quote_renderer.get_item_from_dict( - ["preMarketPrice"], self.cache_info(ticker) - ) - if last_close: - return last_close - return self.get_price(ticker) - - def get_change_and_delta( - self, ticker: yf.Ticker, price: float - ) -> Tuple[float, float]: - """Given the current price, look up opening price and compute delta.""" - last_price = self.get_last_close(ticker) - delta = price - last_price - return (delta / last_price * 100.0, delta) - def periodic_render(self, key: str) -> bool: """Write an up-to-date stock page.""" with file_writer.file_writer("stock_3_86400.html") as f: f.write("

Stock Quotes


") f.write("") symbols_finished = 0 + for symbol in self.symbols: - ticker = yf.Ticker(symbol) + ticker = self.get_financial_data(symbol) if ticker is None: logger.debug(f"Unknown symbol {symbol} -- ignored.") continue @@ -205,5 +224,9 @@ class stock_quote_renderer(renderer.abstaining_renderer): # Test -# x = stock_quote_renderer({}, ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"], { "BTC-USD": "BTC", "GC=F": "GOLD" }) +# x = stock_quote_renderer( +# {}, +# ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"], +# {"BTC-USD": "BTC", "GC=F": "GOLD"}, +# ) # x.periodic_render(None) -- 2.45.0