#!/usr/bin/env python

"""
Quick and dirty script for things that I can't/don't have time to do properly yet
TODO: retire this script
"""

import base64
import json
import subprocess
import sys
import urllib

import pexpect
import requests
from kubernetes import client, config
from kubernetes.stream import stream
from rich.console import Console

# https://git.khuedoan.com/user/settings/applications
# Doing this properly inside the cluster requires:
# - Kubernetes service account

# Seconds to wait for Gitea before giving up, so an unresponsive server
# cannot hang the script forever.
REQUEST_TIMEOUT = 30

config.load_config()


def _ingress_host(name: str, namespace: str) -> str:
    """Return the host of the first rule of the given Ingress."""
    ingress = client.NetworkingV1Api().read_namespaced_ingress(name, namespace)
    return ingress.spec.rules[0].host


def _b64encode(value: str) -> str:
    """Base64-encode a UTF-8 string, as required for Secret ``data`` values."""
    return base64.b64encode(value.encode("utf-8")).decode("utf-8")


def _b64decode(value: str) -> str:
    """Decode a base64 Secret ``data`` value into a UTF-8 string."""
    return base64.b64decode(value).decode("utf-8")


gitea_host = _ingress_host('gitea', 'gitea')
gitea_user_secret = client.CoreV1Api().read_namespaced_secret('gitea-admin-secret', 'gitea')
gitea_user = _b64decode(gitea_user_secret.data['username'])
gitea_pass = _b64decode(gitea_user_secret.data['password'])
gitea_url = f"http://{gitea_host}"

kanidm_host = _ingress_host('kanidm', 'kanidm')


def apply_secret(name: str, namespace: str, data: dict) -> None:
    """Create the Secret, or replace it if it already exists.

    Args:
        name: Name of the Secret.
        namespace: Namespace the Secret lives in.
        data: Secret payload; values must already be base64-encoded strings.

    Raises:
        kubernetes.client.exceptions.ApiException: For any API failure other
            than "not found" on the initial lookup.
    """
    core_api = client.CoreV1Api()
    secret = client.V1Secret(
        metadata=client.V1ObjectMeta(name=name),
        data=data,
    )

    try:
        core_api.read_namespaced_secret(name, namespace)
    except client.exceptions.ApiException as err:
        # Only a 404 means the secret is missing; anything else (403, 5xx, ...)
        # is a real failure and must not be papered over with a create.
        if err.status != 404:
            raise
        core_api.create_namespaced_secret(namespace, secret)
    else:
        core_api.replace_namespaced_secret(name, namespace, secret)


def setup_gitea_access_token(name: str, scopes: list[str]) -> None:
    """Create a Gitea access token and store it in the `gitea.<name>` secret.

    Does nothing when a token with the same name already exists. Exits the
    process with status 1 when Gitea refuses to create the token.

    Args:
        name: Name of the access token.
        scopes: Gitea token scopes to grant.

    Raises:
        requests.HTTPError: If listing the existing tokens fails.
    """
    tokens_url = f"{gitea_url}/api/v1/users/{gitea_user}/tokens"
    auth = (gitea_user, gitea_pass)

    listing = requests.get(url=tokens_url, auth=auth, timeout=REQUEST_TIMEOUT)
    # Fail loudly here rather than with a confusing error while iterating
    # over an error payload below.
    listing.raise_for_status()

    if any(token['name'] == name for token in listing.json()):
        return

    resp = requests.post(
        url=tokens_url,
        auth=auth,
        json={
            'name': name,
            'scopes': scopes,
        },
        timeout=REQUEST_TIMEOUT,
    )

    if resp.status_code != 201:
        print(f"Error creating access token {name} ({resp.status_code})")
        print(resp.content)
        sys.exit(1)

    apply_secret(
        f"gitea.{name}",
        "global-secrets",
        {
            'token': _b64encode(resp.json()['sha1']),
        },
    )


def setup_gitea_oauth_app(name: str, redirect_uri: str) -> None:
    """Create a Gitea OAuth2 application and store its credentials.

    The client ID and secret are written to the `gitea.<name>` secret. Does
    nothing when an application with the same name already exists. Exits the
    process with status 1 when Gitea refuses to create the application.

    Args:
        name: Name of the OAuth2 application.
        redirect_uri: Redirect URI registered for the application.

    Raises:
        requests.HTTPError: If listing the existing applications fails.
    """
    # TODO use the new global application, while it's there in the UI, there's no API yet.
    apps_url = f"{gitea_url}/api/v1/user/applications/oauth2"
    auth = (gitea_user, gitea_pass)

    listing = requests.get(url=apps_url, auth=auth, timeout=REQUEST_TIMEOUT)
    listing.raise_for_status()

    if any(app['name'] == name for app in listing.json()):
        return

    resp = requests.post(
        url=apps_url,
        auth=auth,
        json={
            'name': name,
            'redirect_uris': [redirect_uri],
            'confidential_client': True,
        },
        timeout=REQUEST_TIMEOUT,
    )

    if resp.status_code != 201:
        print(f"Error creating OAuth application {name} ({resp.status_code})")
        print(resp.content)
        sys.exit(1)

    created = resp.json()
    apply_secret(
        f"gitea.{name}",
        "global-secrets",
        {
            'client_id': _b64encode(created['client_id']),
            'client_secret': _b64encode(created['client_secret']),
        },
    )


def setup_gitea_auth_with_dex():
    """Register Dex as an OpenID Connect authentication source in Gitea.

    Reads the client secret from the `dex.gitea` secret and runs the Gitea
    CLI inside the first pod labelled `app=gitea`. Exits the process with
    status 1 when no such pod exists.
    """
    pods = client.CoreV1Api().list_namespaced_pod(namespace='gitea', label_selector='app=gitea').items
    if not pods:
        print("Error setting up Gitea authentication: no pod with label app=gitea found")
        sys.exit(1)
    gitea_pod = pods[0].metadata.name

    client_secret = _b64decode(
        client.CoreV1Api().read_namespaced_secret('dex.gitea', 'global-secrets').data['client_secret']
    )
    discovery_url = f"https://{_ingress_host('dex', 'dex')}/.well-known/openid-configuration"

    # TODO currently there's no API to add new authentication sources in Gitea,
    # so we have to workaround by running Gitea CLI in a Gitea pod.
    stream(
        client.CoreV1Api().connect_get_namespaced_pod_exec,
        gitea_pod,
        'gitea',
        command=[
            'gitea', 'admin', 'auth', 'add-oauth',
            '--name', 'Dex',
            '--provider', 'openidConnect',
            '--key', 'gitea',
            '--secret', client_secret,
            '--auto-discover-url', discovery_url,
        ],
        stderr=True, stdin=False,
        stdout=False, tty=False,
    )


def reset_kanidm_account_password(account: str) -> str:
    """Recover a Kanidm account and return its newly generated password.

    Runs `kanidmd recover-account` in the `kanidm-0` pod and parses the last
    line of its stdout as JSON.

    Args:
        account: Name of the Kanidm account to recover.

    Returns:
        The value of the `password` field in the command's JSON output.
    """
    output = stream(
        client.CoreV1Api().connect_get_namespaced_pod_exec,
        'kanidm-0',
        'kanidm',
        command=["kanidmd", "recover-account", "--output", "json", account],
        stderr=False, stdin=False,
        stdout=True, tty=False,
    )
    # Only the final line is parsed as the JSON result.
    return json.loads(output.splitlines()[-1])['password']


# TODO Proper automation will be added later, waiting for client library update:
# https://github.com/kanidm/kanidm/pull/2301
def kanidm_login(accounts: list[str]) -> None:
    """Reset each account's password and log in with the `kanidm` CLI.

    Args:
        accounts: Names of the Kanidm accounts to log in as.
    """
    for account in accounts:
        password = reset_kanidm_account_password(account)

        # There's no way to input password using the standard library, so we have to use pexpect
        # https://stackoverflow.com/questions/2387731/use-subprocess-to-send-a-password
        # Pass an explicit argv so the account name is never re-split by pexpect.
        cli_login = pexpect.spawn(
            "kanidm",
            ["login", "--url", f"https://{kanidm_host}", "--name", account],
        )
        cli_login.sendline(password)
        cli_login.read()
        cli_login.close()


def setup_kanidm_group(name: str) -> None:
    """Create a Kanidm group as `idm_admin`.

    The command's exit status is not checked, so a failure is ignored
    (presumably so that an already-existing group does not abort the script).

    Args:
        name: Name of the group to create.
    """
    subprocess.run(
        ["kanidm", "group", "create", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name],
        capture_output=True,
    )


def setup_kanidm_oauth_app(name: str, redirect_uri: str) -> None:
    """Create a Kanidm OAuth2 client and store its credentials.

    The client ID and secret are written to the `kanidm.<name>` secret. When
    the initial `oauth2 create` command fails, the function returns without
    doing anything else (presumably the client already exists).

    Args:
        name: Name (and display name) of the OAuth2 client.
        redirect_uri: Origin/redirect URI registered for the client.

    Raises:
        subprocess.CalledProcessError: If any command after the initial
            create fails.
    """
    base_args = ["--url", f"https://{kanidm_host}", "--name", "idm_admin"]

    try:
        subprocess.run(
            ["kanidm", "system", "oauth2", "create", *base_args, name, name, redirect_uri],
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        return

    # TODO https://github.com/dexidp/dex/pull/3188
    subprocess.run(
        ["kanidm", "system", "oauth2", "warning-insecure-client-disable-pkce", *base_args, name],
        capture_output=True,
        check=True,
    )

    subprocess.run(
        # TODO better group management
        ["kanidm", "system", "oauth2", "create-scope-map", *base_args, name, "editor", "openid", "profile", "email", "groups"],
        capture_output=True,
        check=True,
    )

    show_secret = subprocess.run(
        ["kanidm", "system", "oauth2", "show-basic-secret", *base_args, "--output", "json", name],
        capture_output=True,
        check=True,
    )
    client_secret = json.loads(show_secret.stdout.decode("utf-8"))['secret']

    apply_secret(
        f"kanidm.{name}",
        "global-secrets",
        {
            'client_id': _b64encode(name),
            'client_secret': _b64encode(client_secret),
        },
    )


def main() -> None:
    """Run every post-install setup step in order."""
    with Console().status("Completing the remaining sorcery"):
        gitea_access_tokens = [
            {
                'name': 'renovate',
                'scopes': [
                    "write:repository",
                    "read:user",
                    "write:issue",
                    "read:organization",
                    "read:misc",
                ],
            },
        ]

        gitea_oauth_apps = [
            {'name': 'woodpecker', 'redirect_uri': f"https://{_ingress_host('woodpecker-server', 'woodpecker')}/authorize"},
        ]

        kanidm_groups = [
            # TODO better group management
            {'name': 'editor'},
        ]

        kanidm_oauth_apps = [
            {'name': 'dex', 'redirect_uri': f"https://{_ingress_host('dex', 'dex')}/callback"},
        ]

        for token in gitea_access_tokens:
            setup_gitea_access_token(token['name'], token['scopes'])

        for app in gitea_oauth_apps:
            setup_gitea_oauth_app(app['name'], app['redirect_uri'])

        setup_gitea_auth_with_dex()

        kanidm_login(["admin", "idm_admin"])

        for group in kanidm_groups:
            setup_kanidm_group(group['name'])

        for app in kanidm_oauth_apps:
            setup_kanidm_oauth_app(app['name'], app['redirect_uri'])


if __name__ == '__main__':
    main()