# This file is part of QuTiP: Quantum Toolbox in Python.
#
# Copyright (c) 2011 and later, Paul D. Nation and Robert J. Johansson.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the QuTiP: Quantum Toolbox in Python nor the names
# of its contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
###############################################################################
"""Time-dependent Quantum Object (Qobj) class.
"""
__all__ = ['QobjEvo']
from qutip.qobj import Qobj
import qutip.settings as qset
from qutip.interpolate import Cubic_Spline
from scipy.interpolate import CubicSpline, interp1d
from functools import partial
from types import FunctionType, BuiltinFunctionType
import numpy as np
from numbers import Number
from qutip.qobjevo_codegen import (_compile_str_single, _compiled_coeffs,
_compiled_coeffs_python)
from qutip.cy.spmatfuncs import (cy_expect_rho_vec, cy_expect_psi,
spmv)
from qutip.cy.cqobjevo import (CQobjCte, CQobjCteDense, CQobjEvoTd,
CQobjEvoTdMatched, CQobjEvoTdDense)
from qutip.cy.cqobjevo_factor import (InterCoeffT, InterCoeffCte,
InterpolateCoeff, StrCoeff,
StepCoeffCte, StepCoeffT)
import sys
import scipy
import os
from re import sub
if qset.has_openmp:
from qutip.cy.openmp.cqobjevo_omp import (CQobjCteOmp, CQobjEvoTdOmp,
CQobjEvoTdMatchedOmp)
safePickle = [False]
if sys.platform == 'win32':
safePickle[0] = True
try:
import cython
use_cython = [True]
except:
use_cython = [False]
def proj(x):
if np.isfinite(x):
return (x)
else:
return np.inf + 0j * np.imag(x)
str_env = {
"sin": np.sin,
"cos": np.cos,
"tan": np.tan,
"asin": np.arcsin,
"acos": np.arccos,
"atan": np.arctan,
"pi": np.pi,
"sinh": np.sinh,
"cosh": np.cosh,
"tanh": np.tanh,
"asinh": np.arcsinh,
"acosh": np.arccosh,
"atanh": np.arctanh,
"exp": np.exp,
"log": np.log,
"log10": np.log10,
"erf": scipy.special.erf,
"zerf": scipy.special.erf,
"sqrt": np.sqrt,
"real": np.real,
"imag": np.imag,
"conj": np.conj,
"abs": np.abs,
"norm": lambda x: np.abs(x)**2,
"arg": np.angle,
"proj": proj,
"np": np,
"spe": scipy.special}
class _file_list:
"""
Contain temp a list .pyx to clean
"""
def __init__(self):
self.files = []
def add(self, file_):
self.files += [file_ + ".pyx"]
def clean(self):
to_del = []
for i, file_ in enumerate(self.files):
try:
os.remove(file_)
to_del.append(i)
except Exception:
if not os.path.isfile(file_):
to_del.append(i)
for i in to_del[::-1]:
del self.files[i]
def __del__(self):
self.clean()
coeff_files = _file_list()
class _StrWrapper:
def __init__(self, code):
self.code = "_out = " + code
def __call__(self, t, args={}):
env = {"t": t}
env.update(args)
exec(self.code, str_env, env)
return env["_out"]
class _CubicSplineWrapper:
# Using scipy's CubicSpline since Qutip's one
# only accept linearly distributed tlist
def __init__(self, tlist, coeff, args=None):
self.coeff = coeff
self.tlist = tlist
try:
use_step_func = args["_step_func_coeff"]
except KeyError:
use_step_func = 0
if use_step_func:
self.func = interp1d(
self.tlist, self.coeff, kind="previous",
bounds_error=False, fill_value=0.)
else:
self.func = CubicSpline(self.tlist, self.coeff)
def __call__(self, t, args={}):
return self.func([t])[0]
class _StateAsArgs:
# old with state (f(t, psi, args)) to new (args["state"] = psi)
def __init__(self, coeff_func):
self.coeff_func = coeff_func
def __call__(self, t, args={}):
return self.coeff_func(t, args["_state_vec"], args)
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class StateArgs:
"""Object to indicate to use the state in args outside solver.
args[key] = StateArgs(type, op)
"""
def __init__(self, type="Qobj", op=None):
self.dyn_args = (type, op)
def __call__(self):
return self.dyn_args
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# object for each time dependent element of the QobjEvo
# qobj : the Qobj of element ([*Qobj*, f])
# get_coeff : a callable that take (t, args) and return the coeff at that t
# coeff : The coeff as a string, array or function as provided by the user.
# type : flag for the type of coeff
class EvoElement:
"""
Internal type used to represent the time-dependent parts of a
:class:`~QobjEvo`.
Availables "types" are
1. function
2. string
3. ``np.ndarray``
4. :class:`.Cubic_Spline`
"""
def __init__(self, qobj, get_coeff, coeff, type):
self.qobj = qobj
self.get_coeff = get_coeff
self.coeff = coeff
self.type = type
@classmethod
def make(cls, list_):
return cls(*list_)
def __getitem__(self, i):
if i == 0:
return self.qobj
if i == 1:
return self.get_coeff
if i == 2:
return self.coeff
if i == 3:
return self.type
[docs]class QobjEvo:
"""
A class for representing time-dependent quantum objects, such as quantum
operators and states.
Basic math operations are defined:
- ``+``, ``-`` : :class:`~QobjEvo`, :class:`~Qobj`, scalars.
- ``*``: :class:`~Qobj`, C number
- ``/`` : C number
This object is constructed by passing a list of :obj:`~Qobj` instances,
each of which *may* have an associated scalar time dependence. The list is
summed to produce the final result. In other words, if an instance of this
class is :math:`Q(t)`, then it is constructed from a set of constant
:obj:`~Qobj` :math:`\\{Q_k\\}` and time-dependent scalars :math:`f_k(t)` by
.. math::
Q(t) = \\sum_k f_k(t) Q_k
If a scalar :math:`f_k(t)` is not passed with a given :obj:`~Qobj`, then
that term is assumed to be constant. The next section contains more detail
on the allowed forms of the constants, and gives several examples for how
to build instances of this class.
**Time-dependence formats**
There are three major formats for specifying a time-dependent scalar:
- Python function
- string
- array
For function format, the function signature must be
``f(t: float, args: dict) -> complex``, for example
.. code-block:: python
def f1_t(t, args):
return np.exp(-1j * t * args["w1"])
def f2_t(t, args):
return np.cos(t * args["w2"])
H = QobjEvo([H0, [H1, f1_t], [H2, f2_t]], args={"w1":1., "w2":2.})
For string-based coeffients, the string must be a compilable python code
resulting in a complex. The following symbols are defined:
.. code-block::
pi exp log log10
erf zerf norm proj
real imag conj abs arg
sin sinh asin asinh
cos cosh acos acosh
tan tanh atan atanh
numpy as np
scipy.special as spe
A couple more simple examples:
.. code-block:: python
H = QobjEvo([H0, [H1, 'exp(-1j*w1*t)'], [H2, 'cos(w2*t)']],
args={"w1":1.,"w2":2.})
For numpy array format, the array must be an 1d of dtype ``np.float64`` or
``np.complex128``. A list of times (``np.float64``) at which the
coeffients must be given as ``tlist``. The coeffients array must have the
same length as the tlist. The times of the tlist do not need to be
equidistant, but must be sorted. By default, a cubic spline interpolation
will be used for the coefficient at time t. If the coefficients are to be
treated as step functions, use the arguments
``args = {"_step_func_coeff": True}``. Examples of array-format usage are:
.. code-block:: python
tlist = np.logspace(-5,0,100)
H = QobjEvo([H0, [H1, np.exp(-1j*tlist)], [H2, np.cos(2.*tlist)]],
tlist=tlist)
Mixing time formats is allowed. It is not possible to create a single
``QobjEvo`` that contains different ``tlist`` values, however.
**Passing arguments**
``args`` is a dict of (name: object). The name must be a valid Python
identifier string, and in general the object can be any type that is
supported by the code to be compiled in the string.
There are some "magic" names that can be specified, whose objects will be
overwritten when used within :func:`.sesolve`, :func:`.mesolve` and
:func:`.mcsolve`. This allows access to the solvers' internal states, and
they are updated at every call. The initial values of these dictionary
elements is unimportant. The magic names available are:
- ``"state"``: the current state as a :class:`~Qobj`
- ``"state_vec"``: the current state as a column-stacked 1D ``np.ndarray``
- ``"state_mat"``: the current state as a 2D ``np.ndarray``
- ``"expect_op_<n>"``: the current expectation value of the element
``e_ops[n]``, which is an argument to the solvers. Replace ``<n>`` with
an integer literal, e.g. ``"expect_op_0"``. This will be either real- or
complex-valued, depending on whether the state and operator are both
Hermitian or not.
- ``"collapse"``: (:func:`.mcsolve` only) a list of the collapses that have
occurred during the evolution. Each element of the list is a 2-tuple
``(time: float, which: int)``, where ``time`` is the time this collapse
happened, and ``which`` is an integer indexing the ``c_ops`` argument to
:func:`.mcsolve`.
Parameters
----------
Q_object : list, :class:`~Qobj` or :class:`~QobjEvo`
The time-dependent description of the quantum object. This is of the
same format as the first parameter to the general ODE solvers; in
general, it is a list of ``[Qobj, time_dependence]`` pairs that are
summed to make the whole object. The ``time_dependence`` can be any of
the formats discussed in the previous section. If a particular term
has no time-dependence, then you should just give the ``Qobj`` instead
of the 2-element list.
args : dict, optional
Mapping of ``{str: object}``, discussed in greater detail above. The
strings can be any valid Python identifier, and the objects are of the
consumable types. See the previous section for details on the "magic"
names used to access solver internals.
tlist : array_like, optional
List of the times any numpy-array coefficients describe. This is used
only in at least one of the time dependences in ``Q_object`` is given
in Numpy-array format. The times must be sorted, but need not be
equidistant. Values inbetween will be interpolated.
Attributes
----------
cte : :class:`~Qobj`
Constant part of the QobjEvo.
ops : list of :class:`.EvoElement`
Internal representation of the time-dependence structure of the
elements.
args : dict
The current value of the ``args`` dictionary passed into the
constructor.
dynamics_args : list
Names of the dynamic arguments that the solvers will generate. These
are the magic names that were found in the ``args`` parameter.
tlist : array_like
List of times at which the numpy-array coefficients are applied.
compiled : str
A string representing the properties of the low-level Cython class
backing this object (may be empty).
compiled_qobjevo : ``CQobjCte`` or ``CQobjEvoTd``
Cython version of the QobjEvo.
coeff_get : callable
Object called to obtain a list of all the coefficients at a particular
time.
coeff_files : list
Runtime created files to delete with the instance.
dummy_cte : bool
Is self.cte an empty Qobj
const : bool
Indicates if quantum object is constant
type : {"cte", "string", "func", "array", "spline", "mixed_callable", \
"mixed_compilable"}
Information about the type of coefficients used in the entire object.
num_obj : int
Number of :obj:`~Qobj` in the QobjEvo.
use_cython : bool
Flag to compile string to Cython or Python
safePickle : bool
Flag to not share pointers between thread.
"""
def __init__(self, Q_object=[], args={}, copy=True,
tlist=None, state0=None, e_ops=[]):
if isinstance(Q_object, QobjEvo):
if copy:
self._inplace_copy(Q_object)
else:
self.__dict__ = Q_object.__dict__
if args:
self.arguments(args)
for i, dargs in enumerate(self.dynamics_args):
e_int = dargs[1] == "expect" and isinstance(dargs[2], int)
if e_ops and e_int:
self.dynamics_args[i] = (dargs[0], "expect",
e_ops[dargs[2]])
if state0 is not None:
self._dynamics_args_update(0., state0)
return
self.const = False
self.dummy_cte = False
self.args = args.copy()
self.dynamics_args = []
self.cte = None
self.tlist = np.asarray(tlist) if tlist is not None else None
self.compiled = ""
self.compiled_qobjevo = None
self.coeff_get = None
self.type = "none"
self.omp = 0
self.coeff_files = []
self.use_cython = use_cython[0]
self.safePickle = safePickle[0]
# Attempt to determine if a 2-element list is a single, time-dependent
# operator, or a list with 2 possibly time-dependent elements.
if isinstance(Q_object, list) and len(Q_object) == 2:
try:
# Test if parsing succeeds on this as a single element.
self._td_op_type(Q_object)
Q_object = [Q_object]
except (TypeError, ValueError):
pass
op_type = self._td_format_check(Q_object)
self.ops = []
if isinstance(op_type, int):
if op_type == 0:
self.cte = Q_object
self.const = True
self.type = "cte"
elif op_type == 1:
raise TypeError("The Qobj must not already be a function")
elif op_type == -1:
pass
else:
op_type_count = [0, 0, 0, 0]
for type_, op in zip(op_type, Q_object):
if type_ == 0:
if self.cte is None:
self.cte = op
else:
self.cte += op
elif type_ == 1:
op_type_count[0] += 1
self.ops.append(EvoElement(op[0], op[1], op[1], "func"))
elif type_ == 2:
op_type_count[1] += 1
self.ops.append(EvoElement(op[0], _StrWrapper(op[1]),
op[1], "string"))
elif type_ == 3:
op_type_count[2] += 1
self.ops.append(EvoElement(
op[0],
_CubicSplineWrapper(tlist, op[1], args=self.args),
op[1].copy(), "array"))
elif type_ == 4:
op_type_count[3] += 1
self.ops.append(EvoElement(op[0], op[1], op[1], "spline"))
nops = sum(op_type_count)
if all([op_t == 0 for op_t in op_type]):
self.type = "cte"
elif op_type_count[0] == nops:
self.type = "func"
elif op_type_count[1] == nops:
self.type = "string"
elif op_type_count[2] == nops:
self.type = "array"
elif op_type_count[3] == nops:
self.type = "spline"
elif op_type_count[0]:
self.type = "mixed_callable"
else:
self.type = "mixed_compilable"
try:
if not self.cte:
self.cte = self.ops[0].qobj
# test is all qobj are compatible (shape, dims)
for op in self.ops[1:]:
self.cte += op.qobj
self.cte *= 0.
self.dummy_cte = True
else:
cte_copy = self.cte.copy()
# test is all qobj are compatible (shape, dims)
for op in self.ops:
cte_copy += op.qobj
except Exception as e:
raise TypeError("Qobj not compatible.") from e
if not self.ops:
self.const = True
self.num_obj = (len(self.ops) if self.dummy_cte else len(self.ops) + 1)
self._args_checks()
if e_ops:
for i, dargs in enumerate(self.dynamics_args):
if dargs[1] == "expect" and isinstance(dargs[2], int):
self.dynamics_args[i] = (dargs[0], "expect",
QobjEvo(e_ops[dargs[2]]))
if state0 is not None:
self._dynamics_args_update(0., state0)
def _td_format_check(self, Q_object):
if isinstance(Q_object, Qobj):
return 0
if isinstance(Q_object, (FunctionType, BuiltinFunctionType, partial)):
return 1
if isinstance(Q_object, list):
return [self._td_op_type(element) for element in Q_object] or -1
raise TypeError("Incorrect Q_object specification")
def _td_op_type(self, element):
if isinstance(element, Qobj):
return 0
try:
op, td = element
except (TypeError, ValueError) as exc:
raise TypeError("Incorrect Q_object specification") from exc
if (not isinstance(op, Qobj)) or isinstance(td, Qobj):
# Qobj is itself callable, so we need an extra check to make sure
# that we don't have a two-element list where both are Qobj.
raise TypeError("Incorrect Q_object specification")
if isinstance(td, Cubic_Spline):
out = 4
elif callable(td):
out = 1
elif isinstance(td, str):
out = 2
elif isinstance(td, np.ndarray):
if self.tlist is None or td.shape != self.tlist.shape:
raise ValueError("Time lists are not compatible")
out = 3
else:
raise TypeError("Incorrect Q_object specification")
return out
def _args_checks(self):
statedims = [self.cte.dims[1],[1]]
for key in self.args:
if key == "state" or key == "state_qobj":
self.dynamics_args += [(key, "Qobj", None)]
if self.args[key] is None:
self.args[key] = Qobj(dims=statedims)
if key == "state_mat":
self.dynamics_args += [("state_mat", "mat", None)]
if isinstance(self.args[key], Qobj):
self.args[key] = self.args[key].full()
if self.args[key] is None:
self.args[key] = Qobj(dims=statedims).full()
if key == "state_vec":
self.dynamics_args += [("state_vec", "vec", None)]
if isinstance(self.args[key], Qobj):
self.args[key] = self.args[key].full().ravel("F")
if self.args[key] is None:
self.args[key] = Qobj(dims=statedims).full().ravel("F")
if key.startswith("expect_op_"):
e_op_num = int(key[10:])
self.dynamics_args += [(key, "expect", e_op_num)]
if isinstance(self.args[key], StateArgs):
self.dynamics_args += [(key, *self.args[key]())]
self.args[key] = 0.
def _check_old_with_state(self):
add_vec = False
for op in self.ops:
if op.type == "func":
try:
op.get_coeff(0., self.args)
except TypeError as e:
nfunc = _StateAsArgs(self.coeff)
op = EvoElement((op.qobj, nfunc, nfunc, "func"))
add_vec = True
if add_vec:
self.dynamics_args += [("_state_vec", "vec", None)]
def __del__(self):
for file_ in self.coeff_files:
try:
os.remove(file_)
except:
pass
def __call__(self, t, data=False, state=None, args={}):
"""
Return a single :obj:`~Qobj` at the given time ``t``.
"""
try:
t = float(t)
except Exception as e:
raise TypeError("Time must be a real scalar.") from e
if state is not None:
self._dynamics_args_update(t, state)
if args:
if not isinstance(args, dict):
raise TypeError("The new args must be in a dict")
old_args = self.args.copy()
old_compiled = self.compiled
self.compiled = False
self.args.update(args)
op_t = self.__call__(t, data=data)
self.args = old_args
self.compiled = old_compiled
elif self.const:
if data:
op_t = self.cte.data.copy()
else:
op_t = self.cte.copy()
elif self.compiled and self.compiled.split()[0] != "dense":
op_t = self.compiled_qobjevo.call(t, data)
elif data:
op_t = self.cte.data.copy()
for part in self.ops:
op_t += part.qobj.data * part.get_coeff(t, self.args)
else:
op_t = self.cte.copy()
for part in self.ops:
op_t += part.qobj * part.get_coeff(t, self.args)
return op_t
def _dynamics_args_update(self, t, state):
if isinstance(state, Qobj):
for name, what, op in self.dynamics_args:
if what == "vec":
self.args[name] = state.full().ravel("F")
elif what == "mat":
self.args[name] = state.full()
elif what == "Qobj":
self.args[name] = state
elif what == "expect":
self.args[name] = op.expect(t, state)
elif isinstance(state, np.ndarray) and state.ndim == 1:
s1 = self.cte.shape[1]
for name, what, op in self.dynamics_args:
if what == "vec":
self.args[name] = state
elif what == "expect":
self.args[name] = op.expect(t, state)
elif state.shape[0] == s1 and self.cte.issuper:
new_l = int(np.sqrt(s1))
mat = state.reshape((new_l, new_l), order="F")
if what == "mat":
self.args[name] = mat
elif what == "Qobj":
self.args[name] = Qobj(mat, dims=self.cte.dims[1])
elif state.shape[0] == s1:
mat = state.reshape((-1,1))
if what == "mat":
self.args[name] = mat
elif what == "Qobj":
self.args[name] = Qobj(mat, dims=[self.cte.dims[1],[1]])
elif state.shape[0] == s1*s1:
new_l = int(np.sqrt(s1))
mat = state.reshape((new_l, new_l), order="F")
if what == "mat":
self.args[name] = mat
elif what == "Qobj":
self.args[name] = Qobj(mat, dims=[self.cte.dims[1],
self.cte.dims[1]])
elif isinstance(state, np.ndarray) and state.ndim == 2:
s1 = self.cte.shape[1]
new_l = int(np.sqrt(s1))
for name, what, op in self.dynamics_args:
if what == "vec":
self.args[name] = state.ravel("F")
elif what == "mat":
self.args[name] = state
elif what == "expect":
self.args[name] = op.expect(t, state)
elif state.shape[1] == 1:
self.args[name] = Qobj(state, dims=[self.cte.dims[1],[1]])
elif state.shape[1] == s1:
self.args[name] = Qobj(state, dims=self.cte.dims)
else:
self.args[name] = Qobj(state)
else:
raise TypeError("state must be a Qobj or np.ndarray")
[docs] def copy(self):
"""Return a copy of this object."""
new = QobjEvo(self.cte.copy())
new.const = self.const
new.args = self.args.copy()
new.dynamics_args = self.dynamics_args.copy()
new.tlist = self.tlist
new.dummy_cte = self.dummy_cte
new.num_obj = self.num_obj
new.type = self.type
new.compiled = False
new.compiled_qobjevo = None
new.coeff_get = None
new.coeff_files = []
new.use_cython = self.use_cython
new.safePickle = self.safePickle
for op in self.ops:
if op.type == "array":
new_coeff = op.coeff.copy()
else:
new_coeff = op.coeff
new.ops.append(EvoElement(op.qobj.copy(), op.get_coeff,
new_coeff, op.type))
return new
def _inplace_copy(self, other):
self.cte = other.cte
self.const = other.const
self.args = other.args.copy()
self.dynamics_args = other.dynamics_args
self.tlist = other.tlist
self.dummy_cte = other.dummy_cte
self.num_obj = other.num_obj
self.type = other.type
self.compiled = ""
self.compiled_qobjevo = None
self.coeff_get = None
self.ops = []
self.coeff_files = []
self.use_cython = other.use_cython
self.safePickle = other.safePickle
for op in other.ops:
if op.type == "array":
new_coeff = op.coeff.copy()
else:
new_coeff = op.coeff
self.ops.append(EvoElement(op.qobj.copy(), op.get_coeff,
new_coeff, op.type))
[docs] def arguments(self, new_args):
"""
Update the scoped variables that were passed as ``args`` to new values.
"""
if not isinstance(new_args, dict):
raise TypeError("The new args must be in a dict")
# remove dynamics_args that are to be refreshed
self.dynamics_args = [dargs for dargs in self.dynamics_args
if dargs[0] not in new_args]
self.args.update(new_args)
self._args_checks()
if self.compiled and self.compiled.split()[2] != "cte":
if isinstance(self.coeff_get, StrCoeff):
self.coeff_get.set_args(self.args)
self.coeff_get._set_dyn_args(self.dynamics_args)
elif isinstance(self.coeff_get, _UnitedFuncCaller):
self.coeff_get.set_args(self.args, self.dynamics_args)
def solver_set_args(self, new_args, state, e_ops):
self.dynamics_args = []
self.args.update(new_args)
self._args_checks()
for i, dargs in enumerate(self.dynamics_args):
if dargs[1] == "expect" and isinstance(dargs[2], int):
self.dynamics_args[i] = (dargs[0], "expect",
QobjEvo(e_ops[dargs[2]]))
if self.compiled:
self.dynamics_args[i][2].compile()
self._dynamics_args_update(0., state)
if self.compiled and self.compiled.split()[2] != "cte":
if isinstance(self.coeff_get, StrCoeff):
self.coeff_get.set_args(self.args)
self.coeff_get._set_dyn_args(self.dynamics_args)
elif isinstance(self.coeff_get, _UnitedFuncCaller):
self.coeff_get.set_args(self.args, self.dynamics_args)
[docs] def to_list(self):
"""
Return this operator in the list-like form used to initialised it, like
can be passed to :func:`~mesolve`.
"""
list_qobj = []
if not self.dummy_cte:
list_qobj.append(self.cte)
for op in self.ops:
list_qobj.append([op.qobj, op.coeff])
return list_qobj
# Math function
def __add__(self, other):
res = self.copy()
res += other
return res
def __radd__(self, other):
res = self.copy()
res += other
return res
def __iadd__(self, other):
if isinstance(other, QobjEvo):
self.cte += other.cte
l = len(self.ops)
for op in other.ops:
if op.type == "array":
new_coeff = op.coeff.copy()
else:
new_coeff = op.coeff
self.ops.append(EvoElement(op.qobj.copy(), op.get_coeff,
new_coeff, op.type))
l += 1
self.args.update(**other.args)
self.dynamics_args += other.dynamics_args
self.const = self.const and other.const
self.dummy_cte = self.dummy_cte and other.dummy_cte
if self.type != other.type:
if self.type in ["func", "mixed_callable"] or \
other.type in ["func", "mixed_callable"]:
self.type = "mixed_callable"
else:
self.type = "mixed_compilable"
self.compiled = ""
self.compiled_qobjevo = None
self.coeff_get = None
if self.tlist is None:
self.tlist = other.tlist
else:
if other.tlist is None:
pass
elif len(other.tlist) != len(self.tlist) or \
other.tlist[-1] != self.tlist[-1]:
raise ValueError("Time lists are not compatible")
else:
self.cte += other
self.dummy_cte = False
self.num_obj = (len(self.ops) if self.dummy_cte else len(self.ops) + 1)
self._reset_type()
return self
def __sub__(self, other):
res = self.copy()
res -= other
return res
def __rsub__(self, other):
res = -self.copy()
res += other
return res
def __isub__(self, other):
self += (-other)
return self
def __mul__(self, other):
res = self.copy()
res *= other
return res
def __rmul__(self, other):
res = self.copy()
if isinstance(other, Qobj):
res.cte = other * res.cte
for op in res.ops:
op.qobj = other * op.qobj
return res
else:
res *= other
return res
def __imul__(self, other):
if isinstance(other, Qobj) or isinstance(other, Number):
self.cte *= other
for op in self.ops:
op.qobj *= other
return self
if isinstance(other, QobjEvo):
if other.const:
self.cte *= other.cte
for op in self.ops:
op.qobj *= other.cte
elif self.const:
cte = self.cte.copy()
self = other.copy()
self.cte = cte * self.cte
for op in self.ops:
op.qobj = cte*op.qobj
else:
cte = self.cte.copy()
self.cte *= other.cte
new_terms = []
old_ops = self.ops
if not other.dummy_cte:
for op in old_ops:
new_terms.append(self._ops_mul_cte(op, other.cte, "R"))
if not self.dummy_cte:
for op in other.ops:
new_terms.append(self._ops_mul_cte(op, cte, "L"))
for op_left in old_ops:
for op_right in other.ops:
new_terms.append(self._ops_mul_(op_left,
op_right))
self.ops = new_terms
self.args.update(other.args)
self.dynamics_args += other.dynamics_args
self.dummy_cte = self.dummy_cte and other.dummy_cte
self.num_obj = (len(self.ops) if
self.dummy_cte else len(self.ops) + 1)
self._reset_type()
return self
return NotImplemented
def __div__(self, other):
if isinstance(other, (int, float, complex,
np.integer, np.floating, np.complexfloating)):
res = self.copy()
res *= other**(-1)
return res
return NotImplemented
def __idiv__(self, other):
if isinstance(other, (int, float, complex,
np.integer, np.floating, np.complexfloating)):
self *= other**(-1)
return self
return NotImplemented
def __truediv__(self, other):
return self.__div__(other)
def __neg__(self):
res = self.copy()
res.cte = -res.cte
for op in res.ops:
op.qobj = -op.qobj
return res
def _ops_mul_(self, opL, opR):
new_f = _Prod(opL.get_coeff, opR.get_coeff)
new_op = [opL.qobj*opR.qobj, new_f, None, 0]
if opL.type == opR.type and opL.type == "string":
new_op[2] = "(" + opL.coeff + ") * (" + opR.coeff + ")"
new_op[3] = "string"
elif opL[3] == opR[3] and opL[3] == "array":
new_op[2] = opL[2]*opR[2]
new_op[3] = "array"
else:
new_op[2] = new_f
new_op[3] = "func"
if self.type not in ["func", "mixed_callable"]:
self.type = "mixed_callable"
return EvoElement.make(new_op)
def _ops_mul_cte(self, op, cte, side):
new_op = [None, op.get_coeff, op.coeff, op.type]
if side == "R":
new_op[0] = op.qobj * cte
if side == "L":
new_op[0] = cte * op.qobj
return EvoElement.make(new_op)
# Transformations
[docs] def trans(self):
"""Return the matrix transpose."""
res = self.copy()
res.cte = res.cte.trans()
for op in res.ops:
op.qobj = op.qobj.trans()
return res
[docs] def conj(self):
"""Return the matrix elementwise conjugation."""
res = self.copy()
res.cte = res.cte.conj()
for op in res.ops:
op.qobj = op.qobj.conj()
res._f_conj()
return res
[docs] def dag(self):
"""Return the matrix conjugate-transpose (dagger)."""
res = self.copy()
res.cte = res.cte.dag()
for op in res.ops:
op.qobj = op.qobj.dag()
res._f_conj()
return res
def _cdc(self):
"""Return ``a.dag * a``."""
if not self.num_obj == 1:
res = self.dag()
res *= self
else:
res = self.copy()
res.cte = res.cte.dag() * res.cte
for op in res.ops:
op.qobj = op.qobj.dag() * op.qobj
res._f_norm2()
return res
# Unitary function of Qobj
[docs] def tidyup(self, atol=1e-12):
"""Removes small elements from this quantum object inplace."""
self.cte = self.cte.tidyup(atol)
for op in self.ops:
op.qobj = op.qobj.tidyup(atol)
return self
def _compress_make_set(self):
sets = []
callable_flags = ["func", "spline"]
for i, op1 in enumerate(self.ops):
already_matched = False
for _set in sets:
already_matched = already_matched or i in _set
if not already_matched:
this_set = [i]
for j, op2 in enumerate(self.ops[i+1:]):
if op1.qobj == op2.qobj:
same_flag = op1.type == op2.type
callable_1 = op1.type in callable_flags
callable_2 = op2.type in callable_flags
if (same_flag or (callable_1 and callable_2)):
this_set.append(j+i+1)
sets.append(this_set)
fsets = []
for i, op1 in enumerate(self.ops):
already_matched = False
for _set in fsets:
already_matched = already_matched or i in _set
if not already_matched:
this_set = [i]
for j, op2 in enumerate(self.ops[i+1:]):
if op1.type != op2.type:
pass
elif op1.type == "array":
if np.allclose(op1.coeff, op2.coeff):
this_set.append(j+i+1)
else:
if op1.coeff is op2.coeff:
this_set.append(j+i+1)
fsets.append(this_set)
return sets, fsets
def _compress_merge_qobj(self, sets):
callable_flags = ["func", "spline"]
new_ops = []
for _set in sets:
if len(_set) == 1:
new_ops.append(self.ops[_set[0]])
elif self.ops[_set[0]].type in callable_flags:
new_op = [self.ops[_set[0]].qobj, None, None, "func"]
fs = []
for i in _set:
fs += [self.ops[i].get_coeff]
new_op[1] = _Add(fs)
new_op[2] = new_op[1]
new_ops.append(EvoElement.make(new_op))
elif self.ops[_set[0]].type == "string":
new_op = [self.ops[_set[0]].qobj, None, None, "string"]
new_str = "(" + self.ops[_set[0]].coeff + ")"
for i in _set[1:]:
new_str += " + (" + self.ops[i].coeff + ")"
new_op[1] = _StrWrapper(new_str)
new_op[2] = new_str
new_ops.append(EvoElement.make(new_op))
elif self.ops[_set[0]].type == "array":
new_op = [self.ops[_set[0]].qobj, None, None, "array"]
new_array = (self.ops[_set[0]].coeff).copy()
for i in _set[1:]:
new_array += self.ops[i].coeff
new_op[2] = new_array
new_op[1] = _CubicSplineWrapper(
self.tlist, new_array, args=self.args)
new_ops.append(EvoElement.make(new_op))
self.ops = new_ops
def _compress_merge_func(self, fsets):
new_ops = []
for _set in fsets:
base = self.ops[_set[0]]
new_op = [None, base.get_coeff, base.coeff, base.type]
if len(_set) == 1:
new_op[0] = base.qobj
else:
new_op[0] = base.qobj.copy()
for i in _set[1:]:
new_op[0] += self.ops[i].qobj
new_ops.append(EvoElement.make(new_op))
self.ops = new_ops
[docs] def compress(self):
"""
Merge together elements that share the same time-dependence, to reduce
the number of matrix multiplications and additions that need to be done
to evaluate this object.
Modifies the object inplace.
"""
self.tidyup()
sets, fsets = self._compress_make_set()
N_sets = len(sets)
N_fsets = len(fsets)
num_ops = len(self.ops)
if N_sets < num_ops and N_fsets < num_ops:
# Both could be better
self.compiled = ""
self.compiled_qobjevo = None
self.coeff_get = None
if N_sets < N_fsets:
self._compress_merge_qobj(sets)
else:
self._compress_merge_func(fsets)
sets, fsets = self._compress_make_set()
N_sets = len(sets)
N_fsets = len(fsets)
num_ops = len(self.ops)
if N_sets < num_ops:
self.compiled = ""
self.compiled_qobjevo = None
self.coeff_get = None
self._compress_merge_qobj(sets)
elif N_fsets < num_ops:
self.compiled = ""
self.compiled_qobjevo = None
self.coeff_get = None
self._compress_merge_func(fsets)
self._reset_type()
def _reset_type(self):
op_type_count = [0, 0, 0, 0]
for op in self.ops:
if op.type == "func":
op_type_count[0] += 1
elif op.type == "string":
op_type_count[1] += 1
elif op.type == "array":
op_type_count[2] += 1
elif op.type == "spline":
op_type_count[3] += 1
nops = sum(op_type_count)
if not self.ops and self.dummy_cte is False:
self.type = "cte"
elif op_type_count[0] == nops:
self.type = "func"
elif op_type_count[1] == nops:
self.type = "string"
elif op_type_count[2] == nops:
self.type = "array"
elif op_type_count[3] == nops:
self.type = "spline"
elif op_type_count[0]:
self.type = "mixed_callable"
else:
self.type = "mixed_compilable"
self.num_obj = (len(self.ops) if self.dummy_cte else len(self.ops) + 1)
[docs] def permute(self, order):
"""
Permute the tensor structure of the underlying matrices into a new
format.
See Also
--------
Qobj.permute : the same operation on constant quantum objects.
"""
res = self.copy()
res.cte = res.cte.permute(order)
for op in res.ops:
op.qobj = op.qobj.permute(order)
return res
[docs] def apply(self, function, *args, **kw_args):
"""
Apply the linear function ``function`` to every ``Qobj`` included in
this time-dependent object, and return a new ``QobjEvo`` with the
result.
Any additional arguments or keyword arguments will be appended to every
function call.
"""
self.compiled = ""
res = self.copy()
cte_res = function(res.cte, *args, **kw_args)
if not isinstance(cte_res, Qobj):
raise TypeError("The function must return a Qobj")
res.cte = cte_res
for op in res.ops:
op.qobj = function(op.qobj, *args, **kw_args)
return res
[docs] def apply_decorator(self, function, *args,
str_mod=None, inplace_np=False, **kw_args):
"""
Apply the given function to every time-dependent coefficient in the
quantum object, and return a new object with the result.
Any additional arguments and keyword arguments will be appended to the
function calls.
Parameters
----------
function : callable
``(time_dependence, *args, **kwargs) -> time_dependence``. Called
on each time-dependent coefficient to produce a new coefficient.
The additional arguments and keyword arguments are the ones given
to this function.
str_mod : list
A 2-element list of strings, that will additionally wrap any string
time-dependences. An existing time-dependence string ``x`` will
become ``str_mod[0] + x + str_mod[1]``.
inplace_np : bool, default False
Whether this function should modify Numpy arrays inplace, or be
used like a regular decorator. Some decorators create incorrect
arrays as some transformations ``f'(t) = f(g(t))`` create a
mismatch between the array and the associated time list.
"""
res = self.copy()
for op in res.ops:
op.get_coeff = function(op.get_coeff, *args, **kw_args)
if op.type == ["func", "spline"]:
op.coeff = op.get_coeff
op.type = "func"
elif op.type == "string":
if str_mod is None:
op.coeff = op.get_coeff
op.type = "func"
else:
op.coeff = str_mod[0] + op.coeff + str_mod[1]
elif op.type == "array":
if inplace_np:
# keep the original function, change the array
def f(a):
return a
ff = function(f, *args, **kw_args)
for i, v in enumerate(op.coeff):
op.coeff[i] = ff(v)
op.get_coeff = _CubicSplineWrapper(
self.tlist, op.coeff, args=self.args)
else:
op.coeff = op.get_coeff
op.type = "func"
if self.type == "string" and str_mod is None:
res.type = "mixed_callable"
elif self.type == "array" and not inplace_np:
res.type = "mixed_callable"
elif self.type == "spline":
res.type = "mixed_callable"
elif self.type == "mixed_compilable":
for op in res.ops:
if op.type == "func":
res.type = "mixed_callable"
return res
def _f_norm2(self):
self.compiled = ""
new_ops = []
for op in self.ops:
new_op = [op.qobj, None, None, op.type]
if op.type == "func":
new_op[1] = _Norm2(op.get_coeff)
new_op[2] = new_op[1]
elif op.type == "string":
new_op[2] = "norm(" + op.coeff + ")"
new_op[1] = _StrWrapper(new_op[2])
elif op.type == "array":
new_op[2] = np.abs(op.coeff)**2
new_op[1] = _CubicSplineWrapper(
self.tlist, new_op[2], args=self.args)
elif op.type == "spline":
new_op[1] = _Norm2(op.get_coeff)
new_op[2] = new_op[1]
new_op[3] = "func"
self.type = "mixed_callable"
new_ops.append(EvoElement.make(new_op))
self.ops = new_ops
return self
def _f_conj(self):
self.compiled = ""
new_ops = []
for op in self.ops:
new_op = [op.qobj, None, None, op.type]
if op.type == "func":
new_op[1] = _Conj(op.get_coeff)
new_op[2] = new_op[1]
elif op.type == "string":
new_op[2] = "conj(" + op.coeff + ")"
new_op[1] = _StrWrapper(new_op[2])
elif op.type == "array":
new_op[2] = np.conj(op.coeff)
new_op[1] = _CubicSplineWrapper(
self.tlist, new_op[2], args=self.args)
elif op.type == "spline":
new_op[1] = _Conj(op.get_coeff)
new_op[2] = new_op[1]
new_op[3] = "func"
self.type = "mixed_callable"
new_ops.append(EvoElement.make(new_op))
self.ops = new_ops
return self
def _shift(self):
self.compiled = ""
self.args.update({"_t0": 0})
new_ops = []
for op in self.ops:
new_op = [op.qobj, None, None, op.type]
if op.type == "func":
new_op[1] = _Shift(op.get_coeff)
new_op[2] = new_op[1]
elif op.type == "string":
new_op[2] = sub("(?<=[^0-9a-zA-Z_])t(?=[^0-9a-zA-Z_])",
"(t+_t0)", " " + op.coeff + " ")
new_op[1] = _StrWrapper(new_op[2])
elif op.type == "array":
new_op[2] = _Shift(op.get_coeff)
new_op[1] = new_op[2]
new_op[3] = "func"
self.type = "mixed_callable"
elif op.type == "spline":
new_op[1] = _Shift(op.get_coeff)
new_op[2] = new_op[1]
new_op[3] = "func"
self.type = "mixed_callable"
new_ops.append(EvoElement.make(new_op))
self.ops = new_ops
return self
[docs] def expect(self, t, state, herm=False):
"""
Calculate the expectation value of this operator on the given
(time-independent) state at a particular time.
This is more efficient than ``expect(QobjEvo(t), state)``.
Parameters
----------
t : float
The time to evaluate this operator at.
state : Qobj or np.ndarray
The state to take the expectation value around.
herm : bool, default False
Whether this operator and the state are both Hermitian. If True,
only the real part of the result will be returned.
See Also
--------
expect : General-purpose expectation values.
"""
if not isinstance(t, (int, float)):
raise TypeError("Time must be a real scalar")
if isinstance(state, Qobj):
if self.cte.dims[1] == state.dims[0]:
vec = state.full().ravel("F")
elif self.cte.dims[1] == state.dims:
vec = state.full().ravel("F")
else:
raise ValueError("Dimensions do not fit")
elif isinstance(state, np.ndarray):
vec = state.ravel("F")
else:
raise TypeError("The vector must be an array or Qobj")
if vec.shape[0] == self.cte.shape[1]:
if self.compiled:
exp = self.compiled_qobjevo.expect(t, vec)
elif self.cte.issuper:
self._dynamics_args_update(t, state)
exp = cy_expect_rho_vec(self.__call__(t, data=True), vec, 0)
else:
self._dynamics_args_update(t, state)
exp = cy_expect_psi(self.__call__(t, data=True), vec, 0)
elif vec.shape[0] == self.cte.shape[1]**2:
if self.compiled:
exp = self.compiled_qobjevo.overlap(t, vec)
else:
self._dynamics_args_update(t, state)
exp = (self.__call__(t, data=True) *
vec.reshape((self.cte.shape[1],
self.cte.shape[1])).T).trace()
else:
raise ValueError("The shapes do not match")
if herm:
return exp.real
else:
return exp
[docs] def mul_vec(self, t, vec):
"""
Multiply this object evaluated at time `t` by a vector.
Parameters
----------
t : float
The time to evaluate this object at.
vec : Qobj or np.ndarray
The state-vector to multiply this object by.
Returns
-------
vec: Qobj or np.ndarray
The vector result in the same type as the input.
"""
was_Qobj = False
if not isinstance(t, (int, float)):
raise TypeError("Time must be a real scalar")
if isinstance(vec, Qobj):
if self.cte.dims[1] != vec.dims[0]:
raise ValueError("Dimensions do not fit")
was_Qobj = True
dims = vec.dims
vec = vec.full().ravel()
elif not isinstance(vec, np.ndarray):
raise TypeError("The vector must be an array or Qobj")
if vec.ndim != 1:
raise ValueError(f"The vector must be 1d, but is {vec.ndim}d")
if vec.shape[0] != self.cte.shape[1]:
raise ValueError("The lengths do not match")
if self.compiled:
out = self.compiled_qobjevo.mul_vec(t, vec)
else:
self._dynamics_args_update(t, vec)
out = spmv(self.__call__(t, data=True), vec)
if was_Qobj:
return Qobj(out, dims=dims)
else:
return out
[docs] def mul_mat(self, t, mat):
"""
Multiply this object evaluated at time `t` by a matrix (from the
right).
Parameters
----------
t : float
The time to evaluate this object at.
mat : Qobj or np.ndarray
The matrix that is multiplied by this object.
Returns
-------
mat: Qobj or np.ndarray
The matrix result in the same type as the input.
"""
was_Qobj = False
if not isinstance(t, (int, float)):
raise TypeError("Time must be a real scalar")
if isinstance(mat, Qobj):
if self.cte.dims[1] != mat.dims[0]:
raise ValueError("Dimensions do not fit")
was_Qobj = True
dims = mat.dims
mat = mat.full()
if not isinstance(mat, np.ndarray):
raise TypeError("The vector must be an array or Qobj")
if mat.ndim != 2:
raise ValueError(f"The matrix must be 2d, but is {mat.ndim}d")
if mat.shape[0] != self.cte.shape[1]:
raise ValueError("The lengths do not match")
if self.compiled:
out = self.compiled_qobjevo.mul_mat(t, mat)
else:
self._dynamics_args_update(t, mat)
out = self.__call__(t, data=True) * mat
if was_Qobj:
return Qobj(out, dims=dims)
else:
return out
[docs] def compile(self, code=False, matched=False, dense=False, omp=0):
"""
Create an associated Cython object for faster usage. This function is
called automatically by the solvers.
Parameters
----------
code : bool, default False
Return the code string generated by compilation of any strings.
matched : bool, default False
If True, the underlying sparse matrices used to represent each
element of the type will have their structures unified. This may
include adding explicit zeros to sparse matrices, but can be faster
in some cases due to not having to deal with repeated structural
mismatches.
dense : bool, default False
Whether to swap to using dense matrices to back the data.
omp : int, optional
The number of OpenMP threads to use when doing matrix
multiplications, if QuTiP was compiled with OpenMP.
Returns
-------
compiled_str : str
(Only if `code` was set to True). The code-generated string of
compiled calling code.
"""
self.tidyup()
Code = None
if self.compiled:
return
for _, _, op in self.dynamics_args:
if isinstance(op, QobjEvo):
op.compile(code, matched, dense, omp)
if not qset.has_openmp:
omp = 0
if omp:
nnz = [self.cte.data.nnz]
for part in self.ops:
nnz += [part.qobj.data.nnz]
if all(qset.openmp_thresh < nz for nz in nnz):
omp = 0
if self.const:
if dense:
self.compiled_qobjevo = CQobjCteDense()
self.compiled = "dense single cte"
elif omp:
self.compiled_qobjevo = CQobjCteOmp()
self.compiled = "csr omp cte"
self.compiled_qobjevo.set_threads(omp)
self.omp = omp
else:
self.compiled_qobjevo = CQobjCte()
self.compiled = "csr single cte"
self.compiled_qobjevo.set_data(self.cte)
else:
if matched:
if omp:
self.compiled_qobjevo = CQobjEvoTdMatchedOmp()
self.compiled = "matched omp "
self.compiled_qobjevo.set_threads(omp)
self.omp = omp
else:
self.compiled_qobjevo = CQobjEvoTdMatched()
self.compiled = "matched single "
elif dense:
self.compiled_qobjevo = CQobjEvoTdDense()
self.compiled = "dense single "
elif omp:
self.compiled_qobjevo = CQobjEvoTdOmp()
self.compiled = "csr omp "
self.compiled_qobjevo.set_threads(omp)
self.omp = omp
else:
self.compiled_qobjevo = CQobjEvoTd()
self.compiled = "csr single "
self.compiled_qobjevo.set_data(self.cte, self.ops)
self.compiled_qobjevo.has_dyn_args(bool(self.dynamics_args))
if self.type in ["func"]:
# funclist = []
# for part in self.ops:
# funclist.append(part.get_coeff)
funclist = [part.get_coeff for part in self.ops]
self.coeff_get = _UnitedFuncCaller(funclist, self.args,
self.dynamics_args, self.cte)
self.compiled += "pyfunc"
self.compiled_qobjevo.set_factor(func=self.coeff_get)
elif self.type in ["mixed_callable"] and self.use_cython:
funclist = []
for part in self.ops:
if isinstance(part.get_coeff, _StrWrapper):
get_coeff, file_ = _compile_str_single(
part.coeff,
self.args)
coeff_files.add(file_)
self.coeff_files.append(file_)
funclist.append(get_coeff)
else:
funclist.append(part.get_coeff)
self.coeff_get = _UnitedFuncCaller(funclist, self.args,
self.dynamics_args,
self.cte)
self.compiled += "pyfunc"
self.compiled_qobjevo.set_factor(func=self.coeff_get)
elif self.type in ["mixed_callable"]:
funclist = [part.get_coeff for part in self.ops]
_UnitedStrCaller, Code, file_ = _compiled_coeffs_python(
self.ops,
self.args,
self.dynamics_args,
self.tlist)
coeff_files.add(file_)
self.coeff_files.append(file_)
self.coeff_get = _UnitedStrCaller(funclist, self.args,
self.dynamics_args,
self.cte)
self.compiled_qobjevo.set_factor(func=self.coeff_get)
self.compiled += "pyfunc"
elif self.type in ["string", "mixed_compilable"]:
if self.use_cython:
# All factor can be compiled
self.coeff_get, Code, file_ = _compiled_coeffs(
self.ops,
self.args,
self.dynamics_args,
self.tlist)
coeff_files.add(file_)
self.coeff_files.append(file_)
self.compiled_qobjevo.set_factor(obj=self.coeff_get)
self.compiled += "cyfactor"
else:
# All factor can be compiled
_UnitedStrCaller, Code, file_ = _compiled_coeffs_python(
self.ops,
self.args,
self.dynamics_args,
self.tlist)
coeff_files.add(file_)
self.coeff_files.append(file_)
funclist = [part.get_coeff for part in self.ops]
self.coeff_get = _UnitedStrCaller(funclist, self.args,
self.dynamics_args,
self.cte)
self.compiled_qobjevo.set_factor(func=self.coeff_get)
self.compiled += "pyfunc"
elif self.type == "array":
try:
use_step_func = self.args["_step_func_coeff"]
except KeyError:
use_step_func = 0
if np.allclose(np.diff(self.tlist),
self.tlist[1] - self.tlist[0]):
if use_step_func:
self.coeff_get = StepCoeffCte(
self.ops, None, self.tlist)
else:
self.coeff_get = InterCoeffCte(
self.ops, None, self.tlist)
else:
if use_step_func:
self.coeff_get = StepCoeffT(
self.ops, None, self.tlist)
else:
self.coeff_get = InterCoeffT(
self.ops, None, self.tlist)
self.compiled += "cyfactor"
self.compiled_qobjevo.set_factor(obj=self.coeff_get)
elif self.type == "spline":
self.coeff_get = InterpolateCoeff(self.ops, None, None)
self.compiled += "cyfactor"
self.compiled_qobjevo.set_factor(obj=self.coeff_get)
else:
pass
coeff_files.clean()
if code:
return Code
def _get_coeff(self, t):
out = []
for part in self.ops:
out.append(part.get_coeff(t, self.args))
return out
def __getstate__(self):
_dict_ = {key: self.__dict__[key]
for key in self.__dict__ if key != "compiled_qobjevo"}
if self.compiled:
return (_dict_, self.compiled_qobjevo.__getstate__())
else:
return (_dict_,)
def __setstate__(self, state):
self.__dict__ = state[0]
self.compiled_qobjevo = None
if self.compiled:
mat_type, threading, td = self.compiled.split()
if mat_type == "csr":
if self.safePickle:
# __getstate__ and __setstate__ of compiled_qobjevo pass pointers
# In 'safe' mod, these pointers are not used.
if td == "cte":
if threading == "single":
self.compiled_qobjevo = CQobjCte()
self.compiled_qobjevo.set_data(self.cte)
elif threading == "omp":
self.compiled_qobjevo = CQobjCteOmp()
self.compiled_qobjevo.set_data(self.cte)
self.compiled_qobjevo.set_threads(self.omp)
else:
# time dependence is pyfunc or cyfactor
if threading == "single":
self.compiled_qobjevo = CQobjEvoTd()
self.compiled_qobjevo.set_data(self.cte, self.ops)
elif threading == "omp":
self.compiled_qobjevo = CQobjEvoTdOmp()
self.compiled_qobjevo.set_data(self.cte, self.ops)
self.compiled_qobjevo.set_threads(self.omp)
if td == "pyfunc":
self.compiled_qobjevo.set_factor(func=self.coeff_get)
elif td == "cyfactor":
self.compiled_qobjevo.set_factor(obj=self.coeff_get)
else:
if td == "cte":
if threading == "single":
self.compiled_qobjevo = CQobjCte.__new__(CQobjCte)
elif threading == "omp":
self.compiled_qobjevo = CQobjCteOmp.__new__(CQobjCteOmp)
self.compiled_qobjevo.set_threads(self.omp)
else:
# time dependence is pyfunc or cyfactor
if threading == "single":
self.compiled_qobjevo = CQobjEvoTd.__new__(CQobjEvoTd)
elif threading == "omp":
self.compiled_qobjevo = CQobjEvoTdOmp.__new__(CQobjEvoTdOmp)
self.compiled_qobjevo.set_threads(self.omp)
self.compiled_qobjevo.__setstate__(state[1])
elif mat_type == "dense":
if td == "cte":
self.compiled_qobjevo = \
CQobjCteDense.__new__(CQobjCteDense)
else:
CQobjEvoTdDense.__new__(CQobjEvoTdDense)
self.compiled_qobjevo.__setstate__(state[1])
elif mat_type == "matched":
if threading == "single":
self.compiled_qobjevo = \
CQobjEvoTdMatched.__new__(CQobjEvoTdMatched)
elif threading == "omp":
self.compiled_qobjevo = \
CQobjEvoTdMatchedOmp.__new__(CQobjEvoTdMatchedOmp)
self.compiled_qobjevo.set_threads(self.omp)
self.compiled_qobjevo.__setstate__(state[1])
# Function defined inside another function cannot be pickled,
# Using class instead
class _UnitedFuncCaller:
def __init__(self, funclist, args, dynamics_args, cte):
self.funclist = funclist
self.args = args
self.dynamics_args = dynamics_args
self.dims = cte.dims
self.shape = cte.shape
def set_args(self, args, dynamics_args):
self.args = args
self.dynamics_args = dynamics_args
def dyn_args(self, t, state, shape):
# 1d array are to F ordered
mat = state.reshape(shape, order="F")
for name, what, op in self.dynamics_args:
if what == "vec":
self.args[name] = state
elif what == "mat":
self.args[name] = mat
elif what == "Qobj":
if self.shape[1] == shape[1]: # oper
self.args[name] = Qobj(mat, dims=self.dims)
elif shape[1] == 1: # ket
self.args[name] = Qobj(mat, dims=[self.dims[1],[1]])
else: # rho
self.args[name] = Qobj(mat, dims=self.dims[1])
elif what == "expect":
if shape[1] == op.cte.shape[1]: # same shape as object
self.args[name] = op.mul_mat(t, mat).trace()
else:
self.args[name] = op.expect(t, state)
def __call__(self, t, args={}):
if args:
now_args = self.args.copy()
now_args.update(args)
else:
now_args = self.args
out = []
for func in self.funclist:
out.append(func(t, now_args))
return out
def get_args(self):
return self.args
class _Norm2():
def __init__(self, f):
self.func = f
def __call__(self, t, args):
return self.func(t, args)*np.conj(self.func(t, args))
class _Shift():
def __init__(self, f):
self.func = f
def __call__(self, t, args):
return np.conj(self.func(t + args["_t0"], args))
class _Conj():
def __init__(self, f):
self.func = f
def __call__(self, t, args):
return np.conj(self.func(t, args))
class _Prod():
def __init__(self, f, g):
self.func_1 = f
self.func_2 = g
def __call__(self, t, args):
return self.func_1(t, args)*self.func_2(t, args)
class _Add():
def __init__(self, fs):
self.funcs = fs
def __call__(self, t, args):
return np.sum([f(t, args) for f in self.funcs])