agd.ODE.hamiltonian_base
This module implements some basic functionality for solving ODEs derived from a Hamiltonian in a manner compatible with automatic differentiation. (Flow computation, symplectic schemes, etc)
Recall that Hamilton's equations read $$ \frac {dq}{dt} = \frac {\partial H}{\partial p}, \quad \frac {dp}{dt} = - \frac {\partial H}{\partial q}. $$
1# Copyright 2020 Jean-Marie Mirebeau, University Paris-Sud, CNRS, University Paris-Saclay 2# Distributed WITHOUT ANY WARRANTY. Licensed under the Apache License, Version 2.0, see http://www.apache.org/licenses/LICENSE-2.0 3 4r""" 5This module implements some basic functionality for solving ODEs derived from a 6Hamiltonian in a manner compatible with automatic differentiation. 7(Flow computation, symplectic schemes, etc) 8 9Recall that Hamilton's equations read 10$$ 11\frac {dq}{dt} = \frac {\partial H}{\partial p}, 12\quad 13\frac {dp}{dt} = - \frac {\partial H}{\partial q}. 14$$ 15""" 16 17import numpy as np 18from copy import copy 19 20from .. import AutomaticDifferentiation as ad 21 22def fixedpoint(f,x,tol=1e-9,nitermax=100): 23 """ 24 Iterates the function f on the data x until a fixed point is found, 25 up to prescribed tolerance, or the maximum number of iterations is reached. 26 """ 27 norm_infinity = ad.Optimization.norm_infinity 28 x_old = x 29 for i in range(nitermax): 30 x=f(x) 31 if norm_infinity(x-x_old)<tol: break 32 x_old = x 33 return x 34 35damp_None = 0 # Placeholder for the damping factors damp_q and damp_p of class HamiltonianBase 36def read_None(H,x): pass # Placeholder for the data read functions read_q and read_p 37def incr_None(H,x): pass # Placeholder for the data write functions incr_q and incr_p 38 39class HamiltonianBase: 40 """ 41 Base class for Hamiltonians. 42 43 Default initialized fields : 44 - impl_solver : Fixed point solver to be used for the implicit time steps 45 - preserve_q, preserve_p : Wether updates modify q and p in place 46 - damp_q, damp_p : damping factor between scheme iterations 47 - read_q, read_p : called before each damping step, for reading q and p 48 - incr_q, incr_p : called after each damping step, as a source term for q and p 49 """ 50 def __init__(self): 51 self.impl_solver = fixedpoint 52 self.preserve_q = True; self.preserve_p = True 53 self.Impl2_p_merged = True 54 self.damp_q = damp_None; self.damp_p = damp_None 55 self.read_q = read_None; self.read_p = read_None 56 self.incr_q = incr_None; self.incr_p = incr_None 57 self.current_iter = None 58 59 def H(self,q,p): 60 """Evaluates the Hamiltonian, at a given position and impulsion.""" 61 raise NotImplementedError 62 63 def DqH(q,p): 64 """Differentiates the Hamiltonian, w.r.t. position.""" 65 raise NotImplementedError 66 67 def DpH(q,p): 68 """Differentiates the Hamiltonian, w.r.t. impulsion.""" 69 raise NotImplementedError 70 71 def flow(self,q,p): 72 """ 73 Symplectic gradient of the Hamiltonian. 74 """ 75 return (self.DpH(q,p),-self.DqH(q,p)) 76 77 def flow_cat(self,qp,t=None): 78 """ 79 Symplectic gradient of the hamiltonian, intended for odeint. 80 81 Input : 82 - qp : position q, impulsion p, flattened and concatenated. 83 - t : ignored parameter (compatibility with scipy.integrate.odeint) 84 - shape : how to reshape q and p for calling DpH and DqH, if needed 85 86 Output : 87 - symplectic gradient, concatenated and flattened. 88 """ 89 d = len(qp)//2 90 q = qp[:d]; p = qp[d:] 91 if hasattr(self,'shape_free') and self.shape_free is not None: 92 q = q.reshape((*self.shape_free,-1)) 93 p = p.reshape((*self.shape_free,-1)) 94 return np.concatenate(self.flow(q,p),axis=0).reshape(-1) 95 96 def integrate(self,q,p,scheme,niter=None,dt=None,T=None,path=False): 97 """ 98 Solves Hamilton's equations by running the scheme niter times. 99 100 Inputs : 101 - q,p : Initial position and impulsion. 102 - scheme : ODE integration scheme. (string or callable) 103 - niter,dt,T : number of steps, time step, and total time 104 (exactly two among the three must be specified) 105 - path : wether to return the intermediate steps. 106 (If a positive number, period of intermediate steps to return) 107 108 Output : 109 - q,p if path is False. 110 Otherwise np.stack([q0,...,qn],axis=-1),np.stack([p0,...,pn],axis=-1),[t0,..tn], 111 with n=niter, tn=T. 112 """ 113 if isinstance(scheme,str): 114 schemes = self.nonsymplectic_schemes() 115 schemes.update(self.symplectic_schemes()) 116 scheme = schemes[scheme] 117 118 if (niter is None) + (T is None) + (dt is None) != 1: 119 raise ValueError("Exactly two of niter, dt and T must be specified") 120 if T is None: T = niter*dt 121 elif dt is None: dt = T/niter 122 elif niter is None: 123 niter = int(np.ceil(T/dt)) 124 dt = T/niter # slightly decreased time step 125 126 q,p = copy(q),copy(p) 127 if path: Q,P = [copy(q)],[copy(p)] 128 129 for i in range(niter): 130 q,p = scheme(q,p,dt) 131 if path and not i%path: Q.append(copy(q)); P.append(copy(p)) 132 133 if path: return np.stack(Q,axis=-1),np.stack(P,axis=-1),np.linspace(0,T,niter+1) 134 else: return q,p 135 136 def nonsymplectic_schemes(self): 137 """ 138 Standard ODE integration schemes 139 """ 140 def Euler(q,p,dt): 141 dq,dp = self.flow(q,p) 142 return q+dt*dq, p+dt*dp 143 144 def RK2(q,p,dt): 145 dq1,dp1 = self.flow(q, p) 146 dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1) 147 return q+dt*dq2, p+dt*dp2 148 149 def RK4(q,p,dt): 150 dq1,dp1 = self.flow(q, p) 151 dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1) 152 dq3,dp3 = self.flow(q+0.5*dt*dq2, p+0.5*dt*dp2) 153 dq4,dp4 = self.flow(q+dt*dq3, p+dt*dp3) 154 return q+dt*(dq1+2*dq2+2*dq3+dq4)/6., p+dt*(dp1+2*dp2+2*dp3+dp4)/6. 155 156 return {"Euler":Euler,"Runge-Kutta-2":RK2,"Runge-Kutta-4":RK4} 157 158 # -------------- sub-steps of schemes -------------- 159 160 def _mk_copy(self,x): 161 """Returns a copy of variable x. Specialize if needed (for GPU c_contiguity...).""" 162 return x.copy() 163 164 def Expl_q(self,q,p,δ): 165 """Explicit time step for the position q.""" 166 if self.preserve_q: q = self._mk_copy(q) 167 q += δ*self.DpH(q,p) 168 return q 169 def Expl_p(self,q,p,δ): 170 """Explicit time step for the impulsion p.""" 171 if self.preserve_p: p = self._mk_copy(p) 172 p -= δ*self.DqH(q,p) 173 return p 174 175 def Damp_qp(self,q,p,δ): 176 """ 177 Optional damping step, interleaved in between Verlet and Ruth4 steps, equivalent to : 178 p *= exp(-δ*damp_p); q *= exp(-δ*damp_q) 179 Please set damp_q and damp_p as appropriate. 180 """ 181 if self.damp_q is not damp_None: 182 if self.preserve_q: q = self._mk_copy(q) 183 q *= np.exp(-δ*self.damp_q) # Maybe we can avoid recomputing these exponentials 184 if self.damp_p is not damp_None: 185 if self.preserve_p: p = self._mk_copy(p) 186 p *= np.exp(-δ*self.damp_p) 187 return q,p 188 189 def Impl_p(self,q,p,δ): 190 """ 191 Implicit time step for the impulsion p. 192 """ 193 return self.impl_solver(lambda p_:p-δ*self.DqH(q,p_), p) 194 195 def Impl_q(self,q,p,δ): 196 """ 197 Implicit time step for the position q. 198 """ 199 return self.impl_solver(lambda q_:q-δ*self.DqH(q_,p), q) 200 201 def Impl2_p(self,q,p,δ_before,δ_total,δ_after): 202 """ 203 Merge two implicit time steps for the impulsion p, with a damping step in between. 204 """ 205 #Default implementation, without optimization (i.e. merging of the Impl_p steps) 206 p = self.Impl_p(q,p,δ_before) 207 self.read_q(self,q); self.read_p(self,p) # Read position and momentum before damp 208 q,p = self.Damp_qp(q,p,δ_total) 209 self.incr_q(self,q); self.incr_p(self,p) # Modify position and momentum after damp 210 p = self.Impl_p(q,p,δ_after) 211 return q,p 212 213 # ---------------- Symplectic schemes --------------- 214 215 def Euler_p(self,q,p,δ,niter=1): 216 """niter time steps of the symplectic Euler scheme, starting with impulsion p update.""" 217 for i in range(niter): 218 self.current_iter = i 219 p=self.Impl_p(q,p,δ) 220 q=self.Expl_q(q,p,δ) 221 if i!=niter-1: 222 self.read_q(self,q); self.read_p(self,p) 223 self.Damp_qp(q,p,δ) 224 self.incr_q(self,q); self.incr_p(self,p) 225 return q,p 226 227 def Euler_q(self,q,p,δ,niter=1): 228 """niter time steps of the symplectic Euler scheme, starting with position q update.""" 229 for i in range(niter): 230 self.current_iter = i 231 q=self.Impl_q(q,p,δ) 232 p=self.Expl_p(q,p,δ) 233 if i!=niter-1: 234 self.read_q(self,q); self.read_p(self,p) 235 self.Damp_qp(q,p,δ) 236 self.incr_q(self,q); self.incr_p(self,p) 237 return q,p 238 239 def Verlet_p(self,q,p,δ,niter=1): 240 """ 241 niter time steps of the symplectic Verlet scheme. 242 Optional damping steps interleaved. 243 """ 244 hδ=δ/2 245 for i in range(niter): 246 self.current_iter = i 247 if i==0: p=self.Impl_p(q,p,hδ) 248 q=self.Expl_q(q,p,δ) 249 if i==niter-1: p=self.Impl_p(q,p,hδ) 250 else: q,p=self.Impl2_p(q,p,hδ,δ,hδ) 251 return q,p 252 253 def Ruth4_p(self,q,p,δ,niter=1): 254 """ 255 niter time steps of the Ruth 1983 4th order symplectic scheme, as of Wikipedia. 256 Optional damping steps interleaved. 257 """ 258 t = 2.**(1./3.) 259 c1 = 1/(2*(2-t)); c2 = (1-t)*c1; 260 d1 = 2*c1; d2 = -t*d1; 261 for i in range(niter): 262 self.current_iter = i 263 if i==0: p=self.Impl_p(q,p,c1*δ) 264 q=self.Expl_q(q,p,d1*δ) 265 p=self.Impl_p(q,p,c2*δ) 266 q=self.Expl_q(q,p,d2*δ) 267 p=self.Impl_p(q,p,c2*δ) 268 q=self.Expl_q(q,p,d1*δ) 269 if i==niter-1: p=self.Impl_p(q,p,c1*δ) 270 else: q,p=self.Impl2_p(q,p,c1*δ,δ,c1*δ) 271 return q,p 272 273 def Sympl_p(self,q,p,δ,niter=1,order=2): 274 """ 275 niter steps of the Euler_p, Verlet_p or Ruth4_p symplectic scheme, 276 depending on the order parameter. 277 """ 278 if order==1: return self.Euler_p(q,p,δ,niter) 279 if order==2: return self.Verlet_p(q,p,δ,niter) 280 if order==4: return self.Ruth4_p( q,p,δ,niter) 281 raise ValueError(f"Found {order=}, while expecting 1,2 or 4.") 282 283 def symplectic_schemes(self): 284 return {'Euler-p':self.Euler_p,'Euler-q':self.Euler_q,'Verlet-p':self.Verlet_p, 285 'Ruth4-p':self.Ruth4_p} 286 287 # ------ Routines for backpropagation ------ 288 289 def _Sympl_p(self,q,p,δ,order=2): 290 """ 291 Returns a callable, which computes any given iteration of Sympl_p, and saves appropriate 292 keypoints for computational and memory efficiency. 293 """ 294 # TODO : remove since probably useless ? (Only use case is backprop in seismogram) 295 296 # The main objective of this routine is to preserve accuracy when damp_p and damp_q are set. 297 # Otherwise, the iterations may be reversed by iterating the symplectic scheme 298 # in negative time, providing a result with similar accuracy. 299 def next(qp): # A single time step, including a preliminary damping 300 qp = self.Damp_qp(*qp,δ) 301 return self.Sympl_p(*qp,δ,order=order) 302 from .backtrack import RecurseRewind 303 return RecurseRewind(next,self.Damp_qp(q,p,-δ)) # A single negative damping step should be fine... 304 305 def seismogram(self,q,p,δ,niter,order=2,qh_ind=None,ph_ind=None): 306 307 H_fwd = copy(self) 308 H_fwd.read_q = H_fwd.read_p = read_None; H_fwd.incr_q = H_fwd.incr_p = incr_None 309 qh = []; ph = []; initial=True 310 if qh_ind is not None: H_fwd.read_q = lambda _,q : qh.append(q.reshape(-1)[qh_ind]) 311 if ph_ind is not None: H_fwd.read_p = lambda _,p : ph.append(p.reshape(-1)[ph_ind]) 312 qf,pf = H_fwd.Sympl_p(q,p,δ,niter,order) 313 314 return qf,pf,ad.asarray(qh),ad.asarray(ph)
23def fixedpoint(f,x,tol=1e-9,nitermax=100): 24 """ 25 Iterates the function f on the data x until a fixed point is found, 26 up to prescribed tolerance, or the maximum number of iterations is reached. 27 """ 28 norm_infinity = ad.Optimization.norm_infinity 29 x_old = x 30 for i in range(nitermax): 31 x=f(x) 32 if norm_infinity(x-x_old)<tol: break 33 x_old = x 34 return x
Iterates the function f on the data x until a fixed point is found, up to prescribed tolerance, or the maximum number of iterations is reached.
37def read_None(H,x): pass # Placeholder for the data read functions read_q and read_p
38def incr_None(H,x): pass # Placeholder for the data write functions incr_q and incr_p
40class HamiltonianBase: 41 """ 42 Base class for Hamiltonians. 43 44 Default initialized fields : 45 - impl_solver : Fixed point solver to be used for the implicit time steps 46 - preserve_q, preserve_p : Wether updates modify q and p in place 47 - damp_q, damp_p : damping factor between scheme iterations 48 - read_q, read_p : called before each damping step, for reading q and p 49 - incr_q, incr_p : called after each damping step, as a source term for q and p 50 """ 51 def __init__(self): 52 self.impl_solver = fixedpoint 53 self.preserve_q = True; self.preserve_p = True 54 self.Impl2_p_merged = True 55 self.damp_q = damp_None; self.damp_p = damp_None 56 self.read_q = read_None; self.read_p = read_None 57 self.incr_q = incr_None; self.incr_p = incr_None 58 self.current_iter = None 59 60 def H(self,q,p): 61 """Evaluates the Hamiltonian, at a given position and impulsion.""" 62 raise NotImplementedError 63 64 def DqH(q,p): 65 """Differentiates the Hamiltonian, w.r.t. position.""" 66 raise NotImplementedError 67 68 def DpH(q,p): 69 """Differentiates the Hamiltonian, w.r.t. impulsion.""" 70 raise NotImplementedError 71 72 def flow(self,q,p): 73 """ 74 Symplectic gradient of the Hamiltonian. 75 """ 76 return (self.DpH(q,p),-self.DqH(q,p)) 77 78 def flow_cat(self,qp,t=None): 79 """ 80 Symplectic gradient of the hamiltonian, intended for odeint. 81 82 Input : 83 - qp : position q, impulsion p, flattened and concatenated. 84 - t : ignored parameter (compatibility with scipy.integrate.odeint) 85 - shape : how to reshape q and p for calling DpH and DqH, if needed 86 87 Output : 88 - symplectic gradient, concatenated and flattened. 89 """ 90 d = len(qp)//2 91 q = qp[:d]; p = qp[d:] 92 if hasattr(self,'shape_free') and self.shape_free is not None: 93 q = q.reshape((*self.shape_free,-1)) 94 p = p.reshape((*self.shape_free,-1)) 95 return np.concatenate(self.flow(q,p),axis=0).reshape(-1) 96 97 def integrate(self,q,p,scheme,niter=None,dt=None,T=None,path=False): 98 """ 99 Solves Hamilton's equations by running the scheme niter times. 100 101 Inputs : 102 - q,p : Initial position and impulsion. 103 - scheme : ODE integration scheme. (string or callable) 104 - niter,dt,T : number of steps, time step, and total time 105 (exactly two among the three must be specified) 106 - path : wether to return the intermediate steps. 107 (If a positive number, period of intermediate steps to return) 108 109 Output : 110 - q,p if path is False. 111 Otherwise np.stack([q0,...,qn],axis=-1),np.stack([p0,...,pn],axis=-1),[t0,..tn], 112 with n=niter, tn=T. 113 """ 114 if isinstance(scheme,str): 115 schemes = self.nonsymplectic_schemes() 116 schemes.update(self.symplectic_schemes()) 117 scheme = schemes[scheme] 118 119 if (niter is None) + (T is None) + (dt is None) != 1: 120 raise ValueError("Exactly two of niter, dt and T must be specified") 121 if T is None: T = niter*dt 122 elif dt is None: dt = T/niter 123 elif niter is None: 124 niter = int(np.ceil(T/dt)) 125 dt = T/niter # slightly decreased time step 126 127 q,p = copy(q),copy(p) 128 if path: Q,P = [copy(q)],[copy(p)] 129 130 for i in range(niter): 131 q,p = scheme(q,p,dt) 132 if path and not i%path: Q.append(copy(q)); P.append(copy(p)) 133 134 if path: return np.stack(Q,axis=-1),np.stack(P,axis=-1),np.linspace(0,T,niter+1) 135 else: return q,p 136 137 def nonsymplectic_schemes(self): 138 """ 139 Standard ODE integration schemes 140 """ 141 def Euler(q,p,dt): 142 dq,dp = self.flow(q,p) 143 return q+dt*dq, p+dt*dp 144 145 def RK2(q,p,dt): 146 dq1,dp1 = self.flow(q, p) 147 dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1) 148 return q+dt*dq2, p+dt*dp2 149 150 def RK4(q,p,dt): 151 dq1,dp1 = self.flow(q, p) 152 dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1) 153 dq3,dp3 = self.flow(q+0.5*dt*dq2, p+0.5*dt*dp2) 154 dq4,dp4 = self.flow(q+dt*dq3, p+dt*dp3) 155 return q+dt*(dq1+2*dq2+2*dq3+dq4)/6., p+dt*(dp1+2*dp2+2*dp3+dp4)/6. 156 157 return {"Euler":Euler,"Runge-Kutta-2":RK2,"Runge-Kutta-4":RK4} 158 159 # -------------- sub-steps of schemes -------------- 160 161 def _mk_copy(self,x): 162 """Returns a copy of variable x. Specialize if needed (for GPU c_contiguity...).""" 163 return x.copy() 164 165 def Expl_q(self,q,p,δ): 166 """Explicit time step for the position q.""" 167 if self.preserve_q: q = self._mk_copy(q) 168 q += δ*self.DpH(q,p) 169 return q 170 def Expl_p(self,q,p,δ): 171 """Explicit time step for the impulsion p.""" 172 if self.preserve_p: p = self._mk_copy(p) 173 p -= δ*self.DqH(q,p) 174 return p 175 176 def Damp_qp(self,q,p,δ): 177 """ 178 Optional damping step, interleaved in between Verlet and Ruth4 steps, equivalent to : 179 p *= exp(-δ*damp_p); q *= exp(-δ*damp_q) 180 Please set damp_q and damp_p as appropriate. 181 """ 182 if self.damp_q is not damp_None: 183 if self.preserve_q: q = self._mk_copy(q) 184 q *= np.exp(-δ*self.damp_q) # Maybe we can avoid recomputing these exponentials 185 if self.damp_p is not damp_None: 186 if self.preserve_p: p = self._mk_copy(p) 187 p *= np.exp(-δ*self.damp_p) 188 return q,p 189 190 def Impl_p(self,q,p,δ): 191 """ 192 Implicit time step for the impulsion p. 193 """ 194 return self.impl_solver(lambda p_:p-δ*self.DqH(q,p_), p) 195 196 def Impl_q(self,q,p,δ): 197 """ 198 Implicit time step for the position q. 199 """ 200 return self.impl_solver(lambda q_:q-δ*self.DqH(q_,p), q) 201 202 def Impl2_p(self,q,p,δ_before,δ_total,δ_after): 203 """ 204 Merge two implicit time steps for the impulsion p, with a damping step in between. 205 """ 206 #Default implementation, without optimization (i.e. merging of the Impl_p steps) 207 p = self.Impl_p(q,p,δ_before) 208 self.read_q(self,q); self.read_p(self,p) # Read position and momentum before damp 209 q,p = self.Damp_qp(q,p,δ_total) 210 self.incr_q(self,q); self.incr_p(self,p) # Modify position and momentum after damp 211 p = self.Impl_p(q,p,δ_after) 212 return q,p 213 214 # ---------------- Symplectic schemes --------------- 215 216 def Euler_p(self,q,p,δ,niter=1): 217 """niter time steps of the symplectic Euler scheme, starting with impulsion p update.""" 218 for i in range(niter): 219 self.current_iter = i 220 p=self.Impl_p(q,p,δ) 221 q=self.Expl_q(q,p,δ) 222 if i!=niter-1: 223 self.read_q(self,q); self.read_p(self,p) 224 self.Damp_qp(q,p,δ) 225 self.incr_q(self,q); self.incr_p(self,p) 226 return q,p 227 228 def Euler_q(self,q,p,δ,niter=1): 229 """niter time steps of the symplectic Euler scheme, starting with position q update.""" 230 for i in range(niter): 231 self.current_iter = i 232 q=self.Impl_q(q,p,δ) 233 p=self.Expl_p(q,p,δ) 234 if i!=niter-1: 235 self.read_q(self,q); self.read_p(self,p) 236 self.Damp_qp(q,p,δ) 237 self.incr_q(self,q); self.incr_p(self,p) 238 return q,p 239 240 def Verlet_p(self,q,p,δ,niter=1): 241 """ 242 niter time steps of the symplectic Verlet scheme. 243 Optional damping steps interleaved. 244 """ 245 hδ=δ/2 246 for i in range(niter): 247 self.current_iter = i 248 if i==0: p=self.Impl_p(q,p,hδ) 249 q=self.Expl_q(q,p,δ) 250 if i==niter-1: p=self.Impl_p(q,p,hδ) 251 else: q,p=self.Impl2_p(q,p,hδ,δ,hδ) 252 return q,p 253 254 def Ruth4_p(self,q,p,δ,niter=1): 255 """ 256 niter time steps of the Ruth 1983 4th order symplectic scheme, as of Wikipedia. 257 Optional damping steps interleaved. 258 """ 259 t = 2.**(1./3.) 260 c1 = 1/(2*(2-t)); c2 = (1-t)*c1; 261 d1 = 2*c1; d2 = -t*d1; 262 for i in range(niter): 263 self.current_iter = i 264 if i==0: p=self.Impl_p(q,p,c1*δ) 265 q=self.Expl_q(q,p,d1*δ) 266 p=self.Impl_p(q,p,c2*δ) 267 q=self.Expl_q(q,p,d2*δ) 268 p=self.Impl_p(q,p,c2*δ) 269 q=self.Expl_q(q,p,d1*δ) 270 if i==niter-1: p=self.Impl_p(q,p,c1*δ) 271 else: q,p=self.Impl2_p(q,p,c1*δ,δ,c1*δ) 272 return q,p 273 274 def Sympl_p(self,q,p,δ,niter=1,order=2): 275 """ 276 niter steps of the Euler_p, Verlet_p or Ruth4_p symplectic scheme, 277 depending on the order parameter. 278 """ 279 if order==1: return self.Euler_p(q,p,δ,niter) 280 if order==2: return self.Verlet_p(q,p,δ,niter) 281 if order==4: return self.Ruth4_p( q,p,δ,niter) 282 raise ValueError(f"Found {order=}, while expecting 1,2 or 4.") 283 284 def symplectic_schemes(self): 285 return {'Euler-p':self.Euler_p,'Euler-q':self.Euler_q,'Verlet-p':self.Verlet_p, 286 'Ruth4-p':self.Ruth4_p} 287 288 # ------ Routines for backpropagation ------ 289 290 def _Sympl_p(self,q,p,δ,order=2): 291 """ 292 Returns a callable, which computes any given iteration of Sympl_p, and saves appropriate 293 keypoints for computational and memory efficiency. 294 """ 295 # TODO : remove since probably useless ? (Only use case is backprop in seismogram) 296 297 # The main objective of this routine is to preserve accuracy when damp_p and damp_q are set. 298 # Otherwise, the iterations may be reversed by iterating the symplectic scheme 299 # in negative time, providing a result with similar accuracy. 300 def next(qp): # A single time step, including a preliminary damping 301 qp = self.Damp_qp(*qp,δ) 302 return self.Sympl_p(*qp,δ,order=order) 303 from .backtrack import RecurseRewind 304 return RecurseRewind(next,self.Damp_qp(q,p,-δ)) # A single negative damping step should be fine... 305 306 def seismogram(self,q,p,δ,niter,order=2,qh_ind=None,ph_ind=None): 307 308 H_fwd = copy(self) 309 H_fwd.read_q = H_fwd.read_p = read_None; H_fwd.incr_q = H_fwd.incr_p = incr_None 310 qh = []; ph = []; initial=True 311 if qh_ind is not None: H_fwd.read_q = lambda _,q : qh.append(q.reshape(-1)[qh_ind]) 312 if ph_ind is not None: H_fwd.read_p = lambda _,p : ph.append(p.reshape(-1)[ph_ind]) 313 qf,pf = H_fwd.Sympl_p(q,p,δ,niter,order) 314 315 return qf,pf,ad.asarray(qh),ad.asarray(ph)
Base class for Hamiltonians.
Default initialized fields :
- impl_solver : Fixed point solver to be used for the implicit time steps
- preserve_q, preserve_p : Wether updates modify q and p in place
- damp_q, damp_p : damping factor between scheme iterations
- read_q, read_p : called before each damping step, for reading q and p
- incr_q, incr_p : called after each damping step, as a source term for q and p
60 def H(self,q,p): 61 """Evaluates the Hamiltonian, at a given position and impulsion.""" 62 raise NotImplementedError
Evaluates the Hamiltonian, at a given position and impulsion.
64 def DqH(q,p): 65 """Differentiates the Hamiltonian, w.r.t. position.""" 66 raise NotImplementedError
Differentiates the Hamiltonian, w.r.t. position.
68 def DpH(q,p): 69 """Differentiates the Hamiltonian, w.r.t. impulsion.""" 70 raise NotImplementedError
Differentiates the Hamiltonian, w.r.t. impulsion.
72 def flow(self,q,p): 73 """ 74 Symplectic gradient of the Hamiltonian. 75 """ 76 return (self.DpH(q,p),-self.DqH(q,p))
Symplectic gradient of the Hamiltonian.
78 def flow_cat(self,qp,t=None): 79 """ 80 Symplectic gradient of the hamiltonian, intended for odeint. 81 82 Input : 83 - qp : position q, impulsion p, flattened and concatenated. 84 - t : ignored parameter (compatibility with scipy.integrate.odeint) 85 - shape : how to reshape q and p for calling DpH and DqH, if needed 86 87 Output : 88 - symplectic gradient, concatenated and flattened. 89 """ 90 d = len(qp)//2 91 q = qp[:d]; p = qp[d:] 92 if hasattr(self,'shape_free') and self.shape_free is not None: 93 q = q.reshape((*self.shape_free,-1)) 94 p = p.reshape((*self.shape_free,-1)) 95 return np.concatenate(self.flow(q,p),axis=0).reshape(-1)
Symplectic gradient of the hamiltonian, intended for odeint.
Input :
- qp : position q, impulsion p, flattened and concatenated.
- t : ignored parameter (compatibility with scipy.integrate.odeint)
- shape : how to reshape q and p for calling DpH and DqH, if needed
Output :
- symplectic gradient, concatenated and flattened.
97 def integrate(self,q,p,scheme,niter=None,dt=None,T=None,path=False): 98 """ 99 Solves Hamilton's equations by running the scheme niter times. 100 101 Inputs : 102 - q,p : Initial position and impulsion. 103 - scheme : ODE integration scheme. (string or callable) 104 - niter,dt,T : number of steps, time step, and total time 105 (exactly two among the three must be specified) 106 - path : wether to return the intermediate steps. 107 (If a positive number, period of intermediate steps to return) 108 109 Output : 110 - q,p if path is False. 111 Otherwise np.stack([q0,...,qn],axis=-1),np.stack([p0,...,pn],axis=-1),[t0,..tn], 112 with n=niter, tn=T. 113 """ 114 if isinstance(scheme,str): 115 schemes = self.nonsymplectic_schemes() 116 schemes.update(self.symplectic_schemes()) 117 scheme = schemes[scheme] 118 119 if (niter is None) + (T is None) + (dt is None) != 1: 120 raise ValueError("Exactly two of niter, dt and T must be specified") 121 if T is None: T = niter*dt 122 elif dt is None: dt = T/niter 123 elif niter is None: 124 niter = int(np.ceil(T/dt)) 125 dt = T/niter # slightly decreased time step 126 127 q,p = copy(q),copy(p) 128 if path: Q,P = [copy(q)],[copy(p)] 129 130 for i in range(niter): 131 q,p = scheme(q,p,dt) 132 if path and not i%path: Q.append(copy(q)); P.append(copy(p)) 133 134 if path: return np.stack(Q,axis=-1),np.stack(P,axis=-1),np.linspace(0,T,niter+1) 135 else: return q,p
Solves Hamilton's equations by running the scheme niter times.
Inputs :
- q,p : Initial position and impulsion.
- scheme : ODE integration scheme. (string or callable)
- niter,dt,T : number of steps, time step, and total time (exactly two among the three must be specified)
- path : wether to return the intermediate steps. (If a positive number, period of intermediate steps to return)
Output :
- q,p if path is False. Otherwise np.stack([q0,...,qn],axis=-1),np.stack([p0,...,pn],axis=-1),[t0,..tn], with n=niter, tn=T.
137 def nonsymplectic_schemes(self): 138 """ 139 Standard ODE integration schemes 140 """ 141 def Euler(q,p,dt): 142 dq,dp = self.flow(q,p) 143 return q+dt*dq, p+dt*dp 144 145 def RK2(q,p,dt): 146 dq1,dp1 = self.flow(q, p) 147 dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1) 148 return q+dt*dq2, p+dt*dp2 149 150 def RK4(q,p,dt): 151 dq1,dp1 = self.flow(q, p) 152 dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1) 153 dq3,dp3 = self.flow(q+0.5*dt*dq2, p+0.5*dt*dp2) 154 dq4,dp4 = self.flow(q+dt*dq3, p+dt*dp3) 155 return q+dt*(dq1+2*dq2+2*dq3+dq4)/6., p+dt*(dp1+2*dp2+2*dp3+dp4)/6. 156 157 return {"Euler":Euler,"Runge-Kutta-2":RK2,"Runge-Kutta-4":RK4}
Standard ODE integration schemes
165 def Expl_q(self,q,p,δ): 166 """Explicit time step for the position q.""" 167 if self.preserve_q: q = self._mk_copy(q) 168 q += δ*self.DpH(q,p) 169 return q
Explicit time step for the position q.
170 def Expl_p(self,q,p,δ): 171 """Explicit time step for the impulsion p.""" 172 if self.preserve_p: p = self._mk_copy(p) 173 p -= δ*self.DqH(q,p) 174 return p
Explicit time step for the impulsion p.
176 def Damp_qp(self,q,p,δ): 177 """ 178 Optional damping step, interleaved in between Verlet and Ruth4 steps, equivalent to : 179 p *= exp(-δ*damp_p); q *= exp(-δ*damp_q) 180 Please set damp_q and damp_p as appropriate. 181 """ 182 if self.damp_q is not damp_None: 183 if self.preserve_q: q = self._mk_copy(q) 184 q *= np.exp(-δ*self.damp_q) # Maybe we can avoid recomputing these exponentials 185 if self.damp_p is not damp_None: 186 if self.preserve_p: p = self._mk_copy(p) 187 p *= np.exp(-δ*self.damp_p) 188 return q,p
Optional damping step, interleaved in between Verlet and Ruth4 steps, equivalent to : p = exp(-δdamp_p); q = exp(-δdamp_q) Please set damp_q and damp_p as appropriate.
190 def Impl_p(self,q,p,δ): 191 """ 192 Implicit time step for the impulsion p. 193 """ 194 return self.impl_solver(lambda p_:p-δ*self.DqH(q,p_), p)
Implicit time step for the impulsion p.
196 def Impl_q(self,q,p,δ): 197 """ 198 Implicit time step for the position q. 199 """ 200 return self.impl_solver(lambda q_:q-δ*self.DqH(q_,p), q)
Implicit time step for the position q.
202 def Impl2_p(self,q,p,δ_before,δ_total,δ_after): 203 """ 204 Merge two implicit time steps for the impulsion p, with a damping step in between. 205 """ 206 #Default implementation, without optimization (i.e. merging of the Impl_p steps) 207 p = self.Impl_p(q,p,δ_before) 208 self.read_q(self,q); self.read_p(self,p) # Read position and momentum before damp 209 q,p = self.Damp_qp(q,p,δ_total) 210 self.incr_q(self,q); self.incr_p(self,p) # Modify position and momentum after damp 211 p = self.Impl_p(q,p,δ_after) 212 return q,p
Merge two implicit time steps for the impulsion p, with a damping step in between.
216 def Euler_p(self,q,p,δ,niter=1): 217 """niter time steps of the symplectic Euler scheme, starting with impulsion p update.""" 218 for i in range(niter): 219 self.current_iter = i 220 p=self.Impl_p(q,p,δ) 221 q=self.Expl_q(q,p,δ) 222 if i!=niter-1: 223 self.read_q(self,q); self.read_p(self,p) 224 self.Damp_qp(q,p,δ) 225 self.incr_q(self,q); self.incr_p(self,p) 226 return q,p
niter time steps of the symplectic Euler scheme, starting with impulsion p update.
228 def Euler_q(self,q,p,δ,niter=1): 229 """niter time steps of the symplectic Euler scheme, starting with position q update.""" 230 for i in range(niter): 231 self.current_iter = i 232 q=self.Impl_q(q,p,δ) 233 p=self.Expl_p(q,p,δ) 234 if i!=niter-1: 235 self.read_q(self,q); self.read_p(self,p) 236 self.Damp_qp(q,p,δ) 237 self.incr_q(self,q); self.incr_p(self,p) 238 return q,p
niter time steps of the symplectic Euler scheme, starting with position q update.
240 def Verlet_p(self,q,p,δ,niter=1): 241 """ 242 niter time steps of the symplectic Verlet scheme. 243 Optional damping steps interleaved. 244 """ 245 hδ=δ/2 246 for i in range(niter): 247 self.current_iter = i 248 if i==0: p=self.Impl_p(q,p,hδ) 249 q=self.Expl_q(q,p,δ) 250 if i==niter-1: p=self.Impl_p(q,p,hδ) 251 else: q,p=self.Impl2_p(q,p,hδ,δ,hδ) 252 return q,p
niter time steps of the symplectic Verlet scheme. Optional damping steps interleaved.
254 def Ruth4_p(self,q,p,δ,niter=1): 255 """ 256 niter time steps of the Ruth 1983 4th order symplectic scheme, as of Wikipedia. 257 Optional damping steps interleaved. 258 """ 259 t = 2.**(1./3.) 260 c1 = 1/(2*(2-t)); c2 = (1-t)*c1; 261 d1 = 2*c1; d2 = -t*d1; 262 for i in range(niter): 263 self.current_iter = i 264 if i==0: p=self.Impl_p(q,p,c1*δ) 265 q=self.Expl_q(q,p,d1*δ) 266 p=self.Impl_p(q,p,c2*δ) 267 q=self.Expl_q(q,p,d2*δ) 268 p=self.Impl_p(q,p,c2*δ) 269 q=self.Expl_q(q,p,d1*δ) 270 if i==niter-1: p=self.Impl_p(q,p,c1*δ) 271 else: q,p=self.Impl2_p(q,p,c1*δ,δ,c1*δ) 272 return q,p
niter time steps of the Ruth 1983 4th order symplectic scheme, as of Wikipedia. Optional damping steps interleaved.
274 def Sympl_p(self,q,p,δ,niter=1,order=2): 275 """ 276 niter steps of the Euler_p, Verlet_p or Ruth4_p symplectic scheme, 277 depending on the order parameter. 278 """ 279 if order==1: return self.Euler_p(q,p,δ,niter) 280 if order==2: return self.Verlet_p(q,p,δ,niter) 281 if order==4: return self.Ruth4_p( q,p,δ,niter) 282 raise ValueError(f"Found {order=}, while expecting 1,2 or 4.")
niter steps of the Euler_p, Verlet_p or Ruth4_p symplectic scheme, depending on the order parameter.
306 def seismogram(self,q,p,δ,niter,order=2,qh_ind=None,ph_ind=None): 307 308 H_fwd = copy(self) 309 H_fwd.read_q = H_fwd.read_p = read_None; H_fwd.incr_q = H_fwd.incr_p = incr_None 310 qh = []; ph = []; initial=True 311 if qh_ind is not None: H_fwd.read_q = lambda _,q : qh.append(q.reshape(-1)[qh_ind]) 312 if ph_ind is not None: H_fwd.read_p = lambda _,p : ph.append(p.reshape(-1)[ph_ind]) 313 qf,pf = H_fwd.Sympl_p(q,p,δ,niter,order) 314 315 return qf,pf,ad.asarray(qh),ad.asarray(ph)