Source code for vivarium.engine.framework.event.manager

"""
=============
Event Manager
=============

``vivarium`` constructs and manages the flow of :ref:`time <time_concept>`
through the emission of regularly scheduled events. The tools in this module
manage the relationships between event emitters and listeners.

The :class:`EventManager` maintains a mapping between event types and channels.
Each event type (and event types must be unique so event type is equivalent to
event name, e.g., ``time_step_prepare``) corresponds to an
:class:`EventChannel`, which tracks listeners to that event in prioritized
levels and passes on the event to those listeners when emitted.

For more information, see the associated event :ref:`concept note <event_concept>`.

"""


from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any

import pandas as pd

from vivarium.engine.framework.lifecycle import ConstraintError, lifecycle_states
from vivarium.engine.manager import Manager
from vivarium.engine.types import ClockStepSize, ClockTime

if TYPE_CHECKING:
    from vivarium.engine.framework.engine import Builder


@dataclass(frozen=True)
class Event:
    """An Event object represents the context of an event.

    Events themselves are just a bundle of data. They must be emitted along
    an :class:`EventChannel` in order for other simulation components to
    respond to them.

    """

    name: str
    """The name of the event. Typically a lifecycle state."""
    index: pd.Index[int]
    """An index into the population table containing all simulants affected by this event."""
    user_data: dict[str, Any]
    """Any additional data provided by the user about the event."""
    time: ClockTime
    """The simulation time at which this event will resolve. The current simulation
    time plus the current time step size."""
    step_size: ClockStepSize
    """The current step size at the time of the event."""

    def split(self, new_index: pd.Index[int]) -> Event:
        """Creates a copy of this event with a new index.

        This function should be used to emit an event in a new
        :class:`EventChannel` in response to an event emitted from a
        different channel.

        Parameters
        ----------
        new_index
            An index into the population table containing all simulants
            affected by this event.

        Returns
        -------
            The new event.

        Notes
        -----
        The new event references the same ``user_data`` dictionary as this
        event; the dictionary itself is not copied.

        """
        return Event(self.name, new_index, self.user_data, self.time, self.step_size)

    def __repr__(self) -> str:
        # The index is deliberately left out of the representation.
        return (
            f"Event(name={self.name}, user_data={self.user_data}, "
            f"time={self.time}, step_size={self.step_size})"
        )

    def __eq__(self, other: object) -> bool:
        """Compares two events field by field.

        Parameters
        ----------
        other
            The object to compare against.

        Returns
        -------
            ``True`` when ``other`` is an :class:`Event` whose fields all
            match this event's fields. ``NotImplemented`` is returned for
            any other kind of object so that Python falls back to its
            default comparison instead of raising.

        """
        if not isinstance(other, Event):
            return NotImplemented

        mine, theirs = self.index, other.index
        if isinstance(mine, pd.Index) and isinstance(theirs, pd.Index):
            # ``==`` on two pandas indexes is element-wise and cannot be used
            # as a truth value; ``equals`` gives a single boolean answer.
            same_index = mine.equals(theirs)
        else:
            # Fall back to plain equality for anything that is not an Index.
            same_index = mine == theirs

        return bool(
            same_index
            and self.name == other.name
            and self.user_data == other.user_data
            and self.time == other.time
            and self.step_size == other.step_size
        )
class EventChannel:
    """A named subscription channel that passes events to event listeners."""

    def __init__(self, manager: EventManager, event_name: str) -> None:
        self.event_name = event_name
        self.name = f"event_channel_{event_name}"
        self.manager = manager
        # One bucket of listeners per priority level: index 0 is notified
        # first and index 9 last.
        self.listeners: list[list[Callable[[Event], None]]] = [[] for _ in range(10)]

    def emit(self, index: pd.Index[int], user_data: dict[str, Any] | None = None) -> Event:
        """Notifies all listeners to this channel that an event has occurred.

        Events are emitted to listeners in order of priority (with order 0
        being first and order 9 last), with no ordering within a particular
        priority level guaranteed.

        Parameters
        ----------
        index
            An index into the population table containing all simulants
            affected by this event.
        user_data
            Any additional data provided by the user about the event.

        Returns
        -------
            The event that was passed to the listeners.

        Raises
        ------
        ValueError
            If the clock value and the step size are not a matching pair of
            (``int``, ``int``) or (``datetime``, ``timedelta``).

        """
        # Only substitute a fresh dictionary when nothing was supplied. A
        # caller-provided dictionary, even an empty one, is passed through so
        # that changes listeners make to it stay visible to the caller.
        if user_data is None:
            user_data = {}

        clock = self.manager.clock()
        step_size = self.manager.step_size()

        # The event is stamped with the time at which it resolves: now plus
        # one step.
        event_time: ClockTime
        if isinstance(clock, int) and isinstance(step_size, int):
            event_time = clock + step_size
        elif isinstance(clock, datetime) and isinstance(step_size, timedelta):
            event_time = clock + step_size
        else:
            raise ValueError(
                f"Clock ({type(clock)}) and step size ({type(step_size)}) are not compatible."
            )

        e = Event(
            self.event_name,
            index,
            user_data,
            event_time,
            step_size,
        )

        # Walk the priority buckets from 0 through 9.
        for priority_bucket in self.listeners:
            for listener in priority_bucket:
                listener(e)
        return e

    def __repr__(self) -> str:
        return f"EventChannel(listeners: {[listener for bucket in self.listeners for listener in bucket]})"
class EventManager(Manager):
    """The configuration for the event system.

    Notes
    -----
    Client code should never need to interact with this class except through
    the decorators in this module and the emitter function exposed on the
    builder during the setup phase.

    """

    def __init__(self) -> None:
        # Maps each event name to the channel that carries it.
        self._event_types: dict[str, EventChannel] = {}

    @property
    def name(self) -> str:
        """The name of this component."""
        return "event_manager"

    def get_channel(self, event_name: str) -> EventChannel:
        """Gets the channel for the named event, creating it if needed.

        Parameters
        ----------
        event_name
            The name of the event.

        Returns
        -------
            The channel associated with ``event_name``. The same channel
            object is returned on every call for a given name.

        """
        if event_name not in self._event_types:
            self._event_types[event_name] = EventChannel(self, event_name)
        return self._event_types[event_name]

    def setup(self, builder: Builder) -> None:
        """Performs this component's simulation setup.

        Parameters
        ----------
        builder
            Object giving access to core framework functionality.

        """
        # Both are stored as callables; channels call them on every emit.
        self.clock = builder.time.clock()
        self.step_size = builder.time.step_size()

        builder.event.register_listener(lifecycle_states.POST_SETUP, self.on_post_setup)

        self.add_handlers = builder.lifecycle.add_handlers
        self.add_constraint = builder.lifecycle.add_constraint

        builder.lifecycle.add_constraint(
            self.get_emitter,
            allow_during=[
                lifecycle_states.SETUP,
                lifecycle_states.SIMULATION_END,
                lifecycle_states.REPORT,
            ],
        )
        builder.lifecycle.add_constraint(
            self.register_listener, allow_during=[lifecycle_states.SETUP]
        )

    def on_post_setup(self, event: Event) -> None:
        """Hands every channel's listeners to the lifecycle as handlers.

        Parameters
        ----------
        event
            The event that triggered this listener. It is not used.

        """
        for name, channel in self._event_types.items():
            # Flatten the priority buckets, keeping priority order.
            self.add_handlers(name, [h for level in channel.listeners for h in level])

    def get_emitter(
        self, event_name: str
    ) -> Callable[[pd.Index[int], dict[str, Any] | None], Event]:
        """Gets an emitter function for the named event.

        Parameters
        ----------
        event_name
            The name of the event.

        Returns
        -------
            A function that accepts an index and optional user data. This
            function creates and timestamps an Event and distributes it to
            all interested listeners

        """
        channel = self.get_channel(event_name)
        try:
            self.add_constraint(channel.emit, allow_during=[event_name])
        except ConstraintError:
            # Multiple components have requested this emitter.
            # Shouldn't happen in production, but happens frequently in tests.
            pass
        return channel.emit

    def register_listener(
        self, event_name: str, listener: Callable[[Event], None], priority: int = 5
    ) -> None:
        """Registers a new listener to the named event.

        Parameters
        ----------
        event_name
            The name of the event.
        listener
            The consumer of the named event.
        priority
            Number in range(10) used to assign the ordering in which
            listeners process the event.

        Raises
        ------
        IndexError
            If ``priority`` is outside the available priority levels.

        """
        buckets = self.get_channel(event_name).listeners
        # Check the range explicitly: plain list indexing would quietly map
        # negative priorities onto the last buckets. IndexError is kept as
        # the exception type so out-of-range values fail as they did before,
        # only with a clearer message.
        if not 0 <= priority < len(buckets):
            raise IndexError(
                f"Listener priority must be in range({len(buckets)}); got {priority}."
            )
        buckets[priority].append(listener)

    def get_listeners(self, event_name: str) -> dict[int, list[Callable[[Event], None]]]:
        """Gets all listeners registered for the named event.

        Parameters
        ----------
        event_name
            The name of the event.

        Returns
        -------
            A dictionary that maps each priority level of the named event's
            listeners to a list of listeners at that level. Priority levels
            with no listeners are left out.

        Notes
        -----
        The channel is looked up with :meth:`get_channel`, so asking about
        an event that has no channel yet creates an empty one for it.

        """
        channel = self.get_channel(event_name)
        return {
            priority: listeners
            for priority, listeners in enumerate(channel.listeners)
            if listeners
        }

    def list_events(self) -> list[str]:
        """Lists all event names known to the event system.

        Returns
        -------
            A list of all known events names.

        Notes
        -----
        This value can change after setup if components dynamically create
        new event labels.

        """
        return list(self._event_types)

    def __contains__(self, item: str) -> bool:
        return item in self._event_types

    def __repr__(self) -> str:
        return "EventManager()"