Coverage for nexios\events.py: 61%

363 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-21 20:31 +0100

1# type:ignore 

2import inspect 

3import logging 

4from typing import ( 

5 Callable, 

6 Any, 

7 Dict, 

8 List, 

9 Optional, 

10 Union, 

11 TypeVar, 

12 Tuple, 

13 Type, 

14 Protocol, 

15 cast, 

16) 

17from enum import Enum, auto 

18from dataclasses import dataclass 

19import time 

20import threading 

21import asyncio 

22from concurrent.futures import ThreadPoolExecutor 

23import weakref 

24from weakref import ReferenceType, ref, WeakMethod 

25import uuid 

26import json 

27from datetime import datetime 

28 

29 

# Setup logging
# NOTE(review): basicConfig() is called at import time, so importing this
# module touches the root logger's configuration — confirm that is intended
# for a library module.
logging.basicConfig(level=logging.INFO)
# Module-level logger used for listener and trigger error reporting.
logger = logging.getLogger(__name__)

# Type variable for the classmethod constructor on the serialization mixin;
# bound to Event via a forward reference.
_T = TypeVar("_T", bound="Event")

35 

36 

class EventPriority(Enum):
    """Listener priority levels, declared from most to least urgent."""

    # Values are spelled out; they match what auto() assigns (1..5 in
    # declaration order).
    HIGHEST = 1
    HIGH = 2
    NORMAL = 3
    LOW = 4
    LOWEST = 5

45 

46 

class EventPhase(Enum):
    """Phases an event passes through while it propagates."""

    # Values are spelled out; they match what auto() assigns (1..3 in
    # declaration order).
    CAPTURING = 1
    BUBBLING = 2
    AT_TARGET = 3

53 

54 

@dataclass
class EventContext:
    """Context information about the event"""

    # Time of the trigger as a float; Event.trigger fills it with time.time().
    timestamp: float
    # Unique identifier of one trigger; Event.trigger uses str(uuid.uuid4()).
    event_id: str
    # Object that produced the event; Event.trigger passes the event itself.
    source: Any
    # Current propagation phase; mutable, updated while listeners run.
    phase: EventPhase = EventPhase.AT_TARGET

63 

64 

class EventError(Exception):
    """Root of the exception hierarchy used by the event system."""


class ListenerAlreadyRegisteredError(EventError):
    """Signals an attempt to register a listener that is already present."""


class MaxListenersExceededError(EventError):
    """Signals that adding a listener would exceed the configured maximum."""


class EventCancelledError(EventError):
    """Signals that an event was cancelled."""

87 

88 

# A stored listener: either the callable itself, or a weak reference to it
# (a plain weakref, or a WeakMethod for bound methods).
ListenerType = Union[
    Callable[..., Any],
    ReferenceType[Callable[..., Any]],
    WeakMethod[Callable[..., Any]],
]

94 

95 

class EventProtocol(Protocol):
    """
    Structural interface the serialization mixin relies on.

    The members were previously annotated with the ``...`` placeholder, which
    carries no type information; they now declare the types that the mixin's
    ``to_json``/``from_json`` actually read and write.
    """

    # Event name, used as the identifier in serialized output.
    name: str
    # Number of listeners currently registered.
    listener_count: int
    # Upper bound on the number of registered listeners.
    max_listeners: int
    # Whether the event is enabled.
    enabled: bool
    # Zero-argument callable returning a metrics mapping.
    get_metrics: Callable[[], Dict[str, Any]]

    def __call__(self, *args: Any, **kwargs: Any) -> Any: ...

106 

107 

class EventSerializationMixin(EventProtocol):
    """
    Adds JSON export and import of an event's configuration.
    """

    def to_json(self) -> str:
        """Return name, listener count, limit, enabled flag and metrics as JSON."""
        snapshot = {
            "name": self.name,
            "listener_count": self.listener_count,
            "max_listeners": self.max_listeners,
            "enabled": self.enabled,
            "metrics": self.get_metrics(),
        }
        return json.dumps(snapshot)

    @classmethod
    def from_json(cls, json_str: str) -> _T:  # type:ignore
        """
        Build an event from a JSON string shaped like the output of ``to_json``.

        Only ``name``, ``max_listeners`` and ``enabled`` are read from the
        payload; any other keys are ignored.
        """
        # cast() only informs the type checker; it returns cls unchanged.
        event_cls = cast(Type[_T], cls)
        payload = json.loads(json_str)
        restored = event_cls(payload["name"])
        restored.max_listeners = payload["max_listeners"]
        restored.enabled = payload["enabled"]
        return restored

135 

136 

class Event(EventSerializationMixin):
    """
    Named event with prioritised listeners and an optional parent/child
    hierarchy.

    Supports:
    - Priority-based listener execution (``EventPriority`` declaration order)
    - Persistent and one-time listeners, optionally held by weak reference
    - Propagation: the parent is triggered before this event's listeners run,
      and child events afterwards
    - Synchronous and coroutine listeners
    - Event cancellation via ``EventCancelledError``
    - Per-trigger context, performance metrics and a bounded history

    State is guarded by a re-entrant lock that ``trigger`` holds for the whole
    dispatch.
    """

    DEFAULT_MAX_LISTENERS = 100
    # Number of trigger records retained by _record_history.
    _MAX_HISTORY = 100

    def __init__(self, name: str, max_listeners: Optional[int] = None):
        """
        Initialize an event.

        Args:
            name: Name of the event
            max_listeners: Maximum number of listeners allowed. ``None`` (or
                any other falsy value such as 0) selects
                ``DEFAULT_MAX_LISTENERS``; there is no "unlimited" setting.
        """
        self.name = name
        self._listeners: Dict[EventPriority, List[ListenerType]] = {
            priority: [] for priority in EventPriority
        }
        self._once_listeners: Dict[EventPriority, List[ListenerType]] = {
            priority: [] for priority in EventPriority
        }
        self._max_listeners = max_listeners or self.DEFAULT_MAX_LISTENERS
        self._lock = threading.RLock()
        self._parent: Optional["Event"] = None
        # Children are stored as weakref proxies (see the parent setter), so
        # an entry can go dead once its event is garbage collected.
        self._children: List["Event"] = []
        self._enabled = True
        self._history: List[Dict[str, Any]] = []
        self._metrics: Dict[str, Any] = {
            "trigger_count": 0,
            "total_listeners_executed": 0,
            "average_execution_time": 0.0,
        }
        # Idents of threads currently inside trigger(); _propagate uses this
        # to avoid re-entering an event that is already dispatching.
        self._active_threads: set = set()
        # Strong references to tasks created for coroutine listeners, so they
        # are not garbage collected before they finish.
        self._pending_tasks: set = set()

    def __repr__(self) -> str:
        return f"<Event name='{self.name}' listeners={self.listener_count}>"

    @property
    def listener_count(self) -> int:
        """Get total number of listeners (persistent plus one-time)."""
        with self._lock:
            persistent = sum(len(v) for v in self._listeners.values())
            one_time = sum(len(v) for v in self._once_listeners.values())
            return persistent + one_time

    @property
    def max_listeners(self) -> int:
        """Get maximum number of listeners allowed"""
        return self._max_listeners

    @max_listeners.setter
    def max_listeners(self, value: int):
        """
        Set maximum number of listeners allowed.

        Raises:
            ValueError: If ``value`` is below the current listener count
        """
        with self._lock:
            current = self.listener_count
            if value < current:
                raise ValueError(
                    f"Cannot set max_listeners to {value} when there are {current} listeners"
                )
            self._max_listeners = value

    @property
    def enabled(self) -> bool:
        """Check if event is enabled"""
        return self._enabled

    @enabled.setter
    def enabled(self, value: bool):
        """Enable or disable the event"""
        self._enabled = value

    @property
    def parent(self) -> Optional["Event"]:
        """Get parent event"""
        return self._parent

    @parent.setter
    def parent(self, value: Optional["Event"]):
        """Set parent event, detaching from the previous parent first."""
        with self._lock:
            if self._parent is not None:
                self._parent._forget_child(self)
            self._parent = value
            if value is not None:
                # A proxy keeps the parent from keeping the child alive.
                value._children.append(weakref.proxy(self))  # type:ignore

    def _forget_child(self, child: "Event"):
        """Drop ``child`` (and any dead proxies) from the child list."""
        kept = []
        for existing in self._children:
            try:
                if existing == child:
                    continue
            except ReferenceError:
                # The proxied child was garbage collected; drop the entry.
                continue
            kept.append(existing)
        self._children[:] = kept

    @property
    def children(self) -> List["Event"]:
        """Get child events that are still alive."""
        alive = []
        for child in self._children:
            try:
                child.name  # any attribute access raises on a dead proxy
            except ReferenceError:
                continue
            alive.append(child)
        return alive

    def add_child(self, child: "Event"):
        """Add a child event"""
        child.parent = self

    def remove_child(self, child: "Event"):
        """Remove a child event; does nothing if it is not a child of this event."""
        # Checking the child's own parent link avoids comparing against
        # proxies in the child list, which raises once a proxy is dead.
        if child.parent is self:
            child.parent = None

    def listen(
        self,
        func: Optional[Callable[..., Any]] = None,
        *,
        priority: EventPriority = EventPriority.NORMAL,
        weak_ref: bool = False,
    ) -> Callable[..., Any]:
        """
        Decorator or function to register a listener.

        Args:
            func: Listener function
            priority: Listener priority
            weak_ref: Use weak reference to the listener

        Returns:
            The decorated function or decorator
        """

        def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
            self._add_listener(f, priority=priority, weak_ref=weak_ref)
            return f

        return decorator if func is None else decorator(func)

    def once(
        self,
        func: Optional[Callable[..., Any]] = None,
        *,
        priority: EventPriority = EventPriority.NORMAL,
        weak_ref: bool = False,
    ) -> Callable[..., Any]:
        """
        Decorator or function to register a one-time listener.

        Args:
            func: Listener function
            priority: Listener priority
            weak_ref: Use weak reference to the listener

        Returns:
            The decorated function or decorator
        """

        def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
            self._add_listener(f, priority=priority, once=True, weak_ref=weak_ref)
            return f

        return decorator if func is None else decorator(func)

    def _add_listener(
        self,
        listener: Callable[..., Any],
        *,
        priority: EventPriority = EventPriority.NORMAL,
        once: bool = False,
        weak_ref: bool = False,
    ):
        """
        Internal method to add a listener.

        Args:
            listener: Listener function
            priority: Listener priority
            once: Whether to remove after first trigger
            weak_ref: Use weak reference to the listener

        Raises:
            ListenerAlreadyRegisteredError: If listener is already registered
                in the same container (persistent or one-time) and priority
            MaxListenersExceededError: If max listeners would be exceeded
        """
        with self._lock:
            if self.listener_count >= self._max_listeners:
                raise MaxListenersExceededError(
                    f"Max listeners ({self._max_listeners}) exceeded for event '{self.name}'"
                )

            container = self._once_listeners if once else self._listeners
            bucket = container[priority]
            if any(self._listeners_equal(existing, listener) for existing in bucket):
                raise ListenerAlreadyRegisteredError(
                    f"Listener already registered for event '{self.name}'"
                )

            wrapped_listener: ListenerType
            if not weak_ref:
                wrapped_listener = listener
            elif inspect.ismethod(listener):
                # A plain ref to a bound method dies immediately; WeakMethod
                # tracks the underlying object and function instead.
                wrapped_listener = WeakMethod(listener)
            else:
                wrapped_listener = ref(listener)

            bucket.append(wrapped_listener)

    def remove_listener(self, listener: Callable[..., Any]):
        """
        Remove a listener from all priorities.

        Args:
            listener: Listener function to remove
        """
        with self._lock:
            for priority in EventPriority:
                self._listeners[priority] = [
                    l
                    for l in self._listeners[priority]
                    if not self._listeners_equal(l, listener)
                ]
                self._once_listeners[priority] = [
                    l
                    for l in self._once_listeners[priority]
                    if not self._listeners_equal(l, listener)
                ]

    def _listeners_equal(
        self, listener1: Callable[..., Any], listener2: Callable[..., Any]
    ) -> bool:
        """
        Check if two listeners are effectively the same.

        Weak references are resolved and one level of ``__wrapped__`` is
        unwrapped before comparing. A dead weak reference never matches.
        """
        if listener1 == listener2:
            return True

        l1 = listener1() if isinstance(listener1, (ref, WeakMethod)) else listener1
        l2 = listener2() if isinstance(listener2, (ref, WeakMethod)) else listener2

        if l1 is None or l2 is None:
            return False

        l1 = getattr(l1, "__wrapped__", l1)
        l2 = getattr(l2, "__wrapped__", l2)
        return l1 == l2

    def remove_all_listeners(self):
        """Remove all listeners"""
        with self._lock:
            for priority in EventPriority:
                self._listeners[priority].clear()
                self._once_listeners[priority].clear()

    def has_listener(self, listener: Callable[..., Any]) -> bool:
        """Check if a listener is registered"""
        with self._lock:
            for priority in EventPriority:
                registered = self._listeners[priority] + self._once_listeners[priority]
                if any(self._listeners_equal(l, listener) for l in registered):
                    return True
            return False

    def trigger(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Trigger the event and notify all listeners.

        The parent event (if any) is triggered first, then this event's own
        listeners run, then child events are triggered. An event that is
        already dispatching on the current thread is skipped during
        propagation, so hierarchies of any depth terminate.

        Args:
            *args: Positional arguments to pass to listeners
            **kwargs: Keyword arguments to pass to listeners

        Returns:
            Dictionary with execution statistics, or
            ``{"cancelled": True, "reason": "Event disabled"}`` when the event
            is disabled

        Raises:
            EventCancelledError: If event is cancelled during propagation
        """
        if not self._enabled:
            return {"cancelled": True, "reason": "Event disabled"}

        with self._lock:
            thread_id = threading.get_ident()
            # Only the outermost trigger() on this thread clears the marker.
            outermost = thread_id not in self._active_threads
            self._active_threads.add(thread_id)

            event_id = str(uuid.uuid4())
            context = EventContext(
                timestamp=time.time(), event_id=event_id, source=self
            )
            event_data: Dict[str, Any] = {
                "args": args,
                "kwargs": kwargs,
                "context": context,
                "cancelled": False,
                "default_prevented": False,
            }

            try:
                if self._parent is not None:
                    self._propagate(event_data, EventPhase.CAPTURING)

                execution_stats = self._execute_listeners(
                    event_data, EventPhase.AT_TARGET
                )

                if not event_data["cancelled"] and self._parent is not None:
                    self._propagate(event_data, EventPhase.BUBBLING)

                self._update_metrics(execution_stats)
                self._record_history(event_data, execution_stats)

                if event_data["cancelled"]:
                    raise EventCancelledError("Event was cancelled during propagation")

                return {
                    "event_id": event_id,
                    "listeners_executed": execution_stats["total"],
                    "execution_time": execution_stats["total_time"],
                    "cancelled": event_data["cancelled"],
                }
            except Exception as e:
                logger.error(
                    "Error triggering event '%s': %s", self.name, e, exc_info=True
                )
                raise
            finally:
                if outermost:
                    self._active_threads.discard(thread_id)

    def _propagate(self, event_data: Dict[str, Any], phase: EventPhase):
        """
        Propagate event to parent (CAPTURING) or children (BUBBLING).

        Targets already dispatching on the current thread are skipped;
        re-triggering them would recurse without bound (child -> parent ->
        child ...). Children whose weak proxy has died are skipped too.
        """
        if phase == EventPhase.CAPTURING:
            targets = [self._parent] if self._parent is not None else []
        elif phase == EventPhase.BUBBLING:
            targets = self.children
        else:
            targets = []

        thread_id = threading.get_ident()
        for target in targets:
            try:
                already_dispatching = thread_id in target._active_threads
            except ReferenceError:
                # The child was garbage collected since the list was built.
                continue
            if already_dispatching:
                continue
            event_data["context"].phase = phase
            target.trigger(*event_data["args"], **event_data["kwargs"])

    def _execute_listeners(
        self, event_data: Dict[str, Any], phase: EventPhase
    ) -> Dict[str, Any]:
        """
        Execute all appropriate listeners.

        Listeners run in ``EventPriority`` declaration order; within one
        priority, persistent listeners run before one-time listeners. All
        one-time listeners are removed before execution starts. An exception
        from a listener is logged and does not stop the remaining listeners,
        except ``EventCancelledError``, which marks the event cancelled and
        stops execution.

        Args:
            event_data: Event data dictionary
            phase: Current event phase

        Returns:
            Execution statistics (``total``, ``total_time``, ``average_time``,
            ``cancelled``)
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        started = time.perf_counter()
        listeners_executed = 0
        cancelled = False

        with self._lock:
            pending: List[ListenerType] = []
            for priority in EventPriority:
                pending.extend(self._listeners[priority])
                pending.extend(self._once_listeners[priority])
            for priority in EventPriority:
                self._once_listeners[priority].clear()

        for listener in pending:
            if event_data.get("cancelled", False):
                cancelled = True
                break

            try:
                if isinstance(listener, (ref, WeakMethod)):
                    actual_listener = listener()  # type:ignore
                else:
                    actual_listener = listener
                if actual_listener is None:
                    # Weakly referenced listener has been garbage collected.
                    continue

                event_data["context"].phase = phase

                if asyncio.iscoroutinefunction(actual_listener):
                    self._run_async_listener(actual_listener, event_data)
                else:
                    actual_listener(*event_data["args"], **event_data["kwargs"])

                listeners_executed += 1
            except EventCancelledError:
                event_data["cancelled"] = True
                cancelled = True
                break
            except Exception as e:
                logger.error(
                    "Error in event listener for '%s': %s",
                    self.name,
                    e,
                    exc_info=True,
                )

        execution_time = time.perf_counter() - started

        return {
            "total": listeners_executed,
            "total_time": execution_time,
            "average_time": execution_time / max(1, listeners_executed),
            "cancelled": cancelled,
        }

    def _run_async_listener(
        self, listener: Callable[..., Any], event_data: Dict[str, Any]
    ):
        """
        Run a coroutine listener.

        With a running loop in this thread the listener is scheduled as a
        task (fire-and-forget) and a reference is kept until it finishes.
        Without one, the listener is run to completion on a private loop;
        previously this case raised and left the coroutine un-awaited.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        # The coroutine is created only once we know how it will be run.
        coro = listener(*event_data["args"], **event_data["kwargs"])
        if loop is not None:
            task = loop.create_task(coro)
            self._pending_tasks.add(task)
            task.add_done_callback(self._on_listener_task_done)
            return

        # A private loop leaves the thread's current-loop setting untouched.
        private_loop = asyncio.new_event_loop()
        try:
            private_loop.run_until_complete(coro)
        finally:
            private_loop.close()

    def _on_listener_task_done(self, task: Any):
        """Release a finished listener task and log its exception, if any."""
        self._pending_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error(
                "Error in event listener for '%s': %s",
                self.name,
                error,
                exc_info=error,
            )

    def _update_metrics(self, stats: Dict[str, Any]):
        """Update performance metrics"""
        with self._lock:
            self._metrics["trigger_count"] += 1
            self._metrics["total_listeners_executed"] += stats["total"]

            # Running mean over triggers of the per-trigger average time.
            old_avg = self._metrics["average_execution_time"]
            new_count = self._metrics["trigger_count"]
            self._metrics["average_execution_time"] = (
                old_avg * (new_count - 1) + stats["average_time"]
            ) / new_count

    def _record_history(self, event_data: Dict[str, Any], stats: Dict[str, Any]):
        """Record event trigger in history, keeping only the newest records."""
        with self._lock:
            self._history.append(
                {
                    "timestamp": datetime.now().isoformat(),
                    "event_id": event_data["context"].event_id,
                    "args": str(event_data["args"]),
                    "kwargs": str(event_data["kwargs"]),
                    "listeners_executed": stats["total"],
                    "execution_time": stats["total_time"],
                    "cancelled": event_data["cancelled"],
                }
            )

            overflow = len(self._history) - self._MAX_HISTORY
            if overflow > 0:
                del self._history[:overflow]

    def get_metrics(self) -> Dict[str, Any]:
        """Get a copy of the event performance metrics"""
        return self._metrics.copy()

    def get_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Get event trigger history, oldest first.

        Args:
            limit: Return only the newest ``limit`` records. ``None`` returns
                everything; zero or a negative value returns an empty list.
        """
        with self._lock:
            if limit is None:
                return self._history.copy()
            if limit <= 0:
                # A negative limit would otherwise slice from the wrong end.
                return []
            return self._history[-limit:]

    def cancel(self):
        """
        Cancel the current event propagation.

        Raises:
            EventCancelledError: Always
        """
        raise EventCancelledError("Event propagation cancelled")

    def prevent_default(self):
        """
        Prevent default behavior (meaning depends on event).

        Sets ``default_prevented`` on the nearest ``event_data`` dictionary
        found in the calling stack. The immediate caller is normally the
        listener, which does not hold ``event_data``, so the search walks
        outward through enclosing frames. Does nothing if none is found.
        """
        frame = inspect.currentframe()
        try:
            caller = frame.f_back if frame is not None else None
            while caller is not None:
                candidate = caller.f_locals.get("event_data")
                if isinstance(candidate, dict) and candidate:
                    candidate["default_prevented"] = True
                    return
                caller = caller.f_back
        finally:
            # Frame objects form reference cycles; release them promptly.
            del frame

632 

633 

class EventEmitter:
    """
    Event emitter that manages multiple events by name and provides
    a namespace for event organization.

    The registry of events is guarded by a re-entrant lock.
    """

    def __init__(self):
        self._events: Dict[str, Event] = {}
        self._lock = threading.RLock()
        self._namespace_separator = ":"

    def __contains__(self, event_name: str) -> bool:
        """Check if event exists"""
        return self.has_event(event_name)

    def __getitem__(self, event_name: str) -> Event:
        """Get an event by name, creating it if needed"""
        return self.event(event_name)

    def event(self, event_name: str) -> Event:
        """
        Get or create an event by name.

        Args:
            event_name: Name of the event (can include namespaces)

        Returns:
            Event instance
        """
        with self._lock:
            if event_name not in self._events:
                self._events[event_name] = Event(event_name)
            return self._events[event_name]

    def namespace(self, namespace: str) -> "EventNamespace":
        """
        Get a namespace for organizing events.

        Args:
            namespace: Namespace prefix

        Returns:
            EventNamespace instance
        """
        return EventNamespace(self, namespace)

    def remove_event(self, event_name: str):
        """
        Remove an event and all its listeners.

        Args:
            event_name: Name of the event to remove
        """
        with self._lock:
            self._events.pop(event_name, None)

    def remove_all_events(self):
        """Remove all events and their listeners"""
        with self._lock:
            self._events.clear()

    def event_names(self) -> List[str]:
        """Get list of all event names"""
        with self._lock:
            return list(self._events)

    def has_event(self, event_name: str) -> bool:
        """Check if an event exists"""
        with self._lock:
            return event_name in self._events

    def emit(self, event_name: str, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Trigger an event by name, creating it if it does not exist yet.

        Args:
            event_name: Name of the event to trigger
            *args: Positional arguments to pass to listeners
            **kwargs: Keyword arguments to pass to listeners

        Returns:
            Dictionary with execution statistics
        """
        return self.event(event_name).trigger(*args, **kwargs)

    def on(
        self,
        event_name: str,
        func: Optional[Callable[..., Any]] = None,
        *,
        priority: EventPriority = EventPriority.NORMAL,
        weak_ref: bool = False,
    ) -> Callable[..., Any]:
        """
        Decorator or function to register a listener for an event.

        Args:
            event_name: Name of the event
            func: Listener function
            priority: Listener priority
            weak_ref: Use weak reference to the listener

        Returns:
            The decorated function or decorator
        """

        def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
            self.event(event_name).listen(f, priority=priority, weak_ref=weak_ref)
            return f

        return decorator if func is None else decorator(func)

    def once(
        self,
        event_name: str,
        func: Optional[Callable[..., Any]] = None,
        *,
        priority: EventPriority = EventPriority.NORMAL,
        weak_ref: bool = False,
    ) -> Callable[..., Any]:
        """
        Decorator or function to register a one-time listener for an event.

        Args:
            event_name: Name of the event
            func: Listener function
            priority: Listener priority
            weak_ref: Use weak reference to the listener

        Returns:
            The decorated function or decorator
        """

        def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
            self.event(event_name).once(f, priority=priority, weak_ref=weak_ref)
            return f

        return decorator if func is None else decorator(func)

    def remove_listener(self, event_name: str, listener: Callable[..., Any]):
        """
        Remove a listener from an event.

        Does nothing when the event does not exist; previously the lookup
        created an empty event as a side effect.

        Args:
            event_name: Name of the event
            listener: Listener function to remove
        """
        with self._lock:
            target = self._events.get(event_name)
        if target is not None:
            target.remove_listener(listener)

    def remove_all_listeners(self, event_name: Optional[str] = None):
        """
        Remove all listeners from an event or all events.

        A name that does not match an existing event is ignored rather than
        creating that event.

        Args:
            event_name: Name of the event (None for all events)
        """
        with self._lock:
            # Snapshot under the lock so the registry cannot change while
            # it is being read.
            if event_name is None:
                targets = list(self._events.values())
            else:
                existing = self._events.get(event_name)
                targets = [] if existing is None else [existing]

        for target in targets:
            target.remove_all_listeners()

798 

799 

class EventNamespace:
    """
    View onto an emitter that prefixes every event name with a namespace.
    """

    def __init__(self, emitter: EventEmitter, namespace: str):
        self._emitter = emitter
        self._namespace = namespace

    def _qualify(self, name: str) -> str:
        """Join this namespace and ``name`` with the emitter's separator."""
        separator = self._emitter._namespace_separator  # type:ignore
        return f"{self._namespace}{separator}{name}"

    def __getitem__(self, event_name: str) -> Event:
        """Subscript access; same as ``event(event_name)``."""
        return self.event(event_name)

    def event(self, event_name: str) -> Event:
        """
        Fetch the namespaced event from the emitter, creating it if needed.

        Args:
            event_name: Event name relative to this namespace

        Returns:
            Event instance
        """
        return self._emitter.event(self._qualify(event_name))

    def namespace(self, sub_namespace: str) -> "EventNamespace":
        """
        Open a namespace nested inside this one.

        Args:
            sub_namespace: Name of the nested namespace

        Returns:
            EventNamespace instance
        """
        return EventNamespace(self._emitter, self._qualify(sub_namespace))

    def emit(self, event_name: str, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Trigger the namespaced event.

        Args:
            event_name: Event name relative to this namespace
            *args: Positional arguments forwarded to listeners
            **kwargs: Keyword arguments forwarded to listeners

        Returns:
            Dictionary with execution statistics
        """
        target = self.event(event_name)
        return target.trigger(*args, **kwargs)

    def on(
        self,
        event_name: str,
        func: Optional[Callable[..., Any]] = None,
        *,
        priority: EventPriority = EventPriority.NORMAL,
        weak_ref: bool = False,
    ) -> Callable[..., Any]:
        """
        Register a listener on a namespaced event, directly or as a decorator.

        Args:
            event_name: Event name relative to this namespace
            func: Listener function; omit to get a decorator back
            priority: Listener priority
            weak_ref: Hold the listener by weak reference

        Returns:
            The listener itself, or a decorator when ``func`` is omitted
        """

        def register(callback: Callable[..., Any]) -> Callable[..., Any]:
            # The event is looked up only when a listener is actually attached.
            target = self.event(event_name)
            target.listen(callback, priority=priority, weak_ref=weak_ref)
            return callback

        return register if func is None else register(func)

    def once(
        self,
        event_name: str,
        func: Optional[Callable[..., Any]] = None,
        *,
        priority: EventPriority = EventPriority.NORMAL,
        weak_ref: bool = False,
    ) -> Callable[..., Any]:
        """
        Register a one-time listener on a namespaced event, directly or as a
        decorator.

        Args:
            event_name: Event name relative to this namespace
            func: Listener function; omit to get a decorator back
            priority: Listener priority
            weak_ref: Hold the listener by weak reference

        Returns:
            The listener itself, or a decorator when ``func`` is omitted
        """

        def register(callback: Callable[..., Any]) -> Callable[..., Any]:
            # The event is looked up only when a listener is actually attached.
            target = self.event(event_name)
            target.once(callback, priority=priority, weak_ref=weak_ref)
            return callback

        return register if func is None else register(func)

914 

915 

class AsyncEventEmitter(EventEmitter):
    """
    Event emitter that can run ``emit`` on a thread pool so that an asyncio
    caller is not blocked while listeners execute.
    """

    def __init__(self, max_workers: Optional[int] = None):
        """
        Args:
            max_workers: Passed to ``ThreadPoolExecutor``
        """
        super().__init__()
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    async def emit_async(
        self, event_name: str, *args: Any, **kwargs: Any
    ) -> Dict[str, Any]:
        """
        Asynchronously trigger an event by name.

        The synchronous ``emit`` runs on this emitter's executor.

        Args:
            event_name: Name of the event to trigger
            *args: Positional arguments to pass to listeners
            **kwargs: Keyword arguments to pass to listeners

        Returns:
            Dictionary with execution statistics
        """
        # Inside a coroutine a loop is always running; get_running_loop()
        # returns it directly and never creates a loop implicitly.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor, lambda: self.emit(event_name, *args, **kwargs)
        )

    def schedule_emit(self, event_name: str, *args: Any, **kwargs: Any) -> asyncio.Future:  # type: ignore
        """
        Schedule an event to be triggered asynchronously.

        Args:
            event_name: Name of the event to trigger
            *args: Positional arguments to pass to listeners
            **kwargs: Keyword arguments to pass to listeners

        Returns:
            Future representing the eventual execution
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: keep the previous behaviour
            # of using the thread's current event loop.
            loop = asyncio.get_event_loop()
        return loop.run_in_executor(
            self._executor, lambda: self.emit(event_name, *args, **kwargs)
        )

    def shutdown(self):
        """Clean up resources by shutting down the executor"""
        self._executor.shutdown()

964 

965 

class EventBenchmark:
    """
    Utility for benchmarking event performance.
    """

    @staticmethod
    def benchmark(
        emitter: "EventEmitter", event_name: str, iterations: int = 1000
    ) -> Dict[str, Any]:
        """
        Benchmark event triggering performance.

        A no-op listener is registered for the duration of the run and is
        removed afterwards, even if emitting raises.

        Args:
            emitter: Event emitter instance
            event_name: Name of the event to benchmark
            iterations: Number of iterations to run

        Returns:
            Dictionary with ``iterations``, ``total_time``, ``average_time``
            and ``events_per_second``. ``average_time`` is 0.0 when no
            iterations ran; ``events_per_second`` is ``inf`` when iterations
            ran but the elapsed time measured as zero.
        """

        def dummy_listener(*args: Any, **kwargs: Any) -> None:
            pass

        emitter.on(event_name)(dummy_listener)

        try:
            # perf_counter has the highest available resolution; time.time()
            # can report zero elapsed time for short runs on coarse clocks.
            started = time.perf_counter()
            for _ in range(iterations):
                emitter.emit(event_name)
            total_time = time.perf_counter() - started
        finally:
            emitter.remove_listener(event_name, dummy_listener)

        ran_any = iterations > 0
        average_time = total_time / iterations if ran_any else 0.0
        if total_time > 0:
            events_per_second = iterations / total_time
        else:
            events_per_second = float("inf") if ran_any else 0.0

        return {
            "iterations": iterations,
            "total_time": total_time,
            "average_time": average_time,
            "events_per_second": events_per_second,
        }