Source code for divergence

# SPDX-FileCopyrightText: 2023 SAP SE
#
# SPDX-License-Identifier: Apache-2.0
#
# This file is part of FEDEM - https://openfedem.org

"""
Convenience module for automatic simulation restart of diverging models.
"""
from ctypes import c_double
from json import dumps, load
from os import getcwd, mkdir, path


[docs] def ramp_dataframe(dfr, start_pos=0): """ Creating a s-shape ramp, starting with 0 and ending with 1 Second derivative at start/end will be zero Multiplying each column in dataframe with ramp-array """ r_steps = len(dfr.index.tolist()) # ramp steps smooth = [1.0] * r_steps scale_step = 1.0 / (r_steps - start_pos - 1) for i in range(start_pos, r_steps): s_x = scale_step * (i - start_pos) smooth[i] = s_x * s_x * s_x * (s_x * (s_x * 6.0 - 15.0) + 10.0) return dfr.multiply(smooth, axis=0)
[docs] def jump_state(lib_dir): """ Checks whether the current state should be jumped over or not. """ jump_exist = False json_file = lib_dir + "/stream.json" if path.exists(json_file): with open(json_file, "r") as fd: frame_data = load(fd) jump_exist = frame_data.get("conv_type") == "JUMP_OVER" if jump_exist: # Reset conv_type to None with open(json_file, "w") as fd: fd.write(dumps({"conv_type": None}, indent=2)) return jump_exist
def _opts_list(opts_dict): """ Helper converting option dictionary into a list of solver options. """ def __format_arg(key, value): if value is None: return None if isinstance(value, bool): return f"-{key}+" if value else f"-{key}-" return f"-{key}={value}" return [__format_arg(k, v) for k, v in opts_dict.items()]
[docs] def restart(solver, df, state, lib_dir, out_id, c_opts, x_times=None): # NOSONAR """ Restarts the simulation over a time window in case of divergence issues. When a divergent solution is detected, one (or more) of the following approaches may be attempted: * `TOL`: Increasing tolerance(s) for convergence checks (tolDispNorm, tolVecNorm, tolEnerSum, maxit). * `STATE_DYNAMIC`: Use dynamic equilibrium at restart. Restart can be performed at a specified number of steps before the divergent step, optionally with ramp-scaling activated (default activated). * `STATE_STATIC`: Use quasi-static equilibrium for a specified number of steps at restart. Restart can be performed at a specified number of steps before the divergent step, optionally with ramp-scaling activated (default activated) done n-steps backwards. * `NO_STATE_WITH_RAMP`: Restart of specified number of steps before the diverged step. The loads are ramped up from zero and the state is not used. * `JUMP_OVER`: Skip the rest if the current window and restart from the next, with the loads ramped up from zero. * `NO_CONV`: No convergence is obtained. * `FAILURE`: Other solver failure during re-initialization. Parameters ---------- solver : FedemSolver Fedem dynamics solver instance df : DataFrame Input function values state : list of float State vector to restart simulation from lib_dir : str Path to the input/output files out_id : list of int List of user Ids identifying the output sensors in the model c_opts : dictionary Settings for the Fedem solver x_times : list of float, default=None Time list linked to the input Returns ------- list of float Output sensor values for each time step str How the simulation was actually restarted (or not) """ print(" ** The solver has diverged at t =", solver.get_current_time()) # If file stream.json not existing, create it json_file = lib_dir + "/stream.json" if not path.isfile(json_file): with open(json_file, "w") as fd: fd.write(dumps({"conv_type": "TOL"}, indent=2)) # Define state path for state file storage step_state_path = lib_dir + "/step_state" if not path.isdir(step_state_path): mkdir(step_state_path) # Input data input_data = df.values n_step = df.shape[0] # Default dictionary settings (solver) # Check that the option files exist before adding them opts = {"cwd": getcwd(), "terminal": -1} for ext in ("fco", "fop", "fao"): option_file = "fedem_solver." + ext if path.isfile(getcwd() + "/" + option_file): opts[ext] = option_file # Do not overwrite the original res-file ires = 0 while ires <= 0: ires -= 1 opts["resfile"] = "fedem_solver_r" + str(-ires) + ".res" if not path.isfile(getcwd() + "/" + opts["resfile"]): ires = -ires if c_opts is not None: opts.update(c_opts) # Convergence handling settings (solver) activate_ramp = opts.pop("use_ramping", True) state_pos = opts.pop("use_state_n_steps_behind", 4) nr_steps = opts.pop("nr_steps", 10) conv_types = opts.pop( "sequence", [ "TOL", "STATE_DYNAMIC", "STATE_STATIC", "NO_STATE_WITH_RAMP", "JUMP_OVER", "NO_CONV", ], ) # Ensure the trial loop exits with NO_CONV if nothing else succeeds if conv_types[-1] != "NO_CONV": conv_types.append("NO_CONV") use_ramp = False j = 0 k = 0 setoff = 0 # setoff for scaling function output = [0.0] * n_step * len(out_id) # Set convergence type to TOL conv_type = conv_types[0] # Initialise step state file size state_size = solver.get_state_size() step_state = state for k in range(len(conv_types)): # Shut down the solver solver.solver_done() # Reinitialize the solver ierr = solver.solver_init(_opts_list(opts), state_data=step_state) if ierr < 0: print( f" *** Solver initialization failure in divergence handling ({ierr})", flush=True, ) return None, "FAILURE" print(f" Trying restart {conv_type} at t = {solver.get_current_time()}") # Scaling down the input for the remaining period if use_ramp: input_data = ramp_dataframe(df, j - setoff).values use_ramp = False while j < n_step and solver.ierr.value == 0: # Solve for next time step t_next = None if x_times is None else x_times[j] if solver.solve_next(input_data[j], time_next=t_next): # Save state during stepping solver.save_state() if not solver.state_data: print(" ** No state at step", j, flush=True) else: step_state_file = step_state_path + "/step_" + str(j % state_pos) with open(step_state_file, "wb") as step_file: try: step_file.write(solver.state_data) except IOError: print(" ** Failed to write state to", step_state_file) # Extract the results output[j * len(out_id) : j * len(out_id) + len(out_id)] = ( float(solver.get_function(out_id[i])) for i in range(len(out_id)) ) j += 1 else: if solver.ierr.value != 0: print(" ** The solver diverged at t =", solver.get_current_time()) break # Leave loop in case of convergence/non-convergence and jump over conv_type = conv_types[k + 1] if conv_type == "JUMP_OVER": j = n_step if j == n_step: # Update data on json file with open(json_file, "w") as fd: fd.write(dumps({"conv_type": conv_type}, indent=2)) break # Convergence issue still exists if conv_type == "NO_STATE_WITH_RAMP": step_state = None use_ramp = True elif conv_type == "NO_CONV": break # No convergence reached, giving up elif conv_type in ("STATE_DYNAMIC", "STATE_STATIC"): if j < state_pos: step_state = state # use state at the beginning else: step_state = (c_double * state_size)() step_state_file = step_state_path + "/step_" + str(j % state_pos) print(f" * Read state from {step_state_file} [{state_size}]") with open(step_state_file, "rb") as step_file: try: step_file.readinto(step_state) except FileNotFoundError: print(" ** Failed to read state from", step_state_file) # Number of backwards steps j = 0 if j < state_pos else j - state_pos + 1 # Activate ramp (external specification, user request, default activated) if activate_ramp: use_ramp = True setoff = 0 # means ramping starts with zero # Equilibrium fails at time, set solver parameter quasiStatic if conv_type == "STATE_STATIC": time = solver.get_next_time() dt = time - solver.get_current_time() opts["quasiStatic"] = str(time + nr_steps * dt) # Increment the restart res-file, to not overwrite previous ones ires += 1 opts["resfile"] = "fedem_solver_r" + str(ires) + ".res" return output, conv_type