# -*- coding: utf-8 -*-
# Created on Thu Jan 28 14:59:25 2016
# @author: benny
"""
Making scipy ode solvers available via the ode API
==================================================
dopri5
------
This is an explicit runge-kutta method of order (4)5 due to Dormand & Prince
(with stepsize control and dense output).
The API of this solver is as the other scikit.odes ODE solvers
Authors:
E. Hairer and G. Wanner Universite de Geneve, Dept. de Mathematiques CH-1211 Geneve 24, Switzerland e-mail: ernst.hairer@math.unige.ch, gerhard.wanner@math.unige.ch
This code is described in [HNW93].
This integrator accepts the following options:
atol : float or sequence absolute tolerance for solution, default 1e-12
rtol : float or sequence relative tolerance for solution, default 1e-6
nsteps : int Maximum number of (internally defined) steps allowed during one call to the solver. Default=500
first_step : float
max_step : float
safety : float Safety factor on new step selection (default 0.9)
ifactor : float
dfactor : float Maximum factor to increase/decrease step size by in one step
beta : float Beta parameter for stabilised step size control.
verbosity : int Switch for printing messages (< 0 for no messages).
References
[HNW93] (1, 2) E. Hairer, S.P. Norsett and G. Wanner, Solving Ordinary Differential Equations i. Nonstiff Problems. 2nd edition. Springer Series in Computational Mathematics, Springer-Verlag (1993)
dop853
------
This is an explicit runge-kutta method of order 8(5,3) due to Dormand & Prince
(with stepsize control and dense output).
Options and references the same as “dopri5”.
"""
from collections import namedtuple
from enum import IntEnum
from warnings import warn
import sys
import numpy as np
from scipy.integrate import ode as _runner
from scikits_odes_core import OdeBase
SolverReturn = namedtuple(
"SolverReturn", [
"flag", "values", "errors", "roots",
"tstop", "message"
]
)
SolverVariables = namedtuple("SolverVariables", ["t", "y"])
[docs]
class StatusEnumDOP(IntEnum):
SUCCESS = 1
SOLOUT = 2
INPUT_FAIL = -1
NMAX_FAIL = -2
STEPSIZE_FAIL = -3
STIFFNESS_FAIL = -4
UNEXPECTED_IDID = -10
STATUS_MESSAGE = {
StatusEnumDOP.SUCCESS: 'computation successful',
StatusEnumDOP.SOLOUT: 'comput. successful (interrupted by solout)',
StatusEnumDOP.INPUT_FAIL: 'input is not consistent',
StatusEnumDOP.NMAX_FAIL: 'larger nmax is needed',
StatusEnumDOP.STEPSIZE_FAIL: 'step size becomes too small',
StatusEnumDOP.STIFFNESS_FAIL: 'problem is probably stiff (interrupted)',
StatusEnumDOP.UNEXPECTED_IDID: 'Unexpected idid, check warnings for info',
}
[docs]
class DOPSolveException(Exception):
"""Base class for exceptions raised by `DOP.validate_flags`."""
def __init__(self, soln):
self.soln = soln
self.args = (self._message.format(soln),)
[docs]
class DOPSolveFailed(DOPSolveException):
"""`DOP.solve` failed to reach endpoint"""
_message = (
"Solver failed with flag {0.flag} and finished at {0.errors.t}"
"with values {0.errors.y}."
)
[docs]
class dopri5(OdeBase):
name = 'dopri5'
_runner = _runner
default_values = {
'rtol': 1e-6,
'atol': 1e-12,
'nsteps': 500,
'max_step': 0.0,
'first_step': 0.0, # determined by solver
'safety': 0.9,
'ifactor': 10.0,
'dfactor': 0.2,
'beta': 0.0,
'verbosity': -1, # no messages if negative
}
def __init__(self, Rfn, **options):
"""
Initialize the ODE Solver and it's default values
Parameters
----------
Rfn - right-hand-side function
options - additional options for initialization
"""
self.options = self.default_values
self.set_options(rfn=Rfn, **options)
self._validate_flags = None
self.solver = None
self.initialized = False
[docs]
def set_options(self, **options):
"""
Set specific options for the solver.
Calling set_options a second time, normally resets the solver.
"""
for (key, value) in options.items():
self.options[key.lower()] = value
self.initialized = False
def _init_data(self):
self.rfn = self.options['rfn']
self.N = len(self.y0)
def _wrap_Rfn(self, t, y):
yout = np.empty(self.N, float)
self.rfn(t, y, yout)
return yout
[docs]
def init_step(self, t0, y0):
"""
Initializes the solver and allocates memory.
Parameters
----------
t0 - initial time
y0 - initial condition for y (can be list or numpy array)
Returns
-------
if old_api:
not supported
if old_api False:
A named tuple, with entries:
flag = An integer flag (StatusEnumDop)
values = Named tuple with entries t and y and ydot. y will correspond to y_retn value and ydot to yp_retn!
errors = Named tuple with entries t_err and y_err
roots = Named tuple with entries t_roots and y_roots
tstop = Named tuple with entries t_stop and y_tstop
message= String with message in case of an error
"""
self.y0 = y0
self.t = t0
self._init_data()
self.solver = self._runner(self._wrap_Rfn)\
.set_integrator(self.name,
rtol = self.options['rtol'],
atol = self.options['atol'],
nsteps = self.options['nsteps'],
max_step = self.options['max_step'],
first_step = self.options['first_step'], # determined by solver
safety = self.options['safety'],
ifactor = self.options['ifactor'],
dfactor = self.options['dfactor'],
beta = self.options['beta'],
verbosity = self.options['verbosity'])\
.set_initial_value(y0, t0)
self.initialized = True
y_retn = np.empty(len(y0), float)
y_retn[:] = y0[:]
soln = SolverReturn(
flag=StatusEnumDOP.SUCCESS,
values=SolverVariables(t=t0, y=y_retn),
errors=SolverVariables(t=None, y=None),
roots=SolverVariables(t=None, y=None),
tstop=SolverVariables(t=None, y=None),
message=STATUS_MESSAGE[StatusEnumDOP.SUCCESS]
)
if self._validate_flags:
return self.validate_flags(soln)
return soln
[docs]
def step(self, t, y_retn=None):
"""
Method for calling successive next step of the ODE solver to allow
more precise control over the solver. The 'init_step' method has to
be called before the 'step' method.
Parameters
----------
t - A step is done towards time t, and output at t returned.
This time can be higher or lower than the previous time.
If option 'one_step_compute'==True, and the solver supports
it, only one internal solver step is done in the direction
of t starting at the current step.
If old_api=True, the old behavior is used:
if t>0.0 then integration is performed until this time and results at this time are returned in y_retn
if t<0.0 only one internal step is perfomed towards time abs(t) and results after this one time step are returned
y_retn - numpy vector (ndim = 1) in which the computed
value will be stored (needs to be preallocated). If
None y_retn is not used.
Returns
-------
if old_api:
not supported
if old_api False:
A named tuple, with entries:
flag = An integer flag (StatusEnumDOP)
values = Named tuple with entries t and y. y will correspond to y_retn value
errors = Named tuple with entries t_err and y_err
roots = Named tuple with entries t_roots and y_roots
tstop = Named tuple with entries t_stop and y_tstop
message= String with message in case of an error
"""
if not self.initialized == False:
raise ValueError("DOPRI:step: init_step must be run before running the step method.")
if t <= self.t:
raise ValueError("Integration must be forward! t must be > previous timestep")
y = self.solver.integrate(t)
y_err = None
t_err = None
if not self.solver.successful():
flag = StatusEnumDOP.UNEXPECTED_IDID
y_err = y
t_err = t
else:
flag = StatusEnumDOP.SUCCESS
self.y = y
self.t = self.solver.t
return SolverReturn(
flag=flag,
values=SolverVariables(t=self.t, y=y),
errors=SolverVariables(t=t_err, y=y_err),
roots=SolverVariables(t=None, y=None),
tstop=SolverVariables(t=None, y=None),
message=STATUS_MESSAGE[flag]
)
[docs]
def solve(self, tspan, y0):
"""
Runs the solver.
Parameters
----------
tspan - an list/array of times at which the computed value will be
returned. Must contain the start time as first entry..
y0 - list/numpy array of initial values
Returns
-------
if old_api
Not supported
if old_api False:
A named tuple, with entries:
flag = An integer flag
values = Named tuple with entries t and y
errors = Named tuple with entries t and y
roots = Named tuple with entries t and y
tstop = Named tuple with entries t and y
message= String with message in case of an error
"""
self.initialized = False
soln = self.init_step(tspan[0], y0)
nrt = len(tspan)
t_retn = np.empty(np.shape(tspan), float)
y_retn = np.empty([len(tspan), len(y0)], float)
t_retn[0] = tspan[0]
y_retn[0, :] = y0
idx = 1
while self.solver.successful() and idx<nrt:
time = tspan[idx]
y = self.solver.integrate(time)
t_retn[idx] = time
y_retn[idx, :] = y
idx += 1
y_err = None
t_err = None
if not self.solver.successful():
flag = StatusEnumDOP.UNEXPECTED_IDID
y_err = y_retn[idx-1]
t_err = t_retn[idx-1]
# return values computed so far
t_retn = t_retn[0:idx-1]
y_retn = y_retn[0:idx-1, :]
else:
flag = StatusEnumDOP.SUCCESS
soln = SolverReturn(
flag=flag,
values=SolverVariables(t=t_retn, y=y_retn),
errors=SolverVariables(t=t_err, y=y_err),
roots=SolverVariables(t=None, y=None),
tstop=SolverVariables(t=None, y=None),
message=STATUS_MESSAGE[flag]
)
if self._validate_flags:
return self.validate_flags(soln)
return soln
[docs]
def validate_flags(self, soln):
"""
Validates the flag returned by `dopri.solve`.
Validation happens using the following scheme:
* failures (`flag` < 0) raise `DOPSolveFailed` or a subclass of it;
* otherwise, return an instance of `SolverReturn`.
"""
if soln.flag == StatusEnumDOP.SUCCESS:
return soln
if soln.flag < 0:
raise DOPSolveFailed(soln)
warn(WARNING_STR.format(soln.flag, *soln.err_values))
return soln
[docs]
class dop853(dopri5):
name = 'dop853'
default_values = {
'rtol': 1e-6,
'atol': 1e-12,
'nsteps': 500,
'max_step': 0.0,
'first_step': 0.0, # determined by solver
'safety': 0.9,
'ifactor': 6.0,
'dfactor': 0.3,
'beta': 0.0,
'verbosity': -1, # no messages if negative
}
OdeBase.integrator_classes.append(dopri5)
OdeBase.integrator_classes.append(dop853)