X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=gdata_oauth.py;h=d2595f0c1aed43c4f01f5095acb98b1194891986;hb=6cd5b068127501d2b48e8ac67b7432bffc5fce53;hp=7e8b336db29380387b11808c0cecf0d352db4455;hpb=e4dca16bbd329afdb587e8488767d88e17777254;p=kiosk.git diff --git a/gdata_oauth.py b/gdata_oauth.py index 7e8b336..d2595f0 100644 --- a/gdata_oauth.py +++ b/gdata_oauth.py @@ -5,221 +5,254 @@ # https://developers.google.com/google-apps/calendar/v3/reference/calendars # https://developers.google.com/picasa-web/ +import logging +import os +import pickle import sys import urllib.request, urllib.parse, urllib.error -try: - import http.client # python2 -except ImportError: - import http.client # python3 -import os.path -import json -import time -from typing import Dict, Optional -from oauth2client.client import OAuth2Credentials # type: ignore -import gdata.calendar.service # type: ignore -import gdata.docs.service # type: ignore -import gdata.photos.service, gdata.photos # type: ignore -from googleapiclient.discovery import build # type: ignore -import httplib2 # type: ignore -from googleapiclient.discovery import build -import datetime -import ssl +from apiclient.discovery import build +from google_auth_oauthlib.flow import InstalledAppFlow + + +logger = logging.getLogger(__file__) class OAuth: - def __init__(self, client_id: str, client_secret: str) -> None: - print("gdata: initializing oauth token...") - self.client_id = client_id - self.client_secret = client_secret - self.user_code: Optional[str] = None - # print 'Client id: %s' % (client_id) - # print 'Client secret: %s' % (client_secret) - self.token: Optional[Dict] = None - self.device_code = None - self.verfication_url = None - self.token_file = "client_secrets.json" - self.scope = [ - #'https://www.googleapis.com/auth/calendar', - #'https://www.googleapis.com/auth/drive', - #'https://docs.google.com/feeds', - #'https://www.googleapis.com/auth/calendar.readonly', - #'https://picasaweb.google.com/data/', - "https://www.googleapis.com/auth/photoslibrary.readonly", - #'http://picasaweb.google.com/data/', - #'https://www.google.com/calendar/feeds/', - ] - self.host = "accounts.google.com" - self.reset_connection() - self.load_token() - self.last_action = 0.0 - self.ssl_ctx: Optional[ssl.SSLContext] = None - - # this setup is isolated because it eventually generates a BadStatusLine - # exception, after which we always get httplib.CannotSendRequest errors. - # When this happens, we try re-creating the exception. - def reset_connection(self) -> None: - self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem") - http.client.HTTPConnection.debuglevel = 2 - self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx) - - def load_token(self) -> None: - token = None - if os.path.isfile(self.token_file): - f = open(self.token_file) - json_token = f.read() - self.token = json.loads(json_token) - f.close() - - def save_token(self) -> None: - f = open(self.token_file, "w") - f.write(json.dumps(self.token)) - f.close() - - def has_token(self) -> bool: - if self.token is not None: - print("gdata: we have a token!") - else: - print("gdata: we have no token.") - return self.token is not None - - def get_user_code(self) -> Optional[str]: - self.conn.request( - "POST", - "/o/oauth2/device/code", - urllib.parse.urlencode( - {"client_id": self.client_id, "scope": " ".join(self.scope)} - ), - {"Content-type": "application/x-www-form-urlencoded"}, - ) - response = self.conn.getresponse() - if response.status == 200: - data = json.loads(response.read()) - self.device_code = data["device_code"] - self.user_code = data["user_code"] - self.verification_url = data["verification_url"] - self.retry_interval = data["interval"] - else: - self.user_code = None - print(f"gdata: {response.status}") - print(response.read()) - sys.exit(-1) - return self.user_code - - def get_new_token(self) -> None: - # call get_device_code if not already set - if self.user_code is None: - print("gdata: getting user code") - self.get_user_code() - - while self.token is None: - self.conn.request( - "POST", - "/o/oauth2/token", - urllib.parse.urlencode( - { - "client_id": self.client_id, - "client_secret": self.client_secret, - "code": self.device_code, - "grant_type": "http://oauth.net/grant_type/device/1.0", - } - ), - {"Content-type": "application/x-www-form-urlencoded"}, + def __init__(self, client_secret_file='client_secret.json'): + self.credentials = None + self.credentials_pickle = './credentials.pickle' + if os.path.exists(self.credentials_pickle): + logger.debug( + f'Refreshing credentials from disk pickle file {self.credentials_pickle}' ) - response = self.conn.getresponse() - if response.status == 200: - data = json.loads(response.read()) - if "access_token" in data: - self.token = data - self.save_token() - else: - time.sleep(self.retry_interval + 2) - else: - print("gdata: failed to get token") - print((response.status)) - print((response.read())) - - def refresh_token(self) -> bool: - if self.checking_too_often(): - print("gdata: not refreshing yet, too soon...") - return False + self.credentials = pickle.load(open(self.credentials_pickle, 'rb')) else: - print("gdata: trying to refresh oauth token...") - self.reset_connection() - if self.token is None: - return False - - refresh_token = self.token["refresh_token"] - self.conn.request( - "POST", - "/o/oauth2/token", - urllib.parse.urlencode( - { - "client_id": self.client_id, - "client_secret": self.client_secret, - "refresh_token": refresh_token, - "grant_type": "refresh_token", - } - ), - {"Content-type": "application/x-www-form-urlencoded"}, - ) + logger.debug( + f'{self.credentials_pickle} does not exist; calling Google.' + ) + self.refresh_credentials(client_secret_file) + self.save() + assert self.credentials is not None - response = self.conn.getresponse() - self.last_action = time.time() - if response.status == 200: - data: Dict = json.loads(response.read()) - if "access_token" in data: - self.token = data - # in fact we NEVER get a new refresh token at this point - if not "refresh_token" in self.token: - self.token["refresh_token"] = refresh_token - self.save_token() - return True - print(("gdata: unexpected response %d to renewal request" % response.status)) - print((response.read())) - return False - - def checking_too_often(self) -> bool: - now = time.time() - return (now - self.last_action) <= 30 - - # https://developers.google.com/picasa-web/ - def photos_service(self): - headers = { - "Authorization": "%s %s" - % (self.token["token_type"], self.token["access_token"]) - } - client = gdata.photos.service.PhotosService(additional_headers=headers) - return client - - # https://developers.google.com/drive/ - def docs_service(self): - cred = OAuth2Credentials( - self.token["access_token"], - self.client_id, - self.client_secret, - self.token["refresh_token"], - datetime.datetime.now(), - "http://accounts.google.com/o/oauth2/token", - "KitchenKiosk/0.9", + def refresh_credentials(self, client_secret_file): + scopes = [ + 'https://www.googleapis.com/auth/calendar.events.readonly', + 'https://www.googleapis.com/auth/calendar.readonly', + 'https://www.googleapis.com/auth/drive.readonly', + 'https://www.googleapis.com/auth/drive.photos.readonly', + 'https://www.googleapis.com/auth/photoslibrary.readonly', + # 'https://www.googleapis.com/auth/keep.readonly', + ] + flow = InstalledAppFlow.from_client_secrets_file( + self.client_secret_file, scopes=scopes ) - http = httplib2.Http(disable_ssl_certificate_validation=True) - http = cred.authorize(http) - service = build("drive", "v2", http) - return service + self.credentials = flow.run_console() + + def save(self): + pickle.dump(self.credentials, open(self.credentials_pickle, 'wb')) - # https://developers.google.com/google-apps/calendar/ def calendar_service(self): - cred = OAuth2Credentials( - self.token["access_token"], - self.client_id, - self.client_secret, - self.token["refresh_token"], - datetime.datetime.now(), - "http://accounts.google.com/o/oauth2/token", - "KitchenKiosk/0.9", - ) - http = httplib2.Http(disable_ssl_certificate_validation=True) - http = cred.authorize(http) - service = build("calendar", "v3", http) - return service + return build("calendar", "v3", credentials=self.credentials) + + def keep_service(self): + return build('keep', 'v1', + discoveryServiceUrl='https://keep.googleapis.com/$discovery/rest?version=v1', + credentials=self.credentials) + #print(gkeep_service.notes().list().execute()) + + +# class OAuth: +# def __init__(self, client_id: str, client_secret: str) -> None: +# print("gdata: initializing oauth token...") +# self.client_id = client_id +# self.client_secret = client_secret +# self.user_code: Optional[str] = None +# # print 'Client id: %s' % (client_id) +# # print 'Client secret: %s' % (client_secret) +# self.token: Optional[Dict] = None +# self.device_code = None +# self.verfication_url = None +# self.token_file = "client_secrets.json" +# scopes = [ +# 'https://www.googleapis.com/auth/calendar.events.readonly', +# 'https://www.googleapis.com/auth/calendar.readonly', +# 'https://www.googleapis.com/auth/drive.readonly', +# 'https://www.googleapis.com/auth/drive.photos.readonly', +# 'https://www.googleapis.com/auth/photoslibrary.readonly', +# # 'https://www.googleapis.com/auth/keep.readonly', +# ] +# self.host = "accounts.google.com" +# self.reset_connection() +# self.load_token() +# self.last_action = 0.0 +# self.ssl_ctx: Optional[ssl.SSLContext] = None + +# # this setup is isolated because it eventually generates a BadStatusLine +# # exception, after which we always get httplib.CannotSendRequest errors. +# # When this happens, we try re-creating the exception. +# def reset_connection(self) -> None: +# self.ssl_ctx = ssl.create_default_context() #cafile="/usr/local/etc/ssl/cert.pem") +# http.client.HTTPConnection.debuglevel = 2 +# self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx) + +# def load_token(self) -> None: +# token = None +# if os.path.isfile(self.token_file): +# f = open(self.token_file) +# json_token = f.read() +# self.token = json.loads(json_token) +# f.close() + +# def save_token(self) -> None: +# f = open(self.token_file, "w") +# f.write(json.dumps(self.token)) +# f.close() + +# def has_token(self) -> bool: +# if self.token is not None: +# print("gdata: we have a token!") +# else: +# print("gdata: we have no token.") +# return self.token is not None + +# def get_user_code(self) -> Optional[str]: +# self.conn.request( +# "POST", +# "/o/oauth2/device/code", +# urllib.parse.urlencode( +# {"client_id": self.client_id, "scope": " ".join(self.scope)} +# ), +# {"Content-type": "application/x-www-form-urlencoded"}, +# ) +# response = self.conn.getresponse() +# if response.status == 200: +# data = json.loads(response.read()) +# self.device_code = data["device_code"] +# self.user_code = data["user_code"] +# self.verification_url = data["verification_url"] +# self.retry_interval = data["interval"] +# else: +# self.user_code = None +# print(f"gdata: {response.status}") +# print(response.read()) +# sys.exit(-1) +# return self.user_code + +# def get_new_token(self) -> None: +# # call get_device_code if not already set +# if self.user_code is None: +# print("gdata: getting user code") +# self.get_user_code() + +# while self.token is None: +# self.conn.request( +# "POST", +# "/o/oauth2/token", +# urllib.parse.urlencode( +# { +# "client_id": self.client_id, +# "client_secret": self.client_secret, +# "code": self.device_code, +# "grant_type": "http://oauth.net/grant_type/device/1.0", +# } +# ), +# {"Content-type": "application/x-www-form-urlencoded"}, +# ) +# response = self.conn.getresponse() +# if response.status == 200: +# data = json.loads(response.read()) +# if "access_token" in data: +# self.token = data +# self.save_token() +# else: +# time.sleep(self.retry_interval + 2) +# else: +# print("gdata: failed to get token") +# print((response.status)) +# print((response.read())) + +# def refresh_token(self) -> bool: +# if self.checking_too_often(): +# print("gdata: not refreshing yet, too soon...") +# return False +# else: +# print("gdata: trying to refresh oauth token...") +# self.reset_connection() +# if self.token is None: +# return False + +# refresh_token = self.token["refresh_token"] +# self.conn.request( +# "POST", +# "/o/oauth2/token", +# urllib.parse.urlencode( +# { +# "client_id": self.client_id, +# "client_secret": self.client_secret, +# "refresh_token": refresh_token, +# "grant_type": "refresh_token", +# } +# ), +# {"Content-type": "application/x-www-form-urlencoded"}, +# ) + +# response = self.conn.getresponse() +# self.last_action = time.time() +# if response.status == 200: +# data: Dict = json.loads(response.read()) +# if "access_token" in data: +# self.token = data +# # in fact we NEVER get a new refresh token at this point +# if not "refresh_token" in self.token: +# self.token["refresh_token"] = refresh_token +# self.save_token() +# return True +# print(("gdata: unexpected response %d to renewal request" % response.status)) +# print((response.read())) +# return False + +# def checking_too_often(self) -> bool: +# now = time.time() +# return (now - self.last_action) <= 30 + +# # https://developers.google.com/picasa-web/ +# def photos_service(self): +# headers = { +# "Authorization": "%s %s" +# % (self.token["token_type"], self.token["access_token"]) +# } +# client = gdata.photos.service.PhotosService(additional_headers=headers) +# return client + +# # https://developers.google.com/drive/ +# def docs_service(self): +# cred = OAuth2Credentials( +# self.token["access_token"], +# self.client_id, +# self.client_secret, +# self.token["refresh_token"], +# datetime.datetime.now(), +# "http://accounts.google.com/o/oauth2/token", +# "KitchenKiosk/0.9", +# ) +# http = httplib2.Http(disable_ssl_certificate_validation=True) +# http = cred.authorize(http) +# service = build("drive", "v2", http) +# return service + +# # https://developers.google.com/google-apps/calendar/ +# def calendar_service(self): +# cred = OAuth2Credentials( +# self.token["access_token"], +# self.client_id, +# self.client_secret, +# self.token["refresh_token"], +# datetime.datetime.now(), +# "http://accounts.google.com/o/oauth2/token", +# "KitchenKiosk/0.9", +# ) +# http = httplib2.Http(disable_ssl_certificate_validation=True) +# http = cred.authorize(http) +# service = build("calendar", "v3", http) +# return service