Files
final-qibotn/src/qibotn/eval.py
2025-12-01 07:08:26 +00:00

498 lines
17 KiB
Python

import cupy as cp
import cuquantum.bindings.cutensornet as cutn
from cupy.cuda import nccl
from cupy.cuda.runtime import getDeviceCount
from cuquantum.tensornet import Network, contract
from mpi4py import MPI
from qibo import hamiltonians
from qibo.symbols import I, X, Y, Z
from qibotn.circuit_convertor import QiboCircuitToEinsum
from qibotn.circuit_to_mps import QiboCircuitToMPS
from qibotn.mps_contraction_helper import MPSContractionHelper
def check_observable(observable, circuit_nqubit):
"""Checks the type of observable and returns the appropriate Hamiltonian."""
if observable is None:
return build_observable(circuit_nqubit)
elif isinstance(observable, dict):
return create_hamiltonian_from_dict(observable, circuit_nqubit)
elif isinstance(observable, hamiltonians.SymbolicHamiltonian):
# TODO: check if the observable is compatible with the circuit
return observable
else:
raise TypeError("Invalid observable type.")
def build_observable(circuit_nqubit):
"""Helper function to construct a target observable."""
hamiltonian_form = 0
for i in range(circuit_nqubit):
hamiltonian_form += 0.5 * X(i % circuit_nqubit) * Z((i + 1) % circuit_nqubit)
hamiltonian = hamiltonians.SymbolicHamiltonian(form=hamiltonian_form)
return hamiltonian
def create_hamiltonian_from_dict(data, circuit_nqubit):
"""Create a Qibo SymbolicHamiltonian from a dictionary representation.
Ensures that each Hamiltonian term explicitly acts on all circuit qubits
by adding identity (`I`) gates where needed.
Args:
data (dict): Dictionary containing Hamiltonian terms.
circuit_nqubit (int): Total number of qubits in the quantum circuit.
Returns:
hamiltonians.SymbolicHamiltonian: The constructed Hamiltonian.
"""
PAULI_GATES = {"X": X, "Y": Y, "Z": Z}
terms = []
for term in data["terms"]:
coeff = term["coefficient"]
operators = term["operators"] # List of tuples like [("Z", 0), ("X", 1)]
# Convert the operator list into a dictionary {qubit_index: gate}
operator_dict = {q: PAULI_GATES[g] for g, q in operators}
# Build the full term ensuring all qubits are covered
full_term_expr = [
operator_dict[q](q) if q in operator_dict else I(q)
for q in range(circuit_nqubit)
]
# Multiply all operators together to form a single term
term_expr = full_term_expr[0]
for op in full_term_expr[1:]:
term_expr *= op
# Scale by the coefficient
final_term = coeff * term_expr
terms.append(final_term)
if not terms:
raise ValueError("No valid Hamiltonian terms were added.")
# Combine all terms
hamiltonian_form = sum(terms)
return hamiltonians.SymbolicHamiltonian(hamiltonian_form)
def get_ham_gates(pauli_map, dtype="complex128", backend=cp):
"""Populate the gates for all pauli operators.
Parameters:
pauli_map: A dictionary mapping qubits to pauli operators.
dtype: Data type for the tensor operands.
backend: The package the tensor operands belong to.
Returns:
A sequence of pauli gates.
"""
asarray = backend.asarray
pauli_i = asarray([[1, 0], [0, 1]], dtype=dtype)
pauli_x = asarray([[0, 1], [1, 0]], dtype=dtype)
pauli_y = asarray([[0, -1j], [1j, 0]], dtype=dtype)
pauli_z = asarray([[1, 0], [0, -1]], dtype=dtype)
operand_map = {"I": pauli_i, "X": pauli_x, "Y": pauli_y, "Z": pauli_z}
gates = []
for qubit, pauli_char, coeff in pauli_map:
operand = operand_map.get(pauli_char)
if operand is None:
raise ValueError("pauli string character must be one of I/X/Y/Z")
operand = coeff * operand
gates.append((operand, (qubit,)))
return gates
def extract_gates_and_qubits(hamiltonian):
"""
Extracts the gates and their corresponding qubits from a Qibo Hamiltonian.
Parameters:
hamiltonian (qibo.hamiltonians.Hamiltonian or qibo.hamiltonians.SymbolicHamiltonian):
A Qibo Hamiltonian object.
Returns:
list of tuples: [(coefficient, [(gate, qubit), ...]), ...]
- coefficient: The prefactor of the term.
- list of (gate, qubit): Each term's gates and the qubits they act on.
"""
extracted_terms = []
if isinstance(hamiltonian, hamiltonians.SymbolicHamiltonian):
for term in hamiltonian.terms:
coeff = term.coefficient # Extract coefficient
gate_qubit_list = []
# Extract gate and qubit information
for factor in term.factors:
gate_name = str(factor)[
0
] # Extract the gate type (X, Y, Z) from 'X0', 'Z1'
qubit = int(str(factor)[1:]) # Extract the qubit index
gate_qubit_list.append((qubit, gate_name, coeff))
coeff = 1.0
extracted_terms.append(gate_qubit_list)
else:
raise ValueError(
"Unsupported Hamiltonian type. Must be SymbolicHamiltonian or Hamiltonian."
)
return extracted_terms
def initialize_mpi():
"""Initialize MPI communication and device selection."""
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
device_id = rank % getDeviceCount()
cp.cuda.Device(device_id).use()
return comm, rank, size, device_id
def initialize_nccl(comm_mpi, rank, size):
"""Initialize NCCL communication."""
nccl_id = nccl.get_unique_id() if rank == 0 else None
nccl_id = comm_mpi.bcast(nccl_id, root=0)
return nccl.NcclCommunicator(size, nccl_id, rank)
def get_operands(qibo_circ, datatype, rank, comm):
"""Perform circuit conversion and broadcast operands."""
if rank == 0:
myconvertor = QiboCircuitToEinsum(qibo_circ, dtype=datatype)
operands = myconvertor.state_vector_operands()
else:
operands = None
return comm.bcast(operands, root=0)
def compute_optimal_path(network, n_samples, size, comm):
"""Compute contraction path and broadcast optimal selection."""
path, info = network.contract_path(
optimize={
"samples": n_samples,
"slicing": {
"min_slices": max(32, size),
"memory_model": cutn.MemoryModel.CUTENSOR,
},
}
)
opt_cost, sender = comm.allreduce(
sendobj=(info.opt_cost, comm.Get_rank()), op=MPI.MINLOC
)
return comm.bcast(info, sender)
def compute_slices(info, rank, size):
"""Determine the slice range each process should compute."""
num_slices = info.num_slices
chunk, extra = num_slices // size, num_slices % size
slice_begin = rank * chunk + min(rank, extra)
slice_end = (
num_slices if rank == size - 1 else (rank + 1) * chunk + min(rank + 1, extra)
)
return range(slice_begin, slice_end)
def reduce_result(result, comm, method="MPI", root=0):
"""Reduce results across processes."""
if method == "MPI":
return comm.reduce(sendobj=result, op=MPI.SUM, root=root)
elif method == "NCCL":
stream_ptr = cp.cuda.get_current_stream().ptr
if result.dtype == cp.complex128:
count = result.size * 2 # complex128 has 2 float64 numbers
nccl_type = nccl.NCCL_FLOAT64
elif result.dtype == cp.complex64:
count = result.size * 2 # complex64 has 2 float32 numbers
nccl_type = nccl.NCCL_FLOAT32
else:
raise TypeError(f"Unsupported dtype for NCCL reduce: {result.dtype}")
comm.reduce(
result.data.ptr,
result.data.ptr,
count,
nccl_type,
nccl.NCCL_SUM,
root,
stream_ptr,
)
return result
else:
raise ValueError(f"Unknown reduce method: {method}")
def dense_vector_tn_MPI(qibo_circ, datatype, n_samples=8):
"""Convert qibo circuit to tensornet (TN) format and perform contraction
using multi node and multi GPU through MPI.
The conversion is performed by QiboCircuitToEinsum(), after which it
goes through 2 steps: pathfinder and execution. The pathfinder looks
at user defined number of samples (n_samples) iteratively to select
the least costly contraction path. This is sped up with multi
thread. After pathfinding the optimal path is used in the actual
contraction to give a dense vector representation of the TN.
Parameters:
qibo_circ: The quantum circuit object.
datatype (str): Either single ("complex64") or double (complex128) precision.
n_samples(int): Number of samples for pathfinding.
Returns:
Dense vector of quantum circuit.
"""
comm, rank, size, device_id = initialize_mpi()
operands = get_operands(qibo_circ, datatype, rank, comm)
network = Network(*operands, options={"device_id": device_id})
info = compute_optimal_path(network, n_samples, size, comm)
path, info = network.contract_path(
optimize={"path": info.path, "slicing": info.slices}
)
slices = compute_slices(info, rank, size)
result = network.contract(slices=slices)
return reduce_result(result, comm, method="MPI"), rank
def dense_vector_tn_nccl(qibo_circ, datatype, n_samples=8):
"""Convert qibo circuit to tensornet (TN) format and perform contraction
using multi node and multi GPU through NCCL.
The conversion is performed by QiboCircuitToEinsum(), after which it
goes through 2 steps: pathfinder and execution. The pathfinder looks
at user defined number of samples (n_samples) iteratively to select
the least costly contraction path. This is sped up with multi
thread. After pathfinding the optimal path is used in the actual
contraction to give a dense vector representation of the TN.
Parameters:
qibo_circ: The quantum circuit object.
datatype (str): Either single ("complex64") or double (complex128) precision.
n_samples(int): Number of samples for pathfinding.
Returns:
Dense vector of quantum circuit.
"""
comm_mpi, rank, size, device_id = initialize_mpi()
comm_nccl = initialize_nccl(comm_mpi, rank, size)
operands = get_operands(qibo_circ, datatype, rank, comm_mpi)
network = Network(*operands)
info = compute_optimal_path(network, n_samples, size, comm_mpi)
path, info = network.contract_path(
optimize={"path": info.path, "slicing": info.slices}
)
slices = compute_slices(info, rank, size)
result = network.contract(slices=slices)
return reduce_result(result, comm_nccl, method="NCCL"), rank
def dense_vector_tn(qibo_circ, datatype):
"""Convert qibo circuit to tensornet (TN) format and perform contraction to
dense vector.
Parameters:
qibo_circ: The quantum circuit object.
datatype (str): Either single ("complex64") or double (complex128) precision.
Returns:
Dense vector of quantum circuit.
"""
myconvertor = QiboCircuitToEinsum(qibo_circ, dtype=datatype)
return contract(*myconvertor.state_vector_operands())
def expectation_tn_nccl(qibo_circ, datatype, observable, n_samples=8):
"""Convert qibo circuit to tensornet (TN) format and perform contraction to
expectation of given Pauli string using multi node and multi GPU through
NCCL.
The conversion is performed by QiboCircuitToEinsum(), after which it
goes through 2 steps: pathfinder and execution. The
pauli_string_pattern is used to generate the pauli string
corresponding to the number of qubits of the system. The pathfinder
looks at user defined number of samples (n_samples) iteratively to
select the least costly contraction path. This is sped up with multi
thread. After pathfinding the optimal path is used in the actual
contraction to give an expectation value.
Parameters:
qibo_circ: The quantum circuit object.
datatype (str): Either single ("complex64") or double (complex128) precision.
pauli_string_pattern(str): pauli string pattern.
n_samples(int): Number of samples for pathfinding.
Returns:
Expectation of quantum circuit due to pauli string.
"""
comm_mpi, rank, size, device_id = initialize_mpi()
comm_nccl = initialize_nccl(comm_mpi, rank, size)
observable = check_observable(observable, qibo_circ.nqubits)
ham_gate_map = extract_gates_and_qubits(observable)
if rank == 0:
myconvertor = QiboCircuitToEinsum(qibo_circ, dtype=datatype)
exp = 0
for each_ham in ham_gate_map:
ham_gates = get_ham_gates(each_ham)
# Perform circuit conversion
if rank == 0:
operands = myconvertor.expectation_operands(ham_gates)
else:
operands = None
operands = comm_mpi.bcast(operands, root=0)
network = Network(*operands)
# Compute the path on all ranks with 8 samples for hyperoptimization. Force slicing to enable parallel contraction.
info = compute_optimal_path(network, n_samples, size, comm_mpi)
# Recompute path with the selected optimal settings
path, info = network.contract_path(
optimize={"path": info.path, "slicing": info.slices}
)
slices = compute_slices(info, rank, size)
# Contract the group of slices the process is responsible for.
result = network.contract(slices=slices)
# Sum the partial contribution from each process on root.
result = reduce_result(result, comm_nccl, method="NCCL", root=0)
exp += result
return exp, rank
def expectation_tn_MPI(qibo_circ, datatype, observable, n_samples=8):
"""Convert qibo circuit to tensornet (TN) format and perform contraction to
expectation of given Pauli string using multi node and multi GPU through
MPI.
The conversion is performed by QiboCircuitToEinsum(), after which it
goes through 2 steps: pathfinder and execution. The
pauli_string_pattern is used to generate the pauli string
corresponding to the number of qubits of the system. The pathfinder
looks at user defined number of samples (n_samples) iteratively to
select the least costly contraction path. This is sped up with multi
thread. After pathfinding the optimal path is used in the actual
contraction to give an expectation value.
Parameters:
qibo_circ: The quantum circuit object.
datatype (str): Either single ("complex64") or double (complex128) precision.
pauli_string_pattern(str): pauli string pattern.
n_samples(int): Number of samples for pathfinding.
Returns:
Expectation of quantum circuit due to pauli string.
"""
# Initialize MPI and device
comm, rank, size, device_id = initialize_mpi()
observable = check_observable(observable, qibo_circ.nqubits)
ham_gate_map = extract_gates_and_qubits(observable)
if rank == 0:
myconvertor = QiboCircuitToEinsum(qibo_circ, dtype=datatype)
exp = 0
for each_ham in ham_gate_map:
ham_gates = get_ham_gates(each_ham)
# Perform circuit conversion
# Perform circuit conversion
if rank == 0:
operands = myconvertor.expectation_operands(ham_gates)
else:
operands = None
operands = comm.bcast(operands, root=0)
# Create network object.
network = Network(*operands, options={"device_id": device_id})
# Compute optimal contraction path
info = compute_optimal_path(network, n_samples, size, comm)
# Set path and slices.
path, info = network.contract_path(
optimize={"path": info.path, "slicing": info.slices}
)
# Compute slice range for each rank
slices = compute_slices(info, rank, size)
# Perform contraction
result = network.contract(slices=slices)
# Sum the partial contribution from each process on root.
result = reduce_result(result, comm, method="MPI", root=0)
if rank == 0:
exp += result
return exp, rank
def expectation_tn(qibo_circ, datatype, observable):
"""Convert qibo circuit to tensornet (TN) format and perform contraction to
expectation of given Pauli string.
Parameters:
qibo_circ: The quantum circuit object.
datatype (str): Either single ("complex64") or double (complex128) precision.
pauli_string_pattern(str): pauli string pattern.
Returns:
Expectation of quantum circuit due to pauli string.
"""
myconvertor = QiboCircuitToEinsum(qibo_circ, dtype=datatype)
observable = check_observable(observable, qibo_circ.nqubits)
ham_gate_map = extract_gates_and_qubits(observable)
exp = 0
for each_ham in ham_gate_map:
ham_gates = get_ham_gates(each_ham)
expectation_operands = myconvertor.expectation_operands(ham_gates)
exp += contract(*expectation_operands)
return exp
def dense_vector_mps(qibo_circ, gate_algo, datatype):
"""Convert qibo circuit to matrix product state (MPS) format and perform
contraction to dense vector.
Parameters:
qibo_circ: The quantum circuit object.
gate_algo(dict): Dictionary for SVD and QR settings.
datatype (str): Either single ("complex64") or double (complex128) precision.
Returns:
Dense vector of quantum circuit.
"""
myconvertor = QiboCircuitToMPS(qibo_circ, gate_algo, dtype=datatype)
mps_helper = MPSContractionHelper(myconvertor.num_qubits)
return mps_helper.contract_state_vector(
myconvertor.mps_tensors, {"handle": myconvertor.handle}
)