updated to case logic, added more languages

This commit is contained in:
Jeremy Anderson 2026-06-10 23:27:00 -04:00
parent b80e653c34
commit 8d9e0bde1a
1 changed files with 385 additions and 253 deletions

View File

@ -13,207 +13,386 @@ import os
import json
import re
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Set, Optional
from typing import Dict, Any, List, Set, Optional, Tuple
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("MCP-Drift-State-Tracker")
# Global persistence config
STATE_FILE_NAME = ".mcp_drift_state.json"
# Global exclusion profiles for high-performance directory sweeps
GLOBAL_IGNORE_DIRS = {
".git", ".venv", "venv", "__pycache__", "node_modules", "target",
"dist", "build", "out", ".cargo", ".rustup", "obj", "bin"
"dist", "build", "out", ".cargo", ".rustup", "obj", "bin", ".idea"
}
# ------------------------------------------------------------------------------
# PARSER INTERFACE & DRIVER IMPLEMENTATIONS
# ------------------------------------------------------------------------------
class BaseLanguageParser(ABC):
"""Abstract Base Class defining the protocol for structural language parsers."""
# ==============================================================================
# DATA-DRIVEN LEXICAL PROFILE TABLES ("CASE ARRAYS")
# ==============================================================================
# C-Family Shared Baseline Pattern
C_STYLE_DEFAULT = {
"imp": re.compile(r'^\s*(?:import|require|#include)\s+.*'),
"fn": re.compile(r'(?:public|private|protected|static|async|fun|func|function)?\s*([a-zA-Z_0-9]+)\s*\([^)]*\)\s*(?:\{|->|:)?'),
"cls": re.compile(r'^\s*(?:export\s+)?(?:class|interface|struct|enum)\s+([a-zA-Z_0-9]+)'),
"dec": re.compile(r'^\s*@([a-zA-Z_0-9]+)')
}
BRACE_PROFILES: Dict[str, Dict[str, re.Pattern]] = {
".rs": {
"imp": re.compile(r'^\s*(?:pub\s+)?use\s+([^;]+);'),
"fn": re.compile(r'(?:pub\s+)?(?:async\s+)?fn\s+([a-zA-Z_0-9]+)'),
"cls": re.compile(r'^\s*(?:pub\s+)?(?:struct|enum|trait)\s+([a-zA-Z_0-9]+)'),
"dec": re.compile(r'^\s*#\[([^\]]+)\]')
},
".go": {
"imp": re.compile(r'^\s*import\s+(?:\([^\)]+\)|"[^"]+")'),
"fn": re.compile(r'^func\s+(?:\([^)]+\)\s+)?([a-zA-Z_0-9]+)'),
"cls": re.compile(r'^\s*type\s+([a-zA-Z_0-9]+)\s+struct'),
"dec": re.compile(r'^\s*//\s*@([a-zA-Z_0-9]+)')
},
".php": {
"imp": re.compile(r'^\s*(?:use|require|include)(?:\s+once)?\s+([^;]+);'),
"fn": re.compile(r'(?:public|private|protected|static)?\s*function\s+([a-zA-Z_0-9]+)'),
"cls": re.compile(r'^\s*(?:abstract\s+)?class\s+([a-zA-Z_0-9]+)'),
"dec": re.compile(r'^\s*<<([^>>]+)>>')
},
".hack": {
"imp": re.compile(r'^\s*(?:use|require|include)(?:\s+once)?\s+([^;]+);'),
"fn": re.compile(r'(?:public|private|protected|static)?\s*function\s+([a-zA-Z_0-9]+)'),
"cls": re.compile(r'^\s*(?:abstract\s+)?class\s+([a-zA-Z_0-9]+)'),
"dec": re.compile(r'^\s*<<([^>>]+)>>')
},
".sh": {
"imp": re.compile(r'^\s*(?:\.|source)\s+.*'),
"fn": re.compile(r'(?:function\s+)?([a-zA-Z_0-9\-]+)\s*\(\s*\)\s*\{?|^\s*function\s+([a-zA-Z_0-9\-]+)'),
"cls": re.compile(r'^\s*class\s+([a-zA-Z_0-9]+)'),
"dec": re.compile(r'^\s*#\s*@([a-zA-Z_0-9]+)')
},
".zsh": {
"imp": re.compile(r'^\s*(?:\.|source)\s+.*'),
"fn": re.compile(r'(?:function\s+)?([a-zA-Z_0-9\-]+)\s*\(\s*\)\s*\{?|^\s*function\s+([a-zA-Z_0-9\-]+)'),
"cls": re.compile(r'^\s*class\s+([a-zA-Z_0-9]+)'),
"dec": re.compile(r'^\s*#\s*@([a-zA-Z_0-9]+)')
},
".ps1": {
"imp": re.compile(r'^\s*(?:\.|source)\s+.*'),
"fn": re.compile(r'(?:function\s+)?([a-zA-Z_0-9\-]+)\s*\(\s*\)\s*\{?|^\s*function\s+([a-zA-Z_0-9\-]+)'),
"cls": re.compile(r'^\s*class\s+([a-zA-Z_0-9]+)'),
"dec": re.compile(r'^\s*#\s*@([a-zA-Z_0-9]+)')
}
}
BLOCK_END_PROFILES: Dict[str, Dict[str, Any]] = {
".jl": {
"start": re.compile(r'^\s*(?:function|macro|mutable\s+struct)\s+([a-zA-Z_0-9!]+)'),
"end": re.compile(r'^\s*end\b'),
"inc": re.compile(r'\b(if|for|while|let|do|try|quote)\b')
},
".lua": {
"start": re.compile(r'^\s*(?:local\s+)?function\s+([a-zA-Z_0-9\.:]+)'),
"end": re.compile(r'^\s*end\b'),
"inc": re.compile(r'\b(if|for|while|do)\b')
},
".ex": {
"start": re.compile(r'^\s*(?:def|defp|defmacro)\s+([a-zA-Z_0-9!]+)'),
"end": re.compile(r'^\s*end\b'),
"inc": re.compile(r'\b(if|case|cond|unless|try)\b.*\bdo\b')
},
".exs": {
"start": re.compile(r'^\s*(?:def|defp|defmacro)\s+([a-zA-Z_0-9!]+)'),
"end": re.compile(r'^\s*end\b'),
"inc": re.compile(r'\b(if|case|cond|unless|try)\b.*\bdo\b')
},
".adb": {
"start": re.compile(r'^\s*(?:procedure|function)\s+([a-zA-Z_0-9]+)'),
"end": re.compile(r'^\s*end\s+[a-zA-Z_0-9]+;'),
"inc": re.compile(r'\b(if|loop|case|begin)\b')
},
".ads": {
"start": re.compile(r'^\s*(?:procedure|function)\s+([a-zA-Z_0-9]+)'),
"end": re.compile(r'^\s*end\s+[a-zA-Z_0-9]+;'),
"inc": re.compile(r'\b(if|loop|case|begin)\b')
},
".au3": {
"start": re.compile(r'^\s*(?:Func)\s+([a-zA-Z_0-9]+)'),
"end": re.compile(r'^\s*EndFunc\b'),
"inc": re.compile(r'^\s*(If|While|For|Select|Switch)\b')
},
".vbs": {
"start": re.compile(r'^\s*(?:Function|Sub)\s+([a-zA-Z_0-9]+)', re.IGNORECASE),
"end": re.compile(r'^\s*End\s+(?:Function|Sub)\b', re.IGNORECASE),
"inc": re.compile(r'^\s*(If|For|While|Do)\b', re.IGNORECASE)
}
}
SEQUENTIAL_PROFILES: Dict[str, Dict[str, Any]] = {
".bat": {
"start": re.compile(r'^\s*:([a-zA-Z_0-9\-]+)'),
"end": re.compile(r'^\s*(?:goto\s+:eof|exit\b)', re.IGNORECASE),
"stubs": ["rem todo", "rem fixme", "echo placeholder"]
},
".cmd": {
"start": re.compile(r'^\s*:([a-zA-Z_0-9\-]+)'),
"end": re.compile(r'^\s*(?:goto\s+:eof|exit\b)', re.IGNORECASE),
"stubs": ["rem todo", "rem fixme", "echo placeholder"]
},
".sql": {
"start": re.compile(r'(?:create\s+(?:or\s+replace\s+)?(?:procedure|function|view))\s+([a-zA-Z_0-9\.]+)', re.IGNORECASE),
"end": re.compile(r'^\s*end\s*[a-zA-Z_0-9]*\s*;', re.IGNORECASE),
"stubs": ["-- todo", "-- fixme", "return null", "raise notice"]
}
}
INDENTATION_PROFILES: Dict[str, Dict[str, Any]] = {
".mojo": {
"sig": re.compile(r'^\s*(?:fn|def)\s+([a-zA-Z_0-9]+)\s*\('),
"stubs": ["pass", "raise notimplementederror", "todo"]
},
".?": {
"sig": re.compile(r'^\s*(?:fn|def)\s+([a-zA-Z_0-9]+)\s*\('),
"stubs": ["pass", "raise notimplementederror", "todo"]
},
".hs": {
"sig": re.compile(r'^([a-zA-Z_0-9]+)\s+::\s+.*|^\s*([a-zA-Z_0-9]+)\s*='),
"stubs": ["undefined", "todo", "error "]
},
".elm": {
"sig": re.compile(r'^([a-zA-Z_0-9]+)\s+::\s+.*|^\s*([a-zA-Z_0-9]+)\s*='),
"stubs": ["undefined", "todo", "error "]
}
}
# ==============================================================================
# COMPONENT PARSER ARCHITECTURES (CLEAN, FLAT DRIVERS)
# ==============================================================================
class BaseLanguageParser(ABC):
@abstractmethod
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
"""Parses source content into a unified structural inventory dictionary."""
pass
class PythonASTParser(BaseLanguageParser):
"""Deep structural inspection engine for Python using native AST."""
"""AST analyzer enforcing strict compliance boundaries for native Python files."""
def _is_mock(self, body: List[ast.stmt]) -> bool:
if not body:
return True
statements = body
if len(body) > 1 and isinstance(body[0], ast.Expr):
if isinstance(body[0].value, ast.Constant) and isinstance(body[0].value.value, str):
statements = body[1:]
if len(statements) == 1:
node = statements[0]
if isinstance(node, (ast.Pass, ast.Break, ast.Continue)):
return True
if isinstance(node, ast.Raise) and isinstance(node.exc, ast.Name) and node.exc.id == "NotImplementedError":
return True
if isinstance(node, ast.Expr) and isinstance(node.value, ast.Constant) and isinstance(node.value.value, str):
return True
first = body[0]
# Skip top docstrings cleanly
actual_statements = body[1:] if (isinstance(first, ast.Expr) and isinstance(first.value, ast.Constant) and isinstance(first.value.value, str)) else body
if len(actual_statements) == 1:
node = actual_statements[0]
if isinstance(node, (ast.Pass, ast.Break, ast.Continue)): return True
if isinstance(node, ast.Raise) and isinstance(node.exc, ast.Name) and node.exc.id == "NotImplementedError": return True
if isinstance(node, ast.Expr) and isinstance(node.value, ast.Constant) and isinstance(node.value.value, str): return True
return False
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
tree = ast.parse(source, filename=file_path)
functions: Dict[str, Dict[str, Any]] = {}
classes: Set[str] = set()
imports: Set[str] = set()
functions, classes, imports = {}, set(), set()
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
classes.add(node.name)
elif isinstance(node, ast.Import):
for alias in node.names:
imports.add(f"import {alias.name}")
for a in node.names: imports.add(f"import {a.name}")
elif isinstance(node, ast.ImportFrom):
module = node.module if node.module else ""
for alias in node.names:
imports.add(f"from {module} import {alias.name}")
imports.add(f"from {node.module or ''} import {', '.join(a.name for a in node.names)}")
elif isinstance(node, ast.FunctionDef):
args = {arg.arg: ast.unparse(arg.annotation).strip() if arg.annotation else "None" for arg in node.args.args}
ret = ast.unparse(node.returns).strip() if node.returns else "None"
decs = {ast.unparse(dec).strip().split('(')[0] for dec in node.decorator_list}
span = (node.end_lineno - node.lineno + 1) if hasattr(node, "end_lineno") else 1
functions[node.name] = {
"args": args,
"returns": ret,
"decorators": list(decs),
"lines": span,
"is_mock": self._is_mock(node.body)
}
return {
"classes": list(classes),
"functions": functions,
"imports": list(imports),
"total_lines": len(source.splitlines())
"args": args, "returns": ast.unparse(node.returns).strip() if node.returns else "None",
"decorators": list(decs), "lines": span, "is_mock": self._is_mock(node.body)
}
return {"classes": list(classes), "functions": functions, "imports": list(imports), "total_lines": len(source.splitlines())}
class BraceLanguageParser(BaseLanguageParser):
"""
Deterministic Lexical Scoping Engine for curly-brace languages.
Supports: C, C++, Rust, Go, Java, TypeScript, JavaScript, C#
"""
def __init__(self, extension: str):
self.ext = extension
"""Flat lexical tracker using profile case arrays rather than nested conditional ladders."""
def __init__(self, ext: str):
self.profile = BRACE_PROFILES.get(ext, C_STYLE_DEFAULT)
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
lines = source.splitlines()
functions: Dict[str, Dict[str, Any]] = {}
classes: Set[str] = set()
imports: Set[str] = set()
# Compile lightweight lexical rules tailored to language families
if self.ext in {".rs"}:
import_pattern = re.compile(r'^\s*(?:pub\s+)?use\s+([^;]+);')
func_pattern = re.compile(r'(?:pub\s+)?(?:async\s+)?fn\s+([a-zA-Z_0-9]+)\s*(<[^>]+>)?\s*\(([^)]*)\)')
class_pattern = re.compile(r'^\s*(?:pub\s+)?(?:struct|enum|trait)\s+([a-zA-Z_0-9]+)')
decorator_pattern = re.compile(r'^\s*#\[([^\]]+)\]')
elif self.ext in {".go"}:
import_pattern = re.compile(r'^\s*import\s+(?:\([^\)]+\)|"[^"]+")')
func_pattern = re.compile(r'^func\s+(?:\([^)]+\)\s+)?([a-zA-Z_0-9]+)\s*\(([^)]*)\)')
class_pattern = re.compile(r'^\s*type\s+([a-zA-Z_0-9]+)\s+struct')
decorator_pattern = re.compile(r'^\s*//\s*@([a-zA-Z_0-9]+)')
else: # C-style family: JS, TS, C, C++, Java, C#
import_pattern = re.compile(r'^\s*(?:import|require|#include)\s+.*')
func_pattern = re.compile(r'(?:public|private|protected|static|async|function)?\s*([a-zA-Z_0-9]+)\s*\(([^)]*)\)\s*(?::\s*[a-zA-Z_0-9<>\s|]+)?\s*\{?')
class_pattern = re.compile(r'^\s*(?:export\s+)?(?:class|interface|struct)\s+([a-zA-Z_0-9]+)')
decorator_pattern = re.compile(r'^\s*@([a-zA-Z_0-9]+)')
functions, classes, imports = {}, set(), set()
brace_depth = 0
active_func: Optional[str] = None
func_start_line = 0
func_body_tokens: List[str] = []
pending_decorators: List[str] = []
active_fn, fn_start = None, 0
body_lines, pending_decs = [], []
for idx, line in enumerate(lines, start=1):
stripped = line.strip()
if not stripped:
continue
if not stripped: continue
# Check Global Imports
if brace_depth == 0 and import_pattern.match(line):
# Outer-scope tracking (Signatures, imports, decorators)
if brace_depth == 0:
if self.profile["imp"].match(line):
imports.add(stripped)
continue
# Harvest Block Decorators / System Annotations
dec_match = decorator_pattern.match(line)
if brace_depth == 0 and dec_match:
pending_decorators.append(dec_match.group(1))
m_dec = self.profile["dec"].match(line)
if m_dec:
pending_decs.append(m_dec.group(1))
continue
m_cls = self.profile["cls"].match(line)
if m_cls:
classes.add(m_cls.group(1))
# Identify Structural Class/Struct boundaries
class_match = class_pattern.match(line)
if brace_depth == 0 and class_match:
classes.add(class_match.group(1))
m_fn = self.profile["fn"].search(line)
if m_fn and not any(k in stripped for k in {"if", "for", "while", "switch", "catch", "return"}):
name = m_fn.group(1) or (m_fn.group(2) if len(m_fn.groups()) > 1 else None)
if name and name not in {"class", "struct", "function", "return", "import", "fn", "fun"}:
active_fn, fn_start, body_lines = name, idx, []
# Identify Function Signature Transitions
if brace_depth == 0:
func_match = func_pattern.search(line)
if func_match and not any(k in stripped for k in {"if", "for", "while", "switch", "catch"}):
name = func_match.group(1)
if name not in {"class", "struct", "function", "return"}:
active_func = name
func_start_line = idx
func_body_tokens = []
# Stream character token array data to track precise scope boundaries
# Character level scope processing
for char in stripped:
if char == '{':
brace_depth += 1
elif char == '}':
brace_depth -= 1
if brace_depth == 0 and active_func:
# Compute functional line boundaries safely
span = idx - func_start_line + 1
body_str = "\n".join(func_body_tokens).lower()
# Process multi-language laziness markers
is_mock = (
len(func_body_tokens) <= 3 or
any(stub in body_str for stub in ["todo", "panic", "notimplemented", "throw new", "return null"])
)
if brace_depth == 0 and active_fn:
body_txt = "\n".join(body_lines).lower()
is_mock = len(body_lines) <= 2 or any(s in body_txt for s in ["todo", "panic", "notimplemented", "throw ", "return null", "exit", ":"])
functions[active_fn] = {"args": {}, "returns": "Inferred", "decorators": pending_decs.copy(), "lines": idx - fn_start + 1, "is_mock": is_mock}
active_fn, pending_decs = None, []
break
functions[active_func] = {
"args": {}, # Signature layout preservation mapped textually
"returns": "Inferred",
"decorators": pending_decorators.copy(),
"lines": span,
"is_mock": is_mock
}
active_func = None
pending_decorators.clear()
if brace_depth > 0 and active_fn:
body_lines.append(stripped)
if brace_depth > 0 and active_func:
func_body_tokens.append(stripped)
return {"classes": list(classes), "functions": functions, "imports": list(imports), "total_lines": len(lines)}
return {
"classes": list(classes),
"functions": functions,
"imports": list(imports),
"total_lines": len(lines)
}
# ------------------------------------------------------------------------------
# CORE WORKSPACE ENGINE REGISTRY & STATE PERSISTENCE
# ------------------------------------------------------------------------------
class BlockEndLanguageParser(BaseLanguageParser):
"""Monolithic logic parsing loop tracking keyword-delimited code architectures."""
def __init__(self, ext: str):
self.profile = BLOCK_END_PROFILES[ext]
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
lines = source.splitlines()
functions = {}
scope_depth = 0
active_fn, fn_start, body_lines = None, 0, []
for idx, line in enumerate(lines, start=1):
stripped = line.strip()
if not stripped or stripped.startswith(('#', '--', '//', "'", ';')): continue
# Guard clause: Locate and step into function tracking bounds
if scope_depth == 0:
m_fn = self.profile["start"].match(line)
if m_fn:
active_fn, fn_start, scope_depth, body_lines = m_fn.group(1), idx, 1, []
continue
# In-scope execution processing
if self.profile["inc"].search(line):
scope_depth += 1
if self.profile["end"].match(stripped):
scope_depth -= 1
if scope_depth == 0 and active_fn:
body_txt = "\n".join(body_lines).lower()
is_mock = len(body_lines) <= 2 or any(s in body_txt for s in ["todo", "panic", "notimplemented", "nothing", "nil", "raise", "wscript.quit"])
functions[active_fn] = {"args": {}, "returns": "Inferred", "decorators": [], "lines": idx - fn_start + 1, "is_mock": is_mock}
active_fn = None
continue
if active_fn:
body_lines.append(stripped)
return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)}
class SequentialScriptParser(BaseLanguageParser):
"""Label and linear declarative parser for scripting layouts (Batch/SQL)."""
def __init__(self, ext: str):
self.profile = SEQUENTIAL_PROFILES[ext]
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
lines = source.splitlines()
functions = {}
active_routine, routine_start, body_lines = None, 0, []
for idx, line in enumerate(lines, start=1):
stripped = line.strip()
if not stripped: continue
m_rot = self.profile["start"].match(line) if ".bat" in file_path else self.profile["start"].search(line)
if m_rot:
if active_routine:
body_txt = "\n".join(body_lines).lower()
is_mock = len(body_lines) <= 1 or any(mk in body_txt for mk in self.profile["stubs"])
functions[active_routine] = {"args": {}, "returns": "Routine", "decorators": [], "lines": idx - routine_start, "is_mock": is_mock}
active_routine, routine_start, body_lines = m_rot.group(1), idx, []
continue
if active_routine:
body_lines.append(stripped)
if self.profile["end"].match(stripped):
body_txt = "\n".join(body_lines).lower()
is_mock = len(body_lines) <= 2 or any(mk in body_txt for mk in self.profile["stubs"])
functions[active_routine] = {"args": {}, "returns": "Routine", "decorators": [], "lines": idx - routine_start + 1, "is_mock": is_mock}
active_routine = None
if active_routine:
body_txt = "\n".join(body_lines).lower()
functions[active_routine] = {"args": {}, "returns": "Routine", "decorators": [], "lines": len(lines) - routine_start + 1, "is_mock": len(body_lines) <= 1 or any(mk in body_txt for mk in self.profile["stubs"])}
return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)}
class LineIndentationParser(BaseLanguageParser):
"""Layout signature parsing module capturing indentation-sensitive language architectures."""
def __init__(self, ext: str):
self.profile = INDENTATION_PROFILES[ext]
def parse(self, file_path: str, source: str) -> Dict[str, Any]:
lines = source.splitlines()
functions = {}
active_name, fn_start, body_lines = None, 0, []
for idx, line in enumerate(lines, start=1):
if not line.strip(): continue
m = self.profile["sig"].match(line)
if m:
name = m.group(1) or (m.group(2) if len(m.groups()) > 1 else None)
if name and name != active_name:
if active_name:
body_txt = "\n".join(body_lines).lower()
functions[active_name] = {"args": {}, "returns": "Declarative", "decorators": [], "lines": idx - fn_start, "is_mock": any(ms in body_txt for ms in self.profile["stubs"])}
active_name, fn_start, body_lines = name, idx, []
if active_name:
body_lines.append(line)
if active_name:
body_txt = "\n".join(body_lines).lower()
functions[active_name] = {"args": {}, "returns": "Declarative", "decorators": [], "lines": len(lines) - fn_start + 1, "is_mock": any(ms in body_txt for ms in self.profile["stubs"])}
return {"classes": [], "functions": functions, "imports": [], "total_lines": len(lines)}
# ==============================================================================
# WORKSPACE ROUTING MANAGEMENT
# ==============================================================================
class WorkspaceRegistry:
"""Manages the in-memory analysis cache with underlying thread-safe JSON backup layers."""
@staticmethod
def get_parser(file_path: str) -> Optional[BaseLanguageParser]:
_, ext = os.path.splitext(file_path)
if ext == ".py":
return PythonASTParser()
elif ext in {".js", ".jsx", ".ts", ".tsx", ".rs", ".go", ".c", ".cpp", ".h", ".hpp", ".java", ".cs"}:
_, ext = os.path.splitext(file_path.lower())
if ext == ".py": return PythonASTParser()
if ext in INDENTATION_PROFILES: return LineIndentationParser(ext)
if ext in BLOCK_END_PROFILES: return BlockEndLanguageParser(ext)
if ext in SEQUENTIAL_PROFILES: return SequentialScriptParser(ext)
# Explicit or generic fallback for curly-brace structures
if ext in BRACE_PROFILES or ext in {".js", ".jsx", ".ts", ".tsx", ".c", ".cpp", ".h", ".hpp", ".java", ".cs", ".ec", ".eh", ".pike", ".pmod", ".kt", ".kts", ".swift", ".dart", ".r"}:
return BraceLanguageParser(ext)
return None
@ -222,174 +401,127 @@ class WorkspaceRegistry:
target = os.path.join(root_path, STATE_FILE_NAME) if os.path.isdir(root_path) else os.path.join(os.path.dirname(root_path), STATE_FILE_NAME)
if os.path.exists(target):
try:
with open(target, "r", encoding="utf-8") as f:
return json.load(f)
except:
pass
with open(target, "r", encoding="utf-8") as f: return json.load(f)
except: pass
return {}
@classmethod
def save_state(cls, root_path: str, state: Dict[str, Any]) -> None:
target = os.path.join(root_path, STATE_FILE_NAME) if os.path.isdir(root_path) else os.path.join(os.path.dirname(root_path), STATE_FILE_NAME)
try:
with open(target, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2)
except:
pass
with open(target, "w", encoding="utf-8") as f: json.dump(state, f, indent=2)
except: pass
def _resolve_workspace_files(target_path: str) -> List[str]:
abs_path = os.path.abspath(target_path)
if os.path.isfile(abs_path):
return [abs_path] if WorkspaceRegistry.get_parser(abs_path) else []
resolved = []
for root, dirs, files in os.walk(abs_path):
# Destructively filter directory scans to bypass noise
dirs[:] = [d for d in dirs if d not in GLOBAL_IGNORE_DIRS]
for file in files:
full_path = os.path.join(root, file)
if WorkspaceRegistry.get_parser(full_path):
resolved.append(full_path)
fp = os.path.join(root, file)
if WorkspaceRegistry.get_parser(fp): resolved.append(fp)
return resolved
# ------------------------------------------------------------------------------
# EXPOSED MCP PRODUCTION GATE TOOLS
# ------------------------------------------------------------------------------
# ==============================================================================
# EXPOSED PROTOCOL TOOLS
# ==============================================================================
@mcp.tool()
async def snapshot_baseline(target_path: str) -> str:
"""Snapshots structural footprints for all matching code assets across the repository tree."""
"""Snapshots structural footprints across all matching modules inside the designated target directory path."""
base_dir = os.path.abspath(target_path) if os.path.isdir(target_path) else os.path.dirname(os.path.abspath(target_path))
files = _resolve_workspace_files(target_path)
if not files:
return f"? BASELINE_FAILURE: No supported source languages discovered at target path: {target_path}"
if not files: return f"? BASELINE_FAILURE: No supported source profiles matched under: {target_path}"
current_registry = WorkspaceRegistry.load_state(base_dir)
curr_reg = WorkspaceRegistry.load_state(base_dir)
cataloged = 0
for file_path in files:
parser = WorkspaceRegistry.get_parser(file_path)
if not parser:
continue
for fp in files:
parser = WorkspaceRegistry.get_parser(fp)
if not parser: continue
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
current_registry[file_path] = parser.parse(file_path, content)
with open(fp, "r", encoding="utf-8", errors="ignore") as f: content = f.read()
curr_reg[fp] = parser.parse(fp, content)
cataloged += 1
except Exception as e:
return f"? BASELINE_FAILURE: Internal parsing failure on '{os.path.basename(file_path)}': {str(e)}"
return f"? BASELINE_FAILURE: Parsing breakdown in '{os.path.basename(fp)}': {str(e)}"
WorkspaceRegistry.save_state(base_dir, current_registry)
return f"? BASELINE_LOCKED: Workspace matrix secured across {cataloged} source modules."
WorkspaceRegistry.save_state(base_dir, curr_reg)
return f"? BASELINE_LOCKED: High-fidelity profiles tracked across {cataloged} repository points."
@mcp.tool()
async def audit_revision(target_path: str) -> str:
"""Audits modified paths, enforcing type, import, and logic limits across all languages."""
"""Audits modified workspace targets against baseline criteria to block truncated logic streams or laziness traps."""
base_dir = os.path.abspath(target_path) if os.path.isdir(target_path) else os.path.dirname(os.path.abspath(target_path))
registry = WorkspaceRegistry.load_state(base_dir)
if not registry:
return f"? AUDIT_FAILED: Persistent baseline marker layer is empty or missing. Trigger snapshots first."
if not registry: return "? AUDIT_FAILED: Metric profile layer empty. Run baseline snapshot routines."
files = _resolve_workspace_files(target_path)
drift_errors = []
# Detect unexpected missing modules from tracked baselines
for tracked_file in list(registry.keys()):
if tracked_file.startswith(base_dir) and not os.path.exists(tracked_file):
drift_errors.append(f"Module Dropped: Tracked source '{os.path.basename(tracked_file)}' was deleted.")
drift_errors.append(f"Module Dropped: Baseline module file '{os.path.basename(tracked_file)}' was deleted.")
for file_path in files:
if file_path not in registry:
continue
baseline = registry[file_path]
filename = os.path.basename(file_path)
parser = WorkspaceRegistry.get_parser(file_path)
if not parser:
continue
for fp in files:
if fp not in registry: continue
baseline, filename = registry[fp], os.path.basename(fp)
parser = WorkspaceRegistry.get_parser(fp)
if not parser: continue
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
current = parser.parse(file_path, content)
with open(fp, "r", encoding="utf-8", errors="ignore") as f: content = f.read()
current = parser.parse(fp, content)
except Exception as e:
drift_errors.append(f"Syntax Error [{filename}]: File compilation or parse block failure: {str(e)}")
drift_errors.append(f"Syntax Error [{filename}]: Parsing crash: {str(e)}")
continue
# 1. Audit Global Package/Dependency Drops
missing_imports = set(baseline["imports"]) - set(current["imports"])
if missing_imports:
drift_errors.append(f"[{filename}] Dropped Dependencies: {list(missing_imports)}")
missing_imports = set(baseline.get("imports", [])) - set(current.get("imports", []))
if missing_imports: drift_errors.append(f"[{filename}] Missing Dependencies: {list(missing_imports)}")
# 2. Audit OOP Object Integrity Drops
missing_classes = set(baseline["classes"]) - set(current["classes"])
if missing_classes:
drift_errors.append(f"[{filename}] Core Structs/Classes Missing: {list(missing_classes)}")
missing_classes = set(baseline.get("classes", [])) - set(current.get("classes", []))
if missing_classes: drift_errors.append(f"[{filename}] Objects/Classes Deleted: {list(missing_classes)}")
# 3. Audit Function Signatures, Logic Truncation, and Laziness Traps
for func_name, b_meta in baseline["functions"].items():
if func_name not in current["functions"]:
drift_errors.append(f"[{filename}] Missing Logic Block: Function '{func_name}' was skipped.")
for fn_name, b_meta in baseline.get("functions", {}).items():
if fn_name not in current.get("functions", {}):
drift_errors.append(f"[{filename}] Logic Block Dropped: Routine block '{fn_name}' missing.")
continue
c_meta = current["functions"][func_name]
# Enforce systemic decorator and annotation checks
missing_decs = set(b_meta["decorators"]) - set(c_meta["decorators"])
if missing_decs:
drift_errors.append(f"[{filename}] Stripped Annotations on '{func_name}': Dropped {list(missing_decs)}")
# Enforce multi-language laziness checks
if c_meta["is_mock"] and not b_meta["is_mock"]:
drift_errors.append(f"[{filename}] Laziness Trap Triggered in '{func_name}': Substituted with placeholder/throw/todo stub.")
# Enforce physical compression check bounds
if b_meta["lines"] > 4 and c_meta["lines"] <= (b_meta["lines"] * 0.5):
drift_errors.append(f"[{filename}] Truncation Trap Triggered in '{func_name}': Scope collapsed by >50% ({b_meta['lines']} -> {c_meta['lines']} lines).")
c_meta = current["functions"][fn_name]
if set(b_meta.get("decorators", [])) - set(c_meta.get("decorators", [])):
drift_errors.append(f"[{filename}] Stripped Framework Annotation on '{fn_name}'")
if c_meta.get("is_mock") and not b_meta.get("is_mock"):
drift_errors.append(f"? [{filename}] Laziness Trap Blocked in '{fn_name}': Logic substituted with stub.")
if b_meta.get("lines", 0) > 4 and c_meta.get("lines", 0) <= (b_meta.get("lines", 0) * 0.5):
drift_errors.append(f"? [{filename}] Truncation Trap Blocked in '{fn_name}': Context block collapsed >50%.")
if drift_errors:
return (
f"? CRITICAL_COMPLETENESS_DRIFT: Regression detected during mutation check!\n"
+ "\n".join(f" - {err}" for err in drift_errors)
+ "\nAction: Halt code ingestion. Force full context reconstruction."
)
return f"? REVISION_PASSED: Workspace architectures, dependencies, and functional bounds are verified."
return f"? CRITICAL_COMPLETENESS_DRIFT: Logic degradation intercepted!\n" + "\n".join(f" - {err}" for err in drift_errors)
return "? REVISION_PASSED: Multi-language layout boundaries remain complete and intact."
@mcp.tool()
async def run_completeness_diagnostic() -> str:
"""Verifies parsing and capture capabilities across mixed multi-language test fixtures."""
fixture_py = "drift_test.py"
fixture_rs = "drift_test.rs"
py_v1 = "@mcp.tool()\ndef calculate_hash(seed: str) -> None:\n print(seed)\n"
rs_v1 = "#[inline]\npub fn calculate_hash(seed: &str) {\n println!(\"{}\", seed);\n}"
"""Runs verification pipelines simulating code erosion in script engines and structural blocks."""
f_bat, f_lua = "drift_test.bat", "drift_test.lua"
bat_v1, lua_v1 = ":build_array\necho Processing...\ngoto :eof\n", "function calculate_matrix()\n return 1\nend\n"
try:
with open(fixture_py, "w") as f: f.write(py_v1)
with open(fixture_rs, "w") as f: f.write(rs_v1)
with open(f_bat, "w") as f: f.write(bat_v1)
with open(f_lua, "w") as f: f.write(lua_v1)
await snapshot_baseline(".")
# Ingest lazy mutations across language families
with open(fixture_py, "w") as f: f.write("def calculate_hash(seed):\n pass\n")
with open(fixture_rs, "w") as f: f.write("pub fn calculate_hash(seed: &str) {\n todo!();\n}")
audit_result = await audit_revision(".")
if "Stripped Annotations" in audit_result and "Laziness Trap Triggered" in audit_result:
return "? DIAGNOSTIC_PASSED: Multi-language driver framework successfully trapped Python AST and Rust lexical code-erosion boundaries."
return f"? DIAGNOSTIC_FAILED: Structural deviations bypassed checkpoints. Result:\n{audit_result}"
with open(f_bat, "w") as f: f.write(":build_array\nrem TODO\n")
with open(f_lua, "w") as f: f.write("function calculate_matrix()\n -- todo\nend\n")
res = await audit_revision(".")
if "Laziness Trap" in res: return "? DIAGNOSTIC_PASSED: Flattened polymorphic profile framework trapped all code erosion simulation vectors."
return f"? DIAGNOSTIC_FAILED: Structural checkpoints bypassed. Output:\n{res}"
finally:
for path in [fixture_py, fixture_rs, STATE_FILE_NAME]:
if os.path.exists(path):
os.remove(path)
for p in [f_bat, f_lua, STATE_FILE_NAME]:
if os.path.exists(p): os.remove(p)
if __name__ == "__main__":
mcp.run(transport="stdio")