from backend.pipeline.state import State
from backend.scheduler.optimizer import choose_best_node
from backend.executor.runtime_router import execute_action
from backend.cache.minio_cache import MinioCache
from backend.graph.plan import build_action_graph
from backend.events.bus import EventBus


class PipelineEngine:
    """Run a project's actions sequentially, skipping those found in the cache.

    For each action produced by ``build_action_graph`` the engine asks
    ``choose_best_node`` for a node, announces the scheduling on the event
    bus, and then either reports a cache hit or executes the action and
    reports the outcome.
    """

    # Positional arguments used to build the fallback ``MinioCache`` when the
    # caller does not inject one.
    # NOTE(review): these look like a local endpoint plus default credentials
    # -- confirm against MinioCache's signature and override in deployments.
    DEFAULT_CACHE_ARGS = ("localhost:9000", "minioadmin", "minioadmin")

    def __init__(self, nodes, node_registry, event_bus: EventBus, cache=None):
        """Store the collaborators the engine needs.

        Args:
            nodes: Candidate nodes handed to ``choose_best_node``.
            node_registry: Stored on the instance; not used by ``run``.
            event_bus: Bus on which progress events are emitted.
            cache: Optional cache object exposing ``exists(hash)``. When
                omitted, a ``MinioCache`` is built from
                ``DEFAULT_CACHE_ARGS`` (the previous hard-coded behaviour).
        """
        self.nodes = nodes
        self.node_registry = node_registry
        self.bus = event_bus
        if cache is None:
            cache = MinioCache(*self.DEFAULT_CACHE_ARGS)
        self.cache = cache

    async def run(self, project):
        """Execute every action of ``project`` and return their outcomes.

        Args:
            project: Passed unchanged to ``build_action_graph``.

        Returns:
            A list of ``(action_name, state)`` tuples, where ``state`` is
            ``"done"`` when ``execute_action`` returned ``0`` and
            ``"failed"`` otherwise. Actions served from the cache are not
            included.
        """
        results = []
        for action in build_action_graph(project):
            node = choose_best_node(self.nodes, action)
            node_name = node["name"]
            action_name = action["name"]

            self.bus.emit(
                "task_update",
                node=node_name,
                action=action_name,
                state="scheduled",
            )

            if self.cache.exists(action["hash"]):
                # NOTE(review): a cache hit emits only "cache_update"; the
                # action gets no terminal "task_update" and no entry in the
                # returned list. Kept as-is because callers may rely on it.
                self.bus.emit(
                    "cache_update",
                    node=node_name,
                    action=action_name,
                    state="hit",
                )
                continue

            # NOTE(review): execute_action is called synchronously inside a
            # coroutine; if it blocks, the event loop stalls -- confirm.
            rc = execute_action(action, {}, node)
            state = "done" if rc == 0 else "failed"

            self.bus.emit(
                "task_update",
                node=node_name,
                action=action_name,
                state=state,
            )
            results.append((action_name, state))

        return results