Source code for eta_ctrl.envs.sim_env

from __future__ import annotations

import abc
import time
from logging import getLogger
from typing import TYPE_CHECKING

from eta_ctrl.envs import BaseEnv
from eta_ctrl.simulators import FMUSimulator

if TYPE_CHECKING:
    import pathlib
    from collections.abc import Callable, Mapping
    from typing import Any

    from eta_ctrl.config import ConfigRun
    from eta_ctrl.util.type_annotations import TimeStep

log = getLogger(__name__)


[docs] class SimEnv(BaseEnv, abc.ABC): """Base class for FMU Simulation models environments. :param env_id: Identification for the environment, useful when creating multiple environments. :param config_run: Configuration of the optimization run. :param verbose: Verbosity to use for logging. :param callback: callback which should be called after each episode. :param episode_duration: Duration of the episode in seconds. :param sampling_time: Duration of a single time sample / time step in seconds. :param model_parameters: Parameters for the mathematical model. :param sim_steps_per_sample: Number of simulation steps to perform during every sample. :param render_mode: Renders the environments to help visualise what the agent see, examples modes are "human", "rgb_array", "ansi" for text. :param kwargs: Other keyword arguments (for subclasses). """ @property @abc.abstractmethod def fmu_name(self) -> str: """Name of the FMU file.""" return "" def __init__( self, env_id: int, config_run: ConfigRun, verbose: int = 2, callback: Callable | None = None, *, episode_duration: TimeStep | str, sampling_time: TimeStep | str, model_parameters: Mapping[str, Any] | None = None, sim_steps_per_sample: int | str = 1, render_mode: str | None = None, **kwargs: Any, ) -> None: super().__init__( env_id=env_id, config_run=config_run, verbose=verbose, callback=callback, episode_duration=episode_duration, sampling_time=sampling_time, render_mode=render_mode, **kwargs, ) #: Number of simulation steps to be taken for each sample. This must be a divisor of 'sampling_time'. self.sim_steps_per_sample: int = int(sim_steps_per_sample) #: The FMU is expected to be placed in the same folder as the environment self.path_fmu: pathlib.Path = self.path_env / (self.fmu_name + ".fmu") #: Configuration for the FMU model parameters, that need to be set for initialization of the Model. self.model_parameters: Mapping[str, int | float] | None = model_parameters #: Instance of the FMU. This can be used to directly access the eta_ctrl.FMUSimulator interface. self.simulator: FMUSimulator def __str__(self) -> str: """Human-readable string representation of SimEnv.""" base_str = super().__str__() fmu_name = self.fmu_name return f"{base_str}, FMU: {fmu_name}" def __repr__(self) -> str: """Developer-friendly string representation of SimEnv.""" base_repr = super().__repr__() # Remove the closing parenthesis to add our info base_repr = base_repr.rstrip(")") return f"{base_repr}, fmu_name='{self.fmu_name}', sim_steps_per_sample={self.sim_steps_per_sample})" def _init_simulator(self, init_values: Mapping[str, int | float] | None = None) -> None: """Initialize the simulator object. Make sure to call _names_from_state before this or to otherwise initialize the names array. This can also be used to reset the simulator after an episode is completed. It will reuse the same simulator object and reset it to the given initial values. :param init_values: Dictionary of initial values for some FMU variables. """ _init_vals = {} if init_values is None else init_values if hasattr(self, "simulator") and isinstance(self.simulator, FMUSimulator): self.simulator.reset(_init_vals) else: # Instance of the FMU. This can be used to directly access the eta_ctrl.FMUSimulator interface. self.simulator = FMUSimulator( self.env_id, self.path_fmu, start_time=0.0, stop_time=self.episode_duration, step_size=float(self.sampling_time / self.sim_steps_per_sample), names_inputs=[self.state_config.map_ext_ids[name] for name in self.state_config.ext_inputs], names_outputs=[self.state_config.map_ext_ids[name] for name in self.state_config.ext_outputs], init_values=_init_vals, )
[docs] def simulate(self) -> tuple[bool, float]: """Perform a simulator step. Updates the state with new external outputs from the simulation results. :return: Boolean showing whether all simulation steps were successful and time elapsed during simulation. """ # generate FMU input from current state step_inputs: dict[str, int | float | bool | str] = self.get_external_inputs() sim_time_start = time.time() step_success = True try: # We provide output and input names to the FMU so output will be a dictionary step_output: dict[str, float] = self.simulator.step(input_values=step_inputs) # type: ignore[arg-type] except Exception: step_success = False log.exception("Simulation failed") # stop timer for simulation step time debugging sim_time_elapsed = time.time() - sim_time_start # save step_outputs into state if step_success: self.set_external_outputs(step_output) return step_success, sim_time_elapsed
[docs] def _step(self) -> tuple[float, bool, bool, dict]: """Perform one internal time step and return core step results. This private method implements the actual environment transition logic. It works with the internal self.state dictionary that already includes actions and returns the core step results without observations (which are handled by the public step method). .. note:: This function always returns 0 reward. Therefore, it must be extended if it is to be used with reinforcement learning agents. If you need to work with modified actions (e.g., discretized or shaped actions), ensure they are processed before reaching this method or handle them within this method using the values in self.state. If you need to manipulate observations afterwarads, you can do this using the state modification callback. :return: A tuple containing: * **reward**: The value of the reward function. This is just one floating point value. * **terminated (bool)**: Whether the agent reaches the terminal state (as defined under the MDP of the task) which can be positive or negative. An example is reaching the goal state or moving into the lava from the Sutton and Barto Gridworld. If true, the Vectorizer will call :meth:`reset`. * **truncated (bool)**: Whether the truncation condition outside the scope of the MDP is satisfied (i.e. the episode ended). Typically, this is a timelimit, but could also be used to indicate an agent physically going out of bounds. Can be used to end the episode prematurely before a terminal state is reached. If true, the Vectorizer will call :meth:`reset`. * **info**: Provide some additional info about the state of the environment. The contents of this may be used for logging purposes in the future but typically do not currently serve a purpose. .. note:: Stable Baselines3 combines terminated and truncated with a logical OR to trigger the automatic environment reset. Implement both flags for compatibility. :meta public: """ step_success, sim_time_elapsed = self._update_state() info: dict[str, Any] = {"sim_time_elapsed": sim_time_elapsed} return 0, not step_success, False, info
def _update_state(self) -> tuple[bool, float]: """Take additional_state, execute simulation and get state information from scenario. This function updates self.state and increments the step counter. .. warning:: You have to update self.state_log with the entire state before leaving the step to store the state information. :return: Success of the simulation, time taken for simulation. """ step_success, sim_time_elapsed = False, 0.0 # simulate one time step and store the results. for i in range(self.sim_steps_per_sample): # do multiple FMU steps in one environment-step step_success, sim_time_elapsed = self.simulate() # only ext inputs are needed # Append intermediate simulation results to the state_log if i < self.sim_steps_per_sample - 1: self.state_log.append(self.state) return step_success, sim_time_elapsed
[docs] def _reset( self, *, seed: int | None = None, options: dict[str, Any] | None = None, ) -> dict[str, Any]: """Reset the internal state of the environment and return info dictionary. This private method initializes the internal self.state dictionary by reading initial values directly from the FMU/simulator. It does not use the seed parameter since the initial state is determined by the simulator configuration. The public reset method handles the Gymnasium interface including observation filtering and proper seeding mechanism. :param seed: The seed for initializing any randomized components of the state. Subclasses should use this for reproducible randomness in their state init :param options: Additional information to specify how the environment is reset (optional, depending on the specific environment) (default: None) :return: Info dictionary containing information about the initial state. The initial observations are automatically filtered from the internal state by the public reset method and must not be returned here. .. note:: The base implementation initializes external outputs from the FMU without using the seed. Subclasses should use the seed parameter for any additional randomized state observations they implement. :meta public: """ # reset the FMU after every episode with new parameters self._init_simulator(self.model_parameters) # Read values from the fmu without time step and store the results start_obs = [str(self.state_config.map_ext_ids[name]) for name in self.state_config.ext_outputs] # We provide output and input names to the FMU so output will be a dictionary output: dict[str, float] = self.simulator.read_values(start_obs) self.set_external_outputs(output) return {}
[docs] def close(self) -> None: """Close the environment. This should always be called when an entire run is finished. It should be used to close any resources (i.e. simulation models) used by the environment. Default behavior for the Simulation environment is to close the FMU object. """ self.simulator.close() # close the FMU