feat: automate Kanidm configuration

Just a hack for now.
This commit is contained in:
Khue Doan 2024-01-06 01:25:55 +07:00
parent 50220aaf6a
commit 9ff1077470
2 changed files with 94 additions and 0 deletions

View File

@ -27,6 +27,7 @@
iproute2
jq
k9s
kanidm
kube3d
kubectl
kubernetes-helm
@ -45,6 +46,7 @@
kubernetes
mkdocs-material
netaddr
pexpect
rich
]))
];

View File

@ -7,12 +7,16 @@ TODO: retire this script
import base64
import json
import json
import pexpect
import requests
import subprocess
import sys
import urllib
from rich.console import Console
from kubernetes import client, config
from kubernetes.stream import stream
# https://git.khuedoan.com/user/settings/applications
# Doing this properly inside the cluster requires:
@ -28,6 +32,8 @@ gitea_user = base64.b64decode(gitea_user_secret.data['username']).decode("utf-8"
gitea_pass = base64.b64decode(gitea_user_secret.data['password']).decode("utf-8")
gitea_url = f"http://{gitea_user}:{urllib.parse.quote_plus(gitea_pass)}@{gitea_host}"
kanidm_host = client.NetworkingV1Api().read_namespaced_ingress('kanidm', 'kanidm').spec.rules[0].host
def create_secret(name: str, namespace: str, data: dict) -> None:
try:
client.CoreV1Api().read_namespaced_secret(name, namespace)
@ -99,6 +105,75 @@ def setup_gitea_oauth_app(name: str, redirect_uri: str) -> None:
print(resp.content)
sys.exit(1)
def reset_kanidm_account_password(account: str) -> str:
resp = stream(
client.CoreV1Api().connect_get_namespaced_pod_exec,
'kanidm-0',
'kanidm',
command=["kanidmd", "recover-account", "--output", "json", account],
stderr=True, stdin=False,
stdout=False, tty=False
).replace("\'", "\"")
return json.loads(resp)['password']
# TODO Proper automation will be added later, waiting for client library update:
# https://github.com/kanidm/kanidm/pull/2301
def kanidm_login(accounts: list[str]) -> None:
for account in accounts:
password = reset_kanidm_account_password(account)
# There's no way to input password using the standard library, so we have to use pexpect
# https://stackoverflow.com/questions/2387731/use-subprocess-to-send-a-password
cli_login = pexpect.spawn(f"kanidm login --url https://{kanidm_host} --name {account}")
cli_login.sendline(password)
cli_login.read()
def setup_kanidm_group(name: str) -> None:
subprocess.run(
["kanidm", "group", "create", "--url", f"https://{kanidm_host}", "--name", "idm_admin", name],
capture_output=True,
)
def setup_kanidm_oauth_app(name: str, redirect_uri: str) -> None:
try:
subprocess.run(
["kanidm", "system", "oauth2", "create", "--url", f"https://{kanidm_host}", "--name", "admin", name, name, redirect_uri],
capture_output=True,
check=True,
)
except subprocess.CalledProcessError:
return
# TODO https://github.com/dexidp/dex/pull/3188
subprocess.run(
["kanidm", "system", "oauth2", "warning-insecure-client-disable-pkce", "--url", f"https://{kanidm_host}", "--name", "admin", name],
capture_output=True,
check=True,
)
subprocess.run(
# TODO better group management
["kanidm", "system", "oauth2", "create-scope-map", "--url", f"https://{kanidm_host}", "--name", "admin", name, "editor", "openid", "profile", "email", "groups"],
capture_output=True,
check=True,
)
client_secret = json.loads(subprocess.run(
["kanidm", "system", "oauth2", "show-basic-secret", "--url", f"https://{kanidm_host}", "--name", "admin", "--output", "json", name],
capture_output=True,
check=True,
).stdout.decode("utf-8"))['secret']
create_secret(
f"kanidm.{name}",
"global-secrets",
{
'client_id': base64.b64encode(name.encode("utf-8")).decode("utf-8"),
'client_secret': base64.b64encode(client_secret.encode("utf-8")).decode("utf-8"),
}
)
def main() -> None:
with Console().status("Completing the remaining sorcery"):
gitea_access_tokens = [
@ -109,11 +184,28 @@ def main() -> None:
{'name': 'dex', 'redirect_uri': f"https://{client.NetworkingV1Api().read_namespaced_ingress('dex', 'dex').spec.rules[0].host}/callback"}
]
kanidm_groups = [
# TODO better group management
{'name': 'editor'},
]
kanidm_oauth_apps = [
{'name': 'dex', 'redirect_uri': f"https://{client.NetworkingV1Api().read_namespaced_ingress('dex', 'dex').spec.rules[0].host}/callback"},
]
for token_name in gitea_access_tokens:
setup_gitea_access_token(token_name)
for app in gitea_oauth_apps:
setup_gitea_oauth_app(app['name'], app['redirect_uri'])
kanidm_login(["admin", "idm_admin"])
for group in kanidm_groups:
setup_kanidm_group(group['name'])
for app in kanidm_oauth_apps:
setup_kanidm_oauth_app(app['name'], app['redirect_uri'])
if __name__ == '__main__':
main()