mypy clean
[kiosk.git] / gdata_oauth.py
index 1f9cd67b1e59e9188f4f9486c4923493e5b84a8b..7e8b336db29380387b11808c0cecf0d352db4455 100644 (file)
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 # https://developers.google.com/accounts/docs/OAuth2ForDevices
 # https://developers.google.com/drive/web/auth/web-server
 # https://developers.google.com/google-apps/calendar/v3/reference/calendars
@@ -13,26 +15,27 @@ except ImportError:
 import os.path
 import json
 import time
-from oauth2client.client import OAuth2Credentials
-import gdata.calendar.service
-import gdata.docs.service
-import gdata.photos.service, gdata.photos
-from googleapiclient.discovery import build
-import httplib2
+from typing import Dict, Optional
+from oauth2client.client import OAuth2Credentials  # type: ignore
+import gdata.calendar.service  # type: ignore
+import gdata.docs.service  # type: ignore
+import gdata.photos.service, gdata.photos  # type: ignore
+from googleapiclient.discovery import build  # type: ignore
+import httplib2  # type: ignore
 from googleapiclient.discovery import build
 import datetime
 import ssl
 
 
 class OAuth:
-    def __init__(self, client_id, client_secret):
+    def __init__(self, client_id: str, client_secret: str) -> None:
         print("gdata: initializing oauth token...")
         self.client_id = client_id
         self.client_secret = client_secret
-        self.user_code = None
+        self.user_code: Optional[str] = None
         # print 'Client id: %s' % (client_id)
         # print 'Client secret: %s' % (client_secret)
-        self.token = None
+        self.token: Optional[Dict] = None
         self.device_code = None
         self.verfication_url = None
         self.token_file = "client_secrets.json"
@@ -49,18 +52,18 @@ class OAuth:
         self.host = "accounts.google.com"
         self.reset_connection()
         self.load_token()
-        self.last_action = 0
-        self.ssl_ctx = None
+        self.last_action = 0.0
+        self.ssl_ctx: Optional[ssl.SSLContext] = None
 
     # this setup is isolated because it eventually generates a BadStatusLine
     # exception, after which we always get httplib.CannotSendRequest errors.
     # When this happens, we try re-creating the exception.
-    def reset_connection(self):
+    def reset_connection(self) -> None:
         self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem")
         http.client.HTTPConnection.debuglevel = 2
         self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx)
 
-    def load_token(self):
+    def load_token(self) -> None:
         token = None
         if os.path.isfile(self.token_file):
             f = open(self.token_file)
@@ -68,19 +71,19 @@ class OAuth:
             self.token = json.loads(json_token)
             f.close()
 
-    def save_token(self):
+    def save_token(self) -> None:
         f = open(self.token_file, "w")
         f.write(json.dumps(self.token))
         f.close()
 
-    def has_token(self):
-        if self.token != None:
+    def has_token(self) -> bool:
+        if self.token is not None:
             print("gdata: we have a token!")
         else:
             print("gdata: we have no token.")
-        return self.token != None
+        return self.token is not None
 
-    def get_user_code(self):
+    def get_user_code(self) -> Optional[str]:
         self.conn.request(
             "POST",
             "/o/oauth2/device/code",
@@ -97,18 +100,19 @@ class OAuth:
             self.verification_url = data["verification_url"]
             self.retry_interval = data["interval"]
         else:
-            print(("gdata: %d" % response.status))
-            print((response.read()))
-            sys.exit()
+            self.user_code = None
+            print(f"gdata: {response.status}")
+            print(response.read())
+            sys.exit(-1)
         return self.user_code
 
-    def get_new_token(self):
+    def get_new_token(self) -> None:
         # call get_device_code if not already set
-        if self.user_code == None:
+        if self.user_code is None:
             print("gdata: getting user code")
             self.get_user_code()
 
-        while self.token == None:
+        while self.token is None:
             self.conn.request(
                 "POST",
                 "/o/oauth2/token",
@@ -135,13 +139,16 @@ class OAuth:
                 print((response.status))
                 print((response.read()))
 
-    def refresh_token(self):
+    def refresh_token(self) -> bool:
         if self.checking_too_often():
             print("gdata: not refreshing yet, too soon...")
             return False
         else:
             print("gdata: trying to refresh oauth token...")
         self.reset_connection()
+        if self.token is None:
+            return False
+
         refresh_token = self.token["refresh_token"]
         self.conn.request(
             "POST",
@@ -160,7 +167,7 @@ class OAuth:
         response = self.conn.getresponse()
         self.last_action = time.time()
         if response.status == 200:
-            data = json.loads(response.read())
+            data: Dict = json.loads(response.read())
             if "access_token" in data:
                 self.token = data
                 # in fact we NEVER get a new refresh token at this point
@@ -172,7 +179,7 @@ class OAuth:
         print((response.read()))
         return False
 
-    def checking_too_often(self):
+    def checking_too_often(self) -> bool:
         now = time.time()
         return (now - self.last_action) <= 30