"""
=============
Event Manager
=============
``vivarium`` constructs and manages the flow of :ref:`time <time_concept>`
through the emission of regularly scheduled events. The tools in this module
manage the relationships between event emitters and listeners.
The :class:`EventManager` maintains a mapping between event types and channels.
Each event type (and event types must be unique so event type is equivalent to
event name, e.g., ``time_step_prepare``) corresponds to an
:class:`EventChannel`, which tracks listeners to that event in prioritized
levels and passes on the event to those listeners when emitted.
For more information, see the associated event :ref:`concept note <event_concept>`.
"""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
import pandas as pd
from vivarium.engine.framework.lifecycle import ConstraintError, lifecycle_states
from vivarium.engine.manager import Manager
from vivarium.engine.types import ClockStepSize, ClockTime
if TYPE_CHECKING:
from vivarium.engine.framework.engine import Builder
[docs]
@dataclass(frozen=True)
class Event:
    """An Event object represents the context of an event.

    Events themselves are just a bundle of data. They must be emitted
    along an :class:`EventChannel` in order for other simulation components
    to respond to them.

    """

    name: str
    """The name of the event. Typically a lifecycle state."""
    index: pd.Index[int]
    """An index into the population table containing all simulants affected by this event."""
    user_data: dict[str, Any]
    """Any additional data provided by the user about the event."""
    time: ClockTime
    """The simulation time at which this event will resolve. The current simulation
    time plus the current time step size."""
    step_size: ClockStepSize
    """The current step size at the time of the event."""

    def split(self, new_index: pd.Index[int]) -> "Event":
        """Creates a copy of this event with a new index.

        This function should be used to emit an event in a new :class:`EventChannel`
        in response to an event emitted from a different channel.

        Parameters
        ----------
        new_index
            An index into the population table containing all simulants
            affected by this event.

        Returns
        -------
            The new event. It shares ``name``, ``user_data``, ``time`` and
            ``step_size`` with this event (``user_data`` is the same dict
            object, not a copy).
        """
        return Event(self.name, new_index, self.user_data, self.time, self.step_size)

    def __repr__(self) -> str:
        # The index is intentionally left out of the representation.
        return f"Event(name={self.name}, user_data={self.user_data}, time={self.time}, step_size={self.step_size})"

    def __eq__(self, other: object) -> bool:
        """Compares two events field by field.

        Returns ``NotImplemented`` for operands that are not events so that
        Python can fall back to its default comparison instead of raising.
        """
        if not isinstance(other, Event):
            return NotImplemented

        # Compare every attribute except the index with plain equality.
        mine = {key: value for key, value in self.__dict__.items() if key != "index"}
        theirs = {key: value for key, value in other.__dict__.items() if key != "index"}
        if mine != theirs:
            return False

        # ``Index == Index`` is element-wise, so its truth value is ambiguous
        # (and it raises on a length mismatch); ``Index.equals`` yields a
        # single boolean instead.
        if isinstance(self.index, pd.Index):
            return bool(self.index.equals(other.index))
        if isinstance(other.index, pd.Index):
            return False
        return bool(self.index == other.index)
[docs]
class EventChannel:
    """A named subscription channel that passes events to event listeners."""

    def __init__(self, manager: EventManager, event_name: str) -> None:
        self.event_name = event_name
        self.name = f"event_channel_{event_name}"
        # The manager supplies the ``clock`` and ``step_size`` callables
        # used to timestamp emitted events.
        self.manager = manager
        # One bucket of listeners per priority level (0 through 9).
        self.listeners: list[list[Callable[[Event], None]]] = [[] for _ in range(10)]

    def emit(self, index: pd.Index[int], user_data: dict[str, Any] | None = None) -> Event:
        """Notifies all listeners to this channel that an event has occurred.

        Events are emitted to listeners in order of priority (with order 0 being
        first and order 9 last), with no ordering within a particular priority
        level guaranteed.

        Parameters
        ----------
        index
            An index into the population table containing all simulants
            affected by this event.
        user_data
            Any additional data provided by the user about the event. When
            omitted (``None``), a new empty dictionary is used; any dictionary
            that is supplied, including an empty one, is attached to the
            event as-is.

        Returns
        -------
            The event that was delivered to the listeners.

        Raises
        ------
        ValueError
            If the clock value and step size are not a matching pair of
            ``int``/``int`` or ``datetime``/``timedelta``.
        """
        if user_data is None:
            user_data = {}

        clock = self.manager.clock()
        step_size = self.manager.step_size()

        # The two supported pairings are checked separately so that each
        # addition happens between operands of known, matching types.
        event_time: ClockTime
        if isinstance(clock, int) and isinstance(step_size, int):
            event_time = clock + step_size
        elif isinstance(clock, datetime) and isinstance(step_size, timedelta):
            event_time = clock + step_size
        else:
            raise ValueError(
                f"Clock ({type(clock)}) and step size ({type(step_size)}) are not compatible."
            )

        event = Event(self.event_name, index, user_data, event_time, step_size)

        # Buckets are stored in ascending priority order, so iterating them
        # in sequence notifies priority 0 first and priority 9 last.
        for priority_bucket in self.listeners:
            for listener in priority_bucket:
                listener(event)
        return event

    def __repr__(self) -> str:
        return f"EventChannel(listeners: {[listener for bucket in self.listeners for listener in bucket]})"
[docs]
class EventManager(Manager):
    """The configuration for the event system.

    Notes
    -----
    Client code should never need to interact with this class
    except through the decorators in this module and the emitter
    function exposed on the builder during the setup phase.

    """

    def __init__(self) -> None:
        # Maps each event name to its (lazily created) channel.
        self._event_types: dict[str, EventChannel] = {}

    @property
    def name(self) -> str:
        """The name of this component."""
        return "event_manager"

    def get_channel(self, event_name: str) -> EventChannel:
        """Gets the channel for the named event, creating it on first use.

        Parameters
        ----------
        event_name
            The name of the event.

        Returns
        -------
            The channel associated with ``event_name``.
        """
        if event_name not in self._event_types:
            self._event_types[event_name] = EventChannel(self, event_name)
        return self._event_types[event_name]

    def setup(self, builder: Builder) -> None:
        """Performs this component's simulation setup.

        Parameters
        ----------
        builder
            Object giving access to core framework functionality.
        """
        self.clock = builder.time.clock()
        self.step_size = builder.time.step_size()

        builder.event.register_listener(lifecycle_states.POST_SETUP, self.on_post_setup)

        # Kept for later use in ``on_post_setup`` and ``get_emitter``.
        self.add_handlers = builder.lifecycle.add_handlers
        self.add_constraint = builder.lifecycle.add_constraint

        builder.lifecycle.add_constraint(
            self.get_emitter,
            allow_during=[
                lifecycle_states.SETUP,
                lifecycle_states.SIMULATION_END,
                lifecycle_states.REPORT,
            ],
        )
        builder.lifecycle.add_constraint(
            self.register_listener, allow_during=[lifecycle_states.SETUP]
        )

    def on_post_setup(self, event: Event) -> None:
        """Passes every channel's listeners to the lifecycle as handlers.

        Listeners are flattened in priority order (bucket 0 first).

        Parameters
        ----------
        event
            The post-setup event. It is not inspected.
        """
        for name, channel in self._event_types.items():
            self.add_handlers(name, [h for level in channel.listeners for h in level])

    def get_emitter(
        self, event_name: str
    ) -> Callable[[pd.Index[int], dict[str, Any] | None], Event]:
        """Gets an emitter function for the named event.

        Parameters
        ----------
        event_name
            The name of the event.

        Returns
        -------
            A function that accepts an index and optional user data. This function
            creates and timestamps an Event and distributes it to all interested
            listeners
        """
        channel = self.get_channel(event_name)
        try:
            self.add_constraint(channel.emit, allow_during=[event_name])
        except ConstraintError:
            # Multiple components have requested this emitter.
            # Shouldn't happen in production, but happens frequently in tests.
            pass
        # ``channel.emit`` is looked up again here, after ``add_constraint``
        # has run, rather than reusing a reference taken beforehand.
        return channel.emit

    def register_listener(
        self, event_name: str, listener: Callable[[Event], None], priority: int = 5
    ) -> None:
        """Registers a new listener to the named event.

        Parameters
        ----------
        event_name
            The name of the event.
        listener
            The consumer of the named event.
        priority
            Number in range(10) used to assign the ordering in which listeners
            process the event.

        Raises
        ------
        IndexError
            If ``priority`` is outside the available priority levels.
        """
        buckets = self.get_channel(event_name).listeners
        # Reject out-of-range values explicitly: a negative priority would
        # otherwise silently index from the end of the bucket list.
        if not 0 <= priority < len(buckets):
            raise IndexError(
                f"Listener priority must be in range({len(buckets)}), "
                f"got {priority!r} for event '{event_name}'."
            )
        buckets[priority].append(listener)

    def get_listeners(self, event_name: str) -> dict[int, list[Callable[[Event], None]]]:
        """Gets all listeners registered for the named event.

        Parameters
        ----------
        event_name
            The name of the event.

        Returns
        -------
            A dictionary that maps each priority level of the named event's
            listeners to a list of listeners at that level. Priority levels
            without listeners are omitted.

        Notes
        -----
        The channel is obtained through :meth:`get_channel`, so asking about
        an event that has no channel yet creates one.
        """
        channel = self.get_channel(event_name)
        return {
            priority: listeners
            for priority, listeners in enumerate(channel.listeners)
            if listeners
        }

    def list_events(self) -> list[str]:
        """Lists all event names known to the event system.

        Returns
        -------
            A list of all known events names.

        Notes
        -----
        This value can change after setup if components dynamically create
        new event labels.
        """
        return list(self._event_types)

    def __contains__(self, item: str) -> bool:
        return item in self._event_types

    def __repr__(self) -> str:
        return "EventManager()"