100 lines
3.3 KiB
Python

from multiprocessing.connection import default_family
import click
import requests
from .cli_main import main, auth_header, base_url
from ...schemas import StateParams
@main.group("state")
def state():
pass
@state.command("list")
def state_list():
click.echo("Listing known states...")
try:
response = requests.get(f'{base_url()}/state', headers=auth_header())
response.raise_for_status()
click.echo("States:")
for st in response.json():
click.echo(f" - {st}")
except requests.exceptions.HTTPError as e:
raise click.ClickException(f"Failed to list states:\n{e}")
@state.command("show")
@click.argument("name")
def state_show(name: str):
click.echo(f"Showing state '{name}'...")
try:
data = StateParams()
response = requests.get(f'{base_url()}/state/{name}', headers=auth_header(), params=data.model_dump())
response.raise_for_status()
click.echo("State details:")
for key, value in response.json().items():
if isinstance(value, dict):
click.echo(f" {key}:")
for sub_key, sub_val in value.items():
click.echo(f" {sub_key}: {sub_val}")
else:
click.echo(f" {key}: {value}")
except requests.exceptions.HTTPError as e:
raise click.ClickException(f"Failed to show state:\n{e}")
@state.command("create")
@click.argument("name")
@click.option("--addenv", nargs=1, default=None, required=False, multiple=True)
def state_create(name: str, addenv: list[str] | None):
click.echo(f"Creating state '{name}'...")
try:
data = StateParams(
addenv=addenv,
delenv=None
)
response = requests.post(f'{base_url()}/state/{name}', headers=auth_header(), json=data.model_dump())
response.raise_for_status()
click.echo("State created successfully:")
for key, value in response.json().items():
click.echo(f" {key}: {value}")
except requests.exceptions.HTTPError as e:
raise click.ClickException(f"Failed to create state:\n{e}")
@state.command("delete")
@click.argument("name")
def state_delete(name: str):
click.echo(f"Deleting state '{name}'...")
try:
response = requests.delete(f'{base_url()}/state/{name}', headers=auth_header())
response.raise_for_status()
click.echo("State deleted successfully.")
except requests.exceptions.HTTPError:
raise click.ClickException(f"Failed to delete state:\n{response.text}")
@state.command("update")
@click.argument("state")
@click.option("--addenv", nargs=1, default=None, required=False, multiple=True)
@click.option("--delenv", nargs=1, default=None, required=False, multiple=True)
def state_update(state: str, addenv: list[str], delenv: list[str]):
click.echo(f"Updating state '{state}'...")
try:
params = StateParams(
addenv=list(addenv),
delenv=list(delenv)
)
response = requests.patch(f'{base_url()}/state/{state}', headers=auth_header(), json=params.model_dump())
response.raise_for_status()
click.echo("State updated successfully.")
except requests.exceptions.HTTPError as e:
raise click.ClickException(f"Failed to update state:\n{e}")