"""HTTP endpoints for listing, reading, creating and deleting host groups.

A host group is a ``Host`` row with ``is_hostgroup == True``. Groups form a
tree through ``Host.parent_id``; a top-level group has ``parent_id`` NULL.
"""
import logging
import uuid
from typing import Annotated, Optional

from fastapi import APIRouter, Depends
from sqlalchemy import select, insert, bindparam, delete, and_
from sqlalchemy.orm import Session
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

from pillar_tool.db import Host
from pillar_tool.schemas import HostgroupParams, get_model_from_query
from pillar_tool.util.validation import split_and_validate_path

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/hostgroup",
    tags=["Host Group"],
)


def _path_labels(path: Optional[str]) -> list[str]:
    """Return the validated labels of *path*, or ``[]`` when no path is given.

    A fresh list is always returned so callers may extend it freely.
    """
    if not path:
        return []
    return list(split_and_validate_path(path) or [])


def _find_hostgroup(db: Session, name: str, parent_id: Optional[uuid.UUID]) -> Optional[Host]:
    """Look up the host group called *name* directly below *parent_id*.

    Args:
        db: Session used to run the query.
        name: Label of the host group to look for.
        parent_id: Id of the parent group, or ``None`` for a top-level group.

    Returns:
        The matching ``Host``, or ``None`` if there is no such host group.

    Raises:
        HTTPException: 500 if more than one row matches, which means the
            database holds duplicate host groups under the same parent.
    """
    # The parent id is compared as a Python value on purpose: SQLAlchemy
    # compiles `column == None` to `IS NULL`, whereas a None passed through
    # bindparam() would render `= NULL`, which never matches a top-level group.
    stmt = select(Host).where(and_(
        Host.name == name,
        Host.is_hostgroup == True,  # noqa: E712 - SQL expression, not a Python comparison
        Host.parent_id == parent_id,
    ))
    matches = db.execute(stmt).scalars().all()
    if not matches:
        return None
    if len(matches) > 1:
        # Uniqueness of (name, parent) is expected to be enforced by the database.
        logger.error(
            "Multiple hostgroups found with name '%s' and parent '%s'; "
            "this indicates a database integrity violation",
            name, parent_id,
        )
        raise HTTPException(status_code=500, detail="Duplicate hostgroup entries found")
    return matches[0]


@router.get("")
def hostgroups_get(req: Request):
    """
    Retrieve all host groups.

    Fetches every `Host` record where `is_hostgroup == True` and returns the
    full hierarchical name of each one, built by following `parent_id` up to
    the root and joining the names with '/'.

    Args:
        req (Request): The incoming request object; `req.state.db` is the session.

    Returns:
        JSONResponse: A JSON response with status code 200 containing a list of
        host group paths (strings).
    """
    db: Session = req.state.db
    logger.debug("hostgroups_get: retrieving all host groups")

    stmt = select(Host).where(Host.is_hostgroup == True)  # noqa: E712
    hosts: list[Host] = db.execute(stmt).scalars().all()
    logger.debug("hostgroups_get: found %d host group(s) in database", len(hosts))

    groups_by_id = {group.id: group for group in hosts}

    all_hostgroup_names = []
    for host in hosts:
        # Walk from the group up to its root; the parent of a host group is
        # looked up among the host groups only.
        lineage = [host]
        while lineage[-1].parent_id is not None:
            lineage.append(groups_by_id[lineage[-1].parent_id])
        all_hostgroup_names.append('/'.join(group.name for group in reversed(lineage)))

    logger.debug(
        "hostgroups_get: resolved hierarchical names for %d host group(s): %s",
        len(all_hostgroup_names), all_hostgroup_names,
    )
    return JSONResponse(status_code=200, content=all_hostgroup_names)


# NOTE(review): `params` is declared without Depends(get_model_from_query(...)),
# unlike hostgroup_delete, so FastAPI presumably reads it from the request body
# of this GET - confirm whether that is intended.
@router.get("/{name}")
def hostgroup_get(req: Request, name: str, params: HostgroupParams):
    """
    Retrieve a specific host group by name.

    Resolves the optional parent path label by label, then looks up the host
    group `name` directly below the last resolved parent.

    Args:
        req (Request): The incoming request object.
        name (str): The name of the host group to retrieve.
        params (HostgroupParams): Carries the optional parent path of the group.

    Returns:
        JSONResponse: A JSON response with status code 200 containing the
        host group name and its parent path.

    Raises:
        HTTPException: 404 if a path segment or the host group itself does not exist.
    """
    db: Session = req.state.db
    logger.debug("hostgroup_get: retrieving hostgroup '%s'", name)

    path = _path_labels(params.path if params else None)
    if path:
        logger.debug("hostgroup_get: resolving parent path with %d segment(s): %s", len(path), path)

    ancestors: list[Host] = []
    parent_id = None
    for label in path:
        group = _find_hostgroup(db, label, parent_id)
        if group is None:
            logger.debug(
                "hostgroup_get: no hostgroup found with name '%s' at parent level '%s'; "
                "path traversal failed",
                label, parent_id,
            )
            raise HTTPException(status_code=404, detail="No such hostgroup path exists")
        ancestors.append(group)
        parent_id = group.id

    parent_path = '/'.join(group.name for group in ancestors)
    if ancestors:
        logger.debug("hostgroup_get: resolved parent path '%s'", parent_path)

    hg = _find_hostgroup(db, name, parent_id)
    if hg is None:
        logger.debug(
            "hostgroup_get: no hostgroup found with name '%s' and parent_id '%s'",
            name, parent_id,
        )
        raise HTTPException(status_code=404, detail="No such hostgroup exists")

    logger.debug("hostgroup_get: resolved hostgroup '%s' with id=%s", hg.name, hg.id)
    return JSONResponse(status_code=200, content={
        'hostgroup': hg.name,
        'path': parent_path,
    })


@router.post("/{name}")
def hostgroup_create(req: Request, name: str, params: HostgroupParams):
    """
    Create a new host group, including any missing parent groups.

    Walks the labels of the optional parent path followed by `name`. Labels
    that already exist at their level are reused; missing ones are inserted.
    Creating a group that already exists is therefore not an error.

    Args:
        req (Request): The incoming request object.
        name (str): The name of the host group to create.
        params (HostgroupParams): Carries the optional parent path of the group.

    Returns:
        JSONResponse: A JSON response with status code 201 and an empty body.
    """
    db: Session = req.state.db

    labels = _path_labels(params.path) + [name]
    logger.debug(
        "hostgroup_create: creating hostgroup hierarchy with %d label(s): %s",
        len(labels), labels,
    )

    parent_id = None
    for label in labels:
        existing = _find_hostgroup(db, label, parent_id)
        if existing is not None:
            # Already present: simply step down through the hierarchy.
            logger.debug(
                "hostgroup_create: existing hostgroup '%s' (id=%s) found at parent level '%s', reusing",
                label, existing.id, parent_id,
            )
            parent_id = existing.id
            continue

        new_id = uuid.uuid4()
        db.execute(insert(Host).values(id=new_id, name=label, is_hostgroup=True, parent_id=parent_id))
        logger.debug(
            "hostgroup_create: created new hostgroup '%s' with id=%s at parent level %s",
            label, new_id, parent_id,
        )
        parent_id = new_id

    logger.debug(
        "hostgroup_create: successfully created/resolved hostgroup hierarchy ending at '%s'",
        labels[-1],
    )
    # TODO: return the newly created hostgroups
    return JSONResponse(status_code=201, content={})


@router.delete("/{name}")
def hostgroup_delete(req: Request, name: str, params: HostgroupParams = Depends(get_model_from_query(HostgroupParams))):
    """
    Delete a host group by name.

    Resolves the optional parent path plus `name`, refuses to delete a group
    that still has children, and otherwise removes the group's row.

    Args:
        req (Request): The incoming request object.
        name (str): The name of the host group to delete.
        params (HostgroupParams): Carries the optional parent path of the
            group, read from the query string.

    Returns:
        Response: 204 with no body on successful deletion, 404 (empty JSON
        object) if the group or one of its parents does not exist, or 400 with
        a message and the list of child paths if the group still has children.
    """
    db: Session = req.state.db

    labels = _path_labels(params.path) + [name]
    logger.debug(
        "hostgroup_delete: deleting hostgroup hierarchy with %d label(s): %s",
        len(labels), labels,
    )

    parent_id = None
    for label in labels:
        group = _find_hostgroup(db, label, parent_id)
        if group is None:
            logger.debug(
                "hostgroup_delete: no hostgroup found with name '%s' at parent level '%s'; "
                "path traversal failed",
                label, parent_id,
            )
            return JSONResponse(status_code=404, content={})  # TODO: truly define a error format
        logger.debug(
            "hostgroup_delete: resolved hostgroup '%s' (id=%s) at parent level %s",
            label, group.id, parent_id,
        )
        parent_id = group.id

    # After the walk, parent_id holds the id of the group to delete.
    target_id = parent_id

    children: list[Host] = db.execute(select(Host).where(Host.parent_id == target_id)).scalars().all()
    if children:
        child_paths = ['/'.join(labels + [child.name]) for child in children]
        logger.debug(
            "hostgroup_delete: cannot delete hostgroup '%s' because it has %d child(ren): %s",
            labels[-1], len(children), child_paths,
        )
        return JSONResponse(status_code=400, content={
            'message': "Cannot delete a hostgroup that still has children",
            'children': child_paths,
        })

    logger.debug(
        "hostgroup_delete: deleting hostgroup '%s' (id=%s) with no remaining children",
        labels[-1], target_id,
    )
    db.execute(delete(Host).where(Host.id == target_id))
    # A 204 response must not carry a body, so no JSON content is sent.
    return Response(status_code=204)