From 89987f36b07b057ea81f1c2d5e64ca50879b54f1 Mon Sep 17 00:00:00 2001 From: Linus Vogel Date: Sun, 8 Feb 2026 23:03:20 +0100 Subject: [PATCH] worked on moving the endpoints to their own filesf --- pillar_tool/db/models/pillar_data.py | 1 + pillar_tool/main.py | 71 ---------------------- pillar_tool/routers/host.py | 90 ++++++++++++++++++++++++++++ pillar_tool/routers/hostgroup.py | 34 +++++++++++ pillar_tool/schemas.py | 3 + pillar_tool/util/validation.py | 46 ++++++++------ 6 files changed, 155 insertions(+), 90 deletions(-) diff --git a/pillar_tool/db/models/pillar_data.py b/pillar_tool/db/models/pillar_data.py index f82385b..c20cafb 100644 --- a/pillar_tool/db/models/pillar_data.py +++ b/pillar_tool/db/models/pillar_data.py @@ -28,6 +28,7 @@ class Host(Base): id = Column(UUID, primary_key=True) name = Column(String, nullable=False) parent_id = Column(UUID, ForeignKey('pillar_tool_host.id'), nullable=True) + is_hostgroup = Column(Boolean, nullable=False, server_default="FALSE") __table_args__ = ( UniqueConstraint('name', 'parent_id', name='pillar_tool_host_unique_name'), diff --git a/pillar_tool/main.py b/pillar_tool/main.py index c79a6f0..2cabaad 100644 --- a/pillar_tool/main.py +++ b/pillar_tool/main.py @@ -106,77 +106,6 @@ async def hostgroup_list(request: Request): all_hosts = list_all_hosts(request.state.db) return JSONResponse([x.name for x in all_hosts if x.parent_id is not None]) -@app.post("/host/{fqdn}") -async def host_add(request: Request, fqdn: str, params: HostCreateParams): - """ - Creates a hierarchical host structure based on the provided FQDN path. - - Args: - request: The HTTP request containing database connection and other metadata. - fqdn: Fully Qualified Domain Name in dot-separated format (e.g., "a/b/c"). - params: Host creation parameters including optional parent reference. - - Returns: - JSON response with: - - message: Confirmation of host creation - - host: The final created host object - - path: Full hierarchical path if applicable - - Raises: - HTTPException(400): If the provided FQDN is invalid or malformed. - """ - # Validate and split FQDN into hierarchical labels (e.g., "a/b/c" -> ["a", "b", "c"]) - labels = validate_and_split_path_and_domain_name(fqdn) - if labels is None: - raise HTTPException(status_code=400, detail="Invalid Path provided") - - # walk through the labels and create the requested groups and host - created = [] - last_parent = None - parent: str | None = params.parent # start with the optional parent parameter - for label in labels: - new_host = create_host(request.state.db, label, parent) - last_parent = parent - if parent is not None: - # update path to parent for the next label - parent += f"/{new_host.name}" - else: - # set first level otherwise - parent = new_host.name - created.append(new_host) - - # Prepare response with creation details - output = { - "message": "Host created", - "host": created[-1], # return the final host in the hierarchy - } - - # include the full path to the new host if it exists - if last_parent is not None: - output.update({ - "path": last_parent - }) - - return JSONResponse(output) - -@app.delete("/host/{fqdn}") -async def host_delete(request: Request, fqdn: str): - # Validate and split FQDN into hierarchical labels (e.g., "a/b/c" -> ["a", "b", "c"]) - labels = validate_and_split_path_and_domain_name(fqdn) - if labels is None: - raise HTTPException(status_code=400, detail="Invalid Path provided") - - error_msg: str | None = delete_host(request.state.db, labels) - if error_msg is not None: - return JSONResponse({ - 'message': "Failed to delete host", - 'details': error_msg - }, status_code=400) - else: - return JSONResponse({ - 'message': "Host deleted", - 'path': fqdn - }) @app.get("/top/{fqdn}") async def host_top(request: Request, fqdn: str): diff --git a/pillar_tool/routers/host.py b/pillar_tool/routers/host.py index e69de29..ab66bf4 100644 --- a/pillar_tool/routers/host.py +++ b/pillar_tool/routers/host.py @@ -0,0 +1,90 @@ +import uuid + +from sqlalchemy import select, insert, bindparam +from sqlalchemy.orm import Session +from starlette.exceptions import HTTPException +from starlette.requests import Request +from fastapi import APIRouter +from starlette.responses import JSONResponse + +from pillar_tool.db import Host +from pillar_tool.db.queries.host_queries import create_host +from pillar_tool.schemas import HostCreateParams +from pillar_tool.util.validation import validate_fqdn, split_and_validate_path + +router = APIRouter( + prefix="/host", + tags=["Host"], +) + + + +@router.get("") +def hosts_get(req: Request): + db: Session = req.state.db + + +@router.get("/{fqdn}") +def host_get(req: Request, fqdn: str): + db: Session = req.state.db + + +@router.post("/{fqdn}") +async def host_add(request: Request, fqdn: str, params: HostCreateParams): + db: Session = request.state.db + + if not validate_fqdn(fqdn): + raise HTTPException(status_code=400, detail="Provided host is not an FQDN") + + if params.parent is not None: + parent_labels = split_and_validate_path(params.parent) + if parent_labels is None: + raise HTTPException(status_code=400, detail="Provided parent is not a valid path") + else: + parent_labels = [] + + parent_id = None + stmt_select_respecting_parent = select(Host).where(Host.name == bindparam("label") and Host.parent_id == bindparam("parent_id")) + for label in parent_labels: + result = db.execute(stmt_select_respecting_parent, { + 'label': label, + 'parent_id': parent_id + }).fetchall() + + if len(result) == 0: + raise HTTPException(status_code=400, detail="Parent does not exist") + + # Note: this should be enforced by the database + assert len(result) == 1 + + parent_id = result[0][0].parent_id + + new_host = Host( + id=uuid.uuid4(), + name=fqdn, + parent_id=parent_id, + is_hostgroup=False + ) + stmt_create_host_with_parent = insert(Host).values(id=new_host.id, name=new_host.name, parent_id=new_host.parent_id, is_hostgroup=new_host.is_hostgroup) + db.execute(stmt_create_host_with_parent).fetchall() + + # Prepare response with creation details + output = { + "message": "Host created", + "host": new_host, # return the final host in the hierarchy + } + + # include the full path to the new host if it exists + if params.parent is not None: + output.update({ + "path": params.parent + }) + + return JSONResponse(output) + + +@router.delete("/{fqdn}") +async def host_delete(request: Request, fqdn: str): + pass + + diff --git a/pillar_tool/routers/hostgroup.py b/pillar_tool/routers/hostgroup.py index e69de29..54ef756 100644 --- a/pillar_tool/routers/hostgroup.py +++ b/pillar_tool/routers/hostgroup.py @@ -0,0 +1,34 @@ +from starlette.requests import Request +from fastapi import APIRouter + +from pillar_tool.schemas import HostgroupCreateParams + +router = APIRouter( + prefix="/hostgroup", + tags=["Host Group"], +) + + +@router.get("") +def hostgroups_get(req: Request): + pass + + +@router.get("/{name}") +def hostgroup_get(req: Request, name: str): + pass + + +@router.post("/{name}") +def hostgroup_create(req: Request, name: str, params: HostgroupCreateParams): + pass + + +@router.patch("/{name}") +def hostgroup_update(req: Request, name: str, params: HostgroupCreateParams): + pass + + +@router.delete("/{name}") +def hostgroup_delete(req: Request, name: str, params: HostgroupCreateParams): + pass diff --git a/pillar_tool/schemas.py b/pillar_tool/schemas.py index 2163cae..f441260 100644 --- a/pillar_tool/schemas.py +++ b/pillar_tool/schemas.py @@ -1,4 +1,7 @@ from pydantic import BaseModel class HostCreateParams(BaseModel): + parent: str | None + +class HostgroupCreateParams(BaseModel): parent: str | None \ No newline at end of file diff --git a/pillar_tool/util/validation.py b/pillar_tool/util/validation.py index 57e891e..a152351 100644 --- a/pillar_tool/util/validation.py +++ b/pillar_tool/util/validation.py @@ -1,36 +1,44 @@ import re -DOMAIN_NAME_REGEX = r'^[a-zA-Z0-9._-]+$' # could be a FQDN, but also just a name -FQDN_REGEX = r'^([a-zA-Z0-9.-]+\.)+[a-zA-Z]{2,}$' +PATH_REGEX = re.compile(r'^[a-zA-Z_-][a-zA-Z0-9_-]*$') +FQDN_REGEX = re.compile(r'^([a-zA-Z0-9.-]+\.)+[a-zA-Z]{2,}$') -def validate_and_split_path_and_domain_name(path_or_dn: str) -> list[str] | None: +def validate_fqdn(fqdn: str) -> bool: """ - Splits a string along slashes and validates each fragment. + Validates a string against the FQDN regex pattern. Args: - path_or_dn: Input string that may contain slashes (Some path, name, FQDN or a combination of them) + fqdn: The fully qualified domain name to validate (e.g., "example.com") Returns: - List of validated fragments in original order, or None if validation fails + True if the input matches the FQDN pattern, False otherwise """ - import re + return re.match(FQDN_REGEX, fqdn) is not None - # Split the input by slashes - fragments = [frag for frag in path_or_dn.strip().split('/')] - # Validate each fragment contains only allowed characters - validated_fragments = [] +def split_and_validate_path(path: str) -> list[str] | None: + """ + Splits a path string by slashes, filters out empty fragments, and validates each label. - for frag in fragments: - if not re.match(DOMAIN_NAME_REGEX, frag): + Args: + path: Input path string in format like "a/b/c" or "/a/b/c" + + Returns: + List of validated path labels, or None if validation fails (empty input or invalid characters) + """ + # Split by slashes and filter out empty fragments + labels = list(filter(lambda x: x != "", path.strip().split("/"))) + + # Return None for empty paths + if len(labels) == 0: + return None + + # Validate each label against the PATH_REGEX pattern + for label in labels: + if not re.match(PATH_REGEX, label): return None - validated_fragments.append(frag) - - # Return the list of validated fragments or None if any failed validation - # NOTE: validated_fragments could be falsy if input was empty or only slashes - return validated_fragments if validated_fragments else None - + return labels