Source code for qutip.qip.compiler.scheduler

# This file is part of QuTiP: Quantum Toolbox in Python.
#
#    Copyright (c) 2011 and later, Paul D. Nation and Robert J. Johansson.
#    All rights reserved.
#
#    Redistribution and use in source and binary forms, with or without
#    modification, are permitted provided that the following conditions are
#    met:
#
#    1. Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#    2. Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
#    3. Neither the name of the QuTiP: Quantum Toolbox in Python nor the names
#       of its contributors may be used to endorse or promote products derived
#       from this software without specific prior written permission.
#
#    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
#    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
#    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
#    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
#    HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
#    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
#    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
#    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
#    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
#    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
#    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
###############################################################################
from collections import deque
from copy import deepcopy
from functools import cmp_to_key
from random import shuffle

from ..circuit import QubitCircuit, Gate
from .instruction import Instruction


class InstructionsGraph():
    """
    A directed acyclic graph (DAG) representation
    of the quantum instruction dependency.
    An example is Fig3(b) in https://doi.org/10.1117/12.666419.
    It contains methods of generating the instruction dependency graph,
    a list-schedule algorithm to find the topological order
    and the computation of the distance in the weighted graph
    (circuit latency).

    It uses the `Instruction` object as a representation of node
    and adds the following attributes to it:

    predecessors, successors: dependency arrow of the DAG
    distance_to_start, distance_to_end: longest distance to the start and end

    Parameters
    ----------
    instructions: list
        A list of instructions

    Attributes
    ----------
    nodes: list
        The input list of instruction with additional graph information.
    start, end: list
        List of indices of nodes connected to the start or end nodes.
    """
    def __init__(self, instructions):
        instructions = deepcopy(instructions)
        self.nodes = []
        for instruction in instructions:
            if isinstance(instruction, Gate):
                self.nodes.append(Instruction(instruction))
            else:
                self.nodes.append(instruction)
        for node in self.nodes:
            if node.duration is None:
                node.duration = 1
        self.start = None
        self.end = None

    def generate_dependency_graph(self, commuting):
        """
        Generate the instruction dependency graph.
        It modifies the class attribute `nodes`, where each element (node)
        is an `Instruction`.
        The graph is represented by attributes `predecessors` and
        `successors`, each a set of indices
        pointing to the position of the target node in the nodes list.

        The graph preserves the mobility of the gates,
        i.e. if two gates commute with each other,
        such as ``CNOT 2, 3`` and ``CNOT 2, 1``,
        there should be no dependency arrow between them.
        Because of this, the generated graph does not consider
        the hardware constraints,
        e.g. two commuting gates addressing the same qubits
        cannot be executed at the same time.
        A dependency arrow between Instruction 1 and instruction 2
        means that they do not commute.
        However, the converse does not hold because we do not need
        gate1->gate3 if we already have gate1->gate2->gate3.

        Parameters
        ----------
        commuting: function
            A Python function that determines if two gates commute,
            given that their used qubits overlap.
        """
        # initialize the graph
        for node in self.nodes:
            node.predecessors = set()
            node.successors = set()

        num_qubits = max(set().union(
            *[instruction.used_qubits for instruction in self.nodes])) + 1
        qubits_instructions_dependency = [[set()] for i in range(num_qubits)]
        # qubits_instructions_dependency:
        # instruction dependency for each qubits, a nested list of level 3.
        # E.g. [
        #       [{1, }],
        #       [{0, 1}, {2, }],
        #       [{0, }]
        #       ]
        # means that
        # Gate0 acts on qubit 1 and 2, gate1 act on qubit0 and qubit1,
        # but gate0 and gate1 cummute with each other.
        # Therefore, there is not dependency between gate0 and gate1;
        # Gate 2 acts on qubit1 and must be executed after gate0 and gate1.
        # Hence, gate2 will depends on gate0 and gate1.

        # Generate instruction dependency for each qubit
        for current_ind, instruction in enumerate(self.nodes):
            for qubit in instruction.used_qubits:
                # For each used qubit, find the last dependency gate set.
                # If the current gate commute with all of them,
                # add it to the list.
                # Otherwise,
                # append a new set with the current gate to the list.
                dependent = False
                for dependent_ind in qubits_instructions_dependency[qubit][-1]:
                    if not commuting(current_ind, dependent_ind, self.nodes):
                        dependent = True
                if not dependent:
                    qubits_instructions_dependency[qubit][-1].add(current_ind)
                else:
                    qubits_instructions_dependency[qubit].append({current_ind})

        # Generate the dependency graph
        for instructions_cycles in qubits_instructions_dependency:
            for cycle_ind1 in range(len(instructions_cycles) - 1):
                for instruction_ind1 in instructions_cycles[cycle_ind1]:
                    for instruction_ind2 in instructions_cycles[cycle_ind1+1]:
                        self.nodes[instruction_ind1].successors.add(
                            instruction_ind2)
                        self.nodes[instruction_ind2].predecessors.add(
                            instruction_ind1)

        # Find start and end nodes of the graph
        start = []
        end = []
        for i, instruction in enumerate(self.nodes):
            if not instruction.successors:
                end.append(i)
            if not instruction.predecessors:
                start.append(i)
        self.start = start
        self.end = end

    def reverse_graph(self):
        """
        Reverse the graph.
        The start node becomes the end node
        Predecessors and successors of each node are exchanged.
        """
        for node in self.nodes:
            node.predecessors, node.successors \
                = node.successors, node.predecessors
            try:
                self.distance_to_start, self.distance_to_end = \
                    self.distance_to_end, self.distance_to_start
            except AttributeError:
                pass
        self.start, self.end = self.end, self.start

    def find_topological_order(
            self, priority=True, apply_constraint=None, random=False):
        """
        A list-schedule algorithm, it
        finds the topological order of the directed graph
        under certain constraint and priority indicator.
        The function returns a list of cycles,
        where each cycle is a list of instructions
        that can be executed in parallel.
        In the case of gates schedule,
        the result will be the gates cycle list.

        Parameters
        ----------
        priority: bool
            If use distance to the start and end nodes
            as a priority measure for the schedule problem.
        apply_constraint: function
            A Python function that determines
            if to instruction can be executed in parallel.
            E.g. if two gates apply to the same qubit, the function
            returns False.

        Returns
        -------
        cycles_list: list
            A list of cycles, where each cycle is a list of instructions
            that can be executed in parallel.
        constraint_dependency: set
            A set of instruction pairs that are found conflicted
            due to the hardware constraints.
            Because of this, they are executed in different cycles.
            This set is used to add this dependency to the graph
            in another method.
        """
        # The method will destruct the graph, therefore we make a copy.
        graph = deepcopy(self.nodes)
        cycles_list = []
        available_nodes = list(self.start)  # a list of available instructions
        # pairs of instructions that are limited by hardware constraint
        constraint_dependency = set()

        while available_nodes:
            if random:
                shuffle(available_nodes)
            if priority:
                available_nodes.sort(key=cmp_to_key(self._compare_priority))
            current_cycle = []
            if apply_constraint is None:  # if no constraits
                current_cycle = deepcopy(available_nodes)
            else:  # check if constraits allow the parallelization
                for node1 in available_nodes:
                    approval = True
                    for node2 in current_cycle:
                        if not apply_constraint(node1, node2, graph):
                            approval = False
                            # save the conflicted pairs of instructions
                            constraint_dependency.add((node2, node1))
                    if approval:
                        current_cycle.append(node1)
            # add this cycle to cycles_list
            cycles_list.append(current_cycle)

            # update the list of available nodes
            # remove the executed nodes from available_node
            for node in current_cycle:
                available_nodes.remove(node)
            # add new nodes to available_nodes
            # if they have no other predecessors
            for node in current_cycle:
                for successor_ind in graph[node].successors:
                    graph[successor_ind].predecessors.remove(node)
                    if not graph[successor_ind].predecessors:
                        available_nodes.append(successor_ind)
                graph[node].successors = set()

        return cycles_list, constraint_dependency

    def compute_distance(self, cycles_list):
        """
        Compute the longest distance of each node
        to the start and end nodes.
        The weight for each dependency arrow is
        the duration of the source instruction
        (which should be 1 for gates schedule).
        The method solves the longest path problem
        by using the topological order in cycles_list.
        It makes sure that by following the list,
        the distance to the predecessors (successors) of
        the source (target) node is always calculated
        before the target (source) node.

        Parameters
        ----------
        cycles_list: list
            A `cycles_list` obtained by the method `find_topological_order`.
        """
        cycles_list = deepcopy(cycles_list)

        # distance to the start node
        for cycle in cycles_list:
            for ind in cycle:
                if not self.nodes[ind].predecessors:
                    self.nodes[ind].distance_to_start = \
                        self.nodes[ind].duration
                else:
                    self.nodes[ind].distance_to_start = max(
                        [
                            self.nodes[predecessor_ind].distance_to_start
                            for predecessor_ind
                            in self.nodes[ind].predecessors
                        ]
                        ) + self.nodes[ind].duration

        # distance to the end node
        cycles_list.reverse()
        self.reverse_graph()
        for cycle in cycles_list:
            for ind in cycle:
                if not self.nodes[ind].predecessors:
                    self.nodes[ind].distance_to_end = self.nodes[ind].duration
                else:
                    self.nodes[ind].distance_to_end = max(
                        [
                            self.nodes[predecessor_ind].distance_to_end
                            for predecessor_ind
                            in self.nodes[ind].predecessors
                        ]
                        ) + self.nodes[ind].duration
        self.longest_distance = max(
            [self.nodes[i].distance_to_end for i in self.end])
        self.reverse_graph()

    def _compare_priority(self, ind1, ind2):
        """
        The node with longer `distance_to_end` has higher priority.
        If it is the same for the two nodes,
        the node with shorter `distance_to_start` has higher priority.
        If node1 has higher priority, the method returns a negative value.

        Parameters
        ----------
        ind1, ind2: int
            Indices of nodes.
        """
        if self.nodes[ind1].distance_to_end == \
                self.nodes[ind2].distance_to_end:
            # lower distance_to_start, higher priority
            return self.nodes[ind1].distance_to_start - \
                self.nodes[ind2].distance_to_start
        else:
            # higher distance_to_end, higher priority
            return self.nodes[ind2].distance_to_end - \
                self.nodes[ind1].distance_to_end

    def add_constraint_dependency(self, constraint_dependency):
        """
        Add the dependency caused by hardware constraint to the graph.

        Parameters
        ----------
        constraint_dependency: list
            `constraint_dependency` obtained by the method
            `find_topological_order`.
        """
        for ind1, ind2 in constraint_dependency:
            self.nodes[ind1].successors.add(ind2)
            self.nodes[ind2].predecessors.add(ind1)

        # Update the start and end nodes of the graph
        start = []
        end = []
        for i, instruction in enumerate(self.nodes):
            if not instruction.successors:
                end.append(i)
            if not instruction.predecessors:
                start.append(i)
        self.start = start
        self.end = end


[docs]class Scheduler(): """ A gate (pulse) scheduler for quantum circuits (instructions). It schedules a given circuit or instructions to reduce the total execution time by parallelization. It uses heuristic methods mainly from in https://doi.org/10.1117/12.666419. The scheduler includes two methods, "ASAP", as soon as possible, and "ALAP", as late as possible. The later is commonly used in quantum computation because of the finite lifetime of qubits. The scheduler aims at pulse schedule and therefore does not consider merging gates to reduce the gates number. It assumes that the input circuit is optimized at the gate level and matches the hardware topology. Parameters ---------- method: str "ASAP" for as soon as possible. "ALAP" for as late as possible. constraint_functions: list, optional A list of hardware constraint functions. Default includes a function `qubit_contraint`, i.e. one qubit cannot be used by two gates at the same time. """ def __init__(self, method="ALAP", constraint_functions=None): self.method = method if constraint_functions is None: self.constraint_functions = [qubit_constraint] else: return constraint_functions
[docs] def schedule(self, circuit, gates_schedule=False, return_cycles_list=False, random_shuffle=False, repeat_num=0): """ Schedule a `QubitCircuit`, a list of `Gates` or a list of `Instruction`. For pulse schedule, the execution time for each `Instruction` is given in its `duration` attributes. The scheduler first generates a quantum gates dependency graph, containing information about which gates have to be executed before some other gates. The graph preserves the mobility of the gates, i.e. commuting gates are not dependent on each other, even if they use the same qubits. Next, it computes the longest distance of each node to the start and end nodes. The distance for each dependency arrow is defined by the execution time of the instruction (By default, it is 1 for all gates). This is used as a priority measure in the next step. The gate with a longer distance to the end node and a shorter distance to the start node has higher priority. In the last step, it uses a list-schedule algorithm with hardware constraint and priority and returns a list of cycles for gates/instructions. For pulse schedule, an additional step is required to compute the start time of each instruction. It adds the additional dependency caused by hardware constraint to the graph and recomputes the distance of each node to the start and end node. This distance is then converted to the start time of each instruction. Parameters ---------- circuit: QubitCircuit or list For gate schedule, it should be a QubitCircuit or a list of Gate objects. For pulse schedule, it should be a list of Instruction objects, each with an attribute `duration` that indicates the execution time of this instruction. gates_schedule: bool, optional `True`, if only gates schedule is needed. This saves some computation that is only useful to pulse schedule. If the input `circuit` is a `QubitCircuit`, it will be assigned to `True` automatically. Otherwise, the default is `False`. return_cycles_list: bool, optional If `True`, the method returns the `cycles_list`, e.g. [{0, 2}, {1, 3}], which means that the first cycle contains gates0 and gates2 while the second cycle contains gates1 and gates3. It is only usefull for gates schedule. random_shuffle: bool, optional If the commuting gates are randomly scuffled to explore larger search space. repeat_num: int, optional Repeat the scheduling several times and use the best result. Used together with ``random_shuffle=Ture``. Returns ------- gate_cycle_indices or instruction_start_time: list The cycle indices for each gate or the start time for each instruction. Examples -------- >>> from qutip.qip.circuit import QubitCircuit >>> from qutip.qip.scheduler import Scheduler >>> circuit = QubitCircuit(7) >>> circuit.add_gate("SNOT", 3) # gate0 >>> circuit.add_gate("CZ", 5, 3) # gate1 >>> circuit.add_gate("CZ", 4, 3) # gate2 >>> circuit.add_gate("CZ", 2, 3) # gate3 >>> circuit.add_gate("CZ", 6, 5) # gate4 >>> circuit.add_gate("CZ", 2, 6) # gate5 >>> circuit.add_gate("SWAP", [0, 2]) # gate6 >>> >>> scheduler = Scheduler("ASAP") >>> scheduler.schedule(circuit, gates_schedule=True) [0, 1, 3, 2, 2, 3, 4] The result list is the cycle indices for each gate. It means that the circuit can be executed in 5 gate cycles: ``[gate0, gate1, (gate3, gate4), (gate2, gate5), gate6]`` Notice that gate3 and gate4 commute with gate2, therefore, the order is changed to reduce the number of cycles. """ circuit = deepcopy(circuit) if repeat_num > 0: random_shuffle = True result = [0] max_length = 4294967296 for i in range(repeat_num): gate_cycle_indices = self.schedule( circuit, gates_schedule=gates_schedule, return_cycles_list=return_cycles_list, random_shuffle=random_shuffle, repeat_num=0) current_length = max(gate_cycle_indices) if current_length < max_length: result = gate_cycle_indices max_length = current_length return result if isinstance(circuit, QubitCircuit): gates = circuit.gates else: gates = circuit # Generate the quantum operations dependency graph. instructions_graph = InstructionsGraph(gates) instructions_graph.generate_dependency_graph( commuting=self.commutation_rules) if self.method == "ALAP": instructions_graph.reverse_graph() # Schedule without hardware constraints, then # use this cycles_list to compute the distance. cycles_list, _ = instructions_graph.find_topological_order( priority=False, apply_constraint=None, random=random_shuffle) instructions_graph.compute_distance(cycles_list=cycles_list) # Schedule again with priority and hardware constraint. cycles_list, constraint_dependency = \ instructions_graph.find_topological_order( priority=True, apply_constraint=self.apply_constraint, random=random_shuffle) # If we only need gates schedule, we can output the result here. if gates_schedule or return_cycles_list: if self.method == "ALAP": cycles_list.reverse() if return_cycles_list: return cycles_list gate_cycles_indices = [0] * len(gates) for cycle_ind, cycle in enumerate(cycles_list): for instruction_ind in cycle: gate_cycles_indices[instruction_ind] = cycle_ind return gate_cycles_indices # For pulse schedule, # we add the hardware dependency to the graph # and compute the longest distance to the start node again. # The longest distance to the start node determines # the start time of each pulse. instructions_graph.add_constraint_dependency(constraint_dependency) instructions_graph.compute_distance(cycles_list=cycles_list) # Output pulse schedule result. instruction_start_time = [] if self.method == "ASAP": for instruction in instructions_graph.nodes: instruction_start_time.append( instruction.distance_to_start - instruction.duration) elif self.method == "ALAP": for instruction in instructions_graph.nodes: instruction_start_time.append( instructions_graph.longest_distance - instruction.distance_to_start) return instruction_start_time
[docs] def commutation_rules(self, ind1, ind2, instructions): """ Determine if two gates commute, given that their used qubits overlap. Since usually the input gates are already in a universal gate sets, it uses an oversimplified condition: If the two gates do not have the same name, they are considered as not commuting. If they are the same gate and have the same controls or targets, they are considered as commuting. E.g. `CNOT 0, 1` commute with `CNOT 0, 2`. """ instruction1 = instructions[ind1] instruction2 = instructions[ind2] if instruction1.name != instruction2.name: return False if (instruction1.controls) and \ (instruction1.controls == instruction2.controls): return True elif instruction1.targets == instruction2.targets: return True else: return False
[docs] def apply_constraint(self, ind1, ind2, instructions): """ Apply hardware constraint to check if two instructions can be executed in parallel. Parameters ---------- ind1, ind2: int indices of the two instructions instructions: list The instruction list """ result = [] for constraint_function in self.constraint_functions: result.append(constraint_function(ind1, ind2, instructions)) return all(result)
def qubit_constraint(ind1, ind2, instructions): """ Determine if two instructions have overlap in the used qubits. """ if instructions[ind1].used_qubits & instructions[ind2].used_qubits: return False else: return True