agd.ODE.hamiltonian_base

This module implements some basic functionality for solving ODEs derived from a Hamiltonian in a manner compatible with automatic differentiation. (Flow computation, symplectic schemes, etc)

Recall that Hamilton's equations read $$ \frac {dq}{dt} = \frac {\partial H}{\partial p}, \quad \frac {dp}{dt} = - \frac {\partial H}{\partial q}. $$

  1# Copyright 2020 Jean-Marie Mirebeau, University Paris-Sud, CNRS, University Paris-Saclay
  2# Distributed WITHOUT ANY WARRANTY. Licensed under the Apache License, Version 2.0, see http://www.apache.org/licenses/LICENSE-2.0
  3
  4r"""
  5This module implements some basic functionality for solving ODEs derived from a 
  6Hamiltonian in a manner compatible with automatic differentiation.
  7(Flow computation, symplectic schemes, etc)
  8
  9Recall that Hamilton's equations read 
 10$$
 11\frac {dq}{dt} = \frac {\partial H}{\partial p},
 12\quad
 13\frac {dp}{dt} = - \frac {\partial H}{\partial q}.
 14$$
 15"""
 16
 17import numpy as np
 18from copy import copy
 19
 20from .. import AutomaticDifferentiation as ad
 21
 22def fixedpoint(f,x,tol=1e-9,nitermax=100):
 23	"""
 24	Iterates the function f on the data x until a fixed point is found, 
 25	up to prescribed tolerance, or the maximum number of iterations is reached.
 26	"""
 27	norm_infinity = ad.Optimization.norm_infinity
 28	x_old = x
 29	for i in range(nitermax):
 30		x=f(x)
 31		if norm_infinity(x-x_old)<tol: break
 32		x_old = x
 33	return x
 34
 35damp_None = 0 # Placeholder for the damping factors damp_q and damp_p of class HamiltonianBase
 36def read_None(H,x): pass # Placeholder for the data read functions read_q and read_p
 37def incr_None(H,x): pass # Placeholder for the data write functions incr_q and incr_p
 38
 39class HamiltonianBase:
 40	"""
 41	Base class for Hamiltonians.
 42
 43	Default initialized fields : 
 44	- impl_solver : Fixed point solver to be used for the implicit time steps
 45	- preserve_q, preserve_p : Wether updates modify q and p in place
 46	- damp_q, damp_p : damping factor between scheme iterations
 47	- read_q, read_p : called before each damping step, for reading q and p
 48	- incr_q, incr_p : called after each damping step, as a source term for q and p
 49	"""
 50	def __init__(self):
 51		self.impl_solver = fixedpoint 
 52		self.preserve_q = True; self.preserve_p = True 
 53		self.Impl2_p_merged = True 
 54		self.damp_q = damp_None; self.damp_p = damp_None 
 55		self.read_q = read_None; self.read_p = read_None
 56		self.incr_q = incr_None; self.incr_p = incr_None
 57		self.current_iter = None 
 58
 59	def H(self,q,p):
 60		"""Evaluates the Hamiltonian, at a given position and impulsion."""
 61		raise NotImplementedError
 62
 63	def DqH(q,p):
 64		"""Differentiates the Hamiltonian, w.r.t. position."""
 65		raise NotImplementedError
 66
 67	def DpH(q,p):
 68		"""Differentiates the Hamiltonian, w.r.t. impulsion."""
 69		raise NotImplementedError
 70
 71	def flow(self,q,p):
 72		"""
 73		Symplectic gradient of the Hamiltonian.
 74		"""
 75		return (self.DpH(q,p),-self.DqH(q,p))
 76
 77	def flow_cat(self,qp,t=None):
 78		"""
 79		Symplectic gradient of the hamiltonian, intended for odeint. 
 80
 81		Input : 
 82		 - qp : position q, impulsion p, flattened and concatenated.
 83		 - t : ignored parameter (compatibility with scipy.integrate.odeint)
 84		 - shape : how to reshape q and p for calling DpH and DqH, if needed
 85
 86		Output :
 87		 - symplectic gradient, concatenated and flattened.
 88		"""
 89		d = len(qp)//2
 90		q = qp[:d]; p = qp[d:]
 91		if hasattr(self,'shape_free') and self.shape_free is not None:
 92			q = q.reshape((*self.shape_free,-1))
 93			p = p.reshape((*self.shape_free,-1))
 94		return np.concatenate(self.flow(q,p),axis=0).reshape(-1)
 95
 96	def integrate(self,q,p,scheme,niter=None,dt=None,T=None,path=False):
 97		"""
 98		Solves Hamilton's equations by running the scheme niter times.
 99
100		Inputs : 
101		 - q,p : Initial position and impulsion.
102		 - scheme : ODE integration scheme. (string or callable)
103		 - niter,dt,T : number of steps, time step, and total time
104			  (exactly two among the three must be specified)
105		 - path : wether to return the intermediate steps. 
106			   (If a positive number, period of intermediate steps to return)
107		
108		Output : 
109		 - q,p if path is False. 
110		  Otherwise np.stack([q0,...,qn],axis=-1),np.stack([p0,...,pn],axis=-1),[t0,..tn], 
111		  	with n=niter, tn=T.
112		"""
113		if isinstance(scheme,str):
114			schemes = self.nonsymplectic_schemes()
115			schemes.update(self.symplectic_schemes())
116			scheme = schemes[scheme]
117
118		if (niter is None) + (T is None) + (dt is None) != 1: 
119			raise ValueError("Exactly two of niter, dt and T must be specified")
120		if T is None:    T  = niter*dt
121		elif dt is None: dt = T/niter
122		elif niter is None: 
123			niter = int(np.ceil(T/dt))
124			dt = T/niter # slightly decreased time step
125
126		q,p = copy(q),copy(p)
127		if path: Q,P = [copy(q)],[copy(p)]
128
129		for i in range(niter):
130			q,p = scheme(q,p,dt)
131			if path and not i%path: Q.append(copy(q)); P.append(copy(p))
132
133		if path: return np.stack(Q,axis=-1),np.stack(P,axis=-1),np.linspace(0,T,niter+1)
134		else: return q,p
135
136	def nonsymplectic_schemes(self):
137		"""
138		Standard ODE integration schemes
139		"""
140		def Euler(q,p,dt):
141			dq,dp = self.flow(q,p)
142			return q+dt*dq, p+dt*dp
143
144		def RK2(q,p,dt):
145			dq1,dp1 = self.flow(q, p)
146			dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1)
147			return q+dt*dq2, p+dt*dp2
148
149		def RK4(q,p,dt):
150			dq1,dp1 = self.flow(q, p)
151			dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1)
152			dq3,dp3 = self.flow(q+0.5*dt*dq2, p+0.5*dt*dp2)
153			dq4,dp4 = self.flow(q+dt*dq3, p+dt*dp3)
154			return q+dt*(dq1+2*dq2+2*dq3+dq4)/6., p+dt*(dp1+2*dp2+2*dp3+dp4)/6.
155
156		return {"Euler":Euler,"Runge-Kutta-2":RK2,"Runge-Kutta-4":RK4}
157
158	# -------------- sub-steps of schemes --------------
159
160	def _mk_copy(self,x): 
161		"""Returns a copy of variable x. Specialize if needed (for GPU c_contiguity...)."""
162		return x.copy()
163
164	def Expl_q(self,q,p,δ):
165		"""Explicit time step for the position q."""
166		if self.preserve_q: q = self._mk_copy(q)
167		q += δ*self.DpH(q,p)
168		return q 
169	def Expl_p(self,q,p,δ):
170		"""Explicit time step for the impulsion p."""
171		if self.preserve_p: p = self._mk_copy(p)
172		p -= δ*self.DqH(q,p)
173		return p 
174
175	def Damp_qp(self,q,p,δ):
176		"""
177		Optional damping step, interleaved in between Verlet and Ruth4 steps, equivalent to : 
178		p *= exp(-δ*damp_p); q *= exp(-δ*damp_q)
179		Please set damp_q and damp_p as appropriate.
180		"""
181		if self.damp_q is not damp_None:
182			if self.preserve_q: q = self._mk_copy(q)
183			q *= np.exp(-δ*self.damp_q) # Maybe we can avoid recomputing these exponentials
184		if self.damp_p is not damp_None:
185			if self.preserve_p: p = self._mk_copy(p)
186			p *= np.exp(-δ*self.damp_p)
187		return q,p
188
189	def Impl_p(self,q,p,δ):
190		"""
191		Implicit time step for the impulsion p.
192		"""
193		return self.impl_solver(lambda p_:p-δ*self.DqH(q,p_), p)
194
195	def Impl_q(self,q,p,δ):
196		"""
197		Implicit time step for the position q.
198		"""
199		return self.impl_solver(lambda q_:q-δ*self.DqH(q_,p), q)
200
201	def Impl2_p(self,q,p,δ_before,δ_total,δ_after):
202		"""
203		Merge two implicit time steps for the impulsion p, with a damping step in between.
204		"""
205		#Default implementation, without optimization (i.e. merging of the Impl_p steps)
206		p   = self.Impl_p(q,p,δ_before)
207		self.read_q(self,q); self.read_p(self,p) # Read position and momentum before damp
208		q,p = self.Damp_qp(q,p,δ_total)
209		self.incr_q(self,q); self.incr_p(self,p) # Modify position and momentum after damp
210		p   = self.Impl_p(q,p,δ_after)
211		return q,p
212
213	# ---------------- Symplectic schemes ---------------
214
215	def Euler_p(self,q,p,δ,niter=1):
216		"""niter time steps of the symplectic Euler scheme, starting with impulsion p update."""
217		for i in range(niter):
218			self.current_iter = i
219			p=self.Impl_p(q,p,δ)
220			q=self.Expl_q(q,p,δ)
221			if i!=niter-1:
222				self.read_q(self,q); self.read_p(self,p) 
223				self.Damp_qp(q,p,δ) 
224				self.incr_q(self,q); self.incr_p(self,p)
225		return q,p
226
227	def Euler_q(self,q,p,δ,niter=1):
228		"""niter time steps of the symplectic Euler scheme, starting with position q update."""
229		for i in range(niter):
230			self.current_iter = i
231			q=self.Impl_q(q,p,δ)
232			p=self.Expl_p(q,p,δ)
233			if i!=niter-1:
234				self.read_q(self,q); self.read_p(self,p) 
235				self.Damp_qp(q,p,δ) 
236				self.incr_q(self,q); self.incr_p(self,p)
237		return q,p
238
239	def Verlet_p(self,q,p,δ,niter=1):
240		"""
241		niter time steps of the symplectic Verlet scheme.
242		Optional damping steps interleaved.
243		"""
244		=δ/2
245		for i in range(niter):
246			self.current_iter = i
247			if i==0: p=self.Impl_p(q,p,)
248			q=self.Expl_q(q,p,δ)
249			if i==niter-1: p=self.Impl_p(q,p,)
250			else: q,p=self.Impl2_p(q,p,,δ,)
251		return q,p
252
253	def Ruth4_p(self,q,p,δ,niter=1):
254		"""
255		niter time steps of the Ruth 1983 4th order symplectic scheme, as of Wikipedia.
256		Optional damping steps interleaved.
257		"""
258		t = 2.**(1./3.)
259		c1 = 1/(2*(2-t)); c2 = (1-t)*c1; 
260		d1 = 2*c1; d2 = -t*d1;
261		for i in range(niter):
262			self.current_iter = i
263			if i==0: p=self.Impl_p(q,p,c1*δ)
264			q=self.Expl_q(q,p,d1*δ)
265			p=self.Impl_p(q,p,c2*δ)
266			q=self.Expl_q(q,p,d2*δ)
267			p=self.Impl_p(q,p,c2*δ)
268			q=self.Expl_q(q,p,d1*δ)
269			if i==niter-1: p=self.Impl_p(q,p,c1*δ) 
270			else: q,p=self.Impl2_p(q,p,c1*δ,δ,c1*δ)
271		return q,p
272
273	def Sympl_p(self,q,p,δ,niter=1,order=2):
274		"""
275		niter steps of the Euler_p, Verlet_p or Ruth4_p symplectic scheme, 
276		depending on the order parameter.
277		"""
278		if order==1: return self.Euler_p(q,p,δ,niter)
279		if order==2: return self.Verlet_p(q,p,δ,niter)
280		if order==4: return self.Ruth4_p( q,p,δ,niter)
281		raise ValueError(f"Found {order=}, while expecting 1,2 or 4.")
282
283	def symplectic_schemes(self):
284		return {'Euler-p':self.Euler_p,'Euler-q':self.Euler_q,'Verlet-p':self.Verlet_p,
285		'Ruth4-p':self.Ruth4_p}
286
287	# ------ Routines for backpropagation ------
288
289	def _Sympl_p(self,q,p,δ,order=2):
290		"""
291		Returns a callable, which computes any given iteration of Sympl_p, and saves appropriate 
292		keypoints for computational and memory efficiency.
293		"""
294		# TODO : remove since probably useless ? (Only use case is backprop in seismogram)
295
296		# The main objective of this routine is to preserve accuracy when damp_p and damp_q are set.
297		# Otherwise, the iterations may be reversed by iterating the symplectic scheme 
298		# in negative time, providing a result with similar accuracy. 
299		def next(qp): # A single time step, including a preliminary damping
300			qp = self.Damp_qp(*qp,δ)
301			return self.Sympl_p(*qp,δ,order=order)
302		from .backtrack import RecurseRewind
303		return RecurseRewind(next,self.Damp_qp(q,p,-δ)) # A single negative damping step should be fine...
304
305	def seismogram(self,q,p,δ,niter,order=2,qh_ind=None,ph_ind=None):
306
307		H_fwd = copy(self)
308		H_fwd.read_q = H_fwd.read_p = read_None; H_fwd.incr_q = H_fwd.incr_p = incr_None
309		qh = []; ph = []; initial=True 
310		if qh_ind is not None: H_fwd.read_q = lambda _,q : qh.append(q.reshape(-1)[qh_ind])
311		if ph_ind is not None: H_fwd.read_p = lambda _,p : ph.append(p.reshape(-1)[ph_ind])
312		qf,pf = H_fwd.Sympl_p(q,p,δ,niter,order)
313
314		return qf,pf,ad.asarray(qh),ad.asarray(ph)
def fixedpoint(f, x, tol=1e-09, nitermax=100):
23def fixedpoint(f,x,tol=1e-9,nitermax=100):
24	"""
25	Iterates the function f on the data x until a fixed point is found, 
26	up to prescribed tolerance, or the maximum number of iterations is reached.
27	"""
28	norm_infinity = ad.Optimization.norm_infinity
29	x_old = x
30	for i in range(nitermax):
31		x=f(x)
32		if norm_infinity(x-x_old)<tol: break
33		x_old = x
34	return x

Iterates the function f on the data x until a fixed point is found, up to prescribed tolerance, or the maximum number of iterations is reached.

damp_None = 0
def read_None(H, x):
37def read_None(H,x): pass # Placeholder for the data read functions read_q and read_p
def incr_None(H, x):
38def incr_None(H,x): pass # Placeholder for the data write functions incr_q and incr_p
class HamiltonianBase:
 40class HamiltonianBase:
 41	"""
 42	Base class for Hamiltonians.
 43
 44	Default initialized fields : 
 45	- impl_solver : Fixed point solver to be used for the implicit time steps
 46	- preserve_q, preserve_p : Wether updates modify q and p in place
 47	- damp_q, damp_p : damping factor between scheme iterations
 48	- read_q, read_p : called before each damping step, for reading q and p
 49	- incr_q, incr_p : called after each damping step, as a source term for q and p
 50	"""
 51	def __init__(self):
 52		self.impl_solver = fixedpoint 
 53		self.preserve_q = True; self.preserve_p = True 
 54		self.Impl2_p_merged = True 
 55		self.damp_q = damp_None; self.damp_p = damp_None 
 56		self.read_q = read_None; self.read_p = read_None
 57		self.incr_q = incr_None; self.incr_p = incr_None
 58		self.current_iter = None 
 59
 60	def H(self,q,p):
 61		"""Evaluates the Hamiltonian, at a given position and impulsion."""
 62		raise NotImplementedError
 63
 64	def DqH(q,p):
 65		"""Differentiates the Hamiltonian, w.r.t. position."""
 66		raise NotImplementedError
 67
 68	def DpH(q,p):
 69		"""Differentiates the Hamiltonian, w.r.t. impulsion."""
 70		raise NotImplementedError
 71
 72	def flow(self,q,p):
 73		"""
 74		Symplectic gradient of the Hamiltonian.
 75		"""
 76		return (self.DpH(q,p),-self.DqH(q,p))
 77
 78	def flow_cat(self,qp,t=None):
 79		"""
 80		Symplectic gradient of the hamiltonian, intended for odeint. 
 81
 82		Input : 
 83		 - qp : position q, impulsion p, flattened and concatenated.
 84		 - t : ignored parameter (compatibility with scipy.integrate.odeint)
 85		 - shape : how to reshape q and p for calling DpH and DqH, if needed
 86
 87		Output :
 88		 - symplectic gradient, concatenated and flattened.
 89		"""
 90		d = len(qp)//2
 91		q = qp[:d]; p = qp[d:]
 92		if hasattr(self,'shape_free') and self.shape_free is not None:
 93			q = q.reshape((*self.shape_free,-1))
 94			p = p.reshape((*self.shape_free,-1))
 95		return np.concatenate(self.flow(q,p),axis=0).reshape(-1)
 96
 97	def integrate(self,q,p,scheme,niter=None,dt=None,T=None,path=False):
 98		"""
 99		Solves Hamilton's equations by running the scheme niter times.
100
101		Inputs : 
102		 - q,p : Initial position and impulsion.
103		 - scheme : ODE integration scheme. (string or callable)
104		 - niter,dt,T : number of steps, time step, and total time
105			  (exactly two among the three must be specified)
106		 - path : wether to return the intermediate steps. 
107			   (If a positive number, period of intermediate steps to return)
108		
109		Output : 
110		 - q,p if path is False. 
111		  Otherwise np.stack([q0,...,qn],axis=-1),np.stack([p0,...,pn],axis=-1),[t0,..tn], 
112		  	with n=niter, tn=T.
113		"""
114		if isinstance(scheme,str):
115			schemes = self.nonsymplectic_schemes()
116			schemes.update(self.symplectic_schemes())
117			scheme = schemes[scheme]
118
119		if (niter is None) + (T is None) + (dt is None) != 1: 
120			raise ValueError("Exactly two of niter, dt and T must be specified")
121		if T is None:    T  = niter*dt
122		elif dt is None: dt = T/niter
123		elif niter is None: 
124			niter = int(np.ceil(T/dt))
125			dt = T/niter # slightly decreased time step
126
127		q,p = copy(q),copy(p)
128		if path: Q,P = [copy(q)],[copy(p)]
129
130		for i in range(niter):
131			q,p = scheme(q,p,dt)
132			if path and not i%path: Q.append(copy(q)); P.append(copy(p))
133
134		if path: return np.stack(Q,axis=-1),np.stack(P,axis=-1),np.linspace(0,T,niter+1)
135		else: return q,p
136
137	def nonsymplectic_schemes(self):
138		"""
139		Standard ODE integration schemes
140		"""
141		def Euler(q,p,dt):
142			dq,dp = self.flow(q,p)
143			return q+dt*dq, p+dt*dp
144
145		def RK2(q,p,dt):
146			dq1,dp1 = self.flow(q, p)
147			dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1)
148			return q+dt*dq2, p+dt*dp2
149
150		def RK4(q,p,dt):
151			dq1,dp1 = self.flow(q, p)
152			dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1)
153			dq3,dp3 = self.flow(q+0.5*dt*dq2, p+0.5*dt*dp2)
154			dq4,dp4 = self.flow(q+dt*dq3, p+dt*dp3)
155			return q+dt*(dq1+2*dq2+2*dq3+dq4)/6., p+dt*(dp1+2*dp2+2*dp3+dp4)/6.
156
157		return {"Euler":Euler,"Runge-Kutta-2":RK2,"Runge-Kutta-4":RK4}
158
159	# -------------- sub-steps of schemes --------------
160
161	def _mk_copy(self,x): 
162		"""Returns a copy of variable x. Specialize if needed (for GPU c_contiguity...)."""
163		return x.copy()
164
165	def Expl_q(self,q,p,δ):
166		"""Explicit time step for the position q."""
167		if self.preserve_q: q = self._mk_copy(q)
168		q += δ*self.DpH(q,p)
169		return q 
170	def Expl_p(self,q,p,δ):
171		"""Explicit time step for the impulsion p."""
172		if self.preserve_p: p = self._mk_copy(p)
173		p -= δ*self.DqH(q,p)
174		return p 
175
176	def Damp_qp(self,q,p,δ):
177		"""
178		Optional damping step, interleaved in between Verlet and Ruth4 steps, equivalent to : 
179		p *= exp(-δ*damp_p); q *= exp(-δ*damp_q)
180		Please set damp_q and damp_p as appropriate.
181		"""
182		if self.damp_q is not damp_None:
183			if self.preserve_q: q = self._mk_copy(q)
184			q *= np.exp(-δ*self.damp_q) # Maybe we can avoid recomputing these exponentials
185		if self.damp_p is not damp_None:
186			if self.preserve_p: p = self._mk_copy(p)
187			p *= np.exp(-δ*self.damp_p)
188		return q,p
189
190	def Impl_p(self,q,p,δ):
191		"""
192		Implicit time step for the impulsion p.
193		"""
194		return self.impl_solver(lambda p_:p-δ*self.DqH(q,p_), p)
195
196	def Impl_q(self,q,p,δ):
197		"""
198		Implicit time step for the position q.
199		"""
200		return self.impl_solver(lambda q_:q-δ*self.DqH(q_,p), q)
201
202	def Impl2_p(self,q,p,δ_before,δ_total,δ_after):
203		"""
204		Merge two implicit time steps for the impulsion p, with a damping step in between.
205		"""
206		#Default implementation, without optimization (i.e. merging of the Impl_p steps)
207		p   = self.Impl_p(q,p,δ_before)
208		self.read_q(self,q); self.read_p(self,p) # Read position and momentum before damp
209		q,p = self.Damp_qp(q,p,δ_total)
210		self.incr_q(self,q); self.incr_p(self,p) # Modify position and momentum after damp
211		p   = self.Impl_p(q,p,δ_after)
212		return q,p
213
214	# ---------------- Symplectic schemes ---------------
215
216	def Euler_p(self,q,p,δ,niter=1):
217		"""niter time steps of the symplectic Euler scheme, starting with impulsion p update."""
218		for i in range(niter):
219			self.current_iter = i
220			p=self.Impl_p(q,p,δ)
221			q=self.Expl_q(q,p,δ)
222			if i!=niter-1:
223				self.read_q(self,q); self.read_p(self,p) 
224				self.Damp_qp(q,p,δ) 
225				self.incr_q(self,q); self.incr_p(self,p)
226		return q,p
227
228	def Euler_q(self,q,p,δ,niter=1):
229		"""niter time steps of the symplectic Euler scheme, starting with position q update."""
230		for i in range(niter):
231			self.current_iter = i
232			q=self.Impl_q(q,p,δ)
233			p=self.Expl_p(q,p,δ)
234			if i!=niter-1:
235				self.read_q(self,q); self.read_p(self,p) 
236				self.Damp_qp(q,p,δ) 
237				self.incr_q(self,q); self.incr_p(self,p)
238		return q,p
239
240	def Verlet_p(self,q,p,δ,niter=1):
241		"""
242		niter time steps of the symplectic Verlet scheme.
243		Optional damping steps interleaved.
244		"""
245		=δ/2
246		for i in range(niter):
247			self.current_iter = i
248			if i==0: p=self.Impl_p(q,p,)
249			q=self.Expl_q(q,p,δ)
250			if i==niter-1: p=self.Impl_p(q,p,)
251			else: q,p=self.Impl2_p(q,p,,δ,)
252		return q,p
253
254	def Ruth4_p(self,q,p,δ,niter=1):
255		"""
256		niter time steps of the Ruth 1983 4th order symplectic scheme, as of Wikipedia.
257		Optional damping steps interleaved.
258		"""
259		t = 2.**(1./3.)
260		c1 = 1/(2*(2-t)); c2 = (1-t)*c1; 
261		d1 = 2*c1; d2 = -t*d1;
262		for i in range(niter):
263			self.current_iter = i
264			if i==0: p=self.Impl_p(q,p,c1*δ)
265			q=self.Expl_q(q,p,d1*δ)
266			p=self.Impl_p(q,p,c2*δ)
267			q=self.Expl_q(q,p,d2*δ)
268			p=self.Impl_p(q,p,c2*δ)
269			q=self.Expl_q(q,p,d1*δ)
270			if i==niter-1: p=self.Impl_p(q,p,c1*δ) 
271			else: q,p=self.Impl2_p(q,p,c1*δ,δ,c1*δ)
272		return q,p
273
274	def Sympl_p(self,q,p,δ,niter=1,order=2):
275		"""
276		niter steps of the Euler_p, Verlet_p or Ruth4_p symplectic scheme, 
277		depending on the order parameter.
278		"""
279		if order==1: return self.Euler_p(q,p,δ,niter)
280		if order==2: return self.Verlet_p(q,p,δ,niter)
281		if order==4: return self.Ruth4_p( q,p,δ,niter)
282		raise ValueError(f"Found {order=}, while expecting 1,2 or 4.")
283
284	def symplectic_schemes(self):
285		return {'Euler-p':self.Euler_p,'Euler-q':self.Euler_q,'Verlet-p':self.Verlet_p,
286		'Ruth4-p':self.Ruth4_p}
287
288	# ------ Routines for backpropagation ------
289
290	def _Sympl_p(self,q,p,δ,order=2):
291		"""
292		Returns a callable, which computes any given iteration of Sympl_p, and saves appropriate 
293		keypoints for computational and memory efficiency.
294		"""
295		# TODO : remove since probably useless ? (Only use case is backprop in seismogram)
296
297		# The main objective of this routine is to preserve accuracy when damp_p and damp_q are set.
298		# Otherwise, the iterations may be reversed by iterating the symplectic scheme 
299		# in negative time, providing a result with similar accuracy. 
300		def next(qp): # A single time step, including a preliminary damping
301			qp = self.Damp_qp(*qp,δ)
302			return self.Sympl_p(*qp,δ,order=order)
303		from .backtrack import RecurseRewind
304		return RecurseRewind(next,self.Damp_qp(q,p,-δ)) # A single negative damping step should be fine...
305
306	def seismogram(self,q,p,δ,niter,order=2,qh_ind=None,ph_ind=None):
307
308		H_fwd = copy(self)
309		H_fwd.read_q = H_fwd.read_p = read_None; H_fwd.incr_q = H_fwd.incr_p = incr_None
310		qh = []; ph = []; initial=True 
311		if qh_ind is not None: H_fwd.read_q = lambda _,q : qh.append(q.reshape(-1)[qh_ind])
312		if ph_ind is not None: H_fwd.read_p = lambda _,p : ph.append(p.reshape(-1)[ph_ind])
313		qf,pf = H_fwd.Sympl_p(q,p,δ,niter,order)
314
315		return qf,pf,ad.asarray(qh),ad.asarray(ph)

Base class for Hamiltonians.

Default initialized fields :

  • impl_solver : Fixed point solver to be used for the implicit time steps
  • preserve_q, preserve_p : Wether updates modify q and p in place
  • damp_q, damp_p : damping factor between scheme iterations
  • read_q, read_p : called before each damping step, for reading q and p
  • incr_q, incr_p : called after each damping step, as a source term for q and p
impl_solver
preserve_q
preserve_p
Impl2_p_merged
damp_q
damp_p
read_q
read_p
incr_q
incr_p
current_iter
def H(self, q, p):
60	def H(self,q,p):
61		"""Evaluates the Hamiltonian, at a given position and impulsion."""
62		raise NotImplementedError

Evaluates the Hamiltonian, at a given position and impulsion.

def DqH(q, p):
64	def DqH(q,p):
65		"""Differentiates the Hamiltonian, w.r.t. position."""
66		raise NotImplementedError

Differentiates the Hamiltonian, w.r.t. position.

def DpH(q, p):
68	def DpH(q,p):
69		"""Differentiates the Hamiltonian, w.r.t. impulsion."""
70		raise NotImplementedError

Differentiates the Hamiltonian, w.r.t. impulsion.

def flow(self, q, p):
72	def flow(self,q,p):
73		"""
74		Symplectic gradient of the Hamiltonian.
75		"""
76		return (self.DpH(q,p),-self.DqH(q,p))

Symplectic gradient of the Hamiltonian.

def flow_cat(self, qp, t=None):
78	def flow_cat(self,qp,t=None):
79		"""
80		Symplectic gradient of the hamiltonian, intended for odeint. 
81
82		Input : 
83		 - qp : position q, impulsion p, flattened and concatenated.
84		 - t : ignored parameter (compatibility with scipy.integrate.odeint)
85		 - shape : how to reshape q and p for calling DpH and DqH, if needed
86
87		Output :
88		 - symplectic gradient, concatenated and flattened.
89		"""
90		d = len(qp)//2
91		q = qp[:d]; p = qp[d:]
92		if hasattr(self,'shape_free') and self.shape_free is not None:
93			q = q.reshape((*self.shape_free,-1))
94			p = p.reshape((*self.shape_free,-1))
95		return np.concatenate(self.flow(q,p),axis=0).reshape(-1)

Symplectic gradient of the hamiltonian, intended for odeint.

Input :

  • qp : position q, impulsion p, flattened and concatenated.
  • t : ignored parameter (compatibility with scipy.integrate.odeint)
  • shape : how to reshape q and p for calling DpH and DqH, if needed

Output :

  • symplectic gradient, concatenated and flattened.
def integrate(self, q, p, scheme, niter=None, dt=None, T=None, path=False):
 97	def integrate(self,q,p,scheme,niter=None,dt=None,T=None,path=False):
 98		"""
 99		Solves Hamilton's equations by running the scheme niter times.
100
101		Inputs : 
102		 - q,p : Initial position and impulsion.
103		 - scheme : ODE integration scheme. (string or callable)
104		 - niter,dt,T : number of steps, time step, and total time
105			  (exactly two among the three must be specified)
106		 - path : wether to return the intermediate steps. 
107			   (If a positive number, period of intermediate steps to return)
108		
109		Output : 
110		 - q,p if path is False. 
111		  Otherwise np.stack([q0,...,qn],axis=-1),np.stack([p0,...,pn],axis=-1),[t0,..tn], 
112		  	with n=niter, tn=T.
113		"""
114		if isinstance(scheme,str):
115			schemes = self.nonsymplectic_schemes()
116			schemes.update(self.symplectic_schemes())
117			scheme = schemes[scheme]
118
119		if (niter is None) + (T is None) + (dt is None) != 1: 
120			raise ValueError("Exactly two of niter, dt and T must be specified")
121		if T is None:    T  = niter*dt
122		elif dt is None: dt = T/niter
123		elif niter is None: 
124			niter = int(np.ceil(T/dt))
125			dt = T/niter # slightly decreased time step
126
127		q,p = copy(q),copy(p)
128		if path: Q,P = [copy(q)],[copy(p)]
129
130		for i in range(niter):
131			q,p = scheme(q,p,dt)
132			if path and not i%path: Q.append(copy(q)); P.append(copy(p))
133
134		if path: return np.stack(Q,axis=-1),np.stack(P,axis=-1),np.linspace(0,T,niter+1)
135		else: return q,p

Solves Hamilton's equations by running the scheme niter times.

Inputs :

  • q,p : Initial position and impulsion.
  • scheme : ODE integration scheme. (string or callable)
  • niter,dt,T : number of steps, time step, and total time (exactly two among the three must be specified)
  • path : wether to return the intermediate steps. (If a positive number, period of intermediate steps to return)

Output :

  • q,p if path is False. Otherwise np.stack([q0,...,qn],axis=-1),np.stack([p0,...,pn],axis=-1),[t0,..tn], with n=niter, tn=T.
def nonsymplectic_schemes(self):
137	def nonsymplectic_schemes(self):
138		"""
139		Standard ODE integration schemes
140		"""
141		def Euler(q,p,dt):
142			dq,dp = self.flow(q,p)
143			return q+dt*dq, p+dt*dp
144
145		def RK2(q,p,dt):
146			dq1,dp1 = self.flow(q, p)
147			dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1)
148			return q+dt*dq2, p+dt*dp2
149
150		def RK4(q,p,dt):
151			dq1,dp1 = self.flow(q, p)
152			dq2,dp2 = self.flow(q+0.5*dt*dq1, p+0.5*dt*dp1)
153			dq3,dp3 = self.flow(q+0.5*dt*dq2, p+0.5*dt*dp2)
154			dq4,dp4 = self.flow(q+dt*dq3, p+dt*dp3)
155			return q+dt*(dq1+2*dq2+2*dq3+dq4)/6., p+dt*(dp1+2*dp2+2*dp3+dp4)/6.
156
157		return {"Euler":Euler,"Runge-Kutta-2":RK2,"Runge-Kutta-4":RK4}

Standard ODE integration schemes

def Expl_q(self, q, p, δ):
165	def Expl_q(self,q,p,δ):
166		"""Explicit time step for the position q."""
167		if self.preserve_q: q = self._mk_copy(q)
168		q += δ*self.DpH(q,p)
169		return q 

Explicit time step for the position q.

def Expl_p(self, q, p, δ):
170	def Expl_p(self,q,p,δ):
171		"""Explicit time step for the impulsion p."""
172		if self.preserve_p: p = self._mk_copy(p)
173		p -= δ*self.DqH(q,p)
174		return p 

Explicit time step for the impulsion p.

def Damp_qp(self, q, p, δ):
176	def Damp_qp(self,q,p,δ):
177		"""
178		Optional damping step, interleaved in between Verlet and Ruth4 steps, equivalent to : 
179		p *= exp(-δ*damp_p); q *= exp(-δ*damp_q)
180		Please set damp_q and damp_p as appropriate.
181		"""
182		if self.damp_q is not damp_None:
183			if self.preserve_q: q = self._mk_copy(q)
184			q *= np.exp(-δ*self.damp_q) # Maybe we can avoid recomputing these exponentials
185		if self.damp_p is not damp_None:
186			if self.preserve_p: p = self._mk_copy(p)
187			p *= np.exp(-δ*self.damp_p)
188		return q,p

Optional damping step, interleaved in between Verlet and Ruth4 steps, equivalent to : p = exp(-δdamp_p); q = exp(-δdamp_q) Please set damp_q and damp_p as appropriate.

def Impl_p(self, q, p, δ):
190	def Impl_p(self,q,p,δ):
191		"""
192		Implicit time step for the impulsion p.
193		"""
194		return self.impl_solver(lambda p_:p-δ*self.DqH(q,p_), p)

Implicit time step for the impulsion p.

def Impl_q(self, q, p, δ):
196	def Impl_q(self,q,p,δ):
197		"""
198		Implicit time step for the position q.
199		"""
200		return self.impl_solver(lambda q_:q-δ*self.DqH(q_,p), q)

Implicit time step for the position q.

def Impl2_p(self, q, p, δ_before, δ_total, δ_after):
202	def Impl2_p(self,q,p,δ_before,δ_total,δ_after):
203		"""
204		Merge two implicit time steps for the impulsion p, with a damping step in between.
205		"""
206		#Default implementation, without optimization (i.e. merging of the Impl_p steps)
207		p   = self.Impl_p(q,p,δ_before)
208		self.read_q(self,q); self.read_p(self,p) # Read position and momentum before damp
209		q,p = self.Damp_qp(q,p,δ_total)
210		self.incr_q(self,q); self.incr_p(self,p) # Modify position and momentum after damp
211		p   = self.Impl_p(q,p,δ_after)
212		return q,p

Merge two implicit time steps for the impulsion p, with a damping step in between.

def Euler_p(self, q, p, δ, niter=1):
216	def Euler_p(self,q,p,δ,niter=1):
217		"""niter time steps of the symplectic Euler scheme, starting with impulsion p update."""
218		for i in range(niter):
219			self.current_iter = i
220			p=self.Impl_p(q,p,δ)
221			q=self.Expl_q(q,p,δ)
222			if i!=niter-1:
223				self.read_q(self,q); self.read_p(self,p) 
224				self.Damp_qp(q,p,δ) 
225				self.incr_q(self,q); self.incr_p(self,p)
226		return q,p

niter time steps of the symplectic Euler scheme, starting with impulsion p update.

def Euler_q(self, q, p, δ, niter=1):
228	def Euler_q(self,q,p,δ,niter=1):
229		"""niter time steps of the symplectic Euler scheme, starting with position q update."""
230		for i in range(niter):
231			self.current_iter = i
232			q=self.Impl_q(q,p,δ)
233			p=self.Expl_p(q,p,δ)
234			if i!=niter-1:
235				self.read_q(self,q); self.read_p(self,p) 
236				self.Damp_qp(q,p,δ) 
237				self.incr_q(self,q); self.incr_p(self,p)
238		return q,p

niter time steps of the symplectic Euler scheme, starting with position q update.

def Verlet_p(self, q, p, δ, niter=1):
240	def Verlet_p(self,q,p,δ,niter=1):
241		"""
242		niter time steps of the symplectic Verlet scheme.
243		Optional damping steps interleaved.
244		"""
245		=δ/2
246		for i in range(niter):
247			self.current_iter = i
248			if i==0: p=self.Impl_p(q,p,)
249			q=self.Expl_q(q,p,δ)
250			if i==niter-1: p=self.Impl_p(q,p,)
251			else: q,p=self.Impl2_p(q,p,,δ,)
252		return q,p

niter time steps of the symplectic Verlet scheme. Optional damping steps interleaved.

def Ruth4_p(self, q, p, δ, niter=1):
254	def Ruth4_p(self,q,p,δ,niter=1):
255		"""
256		niter time steps of the Ruth 1983 4th order symplectic scheme, as of Wikipedia.
257		Optional damping steps interleaved.
258		"""
259		t = 2.**(1./3.)
260		c1 = 1/(2*(2-t)); c2 = (1-t)*c1; 
261		d1 = 2*c1; d2 = -t*d1;
262		for i in range(niter):
263			self.current_iter = i
264			if i==0: p=self.Impl_p(q,p,c1*δ)
265			q=self.Expl_q(q,p,d1*δ)
266			p=self.Impl_p(q,p,c2*δ)
267			q=self.Expl_q(q,p,d2*δ)
268			p=self.Impl_p(q,p,c2*δ)
269			q=self.Expl_q(q,p,d1*δ)
270			if i==niter-1: p=self.Impl_p(q,p,c1*δ) 
271			else: q,p=self.Impl2_p(q,p,c1*δ,δ,c1*δ)
272		return q,p

niter time steps of the Ruth 1983 4th order symplectic scheme, as of Wikipedia. Optional damping steps interleaved.

def Sympl_p(self, q, p, δ, niter=1, order=2):
274	def Sympl_p(self,q,p,δ,niter=1,order=2):
275		"""
276		niter steps of the Euler_p, Verlet_p or Ruth4_p symplectic scheme, 
277		depending on the order parameter.
278		"""
279		if order==1: return self.Euler_p(q,p,δ,niter)
280		if order==2: return self.Verlet_p(q,p,δ,niter)
281		if order==4: return self.Ruth4_p( q,p,δ,niter)
282		raise ValueError(f"Found {order=}, while expecting 1,2 or 4.")

niter steps of the Euler_p, Verlet_p or Ruth4_p symplectic scheme, depending on the order parameter.

def symplectic_schemes(self):
284	def symplectic_schemes(self):
285		return {'Euler-p':self.Euler_p,'Euler-q':self.Euler_q,'Verlet-p':self.Verlet_p,
286		'Ruth4-p':self.Ruth4_p}
def seismogram(self, q, p, δ, niter, order=2, qh_ind=None, ph_ind=None):
306	def seismogram(self,q,p,δ,niter,order=2,qh_ind=None,ph_ind=None):
307
308		H_fwd = copy(self)
309		H_fwd.read_q = H_fwd.read_p = read_None; H_fwd.incr_q = H_fwd.incr_p = incr_None
310		qh = []; ph = []; initial=True 
311		if qh_ind is not None: H_fwd.read_q = lambda _,q : qh.append(q.reshape(-1)[qh_ind])
312		if ph_ind is not None: H_fwd.read_p = lambda _,p : ph.append(p.reshape(-1)[ph_ind])
313		qf,pf = H_fwd.Sympl_p(q,p,δ,niter,order)
314
315		return qf,pf,ad.asarray(qh),ad.asarray(ph)