Source code for eta_ctrl.simulators.fmu

"""The FMUSimulator class enables easy simulation of FMU files."""

from __future__ import annotations

import pathlib
import shutil
from contextlib import contextmanager
from logging import getLogger
from typing import TYPE_CHECKING, TypedDict

import numpy as np
from fmpy import extract, read_model_description
from fmpy.build import build_platform_binary
from fmpy.fmi2 import FMU2Model, FMU2Slave
from fmpy.sundials import CVodeSolver

from eta_ctrl.util.utils import timestep_to_seconds

if TYPE_CHECKING:
    from collections.abc import Iterator, Mapping, Sequence
    from typing import Any

    from fmpy.model_description import ModelDescription

    from eta_ctrl.util.type_annotations import Number, Path, TimeStep

log = getLogger(__name__)


[docs] class FMUSimulator: """FMU simulator object. :param _id: FMU instance ID. :param fmu_path: Path to the FMU file. :param start_time: Simulation start time in seconds. :param float stop_time: Simulation stop time in seconds. :param step_size: Simulation step size in seconds. :param names_inputs: List of input names that correspond to names used in the FMU file (e.g. ['u', 'p']). If the step function is going to be used with lists as input values, this list will be used to translate between the list position and the variable name in the FMU. :param names_outputs: List of output names that correspond to names used in the FMU file (e.g. ['y', 'th', 'thdot']). If the step function should return only specific values instead of all results as a dictionary, this parameter can be specified to determine, which parameters should be returned. :param init_values: Starting values for parameters that should be pushed to the FMU with names corresponding to variables in the FMU. """ def __init__( self, _id: int, fmu_path: Path, start_time: TimeStep = 0, stop_time: TimeStep = 1, step_size: TimeStep = 1, names_inputs: Sequence[str] | None = None, names_outputs: Sequence[str] | None = None, init_values: Mapping[str, float] | None = None, ) -> None: #: Path to the FMU model. self.fmu_path = fmu_path #: Start time for the simulation in time increments. self.start_time = int(timestep_to_seconds(start_time)) #: Stopping time for the simulation in time increments (only relevant if run in simulation loop). self.stop_time = int(timestep_to_seconds(stop_time)) #: Step size (time) for the simulation in time increments. self.step_size = int(timestep_to_seconds(step_size)) #: Model description from the FMU (contains variable names, types, references and more). self.model_description: ModelDescription = read_model_description(fmu_path) #: Variable map from model description. The map specifies the value reference and datatype of a named #: variable in the FMU. The structure is {'name': {'ref': <value reference>, 'type': <variable data type>}}. self.__type_map = {"Real": "real", "Boolean": "bool", "Integer": "int", "Enumeration": "enum"} self._model_vars: dict[str, dict[str, str]] = {} for var in self.model_description.modelVariables: self._model_vars[var.name] = {"ref": var.valueReference, "type": self.__type_map[var.type]} self.model_vars_names = list(self._model_vars.keys()) if names_inputs is not None: self._validate_names(names_inputs, context="input names") input_names = names_inputs else: input_names = self.model_vars_names #: Mapping of input names to model references self.input_mapping = {name: self._model_vars[name]["ref"] for name in input_names} if names_outputs is not None: self._validate_names(names_outputs, context="output names") output_names = names_outputs else: output_names = self.model_vars_names #: Mapping of output names to model references self.output_mapping = {name: self._model_vars[name]["ref"] for name in output_names} #: Directory where the FMU will be extracted. self._unzipdir: Path = extract(fmu_path) try: #: Instance of the FMU Slave object. self.fmu: FMU2Slave = FMU2Slave( guid=self.model_description.guid, unzipDirectory=self._unzipdir, modelIdentifier=self.model_description.coSimulation.modelIdentifier, instanceName="FMUsimulator_" + str(_id), ) except Exception: # noqa: BLE001 fmpy raises bare Exceptions build_platform_binary(unzipdir=self._unzipdir) self.fmu = FMU2Slave( guid=self.model_description.guid, unzipDirectory=self._unzipdir, modelIdentifier=self.model_description.coSimulation.modelIdentifier, instanceName="FMUsimulator_" + str(_id), ) # initialize self.fmu.instantiate(visible=False, callbacks=None, loggingOn=False) self.fmu.setupExperiment(startTime=self.start_time) # set init values # instead of using the fmpy apply_start_values func from fmpy use the own set_values func to set the values # of the simulation variables correctly, reasons are also performance and simulation speed init_values = {} if init_values is None else init_values self.set_values(init_values) self.fmu.enterInitializationMode() self.fmu.exitInitializationMode() #: Current simulation time. self.time = self.start_time def __str__(self) -> str: """Human-readable string representation of FMUSimulator.""" n_inputs = len(self.input_mapping) n_outputs = len(self.output_mapping) return f"FMUSimulator('{pathlib.Path(self.fmu_path).stem}', {n_inputs} inputs, {n_outputs} outputs)" def __repr__(self) -> str: """Developer-friendly string representation of FMUSimulator.""" return ( f"FMUSimulator(fmu_path='{self.fmu_path}', start_time={self.start_time}, " f"stop_time={self.stop_time}, step_size={self.step_size})" ) def _validate_names(self, names: Sequence[str], context: str = "names") -> None: """Validate a given sequence of names by checking their existence in the FMU. :param names: Names to check. :param context: Additional context for log messages, defaults to "names". :raises KeyError: Unknown variable names. """ names_set = set(names) model_vars_set = set(self.model_vars_names) if len(names_set) < len(names): log.warning(f"{context.capitalize()} contain duplicate elements") # Find names that don't exist in the model missing_names = names_set - model_vars_set if missing_names: msg = f"Found {len(missing_names)} unknown variable names in {context}: {sorted(missing_names)}. " raise KeyError(msg) @property def input_vars(self) -> list[str]: """Ordered list of all available input variable names in the FMU.""" return list(self.input_mapping.keys()) @property def output_vars(self) -> list[str]: """Ordered list of all available output variable names in the FMU.""" return list(self.output_mapping.keys()) @property def parameter_vars(self) -> list[str]: """Get names of all available parameters in the FMU. :return: List of parameter variable names. """ if not hasattr(self, "_parameter_vars"): # Extract parameter variables from model description self._parameter_vars = [] for var in self.model_description.modelVariables: # Check if the variable is a parameter (or has causality="parameter") if hasattr(var, "causality") and var.causality == "parameter": self._parameter_vars.append(var.name) return self._parameter_vars
[docs] def read_values(self, names: Sequence[str] | None = None) -> dict[str, float]: """Return current values of the simulation without advancing a simulation step or the simulation time. :param names: Sequence of values to read from the FMU. If this is None (default), all available values will be read. :return: Read values from the FMU mapped with their names. """ # Find value references and names for the variables that should be read from the FMU if names is None: var_refs = self.output_mapping else: self._validate_names(names, context="output names") var_refs = {name: self.output_mapping[name] for name in names} # Get values from the FMU and convert to a dictionary output_values = self.fmu.getReal(list(var_refs.values())) return dict(zip(var_refs.keys(), output_values, strict=False))
[docs] def set_values(self, values: Mapping[str, Number | bool]) -> None: """Set values of simulation variables without advancing a simulation step or the simulation time. :param values: Values that should be pushed to the FMU. Names of the input_values must correspond to variables in the FMU. """ self._validate_names(list(values.keys()), context="input names") vals: dict[str, list[Number | bool]] = {"real": [], "int": [], "bool": []} refs: dict[str, list[str]] = {"real": [], "int": [], "bool": []} for var, val in values.items(): model_var = self._model_vars[var] refs[model_var["type"]].append(model_var["ref"]) vals[model_var["type"]].append(val) if len(refs["real"]) > 0: self.fmu.setReal(refs["real"], vals["real"]) if len(refs["int"]) > 0: self.fmu.setInteger(refs["int"], vals["int"]) if len(refs["bool"]) > 0: self.fmu.setBoolean(refs["bool"], vals["bool"])
[docs] def step( self, input_values: Mapping[str, float] | None = None, output_names: Sequence[str] | None = None, advance_time: bool = True, nr_substeps: int | None = None, ) -> dict[str, float]: """Simulate next time step in the FMU with defined input values and output values. :param input_values: Current values that should be pushed to the FMU. Names of the input_values must correspond to variables in the FMU. :param advance_time: Decide if the FMUsimulator should add one timestep to the simulation time or not. This can be deactivated, if you just want to look at the result of a simulation step beforehand, without actually advancing simulation time. :param nr_substeps: if simulation steps are divided into substeps, this value will let the simulator know that no time violation warning is necessary. :return: Resulting input and output values from the FMU with the keys named corresponding to the variables in the FMU. """ if input_values is not None: self.set_values(input_values) # put out warning for time limit violation, if self.time + self.step_size > self.stop_time + full step size if self.time + self.step_size > self.stop_time + (int(nr_substeps) if nr_substeps else 1) * self.step_size: log.warning( f"Simulation time {self.time + self.step_size} s exceeds specified stop time of " f"{self.stop_time} s. Proceed with care, simulation may become inaccurate." ) # push input values to the FMU and do one timestep, doStep performs a step of certain size self.fmu.doStep(currentCommunicationPoint=self.time, communicationStepSize=self.step_size) # advance time if advance_time: self.time += self.step_size # advance the time return self.read_values(output_names)
[docs] @classmethod def simulate( cls, fmu_path: Path, start_time: TimeStep = 0, stop_time: TimeStep = 1, step_size: TimeStep = 1, init_values: Mapping[str, float] | None = None, ) -> np.ndarray: """Instantiate a simulator with the specified FMU, perform simulation and return results. :param fmu_path: Path to the FMU file. :param start_time: Simulation start time in seconds. :param float stop_time: Simulation stop time in seconds. :param step_size: simulation step size in seconds. :param init_values: Starting values for parameters that should be pushed to the FMU with names corresponding to variables in the FMU. """ simulator = cls(0, fmu_path, start_time, stop_time, step_size, init_values=init_values) dt = np.dtype([(name, float) for name in simulator.read_values()]) # mypy does not recognize the return type of floor division... result = np.rec.array( None, shape=((simulator.stop_time - simulator.start_time) // simulator.step_size + 1,), dtype=dt, ) if result.dtype.names is None: msg = "There must be some output variables specified for the simulator." raise ValueError(msg) step = 0 while simulator.time <= simulator.stop_time: step_result = simulator.step() for name in result.dtype.names: result[step][name] = step_result[name] step += 1 return result
[docs] def reset(self, init_values: Mapping[str, float] | None = None) -> None: """Reset FMU to specified initial condition. :param init_values: Values for initialization. """ self.time = self.start_time self.fmu.reset() self.fmu.setupExperiment(startTime=self.start_time) # set init values # instead of using the fmpy apply_start_values func from fmpy use the own set_values func to set the values # of the simulation variables correctly, reasons are also performance and simulation speed self.set_values(init_values) # type: ignore[arg-type] self.fmu.enterInitializationMode() self.fmu.exitInitializationMode()
[docs] def close(self) -> None: """Close the FMU and tidy up the unzipped files.""" self.fmu.terminate() self.fmu.freeInstance() shutil.rmtree(self._unzipdir) # clean up unzipped files
[docs] @classmethod @contextmanager def inspect(cls, fmu_path: pathlib.Path | str) -> Iterator[FMUContext]: """ Context manager for inspecting an FMU to extract metadata. Initializes a temporary FMU simulation environment to extract information such as input/output variable names, parameter start values, and variable bounds. Returns this data as a dictionary for use in TOML export or further analysis. This method handles FMU loading, parsing, and cleanup internally, and is safe to use with `with` blocks. If initialization fails, it yields `None`. :param fmu_path: Path to the FMU file (.fmu) as a string or Path. :param output_path: Optional output path for the resulting file or reference. :yield: A dictionary with FMU metadata, or None on failure. """ def parse_numeric(value: str | None) -> float | None: if value is None: return None try: number = float(value) return int(number) if number.is_integer() else number except (ValueError, TypeError): return None fmu_path = pathlib.Path(fmu_path) simulator = cls( _id=0, fmu_path=fmu_path, start_time=0.0, stop_time=1.0, step_size=0.1, ) model_description = simulator.model_description try: actions: list[VariableDict] = [] observations: list[VariableDict] = [] parameters: dict[str, str | None] = {} var_dict: VariableDict for var in model_description.modelVariables: if "." in var.name or "[" in var.name or "]" in var.name: continue if var.causality == "input": var_dict = {"name": var.name, "is_ext_input": True} min_val = parse_numeric(getattr(var, "min", None)) max_val = parse_numeric(getattr(var, "max", None)) if min_val is not None: var_dict["low_value"] = min_val if max_val is not None: var_dict["high_value"] = max_val actions.append(var_dict) elif var.causality == "output": var_dict = {"name": var.name, "is_ext_output": True} min_val = parse_numeric(getattr(var, "min", None)) max_val = parse_numeric(getattr(var, "max", None)) if min_val is not None: var_dict["low_value"] = min_val if max_val is not None: var_dict["high_value"] = max_val observations.append(var_dict) elif var.causality == "parameter": parameters[var.name] = str(getattr(var, "start", None)) if hasattr(var, "start") else None yield { "fmu_name": fmu_path.stem, "fmu_path": fmu_path, "actions": actions, "observations": observations, "parameters": parameters, } except Exception: log.exception("Failed to inspect FMU simulator") finally: if simulator: simulator.close()
[docs] class VariableDict(TypedDict, total=False): name: str is_ext_input: bool is_ext_output: bool low_value: Number high_value: Number
[docs] class FMUContext(TypedDict): fmu_name: str fmu_path: pathlib.Path actions: list[VariableDict] observations: list[VariableDict] parameters: dict[str, str | None]
[docs] class FMU2MESlave(FMU2Model): """Helper class for simulation of FMU2 FMUs. This is as wrapper for FMU2Model. It can be used to wrap model exchange FMUs such that they can be simulated similar to a co-simulation FMU. This is especially helpful for testing model exchange FMUs. It exposes an interface that emulates part of the original FMU2Slave class from fmpy. """ # Define some constants that might be needed according to the FMI Standard fmi2True: int = 1 # noqa: N815 fmi2False: int = 0 # noqa: N815 fmi2OK: int = 0 # noqa: N815 fmi2Warning: int = 1 # noqa: N815 fmi2Discard: int = 2 # noqa: N815 fmi2Error: int = 3 # noqa: N815 fmi2Fatal: int = 4 # noqa: N815 fmi2Pending: int = 5 # noqa: N815 def __init__(self, **kwargs: Any) -> None: r"""Initialize the FMU2Slave object. See also the fmyp documentation :py:class:`fmpy.fmi2.FMU2Model`. :param Any \**kwargs: Accepts any parameters that fmpy.FMU2Model accepts. """ super().__init__(**kwargs) self._model_description: ModelDescription = read_model_description(kwargs["unzipDirectory"]) self._solver: CVodeSolver self._tolerance: float = 0.0 self._stop_time: float = 0.0 self._start_time: float = 0.0
[docs] def setupExperiment( # noqa: N802 self, tolerance: float | None = None, startTime: float = 0.0, # noqa:N803 stopTime: float | None = None, # noqa:N803 **kwargs: Any, ) -> int: """Experiment setup and storage of required values. .. see also:: fmpy.fmi2.FMU2Model.setupExperiment :param tolerance: Solver tolerance, default value is 1e-5. :param startTime: Starting time for the experiment. :param stopTime: Ending time for the experiment. :param kwargs: Other keyword arguments that might be required for FMU2Model.setupExperiment in the future. :return: FMI2 return value. """ self._tolerance = 1e-5 if tolerance is None else tolerance self._stop_time = 0.0 if stopTime is None else stopTime self._start_time = startTime kwargs["tolerance"] = self._tolerance kwargs["stopTime"] = self._stop_time kwargs["startTime"] = self._start_time return super().setupExperiment(**kwargs)
[docs] def exitInitializationMode(self, **kwargs: Any) -> int: # noqa: N802 """Exit the initialization mode and set up the cvode solver. See also: :py:class:`fmpy.fmi2.FMU2Model.exitInitializationMode` :param kwargs: Keyword arguments accepted by FMU2Model.exitInitializationMode. :return: FMI2 return value. """ ret = super().exitInitializationMode(**kwargs) # Collect discrete states from FMU self.eventInfo.newDiscreteStatesNeeded = self.fmi2true self.eventInfo.terminateSimulation = self.fmi2false while ( self.eventInfo.newDiscreteStatesNeeded == self.fmi2true and self.eventInfo.terminateSimulation == self.fmi2false ): # update discrete states self.newDiscreteStates() self.enterContinuousTimeMode() # Initialize solver self._solver = CVodeSolver( set_time=self.setTime, startTime=self._start_time, maxStep=(self._stop_time - self._start_time) / 50.0, relativeTolerance=self._tolerance, nx=self._model_description.numberOfContinuousStates, nz=self._model_description.numberOfEventIndicators, get_x=self.getContinuousStates, set_x=self.setContinuousStates, get_dx=self.getDerivatives, get_z=self.getEventIndicators, ) return ret
[docs] def doStep( # noqa: N802 self, currentCommunicationPoint: float, # noqa: N803 communicationStepSize: float, # noqa: N803 noSetFMUStatePriorToCurrentPoint: int | None = None, # noqa: N803 ) -> int: """Perform a simulation step. Advance simulation from *currentCommunicationPoint* by *communicationStepSize*. Also refer to the FMI2 Standard documentation. :param currentCommunicationPoint: Current time stamp (starting point for simulation step). :param communicationStepSize: Time step size. :param noSetFMUStatePriorToCurrentPoint: Determine whether a reset before *currentCommunicationPoint* is possible. Must be either fmi2True or fmi2False. :return: FMU2 return value. """ time = currentCommunicationPoint step_size = communicationStepSize # Perform a solver step and reset the FMU Model time. _, time = self._solver.step(time, time + step_size) self.setTime(time) # Check for events that might have occurred during the step _step_event, _ = self.completedIntegratorStep() return self.fmi2ok