Source code for fedempy.dts_operators.window

# SPDX-FileCopyrightText: 2023 SAP SE
#
# SPDX-License-Identifier: Apache-2.0
#
# This file is part of FEDEM - https://openfedem.org

"""
Convenience module for running Fedem simulations as an operator.
Usage: Invoke `run(df)` with the input data in the DataFrame `df`.
"""
from os import environ, path
from sys import stdout

from numpy import zeros
from pandas import DataFrame

from fedempy.divergence import jump_state, ramp_dataframe, restart
from fedempy.fmm_solver import FmmSolver
from fedempy.solver import FedemException, FedemSolver


def _check_state(dts, use_state):
    """
    Checks whether solver state transfer should be used or not.
    """
    if not use_state or not bool(dts):
        return False, None

    if dts.window is not None and dts.window.overlap > 0:
        raise FedemException("State transfer with overlap not yet supported.")

    if dts.state is None:
        return True, None

    if int(dts.state.iat[0, 0]) < 1:
        # An invalid state array was provided, ignore it and do normal start instead.
        # The first three values are the step number, actual time, and increment size,
        # respectively. At least the first and third value should thus be positive.
        print(f"  ** Ignoring invalid state array {dts.state.head(3)}")
        return True, None

    return True, dts.state.to_numpy()


[docs] def start_solver( solver_options, cw_dir, use_state=False, old_state=None, ext_input=None, tstart=None ): """ Starts the dynamics solver with the provided command-line options. Parameters ---------- solver_options : list of str List of command-line arguments that are passed to the solver cw_dir : str Path to working directory of the solver process use_state : bool, default=False If True, internal state vectors will be allocated old_state : list of float, default=None State vector to restart simulation from ext_input : list of float, default=None Initial external function values, for initial equilibrium iterations tstart : float, default=None Optional start time of simulation, override setting in model file Returns ------- FedemSolver The dynamics solver object int Zero on success, negative values indicate errors """ def format_arg(key, value): if value is None: return None if isinstance(value, bool): return f"-{key}+" if value else f"-{key}-" return f"-{key}={value}" options = [format_arg(key, val) for key, val in solver_options.items()] if path.isdir(cw_dir): options.append("-cwd=" + cw_dir) if tstart is not None: options.append("-timeStart=" + str(tstart)) if "FEDEM_SOLVER" not in environ: raise FedemException("Environment variable FEDEM_SOLVER not defined") solver = FedemSolver(environ["FEDEM_SOLVER"], None, use_state) status = solver.solver_init(options, None, old_state, extf_input=ext_input) if status >= 0 and solver.ierr.value != 0: status = -abs(solver.ierr.value) return solver, status
[docs] def start_fmm_solver( fmm_file, lib_dir, use_state=False, old_state=None, ext_input=None, t_start=None, keep_old_res=True, ): """ Starts the dynamics solver on the provided Fedem model file. Parameters ---------- fmm_file : str The Fedem model file to run simulation on lib_dir : str Path to where the model file is located, used only if `fmm_file` is a relative path use_state : bool, default=False If True, internal state vectors will be allocated old_state : list of float, default=None State vector to restart simulation from ext_input : list of float, default=None Initial external function values, for initial equilibrium iterations t_start : float, default=None Optional start time of simulation, override setting in model file keep_old_res : bool, default=True Option to not overwrite any existing res-file in the RDB directory Returns ------- FmmSolver The dynamics solver object int Zero on success, negative values indicate errors """ if fmm_file and not path.isabs(fmm_file) and path.isdir(lib_dir): fmm_file = lib_dir + "/" + fmm_file solver = FmmSolver(None, use_state) status = solver.start( fmm_file, keep_old_res, True, True, old_state, extf_input=ext_input, time_start=t_start, ) if status >= 0 and solver.ierr.value != 0: status = -abs(solver.ierr.value) return solver, status
def _get_start_state(df, use_times): """ Extracts start condition (time and input values) from given DataFrame. """ if use_times: # Extract start time and associated input values from first row, # to be applied on the initial equilibirum iterations, if any return df.index[0], df.iloc[0].values if df.index[0] == 0: # First row has time stamp 0.0, assume the associated input values # then should be applied on the initial equilibrium iterations, if any return None, df.iloc[0].values # The first row will be applied as load on the first time increment return None, None def _run_window(solver, n_step, df, out_id, xtimes, old_state, lib_dir, options): """ Runs a new time window, with optional restart on divergence. """ output = solver.solve_window(n_step, df.values.flatten(), out_id, xtimes)[0] if solver.ierr.value == 0: conv_type = "CONVERGED" else: output, conv_type = restart( solver, df, old_state, lib_dir, out_id, options, xtimes, ) t_stop = solver.get_current_time() if conv_type == "FAILURE": ierr = -1 # Initialization failure, model is already deallocated else: ierr = solver.solver_done() if ierr == 0 and solver.ierr.value == 0: print(f"**** Time window finished at t={t_stop}", flush=True) elif conv_type in ("NO_CONV", "FAILURE"): print(" *** Dynamics solver failed, probably a divergent model.") print(" Check the fedem_solver.res file for details.", flush=True) raise FedemException(f"Failed to run solver ({solver.ierr.value}).") else: print(f"**** Time window stopped [{conv_type}]", flush=True) if ierr != 0 or solver.ierr.value != 0: print(" with error flags", ierr, solver.ierr.value) return output
[docs] def run(df, dts=None, **kwargs): """ Run Fedem simulation over a time window with `df` as input. Parameters ---------- df : DataFrame Input function values dts : DTSContext, default=None State data to be passed between each micro-batch kwargs : dict Dictionary containing output definitions and/or solver options Returns ------- DataFrame Response values in output sensors """ # Absolute path to app location lib_dir = kwargs.get("lib_dir", "/var/digitaltwin/app/lib") use_times = kwargs.get("use_times", False) if jump_state(lib_dir): df = ramp_dataframe(df) use_state, old_state = True, None else: use_state, old_state = _check_state(dts, kwargs.get("use_state", False)) if old_state is None: print("\n**** Start solver on initial state", flush=True) t_start, ext_inp = _get_start_state(df, use_times) else: # Restart from previous state istep = int(old_state[0][0]) time0 = float(old_state[1][0]) print(f"\n**** Start solver from step {istep} t={time0}", flush=True) t_start = None ext_inp = None if "solver_options" in kwargs: # Start the dynamics solver directly with the solver options provided solver, status = start_solver( kwargs["solver_options"], lib_dir, use_state, old_state, ext_inp, t_start ) else: # Assume a Fedem model file is provided. # This will then create the solver input files and start the dynamics solver. # Any FE models will be reduced first, unless they already have been reduced. solver, status = start_fmm_solver( kwargs.get("fmm_file"), lib_dir, use_state, old_state, ext_inp, t_start, kwargs.get("keep_old_res", False), ) if status < 0: stdout.flush() raise FedemException(f"Failed to start solver ({status}).") if status > 0: # The first row of the input dataframe was consumed by initial equilibrium iterations, # so drop it from the subsequent dynamics/quasi-static simulation df.drop(index=df.index[0], axis=0, inplace=True) t0 = "\n The first input values are consumed by the initial equilibrium analysis." else: t0 = "" # Run the solver through the given time window n_step = df.shape[0] t_step = f"solving {n_step} steps [{df.index[0]},{df.index[-1]}]" + t0 print(" * Solver successfully started,", t_step, flush=True) out_id = kwargs.get("output_ids", None) output = _run_window( solver, n_step, df, solver.get_function_ids(out_id), solver.check_times(df.index.values, use_times), old_state, lib_dir, kwargs.get("crash_options", None), ) if out_id is None: return None # Create the output DataFrame n_out = len(output) n_col = len(out_id) n_row = int(n_out / n_col) print(f" * Create output DataFrame [{n_col}x{n_row}].") np_outputs = zeros(n_out) np_outputs[:] = output[:] df_out = DataFrame(np_outputs.reshape((n_row, n_col), order="C"), index=df.index) df_out.columns = [str(oid) for oid in out_id] if use_state and solver.state_data: # Store the final state for next time window print(f" * Store final state for next window ({solver.state_size.value})") new_state = DataFrame(index=range(solver.state_size.value)) new_state.insert(0, "state", list(solver.state_data)) dts.state = new_state return df_out