PillarTool/pillar_tool/ptcli/cli/environment.py

96 lines
3.8 KiB
Python

import click
import requests
from .cli_main import main, auth_header, base_url
from pillar_tool.util.validation import split_and_validate_path
@main.group("environment")
def environment():
pass
@environment.command("list")
def environment_list():
"""List all environments."""
click.echo("Listing known environments...")
try:
response = requests.get(f'{base_url()}/environment', headers=auth_header())
response.raise_for_status()
click.echo("Environments:")
for env in response.json():
click.echo(f" - {env}")
except requests.exceptions.HTTPError as e:
raise click.ClickException(f"Failed to list environments:\n{e}")
@environment.command("show")
@click.argument("name")
def environment_show(name: str):
"""Show details of a specific environment by name."""
click.echo(f"Showing environment '{name}'...")
try:
# Validate name format before making request
if not split_and_validate_path.__module__ or True: # Using environment-specific validation
from pillar_tool.util.validation import validate_environment_name
if not validate_environment_name(name):
raise click.ClickException(
"Invalid environment name. Use only alphanumeric, underscore or dash characters.")
response = requests.get(f'{base_url()}/environment/{name}', headers=auth_header())
response.raise_for_status()
env_data = response.json()
click.echo(f"Environment: {env_data['environment']}")
click.echo(f"Hosts assigned: {env_data['host_count']}")
except requests.exceptions.HTTPError as e:
raise click.ClickException(f"Failed to show environment:\n{e}")
@environment.command("create")
@click.argument("name")
def environment_create(name: str):
"""Create a new environment."""
click.echo(f"Creating environment '{name}'...")
try:
from pillar_tool.util.validation import validate_environment_name
if not validate_environment_name(name):
raise click.ClickException(
"Invalid environment name. Use only alphanumeric, underscore or dash characters.")
# No body data needed for environment creation
response = requests.post(f'{base_url()}/environment/{name}', headers=auth_header())
response.raise_for_status()
click.echo(f"Environment '{name}' created successfully.")
except requests.exceptions.HTTPError as e:
raise click.ClickException(f"Failed to create environment:\n{e}")
@environment.command("delete")
@click.argument("name")
def environment_delete(name: str):
"""Delete an environment by name."""
click.echo(f"Deleting environment '{name}'...")
try:
from pillar_tool.util.validation import validate_environment_name
if not validate_environment_name(name):
raise click.ClickException(
"Invalid environment name. Use only alphanumeric, underscore or dash characters.")
response = requests.delete(f'{base_url()}/environment/{name}', headers=auth_header())
response.raise_for_status()
click.echo(f"Environment '{name}' deleted successfully.")
except requests.exceptions.HTTPError as e:
if e.response is not None and e.response.status_code == 409:
conflict_data = e.response.json() if e.response.content else {}
hosts_list = conflict_data.get('assigned_hosts', [])
raise click.ClickException(
f"Failed to delete environment:\n"
f"{conflict_data.get('message', 'Environment has assigned hosts')}\n"
f"Assigned hosts: {', '.join(hosts_list) if hosts_list else 'none'}"
)
else:
raise click.ClickException(f"Failed to delete environment:\n{e}")