+#!/usr/bin/env python3
+
# https://developers.google.com/accounts/docs/OAuth2ForDevices
# https://developers.google.com/drive/web/auth/web-server
# https://developers.google.com/google-apps/calendar/v3/reference/calendars
# https://developers.google.com/picasa-web/
import sys
-import urllib
+import urllib.request, urllib.parse, urllib.error
+
try:
- import httplib # python2
+ import http.client # python2
except ImportError:
- import http.client # python3
+ import http.client # python3
import os.path
import json
import time
-from oauth2client.client import OAuth2Credentials
-import gdata.calendar.service
-import gdata.docs.service
-import gdata.photos.service, gdata.photos
-from apiclient.discovery import build
-import httplib2
-from apiclient.discovery import build
+from typing import Dict, Optional
+from oauth2client.client import OAuth2Credentials # type: ignore
+import gdata.calendar.service # type: ignore
+import gdata.docs.service # type: ignore
+import gdata.photos.service, gdata.photos # type: ignore
+from googleapiclient.discovery import build # type: ignore
+import httplib2 # type: ignore
+from googleapiclient.discovery import build
import datetime
import ssl
+
class OAuth:
- def __init__(self, client_id, client_secret):
+ def __init__(self, client_id: str, client_secret: str) -> None:
print("gdata: initializing oauth token...")
self.client_id = client_id
self.client_secret = client_secret
- self.user_code = None
- #print 'Client id: %s' % (client_id)
- #print 'Client secret: %s' % (client_secret)
- self.token = None
+ self.user_code: Optional[str] = None
+ # print 'Client id: %s' % (client_id)
+ # print 'Client secret: %s' % (client_secret)
+ self.token: Optional[Dict] = None
self.device_code = None
self.verfication_url = None
- self.token_file = 'client_secrets.json'
+ self.token_file = "client_secrets.json"
self.scope = [
#'https://www.googleapis.com/auth/calendar',
#'https://www.googleapis.com/auth/drive',
#'https://docs.google.com/feeds',
#'https://www.googleapis.com/auth/calendar.readonly',
#'https://picasaweb.google.com/data/',
- 'https://www.googleapis.com/auth/photoslibrary.readonly',
+ "https://www.googleapis.com/auth/photoslibrary.readonly",
#'http://picasaweb.google.com/data/',
#'https://www.google.com/calendar/feeds/',
]
- self.host = 'accounts.google.com'
+ self.host = "accounts.google.com"
self.reset_connection()
self.load_token()
- self.last_action = 0
- self.ssl_ctx = None
+ self.last_action = 0.0
+ self.ssl_ctx: Optional[ssl.SSLContext] = None
# this setup is isolated because it eventually generates a BadStatusLine
# exception, after which we always get httplib.CannotSendRequest errors.
# When this happens, we try re-creating the exception.
- def reset_connection(self):
- self.ssl_ctx = ssl.create_default_context(cafile='/usr/local/etc/ssl/cert.pem')
- httplib.HTTPConnection.debuglevel = 2
- self.conn = httplib.HTTPSConnection(self.host, context=self.ssl_ctx)
+ def reset_connection(self) -> None:
+ self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem")
+ http.client.HTTPConnection.debuglevel = 2
+ self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx)
- def load_token(self):
+ def load_token(self) -> None:
token = None
if os.path.isfile(self.token_file):
f = open(self.token_file)
self.token = json.loads(json_token)
f.close()
- def save_token(self):
- f = open(self.token_file, 'w')
+ def save_token(self) -> None:
+ f = open(self.token_file, "w")
f.write(json.dumps(self.token))
f.close()
- def has_token(self):
- if self.token != None:
+ def has_token(self) -> bool:
+ if self.token is not None:
print("gdata: we have a token!")
else:
print("gdata: we have no token.")
- return self.token != None
+ return self.token is not None
- def get_user_code(self):
+ def get_user_code(self) -> Optional[str]:
self.conn.request(
"POST",
"/o/oauth2/device/code",
- urllib.urlencode({
- 'client_id': self.client_id,
- 'scope' : ' '.join(self.scope)
- }),
- {"Content-type": "application/x-www-form-urlencoded"})
+ urllib.parse.urlencode(
+ {"client_id": self.client_id, "scope": " ".join(self.scope)}
+ ),
+ {"Content-type": "application/x-www-form-urlencoded"},
+ )
response = self.conn.getresponse()
if response.status == 200:
data = json.loads(response.read())
- self.device_code = data['device_code']
- self.user_code = data['user_code']
- self.verification_url = data['verification_url']
- self.retry_interval = data['interval']
+ self.device_code = data["device_code"]
+ self.user_code = data["user_code"]
+ self.verification_url = data["verification_url"]
+ self.retry_interval = data["interval"]
else:
- print("gdata: %d" % response.status)
+ self.user_code = None
+ print(f"gdata: {response.status}")
print(response.read())
- sys.exit()
+ sys.exit(-1)
return self.user_code
- def get_new_token(self):
+ def get_new_token(self) -> None:
# call get_device_code if not already set
- if self.user_code == None:
+ if self.user_code is None:
print("gdata: getting user code")
self.get_user_code()
- while self.token == None:
+ while self.token is None:
self.conn.request(
"POST",
"/o/oauth2/token",
- urllib.urlencode({
- 'client_id' : self.client_id,
- 'client_secret' : self.client_secret,
- 'code' : self.device_code,
- 'grant_type' : 'http://oauth.net/grant_type/device/1.0'
- }),
- {"Content-type": "application/x-www-form-urlencoded"})
+ urllib.parse.urlencode(
+ {
+ "client_id": self.client_id,
+ "client_secret": self.client_secret,
+ "code": self.device_code,
+ "grant_type": "http://oauth.net/grant_type/device/1.0",
+ }
+ ),
+ {"Content-type": "application/x-www-form-urlencoded"},
+ )
response = self.conn.getresponse()
if response.status == 200:
data = json.loads(response.read())
- if 'access_token' in data:
+ if "access_token" in data:
self.token = data
self.save_token()
else:
time.sleep(self.retry_interval + 2)
else:
print("gdata: failed to get token")
- print(response.status)
- print(response.read())
+ print((response.status))
+ print((response.read()))
- def refresh_token(self):
+ def refresh_token(self) -> bool:
if self.checking_too_often():
print("gdata: not refreshing yet, too soon...")
return False
else:
- print('gdata: trying to refresh oauth token...')
+ print("gdata: trying to refresh oauth token...")
self.reset_connection()
- refresh_token = self.token['refresh_token']
+ if self.token is None:
+ return False
+
+ refresh_token = self.token["refresh_token"]
self.conn.request(
"POST",
"/o/oauth2/token",
- urllib.urlencode({
- 'client_id' : self.client_id,
- 'client_secret' : self.client_secret,
- 'refresh_token' : refresh_token,
- 'grant_type' : 'refresh_token'
- }),
- {"Content-type": "application/x-www-form-urlencoded"})
+ urllib.parse.urlencode(
+ {
+ "client_id": self.client_id,
+ "client_secret": self.client_secret,
+ "refresh_token": refresh_token,
+ "grant_type": "refresh_token",
+ }
+ ),
+ {"Content-type": "application/x-www-form-urlencoded"},
+ )
response = self.conn.getresponse()
self.last_action = time.time()
if response.status == 200:
- data = json.loads(response.read())
- if 'access_token' in data:
+ data: Dict = json.loads(response.read())
+ if "access_token" in data:
self.token = data
# in fact we NEVER get a new refresh token at this point
- if not 'refresh_token' in self.token:
- self.token['refresh_token'] = refresh_token
+ if not "refresh_token" in self.token:
+ self.token["refresh_token"] = refresh_token
self.save_token()
return True
- print("gdata: unexpected response %d to renewal request" % response.status)
- print(response.read())
+ print(("gdata: unexpected response %d to renewal request" % response.status))
+ print((response.read()))
return False
- def checking_too_often(self):
+ def checking_too_often(self) -> bool:
now = time.time()
return (now - self.last_action) <= 30
# https://developers.google.com/picasa-web/
def photos_service(self):
headers = {
- "Authorization": "%s %s" % (self.token['token_type'], self.token['access_token'])
+ "Authorization": "%s %s"
+ % (self.token["token_type"], self.token["access_token"])
}
client = gdata.photos.service.PhotosService(additional_headers=headers)
return client
# https://developers.google.com/drive/
def docs_service(self):
- cred = OAuth2Credentials(self.token['access_token'],
- self.client_id,
- self.client_secret,
- self.token['refresh_token'],
- datetime.datetime.now(),
- 'http://accounts.google.com/o/oauth2/token',
- 'KitchenKiosk/0.9')
+ cred = OAuth2Credentials(
+ self.token["access_token"],
+ self.client_id,
+ self.client_secret,
+ self.token["refresh_token"],
+ datetime.datetime.now(),
+ "http://accounts.google.com/o/oauth2/token",
+ "KitchenKiosk/0.9",
+ )
http = httplib2.Http(disable_ssl_certificate_validation=True)
http = cred.authorize(http)
- service = build('drive', 'v2', http)
+ service = build("drive", "v2", http)
return service
# https://developers.google.com/google-apps/calendar/
def calendar_service(self):
- cred = OAuth2Credentials(self.token['access_token'],
- self.client_id,
- self.client_secret,
- self.token['refresh_token'],
- datetime.datetime.now(),
- 'http://accounts.google.com/o/oauth2/token',
- 'KitchenKiosk/0.9')
+ cred = OAuth2Credentials(
+ self.token["access_token"],
+ self.client_id,
+ self.client_secret,
+ self.token["refresh_token"],
+ datetime.datetime.now(),
+ "http://accounts.google.com/o/oauth2/token",
+ "KitchenKiosk/0.9",
+ )
http = httplib2.Http(disable_ssl_certificate_validation=True)
http = cred.authorize(http)
- service = build('calendar', 'v3', http)
+ service = build("calendar", "v3", http)
return service