# -*- coding: utf-8 -*-
# @author: Alexander Pitchford
# @email1: agp1@aber.ac.uk
# @email2: alex.pitchford@gmail.com
# @organization: Aberystwyth University
# @supervisor: Daniel Burgarth
"""
Timeslot Computer
These classes determine which dynamics generators, propagators and evolutions
are recalculated when there is a control amplitude update.
The timeslot computer processes the lists held by the dynamics object
The default (UpdateAll) updates all of these each amp update, on the
assumption that all amplitudes are changed each iteration. This is typical
when using optimisation methods like BFGS in the GRAPE algorithm
The alternative (DynUpdate) assumes that only a subset of amplitudes
are updated each iteration and attempts to minimise the number of expensive
calculations accordingly. This would be the appropriate class for Krotov type
methods. Note that the Stats_DynTsUpdate class must be used for stats
in conjunction with this class.
NOTE: AJGP 2014-10-02: The _DynUpdate class currently has known bugs;
there is no pressing need to fix them at present.
If all amplitudes change at each update, then the behavior of the classes is
equivalent. _UpdateAll is easier to understand and potentially slightly faster
in this situation.
Note the methods in the _DynUpdate class were inspired by:
DYNAMO - Dynamic Framework for Quantum Optimal Control
See Machnes et al., arXiv:1011.4874
"""
import warnings
import numpy as np
import timeit
# QuTiP
from qutip.qobj import Qobj
# QuTiP control modules
import qutip.control.errors as errors
import qutip.control.dump as qtrldump
# QuTiP logging
import qutip.logging_utils as logging
# Module-level logger used by the timeslot computer classes in this file.
# NOTE: `logging` here is qutip.logging_utils, not the standard-library module.
logger = logging.get_logger()
def _func_deprecation(message, stacklevel=3):
    """
    Emit a ``DeprecationWarning`` carrying ``message``.

    Parameters
    ----------
    message : str
        Text of the warning.
    stacklevel : int
        Passed straight through to ``warnings.warn``. The default of 3 makes
        the warning refer to the code that called the deprecated function,
        rather than to this helper or to the deprecated function itself.
    """
    warnings.warn(message, category=DeprecationWarning,
                  stacklevel=stacklevel)
class TimeslotComputer(object):
    """
    Base class for all Timeslot Computers

    Note: this must be instantiated with a Dynamics object, which is the
    container for the data that the methods operate on.

    Attributes
    ----------
    log_level : integer
        Level of messaging output from the logger.
        Options are attributes of qutip.logging_utils,
        in decreasing levels of messaging, are:
        DEBUG_INTENSE, DEBUG_VERBOSE, DEBUG, INFO, WARN, ERROR, CRITICAL
        Anything WARN or above is effectively 'quiet' execution,
        assuming everything runs as expected.
        The default NOTSET implies that the level will be taken from
        the QuTiP settings file, which by default is WARN

    evo_comp_summary : EvoCompSummary
        A summary of the most recent evolution computation.
        Used in the stats and dump.
        Will be set to None if neither stats nor dump are set.
    """

    def __init__(self, dynamics, params=None):
        """
        Bind this computer to ``dynamics`` and initialise its attributes.

        Raises
        ------
        TypeError
            If ``dynamics`` is not a ``Dynamics`` instance.
        """
        # Function-scope import, as in the original implementation.
        from qutip.control.dynamics import Dynamics
        is_dynamics = isinstance(dynamics, Dynamics)
        if not is_dynamics:
            raise TypeError("Must instantiate with {} type".format(
                Dynamics))
        self.parent = dynamics
        self.params = params
        self.reset()

    def reset(self):
        """Set the attributes of this object to their default values."""
        # Assigning log_level goes through the property setter below.
        self.log_level = self.parent.log_level
        self.id_text = 'TS_COMP_BASE'
        self.evo_comp_summary = None

    def apply_params(self, params=None):
        """
        Set object attributes from a dictionary of parameters.

        The dictionary used is ``params`` when it is given and non-empty,
        otherwise the one stored at instantiation (``self.params``).
        Each key is used as an attribute name and its value as the
        attribute value.
        Note: attributes are created if they do not exist already,
        and are overwritten if they do.
        Nothing happens when the selected parameters are not a dict.
        """
        selected = params if params else self.params
        if not isinstance(selected, dict):
            return
        self.params = selected
        for attr_name in selected:
            setattr(self, attr_name, selected[attr_name])

    def flag_all_calc_now(self):
        """Do nothing in the base class; subclasses may override."""
        return None

    def init_comp(self):
        """Do nothing in the base class; subclasses may override."""
        return None

    @property
    def log_level(self):
        """Current level of the module logger."""
        return logger.level

    @log_level.setter
    def log_level(self, lvl):
        """
        Set the log_level attribute and set the level of the logger,
        that is, call logger.setLevel(lvl)
        """
        logger.setLevel(lvl)

    def dump_current(self):
        """
        Store a copy of the current time evolution.

        Raises
        ------
        RuntimeError
            If ``dynamics.dump`` is not a ``DynamicsDump`` instance.
        """
        dump = self.parent.dump
        if not isinstance(dump, qtrldump.DynamicsDump):
            raise RuntimeError("Cannot dump current evolution, "
                               "as dynamics.dump is not set")

        # Index of the evolution dump item (if one is added); the summary
        # record is linked to it.
        evo_item_idx = None
        dumped_parts = []
        if dump.dump_any:
            evo_item_idx = dump.add_evo_dump().idx
            dumped_parts.append('evo')
        if dump.dump_summary:
            dump.add_evo_comp_summary(dump_item_idx=evo_item_idx)
            dumped_parts.append('summary')

        if not dumped_parts:
            logger.warning("Dump set, but nothing dumped, check dump config")
class TSlotCompUpdateAll(TimeslotComputer):
    """
    Timeslot Computer - Update All

    Updates all dynamics generators, propagators and evolutions when
    ctrl amplitudes are updated
    """

    def reset(self):
        """Restore the base defaults, set the id text and apply params."""
        TimeslotComputer.reset(self)
        self.id_text = 'ALL'
        self.apply_params()

    def compare_amps(self, new_amps):
        """
        Determine if any amplitudes have changed. If so, then mark the
        timeslots as needing recalculation

        Parameters
        ----------
        new_amps : array
            Candidate control amplitudes. Compared element-wise with
            ``dynamics.ctrl_amps``; axis 1 is reduced to count the
            timeslots that changed.

        Returns
        -------
        bool
            True if amplitudes are the same, False if they have changed
        """
        changed = False
        dyn = self.parent

        # A summary object is only kept when stats or dump are in use.
        # An existing one is reset so that it describes this update only.
        if (dyn.stats or dyn.dump):
            if self.evo_comp_summary:
                self.evo_comp_summary.reset()
            else:
                self.evo_comp_summary = EvoCompSummary()
        ecs = self.evo_comp_summary

        if dyn.ctrl_amps is None:
            # No amplitudes held yet, so everything counts as changed
            changed = True
            if ecs:
                ecs.num_amps_changed = len(new_amps.flat)
                ecs.num_timeslots_changed = new_amps.shape[0]
        else:
            # boolean array with same shape as ctrl_amps
            # True where value in new_amps differs, otherwise False
            changed_amps = dyn.ctrl_amps != new_amps
            if np.any(changed_amps):
                changed = True
                if self.log_level <= logging.DEBUG:
                    logger.debug("{} amplitudes changed".format(
                        changed_amps.sum()))

                if ecs:
                    ecs.num_amps_changed = changed_amps.sum()
                    ecs.num_timeslots_changed = np.any(changed_amps, 1).sum()
            else:
                if self.log_level <= logging.DEBUG:
                    logger.debug("No amplitudes changed")

        # *** update stats ***
        # ecs is always set here when dyn.stats is truthy (see above)
        if dyn.stats:
            dyn.stats.num_ctrl_amp_updates += bool(ecs.num_amps_changed)
            dyn.stats.num_ctrl_amp_changes += ecs.num_amps_changed
            dyn.stats.num_timeslot_changes += ecs.num_timeslots_changed

        if changed:
            dyn.ctrl_amps = new_amps
            # Flag fidelity and gradients as needing recalculation
            dyn.flag_system_changed()
            return False
        return True

    def recompute_evolution(self):
        """
        Recalculates the evolution operators.
        Dynamics generators (e.g. Hamiltonian) and
        prop (propagators) are calculated as necessary

        Every timeslot is recomputed. When stats or dump are in use the
        wall time of each stage is recorded in ``evo_comp_summary`` and
        accumulated into ``dynamics.stats``.
        """
        dyn = self.parent
        prop_comp = dyn.prop_computer
        n_ts = dyn.num_tslots
        n_ctrls = dyn.num_ctrls

        # Clear the public lists
        # These are only set if (external) users access them
        dyn._dyn_gen_qobj = None
        dyn._prop_qobj = None
        dyn._prop_grad_qobj = None
        dyn._fwd_evo_qobj = None
        dyn._onwd_evo_qobj = None
        dyn._onto_evo_qobj = None

        if (dyn.stats or dyn.dump) and not self.evo_comp_summary:
            self.evo_comp_summary = EvoCompSummary()
        ecs = self.evo_comp_summary

        if dyn.stats is not None:
            dyn.stats.num_tslot_recompute += 1
            if self.log_level <= logging.DEBUG:
                logger.log(logging.DEBUG, "recomputing evolution {} "
                           "(UpdateAll)".format(
                               dyn.stats.num_tslot_recompute))

        # calculate the Hamiltonians
        if ecs:
            time_start = timeit.default_timer()
        for k in range(n_ts):
            dyn._combine_dyn_gen(k)
            # NOTE(review): presumably marks any cached decomposition for
            # this timeslot as out of date - confirm against Dynamics
            if dyn._decomp_curr is not None:
                dyn._decomp_curr[k] = False
        if ecs:
            ecs.wall_time_dyn_gen_compute = \
                timeit.default_timer() - time_start

        # calculate the propagators and the propagator gradients
        if ecs:
            time_start = timeit.default_timer()
        for k in range(n_ts):
            if prop_comp.grad_exact and dyn.cache_prop_grad:
                for j in range(n_ctrls):
                    if j == 0:
                        # The propagator is returned along with the
                        # gradient for the first control only
                        dyn._prop[k], dyn._prop_grad[k, j] = \
                            prop_comp._compute_prop_grad(k, j)
                        if self.log_level <= logging.DEBUG_INTENSE:
                            # The propagators are held by the dynamics
                            # object (not by this computer). A plain '{}'
                            # field is used because operator objects do
                            # not accept a numeric format spec.
                            logger.log(logging.DEBUG_INTENSE,
                                       "propagator {}:\n{}".format(
                                           k, dyn._prop[k]))
                    else:
                        dyn._prop_grad[k, j] = \
                            prop_comp._compute_prop_grad(k, j,
                                                         compute_prop=False)
            else:
                dyn._prop[k] = prop_comp._compute_propagator(k)
        if ecs:
            ecs.wall_time_prop_compute = \
                timeit.default_timer() - time_start

        # compute the forward propagation
        # Qobj operators are combined with '*', other types with .dot()
        if ecs:
            time_start = timeit.default_timer()
        for k in range(n_ts):
            if dyn.oper_dtype == Qobj:
                dyn._fwd_evo[k+1] = dyn._prop[k]*dyn._fwd_evo[k]
            else:
                dyn._fwd_evo[k+1] = dyn._prop[k].dot(dyn._fwd_evo[k])
        if ecs:
            ecs.wall_time_fwd_prop_compute = \
                timeit.default_timer() - time_start
            time_start = timeit.default_timer()

        # compute the onward propagation
        if dyn.fid_computer.uses_onwd_evo:
            dyn._onwd_evo[n_ts-1] = dyn._prop[n_ts-1]
            for k in range(n_ts-2, -1, -1):
                if dyn.oper_dtype == Qobj:
                    dyn._onwd_evo[k] = dyn._onwd_evo[k+1]*dyn._prop[k]
                else:
                    dyn._onwd_evo[k] = dyn._onwd_evo[k+1].dot(dyn._prop[k])

        if dyn.fid_computer.uses_onto_evo:
            # Starts from _onto_evo[n_ts], which is not assigned here
            # NOTE(review): presumably initialised by Dynamics - confirm
            for k in range(n_ts-1, -1, -1):
                if dyn.oper_dtype == Qobj:
                    dyn._onto_evo[k] = dyn._onto_evo[k+1]*dyn._prop[k]
                else:
                    dyn._onto_evo[k] = dyn._onto_evo[k+1].dot(dyn._prop[k])

        if ecs:
            ecs.wall_time_onwd_prop_compute = \
                timeit.default_timer() - time_start

        if dyn.stats:
            dyn.stats.wall_time_dyn_gen_compute += \
                ecs.wall_time_dyn_gen_compute
            dyn.stats.wall_time_prop_compute += \
                ecs.wall_time_prop_compute
            dyn.stats.wall_time_fwd_prop_compute += \
                ecs.wall_time_fwd_prop_compute
            dyn.stats.wall_time_onwd_prop_compute += \
                ecs.wall_time_onwd_prop_compute

        if dyn.unitarity_check_level:
            dyn.check_unitarity()

        if dyn.dump:
            self.dump_current()

    def get_timeslot_for_fidelity_calc(self):
        """
        Returns the timeslot index that will be used calculate current fidelity
        value.
        This (default) method simply returns the last timeslot

        Deprecated: use ``_get_timeslot_for_fidelity_calc``.
        """
        _func_deprecation("'get_timeslot_for_fidelity_calc' is deprecated. "
                          "Use '_get_timeslot_for_fidelity_calc'")
        # Call the replacement so that the index (not the bound method)
        # is returned
        return self._get_timeslot_for_fidelity_calc()

    def _get_timeslot_for_fidelity_calc(self):
        """
        Returns the timeslot index that will be used calculate current fidelity
        value.
        This (default) method simply returns the last timeslot
        """
        return self.parent.num_tslots
class TSlotCompDynUpdate(TimeslotComputer):
    """
    Timeslot Computer - Dynamic Update

    ********************************
    ***** CURRENTLY HAS ISSUES *****
    ***** AJGP 2014-10-02
    ***** and is therefore not being maintained
    ***** i.e. changes made to _UpdateAll are not being implemented here
    ********************************

    Updates only the dynamics generators, propagators and evolutions as
    required when a subset of the ctrl amplitudes are updated.
    Will update all if all amps have changed.
    """

    def reset(self):
        """Clear all flag arrays, then apply the base defaults and params."""
        for flag_name in ('dyn_gen_recalc', 'prop_recalc',
                          'evo_init2t_recalc', 'evo_t2targ_recalc',
                          'dyn_gen_calc_now', 'prop_calc_now',
                          'evo_init2t_calc_now', 'evo_t2targ_calc_now'):
            setattr(self, flag_name, None)
        TimeslotComputer.reset(self)
        self.id_text = 'DYNAMIC'
        self.apply_params()

    def init_comp(self):
        """
        Initialise the flags
        """
        num_slots = self.parent.num_tslots
        num_evo = num_slots + 1

        # The *_recalc maps record values that need updating at some
        # point. Nothing has been computed yet, so everything is flagged,
        # except the initial forward and final onward evolution operators,
        # which never change.
        self.dyn_gen_recalc = np.ones(num_slots, dtype=bool)
        self.prop_recalc = np.ones(num_slots, dtype=bool)

        fwd_recalc = np.ones(num_evo, dtype=bool)
        fwd_recalc[0] = False
        self.evo_init2t_recalc = fwd_recalc

        onwd_recalc = np.ones(num_evo, dtype=bool)
        onwd_recalc[-1] = False
        self.evo_t2targ_recalc = onwd_recalc

        # The *_calc_now maps are used during the calcs to specify
        # which values need updating immediately
        self.dyn_gen_calc_now = np.zeros(num_slots, dtype=bool)
        self.prop_calc_now = np.zeros(num_slots, dtype=bool)
        self.evo_init2t_calc_now = np.zeros(num_evo, dtype=bool)
        self.evo_t2targ_calc_now = np.zeros(num_evo, dtype=bool)

    def compare_amps(self, new_amps):
        """
        Determine which timeslots will have changed Hamiltonians
        i.e. any where control amplitudes have changed for that slot
        and mark (using masks) them and corresponding exponentiations and
        time evo operators for update

        Returns: True if amplitudes are the same, False if they have changed
        """
        dyn = self.parent
        num_slots = dyn.num_tslots

        # Boolean array with the same shape as the amplitudes:
        # True where the new value differs, otherwise False.
        # With no amplitudes held yet, every entry counts as changed.
        current_amps = self.parent.ctrl_amps
        if current_amps is None:
            amp_diff = np.ones(new_amps.shape, dtype=bool)
        else:
            amp_diff = current_amps != new_amps

        if self.log_level <= logging.DEBUG_VERBOSE:
            logger.log(logging.DEBUG_VERBOSE, "changed_amps:\n{}".format(
                amp_diff))

        # Boolean vector with one entry per timeslot:
        # True where any of the amplitudes have changed, otherwise False
        slot_diff = np.any(amp_diff, 1)

        if not np.any(slot_diff):
            return True

        # some amplitudes have changed, so mark for recalc
        self.dyn_gen_recalc[slot_diff] = True
        self.prop_recalc[slot_diff] = True
        dyn.ctrl_amps = new_amps
        if self.log_level <= logging.DEBUG:
            logger.debug("Control amplitudes updated")

        # find first and last changed dynamics generators
        first_changed = None
        for slot in range(num_slots):
            if not slot_diff[slot]:
                continue
            last_changed = slot
            if first_changed is None:
                first_changed = slot

        # all fwd evo ops after the first changed Ham need recalculating
        self.evo_init2t_recalc[first_changed + 1:] = True
        # all bkwd evo ops up to (incl) the last changed Ham need
        # recalculating
        self.evo_t2targ_recalc[:last_changed + 1] = True

        # Flag fidelity and gradients as needing recalculation
        dyn.flag_system_changed()

        # *** update stats ***
        stats = dyn.stats
        if stats is not None:
            stats.num_ctrl_amp_updates += 1
            stats.num_ctrl_amp_changes += amp_diff.sum()
            stats.num_timeslot_changes += slot_diff.sum()

        return False

    def flag_all_calc_now(self):
        """
        Flags all Hamiltonians, propagators and propagations to be
        calculated now
        """
        self.dyn_gen_calc_now[:] = True
        self.prop_calc_now[:] = True
        # every forward entry except the last, every onward entry except
        # the first
        self.evo_init2t_calc_now[:-1] = True
        self.evo_t2targ_calc_now[1:] = True

    def recompute_evolution(self):
        """
        Recalculates the evo_init2t (forward) and evo_t2targ (onward) time
        evolution operators
        DynGen (Hamiltonians etc) and prop (propagator) are calculated
        as necessary
        """
        if self.log_level <= logging.DEBUG_VERBOSE:
            logger.log(logging.DEBUG_VERBOSE, "recomputing evolution "
                       "(DynUpdate)")

        dyn = self.parent
        num_slots = dyn.num_tslots

        # the evo slots that are both requested now and out of date
        fwd_now = self.evo_init2t_calc_now & self.evo_init2t_recalc
        onwd_now = self.evo_t2targ_calc_now & self.evo_t2targ_recalc

        # the forward evolution must start from a slot that is already
        # computed, so extend the request back through out-of-date slots
        if np.any(fwd_now):
            for k in range(num_slots, 0, -1):
                if fwd_now[k] and self.evo_init2t_recalc[k-1]:
                    fwd_now[k-1] = True

        # likewise the onward evolution, extending towards the target
        if np.any(onwd_now):
            for k in range(0, num_slots):
                if onwd_now[k] and self.evo_t2targ_recalc[k+1]:
                    onwd_now[k+1] = True

        # the props and dyn gens needed now in order to calculate
        # the forward and onward evolutions
        prop_wanted = fwd_now[1:] | onwd_now[:-1] | self.prop_calc_now[:]
        prop_now = prop_wanted & self.prop_recalc[:]
        gen_wanted = prop_now[:] | self.dyn_gen_calc_now[:]
        gen_now = gen_wanted & self.dyn_gen_recalc[:]

        if np.any(gen_now):
            time_start = timeit.default_timer()
            for k in range(num_slots):
                if not gen_now[k]:
                    continue
                # calculate the dynamics generators
                dyn.dyn_gen[k] = dyn.compute_dyn_gen(k)
                self.dyn_gen_recalc[k] = False
            if dyn.stats is not None:
                dyn.stats.num_dyn_gen_computes += gen_now.sum()
                dyn.stats.wall_time_dyn_gen_compute += \
                    timeit.default_timer() - time_start

        if np.any(prop_now):
            time_start = timeit.default_timer()
            for k in range(num_slots):
                if not prop_now[k]:
                    continue
                # calculate exp(H) and other per H computations needed for
                # the gradient function
                dyn.prop[k] = dyn._compute_propagator(k)
                self.prop_recalc[k] = False
            if dyn.stats is not None:
                dyn.stats.num_prop_computes += prop_now.sum()
                dyn.stats.wall_time_prop_compute += \
                    timeit.default_timer() - time_start

        # compute the forward propagation
        if np.any(fwd_now):
            time_start = timeit.default_timer()
            for k in range(1, num_slots + 1):
                if not fwd_now[k]:
                    continue
                dyn.evo_init2t[k] = \
                    dyn.prop[k-1].dot(dyn.evo_init2t[k-1])
                self.evo_init2t_recalc[k] = False
            if dyn.stats is not None:
                dyn.stats.num_fwd_prop_step_computes += \
                    fwd_now.sum()
                dyn.stats.wall_time_fwd_prop_compute += \
                    timeit.default_timer() - time_start

        # compute the onward propagation
        if np.any(onwd_now):
            time_start = timeit.default_timer()
            for k in range(num_slots-1, -1, -1):
                if not onwd_now[k]:
                    continue
                dyn.evo_t2targ[k] = dyn.evo_t2targ[k+1].dot(dyn.prop[k])
                self.evo_t2targ_recalc[k] = False
            if dyn.stats is not None:
                dyn.stats.num_onwd_prop_step_computes += \
                    onwd_now.sum()
                dyn.stats.wall_time_onwd_prop_compute += \
                    timeit.default_timer() - time_start

        # Clear calc now flags
        for now_flags in (self.dyn_gen_calc_now, self.prop_calc_now,
                          self.evo_init2t_calc_now,
                          self.evo_t2targ_calc_now):
            now_flags[:] = False

    def get_timeslot_for_fidelity_calc(self):
        """
        Returns the timeslot index that will be used calculate current fidelity
        value. Attempts to find a timeslot where the least number of propagator
        calculations will be required.
        Flags the associated evolution operators for calculation now

        Raises
        ------
        errors.FunctionalError
            If no timeslot has a current forward evolution operator.
        """
        num_slots = self.parent.num_tslots
        both_current = -1
        fwd_current = -1

        for k in range(num_slots):
            if self.evo_init2t_recalc[k]:
                continue
            # forward evolution is current here; remember the latest such
            # slot as the fallback
            fwd_current = k
            if not self.evo_t2targ_recalc[k]:
                # first slot where both evolutions are current
                both_current = k
                break

        if both_current >= 0:
            chosen = both_current
        elif fwd_current >= 0:
            chosen = fwd_current
        else:
            raise errors.FunctionalError("No timeslot found matching "
                                         "criteria")

        self.evo_init2t_calc_now[chosen] = True
        self.evo_t2targ_calc_now[chosen] = True
        return chosen
class EvoCompSummary(qtrldump.DumpSummaryItem):
    """
    A summary of the most recent time evolution computation
    Used in stats calculations and for data dumping

    Attributes
    ----------
    evo_dump_idx : int
        Index of the linked :class:`dump.EvoCompDumpItem`
        None if no linked item

    iter_num : int
        Iteration number of the pulse optimisation
        None if evolution compute outside of a pulse optimisation

    fid_func_call_num : int
        Fidelity function call number of the pulse optimisation
        None if evolution compute outside of a pulse optimisation

    grad_func_call_num : int
        Gradient function call number of the pulse optimisation
        None if evolution compute outside of a pulse optimisation

    num_amps_changed : int
        Number of control timeslot amplitudes changed since previous
        evolution calculation

    num_timeslots_changed : int
        Number of timeslots in which any amplitudes changed since previous
        evolution calculation

    wall_time_dyn_gen_compute : float
        Time spent computing dynamics generators
        (in seconds of elapsed time)

    wall_time_prop_compute : float
        Time spent computing propagators (including any propagator gradients)
        (in seconds of elapsed time)

    wall_time_fwd_prop_compute : float
        Time spent computing the forward evolution of the system
        see :property:`dynamics.fwd_evo`
        (in seconds of elapsed time)

    wall_time_onwd_prop_compute : float
        Time spent computing the 'backward' evolution of the system
        see :property:`dynamics.onwd_evo` and :property:`dynamics.onto_evo`
        (in seconds of elapsed time)
    """

    min_col_width = 11

    # One entry per summary column; the two tuples that follow give the
    # format type and precision for the column at the same position.
    summary_property_names = (
        "idx",
        "evo_dump_idx",
        "iter_num",
        "fid_func_call_num",
        "grad_func_call_num",
        "num_amps_changed",
        "num_timeslots_changed",
        "wall_time_dyn_gen_compute",
        "wall_time_prop_compute",
        "wall_time_fwd_prop_compute",
        "wall_time_onwd_prop_compute",
    )

    # seven integer columns followed by the four wall-time columns
    summary_property_fmt_type = ('d',) * 7 + ('g',) * 4

    summary_property_fmt_prec = (0,) * 7 + (3,) * 4

    def __init__(self):
        """Create a summary with every attribute at its default value."""
        self.reset()

    def reset(self):
        """Reset the base item, then restore the defaults of this class."""
        qtrldump.DumpSummaryItem.reset(self)
        # links and call counters are unknown until set by the caller
        for attr in ("evo_dump_idx", "iter_num",
                     "fid_func_call_num", "grad_func_call_num"):
            setattr(self, attr, None)
        # change counters
        for attr in ("num_amps_changed", "num_timeslots_changed"):
            setattr(self, attr, 0)
        # elapsed wall times
        for attr in ("wall_time_dyn_gen_compute", "wall_time_prop_compute",
                     "wall_time_fwd_prop_compute",
                     "wall_time_onwd_prop_compute"):
            setattr(self, attr, 0.0)