3 # https://developers.google.com/accounts/docs/OAuth2ForDevices
4 # https://developers.google.com/drive/web/auth/web-server
5 # https://developers.google.com/google-apps/calendar/v3/reference/calendars
6 # https://developers.google.com/picasa-web/
12 from apiclient.discovery import build
13 from google_auth_oauthlib.flow import InstalledAppFlow
16 logger = logging.getLogger(__name__)
20 def __init__(self, client_secret_file="client_secret.json"):
21 self.credentials = None
22 self.credentials_pickle = "./credentials.pickle"
23 if os.path.exists(self.credentials_pickle):
25 f"Refreshing credentials from disk pickle file {self.credentials_pickle}"
27 self.credentials = pickle.load(open(self.credentials_pickle, "rb"))
29 logger.debug(f"{self.credentials_pickle} does not exist; calling Google.")
30 self.refresh_credentials(client_secret_file)
32 assert self.credentials is not None
34 def refresh_credentials(self, client_secret_file):
36 "https://www.googleapis.com/auth/calendar.events.readonly",
37 "https://www.googleapis.com/auth/calendar.readonly",
38 "https://www.googleapis.com/auth/drive.readonly",
39 "https://www.googleapis.com/auth/drive.photos.readonly",
40 "https://www.googleapis.com/auth/photoslibrary.readonly",
41 # 'https://www.googleapis.com/auth/keep.readonly',
43 flow = InstalledAppFlow.from_client_secrets_file(
44 self.client_secret_file, scopes=scopes
46 self.credentials = flow.run_console()
49 pickle.dump(self.credentials, open(self.credentials_pickle, "wb"))
51 def calendar_service(self):
52 return build("calendar", "v3", credentials=self.credentials)
54 def keep_service(self):
58 discoveryServiceUrl="https://keep.googleapis.com/$discovery/rest?version=v1",
59 credentials=self.credentials,
61 # print(gkeep_service.notes().list().execute())
65 # def __init__(self, client_id: str, client_secret: str) -> None:
66 # print("gdata: initializing oauth token...")
67 # self.client_id = client_id
68 # self.client_secret = client_secret
69 # self.user_code: Optional[str] = None
70 # # print 'Client id: %s' % (client_id)
71 # # print 'Client secret: %s' % (client_secret)
72 # self.token: Optional[Dict] = None
73 # self.device_code = None
74 # self.verfication_url = None
75 # self.token_file = "client_secrets.json"
77 # 'https://www.googleapis.com/auth/calendar.events.readonly',
78 # 'https://www.googleapis.com/auth/calendar.readonly',
79 # 'https://www.googleapis.com/auth/drive.readonly',
80 # 'https://www.googleapis.com/auth/drive.photos.readonly',
81 # 'https://www.googleapis.com/auth/photoslibrary.readonly',
82 # # 'https://www.googleapis.com/auth/keep.readonly',
84 # self.host = "accounts.google.com"
85 # self.reset_connection()
87 # self.last_action = 0.0
88 # self.ssl_ctx: Optional[ssl.SSLContext] = None
90 # # this setup is isolated because it eventually generates a BadStatusLine
91 # # exception, after which we always get httplib.CannotSendRequest errors.
92 # # When this happens, we try re-creating the exception.
93 # def reset_connection(self) -> None:
94 # self.ssl_ctx = ssl.create_default_context() #cafile="/usr/local/etc/ssl/cert.pem")
95 # http.client.HTTPConnection.debuglevel = 2
96 # self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx)
98 # def load_token(self) -> None:
100 # if os.path.isfile(self.token_file):
101 # f = open(self.token_file)
102 # json_token = f.read()
103 # self.token = json.loads(json_token)
106 # def save_token(self) -> None:
107 # f = open(self.token_file, "w")
108 # f.write(json.dumps(self.token))
111 # def has_token(self) -> bool:
112 # if self.token is not None:
113 # print("gdata: we have a token!")
115 # print("gdata: we have no token.")
116 # return self.token is not None
118 # def get_user_code(self) -> Optional[str]:
121 # "/o/oauth2/device/code",
122 # urllib.parse.urlencode(
123 # {"client_id": self.client_id, "scope": " ".join(self.scope)}
125 # {"Content-type": "application/x-www-form-urlencoded"},
127 # response = self.conn.getresponse()
128 # if response.status == 200:
129 # data = json.loads(response.read())
130 # self.device_code = data["device_code"]
131 # self.user_code = data["user_code"]
132 # self.verification_url = data["verification_url"]
133 # self.retry_interval = data["interval"]
135 # self.user_code = None
136 # print(f"gdata: {response.status}")
137 # print(response.read())
139 # return self.user_code
141 # def get_new_token(self) -> None:
142 # # call get_device_code if not already set
143 # if self.user_code is None:
144 # print("gdata: getting user code")
145 # self.get_user_code()
147 # while self.token is None:
151 # urllib.parse.urlencode(
153 # "client_id": self.client_id,
154 # "client_secret": self.client_secret,
155 # "code": self.device_code,
156 # "grant_type": "http://oauth.net/grant_type/device/1.0",
159 # {"Content-type": "application/x-www-form-urlencoded"},
161 # response = self.conn.getresponse()
162 # if response.status == 200:
163 # data = json.loads(response.read())
164 # if "access_token" in data:
168 # time.sleep(self.retry_interval + 2)
170 # print("gdata: failed to get token")
171 # print((response.status))
172 # print((response.read()))
174 # def refresh_token(self) -> bool:
175 # if self.checking_too_often():
176 # print("gdata: not refreshing yet, too soon...")
179 # print("gdata: trying to refresh oauth token...")
180 # self.reset_connection()
181 # if self.token is None:
184 # refresh_token = self.token["refresh_token"]
188 # urllib.parse.urlencode(
190 # "client_id": self.client_id,
191 # "client_secret": self.client_secret,
192 # "refresh_token": refresh_token,
193 # "grant_type": "refresh_token",
196 # {"Content-type": "application/x-www-form-urlencoded"},
199 # response = self.conn.getresponse()
200 # self.last_action = time.time()
201 # if response.status == 200:
202 # data: Dict = json.loads(response.read())
203 # if "access_token" in data:
205 # # in fact we NEVER get a new refresh token at this point
206 # if not "refresh_token" in self.token:
207 # self.token["refresh_token"] = refresh_token
210 # print(("gdata: unexpected response %d to renewal request" % response.status))
211 # print((response.read()))
214 # def checking_too_often(self) -> bool:
216 # return (now - self.last_action) <= 30
218 # # https://developers.google.com/picasa-web/
219 # def photos_service(self):
221 # "Authorization": "%s %s"
222 # % (self.token["token_type"], self.token["access_token"])
224 # client = gdata.photos.service.PhotosService(additional_headers=headers)
227 # # https://developers.google.com/drive/
228 # def docs_service(self):
229 # cred = OAuth2Credentials(
230 # self.token["access_token"],
232 # self.client_secret,
233 # self.token["refresh_token"],
234 # datetime.datetime.now(),
235 # "http://accounts.google.com/o/oauth2/token",
236 # "KitchenKiosk/0.9",
238 # http = httplib2.Http(disable_ssl_certificate_validation=True)
239 # http = cred.authorize(http)
240 # service = build("drive", "v2", http)
243 # # https://developers.google.com/google-apps/calendar/
244 # def calendar_service(self):
245 # cred = OAuth2Credentials(
246 # self.token["access_token"],
248 # self.client_secret,
249 # self.token["refresh_token"],
250 # datetime.datetime.now(),
251 # "http://accounts.google.com/o/oauth2/token",
252 # "KitchenKiosk/0.9",
254 # http = httplib2.Http(disable_ssl_certificate_validation=True)
255 # http = cred.authorize(http)
256 # service = build("calendar", "v3", http)