mirror of
https://github.com/khuedoan/homelab.git
synced 2025-01-27 16:11:03 +07:00
257 lines
9.0 KiB
Python
Executable File
257 lines
9.0 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
"""
|
|
Quick and dirty script for things that I can't/don't have time to do properly yet
|
|
TODO: retire this script
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import json
|
|
import pexpect
|
|
import requests
|
|
import subprocess
|
|
import sys
|
|
import urllib
|
|
|
|
from rich.console import Console
|
|
from kubernetes import client, config
|
|
from kubernetes.stream import stream
|
|
|
|
# https://git.khuedoan.com/user/settings/applications
|
|
# Doing this properly inside the cluster requires:
|
|
# - Kubernetes service account
|
|
config.load_config()
|
|
|
|
gitea_host = client.NetworkingV1Api().read_namespaced_ingress('gitea', 'gitea').spec.rules[0].host
|
|
gitea_user_secret = client.CoreV1Api().read_namespaced_secret('gitea-admin-secret', 'gitea')
|
|
gitea_user = base64.b64decode(gitea_user_secret.data['username']).decode("utf-8")
|
|
gitea_pass = base64.b64decode(gitea_user_secret.data['password']).decode("utf-8")
|
|
gitea_url = f"http://{gitea_host}"
|
|
|
|
kanidm_host = client.NetworkingV1Api().read_namespaced_ingress('kanidm', 'kanidm').spec.rules[0].host
|
|
|
|
def apply_secret(name: str, namespace: str, data: dict) -> None:
|
|
try:
|
|
client.CoreV1Api().read_namespaced_secret(name, namespace)
|
|
patch_body = client.V1Secret(
|
|
metadata=client.V1ObjectMeta(name=name),
|
|
data=data,
|
|
)
|
|
client.CoreV1Api().replace_namespaced_secret(name, namespace, patch_body)
|
|
except client.exceptions.ApiException:
|
|
# Secret doesn't exist, create a new one
|
|
new_secret = client.V1Secret(
|
|
metadata=client.V1ObjectMeta(name=name),
|
|
data=data,
|
|
)
|
|
client.CoreV1Api().create_namespaced_secret(namespace, new_secret)
|
|
|
|
def setup_gitea_access_token(name: str, scopes: list[str]) -> None:
|
|
current_tokens = requests.get(
|
|
url=f"{gitea_url}/api/v1/users/{gitea_user}/tokens",
|
|
auth=(gitea_user,gitea_pass),
|
|
).json()
|
|
|
|
if not any(token['name'] == name for token in current_tokens):
|
|
resp = requests.post(
|
|
url=f"{gitea_url}/api/v1/users/{gitea_user}/tokens",
|
|
auth=(gitea_user,gitea_pass),
|
|
headers={
|
|
'Content-Type': 'application/json'
|
|
},
|
|
data=json.dumps({
|
|
'name': name,
|
|
'scopes': scopes
|
|
})
|
|
)
|
|
|
|
if resp.status_code == 201:
|
|
apply_secret(
|
|
f"gitea.{name}",
|
|
"global-secrets",
|
|
{
|
|
'token': base64.b64encode(resp.json()['sha1'].encode("utf-8")).decode("utf-8")
|
|
}
|
|
)
|
|
else:
|
|
print(f"Error creating access token {name} ({resp.status_code})")
|
|
print(resp.content)
|
|
sys.exit(1)
|
|
|
|
def setup_gitea_oauth_app(name: str, redirect_uri: str) -> None:
|
|
# TODO use the new global application, while it's there in the UI, there's no API yet.
|
|
current_apps = requests.get(
|
|
url=f"{gitea_url}/api/v1/user/applications/oauth2",
|
|
auth=(gitea_user,gitea_pass),
|
|
).json()
|
|
|
|
if not any(app['name'] == name for app in current_apps):
|
|
resp = requests.post(
|
|
url=f"{gitea_url}/api/v1/user/applications/oauth2",
|
|
auth=(gitea_user,gitea_pass),
|
|
headers={
|
|
'Content-Type': 'application/json'
|
|
},
|
|
data=json.dumps({
|
|
'name': name,
|
|
'redirect_uris': [redirect_uri],
|
|
'confidential_client': True
|
|
})
|
|
)
|
|
|
|
if resp.status_code == 201:
|
|
apply_secret(
|
|
f"gitea.{name}",
|
|
"global-secrets",
|
|
{
|
|
'client_id': base64.b64encode(resp.json()['client_id'].encode("utf-8")).decode("utf-8"),
|
|
'client_secret': base64.b64encode(resp.json()['client_secret'].encode("utf-8")).decode("utf-8"),
|
|
}
|
|
)
|
|
else:
|
|
print(f"Error creating OAuth application {name} ({resp.status_code})")
|
|
print(resp.content)
|
|
sys.exit(1)
|
|
|
|
def setup_gitea_auth_with_dex():
|
|
gitea_pod = client.CoreV1Api().list_namespaced_pod(namespace='gitea', label_selector='app=gitea').items[0].metadata.name
|
|
client_secret = base64.b64decode(
|
|
client.CoreV1Api().read_namespaced_secret('dex.gitea', 'global-secrets').data['client_secret']
|
|
).decode("utf-8")
|
|
discovery_url = f"https://{client.NetworkingV1Api().read_namespaced_ingress('dex', 'dex').spec.rules[0].host}/.well-known/openid-configuration"
|
|
|
|
# TODO currently there's no API to add new authentication sources in Gitea,
|
|
# so we have to workaround by running Gitea CLI in a Gitea pod.
|
|
stream(
|
|
client.CoreV1Api().connect_get_namespaced_pod_exec,
|
|
gitea_pod,
|
|
'gitea',
|
|
command=[
|
|
'gitea', 'admin', 'auth', 'add-oauth',
|
|
'--name', 'Dex',
|
|
'--provider', 'openidConnect',
|
|
'--key', 'gitea',
|
|
'--secret', client_secret,
|
|
'--auto-discover-url', discovery_url
|
|
],
|
|
stderr=True, stdin=False,
|
|
stdout=False, tty=False
|
|
)
|
|
|
|
def reset_kanidm_account_password(account: str) -> str:
|
|
resp = stream(
|
|
client.CoreV1Api().connect_get_namespaced_pod_exec,
|
|
'kanidm-0',
|
|
'kanidm',
|
|
command=["kanidmd", "recover-account", "--output", "json", account],
|
|
stderr=False, stdin=False,
|
|
stdout=True, tty=False
|
|
).splitlines()[-1]
|
|
|
|
return json.loads(resp)['password']
|
|
|
|
# TODO Proper automation will be added later, waiting for client library update:
|
|
# https://github.com/kanidm/kanidm/pull/2301
|
|
def kanidm_login(accounts: list[str]) -> None:
|
|
for account in accounts:
|
|
password = reset_kanidm_account_password(account)
|
|
|
|
# There's no way to input password using the standard library, so we have to use pexpect
|
|
# https://stackoverflow.com/questions/2387731/use-subprocess-to-send-a-password
|
|
cli_login = pexpect.spawn(f"kanidm login --url https://{kanidm_host} --name {account}")
|
|
cli_login.sendline(password)
|
|
cli_login.read()
|
|
|
|
def setup_kanidm_group(name: str) -> None:
|
|
subprocess.run(
|
|
["kanidm", "group", "create", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name],
|
|
capture_output=True,
|
|
)
|
|
|
|
def setup_kanidm_oauth_app(name: str, redirect_uri: str) -> None:
|
|
try:
|
|
subprocess.run(
|
|
["kanidm", "system", "oauth2", "create", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name, name, redirect_uri],
|
|
capture_output=True,
|
|
check=True,
|
|
)
|
|
except subprocess.CalledProcessError:
|
|
return
|
|
|
|
# TODO https://github.com/dexidp/dex/pull/3188
|
|
subprocess.run(
|
|
["kanidm", "system", "oauth2", "warning-insecure-client-disable-pkce", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name],
|
|
capture_output=True,
|
|
check=True,
|
|
)
|
|
|
|
subprocess.run(
|
|
# TODO better group management
|
|
["kanidm", "system", "oauth2", "create-scope-map", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name, "editor", "openid", "profile", "email", "groups"],
|
|
capture_output=True,
|
|
check=True,
|
|
)
|
|
|
|
client_secret = json.loads(subprocess.run(
|
|
["kanidm", "system", "oauth2", "show-basic-secret", "--url", f"https://{kanidm_host}", "--name", "idm_admin", "--output", "json", name],
|
|
capture_output=True,
|
|
check=True,
|
|
).stdout.decode("utf-8"))['secret']
|
|
|
|
apply_secret(
|
|
f"kanidm.{name}",
|
|
"global-secrets",
|
|
{
|
|
'client_id': base64.b64encode(name.encode("utf-8")).decode("utf-8"),
|
|
'client_secret': base64.b64encode(client_secret.encode("utf-8")).decode("utf-8"),
|
|
}
|
|
)
|
|
|
|
def main() -> None:
|
|
with Console().status("Completing the remaining sorcery"):
|
|
gitea_access_tokens = [
|
|
{
|
|
'name': 'renovate',
|
|
'scopes': [
|
|
"write:repository",
|
|
"read:user",
|
|
"write:issue",
|
|
"read:organization",
|
|
"read:misc"
|
|
]
|
|
}
|
|
]
|
|
|
|
gitea_oauth_apps = [
|
|
{'name': 'woodpecker', 'redirect_uri': f"https://{client.NetworkingV1Api().read_namespaced_ingress('woodpecker-server', 'woodpecker').spec.rules[0].host}/authorize"},
|
|
]
|
|
|
|
kanidm_groups = [
|
|
# TODO better group management
|
|
{'name': 'editor'},
|
|
]
|
|
|
|
kanidm_oauth_apps = [
|
|
{'name': 'dex', 'redirect_uri': f"https://{client.NetworkingV1Api().read_namespaced_ingress('dex', 'dex').spec.rules[0].host}/callback"},
|
|
]
|
|
|
|
for token in gitea_access_tokens:
|
|
setup_gitea_access_token(token['name'], token['scopes'])
|
|
|
|
for app in gitea_oauth_apps:
|
|
setup_gitea_oauth_app(app['name'], app['redirect_uri'])
|
|
|
|
setup_gitea_auth_with_dex()
|
|
|
|
kanidm_login(["admin", "idm_admin"])
|
|
|
|
for group in kanidm_groups:
|
|
setup_kanidm_group(group['name'])
|
|
|
|
for app in kanidm_oauth_apps:
|
|
setup_kanidm_oauth_app(app['name'], app['redirect_uri'])
|
|
|
|
if __name__ == '__main__':
|
|
main()
|