From: Stefan Huber Date: Fri, 16 Aug 2024 21:17:42 +0000 (+0200) Subject: Initial commit X-Git-Url: https://git.sthu.org/?a=commitdiff_plain;h=b3835991b4e548437e841a986cb8bdb92b7f10eb;p=oauth2_token_manager.git Initial commit --- b3835991b4e548437e841a986cb8bdb92b7f10eb diff --git a/REAME.md b/REAME.md new file mode 100644 index 0000000..2e89f52 --- /dev/null +++ b/REAME.md @@ -0,0 +1,94 @@ +# oauth2_token_manager + +This script allows to obtain oauth2 tokens from Microsoft web services. These +can then be used by software like [mutt](https://www.mutt.org/), +[neomutt](https://neomutt.org/), [msmtp](http://msmtp.sourceforge.net/), +[offlineimap](http://www.offlineimap.org/), +[mbsync](http://isync.sourceforge.net/) to access our email accounts at +[Micorsoft 365](https://en.wikipedia.org/wiki/Microsoft_365)[^office365] or +Gmail. + +Large parts of this script are based on +[mutt_oauth2.py](https://github.com/muttmua/mutt/blob/master/contrib/mutt_oauth2.py) +by Alexander Perlis found in the mutt repository. It compares as follows: + + - The token file encryption is symmetric and performed by openssl rather than + gpg. This makes it easier to integrate with password or wallet managers + without reoccurring interactions with gpg-agent. + + - The token files are stored in `~/.cache/` or whatever the equivalent on + your platform is. + + - The entire code base has been largely redesigned, various authorization + variations have been stripped and the test code for IMAP, POP and SMTP + endpoints has been removed. + + - The original script also supported Google services, while this one at the + moment only supports Microsoft services. + +## Installation + +This script is written in Python 3 and requires the following packages: + + - `requests` + - `platformdirs` + +These can be installed using pip: + +```sh +pip install -r requirements.txt +``` + + +## Usage + +On first use, authenticate for some account and store the token in a +token file `myaccount`: + +```sh +oauth2_token_manager -p password -t myaccount -a +``` + +On subsequent uses, retrieve the token as follows: +```sh +oauth2_token_manager -p password -t myaccount +``` + +Instead of passing "password" on the command line, it is **highly recommended** +to pass it by a password manager! + +```sh +# For example, with pass: +oauth2_token_manager -p $(pass show oauth2_token_manager/myaccount) + +# Or with kwallet: +oauth2_token_manager -p $(kwallet-query kdewallet -f oauth2_token_manager -r myaccount) +``` + +In case of kallet, you have to create the folder "oauth2_token_manager" and +create the entry "myaccount" in the kwallet manager. You can check the +available entries with: + +```sh +kwallet-query kdewallet -f oauth2_token_manager -l +``` + + +## Example with mbsync + +I use this script together with mbsync to sync my mail. In `~/.mbsyncrc` you can use the following config: + +``` +IMAPAccount myaccount +Host outlook.office365.com +User username@example.com +AuthMechs XOAUTH2 +PassCmd "oauth2_token_manager -p $(kwallet-query kdewallet -f oauth2_token_manager -r myaccount) -t myaccount" +SSLType IMAPS +``` + + +[^office365]: I gave up tracking the different product, marketing and + technology names Microsoft uses for their email services. I think it is + called Microsoft 365 now. But I am not sure. I am sure that it will change + again in the future. (Paragraph phrased by GitHub's Copilot.) diff --git a/oauth2_token_manager b/oauth2_token_manager new file mode 100755 index 0000000..aeb9810 --- /dev/null +++ b/oauth2_token_manager @@ -0,0 +1,422 @@ +#!/usr/bin/env python3 + +"""A program to retrieve, store and read oauth2 tokens.""" + +__author__ = "Stefan Huber" +__license__ = "GPL-3.0" +__version__ = "0.1.0" + +# Copyright (c) 2024 Stefan Huber +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# his program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +# The oauth2 API code is largely based on mutt_oauth2.py by Alexander Perlis +# (copyright 2020), licensed by GPL-2. + + +from platformdirs import user_cache_dir +import argparse +import datetime +import json +import os +import requests +import subprocess +import sys +import time + +global args + +services = { + # https://hackmd.io/@linD026/mutt-oauth2 + # "google": { + # the devicecode endpoint does not work with mail.google.com scope, it seams + # "devicecode_endpoint": "https://oauth2.googleapis.com/device/code", + # "token_endpoint": "https://accounts.google.com/o/oauth2/token", + # "scope": "https://mail.google.com/", + # }, + "microsoft": { + # https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-device-code + "devicecode_endpoint": + "https://login.microsoftonline.com/common/oauth2/v2.0/devicecode", + # https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-auth-code-flow + "token_endpoint": + "https://login.microsoftonline.com/common/oauth2/v2.0/token", + "tenant": "common", + "scope": ( + "offline_access https://outlook.office.com/IMAP.AccessAsUser.All " + "https://outlook.office.com/POP.AccessAsUser.All " + "https://outlook.office.com/SMTP.Send"), + "client_secret": "", + }, +} + + +def debug_log(*msg): + """Print to stderr if debug is enabled.""" + if args.debug: + print(" [debug] ", *msg, file=sys.stderr) + + +def verbose_log(*msg, **kwargs): + """Print to stderr if verbose is enabled.""" + if args.verbose: + print("[info] ", *msg, file=sys.stderr, **kwargs) + + +def error_log(*msg): + """Print to stderr.""" + print("[err] ", *msg, file=sys.stderr) + + +def get_service_api(token): + """Return the service API for the token file.""" + return services[token["service"]] + + +def create_request_baseparams(token): + """Create the base parameters for API request.""" + api = get_service_api(token) + p = {"client_id": token["client_id"]} + + # Microsoft uses "tenant" but Google does not + if "tenant" in api: + p["tenant"] = api["tenant"] + + return p + + +def create_pretoken(): + """Create a token dictionary with base infos from user input prior sending + it to the API.""" + + token = {} + + token["service"] = args.service or input("Choose service: ") + token["email"] = args.email or input("Account e-mail address: ") + token["client_id"] = args.client_id or input("Client ID: ") + token["access_token"] = "" + token["access_token_expiration"] = "" + token["refresh_token"] = "" + + if token["service"] not in services: + error_log("Unknown service.") + sys.exit(1) + + # Replace special values of client id + if token["client_id"].lower() == "thunderbird": + token["client_id"] = "9e5f94bc-e8a4-4e73-b8be-63364c29d753" + if token["client_id"].lower() == "outlook": + token["client_id"] = "d3590ed6-52b3-4102-aeff-aad2292ab01c" + + return token + + +def run_authenticate(): + """Run the command to authenticate the user and store the token.""" + + verbose_log("Starting 2-step authenticating procedure...") + + token = create_pretoken() + api = get_service_api(token) + + p = create_request_baseparams(token) + p["scope"] = api["scope"] + + try: + verbose_log("Step 1: Requesting devicecode...") + r = requests.get(api["devicecode_endpoint"], data=p) + debug_log("Response: status {} -> {} ".format(r.status_code, r.text)) + response = r.json() + except requests.exceptions.RequestException as e: + error_log("Error requesting devicecode: ", e) + sys.exit(1) + + if "error" in response: + error_log(response["error"]) + if "error_description" in response: + error_log(response["error_description"]) + sys.exit(1) + + verbose_log("Message from the service:") + print(response["message"]) + + p = create_request_baseparams(token) + p |= {"grant_type": "urn:ietf:params:oauth:grant-type:device_code", + "client_secret": api["client_secret"], + "device_code": response["device_code"]} + + interval = int(response["interval"]) + verbose_log("Step 2: Receiving tokens from service...") + print("Polling for token ({} secs): ".format(interval), end="", flush=True) + while True: + time.sleep(interval) + + if not args.verbose: + print(".", end="", flush=True) + + try: + verbose_log("Requesting token...") + r = requests.get(api["token_endpoint"], data=p) + debug_log("Got status {}: {} ".format(r.status_code, r.text)) + response = r.json() + except requests.exceptions.RequestException as e: + # Not actually always an error, might just mean "keep trying..." + debug_log("Error requesting token: ", e) + continue + + # No error, we got the token + if "error" not in response: + verbose_log("Done polling. Got token.") + break + + # Got an error message + err = response["error"] + + # Still pending, try again... + if err == "authorization_pending": + continue + + if err == "authorization_declined": + error_log("User declined authorization.") + elif err == "expired_token": + error_log("Too much time has elapsed.") + else: + error_log(response["error"]) + if "error_description" in response: + error_log(response["error_description"]) + + sys.exit(1) + + # Finish "." printing + if not args.verbose: + print() + + update_token(token, response) + write_token(token) + + +def run_updatetoken(token): + """Run the command to update the token file with a new access token.""" + + verbose_log("Using refresh token to obtain new access token.") + if not token["refresh_token"]: + error_log("No refresh token. Restart with --authorize.") + sys.exit(1) + + api = get_service_api(token) + p = create_request_baseparams(token) + p |= {"client_secret": api["client_secret"], + "refresh_token": token["refresh_token"], + "grant_type": "refresh_token"} + + try: + verbose_log("Requesting token...") + r = requests.get(api["token_endpoint"], data=p) + debug_log("Got status {}: {} ".format(r.status_code, r.text)) + response = r.json() + except requests.exceptions.RequestException as e: + error_log("Error requesting token: " + e) + sys.exit(1) + + if "error" in response: + error_log(response["error"]) + if "error_description" in response: + error_log(response["error_description"]) + + error_log("Error updating expired token. Try restart with --authorize") + sys.exit(1) + + verbose_log("Got new access token.") + update_token(token, response) + write_token(token) + + +def run_printaccesstoken(token): + """Run the command to print the token file to stdout.""" + verbose_log("Printing access token...") + print(token["access_token"]) + + +def read_token(): + """Read the token from the token file.""" + # Check if tokenfile exists + if not os.path.exists(args.tokenfile): + error_log("Token file not found. Use -a to authenticate first.") + sys.exit(1) + + try: + return json.loads(read_tokenfile()) + except Exception as e: + error_log("Error reading token file: ", e) + error_log("Restart and use -a to authenticate again.") + sys.exit(1) + + +def write_token(token): + """Write the token to the token file.""" + try: + write_tokenfile(json.dumps(token)) + except Exception as e: + error_log("Error writing token file: ", e) + sys.exit(1) + + +def read_tokenfile(): + """Open, decrypt and return the token file.""" + + cmd = ["openssl", "enc", "-aes128", "-pbkdf2", "-d", + "-pass", "pass:" + args.password, "-in", args.tokenfile] + sub = subprocess.run(cmd, check=True, capture_output=True) + content = sub.stdout.decode() + debug_log("read from tokenfile:", content) + return content + + +def write_tokenfile(content): + """Encrypt and write the token file.""" + cmd = ["openssl", "enc", "-aes128", "-pbkdf2", + "-pass", "pass:" + args.password, "-out", args.tokenfile] + subprocess.run(cmd, input=content.encode(), check=True) + + +def access_token_valid(token): + """Returns True if stored access token is still valid now.""" + exp = token["access_token_expiration"] + if not exp: + return False + return datetime.datetime.now() < datetime.datetime.fromisoformat(exp) + + +def update_token(token, response): + """Update token by desponse dictionary.""" + + token["access_token"] = response["access_token"] + + td = datetime.timedelta(seconds=int(response["expires_in"])) + exp = (datetime.datetime.now() + td).isoformat() + token["access_token_expiration"] = exp + verbose_log("Obtained new access token expiring on {}".format(exp)) + + if "refresh_token" in response: + token["refresh_token"] = response["refresh_token"] + + write_token(token) + + +if __name__ == "__main__": + epilog = """\ +It stores the tokens securely in a file in ~/.cache/outh2_token_manager/ using +openssl symmetric encryption. It is *highly recommended* that the password +supplied is stored in a password manager, like pass, or a wallet manager, like +kwallet. + +Example usage: + + On first use, authenticate for some account and store the token in a + file: + $ oauth2_token_manager -p password -a -t myaccount + + On subsequent uses, retrieve the token from the file: + $ oauth2_token_manager -p password -t myaccount + + Instead of passing "password" on the command line, it is *highly + recommended* to pass it by a password manager! + + For example, with pass: + $ oauth2_token_manager -p $(pass show oauth2_token_manager/myaccount) + + Or with kwallet: + $ oauth2_token_manager -p $(kwallet-query kdewallet + -f oauth2_token_manager -r myaccount) + + You have to create the folder "oauth2_token_manager" and create the + entry "myaccount" in the kwallet manager. You can check the available + entries with: + + $ kwallet-query kdewallet -f oauth2_token_manager -l""" + + ap = argparse.ArgumentParser( + prog="oauth2_token_manager", + description="A program to retrieve, store and read oauth2 tokens.", + epilog=epilog, + formatter_class=argparse.RawDescriptionHelpFormatter) + + ap.add_argument( + "-p", "--password", + help="the password to use to encrypt/decrypt the token file.", + required=True) + ap.add_argument( + "-t", "--tokenfile", + default="token", + help="the filename for the encrypted token. " + "default: token") + ap.add_argument( + "-a", "--authenticate", + action="store_true", + help="authenticate the user and store the token in the file.") + ap.add_argument( + "--debug", + action="store_true", + help="print debug information.") + ap.add_argument( + "--verbose", + action="store_true", + help="print verbose information.") + ap.add_argument( + "--email", + default=None, + help="the e-mail address to use for authentication. " + "only used with --authenticate.") + ap.add_argument( + "--service", + default=None, + help="the service to authenticate against. " + "only used with --authenticate. " + "available services: " + ", ".join(services)) + ap.add_argument( + "--client-id", + default=None, + help="the client id to use for authentication. " + "only used with --authenticate." + "special values: thunderbird, outlook") + + if len(sys.argv) == 1: + ap.print_help() + exit(1) + + args = ap.parse_args() + args.tokenfile = os.path.expanduser(args.tokenfile) + + # --debug implies --verbose + if args.debug: + args.verbose = True + + cachedir = user_cache_dir("oauth2_token_manager", ensure_exists=True) + args.tokenfile = os.path.join(cachedir, args.tokenfile) + + debug_log("Token file: ", args.tokenfile) + + if args.authenticate: + run_authenticate() + + token = read_token() + + if not access_token_valid(token): + run_updatetoken(token) + + if not access_token_valid(token): + error_log("No valid access token. Update failed!?") + sys.exit(1) + + run_printaccesstoken(token) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..01247e9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests +platformdirs