diff --git a/mcp_drift_state_tracker.py b/mcp_drift_state_tracker.py index 8acb4d4..e3ffcc2 100644 --- a/mcp_drift_state_tracker.py +++ b/mcp_drift_state_tracker.py @@ -13,207 +13,386 @@ import os import json import re from abc import ABC, abstractmethod -from typing import Dict, Any, List, Set, Optional +from typing import Dict, Any, List, Set, Optional, Tuple + from mcp.server.fastmcp import FastMCP mcp = FastMCP("MCP-Drift-State-Tracker") -# Global persistence config STATE_FILE_NAME = ".mcp_drift_state.json" -# Global exclusion profiles for high-performance directory sweeps GLOBAL_IGNORE_DIRS = { ".git", ".venv", "venv", "__pycache__", "node_modules", "target", - "dist", "build", "out", ".cargo", ".rustup", "obj", "bin" + "dist", "build", "out", ".cargo", ".rustup", "obj", "bin", ".idea" } -# ------------------------------------------------------------------------------ -# PARSER INTERFACE & DRIVER IMPLEMENTATIONS -# ------------------------------------------------------------------------------ +# ============================================================================== +# DATA-DRIVEN LEXICAL PROFILE TABLES ("CASE ARRAYS") +# ============================================================================== + +# C-Family Shared Baseline Pattern +C_STYLE_DEFAULT = { + "imp": re.compile(r'^\s*(?:import|require|#include)\s+.*'), + "fn": re.compile(r'(?:public|private|protected|static|async|fun|func|function)?\s*([a-zA-Z_0-9]+)\s*\([^)]*\)\s*(?:\{|->|:)?'), + "cls": re.compile(r'^\s*(?:export\s+)?(?:class|interface|struct|enum)\s+([a-zA-Z_0-9]+)'), + "dec": re.compile(r'^\s*@([a-zA-Z_0-9]+)') +} + +BRACE_PROFILES: Dict[str, Dict[str, re.Pattern]] = { + ".rs": { + "imp": re.compile(r'^\s*(?:pub\s+)?use\s+([^;]+);'), + "fn": re.compile(r'(?:pub\s+)?(?:async\s+)?fn\s+([a-zA-Z_0-9]+)'), + "cls": re.compile(r'^\s*(?:pub\s+)?(?:struct|enum|trait)\s+([a-zA-Z_0-9]+)'), + "dec": re.compile(r'^\s*#\[([^\]]+)\]') + }, + ".go": { + "imp": re.compile(r'^\s*import\s+(?:\([^\)]+\)|"[^"]+")'), + "fn": re.compile(r'^func\s+(?:\([^)]+\)\s+)?([a-zA-Z_0-9]+)'), + "cls": re.compile(r'^\s*type\s+([a-zA-Z_0-9]+)\s+struct'), + "dec": re.compile(r'^\s*//\s*@([a-zA-Z_0-9]+)') + }, + ".php": { + "imp": re.compile(r'^\s*(?:use|require|include)(?:\s+once)?\s+([^;]+);'), + "fn": re.compile(r'(?:public|private|protected|static)?\s*function\s+([a-zA-Z_0-9]+)'), + "cls": re.compile(r'^\s*(?:abstract\s+)?class\s+([a-zA-Z_0-9]+)'), + "dec": re.compile(r'^\s*<<([^>>]+)>>') + }, + ".hack": { + "imp": re.compile(r'^\s*(?:use|require|include)(?:\s+once)?\s+([^;]+);'), + "fn": re.compile(r'(?:public|private|protected|static)?\s*function\s+([a-zA-Z_0-9]+)'), + "cls": re.compile(r'^\s*(?:abstract\s+)?class\s+([a-zA-Z_0-9]+)'), + "dec": re.compile(r'^\s*<<([^>>]+)>>') + }, + ".sh": { + "imp": re.compile(r'^\s*(?:\.|source)\s+.*'), + "fn": re.compile(r'(?:function\s+)?([a-zA-Z_0-9\-]+)\s*\(\s*\)\s*\{?|^\s*function\s+([a-zA-Z_0-9\-]+)'), + "cls": re.compile(r'^\s*class\s+([a-zA-Z_0-9]+)'), + "dec": re.compile(r'^\s*#\s*@([a-zA-Z_0-9]+)') + }, + ".zsh": { + "imp": re.compile(r'^\s*(?:\.|source)\s+.*'), + "fn": re.compile(r'(?:function\s+)?([a-zA-Z_0-9\-]+)\s*\(\s*\)\s*\{?|^\s*function\s+([a-zA-Z_0-9\-]+)'), + "cls": re.compile(r'^\s*class\s+([a-zA-Z_0-9]+)'), + "dec": re.compile(r'^\s*#\s*@([a-zA-Z_0-9]+)') + }, + ".ps1": { + "imp": re.compile(r'^\s*(?:\.|source)\s+.*'), + "fn": re.compile(r'(?:function\s+)?([a-zA-Z_0-9\-]+)\s*\(\s*\)\s*\{?|^\s*function\s+([a-zA-Z_0-9\-]+)'), + "cls": re.compile(r'^\s*class\s+([a-zA-Z_0-9]+)'), + "dec": re.compile(r'^\s*#\s*@([a-zA-Z_0-9]+)') + } +} + +BLOCK_END_PROFILES: Dict[str, Dict[str, Any]] = { + ".jl": { + "start": re.compile(r'^\s*(?:function|macro|mutable\s+struct)\s+([a-zA-Z_0-9!]+)'), + "end": re.compile(r'^\s*end\b'), + "inc": re.compile(r'\b(if|for|while|let|do|try|quote)\b') + }, + ".lua": { + "start": re.compile(r'^\s*(?:local\s+)?function\s+([a-zA-Z_0-9\.:]+)'), + "end": re.compile(r'^\s*end\b'), + "inc": re.compile(r'\b(if|for|while|do)\b') + }, + ".ex": { + "start": re.compile(r'^\s*(?:def|defp|defmacro)\s+([a-zA-Z_0-9!]+)'), + "end": re.compile(r'^\s*end\b'), + "inc": re.compile(r'\b(if|case|cond|unless|try)\b.*\bdo\b') + }, + ".exs": { + "start": re.compile(r'^\s*(?:def|defp|defmacro)\s+([a-zA-Z_0-9!]+)'), + "end": re.compile(r'^\s*end\b'), + "inc": re.compile(r'\b(if|case|cond|unless|try)\b.*\bdo\b') + }, + ".adb": { + "start": re.compile(r'^\s*(?:procedure|function)\s+([a-zA-Z_0-9]+)'), + "end": re.compile(r'^\s*end\s+[a-zA-Z_0-9]+;'), + "inc": re.compile(r'\b(if|loop|case|begin)\b') + }, + ".ads": { + "start": re.compile(r'^\s*(?:procedure|function)\s+([a-zA-Z_0-9]+)'), + "end": re.compile(r'^\s*end\s+[a-zA-Z_0-9]+;'), + "inc": re.compile(r'\b(if|loop|case|begin)\b') + }, + ".au3": { + "start": re.compile(r'^\s*(?:Func)\s+([a-zA-Z_0-9]+)'), + "end": re.compile(r'^\s*EndFunc\b'), + "inc": re.compile(r'^\s*(If|While|For|Select|Switch)\b') + }, + ".vbs": { + "start": re.compile(r'^\s*(?:Function|Sub)\s+([a-zA-Z_0-9]+)', re.IGNORECASE), + "end": re.compile(r'^\s*End\s+(?:Function|Sub)\b', re.IGNORECASE), + "inc": re.compile(r'^\s*(If|For|While|Do)\b', re.IGNORECASE) + } +} + +SEQUENTIAL_PROFILES: Dict[str, Dict[str, Any]] = { + ".bat": { + "start": re.compile(r'^\s*:([a-zA-Z_0-9\-]+)'), + "end": re.compile(r'^\s*(?:goto\s+:eof|exit\b)', re.IGNORECASE), + "stubs": ["rem todo", "rem fixme", "echo placeholder"] + }, + ".cmd": { + "start": re.compile(r'^\s*:([a-zA-Z_0-9\-]+)'), + "end": re.compile(r'^\s*(?:goto\s+:eof|exit\b)', re.IGNORECASE), + "stubs": ["rem todo", "rem fixme", "echo placeholder"] + }, + ".sql": { + "start": re.compile(r'(?:create\s+(?:or\s+replace\s+)?(?:procedure|function|view))\s+([a-zA-Z_0-9\.]+)', re.IGNORECASE), + "end": re.compile(r'^\s*end\s*[a-zA-Z_0-9]*\s*;', re.IGNORECASE), + "stubs": ["-- todo", "-- fixme", "return null", "raise notice"] + } +} + +INDENTATION_PROFILES: Dict[str, Dict[str, Any]] = { + ".mojo": { + "sig": re.compile(r'^\s*(?:fn|def)\s+([a-zA-Z_0-9]+)\s*\('), + "stubs": ["pass", "raise notimplementederror", "todo"] + }, + ".?": { + "sig": re.compile(r'^\s*(?:fn|def)\s+([a-zA-Z_0-9]+)\s*\('), + "stubs": ["pass", "raise notimplementederror", "todo"] + }, + ".hs": { + "sig": re.compile(r'^([a-zA-Z_0-9]+)\s+::\s+.*|^\s*([a-zA-Z_0-9]+)\s*='), + "stubs": ["undefined", "todo", "error "] + }, + ".elm": { + "sig": re.compile(r'^([a-zA-Z_0-9]+)\s+::\s+.*|^\s*([a-zA-Z_0-9]+)\s*='), + "stubs": ["undefined", "todo", "error "] + } +} + +# ============================================================================== +# COMPONENT PARSER ARCHITECTURES (CLEAN, FLAT DRIVERS) +# ============================================================================== + class BaseLanguageParser(ABC): - """Abstract Base Class defining the protocol for structural language parsers.""" - @abstractmethod def parse(self, file_path: str, source: str) -> Dict[str, Any]: - """Parses source content into a unified structural inventory dictionary.""" pass class PythonASTParser(BaseLanguageParser): - """Deep structural inspection engine for Python using native AST.""" - + """AST analyzer enforcing strict compliance boundaries for native Python files.""" + def _is_mock(self, body: List[ast.stmt]) -> bool: - if not body: + if not body: return True - statements = body - if len(body) > 1 and isinstance(body[0], ast.Expr): - if isinstance(body[0].value, ast.Constant) and isinstance(body[0].value.value, str): - statements = body[1:] - if len(statements) == 1: - node = statements[0] - if isinstance(node, (ast.Pass, ast.Break, ast.Continue)): - return True - if isinstance(node, ast.Raise) and isinstance(node.exc, ast.Name) and node.exc.id == "NotImplementedError": - return True - if isinstance(node, ast.Expr) and isinstance(node.value, ast.Constant) and isinstance(node.value.value, str): - return True + first = body[0] + # Skip top docstrings cleanly + actual_statements = body[1:] if (isinstance(first, ast.Expr) and isinstance(first.value, ast.Constant) and isinstance(first.value.value, str)) else body + if len(actual_statements) == 1: + node = actual_statements[0] + if isinstance(node, (ast.Pass, ast.Break, ast.Continue)): return True + if isinstance(node, ast.Raise) and isinstance(node.exc, ast.Name) and node.exc.id == "NotImplementedError": return True + if isinstance(node, ast.Expr) and isinstance(node.value, ast.Constant) and isinstance(node.value.value, str): return True return False def parse(self, file_path: str, source: str) -> Dict[str, Any]: tree = ast.parse(source, filename=file_path) - functions: Dict[str, Dict[str, Any]] = {} - classes: Set[str] = set() - imports: Set[str] = set() - + functions, classes, imports = {}, set(), set() + for node in ast.walk(tree): if isinstance(node, ast.ClassDef): classes.add(node.name) elif isinstance(node, ast.Import): - for alias in node.names: - imports.add(f"import {alias.name}") + for a in node.names: imports.add(f"import {a.name}") elif isinstance(node, ast.ImportFrom): - module = node.module if node.module else "" - for alias in node.names: - imports.add(f"from {module} import {alias.name}") + imports.add(f"from {node.module or ''} import {', '.join(a.name for a in node.names)}") elif isinstance(node, ast.FunctionDef): args = {arg.arg: ast.unparse(arg.annotation).strip() if arg.annotation else "None" for arg in node.args.args} - ret = ast.unparse(node.returns).strip() if node.returns else "None" decs = {ast.unparse(dec).strip().split('(')[0] for dec in node.decorator_list} span = (node.end_lineno - node.lineno + 1) if hasattr(node, "end_lineno") else 1 - functions[node.name] = { - "args": args, - "returns": ret, - "decorators": list(decs), - "lines": span, - "is_mock": self._is_mock(node.body) + "args": args, "returns": ast.unparse(node.returns).strip() if node.returns else "None", + "decorators": list(decs), "lines": span, "is_mock": self._is_mock(node.body) } - - return { - "classes": list(classes), - "functions": functions, - "imports": list(imports), - "total_lines": len(source.splitlines()) - } + return {"classes": list(classes), "functions": functions, "imports": list(imports), "total_lines": len(source.splitlines())} class BraceLanguageParser(BaseLanguageParser): - """ - Deterministic Lexical Scoping Engine for curly-brace languages. - Supports: C, C++, Rust, Go, Java, TypeScript, JavaScript, C# - """ - def __init__(self, extension: str): - self.ext = extension + """Flat lexical tracker using profile case arrays rather than nested conditional ladders.""" + + def __init__(self, ext: str): + self.profile = BRACE_PROFILES.get(ext, C_STYLE_DEFAULT) def parse(self, file_path: str, source: str) -> Dict[str, Any]: lines = source.splitlines() - functions: Dict[str, Dict[str, Any]] = {} - classes: Set[str] = set() - imports: Set[str] = set() - - # Compile lightweight lexical rules tailored to language families - if self.ext in {".rs"}: - import_pattern = re.compile(r'^\s*(?:pub\s+)?use\s+([^;]+);') - func_pattern = re.compile(r'(?:pub\s+)?(?:async\s+)?fn\s+([a-zA-Z_0-9]+)\s*(<[^>]+>)?\s*\(([^)]*)\)') - class_pattern = re.compile(r'^\s*(?:pub\s+)?(?:struct|enum|trait)\s+([a-zA-Z_0-9]+)') - decorator_pattern = re.compile(r'^\s*#\[([^\]]+)\]') - elif self.ext in {".go"}: - import_pattern = re.compile(r'^\s*import\s+(?:\([^\)]+\)|"[^"]+")') - func_pattern = re.compile(r'^func\s+(?:\([^)]+\)\s+)?([a-zA-Z_0-9]+)\s*\(([^)]*)\)') - class_pattern = re.compile(r'^\s*type\s+([a-zA-Z_0-9]+)\s+struct') - decorator_pattern = re.compile(r'^\s*//\s*@([a-zA-Z_0-9]+)') - else: # C-style family: JS, TS, C, C++, Java, C# - import_pattern = re.compile(r'^\s*(?:import|require|#include)\s+.*') - func_pattern = re.compile(r'(?:public|private|protected|static|async|function)?\s*([a-zA-Z_0-9]+)\s*\(([^)]*)\)\s*(?::\s*[a-zA-Z_0-9<>\s|]+)?\s*\{?') - class_pattern = re.compile(r'^\s*(?:export\s+)?(?:class|interface|struct)\s+([a-zA-Z_0-9]+)') - decorator_pattern = re.compile(r'^\s*@([a-zA-Z_0-9]+)') - + functions, classes, imports = {}, set(), set() brace_depth = 0 - active_func: Optional[str] = None - func_start_line = 0 - func_body_tokens: List[str] = [] - pending_decorators: List[str] = [] + active_fn, fn_start = None, 0 + body_lines, pending_decs = [], [] for idx, line in enumerate(lines, start=1): stripped = line.strip() - if not stripped: - continue + if not stripped: continue - # Check Global Imports - if brace_depth == 0 and import_pattern.match(line): - imports.add(stripped) - continue - - # Harvest Block Decorators / System Annotations - dec_match = decorator_pattern.match(line) - if brace_depth == 0 and dec_match: - pending_decorators.append(dec_match.group(1)) - continue - - # Identify Structural Class/Struct boundaries - class_match = class_pattern.match(line) - if brace_depth == 0 and class_match: - classes.add(class_match.group(1)) - - # Identify Function Signature Transitions + # Outer-scope tracking (Signatures, imports, decorators) if brace_depth == 0: - func_match = func_pattern.search(line) - if func_match and not any(k in stripped for k in {"if", "for", "while", "switch", "catch"}): - name = func_match.group(1) - if name not in {"class", "struct", "function", "return"}: - active_func = name - func_start_line = idx - func_body_tokens = [] + if self.profile["imp"].match(line): + imports.add(stripped) + continue + m_dec = self.profile["dec"].match(line) + if m_dec: + pending_decs.append(m_dec.group(1)) + continue + m_cls = self.profile["cls"].match(line) + if m_cls: + classes.add(m_cls.group(1)) + + m_fn = self.profile["fn"].search(line) + if m_fn and not any(k in stripped for k in {"if", "for", "while", "switch", "catch", "return"}): + name = m_fn.group(1) or (m_fn.group(2) if len(m_fn.groups()) > 1 else None) + if name and name not in {"class", "struct", "function", "return", "import", "fn", "fun"}: + active_fn, fn_start, body_lines = name, idx, [] - # Stream character token array data to track precise scope boundaries + # Character level scope processing for char in stripped: - if char == '{': + if char == '{': brace_depth += 1 - elif char == '}': + elif char == '}': brace_depth -= 1 - if brace_depth == 0 and active_func: - # Compute functional line boundaries safely - span = idx - func_start_line + 1 - body_str = "\n".join(func_body_tokens).lower() - - # Process multi-language laziness markers - is_mock = ( - len(func_body_tokens) <= 3 or - any(stub in body_str for stub in ["todo", "panic", "notimplemented", "throw new", "return null"]) - ) - - functions[active_func] = { - "args": {}, # Signature layout preservation mapped textually - "returns": "Inferred", - "decorators": pending_decorators.copy(), - "lines": span, - "is_mock": is_mock - } - active_func = None - pending_decorators.clear() + + if brace_depth == 0 and active_fn: + body_txt = "\n".join(body_lines).lower() + is_mock = len(body_lines) <= 2 or any(s in body_txt for s in ["todo", "panic", "notimplemented", "throw ", "return null", "exit", ":"]) + functions[active_fn] = {"args": {}, "returns": "Inferred", "decorators": pending_decs.copy(), "lines": idx - fn_start + 1, "is_mock": is_mock} + active_fn, pending_decs = None, [] + break - if brace_depth > 0 and active_func: - func_body_tokens.append(stripped) + if brace_depth > 0 and active_fn: + body_lines.append(stripped) - return { - "classes": list(classes), - "functions": functions, - "imports": list(imports), - "total_lines": len(lines) - } + return {"classes": list(classes), "functions": functions, "imports": list(imports), "total_lines": len(lines)} -# ------------------------------------------------------------------------------ -# CORE WORKSPACE ENGINE REGISTRY & STATE PERSISTENCE -# ------------------------------------------------------------------------------ -class WorkspaceRegistry: - """Manages the in-memory analysis cache with underlying thread-safe JSON backup layers.""" + +class BlockEndLanguageParser(BaseLanguageParser): + """Monolithic logic parsing loop tracking keyword-delimited code architectures.""" + def __init__(self, ext: str): + self.profile = BLOCK_END_PROFILES[ext] + + def parse(self, file_path: str, source: str) -> Dict[str, Any]: + lines = source.splitlines() + functions = {} + scope_depth = 0 + active_fn, fn_start, body_lines = None, 0, [] + + for idx, line in enumerate(lines, start=1): + stripped = line.strip() + if not stripped or stripped.startswith(('#', '--', '//', "'", ';')): continue + + # Guard clause: Locate and step into function tracking bounds + if scope_depth == 0: + m_fn = self.profile["start"].match(line) + if m_fn: + active_fn, fn_start, scope_depth, body_lines = m_fn.group(1), idx, 1, [] + continue + + # In-scope execution processing + if self.profile["inc"].search(line): + scope_depth += 1 + + if self.profile["end"].match(stripped): + scope_depth -= 1 + if scope_depth == 0 and active_fn: + body_txt = "\n".join(body_lines).lower() + is_mock = len(body_lines) <= 2 or any(s in body_txt for s in ["todo", "panic", "notimplemented", "nothing", "nil", "raise", "wscript.quit"]) + functions[active_fn] = {"args": {}, "returns": "Inferred", "decorators": [], "lines": idx - fn_start + 1, "is_mock": is_mock} + active_fn = None + continue + + if active_fn: + body_lines.append(stripped) + + return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)} + + +class SequentialScriptParser(BaseLanguageParser): + """Label and linear declarative parser for scripting layouts (Batch/SQL).""" + + def __init__(self, ext: str): + self.profile = SEQUENTIAL_PROFILES[ext] + + def parse(self, file_path: str, source: str) -> Dict[str, Any]: + lines = source.splitlines() + functions = {} + active_routine, routine_start, body_lines = None, 0, [] + + for idx, line in enumerate(lines, start=1): + stripped = line.strip() + if not stripped: continue + + m_rot = self.profile["start"].match(line) if ".bat" in file_path else self.profile["start"].search(line) + if m_rot: + if active_routine: + body_txt = "\n".join(body_lines).lower() + is_mock = len(body_lines) <= 1 or any(mk in body_txt for mk in self.profile["stubs"]) + functions[active_routine] = {"args": {}, "returns": "Routine", "decorators": [], "lines": idx - routine_start, "is_mock": is_mock} + active_routine, routine_start, body_lines = m_rot.group(1), idx, [] + continue + + if active_routine: + body_lines.append(stripped) + if self.profile["end"].match(stripped): + body_txt = "\n".join(body_lines).lower() + is_mock = len(body_lines) <= 2 or any(mk in body_txt for mk in self.profile["stubs"]) + functions[active_routine] = {"args": {}, "returns": "Routine", "decorators": [], "lines": idx - routine_start + 1, "is_mock": is_mock} + active_routine = None + + if active_routine: + body_txt = "\n".join(body_lines).lower() + functions[active_routine] = {"args": {}, "returns": "Routine", "decorators": [], "lines": len(lines) - routine_start + 1, "is_mock": len(body_lines) <= 1 or any(mk in body_txt for mk in self.profile["stubs"])} + + return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)} + + +class LineIndentationParser(BaseLanguageParser): + """Layout signature parsing module capturing indentation-sensitive language architectures.""" + + def __init__(self, ext: str): + self.profile = INDENTATION_PROFILES[ext] + + def parse(self, file_path: str, source: str) -> Dict[str, Any]: + lines = source.splitlines() + functions = {} + active_name, fn_start, body_lines = None, 0, [] + + for idx, line in enumerate(lines, start=1): + if not line.strip(): continue + m = self.profile["sig"].match(line) + if m: + name = m.group(1) or (m.group(2) if len(m.groups()) > 1 else None) + if name and name != active_name: + if active_name: + body_txt = "\n".join(body_lines).lower() + functions[active_name] = {"args": {}, "returns": "Declarative", "decorators": [], "lines": idx - fn_start, "is_mock": any(ms in body_txt for ms in self.profile["stubs"])} + active_name, fn_start, body_lines = name, idx, [] + if active_name: + body_lines.append(line) + + if active_name: + body_txt = "\n".join(body_lines).lower() + functions[active_name] = {"args": {}, "returns": "Declarative", "decorators": [], "lines": len(lines) - fn_start + 1, "is_mock": any(ms in body_txt for ms in self.profile["stubs"])} + + return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)} + +# ============================================================================== +# WORKSPACE ROUTING MANAGEMENT +# ============================================================================== + +class WorkspaceRegistry: @staticmethod def get_parser(file_path: str) -> Optional[BaseLanguageParser]: - _, ext = os.path.splitext(file_path) - if ext == ".py": - return PythonASTParser() - elif ext in {".js", ".jsx", ".ts", ".tsx", ".rs", ".go", ".c", ".cpp", ".h", ".hpp", ".java", ".cs"}: + _, ext = os.path.splitext(file_path.lower()) + if ext == ".py": return PythonASTParser() + if ext in INDENTATION_PROFILES: return LineIndentationParser(ext) + if ext in BLOCK_END_PROFILES: return BlockEndLanguageParser(ext) + if ext in SEQUENTIAL_PROFILES: return SequentialScriptParser(ext) + + # Explicit or generic fallback for curly-brace structures + if ext in BRACE_PROFILES or ext in {".js", ".jsx", ".ts", ".tsx", ".c", ".cpp", ".h", ".hpp", ".java", ".cs", ".ec", ".eh", ".pike", ".pmod", ".kt", ".kts", ".swift", ".dart", ".r"}: return BraceLanguageParser(ext) return None @@ -222,174 +401,127 @@ class WorkspaceRegistry: target = os.path.join(root_path, STATE_FILE_NAME) if os.path.isdir(root_path) else os.path.join(os.path.dirname(root_path), STATE_FILE_NAME) if os.path.exists(target): try: - with open(target, "r", encoding="utf-8") as f: - return json.load(f) - except: - pass + with open(target, "r", encoding="utf-8") as f: return json.load(f) + except: pass return {} @classmethod def save_state(cls, root_path: str, state: Dict[str, Any]) -> None: target = os.path.join(root_path, STATE_FILE_NAME) if os.path.isdir(root_path) else os.path.join(os.path.dirname(root_path), STATE_FILE_NAME) try: - with open(target, "w", encoding="utf-8") as f: - json.dump(state, f, indent=2) - except: - pass + with open(target, "w", encoding="utf-8") as f: json.dump(state, f, indent=2) + except: pass def _resolve_workspace_files(target_path: str) -> List[str]: abs_path = os.path.abspath(target_path) if os.path.isfile(abs_path): return [abs_path] if WorkspaceRegistry.get_parser(abs_path) else [] - resolved = [] for root, dirs, files in os.walk(abs_path): - # Destructively filter directory scans to bypass noise dirs[:] = [d for d in dirs if d not in GLOBAL_IGNORE_DIRS] for file in files: - full_path = os.path.join(root, file) - if WorkspaceRegistry.get_parser(full_path): - resolved.append(full_path) + fp = os.path.join(root, file) + if WorkspaceRegistry.get_parser(fp): resolved.append(fp) return resolved -# ------------------------------------------------------------------------------ -# EXPOSED MCP PRODUCTION GATE TOOLS -# ------------------------------------------------------------------------------ +# ============================================================================== +# EXPOSED PROTOCOL TOOLS +# ============================================================================== + @mcp.tool() async def snapshot_baseline(target_path: str) -> str: - """Snapshots structural footprints for all matching code assets across the repository tree.""" + """Snapshots structural footprints across all matching modules inside the designated target directory path.""" base_dir = os.path.abspath(target_path) if os.path.isdir(target_path) else os.path.dirname(os.path.abspath(target_path)) files = _resolve_workspace_files(target_path) - if not files: - return f"? BASELINE_FAILURE: No supported source languages discovered at target path: {target_path}" + if not files: return f"? BASELINE_FAILURE: No supported source profiles matched under: {target_path}" - current_registry = WorkspaceRegistry.load_state(base_dir) + curr_reg = WorkspaceRegistry.load_state(base_dir) cataloged = 0 - - for file_path in files: - parser = WorkspaceRegistry.get_parser(file_path) - if not parser: - continue + for fp in files: + parser = WorkspaceRegistry.get_parser(fp) + if not parser: continue try: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: - content = f.read() - current_registry[file_path] = parser.parse(file_path, content) + with open(fp, "r", encoding="utf-8", errors="ignore") as f: content = f.read() + curr_reg[fp] = parser.parse(fp, content) cataloged += 1 except Exception as e: - return f"? BASELINE_FAILURE: Internal parsing failure on '{os.path.basename(file_path)}': {str(e)}" + return f"? BASELINE_FAILURE: Parsing breakdown in '{os.path.basename(fp)}': {str(e)}" - WorkspaceRegistry.save_state(base_dir, current_registry) - return f"? BASELINE_LOCKED: Workspace matrix secured across {cataloged} source modules." + WorkspaceRegistry.save_state(base_dir, curr_reg) + return f"? BASELINE_LOCKED: High-fidelity profiles tracked across {cataloged} repository points." @mcp.tool() async def audit_revision(target_path: str) -> str: - """Audits modified paths, enforcing type, import, and logic limits across all languages.""" + """Audits modified workspace targets against baseline criteria to block truncated logic streams or laziness traps.""" base_dir = os.path.abspath(target_path) if os.path.isdir(target_path) else os.path.dirname(os.path.abspath(target_path)) registry = WorkspaceRegistry.load_state(base_dir) - - if not registry: - return f"? AUDIT_FAILED: Persistent baseline marker layer is empty or missing. Trigger snapshots first." + if not registry: return "? AUDIT_FAILED: Metric profile layer empty. Run baseline snapshot routines." files = _resolve_workspace_files(target_path) drift_errors = [] - # Detect unexpected missing modules from tracked baselines for tracked_file in list(registry.keys()): if tracked_file.startswith(base_dir) and not os.path.exists(tracked_file): - drift_errors.append(f"Module Dropped: Tracked source '{os.path.basename(tracked_file)}' was deleted.") + drift_errors.append(f"Module Dropped: Baseline module file '{os.path.basename(tracked_file)}' was deleted.") - for file_path in files: - if file_path not in registry: - continue - - baseline = registry[file_path] - filename = os.path.basename(file_path) - parser = WorkspaceRegistry.get_parser(file_path) - - if not parser: - continue + for fp in files: + if fp not in registry: continue + baseline, filename = registry[fp], os.path.basename(fp) + parser = WorkspaceRegistry.get_parser(fp) + if not parser: continue try: - with open(file_path, "r", encoding="utf-8", errors="ignore") as f: - content = f.read() - current = parser.parse(file_path, content) + with open(fp, "r", encoding="utf-8", errors="ignore") as f: content = f.read() + current = parser.parse(fp, content) except Exception as e: - drift_errors.append(f"Syntax Error [{filename}]: File compilation or parse block failure: {str(e)}") + drift_errors.append(f"Syntax Error [{filename}]: Parsing crash: {str(e)}") continue - # 1. Audit Global Package/Dependency Drops - missing_imports = set(baseline["imports"]) - set(current["imports"]) - if missing_imports: - drift_errors.append(f"[{filename}] Dropped Dependencies: {list(missing_imports)}") + missing_imports = set(baseline.get("imports", [])) - set(current.get("imports", [])) + if missing_imports: drift_errors.append(f"[{filename}] Missing Dependencies: {list(missing_imports)}") - # 2. Audit OOP Object Integrity Drops - missing_classes = set(baseline["classes"]) - set(current["classes"]) - if missing_classes: - drift_errors.append(f"[{filename}] Core Structs/Classes Missing: {list(missing_classes)}") + missing_classes = set(baseline.get("classes", [])) - set(current.get("classes", [])) + if missing_classes: drift_errors.append(f"[{filename}] Objects/Classes Deleted: {list(missing_classes)}") - # 3. Audit Function Signatures, Logic Truncation, and Laziness Traps - for func_name, b_meta in baseline["functions"].items(): - if func_name not in current["functions"]: - drift_errors.append(f"[{filename}] Missing Logic Block: Function '{func_name}' was skipped.") + for fn_name, b_meta in baseline.get("functions", {}).items(): + if fn_name not in current.get("functions", {}): + drift_errors.append(f"[{filename}] Logic Block Dropped: Routine block '{fn_name}' missing.") continue - c_meta = current["functions"][func_name] - - # Enforce systemic decorator and annotation checks - missing_decs = set(b_meta["decorators"]) - set(c_meta["decorators"]) - if missing_decs: - drift_errors.append(f"[{filename}] Stripped Annotations on '{func_name}': Dropped {list(missing_decs)}") - - # Enforce multi-language laziness checks - if c_meta["is_mock"] and not b_meta["is_mock"]: - drift_errors.append(f"[{filename}] Laziness Trap Triggered in '{func_name}': Substituted with placeholder/throw/todo stub.") - - # Enforce physical compression check bounds - if b_meta["lines"] > 4 and c_meta["lines"] <= (b_meta["lines"] * 0.5): - drift_errors.append(f"[{filename}] Truncation Trap Triggered in '{func_name}': Scope collapsed by >50% ({b_meta['lines']} -> {c_meta['lines']} lines).") + c_meta = current["functions"][fn_name] + if set(b_meta.get("decorators", [])) - set(c_meta.get("decorators", [])): + drift_errors.append(f"[{filename}] Stripped Framework Annotation on '{fn_name}'") + if c_meta.get("is_mock") and not b_meta.get("is_mock"): + drift_errors.append(f"? [{filename}] Laziness Trap Blocked in '{fn_name}': Logic substituted with stub.") + if b_meta.get("lines", 0) > 4 and c_meta.get("lines", 0) <= (b_meta.get("lines", 0) * 0.5): + drift_errors.append(f"? [{filename}] Truncation Trap Blocked in '{fn_name}': Context block collapsed >50%.") if drift_errors: - return ( - f"? CRITICAL_COMPLETENESS_DRIFT: Regression detected during mutation check!\n" - + "\n".join(f" - {err}" for err in drift_errors) - + "\nAction: Halt code ingestion. Force full context reconstruction." - ) - - return f"? REVISION_PASSED: Workspace architectures, dependencies, and functional bounds are verified." + return f"? CRITICAL_COMPLETENESS_DRIFT: Logic degradation intercepted!\n" + "\n".join(f" - {err}" for err in drift_errors) + return "? REVISION_PASSED: Multi-language layout boundaries remain complete and intact." @mcp.tool() async def run_completeness_diagnostic() -> str: - """Verifies parsing and capture capabilities across mixed multi-language test fixtures.""" - fixture_py = "drift_test.py" - fixture_rs = "drift_test.rs" - - py_v1 = "@mcp.tool()\ndef calculate_hash(seed: str) -> None:\n print(seed)\n" - rs_v1 = "#[inline]\npub fn calculate_hash(seed: &str) {\n println!(\"{}\", seed);\n}" - + """Runs verification pipelines simulating code erosion in script engines and structural blocks.""" + f_bat, f_lua = "drift_test.bat", "drift_test.lua" + bat_v1, lua_v1 = ":build_array\necho Processing...\ngoto :eof\n", "function calculate_matrix()\n return 1\nend\n" try: - with open(fixture_py, "w") as f: f.write(py_v1) - with open(fixture_rs, "w") as f: f.write(rs_v1) - + with open(f_bat, "w") as f: f.write(bat_v1) + with open(f_lua, "w") as f: f.write(lua_v1) await snapshot_baseline(".") - - # Ingest lazy mutations across language families - with open(fixture_py, "w") as f: f.write("def calculate_hash(seed):\n pass\n") - with open(fixture_rs, "w") as f: f.write("pub fn calculate_hash(seed: &str) {\n todo!();\n}") - - audit_result = await audit_revision(".") - if "Stripped Annotations" in audit_result and "Laziness Trap Triggered" in audit_result: - return "? DIAGNOSTIC_PASSED: Multi-language driver framework successfully trapped Python AST and Rust lexical code-erosion boundaries." - return f"? DIAGNOSTIC_FAILED: Structural deviations bypassed checkpoints. Result:\n{audit_result}" - + with open(f_bat, "w") as f: f.write(":build_array\nrem TODO\n") + with open(f_lua, "w") as f: f.write("function calculate_matrix()\n -- todo\nend\n") + + res = await audit_revision(".") + if "Laziness Trap" in res: return "? DIAGNOSTIC_PASSED: Flattened polymorphic profile framework trapped all code erosion simulation vectors." + return f"? DIAGNOSTIC_FAILED: Structural checkpoints bypassed. Output:\n{res}" finally: - for path in [fixture_py, fixture_rs, STATE_FILE_NAME]: - if os.path.exists(path): - os.remove(path) - + for p in [f_bat, f_lua, STATE_FILE_NAME]: + if os.path.exists(p): os.remove(p) if __name__ == "__main__": mcp.run(transport="stdio") \ No newline at end of file