# analysis/failure_propagation.py class FailurePropagation: def __init__(self, graph): """ graph = { node: [deps...] } """ self.graph = graph # reverse graph for downstream impact self.reverse = self._build_reverse(graph) def _build_reverse(self, graph): reverse = {} for node, deps in graph.items(): if node not in reverse: reverse[node] = [] for d in deps: reverse.setdefault(d, []).append(node) return reverse # ----------------------------- # BACKWARD FAILURE TRACE # ----------------------------- def propagate_backward(self, failed_node): """ Walk dependencies backward (root cause side) """ visited = set() impacted = [] def walk(node): for dep in self.graph.get(node, []): if dep in visited: continue visited.add(dep) impacted.append({ "node": dep, "type": "root_cause_candidate" }) walk(dep) walk(failed_node) return impacted # ----------------------------- # FORWARD IMPACT TRACE # ----------------------------- def propagate_forward(self, failed_node): """ Walk dependents forward (blast radius side) """ visited = set() impacted = [] def walk(node): for child in self.reverse.get(node, []): if child in visited: continue visited.add(child) impacted.append({ "node": child, "type": "downstream_affected" }) walk(child) walk(failed_node) return impacted # ----------------------------- # FULL FAILURE FIELD MAP # ----------------------------- def map_failure(self, failed_node): backward = self.propagate_backward(failed_node) forward = self.propagate_forward(failed_node) return { "failed_node": failed_node, "root_cause_candidates": backward, "downstream_impact": forward, "severity": len(forward) + len(backward) }