diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7632d9b3b..6a09808b7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,10 @@ jobs: runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 + with: + fetch-depth: 0 # Fetch all, necessary to find tags and branches + fetch-tags: true - uses: actions/setup-python@v5 with: diff --git a/.github/workflows/cov.yml b/.github/workflows/cov.yml index 3f459f974..855e74250 100644 --- a/.github/workflows/cov.yml +++ b/.github/workflows/cov.yml @@ -14,7 +14,10 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 + with: + fetch-depth: 0 # Fetch all, necessary to find tags and branches + fetch-tags: true - name: Set up Python uses: actions/setup-python@v5 diff --git a/.github/workflows/doc.yml b/.github/workflows/doc.yml index b6d0685c0..2f71a2d81 100644 --- a/.github/workflows/doc.yml +++ b/.github/workflows/doc.yml @@ -20,7 +20,10 @@ jobs: name: "Check documentation" runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 + with: + fetch-depth: 0 # Fetch all, necessary to find tags and branches + fetch-tags: true - uses: actions/setup-python@v5 with: diff --git a/.github/workflows/typecheck.yml b/.github/workflows/typecheck.yml index 572aa4530..76bb98914 100644 --- a/.github/workflows/typecheck.yml +++ b/.github/workflows/typecheck.yml @@ -19,7 +19,10 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 + with: + fetch-depth: 0 # Fetch all, necessary to find tags and branches + fetch-tags: true - uses: actions/setup-python@v5 with: @@ -46,7 +49,10 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 + with: + fetch-depth: 0 # Fetch all, necessary to find tags and branches + fetch-tags: true - uses: actions/setup-python@v5 with: diff --git a/.gitignore b/.gitignore index 031cfa715..84bb49b3c 100644 --- a/.gitignore +++ b/.gitignore @@ -167,3 +167,4 @@ docs/source/benchmarks docs/source/gallery graphix/_version.py docs/source/sg_execution_times.rst +/.benchmarks/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a65f0251..cb7663074 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- #343: Circuit exporter to OpenQASM3: + `graphix.qasm3_exporter.circuit_to_qasm3`. + +- #337: New module `graphix.find_pauliflow` with the $O(N^3)$ + Pauli-flow finding algorithm introduced in Mitosek and Backens, 2024 + (arXiv:2410.23439). + - #332: New class `StandardizedPattern` that contains the decomposed parts of a standardized pattern. @@ -30,6 +37,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- #337: Dropped dependence on `sympy` and `galois`. + - #332: `Pattern.extract_graph` now returns a networkx graph. 
- #220, #332: `Pattern.extract_graph`, `Pattern.extract_isolated_nodes`, diff --git a/graphix/_linalg.py b/graphix/_linalg.py new file mode 100644 index 000000000..3667b597c --- /dev/null +++ b/graphix/_linalg.py @@ -0,0 +1,317 @@ +r"""Performant module for linear algebra on :math:`\mathbb F_2` field.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numba as nb +import numpy as np +import numpy.typing as npt + +if TYPE_CHECKING: + from typing import Self + + +class MatGF2(npt.NDArray[np.uint8]): + r"""Custom implementation of :math:`\mathbb F_2` matrices. This class specializes `:class:np.ndarray` to the :math:`\mathbb F_2` field with increased efficiency.""" + + def __new__(cls, data: npt.ArrayLike, copy: bool = True) -> Self: + """Instantiate new `MatGF2` object. + + Parameters + ---------- + data : array + Data in array + copy : bool + Optional, defaults to `True`. If `False` and if possible, data + is not copied. + + Return + ------- + MatGF2 + """ + arr = np.array(data, dtype=np.uint8, copy=copy) + return super().__new__(cls, shape=arr.shape, dtype=arr.dtype, buffer=arr) + + def mat_mul(self, other: MatGF2 | npt.NDArray[np.uint8]) -> MatGF2: + r"""Multiply two matrices. + + Parameters + ---------- + other : array + Matrix that right-multiplies `self`. + + Returns + ------- + MatGF2 + Matrix product `self` @ `other` in :math:`\mathbb F_2`. + + Notes + ----- + This function is a wrapper over :func:`_mat_mul_jit` which is a just-time compiled implementation of the matrix multiplication in :math:`\mathbb F_2`. It is more efficient than `galois.GF2.__matmul__` when the matrix `self` is sparse. + The implementation assumes that the arguments have the right dimensions. + """ + if self.ndim != 2 or other.ndim != 2: + raise ValueError( + "`mat_mul` method only supports two-dimensional arrays. Use `np.matmul(self, other) % 2` instead." + ) + if self.shape[1] != other.shape[0]: + raise ValueError( + f"Dimension mismatch. Attempted to multiply `self` with shape {self.shape} and `other` with shape {other.shape}" + ) + + return MatGF2(_mat_mul_jit(self, other), copy=False) + + def compute_rank(self) -> np.intp: + """Get the rank of the matrix. + + Returns + ------- + int : int + Rank of the matrix. + """ + mat_a = self.row_reduction(copy=True) + return np.count_nonzero(mat_a.any(axis=1)) + + def right_inverse(self) -> MatGF2 | None: + r"""Return any right inverse of the matrix. + + Returns + ------- + rinv : MatGF2 + Any right inverse of the matrix. + or `None` + If the matrix does not have a right inverse. + + Notes + ----- + Let us consider a matrix :math:`A` of size :math:`(m \times n)`. The right inverse is a matrix :math:`B` of size :math:`(n \times m)` s.t. :math:`AB = I` where :math:`I` is the identity matrix. + - The right inverse only exists if :math:`rank(A) = m`. Therefore, it is necessary but not sufficient that :math:`m ≤ n`. + - The right inverse is unique only if :math:`m=n`. + """ + m, n = self.shape + if m > n: + return None + + ident = np.eye(m, dtype=np.uint8) + aug = np.hstack([self.data, ident]).view(MatGF2) + red = aug.row_reduction(ncols=n, copy=False) # Reduced row echelon form + + # Check that rank of right block is equal to the number of rows. + # We don't use `MatGF2.compute_rank()` to avoid row-reducing twice. 
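+        # `red[:, :n]` holds the row-reduced copy of `self` (the original block of the
+        # augmented matrix); a zero row there means rank(self) < m, so no right inverse exists.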
+ if m != np.count_nonzero(red[:, :n].any(axis=1)): + return None + rinv = np.zeros((n, m), dtype=np.uint8).view(MatGF2) + + for i, row in enumerate(red): + j = np.flatnonzero(row)[0] # Column index corresponding to the leading 1 in row `i`. + rinv[j, :] = red[i, n:] + + return rinv + + def null_space(self) -> MatGF2: + r"""Return the null space of the matrix. + + Returns + ------- + MatGF2 + The rows of the basis matrix are the basis vectors that span the null space. The number of rows of the basis matrix is the dimension of the null space. + + Notes + ----- + This implementation appear to be more efficient than `:func:galois.GF2.null_space`. + """ + m, n = self.shape + + ident = np.eye(n, dtype=np.uint8) + ref = np.hstack([self.T, ident]).view(MatGF2) + ref.gauss_elimination(ncols=m, copy=False) + row_idxs = np.flatnonzero(~ref[:, :m].any(axis=1)) # Row indices of the 0-rows in the first block of `ref`. + + return ref[row_idxs, m:].view(MatGF2) + + def gauss_elimination(self, ncols: int | None = None, copy: bool = True) -> MatGF2: + """Return row echelon form (REF) by performing Gaussian elimination. + + Parameters + ---------- + n_cols : int (optional) + Number of columns over which to perform Gaussian elimination. The default is `None` which represents the number of columns of the matrix. + + copy : bool (optional) + If `True`, the REF matrix is copied into a new instance, otherwise `self` is modified. Defaults to `True`. + + Returns + ------- + mat_ref : MatGF2 + The matrix in row echelon form. + """ + ncols_value = self.shape[1] if ncols is None else ncols + mat_ref = MatGF2(self) if copy else self + + return MatGF2(_elimination_jit(mat_ref, ncols=ncols_value, full_reduce=False), copy=False) + + def row_reduction(self, ncols: int | None = None, copy: bool = True) -> MatGF2: + """Return row-reduced echelon form (RREF) by performing Gaussian elimination. + + Parameters + ---------- + n_cols : int (optional) + Number of columns over which to perform Gaussian elimination. The default is `None` which represents the number of columns of the matrix. + + copy : bool (optional) + If `True`, the RREF matrix is copied into a new instance, otherwise `self` is modified. Defaults to `True`. + + Returns + ------- + mat_ref: MatGF2 + The matrix in row-reduced echelon form. + """ + ncols_value = self.shape[1] if ncols is None else ncols + mat_ref = self.copy() if copy else self + + return MatGF2(_elimination_jit(mat_ref, ncols=ncols_value, full_reduce=True), copy=False) + + +def solve_f2_linear_system(mat: MatGF2, b: MatGF2) -> MatGF2: + r"""Solve the linear system (LS) `mat @ x == b`. + + Parameters + ---------- + mat : MatGF2 + Matrix with shape `(m, n)` containing the LS coefficients in row echelon form (REF). + b : MatGF2 + Matrix with shape `(m,)` containing the constants column vector. + + Returns + ------- + x : MatGF2 + Matrix with shape `(n,)` containing the solutions of the LS. + + Notes + ----- + This function is not integrated in `:class: graphix.linalg.MatGF2` because it does not perform any checks on the form of `mat` to ensure that it is in REF or that the system is solvable. 
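+    Example (illustrative only; `mat` must already be in row echelon form over :math:`\mathbb F_2`):
+    ```
+    mat = MatGF2([[1, 1, 0],
+                  [0, 1, 1]])
+    b = MatGF2([1, 0])
+    x = solve_f2_linear_system(mat, b)  # x == [1, 0, 0]; check: (mat @ x) % 2 == b
+    ```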
+ """ + return MatGF2(_solve_f2_linear_system_jit(mat, b), copy=False) + + +@nb.njit("uint8[::1](uint8[:,::1], uint8[::1])") +def _solve_f2_linear_system_jit( + mat_data: npt.NDArray[np.uint8], b_data: npt.NDArray[np.uint8] +) -> npt.NDArray[np.uint8]: + """See docstring of `:func:solve_f2_linear_system` for details.""" + m, n = mat_data.shape + x = np.zeros(n, dtype=np.uint8) + + # Find first row that is all-zero + for i in range(m): + for j in range(n): + if mat_data[i, j] == 1: + break # Row is not zero → go to next row + else: + m_nonzero = i # No break: this row is all-zero + break + else: + m_nonzero = m + + # Backward substitution from row m_nonzero - 1 to 0 + for i in range(m_nonzero - 1, -1, -1): + # Find first non-zero column in row i + pivot = -1 + for j in range(n): + if mat_data[i, j] == 1: + pivot = j + break + + # Sum x_k for k such that mat_data[i, k] == 1 + acc = 0 + for k in range(pivot, n): + if mat_data[i, k] == 1: + acc ^= x[k] + + x[pivot] = b_data[i] ^ acc + + return x + + +@nb.njit("uint8[:,::1](uint8[:,::1], uint64, boolean)") +def _elimination_jit(mat_data: npt.NDArray[np.uint8], ncols: int, full_reduce: bool) -> npt.NDArray[np.uint8]: + r"""Return row echelon form (REF) or row-reduced echelon form (RREF) by performing Gaussian elimination. + + Parameters + ---------- + mat_data : npt.NDArray[np.uint8] + Matrix to be gaussian-eliminated. + n_cols : int + Number of columns over which to perform Gaussian elimination. + full_reduce : bool + Flag determining the operation mode. Output is in RREF if `True`, REF otherwise. + + Returns + ------- + mat_data: npt.NDArray[np.uint8] + The matrix in row(-reduced) echelon form. + + Notes + ----- + Adapted from `:func: galois.FieldArray.row_reduction`, which renders the matrix in row-reduced echelon form (RREF) and specialized for :math:`\mathbb F_2`. + + Row echelon form (REF): + 1. All rows having only zero entries are at the bottom. + 2. The leading entry of every nonzero row is on the right of the leading entry of every row above. + 3. (1) and (2) imply that all entries in a column below a leading coefficient are zeros. + 4. It's the result of Gaussian elimination. + + For matrices over :math:`\mathbb F_2` the only difference between REF and RREF is that elements above a leading 1 can be non-zero in REF but must be 0 in RREF. + """ + m, n = mat_data.shape + p = 0 # Pivot + + for j in range(ncols): + # Find a pivot in column `j` at or below row `p`. + for i in range(p, m): + if mat_data[i, j] == 1: + break # `i` is a row with a pivot + else: + continue # No break: column `j` does not have a pivot below row `p`. + + # Swap row `p` and `i`. The pivot is now located at row `p`. 
+ if i != p: + for k in range(n): + mat_data[i, k], mat_data[p, k] = mat_data[p, k], mat_data[i, k] + + if full_reduce: + # Force zeros BELOW and ABOVE the pivot by xor-ing with the pivot row + for k in range(m): + if mat_data[k, j] == 1 and k != p: + for l in range(n): + mat_data[k, l] ^= mat_data[p, l] + else: + # Force zeros BELOW the pivot by xor-ing with the pivot row + for k in range(p + 1, m): + if mat_data[k, j] == 1: + for l in range(n): + mat_data[k, l] ^= mat_data[p, l] + + p += 1 + if p == m: + break + + return mat_data + + +@nb.njit("uint8[:,::1](uint8[:,::1], uint8[:,::1])", parallel=True) +def _mat_mul_jit(m1: npt.NDArray[np.uint8], m2: npt.NDArray[np.uint8]) -> npt.NDArray[np.uint8]: + """See docstring of `:func:MatGF2.__matmul__` for details.""" + m, l = m1.shape + _, n = m2.shape + + res = np.zeros((m, n), dtype=np.uint8) + + for i in nb.prange(m): + for k in nb.prange(l): + if m1[i, k] == 1: + for j in range(n): + res[i, j] = np.bitwise_xor(res[i, j], m2[k, j]) + + return res diff --git a/graphix/find_pflow.py b/graphix/find_pflow.py new file mode 100644 index 000000000..6a0b8e949 --- /dev/null +++ b/graphix/find_pflow.py @@ -0,0 +1,612 @@ +"""Pauli flow finding algorithm. + +This module implements the algorithm presented in [1]. For a given labelled open graph (G, I, O, meas_plane), this algorithm finds a maximally delayed Pauli flow [2] in polynomial time with the number of nodes, :math:`O(N^3)`. +If the input graph does not have Pauli measurements, the algorithm returns a general flow (gflow) if it exists by definition. + +References +---------- +[1] Mitosek and Backens, 2024 (arXiv:2410.23439). +[2] Browne et al., 2007 New J. Phys. 9 250 (arXiv:quant-ph/0702212) +""" + +from __future__ import annotations + +from copy import deepcopy +from typing import TYPE_CHECKING + +import numpy as np + +from graphix._linalg import MatGF2, solve_f2_linear_system +from graphix.fundamentals import Axis, Plane +from graphix.measurements import PauliMeasurement +from graphix.sim.base_backend import NodeIndex + +if TYPE_CHECKING: + from collections.abc import Set as AbstractSet + + from graphix.opengraph import OpenGraph + + +class OpenGraphIndex: + """A class for managing the mapping between node numbers of a given open graph and matrix indices in the Pauli flow finding algorithm. + + It reuses the class `:class: graphix.sim.base_backend.NodeIndex` introduced for managing the mapping between node numbers and qubit indices in the internal state of the backend. + + Attributes + ---------- + og (OpenGraph) + non_inputs (NodeIndex) : Mapping between matrix indices and non-input nodes (labelled with integers). + non_outputs (NodeIndex) : Mapping between matrix indices and non-output nodes (labelled with integers). + non_outputs_optim (NodeIndex) : Mapping between matrix indices and a subset of non-output nodes (labelled with integers). + + Notes + ----- + At initialization, `non_outputs_optim` is a copy of `non_outputs`. The nodes corresponding to zero-rows of the order-demand matrix are removed for calculating the P matrix more efficiently in the `:func: _find_pflow_general` routine. + """ + + def __init__(self, og: OpenGraph) -> None: + self.og = og + nodes = set(og.inside.nodes) + + # Nodes don't need to be sorted. We do it for debugging purposes, so we can check the matrices in intermediate steps of the algorithm. 
+ + nodes_non_input = sorted(nodes - set(og.inputs)) + nodes_non_output = sorted(nodes - set(og.outputs)) + + self.non_inputs = NodeIndex() + self.non_inputs.extend(nodes_non_input) + + self.non_outputs = NodeIndex() + self.non_outputs.extend(nodes_non_output) + + # Needs to be a deep copy because it may be modified during runtime. + self.non_outputs_optim = deepcopy(self.non_outputs) + + +def _compute_reduced_adj(ogi: OpenGraphIndex) -> MatGF2: + r"""Return the reduced adjacency matrix (RAdj) of the input open graph. + + Parameters + ---------- + ogi : OpenGraphIndex + Open graph whose RAdj is computed. + + Returns + ------- + adj_red : MatGF2 + Reduced adjacency matrix. + + Notes + ----- + The adjacency matrix of a graph :math:`Adj_G` is an :math:`n \times n` matrix. + + The RAdj matrix of an open graph OG is an :math:`(n - n_O) \times (n - n_I)` submatrix of :math:`Adj_G` constructed by removing the output rows and input columns of :math:`Adj_G`. + + See Definition 3.3 in Mitosek and Backens, 2024 (arXiv:2410.23439). + """ + graph = ogi.og.inside + row_tags = ogi.non_outputs + col_tags = ogi.non_inputs + + adj_red = np.zeros((len(row_tags), len(col_tags)), dtype=np.uint8).view(MatGF2) + + for n1, n2 in graph.edges: + for u, v in ((n1, n2), (n2, n1)): + if u in row_tags and v in col_tags: + i, j = row_tags.index(u), col_tags.index(v) + adj_red[i, j] = 1 + + return adj_red + + +def _compute_pflow_matrices(ogi: OpenGraphIndex) -> tuple[MatGF2, MatGF2]: + r"""Construct flow-demand and order-demand matrices. + + Parameters + ---------- + ogi : OpenGraphIndex + Open graph whose flow-demand and order-demand matrices are computed. + + Returns + ------- + flow_demand_matrix : MatGF2 + order_demand_matrix : MatGF2 + + Notes + ----- + See Definitions 3.4 and 3.5, and Algorithm 1 in Mitosek and Backens, 2024 (arXiv:2410.23439). + """ + flow_demand_matrix = _compute_reduced_adj(ogi) + order_demand_matrix = flow_demand_matrix.copy() + + inputs_set = set(ogi.og.inputs) + meas = ogi.og.measurements + + row_tags = ogi.non_outputs + col_tags = ogi.non_inputs + + # TODO: integrate pauli measurements in open graphs + meas_planes = {i: m.plane for i, m in meas.items()} + meas_angles = {i: m.angle for i, m in meas.items()} + meas_plane_axis = { + node: pm.axis if (pm := PauliMeasurement.try_from(plane, meas_angles[node])) else plane + for node, plane in meas_planes.items() + } + + for v in row_tags: # v is a node tag + i = row_tags.index(v) + plane_axis_v = meas_plane_axis[v] + + if plane_axis_v in {Plane.YZ, Plane.XZ, Axis.Z}: + flow_demand_matrix[i, :] = 0 # Set row corresponding to node v to 0 + if plane_axis_v in {Plane.YZ, Plane.XZ, Axis.Y, Axis.Z} and v not in inputs_set: + j = col_tags.index(v) + flow_demand_matrix[i, j] = 1 # Set element (v, v) = 0 + if plane_axis_v in {Plane.XY, Axis.X, Axis.Y, Axis.Z}: + order_demand_matrix[i, :] = 0 # Set row corresponding to node v to 0 + if plane_axis_v in {Plane.XY, Plane.XZ} and v not in inputs_set: + j = col_tags.index(v) + order_demand_matrix[i, j] = 1 # Set element (v, v) = 1 + + return flow_demand_matrix, order_demand_matrix + + +def _find_pflow_simple(ogi: OpenGraphIndex) -> tuple[MatGF2, MatGF2] | None: + r"""Construct the correction matrix :math:`C` and the ordering matrix, :math:`NC` for an open graph with equal number of inputs and outputs. + + Parameters + ---------- + ogi : OpenGraphIndex + Open graph for which :math:`C` and :math:`NC` are computed. + + Returns + ------- + correction_matrix : MatGF2 + Matrix encoding the correction function. 
+ ordering_matrix : MatGF2 + Matrix encoding the partial ordering between nodes. + + or `None` + if the input open graph does not have Pauli flow. + + Notes + ----- + - The ordering matrix is defined as the product of the order-demand matrix :math:`N` and the correction matrix. + + - The function only returns `None` when the flow-demand matrix is not invertible (meaning that `ogi` does not have Pauli flow). The condition that the ordering matrix :math:`NC` must encode a directed acyclic graph (DAG) is verified in a subsequent step by `:func: _compute_topological_generations`. + + See Definitions 3.4, 3.5 and 3.6, Theorems 3.1 and 4.1, and Algorithm 2 in Mitosek and Backens, 2024 (arXiv:2410.23439). + """ + flow_demand_matrix, order_demand_matrix = _compute_pflow_matrices(ogi) + + correction_matrix = flow_demand_matrix.right_inverse() # C matrix + + if correction_matrix is None: + return None # The flow-demand matrix is not invertible, therefore there's no flow. + + ordering_matrix = order_demand_matrix.mat_mul(correction_matrix) # NC matrix + + return correction_matrix, ordering_matrix + + +def _compute_p_matrix(ogi: OpenGraphIndex, nb_matrix: MatGF2) -> MatGF2 | None: + r"""Perform the steps 8 - 12 of the general case (larger number of outputs than inputs) algorithm. + + Parameters + ---------- + ogi : OpenGraphIndex + Open graph for which the matrix :math:`P` is computed. + nb_matrix : MatGF2 + Matrix :math:`N_B` + + Returns + ------- + p_matrix : MatGF2 + Matrix encoding the correction function. + + or `None` + if the input open graph does not have Pauli flow. + + Notes + ----- + See Theorem 4.4, steps 8 - 12 in Mitosek and Backens, 2024 (arXiv:2410.23439). + """ + n_no = len(ogi.non_outputs) # number of columns of P matrix. + n_oi_diff = len(ogi.og.outputs) - len(ogi.og.inputs) # number of rows of P matrix. + n_no_optim = len(ogi.non_outputs_optim) # number of rows and columns of the third block of the K_{LS} matrix. + + # Steps 8, 9 and 10 + kils_matrix = np.concatenate( + (nb_matrix[:, n_no:], nb_matrix[:, :n_no], np.eye(n_no_optim, dtype=np.uint8)), axis=1 + ).view(MatGF2) # N_R | N_L | 1 matrix. + kls_matrix = kils_matrix.gauss_elimination(ncols=n_oi_diff, copy=True) # RREF form is not needed, only REF. + + # Step 11 + p_matrix = np.zeros((n_oi_diff, n_no), dtype=np.uint8).view(MatGF2) + solved_nodes: set[int] = set() + non_outputs_set = set(ogi.non_outputs) + + # Step 12 + while solved_nodes != non_outputs_set: + solvable_nodes = _find_solvable_nodes(ogi, kls_matrix, non_outputs_set, solved_nodes, n_oi_diff) # Step 12.a + if not solvable_nodes: + return None + + _update_p_matrix(ogi, kls_matrix, p_matrix, solvable_nodes, n_oi_diff) # Steps 12.b, 12.c + _update_kls_matrix(ogi, kls_matrix, kils_matrix, solvable_nodes, n_oi_diff, n_no, n_no_optim) # Step 12.d + solved_nodes.update(solvable_nodes) + + return p_matrix + + +def _find_solvable_nodes( + ogi: OpenGraphIndex, + kls_matrix: MatGF2, + non_outputs_set: AbstractSet[int], + solved_nodes: AbstractSet[int], + n_oi_diff: int, +) -> set[int]: + """Return the set nodes whose associated linear system is solvable. + + A node is solvable if: + - It has not been solved yet. + - Its column in the second block of :math:`K_{LS}` (which determines the constants in each equation) has only zeros where it intersects rows for which all the coefficients in the first block are 0s. + + See Theorem 4.4, step 12.a in Mitosek and Backens, 2024 (arXiv:2410.23439). 
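+    For example, a row of :math:`K_{LS}` whose first block is all zeros but whose second-block entry in the column of node `v` is 1 encodes the inconsistent equation :math:`0 = 1`, so `v` is not solvable at this iteration.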
+ """ + solvable_nodes: set[int] = set() + + row_idxs = np.flatnonzero( + ~kls_matrix[:, :n_oi_diff].any(axis=1) + ) # Row indices of the 0-rows in the first block of K_{LS}. + if row_idxs.size: + for v in non_outputs_set - solved_nodes: + j = n_oi_diff + ogi.non_outputs.index(v) # `n_oi_diff` is the column offset from the first block of K_{LS}. + if not kls_matrix[row_idxs, j].any(): + solvable_nodes.add(v) + else: + # If the first block of K_{LS} does not have 0-rows, all non-solved nodes are solvable. + solvable_nodes = set(non_outputs_set - solved_nodes) + + return solvable_nodes + + +def _update_p_matrix( + ogi: OpenGraphIndex, kls_matrix: MatGF2, p_matrix: MatGF2, solvable_nodes: AbstractSet[int], n_oi_diff: int +) -> None: + """Update `p_matrix`. + + The solution of the linear system associated with node :math:`v` in `solvable_nodes` corresponds to the column of `p_matrix` associated with node :math:`v`. + + See Theorem 4.4, steps 12.b and 12.c in Mitosek and Backens, 2024 (arXiv:2410.23439). + """ + for v in solvable_nodes: + j = ogi.non_outputs.index(v) + j_shift = n_oi_diff + j # `n_oi_diff` is the column offset from the first block of K_{LS}. + mat = MatGF2(kls_matrix[:, :n_oi_diff]) # First block of K_{LS}, in row echelon form. + b = MatGF2(kls_matrix[:, j_shift]) + x = solve_f2_linear_system(mat, b) + p_matrix[:, j] = x + + +def _update_kls_matrix( + ogi: OpenGraphIndex, + kls_matrix: MatGF2, + kils_matrix: MatGF2, + solvable_nodes: AbstractSet[int], + n_oi_diff: int, + n_no: int, + n_no_optim: int, +) -> None: + """Update `kls_matrix`. + + Bring the linear system encoded in :math:`K_{LS}` to the row-echelon form (REF) that would be achieved by Gaussian elimination if the row and column vectors corresponding to vertices in `solvable_nodes` where not included in the starting matrix. + + See Theorem 4.4, step 12.d in Mitosek and Backens, 2024 (arXiv:2410.23439). + """ + shift = n_oi_diff + n_no # `n_oi_diff` + `n_no` is the column offset from the first two blocks of K_{LS}. + row_permutation: list[int] + + def reorder(old_pos: int, new_pos: int) -> None: # Used in step 12.d.vi + """Reorder the elements of `row_permutation`. + + The element at `old_pos` is placed on the right of the element at `new_pos`. + Example: + ``` + row_permutation = [0, 1, 2, 3, 4] + reorder(1, 3) -> [0, 2, 3, 1, 4] + reorder(2, -1) -> [2, 0, 1, 3, 4] + ``` + """ + val = row_permutation.pop(old_pos) + row_permutation.insert(new_pos + (new_pos < old_pos), val) + + for v in solvable_nodes: + if ( + v in ogi.non_outputs_optim + ): # if `v` corresponded to a zero row in N_B, it was not present in `kls_matrix` because we removed it in the optimization process, so there's no need to do Gaussian elimination for that vertex. + # Step 12.d.ii + j = ogi.non_outputs_optim.index(v) + j_shift = shift + j + row_idxs = np.flatnonzero( + kls_matrix[:, j_shift] + ).tolist() # Row indices with 1s in column of node `v` in third block. + + # `row_idxs` can't be empty: + # The third block of K_{LS} is initially the identity matrix, so all columns have initially a 1. Row permutations and row additions in the Gaussian elimination routine can't remove all 1s from a given column. + k = row_idxs.pop() + + # Step 12.d.iii + kls_matrix[row_idxs] ^= kls_matrix[k] # Adding a row to previous rows preserves REF. + + # Step 12.d.iv + kls_matrix[k] ^= kils_matrix[j] # Row `k` may now break REF. + + # Step 12.d.v + pivots: list[np.int_] = [] # Store pivots for next step. 
+ for i, row in enumerate(kls_matrix): + if i != k: + col_idxs = np.flatnonzero(row[:n_oi_diff]) # Column indices with 1s in first block. + if col_idxs.size == 0: + # Row `i` has all zeros in the first block. Only row `k` can break REF, so rows below have all zeros in the first block too. + break + pivots.append(p := col_idxs[0]) + if kls_matrix[k, p]: # Row `k` has a 1 in the column corresponding to the leading 1 of row `i`. + kls_matrix[k] ^= row + + row_permutation = list(range(n_no_optim)) # Row indices of `kls_matrix`. + n_pivots = len(pivots) + + col_idxs = np.flatnonzero(kls_matrix[k, :n_oi_diff]) + pk = col_idxs[0] if col_idxs.size else None # Pivot of row `k`. + + if pk and k >= n_pivots: # Row `k` is non-zero in the FB (first block) and it's among zero rows. + # Find row `new_pos` s.t. `pivots[new_pos] <= pk < pivots[new_pos+1]`. + new_pos = ( + int(np.argmax(np.array(pivots) > pk) - 1) if pivots else -1 + ) # `pivots` can be empty. If so, we bring row `k` to the top since it's non-zero. + elif pk: # Row `k` is non-zero in the FB and it's among non-zero rows. + # Find row `new_pos` s.t. `pivots[new_pos] <= pk < pivots[new_pos+1]` + new_pos = int(np.argmax(np.array(pivots) > pk) - 1) + # We skipped row `k` in loop of step 12.d.v, so `pivots[j]` can be the pivot of row `j` or `j+1`. + if new_pos >= k: + new_pos += 1 + elif k < n_pivots: # Row `k` is zero in the first block and it's among non-zero rows. + new_pos = ( + n_pivots # Move row `k` to the top of the zeros block (i.e., below the row of the last pivot). + ) + else: # Row `k` is zero in the first block and it's among zero rows. + new_pos = k # Do nothing. + + if new_pos != k: + reorder(k, new_pos) # Modify `row_permutation` in-place. + kls_matrix[:] = kls_matrix[ + row_permutation + ] # `[:]` is crucial to modify the data pointed by `kls_matrix`. + + +def _find_pflow_general(ogi: OpenGraphIndex) -> tuple[MatGF2, MatGF2] | None: + r"""Construct the generalized correction matrix :math:`C'C^B` and the generalized ordering matrix, :math:`NC'C^B` for an open graph with larger number of outputs than inputs. + + Parameters + ---------- + ogi : OpenGraphIndex + Open graph for which :math:`C'C^B` and :math:`NC'C^B` are computed. + + Returns + ------- + correction_matrix : MatGF2 + Matrix encoding the correction function. + ordering_matrix : MatGF2 + Matrix encoding the partial ordering between nodes. + + or `None` + if the input open graph does not have Pauli flow. + + Notes + ----- + - The function returns `None` if + a) The flow-demand matrix is not invertible, or + b) Not all linear systems of equations associated to the non-output nodes are solvable, + meaning that `ogi` does not have Pauli flow. + Condition (b) is satisfied when the flow-demand matrix :math:`M` does not have a right inverse :math:`C` such that :math:`NC` represents a directed acyclical graph (DAG). + + See Theorem 4.4 and Algorithm 3 in Mitosek and Backens, 2024 (arXiv:2410.23439). + """ + n_no = len(ogi.non_outputs) + n_oi_diff = len(ogi.og.outputs) - len(ogi.og.inputs) + + # Steps 1 and 2 + flow_demand_matrix, order_demand_matrix = _compute_pflow_matrices(ogi) + + # Steps 3 and 4 + correction_matrix_0 = flow_demand_matrix.right_inverse() # C0 matrix. + if correction_matrix_0 is None: + return None # The flow-demand matrix is not invertible, therefore there's no flow. + + # Steps 5, 6 and 7 + ker_flow_demand_matrix = flow_demand_matrix.null_space().transpose() # F matrix. 
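+    # C' = [C0 | F]: since the columns of F span the null space of the flow-demand matrix,
+    # C0 + F X is a right inverse for any X, so C' parametrizes all candidate correction matrices.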
+ c_prime_matrix = np.concatenate((correction_matrix_0, ker_flow_demand_matrix), axis=1).view(MatGF2) + + row_idxs = np.flatnonzero(order_demand_matrix.any(axis=1)) # Row indices of the non-zero rows. + + if row_idxs.size: + # The p-matrix finding algorithm runs on the `order_demand_matrix` without the zero rows. + # This optimization is allowed because: + # - The zero rows remain zero after the change of basis (multiplication by `c_prime_matrix`). + # - The zero rows remain zero after gaussian elimination. + # - Removing the zero rows does not change the solvability condition of the open graph nodes. + nb_matrix_optim = ( + order_demand_matrix[row_idxs].view(MatGF2).mat_mul(c_prime_matrix) + ) # `view` is used to keep mypy happy without copying data. + for i in set(range(order_demand_matrix.shape[0])).difference(row_idxs): + ogi.non_outputs_optim.remove(ogi.non_outputs[i]) # Update the node-index mapping. + + # Steps 8 - 12 + if (p_matrix := _compute_p_matrix(ogi, nb_matrix_optim)) is None: + return None + else: + # If all rows of `order_demand_matrix` are zero, any matrix will solve the associated linear system of equations. + p_matrix = np.zeros((n_oi_diff, n_no), dtype=np.uint8).view(MatGF2) + + # Step 13 + cb_matrix = np.concatenate((np.eye(n_no, dtype=np.uint8), p_matrix), axis=0).view(MatGF2) + + correction_matrix = c_prime_matrix.mat_mul(cb_matrix) + ordering_matrix = order_demand_matrix.mat_mul(correction_matrix) + + return correction_matrix, ordering_matrix + + +def _compute_topological_generations(ordering_matrix: MatGF2) -> list[list[int]] | None: + """Stratify the directed acyclic graph (DAG) represented by the ordering matrix into generations. + + Parameters + ---------- + ordering_matrix : MatGF2 + Matrix encoding the partial ordering between nodes interpreted as the adjacency matrix of a directed graph. + + Returns + ------- + list[list[int]] + Topological generations. Integers represent the indices of the matrix `ordering_matrix`, not the labelling of the nodes. + + or `None` + if `ordering_matrix` is not a DAG. + + Notes + ----- + This function is adapted from `:func: networkx.algorithms.dag.topological_generations` so that it works directly on the adjacency matrix (which is the output of the Pauli-flow finding algorithm) instead of a `:class: nx.DiGraph` object. This avoids calling the function `nx.from_numpy_array` which can be expensive for certain graph instances. + + Here we use the convention that the element `ordering_matrix[i,j]` represents a link `j -> i`. NetworkX uses the opposite convention. 
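+    Example (illustrative):
+    ```
+    ordering_matrix = MatGF2([[0, 0, 0],
+                              [1, 0, 0],
+                              [1, 1, 0]])  # edges 0 -> 1, 0 -> 2, 1 -> 2
+    ```
+    yields the generations `[[0], [1], [2]]`, whereas a matrix with both `[i, j]` and `[j, i]` set encodes a cycle and yields `None`.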
+ """ + adj_mat = ordering_matrix + + indegree_map: dict[int, int] = {} + zero_indegree: list[int] = [] + neighbors = {node: set(np.flatnonzero(row).astype(int)) for node, row in enumerate(adj_mat.T)} + for node, col in enumerate(adj_mat): + parents = np.flatnonzero(col) + if parents.size: + indegree_map[node] = parents.size + else: + zero_indegree.append(node) + + generations: list[list[int]] = [] + + while zero_indegree: + this_generation = zero_indegree + zero_indegree = [] + for node in this_generation: + for child in neighbors[node]: + indegree_map[child] -= 1 + if indegree_map[child] == 0: + zero_indegree.append(child) + del indegree_map[child] + generations.append(this_generation) + + if indegree_map: + return None + return generations + + +def _cnc_matrices2pflow( + ogi: OpenGraphIndex, + correction_matrix: MatGF2, + ordering_matrix: MatGF2, +) -> tuple[dict[int, set[int]], dict[int, int]] | None: + r"""Transform the correction and ordering matrices into a Pauli flow in its standard form (correction function and partial order). + + Parameters + ---------- + ogi : OpenGraphIndex + Open graph whose Pauli flow is calculated. + correction_matrix : MatGF2 + Matrix encoding the correction function. + ordering_matrix : MatGF2 + Matrix encoding the partial ordering between nodes (DAG). + + Returns + ------- + pf : dict[int, set[int]] + Pauli flow correction function. pf[i] is the set of qubits to be corrected for the measurement of qubit i. + l_k : dict[int, int] + Partial order between corrected qubits, such that the pair (`key`, `value`) corresponds to (node, depth). + + or `None` + if the ordering matrix is not a DAG, in which case the input open graph does not have Pauli flow. + + Notes + ----- + - The correction matrix :math:`C` is an :math:`(n - n_I) \times (n - n_O)` matrix related to the correction function :math:`c(v) = \{u \in I^c|C_{u,v} = 1\}`, where :math:`I^c` are the non-input nodes of `ogi`. In other words, the column :math:`v` of :math:`C` encodes the correction set of :math:`v`, :math:`c(v)`. + + - The Pauli flow's ordering :math:`<_c` is the transitive closure of :math:`\lhd_c`, where the latter is related to the ordering matrix :math:`NC` as :math:`v \lhd_c w \Leftrightarrow (NC)_{w,v} = 1`, for :math:`v, w, \in O^c` two non-output nodes of `ogi`. + + See Definition 3.6, Lemma 3.12, and Theorem 3.1 in Mitosek and Backens, 2024 (arXiv:2410.23439). + """ + row_tags = ogi.non_inputs + col_tags = ogi.non_outputs + + # Calculation of the partial ordering + + if (topo_gen := _compute_topological_generations(ordering_matrix)) is None: + return None # The NC matrix is not a DAG, therefore there's no flow. + + l_k = dict.fromkeys(ogi.og.outputs, 0) # Output nodes are always in layer 0. + + # If m >_c n, with >_c the flow order for two nodes m, n, then layer(n) > layer(m). + # Therefore, we iterate the topological sort of the graph in _reverse_ order to obtain the order of measurements. + for layer, idx in enumerate(reversed(topo_gen), start=1): + l_k.update({col_tags[i]: layer for i in idx}) + + # Calculation of the correction function + + pf: dict[int, set[int]] = {} + for node in col_tags: + i = col_tags.index(node) + correction_set = {row_tags[j] for j in np.flatnonzero(correction_matrix[:, i])} + pf[node] = correction_set + + return pf, l_k + + +def find_pflow(og: OpenGraph) -> tuple[dict[int, set[int]], dict[int, int]] | None: + """Return a Pauli flow of the input open graph if it exists. 
+ + Parameters + ---------- + og : OpenGraph + Open graph whose Pauli flow is calculated. + + Returns + ------- + pf : dict[int, set[int]] + Pauli flow correction function. `pf[i]` is the set of qubits to be corrected for the measurement of qubit `i`. + l_k : dict[int, int] + Partial order between corrected qubits, such that the pair (`key`, `value`) corresponds to (node, depth). + + or `None` + if the input open graph does not have Pauli flow. + + Notes + ----- + See Theorems 3.1, 4.2 and 4.4, and Algorithms 2 and 3 in Mitosek and Backens, 2024 (arXiv:2410.23439). + """ + ni = len(og.inputs) + no = len(og.outputs) + + if ni > no: + return None + + ogi = OpenGraphIndex(og) + + cnc_matrices = _find_pflow_simple(ogi) if ni == no else _find_pflow_general(ogi) + if cnc_matrices is None: + return None + pflow = _cnc_matrices2pflow(ogi, *cnc_matrices) + if pflow is None: + return None + + pf, l_k = pflow + + return pf, l_k diff --git a/graphix/generator.py b/graphix/generator.py index ab6857832..815488c2f 100644 --- a/graphix/generator.py +++ b/graphix/generator.py @@ -4,9 +4,9 @@ from typing import TYPE_CHECKING +import graphix.gflow from graphix.command import E, M, N, X, Z from graphix.fundamentals import Plane -from graphix.gflow import find_flow, find_gflow, find_odd_neighbor, find_pauliflow, get_layers from graphix.pattern import Pattern if TYPE_CHECKING: @@ -78,7 +78,7 @@ def generate_from_graph( meas_planes = dict.fromkeys(measuring_nodes, Plane.XY) if not meas_planes else dict(meas_planes) # search for flow first - f, l_k = find_flow(graph, inputs_set, outputs_set, meas_planes=meas_planes) + f, l_k = graphix.gflow.find_flow(graph, inputs_set, outputs_set, meas_planes=meas_planes) if f is not None: # flow found pattern = _flow2pattern(graph, angles, inputs, f, l_k) @@ -86,16 +86,16 @@ def generate_from_graph( return pattern # no flow found - we try gflow - g, l_k = find_gflow(graph, inputs_set, outputs_set, meas_planes=meas_planes) - if g is not None: + g, l_k = graphix.gflow.find_gflow(graph, inputs_set, outputs_set, meas_planes=meas_planes) + if g is not None and l_k is not None: # gflow found pattern = _gflow2pattern(graph, angles, inputs, meas_planes, g, l_k) pattern.reorder_output_nodes(outputs) return pattern # no flow or gflow found - we try pflow - p, l_k = find_pauliflow(graph, inputs_set, outputs_set, meas_planes=meas_planes, meas_angles=angles) - if p is not None: + p, l_k = graphix.gflow.find_pauliflow(graph, inputs_set, outputs_set, meas_planes=meas_planes, meas_angles=angles) + if p is not None and l_k is not None: # pflow found pattern = _pflow2pattern(graph, angles, inputs, meas_planes, p, l_k) pattern.reorder_output_nodes(outputs) @@ -112,7 +112,7 @@ def _flow2pattern( l_k: Mapping[int, int], ) -> Pattern: """Construct a measurement pattern from a causal flow according to the theorem 1 of [NJP 9, 250 (2007)].""" - depth, layers = get_layers(l_k) + depth, layers = graphix.gflow.get_layers(l_k) pattern = Pattern(input_nodes=inputs) for i in set(graph.nodes) - set(inputs): pattern.add(N(node=i)) @@ -143,7 +143,7 @@ def _gflow2pattern( l_k: Mapping[int, int], ) -> Pattern: """Construct a measurement pattern from a generalized flow according to the theorem 2 of [NJP 9, 250 (2007)].""" - depth, layers = get_layers(l_k) + depth, layers = graphix.gflow.get_layers(l_k) pattern = Pattern(input_nodes=inputs) for i in set(graph.nodes) - set(inputs): pattern.add(N(node=i)) @@ -152,7 +152,7 @@ def _gflow2pattern( for i in range(depth, 0, -1): # i from depth, depth-1, ... 
1 for j in layers[i]: pattern.add(M(node=j, plane=meas_planes[j], angle=angles[j])) - odd_neighbors = find_odd_neighbor(graph, g[j]) + odd_neighbors = graphix.gflow.find_odd_neighbor(graph, g[j]) for k in odd_neighbors - {j}: pattern.add(Z(node=k, domain={j})) for k in g[j] - {j}: @@ -169,7 +169,7 @@ def _pflow2pattern( l_k: Mapping[int, int], ) -> Pattern: """Construct a measurement pattern from a Pauli flow according to the theorem 4 of [NJP 9, 250 (2007)].""" - depth, layers = get_layers(l_k) + depth, layers = graphix.gflow.get_layers(l_k) pattern = Pattern(input_nodes=inputs) for i in set(graph.nodes) - set(inputs): pattern.add(N(node=i)) @@ -178,7 +178,7 @@ def _pflow2pattern( for i in range(depth, 0, -1): # i from depth, depth-1, ... 1 for j in layers[i]: pattern.add(M(node=j, plane=meas_planes[j], angle=angles[j])) - odd_neighbors = find_odd_neighbor(graph, p[j]) + odd_neighbors = graphix.gflow.find_odd_neighbor(graph, p[j]) future_nodes: set[int] = set.union( *(nodes for (layer, nodes) in layers.items() if layer < i) ) # {k | k > j}, with "j" last corrected node and ">" the Pauli flow ordering diff --git a/graphix/gflow.py b/graphix/gflow.py index 01abdbf75..5faae477c 100644 --- a/graphix/gflow.py +++ b/graphix/gflow.py @@ -1,7 +1,6 @@ """Flow finding algorithm. -For a given underlying graph (G, I, O, meas_plane), this method finds a (generalized) flow [NJP 9, 250 (2007)] -in polynomincal time. +For a given underlying graph (G, I, O, meas_plane), this method finds a (generalized) flow [NJP 9, 250 (2007)] in polynomial time. In particular, this outputs gflow with minimum depth, maximally delayed gflow. Ref: Mhalla and Perdrix, International Colloquium on Automata, @@ -13,23 +12,23 @@ from __future__ import annotations from copy import deepcopy -from itertools import product from typing import TYPE_CHECKING -import networkx as nx -import numpy as np -import sympy as sp from typing_extensions import assert_never +import graphix.opengraph from graphix.command import CommandKind +from graphix.find_pflow import find_pflow as _find_pflow from graphix.fundamentals import Axis, Plane -from graphix.linalg import MatGF2 -from graphix.measurements import PauliMeasurement +from graphix.measurements import Measurement, PauliMeasurement +from graphix.parameter import Placeholder if TYPE_CHECKING: from collections.abc import Mapping from collections.abc import Set as AbstractSet + import networkx as nx + from graphix.parameter import ExpressionOrFloat from graphix.pattern import Pattern @@ -42,194 +41,60 @@ def check_meas_planes(meas_planes: dict[int, Plane]) -> None: raise TypeError(f"Measure plane for {node} is `{plane}`, which is not an instance of `Plane`") +# NOTE: In a future version this function will take an `OpenGraph` object as input. def find_gflow( graph: nx.Graph[int], - iset: set[int], - oset: set[int], - meas_planes: dict[int, Plane], - mode: str = "single", -) -> tuple[dict[int, set[int]], dict[int, int]]: - """Maximally delayed gflow finding algorithm. - - For open graph g with input, output, and measurement planes, this returns maximally delayed gflow. - - gflow consist of function g(i) where i is the qubit labels, - and strict partial ordering < or layers labels l_k where each element - specify the order of qubits to be measured to maintain determinism in MBQC. 
- In practice, we must measure qubits in order specified in array l_k (increasing order - of l_k from 1), and for each measurements of qubit i we must perform corrections on - qubits in g(i), depending on the measurement outcome. - - For more details of gflow, see Browne et al., NJP 9, 250 (2007). - We use the extended gflow finding algorithm in Backens et al., Quantum 5, 421 (2021). + iset: AbstractSet[int], + oset: AbstractSet[int], + meas_planes: Mapping[int, Plane], + mode: str = "single", # noqa: ARG001 Compatibility with old API +) -> tuple[dict[int, set[int]], dict[int, int]] | tuple[None, None]: + r"""Return a maximally delayed general flow (gflow) of the input open graph if it exists. Parameters ---------- graph: :class:`networkx.Graph` - Graph (incl. input and output) - iset: set - set of node labels for input - oset: set - set of node labels for output - meas_planes: dict - measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. + Graph (including input and output). + iset: AbstractSet[int] + Set of input nodes. + oset: AbstractSet[int] + Set of output nodes. + meas_planes: Mapping[int, Plane] + Measurement planes for each qubit. meas_planes[i] is the measurement plane for qubit i. mode: str - The gflow finding algorithm can yield multiple equivalent solutions. So there are three options - - - "single": Returrns a single solution - - - "all": Returns all possible solutions - - - "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects, - requiring user substitution to get a concrete answer. - - Optional. Default is "single". + Deprecated. Reminiscent of old API, it will be removed in future versions. Returns ------- - g: dict - gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i. - l_k: dict - layers obtained by gflow algorithm. l_k[d] is a node set of depth d. - """ - check_meas_planes(meas_planes) - l_k = {} - g = {} - for node in graph.nodes: - l_k[node] = 0 - return gflowaux(graph, iset, oset, meas_planes, 1, l_k, g, mode=mode) + dict[int, set[int]] + Gflow correction function. In a given pair (key, value), value is the set of qubits to be corrected for the measurement of qubit key. + dict[int, int] + Partial order between corrected qubits, such that the pair (key, value) corresponds to (node, depth). + or None, None + if the input open graph does not have gflow. -def gflowaux( - graph: nx.Graph, - iset: set[int], - oset: set[int], - meas_planes: dict[int, Plane], - k: int, - l_k: dict[int, int], - g: dict[int, set[int]], - mode: str = "single", -): - """Find one layer of the gflow. + Notes + ----- + This function implements the algorithm in [1], see module graphix.find_pflow. + See [1] or [2] for a definition of gflow. - Ref: Backens et al., Quantum 5, 421 (2021). - - Parameters + References ---------- - graph: :class:`networkx.Graph` - Graph (incl. input and output) - iset: set - set of node labels for input - oset: set - set of node labels for output - meas_planes: dict - measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. - k: int - current layer number. - l_k: dict - layers obtained by gflow algorithm. l_k[d] is a node set of depth d. - g: dict - gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i. - mode: str(optional) - The gflow finding algorithm can yield multiple equivalent solutions. 
So there are three options - - "single": Returrns a single solution - - "all": Returns all possible solutions - - "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects, - requiring user substitution to get a concrete answer. - - Returns - ------- - g: dict - gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i. - l_k: dict - layers obtained by gflow algorithm. l_k[d] is a node set of depth d. + [1] Mitosek and Backens, 2024 (arXiv:2410.23439). + [2] Backens et al., Quantum 5, 421 (2021). """ - nodes = set(graph.nodes) - if oset == nodes: - return g, l_k - non_output = nodes - oset - correction_candidate = oset - iset - adj_mat, node_order_list = get_adjacency_matrix(graph) - node_order_row = node_order_list.copy() - node_order_row.sort() - node_order_col = node_order_list.copy() - node_order_col.sort() - for out in oset: - adj_mat.remove_row(node_order_row.index(out)) - node_order_row.remove(out) - adj_mat_row_reduced = adj_mat.copy() # later use for construct RHS - for node in nodes - correction_candidate: - adj_mat.remove_col(node_order_col.index(node)) - node_order_col.remove(node) - - b = MatGF2(np.zeros((adj_mat.data.shape[0], len(non_output)), dtype=int)) - for i_row in range(len(node_order_row)): - node = node_order_row[i_row] - vec = MatGF2(np.zeros(len(node_order_row), dtype=int)) - if meas_planes[node] == Plane.XY: - vec.data[i_row] = 1 - elif meas_planes[node] == Plane.XZ: - vec.data[i_row] = 1 - vec_add = adj_mat_row_reduced.data[:, node_order_list.index(node)] - vec += vec_add - elif meas_planes[node] == Plane.YZ: - vec.data = adj_mat_row_reduced.data[:, node_order_list.index(node)].reshape(vec.data.shape) - b.data[:, i_row] = vec.data - adj_mat, b, _, col_permutation = adj_mat.forward_eliminate(b) - x, kernels = adj_mat.backward_substitute(b) - - corrected_nodes = set() - for i_row in range(len(node_order_row)): - non_out_node = node_order_row[i_row] - x_col = x[:, i_row] - if 0 in x_col.shape or x_col[0] == sp.nan: # no solution - continue - if mode == "single": - sol_list = [x_col[i].subs(zip(kernels, [sp.false] * len(kernels))) for i in range(len(x_col))] - sol = np.array(sol_list) - sol_index = sol.nonzero()[0] - g[non_out_node] = {node_order_col[col_permutation.index(i)] for i in sol_index} - if meas_planes[non_out_node] in {Plane.XZ, Plane.YZ}: - g[non_out_node] |= {non_out_node} - - elif mode == "all": - g[non_out_node] = set() - binary_combinations = product([0, 1], repeat=len(kernels)) - for binary_combination in binary_combinations: - sol_list = [x_col[i].subs(zip(kernels, binary_combination)) for i in range(len(x_col))] - sol = np.array(sol_list) - sol_index = sol.nonzero()[0] - g_i = {node_order_col[col_permutation.index(i)] for i in sol_index} - if meas_planes[non_out_node] in {Plane.XZ, Plane.YZ}: - g_i |= {non_out_node} - - g[non_out_node] |= {frozenset(g_i)} - - elif mode == "abstract": - g[non_out_node] = {} - for i in range(len(x_col)): - node = node_order_col[col_permutation.index(i)] - g[non_out_node][node] = x_col[i] - if meas_planes[non_out_node] in {Plane.XZ, Plane.YZ}: - g[non_out_node][non_out_node] = sp.true - - l_k[non_out_node] = k - corrected_nodes |= {non_out_node} - - if len(corrected_nodes) == 0: - if oset == nodes: - return g, l_k - return None, None - return gflowaux( - graph, - iset, - oset | corrected_nodes, - meas_planes, - k + 1, - l_k, - g, - mode=mode, + meas = {node: Measurement(Placeholder("Angle"), plane) for node, plane in meas_planes.items()} + og = 
graphix.opengraph.OpenGraph( + inside=graph, + inputs=list(iset), + outputs=list(oset), + measurements=meas, ) + gf = _find_pflow(og) + if gf is None: + return None, None # This is to comply with old API. It will be change in the future to `None`` + return gf[0], gf[1] def find_flow( @@ -358,291 +223,63 @@ def flowaux( ) +# NOTE: In a future version this function will take an `OpenGraph` object as input. def find_pauliflow( graph: nx.Graph[int], - iset: set[int], - oset: set[int], - meas_planes: dict[int, Plane], + iset: AbstractSet[int], + oset: AbstractSet[int], + meas_planes: Mapping[int, Plane], meas_angles: Mapping[int, ExpressionOrFloat], - mode: str = "single", -) -> tuple[dict[int, set[int]], dict[int, int]]: - """Maximally delayed Pauli flow finding algorithm. - - For open graph g with input, output, measurement planes and measurement angles, this returns maximally delayed Pauli flow. - - Pauli flow consist of function p(i) where i is the qubit labels, - and strict partial ordering < or layers labels l_k where each element - specify the order of qubits to be measured to maintain determinism in MBQC. - In practice, we must measure qubits in order specified in array l_k (increasing order - of l_k from 1), and for each measurements of qubit i we must perform corrections on - qubits in p(i), depending on the measurement outcome. - - For more details of Pauli flow and the finding algorithm used in this method, - see Simmons et al., EPTCS 343, 2021, pp. 50-101 (arXiv:2109.05654). + mode: str = "single", # noqa: ARG001 Compatibility with old API +) -> tuple[dict[int, set[int]], dict[int, int]] | tuple[None, None]: + r"""Return a maximally delayed Pauli flow of the input open graph if it exists. Parameters ---------- graph: :class:`networkx.Graph` - Graph (incl. input and output) - iset: set - set of node labels for input - oset: set - set of node labels for output - meas_planes: dict - measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. - meas_angles: dict - measurement angles for each qubits. meas_angles[i] is the measurement angle for qubit i. + Graph (including input and output). + iset: AbstractSet[int] + Set of input nodes. + oset: AbstractSet[int] + Set of output nodes. + meas_planes: Mapping[int, Plane] + Measurement planes for each qubit. meas_planes[i] is the measurement plane for qubit i. + meas_angles: Mapping[int, ExpressionOrFloat] + Measurement angles for each qubit. meas_angles[i] is the measurement angle for qubit i. mode: str - The Pauliflow finding algorithm can yield multiple equivalent solutions. So there are three options - - - "single": Returrns a single solution - - - "all": Returns all possible solutions - - - "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects, - requiring user substitution to get a concrete answer. - - Optional. Default is "single". + Deprecated. Reminiscent of old API, it will be removed in future versions. Returns ------- - p: dict - Pauli flow function. p[i] is the set of qubits to be corrected for the measurement of qubit i. - l_k: dict - layers obtained by Pauli flow algorithm. l_k[d] is a node set of depth d. - """ - check_meas_planes(meas_planes) - l_k = {} - p = {} - l_x, l_y, l_z = get_pauli_nodes(meas_planes, meas_angles) - for node in graph.nodes: - if node in oset: - l_k[node] = 0 - - return pauliflowaux(graph, iset, oset, meas_planes, 0, set(), oset, l_k, p, (l_x, l_y, l_z), mode) + dict[int, set[int]] + Pauli flow correction function. 
In a given pair (key, value), value is the set of qubits to be corrected for the measurement of qubit key. + dict[int, int] + Partial order between corrected qubits, such that the pair (key, value) corresponds to (node, depth). + or None, None + if the input open graph does not have gflow. -def pauliflowaux( - graph: nx.Graph, - iset: set[int], - oset: set[int], - meas_planes: dict[int, Plane], - k: int, - correction_candidate: set[int], - solved_nodes: set[int], - l_k: dict[int, int], - p: dict[int, set[int]], - ls: tuple[set[int], set[int], set[int]], - mode: str = "single", -): - """Find one layer of the Pauli flow. - - Ref: Simmons et al., EPTCS 343, 2021, pp. 50-101 (arXiv:2109.05654). + Notes + ----- + This function implements the algorithm in [1], see module graphix.find_pflow. + See [1] or [2] for a definition of Pauli flow. - Parameters + References ---------- - graph: :class:`networkx.Graph` - Graph (incl. input and output) - iset: set - set of node labels for input - oset: set - set of node labels for output - meas_planes: dict - measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. - k: int - current layer number. - correction_candidate: set - set of qubits to be corrected. - solved_nodes: set - set of qubits whose layers are already determined. - l_k: dict - layers obtained by gflow algorithm. l_k[d] is a node set of depth d. - p: dict - Pauli flow function. p[i] is the set of qubits to be corrected for the measurement of qubit i. - ls: tuple - ls = (l_x, l_y, l_z) where l_x, l_y, l_z are sets of qubits whose measurement operators are X, Y, Z, respectively. - mode: str(optional) - The Pauliflow finding algorithm can yield multiple equivalent solutions. So there are three options - - "single": Returrns a single solution - - "all": Returns all possible solutions - - "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects, - requiring user substitution to get a concrete answer. - - Returns - ------- - p: dict - Pauli flow function. p[i] is the set of qubits to be corrected for the measurement of qubit i. - l_k: dict - layers obtained by Pauli flow algorithm. l_k[d] is a node set of depth d. + [1] Mitosek and Backens, 2024 (arXiv:2410.23439). + [2] Browne et al., 2007 New J. Phys. 
9 250 (arXiv:quant-ph/0702212) """ - l_x, l_y, l_z = ls - solved_update = set() - nodes = set(graph.nodes) - if oset == nodes: - return p, l_k - unsolved_nodes = nodes - solved_nodes - - adj_mat, node_order_list = get_adjacency_matrix(graph) - adj_mat_w_id = adj_mat.copy() + MatGF2(np.identity(adj_mat.data.shape[0], dtype=int)) - node_order_row = node_order_list.copy() - node_order_row_lower = node_order_list.copy() - node_order_col = node_order_list.copy() - - p_bar = correction_candidate | l_y | l_z - pset = nodes - p_bar - kset = (correction_candidate | l_x | l_y) & (nodes - iset) - yset = l_y - correction_candidate - - for node in unsolved_nodes: - adj_mat_ = adj_mat.copy() - adj_mat_w_id_ = adj_mat_w_id.copy() - node_order_row_ = node_order_row.copy() - node_order_row_lower_ = node_order_row_lower.copy() - node_order_col_ = node_order_col.copy() - for node_ in nodes - (pset | {node}): - adj_mat_.remove_row(node_order_row_.index(node_)) - node_order_row_.remove(node_) - for node_ in nodes - (yset - {node}): - adj_mat_w_id_.remove_row(node_order_row_lower_.index(node_)) - node_order_row_lower_.remove(node_) - for node_ in nodes - (kset - {node}): - adj_mat_.remove_col(node_order_col_.index(node_)) - adj_mat_w_id_.remove_col(node_order_col_.index(node_)) - node_order_col_.remove(node_) - adj_mat_.concatenate(adj_mat_w_id_, axis=0) - - if mode == "all": - p[node] = set() - - if mode == "abstract": - p[node] = [] - - solved = False - if meas_planes[node] == Plane.XY or node in l_x or node in l_y: - mat = MatGF2(np.zeros((len(node_order_row_), 1), dtype=int)) - mat.data[node_order_row_.index(node), :] = 1 - mat_lower = MatGF2(np.zeros((len(node_order_row_lower_), 1), dtype=int)) - mat.concatenate(mat_lower, axis=0) - adj_mat_xy, mat, _, col_permutation_xy = adj_mat_.forward_eliminate(mat, copy=True) - x_xy, kernels = adj_mat_xy.backward_substitute(mat) - - if 0 not in x_xy.shape and x_xy[0, 0] != sp.nan: - solved_update |= {node} - x_xy = x_xy[:, 0] - l_k[node] = k - - if mode == "single": - sol_list = [x_xy[i].subs(zip(kernels, [sp.false] * len(kernels))) for i in range(len(x_xy))] - sol = np.array(sol_list) - sol_index = sol.nonzero()[0] - p[node] = {node_order_col_[col_permutation_xy.index(i)] for i in sol_index} - solved = True - - elif mode == "all": - binary_combinations = product([0, 1], repeat=len(kernels)) - for binary_combination in binary_combinations: - sol_list = [x_xy[i].subs(zip(kernels, binary_combination)) for i in range(len(x_xy))] - sol = np.array(sol_list) - sol_index = sol.nonzero()[0] - p_i = {node_order_col_[col_permutation_xy.index(i)] for i in sol_index} - p[node].add(frozenset(p_i)) - - elif mode == "abstract": - p_i = {} - for i in range(len(x_xy)): - node_temp = node_order_col_[col_permutation_xy.index(i)] - p_i[node_temp] = x_xy[i] - p[node].append(p_i) - - if not solved and (meas_planes[node] == Plane.XZ or node in l_z or node in l_x): - mat = MatGF2(np.zeros((len(node_order_row_), 1), dtype=int)) - mat.data[node_order_row_.index(node)] = 1 - for neighbor in search_neighbor(node, graph.edges): - if neighbor in pset | {node}: - mat.data[node_order_row_.index(neighbor), :] = 1 - mat_lower = MatGF2(np.zeros((len(node_order_row_lower_), 1), dtype=int)) - for neighbor in search_neighbor(node, graph.edges): - if neighbor in yset - {node}: - mat_lower.data[node_order_row_lower_.index(neighbor), :] = 1 - mat.concatenate(mat_lower, axis=0) - adj_mat_xz, mat, _, col_permutation_xz = adj_mat_.forward_eliminate(mat, copy=True) - x_xz, kernels = 
adj_mat_xz.backward_substitute(mat) - if 0 not in x_xz.shape and x_xz[0, 0] != sp.nan: - solved_update |= {node} - x_xz = x_xz[:, 0] - l_k[node] = k - - if mode == "single": - sol_list = [x_xz[i].subs(zip(kernels, [sp.false] * len(kernels))) for i in range(len(x_xz))] - sol = np.array(sol_list) - sol_index = sol.nonzero()[0] - p[node] = {node_order_col_[col_permutation_xz.index(i)] for i in sol_index} | {node} - solved = True - - elif mode == "all": - binary_combinations = product([0, 1], repeat=len(kernels)) - for binary_combination in binary_combinations: - sol_list = [x_xz[i].subs(zip(kernels, binary_combination)) for i in range(len(x_xz))] - sol = np.array(sol_list) - sol_index = sol.nonzero()[0] - p_i = {node_order_col_[col_permutation_xz.index(i)] for i in sol_index} | {node} - p[node].add(frozenset(p_i)) - - elif mode == "abstract": - p_i = {} - for i in range(len(x_xz)): - node_temp = node_order_col_[col_permutation_xz.index(i)] - p_i[node_temp] = x_xz[i] - p_i[node] = sp.true - p[node].append(p_i) - - if not solved and (meas_planes[node] == Plane.YZ or node in l_y or node in l_z): - mat = MatGF2(np.zeros((len(node_order_row_), 1), dtype=int)) - for neighbor in search_neighbor(node, graph.edges): - if neighbor in pset | {node}: - mat.data[node_order_row_.index(neighbor), :] = 1 - mat_lower = MatGF2(np.zeros((len(node_order_row_lower_), 1), dtype=int)) - for neighbor in search_neighbor(node, graph.edges): - if neighbor in yset - {node}: - mat_lower.data[node_order_row_lower_.index(neighbor), :] = 1 - mat.concatenate(mat_lower, axis=0) - adj_mat_yz, mat, _, col_permutation_yz = adj_mat_.forward_eliminate(mat, copy=True) - x_yz, kernels = adj_mat_yz.backward_substitute(mat) - if 0 not in x_yz.shape and x_yz[0, 0] != sp.nan: - solved_update |= {node} - x_yz = x_yz[:, 0] - l_k[node] = k - - if mode == "single": - sol_list = [x_yz[i].subs(zip(kernels, [sp.false] * len(kernels))) for i in range(len(x_yz))] - sol = np.array(sol_list) - sol_index = sol.nonzero()[0] - p[node] = {node_order_col_[col_permutation_yz.index(i)] for i in sol_index} | {node} - solved = True - - elif mode == "all": - binary_combinations = product([0, 1], repeat=len(kernels)) - for binary_combination in binary_combinations: - sol_list = [x_yz[i].subs(zip(kernels, binary_combination)) for i in range(len(x_yz))] - sol = np.array(sol_list) - sol_index = sol.nonzero()[0] - p_i = {node_order_col_[col_permutation_yz.index(i)] for i in sol_index} | {node} - p[node].add(frozenset(p_i)) - - elif mode == "abstract": - p_i = {} - for i in range(len(x_yz)): - node_temp = node_order_col_[col_permutation_yz.index(i)] - p_i[node_temp] = x_yz[i] - p_i[node] = sp.true - p[node].append(p_i) - - if solved_update == set() and k > 0: - if solved_nodes == nodes: - return p, l_k - return None, None - bset = solved_nodes | solved_update - return pauliflowaux(graph, iset, oset, meas_planes, k + 1, bset, bset, l_k, p, (l_x, l_y, l_z), mode) + meas = {node: Measurement(angle, meas_planes[node]) for node, angle in meas_angles.items()} + og = graphix.opengraph.OpenGraph( + inside=graph, + inputs=list(iset), + outputs=list(oset), + measurements=meas, + ) + pf = _find_pflow(og) + if pf is None: + return None, None # This is to comply with old API. 
It will be changed in the future to `None`. + return pf[0], pf[1] def flow_from_pattern(pattern: Pattern) -> tuple[dict[int, set[int]], dict[int, int]]: @@ -747,7 +384,11 @@ def gflow_from_pattern(pattern: Pattern) -> tuple[dict[int, set[int]], dict[int, return None, None -def pauliflow_from_pattern(pattern: Pattern, mode="single") -> tuple[dict[int, set[int]], dict[int, int]]: +# TODO: Shouldn't call `find_pauliflow` +def pauliflow_from_pattern( + pattern: Pattern, + mode="single", # noqa: ARG001 Compatibility with old API +) -> tuple[dict[int, set[int]], dict[int, int]] | tuple[None, None]: """Check if the pattern has a valid Pauliflow. If so, return the Pauliflow and layers. Parameters @@ -771,52 +412,12 @@ def pauliflow_from_pattern(pattern: Pattern, mode="single") -> tuple[dict[int, s if not pattern.is_standard(strict=True): raise ValueError("The pattern should be standardized first.") graph = pattern.extract_graph() - nodes = set(graph.nodes) input_nodes = set(pattern.input_nodes) if pattern.input_nodes else set() - output_nodes = set(pattern.output_nodes) - non_outputs = nodes - output_nodes + output_nodes = set(pattern.output_nodes) if pattern.output_nodes else set() meas_planes = pattern.get_meas_plane() meas_angles = pattern.get_angles() - l_x, l_y, l_z = get_pauli_nodes(meas_planes, meas_angles) - - p_all, l_k = find_pauliflow(graph, input_nodes, output_nodes, meas_planes, meas_angles, mode="all") - if p_all is None: - return None, None - - p = {} - - xflow, zflow = get_corrections_from_pattern(pattern) - for node in non_outputs: - xflow_node = xflow.get(node, set()) - zflow_node = zflow.get(node, set()) - p_list = list(p_all[node]) if node in p_all else [] - valid = False - - for p_i in p_list: - if xflow_node & p_i == xflow_node: - ignored_nodes = p_i - xflow_node - {node} - # check if nodes in ignored_nodes are measured in X or Y basis - if ignored_nodes & (l_x | l_y) != ignored_nodes: - continue - odd_neighbers = find_odd_neighbor(graph, p_i) - if zflow_node & odd_neighbers == zflow_node: - ignored_nodes = zflow_node - odd_neighbers - {node} - # check if nodes in ignored_nodes are measured in Z or Y basis - if ignored_nodes & (l_y | l_z) == ignored_nodes: - valid = True - if mode == "single": - p[node] = set(p_i) - break - if mode == "all": - if node not in p: - p[node] = set() - p[node].add(frozenset(p_i)) - continue - if not valid: - return None, None - - return p, l_k + return find_pauliflow(graph, input_nodes, output_nodes, meas_planes, meas_angles) def get_corrections_from_pattern(pattern: Pattern) -> tuple[dict[int, set[int]], dict[int, set[int]]]: @@ -1090,30 +691,6 @@ def get_layers_from_flow( return layers, depth -def get_adjacency_matrix(graph: nx.Graph) -> tuple[MatGF2, list[int]]: - """Get adjacency matrix of the graph. - - Parameters - ---------- - graph : :class:`networkx.Graph` - Graph whose adjacency matrix is computed. - - Returns - ------- - adjacency_matrix: graphix.linalg.MatGF2 - adjacency matrix of the graph. the matrix is defined on GF(2) field. - node_list: list - ordered list of nodes. ``node_list[i]`` is the node label of the i-th - row/column of the adjacency matrix.
- - """ - node_list = list(graph.nodes) - node_list.sort() - adjacency_matrix = nx.to_numpy_array(graph, nodelist=node_list) - adjacency_matrix = MatGF2(adjacency_matrix.astype(int)) - return adjacency_matrix, node_list - - def verify_flow( graph: nx.Graph, iset: set[int], diff --git a/graphix/linalg.py b/graphix/linalg.py deleted file mode 100644 index 22a5e142c..000000000 --- a/graphix/linalg.py +++ /dev/null @@ -1,315 +0,0 @@ -"""Algorithms for linear algebra.""" - -from __future__ import annotations - -import galois -import numpy as np -import numpy.typing as npt -import sympy as sp - - -class MatGF2: - """Matrix on GF2 field.""" - - def __init__(self, data): - """Construct a matrix of GF2. - - Parameters - ---------- - data: array_like - input data - """ - if isinstance(data, MatGF2): - self.data = data.data - else: - self.data = galois.GF2(data) - - def __repr__(self) -> str: - """Return the representation string of the matrix.""" - return repr(self.data) - - def __str__(self) -> str: - """Return the displayable string of the matrix.""" - return str(self.data) - - def __eq__(self, other: MatGF2) -> bool: - """Return `True` if two matrices are equal, `False` otherwise.""" - return np.all(self.data == other.data) - - def __add__(self, other: npt.NDArray | MatGF2) -> MatGF2: - """Add two matrices.""" - if isinstance(other, np.ndarray): - other = MatGF2(other) - return MatGF2(self.data + other.data) - - def __sub__(self, other: npt.NDArray | MatGF2) -> MatGF2: - """Substract two matrices.""" - if isinstance(other, np.ndarray): - other = MatGF2(other) - return MatGF2(self.data - other.data) - - def __mul__(self, other: npt.NDArray | MatGF2) -> MatGF2: - """Compute the point-wise multiplication of two matrices.""" - if isinstance(other, np.ndarray): - other = MatGF2(other) - return MatGF2(self.data * other.data) - - def __matmul__(self, other: npt.NDArray | MatGF2) -> MatGF2: - """Multiply two matrices.""" - if isinstance(other, np.ndarray): - other = MatGF2(other) - return MatGF2(self.data @ other.data) - - def copy(self) -> MatGF2: - """Return a copy of the matrix.""" - return MatGF2(self.data.copy()) - - def add_row(self, array_to_add=None, row=None) -> None: - """Add a row to the matrix. - - Parameters - ---------- - array_to_add: array_like(optional) - row to add. Defaults to None. if None, add a zero row. - row: int(optional) - index to add a new row. Defaults to None. - """ - if row is None: - row = self.data.shape[0] - if array_to_add is None: - array_to_add = np.zeros((1, self.data.shape[1]), dtype=int) - array_to_add = array_to_add.reshape((1, self.data.shape[1])) - self.data = np.insert(self.data, row, array_to_add, axis=0) - - def add_col(self, array_to_add=None, col=None) -> None: - """Add a column to the matrix. - - Parameters - ---------- - array_to_add: array_like(optional) - column to add. Defaults to None. if None, add a zero column. - col: int(optional) - index to add a new column. Defaults to None. - """ - if col is None: - col = self.data.shape[1] - if array_to_add is None: - array_to_add = np.zeros((1, self.data.shape[0]), dtype=int) - array_to_add = array_to_add.reshape((1, self.data.shape[0])) - self.data = np.insert(self.data, col, array_to_add, axis=1) - - def concatenate(self, other: MatGF2, axis: int = 1) -> None: - """Concatinate two matrices. - - Parameters - ---------- - other: MatGF2 - matrix to concatinate - axis: int(optional) - axis to concatinate. Defaults to 1. 
- """ - self.data = np.concatenate((self.data, other.data), axis=axis) - - def remove_row(self, row: int) -> None: - """Remove a row from the matrix. - - Parameters - ---------- - row: int - index to remove a row - """ - self.data = np.delete(self.data, row, axis=0) - - def remove_col(self, col: int) -> None: - """Remove a column from the matrix. - - Parameters - ---------- - col: int - index to remove a column - """ - self.data = np.delete(self.data, col, axis=1) - - def swap_row(self, row1: int, row2: int) -> None: - """Swap two rows. - - Parameters - ---------- - row1: int - row index - row2: int - row index - """ - self.data[[row1, row2]] = self.data[[row2, row1]] - - def swap_col(self, col1: int, col2: int) -> None: - """Swap two columns. - - Parameters - ---------- - col1: int - column index - col2: int - column index - """ - self.data[:, [col1, col2]] = self.data[:, [col2, col1]] - - def permute_row(self, row_permutation) -> None: - """Permute rows. - - Parameters - ---------- - row_permutation: array_like - row permutation - """ - self.data = self.data[row_permutation, :] - - def permute_col(self, col_permutation) -> None: - """Permute columns. - - Parameters - ---------- - col_permutation: array_like - column permutation - """ - self.data = self.data[:, col_permutation] - - def is_canonical_form(self) -> bool: - """Check if the matrix is in a canonical form (row reduced echelon form). - - Returns - ------- - bool: bool - True if the matrix is in canonical form - """ - diag = self.data.diagonal() - nonzero_diag_index = diag.nonzero()[0] - - rank = len(nonzero_diag_index) - for i in range(len(nonzero_diag_index)): - if diag[nonzero_diag_index[i]] == 0: - if np.count_nonzero(diag[i:]) != 0: - break - return False - - ref_array = MatGF2(np.diag(np.diagonal(self.data[:rank, :rank]))) - if np.count_nonzero(self.data[:rank, :rank] - ref_array.data) != 0: - return False - - return np.count_nonzero(self.data[rank:, :]) == 0 - - def get_rank(self) -> int: - """Get the rank of the matrix. - - Returns - ------- - int: int - rank of the matrix - """ - mat_a = self.forward_eliminate(copy=True)[0] if not self.is_canonical_form() else self - nonzero_index = np.diag(mat_a.data).nonzero() - return len(nonzero_index[0]) - - def forward_eliminate(self, b=None, copy=False) -> tuple[MatGF2, MatGF2, list[int], list[int]]: - r"""Forward eliminate the matrix. - - |A B| --\ |I X| - |C D| --/ |0 0| - where X is an arbitrary matrix - - Parameters - ---------- - b: array_like(otional) - Left hand side of the system of equations. Defaults to None. - copy: bool(optional) - copy the matrix or not. Defaults to False. 
- - Returns - ------- - mat_a: MatGF2 - forward eliminated matrix - b: MatGF2 - forward eliminated right hand side - row_permutation: list - row permutation - col_permutation: list - column permutation - """ - mat_a = MatGF2(self.data) if copy else self - if b is None: - b = np.zeros((mat_a.data.shape[0], 1), dtype=int) - b = MatGF2(b) - # Remember the row and column order - row_permutation = list(range(mat_a.data.shape[0])) - col_permutation = list(range(mat_a.data.shape[1])) - - # Gauss-Jordan Elimination - max_rank = min(mat_a.data.shape) - for row in range(max_rank): - if mat_a.data[row, row] == 0: - pivot = mat_a.data[row:, row:].nonzero() - if len(pivot[0]) == 0: - break - pivot_row = pivot[0][0] + row - if pivot_row != row: - mat_a.swap_row(row, pivot_row) - b.swap_row(row, pivot_row) - former_row = row_permutation.index(row) - former_pivot_row = row_permutation.index(pivot_row) - row_permutation[former_row] = pivot_row - row_permutation[former_pivot_row] = row - pivot_col = pivot[1][0] + row - if pivot_col != row: - mat_a.swap_col(row, pivot_col) - former_col = col_permutation.index(row) - former_pivot_col = col_permutation.index(pivot_col) - col_permutation[former_col] = pivot_col - col_permutation[former_pivot_col] = row - assert mat_a.data[row, row] == 1 - eliminate_rows = set(mat_a.data[:, row].nonzero()[0]) - {row} - for eliminate_row in eliminate_rows: - mat_a.data[eliminate_row, :] += mat_a.data[row, :] - b.data[eliminate_row, :] += b.data[row, :] - return mat_a, b, row_permutation, col_permutation - - def backward_substitute(self, b) -> tuple[npt.NDArray, list[sp.Symbol]]: - """Backward substitute the matrix. - - Parameters - ---------- - b: array_like - right hand side of the system of equations - - Returns - ------- - x: sympy.MutableDenseMatrix - answer of the system of equations - kernels: list-of-sympy.Symbol - kernel of the matrix. - matrix x contains sympy.Symbol if the input matrix is not full rank. - nan-column vector means that there is no solution. - """ - rank = self.get_rank() - b = MatGF2(b) - x = [] - kernels = sp.symbols(f"x0:{self.data.shape[1] - rank}") - for col in range(b.data.shape[1]): - x_col = [] - b_col = b.data[:, col] - if np.count_nonzero(b_col[rank:]) != 0: - x_col = [sp.nan for i in range(self.data.shape[1])] - x.append(x_col) - continue - for row in range(rank - 1, -1, -1): - sol = sp.true if b_col[row] == 1 else sp.false - kernel_index = np.nonzero(self.data[row, rank:])[0] - for k in kernel_index: - sol ^= kernels[k] - x_col.insert(0, sol) - for row in range(rank, self.data.shape[1]): - x_col.append(kernels[row - rank]) - x.append(x_col) - - x = np.array(x).T - - return x, kernels diff --git a/graphix/opengraph.py b/graphix/opengraph.py index 446156c52..e6bbadd41 100644 --- a/graphix/opengraph.py +++ b/graphix/opengraph.py @@ -7,7 +7,7 @@ import networkx as nx -from graphix.generator import generate_from_graph +import graphix.generator from graphix.measurements import Measurement if TYPE_CHECKING: @@ -114,7 +114,7 @@ def to_pattern(self) -> Pattern: angles = {node: m.angle for node, m in meas.items()} planes = {node: m.plane for node, m in meas.items()} - return generate_from_graph(g, angles, inputs, outputs, planes) + return graphix.generator.generate_from_graph(g, angles, inputs, outputs, planes) def compose(self, other: OpenGraph, mapping: Mapping[int, int]) -> tuple[OpenGraph, dict[int, int]]: r"""Compose two open graphs by merging subsets of nodes from `self` and `other`, and relabeling the nodes of `other` that were not merged. 
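For orientation, here is a minimal, hedged usage sketch of the Pauli-flow machinery that replaces the deleted sympy/galois-based solver. It uses only names exercised later in this diff (graphix.find_pflow.find_pflow, OpenGraph, Measurement, Plane) and mirrors the first open graph in tests/test_find_pflow.py; treat the return value simply as the (correction function, layers) pair unpacked in those tests, or None when no Pauli flow exists.

import networkx as nx

from graphix.find_pflow import find_pflow
from graphix.fundamentals import Plane
from graphix.measurements import Measurement
from graphix.opengraph import OpenGraph

# Open graph [0]-1-(2): node 0 is the input, node 2 the output.
graph = nx.Graph([(0, 1), (1, 2)])
meas = {0: Measurement(0.1, Plane.XY), 1: Measurement(0.5, Plane.YZ)}
og = OpenGraph(inside=graph, inputs=[0], outputs=[2], measurements=meas)

result = find_pflow(og)  # None if the open graph has no Pauli flow
if result is not None:
    p, l_k = result  # correction sets and measurement layers, as unpacked in the tests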
diff --git a/graphix/pattern.py b/graphix/pattern.py index 0332f7f7a..2c96d77b2 100644 --- a/graphix/pattern.py +++ b/graphix/pattern.py @@ -1024,7 +1024,7 @@ def get_measurement_order_from_gflow(self) -> list[int]: vout = set(self.output_nodes) meas_planes = self.get_meas_plane() flow, l_k = find_gflow(graph, vin, vout, meas_planes=meas_planes) - if not flow: + if flow is None or l_k is None: # We check both to avoid typing issues with `get_layers`. raise ValueError("No gflow found") k, layers = get_layers(l_k) meas_order: list[int] = [] diff --git a/graphix/pauli.py b/graphix/pauli.py index e801fcfe6..6b50b5686 100644 --- a/graphix/pauli.py +++ b/graphix/pauli.py @@ -1,4 +1,4 @@ -"""Pauli gates ± {1,j} × {I, X, Y, Z}.""" # noqa: RUF002 +"""Pauli gates ± {1,j} × {I, X, Y, Z}.""" from __future__ import annotations diff --git a/graphix/pretty_print.py b/graphix/pretty_print.py index 2892ce174..773f10d30 100644 --- a/graphix/pretty_print.py +++ b/graphix/pretty_print.py @@ -27,7 +27,9 @@ class OutputFormat(Enum): Unicode = enum.auto() -def angle_to_str(angle: float, output: OutputFormat, max_denominator: int = 1000) -> str: +def angle_to_str( + angle: float, output: OutputFormat, max_denominator: int = 1000, multiplication_sign: bool = False +) -> str: r""" Return a string representation of an angle given in units of π. @@ -43,6 +45,13 @@ def angle_to_str(angle: float, output: OutputFormat, max_denominator: int = 1000 Desired formatting style: Unicode (π symbol), LaTeX (\pi), or ASCII ("pi"). max_denominator : int, optional Maximum denominator for detecting a simple fraction (default: 1000). + multiplication_sign : bool + Optional (default: ``False``). + If ``True``, the multiplication sign is made explicit between the + numerator and π: + ``2×π`` in Unicode, ``2 \times \pi`` in LaTeX, and ``2*pi`` in ASCII. + If ``False``, the multiplication sign is implicit: + ``2π`` in Unicode, ``2\pi`` in LaTeX, ``2pi`` in ASCII. 
Returns ------- @@ -54,7 +63,7 @@ def angle_to_str(angle: float, output: OutputFormat, max_denominator: int = 1000 if not math.isclose(angle, float(frac)): rad = angle * math.pi - return f"{rad:.2f}" + return f"{rad}" num, den = frac.numerator, frac.denominator sign = "-" if num < 0 else "" @@ -65,21 +74,28 @@ def angle_to_str(angle: float, output: OutputFormat, max_denominator: int = 1000 def mkfrac(num: str, den: str) -> str: return rf"\frac{{{num}}}{{{den}}}" + + mul = r" \times " else: pi = "π" if output == OutputFormat.Unicode else "pi" def mkfrac(num: str, den: str) -> str: return f"{num}/{den}" + mul = "×" if output == OutputFormat.Unicode else "*" + + if not multiplication_sign: + mul = "" + if den == 1: if num == 0: return "0" if num == 1: return f"{sign}{pi}" - return f"{sign}{num}{pi}" + return f"{sign}{num}{mul}{pi}" den_str = f"{den}" - num_str = pi if num == 1 else f"{num}{pi}" + num_str = pi if num == 1 else f"{num}{mul}{pi}" return f"{sign}{mkfrac(num_str, den_str)}" diff --git a/graphix/qasm3_exporter.py b/graphix/qasm3_exporter.py new file mode 100644 index 000000000..60e9dec5f --- /dev/null +++ b/graphix/qasm3_exporter.py @@ -0,0 +1,120 @@ +"""Exporter to OpenQASM3.""" + +from __future__ import annotations + +from math import pi +from typing import TYPE_CHECKING + +# assert_never added in Python 3.11 +from typing_extensions import assert_never + +from graphix.fundamentals import Axis, Sign +from graphix.instruction import Instruction, InstructionKind +from graphix.measurements import PauliMeasurement +from graphix.pretty_print import OutputFormat, angle_to_str + +if TYPE_CHECKING: + from collections.abc import Iterable, Iterator + + from graphix import Circuit + from graphix.parameter import ExpressionOrFloat + + +def circuit_to_qasm3(circuit: Circuit) -> str: + """Export circuit instructions to OpenQASM 3.0 representation. + + Returns + ------- + str + The OpenQASM 3.0 string representation of the circuit. + """ + return "\n".join(circuit_to_qasm3_lines(circuit)) + + +def circuit_to_qasm3_lines(circuit: Circuit) -> Iterator[str]: + """Export circuit instructions to line-by-line OpenQASM 3.0 representation. + + Returns + ------- + Iterator[str] + The OpenQASM 3.0 lines that represent the circuit. 
+ """ + yield "OPENQASM 3;" + yield 'include "stdgates.inc";' + yield f"qubit[{circuit.width}] q;" + if any(instr.kind == InstructionKind.M for instr in circuit.instruction): + yield f"bit[{circuit.width}] b;" + for instr in circuit.instruction: + yield f"{instruction_to_qasm3(instr)};" + + +def qasm3_qubit(index: int) -> str: + """Return the name of the indexed qubit.""" + return f"q[{index}]" + + +def qasm3_gate_call(gate: str, operands: Iterable[str], args: Iterable[str] | None = None) -> str: + """Return the OpenQASM3 gate call.""" + operands_str = ", ".join(operands) + if args is None: + return f"{gate} {operands_str}" + args_str = ", ".join(args) + return f"{gate}({args_str}) {operands_str}" + + +def angle_to_qasm3(angle: ExpressionOrFloat) -> str: + """Get the OpenQASM3 representation of an angle.""" + if not isinstance(angle, float): + raise TypeError("QASM export of symbolic pattern is not supported") + rad_over_pi = angle / pi + return angle_to_str(rad_over_pi, output=OutputFormat.ASCII, multiplication_sign=True) + + +def instruction_to_qasm3(instruction: Instruction) -> str: + """Get the OpenQASM3 representation of a single circuit instruction.""" + if instruction.kind == InstructionKind.M: + if PauliMeasurement.try_from(instruction.plane, instruction.angle) != PauliMeasurement(Axis.Z, Sign.PLUS): + raise ValueError("OpenQASM3 only supports measurements in Z axis.") + return f"b[{instruction.target}] = measure q[{instruction.target}]" + # Use of `==` here for mypy + if ( + instruction.kind == InstructionKind.RX # noqa: PLR1714 + or instruction.kind == InstructionKind.RY + or instruction.kind == InstructionKind.RZ + ): + angle = angle_to_qasm3(instruction.angle) + return qasm3_gate_call(instruction.kind.name.lower(), args=[angle], operands=[qasm3_qubit(instruction.target)]) + + # Use of `==` here for mypy + if ( + instruction.kind == InstructionKind.H # noqa: PLR1714 + or instruction.kind == InstructionKind.S + or instruction.kind == InstructionKind.X + or instruction.kind == InstructionKind.Y + or instruction.kind == InstructionKind.Z + ): + return qasm3_gate_call(instruction.kind.name.lower(), [qasm3_qubit(instruction.target)]) + if instruction.kind == InstructionKind.I: + return qasm3_gate_call("id", [qasm3_qubit(instruction.target)]) + if instruction.kind == InstructionKind.CNOT: + return qasm3_gate_call("cx", [qasm3_qubit(instruction.control), qasm3_qubit(instruction.target)]) + if instruction.kind == InstructionKind.SWAP: + return qasm3_gate_call("swap", [qasm3_qubit(instruction.targets[i]) for i in (0, 1)]) + if instruction.kind == InstructionKind.RZZ: + angle = angle_to_qasm3(instruction.angle) + return qasm3_gate_call( + "crz", args=[angle], operands=[qasm3_qubit(instruction.control), qasm3_qubit(instruction.target)] + ) + if instruction.kind == InstructionKind.CCX: + return qasm3_gate_call( + "ccx", + [ + qasm3_qubit(instruction.controls[0]), + qasm3_qubit(instruction.controls[1]), + qasm3_qubit(instruction.target), + ], + ) + # Use of `==` here for mypy + if instruction.kind == InstructionKind._XC or instruction.kind == InstructionKind._ZC: # noqa: PLR1714 + raise ValueError("Internal instruction should not appear") + assert_never(instruction.kind) diff --git a/graphix/sim/density_matrix.py b/graphix/sim/density_matrix.py index 61a94822e..2c7882280 100644 --- a/graphix/sim/density_matrix.py +++ b/graphix/sim/density_matrix.py @@ -106,7 +106,11 @@ def get_row( @property def nqubit(self) -> int: """Return the number of qubits.""" - return self.rho.shape[0].bit_length() - 
1 + # Circumvent typing bug with numpy>=2.3 + # `shape` field is typed `tuple[Any, ...]` instead of `tuple[int, ...]` + # See https://github.com/numpy/numpy/issues/29830 + nqubit: int = self.rho.shape[0].bit_length() - 1 + return nqubit def __str__(self) -> str: """Return a string description.""" diff --git a/noxfile.py b/noxfile.py index 4f19886e6..79aff5549 100644 --- a/noxfile.py +++ b/noxfile.py @@ -10,13 +10,13 @@ def install_pytest(session: Session) -> None: """Install pytest when requirements-dev.txt is not installed.""" - session.install("pytest", "pytest-mock", "psutil") + session.install("pytest", "pytest-mock", "pytest-benchmark", "psutil") @nox.session(python=["3.9", "3.10", "3.11", "3.12", "3.13"]) def tests_minimal(session: Session) -> None: """Run the test suite with minimal dependencies.""" - session.install("-e", ".") + session.install(".") install_pytest(session) # We cannot run `pytest --doctest-modules` here, since some tests # involve optional dependencies, like pyzx. @@ -27,7 +27,7 @@ def tests_minimal(session: Session) -> None: @nox.session(python=["3.10", "3.11", "3.12", "3.13"]) def tests_dev(session: Session) -> None: """Run the test suite with dev dependencies.""" - session.install("-e", ".[dev]") + session.install(".[dev]") # We cannot run `pytest --doctest-modules` here, since some tests # involve optional dependencies, like pyzx. session.run("pytest") @@ -36,7 +36,7 @@ def tests_dev(session: Session) -> None: @nox.session(python=["3.9", "3.10", "3.11", "3.12", "3.13"]) def tests_extra(session: Session) -> None: """Run the test suite with extra dependencies.""" - session.install("-e", ".[extra]") + session.install(".[extra]") install_pytest(session) session.install("nox") # needed for `--doctest-modules` session.run("pytest", "--doctest-modules") @@ -45,14 +45,14 @@ def tests_extra(session: Session) -> None: @nox.session(python=["3.10", "3.11", "3.12", "3.13"]) def tests_all(session: Session) -> None: """Run the test suite with all dependencies.""" - session.install("-e", ".[dev,extra]") + session.install(".[dev,extra]") session.run("pytest", "--doctest-modules") @nox.session(python=["3.9", "3.10", "3.11", "3.12", "3.13"]) def tests_symbolic(session: Session) -> None: """Run the test suite of graphix-symbolic.""" - session.install("-e", ".") + session.install(".") install_pytest(session) session.install("nox") # needed for `--doctest-modules` # Use `session.cd` as a context manager to ensure that the @@ -62,8 +62,23 @@ def tests_symbolic(session: Session) -> None: with TemporaryDirectory() as tmpdir, session.cd(tmpdir): # If you need a specific branch: # session.run("git", "clone", "-b", "branch-name", "https://github.com/TeamGraphix/graphix-symbolic") - # See https://github.com/TeamGraphix/graphix-symbolic/pull/4 - session.run("git", "clone", "-b", "branch_selector", "https://github.com/thierry-martinez/graphix-symbolic") - # session.run("git", "clone", "https://github.com/TeamGraphix/graphix-symbolic") + session.run("git", "clone", "https://github.com/TeamGraphix/graphix-symbolic") with session.cd("graphix-symbolic"): session.run("pytest", "--doctest-modules") + + +@nox.session(python=["3.9", "3.10", "3.11", "3.12", "3.13"]) +def tests_qasm_parser(session: Session) -> None: + """Run the test suite of graphix-qasm-parser.""" + session.install(".") + install_pytest(session) + session.install("nox") # needed for `--doctest-modules` + # Use `session.cd` as a context manager to ensure that the + # working directory is restored afterward. 
This is important + # because Windows cannot delete a temporary directory while it + # is the working directory. + with TemporaryDirectory() as tmpdir, session.cd(tmpdir): + session.run("git", "clone", "https://github.com/TeamGraphix/graphix-qasm-parser") + with session.cd("graphix-qasm-parser"): + session.install(".") + session.run("pytest", "--doctest-modules") diff --git a/pyproject.toml b/pyproject.toml index 22af33377..76a9f4be6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,7 +93,8 @@ extend-ignore = [ "W191", ] # Allow "α" (U+03B1 GREEK SMALL LETTER ALPHA) which could be confused for "a" -allowed-confusables = ["α"] +# Allow "×" (U+00D7 MULTIPLICATION SIGN) which could be confused for "x" +allowed-confusables = ["α", "×"] [tool.ruff.format] docstring-code-format = true @@ -126,7 +127,7 @@ ban-relative-imports = "all" required-imports = ["from __future__ import annotations"] [tool.pytest.ini_options] -addopts = ["--ignore=examples", "--ignore=docs", "--ignore=benchmarks"] +addopts = ["--ignore=examples", "--ignore=docs", "--ignore=benchmarks", "--benchmark-autosave"] # Silence cotengra warning filterwarnings = ["ignore:Couldn't import `kahypar`"] diff --git a/requirements-dev.txt b/requirements-dev.txt index 292c07733..df3fe2148 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,7 +2,7 @@ mypy==1.18.2 pre-commit # for language-agnostic hooks pyright -ruff==0.13.2 +ruff==0.14.1 # Stubs types-networkx==3.5.0.20250712 @@ -11,12 +11,15 @@ types-setuptools # Tests # Keep in sync with CI -nox==2025.5.1 +nox==2025.10.16 psutil pytest +pytest-benchmark pytest-cov pytest-mock # Optional dependencies qiskit>=1.0 qiskit-aer + +graphix-qasm-parser @ git+https://github.com/TeamGraphix/graphix-qasm-parser.git@id_gate diff --git a/requirements.txt b/requirements.txt index b57c42a27..0a9a57154 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -galois matplotlib networkx numpy>=2,<3 diff --git a/stubs/numba/__init__.pyi b/stubs/numba/__init__.pyi new file mode 100644 index 000000000..93376bae9 --- /dev/null +++ b/stubs/numba/__init__.pyi @@ -0,0 +1,9 @@ +from typing import Any, Callable, TypeVar, overload + +_F = TypeVar("_F", bound=Callable[..., Any]) + +@overload +def njit(f: _F) -> _F: ... +@overload +def njit(ty: str, parallel: bool = False) -> Callable[[_F], _F]: ... +def prange(low: int, high: int | None = None) -> range: ... diff --git a/stubs/pytest_benchmark/__init__.pyi b/stubs/pytest_benchmark/__init__.pyi new file mode 100644 index 000000000..11c852544 --- /dev/null +++ b/stubs/pytest_benchmark/__init__.pyi @@ -0,0 +1,7 @@ +from typing import Callable, ParamSpec, Protocol, TypeVar + +_P = ParamSpec("_P") +_R = TypeVar("_R") + +class BenchmarkFixture(Protocol): + def __call__(self, func: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs) -> _R: ... 
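As a brief illustration of the two `njit` overloads declared in the numba stub above (a bare decorator, and a signature string with `parallel=`), here is a sketch of the call shapes the stub is meant to type-check; the functions themselves are hypothetical and only illustrate the decorator forms.

import numba as nb
import numpy as np

@nb.njit  # first overload: bare decorator
def xor_rows(a, b):
    return a ^ b

@nb.njit("uint8[:](uint8[:, :])", parallel=True)  # second overload: signature string plus parallel flag
def row_parity(mat):
    out = np.zeros(mat.shape[0], dtype=np.uint8)
    for i in nb.prange(mat.shape[0]):  # prange is the other name the stub declares
        out[i] = mat[i].sum() % 2
    return out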
diff --git a/tests/test_find_pflow.py b/tests/test_find_pflow.py new file mode 100644 index 000000000..185418139 --- /dev/null +++ b/tests/test_find_pflow.py @@ -0,0 +1,682 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, NamedTuple + +import networkx as nx +import numpy as np +import pytest + +from graphix._linalg import MatGF2 +from graphix.find_pflow import ( + OpenGraphIndex, + _compute_pflow_matrices, + _compute_reduced_adj, + _compute_topological_generations, + _find_pflow_simple, + find_pflow, +) +from graphix.fundamentals import Plane +from graphix.generator import _pflow2pattern +from graphix.measurements import Measurement +from graphix.opengraph import OpenGraph +from graphix.parameter import Placeholder +from graphix.random_objects import rand_circuit +from graphix.states import PlanarState +from tests.conftest import fx_rng + +if TYPE_CHECKING: + from numpy.random import Generator + from pytest_benchmark import BenchmarkFixture + + +class OpenGraphTestCase(NamedTuple): + ogi: OpenGraphIndex + radj: MatGF2 | None + flow_demand_mat: MatGF2 | None + order_demand_mat: MatGF2 | None + has_pflow: bool + + +class DAGTestCase(NamedTuple): + adj_mat: MatGF2 + generations: list[list[int]] | None + + +def get_og_rndcircuit(depth: int, n_qubits: int, n_inputs: int | None = None) -> OpenGraph: + """Return an open graph from a random circuit. + + Parameters + ---------- + depth : int + Circuit depth of the random circuits for generating open graphs. + n_qubits : int + Number of qubits in the random circuits for generating open graphs. It controls the number of outputs. + n_inputs : int | None + Optional (default to `None`). Maximum number of inputs in the returned open graph. The returned open graph is the open graph generated from the random circuit where `n_qubits - n_inputs` nodes have been removed from the input-nodes set. This operation does not change the flow properties of the graph. + + Returns + ------- + OpenGraph + Open graph with causal flow. + """ + circuit = rand_circuit(n_qubits, depth, fx_rng._fixture_function()) + pattern = circuit.transpile().pattern + graph = pattern.extract_graph() + + angles = pattern.get_angles() + planes = pattern.get_meas_plane() + meas = {node: Measurement(angle, planes[node]) for node, angle in angles.items()} + + og = OpenGraph( + inside=graph, + inputs=pattern.input_nodes, + outputs=pattern.output_nodes, + measurements=meas, + ) + + if n_inputs is not None: + ni_remove = max(0, n_qubits - n_inputs) + for i in range(ni_remove): + og.inputs.remove(i) + + return og + + +def get_og_dense(ni: int, no: int, m: int) -> OpenGraph: + """Return a dense open graph with causal, gflow and pflow. + + Parameters + ---------- + ni : int + Number of input nodes (must be equal or smaller than `no` ). + no : int + Number of output nodes (must be larger than 1). + m : int + Number of total nodes (it must satisfy `m - 2*no > 0`). + + Returns + ------- + OpenGraph + Open graph with causal and gflow. + + Notes + ----- + Adapted from Fig. 1 in Houshmand et al., Phys. Rev. 
A, 98 (2018) (arXiv:1705.01535) + """ + if no <= 1: + raise ValueError("Number of outputs must be larger than 1 (no > 1).") + if m - 2 * no <= 0: + raise ValueError("Total number of nodes must be larger than twice the number of outputs (m - 2no > 0).") + + inputs = list(range(no)) # we remove inputs afterwards + outputs = list(range(no, 2 * no)) + edges = [(i, o) for i, o in zip(inputs[:-2], outputs[:-2])] + edges.extend((node, node + 1) for node in range(2 * no - 1, m - 1)) + edges.append((inputs[-2], m - 1)) + + graph: nx.Graph[int] = nx.Graph() + graph.add_nodes_from(range(m)) + graph.add_edges_from(edges) + graph_c = nx.complement(graph) + + meas = {node: Measurement(Placeholder("Angle"), Plane.XY) for node in range(m) if node not in set(outputs)} + + og = OpenGraph( + inside=graph_c, + inputs=inputs, + outputs=outputs, + measurements=meas, + ) # This open graph corresponds to the example in the reference. Now we remove nodes from the set of inputs, since this operation preserves the flow properties. + + ni_remove = max(0, len(og.inputs) - ni) + for i in og.inputs[ni_remove:]: + og.inputs.remove(i) + return og + + +def prepare_test_og() -> list[OpenGraphTestCase]: + test_cases: list[OpenGraphTestCase] = [] + + # Trivial open graph with pflow and nI = nO + def get_og_0() -> OpenGraph: + """Return an open graph with Pauli flow and equal number of outputs and inputs. + + The returned graph has the following structure: + + [0]-1-(2) + """ + graph: nx.Graph[int] = nx.Graph([(0, 1), (1, 2)]) + inputs = [0] + outputs = [2] + meas = { + 0: Measurement(0.1, Plane.XY), # XY + 1: Measurement(0.5, Plane.YZ), # Y + } + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_0()), + radj=MatGF2([[1, 0], [0, 1]]), + flow_demand_mat=MatGF2([[1, 0], [1, 1]]), + order_demand_mat=MatGF2([[0, 0], [0, 0]]), + has_pflow=True, + ) + ) + + # Non-trivial open graph without pflow and nI = nO + def get_og_1() -> OpenGraph: + """Return an open graph without Pauli flow and equal number of outputs and inputs. + + The returned graph has the following structure: + + [0]-2-4-(6) + | | + [1]-3-5-(7) + """ + graph: nx.Graph[int] = nx.Graph([(0, 2), (1, 3), (2, 3), (2, 4), (3, 5), (4, 5), (4, 6), (5, 7)]) + inputs = [1, 0] + outputs = [6, 7] + meas = { + 0: Measurement(0.1, Plane.XY), # XY + 1: Measurement(0.1, Plane.XZ), # XZ + 2: Measurement(0.5, Plane.XZ), # X + 3: Measurement(0.5, Plane.YZ), # Y + 4: Measurement(0.5, Plane.YZ), # Y + 5: Measurement(0.1, Plane.YZ), # YZ + } + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_1()), + radj=MatGF2( + [ + [1, 0, 0, 0, 0, 0], + [0, 1, 0, 0, 0, 0], + [0, 1, 1, 0, 0, 0], + [1, 0, 0, 1, 0, 0], + [1, 0, 0, 1, 1, 0], + [0, 1, 1, 0, 0, 1], + ] + ), + flow_demand_mat=MatGF2( + [ + [1, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0], + [0, 1, 1, 0, 0, 0], + [1, 1, 0, 1, 0, 0], + [1, 0, 1, 1, 1, 0], + [0, 0, 0, 1, 0, 0], + ] + ), + order_demand_mat=MatGF2( + [ + [0, 0, 0, 0, 0, 0], + [0, 1, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0], + [0, 1, 1, 0, 0, 1], + ] + ), + has_pflow=False, + ) + ) + + # Non-trivial open graph with pflow and nI = nO + def get_og_2() -> OpenGraph: + """Return an open graph with Pauli flow and equal number of outputs and inputs. 
+ + The returned graph has the following structure: + + [0]-2-4-(6) + | | + [1]-3-5-(7) + """ + graph: nx.Graph[int] = nx.Graph([(0, 2), (1, 3), (2, 3), (2, 4), (3, 5), (4, 5), (4, 6), (5, 7)]) + inputs = [0, 1] + outputs = [6, 7] + meas = { + 0: Measurement(0.1, Plane.XY), # XY + 1: Measurement(0.1, Plane.XY), # XY + 2: Measurement(0.0, Plane.XY), # X + 3: Measurement(0.1, Plane.XY), # XY + 4: Measurement(0.0, Plane.XY), # X + 5: Measurement(0.5, Plane.XY), # Y + } + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_2()), + radj=MatGF2( + [ + [1, 0, 0, 0, 0, 0], + [0, 1, 0, 0, 0, 0], + [0, 1, 1, 0, 0, 0], + [1, 0, 0, 1, 0, 0], + [1, 0, 0, 1, 1, 0], + [0, 1, 1, 0, 0, 1], + ] + ), + flow_demand_mat=MatGF2( + [ + [1, 0, 0, 0, 0, 0], + [0, 1, 0, 0, 0, 0], + [0, 1, 1, 0, 0, 0], + [1, 0, 0, 1, 0, 0], + [1, 0, 0, 1, 1, 0], + [0, 1, 1, 1, 0, 1], + ] + ), + order_demand_mat=MatGF2( + [ + [0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0], + [0, 1, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0], + ] + ), + has_pflow=True, + ) + ) + + # Non-trivial open graph with pflow and nI != nO + def get_og_3() -> OpenGraph: + """Return an open graph with Pauli flow and unequal number of outputs and inputs. + + Example from Fig. 1 in Mitosek and Backens, 2024 (arXiv:2410.23439). + """ + graph: nx.Graph[int] = nx.Graph( + [(0, 2), (2, 4), (3, 4), (4, 6), (1, 4), (1, 6), (2, 3), (3, 5), (2, 6), (3, 6)] + ) + inputs = [0] + outputs = [5, 6] + meas = { + 0: Measurement(0.1, Plane.XY), # XY + 1: Measurement(0.1, Plane.XZ), # XZ + 2: Measurement(0.5, Plane.YZ), # Y + 3: Measurement(0.1, Plane.XY), # XY + 4: Measurement(0, Plane.XZ), # Z + } + + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_3()), + radj=MatGF2( + [[0, 1, 0, 0, 0, 0], [0, 0, 0, 1, 0, 1], [0, 0, 1, 1, 0, 1], [0, 1, 0, 1, 1, 1], [1, 1, 1, 0, 0, 1]] + ), + flow_demand_mat=MatGF2( + [[0, 1, 0, 0, 0, 0], [1, 0, 0, 0, 0, 0], [0, 1, 1, 1, 0, 1], [0, 1, 0, 1, 1, 1], [0, 0, 0, 1, 0, 0]] + ), + order_demand_mat=MatGF2( + [[0, 0, 0, 0, 0, 0], [1, 0, 0, 1, 0, 1], [0, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 0, 0]] + ), + has_pflow=True, + ) + ) + + # The following tests check the final result only, not the intermediate steps. 
+ + # Non-trivial open graph with pflow and nI != nO + def get_og_4() -> OpenGraph: + """Return an open graph with Pauli flow and unequal number of outputs and inputs.""" + graph: nx.Graph[int] = nx.Graph([(0, 2), (1, 3), (2, 3), (2, 6), (3, 4), (4, 7), (4, 5), (7, 8)]) + inputs = [0, 1] + outputs = [5, 6, 8] + meas = { + 0: Measurement(0.1, Plane.XY), + 1: Measurement(0.1, Plane.XY), + 2: Measurement(0.0, Plane.XY), + 3: Measurement(0, Plane.XY), + 4: Measurement(0.5, Plane.XY), + 7: Measurement(0, Plane.XY), + } + + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_4()), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=True, + ) + ) + + # Non-trivial open graph with pflow and nI != nO + def get_og_5() -> OpenGraph: + """Return an open graph with Pauli flow and unequal number of outputs and inputs.""" + graph: nx.Graph[int] = nx.Graph([(0, 2), (1, 2), (2, 3), (3, 4)]) + inputs = [0, 1] + outputs = [1, 3, 4] + meas = {0: Measurement(0.1, Plane.XY), 2: Measurement(0.5, Plane.YZ)} + + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_5()), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=True, + ) + ) + + # Non-trivial open graph with pflow and nI != nO + def get_og_6() -> OpenGraph: + """Return an open graph with Pauli flow and unequal number of outputs and inputs.""" + graph: nx.Graph[int] = nx.Graph([(0, 1), (0, 3), (1, 4), (3, 4), (2, 3), (2, 5), (3, 6), (4, 7)]) + inputs = [1] + outputs = [6, 2, 7] + meas = { + 0: Measurement(0.1, Plane.XZ), + 1: Measurement(0.1, Plane.XY), + 3: Measurement(0, Plane.XY), + 4: Measurement(0.1, Plane.XY), + 5: Measurement(0.1, Plane.YZ), + } + + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_6()), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=True, + ) + ) + + # Disconnected open graph with pflow and nI != nO + def get_og_7() -> OpenGraph: + """Return an open graph with Pauli flow and unequal number of outputs and inputs.""" + graph: nx.Graph[int] = nx.Graph([(0, 1), (0, 2), (2, 3), (1, 3), (4, 6)]) + inputs: list[int] = [] + outputs = [1, 3, 4] + meas = {0: Measurement(0.5, Plane.XZ), 2: Measurement(0, Plane.YZ), 6: Measurement(0.2, Plane.XY)} + + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_7()), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=True, + ) + ) + + # Non-trivial open graph without pflow and nI != nO + def get_og_8() -> OpenGraph: + """Return an open graph without Pauli flow and unequal number of outputs and inputs.""" + graph: nx.Graph[int] = nx.Graph( + [(0, 1), (0, 3), (1, 4), (3, 4), (2, 3), (2, 5), (3, 6), (4, 7), (5, 6), (6, 7)] + ) + inputs = [1] + outputs = [6, 2, 7] + meas = { + 0: Measurement(0.1, Plane.XZ), + 1: Measurement(0.1, Plane.XY), + 3: Measurement(0, Plane.XY), + 4: Measurement(0.1, Plane.XY), + 5: Measurement(0.1, Plane.XY), + } + + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_8()), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=False, + ) + ) + + # Disconnected open 
graph without pflow and nI != nO + def get_og_9() -> OpenGraph: + """Return an open graph without Pauli flow and unequal number of outputs and inputs.""" + graph: nx.Graph[int] = nx.Graph([(0, 1), (0, 2), (2, 3), (1, 3), (4, 6)]) + inputs = [0] + outputs = [1, 3, 4] + meas = {0: Measurement(0.1, Plane.XZ), 2: Measurement(0, Plane.YZ), 6: Measurement(0.2, Plane.XY)} + + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_9()), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=False, + ) + ) + + # Non-trivial open graph without pflow and nI != nO + def get_og_10() -> OpenGraph: + """Return a graph constructed by adding a disconnected input to graph_6. The resulting graph does not have pflow.""" + graph: nx.Graph[int] = nx.Graph([(0, 1), (0, 3), (1, 4), (3, 4), (2, 3), (2, 5), (3, 6), (4, 7)]) + graph.add_node(8) + inputs = [1, 8] + outputs = [6, 2, 7] + meas = { + 0: Measurement(0.1, Plane.XZ), + 1: Measurement(0.1, Plane.XY), + 3: Measurement(0, Plane.XY), + 4: Measurement(0.1, Plane.XY), + 5: Measurement(0.1, Plane.YZ), + 8: Measurement(0.1, Plane.XY), + } + + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_10()), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=False, + ) + ) + + # Open graph with only Pauli measurements, without pflow and nI != nO + def get_og_11() -> OpenGraph: + """Return an open graph without Pauli flow and unequal number of outputs and inputs.""" + graph: nx.Graph[int] = nx.Graph([(0, 2), (1, 3), (2, 3), (2, 6), (3, 4), (4, 7), (4, 5), (7, 8)]) + inputs = [0, 1] + outputs = [5, 6, 8] + meas = { + 0: Measurement(0, Plane.XY), + 1: Measurement(0, Plane.XY), + 2: Measurement(0, Plane.XZ), + 3: Measurement(0, Plane.XY), + 4: Measurement(0.5, Plane.XY), + 7: Measurement(0, Plane.YZ), + } + + return OpenGraph(inside=graph, inputs=inputs, outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_11()), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=False, + ) + ) + + # Open graph with only Pauli measurements, with pflow and nI != nO + def get_og_12() -> OpenGraph: + """Return an open graph with Pauli flow and unequal number of outputs and inputs. 
Even though all nodes are Pauli-measured, open graph has flow because none of them are inputs.""" + graph: nx.Graph[int] = nx.Graph([(0, 2), (1, 3), (2, 3), (2, 6), (3, 4), (4, 7), (4, 5), (7, 8)]) + outputs = [5, 6, 8] + meas = { + 0: Measurement(0, Plane.XZ), + 1: Measurement(0, Plane.XZ), + 2: Measurement(0, Plane.XZ), + 3: Measurement(0, Plane.XZ), + 4: Measurement(0, Plane.XZ), + 7: Measurement(0, Plane.XZ), + } + + return OpenGraph(inside=graph, inputs=[], outputs=outputs, measurements=meas) + + test_cases.append( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_12()), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=True, + ) + ) + + return test_cases + + +def prepare_benchmark_og() -> list[OpenGraphTestCase]: + test_cases: list[OpenGraphTestCase] = [] + + # Open graph from random circuit + test_cases.extend( + ( + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_rndcircuit(depth=20, n_qubits=7, n_inputs=1)), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=True, + ), + OpenGraphTestCase( + ogi=OpenGraphIndex(get_og_dense(ni=3, no=6, m=400)), + radj=None, + flow_demand_mat=None, + order_demand_mat=None, + has_pflow=True, + ), + ) + ) + return test_cases + + +def prepare_test_dag() -> list[DAGTestCase]: + test_cases: list[DAGTestCase] = [] + + # Simple DAG + test_cases.extend( + ( # Simple DAG + DAGTestCase( + adj_mat=MatGF2([[0, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0], [0, 1, 1, 0]]), generations=[[0], [1, 2], [3]] + ), + # Graph with loop + DAGTestCase(adj_mat=MatGF2([[0, 0, 0, 0], [1, 0, 0, 1], [1, 0, 0, 0], [0, 1, 1, 0]]), generations=None), + # Disconnected graph + DAGTestCase( + adj_mat=MatGF2([[0, 0, 0, 0, 0], [1, 0, 0, 0, 0], [0, 0, 0, 0, 0], [0, 0, 1, 0, 0], [0, 0, 1, 0, 0]]), + generations=[[0, 2], [1, 3, 4]], + ), + ) + ) + + return test_cases + + +class TestPflow: + @pytest.mark.parametrize("test_case", prepare_test_og()) + def test_compute_reduced_adj(self, test_case: OpenGraphTestCase) -> None: + if test_case.radj is not None: + ogi = test_case.ogi + radj = _compute_reduced_adj(ogi) + assert np.all(radj == test_case.radj) + + @pytest.mark.parametrize("test_case", prepare_test_og()) + def test_compute_pflow_matrices(self, test_case: OpenGraphTestCase) -> None: + if test_case.flow_demand_mat is not None and test_case.order_demand_mat is not None: + ogi = test_case.ogi + flow_demand_matrix, order_demand_matrix = _compute_pflow_matrices(ogi) + + assert np.all(flow_demand_matrix == test_case.flow_demand_mat) + assert np.all(order_demand_matrix == test_case.order_demand_mat) + + @pytest.mark.parametrize("test_case", prepare_test_og()) + def test_find_pflow_simple(self, test_case: OpenGraphTestCase) -> None: + if test_case.flow_demand_mat is not None: + ogi = test_case.ogi + if len(ogi.og.outputs) == len(ogi.og.inputs): + pflow_algebraic = _find_pflow_simple(ogi) + + if not test_case.has_pflow: + assert pflow_algebraic is None + else: + assert pflow_algebraic is not None + correction_matrix, _ = pflow_algebraic + ident = MatGF2(np.eye(len(ogi.non_outputs), dtype=np.uint8)) + assert np.all( + (test_case.flow_demand_mat @ correction_matrix) % 2 == ident + ) # Test with numpy matrix product. 
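A hand-checkable instance of the identity asserted in test_find_pflow_simple above, using the flow-demand matrix of the first test case (the open graph [0]-1-(2)); the correction matrix written here is one valid choice, not necessarily the one the solver returns.

import numpy as np

flow_demand = np.array([[1, 0], [1, 1]], dtype=np.uint8)
correction = np.array([[1, 0], [1, 1]], dtype=np.uint8)  # over F2, this matrix is its own inverse
assert np.array_equal((flow_demand @ correction) % 2, np.eye(2, dtype=np.uint8))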
+ + @pytest.mark.parametrize("test_case", prepare_test_og()) + def test_find_pflow_determinism(self, test_case: OpenGraphTestCase, fx_rng: Generator) -> None: + og = test_case.ogi.og + + pflow = find_pflow(og) + + if not test_case.has_pflow: + assert pflow is None + else: + assert pflow is not None + + pattern = _pflow2pattern( + graph=og.inside, + inputs=set(og.inputs), + meas_planes={i: m.plane for i, m in og.measurements.items()}, + angles={i: m.angle for i, m in og.measurements.items()}, + p=pflow[0], + l_k=pflow[1], + ) + pattern.reorder_output_nodes(og.outputs) + + alpha = 2 * np.pi * fx_rng.random() + state_ref = pattern.simulate_pattern(input_state=PlanarState(Plane.XY, alpha)) + + n_shots = 5 + results = [] + for _ in range(n_shots): + state = pattern.simulate_pattern(input_state=PlanarState(Plane.XY, alpha)) + results.append(np.abs(np.dot(state.flatten().conjugate(), state_ref.flatten()))) + + avg = sum(results) / n_shots + + assert avg == pytest.approx(1) + + @pytest.mark.parametrize("test_case", prepare_benchmark_og()) + def test_benchmark_pflow(self, benchmark: BenchmarkFixture, test_case: OpenGraphTestCase) -> None: + og = test_case.ogi.og + + pflow = benchmark(find_pflow, og) + + if not test_case.has_pflow: + assert pflow is None + else: + assert pflow is not None + + @pytest.mark.parametrize("test_case", prepare_test_dag()) + def test_compute_topological_generations(self, test_case: DAGTestCase) -> None: + adj_mat = test_case.adj_mat + generations_ref = test_case.generations + + assert generations_ref == _compute_topological_generations(adj_mat) diff --git a/tests/test_gflow.py b/tests/test_gflow.py index f076e05ee..87fbae81a 100644 --- a/tests/test_gflow.py +++ b/tests/test_gflow.py @@ -282,7 +282,7 @@ def _graph8() -> GraphForTest: "graph with no flow and no gflow but pauliflow, No.2", flow_exist=False, gflow_exist=False, - pauliflow_exist=True, + pauliflow_exist=False, ) @@ -311,7 +311,7 @@ def _graph9() -> GraphForTest: "graph with no flow and no gflow but pauliflow, No.3", flow_exist=False, gflow_exist=False, - pauliflow_exist=True, + pauliflow_exist=False, ) diff --git a/tests/test_linalg.py b/tests/test_linalg.py index 6c897b9e8..66b3a6c34 100644 --- a/tests/test_linalg.py +++ b/tests/test_linalg.py @@ -1,180 +1,220 @@ from __future__ import annotations -from typing import NamedTuple +from typing import TYPE_CHECKING, NamedTuple -import galois import numpy as np -import numpy.typing as npt import pytest -from graphix.linalg import MatGF2 +from graphix._linalg import MatGF2, solve_f2_linear_system + +if TYPE_CHECKING: + from numpy.random import Generator + from pytest_benchmark import BenchmarkFixture class LinalgTestCase(NamedTuple): matrix: MatGF2 - forward_eliminated: npt.NDArray[np.int_] rank: int - rhs_input: npt.NDArray[np.int_] - rhs_forward_eliminated: npt.NDArray[np.int_] - x: list[npt.NDArray[np.int_]] | None kernel_dim: int + right_invertible: bool + + +class LSF2TestCase(NamedTuple): + mat: MatGF2 + b: MatGF2 def prepare_test_matrix() -> list[LinalgTestCase]: return [ # empty matrix LinalgTestCase( - MatGF2(np.array([[]], dtype=np.int_)), - np.array([[]], dtype=np.int_), + MatGF2(np.array([[]], dtype=np.uint8)), 0, - np.array([[]], dtype=np.int_), - np.array([[]], dtype=np.int_), - [np.array([], dtype=np.int_)], 0, + False, ), # column vector LinalgTestCase( - MatGF2(np.array([[1], [1], [1]], dtype=np.int_)), - np.array([[1], [0], [0]], dtype=np.int_), + MatGF2(np.array([[1], [1], [1]], dtype=np.uint8)), 1, - np.array([[1], [1], [1]], dtype=np.int_), - 
np.array([[1], [0], [0]], dtype=np.int_), - [np.array([1])], 0, + False, ), # row vector LinalgTestCase( - MatGF2(np.array([[1, 1, 1]], dtype=np.int_)), - np.array([[1, 1, 1]], dtype=np.int_), + MatGF2(np.array([[1, 1, 1]], dtype=np.uint8)), 1, - np.array([[1]], dtype=np.int_), - np.array([[1]], dtype=np.int_), - None, # TODO: add x 2, + True, ), # diagonal matrix LinalgTestCase( MatGF2(np.diag(np.ones(10)).astype(int)), - np.diag(np.ones(10)).astype(int), 10, - np.ones(10).reshape(10, 1).astype(int), - np.ones(10).reshape(10, 1).astype(int), - list(np.ones((10, 1))), 0, + True, ), # full rank dense matrix LinalgTestCase( - MatGF2(np.array([[1, 0, 1], [0, 1, 0], [1, 0, 0]], dtype=np.int_)), - np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]], dtype=np.int_), + MatGF2(np.array([[1, 0, 1], [0, 1, 0], [1, 0, 0]], dtype=np.uint8)), 3, - np.array([[1], [1], [1]], dtype=np.int_), - np.array([[1], [1], [0]], dtype=np.int_), - list(np.array([[1], [1], [0]])), # nan for no solution 0, + True, ), # not full-rank matrix LinalgTestCase( - MatGF2(np.array([[1, 0, 1], [0, 1, 0], [1, 1, 1]], dtype=np.int_)), - np.array([[1, 0, 1], [0, 1, 0], [0, 0, 0]], dtype=np.int_), + MatGF2(np.array([[1, 0, 1], [0, 1, 0], [1, 1, 1]], dtype=np.uint8)), 2, - np.array([[1, 1], [1, 1], [0, 1]], dtype=np.int_), - np.array([[1, 1], [1, 1], [0, 1]], dtype=np.int_), - None, # TODO: add x 1, + False, ), # non-square matrix LinalgTestCase( - MatGF2(np.array([[1, 0, 1], [0, 1, 0]], dtype=np.int_)), - np.array([[1, 0, 1], [0, 1, 0]], dtype=np.int_), + MatGF2(np.array([[1, 0, 1], [0, 1, 0]], dtype=np.uint8)), 2, - np.array([[1], [1]], dtype=np.int_), - np.array([[1], [1]], dtype=np.int_), - None, # TODO: add x 1, + True, ), # non-square matrix LinalgTestCase( - MatGF2(np.array([[1, 0], [0, 1], [1, 0]], dtype=np.int_)), - np.array([[1, 0], [0, 1], [0, 0]], dtype=np.int_), + MatGF2(np.array([[1, 0], [0, 1], [1, 0]], dtype=np.uint8)), 2, - np.array([[1], [1], [1]], dtype=np.int_), - np.array([[1], [1], [0]], dtype=np.int_), - [np.array([1], dtype=np.int_), np.array([1], dtype=np.int_)], 0, + False, ), ] +def prepare_test_f2_linear_system() -> list[LSF2TestCase]: + test_cases: list[LSF2TestCase] = [] + + # `mat` must be in row echelon form. + # `b` must have zeros in the indices corresponding to the zero rows of `mat`. + + test_cases.extend( + ( + LSF2TestCase(mat=MatGF2([[1, 0, 1, 1], [0, 1, 0, 1], [0, 0, 0, 1]]), b=MatGF2([1, 0, 0])), + LSF2TestCase( + mat=MatGF2([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 1], [0, 0, 0, 0], [0, 0, 0, 0]]), + b=MatGF2([0, 1, 1, 0, 0]), + ), + LSF2TestCase( + mat=MatGF2([[1, 1, 1], [0, 0, 1], [0, 0, 0], [0, 0, 0]]), + b=MatGF2([0, 0, 0, 0]), + ), + LSF2TestCase( + mat=MatGF2([[1, 0, 0], [0, 1, 1], [0, 0, 1], [0, 0, 0]]), + b=MatGF2([1, 0, 1, 0]), + ), + LSF2TestCase( + mat=MatGF2([[1, 0, 1], [0, 1, 0], [0, 0, 1]]), + b=MatGF2([1, 1, 1]), + ), + ) + ) + + return test_cases + + +def verify_elimination(mat: MatGF2, mat_red: MatGF2, n_cols_red: int, full_reduce: bool) -> None: + """Test gaussian elimination (GE). + + Parameters + ---------- + mat : MatGF2 + Original matrix. + mat_red : MatGF2 + Gaussian-eliminated matrix. + n_cols_red : int + Number of columns over which `mat` was reduced. + full_reduce : bool + Flag to check for row-reduced echelon form (`True`) or row echelon form (`False`). + + Notes + ----- + It tests that: + 1) Matrix is in row echelon form (REF) or row-reduced echelon form. + 2) The procedure only entails row operations. 
+ + Check (2) implies that the GE procedure can be represented by a linear transformation. Therefore, we perform GE on :math:`A = [M|1]`, with :math:`M` the test matrix and :math:`1` the identity, and we verify that :math:`M = L^{-1}M'`, where :math:`M', L` are the left and right blocks of :math:`A` after gaussian elimination. + """ + mat_red_block = MatGF2(mat_red[:, :n_cols_red]) + # Check 1 + p = -1 # pivot + for i, row in enumerate(mat_red_block): + col_idxs = np.flatnonzero(row) # Column indices with 1s + if col_idxs.size == 0: + assert not mat_red_block[i:, :].any() # If there aren't any 1s, we verify that the remaining rows are all 0 + break + j = col_idxs[0] + assert j > p + if full_reduce: + assert ( + np.sum(mat_red_block[:, j] == 1) == 1 + ) # If checking row-reduced echelon form, verify it is the only 1. + p = j + + # Check 2 + ncols = mat.shape[1] + mat_ge = MatGF2(mat_red[:, :ncols]) + mat_l = MatGF2(mat_red[:, ncols:]) + + mat_linv = mat_l.right_inverse() + if mat_linv is not None: + assert np.all((mat_linv @ mat_ge) % 2 == mat) # Test with numpy matrix product. + + class TestLinAlg: - def test_add_row(self) -> None: - test_mat = MatGF2(np.diag(np.ones(2, dtype=np.int_))) - test_mat.add_row() - assert test_mat.data.shape == (3, 2) - assert np.all(test_mat.data == np.array([[1, 0], [0, 1], [0, 0]])) - - def test_add_col(self) -> None: - test_mat = MatGF2(np.diag(np.ones(2, dtype=np.int_))) - test_mat.add_col() - assert test_mat.data.shape == (2, 3) - assert np.all(test_mat.data == galois.GF2(np.array([[1, 0, 0], [0, 1, 0]]))) - - def test_remove_row(self) -> None: - test_mat = MatGF2(np.array([[1, 0], [0, 1], [0, 0]], dtype=np.int_)) - test_mat.remove_row(2) - assert np.all(test_mat.data == np.array([[1, 0], [0, 1]])) - - def test_remove_col(self) -> None: - test_mat = MatGF2(np.array([[1, 0, 0], [0, 1, 0]], dtype=np.int_)) - test_mat.remove_col(2) - assert np.all(test_mat.data == np.array([[1, 0], [0, 1]])) - - def test_swap_row(self) -> None: - test_mat = MatGF2(np.array([[1, 0], [0, 1], [0, 0]], dtype=np.int_)) - test_mat.swap_row(1, 2) - assert np.all(test_mat.data == np.array([[1, 0], [0, 0], [0, 1]])) - - def test_swap_col(self) -> None: - test_mat = MatGF2(np.array([[1, 0, 0], [0, 1, 0]], dtype=np.int_)) - test_mat.swap_col(1, 2) - assert np.all(test_mat.data == np.array([[1, 0, 0], [0, 0, 1]])) - - def test_is_canonical_form(self) -> None: - test_mat = MatGF2(np.array([[1, 0], [0, 1]], dtype=np.int_)) - assert test_mat.is_canonical_form() - - test_mat = MatGF2(np.array([[1, 0], [0, 1], [0, 0]], dtype=np.int_)) - assert test_mat.is_canonical_form() - - test_mat = MatGF2(np.array([[1, 0, 1], [0, 1, 0], [0, 0, 0]], dtype=np.int_)) - assert test_mat.is_canonical_form() - - test_mat = MatGF2(np.array([[1, 1, 0], [0, 0, 1], [0, 1, 0]], dtype=np.int_)) - assert not test_mat.is_canonical_form() + @pytest.mark.parametrize("test_case", prepare_test_matrix()) + def test_compute_rank(self, test_case: LinalgTestCase) -> None: + mat = test_case.matrix + rank = test_case.rank + assert mat.compute_rank() == rank @pytest.mark.parametrize("test_case", prepare_test_matrix()) - def test_forward_eliminate(self, test_case: LinalgTestCase) -> None: + def test_right_inverse(self, benchmark: BenchmarkFixture, test_case: LinalgTestCase) -> None: mat = test_case.matrix - answer = test_case.forward_eliminated - rhs_input = test_case.rhs_input - rhs_forward_elimnated = test_case.rhs_forward_eliminated - mat_elimnated, rhs, _, _ = mat.forward_eliminate(rhs_input) - assert np.all(mat_elimnated.data ==
answer) - assert np.all(rhs.data == rhs_forward_elimnated) + rinv = benchmark(mat.right_inverse) + + if test_case.right_invertible: + assert rinv is not None + ident = MatGF2(np.eye(mat.shape[0], dtype=np.uint8)) + assert np.all((mat @ rinv) % 2 == ident) # Test with numpy matrix product. + else: + assert rinv is None @pytest.mark.parametrize("test_case", prepare_test_matrix()) - def test_get_rank(self, test_case: LinalgTestCase) -> None: + def test_gauss_elimination(self, test_case: LinalgTestCase) -> None: mat = test_case.matrix - rank = test_case.rank - assert mat.get_rank() == rank + mat_red = mat.gauss_elimination(ncols=mat.shape[1], copy=True) + verify_elimination(mat, mat_red, mat.shape[1], full_reduce=False) @pytest.mark.parametrize("test_case", prepare_test_matrix()) - def test_backward_substitute(self, test_case: LinalgTestCase) -> None: + def test_null_space(self, benchmark: BenchmarkFixture, test_case: LinalgTestCase) -> None: mat = test_case.matrix - rhs_input = test_case.rhs_input - x = test_case.x kernel_dim = test_case.kernel_dim - mat_eliminated, rhs_eliminated, _, _ = mat.forward_eliminate(rhs_input) - x, kernel = mat_eliminated.backward_substitute(rhs_eliminated) - if x is not None: - assert np.all(x == x) # noqa: PLR0124 - assert len(kernel) == kernel_dim + + kernel = benchmark(mat.null_space) + + assert kernel_dim == kernel.shape[0] + for v in kernel: + p = (mat @ v.transpose()) % 2 # Test with numpy matrix product. + assert ~p.any() + + @pytest.mark.parametrize("test_case", prepare_test_f2_linear_system()) + def test_solve_f2_linear_system(self, benchmark: BenchmarkFixture, test_case: LSF2TestCase) -> None: + mat = test_case.mat + b = test_case.b + + x = benchmark(solve_f2_linear_system, mat, b) + + assert np.all((mat @ x) % 2 == b) # Test with numpy matrix product. 
+
+    def test_row_reduction(self, fx_rng: Generator) -> None:
+        sizes = [(10, 10), (3, 7), (6, 2)]
+        ncols = [4, 5, 2]
+
+        for size, ncol in zip(sizes, ncols):
+            mat = MatGF2(fx_rng.integers(size=size, low=0, high=2, dtype=np.uint8))
+            mat_red = mat.row_reduction(ncols=ncol, copy=True)
+            verify_elimination(mat, mat_red, ncol, full_reduce=True)
diff --git a/tests/test_opengraph.py b/tests/test_opengraph.py
index 340366cca..58844fd55 100644
--- a/tests/test_opengraph.py
+++ b/tests/test_opengraph.py
@@ -34,7 +34,7 @@ def test_open_graph_to_pattern() -> None:
     meas = {
         0: Measurement(0, Plane.XY),
         1: Measurement(1.0, Plane.XY),
-        3: Measurement(1.0, Plane.YZ),
+        3: Measurement(0.5, Plane.YZ),
         4: Measurement(1.0, Plane.XY),
     }
 
diff --git a/tests/test_parameter.py b/tests/test_parameter.py
index 31f5577b5..b57f9bba4 100644
--- a/tests/test_parameter.py
+++ b/tests/test_parameter.py
@@ -6,7 +6,6 @@
 import numpy as np
 import pytest
 
-import graphix
 import graphix.command
 from graphix.parameter import Placeholder, PlaceholderOperationError
 from graphix.pattern import Pattern
diff --git a/tests/test_qasm3_exporter.py b/tests/test_qasm3_exporter.py
new file mode 100644
index 000000000..22202e388
--- /dev/null
+++ b/tests/test_qasm3_exporter.py
@@ -0,0 +1,90 @@
+"""Test exporter to OpenQASM3."""
+
+from __future__ import annotations
+
+from math import pi
+from typing import TYPE_CHECKING
+
+import pytest
+from numpy.random import PCG64, Generator
+
+from graphix import Circuit, instruction
+from graphix.fundamentals import Plane
+from graphix.qasm3_exporter import angle_to_qasm3, circuit_to_qasm3
+from graphix.random_objects import rand_circuit
+
+if TYPE_CHECKING:
+    from graphix.instruction import Instruction
+
+try:
+    from graphix_qasm_parser import OpenQASMParser
+except ImportError:
+    pytestmark = pytest.mark.skip(reason="graphix-qasm-parser not installed")
+
+    if TYPE_CHECKING:
+        import sys
+
+        # We skip type-checking the case where there is no
+        # graphix-qasm-parser, since pyright cannot figure out that
+        # tests are skipped in this case.
+        sys.exit(1)
+
+
+def check_round_trip(circuit: Circuit) -> None:
+    qasm = circuit_to_qasm3(circuit)
+    parser = OpenQASMParser()
+    parsed_circuit = parser.parse_str(qasm)
+    assert parsed_circuit.instruction == circuit.instruction
+
+
+@pytest.mark.parametrize("jumps", range(1, 11))
+def test_circuit_to_qasm3(fx_bg: PCG64, jumps: int) -> None:
+    rng = Generator(fx_bg.jumped(jumps))
+    nqubits = 5
+    depth = 4
+    check_round_trip(rand_circuit(nqubits, depth, rng))
+
+
+@pytest.mark.parametrize(
+    "instruction",
+    [
+        instruction.CCX(target=0, controls=(1, 2)),
+        instruction.RZZ(target=0, control=1, angle=pi / 4),
+        instruction.CNOT(target=0, control=1),
+        instruction.SWAP(targets=(0, 1)),
+        instruction.H(target=0),
+        instruction.S(target=0),
+        instruction.X(target=0),
+        instruction.Y(target=0),
+        instruction.Z(target=0),
+        instruction.I(target=0),
+        instruction.RX(target=0, angle=pi / 4),
+        instruction.RY(target=0, angle=pi / 4),
+        instruction.RZ(target=0, angle=pi / 4),
+    ],
+)
+def test_instruction_to_qasm3(instruction: Instruction) -> None:
+    check_round_trip(Circuit(3, instr=[instruction]))
+
+
+@pytest.mark.parametrize("check", [(pi / 4, "pi/4"), (3 * pi / 4, "3*pi/4"), (0.5, "0.5")])
+def test_angle_to_qasm3(check: tuple[float, str]) -> None:
+    angle, expected = check
+    assert angle_to_qasm3(angle) == expected
+
+
+def test_measurement() -> None:
+    # Measurements are not supported yet by the parser.
+    # https://github.com/TeamGraphix/graphix-qasm-parser/issues/3
+    # The best we can do is to check that the measurement instruction
+    # is exported as expected.
+    circuit = Circuit(1, instr=[instruction.M(target=0, plane=Plane.XZ, angle=0)])
+    qasm = circuit_to_qasm3(circuit)
+    assert (
+        qasm
+        == """OPENQASM 3;
+include "stdgates.inc";
+qubit[1] q;
+bit[1] b;
+b[0] = measure q[0];"""
+    )
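
A minimal, self-contained numpy sketch (not part of the patch) of the identity that `verify_elimination`'s check (2) relies on: row-reducing the augmented matrix [M | I] over F_2 yields [M' | L] with M' = L M (mod 2), hence M = L^{-1} M'. The helper `row_reduce_f2` below is hypothetical and independent of `MatGF2`.

import numpy as np


def row_reduce_f2(aug: np.ndarray, ncols: int) -> np.ndarray:
    """Forward-eliminate the first `ncols` columns of `aug` over F_2, returning a copy."""
    a = aug.copy() % 2
    pivot_row = 0
    for col in range(ncols):
        hits = np.flatnonzero(a[pivot_row:, col])
        if hits.size == 0:
            continue  # No pivot in this column.
        r = pivot_row + hits[0]
        a[[pivot_row, r]] = a[[r, pivot_row]]  # Move the pivot row up.
        below = pivot_row + 1 + np.flatnonzero(a[pivot_row + 1 :, col])
        a[below] ^= a[pivot_row]  # XOR of rows is addition over F_2; clears the column below the pivot.
        pivot_row += 1
        if pivot_row == a.shape[0]:
            break
    return a


rng = np.random.default_rng(seed=0)
m = rng.integers(0, 2, size=(5, 7), dtype=np.uint8)
aug = np.hstack([m, np.eye(5, dtype=np.uint8)])  # A = [M | I]
red = row_reduce_f2(aug, ncols=7)
m_prime, ell = red[:, :7], red[:, 7:]  # [M' | L]
assert np.array_equal((ell @ m) % 2, m_prime)  # M' = L M over F_2, hence M = L^{-1} M'.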