> On Jul 2, 2020, at 9:06 PM, Kevin J. McCarthy <ke...@8t8.us> wrote:
>
> With your permission, I'll add this to the contrib directory and add a link
> in the manual.

Certainly. Attached is a new version.

(1) Token file is now encrypted by a pipe specified near the top of the script. Edit to suit.

(2) Supports a localhost redirect variant of the authcode flow in which the script listens on a local port to automatically grab the auth code rather than relying on the user to manually extract the code from the browser address bar. This can only be used when the user's browser is on the same device as mutt, and only with app registrations that have "http://localhost/" among their redirect URIs. If one has configured the script to prefer this flow, but occasionally can't use it because one's browser is on a different device, the flow can be temporarily overridden by specifying --authflow.

> The Google oauth script, for instance, has the refresh token for a parameter,
> so one could simply store the invocation encrypted and use something like:
>   set imap_oauth_refresh_command="oauth2.py `gpg --batch -q --decrypt goauth.gpg`"
> to store the sensitive arguments.

Not maintaining state and passing in a hardcoded refresh token as a parameter is ultimately prone to failure. From RFC 6749:

    6.  Refreshing an Access Token
    [...]
    The authorization server MAY issue a new refresh token, in which case
    the client MUST discard the old refresh token and replace it with the
    new refresh token.  The authorization server MAY revoke the old
    refresh token after issuing a new refresh token to the client.

Google seemingly doesn't hand out a new refresh token each time, whereas Microsoft seemingly always hands out a new one yet doesn't invalidate the prior one (nonetheless the new one is preferred as it might remain valid longer). In a project unrelated to mail I discovered a cloud provider who does immediately invalidate the prior refresh token; in fact, that's how I was alerted to this possibility, only later confirming such behavior to be RFC-compliant.

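The RFC 6749 rule quoted above can be sketched as a small pure function that merges a token endpoint response into stored state, replacing the refresh token only when the server issues a new one. This is a minimal illustration, not the attached script itself: the name `merge_token_response` and the `now` parameter are hypothetical, while the dictionary keys mirror the ones the script stores.

```python
from datetime import datetime, timedelta


def merge_token_response(state, response, now=None):
    """Return a copy of `state` updated from a token endpoint response.

    Hypothetical helper for illustration only.
    """
    now = now or datetime.now()
    updated = dict(state)
    updated['access_token'] = response['access_token']
    expires = now + timedelta(seconds=int(response['expires_in']))
    updated['access_token_expiration'] = expires.isoformat()
    # RFC 6749 section 6: if a new refresh token is issued, discard the old
    # one; if none is issued, keep using the one already stored.
    if 'refresh_token' in response:
        updated['refresh_token'] = response['refresh_token']
    return updated


state = {'access_token': '', 'access_token_expiration': '', 'refresh_token': 'old-refresh'}
t0 = datetime(2020, 7, 6, 12, 0, 0)

# A server that rotates the refresh token on every refresh.
rotated = merge_token_response(
    state, {'access_token': 'A1', 'expires_in': 3600, 'refresh_token': 'new-refresh'}, now=t0)
# A server that keeps the same refresh token.
kept = merge_token_response(state, {'access_token': 'A2', 'expires_in': '3600'}, now=t0)

print(rotated['refresh_token'])
print(kept['refresh_token'])
print(kept['access_token_expiration'])
```

A hardcoded refresh token passed as a command-line parameter corresponds to never storing `rotated`, which is what breaks with providers that revoke the old token.
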
Thus it seems, somewhere between mutt and the script, someone must maintain state and replace the refresh token whenever a new one is issued. Given that, it also makes sense to hold on to the access token---reusing it until expiration is more efficient than obtaining a new one each time. This is how I arrived at using a token file.

An alternative, if the burden remains entirely outside mutt, is to launch a background process to maintain state in memory, but that wouldn't survive reboots. A hybrid approach would use both a file and a background process: this would allow the data to be encrypted at rest while also giving the script non-interactive access to the unencrypted data in memory. That sounds complicated, but gpg with its gpg-agent provides just such a mechanism! Thanks for the suggestion to look into use of gpg.

--Alex

#!/usr/bin/python3
#
# Mutt OAuth2 token management script, version 2020-07-06
#
# Copyright (C) 2020 Alexander Perlis
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

import sys
import json
import argparse
import urllib.parse
import urllib.request
import urllib.error
import imaplib
import poplib
import smtplib
import base64
import secrets
import hashlib
import time
from datetime import timedelta, datetime
from pathlib import Path
import socket
import http.server
import subprocess

# The token file must be encrypted because it contains multi-use bearer tokens whose usage does not require
# additional verification. Specify whichever encryption and decryption pipes you prefer. They should read
# from standard input and write to standard output.
ENCRYPTION_PIPE=['gpg', '--encrypt', '--recipient', 'YOUR_GPG_IDENTITY']
DECRYPTION_PIPE=['gpg', '--decrypt']

registrations = {
    'google': {
        'authorize_endpoint': 'https://accounts.google.com/o/oauth2/auth',
        'devicecode_endpoint': 'https://oauth2.googleapis.com/device/code',
        'token_endpoint': 'https://accounts.google.com/o/oauth2/token',
        'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
        'imap_endpoint': 'imap.gmail.com',
        'pop_endpoint': 'pop.gmail.com',
        'smtp_endpoint': 'smtp.gmail.com',
        'sasl_method': 'OAUTHBEARER',
        'scope': 'https://mail.google.com/',
        'client_id': '',
        'client_secret': '',
    },
    'microsoft': {
        'authorize_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/authorize',
        'devicecode_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/devicecode',
        'token_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/token',
        'redirect_uri': 'https://login.microsoftonline.com/common/oauth2/nativeclient',
        'tenant': 'common',
        'imap_endpoint': 'outlook.office365.com',
        'pop_endpoint': 'outlook.office365.com',
        'smtp_endpoint': 'smtp.office365.com',
        'sasl_method': 'XOAUTH2',
        'scope': 'offline_access https://outlook.office.com/IMAP.AccessAsUser.All https://outlook.office.com/POP.AccessAsUser.All https://outlook.office.com/SMTP.Send',
        'client_id': '',
        'client_secret': '',
    },
}

ap = argparse.ArgumentParser(epilog='''
This script obtains and prints a valid OAuth2 access token.  State is maintained in an
encrypted TOKENFILE.  Run with "--verbose --authorize" to get started or whenever all
tokens have expired, optionally with "--authflow" to override the default authorization
flow.  To truly start over from scratch, first delete TOKENFILE.  Use "--verbose --test"
to test the IMAP/POP/SMTP endpoints.
''')
ap.add_argument('-v', '--verbose', action='store_true', help='increase verbosity')
ap.add_argument('-d', '--debug', action='store_true', help='enable debug output')
ap.add_argument('tokenfile', help='persistent token storage')
ap.add_argument('-a', '--authorize', action='store_true', help='manually authorize new tokens')
ap.add_argument('--authflow', help='authcode | localhostauthcode | devicecode')
ap.add_argument('-t', '--test', action='store_true', help='test IMAP/POP/SMTP endpoints')
args = ap.parse_args()

# Load existing state, refusing a token file readable by anyone but the owner.
token = {}
path = Path(args.tokenfile)
if path.exists():
    if 0o600 != 0o777 & path.stat().st_mode:
        sys.exit('Token file has unsafe mode. Suggest deleting and starting over.')
    try:
        sub = subprocess.run(DECRYPTION_PIPE, check=True, input=path.read_bytes(), capture_output=True)
        token = json.loads(sub.stdout)
    except subprocess.CalledProcessError:
        sys.exit('Difficulty decrypting token file. Is your decryption agent primed for '
                 'non-interactive usage, or an appropriate environment variable such as '
                 'GPG_TTY set to allow interactive agent usage from inside a pipe?')


def writetokenfile():
    if not path.exists():
        path.touch(mode=0o600)
    if 0o600 != 0o777 & path.stat().st_mode:
        sys.exit('Token file has unsafe mode. Suggest deleting and starting over.')
    sub = subprocess.run(ENCRYPTION_PIPE, check=True, input=json.dumps(token).encode(), capture_output=True)
    path.write_bytes(sub.stdout)


if args.debug:
    print('Obtained from token file:', json.dumps(token))
if not token:
    if not args.authorize:
        sys.exit('You must run script with "--authorize" at least once.')
    print('Available app and endpoint registrations:', *registrations)
    token['registration'] = input('OAuth2 registration: ')
    token['authflow'] = input('Preferred OAuth2 flow ("authcode" or "localhostauthcode" or "devicecode"): ')
    token['email'] = input('Account e-mail address: ')
    token['access_token'] = ''
    token['access_token_expiration'] = ''
    token['refresh_token'] = ''
    writetokenfile()

if token['registration'] not in registrations:
    sys.exit(f'ERROR: Unknown registration "{token["registration"]}". Delete token file and start over.')
registration = registrations[token['registration']]

authflow = token['authflow']
if args.authflow:
    authflow = args.authflow

baseparams = {'client_id': registration['client_id']}
# Microsoft uses 'tenant' but Google does not
if 'tenant' in registration:
    baseparams['tenant'] = registration['tenant']


def access_token_valid():
    a = token['access_token_expiration']
    return a and datetime.now() < datetime.fromisoformat(a)


def update_tokens(r):
    token['access_token'] = r['access_token']
    token['access_token_expiration'] = (datetime.now() + timedelta(seconds=int(r['expires_in']))).isoformat()
    # Replace the stored refresh token only when the server issued a new one.
    if 'refresh_token' in r:
        token['refresh_token'] = r['refresh_token']
    writetokenfile()
    if args.verbose:
        print(f'NOTICE: Obtained new access token, expires {token["access_token_expiration"]}.')


if args.authorize:
    p = baseparams.copy()
    p['scope'] = registration['scope']

    if authflow in ('authcode', 'localhostauthcode'):
        verifier = secrets.token_urlsafe(90)
        challenge = base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest())[:-1]
        redirect_uri = registration['redirect_uri']
        port = 0
        if authflow == 'localhostauthcode':
            # Find an available port to listen on
            s = socket.socket()
            s.bind(('127.0.0.1', 0))
            port = s.getsockname()[1]
            s.close()
            redirect_uri = 'http://localhost:'+str(port)+'/'
            # Probably should edit the port number into the actual redirect URL.

        p.update({'login_hint': token['email'],
                  'response_type': 'code',
                  'redirect_uri': redirect_uri,
                  'code_challenge': challenge,
                  'code_challenge_method': 'S256'})
        print(registration["authorize_endpoint"]+'?'+urllib.parse.urlencode(p, quote_via=urllib.parse.quote))

        authcode = ''
        if authflow == 'authcode':
            authcode = input('Visit displayed URL to retrieve authorization code. Enter code from server (might be in browser address bar): ')
        else:
            print('Visit displayed URL to authorize this application. Waiting...', end='', flush=True)

            class MyHandler(http.server.BaseHTTPRequestHandler):
                def do_HEAD(self):
                    self.send_response(200)
                    self.send_header('Content-type', 'text/html')
                    self.end_headers()

                def do_GET(self):
                    global authcode
                    querystring = urllib.parse.urlparse(self.path).query
                    querydict = urllib.parse.parse_qs(querystring)
                    if 'code' in querydict:
                        authcode = querydict['code'][0]
                    self.do_HEAD()
                    self.wfile.write(b'<html><head><title>Authorization result</title></head>')
                    self.wfile.write(b'<body><p>Authorization redirect completed. You may close this window.</p></body></html>')
            with http.server.HTTPServer(('127.0.0.1',port), MyHandler) as httpd:
                try:
                    httpd.handle_request()
                except KeyboardInterrupt:
                    pass

        if not authcode:
            sys.exit('Did not obtain an authcode.')

        for k in 'response_type', 'login_hint', 'code_challenge', 'code_challenge_method':
            del p[k]
        p.update({'grant_type': 'authorization_code',
                  'code': authcode,
                  'client_secret': registration['client_secret'],
                  'code_verifier': verifier})
        try:
            response = urllib.request.urlopen(registration['token_endpoint'], urllib.parse.urlencode(p).encode())
        except urllib.error.HTTPError as err:
            print(err.code, err.reason)
            response = err
        response = response.read()
        if args.debug:
            print(response)
        response = json.loads(response)
        if 'error' in response:
            print(response['error'])
            if 'error_description' in response:
                print(response['error_description'])
            sys.exit(1)

    elif authflow == 'devicecode':
        try:
            response = urllib.request.urlopen(registration['devicecode_endpoint'], urllib.parse.urlencode(p).encode())
        except urllib.error.HTTPError as err:
            print(err.code, err.reason)
            response = err
        response = response.read()
        if args.debug:
            print(response)
        response = json.loads(response)
        if 'error' in response:
            print(response['error'])
            if 'error_description' in response:
                print(response['error_description'])
            sys.exit(1)
        print(response['message'])
        del p['scope']
        p.update({'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
                  'client_secret': registration['client_secret'],
                  'device_code': response['device_code']})
        interval = int(response['interval'])
        print('Polling...', end='', flush=True)
        while True:
            time.sleep(interval)
            print('.', end='', flush=True)
            try:
                response = urllib.request.urlopen(registration['token_endpoint'], urllib.parse.urlencode(p).encode())
            except urllib.error.HTTPError as err:
                # Not actually always an error, might just mean "keep trying..."
                response = err
            response = response.read()
            if args.debug:
                print(response)
            response = json.loads(response)
            if 'error' not in response:
                break
            if response['error'] == 'authorization_declined':
                print(' user declined authorization.')
                sys.exit(1)
            if response['error'] == 'expired_token':
                print(' too much time has elapsed.')
                sys.exit(1)
            if response['error'] != 'authorization_pending':
                print(response['error'])
                if 'error_description' in response:
                    print(response['error_description'])
                sys.exit(1)
        print()

    else:
        sys.exit(f'ERROR: Unknown OAuth2 flow "{authflow}". Delete token file and start over.')

    update_tokens(response)

if not access_token_valid():
    if args.verbose:
        print('NOTICE: Invalid or expired access token; using refresh token to obtain new access token.')
    if not token['refresh_token']:
        sys.exit('ERROR: No refresh token. Run script with "--authorize".')
    p = baseparams.copy()
    p.update({'client_secret': registration['client_secret'],
              'refresh_token': token['refresh_token'],
              'grant_type': 'refresh_token'})
    try:
        response = urllib.request.urlopen(registration['token_endpoint'], urllib.parse.urlencode(p).encode())
    except urllib.error.HTTPError as err:
        print(err.code, err.reason)
        response = err
    response = response.read()
    if args.debug:
        print(response)
    response = json.loads(response)
    if 'error' in response:
        print(response['error'])
        if 'error_description' in response:
            print(response['error_description'])
        print('Perhaps refresh token invalid. Try running once with "--authorize"')
        sys.exit(1)
    update_tokens(response)

if not access_token_valid():
    sys.exit('ERROR: No valid access token. This should not be able to happen.')

if args.verbose:
    print('Access Token: ', end='')
print(token['access_token'])


def build_sasl_string(user,host,port,token):
    # Fields are separated by the control character \1 (0x01).
    if registration['sasl_method'] == 'OAUTHBEARER':
        return f'n,a={user},\1host={host}\1port={port}\1auth=Bearer {token}\1\1'
    elif registration['sasl_method'] == 'XOAUTH2':
        return f'user={user}\1auth=Bearer {token}\1\1'
    else:
        sys.exit(f'Unknown SASL method {registration["sasl_method"]}.')


if args.test:
    errors = False

    imap_conn = imaplib.IMAP4_SSL(registration['imap_endpoint'])
    sasl_string = build_sasl_string(token['email'], registration['imap_endpoint'], 993, token['access_token'])
    if args.debug:
        imap_conn.debug = 4
    try:
        imap_conn.authenticate(registration['sasl_method'], lambda _: sasl_string.encode())
        # Microsoft has a bug wherein a mismatch between username and token can still report a
        # successful login... (Try a consumer login with the token from a work/school account.)
        # Fortunately subsequent commands fail with an error. Thus we follow AUTH with another
        # IMAP command before reporting success.
        imap_conn.list()
        if args.verbose:
            print('IMAP authentication succeeded')
    except imaplib.IMAP4.error as e:
        print('IMAP authentication FAILED (does your account allow IMAP?):', e)
        errors = True

    pop_conn = poplib.POP3_SSL(registration['pop_endpoint'])
    sasl_string = build_sasl_string(token['email'], registration['pop_endpoint'], 995, token['access_token'])
    if args.debug:
        pop_conn.set_debuglevel(2)
    try:
        # poplib doesn't have an auth command taking an authenticator object
        # Microsoft requires a two-line SASL for POP
        pop_conn._shortcmd('AUTH ' + registration['sasl_method'])
        pop_conn._shortcmd(base64.standard_b64encode(sasl_string.encode()).decode())
        if args.verbose:
            print('POP authentication succeeded')
    except poplib.error_proto as e:
        print('POP authentication FAILED (does your account allow POP?):', e)
        errors = True

    # SMTP_SSL would be simpler but Microsoft does not answer on port 465.
    smtp_conn = smtplib.SMTP(registration['smtp_endpoint'], 587)
    sasl_string = build_sasl_string(token['email'], registration['smtp_endpoint'], 587, token['access_token'])
    smtp_conn.ehlo('test')
    smtp_conn.starttls()
    smtp_conn.ehlo('test')
    if args.debug:
        smtp_conn.set_debuglevel(2)
    try:
        smtp_conn.auth(registration['sasl_method'], lambda _ = None: sasl_string)
        if args.verbose:
            print('SMTP authentication succeeded')
    except smtplib.SMTPAuthenticationError as e:
        print('SMTP authentication FAILED:', e)
        errors = True

    if errors:
        sys.exit(1)
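
For readers unfamiliar with the `code_challenge` parameters in the authcode flows of the script above: they follow the S256 challenge method of PKCE (RFC 7636), in which the authorize request carries a hash of a random verifier and the later token request carries the verifier itself. The sketch below isolates that step; the helper name `make_pkce_pair` is hypothetical and is not part of the script.

```python
import base64
import hashlib
import secrets


def make_pkce_pair(nbytes=90):
    """Return (verifier, challenge) for the S256 code challenge method.

    Hypothetical helper; the logic mirrors the verifier/challenge lines
    in the authcode branch of the script.
    """
    # 90 random bytes encode to a 120-character URL-safe string.
    verifier = secrets.token_urlsafe(nbytes)
    digest = hashlib.sha256(verifier.encode()).digest()
    # A 32-byte digest encodes to 44 base64 characters ending in a single
    # '=' pad, which the challenge must omit.
    challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
    return verifier, challenge


verifier, challenge = make_pkce_pair()
print(len(verifier), len(challenge))
```

The script strips the padding with `[:-1]`, which is equivalent here because a SHA-256 digest always encodes with exactly one trailing `=`.
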