X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=gdata_oauth.py;h=7e8b336db29380387b11808c0cecf0d352db4455;hb=e4dca16bbd329afdb587e8488767d88e17777254;hp=64934ebf173831c93146d4a58cb9bb192b96c2a7;hpb=4b1f3d8a8b278ca6d62f461ea80c8ea21080c301;p=kiosk.git diff --git a/gdata_oauth.py b/gdata_oauth.py index 64934eb..7e8b336 100644 --- a/gdata_oauth.py +++ b/gdata_oauth.py @@ -1,64 +1,69 @@ +#!/usr/bin/env python3 + # https://developers.google.com/accounts/docs/OAuth2ForDevices # https://developers.google.com/drive/web/auth/web-server # https://developers.google.com/google-apps/calendar/v3/reference/calendars # https://developers.google.com/picasa-web/ import sys -import urllib +import urllib.request, urllib.parse, urllib.error + try: - import httplib # python2 + import http.client # python2 except ImportError: - import http.client # python3 + import http.client # python3 import os.path import json import time -from oauth2client.client import OAuth2Credentials -import gdata.calendar.service -import gdata.docs.service -import gdata.photos.service, gdata.photos -from apiclient.discovery import build -import httplib2 -from apiclient.discovery import build +from typing import Dict, Optional +from oauth2client.client import OAuth2Credentials # type: ignore +import gdata.calendar.service # type: ignore +import gdata.docs.service # type: ignore +import gdata.photos.service, gdata.photos # type: ignore +from googleapiclient.discovery import build # type: ignore +import httplib2 # type: ignore +from googleapiclient.discovery import build import datetime import ssl + class OAuth: - def __init__(self, client_id, client_secret): + def __init__(self, client_id: str, client_secret: str) -> None: print("gdata: initializing oauth token...") self.client_id = client_id self.client_secret = client_secret - self.user_code = None - #print 'Client id: %s' % (client_id) - #print 'Client secret: %s' % (client_secret) - self.token = None + self.user_code: Optional[str] = None + # print 'Client id: %s' % (client_id) + # print 'Client secret: %s' % (client_secret) + self.token: Optional[Dict] = None self.device_code = None self.verfication_url = None - self.token_file = 'client_secrets.json' + self.token_file = "client_secrets.json" self.scope = [ #'https://www.googleapis.com/auth/calendar', #'https://www.googleapis.com/auth/drive', #'https://docs.google.com/feeds', #'https://www.googleapis.com/auth/calendar.readonly', #'https://picasaweb.google.com/data/', - 'https://www.googleapis.com/auth/photoslibrary.readonly', + "https://www.googleapis.com/auth/photoslibrary.readonly", #'http://picasaweb.google.com/data/', #'https://www.google.com/calendar/feeds/', ] - self.host = 'accounts.google.com' + self.host = "accounts.google.com" self.reset_connection() self.load_token() - self.last_action = 0 - self.ssl_ctx = None + self.last_action = 0.0 + self.ssl_ctx: Optional[ssl.SSLContext] = None # this setup is isolated because it eventually generates a BadStatusLine # exception, after which we always get httplib.CannotSendRequest errors. # When this happens, we try re-creating the exception. - def reset_connection(self): - self.ssl_ctx = ssl.create_default_context(cafile='/usr/local/etc/ssl/cert.pem') - httplib.HTTPConnection.debuglevel = 2 - self.conn = httplib.HTTPSConnection(self.host, context=self.ssl_ctx) + def reset_connection(self) -> None: + self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem") + http.client.HTTPConnection.debuglevel = 2 + self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx) - def load_token(self): + def load_token(self) -> None: token = None if os.path.isfile(self.token_file): f = open(self.token_file) @@ -66,140 +71,155 @@ class OAuth: self.token = json.loads(json_token) f.close() - def save_token(self): - f = open(self.token_file, 'w') + def save_token(self) -> None: + f = open(self.token_file, "w") f.write(json.dumps(self.token)) f.close() - def has_token(self): - if self.token != None: + def has_token(self) -> bool: + if self.token is not None: print("gdata: we have a token!") else: print("gdata: we have no token.") - return self.token != None + return self.token is not None - def get_user_code(self): + def get_user_code(self) -> Optional[str]: self.conn.request( "POST", "/o/oauth2/device/code", - urllib.urlencode({ - 'client_id': self.client_id, - 'scope' : ' '.join(self.scope) - }), - {"Content-type": "application/x-www-form-urlencoded"}) + urllib.parse.urlencode( + {"client_id": self.client_id, "scope": " ".join(self.scope)} + ), + {"Content-type": "application/x-www-form-urlencoded"}, + ) response = self.conn.getresponse() if response.status == 200: data = json.loads(response.read()) - self.device_code = data['device_code'] - self.user_code = data['user_code'] - self.verification_url = data['verification_url'] - self.retry_interval = data['interval'] + self.device_code = data["device_code"] + self.user_code = data["user_code"] + self.verification_url = data["verification_url"] + self.retry_interval = data["interval"] else: - print("gdata: %d" % response.status) + self.user_code = None + print(f"gdata: {response.status}") print(response.read()) - sys.exit() + sys.exit(-1) return self.user_code - def get_new_token(self): + def get_new_token(self) -> None: # call get_device_code if not already set - if self.user_code == None: + if self.user_code is None: print("gdata: getting user code") self.get_user_code() - while self.token == None: + while self.token is None: self.conn.request( "POST", "/o/oauth2/token", - urllib.urlencode({ - 'client_id' : self.client_id, - 'client_secret' : self.client_secret, - 'code' : self.device_code, - 'grant_type' : 'http://oauth.net/grant_type/device/1.0' - }), - {"Content-type": "application/x-www-form-urlencoded"}) + urllib.parse.urlencode( + { + "client_id": self.client_id, + "client_secret": self.client_secret, + "code": self.device_code, + "grant_type": "http://oauth.net/grant_type/device/1.0", + } + ), + {"Content-type": "application/x-www-form-urlencoded"}, + ) response = self.conn.getresponse() if response.status == 200: data = json.loads(response.read()) - if 'access_token' in data: + if "access_token" in data: self.token = data self.save_token() else: time.sleep(self.retry_interval + 2) else: print("gdata: failed to get token") - print(response.status) - print(response.read()) + print((response.status)) + print((response.read())) - def refresh_token(self): + def refresh_token(self) -> bool: if self.checking_too_often(): print("gdata: not refreshing yet, too soon...") return False else: - print('gdata: trying to refresh oauth token...') + print("gdata: trying to refresh oauth token...") self.reset_connection() - refresh_token = self.token['refresh_token'] + if self.token is None: + return False + + refresh_token = self.token["refresh_token"] self.conn.request( "POST", "/o/oauth2/token", - urllib.urlencode({ - 'client_id' : self.client_id, - 'client_secret' : self.client_secret, - 'refresh_token' : refresh_token, - 'grant_type' : 'refresh_token' - }), - {"Content-type": "application/x-www-form-urlencoded"}) + urllib.parse.urlencode( + { + "client_id": self.client_id, + "client_secret": self.client_secret, + "refresh_token": refresh_token, + "grant_type": "refresh_token", + } + ), + {"Content-type": "application/x-www-form-urlencoded"}, + ) response = self.conn.getresponse() self.last_action = time.time() if response.status == 200: - data = json.loads(response.read()) - if 'access_token' in data: + data: Dict = json.loads(response.read()) + if "access_token" in data: self.token = data # in fact we NEVER get a new refresh token at this point - if not 'refresh_token' in self.token: - self.token['refresh_token'] = refresh_token + if not "refresh_token" in self.token: + self.token["refresh_token"] = refresh_token self.save_token() return True - print("gdata: unexpected response %d to renewal request" % response.status) - print(response.read()) + print(("gdata: unexpected response %d to renewal request" % response.status)) + print((response.read())) return False - def checking_too_often(self): + def checking_too_often(self) -> bool: now = time.time() return (now - self.last_action) <= 30 # https://developers.google.com/picasa-web/ def photos_service(self): headers = { - "Authorization": "%s %s" % (self.token['token_type'], self.token['access_token']) + "Authorization": "%s %s" + % (self.token["token_type"], self.token["access_token"]) } client = gdata.photos.service.PhotosService(additional_headers=headers) return client # https://developers.google.com/drive/ def docs_service(self): - cred = OAuth2Credentials(self.token['access_token'], - self.client_id, - self.client_secret, - self.token['refresh_token'], - datetime.datetime.now(), - 'http://accounts.google.com/o/oauth2/token', - 'KitchenKiosk/0.9') + cred = OAuth2Credentials( + self.token["access_token"], + self.client_id, + self.client_secret, + self.token["refresh_token"], + datetime.datetime.now(), + "http://accounts.google.com/o/oauth2/token", + "KitchenKiosk/0.9", + ) http = httplib2.Http(disable_ssl_certificate_validation=True) http = cred.authorize(http) - service = build('drive', 'v2', http) + service = build("drive", "v2", http) return service # https://developers.google.com/google-apps/calendar/ def calendar_service(self): - cred = OAuth2Credentials(self.token['access_token'], - self.client_id, - self.client_secret, - self.token['refresh_token'], - datetime.datetime.now(), - 'http://accounts.google.com/o/oauth2/token', - 'KitchenKiosk/0.9') + cred = OAuth2Credentials( + self.token["access_token"], + self.client_id, + self.client_secret, + self.token["refresh_token"], + datetime.datetime.now(), + "http://accounts.google.com/o/oauth2/token", + "KitchenKiosk/0.9", + ) http = httplib2.Http(disable_ssl_certificate_validation=True) http = cred.authorize(http) - service = build('calendar', 'v3', http) + service = build("calendar", "v3", http) return service