from typing import Any, Dict, List, Optional, Tuple
import yfinance as yf # type: ignore
+import yahooquery as yq # type: ignore
import plotly.graph_objects as go
import file_writer
import renderer
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class stock_quote_renderer(renderer.abstaining_renderer):
"""Render the stock prices page."""
def __init__(
- self,
- name_to_timeout_dict: Dict[str, int],
- symbols: List[str],
- display_subs: Dict[str, str] = None,
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ symbols: List[str],
+ display_subs: Dict[str, str] = None,
) -> None:
super().__init__(name_to_timeout_dict)
+ self.backend = "yahooquery"
self.symbols = symbols
self.display_subs = display_subs
- self.info_cache: Dict[yf.ticker.Ticker, Dict] = {}
-
- def cache_info(self, ticker: yf.ticker.Ticker) -> Dict:
- if ticker in self.info_cache:
- return self.info_cache[ticker]
- i = ticker.get_info()
- self.info_cache[ticker] = i
- return i
-
- def get_ticker_name(self, ticker: yf.Ticker) -> str:
+ self.info_cache: Dict[str, Any] = {}
+
+ def get_financial_data(self, symbol: str) -> Optional[Any]:
+ if symbol in self.info_cache:
+ return self.info_cache[symbol]
+ if self.backend == "yahooquery":
+ ticker = yq.Ticker(symbol)
+ if "Quote not found" in ticker.price[symbol]:
+ return None
+ self.info_cache[symbol] = ticker
+ return ticker
+ elif self.backend == "yfinance":
+ ticker = yf.Ticker(symbol)
+ if not ticker.fast_info["last_price"]:
+ return None
+ self.info_cache[symbol] = ticker
+ return ticker
+ else:
+ raise Exception(f"Unknown backend: {self.backend}")
+
+ def get_ticker_name(self, ticker: Any) -> Optional[str]:
"""Get friendly name of a ticker."""
- info = self.cache_info(ticker)
- if "shortName" in info:
- return info["shortName"]
- return ticker
+ if isinstance(ticker, yf.Ticker):
+ return ticker.ticker
+ elif isinstance(ticker, yq.Ticker):
+ return ticker.symbols[0]
+ return None
@staticmethod
- def get_item_from_dict(keys: List[str], dictionary: Dict[str, Any]) -> Optional[Any]:
+ def prioritized_get_item_from_dict(
+ keys: List[str], dictionary: Dict[str, Any]
+ ) -> Optional[Any]:
result = None
for key in keys:
result = dictionary.get(key, None)
return result
return None
- def get_price(self, ticker: yf.Ticker) -> Optional[float]:
+ def get_price(self, ticker: Any) -> Optional[float]:
"""Get most recent price of a ticker."""
+ if isinstance(ticker, yf.Ticker):
+ price = stock_quote_renderer.prioritized_get_item_from_dict(
+ ["last_price", "open", "previous_close"], ticker.fast_info
+ )
+ if price:
+ return price
+
+ price = stock_quote_renderer.prioritized_get_item_from_dict(
+ ["bid", "ask", "lastMarket"],
+ ticker.info,
+ )
+ if price:
+ return price
+ return None
+ elif isinstance(ticker, yq.Ticker):
+ price = stock_quote_renderer.prioritized_get_item_from_dict(
+ ["regularMarketPrice", "preMarketPrice", "regularMarketPreviousClose"],
+ ticker.price[ticker.symbols[0]],
+ )
+ if price:
+ return price
+ return None
+
+ def get_last_close(self, ticker: Any) -> Optional[float]:
+ if isinstance(ticker, yf.Ticker):
+ last_close = stock_quote_renderer.prioritized_get_item_from_dict(
+ ["previous_close", "open"], ticker.fast_info
+ )
+ if last_close:
+ return last_close
+
+ last_close = stock_quote_renderer.prioritized_get_item_from_dict(
+ ["preMarketPrice"], ticker.info
+ )
+ if last_close:
+ return last_close
+ elif isinstance(ticker, yq.Ticker):
+ last_close = stock_quote_renderer.prioritized_get_item_from_dict(
+ ["regularMarketPreviousClose", "regularMarketOpen"],
+ ticker.price[ticker.symbols[0]],
+ )
+ if last_close:
+ return last_close
+ return self.get_price(ticker)
- # First try fast_info
- price = stock_quote_renderer.get_item_from_dict(
- ['last_price',
- 'open',
- 'previous_close'],
- ticker.fast_info)
- if price:
- return price
-
- # Next try info
- price = stock_quote_renderer.get_item_from_dict(
- ['bid',
- 'ask',
- 'lastMarket'],
- self.cache_info(ticker))
- if price:
- return price
-
- # Finally, fall back on history
- hist = ticker.history(period="1d").to_dict()['Close']
- latest = None
- latest_price = None
- for k, v in hist.items():
- if latest is None or k > latest:
- price = hist[k]
- if price is not None:
- latest = k
- latest_price = price
- print(f"Price: fell back on latest close {latest_price} at {latest}")
- return latest_price
+ def get_change_and_delta(
+ self, ticker: Any, current_price: float
+ ) -> Tuple[float, float]:
+ """Given the current price, look up opening price and compute delta."""
+ last_price = self.get_last_close(ticker)
+ delta = current_price - last_price
+ return (delta / last_price * 100.0, delta)
@staticmethod
- def make_chart(symbol: str, ticker: yf.Ticker, period: str) -> str:
- base_filename = f'stock_chart_{symbol}.png'
+ def make_chart(symbol: str, ticker: Any, period: str) -> str:
+ base_filename = f"stock_chart_{symbol}.png"
output_filename = os.path.join(kiosk_constants.pages_dir, base_filename)
transparent = go.Layout(
- paper_bgcolor='rgba(0,0,0,0)',
- plot_bgcolor='rgba(0,0,0,0)',
+ paper_bgcolor="rgba(0,0,0,0)",
+ plot_bgcolor="rgba(0,0,0,0)",
xaxis_rangeslider_visible=False,
)
+
hist = ticker.history(period=period, interval="1wk")
+ if isinstance(ticker, yq.Ticker):
+ _open = "open"
+ _high = "high"
+ _low = "low"
+ _close = "adjclose"
+ elif isinstance(ticker, yf.Ticker):
+ _open = "Open"
+ _high = "High"
+ _low = "Low"
+ _close = "Close"
+ else:
+ raise Exception("Bad Ticker type")
chart = go.Figure(
data=go.Candlestick(
- open=hist['Open'],
- high=hist['High'],
- low=hist['Low'],
- close=hist['Close'],
+ open=hist[_open],
+ high=hist[_high],
+ low=hist[_low],
+ close=hist[_close],
),
layout=transparent,
)
print(f"Write {output_filename}...")
return base_filename
- def get_last_close(
- self,
- ticker: yf.Ticker
- ) -> float:
- last_close = stock_quote_renderer.get_item_from_dict(
- ['previous_close',
- 'open'],
- ticker.fast_info)
- if last_close:
- return last_close
-
- last_close = stock_quote_renderer.get_item_from_dict(
- ['preMarketPrice'],
- self.cache_info(ticker))
- if last_close:
- return last_close
- return self.get_price(ticker)
-
- def get_change_and_delta(
- self,
- ticker: yf.Ticker,
- price: float
- ) -> Tuple[float, float]:
- """Given the current price, look up opening price and compute delta."""
- last_price = self.get_last_close(ticker)
- delta = price - last_price
- return (delta / last_price * 100.0, delta)
-
def periodic_render(self, key: str) -> bool:
"""Write an up-to-date stock page."""
with file_writer.file_writer("stock_3_86400.html") as f:
f.write("<H1>Stock Quotes</H1><HR>")
f.write("<TABLE WIDTH=99%>")
symbols_finished = 0
+
for symbol in self.symbols:
- ticker = yf.Ticker(symbol)
+ ticker = self.get_financial_data(symbol)
if ticker is None:
logger.debug(f"Unknown symbol {symbol} -- ignored.")
continue
name = self.get_ticker_name(ticker)
if name is None:
- logger.debug(f'Bad name for {symbol} -- skipped.')
+ logger.debug(f"Bad name for {symbol} -- skipped.")
continue
price = self.get_price(ticker)
if price is None:
logger.debug(f"No price information for {symbol} -- skipped.")
continue
- (percent_change, delta) = self.get_change_and_delta(
- ticker, price
- )
+ (percent_change, delta) = self.get_change_and_delta(ticker, price)
chart_filename = stock_quote_renderer.make_chart(symbol, ticker, "1y")
print(f"delta: {delta}, change: {percent_change}")
if percent_change < 0:
f.write("</TR></TABLE>")
return True
+
# Test
-x = stock_quote_renderer({}, ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"], { "BTC-USD": "BTC", "GC=F": "GOLD" })
+x = stock_quote_renderer(
+ {},
+ ["MSFT", "GOOG", "BTC-USD", "ABHYX", "GC=F", "VNQ"],
+ {"BTC-USD": "BTC", "GC=F": "GOLD"},
+)
x.periodic_render(None)