190 lines
6.5 KiB
Python

import uuid
from sqlalchemy import select, insert, bindparam, delete, and_
from sqlalchemy.orm import Session
from starlette.exceptions import HTTPException
from starlette.requests import Request
from fastapi import APIRouter, Query, Depends
from starlette.responses import JSONResponse
from pillar_tool.db import Host
from pillar_tool.schemas import HostgroupParams, get_model_from_query
from pillar_tool.util.validation import split_and_validate_path
router = APIRouter(
prefix="/hostgroup",
tags=["Host Group"],
)
@router.get("")
def hostgroups_get(req: Request):
"""
Retrieve all host groups.
Fetches and returns a list of host group names from the database.
A host group is defined as a `Host` record where `is_hostgroup == True`.
Returns:
JSONResponse: A JSON response with status code 200 containing a list of host group names (strings).
"""
db: Session = req.state.db
result = db.execute(select(Host).where(Host.is_hostgroup == True)).fetchall()
hosts: list[Host] = list(map(lambda x: x[0], result))
all_hostgroups = { x.id: x for x in hosts }
all_hostgroup_names = []
for host in hosts:
ancestors = [host]
while ancestors[-1].parent_id is not None:
ancestors.append(all_hostgroups[ancestors[-1].parent_id])
all_hostgroup_names.append('/'.join(map(lambda x: x.name, reversed(ancestors))))
return JSONResponse(status_code=200, content=all_hostgroup_names)
@router.get("/{name}")
def hostgroup_get(req: Request, name: str, params: HostgroupParams = Depends(get_model_from_query(HostgroupParams))):
"""
Retrieve a specific host group by name with additional details
Fetches and returns details of the specified host group.
Returns 404 if no such host group exists.
Args:
req (Request): The incoming request object.
name (str): The name of the host group to retrieve.
params (HostgroupCreateParams): the path of the group if desired
Returns:
JSONResponse: A JSON response with status code 200 and the host group details on success,
or 404 if not found.
"""
db: Session = req.state.db
# decode the path
last = None
ancestors = []
path = split_and_validate_path(params.path) if params.path else []
# get the path from the db
path_stmt = select(Host).where(and_(Host.name == bindparam('name') and Host.parent_id == bindparam('parent_id')))
for label in path:
result = db.execute(path_stmt, {'name': label, 'parent_id': last}).fetchall()
# error 404 if there is no matching item
if len(result) != 1:
raise HTTPException(status_code=404, detail="No such hostgroup path exists")
tmp: Host = result[0][0]
ancestors.append(tmp)
last = tmp.id
# get the host in question
stmt = select(Host).where(and_(Host.name == name, Host.is_hostgroup == True, Host.parent_id == last))
result = db.execute(stmt).fetchall()
if len(result) == 0:
raise HTTPException(status_code=404, detail="No such hostgroup exists")
# Note: this should be enforced by the database
assert len(result) == 1
hg: Host = result[0][0]
return JSONResponse(status_code=200, content={
'hostgroup': hg.name,
'path': '/'.join(x.name for x in ancestors)
})
@router.post("/{name}")
def hostgroup_create(req: Request, name: str, params: HostgroupParams):
"""
Create a new host group.
Creates a new host group record in the database with the provided parameters.
Args:
req (Request): The incoming request object.
name (str): The name of the host group (used as identifier).
params (HostgroupCreateParams): Additional Parameters
Returns:
JSONResponse: A JSON response with status code 201 on success,
or appropriate error codes (e.g., 409 if already exists).
"""
db = req.state.db
path = params.path
labels = split_and_validate_path(path) if path is not None else []
labels += [ name ]
stmt = select(Host).where(and_(Host.name == bindparam('name'), Host.is_hostgroup == True, Host.parent_id == bindparam('last')))
last = None
for label in labels:
result = db.execute(stmt, {'name': label, 'last': last}).fetchall()
if len(result) == 1:
# simply step down through the hierarchy
host = result[0][0]
last = host.id
elif len(result) == 0:
new_id = uuid.uuid4()
db.execute(insert(Host).values(id=new_id, name=label, is_hostgroup=True, parent_id=last))
last = new_id
else:
# this should not be possible
assert False
# TODO: return the newly created hostgroups
return JSONResponse(status_code=201, content={})
@router.delete("/{name}")
def hostgroup_delete(req: Request, name: str, params: HostgroupParams = Depends(get_model_from_query(HostgroupParams))):
"""
Delete a host group by name.
Deletes the specified host group from the database.
Returns 404 if no such host group exists.
Args:
req (Request): The incoming request object.
name (str): The name of the host group to delete.
params (HostgroupCreateParams): Included for consistency but typically unused in deletions.
Returns:
JSONResponse: A JSON response with status code 204 on successful deletion,
or 404 if not found.
"""
db = req.state.db
labels = split_and_validate_path(params.path) or []
labels.append(name)
last = None
stmt_step = select(Host).where(and_(Host.name == bindparam('name'), Host.parent_id == bindparam('last'), Host.is_hostgroup == True))
for label in labels:
result = db.execute(stmt_step, {'name': label, 'last': last}).fetchall()
if len(result) == 0:
return JSONResponse(status_code=404, content={}) # TODO: truly define a error format
# this should be enforced by the database
assert len(result) == 1
host: Host = result[0][0]
last = host.id
children_stmt = select(Host).where(Host.parent_id == last)
children: list[Host] = list(map(lambda x: x[0], db.execute(children_stmt).fetchall()))
if len(children) != 0:
return JSONResponse(status_code=400, content={
'message': "Cannot delete a hostgroup that still has children",
'children': [ '/'.join(labels + [x.name]) for x in children ]
})
db.execute(delete(Host).where(Host.id == last))
return JSONResponse(status_code=204, content={})