khuedoan-homelab/scripts/hacks

212 lines
7.2 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python
"""
Quick and dirty script for things that I can't/don't have time to do properly yet
TODO: retire this script
"""
import base64
import json
import subprocess
import sys
import urllib
import urllib.parse

import pexpect
import requests
from kubernetes import client, config
from kubernetes.stream import stream
from rich.console import Console
# https://git.khuedoan.com/user/settings/applications
# Doing this properly inside the cluster requires:
# - Kubernetes service account
# Load Kubernetes credentials: use the in-cluster configuration when it is
# available, otherwise fall back to the local kubeconfig.
try:
    config.load_incluster_config()
except config.ConfigException:
    config.load_kube_config()

# Gitea endpoint: host of the first rule of the 'gitea' ingress in the 'gitea' namespace.
gitea_host = client.NetworkingV1Api().read_namespaced_ingress('gitea', 'gitea').spec.rules[0].host

# Gitea admin credentials, decoded from the 'gitea-admin-secret' secret.
gitea_user_secret = client.CoreV1Api().read_namespaced_secret('gitea-admin-secret', 'gitea')
gitea_user = base64.b64decode(gitea_user_secret.data['username']).decode("utf-8")
gitea_pass = base64.b64decode(gitea_user_secret.data['password']).decode("utf-8")

# Base URL with the credentials embedded as URL userinfo. Both parts are
# percent-encoded with quote(..., safe="") rather than quote_plus(): the two
# differ only for spaces, where quote_plus() emits '+', which plain
# percent-decoding of the userinfo would leave as a literal '+'.
# NOTE(review): the credentials travel over plain http here - confirm this is intended.
gitea_url = (
    f"http://{urllib.parse.quote(gitea_user, safe='')}"
    f":{urllib.parse.quote(gitea_pass, safe='')}@{gitea_host}"
)

# Kanidm endpoint: host of the first rule of the 'kanidm' ingress in the 'kanidm' namespace.
kanidm_host = client.NetworkingV1Api().read_namespaced_ingress('kanidm', 'kanidm').spec.rules[0].host
def create_secret(name: str, namespace: str, data: dict) -> None:
    """Create the secret `name` in `namespace` unless it already exists.

    An existing secret is left untouched (it is never updated).

    Args:
        name: Name of the secret.
        namespace: Namespace to create the secret in.
        data: Secret payload, passed as the `data` field of the V1Secret;
            callers in this file pass base64-encoded string values.

    Raises:
        client.exceptions.ApiException: If the lookup fails for any reason
            other than "not found", or if the creation itself fails.
    """
    core_api = client.CoreV1Api()

    try:
        core_api.read_namespaced_secret(name, namespace)
    except client.exceptions.ApiException as err:
        # Only a 404 means the secret is missing; anything else (e.g. a
        # permission error) is a real failure and must not be mistaken for
        # "doesn't exist".
        if err.status != 404:
            raise
    else:
        # Secret already exists, nothing to do
        return

    new_secret = client.V1Secret(
        metadata=client.V1ObjectMeta(name=name),
        data=data,
    )
    core_api.create_namespaced_secret(namespace, new_secret)
def setup_gitea_access_token(name: str) -> None:
    """Ensure the Gitea admin user has an access token called `name`.

    If no token with that name is listed, a new one is requested and the
    `sha1` field of the response is stored (base64-encoded) under the key
    `token` in the secret `gitea.<name>` in the `global-secrets` namespace.
    If a token with that name is already listed, nothing is done.

    Args:
        name: Name of the access token.

    Exits the process with status 1 if listing or creating the token fails.
    """
    tokens_url = f"{gitea_url}/api/v1/users/{gitea_user}/tokens"

    listing = requests.get(url=tokens_url)
    if listing.status_code != 200:
        # Without this check an error payload would be iterated as if it were
        # the token list and fail with an unrelated TypeError.
        print(f"Error listing access tokens ({listing.status_code})")
        print(listing.content)
        sys.exit(1)

    if any(token['name'] == name for token in listing.json()):
        return

    resp = requests.post(
        url=tokens_url,
        headers={
            'Content-Type': 'application/json'
        },
        data=json.dumps({
            'name': name
        })
    )

    if resp.status_code != 201:
        print(f"Error creating access token {name} ({resp.status_code})")
        print(resp.content)
        sys.exit(1)

    create_secret(
        f"gitea.{name}",
        "global-secrets",
        {
            'token': base64.b64encode(resp.json()['sha1'].encode("utf-8")).decode("utf-8")
        }
    )
def setup_gitea_oauth_app(name: str, redirect_uri: str) -> None:
    """Ensure an OAuth2 application called `name` exists for the Gitea admin user.

    If no application with that name is listed, a new one is created with
    `redirect_uri` as its only redirect URI, and the returned `client_id` and
    `client_secret` are stored (base64-encoded) in the secret `gitea.<name>`
    in the `global-secrets` namespace. If an application with that name is
    already listed, nothing is done.

    Args:
        name: Name of the OAuth2 application.
        redirect_uri: Redirect URI registered for the application.

    Exits the process with status 1 if listing or creating the application fails.
    """
    apps_url = f"{gitea_url}/api/v1/user/applications/oauth2"

    listing = requests.get(url=apps_url)
    if listing.status_code != 200:
        # Without this check an error payload would be iterated as if it were
        # the application list and fail with an unrelated TypeError.
        print(f"Error listing OAuth applications ({listing.status_code})")
        print(listing.content)
        sys.exit(1)

    if any(app['name'] == name for app in listing.json()):
        return

    resp = requests.post(
        url=apps_url,
        headers={
            'Content-Type': 'application/json'
        },
        data=json.dumps({
            'name': name,
            'redirect_uris': [redirect_uri]
        })
    )

    if resp.status_code != 201:
        print(f"Error creating OAuth application {name} ({resp.status_code})")
        print(resp.content)
        sys.exit(1)

    # Parse the response body once and reuse it for both fields
    created = resp.json()
    create_secret(
        f"gitea.{name}",
        "global-secrets",
        {
            'client_id': base64.b64encode(created['client_id'].encode("utf-8")).decode("utf-8"),
            'client_secret': base64.b64encode(created['client_secret'].encode("utf-8")).decode("utf-8"),
        }
    )
def reset_kanidm_account_password(account: str) -> str:
    """Run `kanidmd recover-account` for `account` and return the new password.

    The command is executed inside the `kanidm-0` pod of the `kanidm`
    namespace through the Kubernetes exec API.

    Args:
        account: Name of the Kanidm account to recover.

    Returns:
        The `password` field of the JSON document produced by the command.
    """
    recover_command = ["kanidmd", "recover-account", "--output", "json", account]

    # Only stderr is requested from the exec stream.
    # NOTE(review): presumably kanidmd writes the result to stderr - confirm.
    raw_output = stream(
        client.CoreV1Api().connect_get_namespaced_pod_exec,
        'kanidm-0',
        'kanidm',
        command=recover_command,
        stderr=True,
        stdin=False,
        stdout=False,
        tty=False,
    )

    # The returned text uses single quotes; swap them for double quotes so it
    # parses as JSON.
    json_text = raw_output.replace("'", '"')
    recovered = json.loads(json_text)
    return recovered['password']
# TODO Proper automation will be added later, waiting for client library update:
# https://github.com/kanidm/kanidm/pull/2301
def kanidm_login(accounts: list[str]) -> None:
    """Log the local `kanidm` CLI in as each of `accounts`.

    Each account's password is first reset with
    `reset_kanidm_account_password`, then fed to an interactive
    `kanidm login` session.

    Args:
        accounts: Names of the Kanidm accounts to log in as.
    """
    for account in accounts:
        password = reset_kanidm_account_password(account)

        # There's no way to input password using the standard library, so we have to use pexpect
        # https://stackoverflow.com/questions/2387731/use-subprocess-to-send-a-password
        # The arguments are passed as a list so that the host and account
        # values are never re-split on whitespace.
        cli_login = pexpect.spawn(
            "kanidm",
            ["login", "--url", f"https://{kanidm_host}", "--name", account],
        )
        try:
            cli_login.sendline(password)
            # Drain the output until the CLI exits
            cli_login.read()
        finally:
            # Release the child process and its pty before handling the next account
            cli_login.close()
def setup_kanidm_group(name: str) -> None:
    """Create the Kanidm group `name` with the `kanidm` CLI, acting as `idm_admin`.

    Args:
        name: Name of the group to create.
    """
    create_group_cmd = [
        "kanidm", "group", "create",
        "--url", f"https://{kanidm_host}",
        "--name", "idm_admin",
        name,
    ]
    # The exit status is not checked and the captured output is discarded.
    # NOTE(review): presumably so that re-runs with an existing group do not abort - confirm.
    subprocess.run(create_group_cmd, capture_output=True)
def setup_kanidm_oauth_app(name: str, redirect_uri: str) -> None:
    """Register the OAuth2 client `name` in Kanidm and store its credentials.

    Runs, as the `admin` account, the `kanidm system oauth2` subcommands
    `create`, `warning-insecure-client-disable-pkce`, `create-scope-map` and
    `show-basic-secret`, then saves the client id and secret in the secret
    `kanidm.<name>` in the `global-secrets` namespace.

    If the initial `create` command fails, the function returns without doing
    anything else.

    Args:
        name: Used as both name arguments of the `create` command and as the
            stored client id.
        redirect_uri: Last argument passed to the `create` command.

    Raises:
        subprocess.CalledProcessError: If any command after `create` fails.
    """
    def run_oauth2(subcommand: str, *trailing_args: str) -> subprocess.CompletedProcess:
        # Every call shares the same prefix, target URL and acting account
        return subprocess.run(
            [
                "kanidm", "system", "oauth2", subcommand,
                "--url", f"https://{kanidm_host}",
                "--name", "admin",
                *trailing_args,
            ],
            capture_output=True,
            check=True,
        )

    try:
        run_oauth2("create", name, name, redirect_uri)
    except subprocess.CalledProcessError:
        # A failing create ends the setup for this client
        return

    # TODO https://github.com/dexidp/dex/pull/3188
    run_oauth2("warning-insecure-client-disable-pkce", name)

    # TODO better group management
    run_oauth2("create-scope-map", name, "editor", "openid", "profile", "email", "groups")

    secret_output = run_oauth2("show-basic-secret", "--output", "json", name)
    client_secret = json.loads(secret_output.stdout.decode("utf-8"))['secret']

    encoded_id = base64.b64encode(name.encode("utf-8")).decode("utf-8")
    encoded_secret = base64.b64encode(client_secret.encode("utf-8")).decode("utf-8")
    create_secret(
        f"kanidm.{name}",
        "global-secrets",
        {
            'client_id': encoded_id,
            'client_secret': encoded_secret,
        }
    )
def main() -> None:
    """Run every setup step, in order, under a console status indicator.

    Steps: Gitea access tokens, Gitea OAuth applications, Kanidm CLI login,
    Kanidm groups, Kanidm OAuth applications.
    """
    with Console().status("Completing the remaining sorcery"):
        # Both the Gitea and the Kanidm OAuth apps redirect to the same dex
        # callback, so the ingress is looked up once.
        dex_host = client.NetworkingV1Api().read_namespaced_ingress('dex', 'dex').spec.rules[0].host
        dex_callback = f"https://{dex_host}/callback"

        gitea_access_tokens = [
            'renovate'
        ]

        gitea_oauth_apps = [
            {'name': 'dex', 'redirect_uri': dex_callback}
        ]

        kanidm_groups = [
            # TODO better group management
            {'name': 'editor'},
        ]

        kanidm_oauth_apps = [
            {'name': 'dex', 'redirect_uri': dex_callback},
        ]

        for token_name in gitea_access_tokens:
            setup_gitea_access_token(token_name)

        for app in gitea_oauth_apps:
            setup_gitea_oauth_app(app['name'], app['redirect_uri'])

        # The group and OAuth steps below run the kanidm CLI as these accounts
        kanidm_login(["admin", "idm_admin"])

        for group in kanidm_groups:
            setup_kanidm_group(group['name'])

        for app in kanidm_oauth_apps:
            setup_kanidm_oauth_app(app['name'], app['redirect_uri'])


if __name__ == '__main__':
    main()