Source code for scikits_odes.dopri5

# -*- coding: utf-8 -*-
# Created on Thu Jan 28 14:59:25 2016
# @author: benny
"""
Making scipy ode solvers available via the ode API
==================================================

dopri5
------

This is an explicit runge-kutta method of order (4)5 due to Dormand & Prince
(with stepsize control and dense output).
The API of this solver is as the other scikit.odes ODE solvers

Authors:

    E. Hairer and G. Wanner Universite de Geneve, Dept. de Mathematiques CH-1211 Geneve 24, Switzerland e-mail: ernst.hairer@math.unige.ch, gerhard.wanner@math.unige.ch

This code is described in [HNW93].

This integrator accepts the following options:

    atol : float or sequence absolute tolerance for solution, default 1e-12
    rtol : float or sequence relative tolerance for solution, default 1e-6
    nsteps : int Maximum number of (internally defined) steps allowed during one call to the solver. Default=500
    first_step : float
    max_step : float
    safety : float Safety factor on new step selection (default 0.9)
    ifactor : float
    dfactor : float Maximum factor to increase/decrease step size by in one step
    beta : float Beta parameter for stabilised step size control.
    verbosity : int Switch for printing messages (< 0 for no messages).

References
[HNW93]	(1, 2) E. Hairer, S.P. Norsett and G. Wanner, Solving Ordinary Differential Equations i. Nonstiff Problems. 2nd edition. Springer Series in Computational Mathematics, Springer-Verlag (1993)

dop853
------

This is an explicit runge-kutta method of order 8(5,3) due to Dormand & Prince
(with stepsize control and dense output).
Options and references the same as “dopri5”.

"""
from collections import namedtuple
from enum import IntEnum
from warnings import warn
import sys

import numpy as np
from scipy.integrate import ode as _runner

from scikits_odes_core import OdeBase

SolverReturn = namedtuple(
    "SolverReturn", [
        "flag", "values", "errors", "roots",
        "tstop", "message"
    ]
)

SolverVariables = namedtuple("SolverVariables", ["t", "y"])


[docs] class StatusEnumDOP(IntEnum): SUCCESS = 1 SOLOUT = 2 INPUT_FAIL = -1 NMAX_FAIL = -2 STEPSIZE_FAIL = -3 STIFFNESS_FAIL = -4 UNEXPECTED_IDID = -10
STATUS_MESSAGE = { StatusEnumDOP.SUCCESS: 'computation successful', StatusEnumDOP.SOLOUT: 'comput. successful (interrupted by solout)', StatusEnumDOP.INPUT_FAIL: 'input is not consistent', StatusEnumDOP.NMAX_FAIL: 'larger nmax is needed', StatusEnumDOP.STEPSIZE_FAIL: 'step size becomes too small', StatusEnumDOP.STIFFNESS_FAIL: 'problem is probably stiff (interrupted)', StatusEnumDOP.UNEXPECTED_IDID: 'Unexpected idid, check warnings for info', }
[docs] class DOPSolveException(Exception): """Base class for exceptions raised by `DOP.validate_flags`.""" def __init__(self, soln): self.soln = soln self.args = (self._message.format(soln),)
[docs] class DOPSolveFailed(DOPSolveException): """`DOP.solve` failed to reach endpoint""" _message = ( "Solver failed with flag {0.flag} and finished at {0.errors.t}" "with values {0.errors.y}." )
[docs] class dopri5(OdeBase): name = 'dopri5' _runner = _runner default_values = { 'rtol': 1e-6, 'atol': 1e-12, 'nsteps': 500, 'max_step': 0.0, 'first_step': 0.0, # determined by solver 'safety': 0.9, 'ifactor': 10.0, 'dfactor': 0.2, 'beta': 0.0, 'verbosity': -1, # no messages if negative } def __init__(self, Rfn, **options): """ Initialize the ODE Solver and it's default values Parameters ---------- Rfn - right-hand-side function options - additional options for initialization """ self.options = self.default_values self.set_options(rfn=Rfn, **options) self._validate_flags = None self.solver = None self.initialized = False
[docs] def set_options(self, **options): """ Set specific options for the solver. Calling set_options a second time, normally resets the solver. """ for (key, value) in options.items(): self.options[key.lower()] = value self.initialized = False
def _init_data(self): self.rfn = self.options['rfn'] self.N = len(self.y0) def _wrap_Rfn(self, t, y): yout = np.empty(self.N, float) self.rfn(t, y, yout) return yout
[docs] def init_step(self, t0, y0): """ Initializes the solver and allocates memory. Parameters ---------- t0 - initial time y0 - initial condition for y (can be list or numpy array) Returns ------- if old_api: not supported if old_api False: A named tuple, with entries: flag = An integer flag (StatusEnumDop) values = Named tuple with entries t and y and ydot. y will correspond to y_retn value and ydot to yp_retn! errors = Named tuple with entries t_err and y_err roots = Named tuple with entries t_roots and y_roots tstop = Named tuple with entries t_stop and y_tstop message= String with message in case of an error """ self.y0 = y0 self.t = t0 self._init_data() self.solver = self._runner(self._wrap_Rfn)\ .set_integrator(self.name, rtol = self.options['rtol'], atol = self.options['atol'], nsteps = self.options['nsteps'], max_step = self.options['max_step'], first_step = self.options['first_step'], # determined by solver safety = self.options['safety'], ifactor = self.options['ifactor'], dfactor = self.options['dfactor'], beta = self.options['beta'], verbosity = self.options['verbosity'])\ .set_initial_value(y0, t0) self.initialized = True y_retn = np.empty(len(y0), float) y_retn[:] = y0[:] soln = SolverReturn( flag=StatusEnumDOP.SUCCESS, values=SolverVariables(t=t0, y=y_retn), errors=SolverVariables(t=None, y=None), roots=SolverVariables(t=None, y=None), tstop=SolverVariables(t=None, y=None), message=STATUS_MESSAGE[StatusEnumDOP.SUCCESS] ) if self._validate_flags: return self.validate_flags(soln) return soln
[docs] def step(self, t, y_retn=None): """ Method for calling successive next step of the ODE solver to allow more precise control over the solver. The 'init_step' method has to be called before the 'step' method. Parameters ---------- t - A step is done towards time t, and output at t returned. This time can be higher or lower than the previous time. If option 'one_step_compute'==True, and the solver supports it, only one internal solver step is done in the direction of t starting at the current step. If old_api=True, the old behavior is used: if t>0.0 then integration is performed until this time and results at this time are returned in y_retn if t<0.0 only one internal step is perfomed towards time abs(t) and results after this one time step are returned y_retn - numpy vector (ndim = 1) in which the computed value will be stored (needs to be preallocated). If None y_retn is not used. Returns ------- if old_api: not supported if old_api False: A named tuple, with entries: flag = An integer flag (StatusEnumDOP) values = Named tuple with entries t and y. y will correspond to y_retn value errors = Named tuple with entries t_err and y_err roots = Named tuple with entries t_roots and y_roots tstop = Named tuple with entries t_stop and y_tstop message= String with message in case of an error """ if not self.initialized == False: raise ValueError("DOPRI:step: init_step must be run before running the step method.") if t <= self.t: raise ValueError("Integration must be forward! t must be > previous timestep") y = self.solver.integrate(t) y_err = None t_err = None if not self.solver.successful(): flag = StatusEnumDOP.UNEXPECTED_IDID y_err = y t_err = t else: flag = StatusEnumDOP.SUCCESS self.y = y self.t = self.solver.t return SolverReturn( flag=flag, values=SolverVariables(t=self.t, y=y), errors=SolverVariables(t=t_err, y=y_err), roots=SolverVariables(t=None, y=None), tstop=SolverVariables(t=None, y=None), message=STATUS_MESSAGE[flag] )
[docs] def solve(self, tspan, y0): """ Runs the solver. Parameters ---------- tspan - an list/array of times at which the computed value will be returned. Must contain the start time as first entry.. y0 - list/numpy array of initial values Returns ------- if old_api Not supported if old_api False: A named tuple, with entries: flag = An integer flag values = Named tuple with entries t and y errors = Named tuple with entries t and y roots = Named tuple with entries t and y tstop = Named tuple with entries t and y message= String with message in case of an error """ self.initialized = False soln = self.init_step(tspan[0], y0) nrt = len(tspan) t_retn = np.empty(np.shape(tspan), float) y_retn = np.empty([len(tspan), len(y0)], float) t_retn[0] = tspan[0] y_retn[0, :] = y0 idx = 1 while self.solver.successful() and idx<nrt: time = tspan[idx] y = self.solver.integrate(time) t_retn[idx] = time y_retn[idx, :] = y idx += 1 y_err = None t_err = None if not self.solver.successful(): flag = StatusEnumDOP.UNEXPECTED_IDID y_err = y_retn[idx-1] t_err = t_retn[idx-1] # return values computed so far t_retn = t_retn[0:idx-1] y_retn = y_retn[0:idx-1, :] else: flag = StatusEnumDOP.SUCCESS soln = SolverReturn( flag=flag, values=SolverVariables(t=t_retn, y=y_retn), errors=SolverVariables(t=t_err, y=y_err), roots=SolverVariables(t=None, y=None), tstop=SolverVariables(t=None, y=None), message=STATUS_MESSAGE[flag] ) if self._validate_flags: return self.validate_flags(soln) return soln
[docs] def validate_flags(self, soln): """ Validates the flag returned by `dopri.solve`. Validation happens using the following scheme: * failures (`flag` < 0) raise `DOPSolveFailed` or a subclass of it; * otherwise, return an instance of `SolverReturn`. """ if soln.flag == StatusEnumDOP.SUCCESS: return soln if soln.flag < 0: raise DOPSolveFailed(soln) warn(WARNING_STR.format(soln.flag, *soln.err_values)) return soln
[docs] class dop853(dopri5): name = 'dop853' default_values = { 'rtol': 1e-6, 'atol': 1e-12, 'nsteps': 500, 'max_step': 0.0, 'first_step': 0.0, # determined by solver 'safety': 0.9, 'ifactor': 6.0, 'dfactor': 0.3, 'beta': 0.0, 'verbosity': -1, # no messages if negative }
OdeBase.integrator_classes.append(dopri5) OdeBase.integrator_classes.append(dop853)