X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=gdata_oauth.py;h=7e8b336db29380387b11808c0cecf0d352db4455;hb=e4dca16bbd329afdb587e8488767d88e17777254;hp=1f9cd67b1e59e9188f4f9486c4923493e5b84a8b;hpb=5e241dc47e497c547463cecc07946ea6882835a7;p=kiosk.git diff --git a/gdata_oauth.py b/gdata_oauth.py index 1f9cd67..7e8b336 100644 --- a/gdata_oauth.py +++ b/gdata_oauth.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + # https://developers.google.com/accounts/docs/OAuth2ForDevices # https://developers.google.com/drive/web/auth/web-server # https://developers.google.com/google-apps/calendar/v3/reference/calendars @@ -13,26 +15,27 @@ except ImportError: import os.path import json import time -from oauth2client.client import OAuth2Credentials -import gdata.calendar.service -import gdata.docs.service -import gdata.photos.service, gdata.photos -from googleapiclient.discovery import build -import httplib2 +from typing import Dict, Optional +from oauth2client.client import OAuth2Credentials # type: ignore +import gdata.calendar.service # type: ignore +import gdata.docs.service # type: ignore +import gdata.photos.service, gdata.photos # type: ignore +from googleapiclient.discovery import build # type: ignore +import httplib2 # type: ignore from googleapiclient.discovery import build import datetime import ssl class OAuth: - def __init__(self, client_id, client_secret): + def __init__(self, client_id: str, client_secret: str) -> None: print("gdata: initializing oauth token...") self.client_id = client_id self.client_secret = client_secret - self.user_code = None + self.user_code: Optional[str] = None # print 'Client id: %s' % (client_id) # print 'Client secret: %s' % (client_secret) - self.token = None + self.token: Optional[Dict] = None self.device_code = None self.verfication_url = None self.token_file = "client_secrets.json" @@ -49,18 +52,18 @@ class OAuth: self.host = "accounts.google.com" self.reset_connection() self.load_token() - self.last_action = 0 - self.ssl_ctx = None + self.last_action = 0.0 + self.ssl_ctx: Optional[ssl.SSLContext] = None # this setup is isolated because it eventually generates a BadStatusLine # exception, after which we always get httplib.CannotSendRequest errors. # When this happens, we try re-creating the exception. - def reset_connection(self): + def reset_connection(self) -> None: self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem") http.client.HTTPConnection.debuglevel = 2 self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx) - def load_token(self): + def load_token(self) -> None: token = None if os.path.isfile(self.token_file): f = open(self.token_file) @@ -68,19 +71,19 @@ class OAuth: self.token = json.loads(json_token) f.close() - def save_token(self): + def save_token(self) -> None: f = open(self.token_file, "w") f.write(json.dumps(self.token)) f.close() - def has_token(self): - if self.token != None: + def has_token(self) -> bool: + if self.token is not None: print("gdata: we have a token!") else: print("gdata: we have no token.") - return self.token != None + return self.token is not None - def get_user_code(self): + def get_user_code(self) -> Optional[str]: self.conn.request( "POST", "/o/oauth2/device/code", @@ -97,18 +100,19 @@ class OAuth: self.verification_url = data["verification_url"] self.retry_interval = data["interval"] else: - print(("gdata: %d" % response.status)) - print((response.read())) - sys.exit() + self.user_code = None + print(f"gdata: {response.status}") + print(response.read()) + sys.exit(-1) return self.user_code - def get_new_token(self): + def get_new_token(self) -> None: # call get_device_code if not already set - if self.user_code == None: + if self.user_code is None: print("gdata: getting user code") self.get_user_code() - while self.token == None: + while self.token is None: self.conn.request( "POST", "/o/oauth2/token", @@ -135,13 +139,16 @@ class OAuth: print((response.status)) print((response.read())) - def refresh_token(self): + def refresh_token(self) -> bool: if self.checking_too_often(): print("gdata: not refreshing yet, too soon...") return False else: print("gdata: trying to refresh oauth token...") self.reset_connection() + if self.token is None: + return False + refresh_token = self.token["refresh_token"] self.conn.request( "POST", @@ -160,7 +167,7 @@ class OAuth: response = self.conn.getresponse() self.last_action = time.time() if response.status == 200: - data = json.loads(response.read()) + data: Dict = json.loads(response.read()) if "access_token" in data: self.token = data # in fact we NEVER get a new refresh token at this point @@ -172,7 +179,7 @@ class OAuth: print((response.read())) return False - def checking_too_often(self): + def checking_too_often(self) -> bool: now = time.time() return (now - self.last_action) <= 30