Compare commits

...

2 Commits

Author SHA1 Message Date
5a692033a6 添加MPI并行TN benchmark及辅助脚本,移除旧benchmark
Some checks failed
Build wheels / build (ubuntu-latest, 3.11) (push) Has been cancelled
Build wheels / build (ubuntu-latest, 3.12) (push) Has been cancelled
Build wheels / build (ubuntu-latest, 3.13) (push) Has been cancelled
Tests / check (push) Has been cancelled
Tests / build (ubuntu-latest, 3.11) (push) Has been cancelled
Tests / build (ubuntu-latest, 3.12) (push) Has been cancelled
Tests / build (ubuntu-latest, 3.13) (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-05 19:04:09 +08:00
a3f39a1d67 删除tn脚本implementation 2026-05-03 19:06:21 +08:00
9 changed files with 411 additions and 562 deletions

3
.gitignore vendored
View File

@@ -6,6 +6,9 @@ data/
# C extensions
*.so
bak/
path/
profiles/
perf*
# Distribution / packaging
.Python

View File

@@ -1,12 +1,39 @@
"""Benchmark: qibotn/quimb generic TN — expectation values."""
import multiprocessing
multiprocessing.set_start_method("spawn", force=True)
import pickle
import time
import threading
import argparse
import numpy as np
import cotengra as ctg
import qibo
from qibo import Circuit, gates
class TimedTrialFn:
def __init__(self, trial_fn, timeout=15):
self.trial_fn = trial_fn
self.timeout = timeout
def __call__(self, *args, **kwargs):
result = [None]
exc = [None]
def _run():
try:
result[0] = self.trial_fn(*args, **kwargs)
except Exception as e:
exc[0] = e
t = threading.Thread(target=_run, daemon=True)
t.start()
t.join(self.timeout)
if t.is_alive():
raise TimeoutError(f"trial exceeded {self.timeout}s")
if exc[0] is not None:
raise exc[0]
return result[0]
def make_circuit(circuit_type, nqubits, nlayers=1):
c = Circuit(nqubits)
if circuit_type == "qft":
@@ -98,7 +125,7 @@ def run_quimb_tn_statevector(circuit, nqubits, num_slices, load_path=None, save_
import torch
qc = b._qibo_circuit_to_quimb(circuit, quimb_circuit_type=b.circuit_ansatz,
gate_opts={"max_bond": None, "cutoff": 1e-10})
qc.to_backend = torch.from_numpy
qc.to_backend = lambda x: torch.from_numpy(x).to(torch.complex64)
if load_path:
with open(load_path, "rb") as f:
saved = pickle.load(f)
@@ -107,28 +134,33 @@ def run_quimb_tn_statevector(circuit, nqubits, num_slices, load_path=None, save_
print(f" [path loaded] {load_path}")
else:
opt = ctg.HyperOptimizer(
methods=['kahypar', 'random-greedy', 'spinglass'],
max_repeats=128,
#methods=['kahypar', 'random-greedy', 'spinglass'],
max_repeats=1024,
#parallel="concurrent.futures",
parallel=64,
max_time=100,
max_time=60,
minimize='size',
slicing_opts={'target_slices': num_slices},
#slicing_opts={'target_size': 2**30},
progbar=True,
on_trial_error='ignore'
)
t0 = time.time()
rehearsal = qc.to_dense(optimize=opt, rehearse=True)
t_search = time.time() - t0
tree = rehearsal['tree']
print(f" [path search] {t_search:.3f}s flops~2^{tree.contraction_cost():.2f} size~2^{tree.contraction_width():.2f}")
#print(f" [path search] {t_search:.3f}s flops~2^{tree.contraction_cost():.2f} size~2^{tree.contraction_width():.2f}")
if save_path:
with open(save_path, "wb") as f:
pickle.dump({"tree": tree}, f)
print(f" [path saved] {save_path}")
print(f" [path search] {t_search:.3f}s flops~2^{tree.contraction_cost():.2f} size~2^{tree.contraction_width():.2f}")
return None, t_search
t0 = time.time()
sv = qc.to_dense(optimize=tree,implementation="cotengra").reshape(-1)
sv = qc.to_dense(optimize=tree).reshape(-1)
t_contract = time.time() - t0
print(f" [contraction] {t_contract:.3f}s")
sv_tn = np.array(sv)
@@ -186,42 +218,48 @@ def run_quimb_tn_statevector_mpi(circuit, nqubits, num_slices, load_path=None, s
import torch
qc = b._qibo_circuit_to_quimb(circuit, quimb_circuit_type=b.circuit_ansatz,
gate_opts={"max_bond": None, "cutoff": 1e-10})
qc.to_backend = torch.from_numpy
qc.to_backend = lambda x: torch.from_numpy(x).to(torch.complex64)
# path search on rank 0, broadcast to all
if rank == 0:
if load_path:
if load_path:
if rank == 0:
with open(load_path, "rb") as f:
saved = pickle.load(f)
tree = saved["tree"]
psi = saved["psi"]
t_search = 0.0
tree, psi, t_search = saved["tree"], saved["psi"], 0.0
print(f" [path loaded] {load_path}")
else:
opt = ctg.HyperOptimizer(
methods=['kahypar', 'random-greedy', 'spinglass'],
max_repeats=128,
parallel=64,
#max_repeats=1,
max_time=100,
minimize='size',
slicing_opts={'target_slices': max(num_slices, size), 'allow_outer': False},
progbar=True,
)
t0 = time.time()
rehearsal = qc.to_dense(optimize=opt, rehearse=True)
t_search = time.time() - t0
tree = rehearsal['tree']
psi = rehearsal['tn']
tree = psi = None
t_search = 0.0
else:
# each rank runs serial search over its share of trials
total_repeats = 1024
rank_repeats = max(1, total_repeats // size)
opt = ctg.HyperOptimizer(
methods=['kahypar', 'random-greedy', 'spinglass'],
max_repeats=rank_repeats,
parallel=False,
max_time=100,
minimize='size',
slicing_opts={'target_slices': max(num_slices, size), 'allow_outer': False},
progbar=(rank == 0),
)
t0 = time.time()
rehearsal = qc.to_dense(optimize=opt, rehearse=True)
t_search = time.time() - t0
local_tree = rehearsal['tree']
local_psi = rehearsal['tn']
local_size = local_tree.contraction_width()
# gather all trees to rank 0, pick best by contraction_width
all_results = comm.gather((local_size, local_tree, local_psi), root=0)
if rank == 0:
_, tree, psi = min(all_results, key=lambda x: x[0])
print(f" [path search] {t_search:.3f}s flops~2^{tree.contraction_cost():.2f} size~2^{tree.contraction_width():.2f} slices={tree.multiplicity}")
if save_path:
with open(save_path, "wb") as f:
pickle.dump({"tree": tree, "psi": psi}, f)
print(f" [path saved] {save_path}")
else:
tree = None
psi = None
t_search = 0.0
else:
tree = psi = None
tree = comm.bcast(tree, root=0)
psi = comm.bcast(psi, root=0)

246
benchmark_tn_mpi.py Normal file
View File

@@ -0,0 +1,246 @@
"""MPI-parallel TN benchmark: path search + contraction via MPI."""
import pickle
import time
import argparse
import numpy as np
import cotengra as ctg
import qibo
from qibo import Circuit, gates
from mpi4py import MPI
from concurrent.futures import ProcessPoolExecutor, as_completed
def _run_serial_search(tn_bytes, output_inds, repeats, seed, num_slices, n_ranks):
"""Run one serial HyperOptimizer in a subprocess, return (width, tree)."""
import pickle, cotengra as ctg, random
random.seed(seed)
tn = pickle.loads(tn_bytes)
opt = ctg.HyperOptimizer(
methods=['kahypar', 'kahypar-agglom', 'spinglass'],
max_repeats=repeats,
parallel=False,
minimize='flops',
max_time=600,
optlib="random",
slicing_opts={'target_size': 2**30, 'allow_outer': False},
progbar=False,
)
tree = tn.contraction_tree(optimize=opt, output_inds=output_inds)
return tree.contraction_width(), tree
def parallel_search(tn, output_inds, total_repeats, n_workers, num_slices, n_ranks,
timeout=None):
"""Launch n_workers subprocesses each running serial search, return best tree."""
import pickle, os, signal
from concurrent.futures import ProcessPoolExecutor, as_completed
tn_bytes = pickle.dumps(tn)
repeats_per = max(1, total_repeats // n_workers)
best_width, best_tree = float('inf'), None
with ProcessPoolExecutor(max_workers=n_workers) as pool:
futures = {
pool.submit(_run_serial_search, tn_bytes, output_inds,
repeats_per, seed, num_slices, n_ranks): seed
for seed in range(n_workers)
}
pids = {f: p.pid for f, p in zip(futures, pool._processes.values())}
try:
for fut in as_completed(futures, timeout=timeout):
try:
width, tree = fut.result()
if width < best_width:
best_width, best_tree = width, tree
except Exception as e:
print(f" [worker failed] {e}")
except TimeoutError:
pass
for fut, pid in pids.items():
if not fut.done():
try:
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
pass
return best_tree
def make_circuit(circuit_type, nqubits, nlayers=1):
c = Circuit(nqubits)
if circuit_type == "qft":
from qibo.models import QFT
return QFT(nqubits)
elif circuit_type == "variational":
for layer in range(nlayers):
for q in range(nqubits):
c.add(gates.RY(q, theta=np.random.uniform(0, 2 * np.pi)))
offset = layer % 2
for q in range(offset, nqubits - 1, 2):
c.add(gates.CZ(q, q + 1))
elif circuit_type == "ghz":
c.add(gates.H(0))
for q in range(nqubits - 1):
c.add(gates.CNOT(q, q + 1))
elif circuit_type == "brickwork":
for q in range(nqubits):
c.add(gates.H(q))
for layer in range(nlayers):
offset = layer % 2
for q in range(offset, nqubits - 1, 2):
c.add(gates.CNOT(q, q + 1))
c.add(gates.RZ(q, theta=np.random.uniform(0, 2 * np.pi)))
c.add(gates.RZ(q + 1, theta=np.random.uniform(0, 2 * np.pi)))
else:
raise ValueError(f"Unknown circuit: {circuit_type}")
return c
def _contract_mpi(tree, arrays, comm, root=0):
rank = comm.Get_rank()
size = comm.Get_size()
is_torch = type(arrays[0]).__module__.startswith("torch")
result_np = None
for i in range(rank, tree.multiplicity, size):
x = tree.contract_slice(arrays, i)
x_np = np.asfortranarray(x.detach().cpu().numpy() if is_torch else np.asarray(x))
result_np = x_np if result_np is None else result_np + x_np
if result_np is None:
result_np = np.zeros(1, dtype=np.complex64)
result = np.zeros_like(result_np) if rank == root else None
comm.Reduce(result_np, result, root=root)
if rank == root:
import torch
return torch.from_numpy(np.asarray(result)) if is_torch else result
return None
def run_mpi(circuit, nqubits, num_slices, total_repeats=1024,
load_path=None, save_path=None):
"""Each MPI rank runs serial path search over total_repeats/size trials,
rank 0 picks the global best, then all ranks contract in parallel."""
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
qibo.set_backend("qibotn", platform="quimb")
b = qibo.get_backend()
b.configure_tn_simulation(ansatz="tn")
import torch
qc = b._qibo_circuit_to_quimb(circuit, quimb_circuit_type=b.circuit_ansatz,
gate_opts={"max_bond": None, "cutoff": 1e-10})
qc.to_backend = lambda x: torch.from_numpy(x).to(torch.complex64)
# --- path search: each rank serial, gather best to rank 0 ---
if load_path:
if rank == 0:
with open(load_path, "rb") as f:
saved = pickle.load(f)
tree, psi, t_search = saved["tree"], saved["psi"], 0.0
print(f" [path loaded] {load_path}")
else:
tree = psi = None
t_search = 0.0
else:
rank_repeats = max(1, total_repeats // size)
t0 = time.time()
# get TN object first (no contraction), then run parallel search
psi_tn = qc.to_dense(rehearse="tn")
local_tree = parallel_search(
psi_tn, psi_tn.outer_inds(), rank_repeats, n_workers=48,
num_slices=num_slices, n_ranks=size, timeout=630,
)
t_search = time.time() - t0
local_psi = psi_tn
all_results = comm.gather((local_tree.contraction_width(), local_tree, local_psi), root=0)
if rank == 0:
_, tree, psi = min(all_results, key=lambda x: x[0])
print(f" [path search] {t_search:.3f}s "
f"flops~2^{tree.contraction_cost():.2f} "
f"size~2^{tree.contraction_width():.2f} "
f"slices={tree.multiplicity}")
if save_path:
with open(save_path, "wb") as f:
pickle.dump({"tree": tree, "psi": psi}, f)
print(f" [path saved] {save_path}")
else:
tree = psi = None
if save_path:
t_search = comm.bcast(t_search, root=0)
return None, t_search
tree = comm.bcast(tree, root=0)
psi = comm.bcast(psi, root=0)
t_search = comm.bcast(t_search, root=0)
# --- contraction: all ranks work in parallel ---
import torch
torch.set_num_threads(max(1, 48 // size))
arrays = [torch.from_numpy(np.asarray(a)).to(torch.complex64) for a in psi.arrays]
t0 = time.time()
sv = _contract_mpi(tree, arrays, comm, root=0)
t_contract = time.time() - t0
if rank == 0:
print(f" [contraction] {t_contract:.3f}s")
return np.array(sv).reshape(-1), t_search + t_contract
return None, t_search + t_contract
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--nqubits", type=int, default=30)
parser.add_argument("--circuit", type=str, default="qft",
choices=["qft", "variational", "ghz", "brickwork"])
parser.add_argument("--nlayers", type=int, default=3)
parser.add_argument("--num-slices", type=int, default=1)
parser.add_argument("--total-repeats", type=int, default=1024)
parser.add_argument("--save-path", type=str, default=None)
parser.add_argument("--load-path", type=str, default=None)
parser.add_argument("--no-compare", action="store_true")
args = parser.parse_args()
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
print(f"Circuit: {args.circuit}, nqubits={args.nqubits}, "
f"nlayers={args.nlayers}, ranks={comm.Get_size()}")
np.random.seed(42)
circuit = make_circuit(args.circuit, args.nqubits, args.nlayers)
try:
sv, t_total = run_mpi(circuit, args.nqubits, args.num_slices,
total_repeats=args.total_repeats,
load_path=args.load_path, save_path=args.save_path)
except Exception as e:
if rank == 0:
print(f"[FAILED] {e}")
raise
if rank == 0 and sv is not None:
print(f"\n[quimb TN MPI] time={t_total:.4f}s shape={sv.shape}")
np.save(f"data/sv_tn_{args.circuit}{args.nqubits}_mpi.npy", sv)
if not args.no_compare:
from benchmark_tn import run_qibojit
np.random.seed(42)
circuit_ref = make_circuit(args.circuit, args.nqubits, args.nlayers)
sv_ref, t_ref = run_qibojit(circuit_ref)
fid = abs(np.dot(sv_ref.conj(), sv)) ** 2
print(f"[qibojit] time={t_ref:.4f}s")
print(f"Fidelity : {fid:.8f}")
print(f"L2 error : {np.linalg.norm(sv_ref - sv):.2e}")
if t_total > 0:
print(f"Speedup : {t_ref/t_total:.2f}x")
if __name__ == "__main__":
main()

View File

@@ -1,519 +0,0 @@
"""Benchmark and profile the qibotn/quimb CPU backend.
This script is intended to be the stable baseline for quimb backend
optimization work. It supports:
- multiple circuit families
- MPS or generic TN execution
- statevector, sampling, conversion, and local expectation workloads
- warmup/repeat timing
- optional correctness checks against qibojit
- optional cProfile output
"""
from __future__ import annotations
import argparse
import cProfile
import json
import math
import os
import pstats
import time
from pathlib import Path
from statistics import mean, pstdev
import numpy as np
import qibo
from qibo import Circuit, gates
def configure_runtime_env(quimb_num_procs: int | None, blas_threads: int | None):
"""Pin process-level thread settings before heavy work starts."""
if quimb_num_procs is not None:
os.environ["QUIMB_NUM_PROCS"] = str(quimb_num_procs)
if blas_threads is not None:
value = str(blas_threads)
os.environ["OMP_NUM_THREADS"] = value
os.environ["OPENBLAS_NUM_THREADS"] = value
os.environ["MKL_NUM_THREADS"] = value
os.environ["NUMEXPR_NUM_THREADS"] = value
def make_circuit(
circuit_type: str,
nqubits: int,
nlayers: int,
seed: int,
add_measurements: bool = False,
) -> Circuit:
"""Construct repeatable workloads covering low/high entanglement cases."""
rng = np.random.default_rng(seed)
circuit = Circuit(nqubits)
if circuit_type == "qft":
from qibo.models import QFT
circuit = QFT(nqubits)
elif circuit_type == "variational":
for layer in range(nlayers):
for qubit in range(nqubits):
circuit.add(gates.RY(qubit, theta=rng.uniform(0.0, 2.0 * np.pi)))
offset = layer % 2
for qubit in range(offset, nqubits - 1, 2):
circuit.add(gates.CZ(qubit, qubit + 1))
elif circuit_type == "ghz":
circuit.add(gates.H(0))
for qubit in range(nqubits - 1):
circuit.add(gates.CNOT(qubit, qubit + 1))
elif circuit_type == "qaoa":
for _ in range(nlayers):
for qubit in range(nqubits):
circuit.add(gates.RZ(qubit, theta=rng.uniform(0.0, 2.0 * np.pi)))
for qubit in range(0, nqubits - 1, 2):
circuit.add(gates.CZ(qubit, qubit + 1))
for qubit in range(nqubits):
circuit.add(gates.RX(qubit, theta=rng.uniform(0.0, 2.0 * np.pi)))
elif circuit_type == "ising1d":
for _ in range(nlayers):
for qubit in range(nqubits):
circuit.add(gates.RX(qubit, theta=rng.uniform(0.0, 2.0 * np.pi)))
for qubit in range(0, nqubits - 1, 2):
circuit.add(gates.CZ(qubit, qubit + 1))
for qubit in range(1, nqubits - 1, 2):
circuit.add(gates.CZ(qubit, qubit + 1))
elif circuit_type == "rcs":
cols = math.ceil(math.sqrt(nqubits))
rows = math.ceil(nqubits / cols)
single_qubit_gates = [gates.T, gates.X, gates.Y]
for layer in range(nlayers):
for qubit in range(nqubits):
gate_cls = single_qubit_gates[rng.integers(0, len(single_qubit_gates))]
circuit.add(gate_cls(qubit))
if layer % 2 == 0:
for row in range(rows):
for col in range(0, cols - 1, 2):
q1, q2 = row * cols + col, row * cols + col + 1
if q2 < nqubits:
circuit.add(gates.CZ(q1, q2))
else:
for row in range(0, rows - 1, 2):
for col in range(cols):
q1, q2 = row * cols + col, (row + 1) * cols + col
if q2 < nqubits:
circuit.add(gates.CZ(q1, q2))
else:
raise ValueError(f"Unknown circuit type: {circuit_type}")
if add_measurements:
circuit.add(gates.M(*range(nqubits)))
return circuit
def prepare_quimb_backend(
ansatz: str,
max_bond: int | None,
svd_cutoff: float,
optimizer: str,
n_most_frequent_states: int,
):
"""Create and configure the qibotn/quimb backend once."""
qibo.set_backend("qibotn", platform="quimb")
backend = qibo.get_backend()
backend.configure_tn_simulation(
ansatz=ansatz,
max_bond_dimension=max_bond,
svd_cutoff=svd_cutoff,
n_most_frequent_states=n_most_frequent_states,
)
backend.contractions_optimizer = optimizer
return backend
def run_qibojit_state(circuit: Circuit):
qibo.set_backend("qibojit", platform="numba")
t0 = time.perf_counter()
result = circuit()
elapsed = time.perf_counter() - t0
state = np.asarray(result.state(), dtype=complex).reshape(-1)
return state, elapsed
def run_qibojit_shots(circuit: Circuit, nshots: int):
qibo.set_backend("qibojit", platform="numba")
t0 = time.perf_counter()
result = circuit(nshots=nshots)
elapsed = time.perf_counter() - t0
return dict(result.frequencies()), elapsed
def z_expectation_from_statevector(statevector: np.ndarray, nqubits: int, qubit: int):
probs = np.abs(np.asarray(statevector).reshape(-1)) ** 2
bit_index = nqubits - qubit - 1
bits = (np.arange(len(probs)) >> bit_index) & 1
return float(np.dot(probs, 1.0 - 2.0 * bits))
def fidelity_and_l2(reference: np.ndarray, candidate: np.ndarray):
ref = np.asarray(reference, dtype=complex).reshape(-1)
cand = np.asarray(candidate, dtype=complex).reshape(-1)
fidelity = abs(np.vdot(ref, cand)) ** 2
l2_error = np.linalg.norm(ref - cand)
return float(fidelity), float(l2_error)
def total_variation_distance(reference: dict[str, int], candidate: dict[str, int], nshots: int):
keys = set(reference) | set(candidate)
return 0.5 * sum(abs(reference.get(key, 0) - candidate.get(key, 0)) for key in keys) / nshots
def profile_callable(func, output_path: Path, sort_by: str):
"""Profile a single invocation and dump textual stats."""
profiler = cProfile.Profile()
profiler.enable()
result = func()
profiler.disable()
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8") as stream:
stats = pstats.Stats(profiler, stream=stream)
stats.strip_dirs().sort_stats(sort_by).print_stats(80)
stats.print_callers(30)
return result
def time_callable(func, repeats: int, warmup: int, profile_output: Path | None, profile_sort: str):
for _ in range(warmup):
func()
profiled_payload = None
if profile_output is not None:
profiled_payload = profile_callable(func, profile_output, profile_sort)
samples = []
payloads = []
for _ in range(repeats):
t0 = time.perf_counter()
payload = func()
elapsed = time.perf_counter() - t0
samples.append(elapsed)
payloads.append(payload)
final_payload = payloads[-1] if payloads else profiled_payload
return samples, final_payload
def summarize_samples(samples: list[float]):
return {
"min_s": min(samples),
"mean_s": mean(samples),
"max_s": max(samples),
"std_s": pstdev(samples) if len(samples) > 1 else 0.0,
"repeats": len(samples),
}
def workload_state(args):
circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed)
backend = prepare_quimb_backend(
ansatz=args.ansatz,
max_bond=args.max_bond,
svd_cutoff=args.svd_cutoff,
optimizer=args.optimizer,
n_most_frequent_states=args.topk,
)
def run_once():
result = backend.execute_circuit(circuit, return_array=True)
return np.asarray(result.statevector).reshape(-1)
samples, statevector = time_callable(
run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort
)
summary = summarize_samples(samples)
correctness = None
if not args.no_compare:
ref_state, ref_time = run_qibojit_state(circuit)
fidelity, l2_error = fidelity_and_l2(ref_state, statevector)
correctness = {
"qibojit_time_s": ref_time,
"fidelity": fidelity,
"l2_error": l2_error,
}
return summary, correctness
def workload_shots(args):
circuit = make_circuit(
args.circuit, args.nqubits, args.nlayers, args.seed, add_measurements=True
)
backend = prepare_quimb_backend(
ansatz=args.ansatz,
max_bond=args.max_bond,
svd_cutoff=args.svd_cutoff,
optimizer=args.optimizer,
n_most_frequent_states=args.topk,
)
def run_once():
result = backend.execute_circuit(circuit, nshots=args.nshots)
return dict(result.frequencies())
samples, frequencies = time_callable(
run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort
)
summary = summarize_samples(samples)
correctness = None
if not args.no_compare:
ref_freq, ref_time = run_qibojit_shots(circuit, args.nshots)
correctness = {
"qibojit_time_s": ref_time,
"tvd": total_variation_distance(ref_freq, frequencies, args.nshots),
}
return summary, correctness
def workload_convert(args):
circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed)
backend = prepare_quimb_backend(
ansatz=args.ansatz,
max_bond=args.max_bond,
svd_cutoff=args.svd_cutoff,
optimizer=args.optimizer,
n_most_frequent_states=args.topk,
)
def run_once():
quimb_circuit = backend._qibo_circuit_to_quimb( # pylint: disable=protected-access
circuit,
quimb_circuit_type=backend.circuit_ansatz,
gate_opts={"max_bond": backend.max_bond_dimension, "cutoff": backend.svd_cutoff},
)
return len(quimb_circuit.gates)
samples, gate_count = time_callable(
run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort
)
summary = summarize_samples(samples)
summary["gate_count"] = gate_count
return summary, None
def workload_expectation(args):
circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed)
backend = prepare_quimb_backend(
ansatz=args.ansatz,
max_bond=args.max_bond,
svd_cutoff=args.svd_cutoff,
optimizer=args.optimizer,
n_most_frequent_states=args.topk,
)
operators = ["z"]
sites = [(args.observable_qubit,)]
coeffs = [1.0]
def run_once():
return float(
backend.exp_value_observable_symbolic(
circuit, operators, sites, coeffs, args.nqubits
)
)
samples, expval = time_callable(
run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort
)
summary = summarize_samples(samples)
correctness = None
if not args.no_compare:
ref_state, ref_time = run_qibojit_state(circuit)
correctness = {
"qibojit_time_s": ref_time,
"reference_expval": z_expectation_from_statevector(
ref_state, args.nqubits, args.observable_qubit
),
"abs_error": abs(
z_expectation_from_statevector(ref_state, args.nqubits, args.observable_qubit)
- expval
),
}
return summary, correctness
def workload_raw_local_exp(args):
circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed)
backend = prepare_quimb_backend(
ansatz=args.ansatz,
max_bond=args.max_bond,
svd_cutoff=args.svd_cutoff,
optimizer=args.optimizer,
n_most_frequent_states=args.topk,
)
def run_once():
metrics = {}
t0 = time.perf_counter()
quimb_circuit = backend._qibo_circuit_to_quimb( # pylint: disable=protected-access
circuit,
quimb_circuit_type=backend.circuit_ansatz,
gate_opts={"max_bond": backend.max_bond_dimension, "cutoff": backend.svd_cutoff},
)
metrics["convert_s"] = time.perf_counter() - t0
operator = backend._string_to_quimb_operator("z") # pylint: disable=protected-access
if args.rehearse:
t1 = time.perf_counter()
rehearsal = quimb_circuit.local_expectation(
operator,
where=(args.observable_qubit,),
backend=backend.backend,
optimize=backend.contractions_optimizer,
simplify_sequence="R",
rehearse=True,
)
metrics["rehearse_s"] = time.perf_counter() - t1
optimize = rehearsal["tree"]
else:
metrics["rehearse_s"] = 0.0
optimize = backend.contractions_optimizer
t2 = time.perf_counter()
expval = quimb_circuit.local_expectation(
operator,
where=(args.observable_qubit,),
backend=backend.backend,
optimize=optimize,
simplify_sequence="R",
)
metrics["contract_s"] = time.perf_counter() - t2
metrics["total_inner_s"] = (
metrics["convert_s"] + metrics["rehearse_s"] + metrics["contract_s"]
)
metrics["expval"] = float(np.real(expval))
return metrics
samples, metrics = time_callable(
run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort
)
summary = summarize_samples(samples)
summary.update(
{
"convert_s": metrics["convert_s"],
"rehearse_s": metrics["rehearse_s"],
"contract_s": metrics["contract_s"],
"total_inner_s": metrics["total_inner_s"],
}
)
correctness = None
if not args.no_compare:
ref_state, ref_time = run_qibojit_state(circuit)
ref_expval = z_expectation_from_statevector(
ref_state, args.nqubits, args.observable_qubit
)
correctness = {
"qibojit_time_s": ref_time,
"reference_expval": ref_expval,
"abs_error": abs(ref_expval - metrics["expval"]),
}
return summary, correctness
WORKLOADS = {
"state": workload_state,
"shots": workload_shots,
"convert": workload_convert,
"expectation": workload_expectation,
"raw-local-exp": workload_raw_local_exp,
}
def build_parser():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--mode",
choices=sorted(WORKLOADS),
default="raw-local-exp",
help="Workload to benchmark.",
)
parser.add_argument(
"--circuit",
choices=["ghz", "ising1d", "qaoa", "qft", "rcs", "variational"],
default="variational",
)
parser.add_argument("--nqubits", type=int, default=10)
parser.add_argument("--nlayers", type=int, default=3)
parser.add_argument("--ansatz", choices=["mps", "tn"], default="tn")
parser.add_argument("--max-bond", type=int, default=None)
parser.add_argument("--svd-cutoff", type=float, default=1e-10)
parser.add_argument("--optimizer", type=str, default="auto-hq")
parser.add_argument("--observable-qubit", type=int, default=0)
parser.add_argument("--nshots", type=int, default=1024)
parser.add_argument("--topk", type=int, default=100)
parser.add_argument("--warmup", type=int, default=1)
parser.add_argument("--repeats", type=int, default=3)
parser.add_argument("--seed", type=int, default=42)
parser.add_argument("--quimb-num-procs", type=int, default=None)
parser.add_argument("--blas-threads", type=int, default=None)
parser.add_argument("--rehearse", action="store_true")
parser.add_argument("--no-compare", action="store_true")
parser.add_argument("--profile-output", type=Path, default=None)
parser.add_argument("--profile-sort", type=str, default="cumulative")
parser.add_argument("--json-output", type=Path, default=None)
return parser
def main():
parser = build_parser()
args = parser.parse_args()
configure_runtime_env(args.quimb_num_procs, args.blas_threads)
print(
f"mode={args.mode} circuit={args.circuit} nqubits={args.nqubits} "
f"nlayers={args.nlayers} ansatz={args.ansatz} optimizer={args.optimizer}"
)
if args.quimb_num_procs is not None or args.blas_threads is not None:
print(
"threads:"
f" QUIMB_NUM_PROCS={os.environ.get('QUIMB_NUM_PROCS')}"
f" OMP_NUM_THREADS={os.environ.get('OMP_NUM_THREADS')}"
)
workload = WORKLOADS[args.mode]
summary, correctness = workload(args)
print("\nTiming")
for key, value in summary.items():
if isinstance(value, float):
print(f"{key:>16}: {value:.6f}")
else:
print(f"{key:>16}: {value}")
if correctness is not None:
print("\nCorrectness")
for key, value in correctness.items():
if isinstance(value, float):
print(f"{key:>16}: {value:.6e}")
else:
print(f"{key:>16}: {value}")
if args.profile_output is not None:
print(f"\nProfile written to: {args.profile_output}")
if args.json_output is not None:
payload = {"timing": summary, "correctness": correctness, "args": vars(args)}
args.json_output.parent.mkdir(parents=True, exist_ok=True)
args.json_output.write_text(json.dumps(payload, indent=2, default=str), encoding="utf-8")
print(f"JSON written to: {args.json_output}")
if __name__ == "__main__":
main()

51
compare_jit_tn_quimb.py Normal file
View File

@@ -0,0 +1,51 @@
import numpy as np
import os
import sys
def check_results(ref_path, tn_path):
# 1. 检查文件是否存在
if not os.path.exists(ref_path) or not os.path.exists(tn_path):
print(f"Error: 找不到文件!\n参考文件: {ref_path}\n待测文件: {tn_path}")
return
print(f"正在加载数据并对比: \n [Ref] {ref_path}\n [TN ] {tn_path}\n")
try:
# 2. 加载状态向量
# mmap_mode='r' 可以防止大文件直接撑爆内存
sv_ref = np.load(ref_path, mmap_mode='r')
sv_tn = np.load(tn_path, mmap_mode='r')
# 3. 计算保真度 (Fidelity)
# fid = |<ref|tn>|^2
inner_product = np.dot(sv_ref.conj(), sv_tn)
fidelity = np.abs(inner_product)**2
# 4. 计算 L2 误差 (欧氏距离)
l2_error = np.linalg.norm(sv_ref - sv_tn)
# 5. 打印结果
print("-" * 30)
print(f"保真度 (Fidelity): {fidelity:.12f}")
#print(f"L2 范数误差: {l2_error:.2e}")
print("-" * 30)
# phase-invariant L2: align global phase first
phase = inner_product / np.abs(inner_product)
l2_phase_corrected = np.linalg.norm(sv_ref - sv_tn / phase)
print(f"L2 误差(相位校正后): {l2_phase_corrected:.2e}")
if fidelity > 0.999999:
print("✅ 验证通过:结果高度一致。")
else:
print("❌ 警告:保真度较低,请检查收缩路径或截断误差。")
except Exception as e:
print(f"计算过程中发生错误: {e}")
if __name__ == "__main__":
# 你可以在这里直接修改文件名
REF_FILE = 'data/sv_qibojit_qft30.npy'
TN_FILE = 'data/sv_tn_qft30_mpi.npy'
check_results(REF_FILE, TN_FILE)

2
hostfile Normal file
View File

@@ -0,0 +1,2 @@
10.20.6.74:1
10.20.6.102:1

21
inspect_path.py Normal file
View File

@@ -0,0 +1,21 @@
import pickle
import sys
path = sys.argv[1] if len(sys.argv) > 1 else 'path/path.pkl.34.mpi'
with open(path, 'rb') as f:
d = pickle.load(f)
tree = d['tree']
width = tree.contraction_width()
slices = tree.multiplicity
sliced_width = width - (slices.bit_length() - 1)
print(f"Path: {path}")
print(f"Width (unsliced): {width:.1f}")
print(f"Slices: {slices}")
print(f"Sliced width: {sliced_width:.1f}")
print(f"Peak memory (total): {2**width * 16 / 1e9:.1f} GB")
print(f"Peak per slice: {2**sliced_width * 16 / 1e9:.2f} GB")
print(f"Sliced indices: {len(tree.sliced_inds)}")
print(f"Cost (log2 flops): {tree.contraction_cost(log=True):.2f}")

11
log
View File

@@ -1,11 +0,0 @@
[qibojit] loaded from cache: /home/yx/qibotn/data/jit_variational_n32_l5.npy
bond time(s) fidelity l2_err
----------------------------------------------
1 157.4587 0.00000280 9.99e-01
8 61.9126 0.99999014 2.22e-03
16 63.4902 0.99999014 2.22e-03
32 58.3594 0.99999014 2.22e-03
64 59.7043 0.99999014 2.22e-03
128 64.6368 0.99999014 2.22e-03
256 64.9058 0.99999014 2.22e-03

18
run_qibojit_ref.py Normal file
View File

@@ -0,0 +1,18 @@
"""Run qibojit on 30-qubit QFT and save statevector for comparison."""
import time
import numpy as np
import qibo
from qibo.models import QFT
#np.random.seed(42)
circuit = QFT(32)
qibo.set_backend("qibojit", platform="numba")
t0 = time.time()
result = circuit()
elapsed = time.time() - t0
sv = np.array(result.state(), dtype=complex).flatten()
np.save("data/sv_qibojit_qft30.npy", sv)
print(f"[qibojit] time={elapsed:.4f}s shape={sv.shape}")
print(f"Saved to sv_qibojit_qft30.npy")