220 lines
9.1 KiB
Python

import uuid
from typing import Annotated
from sqlalchemy import select, insert, bindparam, delete, and_
from sqlalchemy.orm import Session
from starlette.exceptions import HTTPException
from starlette.requests import Request
from fastapi import APIRouter, Depends
from starlette.responses import JSONResponse
from pillar_tool.db import Host
from pillar_tool.schemas import HostgroupParams, get_model_from_query
from pillar_tool.util.validation import split_and_validate_path
router = APIRouter(
prefix="/hostgroup",
tags=["Host Group"],
)
@router.get("")
def hostgroups_get(req: Request):
"""
Retrieve all host groups.
Fetches and returns a list of host group names from the database.
A host group is defined as a `Host` record where `is_hostgroup == True`.
Returns:
JSONResponse: A JSON response with status code 200 containing a list of host group names (strings).
"""
db: Session = req.state.db
print("[DEBUG] hostgroups_get: retrieving all host groups")
result = db.execute(select(Host).where(Host.is_hostgroup == True)).fetchall()
hosts: list[Host] = list(map(lambda x: x[0], result))
print("[DEBUG] hostgroups_get: found {} host group(s) in database".format(len(hosts)))
all_hostgroups = { x.id: x for x in hosts }
all_hostgroup_names = []
for host in hosts:
ancestors = [host]
while ancestors[-1].parent_id is not None:
ancestors.append(all_hostgroups[ancestors[-1].parent_id])
all_hostgroup_names.append('/'.join(map(lambda x: x.name, reversed(ancestors)))) # type: ignore
print("[DEBUG] hostgroups_get: resolved hierarchical names for {} host group(s): {}".format(len(all_hostgroup_names), all_hostgroup_names))
return JSONResponse(status_code=200, content=all_hostgroup_names)
@router.get("/{name}")
def hostgroup_get(req: Request, name: str, params: HostgroupParams):
"""
Retrieve a specific host group by name with additional details
Fetches and returns details of the specified host group.
Returns 404 if no such host group exists.
Args:
req (Request): The incoming request object.
name (str): The name of the host group to retrieve.
params (HostgroupCreateParams): the path of the group if desired
Returns:
JSONResponse: A JSON response with status code 200 and the host group details on success,
or 404 if not found.
"""
db: Session = req.state.db
print("[DEBUG] hostgroup_get: retrieving hostgroup '{}'".format(name))
# decode the path
last = None
ancestors = []
path: list[str] = []
if params:
path = (split_and_validate_path(params.path) or []) if params.path else []
if len(path) > 0:
print("[DEBUG] hostgroup_get: resolving parent path with {} segment(s): {}".format(len(path), path))
# get the path from the db
path_stmt = select(Host).where(and_(Host.name == bindparam('name') and Host.parent_id == bindparam('parent_id')))
for label in path:
result = db.execute(path_stmt, {'name': label, 'parent_id': last}).fetchall()
# error 404 if there is no matching item
if len(result) != 1:
print("[DEBUG] hostgroup_get: ERROR - No hostgroup found with name '{}' at parent level '{}'. Path traversal failed.".format(label, last))
raise HTTPException(status_code=404, detail="No such hostgroup path exists")
tmp: Host = result[0][0]
ancestors.append(tmp)
last = tmp.id
if len(ancestors) > 0:
print("[DEBUG] hostgroup_get: resolved parent path '{}'".format('/'.join(x.name for x in ancestors)))
# get the host in question
stmt = select(Host).where(and_(Host.name == name, Host.is_hostgroup == True, Host.parent_id == last))
result = db.execute(stmt).fetchall()
if len(result) == 0:
print("[DEBUG] hostgroup_get: ERROR - No hostgroup found with name '{}' and parent_id '{}'. This may indicate the hostgroup does not exist or has a different parent.".format(name, last))
raise HTTPException(status_code=404, detail="No such hostgroup exists")
# Note: this should be enforced by the database
assert len(result) == 1
hg: Host = result[0][0]
print("[DEBUG] hostgroup_get: resolved hostgroup '{}' with id={}".format(hg.name, hg.id))
return JSONResponse(status_code=200, content={
'hostgroup': hg.name,
'path': '/'.join(x.name for x in ancestors)
})
@router.post("/{name}")
def hostgroup_create(req: Request, name: str, params: HostgroupParams):
"""
Create a new host group.
Creates a new host group record in the database with the provided parameters.
Args:
req (Request): The incoming request object.
name (str): The name of the host group (used as identifier).
params (HostgroupCreateParams): Additional Parameters
Returns:
JSONResponse: A JSON response with status code 201 on success,
or appropriate error codes (e.g., 409 if already exists).
"""
db = req.state.db
path = params.path
labels = ( split_and_validate_path(path) if path is not None else [] ) or []
labels += [ name ]
print("[DEBUG] hostgroup_create: creating hostgroup hierarchy with {} label(s): {}".format(len(labels), labels))
last = None
for label in labels:
stmt = select(Host).where(and_(Host.name == label, Host.is_hostgroup == True, Host.parent_id == last))
result = db.execute(stmt).fetchall()
if len(result) == 1:
# simply step down through the hierarchy
host = result[0][0]
print("[DEBUG] hostgroup_create: existing hostgroup '{}' (id={}) found at parent level '{}', reusing".format(label, host.id, last))
last = host.id
elif len(result) == 0:
new_id = uuid.uuid4()
db.execute(insert(Host).values(id=new_id, name=label, is_hostgroup=True, parent_id=last))
print("[DEBUG] hostgroup_create: created new hostgroup '{}' with id={} at parent level {}".format(label, new_id, last))
last = new_id
else:
# this should not be possible
print("[DEBUG] hostgroup_create: ERROR - Multiple hostgroups found with name '{}' and parent '{}'. This indicates a database integrity violation (duplicate hostgroup names). Expected unique constraint enforcement.".format(label, last))
assert False
print("[DEBUG] hostgroup_create: successfully created/resolved hostgroup hierarchy ending at '{}'".format(labels[-1]))
# TODO: return the newly created hostgroups
return JSONResponse(status_code=201, content={})
@router.delete("/{name}")
def hostgroup_delete(req: Request, name: str, params: HostgroupParams = Depends(get_model_from_query(HostgroupParams))):
"""
Delete a host group by name.
Deletes the specified host group from the database.
Returns 404 if no such host group exists.
Args:
req (Request): The incoming request object.
name (str): The name of the host group to delete.
params (HostgroupCreateParams): Included for consistency but typically unused in deletions.
Returns:
JSONResponse: A JSON response with status code 204 on successful deletion,
or 404 if not found.
"""
db = req.state.db
labels = (split_and_validate_path(params.path) or []) if params.path is not None else []
labels.append(name)
last = None
print("[DEBUG] hostgroup_delete: deleting hostgroup hierarchy with {} label(s): {}".format(len(labels), labels))
stmt_step = select(Host).where(and_(Host.name == bindparam('name'), Host.parent_id == bindparam('last'), Host.is_hostgroup == True))
for label in labels:
result = db.execute(stmt_step, {'name': label, 'last': last}).fetchall()
if len(result) == 0:
print("[DEBUG] hostgroup_delete: ERROR - No hostgroup found with name '{}' at parent level '{}'. Path traversal failed.".format(label, last))
return JSONResponse(status_code=404, content={}) # TODO: truly define a error format
# this should be enforced by the database
assert len(result) == 1
host: Host = result[0][0]
print("[DEBUG] hostgroup_delete: resolved hostgroup '{}' (id={}) at parent level {}".format(label, host.id, last))
last = host.id
children_stmt = select(Host).where(Host.parent_id == last)
children: list[Host] = list(map(lambda x: x[0], db.execute(children_stmt).fetchall()))
if len(children) != 0:
print("[DEBUG] hostgroup_delete: ERROR - Cannot delete hostgroup '{}' because it has {} child(ren): {}. All children must be deleted first.".format(labels[-1], len(children), [ '/'.join(labels + [x.name]) for x in children ])) # type: ignore
return JSONResponse(status_code=400, content={
'message': "Cannot delete a hostgroup that still has children",
'children': [ '/'.join(labels + [x.name]) for x in children ] # type: ignore
})
print("[DEBUG] hostgroup_delete: deleting hostgroup '{}' (id={}) with no remaining children".format(labels[-1], last))
db.execute(delete(Host).where(Host.id == last))
return JSONResponse(status_code=204, content={})