import asyncio
import json


class EventStream:
    """Fan out JSON-encoded events to a set of subscribed connections.

    A subscriber is any hashable object exposing an awaitable
    ``send(message)`` method; it is presumably a WebSocket connection, but
    nothing beyond ``send`` is required by this class.
    """

    def __init__(self):
        """Create a stream with no subscribers."""
        # A set gives idempotent subscribe and O(1) removal.
        self.subscribers = set()

    # -----------------------------
    # SUBSCRIBE (WebSocket clients)
    # -----------------------------
    def subscribe(self, ws) -> None:
        """Register *ws* to receive future events.

        Adding a connection that is already subscribed has no effect.
        """
        self.subscribers.add(ws)

    def unsubscribe(self, ws) -> None:
        """Stop delivering events to *ws*.

        Removing a connection that is not subscribed has no effect.
        """
        self.subscribers.discard(ws)

    # -----------------------------
    # EMIT EVENT (REALTIME FANOUT)
    # -----------------------------
    async def emit(self, event_type, payload) -> None:
        """Send one event to every currently subscribed connection.

        The event is serialized once as ``{"type": event_type, "data":
        payload}`` and sent to the subscribers one after another. Any
        subscriber whose ``send`` raises is unsubscribed after the fan-out
        completes; the failure is not propagated to the caller.

        Args:
            event_type: Value stored under the ``"type"`` key.
            payload: Value stored under the ``"data"`` key; must be
                JSON-serializable.

        Raises:
            TypeError: If ``json.dumps`` cannot serialize the event.
        """
        message = json.dumps({
            "type": event_type,
            "data": payload,
        })

        dead = set()

        # Iterate over a snapshot: each ``await`` below yields to the event
        # loop, so other tasks may subscribe/unsubscribe while we are sending.
        # Iterating the live set would then raise
        # "RuntimeError: Set changed size during iteration" and abort the
        # fan-out for every remaining client.
        for ws in tuple(self.subscribers):
            try:
                await ws.send(message)
            except Exception:
                # Best-effort delivery: a failed send marks the connection as
                # dead instead of failing the whole emit.
                dead.add(ws)

        # Remove failed connections only after the loop is finished.
        self.subscribers.difference_update(dead)