"""
==========================
Vivarium Interactive Tools
==========================
This module provides an interface for interactive simulation usage. The main
part is the :class:`InteractiveContext`, a sub-class of the main simulation
object in ``vivarium`` that has been extended to include convenience
methods for running and exploring the simulation in an interactive setting.
See the associated tutorials for :ref:`running <interactive_tutorial>` and
:ref:`exploring <exploration_tutorial>` for more information.
"""
from __future__ import annotations
from collections.abc import Callable
from math import ceil
from typing import Any, overload
import pandas as pd
from vivarium.engine.framework.engine import SimulationContext
from vivarium.engine.framework.event import Event
from vivarium.engine.framework.values import Pipeline
from vivarium.engine.interface.utilities import log_progress, run_from_ipython
from vivarium.engine.types import ClockStepSize, ClockTime
class InteractiveContext(SimulationContext):
    """A simulation context with helper methods for running simulations interactively."""

    def __init__(self, *args: Any, setup: bool = True, **kwargs: Any) -> None:
        """Create the context and, by default, set it up immediately.

        Parameters
        ----------
        *args
            Positional arguments forwarded unchanged to the parent context.
        setup
            Whether to call :meth:`setup` as part of construction.
        **kwargs
            Keyword arguments forwarded unchanged to the parent context.
        """
        super().__init__(*args, **kwargs)
        if setup:
            self.setup()

    @property
    def current_time(self) -> ClockTime:
        """Returns the current simulation time."""
        return self._clock.time

    def setup(self) -> None:
        """Run the parent setup, initialize simulants, and cache a population view."""
        super().setup()
        self.initialize_simulants()
        self._population_view = self._builder.population.get_view()

    def step(self, step_size: ClockStepSize | None = None) -> None:
        """Advance the simulation one step.

        Parameters
        ----------
        step_size
            An optional size of step to take. Must be compatible with the
            simulation clock's step size (usually a pandas.Timedelta).

        Raises
        ------
        ValueError
            If ``step_size`` is provided and neither it nor the clock's step
            size is an instance of the other's type.
        """
        old_step_size = self._clock._clock_step_size
        if step_size is not None:
            if not (
                isinstance(step_size, type(self._clock.step_size))
                or isinstance(self._clock.step_size, type(step_size))
            ):
                raise ValueError(
                    f"Provided time must be compatible with {type(self._clock.step_size)}"
                )
            self._clock._clock_step_size = step_size
        try:
            super().step()
        finally:
            # Always restore the clock's previous step size, even if the step
            # raises, so a failed step cannot leave the override in place.
            self._clock._clock_step_size = old_step_size

    def run(self, with_logging: bool = True) -> None:  # type: ignore [override]
        """Run the simulation for the duration specified in the configuration.

        Parameters
        ----------
        with_logging
            Whether or not to log the simulation steps. Only works in an ipython
            environment.
        """
        self.run_until(self._clock.stop_time, with_logging=with_logging)

    def run_for(self, duration: ClockStepSize | str, with_logging: bool = True) -> None:
        """Run the simulation for the given time duration.

        Parameters
        ----------
        duration
            The length of time to run the simulation for. Should be compatible
            with the simulation clock's step size (usually a pandas
            Timedelta). If a string is provided, it will be passed to
            `pandas.Timedelta` to be converted.
        with_logging
            Whether or not to log the simulation steps. Only works in an ipython
            environment.
        """
        if isinstance(duration, str):
            duration = pd.Timedelta(duration)
        self.run_until(self._clock.time + duration, with_logging=with_logging)  # type: ignore [operator]

    def run_until(self, end_time: ClockTime, with_logging: bool = True) -> None:
        """Run the simulation until the provided end time.

        Parameters
        ----------
        end_time
            The time to run the simulation until. The simulation will run until
            its clock is greater than or equal to the provided end time. Must be
            compatible with the simulation clock's time (usually a
            pandas.Timestamp).
        with_logging
            Whether or not to log the simulation steps. Only works in an ipython
            environment.

        Raises
        ------
        ValueError
            If neither ``end_time`` nor the clock's current time is an instance
            of the other's type.
        """
        if not (
            isinstance(end_time, type(self._clock.time))
            or isinstance(self._clock.time, type(end_time))
        ):
            raise ValueError(
                f"Provided time must be compatible with {type(self._clock.time)}"
            )

        # Round up so the final step lands on or just past ``end_time``.
        iterations = int(ceil((end_time - self._clock.time) / self._clock.step_size))  # type: ignore [operator, arg-type]
        self.take_steps(number_of_steps=iterations, with_logging=with_logging)
        # Sanity check: the clock stopped within one step at or after ``end_time``.
        assert self._clock.time - self._clock.step_size < end_time <= self._clock.time  # type: ignore [operator]
        print("Simulation complete after", iterations, "iterations")

    def take_steps(
        self,
        number_of_steps: int = 1,
        step_size: ClockStepSize | None = None,
        with_logging: bool = True,
    ) -> None:
        """Run the simulation for the given number of steps.

        Parameters
        ----------
        number_of_steps
            The number of steps to take.
        step_size
            An optional size of step to take. Must be compatible with the
            simulation clock's step size (usually a pandas.Timedelta).
        with_logging
            Whether or not to log the simulation steps. Only works in an ipython
            environment.

        Raises
        ------
        ValueError
            If ``number_of_steps`` is not an integer.
        """
        if not isinstance(number_of_steps, int):
            raise ValueError("Number of steps must be an integer.")

        # Wrap the step counter in a progress logger only when running in
        # ipython and logging was requested; the stepping itself is identical.
        steps = (
            log_progress(range(number_of_steps), name="Step")
            if run_from_ipython() and with_logging
            else range(number_of_steps)
        )
        for _ in steps:
            self.step(step_size)

    @overload
    def get_population(
        self,
        attributes: str,
        query: str = "",
        include_untracked: bool = False,
    ) -> pd.Series[Any] | pd.DataFrame:
        ...

    @overload
    def get_population(
        self,
        attributes: list[str] | tuple[str, ...],
        query: str = "",
        include_untracked: bool = False,
    ) -> pd.DataFrame:
        ...

    def get_population(
        self,
        attributes: str | list[str] | tuple[str, ...],
        query: str = "",
        include_untracked: bool = False,
    ) -> pd.Series[Any] | pd.DataFrame:
        """Get a copy of the population state table.

        Parameters
        ----------
        attributes
            The attribute pipelines to include in the returned table.
        query
            Additional conditions used to filter the index.
        include_untracked
            Whether to include untracked simulants.

        Returns
        -------
            The current state of requested population attributes.
        """
        index = self.get_population_index()

        if isinstance(attributes, str):
            try:
                return self._population_view.get(
                    index, attributes, query, include_untracked=include_untracked
                )
            except ValueError as e:
                # The view signals via its error message that this single
                # attribute must be fetched as a frame; anything else is a
                # genuine error and is re-raised untouched.
                if "call `get_frame()` instead" in str(e):
                    return self._population_view.get_frame(
                        index, attributes, query, include_untracked=include_untracked
                    )
                raise

        # Forward ``query`` here as well so filtering behaves the same for a
        # list/tuple of attributes as it does for a single attribute name.
        return self._population_view.get(
            index, attributes, query, include_untracked=include_untracked
        )

    def get_attribute_names(self) -> list[str]:
        """List all attributes in the population state table."""
        return self._population.get_all_attribute_names()

    def list_values(self) -> list[str]:
        """List the names of all value pipelines in the simulation."""
        return list(self._values.get_value_pipelines().keys())

    def get_value(self, value_pipeline_name: str) -> Pipeline:
        """Get the value pipeline associated with the given name.

        Raises
        ------
        ValueError
            If no value pipeline with the given name is listed by
            :meth:`list_values`.
        """
        if value_pipeline_name not in self.list_values():
            raise ValueError(
                f"No value pipeline '{value_pipeline_name}' registered. "
                "Are you looking for an attribute pipeline?"
            )
        return self._values.get_value(value_pipeline_name)

    def list_events(self) -> list[str]:
        """List all event types registered with the simulation."""
        return self._events.list_events()

    def get_listeners(self, event_name: str) -> dict[int, list[Callable[[Event], None]]]:
        """Get all listeners of a particular type of event.

        Available events can be found by calling
        :func:`InteractiveContext.list_events`.

        Parameters
        ----------
        event_name
            The name of the event to grab the listeners for.

        Returns
        -------
            A dictionary that maps each priority level of the named event's
            listeners to a list of listeners at that level.

        Raises
        ------
        ValueError
            If ``event_name`` is not a known event.
        """
        if event_name not in self._events:
            raise ValueError(f"No event {event_name} in system.")
        return self._events.get_listeners(event_name)

    def get_emitter(
        self, event_name: str
    ) -> Callable[[pd.Index[int], dict[str, Any] | None], Event]:
        """Get the callable that emits the given type of events.

        Available events can be found by calling
        :func:`InteractiveContext.list_events`.

        Parameters
        ----------
        event_name
            The name of the event to grab the emitter for.

        Returns
        -------
            The callable that emits the named event.

        Raises
        ------
        ValueError
            If ``event_name`` is not a known event.
        """
        if event_name not in self._events:
            raise ValueError(f"No event {event_name} in system.")
        return self._events.get_emitter(event_name)

    def list_components(self) -> dict[str, Any]:
        """Get a mapping of component names to components currently in the simulation.

        Returns
        -------
            A dictionary mapping component names to components.
        """
        return self._component_manager.list_components()

    def get_component(self, name: str) -> Any:
        """Get the component in the simulation that has ``name``.

        The lookup, including how an unknown name is handled, is delegated to
        the component manager.

        Parameters
        ----------
        name
            A component name.

        Returns
        -------
            The component manager's result for ``name``.
        """
        return self._component_manager.get_component(name)

    def print_initializer_order(self) -> None:
        """Print the order in which population initializers are called."""
        initializers = []
        for r in self._resource.get_population_initializers():
            name = r.__name__
            if hasattr(r, "__self__"):
                # Bound method: show the owning object's class and its name.
                obj = r.__self__
                initializers.append(f"{obj.__class__.__name__}({obj.name}).{name}")
            else:
                initializers.append(f"Unbound function {name}")
        print("\n".join(initializers))

    def print_lifecycle_order(self) -> None:
        """Print the order of lifecycle events (including user event handlers)."""
        print(self._lifecycle)

    def __repr__(self) -> str:
        return "InteractiveContext()"