]> git.sthu.org Git - oauth2_token_manager.git/commitdiff
Initial commit
authorStefan Huber <shuber@sthu.org>
Fri, 16 Aug 2024 21:17:42 +0000 (23:17 +0200)
committerStefan Huber <shuber@sthu.org>
Wed, 21 Aug 2024 19:58:07 +0000 (21:58 +0200)
REAME.md [new file with mode: 0644]
oauth2_token_manager [new file with mode: 0755]
requirements.txt [new file with mode: 0644]

diff --git a/REAME.md b/REAME.md
new file mode 100644 (file)
index 0000000..2e89f52
--- /dev/null
+++ b/REAME.md
@@ -0,0 +1,94 @@
+# oauth2_token_manager
+
+This script allows to obtain oauth2 tokens from Microsoft web services. These
+can then be used by software like [mutt](https://www.mutt.org/),
+[neomutt](https://neomutt.org/), [msmtp](http://msmtp.sourceforge.net/),
+[offlineimap](http://www.offlineimap.org/),
+[mbsync](http://isync.sourceforge.net/) to access our email accounts at
+[Micorsoft 365](https://en.wikipedia.org/wiki/Microsoft_365)[^office365] or
+Gmail.
+
+Large parts of this script are based on
+[mutt_oauth2.py](https://github.com/muttmua/mutt/blob/master/contrib/mutt_oauth2.py)
+by Alexander Perlis found in the mutt repository. It compares as follows:
+
+  - The token file encryption is symmetric and performed by openssl rather than
+    gpg. This makes it easier to integrate with password or wallet managers
+    without reoccurring interactions with gpg-agent.
+
+  - The token files are stored in `~/.cache/` or whatever the equivalent on
+    your platform is.
+
+  - The entire code base has been largely redesigned, various authorization
+    variations have been stripped and the test code for IMAP, POP and SMTP
+    endpoints has been removed.
+
+  - The original script also supported Google services, while this one at the
+    moment only supports Microsoft services.
+
+## Installation
+
+This script is written in Python 3 and requires the following packages:
+
+  - `requests`
+  - `platformdirs`
+
+These can be installed using pip:
+
+```sh
+pip install -r requirements.txt
+```
+
+
+## Usage
+
+On first use, authenticate for some account and store the token in a
+token file `myaccount`:
+
+```sh
+oauth2_token_manager -p password -t myaccount -a
+```
+
+On subsequent uses, retrieve the token as follows:
+```sh
+oauth2_token_manager -p password -t myaccount
+```
+
+Instead of passing "password" on the command line, it is **highly recommended**
+to pass it by a password manager!
+
+```sh
+# For example, with pass:
+oauth2_token_manager -p $(pass show oauth2_token_manager/myaccount)
+
+# Or with kwallet:
+oauth2_token_manager -p $(kwallet-query kdewallet -f oauth2_token_manager -r myaccount)
+```
+
+In case of kallet, you have to create the folder "oauth2_token_manager" and
+create the entry "myaccount" in the kwallet manager. You can check the
+available entries with:
+
+```sh
+kwallet-query kdewallet -f oauth2_token_manager -l
+```
+
+
+## Example with mbsync
+
+I use this script together with mbsync to sync my mail. In `~/.mbsyncrc` you can use the following config:
+
+```
+IMAPAccount myaccount
+Host outlook.office365.com
+User username@example.com
+AuthMechs XOAUTH2
+PassCmd "oauth2_token_manager -p $(kwallet-query kdewallet -f oauth2_token_manager -r myaccount) -t myaccount"
+SSLType IMAPS
+```
+
+
+[^office365]: I gave up tracking the different product, marketing and
+    technology names Microsoft uses for their email services. I think it is
+    called Microsoft 365 now. But I am not sure. I am sure that it will change
+    again in the future. (Paragraph phrased by GitHub's Copilot.)
diff --git a/oauth2_token_manager b/oauth2_token_manager
new file mode 100755 (executable)
index 0000000..aeb9810
--- /dev/null
@@ -0,0 +1,422 @@
+#!/usr/bin/env python3
+
+"""A program to retrieve, store and read oauth2 tokens."""
+
+__author__ = "Stefan Huber"
+__license__ = "GPL-3.0"
+__version__ = "0.1.0"
+
+# Copyright (c) 2024 Stefan Huber
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+#
+# his program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+# details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <https://www.gnu.org/licenses/>.
+
+# The oauth2 API code is largely based on mutt_oauth2.py by Alexander Perlis
+# (copyright 2020), licensed by GPL-2.
+
+
+from platformdirs import user_cache_dir
+import argparse
+import datetime
+import json
+import os
+import requests
+import subprocess
+import sys
+import time
+
+global args
+
+services = {
+    # https://hackmd.io/@linD026/mutt-oauth2
+    # "google": {
+    #     the devicecode endpoint does not work with mail.google.com scope, it seams
+    #     "devicecode_endpoint": "https://oauth2.googleapis.com/device/code",
+    #     "token_endpoint": "https://accounts.google.com/o/oauth2/token",
+    #     "scope": "https://mail.google.com/",
+    # },
+    "microsoft": {
+        # https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-device-code
+        "devicecode_endpoint":
+        "https://login.microsoftonline.com/common/oauth2/v2.0/devicecode",
+        # https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-auth-code-flow
+        "token_endpoint":
+        "https://login.microsoftonline.com/common/oauth2/v2.0/token",
+        "tenant": "common",
+        "scope": (
+            "offline_access https://outlook.office.com/IMAP.AccessAsUser.All "
+            "https://outlook.office.com/POP.AccessAsUser.All "
+            "https://outlook.office.com/SMTP.Send"),
+        "client_secret": "",
+    },
+}
+
+
+def debug_log(*msg):
+    """Print to stderr if debug is enabled."""
+    if args.debug:
+        print("  [debug] ", *msg, file=sys.stderr)
+
+
+def verbose_log(*msg, **kwargs):
+    """Print to stderr if verbose is enabled."""
+    if args.verbose:
+        print("[info] ", *msg, file=sys.stderr, **kwargs)
+
+
+def error_log(*msg):
+    """Print to stderr."""
+    print("[err] ", *msg, file=sys.stderr)
+
+
+def get_service_api(token):
+    """Return the service API for the token file."""
+    return services[token["service"]]
+
+
+def create_request_baseparams(token):
+    """Create the base parameters for API request."""
+    api = get_service_api(token)
+    p = {"client_id": token["client_id"]}
+
+    # Microsoft uses "tenant" but Google does not
+    if "tenant" in api:
+        p["tenant"] = api["tenant"]
+
+    return p
+
+
+def create_pretoken():
+    """Create a token dictionary with base infos from user input prior sending
+    it to the API."""
+
+    token = {}
+
+    token["service"] = args.service or input("Choose service: ")
+    token["email"] = args.email or input("Account e-mail address: ")
+    token["client_id"] = args.client_id or input("Client ID: ")
+    token["access_token"] = ""
+    token["access_token_expiration"] = ""
+    token["refresh_token"] = ""
+
+    if token["service"] not in services:
+        error_log("Unknown service.")
+        sys.exit(1)
+
+    # Replace special values of client id
+    if token["client_id"].lower() == "thunderbird":
+        token["client_id"] = "9e5f94bc-e8a4-4e73-b8be-63364c29d753"
+    if token["client_id"].lower() == "outlook":
+        token["client_id"] = "d3590ed6-52b3-4102-aeff-aad2292ab01c"
+
+    return token
+
+
+def run_authenticate():
+    """Run the command to authenticate the user and store the token."""
+
+    verbose_log("Starting 2-step authenticating procedure...")
+
+    token = create_pretoken()
+    api = get_service_api(token)
+
+    p = create_request_baseparams(token)
+    p["scope"] = api["scope"]
+
+    try:
+        verbose_log("Step 1: Requesting devicecode...")
+        r = requests.get(api["devicecode_endpoint"], data=p)
+        debug_log("Response: status {} -> {} ".format(r.status_code, r.text))
+        response = r.json()
+    except requests.exceptions.RequestException as e:
+        error_log("Error requesting devicecode: ", e)
+        sys.exit(1)
+
+    if "error" in response:
+        error_log(response["error"])
+        if "error_description" in response:
+            error_log(response["error_description"])
+        sys.exit(1)
+
+    verbose_log("Message from the service:")
+    print(response["message"])
+
+    p = create_request_baseparams(token)
+    p |= {"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+          "client_secret": api["client_secret"],
+          "device_code": response["device_code"]}
+
+    interval = int(response["interval"])
+    verbose_log("Step 2: Receiving tokens from service...")
+    print("Polling for token ({} secs): ".format(interval), end="", flush=True)
+    while True:
+        time.sleep(interval)
+
+        if not args.verbose:
+            print(".", end="", flush=True)
+
+        try:
+            verbose_log("Requesting token...")
+            r = requests.get(api["token_endpoint"], data=p)
+            debug_log("Got status {}: {} ".format(r.status_code, r.text))
+            response = r.json()
+        except requests.exceptions.RequestException as e:
+            # Not actually always an error, might just mean "keep trying..."
+            debug_log("Error requesting token: ", e)
+            continue
+
+        # No error, we got the token
+        if "error" not in response:
+            verbose_log("Done polling. Got token.")
+            break
+
+        # Got an error message
+        err = response["error"]
+
+        # Still pending, try again...
+        if err == "authorization_pending":
+            continue
+
+        if err == "authorization_declined":
+            error_log("User declined authorization.")
+        elif err == "expired_token":
+            error_log("Too much time has elapsed.")
+        else:
+            error_log(response["error"])
+            if "error_description" in response:
+                error_log(response["error_description"])
+
+        sys.exit(1)
+
+    # Finish "." printing
+    if not args.verbose:
+        print()
+
+    update_token(token, response)
+    write_token(token)
+
+
+def run_updatetoken(token):
+    """Run the command to update the token file with a new access token."""
+
+    verbose_log("Using refresh token to obtain new access token.")
+    if not token["refresh_token"]:
+        error_log("No refresh token. Restart with --authorize.")
+        sys.exit(1)
+
+    api = get_service_api(token)
+    p = create_request_baseparams(token)
+    p |= {"client_secret": api["client_secret"],
+          "refresh_token": token["refresh_token"],
+          "grant_type": "refresh_token"}
+
+    try:
+        verbose_log("Requesting token...")
+        r = requests.get(api["token_endpoint"], data=p)
+        debug_log("Got status {}: {} ".format(r.status_code, r.text))
+        response = r.json()
+    except requests.exceptions.RequestException as e:
+        error_log("Error requesting token: " + e)
+        sys.exit(1)
+
+    if "error" in response:
+        error_log(response["error"])
+        if "error_description" in response:
+            error_log(response["error_description"])
+
+        error_log("Error updating expired token. Try restart with --authorize")
+        sys.exit(1)
+
+    verbose_log("Got new access token.")
+    update_token(token, response)
+    write_token(token)
+
+
+def run_printaccesstoken(token):
+    """Run the command to print the token file to stdout."""
+    verbose_log("Printing access token...")
+    print(token["access_token"])
+
+
+def read_token():
+    """Read the token from the token file."""
+    # Check if tokenfile exists
+    if not os.path.exists(args.tokenfile):
+        error_log("Token file not found. Use -a to authenticate first.")
+        sys.exit(1)
+
+    try:
+        return json.loads(read_tokenfile())
+    except Exception as e:
+        error_log("Error reading token file: ", e)
+        error_log("Restart and use -a to authenticate again.")
+        sys.exit(1)
+
+
+def write_token(token):
+    """Write the token to the token file."""
+    try:
+        write_tokenfile(json.dumps(token))
+    except Exception as e:
+        error_log("Error writing token file: ", e)
+        sys.exit(1)
+
+
+def read_tokenfile():
+    """Open, decrypt and return the token file."""
+
+    cmd = ["openssl", "enc", "-aes128", "-pbkdf2", "-d",
+           "-pass", "pass:" + args.password, "-in", args.tokenfile]
+    sub = subprocess.run(cmd, check=True, capture_output=True)
+    content = sub.stdout.decode()
+    debug_log("read from tokenfile:", content)
+    return content
+
+
+def write_tokenfile(content):
+    """Encrypt and write the token file."""
+    cmd = ["openssl", "enc", "-aes128", "-pbkdf2",
+           "-pass", "pass:" + args.password, "-out", args.tokenfile]
+    subprocess.run(cmd, input=content.encode(), check=True)
+
+
+def access_token_valid(token):
+    """Returns True if stored access token is still valid now."""
+    exp = token["access_token_expiration"]
+    if not exp:
+        return False
+    return datetime.datetime.now() < datetime.datetime.fromisoformat(exp)
+
+
+def update_token(token, response):
+    """Update token by desponse dictionary."""
+
+    token["access_token"] = response["access_token"]
+
+    td = datetime.timedelta(seconds=int(response["expires_in"]))
+    exp = (datetime.datetime.now() + td).isoformat()
+    token["access_token_expiration"] = exp
+    verbose_log("Obtained new access token expiring on {}".format(exp))
+
+    if "refresh_token" in response:
+        token["refresh_token"] = response["refresh_token"]
+
+    write_token(token)
+
+
+if __name__ == "__main__":
+    epilog = """\
+It stores the tokens securely in a file in ~/.cache/outh2_token_manager/ using
+openssl symmetric encryption. It is *highly recommended* that the password
+supplied is stored in a password manager, like pass, or a wallet manager, like
+kwallet.
+
+Example usage:
+
+    On first use, authenticate for some account and store the token in a
+    file:
+        $ oauth2_token_manager -p password -a -t myaccount
+
+    On subsequent uses, retrieve the token from the file:
+        $ oauth2_token_manager -p password -t myaccount
+
+    Instead of passing "password" on the command line, it is *highly
+    recommended* to pass it by a password manager!
+
+    For example, with pass:
+        $ oauth2_token_manager -p $(pass show oauth2_token_manager/myaccount)
+
+    Or with kwallet:
+        $ oauth2_token_manager -p $(kwallet-query kdewallet
+            -f oauth2_token_manager -r myaccount)
+
+        You have to create the folder "oauth2_token_manager" and create the
+        entry "myaccount" in the kwallet manager. You can check the available
+        entries with:
+
+        $ kwallet-query kdewallet -f oauth2_token_manager -l"""
+
+    ap = argparse.ArgumentParser(
+        prog="oauth2_token_manager",
+        description="A program to retrieve, store and read oauth2 tokens.",
+        epilog=epilog,
+        formatter_class=argparse.RawDescriptionHelpFormatter)
+
+    ap.add_argument(
+        "-p", "--password",
+        help="the password to use to encrypt/decrypt the token file.",
+        required=True)
+    ap.add_argument(
+        "-t", "--tokenfile",
+        default="token",
+        help="the filename for the encrypted token. "
+             "default: token")
+    ap.add_argument(
+        "-a", "--authenticate",
+        action="store_true",
+        help="authenticate the user and store the token in the file.")
+    ap.add_argument(
+        "--debug",
+        action="store_true",
+        help="print debug information.")
+    ap.add_argument(
+        "--verbose",
+        action="store_true",
+        help="print verbose information.")
+    ap.add_argument(
+        "--email",
+        default=None,
+        help="the e-mail address to use for authentication. "
+             "only used with --authenticate.")
+    ap.add_argument(
+        "--service",
+        default=None,
+        help="the service to authenticate against. "
+             "only used with --authenticate. "
+             "available services: " + ", ".join(services))
+    ap.add_argument(
+        "--client-id",
+        default=None,
+        help="the client id to use for authentication. "
+             "only used with --authenticate."
+             "special values: thunderbird, outlook")
+
+    if len(sys.argv) == 1:
+        ap.print_help()
+        exit(1)
+
+    args = ap.parse_args()
+    args.tokenfile = os.path.expanduser(args.tokenfile)
+
+    # --debug implies --verbose
+    if args.debug:
+        args.verbose = True
+
+    cachedir = user_cache_dir("oauth2_token_manager", ensure_exists=True)
+    args.tokenfile = os.path.join(cachedir, args.tokenfile)
+
+    debug_log("Token file: ", args.tokenfile)
+
+    if args.authenticate:
+        run_authenticate()
+
+    token = read_token()
+
+    if not access_token_valid(token):
+        run_updatetoken(token)
+
+    if not access_token_valid(token):
+        error_log("No valid access token. Update failed!?")
+        sys.exit(1)
+
+    run_printaccesstoken(token)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644 (file)
index 0000000..01247e9
--- /dev/null
@@ -0,0 +1,2 @@
+requests
+platformdirs