# ==============================================================================
# mcp_drift_state_tracker.py
# Copyright (C) 2026 Jeremy Anderson info@dcos.net
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# ==============================================================================

import ast
import os
import json
import re
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Set, Optional, Tuple

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("MCP-Drift-State-Tracker")

# Name of the JSON file that stores the baseline next to the audited sources.
STATE_FILE_NAME = ".mcp_drift_state.json"

# Directory names that are pruned while walking a workspace.
GLOBAL_IGNORE_DIRS = {
    ".git", ".venv", "venv", "__pycache__", "node_modules", "target", "dist",
    "build", "out", ".cargo", ".rustup", "obj", "bin", ".idea"
}

# ==============================================================================
# DATA-DRIVEN LEXICAL PROFILE TABLES ("CASE ARRAYS")
# ==============================================================================

# C-Family Shared Baseline Pattern
C_STYLE_DEFAULT = {
    "imp": re.compile(r'^\s*(?:import|require|#include)\s+.*'),
    "fn": re.compile(r'(?:public|private|protected|static|async|fun|func|function)?\s*([a-zA-Z_0-9]+)\s*\([^)]*\)\s*(?:\{|->|:)?'),
    "cls": re.compile(r'^\s*(?:export\s+)?(?:class|interface|struct|enum)\s+([a-zA-Z_0-9]+)'),
    "dec": re.compile(r'^\s*@([a-zA-Z_0-9]+)')
}

# PHP and Hack share one profile. The import pattern accepts the real
# `require_once` / `include_once` spellings (underscore, optional parentheses).
_PHP_PROFILE = {
    "imp": re.compile(r'^\s*(?:use|require|include)(?:_once)?\b\s*([^;]+);'),
    "fn": re.compile(r'(?:public|private|protected|static)?\s*function\s+([a-zA-Z_0-9]+)'),
    "cls": re.compile(r'^\s*(?:abstract\s+)?class\s+([a-zA-Z_0-9]+)'),
    "dec": re.compile(r'^\s*<<([^>>]+)>>')
}

# Shared by the shell-like extensions (.sh, .zsh, .ps1).
_SHELL_PROFILE = {
    "imp": re.compile(r'^\s*(?:\.|source)\s+.*'),
    "fn": re.compile(r'(?:function\s+)?([a-zA-Z_0-9\-]+)\s*\(\s*\)\s*\{?|^\s*function\s+([a-zA-Z_0-9\-]+)'),
    "cls": re.compile(r'^\s*class\s+([a-zA-Z_0-9]+)'),
    "dec": re.compile(r'^\s*#\s*@([a-zA-Z_0-9]+)')
}

BRACE_PROFILES: Dict[str, Dict[str, re.Pattern]] = {
    ".rs": {
        "imp": re.compile(r'^\s*(?:pub\s+)?use\s+([^;]+);'),
        "fn": re.compile(r'(?:pub\s+)?(?:async\s+)?fn\s+([a-zA-Z_0-9]+)'),
        "cls": re.compile(r'^\s*(?:pub\s+)?(?:struct|enum|trait)\s+([a-zA-Z_0-9]+)'),
        "dec": re.compile(r'^\s*#\[([^\]]+)\]')
    },
    ".go": {
        "imp": re.compile(r'^\s*import\s+(?:\([^\)]+\)|"[^"]+")'),
        "fn": re.compile(r'^func\s+(?:\([^)]+\)\s+)?([a-zA-Z_0-9]+)'),
        "cls": re.compile(r'^\s*type\s+([a-zA-Z_0-9]+)\s+struct'),
        "dec": re.compile(r'^\s*//\s*@([a-zA-Z_0-9]+)')
    },
    ".php": _PHP_PROFILE,
    ".hack": _PHP_PROFILE,
    ".sh": _SHELL_PROFILE,
    ".zsh": _SHELL_PROFILE,
    ".ps1": _SHELL_PROFILE
}

# Elixir: only the block form (`... do` + `end`) opens a nested scope. The
# keyword form `do: expr` has no matching `end`, hence the `(?!:)` guard.
_ELIXIR_PROFILE = {
    "start": re.compile(r'^\s*(?:def|defp|defmacro)\s+([a-zA-Z_0-9!]+)'),
    "end": re.compile(r'^\s*end\b'),
    "inc": re.compile(r'\b(if|case|cond|unless|try)\b.*\bdo\b(?!:)')
}

# Ada: `end <word>;` closes both nested constructs (`end if;`, `end loop;`,
# `end case;`) and the subprogram itself (`end Name;`). The subprogram's own
# `begin` must NOT count as an opener, otherwise the depth never returns to 0.
_ADA_PROFILE = {
    "start": re.compile(r'^\s*(?:procedure|function)\s+([a-zA-Z_0-9]+)'),
    "end": re.compile(r'^\s*end\s+[a-zA-Z_0-9]+;'),
    "inc": re.compile(r'\b(if|loop|case)\b')
}

# "inc" is None where the "end" pattern matches only the function terminator
# (EndFunc / End Function / End Sub): nested blocks close with other keywords,
# so counting their openers would leave the depth permanently above zero.
BLOCK_END_PROFILES: Dict[str, Dict[str, Any]] = {
    ".jl": {
        "start": re.compile(r'^\s*(?:function|macro|mutable\s+struct)\s+([a-zA-Z_0-9!]+)'),
        "end": re.compile(r'^\s*end\b'),
        "inc": re.compile(r'\b(if|for|while|let|do|try|quote)\b')
    },
    ".lua": {
        "start": re.compile(r'^\s*(?:local\s+)?function\s+([a-zA-Z_0-9\.:]+)'),
        "end": re.compile(r'^\s*end\b'),
        "inc": re.compile(r'\b(if|for|while|do)\b')
    },
    ".ex": _ELIXIR_PROFILE,
    ".exs": _ELIXIR_PROFILE,
    ".adb": _ADA_PROFILE,
    ".ads": _ADA_PROFILE,
    ".au3": {
        "start": re.compile(r'^\s*(?:Func)\s+([a-zA-Z_0-9]+)'),
        "end": re.compile(r'^\s*EndFunc\b'),
        "inc": None
    },
    ".vbs": {
        "start": re.compile(r'^\s*(?:Function|Sub)\s+([a-zA-Z_0-9]+)', re.IGNORECASE),
        "end": re.compile(r'^\s*End\s+(?:Function|Sub)\b', re.IGNORECASE),
        "inc": None
    }
}

_BATCH_PROFILE = {
    "start": re.compile(r'^\s*:([a-zA-Z_0-9\-]+)'),
    "end": re.compile(r'^\s*(?:goto\s+:eof|exit\b)', re.IGNORECASE),
    "stubs": ["rem todo", "rem fixme", "echo placeholder"]
}

SEQUENTIAL_PROFILES: Dict[str, Dict[str, Any]] = {
    ".bat": _BATCH_PROFILE,
    ".cmd": _BATCH_PROFILE,
    ".sql": {
        "start": re.compile(r'(?:create\s+(?:or\s+replace\s+)?(?:procedure|function|view))\s+([a-zA-Z_0-9\.]+)', re.IGNORECASE),
        "end": re.compile(r'^\s*end\s*[a-zA-Z_0-9]*\s*;', re.IGNORECASE),
        "stubs": ["-- todo", "-- fixme", "return null", "raise notice"]
    }
}

_MOJO_PROFILE = {
    "sig": re.compile(r'^\s*(?:fn|def)\s+([a-zA-Z_0-9]+)\s*\('),
    "stubs": ["pass", "raise notimplementederror", "todo"]
}

INDENTATION_PROFILES: Dict[str, Dict[str, Any]] = {
    ".mojo": _MOJO_PROFILE,
    # NOTE(review): the source had the key ".?", which looks like a mis-encoded
    # copy of Mojo's alternative fire-emoji extension; restored here via an
    # escape so the file stays ASCII. Confirm against the original repository.
    ".\N{FIRE}": _MOJO_PROFILE,
    ".hs": {
        "sig": re.compile(r'^([a-zA-Z_0-9]+)\s+::\s+.*|^\s*([a-zA-Z_0-9]+)\s*='),
        "stubs": ["undefined", "todo", "error "]
    },
    ".elm": {
        # Elm type annotations use a single colon (`name : Type`), not `::`.
        "sig": re.compile(r'^([a-zA-Z_0-9]+)\s+:\s+.*|^\s*([a-zA-Z_0-9]+)\s*='),
        "stubs": ["undefined", "todo", "error "]
    }
}


def _function_record(returns: str, lines: int, is_mock: bool,
                     decorators: Optional[List[str]] = None) -> Dict[str, Any]:
    """Build the per-function metadata dict used by the lexical parsers."""
    return {
        "args": {},
        "returns": returns,
        "decorators": list(decorators or []),
        "lines": lines,
        "is_mock": is_mock
    }


# ==============================================================================
# COMPONENT PARSER ARCHITECTURES (CLEAN, FLAT DRIVERS)
# ==============================================================================

class BaseLanguageParser(ABC):
    """Common interface of every language parser."""

    @abstractmethod
    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Return a structural profile of *source*.

        The profile is a JSON-serialisable dict with the keys ``classes``,
        ``functions`` (name -> metadata), ``imports`` and ``total_lines``.
        """


class PythonASTParser(BaseLanguageParser):
    """AST analyzer enforcing strict compliance boundaries for native Python files.

    Functions are keyed by bare name, so same-named functions or methods in
    one file overwrite each other (the last one visited by ``ast.walk`` wins).
    """

    @staticmethod
    def _is_string_expr(node: ast.stmt) -> bool:
        """Return True for a bare string expression statement (e.g. a docstring)."""
        return (
            isinstance(node, ast.Expr)
            and isinstance(node.value, ast.Constant)
            and isinstance(node.value.value, str)
        )

    def _is_mock(self, body: List[ast.stmt]) -> bool:
        """Return True when *body* is a placeholder rather than real logic.

        After skipping a leading docstring, a body counts as a stub when it is
        empty or consists of exactly one of: ``pass``/``break``/``continue``,
        ``raise NotImplementedError`` (bare name or call form), a bare string,
        or ``...``.
        """
        if not body:
            return True
        statements = body[1:] if self._is_string_expr(body[0]) else body
        if not statements:
            # Docstring-only function: nothing executable is left.
            return True
        if len(statements) != 1:
            return False

        node = statements[0]
        if isinstance(node, (ast.Pass, ast.Break, ast.Continue)):
            return True
        if isinstance(node, ast.Raise):
            exc = node.exc
            # `raise NotImplementedError("msg")` wraps the name in a Call node.
            if isinstance(exc, ast.Call):
                exc = exc.func
            return isinstance(exc, ast.Name) and exc.id == "NotImplementedError"
        if isinstance(node, ast.Expr) and isinstance(node.value, ast.Constant):
            value = node.value.value
            return isinstance(value, str) or value is Ellipsis
        return False

    def _describe_function(self, node: ast.AST) -> Dict[str, Any]:
        """Collect signature, decorator, size and stub metadata for one def."""
        args = {
            arg.arg: ast.unparse(arg.annotation).strip() if arg.annotation else "None"
            for arg in node.args.args
        }
        # Keep only the decorator's callable name; call arguments are dropped.
        decorators = {ast.unparse(dec).strip().split('(')[0] for dec in node.decorator_list}
        end_lineno = getattr(node, "end_lineno", None)
        span = (end_lineno - node.lineno + 1) if end_lineno is not None else 1
        return {
            "args": args,
            "returns": ast.unparse(node.returns).strip() if node.returns else "None",
            "decorators": sorted(decorators),
            "lines": span,
            "is_mock": self._is_mock(node.body)
        }

    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Profile a Python module through its AST.

        Raises:
            SyntaxError: propagated from ``ast.parse`` for invalid source.
        """
        tree = ast.parse(source, filename=file_path)
        functions: Dict[str, Any] = {}
        classes: Set[str] = set()
        imports: Set[str] = set()

        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                classes.add(node.name)
            elif isinstance(node, ast.Import):
                for alias in node.names:
                    imports.add(f"import {alias.name}")
            elif isinstance(node, ast.ImportFrom):
                imports.add(f"from {node.module or ''} import {', '.join(a.name for a in node.names)}")
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # `async def` is tracked exactly like `def`.
                functions[node.name] = self._describe_function(node)

        # Sorted so the persisted state is stable between runs.
        return {
            "classes": sorted(classes),
            "functions": functions,
            "imports": sorted(imports),
            "total_lines": len(source.splitlines())
        }


class BraceLanguageParser(BaseLanguageParser):
    """Flat lexical tracker using profile case arrays rather than nested conditional ladders.

    Only definitions that start at brace depth 0 are tracked; braces inside
    string literals or comments are counted like any other brace.
    """

    # A signature candidate containing one of these as a whole word is
    # treated as a control-flow statement, not a definition.
    _CONTROL_WORDS = re.compile(r'\b(?:if|for|while|switch|catch|return)\b')
    _RESERVED_NAMES = frozenset({"class", "struct", "function", "return", "import", "fn", "fun"})
    _STUB_MARKERS = ("todo", "panic", "notimplemented", "throw ", "return null", "exit")

    def __init__(self, ext: str):
        self.profile = BRACE_PROFILES.get(ext, C_STYLE_DEFAULT)

    def _signature_name(self, line: str, stripped: str) -> Optional[str]:
        """Return the function name declared on *line*, or None."""
        m_fn = self.profile["fn"].search(line)
        if not m_fn or self._CONTROL_WORDS.search(stripped):
            return None
        name = m_fn.group(1) or (m_fn.group(2) if len(m_fn.groups()) > 1 else None)
        if not name or name in self._RESERVED_NAMES:
            return None
        # `foo();` / `int foo(int);` is a call or prototype, not a definition;
        # accepting it would attach the next unrelated brace block to `foo`.
        if stripped.endswith(";") and "{" not in stripped:
            return None
        return name

    def _looks_like_stub(self, body_lines: List[str]) -> bool:
        """Heuristic stub test on the collected body lines of one function."""
        if len(body_lines) <= 2:
            return True
        body_txt = "\n".join(body_lines).lower()
        if any(marker in body_txt for marker in self._STUB_MARKERS):
            return True
        # A line that is only `:` is the shell no-op. Matching ":" as a plain
        # substring would flag any body containing `::`, type annotations,
        # ternaries or object literals.
        return any(entry.rstrip(";").strip() == ":" for entry in body_lines)

    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Profile a curly-brace language file line by line."""
        lines = source.splitlines()
        functions: Dict[str, Any] = {}
        classes: Set[str] = set()
        imports: Set[str] = set()
        brace_depth = 0
        active_fn, fn_start = None, 0
        body_lines: List[str] = []
        pending_decs: List[str] = []

        for idx, line in enumerate(lines, start=1):
            stripped = line.strip()
            if not stripped:
                continue

            # Outer-scope tracking (Signatures, imports, decorators)
            if brace_depth == 0:
                if self.profile["imp"].match(line):
                    imports.add(stripped)
                    continue
                m_dec = self.profile["dec"].match(line)
                if m_dec:
                    pending_decs.append(m_dec.group(1))
                    continue
                m_cls = self.profile["cls"].match(line)
                if m_cls:
                    classes.add(m_cls.group(1))
                name = self._signature_name(line, stripped)
                if name:
                    active_fn, fn_start, body_lines = name, idx, []

            # Character level scope processing
            for char in stripped:
                if char == '{':
                    brace_depth += 1
                elif char == '}':
                    if brace_depth == 0:
                        # Stray closer: ignore instead of going negative.
                        continue
                    brace_depth -= 1
                    if brace_depth == 0 and active_fn:
                        functions[active_fn] = _function_record(
                            "Inferred", idx - fn_start + 1,
                            self._looks_like_stub(body_lines), pending_decs
                        )
                        active_fn, pending_decs = None, []
                        break

            if brace_depth > 0 and active_fn:
                body_lines.append(stripped)

        return {
            "classes": sorted(classes),
            "functions": functions,
            "imports": sorted(imports),
            "total_lines": len(lines)
        }


class BlockEndLanguageParser(BaseLanguageParser):
    """Monolithic logic parsing loop tracking keyword-delimited code architectures.

    Known limitation: a block opened and closed on one line (for example
    Lua ``if x then y end``) raises the depth without lowering it again.
    """

    _STUB_MARKERS = ("todo", "panic", "notimplemented", "nothing", "nil", "raise", "wscript.quit")
    _COMMENT_PREFIXES = ('#', '--', '//', "'", ';')

    def __init__(self, ext: str):
        self.profile = BLOCK_END_PROFILES[ext]

    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Profile a file whose functions are closed by an ``end``-style keyword."""
        lines = source.splitlines()
        functions: Dict[str, Any] = {}
        start_pat, end_pat = self.profile["start"], self.profile["end"]
        opener_pat = self.profile.get("inc")  # None: nesting is not tracked
        scope_depth = 0
        active_fn, fn_start = None, 0
        body_lines: List[str] = []

        for idx, line in enumerate(lines, start=1):
            stripped = line.strip()
            if not stripped or stripped.startswith(self._COMMENT_PREFIXES):
                continue

            # Guard clause: Locate and step into function tracking bounds
            if scope_depth == 0:
                m_fn = start_pat.match(line)
                if m_fn:
                    active_fn, fn_start, scope_depth, body_lines = m_fn.group(1), idx, 1, []
                continue

            # In-scope execution processing. The closer is tested first so a
            # line such as Ada's `end if;` is not also counted as an opener.
            if end_pat.match(stripped):
                scope_depth -= 1
                if scope_depth == 0 and active_fn:
                    body_txt = "\n".join(body_lines).lower()
                    is_mock = len(body_lines) <= 2 or any(s in body_txt for s in self._STUB_MARKERS)
                    functions[active_fn] = _function_record("Inferred", idx - fn_start + 1, is_mock)
                    active_fn = None
                    continue
            elif opener_pat is not None and opener_pat.search(line):
                scope_depth += 1

            if active_fn:
                body_lines.append(stripped)

        return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)}


class SequentialScriptParser(BaseLanguageParser):
    """Label and linear declarative parser for scripting layouts (Batch/SQL)."""

    def __init__(self, ext: str):
        self.profile = SEQUENTIAL_PROFILES[ext]

    def _is_stub(self, body_lines: List[str], max_trivial: int) -> bool:
        """Return True when the body is at most *max_trivial* lines or has a stub marker."""
        if len(body_lines) <= max_trivial:
            return True
        body_txt = "\n".join(body_lines).lower()
        return any(marker in body_txt for marker in self.profile["stubs"])

    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Profile a script made of labelled or ``CREATE ...`` routines."""
        lines = source.splitlines()
        functions: Dict[str, Any] = {}
        active_routine, routine_start = None, 0
        body_lines: List[str] = []

        for idx, line in enumerate(lines, start=1):
            stripped = line.strip()
            if not stripped:
                continue

            # `search` works for every profile: the batch pattern is anchored
            # with `^` (so it behaves like `match`), while SQL needs a real
            # search. Deciding by `".bat" in file_path` depended on the path.
            m_rot = self.profile["start"].search(line)
            if m_rot:
                if active_routine:
                    # A new routine began before the previous one was closed.
                    functions[active_routine] = _function_record(
                        "Routine", idx - routine_start, self._is_stub(body_lines, 1)
                    )
                active_routine, routine_start, body_lines = m_rot.group(1), idx, []
                continue

            if active_routine:
                body_lines.append(stripped)
                if self.profile["end"].match(stripped):
                    # The terminator is part of body_lines, hence threshold 2.
                    functions[active_routine] = _function_record(
                        "Routine", idx - routine_start + 1, self._is_stub(body_lines, 2)
                    )
                    active_routine = None

        if active_routine:
            # Routine still open at end of file.
            functions[active_routine] = _function_record(
                "Routine", len(lines) - routine_start + 1, self._is_stub(body_lines, 1)
            )

        return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)}


class LineIndentationParser(BaseLanguageParser):
    """Layout signature parsing module capturing indentation-sensitive language architectures."""

    def __init__(self, ext: str):
        self.profile = INDENTATION_PROFILES[ext]

    def _record(self, body_lines: List[str], span: int) -> Dict[str, Any]:
        """Build the metadata entry for one declaration."""
        body_txt = "\n".join(body_lines).lower()
        is_mock = any(marker in body_txt for marker in self.profile["stubs"])
        return _function_record("Declarative", span, is_mock)

    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Profile a file by signature lines.

        A declaration extends until a signature with a different name is
        found; its collected lines include the signature line itself.
        """
        lines = source.splitlines()
        functions: Dict[str, Any] = {}
        active_name, fn_start = None, 0
        body_lines: List[str] = []

        for idx, line in enumerate(lines, start=1):
            if not line.strip():
                continue
            m = self.profile["sig"].match(line)
            if m:
                name = m.group(1) or (m.group(2) if len(m.groups()) > 1 else None)
                # The same name again (e.g. a Haskell type signature followed
                # by its equation) continues the current declaration.
                if name and name != active_name:
                    if active_name:
                        functions[active_name] = self._record(body_lines, idx - fn_start)
                    active_name, fn_start, body_lines = name, idx, []
            if active_name:
                body_lines.append(line)

        if active_name:
            functions[active_name] = self._record(body_lines, len(lines) - fn_start + 1)

        return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)}


# ==============================================================================
# WORKSPACE ROUTING MANAGEMENT
# ==============================================================================

class WorkspaceRegistry:
    """Maps files to parsers and persists the baseline state file."""

    # Extensions handled by BraceLanguageParser with the C-style default profile.
    _GENERIC_BRACE_EXTS = frozenset({
        ".js", ".jsx", ".ts", ".tsx", ".c", ".cpp", ".h", ".hpp", ".java", ".cs",
        ".ec", ".eh", ".pike", ".pmod", ".kt", ".kts", ".swift", ".dart", ".r"
    })

    @staticmethod
    def _state_path(root_path: str) -> str:
        """Return the state-file path for a directory, or for a file's parent directory."""
        folder = root_path if os.path.isdir(root_path) else os.path.dirname(root_path)
        return os.path.join(folder, STATE_FILE_NAME)

    @staticmethod
    def get_parser(file_path: str) -> Optional[BaseLanguageParser]:
        """Return a parser for *file_path* chosen by its lower-cased extension, or None."""
        _, ext = os.path.splitext(file_path.lower())
        if ext == ".py":
            return PythonASTParser()
        if ext in INDENTATION_PROFILES:
            return LineIndentationParser(ext)
        if ext in BLOCK_END_PROFILES:
            return BlockEndLanguageParser(ext)
        if ext in SEQUENTIAL_PROFILES:
            return SequentialScriptParser(ext)
        # Explicit or generic fallback for curly-brace structures
        if ext in BRACE_PROFILES or ext in WorkspaceRegistry._GENERIC_BRACE_EXTS:
            return BraceLanguageParser(ext)
        return None

    @classmethod
    def load_state(cls, root_path: str) -> Dict[str, Any]:
        """Load the persisted baseline.

        Returns an empty dict when the state file is missing, unreadable,
        not valid JSON, or does not contain a JSON object.
        """
        target = cls._state_path(root_path)
        if not os.path.exists(target):
            return {}
        try:
            with open(target, "r", encoding="utf-8") as f:
                state = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return {}
        return state if isinstance(state, dict) else {}

    @classmethod
    def save_state(cls, root_path: str, state: Dict[str, Any]) -> bool:
        """Persist *state*; return True on success and False when writing failed.

        Failures are still swallowed (best effort), but are now reported
        through the return value so callers can avoid claiming success.
        """
        target = cls._state_path(root_path)
        try:
            with open(target, "w", encoding="utf-8") as f:
                json.dump(state, f, indent=2)
        except (OSError, TypeError, ValueError):
            return False
        return True


def _resolve_workspace_files(target_path: str) -> List[str]:
    """Return absolute paths of all supported source files under *target_path*.

    A single file yields a one-element list (or an empty list when its
    extension is unsupported). Directories are walked recursively, skipping
    every directory named in ``GLOBAL_IGNORE_DIRS``.
    """
    abs_path = os.path.abspath(target_path)
    if os.path.isfile(abs_path):
        return [abs_path] if WorkspaceRegistry.get_parser(abs_path) else []

    resolved = []
    for root, dirs, files in os.walk(abs_path):
        # In-place assignment prunes the walk; sorting keeps the order stable.
        dirs[:] = sorted(d for d in dirs if d not in GLOBAL_IGNORE_DIRS)
        for file in sorted(files):
            fp = os.path.join(root, file)
            if WorkspaceRegistry.get_parser(fp):
                resolved.append(fp)
    return resolved


# ==============================================================================
# EXPOSED PROTOCOL TOOLS
# ==============================================================================

@mcp.tool()
async def snapshot_baseline(target_path: str) -> str:
    """Snapshots structural footprints across all matching modules inside the designated target directory path.

    Existing state entries are kept and the entries of the files found now
    are overwritten. The first file that fails to parse aborts the snapshot
    without saving anything.
    """
    base_dir = os.path.abspath(target_path) if os.path.isdir(target_path) else os.path.dirname(os.path.abspath(target_path))
    files = _resolve_workspace_files(target_path)
    if not files:
        return f"? BASELINE_FAILURE: No supported source profiles matched under: {target_path}"

    curr_reg = WorkspaceRegistry.load_state(base_dir)
    cataloged = 0
    for fp in files:
        parser = WorkspaceRegistry.get_parser(fp)
        if not parser:
            continue
        try:
            with open(fp, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
            curr_reg[fp] = parser.parse(fp, content)
            cataloged += 1
        except Exception as e:
            return f"? BASELINE_FAILURE: Parsing breakdown in '{os.path.basename(fp)}': {str(e)}"

    if not WorkspaceRegistry.save_state(base_dir, curr_reg):
        # Do not report a locked baseline that was never written to disk.
        return f"? BASELINE_FAILURE: Unable to persist state file under: {base_dir}"
    return f"? BASELINE_LOCKED: High-fidelity profiles tracked across {cataloged} repository points."
def _diff_module(filename: str, baseline: Dict[str, Any], current: Dict[str, Any]) -> List[str]:
    """Compare one file's current profile with its baseline and list the drift findings."""
    findings = []

    missing_imports = set(baseline.get("imports", [])) - set(current.get("imports", []))
    if missing_imports:
        findings.append(f"[{filename}] Missing Dependencies: {list(missing_imports)}")

    missing_classes = set(baseline.get("classes", [])) - set(current.get("classes", []))
    if missing_classes:
        findings.append(f"[{filename}] Objects/Classes Deleted: {list(missing_classes)}")

    current_functions = current.get("functions", {})
    for fn_name, b_meta in baseline.get("functions", {}).items():
        if fn_name not in current_functions:
            findings.append(f"[{filename}] Logic Block Dropped: Routine block '{fn_name}' missing.")
            continue
        c_meta = current_functions[fn_name]

        if set(b_meta.get("decorators", [])) - set(c_meta.get("decorators", [])):
            findings.append(f"[{filename}] Stripped Framework Annotation on '{fn_name}'")

        # Only a real implementation that turned into a stub counts as a regression.
        if c_meta.get("is_mock") and not b_meta.get("is_mock"):
            findings.append(f"? [{filename}] Laziness Trap Blocked in '{fn_name}': Logic substituted with stub.")

        # Functions of 4 lines or fewer are exempt; larger ones must keep
        # more than half of their baseline length.
        baseline_lines = b_meta.get("lines", 0)
        if baseline_lines > 4 and c_meta.get("lines", 0) <= (baseline_lines * 0.5):
            findings.append(f"? [{filename}] Truncation Trap Blocked in '{fn_name}': Context block collapsed >50%.")

    return findings


@mcp.tool()
async def audit_revision(target_path: str) -> str:
    """Audits modified workspace targets against baseline criteria to block truncated logic streams or laziness traps.

    Reports baseline files under the audited directory that no longer exist,
    then re-parses every tracked file found under *target_path* and compares
    it with its baseline. Files without a baseline entry are skipped.
    """
    base_dir = os.path.abspath(target_path) if os.path.isdir(target_path) else os.path.dirname(os.path.abspath(target_path))
    registry = WorkspaceRegistry.load_state(base_dir)
    if not registry:
        return "? AUDIT_FAILED: Metric profile layer empty. Run baseline snapshot routines."

    files = _resolve_workspace_files(target_path)
    drift_errors = []

    # The trailing separator stops "/a/proj" from also matching "/a/proj2/...".
    scope_prefix = os.path.join(base_dir, "")
    for tracked_file in registry:
        if tracked_file.startswith(scope_prefix) and not os.path.exists(tracked_file):
            drift_errors.append(f"Module Dropped: Baseline module file '{os.path.basename(tracked_file)}' was deleted.")

    for fp in files:
        baseline = registry.get(fp)
        if not isinstance(baseline, dict):
            # No baseline for this file, or the stored entry is unusable.
            continue
        filename = os.path.basename(fp)
        parser = WorkspaceRegistry.get_parser(fp)
        if not parser:
            continue
        try:
            with open(fp, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
            current = parser.parse(fp, content)
        except Exception as e:
            drift_errors.append(f"Syntax Error [{filename}]: Parsing crash: {str(e)}")
            continue
        drift_errors.extend(_diff_module(filename, baseline, current))

    if drift_errors:
        return f"? CRITICAL_COMPLETENESS_DRIFT: Logic degradation intercepted!\n" + "\n".join(f" - {err}" for err in drift_errors)
    return "? REVISION_PASSED: Multi-language layout boundaries remain complete and intact."


@mcp.tool()
async def run_completeness_diagnostic() -> str:
    """Runs verification pipelines simulating code erosion in script engines and structural blocks.

    The simulation runs inside a throw-away temporary directory, so it never
    snapshots, overwrites or deletes anything in the current working
    directory (including a real baseline state file).
    """
    import tempfile  # local import: only this diagnostic needs it

    # Baseline fixtures have enough body lines that the parsers do not already
    # classify them as stubs; otherwise no regression could be detected.
    bat_v1 = ":build_array\necho Processing...\nset COUNT=1\necho Done\ngoto :eof\n"
    lua_v1 = "function calculate_matrix()\n local a = 1\n local b = 2\n return a + b\nend\n"
    bat_v2 = ":build_array\nrem TODO\n"
    lua_v2 = "function calculate_matrix()\n -- todo\nend\n"

    with tempfile.TemporaryDirectory(prefix="mcp_drift_diag_") as sandbox:
        f_bat = os.path.join(sandbox, "drift_test.bat")
        f_lua = os.path.join(sandbox, "drift_test.lua")

        with open(f_bat, "w", encoding="utf-8") as f:
            f.write(bat_v1)
        with open(f_lua, "w", encoding="utf-8") as f:
            f.write(lua_v1)
        await snapshot_baseline(sandbox)

        # Erode both fixtures into stubs, then audit against the baseline.
        with open(f_bat, "w", encoding="utf-8") as f:
            f.write(bat_v2)
        with open(f_lua, "w", encoding="utf-8") as f:
            f.write(lua_v2)
        res = await audit_revision(sandbox)

    # Both eroded routines must be reported, not just one of them.
    trapped = all(
        f"Laziness Trap Blocked in '{routine}'" in res
        for routine in ("build_array", "calculate_matrix")
    )
    if trapped:
        return "? DIAGNOSTIC_PASSED: Flattened polymorphic profile framework trapped all code erosion simulation vectors."
    return f"? DIAGNOSTIC_FAILED: Structural checkpoints bypassed. Output:\n{res}"


if __name__ == "__main__":
    mcp.run(transport="stdio")