# PillarTool/pillar_tool/db/queries/pillar_queries.py
#
# 42 lines
# 1.3 KiB
# Python

import json
from collections import defaultdict
from pillar_tool.db.models.pillar_data import *
from uuid import UUID
from sqlalchemy import select, insert, union
from sqlalchemy.orm import Session
def get_pillar_name_sequence(name: str) -> list[str]:
    """Break a colon-delimited pillar name into its ordered labels.

    Args:
        name: Pillar name whose labels are separated by ``:``.

    Returns:
        The labels in order of appearance. A name without any ``:`` yields
        a single-element list; empty segments are kept as empty strings.
    """
    separator = ':'
    labels = name.split(separator)
    return labels
def decode_pillar_value(pillar: Pillar) -> str | int | float | bool | list | dict:
match pillar.parameter_type:
case 'string': return pillar.value
case 'integer': return int(pillar.value)
case 'float': return float(pillar.value)
case 'boolean': return bool(pillar.value)
case 'array': return json.loads(pillar.value)
case 'dict': return json.loads(pillar.value)
raise RuntimeError(f"Failed to decode pillar value: Invalid type '{pillar.parameter_type}'")
def _store_nested_pillar(tree: dict, labels: list[str], value) -> None:
    """Place ``value`` in ``tree`` at the nested path described by ``labels``.

    ``labels`` must contain at least one element. Every label except the last
    names an intermediate dict, created on demand; the last label is the key
    under which ``value`` is stored.
    """
    *parents, leaf = labels
    node = tree
    for label in parents:
        child = node.get(label)
        if not isinstance(child, dict):
            # Missing, or occupied by a non-dict value stored under a shorter
            # name: a deeper path needs a container here.
            # NOTE(review): this discards the non-dict value -- confirm that
            # nested entries should take precedence over it.
            child = node[label] = {}
        node = child
    # Keep an already-present entry rather than overwriting it.
    node.setdefault(leaf, value)


def get_pillar_for_target(db: Session, target: UUID) -> dict:
    """Build the nested pillar dictionary for one target.

    Selects every ``Pillar`` whose ``host_id`` equals ``target``, decodes its
    value with ``decode_pillar_value`` and stores it under the path obtained
    by splitting ``pillar_name`` with ``get_pillar_name_sequence``. A pillar
    named ``a:b:c`` with value ``v`` contributes ``{"a": {"b": {"c": v}}}``.

    Args:
        db: Session used to execute the query.
        target: Value compared against ``Pillar.host_id``.

    Returns:
        The nested dictionary of decoded pillar values; empty when no pillar
        matches ``target``.
    """
    pillar_stmt = select(Pillar).where(Pillar.host_id == target)
    out: dict = {}
    # .scalars() yields the Pillar entity of each row directly.
    for pillar in db.execute(pillar_stmt).scalars():
        labels = get_pillar_name_sequence(pillar.pillar_name)
        _store_nested_pillar(out, labels, decode_pillar_value(pillar))
    return out