mypy clean
[kiosk.git] / gdata_oauth.py
index 64934ebf173831c93146d4a58cb9bb192b96c2a7..7e8b336db29380387b11808c0cecf0d352db4455 100644 (file)
@@ -1,64 +1,69 @@
+#!/usr/bin/env python3
+
 # https://developers.google.com/accounts/docs/OAuth2ForDevices
 # https://developers.google.com/drive/web/auth/web-server
 # https://developers.google.com/google-apps/calendar/v3/reference/calendars
 # https://developers.google.com/picasa-web/
 
 import sys
-import urllib
+import urllib.request, urllib.parse, urllib.error
+
 try:
-    import httplib     # python2
+    import http.client  # python2
 except ImportError:
-    import http.client # python3
+    import http.client  # python3
 import os.path
 import json
 import time
-from oauth2client.client import OAuth2Credentials
-import gdata.calendar.service
-import gdata.docs.service
-import gdata.photos.service, gdata.photos
-from apiclient.discovery import build
-import httplib2
-from apiclient.discovery import build
+from typing import Dict, Optional
+from oauth2client.client import OAuth2Credentials  # type: ignore
+import gdata.calendar.service  # type: ignore
+import gdata.docs.service  # type: ignore
+import gdata.photos.service, gdata.photos  # type: ignore
+from googleapiclient.discovery import build  # type: ignore
+import httplib2  # type: ignore
+from googleapiclient.discovery import build
 import datetime
 import ssl
 
+
 class OAuth:
-    def __init__(self, client_id, client_secret):
+    def __init__(self, client_id: str, client_secret: str) -> None:
         print("gdata: initializing oauth token...")
         self.client_id = client_id
         self.client_secret = client_secret
-        self.user_code = None
-        #print 'Client id: %s' % (client_id)
-        #print 'Client secret: %s' % (client_secret)
-        self.token = None
+        self.user_code: Optional[str] = None
+        # print 'Client id: %s' % (client_id)
+        # print 'Client secret: %s' % (client_secret)
+        self.token: Optional[Dict] = None
         self.device_code = None
         self.verfication_url = None
-        self.token_file = 'client_secrets.json'
+        self.token_file = "client_secrets.json"
         self.scope = [
             #'https://www.googleapis.com/auth/calendar',
             #'https://www.googleapis.com/auth/drive',
             #'https://docs.google.com/feeds',
             #'https://www.googleapis.com/auth/calendar.readonly',
             #'https://picasaweb.google.com/data/',
-            'https://www.googleapis.com/auth/photoslibrary.readonly',
+            "https://www.googleapis.com/auth/photoslibrary.readonly",
             #'http://picasaweb.google.com/data/',
             #'https://www.google.com/calendar/feeds/',
         ]
-        self.host = 'accounts.google.com'
+        self.host = "accounts.google.com"
         self.reset_connection()
         self.load_token()
-        self.last_action = 0
-        self.ssl_ctx = None
+        self.last_action = 0.0
+        self.ssl_ctx: Optional[ssl.SSLContext] = None
 
     # this setup is isolated because it eventually generates a BadStatusLine
     # exception, after which we always get httplib.CannotSendRequest errors.
     # When this happens, we try re-creating the exception.
-    def reset_connection(self):
-        self.ssl_ctx = ssl.create_default_context(cafile='/usr/local/etc/ssl/cert.pem')
-        httplib.HTTPConnection.debuglevel = 2
-        self.conn = httplib.HTTPSConnection(self.host, context=self.ssl_ctx)
+    def reset_connection(self) -> None:
+        self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem")
+        http.client.HTTPConnection.debuglevel = 2
+        self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx)
 
-    def load_token(self):
+    def load_token(self) -> None:
         token = None
         if os.path.isfile(self.token_file):
             f = open(self.token_file)
@@ -66,140 +71,155 @@ class OAuth:
             self.token = json.loads(json_token)
             f.close()
 
-    def save_token(self):
-        f = open(self.token_file, 'w')
+    def save_token(self) -> None:
+        f = open(self.token_file, "w")
         f.write(json.dumps(self.token))
         f.close()
 
-    def has_token(self):
-        if self.token != None:
+    def has_token(self) -> bool:
+        if self.token is not None:
             print("gdata: we have a token!")
         else:
             print("gdata: we have no token.")
-        return self.token != None
+        return self.token is not None
 
-    def get_user_code(self):
+    def get_user_code(self) -> Optional[str]:
         self.conn.request(
             "POST",
             "/o/oauth2/device/code",
-            urllib.urlencode({
-                'client_id': self.client_id,
-                'scope'    : ' '.join(self.scope)
-            }),
-            {"Content-type": "application/x-www-form-urlencoded"})
+            urllib.parse.urlencode(
+                {"client_id": self.client_id, "scope": " ".join(self.scope)}
+            ),
+            {"Content-type": "application/x-www-form-urlencoded"},
+        )
         response = self.conn.getresponse()
         if response.status == 200:
             data = json.loads(response.read())
-            self.device_code = data['device_code']
-            self.user_code = data['user_code']
-            self.verification_url = data['verification_url']
-            self.retry_interval = data['interval']
+            self.device_code = data["device_code"]
+            self.user_code = data["user_code"]
+            self.verification_url = data["verification_url"]
+            self.retry_interval = data["interval"]
         else:
-            print("gdata: %d" % response.status)
+            self.user_code = None
+            print(f"gdata: {response.status}")
             print(response.read())
-            sys.exit()
+            sys.exit(-1)
         return self.user_code
 
-    def get_new_token(self):
+    def get_new_token(self) -> None:
         # call get_device_code if not already set
-        if self.user_code == None:
+        if self.user_code is None:
             print("gdata: getting user code")
             self.get_user_code()
 
-        while self.token == None:
+        while self.token is None:
             self.conn.request(
                 "POST",
                 "/o/oauth2/token",
-                urllib.urlencode({
-                    'client_id'     : self.client_id,
-                    'client_secret' : self.client_secret,
-                    'code'          : self.device_code,
-                    'grant_type'    : 'http://oauth.net/grant_type/device/1.0'
-                    }),
-                {"Content-type": "application/x-www-form-urlencoded"})
+                urllib.parse.urlencode(
+                    {
+                        "client_id": self.client_id,
+                        "client_secret": self.client_secret,
+                        "code": self.device_code,
+                        "grant_type": "http://oauth.net/grant_type/device/1.0",
+                    }
+                ),
+                {"Content-type": "application/x-www-form-urlencoded"},
+            )
             response = self.conn.getresponse()
             if response.status == 200:
                 data = json.loads(response.read())
-                if 'access_token' in data:
+                if "access_token" in data:
                     self.token = data
                     self.save_token()
                 else:
                     time.sleep(self.retry_interval + 2)
             else:
                 print("gdata: failed to get token")
-                print(response.status)
-                print(response.read())
+                print((response.status))
+                print((response.read()))
 
-    def refresh_token(self):
+    def refresh_token(self) -> bool:
         if self.checking_too_often():
             print("gdata: not refreshing yet, too soon...")
             return False
         else:
-            print('gdata: trying to refresh oauth token...')
+            print("gdata: trying to refresh oauth token...")
         self.reset_connection()
-        refresh_token = self.token['refresh_token']
+        if self.token is None:
+            return False
+
+        refresh_token = self.token["refresh_token"]
         self.conn.request(
             "POST",
             "/o/oauth2/token",
-            urllib.urlencode({
-                'client_id'     : self.client_id,
-                'client_secret' : self.client_secret,
-                'refresh_token' : refresh_token,
-                'grant_type'    : 'refresh_token'
-                }),
-            {"Content-type": "application/x-www-form-urlencoded"})
+            urllib.parse.urlencode(
+                {
+                    "client_id": self.client_id,
+                    "client_secret": self.client_secret,
+                    "refresh_token": refresh_token,
+                    "grant_type": "refresh_token",
+                }
+            ),
+            {"Content-type": "application/x-www-form-urlencoded"},
+        )
 
         response = self.conn.getresponse()
         self.last_action = time.time()
         if response.status == 200:
-            data = json.loads(response.read())
-            if 'access_token' in data:
+            data: Dict = json.loads(response.read())
+            if "access_token" in data:
                 self.token = data
                 # in fact we NEVER get a new refresh token at this point
-                if not 'refresh_token' in self.token:
-                    self.token['refresh_token'] = refresh_token
+                if not "refresh_token" in self.token:
+                    self.token["refresh_token"] = refresh_token
                     self.save_token()
                 return True
-        print("gdata: unexpected response %d to renewal request" % response.status)
-        print(response.read())
+        print(("gdata: unexpected response %d to renewal request" % response.status))
+        print((response.read()))
         return False
 
-    def checking_too_often(self):
+    def checking_too_often(self) -> bool:
         now = time.time()
         return (now - self.last_action) <= 30
 
     # https://developers.google.com/picasa-web/
     def photos_service(self):
         headers = {
-            "Authorization": "%s %s"  % (self.token['token_type'], self.token['access_token'])
+            "Authorization": "%s %s"
+            % (self.token["token_type"], self.token["access_token"])
         }
         client = gdata.photos.service.PhotosService(additional_headers=headers)
         return client
 
     # https://developers.google.com/drive/
     def docs_service(self):
-        cred = OAuth2Credentials(self.token['access_token'],
-                                 self.client_id,
-                                 self.client_secret,
-                                 self.token['refresh_token'],
-                                 datetime.datetime.now(),
-                                 'http://accounts.google.com/o/oauth2/token',
-                                 'KitchenKiosk/0.9')
+        cred = OAuth2Credentials(
+            self.token["access_token"],
+            self.client_id,
+            self.client_secret,
+            self.token["refresh_token"],
+            datetime.datetime.now(),
+            "http://accounts.google.com/o/oauth2/token",
+            "KitchenKiosk/0.9",
+        )
         http = httplib2.Http(disable_ssl_certificate_validation=True)
         http = cred.authorize(http)
-        service = build('drive', 'v2', http)
+        service = build("drive", "v2", http)
         return service
 
     # https://developers.google.com/google-apps/calendar/
     def calendar_service(self):
-        cred = OAuth2Credentials(self.token['access_token'],
-                                 self.client_id,
-                                 self.client_secret,
-                                 self.token['refresh_token'],
-                                 datetime.datetime.now(),
-                                 'http://accounts.google.com/o/oauth2/token',
-                                 'KitchenKiosk/0.9')
+        cred = OAuth2Credentials(
+            self.token["access_token"],
+            self.client_id,
+            self.client_secret,
+            self.token["refresh_token"],
+            datetime.datetime.now(),
+            "http://accounts.google.com/o/oauth2/token",
+            "KitchenKiosk/0.9",
+        )
         http = httplib2.Http(disable_ssl_certificate_validation=True)
         http = cred.authorize(http)
-        service = build('calendar', 'v3', http)
+        service = build("calendar", "v3", http)
         return service