527 lines
24 KiB
Python
527 lines
24 KiB
Python
# ==============================================================================
|
|
# mcp_drift_state_tracker.py
|
|
# Copyright (C) 2026 Jeremy Anderson info@dcos.net
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
# ==============================================================================
|
|
|
|
import ast
|
|
import os
|
|
import json
|
|
import re
|
|
from abc import ABC, abstractmethod
|
|
from typing import Dict, Any, List, Set, Optional, Tuple
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
mcp = FastMCP("MCP-Drift-State-Tracker")
|
|
|
|
STATE_FILE_NAME = ".mcp_drift_state.json"
|
|
|
|
GLOBAL_IGNORE_DIRS = {
|
|
".git", ".venv", "venv", "__pycache__", "node_modules", "target",
|
|
"dist", "build", "out", ".cargo", ".rustup", "obj", "bin", ".idea"
|
|
}
|
|
|
|
# ==============================================================================
|
|
# DATA-DRIVEN LEXICAL PROFILE TABLES ("CASE ARRAYS")
|
|
# ==============================================================================
|
|
|
|
# C-Family Shared Baseline Pattern
|
|
C_STYLE_DEFAULT = {
|
|
"imp": re.compile(r'^\s*(?:import|require|#include)\s+.*'),
|
|
"fn": re.compile(r'(?:public|private|protected|static|async|fun|func|function)?\s*([a-zA-Z_0-9]+)\s*\([^)]*\)\s*(?:\{|->|:)?'),
|
|
"cls": re.compile(r'^\s*(?:export\s+)?(?:class|interface|struct|enum)\s+([a-zA-Z_0-9]+)'),
|
|
"dec": re.compile(r'^\s*@([a-zA-Z_0-9]+)')
|
|
}
|
|
|
|
BRACE_PROFILES: Dict[str, Dict[str, re.Pattern]] = {
|
|
".rs": {
|
|
"imp": re.compile(r'^\s*(?:pub\s+)?use\s+([^;]+);'),
|
|
"fn": re.compile(r'(?:pub\s+)?(?:async\s+)?fn\s+([a-zA-Z_0-9]+)'),
|
|
"cls": re.compile(r'^\s*(?:pub\s+)?(?:struct|enum|trait)\s+([a-zA-Z_0-9]+)'),
|
|
"dec": re.compile(r'^\s*#\[([^\]]+)\]')
|
|
},
|
|
".go": {
|
|
"imp": re.compile(r'^\s*import\s+(?:\([^\)]+\)|"[^"]+")'),
|
|
"fn": re.compile(r'^func\s+(?:\([^)]+\)\s+)?([a-zA-Z_0-9]+)'),
|
|
"cls": re.compile(r'^\s*type\s+([a-zA-Z_0-9]+)\s+struct'),
|
|
"dec": re.compile(r'^\s*//\s*@([a-zA-Z_0-9]+)')
|
|
},
|
|
".php": {
|
|
"imp": re.compile(r'^\s*(?:use|require|include)(?:\s+once)?\s+([^;]+);'),
|
|
"fn": re.compile(r'(?:public|private|protected|static)?\s*function\s+([a-zA-Z_0-9]+)'),
|
|
"cls": re.compile(r'^\s*(?:abstract\s+)?class\s+([a-zA-Z_0-9]+)'),
|
|
"dec": re.compile(r'^\s*<<([^>>]+)>>')
|
|
},
|
|
".hack": {
|
|
"imp": re.compile(r'^\s*(?:use|require|include)(?:\s+once)?\s+([^;]+);'),
|
|
"fn": re.compile(r'(?:public|private|protected|static)?\s*function\s+([a-zA-Z_0-9]+)'),
|
|
"cls": re.compile(r'^\s*(?:abstract\s+)?class\s+([a-zA-Z_0-9]+)'),
|
|
"dec": re.compile(r'^\s*<<([^>>]+)>>')
|
|
},
|
|
".sh": {
|
|
"imp": re.compile(r'^\s*(?:\.|source)\s+.*'),
|
|
"fn": re.compile(r'(?:function\s+)?([a-zA-Z_0-9\-]+)\s*\(\s*\)\s*\{?|^\s*function\s+([a-zA-Z_0-9\-]+)'),
|
|
"cls": re.compile(r'^\s*class\s+([a-zA-Z_0-9]+)'),
|
|
"dec": re.compile(r'^\s*#\s*@([a-zA-Z_0-9]+)')
|
|
},
|
|
".zsh": {
|
|
"imp": re.compile(r'^\s*(?:\.|source)\s+.*'),
|
|
"fn": re.compile(r'(?:function\s+)?([a-zA-Z_0-9\-]+)\s*\(\s*\)\s*\{?|^\s*function\s+([a-zA-Z_0-9\-]+)'),
|
|
"cls": re.compile(r'^\s*class\s+([a-zA-Z_0-9]+)'),
|
|
"dec": re.compile(r'^\s*#\s*@([a-zA-Z_0-9]+)')
|
|
},
|
|
".ps1": {
|
|
"imp": re.compile(r'^\s*(?:\.|source)\s+.*'),
|
|
"fn": re.compile(r'(?:function\s+)?([a-zA-Z_0-9\-]+)\s*\(\s*\)\s*\{?|^\s*function\s+([a-zA-Z_0-9\-]+)'),
|
|
"cls": re.compile(r'^\s*class\s+([a-zA-Z_0-9]+)'),
|
|
"dec": re.compile(r'^\s*#\s*@([a-zA-Z_0-9]+)')
|
|
}
|
|
}
|
|
|
|
BLOCK_END_PROFILES: Dict[str, Dict[str, Any]] = {
|
|
".jl": {
|
|
"start": re.compile(r'^\s*(?:function|macro|mutable\s+struct)\s+([a-zA-Z_0-9!]+)'),
|
|
"end": re.compile(r'^\s*end\b'),
|
|
"inc": re.compile(r'\b(if|for|while|let|do|try|quote)\b')
|
|
},
|
|
".lua": {
|
|
"start": re.compile(r'^\s*(?:local\s+)?function\s+([a-zA-Z_0-9\.:]+)'),
|
|
"end": re.compile(r'^\s*end\b'),
|
|
"inc": re.compile(r'\b(if|for|while|do)\b')
|
|
},
|
|
".ex": {
|
|
"start": re.compile(r'^\s*(?:def|defp|defmacro)\s+([a-zA-Z_0-9!]+)'),
|
|
"end": re.compile(r'^\s*end\b'),
|
|
"inc": re.compile(r'\b(if|case|cond|unless|try)\b.*\bdo\b')
|
|
},
|
|
".exs": {
|
|
"start": re.compile(r'^\s*(?:def|defp|defmacro)\s+([a-zA-Z_0-9!]+)'),
|
|
"end": re.compile(r'^\s*end\b'),
|
|
"inc": re.compile(r'\b(if|case|cond|unless|try)\b.*\bdo\b')
|
|
},
|
|
".adb": {
|
|
"start": re.compile(r'^\s*(?:procedure|function)\s+([a-zA-Z_0-9]+)'),
|
|
"end": re.compile(r'^\s*end\s+[a-zA-Z_0-9]+;'),
|
|
"inc": re.compile(r'\b(if|loop|case|begin)\b')
|
|
},
|
|
".ads": {
|
|
"start": re.compile(r'^\s*(?:procedure|function)\s+([a-zA-Z_0-9]+)'),
|
|
"end": re.compile(r'^\s*end\s+[a-zA-Z_0-9]+;'),
|
|
"inc": re.compile(r'\b(if|loop|case|begin)\b')
|
|
},
|
|
".au3": {
|
|
"start": re.compile(r'^\s*(?:Func)\s+([a-zA-Z_0-9]+)'),
|
|
"end": re.compile(r'^\s*EndFunc\b'),
|
|
"inc": re.compile(r'^\s*(If|While|For|Select|Switch)\b')
|
|
},
|
|
".vbs": {
|
|
"start": re.compile(r'^\s*(?:Function|Sub)\s+([a-zA-Z_0-9]+)', re.IGNORECASE),
|
|
"end": re.compile(r'^\s*End\s+(?:Function|Sub)\b', re.IGNORECASE),
|
|
"inc": re.compile(r'^\s*(If|For|While|Do)\b', re.IGNORECASE)
|
|
}
|
|
}
|
|
|
|
SEQUENTIAL_PROFILES: Dict[str, Dict[str, Any]] = {
|
|
".bat": {
|
|
"start": re.compile(r'^\s*:([a-zA-Z_0-9\-]+)'),
|
|
"end": re.compile(r'^\s*(?:goto\s+:eof|exit\b)', re.IGNORECASE),
|
|
"stubs": ["rem todo", "rem fixme", "echo placeholder"]
|
|
},
|
|
".cmd": {
|
|
"start": re.compile(r'^\s*:([a-zA-Z_0-9\-]+)'),
|
|
"end": re.compile(r'^\s*(?:goto\s+:eof|exit\b)', re.IGNORECASE),
|
|
"stubs": ["rem todo", "rem fixme", "echo placeholder"]
|
|
},
|
|
".sql": {
|
|
"start": re.compile(r'(?:create\s+(?:or\s+replace\s+)?(?:procedure|function|view))\s+([a-zA-Z_0-9\.]+)', re.IGNORECASE),
|
|
"end": re.compile(r'^\s*end\s*[a-zA-Z_0-9]*\s*;', re.IGNORECASE),
|
|
"stubs": ["-- todo", "-- fixme", "return null", "raise notice"]
|
|
}
|
|
}
|
|
|
|
INDENTATION_PROFILES: Dict[str, Dict[str, Any]] = {
|
|
".mojo": {
|
|
"sig": re.compile(r'^\s*(?:fn|def)\s+([a-zA-Z_0-9]+)\s*\('),
|
|
"stubs": ["pass", "raise notimplementederror", "todo"]
|
|
},
|
|
".?": {
|
|
"sig": re.compile(r'^\s*(?:fn|def)\s+([a-zA-Z_0-9]+)\s*\('),
|
|
"stubs": ["pass", "raise notimplementederror", "todo"]
|
|
},
|
|
".hs": {
|
|
"sig": re.compile(r'^([a-zA-Z_0-9]+)\s+::\s+.*|^\s*([a-zA-Z_0-9]+)\s*='),
|
|
"stubs": ["undefined", "todo", "error "]
|
|
},
|
|
".elm": {
|
|
"sig": re.compile(r'^([a-zA-Z_0-9]+)\s+::\s+.*|^\s*([a-zA-Z_0-9]+)\s*='),
|
|
"stubs": ["undefined", "todo", "error "]
|
|
}
|
|
}
|
|
|
|
# ==============================================================================
|
|
# COMPONENT PARSER ARCHITECTURES (CLEAN, FLAT DRIVERS)
|
|
# ==============================================================================
|
|
|
|
class BaseLanguageParser(ABC):
|
|
@abstractmethod
|
|
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
|
|
pass
|
|
|
|
|
|
class PythonASTParser(BaseLanguageParser):
|
|
"""AST analyzer enforcing strict compliance boundaries for native Python files."""
|
|
|
|
def _is_mock(self, body: List[ast.stmt]) -> bool:
|
|
if not body:
|
|
return True
|
|
first = body[0]
|
|
# Skip top docstrings cleanly
|
|
actual_statements = body[1:] if (isinstance(first, ast.Expr) and isinstance(first.value, ast.Constant) and isinstance(first.value.value, str)) else body
|
|
if len(actual_statements) == 1:
|
|
node = actual_statements[0]
|
|
if isinstance(node, (ast.Pass, ast.Break, ast.Continue)): return True
|
|
if isinstance(node, ast.Raise) and isinstance(node.exc, ast.Name) and node.exc.id == "NotImplementedError": return True
|
|
if isinstance(node, ast.Expr) and isinstance(node.value, ast.Constant) and isinstance(node.value.value, str): return True
|
|
return False
|
|
|
|
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
|
|
tree = ast.parse(source, filename=file_path)
|
|
functions, classes, imports = {}, set(), set()
|
|
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.ClassDef):
|
|
classes.add(node.name)
|
|
elif isinstance(node, ast.Import):
|
|
for a in node.names: imports.add(f"import {a.name}")
|
|
elif isinstance(node, ast.ImportFrom):
|
|
imports.add(f"from {node.module or ''} import {', '.join(a.name for a in node.names)}")
|
|
elif isinstance(node, ast.FunctionDef):
|
|
args = {arg.arg: ast.unparse(arg.annotation).strip() if arg.annotation else "None" for arg in node.args.args}
|
|
decs = {ast.unparse(dec).strip().split('(')[0] for dec in node.decorator_list}
|
|
span = (node.end_lineno - node.lineno + 1) if hasattr(node, "end_lineno") else 1
|
|
functions[node.name] = {
|
|
"args": args, "returns": ast.unparse(node.returns).strip() if node.returns else "None",
|
|
"decorators": list(decs), "lines": span, "is_mock": self._is_mock(node.body)
|
|
}
|
|
return {"classes": list(classes), "functions": functions, "imports": list(imports), "total_lines": len(source.splitlines())}
|
|
|
|
|
|
class BraceLanguageParser(BaseLanguageParser):
|
|
"""Flat lexical tracker using profile case arrays rather than nested conditional ladders."""
|
|
|
|
def __init__(self, ext: str):
|
|
self.profile = BRACE_PROFILES.get(ext, C_STYLE_DEFAULT)
|
|
|
|
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
|
|
lines = source.splitlines()
|
|
functions, classes, imports = {}, set(), set()
|
|
brace_depth = 0
|
|
active_fn, fn_start = None, 0
|
|
body_lines, pending_decs = [], []
|
|
|
|
for idx, line in enumerate(lines, start=1):
|
|
stripped = line.strip()
|
|
if not stripped: continue
|
|
|
|
# Outer-scope tracking (Signatures, imports, decorators)
|
|
if brace_depth == 0:
|
|
if self.profile["imp"].match(line):
|
|
imports.add(stripped)
|
|
continue
|
|
m_dec = self.profile["dec"].match(line)
|
|
if m_dec:
|
|
pending_decs.append(m_dec.group(1))
|
|
continue
|
|
m_cls = self.profile["cls"].match(line)
|
|
if m_cls:
|
|
classes.add(m_cls.group(1))
|
|
|
|
m_fn = self.profile["fn"].search(line)
|
|
if m_fn and not any(k in stripped for k in {"if", "for", "while", "switch", "catch", "return"}):
|
|
name = m_fn.group(1) or (m_fn.group(2) if len(m_fn.groups()) > 1 else None)
|
|
if name and name not in {"class", "struct", "function", "return", "import", "fn", "fun"}:
|
|
active_fn, fn_start, body_lines = name, idx, []
|
|
|
|
# Character level scope processing
|
|
for char in stripped:
|
|
if char == '{':
|
|
brace_depth += 1
|
|
elif char == '}':
|
|
brace_depth -= 1
|
|
|
|
if brace_depth == 0 and active_fn:
|
|
body_txt = "\n".join(body_lines).lower()
|
|
is_mock = len(body_lines) <= 2 or any(s in body_txt for s in ["todo", "panic", "notimplemented", "throw ", "return null", "exit", ":"])
|
|
functions[active_fn] = {"args": {}, "returns": "Inferred", "decorators": pending_decs.copy(), "lines": idx - fn_start + 1, "is_mock": is_mock}
|
|
active_fn, pending_decs = None, []
|
|
break
|
|
|
|
if brace_depth > 0 and active_fn:
|
|
body_lines.append(stripped)
|
|
|
|
return {"classes": list(classes), "functions": functions, "imports": list(imports), "total_lines": len(lines)}
|
|
|
|
|
|
class BlockEndLanguageParser(BaseLanguageParser):
|
|
"""Monolithic logic parsing loop tracking keyword-delimited code architectures."""
|
|
|
|
def __init__(self, ext: str):
|
|
self.profile = BLOCK_END_PROFILES[ext]
|
|
|
|
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
|
|
lines = source.splitlines()
|
|
functions = {}
|
|
scope_depth = 0
|
|
active_fn, fn_start, body_lines = None, 0, []
|
|
|
|
for idx, line in enumerate(lines, start=1):
|
|
stripped = line.strip()
|
|
if not stripped or stripped.startswith(('#', '--', '//', "'", ';')): continue
|
|
|
|
# Guard clause: Locate and step into function tracking bounds
|
|
if scope_depth == 0:
|
|
m_fn = self.profile["start"].match(line)
|
|
if m_fn:
|
|
active_fn, fn_start, scope_depth, body_lines = m_fn.group(1), idx, 1, []
|
|
continue
|
|
|
|
# In-scope execution processing
|
|
if self.profile["inc"].search(line):
|
|
scope_depth += 1
|
|
|
|
if self.profile["end"].match(stripped):
|
|
scope_depth -= 1
|
|
if scope_depth == 0 and active_fn:
|
|
body_txt = "\n".join(body_lines).lower()
|
|
is_mock = len(body_lines) <= 2 or any(s in body_txt for s in ["todo", "panic", "notimplemented", "nothing", "nil", "raise", "wscript.quit"])
|
|
functions[active_fn] = {"args": {}, "returns": "Inferred", "decorators": [], "lines": idx - fn_start + 1, "is_mock": is_mock}
|
|
active_fn = None
|
|
continue
|
|
|
|
if active_fn:
|
|
body_lines.append(stripped)
|
|
|
|
return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)}
|
|
|
|
|
|
class SequentialScriptParser(BaseLanguageParser):
|
|
"""Label and linear declarative parser for scripting layouts (Batch/SQL)."""
|
|
|
|
def __init__(self, ext: str):
|
|
self.profile = SEQUENTIAL_PROFILES[ext]
|
|
|
|
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
|
|
lines = source.splitlines()
|
|
functions = {}
|
|
active_routine, routine_start, body_lines = None, 0, []
|
|
|
|
for idx, line in enumerate(lines, start=1):
|
|
stripped = line.strip()
|
|
if not stripped: continue
|
|
|
|
m_rot = self.profile["start"].match(line) if ".bat" in file_path else self.profile["start"].search(line)
|
|
if m_rot:
|
|
if active_routine:
|
|
body_txt = "\n".join(body_lines).lower()
|
|
is_mock = len(body_lines) <= 1 or any(mk in body_txt for mk in self.profile["stubs"])
|
|
functions[active_routine] = {"args": {}, "returns": "Routine", "decorators": [], "lines": idx - routine_start, "is_mock": is_mock}
|
|
active_routine, routine_start, body_lines = m_rot.group(1), idx, []
|
|
continue
|
|
|
|
if active_routine:
|
|
body_lines.append(stripped)
|
|
if self.profile["end"].match(stripped):
|
|
body_txt = "\n".join(body_lines).lower()
|
|
is_mock = len(body_lines) <= 2 or any(mk in body_txt for mk in self.profile["stubs"])
|
|
functions[active_routine] = {"args": {}, "returns": "Routine", "decorators": [], "lines": idx - routine_start + 1, "is_mock": is_mock}
|
|
active_routine = None
|
|
|
|
if active_routine:
|
|
body_txt = "\n".join(body_lines).lower()
|
|
functions[active_routine] = {"args": {}, "returns": "Routine", "decorators": [], "lines": len(lines) - routine_start + 1, "is_mock": len(body_lines) <= 1 or any(mk in body_txt for mk in self.profile["stubs"])}
|
|
|
|
return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)}
|
|
|
|
|
|
class LineIndentationParser(BaseLanguageParser):
|
|
"""Layout signature parsing module capturing indentation-sensitive language architectures."""
|
|
|
|
def __init__(self, ext: str):
|
|
self.profile = INDENTATION_PROFILES[ext]
|
|
|
|
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
|
|
lines = source.splitlines()
|
|
functions = {}
|
|
active_name, fn_start, body_lines = None, 0, []
|
|
|
|
for idx, line in enumerate(lines, start=1):
|
|
if not line.strip(): continue
|
|
m = self.profile["sig"].match(line)
|
|
if m:
|
|
name = m.group(1) or (m.group(2) if len(m.groups()) > 1 else None)
|
|
if name and name != active_name:
|
|
if active_name:
|
|
body_txt = "\n".join(body_lines).lower()
|
|
functions[active_name] = {"args": {}, "returns": "Declarative", "decorators": [], "lines": idx - fn_start, "is_mock": any(ms in body_txt for ms in self.profile["stubs"])}
|
|
active_name, fn_start, body_lines = name, idx, []
|
|
if active_name:
|
|
body_lines.append(line)
|
|
|
|
if active_name:
|
|
body_txt = "\n".join(body_lines).lower()
|
|
functions[active_name] = {"args": {}, "returns": "Declarative", "decorators": [], "lines": len(lines) - fn_start + 1, "is_mock": any(ms in body_txt for ms in self.profile["stubs"])}
|
|
|
|
return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)}
|
|
|
|
# ==============================================================================
|
|
# WORKSPACE ROUTING MANAGEMENT
|
|
# ==============================================================================
|
|
|
|
class WorkspaceRegistry:
|
|
@staticmethod
|
|
def get_parser(file_path: str) -> Optional[BaseLanguageParser]:
|
|
_, ext = os.path.splitext(file_path.lower())
|
|
if ext == ".py": return PythonASTParser()
|
|
if ext in INDENTATION_PROFILES: return LineIndentationParser(ext)
|
|
if ext in BLOCK_END_PROFILES: return BlockEndLanguageParser(ext)
|
|
if ext in SEQUENTIAL_PROFILES: return SequentialScriptParser(ext)
|
|
|
|
# Explicit or generic fallback for curly-brace structures
|
|
if ext in BRACE_PROFILES or ext in {".js", ".jsx", ".ts", ".tsx", ".c", ".cpp", ".h", ".hpp", ".java", ".cs", ".ec", ".eh", ".pike", ".pmod", ".kt", ".kts", ".swift", ".dart", ".r"}:
|
|
return BraceLanguageParser(ext)
|
|
return None
|
|
|
|
@classmethod
|
|
def load_state(cls, root_path: str) -> Dict[str, Any]:
|
|
target = os.path.join(root_path, STATE_FILE_NAME) if os.path.isdir(root_path) else os.path.join(os.path.dirname(root_path), STATE_FILE_NAME)
|
|
if os.path.exists(target):
|
|
try:
|
|
with open(target, "r", encoding="utf-8") as f: return json.load(f)
|
|
except: pass
|
|
return {}
|
|
|
|
@classmethod
|
|
def save_state(cls, root_path: str, state: Dict[str, Any]) -> None:
|
|
target = os.path.join(root_path, STATE_FILE_NAME) if os.path.isdir(root_path) else os.path.join(os.path.dirname(root_path), STATE_FILE_NAME)
|
|
try:
|
|
with open(target, "w", encoding="utf-8") as f: json.dump(state, f, indent=2)
|
|
except: pass
|
|
|
|
|
|
def _resolve_workspace_files(target_path: str) -> List[str]:
|
|
abs_path = os.path.abspath(target_path)
|
|
if os.path.isfile(abs_path):
|
|
return [abs_path] if WorkspaceRegistry.get_parser(abs_path) else []
|
|
resolved = []
|
|
for root, dirs, files in os.walk(abs_path):
|
|
dirs[:] = [d for d in dirs if d not in GLOBAL_IGNORE_DIRS]
|
|
for file in files:
|
|
fp = os.path.join(root, file)
|
|
if WorkspaceRegistry.get_parser(fp): resolved.append(fp)
|
|
return resolved
|
|
|
|
# ==============================================================================
|
|
# EXPOSED PROTOCOL TOOLS
|
|
# ==============================================================================
|
|
|
|
@mcp.tool()
|
|
async def snapshot_baseline(target_path: str) -> str:
|
|
"""Snapshots structural footprints across all matching modules inside the designated target directory path."""
|
|
base_dir = os.path.abspath(target_path) if os.path.isdir(target_path) else os.path.dirname(os.path.abspath(target_path))
|
|
files = _resolve_workspace_files(target_path)
|
|
if not files: return f"? BASELINE_FAILURE: No supported source profiles matched under: {target_path}"
|
|
|
|
curr_reg = WorkspaceRegistry.load_state(base_dir)
|
|
cataloged = 0
|
|
for fp in files:
|
|
parser = WorkspaceRegistry.get_parser(fp)
|
|
if not parser: continue
|
|
try:
|
|
with open(fp, "r", encoding="utf-8", errors="ignore") as f: content = f.read()
|
|
curr_reg[fp] = parser.parse(fp, content)
|
|
cataloged += 1
|
|
except Exception as e:
|
|
return f"? BASELINE_FAILURE: Parsing breakdown in '{os.path.basename(fp)}': {str(e)}"
|
|
|
|
WorkspaceRegistry.save_state(base_dir, curr_reg)
|
|
return f"? BASELINE_LOCKED: High-fidelity profiles tracked across {cataloged} repository points."
|
|
|
|
|
|
@mcp.tool()
|
|
async def audit_revision(target_path: str) -> str:
|
|
"""Audits modified workspace targets against baseline criteria to block truncated logic streams or laziness traps."""
|
|
base_dir = os.path.abspath(target_path) if os.path.isdir(target_path) else os.path.dirname(os.path.abspath(target_path))
|
|
registry = WorkspaceRegistry.load_state(base_dir)
|
|
if not registry: return "? AUDIT_FAILED: Metric profile layer empty. Run baseline snapshot routines."
|
|
|
|
files = _resolve_workspace_files(target_path)
|
|
drift_errors = []
|
|
|
|
for tracked_file in list(registry.keys()):
|
|
if tracked_file.startswith(base_dir) and not os.path.exists(tracked_file):
|
|
drift_errors.append(f"Module Dropped: Baseline module file '{os.path.basename(tracked_file)}' was deleted.")
|
|
|
|
for fp in files:
|
|
if fp not in registry: continue
|
|
baseline, filename = registry[fp], os.path.basename(fp)
|
|
parser = WorkspaceRegistry.get_parser(fp)
|
|
if not parser: continue
|
|
|
|
try:
|
|
with open(fp, "r", encoding="utf-8", errors="ignore") as f: content = f.read()
|
|
current = parser.parse(fp, content)
|
|
except Exception as e:
|
|
drift_errors.append(f"Syntax Error [{filename}]: Parsing crash: {str(e)}")
|
|
continue
|
|
|
|
missing_imports = set(baseline.get("imports", [])) - set(current.get("imports", []))
|
|
if missing_imports: drift_errors.append(f"[{filename}] Missing Dependencies: {list(missing_imports)}")
|
|
|
|
missing_classes = set(baseline.get("classes", [])) - set(current.get("classes", []))
|
|
if missing_classes: drift_errors.append(f"[{filename}] Objects/Classes Deleted: {list(missing_classes)}")
|
|
|
|
for fn_name, b_meta in baseline.get("functions", {}).items():
|
|
if fn_name not in current.get("functions", {}):
|
|
drift_errors.append(f"[{filename}] Logic Block Dropped: Routine block '{fn_name}' missing.")
|
|
continue
|
|
|
|
c_meta = current["functions"][fn_name]
|
|
if set(b_meta.get("decorators", [])) - set(c_meta.get("decorators", [])):
|
|
drift_errors.append(f"[{filename}] Stripped Framework Annotation on '{fn_name}'")
|
|
if c_meta.get("is_mock") and not b_meta.get("is_mock"):
|
|
drift_errors.append(f"? [{filename}] Laziness Trap Blocked in '{fn_name}': Logic substituted with stub.")
|
|
if b_meta.get("lines", 0) > 4 and c_meta.get("lines", 0) <= (b_meta.get("lines", 0) * 0.5):
|
|
drift_errors.append(f"? [{filename}] Truncation Trap Blocked in '{fn_name}': Context block collapsed >50%.")
|
|
|
|
if drift_errors:
|
|
return f"? CRITICAL_COMPLETENESS_DRIFT: Logic degradation intercepted!\n" + "\n".join(f" - {err}" for err in drift_errors)
|
|
return "? REVISION_PASSED: Multi-language layout boundaries remain complete and intact."
|
|
|
|
|
|
@mcp.tool()
|
|
async def run_completeness_diagnostic() -> str:
|
|
"""Runs verification pipelines simulating code erosion in script engines and structural blocks."""
|
|
f_bat, f_lua = "drift_test.bat", "drift_test.lua"
|
|
bat_v1, lua_v1 = ":build_array\necho Processing...\ngoto :eof\n", "function calculate_matrix()\n return 1\nend\n"
|
|
try:
|
|
with open(f_bat, "w") as f: f.write(bat_v1)
|
|
with open(f_lua, "w") as f: f.write(lua_v1)
|
|
await snapshot_baseline(".")
|
|
|
|
with open(f_bat, "w") as f: f.write(":build_array\nrem TODO\n")
|
|
with open(f_lua, "w") as f: f.write("function calculate_matrix()\n -- todo\nend\n")
|
|
|
|
res = await audit_revision(".")
|
|
if "Laziness Trap" in res: return "? DIAGNOSTIC_PASSED: Flattened polymorphic profile framework trapped all code erosion simulation vectors."
|
|
return f"? DIAGNOSTIC_FAILED: Structural checkpoints bypassed. Output:\n{res}"
|
|
finally:
|
|
for p in [f_bat, f_lua, STATE_FILE_NAME]:
|
|
if os.path.exists(p): os.remove(p)
|
|
|
|
if __name__ == "__main__":
|
|
mcp.run(transport="stdio") |