import json import re PATH_REGEX = re.compile(r'^[a-zA-Z_-][a-zA-Z0-9_-]*$') FQDN_REGEX = re.compile(r'^([a-zA-Z0-9.-]+\.)+[a-zA-Z]{2,}$') ENV_NAME_REGEX = re.compile(r'^[a-zA-Z0-9_-]+$') STATE_NAME_REGEX = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_-]*(\.[a-zA-Z_][a-zA-Z0-9_-]*)*$') # TODO: improve doc comment for this function def validate_environment_name(name: str) -> bool: """ Validates an environment name. Args: name: The environment name to validate (e.g., "production", "dev_env") Returns: True if the name contains only alphanumeric characters, underscores, or dashes. False otherwise. Note: Environment names cannot be empty and must match the pattern [a-zA-Z0-9_-]+ """ return bool(ENV_NAME_REGEX.match(name)) def validate_state_name(name: str) -> bool: """ Validates a state name. Args: name: The state name to validate (e.g., "active", "pending_removal") Returns: True if the name contains only alphanumeric characters, underscores, or dashes, and starts with an alphabetic character or underscore. False otherwise. Note: State names cannot be empty and must match the pattern [a-zA-Z_][a-zA-Z0-9_-]* """ return bool(STATE_NAME_REGEX.match(name)) def validate_fqdn(fqdn: str) -> bool: """ Validates a string against the FQDN regex pattern. Args: fqdn: The fully qualified domain name to validate (e.g., "example.com") Returns: True if the input matches the FQDN pattern, False otherwise """ return re.match(FQDN_REGEX, fqdn) is not None def split_and_validate_path(path: str) -> list[str] | None: """ Splits a path string by slashes, filters out empty fragments, and validates each label. Args: path: Input path string in format like "a/b/c" or "/a/b/c" Returns: List of validated path labels, or None if validation fails (empty input or invalid characters) """ # Split by slashes and filter out empty fragments labels = list(filter(lambda x: x != "", path.strip().split("/"))) # Return None for empty paths if len(labels) == 0: return None # Validate each label against the PATH_REGEX pattern for label in labels: if not re.match(PATH_REGEX, label): return None return labels def type_from_name(data_type: str) -> type | None: match data_type: case 'integer': return int case 'float': return float case 'string': return str case 'bool': return bool case 'dict': return dict case 'list': return list case _: raise ValueError(f"Invalid pillar input: Unsupported data type: {data_type}") def name_from_type(value) -> str: if type(value) is int: return 'integer' if type(value) is float: return 'float' if type(value) is str: return 'string' if type(value) is bool: return 'bool' if type(value) is dict: return 'dict' if type(value) is list: return 'list' raise ValueError(f"Invalid pillar input: Unsupported data type: {type(value)}") def validate_pillar_input_data(value: str, data_type: str): decoded_data = json.loads(value) if type(decoded_data) is type_from_name(data_type): return decoded_data raise ValueError(f"Invalid pillar input: datatype does not match value")