##################################################################
##
## This file defines the commands used to build and run AMSS-NCKU
## Author: Xiaoqu
## 2025/01/24
##
##################################################################

import AMSS_NCKU_Input as input_data

import os
import shutil
import subprocess
import time


def _format_cpu_ranges(cpu_ids):
    """
    Render a sorted list of CPU ids as a comma-separated list of
    inclusive ranges, e.g. [4, 5, 6, 10, 11] -> "4-6,10-11".

    Every run is written as "first-last" (also when first == last), so a
    fully contiguous selection yields a single "first-last" range.
    """
    runs = []
    for cpu in cpu_ids:
        if runs and cpu == runs[-1][1] + 1:
            runs[-1][1] = cpu          # extend the current contiguous run
        else:
            runs.append([cpu, cpu])    # start a new run
    return ",".join(f"{first}-{last}" for first, last in runs)


def get_last_n_cores_per_socket(n=32):
    """
    Read the CPU topology via `lscpu --parse=NODE,CPU`, report the last `n`
    CPUs of each NUMA node, and return the command prefix used for binding.

    Example: 2 sockets x 56 cores each, n=32
      -> node0: 24-55, node1: 80-111
      -> reported as "taskset -c 24-55,80-111"

    NOTE: CPU binding is currently disabled. The selection is only printed
    and the returned prefix is always the empty string. To re-enable the
    binding, return f"taskset -c {cpu_str}" at the end of this function.

    Parameters:
      n : number of CPUs to select per NUMA node (values <= 0 select none)

    Returns:
      str : the command prefix ("" while binding is disabled, and whenever
            the topology cannot be read)
    """
    try:
        result = subprocess.run(["lscpu", "--parse=NODE,CPU"],
                                capture_output=True, text=True)
    except OSError:
        ## lscpu is missing or not executable; this function runs at import
        ## time, so it must not make the whole module unusable
        print(" CPU binding: lscpu is not available; no CPU binding is applied")
        return ""

    ## Build a dict: node_id -> list of CPU ids
    node_cpus = {}
    for line in result.stdout.splitlines():
        if line.startswith("#") or not line.strip():
            continue
        parts = line.split(",")
        if len(parts) < 2:
            continue
        try:
            node_id, cpu_id = int(parts[0]), int(parts[1])
        except ValueError:
            ## a field that is empty or not an integer; skip this entry
            continue
        node_cpus.setdefault(node_id, []).append(cpu_id)

    if not node_cpus:
        print(" CPU binding: no CPU topology was reported by lscpu; no CPU binding is applied")
        return ""

    segments = []
    total = 0
    for node_id in sorted(node_cpus):
        cpus = sorted(set(node_cpus[node_id]))
        ## cpus[-0:] would select *all* CPUs, so guard non-positive n explicitly
        selected = cpus[-n:] if n > 0 else []
        if not selected:
            continue
        ## CPU ids of one node are not necessarily contiguous, so emit every
        ## contiguous run instead of a single first-last range
        segments.append(_format_cpu_ranges(selected))
        total += len(selected)

    cpu_str = ",".join(segments)
    print(f" CPU binding: taskset -c {cpu_str} ({total} cores, last {n} per socket)")
    return ""


## CPU core binding prefix for build/run commands: computed from the last 32
## CPUs of each socket (64 cores total); see get_last_n_cores_per_socket
NUMACTL_CPU_BIND = get_last_n_cores_per_socket(n=32)

## Build parallelism (make -j): match the number of bound cores
BUILD_JOBS = 64

##################################################################


def _truthy(value, default=False):
    """
    Interpret `value` as a boolean switch.

    None and empty/blank strings give `default`; bool values are returned
    unchanged; anything else is converted to text and compared
    (case-insensitively, surrounding whitespace ignored) against the
    accepted "on" spellings.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text == "":
        return default
    return text in ("1", "yes", "y", "true", "on", "enable", "enabled")


def _input_or_env(input_name, env_name, default=None):
    """
    Look up a setting: the process environment variable `env_name` wins,
    otherwise the attribute `input_name` of the input module is used,
    otherwise `default`.
    """
    if env_name in os.environ:
        return os.environ[env_name]
    return getattr(input_data, input_name, default)


def _input_env_passthrough(runtime_env, env_name):
    """
    Copy the input-module attribute named `env_name` into `runtime_env`
    (as a string) unless `runtime_env` already contains that key or the
    input module does not define the attribute.
    """
    if env_name in runtime_env:
        return
    if hasattr(input_data, env_name):
        runtime_env[env_name] = str(getattr(input_data, env_name))


def _start_cuda_mps_if_requested(runtime_env):
    """
    Start a CUDA MPS control daemon for a GPU run when requested.

    MPS is requested through the environment variable AMSS_CUDA_AUTO_MPS or
    the input attribute CUDA_Auto_MPS; when neither is given it defaults to
    "on" if more than one MPI process is configured.

    On success (daemon started, or an existing control pipe is reused)
    `runtime_env` is updated in place with CUDA_MPS_PIPE_DIRECTORY and
    CUDA_MPS_LOG_DIRECTORY.

    Returns:
      bool : True only when this call started a daemon, i.e. when the
             caller is responsible for stopping it again.
    """
    if input_data.GPU_Calculation != "yes":
        return False

    default_auto_mps = int(getattr(input_data, "MPI_processes", 1)) > 1
    auto_mps = _truthy(
        _input_or_env("CUDA_Auto_MPS", "AMSS_CUDA_AUTO_MPS", default_auto_mps),
        default=default_auto_mps,
    )
    if not auto_mps:
        return False

    mps_control = shutil.which("nvidia-cuda-mps-control")
    if not mps_control:
        print(" CUDA MPS control command was not found; running without MPS.")
        return False

    ## Per-user default directories unless overridden by environment/input
    uid = os.getuid()
    pipe_dir = str(_input_or_env("CUDA_MPS_PIPE_DIRECTORY", "CUDA_MPS_PIPE_DIRECTORY",
                                 f"/tmp/amss-ncku-mps-{uid}"))
    log_dir = str(_input_or_env("CUDA_MPS_LOG_DIRECTORY", "CUDA_MPS_LOG_DIRECTORY",
                                f"/tmp/amss-ncku-mps-log-{uid}"))
    os.makedirs(pipe_dir, exist_ok=True)
    os.makedirs(log_dir, exist_ok=True)

    mps_settings = {
        "CUDA_MPS_PIPE_DIRECTORY": pipe_dir,
        "CUDA_MPS_LOG_DIRECTORY": log_dir,
    }

    ## An existing "control" entry in the pipe directory is treated as a
    ## running daemon: use it, but do not take ownership of it
    if os.path.exists(os.path.join(pipe_dir, "control")):
        runtime_env.update(mps_settings)
        print(f" Reusing CUDA MPS daemon: {pipe_dir}")
        return False

    print(f" Starting CUDA MPS daemon for this run: {pipe_dir}")
    mps_env = runtime_env.copy()
    mps_env.update(mps_settings)
    result = subprocess.run([mps_control, "-d"], env=mps_env, text=True,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if result.returncode != 0:
        print(" CUDA MPS daemon did not start; running without MPS.")
        if result.stdout:
            print(result.stdout, end="")
        return False

    runtime_env.update(mps_settings)
    return True


def _stop_cuda_mps(runtime_env):
    """
    Send "quit" to the CUDA MPS control command using `runtime_env`.

    This is best-effort cleanup: nothing happens when the control command
    is not found, and the command's output and exit status are ignored.
    """
    mps_control = shutil.which("nvidia-cuda-mps-control")
    if not mps_control:
        return
    subprocess.run([mps_control], input="quit\n", env=runtime_env, text=True,
                   stdout=subprocess.PIPE, stderr=subprocess.STDOUT)


def _gpu_runtime_env():
    """
    Build the environment used for a GPU run of ABE_CUDA.

    Precedence for every AMSS_* switch (highest first):
      1. the process environment,
      2. an attribute in the input module (the pass-through names and the
         optional overrides below),
      3. the built-in defaults, which depend on the finite-difference
         order, the equation class and the grid type.

    Returns:
      dict : a copy of os.environ extended with the resolved switches
    """
    runtime_env = os.environ.copy()
    ## NOTE: the attribute name is spelled exactly as it is looked up in the
    ## input module ("Diffenence"); do not "correct" it here
    finite_difference = str(getattr(input_data, "Finite_Diffenence_Method", "4th-order")).strip()
    equation_class = getattr(input_data, "Equation_Class", "")

    defaults = {
        "AMSS_EVOLVE_TIMING": "1",
        "AMSS_ESCALAR_STEP_TIMING": "0",
        "AMSS_INTERP_FAST": "1",
        "AMSS_INTERP_GPU": "1",
        "AMSS_ANALYSIS_MAP_EVERY": "1000000",
        "AMSS_CUDA_AWARE_MPI": "1",
        "AMSS_CUDA_KEEP_RESIDENT_AFTER_STEP": "1",
        "AMSS_CUDA_KEEP_ALL_LEVELS": "1",
        "AMSS_CUDA_ESCALAR_KEEP_RESIDENT_AFTER_STEP": "1",
        "AMSS_CUDA_ESCALAR_KEEP_ALL_LEVELS": "1",
        "AMSS_CUDA_EM_CACHE_SOURCES": "1",
        "AMSS_CUDA_EM_ZERO_FASTPATH": "1",
        "AMSS_EM_ZERO_ANALYSIS_FASTPATH": "1",
        "AMSS_EM_ZERO_RESIDENT_DOWNLOAD_FASTPATH": "1",
        "AMSS_CUDA_AMR_HOST_STAGED": "1",
        "AMSS_CUDA_AMR_RESTRICT_DEVICE": "0",
        "AMSS_CUDA_AMR_RESTRICT_BATCH": "0",
        "AMSS_CUDA_DEVICE_SEGMENT_BATCH": "0",
        "AMSS_CUDA_UNCACHED_DEVICE_BUFFERS": "1",
        "AMSS_SHELL_FAST_INTERP": "0",
        "AMSS_SHELL_PARALLEL_INTERP": "0",
        "AMSS_SHELL_CUDA_INTERP": "0",
    }

    ## Configuration-dependent adjustments; later updates deliberately
    ## override earlier ones, so the order of these checks matters
    if finite_difference in ("2nd-order", "8th-order"):
        defaults.update({
            "AMSS_INTERP_FAST": "0",
            "AMSS_INTERP_GPU": "0",
            "AMSS_CUDA_AWARE_MPI": "0",
        })
    if finite_difference == "8th-order" and equation_class == "BSSN-EM":
        defaults.update({
            "AMSS_CUDA_AMR_RESTRICT_DEVICE": "1",
            "AMSS_CUDA_AMR_RESTRICT_BATCH": "1",
            "AMSS_CUDA_DEVICE_SEGMENT_BATCH": "1",
        })
    if getattr(input_data, "basic_grid_set", "") == "Shell-Patch":
        defaults.update({
            "AMSS_CUDA_AWARE_MPI": "0",
            "AMSS_SHELL_FAST_INTERP": "1",
            "AMSS_SHELL_PARALLEL_INTERP": "1",
            "AMSS_SHELL_INTERP_THREADS": "16",
        })
    if equation_class in ("BSSN", "BSSN-EScalar", "Z4C"):
        defaults["AMSS_CUDA_AMR_RESTRICT_DEVICE"] = "1"
    if equation_class == "Z4C":
        defaults.update({
            "AMSS_Z4C_CUDA_RESIDENT": "1",
            "AMSS_CONSTRAINT_OUT_EVERY": "1000000",
        })

    passthrough_envs = [
        "AMSS_CUDA_RESIDENT_SYNC",
        "AMSS_CUDA_BSSN_RESIDENT_SYNC",
        "AMSS_CUDA_EM_RESIDENT_SYNC",
        "AMSS_CUDA_ESCALAR_RESIDENT_SYNC",
        "AMSS_CUDA_BH_INTERP_RESIDENT",
        "AMSS_CUDA_KEEP_RESIDENT_AFTER_STEP",
        "AMSS_CUDA_KEEP_ALL_LEVELS",
        "AMSS_CUDA_EM_KEEP_RESIDENT_AFTER_STEP",
        "AMSS_CUDA_EM_KEEP_ALL_LEVELS",
        "AMSS_CUDA_ESCALAR_KEEP_RESIDENT_AFTER_STEP",
        "AMSS_CUDA_ESCALAR_KEEP_ALL_LEVELS",
        "AMSS_CUDA_AMR_HOST_STAGED",
        "AMSS_CUDA_AMR_RESTRICT_DEVICE",
        "AMSS_CUDA_AMR_RESTRICT_BATCH",
        "AMSS_CUDA_DEVICE_SEGMENT_BATCH",
        "AMSS_CUDA_UNCACHED_DEVICE_BUFFERS",
        "AMSS_CUDA_EM_CACHE_SOURCES",
        "AMSS_CUDA_EM_ZERO_FASTPATH",
        "AMSS_CUDA_AWARE_MPI",
        "AMSS_CUDA_REGRID_FLUSH_ALWAYS",
        "AMSS_Z4C_CUDA_RESIDENT",
        "AMSS_SHELL_FAST_INTERP",
        "AMSS_SHELL_PARALLEL_INTERP",
        "AMSS_SHELL_CUDA_INTERP",
        "AMSS_SHELL_INTERP_THREADS",
        "AMSS_EM_ZERO_ANALYSIS_FASTPATH",
        "AMSS_EM_ZERO_RESIDENT_DOWNLOAD_FASTPATH",
        "AMSS_INTERP_FAST",
        "AMSS_INTERP_GPU",
    ]
    ## Input-module values must be applied BEFORE the defaults: the
    ## pass-through never overwrites an existing key, so filling in the
    ## defaults first would make every input value for a defaulted switch
    ## silently ineffective
    for env_name in passthrough_envs:
        _input_env_passthrough(runtime_env, env_name)

    ## Environment variable name -> input-module attribute name
    optional_overrides = {
        "AMSS_INTERP_FAST_COMPARE": "AMSS_Interp_Fast_Compare",
        "AMSS_INTERP_FAST_COMPARE_LIMIT": "AMSS_Interp_Fast_Compare_Limit",
        "AMSS_INTERP_FAST_COMPARE_TOL": "AMSS_Interp_Fast_Compare_Tol",
        "AMSS_GPU_STAGE_TIMING": "AMSS_GPU_Stage_Timing",
        "AMSS_GPU_STAGE_TIMING_EVERY": "AMSS_GPU_Stage_Timing_Every",
    }
    for env_name, input_name in optional_overrides.items():
        if env_name not in runtime_env and hasattr(input_data, input_name):
            runtime_env[env_name] = str(getattr(input_data, input_name))

    ## Finally fill in whatever neither the environment nor the input set
    for key, value in defaults.items():
        runtime_env.setdefault(key, value)

    return runtime_env


def _stream_command(command, env=None, log_path=None):
    """
    Run the shell command `command`, echoing its combined stdout/stderr
    line by line as it arrives.

    Parameters:
      command  : command line, executed through the shell
      env      : environment for the child process (None = inherit)
      log_path : if given, every output line is also written to this file
                 and flushed immediately

    Returns:
      int : the exit status of the command
    """
    log_file = open(log_path, 'w') if log_path is not None else None
    try:
        ## The Popen context manager closes the pipe and waits for the child
        with subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT, text=True, env=env) as process:
            for line in process.stdout:
                print(line, end='')      # stream output in real time
                if log_file is not None:
                    log_file.write(line)
                    log_file.flush()     # keep the log current while the run is in progress
        return process.returncode
    finally:
        if log_file is not None:
            log_file.close()


##################################################################

##################################################################

## Compile the AMSS-NCKU main program ABE

def makefile_ABE():
    """
    Build the main executable with make: target ABE for CPU runs
    (GPU_Calculation == "no") or ABE_CUDA for GPU runs ("yes").

    Raises:
      ValueError                    : GPU_Calculation is neither "yes" nor "no"
      subprocess.CalledProcessError : make exited with a non-zero status
    """
    print(                                                          )
    print( " Compiling the AMSS-NCKU executable file ABE/ABEGPU "  )
    print(                                                          )

    z4c_mrbd = int(getattr(input_data, "AMSS_Z4C_MRBD", 0))

    ## Build command, prefixed with the CPU binding
    if (input_data.GPU_Calculation == "no"):
        makefile_command = f"{NUMACTL_CPU_BIND} env AMSS_Z4C_MRBD={z4c_mrbd} make -j{BUILD_JOBS} INTERP_LB_MODE=off USE_CUDA_BSSN=0 USE_CUDA_Z4C=0 ABE"
    elif (input_data.GPU_Calculation == "yes"):
        makefile_command = f"{NUMACTL_CPU_BIND} env AMSS_Z4C_MRBD={z4c_mrbd} make -j{BUILD_JOBS} INTERP_LB_MODE=off USE_CUDA_BSSN=1 USE_CUDA_Z4C=1 ABE_CUDA"
    else:
        print( " CPU/GPU numerical calculation setting is wrong " )
        print(                                                    )
        ## Without a valid setting there is no command to run
        raise ValueError(f"GPU_Calculation must be 'yes' or 'no', got {input_data.GPU_Calculation!r}")

    ## Execute the command and stream its output
    makefile_return_code = _stream_command(makefile_command)
    if makefile_return_code != 0:
        raise subprocess.CalledProcessError(makefile_return_code, makefile_command)

    print(                                                                     )
    print( " Compilation of the AMSS-NCKU executable file ABE is finished "    )
    print(                                                                     )

    return

##################################################################

##################################################################

## Compile the AMSS-NCKU TwoPuncture program TwoPunctureABE

def makefile_TwoPunctureABE():
    """
    Build the TwoPunctureABE executable with make.

    Raises:
      subprocess.CalledProcessError : make exited with a non-zero status
    """
    print(                                                               )
    print( " Compiling the AMSS-NCKU executable file TwoPunctureABE "   )
    print(                                                               )

    ## Build command, prefixed with the CPU binding
    makefile_command = f"{NUMACTL_CPU_BIND} make -j{BUILD_JOBS} TwoPunctureABE"

    ## Execute the command and stream its output
    makefile_return_code = _stream_command(makefile_command)
    if makefile_return_code != 0:
        raise subprocess.CalledProcessError(makefile_return_code, makefile_command)

    print(                                                                              )
    print( " Compilation of the AMSS-NCKU executable file TwoPunctureABE is finished "  )
    print(                                                                              )

    return

##################################################################

##################################################################

## Run the AMSS-NCKU main program ABE

def _resolve_mpi_processes():
    """
    Return the number of MPI processes for the run.

    The base value is input_data.MPI_processes. For GPU runs, Z4C and
    Shell-Patch configurations each apply an adjustment in turn: a positive
    integer in AMSS_Z4C_GPU_MPI_PROCESSES / AMSS_SHELL_GPU_MPI_PROCESSES
    replaces the count; otherwise a count below 4 is raised to 4.
    """
    mpi_processes = int(input_data.MPI_processes)
    if input_data.GPU_Calculation != "yes":
        return mpi_processes

    gpu_overrides = (
        (getattr(input_data, "Equation_Class", "") == "Z4C", "AMSS_Z4C_GPU_MPI_PROCESSES"),
        (getattr(input_data, "basic_grid_set", "") == "Shell-Patch", "AMSS_SHELL_GPU_MPI_PROCESSES"),
    )
    for applies, env_name in gpu_overrides:
        if not applies:
            continue
        requested = os.environ.get(env_name)
        if requested and int(requested) > 0:
            mpi_processes = int(requested)
        elif mpi_processes < 4:
            mpi_processes = 4
    return mpi_processes


## Runtime switches reported before a GPU run, in display order
_GPU_REPORTED_SWITCHES = (
    "AMSS_INTERP_FAST",
    "AMSS_INTERP_GPU",
    "AMSS_ANALYSIS_MAP_EVERY",
    "AMSS_EVOLVE_TIMING",
    "AMSS_ESCALAR_STEP_TIMING",
    "AMSS_CUDA_AWARE_MPI",
    "AMSS_CUDA_KEEP_RESIDENT_AFTER_STEP",
    "AMSS_CUDA_KEEP_ALL_LEVELS",
    "AMSS_CUDA_ESCALAR_KEEP_RESIDENT_AFTER_STEP",
    "AMSS_CUDA_ESCALAR_KEEP_ALL_LEVELS",
    "AMSS_CUDA_EM_CACHE_SOURCES",
    "AMSS_CUDA_EM_ZERO_FASTPATH",
    "AMSS_EM_ZERO_ANALYSIS_FASTPATH",
    "AMSS_EM_ZERO_RESIDENT_DOWNLOAD_FASTPATH",
    "AMSS_CUDA_AMR_HOST_STAGED",
    "AMSS_CUDA_AMR_RESTRICT_DEVICE",
    "AMSS_CUDA_AMR_RESTRICT_BATCH",
    "AMSS_CUDA_DEVICE_SEGMENT_BATCH",
    "AMSS_CUDA_UNCACHED_DEVICE_BUFFERS",
    "AMSS_SHELL_FAST_INTERP",
    "AMSS_SHELL_PARALLEL_INTERP",
    "AMSS_SHELL_CUDA_INTERP",
    "AMSS_SHELL_INTERP_THREADS",
    "AMSS_Z4C_CUDA_RESIDENT",
    "AMSS_CONSTRAINT_OUT_EVERY",
)


def _print_gpu_runtime_switches(mpi_env, mpi_processes):
    """Print the MPI process count and the GPU runtime switches in `mpi_env`."""
    print(" GPU optimized runtime switches:")
    print(f" MPI processes={mpi_processes}")
    for name in _GPU_REPORTED_SWITCHES:
        print(f" {name}={mpi_env.get(name, '')}")
    if "CUDA_MPS_PIPE_DIRECTORY" in mpi_env:
        print(f" CUDA_MPS_PIPE_DIRECTORY={mpi_env['CUDA_MPS_PIPE_DIRECTORY']}")


def run_ABE():
    """
    Run the main executable under mpirun: ./ABE for CPU runs (output logged
    to ABE_out.log) or ./ABE_CUDA for GPU runs (ABEGPU_out.log).

    For GPU runs the runtime environment is prepared first and a CUDA MPS
    daemon is started when requested; a daemon started here is stopped
    again when the run ends, whether it succeeded or not.

    Raises:
      ValueError                    : GPU_Calculation is neither "yes" nor "no"
      subprocess.CalledProcessError : mpirun exited with a non-zero status
    """
    print(                                                          )
    print( " Running the AMSS-NCKU executable file ABE/ABEGPU "    )
    print(                                                          )

    gpu_mode = input_data.GPU_Calculation
    if gpu_mode not in ("no", "yes"):
        raise ValueError(f"GPU_Calculation must be 'yes' or 'no', got {gpu_mode!r}")

    mpi_processes = _resolve_mpi_processes()
    mpi_env = None
    started_mps = False

    ## Define the command to run
    if gpu_mode == "no":
        mpi_command = f"{NUMACTL_CPU_BIND} mpirun -np {mpi_processes} ./ABE"
        mpi_command_outfile = "ABE_out.log"
    else:
        ## `env` keeps the variable assignments valid even when the command
        ## is prefixed with a CPU-binding command such as taskset
        mpi_command = f"{NUMACTL_CPU_BIND} env I_MPI_OFFLOAD=1 I_MPI_OFFLOAD_IPC=0 mpirun -np {mpi_processes} ./ABE_CUDA"
        mpi_command_outfile = "ABEGPU_out.log"
        mpi_env = _gpu_runtime_env()

    try:
        if mpi_env is not None:
            started_mps = _start_cuda_mps_if_requested(mpi_env)
            _print_gpu_runtime_switches(mpi_env, mpi_processes)

        ## Execute the MPI command; output goes to stdout and to the log file
        mpi_return_code = _stream_command(mpi_command, env=mpi_env,
                                          log_path=mpi_command_outfile)
        if mpi_return_code != 0:
            raise subprocess.CalledProcessError(mpi_return_code, mpi_command)
    finally:
        ## Only stop a daemon that this run started itself
        if started_mps:
            _stop_cuda_mps(mpi_env)

    print(                                               )
    print( " The ABE/ABEGPU simulation is finished "     )
    print(                                               )

    return

##################################################################

##################################################################

## Run the AMSS-NCKU TwoPuncture program TwoPunctureABE

def run_TwoPunctureABE():
    """
    Run ./TwoPunctureABE, logging its output to TwoPunctureABE_out.log, and
    print the elapsed wall-clock time.

    Raises:
      subprocess.CalledProcessError : TwoPunctureABE exited with a non-zero
                                      status (consistent with run_ABE)
    """
    tp_time1 = time.time()

    print(                                                             )
    print( " Running the AMSS-NCKU executable file TwoPunctureABE "   )
    print(                                                             )

    ## Define the command to run (no CPU binding is applied here)
    TwoPuncture_command = " ./TwoPunctureABE"
    TwoPuncture_command_outfile = "TwoPunctureABE_out.log"

    ## Execute the command; output goes to stdout and to the log file
    TwoPuncture_command_return_code = _stream_command(TwoPuncture_command,
                                                      log_path=TwoPuncture_command_outfile)
    if TwoPuncture_command_return_code != 0:
        raise subprocess.CalledProcessError(TwoPuncture_command_return_code, TwoPuncture_command)

    print(                                                  )
    print( " The TwoPunctureABE simulation is finished "    )
    print(                                                  )

    tp_time2 = time.time()
    et = tp_time2 - tp_time1
    print(f"Used time: {et}")

    return

##################################################################