#!/usr/bin/env python3 # https://developers.google.com/accounts/docs/OAuth2ForDevices # https://developers.google.com/drive/web/auth/web-server # https://developers.google.com/google-apps/calendar/v3/reference/calendars # https://developers.google.com/picasa-web/ import sys import urllib.request, urllib.parse, urllib.error try: import http.client # python2 except ImportError: import http.client # python3 import os.path import json import time from typing import Dict, Optional from oauth2client.client import OAuth2Credentials # type: ignore import gdata.calendar.service # type: ignore import gdata.docs.service # type: ignore import gdata.photos.service, gdata.photos # type: ignore from googleapiclient.discovery import build # type: ignore import httplib2 # type: ignore from googleapiclient.discovery import build import datetime import ssl class OAuth: def __init__(self, client_id: str, client_secret: str) -> None: print("gdata: initializing oauth token...") self.client_id = client_id self.client_secret = client_secret self.user_code: Optional[str] = None # print 'Client id: %s' % (client_id) # print 'Client secret: %s' % (client_secret) self.token: Optional[Dict] = None self.device_code = None self.verfication_url = None self.token_file = "client_secrets.json" self.scope = [ #'https://www.googleapis.com/auth/calendar', #'https://www.googleapis.com/auth/drive', #'https://docs.google.com/feeds', #'https://www.googleapis.com/auth/calendar.readonly', #'https://picasaweb.google.com/data/', "https://www.googleapis.com/auth/photoslibrary.readonly", #'http://picasaweb.google.com/data/', #'https://www.google.com/calendar/feeds/', ] self.host = "accounts.google.com" self.reset_connection() self.load_token() self.last_action = 0.0 self.ssl_ctx: Optional[ssl.SSLContext] = None # this setup is isolated because it eventually generates a BadStatusLine # exception, after which we always get httplib.CannotSendRequest errors. # When this happens, we try re-creating the exception. def reset_connection(self) -> None: self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem") http.client.HTTPConnection.debuglevel = 2 self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx) def load_token(self) -> None: token = None if os.path.isfile(self.token_file): f = open(self.token_file) json_token = f.read() self.token = json.loads(json_token) f.close() def save_token(self) -> None: f = open(self.token_file, "w") f.write(json.dumps(self.token)) f.close() def has_token(self) -> bool: if self.token is not None: print("gdata: we have a token!") else: print("gdata: we have no token.") return self.token is not None def get_user_code(self) -> Optional[str]: self.conn.request( "POST", "/o/oauth2/device/code", urllib.parse.urlencode( {"client_id": self.client_id, "scope": " ".join(self.scope)} ), {"Content-type": "application/x-www-form-urlencoded"}, ) response = self.conn.getresponse() if response.status == 200: data = json.loads(response.read()) self.device_code = data["device_code"] self.user_code = data["user_code"] self.verification_url = data["verification_url"] self.retry_interval = data["interval"] else: self.user_code = None print(f"gdata: {response.status}") print(response.read()) sys.exit(-1) return self.user_code def get_new_token(self) -> None: # call get_device_code if not already set if self.user_code is None: print("gdata: getting user code") self.get_user_code() while self.token is None: self.conn.request( "POST", "/o/oauth2/token", urllib.parse.urlencode( { "client_id": self.client_id, "client_secret": self.client_secret, "code": self.device_code, "grant_type": "http://oauth.net/grant_type/device/1.0", } ), {"Content-type": "application/x-www-form-urlencoded"}, ) response = self.conn.getresponse() if response.status == 200: data = json.loads(response.read()) if "access_token" in data: self.token = data self.save_token() else: time.sleep(self.retry_interval + 2) else: print("gdata: failed to get token") print((response.status)) print((response.read())) def refresh_token(self) -> bool: if self.checking_too_often(): print("gdata: not refreshing yet, too soon...") return False else: print("gdata: trying to refresh oauth token...") self.reset_connection() if self.token is None: return False refresh_token = self.token["refresh_token"] self.conn.request( "POST", "/o/oauth2/token", urllib.parse.urlencode( { "client_id": self.client_id, "client_secret": self.client_secret, "refresh_token": refresh_token, "grant_type": "refresh_token", } ), {"Content-type": "application/x-www-form-urlencoded"}, ) response = self.conn.getresponse() self.last_action = time.time() if response.status == 200: data: Dict = json.loads(response.read()) if "access_token" in data: self.token = data # in fact we NEVER get a new refresh token at this point if not "refresh_token" in self.token: self.token["refresh_token"] = refresh_token self.save_token() return True print(("gdata: unexpected response %d to renewal request" % response.status)) print((response.read())) return False def checking_too_often(self) -> bool: now = time.time() return (now - self.last_action) <= 30 # https://developers.google.com/picasa-web/ def photos_service(self): headers = { "Authorization": "%s %s" % (self.token["token_type"], self.token["access_token"]) } client = gdata.photos.service.PhotosService(additional_headers=headers) return client # https://developers.google.com/drive/ def docs_service(self): cred = OAuth2Credentials( self.token["access_token"], self.client_id, self.client_secret, self.token["refresh_token"], datetime.datetime.now(), "http://accounts.google.com/o/oauth2/token", "KitchenKiosk/0.9", ) http = httplib2.Http(disable_ssl_certificate_validation=True) http = cred.authorize(http) service = build("drive", "v2", http) return service # https://developers.google.com/google-apps/calendar/ def calendar_service(self): cred = OAuth2Credentials( self.token["access_token"], self.client_id, self.client_secret, self.token["refresh_token"], datetime.datetime.now(), "http://accounts.google.com/o/oauth2/token", "KitchenKiosk/0.9", ) http = httplib2.Http(disable_ssl_certificate_validation=True) http = cred.authorize(http) service = build("calendar", "v3", http) return service