khuedoan-homelab/scripts/hacks

257 lines
9.0 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python
"""
Quick and dirty script for things that I can't/don't have time to do properly yet
TODO: retire this script
"""
import base64
import json
import json
import pexpect
import requests
import subprocess
import sys
2023-02-22 18:33:48 +07:00
import urllib
from rich.console import Console
from kubernetes import client, config
from kubernetes.stream import stream
# https://git.khuedoan.com/user/settings/applications
# Doing this properly inside the cluster requires:
# - Kubernetes service account
config.load_config()
gitea_host = client.NetworkingV1Api().read_namespaced_ingress('gitea', 'gitea').spec.rules[0].host
gitea_user_secret = client.CoreV1Api().read_namespaced_secret('gitea-admin-secret', 'gitea')
gitea_user = base64.b64decode(gitea_user_secret.data['username']).decode("utf-8")
gitea_pass = base64.b64decode(gitea_user_secret.data['password']).decode("utf-8")
gitea_url = f"http://{gitea_host}"
kanidm_host = client.NetworkingV1Api().read_namespaced_ingress('kanidm', 'kanidm').spec.rules[0].host
def apply_secret(name: str, namespace: str, data: dict) -> None:
try:
client.CoreV1Api().read_namespaced_secret(name, namespace)
patch_body = client.V1Secret(
metadata=client.V1ObjectMeta(name=name),
data=data,
)
client.CoreV1Api().replace_namespaced_secret(name, namespace, patch_body)
except client.exceptions.ApiException:
# Secret doesn't exist, create a new one
new_secret = client.V1Secret(
metadata=client.V1ObjectMeta(name=name),
data=data,
)
client.CoreV1Api().create_namespaced_secret(namespace, new_secret)
def setup_gitea_access_token(name: str, scopes: list[str]) -> None:
current_tokens = requests.get(
url=f"{gitea_url}/api/v1/users/{gitea_user}/tokens",
auth=(gitea_user,gitea_pass),
).json()
if not any(token['name'] == name for token in current_tokens):
resp = requests.post(
url=f"{gitea_url}/api/v1/users/{gitea_user}/tokens",
auth=(gitea_user,gitea_pass),
headers={
'Content-Type': 'application/json'
},
data=json.dumps({
'name': name,
'scopes': scopes
})
)
if resp.status_code == 201:
apply_secret(
f"gitea.{name}",
"global-secrets",
{
'token': base64.b64encode(resp.json()['sha1'].encode("utf-8")).decode("utf-8")
}
)
else:
print(f"Error creating access token {name} ({resp.status_code})")
print(resp.content)
sys.exit(1)
def setup_gitea_oauth_app(name: str, redirect_uri: str) -> None:
# TODO use the new global application, while it's there in the UI, there's no API yet.
current_apps = requests.get(
url=f"{gitea_url}/api/v1/user/applications/oauth2",
auth=(gitea_user,gitea_pass),
).json()
if not any(app['name'] == name for app in current_apps):
resp = requests.post(
url=f"{gitea_url}/api/v1/user/applications/oauth2",
auth=(gitea_user,gitea_pass),
headers={
'Content-Type': 'application/json'
},
data=json.dumps({
'name': name,
'redirect_uris': [redirect_uri],
'confidential_client': True
})
)
if resp.status_code == 201:
apply_secret(
f"gitea.{name}",
"global-secrets",
{
'client_id': base64.b64encode(resp.json()['client_id'].encode("utf-8")).decode("utf-8"),
'client_secret': base64.b64encode(resp.json()['client_secret'].encode("utf-8")).decode("utf-8"),
}
)
else:
print(f"Error creating OAuth application {name} ({resp.status_code})")
print(resp.content)
sys.exit(1)
def setup_gitea_auth_with_dex():
gitea_pod = client.CoreV1Api().list_namespaced_pod(namespace='gitea', label_selector='app=gitea').items[0].metadata.name
client_secret = base64.b64decode(
client.CoreV1Api().read_namespaced_secret('dex.gitea', 'global-secrets').data['client_secret']
).decode("utf-8")
discovery_url = f"https://{client.NetworkingV1Api().read_namespaced_ingress('dex', 'dex').spec.rules[0].host}/.well-known/openid-configuration"
# TODO currently there's no API to add new authentication sources in Gitea,
# so we have to workaround by running Gitea CLI in a Gitea pod.
stream(
client.CoreV1Api().connect_get_namespaced_pod_exec,
gitea_pod,
'gitea',
command=[
'gitea', 'admin', 'auth', 'add-oauth',
'--name', 'Dex',
'--provider', 'openidConnect',
'--key', 'gitea',
'--secret', client_secret,
'--auto-discover-url', discovery_url
],
stderr=True, stdin=False,
stdout=False, tty=False
)
def reset_kanidm_account_password(account: str) -> str:
resp = stream(
client.CoreV1Api().connect_get_namespaced_pod_exec,
'kanidm-0',
'kanidm',
command=["kanidmd", "recover-account", "--output", "json", account],
stderr=False, stdin=False,
stdout=True, tty=False
).splitlines()[-1]
return json.loads(resp)['password']
# TODO Proper automation will be added later, waiting for client library update:
# https://github.com/kanidm/kanidm/pull/2301
def kanidm_login(accounts: list[str]) -> None:
for account in accounts:
password = reset_kanidm_account_password(account)
# There's no way to input password using the standard library, so we have to use pexpect
# https://stackoverflow.com/questions/2387731/use-subprocess-to-send-a-password
cli_login = pexpect.spawn(f"kanidm login --url https://{kanidm_host} --name {account}")
cli_login.sendline(password)
cli_login.read()
def setup_kanidm_group(name: str) -> None:
subprocess.run(
["kanidm", "group", "create", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name],
capture_output=True,
)
def setup_kanidm_oauth_app(name: str, redirect_uri: str) -> None:
try:
subprocess.run(
["kanidm", "system", "oauth2", "create", "--url", f"https://{kanidm_host}", "--name", "admin", name, name, redirect_uri],
capture_output=True,
check=True,
)
except subprocess.CalledProcessError:
return
# TODO https://github.com/dexidp/dex/pull/3188
subprocess.run(
["kanidm", "system", "oauth2", "warning-insecure-client-disable-pkce", "--url", f"https://{kanidm_host}", "--name", "admin", name],
capture_output=True,
check=True,
)
subprocess.run(
# TODO better group management
["kanidm", "system", "oauth2", "create-scope-map", "--url", f"https://{kanidm_host}", "--name", "admin", name, "editor", "openid", "profile", "email", "groups"],
capture_output=True,
check=True,
)
client_secret = json.loads(subprocess.run(
["kanidm", "system", "oauth2", "show-basic-secret", "--url", f"https://{kanidm_host}", "--name", "admin", "--output", "json", name],
capture_output=True,
check=True,
).stdout.decode("utf-8"))['secret']
apply_secret(
f"kanidm.{name}",
"global-secrets",
{
'client_id': base64.b64encode(name.encode("utf-8")).decode("utf-8"),
'client_secret': base64.b64encode(client_secret.encode("utf-8")).decode("utf-8"),
}
)
def main() -> None:
with Console().status("Completing the remaining sorcery"):
gitea_access_tokens = [
{
'name': 'renovate',
'scopes': [
"write:repository",
"read:user",
"write:issue",
"read:organization",
"read:misc"
]
}
]
gitea_oauth_apps = [
{'name': 'woodpecker', 'redirect_uri': f"https://{client.NetworkingV1Api().read_namespaced_ingress('woodpecker-server', 'woodpecker').spec.rules[0].host}/authorize"},
]
kanidm_groups = [
# TODO better group management
{'name': 'editor'},
]
kanidm_oauth_apps = [
{'name': 'dex', 'redirect_uri': f"https://{client.NetworkingV1Api().read_namespaced_ingress('dex', 'dex').spec.rules[0].host}/callback"},
]
for token in gitea_access_tokens:
setup_gitea_access_token(token['name'], token['scopes'])
for app in gitea_oauth_apps:
setup_gitea_oauth_app(app['name'], app['redirect_uri'])
setup_gitea_auth_with_dex()
kanidm_login(["admin", "idm_admin"])
for group in kanidm_groups:
setup_kanidm_group(group['name'])
for app in kanidm_oauth_apps:
setup_kanidm_oauth_app(app['name'], app['redirect_uri'])
if __name__ == '__main__':
main()