Compare commits

...

2 Commits

11 changed files with 187 additions and 90 deletions

View File

@ -0,0 +1,32 @@
"""mark hosts as hostgroups
Revision ID: e33744090598
Revises: 0a912926be8b
Create Date: 2026-02-08 22:20:42.019833
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'e33744090598'
down_revision: Union[str, Sequence[str], None] = '0a912926be8b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('pillar_tool_host', sa.Column('is_hostgroup', sa.Boolean(), server_default='FALSE', nullable=False))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('pillar_tool_host', 'is_hostgroup')
# ### end Alembic commands ###

View File

@ -28,6 +28,7 @@ class Host(Base):
id = Column(UUID, primary_key=True)
name = Column(String, nullable=False)
parent_id = Column(UUID, ForeignKey('pillar_tool_host.id'), nullable=True)
is_hostgroup = Column(Boolean, nullable=False, server_default="FALSE")
__table_args__ = (
UniqueConstraint('name', 'parent_id', name='pillar_tool_host_unique_name'),

View File

@ -106,77 +106,6 @@ async def hostgroup_list(request: Request):
all_hosts = list_all_hosts(request.state.db)
return JSONResponse([x.name for x in all_hosts if x.parent_id is not None])
@app.post("/host/{fqdn}")
async def host_add(request: Request, fqdn: str, params: HostCreateParams):
"""
Creates a hierarchical host structure based on the provided FQDN path.
Args:
request: The HTTP request containing database connection and other metadata.
fqdn: Fully Qualified Domain Name in dot-separated format (e.g., "a/b/c").
params: Host creation parameters including optional parent reference.
Returns:
JSON response with:
- message: Confirmation of host creation
- host: The final created host object
- path: Full hierarchical path if applicable
Raises:
HTTPException(400): If the provided FQDN is invalid or malformed.
"""
# Validate and split FQDN into hierarchical labels (e.g., "a/b/c" -> ["a", "b", "c"])
labels = validate_and_split_path_and_domain_name(fqdn)
if labels is None:
raise HTTPException(status_code=400, detail="Invalid Path provided")
# walk through the labels and create the requested groups and host
created = []
last_parent = None
parent: str | None = params.parent # start with the optional parent parameter
for label in labels:
new_host = create_host(request.state.db, label, parent)
last_parent = parent
if parent is not None:
# update path to parent for the next label
parent += f"/{new_host.name}"
else:
# set first level otherwise
parent = new_host.name
created.append(new_host)
# Prepare response with creation details
output = {
"message": "Host created",
"host": created[-1], # return the final host in the hierarchy
}
# include the full path to the new host if it exists
if last_parent is not None:
output.update({
"path": last_parent
})
return JSONResponse(output)
@app.delete("/host/{fqdn}")
async def host_delete(request: Request, fqdn: str):
# Validate and split FQDN into hierarchical labels (e.g., "a/b/c" -> ["a", "b", "c"])
labels = validate_and_split_path_and_domain_name(fqdn)
if labels is None:
raise HTTPException(status_code=400, detail="Invalid Path provided")
error_msg: str | None = delete_host(request.state.db, labels)
if error_msg is not None:
return JSONResponse({
'message': "Failed to delete host",
'details': error_msg
}, status_code=400)
else:
return JSONResponse({
'message': "Host deleted",
'path': fqdn
})
@app.get("/top/{fqdn}")
async def host_top(request: Request, fqdn: str):

View File

View File

View File

@ -0,0 +1,90 @@
import uuid
from sqlalchemy import select, insert, bindparam
from sqlalchemy.orm import Session
from starlette.exceptions import HTTPException
from starlette.requests import Request
from fastapi import APIRouter
from starlette.responses import JSONResponse
from pillar_tool.db import Host
from pillar_tool.db.queries.host_queries import create_host
from pillar_tool.schemas import HostCreateParams
from pillar_tool.util.validation import validate_fqdn, split_and_validate_path
router = APIRouter(
prefix="/host",
tags=["Host"],
)
@router.get("")
def hosts_get(req: Request):
db: Session = req.state.db
@router.get("/{fqdn}")
def host_get(req: Request, fqdn: str):
db: Session = req.state.db
@router.post("/{fqdn}")
async def host_add(request: Request, fqdn: str, params: HostCreateParams):
db: Session = request.state.db
if not validate_fqdn(fqdn):
raise HTTPException(status_code=400, detail="Provided host is not an FQDN")
if params.parent is not None:
parent_labels = split_and_validate_path(params.parent)
if parent_labels is None:
raise HTTPException(status_code=400, detail="Provided parent is not a valid path")
else:
parent_labels = []
parent_id = None
stmt_select_respecting_parent = select(Host).where(Host.name == bindparam("label") and Host.parent_id == bindparam("parent_id"))
for label in parent_labels:
result = db.execute(stmt_select_respecting_parent, {
'label': label,
'parent_id': parent_id
}).fetchall()
if len(result) == 0:
raise HTTPException(status_code=400, detail="Parent does not exist")
# Note: this should be enforced by the database
assert len(result) == 1
parent_id = result[0][0].parent_id
new_host = Host(
id=uuid.uuid4(),
name=fqdn,
parent_id=parent_id,
is_hostgroup=False
)
stmt_create_host_with_parent = insert(Host).values(id=new_host.id, name=new_host.name, parent_id=new_host.parent_id, is_hostgroup=new_host.is_hostgroup)
db.execute(stmt_create_host_with_parent).fetchall()
# Prepare response with creation details
output = {
"message": "Host created",
"host": new_host, # return the final host in the hierarchy
}
# include the full path to the new host if it exists
if params.parent is not None:
output.update({
"path": params.parent
})
return JSONResponse(output)
@router.delete("/{fqdn}")
async def host_delete(request: Request, fqdn: str):
pass

View File

@ -0,0 +1,34 @@
from starlette.requests import Request
from fastapi import APIRouter
from pillar_tool.schemas import HostgroupCreateParams
router = APIRouter(
prefix="/hostgroup",
tags=["Host Group"],
)
@router.get("")
def hostgroups_get(req: Request):
pass
@router.get("/{name}")
def hostgroup_get(req: Request, name: str):
pass
@router.post("/{name}")
def hostgroup_create(req: Request, name: str, params: HostgroupCreateParams):
pass
@router.patch("/{name}")
def hostgroup_update(req: Request, name: str, params: HostgroupCreateParams):
pass
@router.delete("/{name}")
def hostgroup_delete(req: Request, name: str, params: HostgroupCreateParams):
pass

View File

View File

View File

@ -1,4 +1,7 @@
from pydantic import BaseModel
class HostCreateParams(BaseModel):
parent: str | None
class HostgroupCreateParams(BaseModel):
parent: str | None

View File

@ -1,36 +1,44 @@
import re
DOMAIN_NAME_REGEX = r'^[a-zA-Z0-9._-]+$' # could be a FQDN, but also just a name
FQDN_REGEX = r'^([a-zA-Z0-9.-]+\.)+[a-zA-Z]{2,}$'
PATH_REGEX = re.compile(r'^[a-zA-Z_-][a-zA-Z0-9_-]*$')
FQDN_REGEX = re.compile(r'^([a-zA-Z0-9.-]+\.)+[a-zA-Z]{2,}$')
def validate_and_split_path_and_domain_name(path_or_dn: str) -> list[str] | None:
def validate_fqdn(fqdn: str) -> bool:
"""
Splits a string along slashes and validates each fragment.
Validates a string against the FQDN regex pattern.
Args:
path_or_dn: Input string that may contain slashes (Some path, name, FQDN or a combination of them)
fqdn: The fully qualified domain name to validate (e.g., "example.com")
Returns:
List of validated fragments in original order, or None if validation fails
True if the input matches the FQDN pattern, False otherwise
"""
import re
return re.match(FQDN_REGEX, fqdn) is not None
# Split the input by slashes
fragments = [frag for frag in path_or_dn.strip().split('/')]
# Validate each fragment contains only allowed characters
validated_fragments = []
def split_and_validate_path(path: str) -> list[str] | None:
"""
Splits a path string by slashes, filters out empty fragments, and validates each label.
for frag in fragments:
if not re.match(DOMAIN_NAME_REGEX, frag):
Args:
path: Input path string in format like "a/b/c" or "/a/b/c"
Returns:
List of validated path labels, or None if validation fails (empty input or invalid characters)
"""
# Split by slashes and filter out empty fragments
labels = list(filter(lambda x: x != "", path.strip().split("/")))
# Return None for empty paths
if len(labels) == 0:
return None
# Validate each label against the PATH_REGEX pattern
for label in labels:
if not re.match(PATH_REGEX, label):
return None
validated_fragments.append(frag)
# Return the list of validated fragments or None if any failed validation
# NOTE: validated_fragments could be falsy if input was empty or only slashes
return validated_fragments if validated_fragments else None
return labels