Coverage for nexios\events.py: 61%
363 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-21 20:31 +0100
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-21 20:31 +0100
1# type:ignore
2import inspect
3import logging
4from typing import (
5 Callable,
6 Any,
7 Dict,
8 List,
9 Optional,
10 Union,
11 TypeVar,
12 Tuple,
13 Type,
14 Protocol,
15 cast,
16)
17from enum import Enum, auto
18from dataclasses import dataclass
19import time
20import threading
21import asyncio
22from concurrent.futures import ThreadPoolExecutor
23import weakref
24from weakref import ReferenceType, ref, WeakMethod
25import uuid
26import json
27from datetime import datetime
30# Setup logging
31logging.basicConfig(level=logging.INFO)
32logger = logging.getLogger(__name__)
34_T = TypeVar("_T", bound="Event")
37class EventPriority(Enum):
38 """Priority levels for event listeners"""
40 HIGHEST = auto()
41 HIGH = auto()
42 NORMAL = auto()
43 LOW = auto()
44 LOWEST = auto()
47class EventPhase(Enum):
48 """Event propagation phases"""
50 CAPTURING = auto()
51 BUBBLING = auto()
52 AT_TARGET = auto()
55@dataclass
56class EventContext:
57 """Context information about the event"""
59 timestamp: float
60 event_id: str
61 source: Any
62 phase: EventPhase = EventPhase.AT_TARGET
65class EventError(Exception):
66 """Base exception for event-related errors"""
68 pass
71class ListenerAlreadyRegisteredError(EventError):
72 """Raised when a listener is already registered"""
74 pass
77class MaxListenersExceededError(EventError):
78 """Raised when maximum number of listeners is exceeded"""
80 pass
83class EventCancelledError(EventError):
84 """Raised when an event is cancelled"""
86 pass
89ListenerType = Union[
90 Callable[..., Any],
91 ReferenceType[Callable[..., Any]],
92 WeakMethod[Callable[..., Any]],
93]
96class EventProtocol(Protocol):
97 """Protocol for event listeners"""
99 name: ...
100 listener_count: ...
101 max_listeners: ...
102 enabled: ...
103 get_metrics: ...
105 def __call__(self, *args: Any, **kwargs: Any) -> Any: ...
108class EventSerializationMixin(EventProtocol):
109 """
110 Mixin for event serialization capabilities.
111 """
113 def to_json(self) -> str:
114 """Serialize event configuration to JSON"""
115 return json.dumps(
116 {
117 "name": self.name,
118 "listener_count": self.listener_count,
119 "max_listeners": self.max_listeners,
120 "enabled": self.enabled,
121 "metrics": self.get_metrics(),
122 }
123 )
125 @classmethod
126 def from_json(cls, json_str: str) -> _T: # type:ignore
127 """Deserialize event configuration from JSON"""
129 cls = cast(Type[_T], cls)
130 data = json.loads(json_str)
131 event = cls(data["name"])
132 event.max_listeners = data["max_listeners"]
133 event.enabled = data["enabled"]
134 return event
137class Event(EventSerializationMixin):
138 """
139 Advanced event implementation with support for:
140 - Priority-based listener execution
141 - Event propagation (capture/bubble phases)
142 - Asynchronous listeners
143 - Thread safety
144 - Listener management
145 - Event cancellation
146 - Detailed event context
147 - Performance metrics
148 """
150 DEFAULT_MAX_LISTENERS = 100
152 def __init__(self, name: str, max_listeners: Optional[int] = None):
153 """
154 Initialize an event.
156 Args:
157 name: Name of the event
158 max_listeners: Maximum number of listeners allowed (None for no limit)
159 """
160 self.name = name
161 self._listeners: Dict[EventPriority, List[ListenerType]] = {
162 EventPriority.HIGHEST: [],
163 EventPriority.HIGH: [],
164 EventPriority.NORMAL: [],
165 EventPriority.LOW: [],
166 EventPriority.LOWEST: [],
167 }
168 self._once_listeners: Dict[EventPriority, List[ListenerType]] = {
169 EventPriority.HIGHEST: [],
170 EventPriority.HIGH: [],
171 EventPriority.NORMAL: [],
172 EventPriority.LOW: [],
173 EventPriority.LOWEST: [],
174 }
175 self._max_listeners = max_listeners or self.DEFAULT_MAX_LISTENERS
176 self._lock = threading.RLock()
177 self._parent: Optional["Event"] = None
178 self._children: List["Event"] = []
179 self._enabled = True
180 self._history: List[Dict[str, Any]] = []
181 self._metrics: Dict[str, Any] = {
182 "trigger_count": 0,
183 "total_listeners_executed": 0,
184 "average_execution_time": 0.0,
185 }
187 def __repr__(self) -> str:
188 return f"<Event name='{self.name}' listeners={self.listener_count}>"
190 @property
191 def listener_count(self) -> int:
192 """Get total number of listeners"""
193 with self._lock:
194 return sum(len(v) for v in self._listeners.values()) + sum(
195 len(v) for v in self._once_listeners.values()
196 )
198 @property
199 def max_listeners(self) -> int:
200 """Get maximum number of listeners allowed"""
201 return self._max_listeners
203 @max_listeners.setter
204 def max_listeners(self, value: int):
205 """Set maximum number of listeners allowed"""
206 with self._lock:
207 if value < self.listener_count:
208 raise ValueError(
209 f"Cannot set max_listeners to {value} when there are {self.listener_count} listeners"
210 )
211 self._max_listeners = value
213 @property
214 def enabled(self) -> bool:
215 """Check if event is enabled"""
216 return self._enabled
218 @enabled.setter
219 def enabled(self, value: bool):
220 """Enable or disable the event"""
221 self._enabled = value
223 @property
224 def parent(self) -> Optional["Event"]:
225 """Get parent event"""
226 return self._parent
228 @parent.setter
229 def parent(self, value: Optional["Event"]):
230 """Set parent event"""
231 with self._lock:
232 if self._parent is not None:
233 self._parent._children.remove(self)
234 self._parent = value
235 if value is not None:
236 value._children.append(weakref.proxy(self)) # type:ignore
238 @property
239 def children(self) -> List["Event"]:
240 """Get child events"""
241 return self._children.copy()
243 def add_child(self, child: "Event"):
244 """Add a child event"""
245 child.parent = self
247 def remove_child(self, child: "Event"):
248 """Remove a child event"""
249 if child in self._children:
250 child.parent = None
252 def listen(
253 self,
254 func: Optional[Callable[..., Any]] = None,
255 *,
256 priority: EventPriority = EventPriority.NORMAL,
257 weak_ref: bool = False,
258 ) -> Callable[..., Any]:
259 """
260 Decorator or function to register a listener.
262 Args:
263 func: Listener function
264 priority: Listener priority
265 weak_ref: Use weak reference to the listener
267 Returns:
268 The decorated function or decorator
269 """
271 def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
272 self._add_listener(f, priority=priority, weak_ref=weak_ref)
273 return f
275 if func is None:
276 return decorator
277 return decorator(func)
279 def once(
280 self,
281 func: Optional[Callable[..., Any]] = None,
282 *,
283 priority: EventPriority = EventPriority.NORMAL,
284 weak_ref: bool = False,
285 ) -> Callable[..., Any]:
286 """
287 Decorator or function to register a one-time listener.
289 Args:
290 func: Listener function
291 priority: Listener priority
292 weak_ref: Use weak reference to the listener
294 Returns:
295 The decorated function or decorator
296 """
298 def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
299 self._add_listener(f, priority=priority, once=True, weak_ref=weak_ref)
300 return f
302 if func is None:
303 return decorator
304 return decorator(func)
306 def _add_listener(
307 self,
308 listener: Callable[..., Any],
309 *,
310 priority: EventPriority = EventPriority.NORMAL,
311 once: bool = False,
312 weak_ref: bool = False,
313 ):
314 """
315 Internal method to add a listener.
317 Args:
318 listener: Listener function
319 priority: Listener priority
320 once: Whether to remove after first trigger
321 weak_ref: Use weak reference to the listener
323 Raises:
324 ListenerAlreadyRegisteredError: If listener is already registered
325 MaxListenersExceededError: If max listeners would be exceeded
326 """
327 with self._lock:
328 if self.listener_count >= self._max_listeners:
329 raise MaxListenersExceededError(
330 f"Max listeners ({self._max_listeners}) exceeded for event '{self.name}'"
331 )
333 # Check if listener is already registered
334 container = self._once_listeners if once else self._listeners
335 for existing in container[priority]:
336 if self._listeners_equal(existing, listener):
337 raise ListenerAlreadyRegisteredError(
338 f"Listener already registered for event '{self.name}'"
339 )
341 # Apply weak reference if requested
342 wrapped_listener: ListenerType
343 if weak_ref:
344 if inspect.ismethod(listener):
345 wrapped_listener = WeakMethod(listener)
346 else:
347 wrapped_listener = ref(listener)
348 else:
349 wrapped_listener = listener
351 # Store the listener
352 container[priority].append(wrapped_listener)
354 def remove_listener(self, listener: Callable[..., Any]):
355 """
356 Remove a listener from all priorities.
358 Args:
359 listener: Listener function to remove
360 """
361 with self._lock:
362 for priority in EventPriority:
363 self._listeners[priority] = [
364 l
365 for l in self._listeners[priority]
366 if not self._listeners_equal(l, listener)
367 ]
368 self._once_listeners[priority] = [
369 l
370 for l in self._once_listeners[priority]
371 if not self._listeners_equal(l, listener)
372 ]
374 def _listeners_equal(
375 self, listener1: Callable[..., Any], listener2: Callable[..., Any]
376 ) -> bool:
377 """Check if two listeners are effectively the same"""
378 if listener1 == listener2:
379 return True
381 l1 = listener1() if isinstance(listener1, (ref, WeakMethod)) else listener1
382 l2 = listener2() if isinstance(listener2, (ref, WeakMethod)) else listener2
384 if l1 is None or l2 is None:
385 return False
387 # Check if one wraps the other
388 if hasattr(l1, "__wrapped__"):
389 l1 = l1.__wrapped__
390 if hasattr(l2, "__wrapped__"):
391 l2 = l2.__wrapped__
393 return l1 == l2
395 def remove_all_listeners(self):
396 """Remove all listeners"""
397 with self._lock:
398 for priority in EventPriority:
399 self._listeners[priority].clear()
400 self._once_listeners[priority].clear()
402 def has_listener(self, listener: Callable[..., Any]) -> bool:
403 """Check if a listener is registered"""
404 with self._lock:
405 for priority in EventPriority:
406 if any(
407 self._listeners_equal(l, listener)
408 for l in self._listeners[priority]
409 ):
410 return True
411 if any(
412 self._listeners_equal(l, listener)
413 for l in self._once_listeners[priority]
414 ):
415 return True
416 return False
418 def trigger(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
419 """
420 Trigger the event and notify all listeners.
422 Args:
423 *args: Positional arguments to pass to listeners
424 **kwargs: Keyword arguments to pass to listeners
426 Returns:
427 Dictionary with execution statistics
429 Raises:
430 EventCancelledError: If event is cancelled during propagation
431 """
432 if not self._enabled:
433 return {"cancelled": True, "reason": "Event disabled"}
435 with self._lock:
436 event_id = str(uuid.uuid4())
437 context = EventContext(
438 timestamp=time.time(), event_id=event_id, source=self
439 )
441 # Prepare event data
442 event_data: Dict[str, Any] = {
443 "args": args,
444 "kwargs": kwargs,
445 "context": context,
446 "cancelled": False,
447 "default_prevented": False,
448 }
450 # Execute in phases: capture (parent to child), target, bubble (child to parent)
451 try:
452 # Capture phase (parent to child)
453 if self.parent:
454 self._propagate(event_data, EventPhase.CAPTURING)
456 # Target phase
457 execution_stats = self._execute_listeners(
458 event_data, EventPhase.AT_TARGET
459 )
461 # Bubble phase (child to parent) if not cancelled
462 if not event_data["cancelled"] and self.parent:
463 self._propagate(event_data, EventPhase.BUBBLING)
465 # Update metrics
466 self._update_metrics(execution_stats)
468 # Record history
469 self._record_history(event_data, execution_stats)
471 if event_data["cancelled"]:
472 raise EventCancelledError("Event was cancelled during propagation")
474 return {
475 "event_id": event_id,
476 "listeners_executed": execution_stats["total"],
477 "execution_time": execution_stats["total_time"],
478 "cancelled": event_data["cancelled"],
479 }
480 except Exception as e:
481 logger.error(
482 f"Error triggering event '{self.name}': {str(e)}", exc_info=True
483 )
484 raise
486 def _propagate(self, event_data: Dict[str, Any], phase: EventPhase):
487 """Propagate event to parent or children"""
488 if phase == EventPhase.CAPTURING and self.parent:
489 event_data["context"].phase = phase
490 self.parent.trigger(*event_data["args"], **event_data["kwargs"])
491 elif phase == EventPhase.BUBBLING and self.children:
492 for child in self.children:
493 event_data["context"].phase = phase
494 child.trigger(*event_data["args"], **event_data["kwargs"])
496 def _execute_listeners(
497 self, event_data: Dict[str, Any], phase: EventPhase
498 ) -> Dict[str, Any]:
499 """
500 Execute all appropriate listeners.
502 Args:
503 event_data: Event data dictionary
504 phase: Current event phase
506 Returns:
507 Execution statistics
508 """
509 start_time = time.time()
510 listeners_executed = 0
511 cancelled = False
513 # Collect listeners to execute
514 with self._lock:
515 all_listeners: List[Tuple[ListenerType, EventPriority, bool]] = []
516 for priority in EventPriority:
517 all_listeners.extend(
518 (listener, priority, False)
519 for listener in self._listeners[priority]
520 )
521 all_listeners.extend(
522 (listener, priority, True)
523 for listener in self._once_listeners[priority]
524 )
526 # Clear once listeners
527 for priority in EventPriority:
528 self._once_listeners[priority].clear()
530 # Execute listeners in priority order
531 for listener, priority, _ in all_listeners:
532 if event_data.get("cancelled", False):
533 cancelled = True
534 break
536 try:
537 # Resolve weak references
538 actual_listener: Optional[Callable[..., Any]] = None
539 if isinstance(listener, (ref, WeakMethod)):
540 actual_listener = listener() # type:ignore
541 if actual_listener is None:
542 continue
543 else:
544 actual_listener = listener
546 if actual_listener is None:
547 continue
549 # Update context
550 event_data["context"].phase = phase
552 # Execute the listener
553 if asyncio.iscoroutinefunction(actual_listener):
554 asyncio.create_task(
555 actual_listener(*event_data["args"], **event_data["kwargs"])
556 )
557 else:
558 actual_listener(*event_data["args"], **event_data["kwargs"])
560 listeners_executed += 1
561 except EventCancelledError:
562 event_data["cancelled"] = True
563 cancelled = True
564 break
565 except Exception as e:
566 logger.error(
567 f"Error in event listener for '{self.name}': {str(e)}",
568 exc_info=True,
569 )
571 execution_time = time.time() - start_time
573 return {
574 "total": listeners_executed,
575 "total_time": execution_time,
576 "average_time": execution_time / max(1, listeners_executed),
577 "cancelled": cancelled,
578 }
580 def _update_metrics(self, stats: Dict[str, Any]):
581 """Update performance metrics"""
582 with self._lock:
583 self._metrics["trigger_count"] += 1
584 self._metrics["total_listeners_executed"] += stats["total"]
586 # Update average execution time using moving average
587 old_avg = self._metrics["average_execution_time"]
588 new_count = self._metrics["trigger_count"]
589 self._metrics["average_execution_time"] = (
590 old_avg * (new_count - 1) + stats["average_time"]
591 ) / new_count
593 def _record_history(self, event_data: Dict[str, Any], stats: Dict[str, Any]):
594 """Record event trigger in history"""
595 with self._lock:
596 self._history.append(
597 {
598 "timestamp": datetime.now().isoformat(),
599 "event_id": event_data["context"].event_id,
600 "args": str(event_data["args"]),
601 "kwargs": str(event_data["kwargs"]),
602 "listeners_executed": stats["total"],
603 "execution_time": stats["total_time"],
604 "cancelled": event_data["cancelled"],
605 }
606 )
608 # Keep history size manageable
609 if len(self._history) > 100:
610 self._history.pop(0)
612 def get_metrics(self) -> Dict[str, Any]:
613 """Get event performance metrics"""
614 return self._metrics.copy()
616 def get_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
617 """Get event trigger history"""
618 with self._lock:
619 if limit is None:
620 return self._history.copy()
621 return self._history[-limit:] if limit else []
623 def cancel(self):
624 """Cancel the current event propagation"""
625 raise EventCancelledError("Event propagation cancelled")
627 def prevent_default(self):
628 """Prevent default behavior (meaning depends on event)"""
629 event_data = inspect.currentframe().f_back.f_locals.get("event_data") # type: ignore
630 if event_data:
631 event_data["default_prevented"] = True
634class EventEmitter:
635 """
636 Advanced event emitter that manages multiple events and provides
637 a namespace for event organization.
638 """
640 def __init__(self):
641 self._events: Dict[str, Event] = {}
642 self._lock = threading.RLock()
643 self._namespace_separator = ":"
645 def __contains__(self, event_name: str) -> bool:
646 """Check if event exists"""
647 return event_name in self._events
649 def __getitem__(self, event_name: str) -> Event:
650 """Get an event by name"""
651 return self.event(event_name)
653 def event(self, event_name: str) -> Event:
654 """
655 Get or create an event by name.
657 Args:
658 event_name: Name of the event (can include namespaces)
660 Returns:
661 Event instance
662 """
663 with self._lock:
664 if event_name not in self._events:
665 self._events[event_name] = Event(event_name)
666 return self._events[event_name]
668 def namespace(self, namespace: str) -> "EventNamespace":
669 """
670 Get a namespace for organizing events.
672 Args:
673 namespace: Namespace prefix
675 Returns:
676 EventNamespace instance
677 """
678 return EventNamespace(self, namespace)
680 def remove_event(self, event_name: str):
681 """
682 Remove an event and all its listeners.
684 Args:
685 event_name: Name of the event to remove
686 """
687 with self._lock:
688 if event_name in self._events:
689 del self._events[event_name]
691 def remove_all_events(self):
692 """Remove all events and their listeners"""
693 with self._lock:
694 self._events.clear()
696 def event_names(self) -> List[str]:
697 """Get list of all event names"""
698 return list(self._events.keys())
700 def has_event(self, event_name: str) -> bool:
701 """Check if an event exists"""
702 return event_name in self._events
704 def emit(self, event_name: str, *args: Any, **kwargs: Any) -> Dict[str, Any]:
705 """
706 Trigger an event by name.
708 Args:
709 event_name: Name of the event to trigger
710 *args: Positional arguments to pass to listeners
711 **kwargs: Keyword arguments to pass to listeners
713 Returns:
714 Dictionary with execution statistics
715 """
716 return self.event(event_name).trigger(*args, **kwargs)
718 def on(
719 self,
720 event_name: str,
721 func: Optional[Callable[..., Any]] = None,
722 *,
723 priority: EventPriority = EventPriority.NORMAL,
724 weak_ref: bool = False,
725 ) -> Callable[..., Any]:
726 """
727 Decorator or function to register a listener for an event.
729 Args:
730 event_name: Name of the event
731 func: Listener function
732 priority: Listener priority
733 weak_ref: Use weak reference to the listener
735 Returns:
736 The decorated function or decorator
737 """
739 def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
740 self.event(event_name).listen(f, priority=priority, weak_ref=weak_ref)
741 return f
743 if func is None:
744 return decorator
745 return decorator(func)
747 def once(
748 self,
749 event_name: str,
750 func: Optional[Callable[..., Any]] = None,
751 *,
752 priority: EventPriority = EventPriority.NORMAL,
753 weak_ref: bool = False,
754 ) -> Callable[..., Any]:
755 """
756 Decorator or function to register a one-time listener for an event.
758 Args:
759 event_name: Name of the event
760 func: Listener function
761 priority: Listener priority
762 weak_ref: Use weak reference to the listener
764 Returns:
765 The decorated function or decorator
766 """
768 def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
769 self.event(event_name).once(f, priority=priority, weak_ref=weak_ref)
770 return f
772 if func is None:
773 return decorator
774 return decorator(func)
776 def remove_listener(self, event_name: str, listener: Callable[..., Any]):
777 """
778 Remove a listener from an event.
780 Args:
781 event_name: Name of the event
782 listener: Listener function to remove
783 """
784 self.event(event_name).remove_listener(listener)
786 def remove_all_listeners(self, event_name: Optional[str] = None):
787 """
788 Remove all listeners from an event or all events.
790 Args:
791 event_name: Name of the event (None for all events)
792 """
793 if event_name is None:
794 for event in self._events.values():
795 event.remove_all_listeners()
796 else:
797 self.event(event_name).remove_all_listeners()
800class EventNamespace:
801 """
802 Namespace for organizing events hierarchically.
803 """
805 def __init__(self, emitter: EventEmitter, namespace: str):
806 self._emitter = emitter
807 self._namespace = namespace
809 def __getitem__(self, event_name: str) -> Event:
810 """Get an event within this namespace"""
811 return self.event(event_name)
813 def event(self, event_name: str) -> Event:
814 """
815 Get or create an event within this namespace.
817 Args:
818 event_name: Name of the event (relative to namespace)
820 Returns:
821 Event instance
822 """
823 full_name = (
824 f"{self._namespace}{self._emitter._namespace_separator}{event_name}"
825 ) # type:ignore
826 return self._emitter.event(full_name)
828 def namespace(self, sub_namespace: str) -> "EventNamespace":
829 """
830 Get a sub-namespace within this namespace.
832 Args:
833 sub_namespace: Sub-namespace name
835 Returns:
836 EventNamespace instance
837 """
838 return EventNamespace(
839 self._emitter,
840 f"{self._namespace}{self._emitter._namespace_separator}{sub_namespace}", # type: ignore
841 )
843 def emit(self, event_name: str, *args: Any, **kwargs: Any) -> Dict[str, Any]:
844 """
845 Trigger an event within this namespace.
847 Args:
848 event_name: Name of the event (relative to namespace)
849 *args: Positional arguments to pass to listeners
850 **kwargs: Keyword arguments to pass to listeners
852 Returns:
853 Dictionary with execution statistics
854 """
855 return self.event(event_name).trigger(*args, **kwargs)
857 def on(
858 self,
859 event_name: str,
860 func: Optional[Callable[..., Any]] = None,
861 *,
862 priority: EventPriority = EventPriority.NORMAL,
863 weak_ref: bool = False,
864 ) -> Callable[..., Any]:
865 """
866 Decorator or function to register a listener for an event in this namespace.
868 Args:
869 event_name: Name of the event (relative to namespace)
870 func: Listener function
871 priority: Listener priority
872 weak_ref: Use weak reference to the listener
874 Returns:
875 The decorated function or decorator
876 """
878 def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
879 self.event(event_name).listen(f, priority=priority, weak_ref=weak_ref)
880 return f
882 if func is None:
883 return decorator
884 return decorator(func)
886 def once(
887 self,
888 event_name: str,
889 func: Optional[Callable[..., Any]] = None,
890 *,
891 priority: EventPriority = EventPriority.NORMAL,
892 weak_ref: bool = False,
893 ) -> Callable[..., Any]:
894 """
895 Decorator or function to register a one-time listener for an event in this namespace.
897 Args:
898 event_name: Name of the event (relative to namespace)
899 func: Listener function
900 priority: Listener priority
901 weak_ref: Use weak reference to the listener
903 Returns:
904 The decorated function or decorator
905 """
907 def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
908 self.event(event_name).once(f, priority=priority, weak_ref=weak_ref)
909 return f
911 if func is None:
912 return decorator
913 return decorator(func)
916class AsyncEventEmitter(EventEmitter):
917 """
918 Event emitter with enhanced support for asynchronous operations.
919 """
921 def __init__(self, max_workers: Optional[int] = None):
922 super().__init__()
923 self._executor = ThreadPoolExecutor(max_workers=max_workers)
925 async def emit_async(
926 self, event_name: str, *args: Any, **kwargs: Any
927 ) -> Dict[str, Any]:
928 """
929 Asynchronously trigger an event by name.
931 Args:
932 event_name: Name of the event to trigger
933 *args: Positional arguments to pass to listeners
934 **kwargs: Keyword arguments to pass to listeners
936 Returns:
937 Dictionary with execution statistics
938 """
939 loop = asyncio.get_event_loop()
940 return await loop.run_in_executor(
941 self._executor, lambda: self.emit(event_name, *args, **kwargs)
942 )
944 def schedule_emit(self, event_name: str, *args: Any, **kwargs: Any) -> asyncio.Future: # type: ignore
945 """
946 Schedule an event to be triggered asynchronously.
948 Args:
949 event_name: Name of the event to trigger
950 *args: Positional arguments to pass to listeners
951 **kwargs: Keyword arguments to pass to listeners
953 Returns:
954 Future representing the eventual execution
955 """
956 loop = asyncio.get_event_loop()
957 return loop.run_in_executor(
958 self._executor, lambda: self.emit(event_name, *args, **kwargs)
959 )
961 def shutdown(self):
962 """Clean up resources"""
963 self._executor.shutdown()
966class EventBenchmark:
967 """
968 Utility for benchmarking event performance.
969 """
971 @staticmethod
972 def benchmark(
973 emitter: EventEmitter, event_name: str, iterations: int = 1000
974 ) -> Dict[str, Any]:
975 """
976 Benchmark event triggering performance.
978 Args:
979 emitter: Event emitter instance
980 event_name: Name of the event to benchmark
981 iterations: Number of iterations to run
983 Returns:
984 Dictionary with benchmark results
985 """
987 # Add a simple listener
988 def dummy_listener(*args: Any, **kwargs: Any) -> None:
989 pass
991 emitter.on(event_name)(dummy_listener)
993 # Run benchmark
994 start_time = time.time()
995 for _ in range(iterations):
996 emitter.emit(event_name)
997 total_time = time.time() - start_time
999 # Clean up
1000 emitter.remove_listener(event_name, dummy_listener)
1002 return {
1003 "iterations": iterations,
1004 "total_time": total_time,
1005 "average_time": total_time / iterations,
1006 "events_per_second": iterations / total_time,
1007 }