From fc670efb148c5efdf420239ef95fc1aeb34ca94a Mon Sep 17 00:00:00 2001 From: Linus Vogel Date: Sun, 8 Feb 2026 21:40:02 +0100 Subject: [PATCH] added input validation and splitting input along path separators --- pillar_tool/db/queries/host_queries.py | 3 +++ pillar_tool/main.py | 36 +++++++++++++++++++++++--- pillar_tool/util/validation.py | 33 +++++++++++++++++++++++ 3 files changed, 68 insertions(+), 4 deletions(-) create mode 100644 pillar_tool/util/validation.py diff --git a/pillar_tool/db/queries/host_queries.py b/pillar_tool/db/queries/host_queries.py index b42b8bf..aa684fe 100644 --- a/pillar_tool/db/queries/host_queries.py +++ b/pillar_tool/db/queries/host_queries.py @@ -44,3 +44,6 @@ def create_host(db: Session, fqdn: str, parent: str | None) -> Host: else: raise HTTPException(status_code=500, detail="Invalid state of database") + +def delete_host(db: Session, fqdn: str): + pass \ No newline at end of file diff --git a/pillar_tool/main.py b/pillar_tool/main.py index 642f171..be27258 100644 --- a/pillar_tool/main.py +++ b/pillar_tool/main.py @@ -1,5 +1,7 @@ # load config so everything else can work from pillar_tool.util import load_config, config +from pillar_tool.util.validation import validate_and_split_path_and_domain_name + load_config() from http.server import BaseHTTPRequestHandler @@ -106,19 +108,45 @@ async def hostgroup_list(request: Request): @app.post("/host/{fqdn}") async def host_add(request: Request, fqdn: str, params: HostCreateParams): - new_host = create_host(request.state.db, fqdn, params.parent) + # Validate and split FQDN into hierarchical labels (e.g., "a/b/c" -> ["a", "b", "c"]) + labels = validate_and_split_path_and_domain_name(fqdn) + if labels is None: + raise HTTPException(status_code=400, detail="Invalid Path provided") + # walk through the labels and create the requested groups and host + created = [] + last_parent = None + parent: str | None = params.parent # start with the optional parent parameter + for label in labels: + new_host = create_host(request.state.db, label, parent) + last_parent = parent + if parent is not None: + # update path to parent for the next label + parent += f"/{new_host.name}" + else: + # set first level otherwise + parent = new_host.name + created.append(new_host) + + # Prepare response with creation details output = { "message": "Host created", - "host": new_host, + "host": created[-1], # return the final host in the hierarchy } - if params.parent: + + # include the full path to the new host if it exists + if last_parent is not None: output.update({ - "parent": params.parent + "path": last_parent }) return JSONResponse(output) +@app.delete("/host/{fqdn}") +async def host_delete(request: Request, fqdn: str): + delete_host(request.state.db, fqdn) + return JSONResponse({}) + @app.get("/top/{fqdn}") async def host_top(request: Request, fqdn: str): # TODO: implement diff --git a/pillar_tool/util/validation.py b/pillar_tool/util/validation.py new file mode 100644 index 0000000..4164b7e --- /dev/null +++ b/pillar_tool/util/validation.py @@ -0,0 +1,33 @@ +import re + + +DOMAIN_NAME_REGEX = r'^[a-zA-Z0-9._-]+$' # could be a FQDN, but also just a name + + +def validate_and_split_path_and_domain_name(path_or_dn: str) -> list[str] | None: + """ + Splits a string along slashes and validates each fragment. + + Args: + path_or_dn: Input string that may contain slashes (Some path, name, FQDN or a combination of them) + + Returns: + List of validated fragments in original order, or None if validation fails + """ + import re + + # Split the input by slashes + fragments = [frag for frag in path_or_dn.strip().split('/')] + + # Validate each fragment contains only allowed characters + validated_fragments = [] + + for frag in fragments: + if not re.match(DOMAIN_NAME_REGEX, frag): + return None + + validated_fragments.append(frag) + + # Return the list of validated fragments or None if any failed validation + # NOTE: validated_fragments could be falsy if input was empty or only slashes + return validated_fragments if validated_fragments else None