import click import requests from .cli_main import main, auth_header, base_url from pillar_tool.util.validation import split_and_validate_path @main.group("environment") def environment(): pass @environment.command("list") def environment_list(): """List all environments.""" click.echo("Listing known environments...") try: response = requests.get(f'{base_url()}/environment', headers=auth_header()) response.raise_for_status() click.echo("Environments:") for env in response.json(): click.echo(f" - {env}") except requests.exceptions.HTTPError as e: raise click.ClickException(f"Failed to list environments:\n{e}") @environment.command("show") @click.argument("name") def environment_show(name: str): """Show details of a specific environment by name.""" click.echo(f"Showing environment '{name}'...") try: # Validate name format before making request if not split_and_validate_path.__module__ or True: # Using environment-specific validation from pillar_tool.util.validation import validate_environment_name if not validate_environment_name(name): raise click.ClickException( "Invalid environment name. Use only alphanumeric, underscore or dash characters.") response = requests.get(f'{base_url()}/environment/{name}', headers=auth_header()) response.raise_for_status() env_data = response.json() click.echo(f"Environment: {env_data['environment']}") click.echo(f"Hosts assigned: {env_data['host_count']}") except requests.exceptions.HTTPError as e: raise click.ClickException(f"Failed to show environment:\n{e}") @environment.command("create") @click.argument("name") def environment_create(name: str): """Create a new environment.""" click.echo(f"Creating environment '{name}'...") try: from pillar_tool.util.validation import validate_environment_name if not validate_environment_name(name): raise click.ClickException( "Invalid environment name. Use only alphanumeric, underscore or dash characters.") # No body data needed for environment creation response = requests.post(f'{base_url()}/environment/{name}', headers=auth_header()) response.raise_for_status() click.echo(f"Environment '{name}' created successfully.") except requests.exceptions.HTTPError as e: raise click.ClickException(f"Failed to create environment:\n{e}") @environment.command("delete") @click.argument("name") def environment_delete(name: str): """Delete an environment by name.""" click.echo(f"Deleting environment '{name}'...") try: from pillar_tool.util.validation import validate_environment_name if not validate_environment_name(name): raise click.ClickException( "Invalid environment name. Use only alphanumeric, underscore or dash characters.") response = requests.delete(f'{base_url()}/environment/{name}', headers=auth_header()) response.raise_for_status() click.echo(f"Environment '{name}' deleted successfully.") except requests.exceptions.HTTPError as e: if e.response is not None and e.response.status_code == 409: conflict_data = e.response.json() if e.response.content else {} hosts_list = conflict_data.get('assigned_hosts', []) raise click.ClickException( f"Failed to delete environment:\n" f"{conflict_data.get('message', 'Environment has assigned hosts')}\n" f"Assigned hosts: {', '.join(hosts_list) if hosts_list else 'none'}" ) else: raise click.ClickException(f"Failed to delete environment:\n{e}")