# This file is part of QuTiP: Quantum Toolbox in Python.
#
# Copyright (c) 2011 and later, Paul D. Nation and Robert J. Johansson.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the QuTiP: Quantum Toolbox in Python nor the names
# of its contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
###############################################################################
"""Time-dependent Quantum Object (Qobj) class.
"""
__all__ = ['QobjEvo']
from qutip.qobj import Qobj
import qutip.settings as qset
from qutip.interpolate import Cubic_Spline
from scipy.interpolate import CubicSpline
from functools import partial, wraps
from types import FunctionType, BuiltinFunctionType
import numpy as np
from numbers import Number
from qutip.qobjevo_codegen import _compile_str_single, _compiled_coeffs
from qutip.cy.spmatfuncs import (cy_expect_rho_vec, cy_expect_psi, spmv, cy_spmm_tr)
from qutip.cy.cqobjevo import (CQobjCte, CQobjCteDense, CQobjEvoTd,
CQobjEvoTdMatched, CQobjEvoTdDense)
from qutip.cy.cqobjevo_factor import (InterCoeffT, InterCoeffCte,
InterpolateCoeff, StrCoeff)
import pickle
import sys
import scipy
import os
if qset.has_openmp:
from qutip.cy.openmp.cqobjevo_omp import (CQobjCteOmp, CQobjEvoTdOmp,
CQobjEvoTdMatchedOmp)
safePickle = False
if sys.platform == 'win32':
safePickle = True
def proj(x):
if np.isfinite(x):
return (x)
else:
return np.inf+0j
str_env = {
"sin": np.sin,
"cos": np.cos,
"tan": np.tan,
"asin": np.arcsin,
"acos": np.arccos,
"atan": np.arctan,
"pi": np.pi,
"sinh": np.sinh,
"cosh": np.cosh,
"tanh": np.tanh,
"asinh": np.arcsinh,
"acosh": np.arccosh,
"atanh": np.arctanh,
"exp": np.exp,
"log": np.log,
"log10": np.log10,
"erf": scipy.special.erf,
"zerf": scipy.special.erf,
"sqrt": np.sqrt,
"real": np.real,
"imag": np.imag,
"conj": np.conj,
"abs": np.abs,
"norm": lambda x: np.abs(x)**2,
"arg": np.angle,
"proj": proj,
"np": np,
"spe": scipy.special}
class _file_list:
"""
Contain temp a list .pyx to clean
"""
def __init__(self):
self.files = []
def add(self, file_):
self.files += [file_ + ".pyx"]
def clean(self):
for i, file_ in enumerate(self.files):
try:
os.remove(file_)
except:
pass
if not os.path.isfile(file_):
# Don't exist anymore
del self.files[i]
def __del__(self):
self.clean()
coeff_files = _file_list()
class _StrWrapper:
def __init__(self, code):
self.code = "_out = " + code
def __call__(self, t, args={}):
env = {"t": t}
env.update(args)
exec(self.code, str_env, env)
return env["_out"]
class _CubicSplineWrapper:
# Using scipy's CubicSpline since Qutip's one
# only accept linearly distributed tlist
def __init__(self, tlist, coeff):
self.coeff = coeff
self.tlist = tlist
self.func = CubicSpline(self.tlist, self.coeff)
def __call__(self, t, args={}):
return self.func([t])[0]
class _StateAsArgs:
# old with state (f(t, psi, args)) to new (args["state"] = psi)
def __init__(self, coeff_func):
self.coeff_func = coeff_func
def __call__(self, t, args={}):
return self.coeff_func(t, args["_state_vec"], args)
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# object for each time dependent element of the QobjEvo
# qobj : the Qobj of element ([*Qobj*, f])
# get_coeff : a callable that take (t, args) and return the coeff at that t
# coeff : The coeff as a string, array or function as provided by the user.
# type : flag for the type of coeff
class EvoElement:
def __init__(self, qobj, get_coeff, coeff, type):
self.qobj = qobj
self.get_coeff = get_coeff
self.coeff = coeff
self.type = type
@classmethod
def make(cls, list_):
"""self.qobj = list_[0]
self.get_coeff = list_[1]
self.coeff = list_[2]
self.type = list_[3]"""
return cls(*list_)
def __getitem__(self, i):
if i == 0:
return self.qobj
if i == 1:
return self.get_coeff
if i == 2:
return self.coeff
if i == 3:
return self.type
[docs]class QobjEvo:
"""A class for representing time-dependent quantum objects,
such as quantum operators and states.
The QobjEvo class is a representation of time-dependent Qutip quantum
objects (Qobj). This class implements math operations :
+,- : QobjEvo, Qobj
* : Qobj, C-number
/ : C-number
and some common linear operator/state operations. The QobjEvo
are constructed from a nested list of Qobj with their time-dependent
coefficients. The time-dependent coefficients are either a funciton, a
string or a numpy array.
For function format, the function signature must be f(t, args).
*Examples*
def f1_t(t, args):
return np.exp(-1j * t * args["w1"])
def f2_t(t, args):
return np.cos(t * args["w2"])
H = QobjEvo([H0, [H1, f1_t], [H2, f2_t]], args={"w1":1., "w2":2.})
For string based coeffients, the string must be a compilable python code
resulting in a complex. The following symbols are defined:
sin cos tan asin acos atan pi
sinh cosh tanh asinh acosh atanh
exp log log10 erf zerf sqrt
real imag conj abs norm arg proj
numpy as np, and scipy.special as spe.
*Examples*
H = QobjEvo([H0, [H1, 'exp(-1j*w1*t)'], [H2, 'cos(w2*t)']],
args={"w1":1.,"w2":2.})
For numpy array format, the array must be an 1d of dtype float or complex.
A list of times (float64) at which the coeffients must be given (tlist).
The coeffients array must have the same len as the tlist.
The time of the tlist do not need to be equidistant, but must be sorted.
*Examples*
tlist = np.logspace(-5,0,100)
H = QobjEvo([H0, [H1, np.exp(-1j*tlist)], [H2, np.cos(2.*tlist)]],
tlist=tlist)
args is a dict of (name:object). The name must be a valid variables string.
Some solvers support arguments that update at each call:
sesolve, mesolve, mcsolve:
state can be obtained with:
name+"=vec":Qobj => args[name] == state as 1D np.ndarray
name+"=mat":Qobj => args[name] == state as 2D np.ndarray
name+"=Qobj":Qobj => args[name] == state as Qobj
This Qobj is the initial value.
expectation values:
name+"=expect":O (Qobj/QobjEvo) => args[name] == expect(O, state)
expect is <phi|O|psi> or tr(state * O) depending on state dimensions
mcsolve:
collapse can be obtained with:
name+"=collapse":list => args[name] == list of collapse
each collapse will be appended to the list as (time, which c_ops)
Mixing the formats is possible, but not recommended.
Mixing tlist will cause problem.
Parameters
----------
QobjEvo(Q_object=[], args={}, tlist=None)
Q_object : array_like
Data for vector/matrix representation of the quantum object.
args : dictionary that contain the arguments for
tlist : array_like
List of times at which the numpy-array coefficients are applied. Times
must be equidistant and start from 0.
Attributes
----------
cte : Qobj
Constant part of the QobjEvo
ops : list
List of Qobj and the coefficients.
[(Qobj, coefficient as a function, original coefficient,
type, local arguments ), ... ]
type :
1: function
2: string
3: np.array
4: Cubic_Spline
args : map
arguments of the coefficients
tlist : array_like
List of times at which the numpy-array coefficients are applied.
compiled : int
Has the cython version of the QobjEvo been created
compiled_qobjevo : cy_qobj (CQobjCte or CQobjEvoTd)
Cython version of the QobjEvo
dummy_cte : bool
is self.cte a dummy Qobj
const : bool
Indicates if quantum object is Constant
type : int
information about the type of coefficient
"string", "func", "array",
"spline", "mixed_callable", "mixed_compilable"
num_obj : int
number of Qobj in the QobjEvo : len(ops) + (1 if not dummy_cte)
Methods
-------
copy() :
Create copy of Qobj
arguments(new_args):
Update the args of the object
Math:
+/- QobjEvo, Qobj, scalar:
Addition is possible between QobjEvo and with Qobj or scalar
-:
Negation operator
* Qobj, scalar:
Product is possible with Qobj or scalar
/ scalar:
It is possible to divide by scalar only
conj()
Return the conjugate of quantum object.
dag()
Return the adjoint (dagger) of quantum object.
trans()
Return the transpose of quantum object.
norm()
Return self.dag() * self.
Only possible if num_obj == 1
permute(order)
Returns composite qobj with indices reordered.
ptrace(sel)
Returns quantum object for selected dimensions after performing
partial trace.
apply(f, *args, **kw_args)
Apply the function f to every Qobj. f(Qobj) -> Qobj
Return a modified QobjEvo and let the original one untouched
apply_decorator(decorator, *args, str_mod=None,
inplace_np=False, **kw_args):
Apply the decorator to each function of the ops.
The *args and **kw_args are passed to the decorator.
new_coeff_function = decorator(coeff_function, *args, **kw_args)
str_mod : list of 2 elements
replace the string : str_mod[0] + original_string + str_mod[1]
*exemple: str_mod = ["exp(",")"]
inplace_np:
Change the numpy array instead of applying the decorator to the
function reading the array. Some decorators create incorrect array.
Transformations f'(t) = f(g(t)) create a missmatch between the
array and the associated time list.
tidyup(atol=1e-12)
Removes small elements from quantum object.
compress():
Merge ops which are based on the same quantum object and coeff type.
compile(code=False, matched=False, dense=False, omp=0):
Create the associated cython object for faster usage.
code: return the code generated for compilation of the strings.
matched: the compiled object use sparse matrix with matching indices.
(experimental, no real advantage)
dense: the compiled object use dense matrix.
omp: (int) number of thread: the compiled object use spmvpy_openmp.
__call__(t, data=False, state=None, args={}):
Return the Qobj at time t.
*Faster after compilation
mul_mat(t, mat):
Product of this at t time with the dense matrix mat.
*Faster after compilation
mul_vec(t, psi):
Apply the quantum object (if operator, no check) to psi.
More generaly, return the product of the object at t with psi.
*Faster after compilation
expect(t, psi, herm=False):
Calculates the expectation value for the quantum object (if operator,
no check) and state psi.
Return only the real part if herm.
*Faster after compilation
to_list():
Return the time-dependent quantum object as a list
"""
def __init__(self, Q_object=[], args={}, tlist=None, copy=True):
if isinstance(Q_object, QobjEvo):
if copy:
self._inplace_copy(Q_object)
else:
self.__dict__ = Q_object.__dict__
if args:
self.arguments(args)
return
self.const = False
self.dummy_cte = False
self.args = args.copy() if copy else args
self.dynamics_args = []
self.cte = None
self.tlist = tlist
self.compiled = ""
self.compiled_qobjevo = None
self.compiled_ptr = None
self.coeff_get = None
self.type = "none"
self.omp = 0
if isinstance(Q_object, list) and len(Q_object) == 2:
if isinstance(Q_object[0], Qobj) and not isinstance(Q_object[1],
(Qobj, list)):
# The format is [Qobj, f/str]
Q_object = [Q_object]
op_type = self._td_format_check_single(Q_object, tlist)
self.ops = []
if isinstance(op_type, int):
if op_type == 0:
self.cte = Q_object
self.const = True
self.type = "cte"
elif op_type == 1:
raise Exception("The Qobj must not already be a function")
elif op_type == -1:
pass
else:
op_type_count = [0, 0, 0, 0]
for type_, op in zip(op_type, Q_object):
if type_ == 0:
if self.cte is None:
self.cte = op
else:
self.cte += op
elif type_ == 1:
op_type_count[0] += 1
self.ops.append(EvoElement(op[0], op[1], op[1], "func"))
elif type_ == 2:
op_type_count[1] += 1
self.ops.append(EvoElement(op[0], _StrWrapper(op[1]),
op[1], "string"))
elif type_ == 3:
op_type_count[2] += 1
self.ops.append(EvoElement(op[0], _CubicSplineWrapper(tlist, op[1]),
op[1].copy(), "array"))
elif type_ == 4:
op_type_count[3] += 1
self.ops.append(EvoElement(op[0], op[1], op[1], "spline"))
nops = sum(op_type_count)
if op_type_count[0] == nops:
self.type = "func"
elif op_type_count[1] == nops:
self.type = "string"
elif op_type_count[2] == nops:
self.type = "array"
elif op_type_count[3] == nops:
self.type = "spline"
elif op_type_count[0]:
self.type = "mixed_callable"
else:
self.type = "mixed_compilable"
try:
if not self.cte:
self.cte = self.ops[0].qobj
# test is all qobj are compatible (shape, dims)
for op in self.ops[1:]:
self.cte += op.qobj
self.cte *= 0.
self.dummy_cte = True
else:
cte_copy = self.cte.copy()
# test is all qobj are compatible (shape, dims)
for op in self.ops:
cte_copy += op.qobj
except Exception as e:
raise TypeError("Qobj not compatible.") from e
if not self.ops:
self.const = True
self.num_obj = (len(self.ops) if self.dummy_cte else len(self.ops) + 1)
self._args_checks()
def _td_format_check_single(self, Q_object, tlist=None):
op_type = []
if isinstance(Q_object, Qobj):
op_type = 0
elif isinstance(Q_object, (FunctionType,
BuiltinFunctionType, partial)):
op_type = 1
elif isinstance(Q_object, list):
if (len(Q_object) == 0):
op_type = -1
for op_k in Q_object:
if isinstance(op_k, Qobj):
op_type.append(0)
elif isinstance(op_k, list):
if not isinstance(op_k[0], Qobj):
raise TypeError("Incorrect Q_object specification")
elif len(op_k) == 2:
if isinstance(op_k[1], Cubic_Spline):
op_type.append(4)
elif callable(op_k[1]):
op_type.append(1)
elif isinstance(op_k[1], str):
op_type.append(2)
elif isinstance(op_k[1], np.ndarray):
if not isinstance(tlist, np.ndarray) or not \
len(op_k[1]) == len(tlist):
raise TypeError("Time list does not match")
op_type.append(3)
else:
raise TypeError("Incorrect Q_object specification")
else:
raise TypeError("Incorrect Q_object specification")
else:
raise TypeError("Incorrect Q_object specification")
return op_type
def _args_checks(self, update=False):
to_remove = []
to_add = {}
for key in self.args:
if "=" in key:
name, what = key.split("=")
if what in ["Qobj", "vec", "mat"]:
# state first, expect last
if not update:
self.dynamics_args = [(name, what, None)] + self.dynamics_args
if name not in self.args:
if isinstance(self.args[key], Qobj):
if what == "Qobj":
to_add[name] = self.args[key]
elif what == "mat":
to_add[name] = self.args[key].full()
else:
to_add[name] = self.args[key].full().ravel("F")
else:
if what == "Qobj":
to_add[name] = Qobj(dims=[self.cte.dims[1],[1]])
elif what == "mat":
to_add[name] = np.zeros((self.cte.shape[1],1))
else:
to_add[name] = np.zeros((self.cte.shape[1]))
elif what == "expect":
if isinstance(self.args[key], QobjEvo):
expect_op = self.args[key]
else:
expect_op = QobjEvo(self.args[key], copy=False)
if update:
for ops in self.dynamics_args:
if ops[0] == name:
ops = (name, what, expect_op)
else:
self.dynamics_args += [(name, what, expect_op)]
if name not in self.args:
to_add[name] = 0.
else:
raise Exception("Could not understand dynamics args: " +
what + "\nSupported dynamics args: "
"Qobj, csr, vec, mat, expect")
to_remove.append(key)
for key in to_remove:
del self.args[key]
self.args.update(to_add)
def _check_old_with_state(self):
add_vec = False
for op in self.ops:
if op.type == "func":
try:
op.get_coeff(0., self.args)
except TypeError as e:
nfunc = _StateAsArgs(self.coeff)
op = EvoElement((op.qobj, nfunc, nfunc, "func"))
add_vec = True
if add_vec:
self.dynamics_args += [("_state_vec", "vec", None)]
def __del__(self):
# sometime not called
coeff_files.clean()
def __call__(self, t, data=False, state=None, args={}):
try:
t = float(t)
except Exception as e:
raise TypeError("t should be a real scalar.") from e
if state is not None:
self._dynamics_args_update(t, state)
if args:
if not isinstance(args, dict):
raise TypeError("The new args must be in a dict")
old_args = self.args.copy()
old_compiled = self.compiled
self.compiled = False
self.args.update(args)
op_t = self.__call__(t, data=data)
self.args = old_args
self.compiled = old_compiled
elif self.const:
if data:
op_t = self.cte.data.copy()
else:
op_t = self.cte.copy()
elif self.compiled and self.compiled.split()[0] != "dense":
op_t = self.compiled_qobjevo.call(t, data)
elif data:
op_t = self.cte.data.copy()
for part in self.ops:
op_t += part.qobj.data * part.get_coeff(t, self.args)
else:
op_t = self.cte.copy()
for part in self.ops:
op_t += part.qobj * part.get_coeff(t, self.args)
return op_t
def _dynamics_args_update(self, t, state):
if isinstance(state, Qobj):
for name, what, op in self.dynamics_args:
if what == "vec":
self.args[name] = state.full().ravel("F")
elif what == "mat":
self.args[name] = state.full()
elif what == "Qobj":
self.args[name] = state
elif what == "expect":
self.args[name] = op.expect(t, state)
elif isinstance(state, np.ndarray) and state.ndim == 1:
s1 = self.cte.shape[1]
for name, what, op in self.dynamics_args:
if what == "vec":
self.args[name] = state
elif what == "expect":
self.args[name] = op.expect(t, state)
elif state.shape[0] == s1 and self.cte.issuper:
new_l = int(np.sqrt(s1))
mat = state.reshape((new_l, new_l), order="F")
if what == "mat":
self.args[name] = mat
elif what == "Qobj":
self.args[name] = Qobj(mat, dims=self.cte.dims[1])
elif state.shape[0] == s1:
mat = state.reshape((-1,1))
if what == "mat":
self.args[name] = mat
elif what == "Qobj":
self.args[name] = Qobj(mat, dims=[self.cte.dims[1],[1]])
elif state.shape[0] == s1*s1:
new_l = int(np.sqrt(s1))
mat = state.reshape((new_l, new_l), order="F")
if what == "mat":
self.args[name] = mat
elif what == "Qobj":
self.args[name] = Qobj(mat, dims=[self.cte.dims[1], self.cte.dims[1]])
elif isinstance(state, np.ndarray) and state.ndim == 2:
s1 = self.cte.shape[1]
new_l = int(np.sqrt(s1))
for name, what, op in self.dynamics_args:
if what == "vec":
self.args[name] = state.ravel("F")
elif what == "mat":
self.args[name] = state
elif what == "expect":
self.args[name] = op.expect(t, state)
elif state.shape[1] == 1:
self.args[name] = Qobj(state, dims=[self.cte.dims[1],[1]])
elif state.shape[1] == s1:
self.args[name] = Qobj(state, dims=self.cte.dims)
else:
self.args[name] = Qobj(state)
else:
raise TypeError("state must be a Qobj or np.ndarray")
def copy(self):
new = QobjEvo(self.cte.copy())
new.const = self.const
new.args = self.args.copy()
new.dynamics_args = self.dynamics_args.copy()
new.tlist = self.tlist
new.dummy_cte = self.dummy_cte
new.num_obj = self.num_obj
new.type = self.type
new.compiled = False
new.compiled_qobjevo = None
new.compiled_ptr = None
new.coeff_get = None
new.coeff_files = []
for op in self.ops:
if op.type == "array":
new_coeff = op.coeff.copy()
else:
new_coeff = op.coeff
new.ops.append(EvoElement(op.qobj.copy(), op.get_coeff,
new_coeff, op.type))
return new
def _inplace_copy(self, other):
self.cte = other.cte
self.const = other.const
self.args = other.args.copy()
self.dynamics_args = other.dynamics_args
self.tlist = other.tlist
self.dummy_cte = other.dummy_cte
self.num_obj = other.num_obj
self.type = other.type
self.compiled = ""
self.compiled_qobjevo = None
self.compiled_ptr = None
self.coeff_get = None
self.ops = []
for op in other.ops:
if op.type == "array":
new_coeff = op.coeff.copy()
else:
new_coeff = op.coeff
self.ops.append(EvoElement(op.qobj.copy(), op.get_coeff,
new_coeff, op.type))
def arguments(self, args):
if not isinstance(args, dict):
raise TypeError("The new args must be in a dict")
self.args.update(args)
self._args_checks(True)
if self.compiled and self.compiled.split()[2] is not "cte":
if isinstance(self.coeff_get, StrCoeff):
self.coeff_get.set_args(self.args)
self.coeff_get._set_dyn_args(self.dynamics_args)
elif isinstance(self.coeff_get, _UnitedFuncCaller):
self.coeff_get.set_args(self.args, self.dynamics_args)
else:
pass
def to_list(self):
list_qobj = []
if not self.dummy_cte:
list_qobj.append(self.cte)
for op in self.ops:
list_qobj.append([op.qobj, op.coeff])
return list_qobj
# Math function
def __add__(self, other):
res = self.copy()
res += other
return res
def __radd__(self, other):
res = self.copy()
res += other
return res
def __iadd__(self, other):
if isinstance(other, QobjEvo):
self.cte += other.cte
l = len(self.ops)
for op in other.ops:
if op.type == "array":
new_coeff = op.coeff.copy()
else:
new_coeff = op.coeff
self.ops.append(EvoElement(op.qobj.copy(), op.get_coeff,
new_coeff, op.type))
l += 1
self.args.update(**other.args)
self.dynamics_args += other.dynamics_args
self.const = self.const and other.const
self.dummy_cte = self.dummy_cte and other.dummy_cte
if self.type != other.type:
if self.type in ["func", "mixed_callable"] or \
other.type in ["func", "mixed_callable"]:
self.type = "mixed_callable"
else:
self.type = "mixed_compilable"
self.compiled = ""
self.compiled_qobjevo = None
self.compiled_ptr = None
self.coeff_get = None
if self.tlist is None:
self.tlist = other.tlist
else:
if other.tlist is None:
pass
elif len(other.tlist) != len(self.tlist) or \
other.tlist[-1] != self.tlist[-1]:
raise Exception("tlist are not compatible")
else:
self.cte += other
self.dummy_cte = False
self.num_obj = (len(self.ops) if self.dummy_cte else len(self.ops) + 1)
self._reset_type()
return self
def __sub__(self, other):
res = self.copy()
res -= other
return res
def __rsub__(self, other):
res = -self.copy()
res += other
return res
def __isub__(self, other):
self += (-other)
return self
def __mul__(self, other):
res = self.copy()
res *= other
return res
def __rmul__(self, other):
res = self.copy()
if isinstance(other, Qobj):
res.cte = other * res.cte
for op in res.ops:
op.qobj = other * op.qobj
return res
else:
res *= other
return res
def __imul__(self, other):
if isinstance(other, Qobj) or isinstance(other, Number):
self.cte *= other
for op in self.ops:
op.qobj *= other
elif isinstance(other, QobjEvo):
if other.const:
self.cte *= other.cte
for op in self.ops:
op.qobj *= other.cte
elif self.const:
cte = self.cte.copy()
self = other.copy()
self.cte = cte * self.cte
for op in self.ops:
op.qobj = cte*op.qobj
else:
cte = self.cte.copy()
self.cte *= other.cte
new_terms = []
old_ops = self.ops
if not other.dummy_cte:
for op in old_ops:
new_terms.append(self._ops_mul_cte(op, other.cte, "R"))
if not self.dummy_cte:
for op in other.ops:
new_terms.append(self._ops_mul_cte(op, cte, "L"))
for op_left in old_ops:
for op_right in other.ops:
new_terms.append(self._ops_mul_(op_left,
op_right))
self.ops = new_terms
self.args.update(other.args)
self.dynamics_args += other.dynamics_args
self.dummy_cte = self.dummy_cte and other.dummy_cte
self.num_obj = (len(self.ops) if
self.dummy_cte else len(self.ops) + 1)
self._reset_type()
else:
raise TypeError("QobjEvo can only be multiplied"
" with QobjEvo, Qobj or numbers")
return self
def __div__(self, other):
if isinstance(other, (int, float, complex,
np.integer, np.floating, np.complexfloating)):
res = self.copy()
res *= other**(-1)
return res
else:
raise TypeError('Incompatible object for division')
def __idiv__(self, other):
if isinstance(other, (int, float, complex,
np.integer, np.floating, np.complexfloating)):
self *= other**(-1)
else:
raise TypeError('Incompatible object for division')
return self
def __truediv__(self, other):
return self.__div__(other)
def __neg__(self):
res = self.copy()
res.cte = -res.cte
for op in res.ops:
op.qobj = -op.qobj
return res
def _ops_mul_(self, opL, opR):
new_f = _Prod(opL.get_coeff, opR.get_coeff)
new_op = [opL.qobj*opR.qobj, new_f, None, 0]
if opL.type == opR.type and opL.type == "string":
new_op[2] = "(" + opL.coeff + ") * (" + opR.coeff + ")"
new_op[3] = "string"
elif opL[3] == opR[3] and opL[3] == "array":
new_op[2] = opL[2]*opR[2]
new_op[3] = "array"
else:
new_op[2] = new_f
new_op[3] = "func"
if self.type not in ["func", "mixed_callable"]:
self.type = "mixed_callable"
return EvoElement.make(new_op)
def _ops_mul_cte(self, op, cte, side):
new_op = [None, op.get_coeff, op.coeff, op.type]
if side == "R":
new_op[0] = op.qobj * cte
if side == "L":
new_op[0] = cte * op.qobj
return EvoElement.make(new_op)
# Transformations
def trans(self):
res = self.copy()
res.cte = res.cte.trans()
for op in res.ops:
op.qobj = op.qobj.trans()
return res
def conj(self):
res = self.copy()
res.cte = res.cte.conj()
for op in res.ops:
op.qobj = op.qobj.conj()
res._f_conj()
return res
def dag(self):
res = self.copy()
res.cte = res.cte.dag()
for op in res.ops:
op.qobj = op.qobj.dag()
res._f_conj()
return res
def _cdc(self):
"""return a.dag * a """
if not self.num_obj == 1:
res = self.dag()
res *= self
else:
res = self.copy()
res.cte = res.cte.dag() * res.cte
for op in res.ops:
op.qobj = op.qobj.dag() * op.qobj
res._f_norm2()
return res
# Unitary function of Qobj
def tidyup(self, atol=1e-12):
self.cte = self.cte.tidyup(atol)
for op in self.ops:
op.qobj = op.qobj.tidyup(atol)
return self
def _compress_make_set(self):
sets = []
callable_flags = ["func", "spline"]
for i, op1 in enumerate(self.ops):
already_matched = False
for _set in sets:
already_matched = already_matched or i in _set
if not already_matched:
this_set = [i]
for j, op2 in enumerate(self.ops[i+1:]):
if op1.qobj == op2.qobj:
same_flag = op1.type == op2.type
callable_1 = op1.type in callable_flags
callable_2 = op2.type in callable_flags
if (same_flag or (callable_1 and callable_2)):
this_set.append(j+i+1)
sets.append(this_set)
fsets = []
for i, op1 in enumerate(self.ops):
already_matched = False
for _set in fsets:
already_matched = already_matched or i in _set
if not already_matched:
this_set = [i]
for j, op2 in enumerate(self.ops[i+1:]):
if op1.type != op2.type:
pass
elif op1.type is "array":
if np.allclose(op1.coeff, op2.coeff):
this_set.append(j+i+1)
else:
if op1.coeff is op2.coeff:
this_set.append(j+i+1)
fsets.append(this_set)
return sets, fsets
def _compress_merge_qobj(self, sets):
callable_flags = ["func", "spline"]
new_ops = []
for _set in sets:
if len(_set) == 1:
new_ops.append(self.ops[_set[0]])
elif self.ops[_set[0]].type in callable_flags:
new_op = [self.ops[_set[0]].qobj, None, None, "func"]
fs = []
for i in _set:
fs += [self.ops[i].get_coeff]
new_op[1] = _Add(fs)
new_op[2] = new_op[1]
new_ops.append(EvoElement.make(new_op))
elif self.ops[_set[0]].type is "string":
new_op = [self.ops[_set[0]].qobj, None, None, "string"]
new_str = "(" + self.ops[_set[0]].coeff + ")"
for i in _set[1:]:
new_str += " + (" + self.ops[i].coeff + ")"
new_op[1] = _StrWrapper(new_str)
new_op[2] = new_str
new_ops.append(EvoElement.make(new_op))
elif self.ops[_set[0]].type is "array":
new_op = [self.ops[_set[0]].qobj, None, None, "array"]
new_array = (self.ops[_set[0]].coeff).copy()
for i in _set[1:]:
new_array += self.ops[i].coeff
new_op[2] = new_array
new_op[1] = _CubicSplineWrapper(self.tlist, new_array)
new_ops.append(EvoElement.make(new_op))
self.ops = new_ops
def _compress_merge_func(self, fsets):
new_ops = []
for _set in fsets:
base = self.ops[_set[0]]
new_op = [None, base.get_coeff, base.coeff, base.type]
if len(_set) == 1:
new_op[0] = base.qobj
else:
new_op[0] = base.qobj.copy()
for i in _set[1:]:
new_op[0] += self.ops[i].qobj
new_ops.append(EvoElement.make(new_op))
self.ops = new_ops
def compress(self):
self.tidyup()
sets, fsets = self._compress_make_set()
N_sets = len(sets)
N_fsets = len(fsets)
num_ops = len(self.ops)
if N_sets < num_ops and N_fsets < num_ops:
# Both could be better
self.compiled = ""
self.compiled_qobjevo = None
self.coeff_get = None
if N_sets < N_fsets:
self._compress_merge_qobj(sets)
else:
self._compress_merge_func(fsets)
sets, fsets = self._compress_make_set()
N_sets = len(sets)
N_fsets = len(fsets)
num_ops = len(self.ops)
if N_sets < num_ops:
self.compiled = ""
self.compiled_qobjevo = None
self.coeff_get = None
self._compress_merge_qobj(sets)
elif N_fsets < num_ops:
self.compiled = ""
self.compiled_qobjevo = None
self.coeff_get = None
self._compress_merge_func(fsets)
self._reset_type()
def _reset_type(self):
op_type_count = [0, 0, 0, 0]
for op in self.ops:
if op.type == "func":
op_type_count[0] += 1
elif op.type == "string":
op_type_count[1] += 1
elif op.type == "array":
op_type_count[2] += 1
elif op.type == "spline":
op_type_count[3] += 1
nops = sum(op_type_count)
if op_type_count[0] == nops:
self.type = "func"
elif op_type_count[1] == nops:
self.type = "string"
elif op_type_count[2] == nops:
self.type = "array"
elif op_type_count[3] == nops:
self.type = "spline"
elif op_type_count[0]:
self.type = "mixed_callable"
else:
self.type = "mixed_compilable"
self.num_obj = (len(self.ops) if self.dummy_cte else len(self.ops) + 1)
def permute(self, order):
res = self.copy()
res.cte = res.cte.permute(order)
for op in res.ops:
op.qobj = op.qobj.permute(order)
return res
def ptrace(self, sel):
res = self.copy()
res.cte = res.cte.ptrace(sel)
for op in res.ops:
op.qobj = op.qobj.ptrace(sel)
return res
# function to apply custom transformations
def apply(self, function, *args, **kw_args):
self.compiled = ""
res = self.copy()
cte_res = function(res.cte, *args, **kw_args)
if not isinstance(cte_res, Qobj):
raise TypeError("The function must return a Qobj")
res.cte = cte_res
for op in res.ops:
op.qobj = function(op.qobj, *args, **kw_args)
return res
def apply_decorator(self, function, *args, **kw_args):
if "str_mod" in kw_args:
str_mod = kw_args["str_mod"]
del kw_args["str_mod"]
else:
str_mod = None
if "inplace_np" in kw_args:
inplace_np = kw_args["inplace_np"]
del kw_args["inplace_np"]
else:
inplace_np = None
res = self.copy()
for op in res.ops:
op.get_coeff = function(op.get_coeff, *args, **kw_args)
if op.type == ["func", "spline"]:
op.coeff = op.get_coeff
op.type = "func"
elif op.type == "string":
if str_mod is None:
op.coeff = op.get_coeff
op.type = "func"
else:
op.coeff = str_mod[0] + op.coeff + str_mod[1]
elif op.type == "array":
if inplace_np:
# keep the original function, change the array
def f(a):
return a
ff = function(f, *args, **kw_args)
for i, v in enumerate(op.coeff):
op.coeff[i] = ff(v)
op.get_coeff = _CubicSplineWrapper(self.tlist, op.coeff)
else:
op.coeff = op.get_coeff
op.type = "func"
if self.type == "string" and str_mod is None:
res.type = "mixed_callable"
elif self.type == "array" and not inplace_np:
res.type = "mixed_callable"
elif self.type == "spline":
res.type = "mixed_callable"
elif self.type == "mixed_compilable":
for op in res.ops:
if op.type == "func":
res.type = "mixed_callable"
return res
def _f_norm2(self):
new_ops = []
for op in self.ops:
new_op = [op.qobj, None, None, op.type]
if op.type == "func":
new_op[1] = _Norm2(op.get_coeff)
new_op[2] = new_op[1]
elif op.type == "string":
new_op[2] = "norm(" + op.coeff + ")"
new_op[1] = _StrWrapper(new_op[2])
elif op.type == "array":
new_op[2] = np.abs(op.coeff)**2
new_op[1] = _CubicSplineWrapper(self.tlist, new_op[2])
elif op.type == "spline":
new_op[1] = _Norm2(op.get_coeff)
new_op[2] = new_op[1]
new_op[3] = "func"
self.type = "mixed_callable"
new_ops.append(EvoElement.make(new_op))
self.ops = new_ops
return self
def _f_conj(self):
new_ops = []
for op in self.ops:
new_op = [op.qobj, None, None, op.type]
if op.type == "func":
new_op[1] = _Conj(op.get_coeff)
new_op[2] = new_op[1]
elif op.type == "string":
new_op[2] = "conj(" + op.coeff + ")"
new_op[1] = _StrWrapper(new_op[2])
elif op.type == "array":
new_op[2] = np.conj(op.coeff)
new_op[1] = _CubicSplineWrapper(self.tlist, new_op[2])
elif op.type == "spline":
new_op[1] = _Conj(op.get_coeff)
new_op[2] = new_op[1]
new_op[3] = "func"
self.type = "mixed_callable"
new_ops.append(EvoElement.make(new_op))
self.ops = new_ops
return self
def expect(self, t, state, herm=0):
if not isinstance(t, (int, float)):
raise TypeError("The time need to be a real scalar")
if isinstance(state, Qobj):
if self.cte.dims[1] == state.dims[0]:
vec = state.full().ravel("F")
elif self.cte.dims[1] == state.dims:
vec = state.full().ravel("F")
else:
raise Exception("Dimensions do not fit")
elif isinstance(state, np.ndarray):
vec = state.reshape((-1))
else:
raise TypeError("The vector must be an array or Qobj")
if vec.shape[0] == self.cte.shape[1]:
if self.compiled:
exp = self.compiled_qobjevo.expect(t, vec)
elif self.cte.issuper:
self._dynamics_args_update(t, state)
exp = cy_expect_rho_vec(self.__call__(t, data=True), vec, 0)
else:
self._dynamics_args_update(t, state)
exp = cy_expect_psi(self.__call__(t, data=True), vec, 0)
elif vec.shape[0] == self.cte.shape[1]**2:
if self.compiled:
exp = self.compiled_qobjevo.overlapse(t, vec)
else:
self._dynamics_args_update(t, state)
exp = (self.__call__(t, data=True) *
vec.reshape((self.cte.shape[1],
self.cte.shape[1]))).trace()
else:
raise Exception("The shapes do not match")
if herm:
return exp.real
else:
return exp
def mul_vec(self, t, vec):
was_Qobj = False
if not isinstance(t, (int, float)):
raise TypeError("the time need to be a real scalar")
if isinstance(vec, Qobj):
if self.cte.dims[1] != vec.dims[0]:
raise Exception("Dimensions do not fit")
was_Qobj = True
dims = vec.dims
vec = vec.full().ravel()
elif not isinstance(vec, np.ndarray):
raise TypeError("The vector must be an array or Qobj")
if vec.ndim != 1:
raise Exception("The vector must be 1d")
if vec.shape[0] != self.cte.shape[1]:
raise Exception("The length do not match")
if self.compiled:
out = self.compiled_qobjevo.mul_vec(t, vec)
else:
self._dynamics_args_update(t, vec)
out = spmv(self.__call__(t, data=True), vec)
if was_Qobj:
return Qobj(out, dims=dims)
else:
return out
def mul_mat(self, t, mat):
was_Qobj = False
if not isinstance(t, (int, float)):
raise TypeError("the time need to be a real scalar")
if isinstance(mat, Qobj):
if self.cte.dims[1] != mat.dims[0]:
raise Exception("Dimensions do not fit")
was_Qobj = True
dims = mat.dims
mat = mat.full()
if not isinstance(mat, np.ndarray):
raise TypeError("The vector must be an array or Qobj")
if mat.ndim != 2:
raise Exception("The matrice must be 2d")
if mat.shape[0] != self.cte.shape[1]:
raise Exception("The length do not match")
if self.compiled:
out = self.compiled_qobjevo.mul_mat(t, mat)
else:
self._dynamics_args_update(t, mat)
out = self.__call__(t, data=True) * mat
if was_Qobj:
return Qobj(out, dims=dims)
else:
return out
def compile(self, code=False, matched=False, dense=False, omp=0):
self.tidyup()
Code = None
if self.compiled:
return
for _, _, op in self.dynamics_args:
if isinstance(op, QobjEvo):
op.compile(code, matched, dense, omp)
if not qset.has_openmp:
omp = 0
if omp:
nnz = [self.cte.data.nnz]
for part in self.ops:
nnz += [part.qobj.data.nnz]
if all(qset.openmp_thresh < nz for nz in nnz):
omp = 0
if self.const:
if dense:
self.compiled_qobjevo = CQobjCteDense()
self.compiled = "dense single cte"
elif omp:
self.compiled_qobjevo = CQobjCteOmp()
self.compiled = "csr omp cte"
self.compiled_qobjevo.set_threads(omp)
self.omp = omp
else:
self.compiled_qobjevo = CQobjCte()
self.compiled = "csr single cte"
self.compiled_qobjevo.set_data(self.cte)
else:
if matched:
if omp:
self.compiled_qobjevo = CQobjEvoTdMatchedOmp()
self.compiled = "matched omp "
self.compiled_qobjevo.set_threads(omp)
self.omp = omp
else:
self.compiled_qobjevo = CQobjEvoTdMatched()
self.compiled = "matched single "
elif dense:
self.compiled_qobjevo = CQobjEvoTdDense()
self.compiled = "dense single "
elif omp:
self.compiled_qobjevo = CQobjEvoTdOmp()
self.compiled = "csr omp "
self.compiled_qobjevo.set_threads(omp)
self.omp = omp
else:
self.compiled_qobjevo = CQobjEvoTd()
self.compiled = "csr single "
self.compiled_qobjevo.set_data(self.cte, self.ops)
self.compiled_qobjevo.has_dyn_args(bool(self.dynamics_args))
if self.type in ["func"]:
funclist = []
for part in self.ops:
funclist.append(part.get_coeff)
self.coeff_get = _UnitedFuncCaller(funclist, self.args,
self.dynamics_args, self.cte)
self.compiled += "pyfunc"
self.compiled_qobjevo.set_factor(func=self.coeff_get)
elif self.type in ["mixed_callable"]:
funclist = []
for part in self.ops:
if isinstance(part.get_coeff, _StrWrapper):
part.get_coeff, file = _compile_str_single(part.coeff, self.args)
coeff_files.add(file)
funclist.append(part.get_coeff)
self.coeff_get = _UnitedFuncCaller(funclist, self.args,
self.dynamics_args, self.cte)
self.compiled += "pyfunc"
self.compiled_qobjevo.set_factor(func=self.coeff_get)
elif self.type in ["string", "mixed_compilable"]:
# All factor can be compiled
self.coeff_get, Code, file = _compiled_coeffs(self.ops,
self.args,
self.dynamics_args,
self.tlist)
coeff_files.add(file)
self.compiled_qobjevo.set_factor(obj=self.coeff_get)
self.compiled += "cyfactor"
elif self.type == "array":
if np.allclose(np.diff(self.tlist),
self.tlist[1] - self.tlist[0]):
self.coeff_get = InterCoeffCte(self.ops, None,
self.tlist)
else:
self.coeff_get = InterCoeffT(self.ops, None, self.tlist)
self.compiled += "cyfactor"
self.compiled_qobjevo.set_factor(obj=self.coeff_get)
elif self.type == "spline":
self.coeff_get = InterpolateCoeff(self.ops, None, None)
self.compiled += "cyfactor"
self.compiled_qobjevo.set_factor(obj=self.coeff_get)
else:
pass
coeff_files.clean()
if code:
return Code
def _get_coeff(self, t):
out = []
for part in self.ops:
out.append(part.get_coeff(t, self.args))
return out
def __getstate__(self):
_dict_ = {key: self.__dict__[key]
for key in self.__dict__ if key is not "compiled_qobjevo"}
if self.compiled:
return (_dict_, self.compiled_qobjevo.__getstate__())
else:
return (_dict_,)
def __setstate__(self, state):
self.__dict__ = state[0]
self.compiled_qobjevo = None
if self.compiled:
mat_type, threading, td = self.compiled.split()
if mat_type == "csr":
if safePickle:
# __getstate__ and __setstate__ of compiled_qobjevo pass pointers
# In 'safe' mod, these pointers are not used.
if td == "cte":
if threading == "single":
self.compiled_qobjevo = CQobjCte()
self.compiled_qobjevo.set_data(self.cte)
elif threading == "omp":
self.compiled_qobjevo = CQobjCteOmp()
self.compiled_qobjevo.set_data(self.cte)
self.compiled_qobjevo.set_threads(self.omp)
else:
# time dependence is pyfunc or cyfactor
if threading == "single":
self.compiled_qobjevo = CQobjEvoTd()
self.compiled_qobjevo.set_data(self.cte, self.ops)
elif threading == "omp":
self.compiled_qobjevo = CQobjEvoTdOmp()
self.compiled_qobjevo.set_data(self.cte, self.ops)
self.compiled_qobjevo.set_threads(self.omp)
if td == "pyfunc":
self.compiled_qobjevo.set_factor(obj=self.coeff_get)
elif td == "cyfactor":
self.compiled_qobjevo.set_factor(func=self.coeff_get)
else:
if td == "cte":
if threading == "single":
self.compiled_qobjevo = CQobjCte.__new__(CQobjCte)
elif threading == "omp":
self.compiled_qobjevo = CQobjCteOmp.__new__(CQobjCteOmp)
self.compiled_qobjevo.set_threads(self.omp)
else:
# time dependence is pyfunc or cyfactor
if threading == "single":
self.compiled_qobjevo = CQobjEvoTd.__new__(CQobjEvoTd)
elif threading == "omp":
self.compiled_qobjevo = CQobjEvoTdOmp.__new__(CQobjEvoTdOmp)
self.compiled_qobjevo.set_threads(self.omp)
self.compiled_qobjevo.__setstate__(state[1])
elif mat_type == "dense":
if td == "cte":
self.compiled_qobjevo = \
CQobjCteDense.__new__(CQobjCteDense)
else:
CQobjEvoTdDense.__new__(CQobjEvoTdDense)
self.compiled_qobjevo.__setstate__(state[1])
elif mat_type == "matched":
if threading == "single":
self.compiled_qobjevo = \
CQobjEvoTdMatched.__new__(CQobjEvoTdMatched)
elif threading == "omp":
self.compiled_qobjevo = \
CQobjEvoTdMatchedOmp.__new__(CQobjEvoTdMatchedOmp)
self.compiled_qobjevo.set_threads(self.omp)
self.compiled_qobjevo.__setstate__(state[1])
# Function defined inside another function cannot be pickled,
# Using class instead
class _UnitedFuncCaller:
def __init__(self, funclist, args, dynamics_args, cte):
self.funclist = funclist
self.args = args
self.dynamics_args = dynamics_args
self.dims = cte.dims
self.shape = cte.shape
def set_args(self, args, dynamics_args):
self.args = args
self.dynamics_args = dynamics_args
def dyn_args(self, t, state, shape):
# 1d array are to F ordered
mat = state.reshape(shape, order="F")
for name, what, op in self.dynamics_args:
if what == "vec":
self.args[name] = state
elif what == "mat":
self.args[name] = mat
elif what == "Qobj":
if self.shape[1] == shape[1]: # oper
self.args[name] = Qobj(mat, dims=self.dims)
elif shape[1] == 1:
self.args[name] = Qobj(mat, dims=[self.dims[1],[1]])
else: # rho
self.args[name] = Qobj(mat, dims=self.dims[1])
elif what == "expect": # ket
if shape[1] == op.cte.shape[1]: # same shape as object
self.args[name] = op.mul_mat(t, mat).trace()
else:
self.args[name] = op.expect(t, state)
def __call__(self, t, args=None):
if args:
now_args = self.args.copy()
now_args.update(args)
else:
now_args = self.args
out = []
for func in self.funclist:
out.append(func(t, now_args))
return out
def get_args(self):
return self.args
class _Norm2():
def __init__(self, f):
self.func = f
def __call__(self, t, args):
return self.func(t, args)*np.conj(self.func(t, args))
class _Conj():
def __init__(self, f):
self.func = f
def __call__(self, t, args):
return np.conj(self.func(t, args))
class _Prod():
def __init__(self, f, g):
self.func_1 = f
self.func_2 = g
def __call__(self, t, args):
return self.func_1(t, args)*self.func_2(t, args)
class _Add():
def __init__(self, fs):
self.funcs = fs
def __call__(self, t, args):
return np.sum([f(t, args) for f in self.funcs])
from qutip.superoperator import vec2mat