import inspect
import logging
from abc import abstractmethod
from typing import Optional
from chemsmart.settings.executable import (
GaussianExecutable,
NCIPLOTExecutable,
ORCAExecutable,
)
from chemsmart.settings.user import ChemsmartUserSettings
from chemsmart.utils.mixins import RegistryMixin
# User-level settings, instantiated once when this module is imported.
# The scheduler submitters in this module read optional keys such as
# "PROJECT" and "EMAIL" from ``user_settings.data``.
user_settings = ChemsmartUserSettings()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
[docs]
class RunScript:
    """
    Script generator for computational job execution.

    Produces a small Python script that sets ``OMP_NUM_THREADS`` to ``'1'``
    and then calls ``chemsmart.cli.run.run`` with the stored command line
    arguments.

    Attributes:
        filename (str): Path to the output script file.
        batch (bool): Whether this is a batch job execution. The flag is
            only stored; it does not influence the generated script.
        cli_args: Command line arguments to pass to the job. Their
            ``repr`` is embedded verbatim in the generated script.
    """

    def __init__(self, filename, cli_args, batch=False):
        """
        Initialize the run script generator.

        Args:
            filename (str): Path where the script will be written.
            cli_args: Command line arguments for job execution.
            batch (bool): Whether this is a batch job. Defaults to False.
        """
        self.filename = filename
        self.batch = batch
        self.cli_args = cli_args

    def write(self):
        """
        Write the run script to ``self.filename``.

        The file is written as UTF-8 explicitly: Python decodes source
        files as UTF-8 by default, so relying on the locale encoding could
        produce a script that cannot be read back when the arguments
        contain non-ASCII characters.
        """
        with open(self.filename, "w", encoding="utf-8") as f:
            self._write(f)

    def _render(self):
        """
        Build the text of the run script.

        Returns:
            str: Script source, lines joined by ``"\\n"`` with no trailing
            newline.
        """
        script_lines = (
            "#!/usr/bin/env python",
            "import os",
            "os.environ['OMP_NUM_THREADS'] = '1'",
            "from chemsmart.cli.run import run",
            "def run_job():",
            # repr() keeps the arguments on a single line and round-trips
            # plain Python literals (lists/tuples of strings).
            f"    run({self.cli_args!r})",
            "if __name__ == '__main__':",
            "    run_job()",
        )
        return "\n".join(script_lines)

    def _write(self, f):
        """
        Write the script contents to an open file handle.

        Args:
            f: Writable text file handle receiving the script contents.
        """
        logger.debug("%r", self.cli_args)
        f.write(self._render())
[docs]
class Submitter(RegistryMixin):
    """
    Base class for scheduler-specific job submitters.

    A submitter writes two files for a job: a Python run script (via
    ``RunScript``) and a bash submission script containing scheduler
    directives, program-specific environment setup, and the command that
    launches the run script. Subclasses supply the scheduler directives
    and the "change to submission directory" command.

    NOTE(review): the class does not visibly use an ABC metaclass, so
    ``@abstractmethod`` may not be enforced at instantiation time unless
    ``RegistryMixin`` provides one; the ``NotImplementedError`` bodies are
    the effective guard.

    Attributes:
        NAME (str): Class-level identifier for the submitter type.
        name (str): Instance identifier for this submitter
            (often same as NAME).
        job (Job): Job instance to be submitted.
        server (Server): Server configuration used for submission.
        kwargs (dict): Additional submission parameters, stored as given.
    """

    NAME: Optional[str] = None

    def __init__(self, name, job, server, **kwargs):
        """
        Initialize the job submitter.

        Args:
            name (str): Name identifier for this submitter instance.
            job: Job instance to be submitted.
            server: Server configuration for submission.
            **kwargs: Additional submission parameters.
        """
        self.name = name
        self.job = job
        self.server = server
        self.kwargs = kwargs

    def __str__(self):
        """
        Return a human-readable description of the submitter.

        Returns:
            str: ``"Submitter: <name>"``.
        """
        return f"Submitter: {self.name}"

    def __eq__(self, other):
        """
        Compare two submitters by name.

        Args:
            other: Object to compare against.

        Returns:
            bool: True if both are submitters with equal names.
            ``NotImplemented`` for non-submitter objects, so Python falls
            back to its default comparison instead of raising
            ``AttributeError`` on a missing ``name``.
        """
        if not isinstance(other, Submitter):
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        """
        Hash the submitter by name, consistent with ``__eq__``.

        Returns:
            int: Hash value of the submitter name.
        """
        return hash(self.name)

    def __repr__(self):
        """
        Return a developer-oriented representation.

        Returns:
            str: ``"Submitter(name=<name>)"``.
        """
        return f"Submitter(name={self.name})"

    def __call__(self):
        """
        Instantiate the registered subclass whose ``NAME`` equals ``name``.

        The subclass is constructed from ``self.kwargs`` only.
        NOTE(review): ``job`` and ``server`` are not forwarded here —
        confirm that this is intended.

        Returns:
            Submitter: Instance of the matching submitter subclass.

        Raises:
            ValueError: If no subclass, or more than one subclass, is
                registered under the name.
        """
        candidates = [
            s for s in Submitter.subclasses() if self.name == s.NAME
        ]
        if not candidates:
            raise ValueError(
                f"No submitter of defined name: {self.name}.\n"
                f"Available submitters: {Submitter.subclasses()}"
            )
        # Explicit check rather than ``assert`` so that it still runs
        # when Python is started with -O.
        if len(candidates) > 1:
            raise ValueError(
                f"Multiple submitters of defined name: {self.name}.\n"
                f"Matching submitters: {candidates}"
            )
        return candidates[0](**self.kwargs)

    @classmethod
    def from_dict(cls, d):
        """
        Create a submitter instance from a dictionary.

        Args:
            d (dict): Keyword arguments for the constructor.

        Returns:
            Submitter: Instance built as ``cls(**d)``.
        """
        return cls(**d)

    @property
    def submit_folder(self):
        """
        Get the submission folder for the job.

        Returns:
            str: ``self.job.folder``.
        """
        return self.job.folder

    @property
    def submit_script(self):
        """
        Get the submission script filename.

        Returns:
            str: ``chemsmart_sub_<label>.sh`` when the job has a label,
            otherwise ``chemsmart_sub.sh``.
        """
        if self.job.label is not None:
            return f"chemsmart_sub_{self.job.label}.sh"
        return "chemsmart_sub.sh"

    @property
    def run_script(self):
        """
        Get the run script filename.

        Returns:
            str: ``chemsmart_run_<label>.py`` when the job has a label,
            otherwise ``chemsmart_run.py``.
        """
        if self.job.label is not None:
            return f"chemsmart_run_{self.job.label}.py"
        return "chemsmart_run.py"

    @property
    def executable(self):
        """
        Build the executable configuration for the job's program.

        A new executable object is created on every access via
        ``from_servername``; callers that need several attributes should
        read the property once and reuse the result.

        Returns:
            Executable: ``GaussianExecutable``, ``ORCAExecutable`` or
            ``NCIPLOTExecutable``, selected by ``job.PROGRAM``
            (case-insensitive).

        Raises:
            ValueError: If the job's program is not supported.
        """
        # Add entries here to support other programs.
        executable_classes = {
            "gaussian": GaussianExecutable,
            "orca": ORCAExecutable,
            "nciplot": NCIPLOTExecutable,
        }
        executable_cls = executable_classes.get(self.job.PROGRAM.lower())
        if executable_cls is None:
            raise ValueError(f"Program {self.job.PROGRAM} not supported.")
        return executable_cls.from_servername(self.server.name)

    def write(self, cli_args):
        """
        Write the run script and the submission script for the job.

        A warning is logged when the job already reports itself as
        complete; the scripts are written regardless.

        Args:
            cli_args: Command line arguments for job execution.
        """
        if self.job.is_complete():
            logger.warning("Submitting an already complete job.")
        self._write_runscript(cli_args)
        self._write_submitscript()

    def _write_runscript(self, cli_args):
        """
        Write the Python run script for job execution.

        Args:
            cli_args: Command line arguments embedded in the run script.
        """
        runscript = RunScript(self.run_script, cli_args)
        logger.debug(f"Writing run script to: {runscript.filename}")
        runscript.write()

    def _write_submitscript(self):
        """
        Write the scheduler submission script.

        Sections are emitted in a fixed order: bash header, scheduler
        directives, program-specific setup, server-wide extra commands,
        change to the submission directory, and the job command.
        """
        with open(self.submit_script, "w") as f:
            logger.debug(f"Writing submission script to: {self.submit_script}")
            self._write_bash_header(f)
            self._write_scheduler_options(f)
            self._write_program_specifics(f)
            self._write_extra_commands(f)
            self._write_change_to_job_directory(f)
            self._write_job_command(f)

    @staticmethod
    def _write_bash_header(f):
        """
        Write the bash shebang line followed by a blank line.

        Args:
            f: File handle for writing the script.
        """
        f.write("#!/bin/bash\n\n")

    @abstractmethod
    def _write_scheduler_options(self, f):
        """
        Write scheduler-specific directives to the submission script.

        Must be implemented by subclasses.

        Args:
            f: File handle for writing scheduler options.

        Raises:
            NotImplementedError: If not implemented by subclass.
        """
        raise NotImplementedError

    def _write_program_specifics(self, f):
        """
        Write program-specific environment setup to the script.

        Emits, in order: conda environment lines, module lines, script
        sourcing lines, and environment variable exports.

        Args:
            f: File handle for writing program-specific setup.
        """
        self._write_program_specific_conda_env(f)
        self._write_load_program_specific_modules(f)
        self._write_source_program_specific_script(f)
        self._write_program_specific_environment_variables(f)

    def _write_program_specific_conda_env(self, f):
        """
        Write the executable's conda environment lines, if any.

        Args:
            f: File handle for writing conda environment setup.
        """
        # Read the property once: each access rebuilds the executable.
        conda_env = self.executable.conda_env
        if conda_env is None:
            return
        logger.debug(f"Writing conda environment: {conda_env}")
        f.write("# conda environment\n")
        # Items are written back to back with no separator added.
        f.writelines(conda_env)
        f.write("\n")

    def _write_load_program_specific_modules(self, f):
        """
        Write the executable's module lines, if any.

        Args:
            f: File handle for writing module loading commands.
        """
        modules = self.executable.modules
        if modules is None:
            return
        logger.debug(f"Writing modules: {modules}")
        f.write("# modules\n")
        f.writelines(modules)
        f.write("\n")

    def _write_source_program_specific_script(self, f):
        """
        Write the executable's script sourcing lines, if any.

        Args:
            f: File handle for writing script sourcing commands.
        """
        scripts = self.executable.scripts
        if scripts is None:
            return
        logger.debug(f"Writing scripts: {scripts}")
        f.write("# program specific scripts\n")
        f.writelines(scripts)
        f.write("\n")

    def _write_extra_commands(self, f):
        """
        Write additional server-specific commands, if any.

        These come from ``server.extra_commands`` and therefore apply to
        every job on the server regardless of program.

        Args:
            f: File handle for writing extra commands.
        """
        extra_commands = self.server.extra_commands
        if extra_commands is None:
            return
        f.writelines(extra_commands)
        f.write("\n")

    def _write_program_specific_environment_variables(self, f):
        """
        Write ``export KEY=VALUE`` lines for the executable's environment.

        NOTE(review): the guard checks ``executable.envars`` while the
        loop reads ``executable.env`` — presumably ``env`` is the parsed
        mapping form of ``envars``; confirm against the Executable class.

        Args:
            f: File handle for writing environment variable exports.
        """
        executable = self.executable
        if executable.envars is None:
            return
        f.write("# Writing program specific environment variables\n")
        for key, value in executable.env.items():
            f.write(f"export {key}={value}\n")
        f.write("\n")

    @abstractmethod
    def _write_change_to_job_directory(self, f):
        """
        Write the scheduler-specific directory change command.

        Each scheduler exposes the submission directory through a
        different environment variable, so subclasses must implement this.

        Args:
            f: File handle for writing directory change command.

        Raises:
            NotImplementedError: If not implemented by subclass.
        """
        raise NotImplementedError

    def _write_job_command(self, f):
        """
        Write the final job execution commands.

        Makes the run script executable, starts it in the background, and
        waits for it to finish.

        Args:
            f: File handle for writing job execution commands.
        """
        f.write(f"chmod +x ./{self.run_script}\n")
        f.write(f"./{self.run_script} &\n")
        f.write("wait\n")

    @classmethod
    def from_scheduler_type(cls, scheduler_type, **kwargs):
        """
        Create a submitter instance for the specified scheduler type.

        Returns an instance of the first registered subclass whose
        ``NAME`` equals ``scheduler_type``.

        Args:
            scheduler_type (str): Name of the scheduler system
                (e.g., "PBS", "SLURM", "SLF", "FUGAKU").
            **kwargs: Arguments passed to the submitter constructor
                (e.g., job and server).

        Returns:
            Submitter: Instance of the matching submitter subclass.

        Raises:
            ValueError: If no submitter is found for the scheduler type.
        """
        for submitter in cls.subclasses():
            if submitter.NAME == scheduler_type:
                return submitter(**kwargs)
        raise ValueError(
            f"Could not find any submitters for scheduler type: {scheduler_type}."
        )
[docs]
class PBSSubmitter(Submitter):
    """
    PBS (Portable Batch System) job submitter.

    Writes ``#PBS`` directives for output files, resources, queue,
    walltime, project and e-mail notification.

    Attributes:
        NAME (str): Identifier for PBS scheduler type ('PBS').
        name (str): Inherited; instance identifier (often 'PBS').
        job (Job): Job instance to be submitted.
        server (Server): Server configuration used for submission.
        kwargs (dict): Additional submission parameters passed to the
            base class.
    """

    NAME = "PBS"

    def __init__(self, name="PBS", job=None, server=None, **kwargs):
        """
        Initialize PBS submitter.

        Args:
            name (str): Name identifier for this submitter.
                Defaults to "PBS".
            job: Job instance to be submitted.
            server: Server configuration for PBS submission.
            **kwargs: Additional submission parameters.
        """
        super().__init__(name=name, job=job, server=server, **kwargs)

    def _write_scheduler_options(self, f):
        """
        Write PBS-specific scheduler directives.

        Optional directives (GPUs, queue, walltime, project, e-mail) are
        emitted only when the corresponding setting is present. The
        resource request always uses a single node (``select=1``).

        Args:
            f: File handle for writing PBS directives.
        """
        server = self.server
        f.write(f"#PBS -o {self.job.label}.pbsout\n")
        f.write(f"#PBS -e {self.job.label}.pbserr\n")
        # Guard against an unset (None) GPU count, which would otherwise
        # raise TypeError on the ``> 0`` comparison.
        num_gpus = server.num_gpus
        if num_gpus is not None and num_gpus > 0:
            f.write(f"#PBS -l gpus={num_gpus}\n")
        # using only one node here
        f.write(
            f"#PBS -l select=1:ncpus={server.num_cores}:"
            f"mpiprocs={server.num_cores}:mem={server.mem_gb}G\n"
        )
        if server.queue_name:
            f.write(f"#PBS -q {server.queue_name}\n")
        if server.num_hours:
            f.write(f"#PBS -l walltime={server.num_hours}:00:00\n")
        if user_settings is not None:
            if user_settings.data.get("PROJECT"):
                f.write(f"#PBS -P {user_settings.data['PROJECT']}\n")
            if user_settings.data.get("EMAIL"):
                f.write(f"#PBS -M {user_settings.data['EMAIL']}\n")
                # Mail events are only requested when an address is set.
                f.write("#PBS -m abe\n")
        f.write("\n")
        f.write("\n")

    def _write_change_to_job_directory(self, f):
        """
        Write the command that changes to ``$PBS_O_WORKDIR``.

        Args:
            f: File handle for writing directory change command.
        """
        f.write("cd $PBS_O_WORKDIR\n\n")
[docs]
class SLURMSubmitter(Submitter):
    """
    SLURM (Simple Linux Utility for Resource Management) job submitter.

    Writes ``#SBATCH`` directives for job name, output files, resources,
    partition, time limit, account and e-mail notification.

    Attributes:
        NAME (str): Identifier for SLURM scheduler type ('SLURM').
        name (str): Inherited; instance identifier (often 'SLURM').
        job (Job): Job instance to be submitted.
        server (Server): Server configuration used for submission.
        kwargs (dict): Additional submission parameters passed to the
            base class.
    """

    NAME = "SLURM"

    def __init__(self, name="SLURM", job=None, server=None, **kwargs):
        """
        Initialize SLURM submitter.

        Args:
            name (str): Name identifier for this submitter.
                Defaults to "SLURM".
            job: Job instance to be submitted.
            server: Server configuration for SLURM submission.
            **kwargs: Additional submission parameters.
        """
        super().__init__(name=name, job=job, server=server, **kwargs)

    def _write_scheduler_options(self, f):
        """
        Write SLURM-specific scheduler directives.

        Job name and output/error files are always written, as is the
        single-node resource request. GPU, partition, time, account and
        mail directives are written only when the matching setting is
        truthy.

        Args:
            f: File handle for writing SLURM directives.
        """
        emit = f.write
        server = self.server

        emit(f"#SBATCH --job-name={self.job.label}\n")
        emit(f"#SBATCH --output={self.job.label}.slurmout\n")
        emit(f"#SBATCH --error={self.job.label}.slurmerr\n")

        if server.num_gpus:
            emit(f"#SBATCH --gres=gpu:{server.num_gpus}\n")

        emit(
            f"#SBATCH --nodes=1 --ntasks-per-node={server.num_cores} --mem={server.mem_gb}G\n"
        )

        if server.queue_name:
            emit(f"#SBATCH --partition={server.queue_name}\n")
        if server.num_hours:
            emit(f"#SBATCH --time={server.num_hours}:00:00\n")

        if user_settings is not None:
            settings = user_settings.data
            if settings.get("PROJECT"):
                emit(f"#SBATCH --account={settings['PROJECT']}\n")
            if settings.get("EMAIL"):
                emit(f"#SBATCH --mail-user={settings['EMAIL']}\n")
                emit("#SBATCH --mail-type=END,FAIL\n")

        # Two blank lines separate the directives from the script body.
        emit("\n")
        emit("\n")

    def _write_change_to_job_directory(self, f):
        """
        Write the command that changes to ``$SLURM_SUBMIT_DIR``.

        Args:
            f: File handle for writing directory change command.
        """
        f.write("cd $SLURM_SUBMIT_DIR\n\n")
[docs]
class SLFSubmitter(Submitter):
    """
    LSF (Load Sharing Facility) job submitter.

    Writes ``#BSUB`` directives for job name, output files, project,
    node count, GPUs and run limit.

    Note: The class name 'SLFSubmitter' appears to be a typo for
    'LSFSubmitter' but is maintained for compatibility.

    Attributes:
        NAME (str): Identifier for LSF scheduler type ('SLF').
        name (str): Inherited; instance identifier (often 'SLF').
        job (Job): Job instance to be submitted.
        server (Server): Server configuration used for submission.
        kwargs (dict): Additional submission parameters passed to the
            base class.
    """

    NAME = "SLF"

    def __init__(self, name="SLF", job=None, server=None, **kwargs):
        """
        Initialize LSF submitter.

        Args:
            name (str): Name identifier for this submitter.
                Defaults to "SLF".
            job: Job instance to be submitted.
            server: Server configuration for LSF submission.
            **kwargs: Additional submission parameters.
        """
        super().__init__(name=name, job=job, server=server, **kwargs)

    def _write_scheduler_options(self, f):
        """
        Write LSF-specific scheduler directives.

        The run limit is written as ``hours:00`` because ``bsub -W``
        takes ``[hour:]minute``; a bare number would be read as minutes.
        It is skipped when ``server.num_hours`` is unset, matching the
        PBS and SLURM submitters.

        Args:
            f: File handle for writing LSF directives.
        """
        server = self.server
        f.write(f"#BSUB -J {self.job.label}\n")
        f.write(f"#BSUB -o {self.job.label}.bsubout\n")
        f.write(f"#BSUB -e {self.job.label}.bsuberr\n")
        if user_settings is not None:
            project_number = user_settings.data.get("PROJECT")
            if project_number is not None:
                f.write(f"#BSUB -P {project_number}\n")
        f.write(f"#BSUB -nnodes {server.num_nodes}\n")
        if server.num_gpus:
            f.write(f"#BSUB -gpu num={server.num_gpus}\n")
        if server.num_hours:
            # NOTE(review): assumes num_hours is a whole number of hours,
            # as in the PBS/SLURM submitters — confirm.
            f.write(f"#BSUB -W {server.num_hours}:00\n")
        f.write("#BSUB -alloc_flags gpumps\n")
        f.write("\n")
        f.write("\n")

    def _write_change_to_job_directory(self, f):
        """
        Write the command that changes to ``$LS_SUBCWD``.

        Args:
            f: File handle for writing directory change command.
        """
        f.write("cd $LS_SUBCWD\n\n")
[docs]
class FUGAKUSubmitter(Submitter):
    """
    FUGAKU supercomputer job submitter.

    Writes ``#PJM`` directives (Fujitsu job scheduler) for resource
    group, node count, elapsed time, MPI processes, project group and
    output files.

    Attributes:
        NAME (str): Identifier for FUGAKU scheduler type ('FUGAKU').
        name (str): Inherited; instance identifier (often 'FUGAKU').
        job (Job): Job instance to be submitted.
        server (Server): Server configuration used for submission.
        kwargs (dict): Additional submission parameters passed to the
            base class.
    """

    NAME = "FUGAKU"

    def __init__(self, name="FUGAKU", job=None, server=None, **kwargs):
        """
        Initialize FUGAKU submitter.

        Args:
            name (str): Name identifier for this submitter.
                Defaults to "FUGAKU".
            job: Job instance to be submitted.
            server: Server configuration for FUGAKU submission.
            **kwargs: Additional submission parameters.
        """
        super().__init__(name=name, job=job, server=server, **kwargs)

    def _write_scheduler_options(self, f):
        """
        Write FUGAKU PJM-specific scheduler directives.

        The resource group comes from the ``RSCGRP`` user setting and the
        project group from a ``project`` attribute on the instance if one
        exists, otherwise from the ``PROJECT`` user setting. Each is
        skipped when unavailable instead of raising. The elapsed time is
        written as ``hours:00:00`` and skipped when ``server.num_hours``
        is unset, matching the PBS and SLURM submitters.

        Args:
            f: File handle for writing PJM directives.
        """
        server = self.server
        settings = user_settings.data if user_settings is not None else {}

        rscgrp = settings.get("RSCGRP")
        if rscgrp:
            f.write(f"#PJM -L rscgrp={rscgrp}\n")
        f.write("#PJM -L node=1\n")  # using one node here
        if server.num_hours:
            # A bare number would be read as seconds by PJM.
            # NOTE(review): assumes num_hours is a whole number of hours,
            # as in the PBS/SLURM submitters — confirm.
            f.write(f"#PJM -L elapse={server.num_hours}:00:00\n")
        f.write(f"#PJM --mpi proc={server.num_cores}\n")

        # ``project`` is not set by the base constructor, so read it
        # defensively and fall back to the user-level PROJECT setting.
        project = getattr(self, "project", None) or settings.get("PROJECT")
        if project:
            f.write(f"#PJM -g {project}\n")
        else:
            logger.warning(
                "No project group configured; omitting '#PJM -g' directive."
            )

        f.write("#PJM -o pjm.%j.out\n")
        f.write("#PJM -e pjm.%j.err\n")
        f.write("#PJM -x PJM_LLIO_GFSCACHE=/vol0005:/vol0004\n")
        f.write("#PJM -S\n")
        f.write("\n")
        f.write("\n")

    def _write_change_to_job_directory(self, f):
        """
        Write the command that changes to ``$PJM_O_WORKDIR``.

        Args:
            f: File handle for writing directory change command.
        """
        f.write("cd $PJM_O_WORKDIR\n\n")