Files
qibotn/benchmarks/benchmark_quimb.py
jaunatisblue dd222587b7
Some checks failed
Build wheels / build (ubuntu-latest, 3.11) (push) Has been cancelled
Build wheels / build (ubuntu-latest, 3.12) (push) Has been cancelled
Build wheels / build (ubuntu-latest, 3.13) (push) Has been cancelled
Tests / check (push) Has been cancelled
Tests / build (ubuntu-latest, 3.11) (push) Has been cancelled
Tests / build (ubuntu-latest, 3.12) (push) Has been cancelled
Tests / build (ubuntu-latest, 3.13) (push) Has been cancelled
tn脚本更新
2026-05-03 18:54:05 +08:00

520 lines
17 KiB
Python

"""Benchmark and profile the qibotn/quimb CPU backend.
This script is intended to be the stable baseline for quimb backend
optimization work. It supports:
- multiple circuit families
- MPS or generic TN execution
- statevector, sampling, conversion, and local expectation workloads
- warmup/repeat timing
- optional correctness checks against qibojit
- optional cProfile output
"""
from __future__ import annotations
import argparse
import cProfile
import json
import math
import os
import pstats
import time
from pathlib import Path
from statistics import mean, pstdev
import numpy as np
import qibo
from qibo import Circuit, gates
def configure_runtime_env(quimb_num_procs: int | None, blas_threads: int | None):
"""Pin process-level thread settings before heavy work starts."""
if quimb_num_procs is not None:
os.environ["QUIMB_NUM_PROCS"] = str(quimb_num_procs)
if blas_threads is not None:
value = str(blas_threads)
os.environ["OMP_NUM_THREADS"] = value
os.environ["OPENBLAS_NUM_THREADS"] = value
os.environ["MKL_NUM_THREADS"] = value
os.environ["NUMEXPR_NUM_THREADS"] = value
def make_circuit(
circuit_type: str,
nqubits: int,
nlayers: int,
seed: int,
add_measurements: bool = False,
) -> Circuit:
"""Construct repeatable workloads covering low/high entanglement cases."""
rng = np.random.default_rng(seed)
circuit = Circuit(nqubits)
if circuit_type == "qft":
from qibo.models import QFT
circuit = QFT(nqubits)
elif circuit_type == "variational":
for layer in range(nlayers):
for qubit in range(nqubits):
circuit.add(gates.RY(qubit, theta=rng.uniform(0.0, 2.0 * np.pi)))
offset = layer % 2
for qubit in range(offset, nqubits - 1, 2):
circuit.add(gates.CZ(qubit, qubit + 1))
elif circuit_type == "ghz":
circuit.add(gates.H(0))
for qubit in range(nqubits - 1):
circuit.add(gates.CNOT(qubit, qubit + 1))
elif circuit_type == "qaoa":
for _ in range(nlayers):
for qubit in range(nqubits):
circuit.add(gates.RZ(qubit, theta=rng.uniform(0.0, 2.0 * np.pi)))
for qubit in range(0, nqubits - 1, 2):
circuit.add(gates.CZ(qubit, qubit + 1))
for qubit in range(nqubits):
circuit.add(gates.RX(qubit, theta=rng.uniform(0.0, 2.0 * np.pi)))
elif circuit_type == "ising1d":
for _ in range(nlayers):
for qubit in range(nqubits):
circuit.add(gates.RX(qubit, theta=rng.uniform(0.0, 2.0 * np.pi)))
for qubit in range(0, nqubits - 1, 2):
circuit.add(gates.CZ(qubit, qubit + 1))
for qubit in range(1, nqubits - 1, 2):
circuit.add(gates.CZ(qubit, qubit + 1))
elif circuit_type == "rcs":
cols = math.ceil(math.sqrt(nqubits))
rows = math.ceil(nqubits / cols)
single_qubit_gates = [gates.T, gates.X, gates.Y]
for layer in range(nlayers):
for qubit in range(nqubits):
gate_cls = single_qubit_gates[rng.integers(0, len(single_qubit_gates))]
circuit.add(gate_cls(qubit))
if layer % 2 == 0:
for row in range(rows):
for col in range(0, cols - 1, 2):
q1, q2 = row * cols + col, row * cols + col + 1
if q2 < nqubits:
circuit.add(gates.CZ(q1, q2))
else:
for row in range(0, rows - 1, 2):
for col in range(cols):
q1, q2 = row * cols + col, (row + 1) * cols + col
if q2 < nqubits:
circuit.add(gates.CZ(q1, q2))
else:
raise ValueError(f"Unknown circuit type: {circuit_type}")
if add_measurements:
circuit.add(gates.M(*range(nqubits)))
return circuit
def prepare_quimb_backend(
ansatz: str,
max_bond: int | None,
svd_cutoff: float,
optimizer: str,
n_most_frequent_states: int,
):
"""Create and configure the qibotn/quimb backend once."""
qibo.set_backend("qibotn", platform="quimb")
backend = qibo.get_backend()
backend.configure_tn_simulation(
ansatz=ansatz,
max_bond_dimension=max_bond,
svd_cutoff=svd_cutoff,
n_most_frequent_states=n_most_frequent_states,
)
backend.contractions_optimizer = optimizer
return backend
def run_qibojit_state(circuit: Circuit):
qibo.set_backend("qibojit", platform="numba")
t0 = time.perf_counter()
result = circuit()
elapsed = time.perf_counter() - t0
state = np.asarray(result.state(), dtype=complex).reshape(-1)
return state, elapsed
def run_qibojit_shots(circuit: Circuit, nshots: int):
qibo.set_backend("qibojit", platform="numba")
t0 = time.perf_counter()
result = circuit(nshots=nshots)
elapsed = time.perf_counter() - t0
return dict(result.frequencies()), elapsed
def z_expectation_from_statevector(statevector: np.ndarray, nqubits: int, qubit: int):
probs = np.abs(np.asarray(statevector).reshape(-1)) ** 2
bit_index = nqubits - qubit - 1
bits = (np.arange(len(probs)) >> bit_index) & 1
return float(np.dot(probs, 1.0 - 2.0 * bits))
def fidelity_and_l2(reference: np.ndarray, candidate: np.ndarray):
ref = np.asarray(reference, dtype=complex).reshape(-1)
cand = np.asarray(candidate, dtype=complex).reshape(-1)
fidelity = abs(np.vdot(ref, cand)) ** 2
l2_error = np.linalg.norm(ref - cand)
return float(fidelity), float(l2_error)
def total_variation_distance(reference: dict[str, int], candidate: dict[str, int], nshots: int):
keys = set(reference) | set(candidate)
return 0.5 * sum(abs(reference.get(key, 0) - candidate.get(key, 0)) for key in keys) / nshots
def profile_callable(func, output_path: Path, sort_by: str):
"""Profile a single invocation and dump textual stats."""
profiler = cProfile.Profile()
profiler.enable()
result = func()
profiler.disable()
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8") as stream:
stats = pstats.Stats(profiler, stream=stream)
stats.strip_dirs().sort_stats(sort_by).print_stats(80)
stats.print_callers(30)
return result
def time_callable(func, repeats: int, warmup: int, profile_output: Path | None, profile_sort: str):
for _ in range(warmup):
func()
profiled_payload = None
if profile_output is not None:
profiled_payload = profile_callable(func, profile_output, profile_sort)
samples = []
payloads = []
for _ in range(repeats):
t0 = time.perf_counter()
payload = func()
elapsed = time.perf_counter() - t0
samples.append(elapsed)
payloads.append(payload)
final_payload = payloads[-1] if payloads else profiled_payload
return samples, final_payload
def summarize_samples(samples: list[float]):
return {
"min_s": min(samples),
"mean_s": mean(samples),
"max_s": max(samples),
"std_s": pstdev(samples) if len(samples) > 1 else 0.0,
"repeats": len(samples),
}
def workload_state(args):
circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed)
backend = prepare_quimb_backend(
ansatz=args.ansatz,
max_bond=args.max_bond,
svd_cutoff=args.svd_cutoff,
optimizer=args.optimizer,
n_most_frequent_states=args.topk,
)
def run_once():
result = backend.execute_circuit(circuit, return_array=True)
return np.asarray(result.statevector).reshape(-1)
samples, statevector = time_callable(
run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort
)
summary = summarize_samples(samples)
correctness = None
if not args.no_compare:
ref_state, ref_time = run_qibojit_state(circuit)
fidelity, l2_error = fidelity_and_l2(ref_state, statevector)
correctness = {
"qibojit_time_s": ref_time,
"fidelity": fidelity,
"l2_error": l2_error,
}
return summary, correctness
def workload_shots(args):
circuit = make_circuit(
args.circuit, args.nqubits, args.nlayers, args.seed, add_measurements=True
)
backend = prepare_quimb_backend(
ansatz=args.ansatz,
max_bond=args.max_bond,
svd_cutoff=args.svd_cutoff,
optimizer=args.optimizer,
n_most_frequent_states=args.topk,
)
def run_once():
result = backend.execute_circuit(circuit, nshots=args.nshots)
return dict(result.frequencies())
samples, frequencies = time_callable(
run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort
)
summary = summarize_samples(samples)
correctness = None
if not args.no_compare:
ref_freq, ref_time = run_qibojit_shots(circuit, args.nshots)
correctness = {
"qibojit_time_s": ref_time,
"tvd": total_variation_distance(ref_freq, frequencies, args.nshots),
}
return summary, correctness
def workload_convert(args):
circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed)
backend = prepare_quimb_backend(
ansatz=args.ansatz,
max_bond=args.max_bond,
svd_cutoff=args.svd_cutoff,
optimizer=args.optimizer,
n_most_frequent_states=args.topk,
)
def run_once():
quimb_circuit = backend._qibo_circuit_to_quimb( # pylint: disable=protected-access
circuit,
quimb_circuit_type=backend.circuit_ansatz,
gate_opts={"max_bond": backend.max_bond_dimension, "cutoff": backend.svd_cutoff},
)
return len(quimb_circuit.gates)
samples, gate_count = time_callable(
run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort
)
summary = summarize_samples(samples)
summary["gate_count"] = gate_count
return summary, None
def workload_expectation(args):
circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed)
backend = prepare_quimb_backend(
ansatz=args.ansatz,
max_bond=args.max_bond,
svd_cutoff=args.svd_cutoff,
optimizer=args.optimizer,
n_most_frequent_states=args.topk,
)
operators = ["z"]
sites = [(args.observable_qubit,)]
coeffs = [1.0]
def run_once():
return float(
backend.exp_value_observable_symbolic(
circuit, operators, sites, coeffs, args.nqubits
)
)
samples, expval = time_callable(
run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort
)
summary = summarize_samples(samples)
correctness = None
if not args.no_compare:
ref_state, ref_time = run_qibojit_state(circuit)
correctness = {
"qibojit_time_s": ref_time,
"reference_expval": z_expectation_from_statevector(
ref_state, args.nqubits, args.observable_qubit
),
"abs_error": abs(
z_expectation_from_statevector(ref_state, args.nqubits, args.observable_qubit)
- expval
),
}
return summary, correctness
def workload_raw_local_exp(args):
circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed)
backend = prepare_quimb_backend(
ansatz=args.ansatz,
max_bond=args.max_bond,
svd_cutoff=args.svd_cutoff,
optimizer=args.optimizer,
n_most_frequent_states=args.topk,
)
def run_once():
metrics = {}
t0 = time.perf_counter()
quimb_circuit = backend._qibo_circuit_to_quimb( # pylint: disable=protected-access
circuit,
quimb_circuit_type=backend.circuit_ansatz,
gate_opts={"max_bond": backend.max_bond_dimension, "cutoff": backend.svd_cutoff},
)
metrics["convert_s"] = time.perf_counter() - t0
operator = backend._string_to_quimb_operator("z") # pylint: disable=protected-access
if args.rehearse:
t1 = time.perf_counter()
rehearsal = quimb_circuit.local_expectation(
operator,
where=(args.observable_qubit,),
backend=backend.backend,
optimize=backend.contractions_optimizer,
simplify_sequence="R",
rehearse=True,
)
metrics["rehearse_s"] = time.perf_counter() - t1
optimize = rehearsal["tree"]
else:
metrics["rehearse_s"] = 0.0
optimize = backend.contractions_optimizer
t2 = time.perf_counter()
expval = quimb_circuit.local_expectation(
operator,
where=(args.observable_qubit,),
backend=backend.backend,
optimize=optimize,
simplify_sequence="R",
)
metrics["contract_s"] = time.perf_counter() - t2
metrics["total_inner_s"] = (
metrics["convert_s"] + metrics["rehearse_s"] + metrics["contract_s"]
)
metrics["expval"] = float(np.real(expval))
return metrics
samples, metrics = time_callable(
run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort
)
summary = summarize_samples(samples)
summary.update(
{
"convert_s": metrics["convert_s"],
"rehearse_s": metrics["rehearse_s"],
"contract_s": metrics["contract_s"],
"total_inner_s": metrics["total_inner_s"],
}
)
correctness = None
if not args.no_compare:
ref_state, ref_time = run_qibojit_state(circuit)
ref_expval = z_expectation_from_statevector(
ref_state, args.nqubits, args.observable_qubit
)
correctness = {
"qibojit_time_s": ref_time,
"reference_expval": ref_expval,
"abs_error": abs(ref_expval - metrics["expval"]),
}
return summary, correctness
WORKLOADS = {
"state": workload_state,
"shots": workload_shots,
"convert": workload_convert,
"expectation": workload_expectation,
"raw-local-exp": workload_raw_local_exp,
}
def build_parser():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--mode",
choices=sorted(WORKLOADS),
default="raw-local-exp",
help="Workload to benchmark.",
)
parser.add_argument(
"--circuit",
choices=["ghz", "ising1d", "qaoa", "qft", "rcs", "variational"],
default="variational",
)
parser.add_argument("--nqubits", type=int, default=10)
parser.add_argument("--nlayers", type=int, default=3)
parser.add_argument("--ansatz", choices=["mps", "tn"], default="tn")
parser.add_argument("--max-bond", type=int, default=None)
parser.add_argument("--svd-cutoff", type=float, default=1e-10)
parser.add_argument("--optimizer", type=str, default="auto-hq")
parser.add_argument("--observable-qubit", type=int, default=0)
parser.add_argument("--nshots", type=int, default=1024)
parser.add_argument("--topk", type=int, default=100)
parser.add_argument("--warmup", type=int, default=1)
parser.add_argument("--repeats", type=int, default=3)
parser.add_argument("--seed", type=int, default=42)
parser.add_argument("--quimb-num-procs", type=int, default=None)
parser.add_argument("--blas-threads", type=int, default=None)
parser.add_argument("--rehearse", action="store_true")
parser.add_argument("--no-compare", action="store_true")
parser.add_argument("--profile-output", type=Path, default=None)
parser.add_argument("--profile-sort", type=str, default="cumulative")
parser.add_argument("--json-output", type=Path, default=None)
return parser
def main():
parser = build_parser()
args = parser.parse_args()
configure_runtime_env(args.quimb_num_procs, args.blas_threads)
print(
f"mode={args.mode} circuit={args.circuit} nqubits={args.nqubits} "
f"nlayers={args.nlayers} ansatz={args.ansatz} optimizer={args.optimizer}"
)
if args.quimb_num_procs is not None or args.blas_threads is not None:
print(
"threads:"
f" QUIMB_NUM_PROCS={os.environ.get('QUIMB_NUM_PROCS')}"
f" OMP_NUM_THREADS={os.environ.get('OMP_NUM_THREADS')}"
)
workload = WORKLOADS[args.mode]
summary, correctness = workload(args)
print("\nTiming")
for key, value in summary.items():
if isinstance(value, float):
print(f"{key:>16}: {value:.6f}")
else:
print(f"{key:>16}: {value}")
if correctness is not None:
print("\nCorrectness")
for key, value in correctness.items():
if isinstance(value, float):
print(f"{key:>16}: {value:.6e}")
else:
print(f"{key:>16}: {value}")
if args.profile_output is not None:
print(f"\nProfile written to: {args.profile_output}")
if args.json_output is not None:
payload = {"timing": summary, "correctness": correctness, "args": vars(args)}
args.json_output.parent.mkdir(parents=True, exist_ok=True)
args.json_output.write_text(json.dumps(payload, indent=2, default=str), encoding="utf-8")
print(f"JSON written to: {args.json_output}")
if __name__ == "__main__":
main()