# -*- coding: utf-8 -*-
"""
Author: Shihab Ahmed
Created on Sun Jun  1 15:47:42 2025

Full Python version of the automatic SLURM job submitter.
"""

import os
import re
import subprocess
import time
from datetime import datetime
import concurrent.futures

def run_slurm_command(command, config):
    """Run a SLURM CLI command; return the CompletedProcess, or None on failure."""
    # config["logger"] is expected to accept a level= keyword (see setup_logger below).
    log = config.get("logger", print)
    label = command[0]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        return result
    except subprocess.CalledProcessError as e:
        log(f"'{label}' command failed | Exit code: {e.returncode}\n\t- {e.stderr.strip()}", level="SLURM Error")
        return None

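# Usage sketch (illustrative):
#   result = run_slurm_command(["squeue", "--noheader"], config)
#   if result:
#       print(result.stdout)
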
# 1
def extract_lammps_variables(input_file_path):
    variables = {}
    # Only literal 'string' and 'equal' variable styles are handled;
    # other LAMMPS variable styles are ignored.
    styles = "(string|equal)"
    variable_pattern = re.compile(
        rf"^\s*variable\s+(\w+)\s+{styles}(?:\s+(.*))?", re.IGNORECASE
    )

    with open(input_file_path, 'r') as file:
        for line in file:
            line = line.split('#', 1)[0].strip()  # Remove comments
            if not line:
                continue
            match = variable_pattern.match(line)
            if match:
                var_name = match.group(1)
                var_value = match.group(3).strip() if match.group(3) else ""
                variables[var_name] = var_value

    return variables

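# Example (illustrative, hypothetical variable names): given an input file containing
#   variable    rst_dir   string   ../equilibration
#   variable    T         equal    300
# extract_lammps_variables() would return {"rst_dir": "../equilibration", "T": "300"}.
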
# 2
def get_dependency_file_path(dirr, config):
    job_dir_identifying_filename = config["job_dir_identifying_filename"]

    input_file_path = os.path.join(dirr, job_dir_identifying_filename)
    variables = extract_lammps_variables(input_file_path)

    max_try = 20  # guard against unresolved or circular ${var} references
    with open(input_file_path, 'r') as file:
        for line in file:
            if line.strip().startswith('read_restart') or line.strip().startswith('read_data'):
                parts = line.split()
                if len(parts) > 1:
                    file_location = parts[1].strip()
                    pattern = r"\$\{?(\w+)\}?"
                    try_count = 0
                    while try_count < max_try:
                        try_count += 1
                        match = re.search(pattern, file_location)
                        if match:
                            var_name = match.group(1)
                            var_value = variables.get(var_name, f"${{{var_name}}}")
                            file_location = re.sub(pattern, var_value, file_location, count=1)
                        else:
                            break

                    if not os.path.isabs(file_location) and file_location:
                        file_location = os.path.abspath(os.path.join(dirr, file_location))

                    return file_location
    return None

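# Example (illustrative, hypothetical names): a line such as
#   read_restart   ${rst_dir}/equilibrated.restart
# is expanded using the variables parsed above (e.g. rst_dir -> ../equilibration)
# and returned as an absolute path so its existence can be checked before submission.
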
# 3
def read_job_id_from_text_file(job_dir):
    job_id_path = os.path.join(job_dir, "job_id.txt")

    if os.path.isfile(job_id_path):
        with open(job_id_path, 'r') as f:
            return f.readline().strip()
    else:
        return None


# 4
def check_simulation_output_file_status(job_dir, config):
    job_output_filename = config["job_output_filename"]
    job_completion_keyword = config["job_completion_keyword"]

    output_filepath = os.path.join(job_dir, job_output_filename)

    if os.path.isfile(output_filepath):
        with open(output_filepath, 'r') as file:
            if any(job_completion_keyword in line for line in file):
                return "complete"
            else:
                return "incomplete"
    else:
        return "missing"

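# Note: with the config used in main(), the completion keyword is "Total wall time",
# the line LAMMPS prints at the end of a successfully finished run.
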
# 5
def check_slurm_job_status(job_dir, config):
    clusters = config["clusters"]

    job_id = read_job_id_from_text_file(job_dir)
    slurm_status = None

    if job_id:
        for cluster in clusters:
            command = ["sacct", "-j", job_id, "-M", cluster, "--format=State", "--noheader", "--parsable2"]
            result = run_slurm_command(command, config)
            if result:
                output = result.stdout.strip()
                if output:
                    first_line = output.splitlines()[0]
                    state = first_line.split('|')[0].split()[0].replace('+', '')
                    slurm_status = state
                    break

    return slurm_status

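# Illustrative sacct output (one State line per job step, --parsable2/--noheader):
#   COMPLETED
#   COMPLETED
# Only the first line is used; extra tokens such as "CANCELLED by <uid>" and any
# trailing '+' are stripped so a bare state name (COMPLETED, RUNNING, ...) is returned.
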
def check_simulation_status(job_dir, config):

    sim_output_file_status = check_simulation_output_file_status(job_dir, config)
    slurm_status = check_slurm_job_status(job_dir, config)

    if slurm_status == "COMPLETED" and sim_output_file_status == "complete":
        return "COMPLETED"
    elif slurm_status == "COMPLETED" and sim_output_file_status == "incomplete":
        return "SOFT_FAIL"
    elif slurm_status is None and sim_output_file_status == "missing":
        return "NOT_YET_SUBMITTED"
    else:
        return slurm_status

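# Combined-status summary (from the branches above):
#   SLURM COMPLETED + output complete    -> "COMPLETED"
#   SLURM COMPLETED + output incomplete  -> "SOFT_FAIL"  (job finished, simulation did not)
#   no SLURM record + output missing     -> "NOT_YET_SUBMITTED"
#   anything else                        -> the raw SLURM state (RUNNING, PENDING, FAILED, ...)
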
def update_job_dir_array(root_dir, config):
    job_dir_identifying_filename = config["job_dir_identifying_filename"]
    exclude_status = config["exclude_status"]

    # Collect candidate directories first
    candidate_dirs = []
    for dirpath, _, files in os.walk(root_dir):
        if job_dir_identifying_filename in files:
            candidate_dirs.append(dirpath)

    def check_and_filter(dirpath):
        sim_status = check_simulation_status(dirpath, config)
        if sim_status not in exclude_status:
            return dirpath
        return None

    job_dir_array = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        for result in executor.map(check_and_filter, candidate_dirs):
            if result:
                job_dir_array.append(result)

    return job_dir_array


def get_jobs_ready_for_submission(job_dir_array, config):
    jobs_ready_for_submission_array = []
    for job_dir in job_dir_array:
        dependency_file_path = get_dependency_file_path(job_dir, config)
        if dependency_file_path:
            # Add only if the dependency file exists
            if os.path.isfile(dependency_file_path):
                jobs_ready_for_submission_array.append(job_dir)
        else:
            # No dependency specified; consider ready for submission
            jobs_ready_for_submission_array.append(job_dir)

    return jobs_ready_for_submission_array

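# A job whose input reads a restart/data file is held back until that file exists
# on disk; jobs without such a dependency are eligible for submission right away.
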
def submit_job(job_dir, config):
    '''
       Max resources:

       "pinnacles|short            |12|6:00:00   |48",
       "pinnacles|long             |3 |3-00:00:00|48",
       "pinnacles|medium           |6 |1-00:00:00|48",
       "pinnacles|bigmem           |2 |3-00:00:00|48",
       "pinnacles|pi.amartini      |2 |3-00:00:00|48",
       "merced   |compute          |3 |5-00:00:00|40",
       "merced   |bigmem           |3 |5-00:00:00|24",
       "pinnacles|cenvalarc.compute|3 |3-00:00:00|48",
       "pinnacles|cenvalarc.bigmem |2 |3-00:00:00|48"
    '''
    username = config["username"]
    job_submit_filename = config["job_submit_filename"]
    cluster_partition_config = config["cluster_partition_config"]

    for cf in cluster_partition_config:
        cluster, partition, max_jobs, time_limit, num_cores = [s.strip() for s in cf.split('|')]
        max_jobs = int(max_jobs)
        num_cores = int(num_cores)

        # Check current job count in queue
        command = ["squeue", "-M", cluster, "--user", username, "--partition", partition, "--noheader"]
        result = run_slurm_command(command, config)
        njob_queue = len(result.stdout.strip().splitlines()) if result else max_jobs  # prevent submission to a failed partition

        if njob_queue < max_jobs:
            # Prepare and submit job
            submit_file = os.path.join(job_dir, job_submit_filename)

            current_dir = os.getcwd()
            os.chdir(job_dir)

            try:
                command = ["sbatch", "-M", cluster, submit_file]
                result = run_slurm_command(command, config)
                if result and result.returncode == 0 and "Submitted batch job" in result.stdout:
                    job_id = result.stdout.strip().split()[3]
                    job_id_file = os.path.join(job_dir, "job_id.txt")

                    # Prepend the new job ID so the latest submission stays on line 1
                    try:
                        old = open(job_id_file).readlines()
                    except OSError:
                        old = []

                    with open(job_id_file, 'w') as f:
                        f.write(f"{job_id}\n")
                        f.writelines(old)

                    return True
            finally:
                os.chdir(current_dir)

    return False

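# Illustrative sbatch output when submitting with the -M (cluster) flag:
#   Submitted batch job 1234567 on cluster pinnacles
# split()[3] therefore picks out the numeric job ID recorded in job_id.txt.
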
def setup_logger(log_file_base):
    timestamp = time.strftime('%Y%m%d-%H%M%S')
    base, ext = os.path.splitext(log_file_base)
    log_file = f"{base}_{timestamp}{ext}"  # e.g. auto_submit_20250604-142100.log

    def log(message, level=None, timestamp=False, print_to_console=True):
        parts = []

        if level:
            parts.append(f"[{level}]")
        parts.append(str(message))
        if timestamp:
            parts.append(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}]")

        log_line = " ".join(parts) + "\n"
        with open(log_file, 'a') as f:  # Append mode
            f.write(log_line)
        if print_to_console:
            print(log_line.strip())

    return log

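# Usage sketch (illustrative):
#   log = setup_logger("auto_submit.log")   # writes to auto_submit_<timestamp>.log
#   log("Submission started", level="INFO", timestamp=True)
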
def initialize_jobid_files_for_legacy_runs(root_dir, config):
    """
    Create job_id.txt files for simulations from legacy runs (those without job_id.txt).
    It looks for slurm-<jobid>.out files, checks submit times via sacct,
    and writes the most recent job ID into job_id.txt.
    """
    log = config.get("logger", print)
    clusters = config["clusters"]
    job_id_pattern = re.compile(r"slurm-(\d+)\.out")
    job_dir_identifying_filename = config["job_dir_identifying_filename"]

    start_time = time.time()

    log("Initialization of job_id.txt files for legacy runs...", timestamp=True)

    total_dirs = 0
    created_count = 0
    skipped_count = 0

    for dirpath, _, files in os.walk(root_dir):
        if job_dir_identifying_filename not in files:
            continue

        total_dirs += 1
        if "job_id.txt" in files:
            skipped_count += 1
            continue

        slurm_job_ids = [match.group(1) for f in files if (match := job_id_pattern.match(f))]

        job_submit_times = []
        for job_id in slurm_job_ids:
            for cluster in clusters:
                command = ["sacct", "-j", job_id, "-M", cluster, "--format=Submit", "--noheader", "--parsable2"]
                result = run_slurm_command(command, config)
                if result and result.stdout.strip():
                    try:
                        submit_time = datetime.strptime(result.stdout.strip().splitlines()[0], "%Y-%m-%dT%H:%M:%S")
                        job_submit_times.append((job_id, submit_time))
                    except ValueError:
                        continue

        if job_submit_times:
            latest_job_id = sorted(job_submit_times, key=lambda x: x[1], reverse=True)[0][0]
            job_id_path = os.path.join(dirpath, "job_id.txt")
            with open(job_id_path, 'w') as f:
                f.write(f"{latest_job_id}\n")

            relative_path = os.path.relpath(dirpath, root_dir)
            log(f"\t\t- Created job_id.txt in '{relative_path}' with SLURM job ID: {latest_job_id}")
            created_count += 1
        else:
            skipped_count += 1

    log(f"\t- Directories scanned: {total_dirs} | job_id.txt created: {created_count} | Skipped: {skipped_count}", timestamp=True)

    extime = time.time() - start_time
    log(f"\t- Runtime: {extime:.2f} seconds")

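# Illustrative example: a legacy job directory containing slurm-1234567.out but no
# job_id.txt ends up with a job_id.txt whose first line is "1234567" (the most
# recently submitted ID when several slurm-*.out files are present).
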
def main():
    # inputs
    root_dir = "/mnt/borgstore/amartini/sahmed73/data/REACTER/REACTER_DATA_ML/TEST_PAO-OO_TD=0.20_CVFF-class-1_Try-6"
    log_filepath = os.path.join(root_dir, "auto_submit.log")
    log = setup_logger(log_filepath)

    config = {
        "username": "sahmed73",
        "job_dir_identifying_filename": "input.in",
        "clusters": ["pinnacles", "merced"],
        "exclude_status": ["COMPLETED", "RUNNING", "PENDING"],
        "job_submit_filename": "submit.sh",
        "job_output_filename": "output.out",
        "job_completion_keyword": "Total wall time",
        "logger": log,
        "cluster_partition_config": [
            "pinnacles|test              |1 |1:00:00   |56",
            "pinnacles|short             |12|1:00:00   |56",
            "pinnacles|long              |3 |1:00:00   |56",
            "pinnacles|medium            |6 |1:00:00   |56",
            "pinnacles|bigmem            |2 |1:00:00   |56",
            "pinnacles|pi.amartini       |5 |1:00:00   |56",
            "pinnacles|cenvalarc.compute |3 |1:00:00   |56",
            "pinnacles|cenvalarc.bigmem  |2 |1:00:00   |56",
            "merced   |compute           |6 |3:00:00   |40",
            "merced   |bigmem            |6 |3:00:00   |24",
        ]
    }
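    # cluster_partition_config format (pipe-delimited):
    #   cluster | partition | max queued jobs | time limit | cores per node
    # e.g. "pinnacles|short|12|1:00:00|56" allows up to 12 of this user's jobs
    # in the pinnacles "short" queue before submit_job() moves to the next entry.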

    initialize_jobid_files_for_legacy_runs(root_dir, config)

    log("Auto job submission script started.", timestamp=True)
    job_dir_array = update_job_dir_array(root_dir, config)
    log(f"Detected {len(job_dir_array)} job directories in the root directory.")
    log("="*70)
    log("")

    submission_loop_count = 0
    while job_dir_array:
        start_time = time.time()
        submission_loop_count += 1
        log(f"\nSubmission loop: {submission_loop_count}", timestamp=True)

        jobs_ready_for_submission_array = get_jobs_ready_for_submission(job_dir_array, config)
        log(f"\t- {len(jobs_ready_for_submission_array)} jobs ready for submission.")

        job_submitted_count = 0

        for job_dir in jobs_ready_for_submission_array:
            submitted = submit_job(job_dir, config)
            if submitted:
                job_submitted_count += 1
                if job_dir in job_dir_array:
                    job_dir_array.remove(job_dir)
                time.sleep(1)  # wait to avoid submission overlap

        runtime = time.time() - start_time
        log(f"\t- {job_submitted_count}/{len(jobs_ready_for_submission_array)} jobs submitted successfully")
        log(f"\t- Runtime: {runtime:.2f} seconds")
        log("\t- Waiting 60 seconds before next loop...")
        time.sleep(60)


if __name__ == "__main__":
    main()