Changes.
[kiosk.git] / stock_renderer.py
index 41a56ba650a29a4134d89745b05df5f653e8bf6c..b13af908235908428ea1e0dc747fca0424624058 100644 (file)
@@ -5,6 +5,7 @@ import os
 from typing import Any, Dict, List, Optional, Tuple
 
 import yfinance as yf  # type: ignore
+import yahooquery as yq  # type: ignore
 import plotly.graph_objects as go
 
 import file_writer
@@ -12,7 +13,7 @@ import kiosk_constants
 import renderer
 
 
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
 
 
 class stock_quote_renderer(renderer.abstaining_renderer):
@@ -25,26 +26,39 @@ class stock_quote_renderer(renderer.abstaining_renderer):
         display_subs: Dict[str, str] = None,
     ) -> None:
         super().__init__(name_to_timeout_dict)
+        self.backend = "yahooquery"
         self.symbols = symbols
         self.display_subs = display_subs
-        self.info_cache: Dict[yf.ticker.Ticker, Dict] = {}
-
-    def cache_info(self, ticker: yf.ticker.Ticker) -> Dict:
-        if ticker in self.info_cache:
-            return self.info_cache[ticker]
-        i = ticker.get_info()
-        self.info_cache[ticker] = i
-        return i
-
-    def get_ticker_name(self, ticker: yf.Ticker) -> str:
+        self.info_cache: Dict[str, Any] = {}
+
+    def get_financial_data(self, symbol: str) -> Optional[Any]:
+        if symbol in self.info_cache:
+            return self.info_cache[symbol]
+        if self.backend == "yahooquery":
+            ticker = yq.Ticker(symbol)
+            if "Quote not found" in ticker.price[symbol]:
+                return None
+            self.info_cache[symbol] = ticker
+            return ticker
+        elif self.backend == "yfinance":
+            ticker = yf.Ticker(symbol)
+            if not ticker.fast_info["last_price"]:
+                return None
+            self.info_cache[symbol] = ticker
+            return ticker
+        else:
+            raise Exception(f"Unknown backend: {self.backend}")
+
+    def get_ticker_name(self, ticker: Any) -> Optional[str]:
         """Get friendly name of a ticker."""
-        info = self.cache_info(ticker)
-        if "shortName" in info:
-            return info["shortName"]
-        return ticker
+        if isinstance(ticker, yf.Ticker):
+            return ticker.ticker
+        elif isinstance(ticker, yq.Ticker):
+            return ticker.symbols[0]
+        return None
 
     @staticmethod
-    def get_item_from_dict(
+    def prioritized_get_item_from_dict(
         keys: List[str], dictionary: Dict[str, Any]
     ) -> Optional[Any]:
         result = None
@@ -54,38 +68,63 @@ class stock_quote_renderer(renderer.abstaining_renderer):
                 return result
         return None
 
-    def get_price(self, ticker: yf.Ticker) -> Optional[float]:
+    def get_price(self, ticker: Any) -> Optional[float]:
         """Get most recent price of a ticker."""
+        if isinstance(ticker, yf.Ticker):
+            price = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["last_price", "open", "previous_close"], ticker.fast_info
+            )
+            if price:
+                return price
+
+            price = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["bid", "ask", "lastMarket"],
+                ticker.info,
+            )
+            if price:
+                return price
+            return None
+        elif isinstance(ticker, yq.Ticker):
+            price = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["regularMarketPrice", "preMarketPrice", "regularMarketPreviousClose"],
+                ticker.price[ticker.symbols[0]],
+            )
+            if price:
+                return price
+        return None
 
-        # First try fast_info
-        price = stock_quote_renderer.get_item_from_dict(
-            ["last_price", "open", "previous_close"], ticker.fast_info
-        )
-        if price:
-            return price
+    def get_last_close(self, ticker: Any) -> Optional[float]:
+        if isinstance(ticker, yf.Ticker):
+            last_close = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["previous_close", "open"], ticker.fast_info
+            )
+            if last_close:
+                return last_close
+
+            last_close = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["preMarketPrice"], ticker.info
+            )
+            if last_close:
+                return last_close
+        elif isinstance(ticker, yq.Ticker):
+            last_close = stock_quote_renderer.prioritized_get_item_from_dict(
+                ["regularMarketPreviousClose", "regularMarketOpen"],
+                ticker.price[ticker.symbols[0]],
+            )
+            if last_close:
+                return last_close
+        return self.get_price(ticker)
 
-        # Next try info
-        price = stock_quote_renderer.get_item_from_dict(
-            ["bid", "ask", "lastMarket"], self.cache_info(ticker)
-        )
-        if price:
-            return price
-
-        # Finally, fall back on history
-        hist = ticker.history(period="1d").to_dict()["Close"]
-        latest = None
-        latest_price = None
-        for k, v in hist.items():
-            if latest is None or k > latest:
-                price = hist[k]
-                if price is not None:
-                    latest = k
-                    latest_price = price
-        print(f"Price: fell back on latest close {latest_price} at {latest}")
-        return latest_price
+    def get_change_and_delta(
+        self, ticker: Any, current_price: float
+    ) -> Tuple[float, float]:
+        """Given the current price, look up opening price and compute delta."""
+        last_price = self.get_last_close(ticker)
+        delta = current_price - last_price
+        return (delta / last_price * 100.0, delta)
 
     @staticmethod
-    def make_chart(symbol: str, ticker: yf.Ticker, period: str) -> str:
+    def make_chart(symbol: str, ticker: Any, period: str) -> str:
         base_filename = f"stock_chart_{symbol}.png"
         output_filename = os.path.join(kiosk_constants.pages_dir, base_filename)
         transparent = go.Layout(
@@ -93,13 +132,26 @@ class stock_quote_renderer(renderer.abstaining_renderer):
             plot_bgcolor="rgba(0,0,0,0)",
             xaxis_rangeslider_visible=False,
         )
+
         hist = ticker.history(period=period, interval="1wk")
+        if isinstance(ticker, yq.Ticker):
+            _open = "open"
+            _high = "high"
+            _low = "low"
+            _close = "adjclose"
+        elif isinstance(ticker, yf.Ticker):
+            _open = "Open"
+            _high = "High"
+            _low = "Low"
+            _close = "Close"
+        else:
+            raise Exception("Bad Ticker type")
         chart = go.Figure(
             data=go.Candlestick(
-                open=hist["Open"],
-                high=hist["High"],
-                low=hist["Low"],
-                close=hist["Close"],
+                open=hist[_open],
+                high=hist[_high],
+                low=hist[_low],
+                close=hist[_close],
             ),
             layout=transparent,
         )
@@ -109,36 +161,15 @@ class stock_quote_renderer(renderer.abstaining_renderer):
         print(f"Write {output_filename}...")
         return base_filename
 
-    def get_last_close(self, ticker: yf.Ticker) -> float:
-        last_close = stock_quote_renderer.get_item_from_dict(
-            ["previous_close", "open"], ticker.fast_info
-        )
-        if last_close:
-            return last_close
-
-        last_close = stock_quote_renderer.get_item_from_dict(
-            ["preMarketPrice"], self.cache_info(ticker)
-        )
-        if last_close:
-            return last_close
-        return self.get_price(ticker)
-
-    def get_change_and_delta(
-        self, ticker: yf.Ticker, price: float
-    ) -> Tuple[float, float]:
-        """Given the current price, look up opening price and compute delta."""
-        last_price = self.get_last_close(ticker)
-        delta = price - last_price
-        return (delta / last_price * 100.0, delta)
-
     def periodic_render(self, key: str) -> bool:
         """Write an up-to-date stock page."""
         with file_writer.file_writer("stock_3_86400.html") as f:
             f.write("<H1>Stock Quotes</H1><HR>")
             f.write("<TABLE WIDTH=99%>")
             symbols_finished = 0
+
             for symbol in self.symbols:
-                ticker = yf.Ticker(symbol)
+                ticker = self.get_financial_data(symbol)
                 if ticker is None:
                     logger.debug(f"Unknown symbol {symbol} -- ignored.")
                     continue
@@ -205,5 +236,9 @@ class stock_quote_renderer(renderer.abstaining_renderer):
 
 
 # Test
-# x = stock_quote_renderer({}, ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"], { "BTC-USD": "BTC", "GC=F": "GOLD" })
+# x = stock_quote_renderer(
+#    {},
+#    ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"],
+#    {"BTC-USD": "BTC", "GC=F": "GOLD"},
+# )
 # x.periodic_render(None)