Source code for qutip.solver.result

""" Class for solve function results"""

from typing import TypedDict
import numpy as np
from ..core import Qobj, QobjEvo, expect, isket, ket2dm, qzero_like

__all__ = [
    "Result",
    "MultiTrajResult",
    "McResult",
    "NmmcResult",
    "McTrajectoryResult",
    "McResultImprovedSampling",
]


class _QobjExpectEop:
    """
    Pickable e_ops callable that calculates the expectation value for a given
    operator.

    Parameters
    ----------
    op : :obj:`.Qobj`
        The expectation value operator.
    """

    def __init__(self, op):
        self.op = op

    def __call__(self, t, state):
        return expect(self.op, state)


class ExpectOp:
    """
    A result e_op (expectation operation).

    Parameters
    ----------
    op : object
        The original object used to define the e_op operation, e.g. a
        :~obj:`Qobj` or a function ``f(t, state)``.

    f : function
        A callable ``f(t, state)`` that will return the value of the e_op
        for the specified state and time.

    append : function
        A callable ``append(value)``, e.g. ``expect[k].append``, that will
        store the result of the e_ops function ``f(t, state)``.

    Attributes
    ----------
    op : object
        The original object used to define the e_op operation.
    """

    def __init__(self, op, f, append):
        self.op = op
        self._f = f
        self._append = append

    def __call__(self, t, state):
        """
        Return the expectation value for the given time, ``t`` and
        state, ``state``.
        """
        return self._f(t, state)

    def _store(self, t, state):
        """
        Store the result of the e_op function. Should only be called by
        :class:`~Result`.
        """
        self._append(self._f(t, state))


class _BaseResult:
    """
    Common method for all ``Result``.
    """

    def __init__(self, options, *, solver=None, stats=None):
        self.solver = solver
        if stats is None:
            stats = {}
        self.stats = stats

        self._state_processors = []
        self._state_processors_require_copy = False

        # make sure not to store a reference to the solver
        options_copy = options.copy()
        if hasattr(options_copy, "_feedback"):
            options_copy._feedback = None
        self.options = options_copy

    def _e_ops_to_dict(self, e_ops):
        """Convert the supplied e_ops to a dictionary of Eop instances."""
        if e_ops is None:
            e_ops = {}
        elif isinstance(e_ops, (list, tuple)):
            e_ops = {k: e_op for k, e_op in enumerate(e_ops)}
        elif isinstance(e_ops, dict):
            pass
        else:
            e_ops = {0: e_ops}
        return e_ops

    def add_processor(self, f, requires_copy=False):
        """
        Append a processor ``f`` to the list of state processors.

        Parameters
        ----------
        f : function, ``f(t, state)``
            A function to be called each time a state is added to this
            result object. The state is the state passed to ``.add``, after
            applying the pre-processors, if any.

        requires_copy : bool, default False
            Whether this processor requires a copy of the state rather than
            a reference. A processor must never modify the supplied state, but
            if a processor stores the state it should set ``require_copy`` to
            true.
        """
        self._state_processors.append(f)
        self._state_processors_require_copy |= requires_copy


class ResultOptions(TypedDict):
    store_states: bool
    store_final_state: bool


[docs]class Result(_BaseResult): """ Base class for storing solver results. Parameters ---------- e_ops : :obj:`.Qobj`, :obj:`.QobjEvo`, function or list or dict of these The ``e_ops`` parameter defines the set of values to record at each time step ``t``. If an element is a :obj:`.Qobj` or :obj:`.QobjEvo` the value recorded is the expectation value of that operator given the state at ``t``. If the element is a function, ``f``, the value recorded is ``f(t, state)``. The values are recorded in the ``e_data`` and ``expect`` attributes of this result object. ``e_data`` is a dictionary and ``expect`` is a list, where each item contains the values of the corresponding ``e_op``. options : dict The options for this result class. solver : str or None The name of the solver generating these results. stats : dict or None The stats generated by the solver while producing these results. Note that the solver may update the stats directly while producing results. kw : dict Additional parameters specific to a result sub-class. Attributes ---------- times : list A list of the times at which the expectation values and states were recorded. states : list of :obj:`.Qobj` The state at each time ``t`` (if the recording of the state was requested). final_state : :obj:`.Qobj`: The final state (if the recording of the final state was requested). expect : list of arrays of expectation values A list containing the values of each ``e_op``. The list is in the same order in which the ``e_ops`` were supplied and empty if no ``e_ops`` were given. Each element is itself a list and contains the values of the corresponding ``e_op``, with one value for each time in ``.times``. The same lists of values may be accessed via the ``.e_data`` dictionary and the original ``e_ops`` are available via the ``.e_ops`` attribute. e_data : dict A dictionary containing the values of each ``e_op``. If the ``e_ops`` were supplied as a dictionary, the keys are the same as in that dictionary. Otherwise the keys are the index of the ``e_op`` in the ``.expect`` list. The lists of expectation values returned are the *same* lists as those returned by ``.expect``. e_ops : dict A dictionary containing the supplied e_ops as ``ExpectOp`` instances. The keys of the dictionary are the same as for ``.e_data``. Each value is object where ``.e_ops[k](t, state)`` calculates the value of ``e_op`` ``k`` at time ``t`` and the given ``state``, and ``.e_ops[k].op`` is the original object supplied to create the ``e_op``. solver : str or None The name of the solver generating these results. stats : dict or None The stats generated by the solver while producing these results. options : dict The options for this result class. """ options: ResultOptions def __init__( self, e_ops, options: ResultOptions, *, solver=None, stats=None, **kw, ): super().__init__(options, solver=solver, stats=stats) raw_ops = self._e_ops_to_dict(e_ops) self.e_data = {k: [] for k in raw_ops} self.e_ops = {} for k, op in raw_ops.items(): f = self._e_op_func(op) self.e_ops[k] = ExpectOp(op, f, self.e_data[k].append) self.add_processor(self.e_ops[k]._store) self.times = [] self.states = [] self._final_state = None self._post_init(**kw) def _e_op_func(self, e_op): """ Convert an e_op entry into a function, ``f(t, state)`` that returns the appropriate value (usually an expectation value). Sub-classes may override this function to calculate expectation values in different ways. """ if isinstance(e_op, Qobj): return _QobjExpectEop(e_op) elif isinstance(e_op, QobjEvo): return e_op.expect elif callable(e_op): return e_op raise TypeError(f"{e_op!r} has unsupported type {type(e_op)!r}.") def _post_init(self): """ Perform post __init__ initialisation. In particular, add state processors or pre-processors. Sub-class may override this. If the sub-class wishes to register the default processors for storing states, it should call this parent ``.post_init()`` method. Sub-class ``.post_init()`` implementation may take additional keyword arguments if required. """ store_states = self.options["store_states"] store_states = store_states or ( len(self.e_ops) == 0 and store_states is None ) if store_states: self.add_processor(self._store_state, requires_copy=True) store_final_state = self.options["store_final_state"] if store_final_state and not store_states: self.add_processor(self._store_final_state, requires_copy=True) def _store_state(self, t, state): """Processor that stores a state in ``.states``.""" self.states.append(state) def _store_final_state(self, t, state): """Processor that writes the state to ``._final_state``.""" self._final_state = state def _pre_copy(self, state): """Return a copy of the state. Sub-classes may override this to copy a state in different manner or to skip making a copy altogether if a copy is not necessary. """ return state.copy() def add(self, t, state): """ Add a state to the results for the time ``t`` of the evolution. Adding a state calculates the expectation value of the state for each of the supplied ``e_ops`` and stores the result in ``.expect``. The state is recorded in ``.states`` and ``.final_state`` if specified by the supplied result options. Parameters ---------- t : float The time of the added state. state : typically a :obj:`.Qobj` The state a time ``t``. Usually this is a :obj:`.Qobj` with suitable dimensions, but it sub-classes of result might support other forms of the state. Notes ----- The expectation values, i.e. ``e_ops``, and states are recorded by the state processors (see ``.add_processor``). Additional processors may be added by sub-classes. """ self.times.append(t) if self._state_processors_require_copy: state = self._pre_copy(state) for op in self._state_processors: op(t, state) def __repr__(self): lines = [ f"<{self.__class__.__name__}", f" Solver: {self.solver}", ] if self.stats: lines.append(" Solver stats:") lines.extend(f" {k}: {v!r}" for k, v in self.stats.items()) if self.times: lines.append( f" Time interval: [{self.times[0]}, {self.times[-1]}]" f" ({len(self.times)} steps)" ) lines.append(f" Number of e_ops: {len(self.e_ops)}") if self.states: lines.append(" States saved.") elif self.final_state is not None: lines.append(" Final state saved.") else: lines.append(" State not saved.") lines.append(">") return "\n".join(lines) @property def expect(self): return [np.array(e_op) for e_op in self.e_data.values()] @property def final_state(self): if self._final_state is not None: return self._final_state if self.states: return self.states[-1] return None
class MultiTrajResultOptions(TypedDict): store_states: bool store_final_state: bool keep_runs_results: bool
[docs]class MultiTrajResult(_BaseResult): """ Base class for storing results for solver using multiple trajectories. Parameters ---------- e_ops : :obj:`.Qobj`, :obj:`.QobjEvo`, function or list or dict of these The ``e_ops`` parameter defines the set of values to record at each time step ``t``. If an element is a :obj:`.Qobj` or :obj:`.QobjEvo` the value recorded is the expectation value of that operator given the state at ``t``. If the element is a function, ``f``, the value recorded is ``f(t, state)``. The values are recorded in the ``.expect`` attribute of this result object. ``.expect`` is a list, where each item contains the values of the corresponding ``e_op``. Function ``e_ops`` must return a number so the average can be computed. options : dict The options for this result class. solver : str or None The name of the solver generating these results. stats : dict or None The stats generated by the solver while producing these results. Note that the solver may update the stats directly while producing results. kw : dict Additional parameters specific to a result sub-class. Attributes ---------- times : list A list of the times at which the expectation values and states were recorded. average_states : list of :obj:`.Qobj` The state at each time ``t`` (if the recording of the state was requested) averaged over all trajectories as a density matrix. runs_states : list of list of :obj:`.Qobj` The state for each trajectory and each time ``t`` (if the recording of the states and trajectories was requested) final_state : :obj:`.Qobj`: The final state (if the recording of the final state was requested) averaged over all trajectories as a density matrix. runs_final_state : list of :obj:`.Qobj` The final state for each trajectory (if the recording of the final state and trajectories was requested). average_expect : list of array of expectation values A list containing the values of each ``e_op`` averaged over each trajectories. The list is in the same order in which the ``e_ops`` were supplied and empty if no ``e_ops`` were given. Each element is itself an array and contains the values of the corresponding ``e_op``, with one value for each time in ``.times``. std_expect : list of array of expectation values A list containing the standard derivation of each ``e_op`` over each trajectories. The list is in the same order in which the ``e_ops`` were supplied and empty if no ``e_ops`` were given. Each element is itself an array and contains the values of the corresponding ``e_op``, with one value for each time in ``.times``. runs_expect : list of array of expectation values A list containing the values of each ``e_op`` for each trajectories. The list is in the same order in which the ``e_ops`` were supplied and empty if no ``e_ops`` were given. Only available if the storing of trajectories was requested. The order of the elements is ``runs_expect[e_ops][trajectory][time]``. Each element is itself an array and contains the values of the corresponding ``e_op``, with one value for each time in ``.times``. average_e_data : dict A dictionary containing the values of each ``e_op`` averaged over each trajectories. If the ``e_ops`` were supplied as a dictionary, the keys are the same as in that dictionary. Otherwise the keys are the index of the ``e_op`` in the ``.expect`` list. The lists of expectation values returned are the *same* lists as those returned by ``.expect``. average_e_data : dict A dictionary containing the standard derivation of each ``e_op`` over each trajectories. If the ``e_ops`` were supplied as a dictionary, the keys are the same as in that dictionary. Otherwise the keys are the index of the ``e_op`` in the ``.expect`` list. The lists of expectation values returned are the *same* lists as those returned by ``.expect``. runs_e_data : dict A dictionary containing the values of each ``e_op`` for each trajectories. If the ``e_ops`` were supplied as a dictionary, the keys are the same as in that dictionary. Otherwise the keys are the index of the ``e_op`` in the ``.expect`` list. Only available if the storing of trajectories was requested. The order of the elements is ``runs_expect[e_ops][trajectory][time]``. The lists of expectation values returned are the *same* lists as those returned by ``.expect``. solver : str or None The name of the solver generating these results. stats : dict or None The stats generated by the solver while producing these results. options : :obj:`~SolverResultsOptions` The options for this result class. """ options: MultiTrajResultOptions def __init__( self, e_ops, options: MultiTrajResultOptions, *, solver=None, stats=None, **kw, ): super().__init__(options, solver=solver, stats=stats) self._raw_ops = self._e_ops_to_dict(e_ops) self.times = [] self.trajectories = [] self.num_trajectories = 0 self.seeds = [] self._sum_states = None self._sum_final_states = None self._sum_expect = None self._sum2_expect = None self._target_tols = None self.average_e_data = {} self.std_e_data = {} self.runs_e_data = {} self._post_init(**kw) @property def _store_average_density_matricies(self) -> bool: return ( self.options["store_states"] or (self.options["store_states"] is None and self._raw_ops == {}) ) and not self.options["keep_runs_results"] @property def _store_final_density_matrix(self) -> bool: return ( self.options["store_final_state"] and not self._store_average_density_matricies and not self.options["keep_runs_results"] ) @staticmethod def _to_dm(state): if state.type == "ket": state = state.proj() return state def _add_first_traj(self, trajectory): """ Read the first trajectory, intitializing needed data. """ self.times = trajectory.times if trajectory.states and self._store_average_density_matricies: self._sum_states = [ qzero_like(self._to_dm(state)) for state in trajectory.states ] if trajectory.final_state and self._store_final_density_matrix: state = trajectory.final_state self._sum_final_states = qzero_like(self._to_dm(state)) self._sum_expect = [ np.zeros_like(expect) for expect in trajectory.expect ] self._sum2_expect = [ np.zeros_like(expect) for expect in trajectory.expect ] self.e_ops = trajectory.e_ops self.average_e_data = { k: list(avg_expect) for k, avg_expect in zip(self._raw_ops, self._sum_expect) } if self.options["keep_runs_results"]: self.runs_e_data = {k: [] for k in self._raw_ops} def _store_trajectory(self, trajectory): self.trajectories.append(trajectory) def _reduce_states(self, trajectory): self._sum_states = [ accu + self._to_dm(state) for accu, state in zip(self._sum_states, trajectory.states) ] def _reduce_final_state(self, trajectory): self._sum_final_states += self._to_dm(trajectory.final_state) def _reduce_expect(self, trajectory): """ Compute the average of the expectation values and store it in it's multiple formats. """ for i, k in enumerate(self._raw_ops): expect_traj = trajectory.expect[i] self._sum_expect[i] += expect_traj self._sum2_expect[i] += expect_traj**2 avg = self._sum_expect[i] / self.num_trajectories avg2 = self._sum2_expect[i] / self.num_trajectories self.average_e_data[k] = list(avg) # mean(expect**2) - mean(expect)**2 can something be very small # negative (-1e-15) which raise an error for float sqrt. self.std_e_data[k] = list(np.sqrt(np.abs(avg2 - np.abs(avg**2)))) if self.runs_e_data: self.runs_e_data[k].append(trajectory.e_data[k]) def _increment_traj(self, trajectory): if self.num_trajectories == 0: self._add_first_traj(trajectory) self.num_trajectories += 1 def _no_end(self): """ Remaining number of trajectories needed to finish cannot be determined by this object. """ return np.inf def _fixed_end(self): """ Finish at a known number of trajectories. """ ntraj_left = self._target_ntraj - self.num_trajectories if ntraj_left == 0: self.stats["end_condition"] = "ntraj reached" return ntraj_left def _average_computer(self): avg = np.array(self._sum_expect) / self.num_trajectories avg2 = np.array(self._sum2_expect) / self.num_trajectories return avg, avg2 def _target_tolerance_end(self): """ Compute the error on the expectation values using jackknife resampling. Return the approximate number of trajectories needed to have this error within the tolerance fot all e_ops and times. """ if self.num_trajectories <= 1: return np.inf avg, avg2 = self._average_computer() target = np.array( [ atol + rtol * mean for mean, (atol, rtol) in zip(avg, self._target_tols) ] ) target_ntraj = np.max((avg2 - abs(avg) ** 2) / target**2 + 1) self._estimated_ntraj = min(target_ntraj, self._target_ntraj) if (self._estimated_ntraj - self.num_trajectories) <= 0: self.stats["end_condition"] = "target tolerance reached" return self._estimated_ntraj - self.num_trajectories def _post_init(self): self.num_trajectories = 0 self._target_ntraj = None self.add_processor(self._increment_traj) store_trajectory = self.options["keep_runs_results"] if store_trajectory: self.add_processor(self._store_trajectory) if self._store_average_density_matricies: self.add_processor(self._reduce_states) if self._store_final_density_matrix: self.add_processor(self._reduce_final_state) if self._raw_ops: self.add_processor(self._reduce_expect) self._early_finish_check = self._no_end self.stats["end_condition"] = "unknown" def add(self, trajectory_info): """ Add a trajectory to the evolution. Trajectories can be saved or average canbe extracted depending on the options ``keep_runs_results``. Parameters ---------- trajectory_info : tuple of seed and trajectory - seed: int, SeedSequence Seed used to generate the trajectory. - trajectory : :class:`Result` Run result for one evolution over the times. Returns ------- remaing_traj : number Return the number of trajectories still needed to reach the target tolerance. If no tolerance is provided, return infinity. """ seed, trajectory = trajectory_info self.seeds.append(seed) for op in self._state_processors: op(trajectory) return self._early_finish_check() def add_end_condition(self, ntraj, target_tol=None): """ Set the condition to stop the computing trajectories when the certain condition are fullfilled. Supported end condition for multi trajectories computation are: - Reaching a number of trajectories. - Error bar on the expectation values reach smaller than a given tolerance. Parameters ---------- ntraj : int Number of trajectories expected. target_tol : float, array_like, [optional] Target tolerance of the evolution. The evolution will compute trajectories until the error on the expectation values is lower than this tolerance. The error is computed using jackknife resampling. ``target_tol`` can be an absolute tolerance, a pair of absolute and relative tolerance, in that order. Lastly, it can be a list of pairs of (atol, rtol) for each e_ops. Error estimation is done with jackknife resampling. """ self._target_ntraj = ntraj self.stats["end_condition"] = "timeout" if target_tol is None: self._early_finish_check = self._fixed_end return num_e_ops = len(self._raw_ops) if not num_e_ops: raise ValueError("Cannot target a tolerance without e_ops") self._estimated_ntraj = ntraj targets = np.array(target_tol) if targets.ndim == 0: self._target_tols = np.array([(target_tol, 0.0)] * num_e_ops) elif targets.shape == (2,): self._target_tols = np.ones((num_e_ops, 2)) * targets elif targets.shape == (num_e_ops, 2): self._target_tols = targets else: raise ValueError( "target_tol must be a number, a pair of (atol, " "rtol) or a list of (atol, rtol) for each e_ops" ) self._early_finish_check = self._target_tolerance_end @property def runs_states(self): """ States of every runs as ``states[run][t]``. """ if self.trajectories and self.trajectories[0].states: return [traj.states for traj in self.trajectories] else: return None @property def average_states(self): """ States averages as density matrices. """ if self._sum_states is None: if not (self.trajectories and self.trajectories[0].states): return None self._sum_states = [ qzero_like(self._to_dm(state)) for state in self.trajectories[0].states ] for trajectory in self.trajectories: self._reduce_states(trajectory) return [ final / self.num_trajectories for final in self._sum_states ] @property def states(self): """ Runs final states if available, average otherwise. """ return self.runs_states or self.average_states @property def runs_final_states(self): """ Last states of each trajectories. """ if self.trajectories and self.trajectories[0].final_state: return [traj.final_state for traj in self.trajectories] else: return None @property def average_final_state(self): """ Last states of each trajectories averaged into a density matrix. """ if self._sum_final_states is None: if self.average_states is not None: return self.average_states[-1] return None return self._sum_final_states / self.num_trajectories @property def final_state(self): """ Runs final states if available, average otherwise. """ return self.runs_final_states or self.average_final_state @property def average_expect(self): return [np.array(val) for val in self.average_e_data.values()] @property def std_expect(self): return [np.array(val) for val in self.std_e_data.values()] @property def runs_expect(self): return [np.array(val) for val in self.runs_e_data.values()] @property def expect(self): return [np.array(val) for val in self.e_data.values()] @property def e_data(self): return self.runs_e_data or self.average_e_data
[docs] def steady_state(self, N=0): """ Average the states of the last ``N`` times of every runs as a density matrix. Should converge to the steady state in the right circumstances. Parameters ---------- N : int [optional] Number of states from the end of ``tlist`` to average. Per default all states will be averaged. """ N = int(N) or len(self.times) N = len(self.times) if N > len(self.times) else N states = self.average_states if states is not None: return sum(states[-N:]) / N else: return None
def __repr__(self): lines = [ f"<{self.__class__.__name__}", f" Solver: {self.solver}", ] if self.stats: lines.append(" Solver stats:") lines.extend(f" {k}: {v!r}" for k, v in self.stats.items()) if self.times: lines.append( f" Time interval: [{self.times[0]}, {self.times[-1]}]" f" ({len(self.times)} steps)" ) lines.append(f" Number of e_ops: {len(self.e_data)}") if self.states: lines.append(" States saved.") elif self.final_state is not None: lines.append(" Final state saved.") else: lines.append(" State not saved.") lines.append(f" Number of trajectories: {self.num_trajectories}") if self.trajectories: lines.append(" Trajectories saved.") else: lines.append(" Trajectories not saved.") lines.append(">") return "\n".join(lines) def __add__(self, other): if not isinstance(other, MultiTrajResult): return NotImplemented if self._raw_ops != other._raw_ops: raise ValueError("Shared `e_ops` is required to merge results") if self.times != other.times: raise ValueError("Shared `times` are is required to merge results") new = self.__class__( self._raw_ops, self.options, solver=self.solver, stats=self.stats ) new.e_ops = self.e_ops if self.trajectories and other.trajectories: new.trajectories = self.trajectories + other.trajectories new.num_trajectories = self.num_trajectories + other.num_trajectories new.times = self.times new.seeds = self.seeds + other.seeds if ( self._sum_states is not None and other._sum_states is not None ): new._sum_states = [ state1 + state2 for state1, state2 in zip( self._sum_states, other._sum_states ) ] if ( self._sum_final_states is not None and other._sum_final_states is not None ): new._sum_final_states = ( self._sum_final_states + other._sum_final_states ) new._target_tols = None new._sum_expect = [] new._sum2_expect = [] new.average_e_data = {} new.std_e_data = {} for i, k in enumerate(self._raw_ops): new._sum_expect.append(self._sum_expect[i] + other._sum_expect[i]) new._sum2_expect.append( self._sum2_expect[i] + other._sum2_expect[i] ) avg = new._sum_expect[i] / new.num_trajectories avg2 = new._sum2_expect[i] / new.num_trajectories new.average_e_data[k] = list(avg) new.std_e_data[k] = np.sqrt(np.abs(avg2 - np.abs(avg**2))) if self.runs_e_data and other.runs_e_data: new.runs_e_data[k] = self.runs_e_data[k] + other.runs_e_data[k] new.stats["run time"] += other.stats["run time"] new.stats["end_condition"] = "Merged results" return new
class McTrajectoryResult(Result): """ Result class used by the :class:`.MCSolver` for single trajectories. """ def __init__(self, e_ops, options, *args, **kwargs): super().__init__( e_ops, {**options, "normalize_output": False}, *args, **kwargs )
[docs]class McResult(MultiTrajResult): """ Class for storing Monte-Carlo solver results. Parameters ---------- e_ops : :obj:`.Qobj`, :obj:`.QobjEvo`, function or list or dict of these The ``e_ops`` parameter defines the set of values to record at each time step ``t``. If an element is a :obj:`.Qobj` or :obj:`.QobjEvo` the value recorded is the expectation value of that operator given the state at ``t``. If the element is a function, ``f``, the value recorded is ``f(t, state)``. The values are recorded in the ``.expect`` attribute of this result object. ``.expect`` is a list, where each item contains the values of the corresponding ``e_op``. options : :obj:`~SolverResultsOptions` The options for this result class. solver : str or None The name of the solver generating these results. stats : dict The stats generated by the solver while producing these results. Note that the solver may update the stats directly while producing results. Must include a value for "num_collapse". kw : dict Additional parameters specific to a result sub-class. Attributes ---------- collapse : list For each runs, a list of every collapse as a tuple of the time it happened and the corresponding ``c_ops`` index. """ # Collapse are only produced by mcsolve. def _add_collapse(self, trajectory): self.collapse.append(trajectory.collapse) def _post_init(self): super()._post_init() self.num_c_ops = self.stats["num_collapse"] self.collapse = [] self.add_processor(self._add_collapse) @property def col_times(self): """ List of the times of the collapses for each runs. """ out = [] for col_ in self.collapse: col = list(zip(*col_)) col = [] if len(col) == 0 else col[0] out.append(col) return out @property def col_which(self): """ List of the indexes of the collapses for each runs. """ out = [] for col_ in self.collapse: col = list(zip(*col_)) col = [] if len(col) == 0 else col[1] out.append(col) return out @property def photocurrent(self): """ Average photocurrent or measurement of the evolution. """ cols = [[] for _ in range(self.num_c_ops)] tlist = self.times for collapses in self.collapse: for t, which in collapses: cols[which].append(t) mesurement = [ np.histogram(cols[i], tlist)[0] / np.diff(tlist) / self.num_trajectories for i in range(self.num_c_ops) ] return mesurement @property def runs_photocurrent(self): """ Photocurrent or measurement of each runs. """ tlist = self.times measurements = [] for collapses in self.collapse: cols = [[] for _ in range(self.num_c_ops)] for t, which in collapses: cols[which].append(t) measurements.append( [ np.histogram(cols[i], tlist)[0] / np.diff(tlist) for i in range(self.num_c_ops) ] ) return measurements
class McResultImprovedSampling(McResult, MultiTrajResult): """ See docstring for McResult and MultiTrajResult for all relevant documentation. This class computes expectation values and sums of states, etc using the improved sampling algorithm, which samples the no-jump trajectory first and then only samples jump trajectories afterwards. """ def __init__(self, e_ops, options, **kw): MultiTrajResult.__init__(self, e_ops=e_ops, options=options, **kw) self._sum_expect_no_jump = None self._sum_expect_jump = None self._sum2_expect_no_jump = None self._sum2_expect_jump = None self._sum_states_no_jump = None self._sum_states_jump = None self._sum_final_states_no_jump = None self._sum_final_states_jump = None self.no_jump_prob = None def _reduce_states(self, trajectory): if self.num_trajectories == 1: self._sum_states_no_jump = [ accu + self._to_dm(state) for accu, state in zip( self._sum_states_no_jump, trajectory.states ) ] else: self._sum_states_jump = [ accu + self._to_dm(state) for accu, state in zip( self._sum_states_jump, trajectory.states ) ] def _reduce_final_state(self, trajectory): dm_final_state = self._to_dm(trajectory.final_state) if self.num_trajectories == 1: self._sum_final_states_no_jump += dm_final_state else: self._sum_final_states_jump += dm_final_state def _average_computer(self): avg = np.array(self._sum_expect_jump) / (self.num_trajectories - 1) avg2 = np.array(self._sum2_expect_jump) / (self.num_trajectories - 1) return avg, avg2 def _add_first_traj(self, trajectory): super()._add_first_traj(trajectory) if trajectory.states and self._store_average_density_matricies: del self._sum_states self._sum_states_no_jump = [ qzero_like(self._to_dm(state)) for state in trajectory.states ] self._sum_states_jump = [ qzero_like(self._to_dm(state)) for state in trajectory.states ] if trajectory.final_state and self._store_final_density_matrix: state = trajectory.final_state del self._sum_final_states self._sum_final_states_no_jump = qzero_like(self._to_dm(state)) self._sum_final_states_jump = qzero_like(self._to_dm(state)) self._sum_expect_jump = [ np.zeros_like(expect) for expect in trajectory.expect ] self._sum2_expect_jump = [ np.zeros_like(expect) for expect in trajectory.expect ] self._sum_expect_no_jump = [ np.zeros_like(expect) for expect in trajectory.expect ] self._sum2_expect_no_jump = [ np.zeros_like(expect) for expect in trajectory.expect ] self._sum_expect_jump = [ np.zeros_like(expect) for expect in trajectory.expect ] self._sum2_expect_jump = [ np.zeros_like(expect) for expect in trajectory.expect ] del self._sum_expect del self._sum2_expect def _reduce_expect(self, trajectory): """ Compute the average of the expectation values appropriately weighting the jump and no-jump trajectories """ for i, k in enumerate(self._raw_ops): expect_traj = trajectory.expect[i] p = self.no_jump_prob if self.num_trajectories == 1: self._sum_expect_no_jump[i] += expect_traj * p self._sum2_expect_no_jump[i] += expect_traj**2 * p # no jump trajectory will always be the first one, no need # to worry about including jump trajectories avg = self._sum_expect_no_jump[i] avg2 = self._sum2_expect_no_jump[i] else: self._sum_expect_jump[i] += expect_traj * (1 - p) self._sum2_expect_jump[i] += expect_traj**2 * (1 - p) avg = self._sum_expect_no_jump[i] + ( self._sum_expect_jump[i] / (self.num_trajectories - 1) ) avg2 = self._sum2_expect_no_jump[i] + ( self._sum2_expect_jump[i] / (self.num_trajectories - 1) ) self.average_e_data[k] = list(avg) # mean(expect**2) - mean(expect)**2 can something be very small # negative (-1e-15) which raise an error for float sqrt. self.std_e_data[k] = list(np.sqrt(np.abs(avg2 - np.abs(avg**2)))) if self.runs_e_data: self.runs_e_data[k].append(trajectory.e_data[k]) @property def average_states(self): """ States averages as density matrices. """ if self._sum_states_no_jump is None: if not (self.trajectories and self.trajectories[0].states): return None self._sum_states_no_jump = [ qzero_like(self._to_dm(state)) for state in self.trajectories[0].states ] self._sum_states_jump = [ qzero_like(self._to_dm(state)) for state in self.trajectories[0].states ] self.num_trajectories = 0 for trajectory in self.trajectories: self.num_trajectories += 1 self._reduce_states(trajectory) p = self.no_jump_prob return [ p * final_no_jump + (1 - p) * final_jump / (self.num_trajectories - 1) for final_no_jump, final_jump in zip( self._sum_states_no_jump, self._sum_states_jump ) ] @property def average_final_state(self): """ Last states of each trajectory averaged into a density matrix. """ if self._sum_final_states_no_jump is None: if self.average_states is not None: return self.average_states[-1] p = self.no_jump_prob return p * self._sum_final_states_no_jump + ( ((1 - p) * self._sum_final_states_jump) / (self.num_trajectories - 1) ) def __add__(self, other): raise NotImplemented @property def photocurrent(self): """ Average photocurrent or measurement of the evolution. """ cols = [[] for _ in range(self.num_c_ops)] tlist = self.times for collapses in self.collapse: for t, which in collapses: cols[which].append(t) mesurement = [ (1 - self.no_jump_prob) / (self.num_trajectories - 1) * np.histogram(cols[i], tlist)[0] / np.diff(tlist) for i in range(self.num_c_ops) ] return mesurement class NmmcTrajectoryResult(McTrajectoryResult): """ Result class used by the :class:`.NonMarkovianMCSolver` for single trajectories. Additionally stores the trace of the state along the trajectory. """ def __init__(self, e_ops, options, *args, **kwargs): self._nm_solver = kwargs.pop("__nm_solver") super().__init__(e_ops, options, *args, **kwargs) self.trace = [] # This gets called during the Monte-Carlo simulation of the associated # completely positive master equation. To obtain the state of the actual # system, we simply multiply the provided state with the current martingale # before storing it / computing expectation values. def add(self, t, state): if isket(state): state = ket2dm(state) mu = self._nm_solver.current_martingale() super().add(t, state * mu) self.trace.append(mu) add.__doc__ = Result.add.__doc__
[docs]class NmmcResult(McResult): """ Class for storing the results of the non-Markovian Monte-Carlo solver. Parameters ---------- e_ops : :obj:`.Qobj`, :obj:`.QobjEvo`, function or list or dict of these The ``e_ops`` parameter defines the set of values to record at each time step ``t``. If an element is a :obj:`.Qobj` or :obj:`.QobjEvo` the value recorded is the expectation value of that operator given the state at ``t``. If the element is a function, ``f``, the value recorded is ``f(t, state)``. The values are recorded in the ``.expect`` attribute of this result object. ``.expect`` is a list, where each item contains the values of the corresponding ``e_op``. options : :obj:`~SolverResultsOptions` The options for this result class. solver : str or None The name of the solver generating these results. stats : dict The stats generated by the solver while producing these results. Note that the solver may update the stats directly while producing results. Must include a value for "num_collapse". kw : dict Additional parameters specific to a result sub-class. Attributes ---------- average_trace : list The average trace (i.e., averaged over all trajectories) at each time. std_trace : list The standard deviation of the trace at each time. runs_trace : list of lists For each recorded trajectory, the trace at each time. Only present if ``keep_runs_results`` is set in the options. """ def _post_init(self): super()._post_init() self._sum_trace = None self._sum2_trace = None self.average_trace = [] self.std_trace = [] self.runs_trace = [] self.add_processor(self._add_trace) def _add_first_traj(self, trajectory): super()._add_first_traj(trajectory) self._sum_trace = np.zeros_like(trajectory.times) self._sum2_trace = np.zeros_like(trajectory.times) def _add_trace(self, trajectory): new_trace = np.array(trajectory.trace) self._sum_trace += new_trace self._sum2_trace += np.abs(new_trace) ** 2 avg = self._sum_trace / self.num_trajectories avg2 = self._sum2_trace / self.num_trajectories self.average_trace = avg self.std_trace = np.sqrt(np.abs(avg2 - np.abs(avg) ** 2)) if self.options["keep_runs_results"]: self.runs_trace.append(trajectory.trace) @property def trace(self): """ Refers to ``average_trace`` or ``runs_trace``, depending on whether ``keep_runs_results`` is set in the options. """ return self.runs_trace or self.average_trace