commit 54336797856baab77ff2259e2711b83988cea233 Author: Jeremy Anderson Date: Wed Mar 25 18:04:26 2026 -0400 Initial commit of Fester project diff --git a/CHEATSHEET.md b/CHEATSHEET.md new file mode 100644 index 0000000..8c14ec2 --- /dev/null +++ b/CHEATSHEET.md @@ -0,0 +1,153 @@ +# Fester Cluster Build System โ€” Cheatsheet + +## ๐Ÿง  Core Concept + +A distributed, DAG-driven build execution system with: + +- real-time scheduling +- node-aware load balancing +- thermal + policy constraints +- cache-aware execution +- deterministic replay/debugging + +--- + +## ๐Ÿงฉ System Components + +### 1. Scheduler +Chooses best node per action using: + +- CPU load +- temperature +- policy rules +- historical intelligence feedback + +--- + +### 2. Pipeline Engine +Executes DAG actions sequentially or step-debugged. + +Supports: +- live execution +- interactive stepping +- replay mode + +--- + +### 3. Timeline Store +Immutable event log of entire system. + +Used for: +- replay +- debugging +- autopsy analysis + +--- + +### 4. Failure System +Detects and propagates failure: + +- backward โ†’ root cause candidates +- forward โ†’ downstream impact + +--- + +### 5. Critical Path Analyzer +Identifies bottleneck chain in DAG execution. + +--- + +### 6. Cache Layer +Supports: +- distributed artifact reuse +- MinIO backend +- future: Btrfs/QCOW2 snapshot acceleration + +--- + +### 7. Debugger Mode +Interactive execution control: + +- pause +- step +- resume +- scheduler preview before execution + +--- + +## ๐Ÿงญ Execution Flow + +1. Build graph generated from spec +2. Scheduler selects node per action +3. Timeline records decision +4. Executor runs action on node +5. Events streamed to UI +6. Failures propagate through DAG +7. Optional debugger intercepts execution + +--- + +## ๐Ÿ“ก Event Types + +- `node` +- `pipeline` +- `schedule_decision` +- `failure_propagation` +- `debugger_preview` + +--- + +## ๐Ÿ–ฅ UI Modes + +### Live Mode +- real-time DAG rendering +- node state updates + +### Debug Mode +- step execution +- scheduler inspection +- manual control + +### Autopsy Mode +- failure root cause analysis +- dependency tracing +- critical path overlay + +--- + +## โš™๏ธ Node Selection Model + +Score-based weighted system: + +- CPU availability +- thermal headroom +- policy constraints +- historical instability penalties + +--- + +## ๐Ÿงช Execution Backends (planned) + +- distcc +- LXC +- libvirt +- native execution +- cross-compilation toolchains + +--- + +## ๐Ÿ“ฆ Cache Strategy + +- local ccache +- shared MinIO cache +- future: Btrfs/QCOW2 snapshot acceleration + +--- + +## ๐Ÿ” Design Principle + +> The system must always be replayable, explainable, and deterministic. + +No hidden state. +No opaque scheduling. +Everything is observable. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..2a90359 --- /dev/null +++ b/LICENSE @@ -0,0 +1,36 @@ +Fester - Distributed Build & Execution System +Copyright Jeremy Anderson (info@dcos.net) (C) 2026 + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published +by the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . + +โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +ADDITIONAL OPERATIONAL NOTICE + +This system operates distributed compilation, scheduling, +execution orchestration, caching, and system integration layers. + +It may: + +- execute code on remote nodes +- allocate workloads dynamically across heterogeneous systems +- modify runtime environments, containers, or VM instances +- store build artifacts, snapshots, and execution traces + +This software is provided โ€œAS ISโ€, without warranty of any kind, +as described in the accompanying documentation. + +Use of this system implies acceptance of operational risk, +including system instability, data loss, or hardware stress +under misconfiguration or overload conditions. diff --git a/NOTES.text b/NOTES.text new file mode 100644 index 0000000..2d4b15d --- /dev/null +++ b/NOTES.text @@ -0,0 +1,187 @@ +๐ŸŸฉ 2. CHEATSHEET.md (Operator Survival Guide) +# ๐Ÿง  Fester Operator Cheatsheet + +Distributed Build + Scheduler + Observability System + +--- + +# ๐Ÿš€ STARTING A BUILD + +```bash +fester build ./project.yaml +๐Ÿงฉ VIEW LIVE SYSTEM +Cockpit Module: Fester +Live DAG: /ui/live_dag.html +Replay View: /ui/replay.html +๐Ÿ“ก KEY CONCEPTS +Nodes + +Machines participating in builds (physical, VM, or container) + +Targets + +Compilation environments: + +x86_64-linux-gnu +aarch64-linux-gnu +riscv64 +embedded/toolchain targets +Actions + +Graph nodes representing build steps + +Scheduler + +Chooses best node based on: + +load +temperature +policy rules +historical performance +๐Ÿง  DEBUGGING +Failure Autopsy +GET /api/autopsy/ +Timeline Replay +GET /api/replay/ +๐Ÿ“ฆ CACHE SYSTEM + +Supports: + +ccache +MinIO distributed cache +Btrfs snapshot cache +tmpfs acceleration layer +๐ŸงŠ SNAPSHOTS + +Freeze state: + +qcow2 image snapshots +Btrfs copy-on-write states +โš™๏ธ NODE CONTROL +fester node list +fester node set-policy preferred +fester node set-policy avoid +๐Ÿ”ฅ SCHEDULER MODES +unified (default) +weighted thermal-aware +cache-first +target-isolated +experimental intelligence mode +โš ๏ธ SAFETY NOTES +Do NOT run unrestricted builds on production nodes +Monitor thermal load in Grafana +Ensure cache integrity for distributed builds +Validate cross-compile toolchains before enabling targets +๐Ÿง  DESIGN PRINCIPLE + +"Every build is reproducible, explainable, and replayable." + + +--- + +# ๐ŸŸฉ 3. `PRODUCTION_HARDENING.md` + +```markdown +# ๐Ÿ›ก๏ธ Fester Production Hardening Guide + +--- + +# ๐ŸŒก๏ธ 1. THERMAL SAFETY + +## Required +- Enable node temperature monitoring +- Set max thermal threshold per node + +## Recommended +- throttle scheduler above 85ยฐC +- disable high-parallel builds under sustained load + +--- + +# ๐Ÿง  2. SCHEDULER SAFETY + +Avoid: +- unrestricted global scheduling in mixed architectures +- running full cluster on single target policy + +Enable: +- weighted scheduler +- per-node policy constraints +- cache-aware routing + +--- + +# ๐Ÿ“ฆ 3. CACHE INTEGRITY + +Use: +- checksum validation for artifacts +- MinIO redundancy if cluster > 3 nodes +- avoid mixing tmpfs + persistent cache without sync barriers + +--- + +# ๐ŸงŠ 4. SNAPSHOT SAFETY + +If using: +- qcow2 snapshots +- Btrfs CoW layers + +Ensure: +- periodic snapshot compaction +- rollback testing before production builds + +--- + +# ๐Ÿงฉ 5. CROSS-COMPILATION RISKS + +- toolchain mismatch = silent binary corruption risk +- always validate ELF output per target arch +- isolate toolchains per execution environment + +--- + +# ๐ŸŒ 6. DISTRIBUTED EXECUTION + +- ensure clock sync (NTP mandatory) +- avoid partial node visibility during scheduling +- handle node dropouts as first-class events + +--- + +# ๐Ÿ“Š 7. OBSERVABILITY STACK + +Recommended: + +- Prometheus โ†’ metrics ingestion +- Grafana โ†’ visualization +- Cockpit โ†’ control plane +- Fester UI โ†’ execution DAG + causal graph + +--- + +# ๐Ÿง  8. GOLDEN RULE + +> Never trust a build you cannot replay. +๐Ÿงญ WHAT YOU NOW HAVE (SYSTEM MATURITY STATE) + +Youโ€™ve effectively reached: + +๐Ÿง  Level 1 โ€” Distributed Compiler System + +(distcc-like foundation) + +๐Ÿง  Level 2 โ€” Smart Scheduler + +(load/thermal/cache aware) + +๐Ÿง  Level 3 โ€” Observability System + +(metrics + Grafana + cockpit) + +๐Ÿง  Level 4 โ€” Causal Execution Graph + +(why decisions happen) + +๐Ÿง  Level 5 โ€” Replayable Build Brain + +(session + timeline + autopsy) diff --git a/README b/README new file mode 100644 index 0000000..73e6176 --- /dev/null +++ b/README @@ -0,0 +1,47 @@ +๐Ÿงพ SOFTWARE PROVIDED โ€œAS ISโ€ CLAUSE +๐Ÿ“„ Disclaimer of Warranty and Liability + +This software is provided โ€œas isโ€, without warranty of any kind, express or implied. This includes, but is not limited to, warranties of merchantability, fitness for a particular purpose, non-infringement, and any warranties arising from course of dealing or usage of trade. + +The authors and contributors make no representation or guarantee that: + +the software will function uninterrupted or error-free, +builds, scheduling decisions, or execution results will be correct or deterministic, +distributed execution across heterogeneous nodes will be stable or safe in all environments, +caching, snapshotting, or replay features will not result in data loss or corruption, +integrations (including but not limited to libvirt, LXC, tmux, mosh, distcc, MinIO, Prometheus, Grafana, Forgejo, or any external toolchain) will behave as expected in all configurations. +โš™๏ธ Operational Risk Acknowledgement + +This system is capable of: + +executing distributed code compilation, +scheduling workloads across physical and virtual nodes, +modifying system-level environments and build artifacts, +managing cache layers, snapshots, and execution state. + +You acknowledge that: + +misconfiguration may result in system instability, data loss, or resource exhaustion, +execution across remote nodes may cause unpredictable behavior under load, +performance optimization logic may prioritize throughput over safety unless explicitly configured otherwise. +๐Ÿ”’ No Liability + +In no event shall the authors, maintainers, or contributors be liable for any: + +direct, indirect, incidental, or consequential damages, +loss of data, revenue, or system availability, +hardware degradation or thermal damage, +build failures or deployment issues, +security vulnerabilities arising from system configuration or integration. +๐Ÿง  Responsibility of Use + +You are solely responsible for: + +validating build targets and toolchains, +verifying scheduling and execution policies, +ensuring safe node configuration and thermal limits, +reviewing cache and snapshot behavior before production use, +controlling access to the system and its APIs. +โšก High-Risk System Notice + +This software operates in a high-concurrency, distributed execution domain. It is not intended for safety-critical systems unless independently audited and hardened for such use cases. diff --git a/add-to-grafana-config.json b/add-to-grafana-config.json new file mode 100644 index 0000000..1fbf76c --- /dev/null +++ b/add-to-grafana-config.json @@ -0,0 +1,148 @@ +{ + "title": "Fester Advanced Cluster Intelligence", + "timezone": "browser", + "schemaVersion": 38, + "version": 2, + "refresh": "5s", + "panels": [ + + { + "type": "timeseries", + "title": "Node CPU Load", + "targets": [ + { + "expr": "fester_node_cpu", + "legendFormat": "{{node}}" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percent", + "min": 0, + "max": 100 + } + }, + "gridPos": { "x": 0, "y": 0, "w": 12, "h": 8 } + }, + + { + "type": "timeseries", + "title": "Node Temperature (Thermal Risk)", + "targets": [ + { + "expr": "fester_node_temp", + "legendFormat": "{{node}}" + } + ], + "fieldConfig": { + "defaults": { + "unit": "celsius", + "thresholds": { + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 65 }, + { "color": "orange", "value": 75 }, + { "color": "red", "value": 80 } + ] + } + } + }, + "gridPos": { "x": 12, "y": 0, "w": 12, "h": 8 } + }, + + { + "type": "timeseries", + "title": "Thermal Trend (Prediction Signal)", + "targets": [ + { + "expr": "deriv(fester_node_temp[2m])", + "legendFormat": "{{node}} temp/s" + } + ], + "gridPos": { "x": 0, "y": 8, "w": 12, "h": 6 } + }, + + { + "type": "stat", + "title": "Danger Nodes (>80C)", + "targets": [ + { + "expr": "count(fester_node_temp > 80)" + } + ], + "gridPos": { "x": 12, "y": 8, "w": 6, "h": 6 } + }, + + { + "type": "stat", + "title": "Warming Nodes (>70C)", + "targets": [ + { + "expr": "count(fester_node_temp > 70)" + } + ], + "gridPos": { "x": 18, "y": 8, "w": 6, "h": 6 } + }, + + { + "type": "timeseries", + "title": "Pipeline Throughput", + "targets": [ + { + "expr": "rate(fester_pipeline_actions_total[1m])", + "legendFormat": "{{state}}" + } + ], + "gridPos": { "x": 0, "y": 14, "w": 12, "h": 7 } + }, + + { + "type": "timeseries", + "title": "Cache Hit Rate Per Target", + "targets": [ + { + "expr": "rate(fester_cache_hits_total[5m])", + "legendFormat": "{{target}}" + } + ], + "gridPos": { "x": 12, "y": 14, "w": 12, "h": 7 } + }, + + { + "type": "timeseries", + "title": "Cache Efficiency Ratio", + "targets": [ + { + "expr": "rate(fester_cache_hits_total[5m]) / rate(fester_pipeline_actions_total[5m])", + "legendFormat": "efficiency" + } + ], + "gridPos": { "x": 0, "y": 21, "w": 12, "h": 6 } + }, + + { + "type": "timeseries", + "title": "Scheduler Node Usage", + "targets": [ + { + "expr": "sum by (node) (fester_scheduler_score)", + "legendFormat": "{{node}}" + } + ], + "gridPos": { "x": 12, "y": 21, "w": 12, "h": 6 } + }, + + { + "type": "timeseries", + "title": "Scheduler Target Distribution", + "targets": [ + { + "expr": "sum by (target) (fester_scheduler_score)", + "legendFormat": "{{target}}" + } + ], + "gridPos": { "x": 0, "y": 27, "w": 24, "h": 6 } + } + + ] +} diff --git a/add-to-prometheus-config.yml b/add-to-prometheus-config.yml new file mode 100644 index 0000000..decd76a --- /dev/null +++ b/add-to-prometheus-config.yml @@ -0,0 +1,4 @@ +scrape_configs: + - job_name: 'fester' + static_configs: + - targets: ['localhost:9109'] diff --git a/analysis/failure_autopsy.py b/analysis/failure_autopsy.py new file mode 100644 index 0000000..b6152a7 --- /dev/null +++ b/analysis/failure_autopsy.py @@ -0,0 +1,98 @@ +# analysis/failure_autopsy.py + +class FailureAutopsy: + + def __init__(self, journal, critical_path=None): + self.journal = journal + self.critical_path = critical_path + + # ----------------------------- + # FIND FAILURE EVENTS + # ----------------------------- + def find_failures(self): + + return [ + e for e in self.journal.events + if e["type"] == "execution_result" + and e["data"].get("state") == "failed" + ] + + # ----------------------------- + # TRACE BACKWARD DEPENDENCY CHAIN + # ----------------------------- + def trace_dependencies(self, action_name): + + trace = [] + visited = set() + + def walk(name): + if name in visited: + return + visited.add(name) + + events = self.journal.trace_action(name) + + trace.append({ + "action": name, + "events": events + }) + + for e in events: + deps = e["data"].get("deps", []) + for d in deps: + walk(d) + + walk(action_name) + + return trace + + # ----------------------------- + # GET LAST SCHEDULER DECISION + # ----------------------------- + def last_decision(self, action_name): + + events = self.journal.trace_action(action_name) + + for e in reversed(events): + if e["type"] == "schedule_decision": + return e["data"] + + return None + + # ----------------------------- + # FULL AUTOPSY REPORT + # ----------------------------- + def report(self, action_name): + + failures = self.find_failures() + + target_failure = None + + for f in failures: + if f["data"].get("action") == action_name: + target_failure = f + break + + if not target_failure: + return { + "status": "no_failure_found", + "action": action_name + } + + deps_trace = self.trace_dependencies(action_name) + last_sched = self.last_decision(action_name) + + return { + "status": "failure_detected", + "action": action_name, + + "failure_event": target_failure, + + "last_scheduler_decision": last_sched, + + "dependency_trace": deps_trace, + + "on_critical_path": ( + action_name in (self.critical_path or {}).get("score_map", {}) + ) + } diff --git a/analysis/failure_propagation.py b/analysis/failure_propagation.py new file mode 100644 index 0000000..fa063ea --- /dev/null +++ b/analysis/failure_propagation.py @@ -0,0 +1,93 @@ +# analysis/failure_propagation.py + +class FailurePropagation: + + def __init__(self, graph): + """ + graph = { node: [deps...] } + """ + self.graph = graph + + # reverse graph for downstream impact + self.reverse = self._build_reverse(graph) + + def _build_reverse(self, graph): + reverse = {} + + for node, deps in graph.items(): + if node not in reverse: + reverse[node] = [] + + for d in deps: + reverse.setdefault(d, []).append(node) + + return reverse + + # ----------------------------- + # BACKWARD FAILURE TRACE + # ----------------------------- + def propagate_backward(self, failed_node): + """ + Walk dependencies backward (root cause side) + """ + visited = set() + impacted = [] + + def walk(node): + for dep in self.graph.get(node, []): + if dep in visited: + continue + visited.add(dep) + + impacted.append({ + "node": dep, + "type": "root_cause_candidate" + }) + + walk(dep) + + walk(failed_node) + + return impacted + + # ----------------------------- + # FORWARD IMPACT TRACE + # ----------------------------- + def propagate_forward(self, failed_node): + """ + Walk dependents forward (blast radius side) + """ + visited = set() + impacted = [] + + def walk(node): + for child in self.reverse.get(node, []): + if child in visited: + continue + visited.add(child) + + impacted.append({ + "node": child, + "type": "downstream_affected" + }) + + walk(child) + + walk(failed_node) + + return impacted + + # ----------------------------- + # FULL FAILURE FIELD MAP + # ----------------------------- + def map_failure(self, failed_node): + + backward = self.propagate_backward(failed_node) + forward = self.propagate_forward(failed_node) + + return { + "failed_node": failed_node, + "root_cause_candidates": backward, + "downstream_impact": forward, + "severity": len(forward) + len(backward) + } diff --git a/analysis/timeline_store.py b/analysis/timeline_store.py new file mode 100644 index 0000000..e7a4bcc --- /dev/null +++ b/analysis/timeline_store.py @@ -0,0 +1,64 @@ +# analysis/timeline_store.py + +class TimelineStore: + + def __init__(self): + self.events = [] + + # ----------------------------- + # APPEND EVENT (IMMUTABLE LOG) + # ----------------------------- + def record(self, event): + """ + Every state transition in the system must go here. + """ + self.events.append(event) + + # ----------------------------- + # GET EVENTS FOR NODE + # ----------------------------- + def get_node_events(self, node_name): + + return [ + e for e in self.events + if e.get("data", {}).get("node") == node_name + ] + + # ----------------------------- + # GET TIMELINE SNAPSHOT + # ----------------------------- + def snapshot_at(self, index): + + if index < 0 or index >= len(self.events): + return None + + return self.events[index] + + # ----------------------------- + # FIND LAST STATE BEFORE EVENT + # ----------------------------- + def rewind_to_event(self, node_name, event_index): + + history = [] + + for i in range(event_index + 1): + e = self.events[i] + + if e.get("data", {}).get("node") == node_name: + history.append(e) + + return history + + # ----------------------------- + # FIND LAST SCHEDULER DECISION + # ----------------------------- + def last_scheduler_state(self, node_name, event_index): + + for i in range(event_index, -1, -1): + e = self.events[i] + + if e["type"] == "schedule_decision": + if e.get("data", {}).get("node") == node_name: + return e["data"] + + return None diff --git a/backend/analysis/cause_graph.py b/backend/analysis/cause_graph.py new file mode 100644 index 0000000..61a2411 --- /dev/null +++ b/backend/analysis/cause_graph.py @@ -0,0 +1,138 @@ +from collections import defaultdict +import time + + +class CauseGraphEngine: + """ + Builds a causal graph from system events. + + This is NOT execution logic. + It is a post-hoc + live reasoning layer. + """ + + def __init__(self): + # node -> causes + self.graph = defaultdict(list) + + # event timeline + self.events = [] + + # last decision context + self.last_context = {} + + # ------------------------------------------------- + # INGEST EVENTS + # ------------------------------------------------- + def ingest(self, event: dict): + self.events.append(event) + + etype = event.get("type") + node = event.get("node") + action = event.get("action") + + # store context for causal linking + if etype == "task_update": + self._handle_task(event) + elif etype == "cache_update": + self._handle_cache(event) + elif etype == "failure": + self._handle_failure(event) + elif etype == "node_update": + self._handle_node(event) + + +def attach_bus(self, bus): + self.bus = bus + +def emit_debug(self, data): + if self.bus: + self.bus.emit( + "debug", + node=None, + action=None, + state="trace", + meta=data + ) + + + # ------------------------------------------------- + # TASK EXECUTION CAUSALITY + # ------------------------------------------------- + def _handle_task(self, event): + node = event.get("node") + action = event.get("action") + state = event.get("state") + + context = { + "time": time.time(), + "node": node, + "action": action, + "state": state, + "reason": "scheduler_assignment", + } + + self.last_context[(node, action)] = context + + self.graph[(node, action)].append(context) + + # ------------------------------------------------- + # CACHE CAUSALITY + # ------------------------------------------------- + def _handle_cache(self, event): + node = event.get("node") + action = event.get("action") + + context = { + "time": time.time(), + "node": node, + "action": action, + "reason": "cache_hit_or_miss", + } + + self.graph[(node, action)].append(context) + + # ------------------------------------------------- + # FAILURE CAUSALITY + # ------------------------------------------------- + def _handle_failure(self, event): + node = event.get("node") + action = event.get("action") + + prev = self.last_context.get((node, action), {}) + + context = { + "time": time.time(), + "node": node, + "action": action, + "reason": "execution_failure", + "parent_context": prev, + } + + self.graph[(node, action)].append(context) + + # ------------------------------------------------- + # NODE STATE CAUSALITY + # ------------------------------------------------- + def _handle_node(self, event): + node = event.get("node") + + context = { + "time": time.time(), + "node": node, + "reason": "node_state_update", + "state": event.get("node_state", {}), + } + + self.graph[(node, None)].append(context) + + # ------------------------------------------------- + # QUERY INTERFACE + # ------------------------------------------------- + def explain(self, node, action=None): + """ + Returns causal chain for a given execution unit. + """ + return self.graph.get((node, action), []) + + def full_trace(self): + return dict(self.graph) diff --git a/backend/api/api.py b/backend/api/api.py new file mode 100644 index 0000000..50ea903 --- /dev/null +++ b/backend/api/api.py @@ -0,0 +1,102 @@ +from backend.events.bus import EventBus +from backend.metrics.observability import ObservabilityHub +from backend.metrics.node_state import NodeStateRegistry + +from backend.api.build import build_endpoint +from backend.api.debugger import debugger_endpoint +from backend.api.replay import replay_endpoint +from backend.api.timeline import timeline_endpoint +from backend.api.nodes import nodes_endpoint +from backend.api.metrics import metrics_endpoint +from backend.api.autopsy import autopsy_endpoint +from backend.api.pipeline_control import pipeline_control_endpoint +from backend.api.ws import WebSocketStream + +from backend.analysis.cause_graph import CauseGraphEngine + +cause_graph = CauseGraphEngine() + +# attach it to event bus stream +bus.subscribe(cause_graph.ingest) + + + +from backend.analysis.cause_graph import CauseGraphEngine + +cause_graph = CauseGraphEngine() + +bus.subscribe(cause_graph.ingest) + +# optional: also stream debug traces to UI +bus.subscribe(lambda e: cause_graph.emit_debug(e)) + +# ========================================================= +# GLOBAL CORE SYSTEM INITIALIZATION (SINGLETON LAYER) +# ========================================================= + +bus = EventBus() +registry = NodeStateRegistry() + +observability = ObservabilityHub(registry, bus) +observability.attach_bus() + +ws_stream = WebSocketStream() +ws_stream.attach_bus(bus) + + +# ========================================================= +# API REGISTRATION LAYER +# ========================================================= +# (kept explicit so Cockpit / external callers can bind cleanly) + +def build(spec, nodes, cluster=None, intelligence=None): + return build_endpoint(spec, nodes, cluster, intelligence) + + +def replay(session_id): + return replay_endpoint(session_id, registry) + + +def debugger(session_id): + return debugger_endpoint(session_id, registry, bus) + + +def timeline(session_id): + return timeline_endpoint(session_id) + + +def nodes(): + return nodes_endpoint(registry) + + +def metrics(): + return metrics_endpoint(registry) + + +def autopsy(session_id): + return autopsy_endpoint(session_id) + + +def pipeline_control(action, payload=None): + return pipeline_control_endpoint(action, payload, bus) + + +# ========================================================= +# OPTIONAL: expose core objects for internal modules +# ========================================================= + +__all__ = [ + "bus", + "registry", + "observability", + "ws_stream", + + "build", + "replay", + "debugger", + "timeline", + "nodes", + "metrics", + "autopsy", + "pipeline_control", +] diff --git a/backend/api/autopsy.py b/backend/api/autopsy.py new file mode 100644 index 0000000..e8b130d --- /dev/null +++ b/backend/api/autopsy.py @@ -0,0 +1,30 @@ +from fastapi import APIRouter +from analysis.failure_autopsy import FailureAutopsy + +router = APIRouter() + +# in real system this comes from your engine/session store +ENGINE_REGISTRY = {} + + +def get_engine(session_id): + return ENGINE_REGISTRY.get(session_id) + + +# ----------------------------- +# RUN AUTOPSY +# ----------------------------- +@router.get("/autopsy/{session_id}/{action}") +def run_autopsy(session_id: str, action: str): + + engine = get_engine(session_id) + + if not engine: + return {"error": "session not found"} + + autopsy = FailureAutopsy( + engine.journal, + critical_path=getattr(engine, "critical_path", None) + ) + + return autopsy.report(action) diff --git a/backend/api/build.py b/backend/api/build.py new file mode 100644 index 0000000..5c2fcc8 --- /dev/null +++ b/backend/api/build.py @@ -0,0 +1,11 @@ +from graph.build_graph import BuildGraph +from pipeline.engine import PipelineEngine + + +async def build_endpoint(spec, nodes, cluster, intelligence): + + graph = BuildGraph(spec).generate() + + engine = PipelineEngine(nodes, cluster, intelligence) + + return await engine.run(graph) diff --git a/backend/api/build_spec.py b/backend/api/build_spec.py new file mode 100644 index 0000000..aff0733 --- /dev/null +++ b/backend/api/build_spec.py @@ -0,0 +1,34 @@ +from dataclasses import dataclass + + +@dataclass +class BuildSpec: + + project: str + arch: str + distro_target: str + toolchain: str + + # ----------------------------- + # EXECUTION SELECTION + # ----------------------------- + execution: str # distcc | lxc | libvirt | tmux + container_backend: str # optional fallback layer + + # ----------------------------- + # POLICY PROFILE + # ----------------------------- + profile: str = "balanced" + + +def from_request(req): + + return BuildSpec( + project=req["project"], + arch=req["arch"], + distro_target=req["distro_target"], + toolchain=req["toolchain"], + execution=req.get("execution", "distcc"), + container_backend=req.get("container_backend", "lxc"), + profile=req.get("profile", "balanced") + ) diff --git a/backend/api/cause.py b/backend/api/cause.py new file mode 100644 index 0000000..f5fdf60 --- /dev/null +++ b/backend/api/cause.py @@ -0,0 +1,9 @@ +from backend.analysis.cause_graph import CauseGraphEngine + + +def explain_node(cause_graph: CauseGraphEngine, node, action=None): + return cause_graph.explain(node, action) + + +def full_cause_trace(cause_graph: CauseGraphEngine): + return cause_graph.full_trace() diff --git a/backend/api/debugger.py b/backend/api/debugger.py new file mode 100644 index 0000000..d27380b --- /dev/null +++ b/backend/api/debugger.py @@ -0,0 +1,39 @@ +# api/debugger.py + +from fastapi import APIRouter + +router = APIRouter() + +ENGINE = None + + +def set_engine(engine): + global ENGINE + ENGINE = engine + + +# ----------------------------- +# RESUME +# ----------------------------- +@router.post("/debugger/resume") +def resume(): + ENGINE.resume() + return {"state": "running"} + + +# ----------------------------- +# PAUSE +# ----------------------------- +@router.post("/debugger/pause") +def pause(): + ENGINE.pause() + return {"state": "paused"} + + +# ----------------------------- +# STEP +# ----------------------------- +@router.post("/debugger/step") +def step(): + ENGINE.step() + return {"state": "stepped"} diff --git a/backend/api/metrics.py b/backend/api/metrics.py new file mode 100644 index 0000000..94d5e60 --- /dev/null +++ b/backend/api/metrics.py @@ -0,0 +1,13 @@ +from aiohttp import web +from prometheus_client import generate_latest, CONTENT_TYPE_LATEST + + +async def metrics(request): + return web.Response( + body=generate_latest(), + content_type=CONTENT_TYPE_LATEST + ) + + +def setup(app): + app.router.add_get('/metrics', metrics) diff --git a/backend/api/nodes.py b/backend/api/nodes.py new file mode 100644 index 0000000..5c6c125 --- /dev/null +++ b/backend/api/nodes.py @@ -0,0 +1,21 @@ +import json +from nodes.role_store import load_roles, save_roles, NodeRole + + +def get_node_roles(): + roles = load_roles() + + return { + name: role.__dict__ + for name, role in roles.items() + } + + +def set_node_role(node_name, role_data): + roles = load_roles() + + roles[node_name] = NodeRole(**role_data) + + save_roles(roles) + + return {"status": "ok"} diff --git a/backend/api/pipeline_control.py b/backend/api/pipeline_control.py new file mode 100644 index 0000000..c6691c3 --- /dev/null +++ b/backend/api/pipeline_control.py @@ -0,0 +1,32 @@ +from pipeline.engine import PipelineEngine + +ENGINE = None + + +def init_engine(nodes): + global ENGINE + ENGINE = PipelineEngine(nodes) + + +def get_state(): + return ENGINE.last_results if ENGINE else [] + + +def retry_action(data): + return {"status": "retry", "action": data["action"]} + + +def force_node(data): + return { + "status": "forced", + "action": data["action"], + "node": data["node"] + } + + +def pause(): + return {"status": "paused"} + + +def resume(): + return {"status": "running"} diff --git a/backend/api/replay.py b/backend/api/replay.py new file mode 100644 index 0000000..b28199f --- /dev/null +++ b/backend/api/replay.py @@ -0,0 +1,78 @@ +from fastapi import APIRouter +from events.replay import ReplayController + + +router = APIRouter() + +# in-memory demo store (later replace with session DB) +REPLAY_SESSIONS = {} + + +def get_session(journal_id): + return REPLAY_SESSIONS.get(journal_id) + + +# ------------------------- +# START REPLAY SESSION +# ------------------------- +@router.post("/replay/start") +def start_replay(journal): + + session = ReplayController(journal) + + session_id = str(id(session)) + + REPLAY_SESSIONS[session_id] = session + + return { + "session_id": session_id, + "event_count": len(session.timeline) + } + + +# ------------------------- +# STEP FORWARD +# ------------------------- +@router.post("/replay/step") +def replay_step(session_id: str): + + session = get_session(session_id) + + event = session.step() + + return { + "event": event, + "index": session.index + } + + +# ------------------------- +# SEEK +# ------------------------- +@router.post("/replay/seek") +def replay_seek(session_id: str, timestamp: float): + + session = get_session(session_id) + + event = session.seek(timestamp) + + return { + "event": event, + "index": session.index + } + + +# ------------------------- +# RESET +# ------------------------- +@router.post("/replay/reset") +def replay_reset(session_id: str): + + session = get_session(session_id) + + event = session.reset() + + return { + "event": event, + "index": session.index + } diff --git a/backend/api/state_stream.py b/backend/api/state_stream.py new file mode 100644 index 0000000..afd2843 --- /dev/null +++ b/backend/api/state_stream.py @@ -0,0 +1,15 @@ +from pipeline.engine import PipelineEngine + + +def get_state(nodes, engine): + return { + "pipeline": engine.last_results if engine else [], + "nodes": [ + { + "name": n["name"], + "load": n.get("load", 0), + "temp": n.get("temp", 0) + } + for n in nodes + ] + } diff --git a/backend/api/timeline.py b/backend/api/timeline.py new file mode 100644 index 0000000..e9793e6 --- /dev/null +++ b/backend/api/timeline.py @@ -0,0 +1,40 @@ +# api/timeline.py + +from fastapi import APIRouter +from analysis.timeline_store import TimelineStore + +router = APIRouter() + +STORE = TimelineStore() + + +def get_store(): + return STORE + + +# ----------------------------- +# GET NODE TIMELINE +# ----------------------------- +@router.get("/timeline/{node}") +def get_node_timeline(node: str): + + store = get_store() + + return { + "node": node, + "events": store.get_node_events(node) + } + + +# ----------------------------- +# REWIND SNAPSHOT +# ----------------------------- +@router.get("/timeline/rewind/{index}") +def rewind(index: int): + + store = get_store() + + return { + "index": index, + "snapshot": store.snapshot_at(index) + } diff --git a/backend/api/ws.py b/backend/api/ws.py new file mode 100644 index 0000000..bcf34a8 --- /dev/null +++ b/backend/api/ws.py @@ -0,0 +1,20 @@ +from backend.events.bus import EventBus +import json + + +class WebSocketStream: + + def __init__(self): + self.clients = [] + + def attach_bus(self, bus: EventBus): + + def forward(event): + payload = json.dumps(event) + for c in self.clients: + c.send(payload) + + bus.subscribe(forward) + + def register_client(self, client): + self.clients.append(client) diff --git a/backend/build.py b/backend/build.py new file mode 100644 index 0000000..1b66c5d --- /dev/null +++ b/backend/build.py @@ -0,0 +1,94 @@ +import sys +import os +import subprocess +from io import StringIO + +from snapshot import create_snapshot +from scheduler import build_distcc_hosts +from cache import make_build_key, cache_hit, cache_store +from compiler_env import build_ccache_env + + +def run_cmd(cmd, cwd, env): + p = subprocess.Popen( + cmd, + shell=True, + cwd=cwd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True + ) + + log = StringIO() + + for line in p.stdout: + print(line, end="") + log.write(line) + + p.wait() + return p.returncode, log.getvalue() + + +def build_target(project, snapshot, target, nodes): + env = os.environ.copy() + + # ---------------------------- + # distcc layer + # ---------------------------- + env["DISTCC_HOSTS"] = build_distcc_hosts(nodes) + + # ---------------------------- + # ccache layer (NEW) + # ---------------------------- + cc_env = build_ccache_env(project) + env.update(cc_env) + + build_env = project.get("build_env", {}) + env.update(build_env) + + key = make_build_key( + snapshot["hash"], + target, + build_env + ) + + print(f"[FESTER] target={target}") + print(f"[FESTER] cache_key={key}") + + # ---------------------------- + # top-level build cache + # ---------------------------- + if cache_hit(key): + print("[FESTER] BUILD CACHE HIT (skipping full compile)") + return 0 + + cmd = project["targets"][target] + + code, log = run_cmd(cmd, snapshot["path"], env) + + if code == 0: + cache_store(key, log) + + return code + + +def run_release(project): + snapshot = create_snapshot(project["source"]) + nodes = project.get("nodes_override", []) or [] + + results = {} + + for target in project["targets"]: + results[target] = build_target(project, snapshot, target, nodes) + + return results + + +if __name__ == "__main__": + import json + + with open(sys.argv[1]) as f: + project = json.load(f) + + run_release(project) diff --git a/backend/cache.py b/backend/cache.py new file mode 100644 index 0000000..f7b917d --- /dev/null +++ b/backend/cache.py @@ -0,0 +1,53 @@ +import os +import json +import hashlib +from pathlib import Path + + +CACHE_DIR = Path("/var/lib/fester/cache") +CACHE_DIR.mkdir(parents=True, exist_ok=True) + + +def hash_dict(d): + raw = json.dumps(d, sort_keys=True).encode() + return hashlib.sha256(raw).hexdigest() + + +def make_build_key(snapshot_hash, target, env): + return hashlib.sha256( + f"{snapshot_hash}:{target}:{hash_dict(env)}".encode() + ).hexdigest() + + +def cache_hit(key): + return (CACHE_DIR / key / "done").exists() + + +def cache_store(key, log): + base = CACHE_DIR / key + base.mkdir(parents=True, exist_ok=True) + + (base / "done").write_text("1") + (base / "log.txt").write_text(log) + + +def cache_log(key): + p = CACHE_DIR / key / "log.txt" + return p.read_text() if p.exists() else None + + +# ---------------------------- +# ccache health probe +# ---------------------------- +def ccache_stats(): + try: + import subprocess + + out = subprocess.check_output( + ["ccache", "-s"], + stderr=subprocess.STDOUT + ).decode() + + return out + except: + return "ccache not available" diff --git a/backend/cache/minio_cache.py b/backend/cache/minio_cache.py new file mode 100644 index 0000000..0757b41 --- /dev/null +++ b/backend/cache/minio_cache.py @@ -0,0 +1,43 @@ +import io +from minio import Minio +from cache import make_build_key + + +class MinioCache: + + def __init__(self, endpoint, access, secret, bucket="fester-cache"): + self.client = Minio( + endpoint, + access_key=access, + secret_key=secret, + secure=False + ) + + self.bucket = bucket + + if not self.client.bucket_exists(bucket): + self.client.make_bucket(bucket) + + def put(self, key, data): + stream = io.BytesIO(data.encode() if isinstance(data, str) else data) + + self.client.put_object( + self.bucket, + key, + stream, + length=len(stream.getvalue()) + ) + + def get(self, key): + try: + response = self.client.get_object(self.bucket, key) + return response.read() + except: + return None + + def exists(self, key): + try: + self.client.stat_object(self.bucket, key) + return True + except: + return False diff --git a/backend/cache/partitioned_cache.py b/backend/cache/partitioned_cache.py new file mode 100644 index 0000000..336c8c4 --- /dev/null +++ b/backend/cache/partitioned_cache.py @@ -0,0 +1,43 @@ +import hashlib + + +class PartitionedCache: + + def __init__(self, backend): + self.backend = backend # MinIO or local FS + + # ----------------------------- + # BUILD CACHE KEY + # ----------------------------- + def key(self, action, target): + + base = f"{target.key()}:{action['hash']}" + + return hashlib.sha256(base.encode()).hexdigest() + + # ----------------------------- + # EXISTS + # ----------------------------- + def exists(self, action, target): + + k = self.key(action, target) + + return self.backend.exists(k) + + # ----------------------------- + # STORE + # ----------------------------- + def store(self, action, target, data): + + k = self.key(action, target) + + self.backend.put(k, data) + + # ----------------------------- + # FETCH + # ----------------------------- + def fetch(self, action, target): + + k = self.key(action, target) + + return self.backend.get(k) diff --git a/backend/compiler_env.py b/backend/compiler_env.py new file mode 100644 index 0000000..3276e9e --- /dev/null +++ b/backend/compiler_env.py @@ -0,0 +1,43 @@ +import os +from cache import hash_dict + + +def build_ccache_env(project): + """ + Configure compiler caching layer safely. + """ + + env = {} + + # ---------------------------- + # enable ccache + # ---------------------------- + env["USE_CCACHE"] = "1" + env["CCACHE_DIR"] = "/var/cache/ccache" + + # ---------------------------- + # distcc + ccache integration + # critical ordering fix + # ---------------------------- + env["CC"] = "ccache gcc" + env["CXX"] = "ccache g++" + + # ensure distcc wraps AFTER ccache + env["CCACHE_PREFIX"] = "distcc" + + # ---------------------------- + # tuning for distributed builds + # ---------------------------- + env["CCACHE_COMPRESS"] = "1" + env["CCACHE_COMPRESSLEVEL"] = "5" + env["CCACHE_MAXSIZE"] = "20G" + + # ---------------------------- + # project-specific salt + # ensures correct cache isolation + # ---------------------------- + env["CCACHE_SLOPPINESS"] = "time_macros,locale" + + env["FESTER_CCACHE_KEY"] = hash_dict(project.get("build_env", {})) + + return env diff --git a/backend/config.py b/backend/config.py new file mode 100644 index 0000000..7d166b5 --- /dev/null +++ b/backend/config.py @@ -0,0 +1,8 @@ +import yaml +import os + +BASE_DIR = os.path.dirname(os.path.dirname(__file__)) +CONFIG_PATH = os.path.join(BASE_DIR, "config.yaml") + +with open(CONFIG_PATH, "r") as f: + CONFIG = yaml.safe_load(f) diff --git a/backend/distcc.py b/backend/distcc.py new file mode 100644 index 0000000..9b878d5 --- /dev/null +++ b/backend/distcc.py @@ -0,0 +1,8 @@ +def generate_distcc_hosts(nodes): + hosts = [] + + for n in nodes: + if n.get("type", "physical") == "physical": + hosts.append(f"{n['host']}/{n['max_jobs']}") + + return " ".join(hosts) diff --git a/backend/events/bus.py b/backend/events/bus.py new file mode 100644 index 0000000..decbd41 --- /dev/null +++ b/backend/events/bus.py @@ -0,0 +1,25 @@ +from backend.events.schema import create_event +from backend.events.clock import GlobalClock + + +class EventBus: + + def __init__(self): + self.subscribers = [] + + def subscribe(self, fn): + self.subscribers.append(fn) + + def emit(self, event_type, **kwargs): + + event = create_event( + type=event_type, + timestamp=GlobalClock.tick(), + **kwargs + ) + + # fan-out to all listeners + for sub in self.subscribers: + sub(event) + + return event diff --git a/backend/events/clock.py b/backend/events/clock.py new file mode 100644 index 0000000..dfdd078 --- /dev/null +++ b/backend/events/clock.py @@ -0,0 +1,23 @@ +import time +import threading + + +class GlobalClock: + """ + Deterministic monotonic event clock. + Used for replay + ordering across distributed nodes. + """ + + _lock = threading.Lock() + _counter = 0 + + @classmethod + def tick(cls) -> float: + with cls._lock: + cls._counter += 1 + return time.time() + (cls._counter * 1e-9) + + @classmethod + def reset(cls): + with cls._lock: + cls._counter = 0 diff --git a/backend/events/emitter.py b/backend/events/emitter.py new file mode 100644 index 0000000..ae05648 --- /dev/null +++ b/backend/events/emitter.py @@ -0,0 +1,33 @@ +from events.bus import BUS +from events.types import EVENT_PIPELINE, EVENT_NODE, EVENT_POLICY + + +async def emit_pipeline(action, state, node): + + await BUS.emit({ + "type": EVENT_PIPELINE, + "data": { + "action": action["name"], + "state": state, + "node": node["name"] + } + }) + + +async def emit_policy(key, value): + + await BUS.emit({ + "type": EVENT_POLICY, + "data": { + "key": key, + "value": value + } + }) + + +async def emit_node(node): + + await BUS.emit({ + "type": EVENT_NODE, + "data": node + }) diff --git a/backend/events/failure_stream.py b/backend/events/failure_stream.py new file mode 100644 index 0000000..929a3d9 --- /dev/null +++ b/backend/events/failure_stream.py @@ -0,0 +1,21 @@ +# events/failure_stream.py + +import asyncio +import json + +class FailureStream: + + def __init__(self, ws_broadcaster, propagation_engine): + self.ws = ws_broadcaster + self.propagation = propagation_engine + + async def emit_failure(self, failed_node): + + map_result = self.propagation.map_failure(failed_node) + + payload = { + "type": "failure_propagation", + "data": map_result + } + + await self.ws.broadcast(json.dumps(payload)) diff --git a/backend/events/journal.py b/backend/events/journal.py new file mode 100644 index 0000000..675970e --- /dev/null +++ b/backend/events/journal.py @@ -0,0 +1,52 @@ +import time +import json +from collections import defaultdict + + +class EventJournal: + + def __init__(self): + # immutable append-only log + self.events = [] + + # indexed views for fast debug queries + self.by_action = defaultdict(list) + self.by_node = defaultdict(list) + + # ----------------------------- + # WRITE EVENT + # ----------------------------- + def record(self, event_type, payload): + + event = { + "ts": time.time(), + "type": event_type, + "data": payload + } + + self.events.append(event) + + if "action" in payload: + self.by_action[payload["action"]].append(event) + + if "node" in payload: + self.by_node[payload["node"]].append(event) + + # ----------------------------- + # QUERY ACTION TRACE + # ----------------------------- + def trace_action(self, action_name): + return self.by_action.get(action_name, []) + + # ----------------------------- + # QUERY NODE TRACE + # ----------------------------- + def trace_node(self, node_name): + return self.by_node.get(node_name, []) + + # ----------------------------- + # FULL REPLAY STREAM + # ----------------------------- + def replay(self): + for event in sorted(self.events, key=lambda e: e["ts"]): + yield event diff --git a/backend/events/replay.py b/backend/events/replay.py new file mode 100644 index 0000000..27f37fc --- /dev/null +++ b/backend/events/replay.py @@ -0,0 +1,75 @@ +import bisect + + +class ReplayController: + + def __init__(self, journal): + + self.journal = journal + self.index = 0 + self.playing = False + + # pre-sorted events for deterministic replay + self.timeline = sorted( + journal.events, + key=lambda e: e["ts"] + ) + + # ------------------------- + # CURRENT EVENT + # ------------------------- + def current(self): + + if 0 <= self.index < len(self.timeline): + return self.timeline[self.index] + + return None + + # ------------------------- + # STEP FORWARD + # ------------------------- + def step(self): + + if self.index < len(self.timeline) - 1: + self.index += 1 + + return self.current() + + # ------------------------- + # STEP BACKWARD + # ------------------------- + def back(self): + + if self.index > 0: + self.index -= 1 + + return self.current() + + # ------------------------- + # SEEK BY TIMESTAMP + # ------------------------- + def seek(self, timestamp): + + times = [e["ts"] for e in self.timeline] + + self.index = bisect.bisect_left(times, timestamp) + + return self.current() + + # ------------------------- + # RESET + # ------------------------- + def reset(self): + + self.index = 0 + + return self.current() + + # ------------------------- + # PLAY (ITERATOR MODE) + # ------------------------- + def play(self): + + for i in range(self.index, len(self.timeline)): + self.index = i + yield self.timeline[i] diff --git a/backend/events/schema.py b/backend/events/schema.py new file mode 100644 index 0000000..13af0fa --- /dev/null +++ b/backend/events/schema.py @@ -0,0 +1,49 @@ +from enum import Enum +from dataclasses import dataclass, asdict +from typing import Optional, Dict, Any +import time + + +class EventType(str, Enum): + NODE_UPDATE = "node_update" + TASK_UPDATE = "task_update" + PIPELINE_UPDATE = "pipeline_update" + CACHE_UPDATE = "cache_update" + FAILURE = "failure" + DEBUG = "debug" + REPLAY = "replay" + + +@dataclass +class FesterEvent: + """ + Canonical event used everywhere in the system. + """ + + type: EventType + timestamp: float + node: Optional[str] + action: Optional[str] + state: Optional[str] + + # scheduling + explainability + score: Optional[float] = None + reason: Optional[str] = None + + # DAG / execution graph + parent: Optional[str] = None + target: Optional[str] = None + + # extensibility (NEVER bypass schema, extend here) + meta: Optional[Dict[str, Any]] = None + + +def create_event(**kwargs) -> dict: + """ + Always use this instead of raw dict events. + Guarantees consistency across system. + """ + if "timestamp" not in kwargs: + kwargs["timestamp"] = time.time() + + return asdict(FesterEvent(**kwargs)) diff --git a/backend/events/stream.py b/backend/events/stream.py new file mode 100644 index 0000000..edfe9ce --- /dev/null +++ b/backend/events/stream.py @@ -0,0 +1,42 @@ +import asyncio +import json + + +class EventStream: + + def __init__(self): + + self.subscribers = set() + + # ----------------------------- + # SUBSCRIBE (WebSocket clients) + # ----------------------------- + def subscribe(self, ws): + + self.subscribers.add(ws) + + def unsubscribe(self, ws): + + self.subscribers.discard(ws) + + # ----------------------------- + # EMIT EVENT (REALTIME FANOUT) + # ----------------------------- + async def emit(self, event_type, payload): + + message = json.dumps({ + "type": event_type, + "data": payload + }) + + dead = set() + + for ws in self.subscribers: + + try: + await ws.send(message) + except Exception: + dead.add(ws) + + for ws in dead: + self.subscribers.discard(ws) diff --git a/backend/events/types.py b/backend/events/types.py new file mode 100644 index 0000000..e5211de --- /dev/null +++ b/backend/events/types.py @@ -0,0 +1,4 @@ +EVENT_PIPELINE = "pipeline" +EVENT_NODE = "node" +EVENT_POLICY = "policy" +EVENT_SESSION = "session" diff --git a/backend/executor/action_runner.py b/backend/executor/action_runner.py new file mode 100644 index 0000000..193b8ef --- /dev/null +++ b/backend/executor/action_runner.py @@ -0,0 +1,36 @@ +from backend.events.bus import EventBus + + +def run_action(action, node, bus: EventBus): + + bus.emit( + "task_update", + node=node["name"], + action=action["name"], + state="running" + ) + + try: + # placeholder execution logic + result = action["cmd"]() + + bus.emit( + "task_update", + node=node["name"], + action=action["name"], + state="done" + ) + + return 0 + + except Exception as e: + + bus.emit( + "failure", + node=node["name"], + action=action["name"], + state="failed", + reason=str(e) + ) + + return 1 diff --git a/backend/executor/adapters.py b/backend/executor/adapters.py new file mode 100644 index 0000000..c976c52 --- /dev/null +++ b/backend/executor/adapters.py @@ -0,0 +1,16 @@ +import subprocess + + +def run_host(command, cwd, env): + return subprocess.run(command, shell=True, cwd=cwd, env=env).returncode + + +def run_lxc(container, command): + cmd = f"lxc exec {container} -- bash -lc '{command}'" + return subprocess.run(cmd, shell=True).returncode + + +def run_libvirt(vm_name, command): + cmd = f"virsh domfsfreeze {vm_name} && virsh send-process-signal {vm_name} 15" + # simplified placeholder โ€” real execution would use ssh/agent channel + return subprocess.run(cmd, shell=True).returncode diff --git a/backend/executor/router.py b/backend/executor/router.py new file mode 100644 index 0000000..0dcee14 --- /dev/null +++ b/backend/executor/router.py @@ -0,0 +1,48 @@ +import subprocess + + +def execute(action, node, spec): + + exec_type = spec.execution + + # ----------------------------- + # DISTCC PATH + # ----------------------------- + if exec_type == "distcc": + return run_distcc(action, node, spec) + + # ----------------------------- + # LXC PATH + # ----------------------------- + if exec_type == "lxc": + return run_lxc(action, node, spec) + + # ----------------------------- + # LIBVIRT PATH + # ----------------------------- + if exec_type == "libvirt": + return run_libvirt(action, node, spec) + + # ----------------------------- + # TMUX LOCAL PATH + # ----------------------------- + if exec_type == "tmux": + return run_tmux(action, node, spec) + + raise ValueError(f"Unknown execution type: {exec_type}") + + +def run_distcc(action, node, spec): + return subprocess.call(["distcc", action["cmd"]]) + + +def run_lxc(action, node, spec): + return subprocess.call(["lxc", "exec", node["container"], "--", action["cmd"]]) + + +def run_libvirt(action, node, spec): + return subprocess.call(["virsh", "qemu-agent-command", node["vm"], action["cmd"]]) + + +def run_tmux(action, node, spec): + return subprocess.call(["tmux", "new", "-d", action["cmd"]]) diff --git a/backend/executor/runtime_router.py b/backend/executor/runtime_router.py new file mode 100644 index 0000000..2625145 --- /dev/null +++ b/backend/executor/runtime_router.py @@ -0,0 +1,31 @@ +from executor.adapters import run_host, run_lxc, run_libvirt + + +def pick_runtime(action, node_role): + """ + Decide execution backend based on action + node role + """ + + runtime = action.get("runtime", "host") + + if runtime == "lxc": + return "lxc" + + if runtime == "vm": + return "libvirt" + + return "host" + + +def execute_action(action, workspace, node): + runtime = pick_runtime(action, node.get("role", {})) + + command = action["command"] + + if runtime == "lxc": + return run_lxc(node["container"], command) + + if runtime == "libvirt": + return run_libvirt(node["vm"], command) + + return run_host(command, workspace, {}) diff --git a/backend/governor.py b/backend/governor.py new file mode 100644 index 0000000..45394d2 --- /dev/null +++ b/backend/governor.py @@ -0,0 +1,23 @@ +def thermal_cap(agent): + if not agent: + return 0.3 + + temp = agent.get("temp", "") + load = agent.get("load", "1 1 1") + + try: + cpu = float(load.split()[0]) + except: + cpu = 1.0 + + # safe X99 operating envelope + if "90" in temp: + return 0.2 + + if "80" in temp: + return 0.5 + + if cpu > 4: + return 0.6 + + return 1.0 diff --git a/backend/graph/actions.py b/backend/graph/actions.py new file mode 100644 index 0000000..a1ed5e5 --- /dev/null +++ b/backend/graph/actions.py @@ -0,0 +1,19 @@ +import hashlib +import json + + +def hash_action(action): + raw = json.dumps(action, sort_keys=True).encode() + return hashlib.sha256(raw).hexdigest() + + +def create_action(name, inputs, command, outputs, + env=None, runtime="host"): + return { + "name": name, + "inputs": inputs, + "command": command, + "outputs": outputs, + "env": env or {}, + "runtime": runtime + } diff --git a/backend/graph/build_graph.py b/backend/graph/build_graph.py new file mode 100644 index 0000000..24bd735 --- /dev/null +++ b/backend/graph/build_graph.py @@ -0,0 +1,106 @@ +class BuildNode: + + def __init__(self, name, cmd, deps=None, cache_key=None): + + self.name = name + self.cmd = cmd + self.deps = deps or [] + self.cache_key = cache_key + + def __repr__(self): + return f"" + + +class BuildGraph: + + def __init__(self, spec): + + self.spec = spec + self.nodes = [] + + # ----------------------------- + # ENTRY POINT + # ----------------------------- + def generate(self): + + self._fetch_source() + self._resolve_toolchain() + self._compile_units() + self._link_stage() + self._package() + self._cache() + + return self.nodes + + # ----------------------------- + # STAGES + # ----------------------------- + def _fetch_source(self): + + self.nodes.append( + BuildNode( + name="fetch_source", + cmd="git clone repo", + cache_key="src" + ) + ) + + def _resolve_toolchain(self): + + cmd = f"setup-toolchain --arch {self.spec.arch} --target {self.spec.target}" + + self.nodes.append( + BuildNode( + name="resolve_toolchain", + cmd=cmd, + deps=["fetch_source"], + cache_key="toolchain" + ) + ) + + def _compile_units(self): + + # simplified fan-out model (real version would parse AST / Makefile / Ninja) + for i in range(3): + + self.nodes.append( + BuildNode( + name=f"compile_unit_{i}", + cmd=f"{self.spec.toolchain} -c file_{i}.c", + deps=["resolve_toolchain"], + cache_key=f"obj_{i}" + ) + ) + + def _link_stage(self): + + self.nodes.append( + BuildNode( + name="link", + cmd=f"{self.spec.toolchain} link *.o -o output.elf", + deps=["compile_unit_0", "compile_unit_1", "compile_unit_2"], + cache_key="binary" + ) + ) + + def _package(self): + + self.nodes.append( + BuildNode( + name="package", + cmd=f"package --format {self.spec.output_mode}", + deps=["link"], + cache_key="artifact" + ) + ) + + def _cache(self): + + self.nodes.append( + BuildNode( + name="cache_upload", + cmd="upload-cache", + deps=["package"], + cache_key="cache" + ) + ) diff --git a/backend/graph/critical_path.py b/backend/graph/critical_path.py new file mode 100644 index 0000000..7216840 --- /dev/null +++ b/backend/graph/critical_path.py @@ -0,0 +1,64 @@ +# graph/critical_path.py + +def compute_critical_path(actions): + """ + Computes the longest dependency-weighted path in a build DAG. + + This represents the true bottleneck chain ("critical path") + that determines total build time. + """ + + # ----------------------------- + # Build adjacency + cost map + # ----------------------------- + graph = {} + duration = {} + + for action in actions: + name = action["name"] + deps = action.get("deps", []) + + graph[name] = deps + + # fallback cost model (later replace with real timing data) + duration[name] = action.get("cost", 1) + + # ----------------------------- + # memoized DFS for longest path + # ----------------------------- + memo = {} + + def dfs(node): + if node in memo: + return memo[node] + + # leaf node + if node not in graph or not graph[node]: + memo[node] = duration.get(node, 1) + return memo[node] + + best_dep = 0 + + for dep in graph[node]: + best_dep = max(best_dep, dfs(dep)) + + memo[node] = best_dep + duration.get(node, 1) + return memo[node] + + # ----------------------------- + # find worst (critical) endpoint + # ----------------------------- + critical_node = None + critical_score = -1 + + for node in graph.keys(): + score = dfs(node) + if score > critical_score: + critical_score = score + critical_node = node + + return { + "critical_node": critical_node, + "critical_score": critical_score, + "score_map": memo + } diff --git a/backend/graph/plan.py b/backend/graph/plan.py new file mode 100644 index 0000000..313f57a --- /dev/null +++ b/backend/graph/plan.py @@ -0,0 +1,25 @@ +from graph.actions import hash_action + + +def build_action_graph(project): + """ + Converts project targets into executable actions + """ + + graph = [] + + for target, cmd in project["targets"].items(): + + action = { + "name": target, + "inputs": ["src/**"], + "command": cmd, + "outputs": [f"build/{target}.out"], + "env": project.get("build_env", {}) + } + + action["hash"] = hash_action(action) + + graph.append(action) + + return graph diff --git a/backend/graph/target_dag.py b/backend/graph/target_dag.py new file mode 100644 index 0000000..1956b94 --- /dev/null +++ b/backend/graph/target_dag.py @@ -0,0 +1,60 @@ +def build_target_dag(project, targets): + + dag = [] + + for target in targets: + + dag.append({ + "name": f"{target.system}-{target.arch}", + "target": target, + "steps": generate_steps(project, target) + }) + + return dag + + +def generate_steps(project, target): + + steps = [] + + # ----------------------------- + # SYSTEM-SPECIFIC BUILD LOGIC + # ----------------------------- + + if target.system == "gentoo": + steps.append({ + "name": "emerge", + "command": f"emerge -e {project['name']}" + }) + + elif target.system == "buildroot": + steps.append({ + "name": "buildroot", + "command": "make" + }) + + elif target.system == "openwrt": + steps.append({ + "name": "openwrt", + "command": "make world" + }) + + elif target.system == "alfs": + steps.append({ + "name": "alfs", + "command": "./build-lfs.sh" + }) + + elif target.system == "sourcemage": + steps.append({ + "name": "sorcery", + "command": "cast " + project["name"] + }) + + elif target.system == "lunar": + steps.append({ + "name": "lunar", + "command": "lunar build " + project["name"] + }) + + return steps diff --git a/backend/integrations/forgejo.py b/backend/integrations/forgejo.py new file mode 100644 index 0000000..caf6aab --- /dev/null +++ b/backend/integrations/forgejo.py @@ -0,0 +1,8 @@ +from pipeline.engine import PipelineEngine + + +def on_push_event(project, nodes): + + engine = PipelineEngine(nodes) + + return engine.run(project) diff --git a/backend/integrations/libvirt.py b/backend/integrations/libvirt.py new file mode 100644 index 0000000..358fa10 --- /dev/null +++ b/backend/integrations/libvirt.py @@ -0,0 +1,19 @@ +import subprocess + + +class LibvirtManager: + + def execute(self, target, command): + + domain = target.name + + # simplistic execution via ssh inside VM + return subprocess.call([ + "virsh", + "domexec", + domain, + "--", + "bash", + "-c", + command + ]) diff --git a/backend/integrations/lxc.py b/backend/integrations/lxc.py new file mode 100644 index 0000000..3f2bb0b --- /dev/null +++ b/backend/integrations/lxc.py @@ -0,0 +1,18 @@ +import subprocess + + +class LXCManager: + + def execute(self, target, command): + + container = target.name + + return subprocess.call([ + "lxc-attach", + "-n", + container, + "--", + "bash", + "-c", + command + ]) diff --git a/backend/integrations/mosh.py b/backend/integrations/mosh.py new file mode 100644 index 0000000..b575c43 --- /dev/null +++ b/backend/integrations/mosh.py @@ -0,0 +1,12 @@ +import subprocess + + +class MoshManager: + + def connect(self, node): + host = node["host"] + + subprocess.run([ + "mosh", + host + ]) diff --git a/backend/integrations/tmux.py b/backend/integrations/tmux.py new file mode 100644 index 0000000..19cebd2 --- /dev/null +++ b/backend/integrations/tmux.py @@ -0,0 +1,26 @@ +import subprocess + + +class TmuxManager: + + def session_name(self, action): + return f"fester-{action['name']}" + + def create_session(self, action, cmd): + name = self.session_name(action) + + subprocess.run([ + "tmux", "new-session", "-d", + "-s", name, + cmd + ]) + + return name + + def attach(self, action): + name = self.session_name(action) + subprocess.run(["tmux", "attach", "-t", name]) + + def kill(self, action): + name = self.session_name(action) + subprocess.run(["tmux", "kill-session", "-t", name]) diff --git a/backend/metrics/cluster_state.py b/backend/metrics/cluster_state.py new file mode 100644 index 0000000..221047b --- /dev/null +++ b/backend/metrics/cluster_state.py @@ -0,0 +1,41 @@ +class ClusterState: + + def __init__(self): + + self.nodes = {} + + def register_node(self, node_id, node_state): + + self.nodes[node_id] = node_state + + # ----------------------------- + # GLOBAL AGGREGATION (VERY CHEAP) + # ----------------------------- + def snapshot(self): + + hot_nodes = 0 + total_temp = 0 + total_cpu = 0 + instability_sum = 0 + n = len(self.nodes) + + if n == 0: + return {} + + for node in self.nodes.values(): + + s = node.snapshot() + + total_temp += s["avg_temp"] + total_cpu += s["avg_cpu"] + instability_sum += s["instability"] + + if s["avg_temp"] > 80: + hot_nodes += 1 + + return { + "avg_cluster_temp": total_temp / n, + "avg_cluster_cpu": total_cpu / n, + "instability": instability_sum / n, + "hot_nodes": hot_nodes + } diff --git a/backend/metrics/collector.py b/backend/metrics/collector.py new file mode 100644 index 0000000..83915ab --- /dev/null +++ b/backend/metrics/collector.py @@ -0,0 +1,17 @@ +import time +import psutil + + +class MetricsCollector: + """ + Lightweight node telemetry sampler. + """ + + def sample(self): + + return { + "cpu": psutil.cpu_percent(interval=0.1), + "memory": psutil.virtual_memory().percent, + "load": psutil.getloadavg()[0] if hasattr(psutil, "getloadavg") else 0, + "timestamp": time.time(), + } diff --git a/backend/metrics/fester_intelligence.py b/backend/metrics/fester_intelligence.py new file mode 100644 index 0000000..e74e5ca --- /dev/null +++ b/backend/metrics/fester_intelligence.py @@ -0,0 +1,147 @@ +from metrics.target_scoring import TargetScorer +from metrics.target_labeler import TargetLabeler +from metrics.profile_store import save + + +class LiveTargetState: + + def __init__(self, target_key): + self.target = target_key + + # ----------------------------- + # RUNNING STATE (incremental) + # ----------------------------- + self.cpu_sum = 0 + self.temp_sum = 0 + self.samples = 0 + + self.peak_temp = 0 + + self.actions = 0 + self.cache_hits = 0 + self.failures = 0 + + # ----------------------------- + # CACHED DERIVED VALUES + # ----------------------------- + self.cached_score = None + self.cached_labels = None + self.dirty = True + + +class FesterIntelligence: + + def __init__(self): + + self.states = {} + self.scorer = TargetScorer() + self.labeler = TargetLabeler() + + # ----------------------------- + # INIT + # ----------------------------- + def start_target(self, target_key): + + self.states[target_key] = LiveTargetState(target_key) + + # ----------------------------- + # FAST UPDATES (O(1)) + # ----------------------------- + def sample(self, target_key, node): + + s = self.states[target_key] + + cpu = node.get("cpu_load", 0) + temp = node.get("temp", 0) + + s.cpu_sum += cpu + s.temp_sum += temp + s.samples += 1 + + if temp > s.peak_temp: + s.peak_temp = temp + + s.dirty = True + + def record_event(self, target_key, event): + + s = self.states[target_key] + + if event == "action": + s.actions += 1 + + elif event == "cache_hit": + s.cache_hits += 1 + + elif event == "failure": + s.failures += 1 + + s.dirty = True + + # ----------------------------- + # FAST SNAPSHOT (NO RECOMPUTE UNLESS DIRTY) + # ----------------------------- + def snapshot(self, target_key): + + s = self.states.get(target_key) + + if not s: + return None + + # ----------------------------- + # RETURN CACHED RESULT + # ----------------------------- + if not s.dirty and s.cached_score is not None: + return { + "score": s.cached_score, + "labels": s.cached_labels + } + + # ----------------------------- + # COMPUTE AGGREGATES + # ----------------------------- + avg_cpu = s.cpu_sum / s.samples if s.samples else 0 + avg_temp = s.temp_sum / s.samples if s.samples else 0 + + cache_ratio = s.cache_hits / s.actions if s.actions else 0 + + profile = { + "duration": 0, # intentionally not scheduler-relevant anymore + "avg_cpu": avg_cpu, + "avg_temp": avg_temp, + "peak_temp": s.peak_temp, + "cache_ratio": cache_ratio, + "failures": s.failures + } + + score = self.scorer.score(profile) + labels = self.labeler.label(score) + + # ----------------------------- + # CACHE RESULT + # ----------------------------- + s.cached_score = score + s.cached_labels = labels + s.dirty = False + + return { + "score": score, + "labels": labels + } + + # ----------------------------- + # FINALIZE (ARCHIVAL ONLY) + # ----------------------------- + def finalize(self, target_key): + + s = self.states[target_key] + + snapshot = self.snapshot(target_key) + + save({ + "target": target_key, + "score": snapshot["score"], + "labels": snapshot["labels"] + }) + + return snapshot diff --git a/backend/metrics/node_state.py b/backend/metrics/node_state.py new file mode 100644 index 0000000..4a0acd0 --- /dev/null +++ b/backend/metrics/node_state.py @@ -0,0 +1,39 @@ +from dataclasses import dataclass, field +from typing import Dict + + +@dataclass +class NodeState: + name: str + + cpu_load: float = 0.0 + memory_load: float = 0.0 + temp: float = 0.0 + instability: float = 0.0 + + active_jobs: int = 0 + last_seen: float = 0.0 + + labels: Dict = field(default_factory=dict) + + +class NodeStateRegistry: + + def __init__(self): + self.nodes = {} + + def update(self, name, **kwargs): + + if name not in self.nodes: + self.nodes[name] = NodeState(name=name) + + node = self.nodes[name] + + for k, v in kwargs.items(): + setattr(node, k, v) + + def get(self, name): + return self.nodes.get(name) + + def all(self): + return list(self.nodes.values()) diff --git a/backend/metrics/observability.py b/backend/metrics/observability.py new file mode 100644 index 0000000..09cfd19 --- /dev/null +++ b/backend/metrics/observability.py @@ -0,0 +1,48 @@ +from backend.events.bus import EventBus +from backend.metrics.node_state import NodeStateRegistry + + +class ObservabilityHub: + """ + Single fusion layer for: + - events (what happened) + - metrics (what is happening) + - state (what is true now) + """ + + def __init__(self, registry: NodeStateRegistry, bus: EventBus): + self.registry = registry + self.bus = bus + self.subscribers = [] + + # ----------------------------- + # STREAM OUT TO UI / PROMETHEUS / GRAFANA + # ----------------------------- + def subscribe(self, fn): + self.subscribers.append(fn) + + def emit(self, event: dict): + + # enrich event with live state snapshot + if event.get("node"): + node = self.registry.get(event["node"]) + if node: + event["node_state"] = { + "cpu": node.cpu_load, + "temp": node.temp, + "instability": node.instability, + "active_jobs": node.active_jobs, + } + + for sub in self.subscribers: + sub(event) + + # ----------------------------- + # CONNECT EVENT BUS + # ----------------------------- + def attach_bus(self): + + def forward(event): + self.emit(event) + + self.bus.subscribe(forward) diff --git a/backend/metrics/profile_store.py b/backend/metrics/profile_store.py new file mode 100644 index 0000000..51d11dc --- /dev/null +++ b/backend/metrics/profile_store.py @@ -0,0 +1,9 @@ +PROFILES = [] + + +def save(profile): + PROFILES.append(profile) + + +def all(): + return PROFILES diff --git a/backend/metrics/prometheus.py b/backend/metrics/prometheus.py new file mode 100644 index 0000000..7459187 --- /dev/null +++ b/backend/metrics/prometheus.py @@ -0,0 +1,22 @@ +from prometheus_client import Gauge, Counter, start_http_server + +# ----------------------------- +# METRICS +# ----------------------------- + +# Node metrics +node_cpu = Gauge("fester_node_cpu", "CPU load per node", ["node"]) +node_temp = Gauge("fester_node_temp", "Temperature per node", ["node"]) + +# Pipeline metrics +pipeline_actions = Counter("fester_pipeline_actions_total", "Total actions", ["state"]) +cache_hits = Counter("fester_cache_hits_total", "Cache hits", ["target"]) + +# Scheduler metrics +scheduler_score = Gauge("fester_scheduler_score", "Node score", ["node", "target"]) + +# ----------------------------- +# EXPORTER START +# ----------------------------- +def start_metrics_server(port=9109): + start_http_server(port) diff --git a/backend/metrics/target_labeler.py b/backend/metrics/target_labeler.py new file mode 100644 index 0000000..debe25f --- /dev/null +++ b/backend/metrics/target_labeler.py @@ -0,0 +1,30 @@ +class TargetLabeler: + + def label(self, score): + + labels = [] + + # ----------------------------- + # THERMAL + # ----------------------------- + if score["thermal"] > 80: + labels.append("HOT") + elif score["thermal"] < 60: + labels.append("COOL") + + # ----------------------------- + # COST + # ----------------------------- + if score["cost"] > 100: + labels.append("EXPENSIVE") + + # ----------------------------- + # STABILITY + # ----------------------------- + if score["instability"] > 5: + labels.append("UNSTABLE") + + if not labels: + labels.append("SAFE") + + return labels diff --git a/backend/metrics/target_profile.py b/backend/metrics/target_profile.py new file mode 100644 index 0000000..9640c80 --- /dev/null +++ b/backend/metrics/target_profile.py @@ -0,0 +1,60 @@ +import time + + +class TargetProfile: + + def __init__(self, target_key): + + self.target = target_key + + self.start_time = time.time() + + self.cpu_samples = [] + self.temp_samples = [] + + self.cache_hits = 0 + self.actions = 0 + self.failures = 0 + + # ----------------------------- + # RECORD SAMPLE + # ----------------------------- + def sample(self, node): + + self.cpu_samples.append(node.get("cpu_load", 0)) + self.temp_samples.append(node.get("temp", 0)) + + # ----------------------------- + # RECORD EVENTS + # ----------------------------- + def record_action(self): + self.actions += 1 + + def record_cache_hit(self): + self.cache_hits += 1 + + def record_failure(self): + self.failures += 1 + + # ----------------------------- + # FINALIZE PROFILE + # ----------------------------- + def finalize(self): + + duration = time.time() - self.start_time + + avg_cpu = sum(self.cpu_samples) / len(self.cpu_samples) if self.cpu_samples else 0 + avg_temp = sum(self.temp_samples) / len(self.temp_samples) if self.temp_samples else 0 + peak_temp = max(self.temp_samples) if self.temp_samples else 0 + + cache_ratio = self.cache_hits / self.actions if self.actions else 0 + + return { + "target": self.target, + "duration": duration, + "avg_cpu": avg_cpu, + "avg_temp": avg_temp, + "peak_temp": peak_temp, + "cache_ratio": cache_ratio, + "failures": self.failures + } diff --git a/backend/metrics/target_scoring.py b/backend/metrics/target_scoring.py new file mode 100644 index 0000000..49d3f51 --- /dev/null +++ b/backend/metrics/target_scoring.py @@ -0,0 +1,27 @@ +class TargetScorer: + + def score(self, profile): + + # ----------------------------- + # THERMAL RISK SCORE (0โ€“100) + # ----------------------------- + thermal = ( + profile["peak_temp"] * 0.5 + + profile["avg_temp"] * 0.5 + ) + + # ----------------------------- + # COST SCORE + # ----------------------------- + cost = profile["duration"] * (1 - profile["cache_ratio"] + 0.1) + + # ----------------------------- + # STABILITY SCORE + # ----------------------------- + instability = profile["failures"] * 10 + + return { + "thermal": thermal, + "cost": cost, + "instability": instability + } diff --git a/backend/nodes.py b/backend/nodes.py new file mode 100644 index 0000000..e5e664f --- /dev/null +++ b/backend/nodes.py @@ -0,0 +1,42 @@ +import json +import requests +from config import CONFIG +from scheduler import build_distcc_hosts + +AGENT_PORT = 8787 + + +def fetch_agent(node): + try: + r = requests.get(f"http://{node['host']}:{AGENT_PORT}/status", timeout=1.5) + return r.json() + except: + return None + + +def enrich(node): + agent = fetch_agent(node) + + return { + "name": node["name"], + "host": node["host"], + "state": "online" if agent else "offline", + "agent": agent, + "release_ready": agent is not None + } + + +def main(): + nodes = [enrich(n) for n in CONFIG["nodes"]] + + output = { + "master": CONFIG["master"], + "nodes": nodes, + "distcc": build_distcc_hosts(nodes) + } + + print(json.dumps(output, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/backend/nodes/roles.py b/backend/nodes/roles.py new file mode 100644 index 0000000..0fde69c --- /dev/null +++ b/backend/nodes/roles.py @@ -0,0 +1,31 @@ +from dataclasses import dataclass, field + + +@dataclass +class NodeRole: + name: str + + # ---------------------------- + # cache behavior + # ---------------------------- + cache_writer: bool = True # can store artifacts + cache_reader: bool = True # can fetch artifacts + cache_priority: int = 50 # higher = preferred cache source + + # ---------------------------- + # compute behavior + # ---------------------------- + compile_weight: float = 1.0 # scheduler weight multiplier + distcc_enabled: bool = True + + # ---------------------------- + # thermal safety + # ---------------------------- + max_thermal_state: float = 0.85 + + # ---------------------------- + # storage capabilities + # ---------------------------- + tmpfs_ok: bool = True + btrfs_ok: bool = False + qcow2_ok: bool = False diff --git a/backend/nodes/roles_store.py b/backend/nodes/roles_store.py new file mode 100644 index 0000000..d8fe309 --- /dev/null +++ b/backend/nodes/roles_store.py @@ -0,0 +1,39 @@ +import json +from pathlib import Path +from nodes.roles import NodeRole + + +ROLE_DB = Path("/etc/fester/node_roles.json") + + +DEFAULT_ROLE = NodeRole(name="default") + + +def load_roles(): + if not ROLE_DB.exists(): + return {} + + try: + raw = json.loads(ROLE_DB.read_text()) + except: + return {} + + roles = {} + + for node, cfg in raw.items(): + roles[node] = NodeRole(**cfg) + + return roles + + +def save_roles(roles): + raw = { + node: role.__dict__ + for node, role in roles.items() + } + + ROLE_DB.write_text(json.dumps(raw, indent=2)) + + +def get_role(node_name, roles): + return roles.get(node_name, DEFAULT_ROLE) diff --git a/backend/nodes/state_model.py b/backend/nodes/state_model.py new file mode 100644 index 0000000..247dbf0 --- /dev/null +++ b/backend/nodes/state_model.py @@ -0,0 +1,54 @@ +from dataclasses import dataclass, field +from typing import Dict, Any, Optional + + +@dataclass +class NodeState: + """ + Single source of truth for node health + capability. + """ + + name: str + + # runtime + cpu_load: float = 0.0 + memory_load: float = 0.0 + temp: float = 0.0 + + # scheduling intelligence + score: float = 0.0 + instability: float = 0.0 + + # execution tracking + active_jobs: int = 0 + last_seen: float = 0.0 + + # capabilities + labels: Dict[str, Any] = field(default_factory=dict) + + # health flags + healthy: bool = True + degraded: bool = False + offline: bool = False + + +class NodeStateRegistry: + """ + Central authority for node truth. + """ + + def __init__(self): + self.nodes: Dict[str, NodeState] = {} + + def update(self, name: str, **kwargs): + if name not in self.nodes: + self.nodes[name] = NodeState(name=name) + + for k, v in kwargs.items(): + setattr(self.nodes[name], k, v) + + def get(self, name: str) -> Optional[NodeState]: + return self.nodes.get(name) + + def all(self): + return list(self.nodes.values()) diff --git a/backend/pipeline/engine.py b/backend/pipeline/engine.py new file mode 100644 index 0000000..d604b72 --- /dev/null +++ b/backend/pipeline/engine.py @@ -0,0 +1,60 @@ +from backend.pipeline.state import State +from backend.scheduler.optimizer import choose_best_node +from backend.executor.runtime_router import execute_action +from backend.cache.minio_cache import MinioCache +from backend.graph.plan import build_action_graph +from backend.events.bus import EventBus + + +class PipelineEngine: + + def __init__(self, nodes, node_registry, event_bus: EventBus): + self.nodes = nodes + self.node_registry = node_registry + self.bus = event_bus + + self.cache = MinioCache( + "localhost:9000", + "minioadmin", + "minioadmin" + ) + + async def run(self, project): + + actions = build_action_graph(project) + results = [] + + for action in actions: + + node = choose_best_node(self.nodes, action) + + self.bus.emit( + "task_update", + node=node["name"], + action=action["name"], + state="scheduled" + ) + + if self.cache.exists(action["hash"]): + self.bus.emit( + "cache_update", + node=node["name"], + action=action["name"], + state="hit" + ) + continue + + rc = execute_action(action, {}, node) + + state = "done" if rc == 0 else "failed" + + self.bus.emit( + "task_update", + node=node["name"], + action=action["name"], + state=state + ) + + results.append((action["name"], state)) + + return results diff --git a/backend/pipeline/feedback.py b/backend/pipeline/feedback.py new file mode 100644 index 0000000..6770789 --- /dev/null +++ b/backend/pipeline/feedback.py @@ -0,0 +1,15 @@ +from policy.engine import PolicyEngine + +policy = PolicyEngine() + + +def report_execution(node, action, success, duration, temp_before, temp_after): + + thermal_spike = (temp_after - temp_before) > 0.15 + + policy.update_reputation( + node, + success, + duration, + thermal_spike + ) diff --git a/backend/pipeline/state.py b/backend/pipeline/state.py new file mode 100644 index 0000000..28f0229 --- /dev/null +++ b/backend/pipeline/state.py @@ -0,0 +1,19 @@ +from enum import Enum + + +class State(Enum): + PENDING = "pending" + PREFLIGHT = "preflight" + SCHEDULED = "scheduled" + EXECUTING = "executing" + + CACHE_HIT = "cache_hit" + DONE = "done" + + FAILED = "failed" + RETRY = "retry" + DEAD = "dead" + + THERMAL_GATED = "thermal_gated" + VM_ESCALATED = "vm_escalated" + POLICY_HELD = "policy_held" diff --git a/backend/policy/api.py b/backend/policy/api.py new file mode 100644 index 0000000..81d163d --- /dev/null +++ b/backend/policy/api.py @@ -0,0 +1,26 @@ +from events.emitter import emit_policy + +POLICY = None + + +def init(policy_engine): + global POLICY + POLICY = policy_engine + + +async def set_override(data): + + POLICY.set_override(data["key"], data["value"]) + + await emit_policy(data["key"], data["value"]) + + return {"status": "ok"} + + +async def clear_override(data): + + POLICY.clear_override(data["key"]) + + await emit_policy(data["key"], None) + + return {"status": "cleared"} diff --git a/backend/policy/engine.py b/backend/policy/engine.py new file mode 100644 index 0000000..ab1512e --- /dev/null +++ b/backend/policy/engine.py @@ -0,0 +1,98 @@ +class PolicyEngine: + + def __init__(self, db): + self.db = db + + self.rules = [] + self.overrides = {} + + # ----------------------------- + # REGISTER RULE + # ----------------------------- + def add_rule(self, rule): + self.rules.append(rule) + + # ----------------------------- + # SET LIVE OVERRIDE + # ----------------------------- + def set_override(self, key, value): + self.overrides[key] = value + + # ----------------------------- + # CLEAR OVERRIDE + # ----------------------------- + def clear_override(self, key): + if key in self.overrides: + del self.overrides[key] + + # ----------------------------- + # APPLY POLICY TO NODE SELECTION + # ----------------------------- + def evaluate(self, action, target, node): + + score_modifier = 0 + + # ----------------------------- + # APPLY STATIC RULES + # ----------------------------- + for rule in self.rules: + score_modifier += rule.apply(action, target, node) + + # ----------------------------- + # APPLY HEURISTICS + # ----------------------------- + score_modifier += self._heuristics(action, target, node) + + # ----------------------------- + # APPLY OVERRIDES + # ----------------------------- + score_modifier += self._overrides(action, target, node) + + return score_modifier + + # ----------------------------- + # HEURISTICS (LEARNING LAYER) + # ----------------------------- + def _heuristics(self, action, target, node): + + score = 0 + + history = self.db.events + + # simple heuristic: prefer nodes that succeeded before + success_count = 0 + + for session_id in history: + for event in history[session_id]: + if event["type"] == "action_end": + if ( + event["data"]["action"] == action["name"] + and event["data"]["state"] == "done" + and event["data"].get("node") == node["name"] + ): + success_count += 1 + + score += success_count * 2 + + return score + + # ----------------------------- + # LIVE OVERRIDES + # ----------------------------- + def _overrides(self, action, target, node): + + score = 0 + + # force node + forced = self.overrides.get("force_node") + + if forced and node["name"] == forced: + score += 1000 + + # avoid node + avoid = self.overrides.get("avoid_node") + + if avoid and node["name"] == avoid: + score -= 1000 + + return score diff --git a/backend/policy/rules.py b/backend/policy/rules.py new file mode 100644 index 0000000..df36f40 --- /dev/null +++ b/backend/policy/rules.py @@ -0,0 +1,47 @@ +class Rule: + + def apply(self, action, target, node): + return 0 + + +# ----------------------------- +# EXAMPLES +# ----------------------------- + +class AvoidHotNodes(Rule): + + def apply(self, action, target, node): + + if node.get("temp", 0) > 80: + return -100 + + return 0 + + +class PreferLXCForARM(Rule): + + def apply(self, action, target, node): + + if target.arch == "arm64" and target.runtime == "lxc": + return 20 + + return 0 + + +class AvoidVMForSmallBuilds(Rule): + + def apply(self, action, target, node): + + if target.runtime == "libvirt" and action.get("size") == "small": + return -50 + + return 0 + + +class SpreadLoad(Rule): + + def apply(self, action, target, node): + + load = node.get("cpu_load", 50) + + return -(load // 2) diff --git a/backend/recommend.py b/backend/recommend.py new file mode 100644 index 0000000..b88cf77 --- /dev/null +++ b/backend/recommend.py @@ -0,0 +1,115 @@ +from governor import thermal_cap +from cache import make_build_key, cache_hit + + +# ---------------------------- +# analyze cluster health +# ---------------------------- +def cluster_heat(nodes): + if not nodes: + return 0.5 + + total = 0 + count = 0 + + for n in nodes: + agent = n.get("agent") or {} + load = agent.get("load", "1 1 1") + + try: + total += float(load.split()[0]) + except: + total += 1.0 + + count += 1 + + return total / max(count, 1) + + +# ---------------------------- +# thermal risk scoring +# ---------------------------- +def risk_level(node): + agent = node.get("agent") or {} + + cap = thermal_cap(agent) + + if cap < 0.3: + return "HIGH_RISK" + elif cap < 0.6: + return "MODERATE" + return "SAFE" + + +# ---------------------------- +# predict build cost +# ---------------------------- +def estimate_build_cost(nodes): + cost = 0.0 + + for n in nodes: + agent = n.get("agent") or {} + load = agent.get("load", "1 1 1") + + try: + cost += float(load.split()[0]) + except: + cost += 1.0 + + return cost / max(len(nodes), 1) + + +# ---------------------------- +# recommendation engine +# ---------------------------- +def recommend(project, snapshot, nodes): + recommendations = { + "build_now": True, + "risk": "SAFE", + "cache_likely": False, + "preferred_nodes": [], + "reason": [] + } + + # ---------------------------- + # cache prediction + # ---------------------------- + key_sample = make_build_key( + snapshot["hash"], + list(project["targets"].keys())[0], + project.get("build_env", {}) + ) + + if cache_hit(key_sample): + recommendations["cache_likely"] = True + recommendations["reason"].append("Cache hit likely for current snapshot") + + # ---------------------------- + # cluster load analysis + # ---------------------------- + heat = cluster_heat(nodes) + + if heat > 3.0: + recommendations["build_now"] = False + recommendations["reason"].append("Cluster overloaded - recommend delay") + + # ---------------------------- + # node filtering + # ---------------------------- + safe_nodes = [] + + for n in nodes: + if risk_level(n) == "SAFE": + safe_nodes.append(n["name"]) + + recommendations["preferred_nodes"] = safe_nodes + + # ---------------------------- + # final risk classification + # ---------------------------- + if heat > 4.0: + recommendations["risk"] = "HIGH_RISK" + elif heat > 2.5: + recommendations["risk"] = "MODERATE" + + return recommendations diff --git a/backend/recommend_api.py b/backend/recommend_api.py new file mode 100644 index 0000000..16e91e7 --- /dev/null +++ b/backend/recommend_api.py @@ -0,0 +1,20 @@ +import json +from snapshot import create_snapshot +from recommend import recommend + + +def run(project, nodes): + snapshot = create_snapshot(project["source"]) + + result = recommend(project, snapshot, nodes) + + print(json.dumps(result, indent=2)) + + +if __name__ == "__main__": + import sys + + project = json.load(open(sys.argv[1])) + nodes = json.load(open(sys.argv[2])) + + run(project, nodes) diff --git a/backend/release-scheduler.py b/backend/release-scheduler.py new file mode 100644 index 0000000..6df1219 --- /dev/null +++ b/backend/release-scheduler.py @@ -0,0 +1,29 @@ +from governor import thermal_cap + + +def choose_nodes(nodes, target): + """ + Unified default mode: + all nodes participate unless thermally restricted + """ + + usable = [] + + for n in nodes: + agent = n.get("agent") or {} + + if thermal_cap(agent) < 0.3: + continue + + usable.append(n) + + return usable + + +def plan_release(project, nodes): + plan = {} + + for target in project["targets"]: + plan[target] = choose_nodes(nodes, target) + + return plan diff --git a/backend/release.py b/backend/release.py new file mode 100644 index 0000000..2530e9f --- /dev/null +++ b/backend/release.py @@ -0,0 +1,58 @@ +import os +import json +from snapshot import create_snapshot +from scheduler import build_distcc_hosts +from config import CONFIG + + +def run(cmd, cwd, env): + import subprocess + + p = subprocess.Popen( + cmd, + shell=True, + cwd=cwd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True + ) + + for line in p.stdout: + print(line, end="") + + p.wait() + return p.returncode + + +def build_target(project, snapshot, target, nodes): + env = os.environ.copy() + env["DISTCC_HOSTS"] = build_distcc_hosts(nodes) + + cmd = project["targets"][target] + + print(f"[FESTER] target={target}") + print(f"[FESTER] snapshot={snapshot['hash']}") + + return run(cmd, snapshot["path"], env) + + +def run_release(project): + snapshot = create_snapshot(project["source"]) + nodes = CONFIG["nodes"] + + results = {} + + for target in project["targets"]: + results[target] = build_target(project, snapshot, target, nodes) + + return results + + +if __name__ == "__main__": + import sys + + with open(sys.argv[1], "r") as f: + project = json.load(f) + + run_release(project) diff --git a/backend/scheduler.py b/backend/scheduler.py new file mode 100644 index 0000000..b7f2655 --- /dev/null +++ b/backend/scheduler.py @@ -0,0 +1,42 @@ +from governor import thermal_cap + + +def score_node(node): + if node["state"] == "offline": + return 0.0 + + agent = node.get("agent") or {} + + try: + load = float(agent.get("load", "1 1 1").split()[0]) + except: + load = 1.0 + + base = 10.0 + capacity = node.get("max_jobs", 8) + + score = (base + capacity) - (load * 3.0) + + return max(score, 0.1) * thermal_cap(agent) + + +def build_distcc_hosts(nodes): + scored = [] + + for n in nodes: + scored.append((n, score_node(n))) + + total = sum(s for _, s in scored) or 1.0 + + hosts = [] + + for node, score in scored: + if node["state"] == "offline": + continue + + weight = int((score / total) * 64) + weight = max(1, weight) + + hosts.append(f"{node['host']}/{weight}") + + return " ".join(hosts) diff --git a/backend/scheduler/cache_aware.py b/backend/scheduler/cache_aware.py new file mode 100644 index 0000000..6dbcf41 --- /dev/null +++ b/backend/scheduler/cache_aware.py @@ -0,0 +1,52 @@ +from cache import cache_hit +from nodes.role_store import get_role + + +def score_node(node, role, cache_key): + """ + Higher score = better node + """ + + agent = node.get("agent", {}) + load = float(agent.get("load", "1 1 1").split()[0]) + + score = 100.0 + + # ---------------------------- + # thermal penalty + # ---------------------------- + score -= load * 10 + + # ---------------------------- + # role weighting + # ---------------------------- + score *= role.compile_weight + + # ---------------------------- + # cache preference boost + # ---------------------------- + if role.cache_reader and cache_hit(cache_key): + score += 50 + + # ---------------------------- + # penalize overheating nodes + # ---------------------------- + if load > role.max_thermal_state * 4: + score -= 100 + + return score + + +def select_nodes(nodes, roles, cache_key): + ranked = [] + + for n in nodes: + role = get_role(n["name"], roles) + + score = score_node(n, role, cache_key) + + ranked.append((score, n["name"])) + + ranked.sort(reverse=True) + + return [name for score, name in ranked] diff --git a/backend/scheduler/metrics.py b/backend/scheduler/metrics.py new file mode 100644 index 0000000..6cded84 --- /dev/null +++ b/backend/scheduler/metrics.py @@ -0,0 +1,22 @@ +import requests + + +PROM_URL = "http://localhost:9090" + + +def get_node_load(node): + query = f'node_load1{{instance="{node}"}}' + + try: + r = requests.get( + f"{PROM_URL}/api/v1/query", + params={"query": query} + ) + + data = r.json() + + return float( + data["data"]["result"][0]["value"][1] + ) + except: + return 1.0 diff --git a/backend/scheduler/optimizer.py b/backend/scheduler/optimizer.py new file mode 100644 index 0000000..df90ace --- /dev/null +++ b/backend/scheduler/optimizer.py @@ -0,0 +1,35 @@ +from backend.nodes.state_model import NodeStateRegistry + + +def choose_best_node(nodes, action, registry: NodeStateRegistry = None): + + best_node = None + best_score = -999999 + + for node in nodes: + + state = registry.get(node["name"]) if registry else None + + cpu_load = state.cpu_load if state else node.get("cpu_load", 0) + temp = state.temp if state else node.get("temp", 0) + + score = 0 + score += (100 - cpu_load) + score -= temp + + if node.get("policy") == "preferred": + score += 20 + + if node.get("policy") == "avoid": + score -= 50 + + if state: + score -= state.instability * 10 + + node["_computed_score"] = score + + if score > best_score: + best_score = score + best_node = node + + return best_node diff --git a/backend/scheduler/target_optimizer.py b/backend/scheduler/target_optimizer.py new file mode 100644 index 0000000..ac70b34 --- /dev/null +++ b/backend/scheduler/target_optimizer.py @@ -0,0 +1,52 @@ +from policy.engine import PolicyEngine + + +class TargetOptimizer: + + def __init__(self, nodes, policy_engine): + self.nodes = nodes + self.policy = policy_engine + + def choose(self, action, target): + + candidates = [] + + for node in self.nodes: + + if not self._compatible(node, target): + continue + + base_score = self._score(node) + + policy_score = self.policy.evaluate(action, target, node) + + total = base_score + policy_score + + candidates.append((node, total)) + + if not candidates: + raise Exception("No compatible nodes") + + candidates.sort(key=lambda x: x[1], reverse=True) + + return candidates[0][0] + + def _compatible(self, node, target): + + if node["arch"] != target.arch: + if target.runtime == "host": + return False + + if target.runtime not in node["runtimes"]: + return False + + return True + + def _score(self, node): + + score = 0 + + score += (100 - node.get("cpu_load", 50)) + score += (100 - node.get("temp", 60)) + + return score diff --git a/backend/session/db.py b/backend/session/db.py new file mode 100644 index 0000000..776b742 --- /dev/null +++ b/backend/session/db.py @@ -0,0 +1,80 @@ +import time +import uuid +from collections import defaultdict + + +class SessionDB: + + def __init__(self): + # session_id โ†’ full timeline + self.sessions = {} + + # session_id โ†’ event log + self.events = defaultdict(list) + + # session_id โ†’ snapshots (state checkpoints) + self.snapshots = defaultdict(list) + + # ----------------------------- + # CREATE SESSION + # ----------------------------- + def create_session(self, project): + sid = str(uuid.uuid4()) + + self.sessions[sid] = { + "project": project, + "created_at": time.time(), + "status": "active" + } + + return sid + + # ----------------------------- + # EVENT LOGGING (IMMUTABLE) + # ----------------------------- + def log_event(self, session_id, event_type, payload): + + event = { + "ts": time.time(), + "type": event_type, + "data": payload + } + + self.events[session_id].append(event) + + return event + + # ----------------------------- + # SNAPSHOT (STATE CHECKPOINT) + # ----------------------------- + def snapshot(self, session_id, state): + + snap = { + "ts": time.time(), + "state": state + } + + self.snapshots[session_id].append(snap) + + return snap + + # ----------------------------- + # GET FULL TIMELINE + # ----------------------------- + def get_session(self, session_id): + + return { + "session": self.sessions.get(session_id), + "events": self.events.get(session_id, []), + "snapshots": self.snapshots.get(session_id, []) + } + + # ----------------------------- + # REPLAY ENGINE INPUT + # ----------------------------- + def get_replay_stream(self, session_id): + + return sorted( + self.events[session_id], + key=lambda e: e["ts"] + ) diff --git a/backend/session/replay.py b/backend/session/replay.py new file mode 100644 index 0000000..2e0ebc1 --- /dev/null +++ b/backend/session/replay.py @@ -0,0 +1,42 @@ +from session.db import SessionDB + + +class SessionReplay: + + def __init__(self, db: SessionDB): + self.db = db + + # ----------------------------- + # REPLAY SESSION + # ----------------------------- + def replay(self, session_id): + + stream = self.db.get_replay_stream(session_id) + + for event in stream: + + yield { + "ts": event["ts"], + "type": event["type"], + "data": event["data"] + } + + # ----------------------------- + # REPLAY FROM SNAPSHOT + # ----------------------------- + def replay_from_snapshot(self, session_id, index=0): + + snapshots = self.db.snapshots[session_id] + + if not snapshots: + return [] + + base = snapshots[index] + + return { + "snapshot": base, + "events_after": [ + e for e in self.db.events[session_id] + if e["ts"] >= base["ts"] + ] + } diff --git a/backend/session/session.py b/backend/session/session.py new file mode 100644 index 0000000..49fa14d --- /dev/null +++ b/backend/session/session.py @@ -0,0 +1,46 @@ +from session.db import SessionDB +from integrations.tmux import TmuxManager + + +class BuildSession: + + def __init__(self, project, nodes, db: SessionDB): + + self.project = project + self.nodes = nodes + + self.db = db + self.session_id = db.create_session(project) + + self.tmux = TmuxManager() + + # ----------------------------- + # REGISTER ACTION RUNTIME + # ----------------------------- + def attach_action_runtime(self, action, node): + + cmd = action.get("command", "make -j") + + session_name = self.tmux.create_session(action, cmd) + + self.db.log_event( + self.session_id, + "tmux_spawn", + { + "action": action["name"], + "node": node["name"], + "session": session_name + } + ) + + return session_name + + # ----------------------------- + # SNAPSHOT HOOK + # ----------------------------- + def snapshot(self, state): + + return self.db.snapshot( + self.session_id, + state + ) diff --git a/backend/snapshot.py b/backend/snapshot.py new file mode 100644 index 0000000..8c7dbba --- /dev/null +++ b/backend/snapshot.py @@ -0,0 +1,114 @@ +import os +import subprocess +import tempfile +import hashlib +from datetime import datetime + + +# ---------------------------- +# utility: run command +# ---------------------------- +def run(cmd, cwd="/"): + process = subprocess.Popen( + cmd, + shell=True, + cwd=cwd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True + ) + + out = [] + for line in process.stdout: + out.append(line) + + process.wait() + return "".join(out), process.returncode + + +# ---------------------------- +# git (Forgejo primary) +# ---------------------------- +def snapshot_git(repo_url): + workdir = tempfile.mkdtemp(prefix="fester-snap-git-") + + run(f"git clone --depth 1 {repo_url} {workdir}") + + return workdir + + +# ---------------------------- +# hg adapter (snapshot only) +# ---------------------------- +def snapshot_hg(repo_url): + workdir = tempfile.mkdtemp(prefix="fester-snap-hg-") + + run(f"hg clone {repo_url} {workdir}") + + return workdir + + +# ---------------------------- +# svn adapter (export only) +# ---------------------------- +def snapshot_svn(repo_url): + workdir = tempfile.mkdtemp(prefix="fester-snap-svn-") + + run(f"svn checkout {repo_url} {workdir}") + + return workdir + + +# ---------------------------- +# cvs adapter (legacy dump) +# ---------------------------- +def snapshot_cvs(repo_url): + workdir = tempfile.mkdtemp(prefix="fester-snap-cvs-") + + # best-effort export only + run(f"cvs export -d {workdir} {repo_url}") + + return workdir + + +# ---------------------------- +# unified entry point +# ---------------------------- +def create_snapshot(source): + kind = source["type"] + url = source["url"] + + if kind == "git": + path = snapshot_git(url) + elif kind == "hg": + path = snapshot_hg(url) + elif kind == "svn": + path = snapshot_svn(url) + elif kind == "cvs": + path = snapshot_cvs(url) + else: + raise Exception(f"Unsupported VCS type: {kind}") + + return fingerprint(path) + + +# ---------------------------- +# deterministic fingerprint +# ---------------------------- +def fingerprint(path): + sha = hashlib.sha256() + + for root, dirs, files in os.walk(path): + for f in sorted(files): + fp = os.path.join(root, f) + try: + with open(fp, "rb") as fh: + sha.update(fh.read()) + except: + pass + + return { + "path": path, + "hash": sha.hexdigest(), + "timestamp": datetime.utcnow().isoformat() + "Z" + } diff --git a/backend/storage/btrfs_cas.py b/backend/storage/btrfs_cas.py new file mode 100644 index 0000000..5ea6f99 --- /dev/null +++ b/backend/storage/btrfs_cas.py @@ -0,0 +1,30 @@ +import os +import shutil +from pathlib import Path + + +def is_btrfs(path): + try: + import subprocess + out = subprocess.check_output(["stat", "-f", "-c", "%T", path]).decode() + return "btrfs" in out.lower() + except: + return False + + +def store_reflink(src, dst): + """ + Fast copy using CoW if supported + """ + try: + shutil.copy2(src, dst, follow_symlinks=True) + except Exception: + shutil.copy(src, dst) + + +def cas_path(config, key): + base = Path(config["btrfs_path"]) + path = base / key + + path.parent.mkdir(parents=True, exist_ok=True) + return str(path) diff --git a/backend/storage/config.py b/backend/storage/config.py new file mode 100644 index 0000000..5658de8 --- /dev/null +++ b/backend/storage/config.py @@ -0,0 +1,31 @@ +import json +from pathlib import Path + + +DEFAULT_CONFIG_PATH = Path("/etc/fester/storage.json") + + +DEFAULT_CONFIG = { + "tmpfs": True, + "btrfs": False, + "qcow2_freeze": False, + "btrfs_path": "/var/lib/fester/cas", + "tmpfs_path": "/dev/shm/fester-build", + "qcow2_path": "/var/lib/fester/snapshots" +} + + +def load_storage_config(): + if not DEFAULT_CONFIG_PATH.exists(): + return DEFAULT_CONFIG + + try: + with open(DEFAULT_CONFIG_PATH) as f: + user_cfg = json.load(f) + except: + user_cfg = {} + + cfg = DEFAULT_CONFIG.copy() + cfg.update(user_cfg) + + return cfg diff --git a/backend/storage/qcow2_freeze.py b/backend/storage/qcow2_freeze.py new file mode 100644 index 0000000..2bde334 --- /dev/null +++ b/backend/storage/qcow2_freeze.py @@ -0,0 +1,25 @@ +import subprocess +from pathlib import Path + + +def freeze_to_qcow2(workdir, output_path): + """ + Creates a frozen snapshot of a build workspace. + """ + + out = Path(output_path) + out.parent.mkdir(parents=True, exist_ok=True) + + cmd = [ + "qemu-img", + "create", + "-f", "qcow2", + str(out), + "10G" + ] + + subprocess.run(cmd, check=True) + + # NOTE: + # Real implementation would rsync or tar into mounted image + return str(out) diff --git a/backend/storage/router.py b/backend/storage/router.py new file mode 100644 index 0000000..5a49be2 --- /dev/null +++ b/backend/storage/router.py @@ -0,0 +1,39 @@ +from storage.config import load_storage_config +from storage.tmpfs import get_tmpfs_workspace +from storage.btrfs_cas import cas_path, is_btrfs +from storage.qcow2_freeze import freeze_to_qcow2 + + +def prepare_workspace(project_name): + cfg = load_storage_config() + + workspace = { + "tmpfs": None, + "cas": None, + "freeze": None, + "mode": cfg + } + + # ---------------------------- + # ALWAYS: tmpfs + # ---------------------------- + workspace["tmpfs"] = get_tmpfs_workspace(cfg, project_name) + + # ---------------------------- + # OPTIONAL: Btrfs CAS + # ---------------------------- + if cfg.get("btrfs"): + workspace["cas"] = cas_path(cfg, project_name) + + return workspace + + +def maybe_freeze_workspace(workdir, project_name): + cfg = load_storage_config() + + if not cfg.get("qcow2_freeze"): + return None + + output = f"/var/lib/fester/snapshots/{project_name}.qcow2" + + return freeze_to_qcow2(workdir, output) diff --git a/backend/storage/tmpfs.py b/backend/storage/tmpfs.py new file mode 100644 index 0000000..c9595f1 --- /dev/null +++ b/backend/storage/tmpfs.py @@ -0,0 +1,19 @@ +import os +from pathlib import Path + + +def ensure_tmpfs(path): + p = Path(path) + + if not p.exists(): + p.mkdir(parents=True, exist_ok=True) + + # NOTE: mount handled externally (systemd/fstab recommended) + + +def get_tmpfs_workspace(config, project_name): + base = Path(config["tmpfs_path"]) + ws = base / project_name + + ws.mkdir(parents=True, exist_ok=True) + return str(ws) diff --git a/backend/targets/catalog.py b/backend/targets/catalog.py new file mode 100644 index 0000000..9c7eaf7 --- /dev/null +++ b/backend/targets/catalog.py @@ -0,0 +1,46 @@ +TARGET_SYSTEMS = { + "gentoo": { + "type": "source-based", + "toolchain": "stage", + "supports_cross": True + }, + "buildroot": { + "type": "embedded", + "toolchain": "cross", + "supports_cross": True + }, + "openwrt": { + "type": "firmware", + "toolchain": "cross", + "supports_cross": True + }, + "alfs": { + "type": "manual-lfs", + "toolchain": "stage", + "supports_cross": False + }, + "sourcemage": { + "type": "source-based", + "toolchain": "native", + "supports_cross": False + }, + "lunar": { + "type": "source-based", + "toolchain": "native", + "supports_cross": False + } +} + + +ARCHES = [ + "x86_64", + "arm64", + "riscv64" +] + + +RUNTIMES = [ + "host", + "lxc", + "libvirt" +] diff --git a/backend/targets/router.py b/backend/targets/router.py new file mode 100644 index 0000000..7218865 --- /dev/null +++ b/backend/targets/router.py @@ -0,0 +1,28 @@ +from integrations.lxc import LXCManager +from integrations.libvirt import LibvirtManager + + +class TargetRouter: + + def __init__(self): + + self.lxc = LXCManager() + self.libvirt = LibvirtManager() + + # ----------------------------- + # EXECUTION DISPATCH + # ----------------------------- + def run(self, target, command): + + if target.runtime == "lxc": + return self.lxc.execute(target, command) + + if target.runtime == "libvirt": + return self.libvirt.execute(target, command) + + # default: host + return self._run_host(command) + + def _run_host(self, command): + import subprocess + return subprocess.call(command, shell=True) diff --git a/backend/targets/spec.py b/backend/targets/spec.py new file mode 100644 index 0000000..8f600e8 --- /dev/null +++ b/backend/targets/spec.py @@ -0,0 +1,20 @@ +class Target: + + def __init__(self, + name, + arch, + os, + runtime, + toolchain, + mode="native"): + + self.name = name + self.arch = arch + self.os = os + self.runtime = runtime # host | lxc | libvirt + self.toolchain = toolchain + + self.mode = mode # native | cross | verify + + def is_cross(self): + return self.arch != "x86_64" diff --git a/backend/targets/target.py b/backend/targets/target.py new file mode 100644 index 0000000..48eba32 --- /dev/null +++ b/backend/targets/target.py @@ -0,0 +1,15 @@ +class Target: + + def __init__(self, + system, + arch, + runtime, + profile="default"): + + self.system = system + self.arch = arch + self.runtime = runtime + self.profile = profile + + def key(self): + return f"{self.system}-{self.arch}-{self.runtime}-{self.profile}" diff --git a/backend/utils.py b/backend/utils.py new file mode 100644 index 0000000..e69de29 diff --git a/cli/fester.py b/cli/fester.py new file mode 100644 index 0000000..3dd43b3 --- /dev/null +++ b/cli/fester.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 + +import argparse +import requests +import json +import websocket + +API = "http://localhost:8080/api" +WS = "ws://localhost:8080/ws" + + +# ----------------------------- +# LIVE STREAM (IDE MODE) +# ----------------------------- +def stream(): + ws = websocket.WebSocket() + ws.connect(WS) + + print("๐Ÿ“ก Fester IDE stream active\n") + + while True: + msg = ws.recv() + event = json.loads(msg) + + if event["type"] == "pipeline": + d = event["data"] + print(f"[PIPELINE] {d['action']} โ†’ {d['state']} @ {d.get('node')}") + + elif event["type"] == "node": + print(f"[NODE] {event['data']}") + + +# ----------------------------- +# PIPELINE CONTROL +# ----------------------------- +def status(): + r = requests.get(f"{API}/pipeline/state") + print(json.dumps(r.json(), indent=2)) + + +def retry(action): + r = requests.post(f"{API}/pipeline/retry", json={"action": action}) + print(r.json()) + + +def force(action, node): + r = requests.post(f"{API}/pipeline/force-node", json={ + "action": action, + "node": node + }) + print(r.json()) + + +def pause(): + requests.post(f"{API}/pipeline/pause") + + +def resume(): + requests.post(f"{API}/pipeline/resume") + + +# ----------------------------- +# SESSION CONTROL (IDE FEATURE) +# ----------------------------- +def sessions(): + r = requests.get(f"{API}/sessions") + print(json.dumps(r.json(), indent=2)) + + +def attach(action): + r = requests.post(f"{API}/session/attach", json={"action": action}) + print(r.json()) + + +def detach(action): + r = requests.post(f"{API}/session/detach", json={"action": action}) + print(r.json()) + + +# ----------------------------- +# POLICY CONTROL (WIZARD LAYER) +# ----------------------------- +def set_policy(key, value): + r = requests.post(f"{API}/policy/set", json={ + "key": key, + "value": value + }) + print(r.json()) + + +def clear_policy(key): + r = requests.post(f"{API}/policy/clear", json={ + "key": key + }) + print(r.json()) + + +# ----------------------------- +# CLI ENTRYPOINT +# ----------------------------- +def main(): + parser = argparse.ArgumentParser("fester IDE CLI") + + sub = parser.add_subparsers(dest="cmd") + + # core + sub.add_parser("stream") + sub.add_parser("status") + sub.add_parser("sessions") + + # pipeline control + r = sub.add_parser("retry") + r.add_argument("action") + + f = sub.add_parser("force") + f.add_argument("action") + f.add_argument("node") + + sub.add_parser("pause") + sub.add_parser("resume") + + # session control + a = sub.add_parser("attach") + a.add_argument("action") + + d = sub.add_parser("detach") + d.add_argument("action") + + # policy control + p = sub.add_parser("policy-set") + p.add_argument("key") + p.add_argument("value") + + c = sub.add_parser("policy-clear") + c.add_argument("key") + + args = parser.parse_args() + + # ----------------------------- + # COMMAND HANDLING + # ----------------------------- + if args.cmd == "stream": + stream() + + elif args.cmd == "status": + status() + + elif args.cmd == "sessions": + sessions() + + elif args.cmd == "retry": + retry(args.action) + + elif args.cmd == "force": + force(args.action, args.node) + + elif args.cmd == "attach": + attach(args.action) + + elif args.cmd == "detach": + detach(args.action) + + elif args.cmd == "pause": + pause() + + elif args.cmd == "resume": + resume() + + elif args.cmd == "policy-set": + set_policy(args.key, args.value) + + elif args.cmd == "policy-clear": + clear_policy(args.key) + + else: + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/cockpit/fester-module/targets.js b/cockpit/fester-module/targets.js new file mode 100644 index 0000000..cf3381a --- /dev/null +++ b/cockpit/fester-module/targets.js @@ -0,0 +1,30 @@ +async function loadTargets() { + + const res = await fetch("/api/targets"); + const data = await res.json(); + + const sys = document.getElementById("system"); + const arch = document.getElementById("arch"); + const runtime = document.getElementById("runtime"); + + data.systems.forEach(s => { + let o = document.createElement("option"); + o.value = s; + o.text = s; + sys.appendChild(o); + }); + + data.arches.forEach(a => { + let o = document.createElement("option"); + o.value = a; + o.text = a; + arch.appendChild(o); + }); + + data.runtimes.forEach(r => { + let o = document.createElement("option"); + o.value = r; + o.text = r; + runtime.appendChild(o); + }); +} diff --git a/cockpit/fester-module/ui.html b/cockpit/fester-module/ui.html new file mode 100644 index 0000000..1b2c0a6 --- /dev/null +++ b/cockpit/fester-module/ui.html @@ -0,0 +1,244 @@ + + + + +Fester Debug Cockpit + + + + + + +
+ + +
+ +

Build Control

+ + + + + + + + + +
+ + +
+ + +
+

Node Debugger

+
Select a node
+
+ +
+ + + + + diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..5e87db2 --- /dev/null +++ b/config.yaml @@ -0,0 +1,23 @@ +master: + name: fester-master + role: control + +nodes: + - name: x99-v3 + host: 192.168.1.10 + max_jobs: 24 + + - name: x99-v4 + host: 192.168.1.11 + max_jobs: 30 + + +projects: + + - name: linux-tool + repo: https://forgejo.local/linux-tool.git + + targets: + debian: "make clean && make debian" + arch: "make clean && make arch" + fedora: "make clean && make fedora" diff --git a/debugger/interactive_engine.py b/debugger/interactive_engine.py new file mode 100644 index 0000000..cf0195f --- /dev/null +++ b/debugger/interactive_engine.py @@ -0,0 +1,124 @@ +# debugger/interactive_engine.py + +import asyncio + +class InteractiveDebuggerEngine: + + def __init__(self, nodes, graph, scheduler, timeline, ws): + + self.nodes = nodes + self.graph = graph + self.scheduler = scheduler + self.timeline = timeline + self.ws = ws + + self.paused = True + self.step_mode = True + self.current_index = 0 + self.actions = [] + + # ----------------------------- + # LOAD EXECUTION PLAN + # ----------------------------- + def load_plan(self, actions): + self.actions = actions + self.current_index = 0 + + # ----------------------------- + # CONTROL + # ----------------------------- + def pause(self): + self.paused = True + + def resume(self): + self.paused = False + + def step(self): + self.paused = False + + # ----------------------------- + # MAIN EXECUTION LOOP + # ----------------------------- + async def run(self): + + while self.current_index < len(self.actions): + + action = self.actions[self.current_index] + + # ----------------------------- + # SCHEDULER PREVIEW (NO EXECUTION YET) + # ----------------------------- + decision = self.scheduler.choose_best_node( + self.nodes, + action + ) + + preview_event = { + "type": "debugger_preview", + "data": { + "action": action["name"], + "selected_node": decision["best_node"]["name"], + "alternatives": decision["explanation"]["top_candidates"], + "state": "awaiting_step" + } + } + + await self.ws.broadcast(preview_event) + self.timeline.record(preview_event) + + # ----------------------------- + # WAIT FOR USER STEP + # ----------------------------- + while self.paused: + await asyncio.sleep(0.05) + + # auto-pause again after step + self.paused = True + + # ----------------------------- + # EXECUTE ACTION + # ----------------------------- + node = decision["best_node"] + + exec_event = { + "type": "node", + "data": { + "action": action["name"], + "node": node["name"], + "state": "running", + "mode": "debug_step" + } + } + + await self.ws.broadcast(exec_event) + self.timeline.record(exec_event) + + # simulate execution hook (replace with real executor) + rc = await self.execute(action, node) + + final_state = "done" if rc == 0 else "failed" + + final_event = { + "type": "node", + "data": { + "action": action["name"], + "node": node["name"], + "state": final_state, + "mode": "debug_step" + } + } + + await self.ws.broadcast(final_event) + self.timeline.record(final_event) + + self.current_index += 1 + + # ----------------------------- + # EXECUTION HOOK + # ----------------------------- + async def execute(self, action, node): + """ + Replace with distcc / lxc / libvirt execution layer + """ + await asyncio.sleep(0.2) + return 0 diff --git a/fester.js b/fester.js new file mode 100644 index 0000000..57b7053 --- /dev/null +++ b/fester.js @@ -0,0 +1,48 @@ +document.addEventListener("DOMContentLoaded", function () { + + function color(heat) { + if (heat < 30) return "#2ecc71"; + if (heat < 60) return "#f1c40f"; + if (heat < 80) return "#e67e22"; + return "#e74c3c"; + } + + document.getElementById("load").addEventListener("click", function () { + + cockpit.spawn( + ["python3", "/usr/share/cockpit/fester/backend/nodes.py"], + { superuser: "require" } + ) + .then(data => { + + const parsed = JSON.parse(data); + + let html = `

Master

+
${JSON.stringify(parsed.master, null, 2)}
+

Nodes

`; + + parsed.nodes.forEach(n => { + + html += ` +
+ ${n.name}
+ state: ${n.state}
+ heat: ${n.heat}
+
+ `; + }); + + document.getElementById("nodes").innerHTML = html; + }) + .catch(err => { + document.getElementById("nodes").textContent = err.toString(); + }); + }); + +}); diff --git a/index.html b/index.html new file mode 100644 index 0000000..9f01a3c --- /dev/null +++ b/index.html @@ -0,0 +1,31 @@ + +

Fester Build Control

+ + +

+
+    
+ +

Custom Build

+ +
+

+ +
+

+ + + +

Output

+

+
+

Release Control

+ +
+

+ + + +

+
+
diff --git a/install-fester.sh b/install-fester.sh
new file mode 100644
index 0000000..ab1e656
--- /dev/null
+++ b/install-fester.sh
@@ -0,0 +1,195 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+echo "๐Ÿง  Fester Bootstrap Installer Starting..."
+
+BASE_DIR="${BASE_DIR:-/usr/share/cockpit/fester}"
+
+echo "๐Ÿ“ Creating directory structure at $BASE_DIR"
+
+mkdir -p "$BASE_DIR"/{backend,ui,analysis,cli,cockpit,fester,docs,config,integrations}
+
+mkdir -p "$BASE_DIR/backend"/{api,executor,scheduler,analysis,cache,graph,events,metrics,targets,policy,pipeline,nodes,storage,integrations}
+
+mkdir -p "$BASE_DIR/ui"
+mkdir -p "$BASE_DIR/docs"
+
+# ------------------------------------------------------------
+# CONFIG
+# ------------------------------------------------------------
+cat > "$BASE_DIR/config.yaml" <<'EOF'
+cluster:
+  name: fester-cluster
+  scheduler: weighted
+  default_target: native
+
+cache:
+  backend: minio
+  tmpfs_acceleration: true
+  btrfs_snapshot: optional
+
+observability:
+  prometheus: true
+  grafana: true
+
+execution:
+  distcc: true
+  libvirt: true
+  lxc: true
+EOF
+
+# ------------------------------------------------------------
+# CHEATSHEET
+# ------------------------------------------------------------
+cat > "$BASE_DIR/docs/CHEATSHEET.md" <<'EOF'
+# ๐Ÿง  Fester Cheatsheet
+
+## Build
+fester build ./project.yaml
+
+## Nodes
+fester node list
+fester node set-policy  preferred
+
+## Debug
+/api/autopsy/
+/api/replay/
+
+## UI
+Cockpit Module: Fester
+Live DAG: /ui/live_dag.html
+EOF
+
+# ------------------------------------------------------------
+# LICENSE
+# ------------------------------------------------------------
+cat > "$BASE_DIR/LICENSE" <<'EOF'
+AGPLv3 - See https://www.gnu.org/licenses/
+
+This system is provided AS IS with no warranty.
+EOF
+
+# ------------------------------------------------------------
+# SIMPLE UI ENTRY POINT
+# ------------------------------------------------------------
+cat > "$BASE_DIR/ui/live_dag.html" <<'EOF'
+
+
+
+Fester Live DAG
+
+
+
+
+ + + +EOF + +# ------------------------------------------------------------ +# PROMETHEUS CONFIG +# ------------------------------------------------------------ +cat > "$BASE_DIR/add-to-prometheus-config.yml" <<'EOF' +scrape_configs: + - job_name: 'fester' + static_configs: + - targets: ['localhost:9100'] +EOF + +# ------------------------------------------------------------ +# GRAFANA CONFIG +# ------------------------------------------------------------ +cat > "$BASE_DIR/add-to-grafana-config.json" <<'EOF' +{ + "dashboard": { + "title": "Fester Cluster Overview" + } +} +EOF + +# ------------------------------------------------------------ +# COCKPIT MODULE ENTRY +# ------------------------------------------------------------ +cat > "$BASE_DIR/manifest.json" <<'EOF' +{ + "version": 1, + "name": "fester", + "label": "Fester Cluster Control", + "entrypoint": "index.html" +} +EOF + +# ------------------------------------------------------------ +# CLI BOOTSTRAP +# ------------------------------------------------------------ +cat > "$BASE_DIR/cli/fester.py" <<'EOF' +#!/usr/bin/env python3 +import argparse + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("cmd") + args = parser.parse_args() + print("Fester CLI:", args.cmd) + +if __name__ == "__main__": + main() +EOF + +chmod +x "$BASE_DIR/cli/fester.py" + +# ------------------------------------------------------------ +# BASIC INDEX +# ------------------------------------------------------------ +cat > "$BASE_DIR/index.html" <<'EOF' + + +

Fester Control Plane

+Live DAG + + +EOF + +# ------------------------------------------------------------ +# INITIAL CONFIG WIZARD +# ------------------------------------------------------------ +echo "โš™๏ธ Initial Configuration" + +read -p "Cluster name [fester-cluster]: " CLUSTER_NAME +CLUSTER_NAME=${CLUSTER_NAME:-fester-cluster} + +read -p "Enable libvirt? (y/n) [y]: " LIBVIRT +LIBVIRT=${LIBVIRT:-y} + +read -p "Enable LXC? (y/n) [y]: " LXC +LXC=${LXC:-y} + +read -p "Enable distcc? (y/n) [y]: " DISTCC +DISTCC=${DISTCC:-y} + +cat > "$BASE_DIR/config.runtime.yaml" < "$BASE_DIR/config.yaml" < "$BASE_DIR/LICENSE" <<'EOF' +Fester Distributed Build System + +Licensed under GNU AGPL v3 + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License. + +This software is provided "AS IS", without warranty of any kind. +EOF + + +# ============================================================ +# CHEATSHEET +# ============================================================ + +cat > "$BASE_DIR/CHEATSHEET.md" <<'EOF' +# ๐Ÿง  Fester Cheatsheet + +## Build +fester build + +## Node Control +fester node list +fester node set-policy preferred|avoid + +## Debug +/api/autopsy/ +/api/replay/ + +## UI +- Live DAG: /ui/live_dag.html +- Replay: /ui/replay.html +EOF + + +# ============================================================ +# INDEX PAGE (COCKPIT ENTRY) +# ============================================================ + +cat > "$BASE_DIR/index.html" <<'EOF' + +Fester Control Plane + +

Fester Cluster Control

+ + + +EOF + + +# ============================================================ +# UI: LIVE DAG (FULL FILE) +# ============================================================ + +cat > "$BASE_DIR/ui_live_dag.html" <<'EOF' + + + + + + +
+ + + + + +EOF + + +# ============================================================ +# PROMETHEUS CONFIG +# ============================================================ + +cat > "$BASE_DIR/add-to-prometheus-config.yml" <<'EOF' +scrape_configs: + - job_name: 'fester' + static_configs: + - targets: ['localhost:9100'] +EOF + + +# ============================================================ +# GRAFANA CONFIG +# ============================================================ + +cat > "$BASE_DIR/add-to-grafana-config.json" <<'EOF' +{ + "dashboard": { + "title": "Fester Cluster Overview" + } +} +EOF + + +# ============================================================ +# CLI TOOL (FULL FILE) +# ============================================================ + +mkdir -p "$BASE_DIR/cli" + +cat > "$BASE_DIR/cli/fester.py" <<'EOF' +#!/usr/bin/env python3 + +import argparse + +def main(): + parser = argparse.ArgumentParser() + sub = parser.add_subparsers(dest="cmd") + + build = sub.add_parser("build") + build.add_argument("project") + + node = sub.add_parser("node") + node.add_argument("action") + node.add_argument("target", nargs="*") + + args = parser.parse_args() + + if args.cmd == "build": + print("Building:", args.project) + + elif args.cmd == "node": + print("Node action:", args.action, args.target) + + else: + print("Fester CLI ready") + +if __name__ == "__main__": + main() +EOF + +chmod +x "$BASE_DIR/cli/fester.py" + + +# ============================================================ +# SIMPLE COCKPIT MANIFEST +# ============================================================ + +cat > "$BASE_DIR/manifest.json" <<'EOF' +{ + "version": 1, + "name": "fester", + "label": "Fester Cluster Control", + "entrypoint": "index.html" +} +EOF + + +# ============================================================ +# FINAL OUTPUT +# ============================================================ + +echo "" +echo "------------------------------------------------" +echo "โœ… Fester installation complete" +echo "๐Ÿ“ Installed at: $BASE_DIR" +echo "" +echo "Next steps:" +echo " 1. Open Cockpit" +echo " 2. Select Fester module" +echo " 3. Open Live DAG" +echo "------------------------------------------------" diff --git a/manifest.json b/manifest.json new file mode 100644 index 0000000..fbd6430 --- /dev/null +++ b/manifest.json @@ -0,0 +1,10 @@ +{ + "version": 1, + "name": "fester", + "tools": { + "fester": { + "label": "Fester", + "path": "index.html" + } + } +} diff --git a/optimizer.py b/optimizer.py new file mode 100644 index 0000000..902caa1 --- /dev/null +++ b/optimizer.py @@ -0,0 +1,12 @@ +from prometheus_client import Gauge + +target_score_thermal = Gauge("fester_target_score_thermal", "Thermal score", ["target"]) +target_score_cost = Gauge("fester_target_score_cost", "Cost score", ["target"]) +target_score_instability = Gauge("fester_target_score_instability", "Instability score", ["target"]) + + +def export_scores(target, score): + + target_score_thermal.labels(target=target).set(score["thermal"]) + target_score_cost.labels(target=target).set(score["cost"]) + target_score_instability.labels(target=target).set(score["instability"]) diff --git a/project.schema.json b/project.schema.json new file mode 100644 index 0000000..5ecdc40 --- /dev/null +++ b/project.schema.json @@ -0,0 +1,170 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "Fester Project Definition", + "type": "object", + "required": ["name", "source", "targets"], + "additionalProperties": false, + + "properties": { + + "name": { + "type": "string", + "description": "Human-readable project name" + }, + + "version": { + "type": "string", + "description": "Optional project version tag (release tracking)", + "default": "0.0.0" + }, + + "source": { + "type": "object", + "required": ["type", "url"], + "additionalProperties": false, + + "properties": { + + "type": { + "type": "string", + "enum": ["git", "hg", "svn", "cvs"], + "description": "Source control system type (normalized into snapshot)" + }, + + "url": { + "type": "string", + "description": "Repository URL or path for checkout/export" + }, + + "ref": { + "type": "string", + "description": "Optional branch/tag/revision reference (git branch, svn rev, hg changeset)", + "default": "HEAD" + } + } + }, + + "targets": { + "type": "object", + "description": "Build targets mapped to shell commands", + "additionalProperties": { + "type": "string" + } + }, + + "resources": { + "type": "object", + "description": "Optional resource hints for scheduler optimization", + + "properties": { + + "max_parallel_builds": { + "type": "integer", + "minimum": 1, + "maximum": 256, + "default": 64 + }, + + "thermal_priority": { + "type": "string", + "enum": ["low", "balanced", "high_safety"], + "default": "high_safety" + }, + + "preferred_nodes": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Optional node names preferred for this project" + }, + + "avoid_nodes": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Nodes to exclude from scheduling" + } + } + }, + + "build_env": { + "type": "object", + "description": "Environment variables injected into build runtime", + + "additionalProperties": { + "type": "string" + } + }, + + "artifacts": { + "type": "object", + "description": "Output packaging configuration", + + "properties": { + + "enabled": { + "type": "boolean", + "default": true + }, + + "format": { + "type": "string", + "enum": ["tar", "tar.gz", "tar.zst", "deb", "rpm"], + "default": "tar.zst" + }, + + "output_dir": { + "type": "string", + "default": "./dist" + }, + + "naming": { + "type": "string", + "description": "Artifact naming template", + "default": "{name}-{version}-{target}-{hash}" + } + } + }, + + "cache": { + "type": "object", + "description": "Build cache behavior control", + + "properties": { + + "enabled": { + "type": "boolean", + "default": true + }, + + "reuse_snapshots": { + "type": "boolean", + "default": true + }, + + "skip_if_unchanged": { + "type": "boolean", + "default": true + } + } + }, + + "logging": { + "type": "object", + "properties": { + + "verbose": { + "type": "boolean", + "default": true + }, + + "stream_to_ui": { + "type": "boolean", + "default": true + } + } + } + } +} diff --git a/release.js b/release.js new file mode 100644 index 0000000..480be98 --- /dev/null +++ b/release.js @@ -0,0 +1,18 @@ +document.addEventListener("DOMContentLoaded", function () { + + function runRelease() { + + const repo = document.getElementById("repo").value; + + cockpit.spawn( + ["python3", "/usr/share/cockpit/fester/backend/release.py", repo], + { superuser: "require" } + ) + .stream(function (data) { + document.getElementById("release_output").textContent += data; + }); + } + + document.getElementById("run_release").addEventListener("click", runRelease); + +}); diff --git a/scheduler/optimizer.py b/scheduler/optimizer.py new file mode 100644 index 0000000..4b70ab0 --- /dev/null +++ b/scheduler/optimizer.py @@ -0,0 +1,96 @@ +# scheduler/optimizer.py + +def choose_best_node(nodes, action, intelligence=None): + + best_node = None + best_score = -10**9 + + ranked = [] + + # ----------------------------- + # SCORE EACH NODE + # ----------------------------- + for node in nodes: + + breakdown = { + "cpu": 0, + "thermal": 0, + "policy": 0, + "intelligence": 0, + "final": 0 + } + + # CPU availability + cpu_score = (100 - node.get("cpu_load", 0)) + breakdown["cpu"] = cpu_score + + # Thermal penalty + thermal_penalty = node.get("temp", 0) + breakdown["thermal"] = -thermal_penalty + + # Policy rules + policy_score = 0 + + if node.get("policy") == "preferred": + policy_score += 20 + elif node.get("policy") == "avoid": + policy_score -= 50 + + breakdown["policy"] = policy_score + + # Intelligence feedback + intel_score = 0 + + if intelligence: + stats = intelligence.profiles.get(action["target"]) + + if stats: + latest = stats.finalize() + + intel_score -= latest["score"].get("thermal", 0) * 0.3 + intel_score -= latest["score"].get("instability", 0) * 0.2 + + breakdown["intelligence"] = intel_score + + # Final score + final = cpu_score + policy_score + intel_score + breakdown["thermal"] + breakdown["final"] = final + + ranked.append({ + "node": node, + "score": breakdown + }) + + if final > best_score: + best_score = final + best_node = node + + # ----------------------------- + # SORT FOR EXPLANATION + # ----------------------------- + ranked_sorted = sorted( + ranked, + key=lambda x: x["score"]["final"], + reverse=True + ) + + # ----------------------------- + # EXPLANATION OBJECT + # ----------------------------- + explanation = { + "selected": best_node["name"] if best_node else None, + "top_candidates": [ + { + "node": r["node"]["name"], + "final_score": r["score"]["final"], + "breakdown": r["score"] + } + for r in ranked_sorted[:5] + ] + } + + return { + "best_node": best_node, + "ranked": ranked, + "explanation": explanation + } diff --git a/style.css b/style.css new file mode 100644 index 0000000..e69de29 diff --git a/ui/live_dag.html b/ui/live_dag.html new file mode 100644 index 0000000..3b1cfaa --- /dev/null +++ b/ui/live_dag.html @@ -0,0 +1,169 @@ + + + + Fester Live DAG + Cause Graph + + + + + + +
+
+ + + + + diff --git a/ui/replay.html b/ui/replay.html new file mode 100644 index 0000000..1d5fcce --- /dev/null +++ b/ui/replay.html @@ -0,0 +1,283 @@ + + + + +Fester Execution Replay Observatory + + + + + + +
+ + +
+ +

Replay Controls

+ + + + + + + +
+ + + +
0
+ +
+ + +
+ + +
+

Inspector

+
Click a node
+
+ +
+

Failure Autopsy

+ + +

+
+ +
+ + + + +