khuedoan-homelab/scripts/hacks
2024-09-02 13:22:51 +07:00

257 lines
9.0 KiB
Python
Executable File

#!/usr/bin/env python
"""
Quick and dirty script for things that I can't/don't have time to do properly yet
TODO: retire this script
"""
import base64
import json
import json
import pexpect
import requests
import subprocess
import sys
import urllib
from rich.console import Console
from kubernetes import client, config
from kubernetes.stream import stream
# https://git.khuedoan.com/user/settings/applications
# Doing this properly inside the cluster requires:
# - Kubernetes service account
config.load_config()
gitea_host = client.NetworkingV1Api().read_namespaced_ingress('gitea', 'gitea').spec.rules[0].host
gitea_user_secret = client.CoreV1Api().read_namespaced_secret('gitea-admin-secret', 'gitea')
gitea_user = base64.b64decode(gitea_user_secret.data['username']).decode("utf-8")
gitea_pass = base64.b64decode(gitea_user_secret.data['password']).decode("utf-8")
gitea_url = f"http://{gitea_host}"
kanidm_host = client.NetworkingV1Api().read_namespaced_ingress('kanidm', 'kanidm').spec.rules[0].host
def apply_secret(name: str, namespace: str, data: dict) -> None:
try:
client.CoreV1Api().read_namespaced_secret(name, namespace)
patch_body = client.V1Secret(
metadata=client.V1ObjectMeta(name=name),
data=data,
)
client.CoreV1Api().replace_namespaced_secret(name, namespace, patch_body)
except client.exceptions.ApiException:
# Secret doesn't exist, create a new one
new_secret = client.V1Secret(
metadata=client.V1ObjectMeta(name=name),
data=data,
)
client.CoreV1Api().create_namespaced_secret(namespace, new_secret)
def setup_gitea_access_token(name: str, scopes: list[str]) -> None:
current_tokens = requests.get(
url=f"{gitea_url}/api/v1/users/{gitea_user}/tokens",
auth=(gitea_user,gitea_pass),
).json()
if not any(token['name'] == name for token in current_tokens):
resp = requests.post(
url=f"{gitea_url}/api/v1/users/{gitea_user}/tokens",
auth=(gitea_user,gitea_pass),
headers={
'Content-Type': 'application/json'
},
data=json.dumps({
'name': name,
'scopes': scopes
})
)
if resp.status_code == 201:
apply_secret(
f"gitea.{name}",
"global-secrets",
{
'token': base64.b64encode(resp.json()['sha1'].encode("utf-8")).decode("utf-8")
}
)
else:
print(f"Error creating access token {name} ({resp.status_code})")
print(resp.content)
sys.exit(1)
def setup_gitea_oauth_app(name: str, redirect_uri: str) -> None:
# TODO use the new global application, while it's there in the UI, there's no API yet.
current_apps = requests.get(
url=f"{gitea_url}/api/v1/user/applications/oauth2",
auth=(gitea_user,gitea_pass),
).json()
if not any(app['name'] == name for app in current_apps):
resp = requests.post(
url=f"{gitea_url}/api/v1/user/applications/oauth2",
auth=(gitea_user,gitea_pass),
headers={
'Content-Type': 'application/json'
},
data=json.dumps({
'name': name,
'redirect_uris': [redirect_uri],
'confidential_client': True
})
)
if resp.status_code == 201:
apply_secret(
f"gitea.{name}",
"global-secrets",
{
'client_id': base64.b64encode(resp.json()['client_id'].encode("utf-8")).decode("utf-8"),
'client_secret': base64.b64encode(resp.json()['client_secret'].encode("utf-8")).decode("utf-8"),
}
)
else:
print(f"Error creating OAuth application {name} ({resp.status_code})")
print(resp.content)
sys.exit(1)
def setup_gitea_auth_with_dex():
gitea_pod = client.CoreV1Api().list_namespaced_pod(namespace='gitea', label_selector='app=gitea').items[0].metadata.name
client_secret = base64.b64decode(
client.CoreV1Api().read_namespaced_secret('dex.gitea', 'global-secrets').data['client_secret']
).decode("utf-8")
discovery_url = f"https://{client.NetworkingV1Api().read_namespaced_ingress('dex', 'dex').spec.rules[0].host}/.well-known/openid-configuration"
# TODO currently there's no API to add new authentication sources in Gitea,
# so we have to workaround by running Gitea CLI in a Gitea pod.
stream(
client.CoreV1Api().connect_get_namespaced_pod_exec,
gitea_pod,
'gitea',
command=[
'gitea', 'admin', 'auth', 'add-oauth',
'--name', 'Dex',
'--provider', 'openidConnect',
'--key', 'gitea',
'--secret', client_secret,
'--auto-discover-url', discovery_url
],
stderr=True, stdin=False,
stdout=False, tty=False
)
def reset_kanidm_account_password(account: str) -> str:
resp = stream(
client.CoreV1Api().connect_get_namespaced_pod_exec,
'kanidm-0',
'kanidm',
command=["kanidmd", "recover-account", "--output", "json", account],
stderr=False, stdin=False,
stdout=True, tty=False
).splitlines()[-1]
return json.loads(resp)['password']
# TODO Proper automation will be added later, waiting for client library update:
# https://github.com/kanidm/kanidm/pull/2301
def kanidm_login(accounts: list[str]) -> None:
for account in accounts:
password = reset_kanidm_account_password(account)
# There's no way to input password using the standard library, so we have to use pexpect
# https://stackoverflow.com/questions/2387731/use-subprocess-to-send-a-password
cli_login = pexpect.spawn(f"kanidm login --url https://{kanidm_host} --name {account}")
cli_login.sendline(password)
cli_login.read()
def setup_kanidm_group(name: str) -> None:
subprocess.run(
["kanidm", "group", "create", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name],
capture_output=True,
)
def setup_kanidm_oauth_app(name: str, redirect_uri: str) -> None:
try:
subprocess.run(
["kanidm", "system", "oauth2", "create", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name, name, redirect_uri],
capture_output=True,
check=True,
)
except subprocess.CalledProcessError:
return
# TODO https://github.com/dexidp/dex/pull/3188
subprocess.run(
["kanidm", "system", "oauth2", "warning-insecure-client-disable-pkce", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name],
capture_output=True,
check=True,
)
subprocess.run(
# TODO better group management
["kanidm", "system", "oauth2", "create-scope-map", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name, "editor", "openid", "profile", "email", "groups"],
capture_output=True,
check=True,
)
client_secret = json.loads(subprocess.run(
["kanidm", "system", "oauth2", "show-basic-secret", "--url", f"https://{kanidm_host}", "--name", "idm_admin", "--output", "json", name],
capture_output=True,
check=True,
).stdout.decode("utf-8"))['secret']
apply_secret(
f"kanidm.{name}",
"global-secrets",
{
'client_id': base64.b64encode(name.encode("utf-8")).decode("utf-8"),
'client_secret': base64.b64encode(client_secret.encode("utf-8")).decode("utf-8"),
}
)
def main() -> None:
with Console().status("Completing the remaining sorcery"):
gitea_access_tokens = [
{
'name': 'renovate',
'scopes': [
"write:repository",
"read:user",
"write:issue",
"read:organization",
"read:misc"
]
}
]
gitea_oauth_apps = [
{'name': 'woodpecker', 'redirect_uri': f"https://{client.NetworkingV1Api().read_namespaced_ingress('woodpecker-server', 'woodpecker').spec.rules[0].host}/authorize"},
]
kanidm_groups = [
# TODO better group management
{'name': 'editor'},
]
kanidm_oauth_apps = [
{'name': 'dex', 'redirect_uri': f"https://{client.NetworkingV1Api().read_namespaced_ingress('dex', 'dex').spec.rules[0].host}/callback"},
]
for token in gitea_access_tokens:
setup_gitea_access_token(token['name'], token['scopes'])
for app in gitea_oauth_apps:
setup_gitea_oauth_app(app['name'], app['redirect_uri'])
setup_gitea_auth_with_dex()
kanidm_login(["admin", "idm_admin"])
for group in kanidm_groups:
setup_kanidm_group(group['name'])
for app in kanidm_oauth_apps:
setup_kanidm_oauth_app(app['name'], app['redirect_uri'])
if __name__ == '__main__':
main()