Adding type annotations and fixing up formatting.
[kiosk.git] / gdata_oauth.py
1 #!/usr/bin/env python3
2
3 # https://developers.google.com/accounts/docs/OAuth2ForDevices
4 # https://developers.google.com/drive/web/auth/web-server
5 # https://developers.google.com/google-apps/calendar/v3/reference/calendars
6 # https://developers.google.com/picasa-web/
7
8 import sys
9 import urllib.request, urllib.parse, urllib.error
10
11 try:
12     import http.client  # python2
13 except ImportError:
14     import http.client  # python3
15 import os.path
16 import json
17 import time
18 from oauth2client.client import OAuth2Credentials
19 import gdata.calendar.service
20 import gdata.docs.service
21 import gdata.photos.service, gdata.photos
22 from googleapiclient.discovery import build
23 import httplib2
24 from googleapiclient.discovery import build
25 import datetime
26 import ssl
27
28
class OAuth:
    """OAuth 2.0 device-flow client for a Google account.

    The class talks to ``accounts.google.com`` over a raw HTTPS connection
    to obtain a user code, poll for an access token and refresh it, caches
    the token as JSON in ``token_file`` and builds service clients that
    carry the token.
    """

    # CA bundle handed to the SSL context when it exists on disk; when it
    # does not, the interpreter's default trust store is used instead.
    CA_FILE = "/usr/local/etc/ssl/cert.pem"

    # Polling interval (seconds) used until the server tells us its own.
    DEFAULT_RETRY_INTERVAL = 5

    # Refresh attempts closer together than this many seconds are skipped.
    MIN_REFRESH_INTERVAL = 30

    _FORM_HEADERS = {"Content-type": "application/x-www-form-urlencoded"}

    def __init__(self, client_id: str, client_secret: str) -> None:
        """Set up the connection and load a cached token if one exists.

        Args:
            client_id: OAuth client id of this application.
            client_secret: OAuth client secret of this application.
        """
        print("gdata: initializing oauth token...")
        self.client_id = client_id
        self.client_secret = client_secret
        self.user_code = None
        self.token = None
        self.device_code = None
        self.verification_url = None
        self.retry_interval = self.DEFAULT_RETRY_INTERVAL
        self.token_file = "client_secrets.json"
        self.scope = [
            "https://www.googleapis.com/auth/photoslibrary.readonly",
        ]
        self.host = "accounts.google.com"
        # These must be initialised *before* reset_connection() runs,
        # otherwise the context it creates would be overwritten with None.
        self.last_action = 0
        self.ssl_ctx = None
        self.reset_connection()
        self.load_token()

    # This setup is isolated because the connection eventually generates a
    # BadStatusLine exception, after which we always get
    # http.client.CannotSendRequest errors.  When this happens, we try
    # re-creating the connection.
    def reset_connection(self) -> None:
        """(Re)create the SSL context and the HTTPS connection to ``host``."""
        cafile = self.CA_FILE if os.path.isfile(self.CA_FILE) else None
        self.ssl_ctx = ssl.create_default_context(cafile=cafile)
        # NOTE(review): this is a class-wide setting, so every HTTP(S)
        # connection in the process echoes its traffic (including request
        # bodies that carry the client secret) to stdout.  Kept as in the
        # original; consider removing outside of debugging sessions.
        http.client.HTTPConnection.debuglevel = 2
        self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx)

    def load_token(self) -> None:
        """Load the cached token from ``token_file`` if the file exists.

        An empty or malformed file is reported and treated as "no token"
        instead of aborting construction.
        """
        if not os.path.isfile(self.token_file):
            return
        with open(self.token_file) as token_fh:
            raw = token_fh.read()
        try:
            self.token = json.loads(raw)
        except ValueError:
            print(f"gdata: ignoring unreadable token file {self.token_file}")

    def save_token(self) -> None:
        """Write the current token to ``token_file`` as JSON."""
        with open(self.token_file, "w") as token_fh:
            token_fh.write(json.dumps(self.token))

    def has_token(self) -> bool:
        """Report (and print) whether a token is currently held."""
        have_token = self.token is not None
        if have_token:
            print("gdata: we have a token!")
        else:
            print("gdata: we have no token.")
        return have_token

    def _post_form(self, path: str, fields: dict):
        """POST url-encoded ``fields`` to ``path`` and return the response."""
        self.conn.request(
            "POST", path, urllib.parse.urlencode(fields), self._FORM_HEADERS
        )
        return self.conn.getresponse()

    def get_user_code(self) -> str:
        """Request a device/user code pair and return the user code.

        On success the device code, user code, verification URL and polling
        interval from the response are stored on the instance.  On any
        non-200 response the status and body are printed and the process
        exits with status -1.
        """
        response = self._post_form(
            "/o/oauth2/device/code",
            {"client_id": self.client_id, "scope": " ".join(self.scope)},
        )
        if response.status != 200:
            print(f"gdata: {response.status}")
            print(response.read())
            sys.exit(-1)

        data = json.loads(response.read())
        self.device_code = data["device_code"]
        self.user_code = data["user_code"]
        self.verification_url = data["verification_url"]
        self.retry_interval = data.get("interval", self.DEFAULT_RETRY_INTERVAL)
        return self.user_code

    def get_new_token(self) -> None:
        """Poll the token endpoint until an access token is granted.

        Fetches a user code first when none has been requested yet.  The
        granted token is stored on the instance and saved to disk.
        """
        if self.user_code is None:
            print("gdata: getting user code")
            self.get_user_code()

        while self.token is None:
            response = self._post_form(
                "/o/oauth2/token",
                {
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "code": self.device_code,
                    "grant_type": "http://oauth.net/grant_type/device/1.0",
                },
            )
            # The body must be consumed before the connection can be reused.
            body = response.read()
            if response.status == 200:
                data = json.loads(body)
                if "access_token" in data:
                    self.token = data
                    self.save_token()
                    return
            else:
                print("gdata: failed to get token")
                print(response.status)
                print(body)
            # Wait before every retry, including after an error status, so
            # a pending or failing authorization never turns into a busy
            # loop against the token endpoint.
            time.sleep(self.retry_interval + 2)

    def refresh_token(self) -> bool:
        """Exchange the stored refresh token for a new access token.

        Returns:
            True when a new access token was obtained and saved; False when
            the call was skipped (too soon after the previous attempt, or
            no refresh token is available) or the server did not return an
            access token.
        """
        if self.checking_too_often():
            print("gdata: not refreshing yet, too soon...")
            return False
        print("gdata: trying to refresh oauth token...")

        refresh_token = (self.token or {}).get("refresh_token")
        if refresh_token is None:
            print("gdata: no refresh token available")
            return False

        self.reset_connection()
        response = self._post_form(
            "/o/oauth2/token",
            {
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "refresh_token": refresh_token,
                "grant_type": "refresh_token",
            },
        )
        self.last_action = time.time()
        body = response.read()
        if response.status == 200:
            data = json.loads(body)
            if "access_token" in data:
                # In practice the response carries no new refresh token, so
                # keep the one we already have.
                data.setdefault("refresh_token", refresh_token)
                self.token = data
                self.save_token()
                return True
        print(f"gdata: unexpected response {response.status} to renewal request")
        print(body)
        return False

    def checking_too_often(self) -> bool:
        """Return True while the last refresh attempt is too recent."""
        return (time.time() - self.last_action) <= self.MIN_REFRESH_INTERVAL

    # https://developers.google.com/picasa-web/
    def photos_service(self):
        """Return a gdata ``PhotosService`` that sends the current token."""
        token_type = self.token["token_type"]
        access_token = self.token["access_token"]
        headers = {"Authorization": f"{token_type} {access_token}"}
        return gdata.photos.service.PhotosService(additional_headers=headers)

    def _authorized_http(self):
        """Return an ``httplib2.Http`` authorized with the current token."""
        cred = OAuth2Credentials(
            self.token["access_token"],
            self.client_id,
            self.client_secret,
            self.token["refresh_token"],
            # NOTE(review): the expiry is set to the moment of creation, as
            # in the original; confirm this is the intended expiry value.
            datetime.datetime.now(),
            # HTTPS, so a credential refresh never sends the client secret
            # and refresh token in clear text.
            "https://accounts.google.com/o/oauth2/token",
            "KitchenKiosk/0.9",
        )
        # NOTE(review): certificate validation is disabled here, as in the
        # original, which exposes the bearer token to interception.  Prefer
        # pointing httplib2 at a CA bundle once one is confirmed available.
        transport = httplib2.Http(disable_ssl_certificate_validation=True)
        return cred.authorize(transport)

    # https://developers.google.com/drive/
    def docs_service(self):
        """Return a Drive v2 API client authorized with the current token."""
        return build("drive", "v2", http=self._authorized_http())

    # https://developers.google.com/google-apps/calendar/
    def calendar_service(self):
        """Return a Calendar v3 API client authorized with the current token."""
        return build("calendar", "v3", http=self._authorized_http())