# ==============================================================================
# mcp_drift_state_tracker.py
# Copyright (C) 2026 Jeremy Anderson info@dcos.net
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# ==============================================================================

import ast
import json
import logging
import os
import re
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Set

from mcp.server.fastmcp import FastMCP

logger = logging.getLogger(__name__)

mcp = FastMCP("MCP-Drift-State-Tracker")

# Global persistence config
STATE_FILE_NAME = ".mcp_drift_state.json"

# Global exclusion profiles for high-performance directory sweeps
GLOBAL_IGNORE_DIRS = {
    ".git", ".venv", "venv", "__pycache__", "node_modules", "target",
    "dist", "build", "out", ".cargo", ".rustup", "obj", "bin",
}


# ------------------------------------------------------------------------------
# PARSER INTERFACE & DRIVER IMPLEMENTATIONS
# ------------------------------------------------------------------------------

class BaseLanguageParser(ABC):
    """Abstract Base Class defining the protocol for structural language parsers."""

    @abstractmethod
    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Parses source content into a unified structural inventory dictionary.

        Implementations in this file return a dict with the keys ``classes``,
        ``functions``, ``imports`` and ``total_lines``.
        """


class PythonASTParser(BaseLanguageParser):
    """Deep structural inspection engine for Python using native AST."""

    def _is_mock(self, body: List[ast.stmt]) -> bool:
        """Return True when a function body is only a placeholder.

        A body counts as a placeholder when, after an optional leading
        docstring, it is empty or consists of exactly one statement that is
        ``pass``/``break``/``continue``, a bare string, ``...``, or a
        ``raise NotImplementedError`` (with or without call arguments).
        """
        if not body:
            return True

        statements = body
        # Skip a leading docstring so "docstring + pass" is still a stub.
        if len(body) > 1 and isinstance(body[0], ast.Expr):
            first = body[0].value
            if isinstance(first, ast.Constant) and isinstance(first.value, str):
                statements = body[1:]

        if len(statements) != 1:
            return False

        node = statements[0]
        if isinstance(node, (ast.Pass, ast.Break, ast.Continue)):
            return True
        if isinstance(node, ast.Raise):
            exc = node.exc
            # `raise NotImplementedError("msg")` is an ast.Call wrapping the name.
            if isinstance(exc, ast.Call):
                exc = exc.func
            return isinstance(exc, ast.Name) and exc.id == "NotImplementedError"
        if isinstance(node, ast.Expr) and isinstance(node.value, ast.Constant):
            value = node.value.value
            return isinstance(value, str) or value is Ellipsis
        return False

    def _describe_function(self, node: ast.AST) -> Dict[str, Any]:
        """Build the inventory record for one (sync or async) function node."""
        args = {
            arg.arg: ast.unparse(arg.annotation).strip() if arg.annotation else "None"
            for arg in node.args.args
        }
        returns = ast.unparse(node.returns).strip() if node.returns else "None"
        # Keep only the decorator's dotted name, dropping any call arguments.
        decorators = {
            ast.unparse(dec).strip().split('(')[0] for dec in node.decorator_list
        }
        end_lineno = getattr(node, "end_lineno", None)
        span = (end_lineno - node.lineno + 1) if end_lineno is not None else 1
        return {
            "args": args,
            "returns": returns,
            "decorators": sorted(decorators),
            "lines": span,
            "is_mock": self._is_mock(node.body),
        }

    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Parse Python source into the structural inventory dictionary.

        Walks the whole tree, so nested classes/functions and methods are
        included. Functions are keyed by bare name only, so same-named
        functions (e.g. methods of different classes) overwrite each other.

        Raises:
            SyntaxError: propagated from ``ast.parse`` on invalid source.
        """
        tree = ast.parse(source, filename=file_path)
        functions: Dict[str, Dict[str, Any]] = {}
        classes: Set[str] = set()
        imports: Set[str] = set()

        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                classes.add(node.name)
            elif isinstance(node, ast.Import):
                imports.update(f"import {alias.name}" for alias in node.names)
            elif isinstance(node, ast.ImportFrom):
                module = node.module if node.module else ""
                imports.update(
                    f"from {module} import {alias.name}" for alias in node.names
                )
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # `async def` is a distinct node type and must be tracked too.
                functions[node.name] = self._describe_function(node)

        # Sorted so the persisted state file is stable between runs.
        return {
            "classes": sorted(classes),
            "functions": functions,
            "imports": sorted(imports),
            "total_lines": len(source.splitlines()),
        }


class BraceLanguageParser(BaseLanguageParser):
    """
    Deterministic Lexical Scoping Engine for curly-brace languages.
    Supports: C, C++, Rust, Go, Java, TypeScript, JavaScript, C#
    """

    # Whole-word control-flow keywords; a line containing one is not treated
    # as a function signature. Word boundaries keep names such as `verify`
    # or `format` from being rejected.
    _CONTROL_FLOW = re.compile(r'\b(?:if|for|while|switch|catch)\b')
    # Captured "names" that are really keywords, not functions.
    _NON_FUNCTION_NAMES = frozenset({"class", "struct", "function", "return"})
    # Lower-cased substrings that mark a function body as a stub.
    _STUB_MARKERS = ("todo", "panic", "notimplemented", "throw new", "return null")

    def __init__(self, extension: str):
        self.ext = extension

    def _lexical_rules(self):
        """Return (import, function, class, decorator) patterns for ``self.ext``."""
        if self.ext in {".rs"}:
            return (
                re.compile(r'^\s*(?:pub\s+)?use\s+([^;]+);'),
                re.compile(r'(?:pub\s+)?(?:async\s+)?fn\s+([a-zA-Z_0-9]+)\s*(<[^>]+>)?\s*\(([^)]*)\)'),
                re.compile(r'^\s*(?:pub\s+)?(?:struct|enum|trait)\s+([a-zA-Z_0-9]+)'),
                re.compile(r'^\s*#\[([^\]]+)\]'),
            )
        if self.ext in {".go"}:
            return (
                re.compile(r'^\s*import\s+(?:\([^\)]+\)|"[^"]+")'),
                re.compile(r'^func\s+(?:\([^)]+\)\s+)?([a-zA-Z_0-9]+)\s*\(([^)]*)\)'),
                re.compile(r'^\s*type\s+([a-zA-Z_0-9]+)\s+struct'),
                re.compile(r'^\s*//\s*@([a-zA-Z_0-9]+)'),
            )
        # C-style family: JS, TS, C, C++, Java, C#
        return (
            re.compile(r'^\s*(?:import|require|#include)\s+.*'),
            re.compile(r'(?:public|private|protected|static|async|function)?\s*([a-zA-Z_0-9]+)\s*\(([^)]*)\)\s*(?::\s*[a-zA-Z_0-9<>\s|]+)?\s*\{?'),
            re.compile(r'^\s*(?:export\s+)?(?:class|interface|struct)\s+([a-zA-Z_0-9]+)'),
            re.compile(r'^\s*@([a-zA-Z_0-9]+)'),
        )

    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Scan source line by line and build the structural inventory.

        Imports, decorators, classes and function signatures are recognised
        only at brace depth 0. A function ends when the brace depth returns
        to 0. Braces are counted character by character with no awareness of
        strings or comments.
        """
        lines = source.splitlines()
        functions: Dict[str, Dict[str, Any]] = {}
        classes: Set[str] = set()
        imports: Set[str] = set()

        import_pattern, func_pattern, class_pattern, decorator_pattern = (
            self._lexical_rules()
        )

        brace_depth = 0
        active_func: Optional[str] = None
        func_start_line = 0
        func_body_tokens: List[str] = []
        pending_decorators: List[str] = []

        for idx, line in enumerate(lines, start=1):
            stripped = line.strip()
            if not stripped:
                continue

            if brace_depth == 0:
                # Check Global Imports
                if import_pattern.match(line):
                    imports.add(stripped)
                    continue

                # Harvest Block Decorators / System Annotations
                dec_match = decorator_pattern.match(line)
                if dec_match:
                    pending_decorators.append(dec_match.group(1))
                    continue

                # Identify Function Signature Transitions
                starts_function = False
                func_match = func_pattern.search(line)
                if func_match and not self._CONTROL_FLOW.search(stripped):
                    name = func_match.group(1)
                    if name not in self._NON_FUNCTION_NAMES:
                        active_func = name
                        func_start_line = idx
                        func_body_tokens = []
                        starts_function = True

                # Identify Structural Class/Struct boundaries
                class_match = class_pattern.match(line)
                if class_match:
                    classes.add(class_match.group(1))
                    if not starts_function:
                        # Decorators above a class belong to the class; drop
                        # them so they are not credited to the next function.
                        pending_decorators.clear()

            # Stream characters to track precise scope boundaries
            for char in stripped:
                if char == '{':
                    brace_depth += 1
                elif char == '}':
                    brace_depth -= 1
                    if brace_depth == 0 and active_func:
                        span = idx - func_start_line + 1
                        body_str = "\n".join(func_body_tokens).lower()
                        # Short bodies and bodies with stub markers are
                        # flagged as placeholders.
                        is_mock = (
                            len(func_body_tokens) <= 3
                            or any(stub in body_str for stub in self._STUB_MARKERS)
                        )
                        functions[active_func] = {
                            "args": {},  # Signature layout preservation mapped textually
                            "returns": "Inferred",
                            "decorators": pending_decorators.copy(),
                            "lines": span,
                            "is_mock": is_mock,
                        }
                        active_func = None
                        pending_decorators.clear()

            # The signature line itself is collected once its `{` opens the scope.
            if brace_depth > 0 and active_func:
                func_body_tokens.append(stripped)

        # Sorted so the persisted state file is stable between runs.
        return {
            "classes": sorted(classes),
            "functions": functions,
            "imports": sorted(imports),
            "total_lines": len(lines),
        }


# ------------------------------------------------------------------------------
# CORE WORKSPACE ENGINE REGISTRY & STATE PERSISTENCE
# ------------------------------------------------------------------------------

class WorkspaceRegistry:
    """Selects parsers by file extension and persists the baseline as JSON.

    No locking is performed; concurrent writers to the same state file are
    not coordinated.
    """

    _BRACE_EXTENSIONS = frozenset({
        ".js", ".jsx", ".ts", ".tsx", ".rs", ".go",
        ".c", ".cpp", ".h", ".hpp", ".java", ".cs",
    })

    @staticmethod
    def get_parser(file_path: str) -> Optional[BaseLanguageParser]:
        """Return a parser for ``file_path`` based on its extension, or None."""
        _, ext = os.path.splitext(file_path)
        if ext == ".py":
            return PythonASTParser()
        if ext in WorkspaceRegistry._BRACE_EXTENSIONS:
            return BraceLanguageParser(ext)
        return None

    @staticmethod
    def _state_path(root_path: str) -> str:
        """Locate the state file: inside a directory, or beside a file path."""
        directory = root_path if os.path.isdir(root_path) else os.path.dirname(root_path)
        return os.path.join(directory, STATE_FILE_NAME)

    @classmethod
    def load_state(cls, root_path: str) -> Dict[str, Any]:
        """Load the persisted baseline for ``root_path``.

        Returns an empty dict when the state file is missing, unreadable,
        not valid JSON, or does not contain a JSON object.
        """
        target = cls._state_path(root_path)
        try:
            with open(target, "r", encoding="utf-8") as f:
                state = json.load(f)
        except FileNotFoundError:
            return {}
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("Ignoring unreadable drift state %s: %s", target, exc)
            return {}
        if not isinstance(state, dict):
            logger.warning("Ignoring drift state %s: top level is not an object", target)
            return {}
        return state

    @classmethod
    def save_state(cls, root_path: str, state: Dict[str, Any]) -> None:
        """Write ``state`` as JSON; failures are logged, never raised."""
        target = cls._state_path(root_path)
        try:
            with open(target, "w", encoding="utf-8") as f:
                json.dump(state, f, indent=2)
        except (OSError, TypeError, ValueError) as exc:
            # Best effort by design: persistence problems must not crash a tool call.
            logger.warning("Could not persist drift state to %s: %s", target, exc)


def _resolve_workspace_files(target_path: str) -> List[str]:
    """Return absolute paths of all supported source files at ``target_path``.

    A file path yields a one-element list (or an empty list when unsupported).
    A directory is walked recursively, skipping ``GLOBAL_IGNORE_DIRS``.
    """
    abs_path = os.path.abspath(target_path)
    if os.path.isfile(abs_path):
        return [abs_path] if WorkspaceRegistry.get_parser(abs_path) else []

    resolved: List[str] = []
    for root, dirs, files in os.walk(abs_path):
        # Prune in place so os.walk never descends into ignored directories.
        dirs[:] = [d for d in dirs if d not in GLOBAL_IGNORE_DIRS]
        for file in files:
            full_path = os.path.join(root, file)
            if WorkspaceRegistry.get_parser(full_path):
                resolved.append(full_path)
    return resolved


def _resolve_base_dir(target_path: str) -> str:
    """Return the absolute directory that owns the state file for ``target_path``."""
    absolute = os.path.abspath(target_path)
    return absolute if os.path.isdir(target_path) else os.path.dirname(absolute)


def _collect_drift(filename: str, baseline: Dict[str, Any], current: Dict[str, Any]) -> List[str]:
    """Compare one file's current inventory with its baseline; return messages."""
    errors: List[str] = []

    # 1. Audit Global Package/Dependency Drops
    missing_imports = set(baseline["imports"]) - set(current["imports"])
    if missing_imports:
        errors.append(f"[{filename}] Dropped Dependencies: {sorted(missing_imports)}")

    # 2. Audit OOP Object Integrity Drops
    missing_classes = set(baseline["classes"]) - set(current["classes"])
    if missing_classes:
        errors.append(f"[{filename}] Core Structs/Classes Missing: {sorted(missing_classes)}")

    # 3. Audit Function Presence, Logic Truncation, and Laziness Traps
    for func_name, b_meta in baseline["functions"].items():
        c_meta = current["functions"].get(func_name)
        if c_meta is None:
            errors.append(f"[{filename}] Missing Logic Block: Function '{func_name}' was skipped.")
            continue

        # Enforce systemic decorator and annotation checks
        missing_decs = set(b_meta["decorators"]) - set(c_meta["decorators"])
        if missing_decs:
            errors.append(f"[{filename}] Stripped Annotations on '{func_name}': Dropped {sorted(missing_decs)}")

        # Enforce multi-language laziness checks
        if c_meta["is_mock"] and not b_meta["is_mock"]:
            errors.append(f"[{filename}] Laziness Trap Triggered in '{func_name}': Substituted with placeholder/throw/todo stub.")

        # Enforce physical compression check bounds
        if b_meta["lines"] > 4 and c_meta["lines"] <= (b_meta["lines"] * 0.5):
            errors.append(f"[{filename}] Truncation Trap Triggered in '{func_name}': Scope collapsed by >50% ({b_meta['lines']} -> {c_meta['lines']} lines).")

    return errors


# ------------------------------------------------------------------------------
# EXPOSED MCP PRODUCTION GATE TOOLS
# ------------------------------------------------------------------------------

# NOTE(review): the tool docstrings below are left verbatim because FastMCP
# appears to publish them as the tool descriptions - confirm before rewording.

@mcp.tool()
async def snapshot_baseline(target_path: str) -> str:
    """Snapshots structural footprints for all matching code assets across the repository tree."""
    # Parses every supported file under target_path and merges the results
    # into the state file of the owning directory. Returns a status string;
    # the first parse/read failure aborts without saving.
    base_dir = _resolve_base_dir(target_path)
    files = _resolve_workspace_files(target_path)

    if not files:
        return f"? BASELINE_FAILURE: No supported source languages discovered at target path: {target_path}"

    current_registry = WorkspaceRegistry.load_state(base_dir)
    cataloged = 0

    for file_path in files:
        parser = WorkspaceRegistry.get_parser(file_path)
        if not parser:
            continue
        try:
            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
            current_registry[file_path] = parser.parse(file_path, content)
            cataloged += 1
        except Exception as e:
            # Tool boundary: report any failure as a status string.
            return f"? BASELINE_FAILURE: Internal parsing failure on '{os.path.basename(file_path)}': {str(e)}"

    WorkspaceRegistry.save_state(base_dir, current_registry)
    return f"? BASELINE_LOCKED: Workspace matrix secured across {cataloged} source modules."


@mcp.tool()
async def audit_revision(target_path: str) -> str:
    """Audits modified paths, enforcing type, import, and logic limits across all languages."""
    # Re-parses every tracked file under target_path and reports anything the
    # baseline had that is now missing, stubbed out, or heavily shortened.
    # Files absent from the baseline are ignored.
    base_dir = _resolve_base_dir(target_path)
    registry = WorkspaceRegistry.load_state(base_dir)

    if not registry:
        return f"? AUDIT_FAILED: Persistent baseline marker layer is empty or missing. Trigger snapshots first."

    files = _resolve_workspace_files(target_path)
    drift_errors: List[str] = []

    # Detect unexpected missing modules from tracked baselines. The trailing
    # separator keeps "/a/proj" from also matching "/a/project2/...".
    scope_prefix = os.path.join(base_dir, "")
    for tracked_file in registry:
        if tracked_file.startswith(scope_prefix) and not os.path.exists(tracked_file):
            drift_errors.append(f"Module Dropped: Tracked source '{os.path.basename(tracked_file)}' was deleted.")

    for file_path in files:
        if file_path not in registry:
            continue

        baseline = registry[file_path]
        filename = os.path.basename(file_path)
        parser = WorkspaceRegistry.get_parser(file_path)
        if not parser:
            continue

        try:
            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
            current = parser.parse(file_path, content)
        except Exception as e:
            # Tool boundary: a file that no longer parses is itself a finding.
            drift_errors.append(f"Syntax Error [{filename}]: File compilation or parse block failure: {str(e)}")
            continue

        drift_errors.extend(_collect_drift(filename, baseline, current))

    if drift_errors:
        return (
            f"? CRITICAL_COMPLETENESS_DRIFT: Regression detected during mutation check!\n"
            + "\n".join(f" - {err}" for err in drift_errors)
            + "\nAction: Halt code ingestion. Force full context reconstruction."
        )

    return f"? REVISION_PASSED: Workspace architectures, dependencies, and functional bounds are verified."
@mcp.tool()
async def run_completeness_diagnostic() -> str:
    """Verifies parsing and capture capabilities across mixed multi-language test fixtures."""
    # NOTE(review): the docstring is left verbatim because FastMCP appears to
    # publish it as the tool description - confirm before rewording.
    #
    # Flow: write a Python and a Rust fixture, snapshot them, overwrite both
    # with stubbed, decorator-free versions, then audit. The diagnostic passes
    # when the audit output reports both a stripped annotation and a laziness
    # trap.
    #
    # Everything happens in a throwaway directory so the diagnostic never
    # overwrites files in the current directory, never deletes a real
    # baseline state file, and never depends on unrelated sources parsing.
    import tempfile  # local import: only this diagnostic needs it

    py_v1 = "@mcp.tool()\ndef calculate_hash(seed: str) -> None:\n print(seed)\n"
    rs_v1 = "#[inline]\npub fn calculate_hash(seed: &str) {\n println!(\"{}\", seed);\n}"
    py_lazy = "def calculate_hash(seed):\n pass\n"
    rs_lazy = "pub fn calculate_hash(seed: &str) {\n todo!();\n}"

    with tempfile.TemporaryDirectory(prefix="mcp_drift_diag_") as workdir:
        fixture_py = os.path.join(workdir, "drift_test.py")
        fixture_rs = os.path.join(workdir, "drift_test.rs")

        with open(fixture_py, "w", encoding="utf-8") as f:
            f.write(py_v1)
        with open(fixture_rs, "w", encoding="utf-8") as f:
            f.write(rs_v1)

        await snapshot_baseline(workdir)

        # Ingest lazy mutations across language families
        with open(fixture_py, "w", encoding="utf-8") as f:
            f.write(py_lazy)
        with open(fixture_rs, "w", encoding="utf-8") as f:
            f.write(rs_lazy)

        audit_result = await audit_revision(workdir)
    # Fixtures and the temporary state file are removed with the directory.

    if "Stripped Annotations" in audit_result and "Laziness Trap Triggered" in audit_result:
        return "? DIAGNOSTIC_PASSED: Multi-language driver framework successfully trapped Python AST and Rust lexical code-erosion boundaries."
    return f"? DIAGNOSTIC_FAILED: Structural deviations bypassed checkpoints. Result:\n{audit_result}"


if __name__ == "__main__":
    mcp.run(transport="stdio")