#!/usr/bin/env python3 # https://developers.google.com/accounts/docs/OAuth2ForDevices # https://developers.google.com/drive/web/auth/web-server # https://developers.google.com/google-apps/calendar/v3/reference/calendars # https://developers.google.com/picasa-web/ import logging import os import pickle from apiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow logger = logging.getLogger(__name__) class OAuth: def __init__(self, client_secret_file="client_secret.json"): self.credentials = None self.credentials_pickle = "./credentials.pickle" if os.path.exists(self.credentials_pickle): logger.debug( f"Refreshing credentials from disk pickle file {self.credentials_pickle}" ) self.credentials = pickle.load(open(self.credentials_pickle, "rb")) else: logger.debug(f"{self.credentials_pickle} does not exist; calling Google.") self.refresh_credentials(client_secret_file) self.save() assert self.credentials is not None def refresh_credentials(self, client_secret_file): scopes = [ "https://www.googleapis.com/auth/calendar.events.readonly", "https://www.googleapis.com/auth/calendar.readonly", "https://www.googleapis.com/auth/drive.readonly", "https://www.googleapis.com/auth/drive.photos.readonly", "https://www.googleapis.com/auth/photoslibrary.readonly", # 'https://www.googleapis.com/auth/keep.readonly', ] flow = InstalledAppFlow.from_client_secrets_file( self.client_secret_file, scopes=scopes ) self.credentials = flow.run_console() def save(self): pickle.dump(self.credentials, open(self.credentials_pickle, "wb")) def calendar_service(self): return build("calendar", "v3", credentials=self.credentials) def keep_service(self): return build( "keep", "v1", discoveryServiceUrl="https://keep.googleapis.com/$discovery/rest?version=v1", credentials=self.credentials, ) # print(gkeep_service.notes().list().execute()) # class OAuth: # def __init__(self, client_id: str, client_secret: str) -> None: # print("gdata: initializing oauth token...") # self.client_id = client_id # self.client_secret = client_secret # self.user_code: Optional[str] = None # # print 'Client id: %s' % (client_id) # # print 'Client secret: %s' % (client_secret) # self.token: Optional[Dict] = None # self.device_code = None # self.verfication_url = None # self.token_file = "client_secrets.json" # scopes = [ # 'https://www.googleapis.com/auth/calendar.events.readonly', # 'https://www.googleapis.com/auth/calendar.readonly', # 'https://www.googleapis.com/auth/drive.readonly', # 'https://www.googleapis.com/auth/drive.photos.readonly', # 'https://www.googleapis.com/auth/photoslibrary.readonly', # # 'https://www.googleapis.com/auth/keep.readonly', # ] # self.host = "accounts.google.com" # self.reset_connection() # self.load_token() # self.last_action = 0.0 # self.ssl_ctx: Optional[ssl.SSLContext] = None # # this setup is isolated because it eventually generates a BadStatusLine # # exception, after which we always get httplib.CannotSendRequest errors. # # When this happens, we try re-creating the exception. # def reset_connection(self) -> None: # self.ssl_ctx = ssl.create_default_context() #cafile="/usr/local/etc/ssl/cert.pem") # http.client.HTTPConnection.debuglevel = 2 # self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx) # def load_token(self) -> None: # token = None # if os.path.isfile(self.token_file): # f = open(self.token_file) # json_token = f.read() # self.token = json.loads(json_token) # f.close() # def save_token(self) -> None: # f = open(self.token_file, "w") # f.write(json.dumps(self.token)) # f.close() # def has_token(self) -> bool: # if self.token is not None: # print("gdata: we have a token!") # else: # print("gdata: we have no token.") # return self.token is not None # def get_user_code(self) -> Optional[str]: # self.conn.request( # "POST", # "/o/oauth2/device/code", # urllib.parse.urlencode( # {"client_id": self.client_id, "scope": " ".join(self.scope)} # ), # {"Content-type": "application/x-www-form-urlencoded"}, # ) # response = self.conn.getresponse() # if response.status == 200: # data = json.loads(response.read()) # self.device_code = data["device_code"] # self.user_code = data["user_code"] # self.verification_url = data["verification_url"] # self.retry_interval = data["interval"] # else: # self.user_code = None # print(f"gdata: {response.status}") # print(response.read()) # sys.exit(-1) # return self.user_code # def get_new_token(self) -> None: # # call get_device_code if not already set # if self.user_code is None: # print("gdata: getting user code") # self.get_user_code() # while self.token is None: # self.conn.request( # "POST", # "/o/oauth2/token", # urllib.parse.urlencode( # { # "client_id": self.client_id, # "client_secret": self.client_secret, # "code": self.device_code, # "grant_type": "http://oauth.net/grant_type/device/1.0", # } # ), # {"Content-type": "application/x-www-form-urlencoded"}, # ) # response = self.conn.getresponse() # if response.status == 200: # data = json.loads(response.read()) # if "access_token" in data: # self.token = data # self.save_token() # else: # time.sleep(self.retry_interval + 2) # else: # print("gdata: failed to get token") # print((response.status)) # print((response.read())) # def refresh_token(self) -> bool: # if self.checking_too_often(): # print("gdata: not refreshing yet, too soon...") # return False # else: # print("gdata: trying to refresh oauth token...") # self.reset_connection() # if self.token is None: # return False # refresh_token = self.token["refresh_token"] # self.conn.request( # "POST", # "/o/oauth2/token", # urllib.parse.urlencode( # { # "client_id": self.client_id, # "client_secret": self.client_secret, # "refresh_token": refresh_token, # "grant_type": "refresh_token", # } # ), # {"Content-type": "application/x-www-form-urlencoded"}, # ) # response = self.conn.getresponse() # self.last_action = time.time() # if response.status == 200: # data: Dict = json.loads(response.read()) # if "access_token" in data: # self.token = data # # in fact we NEVER get a new refresh token at this point # if not "refresh_token" in self.token: # self.token["refresh_token"] = refresh_token # self.save_token() # return True # print(("gdata: unexpected response %d to renewal request" % response.status)) # print((response.read())) # return False # def checking_too_often(self) -> bool: # now = time.time() # return (now - self.last_action) <= 30 # # https://developers.google.com/picasa-web/ # def photos_service(self): # headers = { # "Authorization": "%s %s" # % (self.token["token_type"], self.token["access_token"]) # } # client = gdata.photos.service.PhotosService(additional_headers=headers) # return client # # https://developers.google.com/drive/ # def docs_service(self): # cred = OAuth2Credentials( # self.token["access_token"], # self.client_id, # self.client_secret, # self.token["refresh_token"], # datetime.datetime.now(), # "http://accounts.google.com/o/oauth2/token", # "KitchenKiosk/0.9", # ) # http = httplib2.Http(disable_ssl_certificate_validation=True) # http = cred.authorize(http) # service = build("drive", "v2", http) # return service # # https://developers.google.com/google-apps/calendar/ # def calendar_service(self): # cred = OAuth2Credentials( # self.token["access_token"], # self.client_id, # self.client_secret, # self.token["refresh_token"], # datetime.datetime.now(), # "http://accounts.google.com/o/oauth2/token", # "KitchenKiosk/0.9", # ) # http = httplib2.Http(disable_ssl_certificate_validation=True) # http = cred.authorize(http) # service = build("calendar", "v3", http) # return service