#!/usr/bin/env python3 # # Current most stable GPU-branch baseline: # GPU_Calculation="yes" # Equation_Class="BSSN" # Initial_Data_Method="Ansorg-TwoPuncture" # puncture_data_set="Manually" # basic_grid_set="Patch" # grid_center_set="Cell" # Symmetry="equatorial-symmetry" # Time_Evolution_Method="runge-kutta-45" # Finite_Diffenence_Method="4th-order" # boundary_choice="BAM-choice" # gauge_choice=0 # tetrad_type=2 # AHF_Find="no" # devide_factor=2.0 # static_grid_type="Linear" # moving_grid_type="Linear" # AMSS_Z4C_MRBD=0 # Do not enable AMSS_CUDA_BH_INTERP_RESIDENT unless a dedicated # CPU/GPU trajectory comparison has been run for that configuration. """ Check whether AMSS_NCKU_Input.py is suitable for the current GPU branch. Usage: python3 AMSS_NCKU_GPUCheck.py python3 AMSS_NCKU_GPUCheck.py -f /path/to/AMSS_NCKU_Input.py """ from __future__ import annotations import argparse import importlib.util import os from dataclasses import dataclass, field from pathlib import Path from typing import Any, Iterable, List, Sequence SUPPORTED_EQUATIONS = {"BSSN", "BSSN-EScalar", "BSSN-EM", "Z4C"} SUPPORTED_INITIAL_DATA = { "Ansorg-TwoPuncture", "Lousto-Analytical", "Cao-Analytical", "KerrSchild-Analytical", } SUPPORTED_SYMMETRIES = { "no-symmetry", "equatorial-symmetry", "octant-symmetry", } SUPPORTED_GRIDS = {"Patch", "Shell-Patch"} SUPPORTED_CENTERS = {"Cell", "Vertex"} SUPPORTED_FD = {"2nd-order", "4th-order", "6th-order", "8th-order"} SUPPORTED_GAUGES = {0, 1, 2, 3, 4, 5, 6, 7} SUPPORTED_TETRADS = {0, 1, 2} SUPPORTED_AHF = {"yes", "no"} SUPPORTED_BOUNDARIES = {"BAM-choice", "Shibata-choice"} SUPPORTED_PUNCTURE_DATA = {"Manually", "Automatically-BBH"} STABLE_BASELINE = { "GPU_Calculation": "yes", "Equation_Class": "BSSN", "Initial_Data_Method": "Ansorg-TwoPuncture", "puncture_data_set": "Manually", "basic_grid_set": "Patch", "grid_center_set": "Cell", "Symmetry": "equatorial-symmetry", "Time_Evolution_Method": "runge-kutta-45", "Finite_Diffenence_Method": "4th-order", "boundary_choice": "BAM-choice", "gauge_choice": 0, "tetrad_type": 2, "AHF_Find": "no", "devide_factor": 2.0, "static_grid_type": "Linear", "moving_grid_type": "Linear", "AMSS_Z4C_MRBD": 0, } @dataclass class CheckResult: ok: bool = True warnings: List[str] = field(default_factory=list) risks: List[str] = field(default_factory=list) notes: List[str] = field(default_factory=list) def add_warning(self, msg: str) -> None: self.warnings.append(msg) def add_risk(self, msg: str) -> None: self.ok = False self.risks.append(msg) def add_note(self, msg: str) -> None: self.notes.append(msg) def extend_notes(self, messages: Iterable[str]) -> None: self.notes.extend(messages) def load_input_module(path: Path): spec = importlib.util.spec_from_file_location("amss_ncku_input", str(path)) if spec is None or spec.loader is None: raise RuntimeError(f"cannot load input module from {path}") module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # type: ignore[union-attr] return module def get_attr(mod: Any, name: str, default: Any = None) -> Any: return getattr(mod, name, default) def as_text(value: Any) -> str: if isinstance(value, str): return value.strip() return str(value).strip() def as_lower_text(value: Any) -> str: return as_text(value).lower() def as_float(value: Any, default: float | None = None) -> float | None: try: return float(value) except (TypeError, ValueError): return default def as_int(value: Any, default: int | None = None) -> int | None: try: return int(value) except (TypeError, ValueError): return default def sequence_len(value: Any) -> int | None: try: return len(value) except TypeError: return None def sequence_values(value: Any) -> List[float] | None: try: return [float(v) for v in value] except (TypeError, ValueError): return None def approx_equal(a: Any, b: float, tol: float = 1.0e-12) -> bool: value = as_float(a) return value is not None and abs(value - b) <= tol def env_truthy(name: str) -> bool: value = os.environ.get(name) return value is not None and value.strip().lower() in { "1", "yes", "y", "true", "on", "enable", "enabled", } def stable_baseline_differences(mod: Any) -> List[str]: diffs = [] for name, expected in STABLE_BASELINE.items(): if not hasattr(mod, name): continue actual = get_attr(mod, name, None) if isinstance(expected, float): if not approx_equal(actual, expected): diffs.append(f"{name}={actual!r} (stable baseline: {expected!r})") elif actual != expected: diffs.append(f"{name}={actual!r} (stable baseline: {expected!r})") return diffs def add_membership_check( r: CheckResult, name: str, value: Any, supported: Sequence[Any] | set[Any], *, risk_message: str | None = None, note_message: str | None = None, ) -> None: if value not in supported: r.add_risk(risk_message or f"Unsupported {name}: {value!r}") elif note_message: r.add_note(note_message) def check_positive_int(r: CheckResult, name: str, value: Any) -> None: parsed = as_int(value) if parsed is None or parsed <= 0: r.add_risk(f"{name} must be a positive integer; got {value!r}") def check_nonnegative_number(r: CheckResult, name: str, value: Any) -> None: parsed = as_float(value) if parsed is None or parsed < 0.0: r.add_risk(f"{name} must be a non-negative number; got {value!r}") def check_grid_geometry(r: CheckResult, mod: Any, grid: str) -> None: grid_level = as_int(get_attr(mod, "grid_level", None)) static_grid_level = as_int(get_attr(mod, "static_grid_level", None)) moving_grid_level = as_int(get_attr(mod, "moving_grid_level", None)) refinement_level = as_int(get_attr(mod, "refinement_level", None)) analysis_level = as_int(get_attr(mod, "analysis_level", 0)) for name in ( "grid_level", "static_grid_level", "moving_grid_level", "static_grid_number", "moving_grid_number", "quarter_sphere_number", ): check_positive_int(r, name, get_attr(mod, name, None)) if grid_level is not None and static_grid_level is not None: if static_grid_level > grid_level: r.add_risk("static_grid_level cannot exceed grid_level.") if moving_grid_level is not None and moving_grid_level != grid_level - static_grid_level: r.add_risk( "moving_grid_level should equal grid_level - static_grid_level; " f"got {moving_grid_level}, expected {grid_level - static_grid_level}." ) if grid_level is not None: if refinement_level is None or refinement_level < 0 or refinement_level > grid_level: r.add_risk(f"refinement_level must be in [0, grid_level]; got {refinement_level!r}") if analysis_level is None or analysis_level < 0 or analysis_level >= grid_level: r.add_risk(f"analysis_level must be in [0, grid_level); got {analysis_level!r}") largest_max = sequence_values(get_attr(mod, "largest_box_xyz_max", None)) largest_min = sequence_values(get_attr(mod, "largest_box_xyz_min", None)) if largest_max is None or len(largest_max) != 3: r.add_risk("largest_box_xyz_max must contain three numeric values.") elif any(v <= 0.0 for v in largest_max): r.add_risk(f"largest_box_xyz_max values must be positive; got {largest_max!r}") if largest_min is None or len(largest_min) != 3: r.add_risk("largest_box_xyz_min must contain three numeric values.") elif largest_max is not None and len(largest_max) == 3: for idx, (lo, hi) in enumerate(zip(largest_min, largest_max)): if lo >= hi: r.add_risk( f"largest_box_xyz_min[{idx}] must be smaller than largest_box_xyz_max[{idx}]." ) if grid == "Shell-Patch" and largest_max is not None and len(largest_max) == 3: if max(largest_max) - min(largest_max) > 1.0e-12: r.add_risk("Shell-Patch requires a cubic largest_box_xyz_max.") if not approx_equal(get_attr(mod, "devide_factor", None), 2.0): r.add_risk("devide_factor must remain 2.0; the AMR code documents only this ratio as supported.") if as_text(get_attr(mod, "static_grid_type", "")) != "Linear": r.add_risk("static_grid_type must remain 'Linear'.") if as_text(get_attr(mod, "moving_grid_type", "")) != "Linear": r.add_risk("moving_grid_type must remain 'Linear'.") shell_shape = sequence_values(get_attr(mod, "shell_grid_number", None)) if grid == "Shell-Patch": if shell_shape is None or len(shell_shape) != 3: r.add_risk("Shell-Patch requires shell_grid_number with three numeric values.") elif any(int(v) <= 0 for v in shell_shape): r.add_risk(f"shell_grid_number values must be positive; got {shell_shape!r}") def check_punctures(r: CheckResult, mod: Any, init: str, puncture_data: str) -> None: puncture_number = as_int(get_attr(mod, "puncture_number", None)) if puncture_number is None or puncture_number <= 0: r.add_risk(f"puncture_number must be a positive integer; got {puncture_number!r}") return if init == "Ansorg-TwoPuncture" and puncture_number != 2: r.add_warning( "Ansorg-TwoPuncture is validated on the GPU branch mainly for puncture_number=2." ) if puncture_data == "Automatically-BBH": r.add_risk("puncture_data_set='Automatically-BBH' is documented as still developing.") for name in ("position_BH", "parameter_BH", "dimensionless_spin_BH", "momentum_BH"): value = get_attr(mod, name, None) outer = sequence_len(value) if outer != puncture_number: r.add_risk(f"{name} must have puncture_number rows; got {outer!r}.") continue for idx in range(puncture_number): if sequence_len(value[idx]) != 3: r.add_risk(f"{name}[{idx}] must contain three values.") break if init == "Ansorg-TwoPuncture": for name in ("parameter_BH", "position_BH", "momentum_BH"): if get_attr(mod, name, None) is None: r.add_risk(f"Ansorg-TwoPuncture requires {name}.") def check_output_and_time(r: CheckResult, mod: Any) -> None: for name in ( "Final_Evolution_Time", "Check_Time", "Dump_Time", "D2_Dump_Time", "Analysis_Time", "Courant_Factor", "Dissipation", ): check_nonnegative_number(r, name, get_attr(mod, name, None)) check_positive_int(r, "Evolution_Step_Number", get_attr(mod, "Evolution_Step_Number", None)) start_time = as_float(get_attr(mod, "Start_Evolution_Time", None)) final_time = as_float(get_attr(mod, "Final_Evolution_Time", None)) if start_time is None: r.add_risk("Start_Evolution_Time must be numeric.") elif final_time is not None and final_time <= start_time: r.add_risk("Final_Evolution_Time must be greater than Start_Evolution_Time.") for name in ("GW_L_max", "GW_M_max", "Detector_Number"): check_positive_int(r, name, get_attr(mod, name, None)) detector_min = as_float(get_attr(mod, "Detector_Rmin", None)) detector_max = as_float(get_attr(mod, "Detector_Rmax", None)) if detector_min is None or detector_min <= 0.0: r.add_risk(f"Detector_Rmin must be positive; got {detector_min!r}") if detector_max is None or detector_max <= 0.0: r.add_risk(f"Detector_Rmax must be positive; got {detector_max!r}") if detector_min is not None and detector_max is not None and detector_max <= detector_min: r.add_risk("Detector_Rmax must be greater than Detector_Rmin.") def check_equation_specific(r: CheckResult, mod: Any, eq: str, grid: str, fd: str) -> None: if eq == "BSSN": r.add_note("Equation_Class=BSSN is the current validated GPU baseline.") elif eq == "BSSN-EScalar": r.add_warning("BSSN-EScalar has a CUDA path, but it is less broadly validated than BSSN.") fr_choice = as_int(get_attr(mod, "FR_Choice", None)) if fr_choice not in {1, 2, 3, 4, 5}: r.add_risk(f"FR_Choice must be one of 1..5 for BSSN-EScalar; got {fr_choice!r}") if approx_equal(get_attr(mod, "FR_a2", None), 0.0): r.add_risk("CUDA BSSN-EScalar requires nonzero FR_a2.") elif not approx_equal(get_attr(mod, "FR_a2", None), 3.0): r.add_warning("CUDA BSSN-EScalar now passes FR_a2 to the kernel, but non-3.0 values need CPU/GPU regression.") for name in ("FR_l2", "FR_phi0", "FR_r0", "FR_sigma0"): check_nonnegative_number(r, name, get_attr(mod, name, None)) elif eq == "BSSN-EM": r.add_warning( "BSSN-EM is accepted by the build, but this checker cannot certify its physics/output " "without a CPU/GPU regression run." ) if fd == "8th-order": r.add_note("BSSN-EM with 8th-order enables extra CUDA AMR batching defaults.") elif eq == "Z4C": r.add_warning( "Z4C has CUDA support, but the resident path and Shell/CPBC combinations are more constrained." ) if grid == "Patch": r.add_warning("Z4C+Patch avoids Shell CPBC, but still needs a dedicated regression test.") else: r.add_warning("Z4C+Shell-Patch uses CPBC/Shell logic and is not the stable BSSN baseline.") def check_runtime_environment(r: CheckResult, mod: Any, eq: str, grid: str, fd: str) -> None: if env_truthy("AMSS_CUDA_BH_INTERP_RESIDENT"): r.add_risk( "AMSS_CUDA_BH_INTERP_RESIDENT is enabled in the environment; this option previously caused " "late-time trajectory drift and should stay off unless explicitly revalidated." ) else: r.add_note("AMSS_CUDA_BH_INTERP_RESIDENT is not enabled; this matches the fixed stable default.") if eq in {"BSSN", "BSSN-EScalar", "Z4C"}: r.add_note("makefile_and_run.py will default AMSS_CUDA_AMR_RESTRICT_DEVICE=1 for this equation.") if fd in {"2nd-order", "8th-order"}: r.add_warning( f"{fd} disables some interpolation/CUDA-aware MPI fast paths by default; validate performance and output." ) if grid == "Shell-Patch": r.add_warning( "Shell-Patch changes runtime defaults and MPI process handling; use at least the script-adjusted 4 MPI ranks." ) z4c_mrbd = as_int(get_attr(mod, "AMSS_Z4C_MRBD", 0), 0) if z4c_mrbd not in {0, 1, 2}: r.add_risk(f"AMSS_Z4C_MRBD must be 0, 1, or 2; got {z4c_mrbd!r}") elif eq == "Z4C" and z4c_mrbd == 2: r.add_risk("Z4C GPU resident path does not support AMSS_Z4C_MRBD=2.") elif eq == "Z4C" and z4c_mrbd in {0, 1}: r.add_note(f"Z4C will build with AMSS_Z4C_MRBD={z4c_mrbd}.") def check_stable_profile(r: CheckResult, mod: Any) -> None: diffs = stable_baseline_differences(mod) if not diffs: r.add_note("This input matches the documented most stable GPU baseline.") return r.add_warning( "This input differs from the documented most stable GPU baseline: " + "; ".join(diffs) ) def check_input(mod: Any) -> CheckResult: r = CheckResult() gpu_text = as_lower_text(get_attr(mod, "GPU_Calculation", "no")) gpu = gpu_text == "yes" eq = as_text(get_attr(mod, "Equation_Class", "")) init = as_text(get_attr(mod, "Initial_Data_Method", "")) symmetry = as_text(get_attr(mod, "Symmetry", "")) time_method = as_text(get_attr(mod, "Time_Evolution_Method", "")) grid = as_text(get_attr(mod, "basic_grid_set", "")) center = as_text(get_attr(mod, "grid_center_set", "")) fd = as_text(get_attr(mod, "Finite_Diffenence_Method", "")) gauge = get_attr(mod, "gauge_choice", None) tetrad = get_attr(mod, "tetrad_type", None) ahf = as_text(get_attr(mod, "AHF_Find", "no")).lower() boundary = as_text(get_attr(mod, "boundary_choice", "")) puncture_data = as_text(get_attr(mod, "puncture_data_set", "")) cpu_part = get_attr(mod, "CPU_Part", None) gpu_part = get_attr(mod, "GPU_Part", None) if gpu_text not in {"yes", "no"}: r.add_risk(f"GPU_Calculation must be 'yes' or 'no'; got {get_attr(mod, 'GPU_Calculation', None)!r}") if not gpu: r.add_note("GPU_Calculation=no; this check only targets the GPU branch.") return r r.add_note("GPU_Calculation=yes detected.") add_membership_check(r, "Equation_Class", eq, SUPPORTED_EQUATIONS) add_membership_check(r, "Symmetry", symmetry, SUPPORTED_SYMMETRIES) add_membership_check(r, "Initial_Data_Method", init, SUPPORTED_INITIAL_DATA) add_membership_check(r, "basic_grid_set", grid, SUPPORTED_GRIDS) add_membership_check(r, "grid_center_set", center, SUPPORTED_CENTERS) add_membership_check(r, "Finite_Diffenence_Method", fd, SUPPORTED_FD) add_membership_check(r, "gauge_choice", gauge, SUPPORTED_GAUGES) add_membership_check(r, "tetrad_type", tetrad, SUPPORTED_TETRADS) add_membership_check(r, "AHF_Find", ahf, SUPPORTED_AHF) add_membership_check(r, "boundary_choice", boundary, SUPPORTED_BOUNDARIES) add_membership_check(r, "puncture_data_set", puncture_data, SUPPORTED_PUNCTURE_DATA) if init != "Ansorg-TwoPuncture": r.add_risk( f"Initial_Data_Method={init!r} is not validated as safe on this GPU branch; " "the stable path is Ansorg-TwoPuncture." ) else: r.add_note("Initial_Data_Method=Ansorg-TwoPuncture is supported.") if time_method != "runge-kutta-45": r.add_risk(f"Only Time_Evolution_Method='runge-kutta-45' is supported; got {time_method!r}.") if grid == "Patch": r.add_note("basic_grid_set=Patch is the current stable GPU grid path.") elif grid == "Shell-Patch": r.add_warning("basic_grid_set=Shell-Patch has GPU support but is outside the stable BSSN baseline.") if center == "Vertex": r.add_warning("grid_center_set=Vertex is compiled by macros, but the stable GPU baseline is Cell.") if symmetry != "equatorial-symmetry": r.add_warning("The stable validation case uses equatorial-symmetry; other symmetries need regression tests.") if fd != "4th-order": r.add_warning("The stable validation case uses 4th-order finite differences.") if gauge not in {0, 1}: r.add_warning("Input comments recommend gauge_choice 0 or 1; other gauges need dedicated validation.") if tetrad != 2: r.add_warning("Input comments recommend tetrad_type=2; other tetrads affect wave extraction conventions.") if ahf == "yes": r.add_warning("AHF_Find=yes is supported by macros, but it is outside the current stable GPU baseline.") if boundary == "Shibata-choice": r.add_risk("Shibata-choice is not faithfully distinguished in the current macro generator; it maps to the BAM branch.") elif boundary == "BAM-choice": r.add_note("boundary_choice=BAM-choice is supported.") if cpu_part is not None or gpu_part is not None: r.add_warning("CPU_Part/GPU_Part are printed and propagated, but they do not control a real mixed CPU/GPU split in this branch.") check_output_and_time(r, mod) check_grid_geometry(r, mod, grid) check_punctures(r, mod, init, puncture_data) check_equation_specific(r, mod, eq, grid, fd) check_runtime_environment(r, mod, eq, grid, fd) check_stable_profile(r, mod) return r def main() -> int: parser = argparse.ArgumentParser() parser.add_argument( "-f", "--file", "--input", dest="input_file", default="AMSS_NCKU_Input.py", help="path to AMSS_NCKU_Input.py", ) args = parser.parse_args() path = Path(args.input_file).resolve() if not path.exists(): print(f"ERROR: input file not found: {path}") return 2 try: mod = load_input_module(path) except Exception as exc: print(f"ERROR: failed to load input file: {exc}") return 2 result = check_input(mod) print(f"Input: {path}") print(f"GPU_Calculation: {get_attr(mod, 'GPU_Calculation', 'no')}") print(f"Symmetry: {get_attr(mod, 'Symmetry', '')}") print(f"Equation_Class: {get_attr(mod, 'Equation_Class', '')}") print(f"Initial_Data_Method: {get_attr(mod, 'Initial_Data_Method', '')}") print(f"puncture_data_set: {get_attr(mod, 'puncture_data_set', '')}") print(f"basic_grid_set: {get_attr(mod, 'basic_grid_set', '')}") print(f"grid_center_set: {get_attr(mod, 'grid_center_set', '')}") print(f"Finite_Diffenence_Method: {get_attr(mod, 'Finite_Diffenence_Method', '')}") print(f"gauge_choice: {get_attr(mod, 'gauge_choice', '')}") print(f"tetrad_type: {get_attr(mod, 'tetrad_type', '')}") print(f"boundary_choice: {get_attr(mod, 'boundary_choice', '')}") print(f"AHF_Find: {get_attr(mod, 'AHF_Find', '')}") print(f"AMSS_Z4C_MRBD: {get_attr(mod, 'AMSS_Z4C_MRBD', 0)}") print("") for msg in result.notes: print(f"NOTE: {msg}") for msg in result.warnings: print(f"WARNING: {msg}") for msg in result.risks: print(f"RISK: {msg}") print("") if result.risks: print("Verdict: review the risks above before running.") return 1 if result.warnings: print("Verdict: runnable on the current GPU branch, but keep the warnings in mind.") return 0 print("Verdict: OK to run on the current GPU branch.") return 0 if __name__ == "__main__": raise SystemExit(main())