# MCP-Drift-State-Tracker/mcp_drift_state_tracker.py
#
# 395 lines
# 17 KiB
# Python

# ==============================================================================
# mcp_drift_state_tracker.py
# Copyright (C) 2026 Jeremy Anderson info@dcos.net
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# ==============================================================================
import ast
import json
import logging
import os
import re
import tempfile
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Set

from mcp.server.fastmcp import FastMCP
# Server instance: the @mcp.tool() functions in this file register on it, and
# the __main__ guard starts it with the stdio transport.
mcp = FastMCP("MCP-Drift-State-Tracker")
# File name under which parsed baselines are persisted as JSON.
STATE_FILE_NAME = ".mcp_drift_state.json"

# Directory names pruned from recursive workspace scans.
GLOBAL_IGNORE_DIRS = {
    ".git",
    ".venv",
    "venv",
    "__pycache__",
    "node_modules",
    "target",
    "dist",
    "build",
    "out",
    ".cargo",
    ".rustup",
    "obj",
    "bin",
}
# ------------------------------------------------------------------------------
# PARSER INTERFACE & DRIVER IMPLEMENTATIONS
# ------------------------------------------------------------------------------
class BaseLanguageParser(ABC):
    """Common interface that every structural language parser implements."""

    @abstractmethod
    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Convert source text into the shared structural inventory dictionary.

        Args:
            file_path: Path of the file the text was read from.
            source: Full text of the file.

        Returns:
            The inventory produced by the concrete parser; this base
            implementation has no body and yields ``None``.
        """
class PythonASTParser(BaseLanguageParser):
    """Structural inventory builder for Python source, based on the ``ast`` module.

    Functions are keyed by bare name, so same-named functions or methods within
    one file overwrite each other and only the last one visited is reported.
    """

    @staticmethod
    def _is_string_expr(node: ast.stmt) -> bool:
        """Return True for a bare string expression statement (docstring form)."""
        return (
            isinstance(node, ast.Expr)
            and isinstance(node.value, ast.Constant)
            and isinstance(node.value.value, str)
        )

    def _is_mock(self, body: List[ast.stmt]) -> bool:
        """Return True when a function body is nothing but a placeholder.

        After skipping a leading docstring, a body counts as a placeholder when
        its single remaining statement is ``pass``/``break``/``continue``, a
        bare string, ``...``, or ``raise NotImplementedError`` with or without
        call arguments.
        """
        if not body:
            return True

        statements = body
        # A leading docstring does not count as logic when other statements follow.
        if len(body) > 1 and self._is_string_expr(body[0]):
            statements = body[1:]
        if len(statements) != 1:
            return False

        node = statements[0]
        if isinstance(node, (ast.Pass, ast.Break, ast.Continue)):
            return True
        if isinstance(node, ast.Raise):
            raised = node.exc
            # ``raise NotImplementedError("msg")`` wraps the name in a Call node.
            if isinstance(raised, ast.Call):
                raised = raised.func
            return isinstance(raised, ast.Name) and raised.id == "NotImplementedError"
        if isinstance(node, ast.Expr) and isinstance(node.value, ast.Constant):
            value = node.value.value
            return isinstance(value, str) or value is Ellipsis
        return False

    @staticmethod
    def _annotation_text(annotation: Optional[ast.expr]) -> str:
        """Render an annotation as source text, or the string "None" if absent."""
        return ast.unparse(annotation).strip() if annotation is not None else "None"

    def _describe_function(self, node: Any) -> Dict[str, Any]:
        """Build the inventory record for one (async) function definition node."""
        spec = node.args
        # Signature order: positional-only, regular, *args, keyword-only, **kwargs.
        declared = [*spec.posonlyargs, *spec.args]
        if spec.vararg is not None:
            declared.append(spec.vararg)
        declared.extend(spec.kwonlyargs)
        if spec.kwarg is not None:
            declared.append(spec.kwarg)

        # Decorator calls are reduced to the callee text, e.g. "mcp.tool()" -> "mcp.tool".
        decorators = {ast.unparse(dec).strip().split('(')[0] for dec in node.decorator_list}
        end_lineno = getattr(node, "end_lineno", None)
        span = (end_lineno - node.lineno + 1) if end_lineno is not None else 1
        return {
            "args": {arg.arg: self._annotation_text(arg.annotation) for arg in declared},
            "returns": self._annotation_text(node.returns),
            "decorators": sorted(decorators),
            "lines": span,
            "is_mock": self._is_mock(node.body),
        }

    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Parse Python source into the structural inventory dictionary.

        Args:
            file_path: Path passed to ``ast.parse`` as the filename.
            source: Python source text.

        Returns:
            Dict with sorted ``classes`` and ``imports`` lists, a ``functions``
            mapping of name to signature/size/placeholder metadata (sync and
            async definitions alike), and ``total_lines``.

        Raises:
            SyntaxError: If ``source`` is not valid Python.
        """
        tree = ast.parse(source, filename=file_path)
        functions: Dict[str, Dict[str, Any]] = {}
        classes: Set[str] = set()
        imports: Set[str] = set()

        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                classes.add(node.name)
            elif isinstance(node, ast.Import):
                imports.update(f"import {alias.name}" for alias in node.names)
            elif isinstance(node, ast.ImportFrom):
                module = node.module if node.module else ""
                imports.update(f"from {module} import {alias.name}" for alias in node.names)
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                functions[node.name] = self._describe_function(node)

        # Sorted so the persisted JSON does not reorder between runs.
        return {
            "classes": sorted(classes),
            "functions": functions,
            "imports": sorted(imports),
            "total_lines": len(source.splitlines()),
        }
class BraceLanguageParser(BaseLanguageParser):
    """
    Line-oriented lexical scanner for curly-brace languages.
    Supports: C, C++, Rust, Go, Java, TypeScript, JavaScript, C#

    Functions are only recognised when their signature line sits at brace
    depth 0, so definitions nested inside another brace scope are not
    inventoried. Braces are counted character by character without regard to
    string literals or comments.
    """

    # Each rule set is ordered (import, function, class, decorator) and is
    # compiled once at class creation instead of on every parse() call.
    _RUST_RULES = (
        re.compile(r'^\s*(?:pub\s+)?use\s+([^;]+);'),
        re.compile(r'(?:pub\s+)?(?:async\s+)?fn\s+([a-zA-Z_0-9]+)\s*(<[^>]+>)?\s*\(([^)]*)\)'),
        re.compile(r'^\s*(?:pub\s+)?(?:struct|enum|trait)\s+([a-zA-Z_0-9]+)'),
        re.compile(r'^\s*#\[([^\]]+)\]'),
    )
    _GO_RULES = (
        re.compile(r'^\s*import\s+(?:\([^\)]+\)|"[^"]+")'),
        re.compile(r'^func\s+(?:\([^)]+\)\s+)?([a-zA-Z_0-9]+)\s*\(([^)]*)\)'),
        re.compile(r'^\s*type\s+([a-zA-Z_0-9]+)\s+struct'),
        re.compile(r'^\s*//\s*@([a-zA-Z_0-9]+)'),
    )
    # C-style family: JS, TS, C, C++, Java, C#
    _C_FAMILY_RULES = (
        re.compile(r'^\s*(?:import|require|#include)\s+.*'),
        re.compile(r'(?:public|private|protected|static|async|function)?\s*([a-zA-Z_0-9]+)\s*\(([^)]*)\)\s*(?::\s*[a-zA-Z_0-9<>\s|]+)?\s*\{?'),
        re.compile(r'^\s*(?:export\s+)?(?:class|interface|struct)\s+([a-zA-Z_0-9]+)'),
        re.compile(r'^\s*@([a-zA-Z_0-9]+)'),
    )

    # Whole-word match: a plain substring test would also reject names such as
    # "notify" (contains "if") or "transform" (contains "for").
    _CONTROL_KEYWORDS = re.compile(r'\b(?:if|for|while|switch|catch)\b')
    _NON_FUNCTION_NAMES = frozenset({"class", "struct", "function", "return"})
    _STUB_MARKERS = ("todo", "panic", "notimplemented", "throw new", "return null")

    def __init__(self, extension: str):
        self.ext = extension

    def _rules(self):
        """Return the (import, function, class, decorator) patterns for ``self.ext``."""
        if self.ext in {".rs"}:
            return self._RUST_RULES
        if self.ext in {".go"}:
            return self._GO_RULES
        return self._C_FAMILY_RULES

    def _function_name(self, func_pattern, line: str, stripped: str) -> Optional[str]:
        """Return the function name if ``line`` starts a definition, else None."""
        func_match = func_pattern.search(line)
        if not func_match or self._CONTROL_KEYWORDS.search(stripped):
            return None
        name = func_match.group(1)
        if name in self._NON_FUNCTION_NAMES:
            return None
        # A prototype or call statement ("int f(int);", "app.listen(80);") has
        # no body; treating it as a definition would bind it to the next
        # unrelated closing brace.
        if "{" not in stripped and stripped.endswith(";"):
            return None
        return name

    def parse(self, file_path: str, source: str) -> Dict[str, Any]:
        """Scan brace-delimited source into the structural inventory dictionary.

        Args:
            file_path: Path of the file being scanned (not read by this method).
            source: Source text.

        Returns:
            Dict with sorted ``classes`` and ``imports`` lists, a ``functions``
            mapping of name to decorators/size/placeholder metadata (``args``
            is always empty and ``returns`` is always "Inferred"), and
            ``total_lines``.
        """
        import_pattern, func_pattern, class_pattern, decorator_pattern = self._rules()
        lines = source.splitlines()
        functions: Dict[str, Dict[str, Any]] = {}
        classes: Set[str] = set()
        imports: Set[str] = set()

        brace_depth = 0
        active_func: Optional[str] = None
        func_start_line = 0
        func_body_tokens: List[str] = []
        pending_decorators: List[str] = []

        for idx, line in enumerate(lines, start=1):
            stripped = line.strip()
            if not stripped:
                continue

            if brace_depth == 0:
                # Top-level imports
                if import_pattern.match(line):
                    imports.add(stripped)
                    continue

                # Decorators / attributes wait for the declaration that follows
                dec_match = decorator_pattern.match(line)
                if dec_match:
                    pending_decorators.append(dec_match.group(1))
                    continue

                class_match = class_pattern.match(line)
                if class_match:
                    classes.add(class_match.group(1))

                name = self._function_name(func_pattern, line, stripped)
                if name is not None:
                    active_func = name
                    func_start_line = idx
                    func_body_tokens = []
                elif class_match:
                    # Annotations collected so far belong to this type, not to
                    # whichever function happens to come next.
                    pending_decorators.clear()

            # Track scope depth one character at a time
            for char in stripped:
                if char == '{':
                    brace_depth += 1
                elif char == '}':
                    if brace_depth == 0:
                        # Stray closer: going negative would switch off every
                        # depth-0 check for the rest of the file.
                        continue
                    brace_depth -= 1
                    if brace_depth == 0 and active_func:
                        span = idx - func_start_line + 1
                        body_str = "\n".join(func_body_tokens).lower()
                        # Short bodies and bodies containing a stub marker are
                        # both treated as placeholders.
                        is_mock = (
                            len(func_body_tokens) <= 3
                            or any(stub in body_str for stub in self._STUB_MARKERS)
                        )
                        functions[active_func] = {
                            "args": {},
                            "returns": "Inferred",
                            "decorators": pending_decorators.copy(),
                            "lines": span,
                            "is_mock": is_mock,
                        }
                        active_func = None
                        pending_decorators.clear()

            # Collected after the depth update, so a signature line that opens
            # the body is included and the closing line is not.
            if brace_depth > 0 and active_func:
                func_body_tokens.append(stripped)

        # Sorted so the persisted JSON does not reorder between runs.
        return {
            "classes": sorted(classes),
            "functions": functions,
            "imports": sorted(imports),
            "total_lines": len(lines),
        }
# ------------------------------------------------------------------------------
# CORE WORKSPACE ENGINE REGISTRY & STATE PERSISTENCE
# ------------------------------------------------------------------------------
class WorkspaceRegistry:
"""Manages the in-memory analysis cache with underlying thread-safe JSON backup layers."""
@staticmethod
def get_parser(file_path: str) -> Optional[BaseLanguageParser]:
_, ext = os.path.splitext(file_path)
if ext == ".py":
return PythonASTParser()
elif ext in {".js", ".jsx", ".ts", ".tsx", ".rs", ".go", ".c", ".cpp", ".h", ".hpp", ".java", ".cs"}:
return BraceLanguageParser(ext)
return None
@classmethod
def load_state(cls, root_path: str) -> Dict[str, Any]:
target = os.path.join(root_path, STATE_FILE_NAME) if os.path.isdir(root_path) else os.path.join(os.path.dirname(root_path), STATE_FILE_NAME)
if os.path.exists(target):
try:
with open(target, "r", encoding="utf-8") as f:
return json.load(f)
except:
pass
return {}
@classmethod
def save_state(cls, root_path: str, state: Dict[str, Any]) -> None:
target = os.path.join(root_path, STATE_FILE_NAME) if os.path.isdir(root_path) else os.path.join(os.path.dirname(root_path), STATE_FILE_NAME)
try:
with open(target, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2)
except:
pass
def _resolve_workspace_files(target_path: str) -> List[str]:
    """List absolute paths of parseable source files at or below ``target_path``.

    A single file yields a one-element list when a parser exists for it and an
    empty list otherwise. A directory is walked recursively, skipping every
    directory whose name is in ``GLOBAL_IGNORE_DIRS``.
    """
    location = os.path.abspath(target_path)

    if os.path.isfile(location):
        if WorkspaceRegistry.get_parser(location):
            return [location]
        return []

    matches = []
    for current_dir, subdirs, filenames in os.walk(location):
        # Slice assignment mutates the list os.walk holds, so ignored
        # directories are never descended into.
        subdirs[:] = [name for name in subdirs if name not in GLOBAL_IGNORE_DIRS]
        candidates = (os.path.join(current_dir, name) for name in filenames)
        matches.extend(path for path in candidates if WorkspaceRegistry.get_parser(path))
    return matches
# ------------------------------------------------------------------------------
# EXPOSED MCP PRODUCTION GATE TOOLS
# ------------------------------------------------------------------------------
@mcp.tool()
async def snapshot_baseline(target_path: str) -> str:
    """Record the structural inventory of every supported file under ``target_path``.

    New inventories are merged into whatever state is already stored for the
    base directory and then written back. The first file that cannot be read
    or parsed aborts the run with a failure message, before anything is saved.
    """
    absolute_target = os.path.abspath(target_path)
    if os.path.isdir(target_path):
        base_dir = absolute_target
    else:
        base_dir = os.path.dirname(absolute_target)

    discovered = _resolve_workspace_files(target_path)
    if not discovered:
        return f"? BASELINE_FAILURE: No supported source languages discovered at target path: {target_path}"

    registry = WorkspaceRegistry.load_state(base_dir)
    cataloged = 0
    for source_path in discovered:
        parser = WorkspaceRegistry.get_parser(source_path)
        if not parser:
            continue
        try:
            with open(source_path, "r", encoding="utf-8", errors="ignore") as handle:
                text = handle.read()
            registry[source_path] = parser.parse(source_path, text)
            cataloged += 1
        except Exception as exc:
            return f"? BASELINE_FAILURE: Internal parsing failure on '{os.path.basename(source_path)}': {exc}"

    WorkspaceRegistry.save_state(base_dir, registry)
    return f"? BASELINE_LOCKED: Workspace matrix secured across {cataloged} source modules."
def _collect_file_drift(filename: str, baseline: Dict[str, Any], current: Dict[str, Any]) -> List[str]:
    """Compare one file's stored inventory with its fresh parse.

    Keys missing from the stored inventory fall back to defaults that cannot
    trigger a finding, so an incomplete baseline entry never aborts the audit.

    Returns:
        One human-readable message per detected regression.
    """
    findings: List[str] = []

    # 1. Dropped imports / dependencies
    missing_imports = set(baseline.get("imports", [])) - set(current["imports"])
    if missing_imports:
        findings.append(f"[{filename}] Dropped Dependencies: {sorted(missing_imports)}")

    # 2. Dropped classes / structs
    missing_classes = set(baseline.get("classes", [])) - set(current["classes"])
    if missing_classes:
        findings.append(f"[{filename}] Core Structs/Classes Missing: {sorted(missing_classes)}")

    # 3. Per-function checks: presence, decorators, placeholder bodies, shrinkage
    baseline_functions = baseline.get("functions", {})
    if not isinstance(baseline_functions, dict):
        baseline_functions = {}
    for func_name, b_meta in baseline_functions.items():
        if func_name not in current["functions"]:
            findings.append(f"[{filename}] Missing Logic Block: Function '{func_name}' was skipped.")
            continue
        if not isinstance(b_meta, dict):
            b_meta = {}
        c_meta = current["functions"][func_name]

        missing_decs = set(b_meta.get("decorators", [])) - set(c_meta["decorators"])
        if missing_decs:
            findings.append(f"[{filename}] Stripped Annotations on '{func_name}': Dropped {sorted(missing_decs)}")

        # Only flagged when the function was real before and is a stub now.
        if c_meta["is_mock"] and not b_meta.get("is_mock", True):
            findings.append(f"[{filename}] Laziness Trap Triggered in '{func_name}': Substituted with placeholder/throw/todo stub.")

        # Functions of four lines or fewer are exempt from the shrink check.
        b_lines = b_meta.get("lines", 0)
        if b_lines > 4 and c_meta["lines"] <= (b_lines * 0.5):
            findings.append(f"[{filename}] Truncation Trap Triggered in '{func_name}': Scope collapsed by >50% ({b_lines} -> {c_meta['lines']} lines).")

    return findings


@mcp.tool()
async def audit_revision(target_path: str) -> str:
    """Compare the current workspace with the stored baseline and report regressions.

    Checks tracked files that were deleted, files that no longer parse, and,
    per file, dropped imports, dropped classes, missing functions, stripped
    decorators, bodies replaced by stubs, and functions that shrank to half
    their baseline length or less. Files without a baseline entry are skipped.

    Returns:
        A pass message, or a failure report listing every finding.
    """
    base_dir = os.path.abspath(target_path) if os.path.isdir(target_path) else os.path.dirname(os.path.abspath(target_path))
    registry = WorkspaceRegistry.load_state(base_dir)
    if not registry or not isinstance(registry, dict):
        return "? AUDIT_FAILED: Persistent baseline marker layer is empty or missing. Trigger snapshots first."

    files = _resolve_workspace_files(target_path)
    drift_errors: List[str] = []

    # Tracked files that have disappeared. The trailing separator stops a
    # sibling such as "/work/app-old" from matching base_dir "/work/app".
    base_prefix = os.path.join(base_dir, "")
    for tracked_file in registry:
        if tracked_file.startswith(base_prefix) and not os.path.exists(tracked_file):
            drift_errors.append(f"Module Dropped: Tracked source '{os.path.basename(tracked_file)}' was deleted.")

    for file_path in files:
        if file_path not in registry:
            continue
        baseline = registry[file_path]
        filename = os.path.basename(file_path)
        parser = WorkspaceRegistry.get_parser(file_path)
        if not parser:
            continue
        if not isinstance(baseline, dict):
            drift_errors.append(f"[{filename}] Baseline Entry Invalid: Stored inventory is malformed. Re-run snapshot_baseline.")
            continue

        try:
            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
            current = parser.parse(file_path, content)
        except Exception as e:
            drift_errors.append(f"Syntax Error [{filename}]: File compilation or parse block failure: {str(e)}")
            continue

        drift_errors.extend(_collect_file_drift(filename, baseline, current))

    if drift_errors:
        return (
            "? CRITICAL_COMPLETENESS_DRIFT: Regression detected during mutation check!\n"
            + "\n".join(f" - {err}" for err in drift_errors)
            + "\nAction: Halt code ingestion. Force full context reconstruction."
        )
    return "? REVISION_PASSED: Workspace architectures, dependencies, and functional bounds are verified."
@mcp.tool()
async def run_completeness_diagnostic() -> str:
    """Self-test: snapshot two fixtures, degrade them, and confirm the audit objects.

    The fixtures and their state file live in a throwaway temporary directory,
    so the run never overwrites files in the current directory and never
    touches or deletes a real baseline.

    Returns:
        A pass message when the audit flags the stubbed Python body and the
        stripped Rust attribute, otherwise a failure message carrying the
        snapshot or audit output.
    """
    py_v1 = "@mcp.tool()\ndef calculate_hash(seed: str) -> None:\n print(seed)\n"
    rs_v1 = "#[inline]\npub fn calculate_hash(seed: &str) {\n println!(\"{}\", seed);\n}"
    # Degraded revisions: decorators removed and bodies replaced by stubs.
    py_v2 = "def calculate_hash(seed):\n pass\n"
    rs_v2 = "pub fn calculate_hash(seed: &str) {\n todo!();\n}"

    with tempfile.TemporaryDirectory(prefix="mcp_drift_diag_") as workspace:
        fixture_py = os.path.join(workspace, "drift_test.py")
        fixture_rs = os.path.join(workspace, "drift_test.rs")

        for path, text in ((fixture_py, py_v1), (fixture_rs, rs_v1)):
            with open(path, "w", encoding="utf-8") as f:
                f.write(text)
        baseline_result = await snapshot_baseline(workspace)
        if "BASELINE_LOCKED" not in baseline_result:
            return f"? DIAGNOSTIC_FAILED: Structural deviations bypassed checkpoints. Result:\n{baseline_result}"

        for path, text in ((fixture_py, py_v2), (fixture_rs, rs_v2)):
            with open(path, "w", encoding="utf-8") as f:
                f.write(text)
        audit_result = await audit_revision(workspace)

    # Require one finding per language so the Python fixture alone cannot
    # satisfy the check.
    python_trapped = "[drift_test.py] Laziness Trap Triggered" in audit_result
    rust_trapped = "[drift_test.rs] Stripped Annotations" in audit_result
    if python_trapped and rust_trapped:
        return "? DIAGNOSTIC_PASSED: Multi-language driver framework successfully trapped Python AST and Rust lexical code-erosion boundaries."
    return f"? DIAGNOSTIC_FAILED: Structural deviations bypassed checkpoints. Result:\n{audit_result}"
# Script entry point: run the server with the stdio transport.
if __name__ == "__main__":
    mcp.run(transport="stdio")