--- /dev/null
+# oauth2_token_manager
+
+This script allows to obtain oauth2 tokens from Microsoft web services. These
+can then be used by software like [mutt](https://www.mutt.org/),
+[neomutt](https://neomutt.org/), [msmtp](http://msmtp.sourceforge.net/),
+[offlineimap](http://www.offlineimap.org/),
+[mbsync](http://isync.sourceforge.net/) to access our email accounts at
+[Micorsoft 365](https://en.wikipedia.org/wiki/Microsoft_365)[^office365] or
+Gmail.
+
+Large parts of this script are based on
+[mutt_oauth2.py](https://github.com/muttmua/mutt/blob/master/contrib/mutt_oauth2.py)
+by Alexander Perlis found in the mutt repository. It compares as follows:
+
+ - The token file encryption is symmetric and performed by openssl rather than
+ gpg. This makes it easier to integrate with password or wallet managers
+ without reoccurring interactions with gpg-agent.
+
+ - The token files are stored in `~/.cache/` or whatever the equivalent on
+ your platform is.
+
+ - The entire code base has been largely redesigned, various authorization
+ variations have been stripped and the test code for IMAP, POP and SMTP
+ endpoints has been removed.
+
+ - The original script also supported Google services, while this one at the
+ moment only supports Microsoft services.
+
+## Installation
+
+This script is written in Python 3 and requires the following packages:
+
+ - `requests`
+ - `platformdirs`
+
+These can be installed using pip:
+
+```sh
+pip install -r requirements.txt
+```
+
+
+## Usage
+
+On first use, authenticate for some account and store the token in a
+token file `myaccount`:
+
+```sh
+oauth2_token_manager -p password -t myaccount -a
+```
+
+On subsequent uses, retrieve the token as follows:
+```sh
+oauth2_token_manager -p password -t myaccount
+```
+
+Instead of passing "password" on the command line, it is **highly recommended**
+to pass it by a password manager!
+
+```sh
+# For example, with pass:
+oauth2_token_manager -p $(pass show oauth2_token_manager/myaccount)
+
+# Or with kwallet:
+oauth2_token_manager -p $(kwallet-query kdewallet -f oauth2_token_manager -r myaccount)
+```
+
+In case of kallet, you have to create the folder "oauth2_token_manager" and
+create the entry "myaccount" in the kwallet manager. You can check the
+available entries with:
+
+```sh
+kwallet-query kdewallet -f oauth2_token_manager -l
+```
+
+
+## Example with mbsync
+
+I use this script together with mbsync to sync my mail. In `~/.mbsyncrc` you can use the following config:
+
+```
+IMAPAccount myaccount
+Host outlook.office365.com
+User username@example.com
+AuthMechs XOAUTH2
+PassCmd "oauth2_token_manager -p $(kwallet-query kdewallet -f oauth2_token_manager -r myaccount) -t myaccount"
+SSLType IMAPS
+```
+
+
+[^office365]: I gave up tracking the different product, marketing and
+ technology names Microsoft uses for their email services. I think it is
+ called Microsoft 365 now. But I am not sure. I am sure that it will change
+ again in the future. (Paragraph phrased by GitHub's Copilot.)
--- /dev/null
+#!/usr/bin/env python3
+
+"""A program to retrieve, store and read oauth2 tokens."""
+
+__author__ = "Stefan Huber"
+__license__ = "GPL-3.0"
+__version__ = "0.1.0"
+
+# Copyright (c) 2024 Stefan Huber
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+#
+# his program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+# details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <https://www.gnu.org/licenses/>.
+
+# The oauth2 API code is largely based on mutt_oauth2.py by Alexander Perlis
+# (copyright 2020), licensed by GPL-2.
+
+
+from platformdirs import user_cache_dir
+import argparse
+import datetime
+import json
+import os
+import requests
+import subprocess
+import sys
+import time
+
+global args
+
+services = {
+ # https://hackmd.io/@linD026/mutt-oauth2
+ # "google": {
+ # the devicecode endpoint does not work with mail.google.com scope, it seams
+ # "devicecode_endpoint": "https://oauth2.googleapis.com/device/code",
+ # "token_endpoint": "https://accounts.google.com/o/oauth2/token",
+ # "scope": "https://mail.google.com/",
+ # },
+ "microsoft": {
+ # https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-device-code
+ "devicecode_endpoint":
+ "https://login.microsoftonline.com/common/oauth2/v2.0/devicecode",
+ # https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-auth-code-flow
+ "token_endpoint":
+ "https://login.microsoftonline.com/common/oauth2/v2.0/token",
+ "tenant": "common",
+ "scope": (
+ "offline_access https://outlook.office.com/IMAP.AccessAsUser.All "
+ "https://outlook.office.com/POP.AccessAsUser.All "
+ "https://outlook.office.com/SMTP.Send"),
+ "client_secret": "",
+ },
+}
+
+
+def debug_log(*msg):
+ """Print to stderr if debug is enabled."""
+ if args.debug:
+ print(" [debug] ", *msg, file=sys.stderr)
+
+
+def verbose_log(*msg, **kwargs):
+ """Print to stderr if verbose is enabled."""
+ if args.verbose:
+ print("[info] ", *msg, file=sys.stderr, **kwargs)
+
+
+def error_log(*msg):
+ """Print to stderr."""
+ print("[err] ", *msg, file=sys.stderr)
+
+
+def get_service_api(token):
+ """Return the service API for the token file."""
+ return services[token["service"]]
+
+
+def create_request_baseparams(token):
+ """Create the base parameters for API request."""
+ api = get_service_api(token)
+ p = {"client_id": token["client_id"]}
+
+ # Microsoft uses "tenant" but Google does not
+ if "tenant" in api:
+ p["tenant"] = api["tenant"]
+
+ return p
+
+
+def create_pretoken():
+ """Create a token dictionary with base infos from user input prior sending
+ it to the API."""
+
+ token = {}
+
+ token["service"] = args.service or input("Choose service: ")
+ token["email"] = args.email or input("Account e-mail address: ")
+ token["client_id"] = args.client_id or input("Client ID: ")
+ token["access_token"] = ""
+ token["access_token_expiration"] = ""
+ token["refresh_token"] = ""
+
+ if token["service"] not in services:
+ error_log("Unknown service.")
+ sys.exit(1)
+
+ # Replace special values of client id
+ if token["client_id"].lower() == "thunderbird":
+ token["client_id"] = "9e5f94bc-e8a4-4e73-b8be-63364c29d753"
+ if token["client_id"].lower() == "outlook":
+ token["client_id"] = "d3590ed6-52b3-4102-aeff-aad2292ab01c"
+
+ return token
+
+
+def run_authenticate():
+ """Run the command to authenticate the user and store the token."""
+
+ verbose_log("Starting 2-step authenticating procedure...")
+
+ token = create_pretoken()
+ api = get_service_api(token)
+
+ p = create_request_baseparams(token)
+ p["scope"] = api["scope"]
+
+ try:
+ verbose_log("Step 1: Requesting devicecode...")
+ r = requests.get(api["devicecode_endpoint"], data=p)
+ debug_log("Response: status {} -> {} ".format(r.status_code, r.text))
+ response = r.json()
+ except requests.exceptions.RequestException as e:
+ error_log("Error requesting devicecode: ", e)
+ sys.exit(1)
+
+ if "error" in response:
+ error_log(response["error"])
+ if "error_description" in response:
+ error_log(response["error_description"])
+ sys.exit(1)
+
+ verbose_log("Message from the service:")
+ print(response["message"])
+
+ p = create_request_baseparams(token)
+ p |= {"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "client_secret": api["client_secret"],
+ "device_code": response["device_code"]}
+
+ interval = int(response["interval"])
+ verbose_log("Step 2: Receiving tokens from service...")
+ print("Polling for token ({} secs): ".format(interval), end="", flush=True)
+ while True:
+ time.sleep(interval)
+
+ if not args.verbose:
+ print(".", end="", flush=True)
+
+ try:
+ verbose_log("Requesting token...")
+ r = requests.get(api["token_endpoint"], data=p)
+ debug_log("Got status {}: {} ".format(r.status_code, r.text))
+ response = r.json()
+ except requests.exceptions.RequestException as e:
+ # Not actually always an error, might just mean "keep trying..."
+ debug_log("Error requesting token: ", e)
+ continue
+
+ # No error, we got the token
+ if "error" not in response:
+ verbose_log("Done polling. Got token.")
+ break
+
+ # Got an error message
+ err = response["error"]
+
+ # Still pending, try again...
+ if err == "authorization_pending":
+ continue
+
+ if err == "authorization_declined":
+ error_log("User declined authorization.")
+ elif err == "expired_token":
+ error_log("Too much time has elapsed.")
+ else:
+ error_log(response["error"])
+ if "error_description" in response:
+ error_log(response["error_description"])
+
+ sys.exit(1)
+
+ # Finish "." printing
+ if not args.verbose:
+ print()
+
+ update_token(token, response)
+ write_token(token)
+
+
+def run_updatetoken(token):
+ """Run the command to update the token file with a new access token."""
+
+ verbose_log("Using refresh token to obtain new access token.")
+ if not token["refresh_token"]:
+ error_log("No refresh token. Restart with --authorize.")
+ sys.exit(1)
+
+ api = get_service_api(token)
+ p = create_request_baseparams(token)
+ p |= {"client_secret": api["client_secret"],
+ "refresh_token": token["refresh_token"],
+ "grant_type": "refresh_token"}
+
+ try:
+ verbose_log("Requesting token...")
+ r = requests.get(api["token_endpoint"], data=p)
+ debug_log("Got status {}: {} ".format(r.status_code, r.text))
+ response = r.json()
+ except requests.exceptions.RequestException as e:
+ error_log("Error requesting token: " + e)
+ sys.exit(1)
+
+ if "error" in response:
+ error_log(response["error"])
+ if "error_description" in response:
+ error_log(response["error_description"])
+
+ error_log("Error updating expired token. Try restart with --authorize")
+ sys.exit(1)
+
+ verbose_log("Got new access token.")
+ update_token(token, response)
+ write_token(token)
+
+
+def run_printaccesstoken(token):
+ """Run the command to print the token file to stdout."""
+ verbose_log("Printing access token...")
+ print(token["access_token"])
+
+
+def read_token():
+ """Read the token from the token file."""
+ # Check if tokenfile exists
+ if not os.path.exists(args.tokenfile):
+ error_log("Token file not found. Use -a to authenticate first.")
+ sys.exit(1)
+
+ try:
+ return json.loads(read_tokenfile())
+ except Exception as e:
+ error_log("Error reading token file: ", e)
+ error_log("Restart and use -a to authenticate again.")
+ sys.exit(1)
+
+
+def write_token(token):
+ """Write the token to the token file."""
+ try:
+ write_tokenfile(json.dumps(token))
+ except Exception as e:
+ error_log("Error writing token file: ", e)
+ sys.exit(1)
+
+
+def read_tokenfile():
+ """Open, decrypt and return the token file."""
+
+ cmd = ["openssl", "enc", "-aes128", "-pbkdf2", "-d",
+ "-pass", "pass:" + args.password, "-in", args.tokenfile]
+ sub = subprocess.run(cmd, check=True, capture_output=True)
+ content = sub.stdout.decode()
+ debug_log("read from tokenfile:", content)
+ return content
+
+
+def write_tokenfile(content):
+ """Encrypt and write the token file."""
+ cmd = ["openssl", "enc", "-aes128", "-pbkdf2",
+ "-pass", "pass:" + args.password, "-out", args.tokenfile]
+ subprocess.run(cmd, input=content.encode(), check=True)
+
+
+def access_token_valid(token):
+ """Returns True if stored access token is still valid now."""
+ exp = token["access_token_expiration"]
+ if not exp:
+ return False
+ return datetime.datetime.now() < datetime.datetime.fromisoformat(exp)
+
+
+def update_token(token, response):
+ """Update token by desponse dictionary."""
+
+ token["access_token"] = response["access_token"]
+
+ td = datetime.timedelta(seconds=int(response["expires_in"]))
+ exp = (datetime.datetime.now() + td).isoformat()
+ token["access_token_expiration"] = exp
+ verbose_log("Obtained new access token expiring on {}".format(exp))
+
+ if "refresh_token" in response:
+ token["refresh_token"] = response["refresh_token"]
+
+ write_token(token)
+
+
+if __name__ == "__main__":
+ epilog = """\
+It stores the tokens securely in a file in ~/.cache/outh2_token_manager/ using
+openssl symmetric encryption. It is *highly recommended* that the password
+supplied is stored in a password manager, like pass, or a wallet manager, like
+kwallet.
+
+Example usage:
+
+ On first use, authenticate for some account and store the token in a
+ file:
+ $ oauth2_token_manager -p password -a -t myaccount
+
+ On subsequent uses, retrieve the token from the file:
+ $ oauth2_token_manager -p password -t myaccount
+
+ Instead of passing "password" on the command line, it is *highly
+ recommended* to pass it by a password manager!
+
+ For example, with pass:
+ $ oauth2_token_manager -p $(pass show oauth2_token_manager/myaccount)
+
+ Or with kwallet:
+ $ oauth2_token_manager -p $(kwallet-query kdewallet
+ -f oauth2_token_manager -r myaccount)
+
+ You have to create the folder "oauth2_token_manager" and create the
+ entry "myaccount" in the kwallet manager. You can check the available
+ entries with:
+
+ $ kwallet-query kdewallet -f oauth2_token_manager -l"""
+
+ ap = argparse.ArgumentParser(
+ prog="oauth2_token_manager",
+ description="A program to retrieve, store and read oauth2 tokens.",
+ epilog=epilog,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+
+ ap.add_argument(
+ "-p", "--password",
+ help="the password to use to encrypt/decrypt the token file.",
+ required=True)
+ ap.add_argument(
+ "-t", "--tokenfile",
+ default="token",
+ help="the filename for the encrypted token. "
+ "default: token")
+ ap.add_argument(
+ "-a", "--authenticate",
+ action="store_true",
+ help="authenticate the user and store the token in the file.")
+ ap.add_argument(
+ "--debug",
+ action="store_true",
+ help="print debug information.")
+ ap.add_argument(
+ "--verbose",
+ action="store_true",
+ help="print verbose information.")
+ ap.add_argument(
+ "--email",
+ default=None,
+ help="the e-mail address to use for authentication. "
+ "only used with --authenticate.")
+ ap.add_argument(
+ "--service",
+ default=None,
+ help="the service to authenticate against. "
+ "only used with --authenticate. "
+ "available services: " + ", ".join(services))
+ ap.add_argument(
+ "--client-id",
+ default=None,
+ help="the client id to use for authentication. "
+ "only used with --authenticate."
+ "special values: thunderbird, outlook")
+
+ if len(sys.argv) == 1:
+ ap.print_help()
+ exit(1)
+
+ args = ap.parse_args()
+ args.tokenfile = os.path.expanduser(args.tokenfile)
+
+ # --debug implies --verbose
+ if args.debug:
+ args.verbose = True
+
+ cachedir = user_cache_dir("oauth2_token_manager", ensure_exists=True)
+ args.tokenfile = os.path.join(cachedir, args.tokenfile)
+
+ debug_log("Token file: ", args.tokenfile)
+
+ if args.authenticate:
+ run_authenticate()
+
+ token = read_token()
+
+ if not access_token_valid(token):
+ run_updatetoken(token)
+
+ if not access_token_valid(token):
+ error_log("No valid access token. Update failed!?")
+ sys.exit(1)
+
+ run_printaccesstoken(token)