Change to expectation calculation to accept hamiltonian object

This commit is contained in:
tankya2
2025-02-20 16:37:55 +08:00
parent 106dfcb50e
commit 4cb6edc013
3 changed files with 257 additions and 103 deletions

View File

@@ -1,9 +1,10 @@
import numpy as np
from qibo.backends import NumpyBackend
from qibo.config import raise_error
from qibo.result import QuantumState
from qibotn.result import TensorNetworkResult
from qibotn.backends.abstract import QibotnBackend
from qibo import hamiltonians
CUDA_TYPES = {}
@@ -28,15 +29,17 @@ class CuTensorNet(QibotnBackend, NumpyBackend): # pragma: no cover
expectation_enabled_value = runcard.get("expectation_enabled")
if expectation_enabled_value is True:
self.expectation_enabled = True
self.pauli_string_pattern = "XXXZ"
self.observable = None
elif expectation_enabled_value is False:
self.expectation_enabled = False
elif isinstance(expectation_enabled_value, dict):
self.expectation_enabled = True
expectation_enabled_dict = runcard.get("expectation_enabled", {})
self.pauli_string_pattern = expectation_enabled_dict.get(
"pauli_string_pattern", None
)
self.observable = runcard.get("expectation_enabled", {})
elif isinstance(
expectation_enabled_value, hamiltonians.SymbolicHamiltonian
):
self.expectation_enabled = True
self.observable = expectation_enabled_value
else:
raise TypeError("expectation_enabled has an unexpected type")
@@ -158,17 +161,15 @@ class CuTensorNet(QibotnBackend, NumpyBackend): # pragma: no cover
and self.NCCL_enabled == False
and self.expectation_enabled == True
):
state = eval.expectation_pauli_tn(
circuit, self.dtype, self.pauli_string_pattern
)
state = eval.expectation_tn(circuit, self.dtype, self.observable)
elif (
self.MPI_enabled == True
and self.MPS_enabled == False
and self.NCCL_enabled == False
and self.expectation_enabled == True
):
state, rank = eval.expectation_pauli_tn_MPI(
circuit, self.dtype, self.pauli_string_pattern, 32
state, rank = eval.expectation_tn_MPI(
circuit, self.dtype, self.observable, 32
)
if rank > 0:
state = np.array(0)
@@ -178,15 +179,27 @@ class CuTensorNet(QibotnBackend, NumpyBackend): # pragma: no cover
and self.NCCL_enabled == True
and self.expectation_enabled == True
):
state, rank = eval.expectation_pauli_tn_nccl(
circuit, self.dtype, self.pauli_string_pattern, 32
state, rank = eval.expectation_tn_nccl(
circuit, self.dtype, self.observable, 32
)
if rank > 0:
state = np.array(0)
else:
raise_error(NotImplementedError, "Compute type not supported.")
if return_array:
return state.flatten()
if self.expectation_enabled:
return state.flatten().real
else:
return QuantumState(state.flatten())
if return_array:
statevector = state.flatten()
else:
statevector = state
return TensorNetworkResult(
nqubits=circuit.nqubits,
backend=self,
measures=None,
measured_probabilities=None,
prob_type=None,
statevector=statevector,
)

View File

@@ -195,12 +195,12 @@ class QiboCircuitToEinsum:
gates.append((operand, (qubit,)))
return gates
def expectation_operands(self, pauli_string):
def expectation_operands(self, ham_gates):
"""Create the operands for pauli string expectation computation in the
interleave format.
Parameters:
pauli_string: A string representating the list of pauli gates.
ham_gates: A list of gates derived from Qibo hamiltonian object.
Returns:
Operands for the contraction in the interleave format.
@@ -208,8 +208,6 @@ class QiboCircuitToEinsum:
input_bitstring = "0" * self.circuit.nqubits
input_operands = self._get_bitstring_tensors(input_bitstring)
pauli_string = dict(zip(range(self.circuit.nqubits), pauli_string))
pauli_map = pauli_string
(
mode_labels,
@@ -228,11 +226,7 @@ class QiboCircuitToEinsum:
next_frontier = max(qubits_frontier.values()) + 1
pauli_gates = self.get_pauli_gates(
pauli_map, dtype=self.dtype, backend=self.backend
)
gates_inverse = pauli_gates + self.gate_tensors_inverse
gates_inverse = ham_gates + self.gate_tensors_inverse
(
gate_mode_labels_inverse,

View File

@@ -9,6 +9,154 @@ from qibotn.circuit_convertor import QiboCircuitToEinsum
from qibotn.circuit_to_mps import QiboCircuitToMPS
from qibotn.mps_contraction_helper import MPSContractionHelper
import cuquantum.cutensornet as cutn
from cuquantum import Network
from mpi4py import MPI
from cupy.cuda import nccl
from qibo import hamiltonians
from qibo.symbols import X, Y, Z, I
def check_observable(observable, circuit_nqubit):
"""Checks the type of observable and returns the appropriate Hamiltonian."""
if observable is None:
return build_observable(circuit_nqubit)
elif isinstance(observable, dict):
return create_hamiltonian_from_dict(observable, circuit_nqubit)
elif isinstance(observable, hamiltonians.SymbolicHamiltonian):
# TODO: check if the observable is compatible with the circuit
return observable
else:
raise TypeError("Invalid observable type.")
def build_observable(circuit_nqubit):
"""Helper function to construct a target observable."""
hamiltonian_form = 0
for i in range(circuit_nqubit):
hamiltonian_form += 0.5 * X(i % circuit_nqubit) * Z((i + 1) % circuit_nqubit)
print("Default hamiltonian: ", hamiltonian_form)
hamiltonian = hamiltonians.SymbolicHamiltonian(form=hamiltonian_form)
return hamiltonian
def create_hamiltonian_from_dict(data, circuit_nqubit):
"""Create a Qibo SymbolicHamiltonian from a dictionary representation.
Ensures that each Hamiltonian term explicitly acts on all circuit qubits
by adding identity (`I`) gates where needed.
Args:
data (dict): Dictionary containing Hamiltonian terms.
circuit_nqubit (int): Total number of qubits in the quantum circuit.
Returns:
hamiltonians.SymbolicHamiltonian: The constructed Hamiltonian.
"""
PAULI_GATES = {"X": X, "Y": Y, "Z": Z}
terms = []
for term in data["terms"]:
coeff = term["coefficient"]
operators = term["operators"] # List of tuples like [("Z", 0), ("X", 1)]
# Convert the operator list into a dictionary {qubit_index: gate}
operator_dict = {q: PAULI_GATES[g] for g, q in operators}
# Build the full term ensuring all qubits are covered
full_term_expr = [
operator_dict[q](q) if q in operator_dict else I(q)
for q in range(circuit_nqubit)
]
# Multiply all operators together to form a single term
term_expr = full_term_expr[0]
for op in full_term_expr[1:]:
term_expr *= op
# Scale by the coefficient
final_term = coeff * term_expr
# print(f"Adding term: {final_term}") # Debugging output
terms.append(final_term)
if not terms:
raise ValueError("No valid Hamiltonian terms were added.")
# Combine all terms
hamiltonian_form = sum(terms)
# print(f"Hamiltonian Form After Summation: {hamiltonian_form}")
return hamiltonians.SymbolicHamiltonian(hamiltonian_form)
def get_ham_gates(pauli_map, dtype="complex128", backend=cp):
"""Populate the gates for all pauli operators.
Parameters:
pauli_map: A dictionary mapping qubits to pauli operators.
dtype: Data type for the tensor operands.
backend: The package the tensor operands belong to.
Returns:
A sequence of pauli gates.
"""
asarray = backend.asarray
pauli_i = asarray([[1, 0], [0, 1]], dtype=dtype)
pauli_x = asarray([[0, 1], [1, 0]], dtype=dtype)
pauli_y = asarray([[0, -1j], [1j, 0]], dtype=dtype)
pauli_z = asarray([[1, 0], [0, -1]], dtype=dtype)
operand_map = {"I": pauli_i, "X": pauli_x, "Y": pauli_y, "Z": pauli_z}
gates = []
for qubit, pauli_char, coeff in pauli_map:
operand = operand_map.get(pauli_char)
if operand is None:
raise ValueError("pauli string character must be one of I/X/Y/Z")
operand = coeff * operand
gates.append((operand, (qubit,)))
return gates
def extract_gates_and_qubits(hamiltonian):
"""
Extracts the gates and their corresponding qubits from a Qibo Hamiltonian.
Parameters:
hamiltonian (qibo.hamiltonians.Hamiltonian or qibo.hamiltonians.SymbolicHamiltonian):
A Qibo Hamiltonian object.
Returns:
list of tuples: [(coefficient, [(gate, qubit), ...]), ...]
- coefficient: The prefactor of the term.
- list of (gate, qubit): Each term's gates and the qubits they act on.
"""
extracted_terms = []
if isinstance(hamiltonian, hamiltonians.SymbolicHamiltonian):
for term in hamiltonian.terms:
coeff = term.coefficient # Extract coefficient
gate_qubit_list = []
# Extract gate and qubit information
for factor in term.factors:
gate_name = str(factor)[
0
] # Extract the gate type (X, Y, Z) from 'X0', 'Z1'
qubit = int(str(factor)[1:]) # Extract the qubit index
gate_qubit_list.append((qubit, gate_name, coeff))
coeff = 1.0
extracted_terms.append(gate_qubit_list)
else:
raise ValueError(
"Unsupported Hamiltonian type. Must be SymbolicHamiltonian or Hamiltonian."
)
return extracted_terms
def initialize_mpi():
"""Initialize MPI communication and device selection."""
@@ -166,7 +314,7 @@ def dense_vector_tn(qibo_circ, datatype):
return contract(*myconvertor.state_vector_operands())
def expectation_pauli_tn_nccl(qibo_circ, datatype, pauli_string_pattern, n_samples=8):
def expectation_tn_nccl(qibo_circ, datatype, observable, n_samples=8):
"""Convert qibo circuit to tensornet (TN) format and perform contraction to
expectation of given Pauli string using multi node and multi GPU through
NCCL.
@@ -194,39 +342,48 @@ def expectation_pauli_tn_nccl(qibo_circ, datatype, pauli_string_pattern, n_sampl
comm_nccl = initialize_nccl(comm_mpi, rank, size)
# Perform circuit conversion
observable = check_observable(observable, qibo_circ.nqubits)
ham_gate_map = extract_gates_and_qubits(observable)
if rank == 0:
myconvertor = QiboCircuitToEinsum(qibo_circ, dtype=datatype)
operands = myconvertor.expectation_operands(
pauli_string_gen(qibo_circ.nqubits, pauli_string_pattern)
exp = 0
for each_ham in ham_gate_map:
ham_gates = get_ham_gates(each_ham)
# Perform circuit conversion
if rank == 0:
operands = myconvertor.expectation_operands(ham_gates)
else:
operands = None
operands = comm_mpi.bcast(operands, root=0)
network = Network(*operands)
# Compute the path on all ranks with 8 samples for hyperoptimization. Force slicing to enable parallel contraction.
info = compute_optimal_path(network, n_samples, size, comm_mpi)
# Recompute path with the selected optimal settings
path, info = network.contract_path(
optimize={"path": info.path, "slicing": info.slices}
)
else:
operands = None
operands = comm_mpi.bcast(operands, root=0)
slices = compute_slices(info, rank, size)
network = Network(*operands)
# Contract the group of slices the process is responsible for.
result = compute_contraction(network, slices)
# Compute the path on all ranks with 8 samples for hyperoptimization. Force slicing to enable parallel contraction.
info = compute_optimal_path(network, n_samples, size, comm_mpi)
# Sum the partial contribution from each process on root.
result = reduce_result(result, comm_nccl, method="NCCL", root=0)
# Recompute path with the selected optimal settings
path, info = network.contract_path(
optimize={"path": info.path, "slicing": info.slices}
)
exp += result
slices = compute_slices(info, rank, size)
# Contract the group of slices the process is responsible for.
result = compute_contraction(network, slices)
# Sum the partial contribution from each process on root.
result = reduce_result(result, comm_nccl, method="NCCL", root=0)
return result, rank
return exp, rank
def expectation_pauli_tn_MPI(qibo_circ, datatype, pauli_string_pattern, n_samples=8):
def expectation_tn_MPI(qibo_circ, datatype, observable, n_samples=8):
"""Convert qibo circuit to tensornet (TN) format and perform contraction to
expectation of given Pauli string using multi node and multi GPU through
MPI.
@@ -252,42 +409,51 @@ def expectation_pauli_tn_MPI(qibo_circ, datatype, pauli_string_pattern, n_sample
# Initialize MPI and device
comm, rank, size, device_id = initialize_mpi()
# Perform circuit conversion
observable = check_observable(observable, qibo_circ.nqubits)
ham_gate_map = extract_gates_and_qubits(observable)
if rank == 0:
myconvertor = QiboCircuitToEinsum(qibo_circ, dtype=datatype)
exp = 0
for each_ham in ham_gate_map:
ham_gates = get_ham_gates(each_ham)
# Perform circuit conversion
# Perform circuit conversion
if rank == 0:
operands = myconvertor.expectation_operands(ham_gates)
else:
operands = None
operands = myconvertor.expectation_operands(
pauli_string_gen(qibo_circ.nqubits, pauli_string_pattern)
operands = comm.bcast(operands, root=0)
# Create network object.
network = Network(*operands, options={"device_id": device_id})
# Compute optimal contraction path
info = compute_optimal_path(network, n_samples, size, comm)
# Set path and slices.
path, info = network.contract_path(
optimize={"path": info.path, "slicing": info.slices}
)
else:
operands = None
operands = comm.bcast(operands, root=0)
# Compute slice range for each rank
slices = compute_slices(info, rank, size)
# Create network object.
network = Network(*operands, options={"device_id": device_id})
# Perform contraction
result = compute_contraction(network, slices)
# Compute optimal contraction path
info = compute_optimal_path(network, n_samples, size, comm)
# Sum the partial contribution from each process on root.
result = reduce_result(result, comm, method="MPI", root=0)
# Set path and slices.
path, info = network.contract_path(
optimize={"path": info.path, "slicing": info.slices}
)
if rank == 0:
exp += result
# Compute slice range for each rank
slices = compute_slices(info, rank, size)
# Perform contraction
result = compute_contraction(network, slices)
# Sum the partial contribution from each process on root.
result = reduce_result(result, comm, method="MPI", root=0)
return result, rank
return exp, rank
def expectation_pauli_tn(qibo_circ, datatype, pauli_string_pattern):
def expectation_tn(qibo_circ, datatype, observable):
"""Convert qibo circuit to tensornet (TN) format and perform contraction to
expectation of given Pauli string.
@@ -300,11 +466,16 @@ def expectation_pauli_tn(qibo_circ, datatype, pauli_string_pattern):
Expectation of quantum circuit due to pauli string.
"""
myconvertor = QiboCircuitToEinsum(qibo_circ, dtype=datatype)
return contract(
*myconvertor.expectation_operands(
pauli_string_gen(qibo_circ.nqubits, pauli_string_pattern)
)
)
observable = check_observable(observable, qibo_circ.nqubits)
ham_gate_map = extract_gates_and_qubits(observable)
exp = 0
for each_ham in ham_gate_map:
ham_gates = get_ham_gates(each_ham)
expectation_operands = myconvertor.expectation_operands(ham_gates)
exp += contract(*expectation_operands)
return exp
def dense_vector_mps(qibo_circ, gate_algo, datatype):
@@ -325,27 +496,3 @@ def dense_vector_mps(qibo_circ, gate_algo, datatype):
return mps_helper.contract_state_vector(
myconvertor.mps_tensors, {"handle": myconvertor.handle}
)
def pauli_string_gen(nqubits, pauli_string_pattern):
"""Used internally to generate the string based on given pattern and number
of qubit.
Parameters:
nqubits(int): Number of qubits of Quantum Circuit
pauli_string_pattern(str): Strings representing sequence of pauli gates.
Returns:
String representation of the actual pauli string from the pattern.
Example: pattern: "XZ", number of qubit: 7, output = XZXZXZX
"""
if nqubits <= 0:
return "Invalid input. N should be a positive integer."
result = ""
for i in range(nqubits):
char_to_add = pauli_string_pattern[i % len(pauli_string_pattern)]
result += char_to_add
return result