added input validation and splitting input along path separators

This commit is contained in:
Linus Vogel 2026-02-08 21:40:02 +01:00
parent c47794ecda
commit fc670efb14
3 changed files with 68 additions and 4 deletions

View File

@ -44,3 +44,6 @@ def create_host(db: Session, fqdn: str, parent: str | None) -> Host:
else:
raise HTTPException(status_code=500, detail="Invalid state of database")
def delete_host(db: Session, fqdn: str):
pass

View File

@ -1,5 +1,7 @@
# load config so everything else can work
from pillar_tool.util import load_config, config
from pillar_tool.util.validation import validate_and_split_path_and_domain_name
load_config()
from http.server import BaseHTTPRequestHandler
@ -106,19 +108,45 @@ async def hostgroup_list(request: Request):
@app.post("/host/{fqdn}")
async def host_add(request: Request, fqdn: str, params: HostCreateParams):
new_host = create_host(request.state.db, fqdn, params.parent)
# Validate and split FQDN into hierarchical labels (e.g., "a/b/c" -> ["a", "b", "c"])
labels = validate_and_split_path_and_domain_name(fqdn)
if labels is None:
raise HTTPException(status_code=400, detail="Invalid Path provided")
# walk through the labels and create the requested groups and host
created = []
last_parent = None
parent: str | None = params.parent # start with the optional parent parameter
for label in labels:
new_host = create_host(request.state.db, label, parent)
last_parent = parent
if parent is not None:
# update path to parent for the next label
parent += f"/{new_host.name}"
else:
# set first level otherwise
parent = new_host.name
created.append(new_host)
# Prepare response with creation details
output = {
"message": "Host created",
"host": new_host,
"host": created[-1], # return the final host in the hierarchy
}
if params.parent:
# include the full path to the new host if it exists
if last_parent is not None:
output.update({
"parent": params.parent
"path": last_parent
})
return JSONResponse(output)
@app.delete("/host/{fqdn}")
async def host_delete(request: Request, fqdn: str):
delete_host(request.state.db, fqdn)
return JSONResponse({})
@app.get("/top/{fqdn}")
async def host_top(request: Request, fqdn: str):
# TODO: implement

View File

@ -0,0 +1,33 @@
import re
DOMAIN_NAME_REGEX = r'^[a-zA-Z0-9._-]+$' # could be a FQDN, but also just a name
def validate_and_split_path_and_domain_name(path_or_dn: str) -> list[str] | None:
"""
Splits a string along slashes and validates each fragment.
Args:
path_or_dn: Input string that may contain slashes (Some path, name, FQDN or a combination of them)
Returns:
List of validated fragments in original order, or None if validation fails
"""
import re
# Split the input by slashes
fragments = [frag for frag in path_or_dn.strip().split('/')]
# Validate each fragment contains only allowed characters
validated_fragments = []
for frag in fragments:
if not re.match(DOMAIN_NAME_REGEX, frag):
return None
validated_fragments.append(frag)
# Return the list of validated fragments or None if any failed validation
# NOTE: validated_fragments could be falsy if input was empty or only slashes
return validated_fragments if validated_fragments else None