Make stock renderer optionally use yahooquery because yfinace has
authorScott Gasch <[email protected]>
Tue, 21 Feb 2023 18:58:59 +0000 (10:58 -0800)
committerScott Gasch <[email protected]>
Tue, 21 Feb 2023 18:58:59 +0000 (10:58 -0800)
been screwed up for so long.

stock_renderer.py

index 41a56ba650a29a4134d89745b05df5f653e8bf6c..2897edb7fb474b872ffa367096f50e19338bcb5f 100644 (file)
@@ -5,6 +5,7 @@ import os
 from typing import Any, Dict, List, Optional, Tuple
 
 import yfinance as yf  # type: ignore
+import yahooquery as yq  # type: ignore
 import plotly.graph_objects as go
 
 import file_writer
@@ -25,26 +26,39 @@ class stock_quote_renderer(renderer.abstaining_renderer):
         display_subs: Dict[str, str] = None,
     ) -> None:
         super().__init__(name_to_timeout_dict)
+        self.backend = "yahooquery"
         self.symbols = symbols
         self.display_subs = display_subs
-        self.info_cache: Dict[yf.ticker.Ticker, Dict] = {}
-
-    def cache_info(self, ticker: yf.ticker.Ticker) -> Dict:
-        if ticker in self.info_cache:
-            return self.info_cache[ticker]
-        i = ticker.get_info()
-        self.info_cache[ticker] = i
-        return i
-
-    def get_ticker_name(self, ticker: yf.Ticker) -> str:
+        self.info_cache: Dict[str, Any] = {}
+
+    def get_financial_data(self, symbol: str) -> Optional[Any]:
+        if symbol in self.info_cache:
+            return self.info_cache[symbol]
+        if self.backend == "yahooquery":
+            ticker = yq.Ticker(symbol)
+            if "Quote not found" in ticker.price[symbol]:
+                return None
+            self.info_cache[symbol] = ticker
+            return ticker
+        elif self.backend == "yfinance":
+            ticker = yf.Ticker(symbol)
+            if not ticker.fast_info["last_price"]:
+                return None
+            self.info_cache[symbol] = ticker
+            return ticker
+        else:
+            raise Exception(f"Unknown backend: {self.backend}")
+
+    def get_ticker_name(self, ticker: Any) -> Optional[str]:
         """Get friendly name of a ticker."""
-        info = self.cache_info(ticker)
-        if "shortName" in info:
-            return info["shortName"]
-        return ticker
+        if isinstance(ticker, yf.Ticker):
+            return ticker.ticker
+        elif isinstance(ticker, yq.Ticker):
+            return ticker.symbols[0]
+        return None
 
     @staticmethod
-    def get_item_from_dict(
+    def prioritized_get_item_from_dict(
         keys: List[str], dictionary: Dict[str, Any]
     ) -> Optional[Any]:
         result = None
@@ -54,38 +68,63 @@ class stock_quote_renderer(renderer.abstaining_renderer):
                 return result
         return None
 
-    def get_price(self, ticker: yf.Ticker) -> Optional[float]:
+    def get_price(self, ticker: Any) -> Optional[float]:
         """Get most recent price of a ticker."""
+        if isinstance(ticker, yf.Ticker):
+            price = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["last_price", "open", "previous_close"], ticker.fast_info
+            )
+            if price:
+                return price
+
+            price = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["bid", "ask", "lastMarket"],
+                ticker.info,
+            )
+            if price:
+                return price
+            return None
+        elif isinstance(ticker, yq.Ticker):
+            price = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["regularMarketPrice", "preMarketPrice", "regularMarketPreviousClose"],
+                ticker.price[ticker.symbols[0]],
+            )
+            if price:
+                return price
+        return None
 
-        # First try fast_info
-        price = stock_quote_renderer.get_item_from_dict(
-            ["last_price", "open", "previous_close"], ticker.fast_info
-        )
-        if price:
-            return price
+    def get_last_close(self, ticker: Any) -> Optional[float]:
+        if isinstance(ticker, yf.Ticker):
+            last_close = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["previous_close", "open"], ticker.fast_info
+            )
+            if last_close:
+                return last_close
+
+            last_close = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["preMarketPrice"], ticker.info
+            )
+            if last_close:
+                return last_close
+        elif isinstance(ticker, yq.Ticker):
+            last_close = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["regularMarketPreviousClose", "regularMarketOpen"],
+                ticker.price[ticker.symbols[0]],
+            )
+            if last_close:
+                return last_close
+        return self.get_price(ticker)
 
-        # Next try info
-        price = stock_quote_renderer.get_item_from_dict(
-            ["bid", "ask", "lastMarket"], self.cache_info(ticker)
-        )
-        if price:
-            return price
-
-        # Finally, fall back on history
-        hist = ticker.history(period="1d").to_dict()["Close"]
-        latest = None
-        latest_price = None
-        for k, v in hist.items():
-            if latest is None or k > latest:
-                price = hist[k]
-                if price is not None:
-                    latest = k
-                    latest_price = price
-        print(f"Price: fell back on latest close {latest_price} at {latest}")
-        return latest_price
+    def get_change_and_delta(
+        self, ticker: Any, current_price: float
+    ) -> Tuple[float, float]:
+        """Given the current price, look up opening price and compute delta."""
+        last_price = self.get_last_close(ticker)
+        delta = current_price - last_price
+        return (delta / last_price * 100.0, delta)
 
     @staticmethod
-    def make_chart(symbol: str, ticker: yf.Ticker, period: str) -> str:
+    def make_chart(symbol: str, ticker: Any, period: str) -> str:
         base_filename = f"stock_chart_{symbol}.png"
         output_filename = os.path.join(kiosk_constants.pages_dir, base_filename)
         transparent = go.Layout(
@@ -93,6 +132,7 @@ class stock_quote_renderer(renderer.abstaining_renderer):
             plot_bgcolor="rgba(0,0,0,0)",
             xaxis_rangeslider_visible=False,
         )
+
         hist = ticker.history(period=period, interval="1wk")
         chart = go.Figure(
             data=go.Candlestick(
@@ -109,36 +149,15 @@ class stock_quote_renderer(renderer.abstaining_renderer):
         print(f"Write {output_filename}...")
         return base_filename
 
-    def get_last_close(self, ticker: yf.Ticker) -> float:
-        last_close = stock_quote_renderer.get_item_from_dict(
-            ["previous_close", "open"], ticker.fast_info
-        )
-        if last_close:
-            return last_close
-
-        last_close = stock_quote_renderer.get_item_from_dict(
-            ["preMarketPrice"], self.cache_info(ticker)
-        )
-        if last_close:
-            return last_close
-        return self.get_price(ticker)
-
-    def get_change_and_delta(
-        self, ticker: yf.Ticker, price: float
-    ) -> Tuple[float, float]:
-        """Given the current price, look up opening price and compute delta."""
-        last_price = self.get_last_close(ticker)
-        delta = price - last_price
-        return (delta / last_price * 100.0, delta)
-
     def periodic_render(self, key: str) -> bool:
         """Write an up-to-date stock page."""
         with file_writer.file_writer("stock_3_86400.html") as f:
             f.write("<H1>Stock Quotes</H1><HR>")
             f.write("<TABLE WIDTH=99%>")
             symbols_finished = 0
+
             for symbol in self.symbols:
-                ticker = yf.Ticker(symbol)
+                ticker = self.get_financial_data(symbol)
                 if ticker is None:
                     logger.debug(f"Unknown symbol {symbol} -- ignored.")
                     continue
@@ -205,5 +224,9 @@ class stock_quote_renderer(renderer.abstaining_renderer):
 
 
 # Test
-# x = stock_quote_renderer({}, ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"], { "BTC-USD": "BTC", "GC=F": "GOLD" })
+# x = stock_quote_renderer(
+#    {},
+#    ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"],
+#    {"BTC-USD": "BTC", "GC=F": "GOLD"},
+# )
 # x.periodic_render(None)