import logging
import os
import shlex
import shutil
import subprocess
import sys
from functools import lru_cache

from chemsmart.io.yaml import YAMLFile
from chemsmart.settings.submitters import Submitter
from chemsmart.settings.user import ChemsmartUserSettings
from chemsmart.utils.mixins import RegistryMixin, cached_property
# Module-level logger shared by every class below.
logger = logging.getLogger(__name__)

# User-level chemsmart settings, instantiated once at import time; used to
# locate the per-user server directory and list available servers.
user_settings = ChemsmartUserSettings()
[docs]
class Server(RegistryMixin):
    """
    Base class for computational server configurations.

    Represents a computational server or cluster environment with associated
    settings for job submission, resource allocation, and queue management.
    Provides methods for server comparison, serialization, and configuration
    management.

    Attributes:
        name (str): Unique identifier for the server.
        kwargs (dict): Additional server configuration parameters.
        _num_hours (int): Default number of hours for job allocation.
        _queue_name (str): Default queue name for job submission.
    """

    def __init__(self, name, **kwargs):
        """
        Initialize a server configuration.

        Args:
            name (str): Unique name identifier for the server.
            **kwargs: Additional configuration parameters including:
                NUM_HOURS (int): Default job time allocation in hours.
                QUEUE_NAME (str): Default queue for job submission.
                SCHEDULER (str): Job scheduler type (e.g., 'SLURM', 'PBS').
        """
        self.name = name
        self.kwargs = kwargs
        self._num_hours = self.kwargs.get("NUM_HOURS", None)
        self._queue_name = self.kwargs.get("QUEUE_NAME", None)

    def __str__(self):
        """
        String representation of the server.

        Returns:
            str: Human-readable server description.
        """
        return f"Server: {self.name}"

    def __eq__(self, other):
        """
        Check equality based on server name.

        Args:
            other: Object to compare against.

        Returns:
            bool: True if ``other`` is a Server with the same name.
            NotImplemented is returned for non-Server objects so Python
            falls back to its default comparison instead of raising
            AttributeError on a missing ``name``.
        """
        if not isinstance(other, Server):
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        """
        Generate hash based on server name.

        Returns:
            int: Hash value for server name.
        """
        return hash(self.name)

    def __repr__(self):
        """
        Developer representation of the server.

        Returns:
            str: Detailed server representation for debugging.
        """
        return f"Server(name={self.name})"

    @classmethod
    def from_dict(cls, d):
        """
        Create server instance from dictionary.

        Args:
            d (dict): Dictionary containing server configuration; must
                include a ``name`` key plus any configuration keywords.

        Returns:
            Server: Configured server instance.
        """
        return cls(**d)

    @classmethod
    def from_yaml(cls, name):
        """
        Create server instance from YAML configuration file.

        Args:
            name (str): Path to YAML configuration file.

        Returns:
            Server: Server instance loaded from the file's "SERVER" section.

        Raises:
            ValueError: If no YAML file is provided.
        """
        if not name:
            raise ValueError("No yaml file provided.")
        server_yaml = YAMLFile(filename=name)
        return cls(name, **server_yaml.yaml_contents_dict["SERVER"])

    @cached_property
    def scheduler(self):
        """
        Get the job scheduler for this server.

        Returns:
            str or None: Scheduler type (e.g., 'SLURM', 'PBS') or None if
            not set.
        """
        return self.kwargs.get("SCHEDULER", None)

    @property
    def queue_name(self):
        """
        Get or set the queue name for job submission.

        Returns:
            str or None: Queue name for job submission.
        """
        return self._queue_name

    @queue_name.setter
    def queue_name(self, value):
        """
        Set the queue name for job submission.

        Args:
            value (str): Queue name to set.
        """
        self._queue_name = value

    @property
    def num_hours(self):
        """
        Get or set the number of hours for job allocation.

        Returns:
            int or None: Number of hours for job time limit.
        """
        return self._num_hours

    @num_hours.setter
    def num_hours(self, value):
        """
        Set the number of hours for job allocation.

        Args:
            value (int): Number of hours for job time limit.
        """
        self._num_hours = value

    @cached_property
    def mem_gb(self):
        """
        Get memory allocation in gigabytes.

        Returns:
            int: Memory allocation in GB (default: 64).
        """
        return self.kwargs.get("MEM_GB", 64)

    @cached_property
    def num_cores(self):
        """
        Get number of CPU cores allocation.

        Returns:
            int: Number of CPU cores (default: 16).
        """
        return self.kwargs.get("NUM_CORES", 16)

    @cached_property
    def num_gpus(self):
        """
        Get number of GPU allocation.

        Returns:
            int: Number of GPUs (default: 0).
        """
        return self.kwargs.get("NUM_GPUS", 0)

    @cached_property
    def num_threads(self):
        """
        Get number of threads for parallel execution.

        Returns:
            int: Number of threads (default: 16).
        """
        return self.kwargs.get("NUM_THREADS", 16)

    @cached_property
    def submit_command(self):
        """
        Get the job submission command for this server.

        An explicit SUBMIT_COMMAND setting wins; otherwise the command is
        derived from the scheduler type.

        Returns:
            str or None: Command used to submit jobs to the scheduler.
        """
        command = self.kwargs.get("SUBMIT_COMMAND")
        if command is None:
            command = self._get_submit_command()
        logger.debug(f"Submit command to submit the job: {command}")
        return command

    @cached_property
    def scratch_dir(self):
        """
        Get the scratch directory path for temporary files.

        Returns:
            str or None: Path to scratch directory or None if not configured.
        """
        return self.kwargs.get("SCRATCH_DIR", None)

    @cached_property
    def scratch(self):
        """
        Check if scratch directory is available.

        Returns:
            bool: True if scratch directory is configured.
        """
        return self.scratch_dir is not None

    @cached_property
    def use_hosts(self):
        """
        Get host specification configuration.

        Returns:
            str or None: Host specification settings or None if not
            configured.
        """
        return self.kwargs.get("USE_HOSTS", None)

    @cached_property
    def extra_commands(self):
        """
        Get additional commands to execute during job setup.

        Returns:
            list or None: List of extra commands or None if not configured.
        """
        return self.kwargs.get("EXTRA_COMMANDS", None)

    def _get_submit_command(self):
        """
        Obtain job submission command based on scheduler type.

        Maps scheduler types to their corresponding submission commands.
        The scheduler name is matched case-insensitively so that e.g.
        'slurm' and 'SLURM' resolve to the same command.

        Returns:
            str or None: Submission command for the scheduler, or None if
            the scheduler is unset or unknown.
        """
        scheduler_submit_commands = {
            "SLURM": "sbatch",
            "PBS": "qsub",
            # Trailing redirect: bsub reads the job script from stdin.
            "LSF": "bsub < ",
            "SGE": "qsub",
            # condor_submit queues a job; condor_q only lists the queue.
            "HTCondor": "condor_submit",
        }
        if self.scheduler is None:
            return None
        wanted = str(self.scheduler).upper()
        for scheduler_name, command in scheduler_submit_commands.items():
            if scheduler_name.upper() == wanted:
                return command
        return None

    def register(self):
        """
        Register this server in the global server registry.

        Adds the server to the registry if not already present,
        enabling server lookup and management.

        Returns:
            Server: This server instance.
        """
        # if server already in registry, pass
        if self in Server._REGISTRY:
            return self
        Server._REGISTRY.append(self)
        return self

    @classmethod
    def current(cls):
        """
        Get the current server based on detected scheduler type.

        Returns:
            Server: Server instance for the current environment.
        """
        return cls.from_scheduler_type()

    @classmethod
    @lru_cache(maxsize=12)
    def from_scheduler_type(cls):
        """
        Create a Server instance based on the detected scheduler type.

        Automatically detects the scheduler type in the current environment
        and creates an appropriate server instance. Falls back to local
        server if no scheduler is detected. Results are cached per class.

        Returns:
            Server: Server instance for the detected scheduler
            (or local fallback), typically a YamlServerSettings.

        Raises:
            ValueError: If no server class is defined for the detected
                scheduler type.
        """
        scheduler_type = cls.detect_server_scheduler()
        if scheduler_type == "Unknown Scheduler":
            logger.info("No scheduler detected. Using local server.")
            return cls.from_servername(servername="local")
        # Match scheduler type with available Server subclasses
        for server_cls in cls.subclasses():
            if getattr(server_cls, "SCHEDULER_TYPE", None) == scheduler_type:
                return server_cls.from_servername(scheduler_type)
        raise ValueError(
            f"No server class defined for scheduler type: {scheduler_type}. "
            f"Available servers: {cls.subclasses()}"
        )

    @staticmethod
    @lru_cache(maxsize=12)
    def detect_server_scheduler():
        """
        Detect the server's job scheduler system.

        Detection runs in two passes:

        1. Job environment variables (e.g. SLURM_JOB_ID, PBS_JOBID,
           LSB_JOBID) are checked for every scheduler first, since they
           identify the scheduler running the current job.
        2. Otherwise, the scheduler client commands are looked up on PATH.
           Schedulers with an output check (SGE) have their command run
           with a timeout, and the output is inspected.

        The result is cached for the lifetime of the process.

        Returns:
            str: The detected scheduler type
            (e.g., SLURM, PBS, LSF, SGE, HTCondor)
            or "Unknown Scheduler" if none detected.
        """
        # Seconds to wait for an output probe; a wedged client must not
        # block detection indefinitely.
        probe_timeout = 10
        schedulers = [
            {
                "name": "SLURM",
                "env_vars": ["SLURM_JOB_ID", "SLURM_CLUSTER_NAME"],
                "commands": [["squeue"]],
            },
            # SGE must be probed before PBS: both install a `qstat` binary,
            # and PBS only checks that `qstat` exists, so probing PBS first
            # would classify every SGE host as PBS.
            {
                "name": "SGE",
                "env_vars": [],
                # Only the help text is inspected; a plain `qstat` job
                # listing could contain "Sun" in an arbitrary job/user name.
                "commands": [["qstat", "-help"]],
                "check_output": lambda output: "Grid Engine" in output
                or "Sun" in output,
            },
            {
                "name": "PBS",
                "env_vars": ["PBS_JOBID", "PBS_QUEUE"],
                "commands": [["qstat"]],
            },
            {
                "name": "LSF",
                "env_vars": ["LSB_JOBID", "LSB_MCPU_HOSTS"],
                "commands": [["bjobs"]],
            },
            {
                "name": "HTCondor",
                "env_vars": [],
                "commands": [["condor_q"]],
            },
        ]

        def _detected(name):
            # Single place for the detection log line.
            logger.info(f"Detected scheduler: {name}")
            return name

        # Pass 1: environment variables set inside a running job.
        for scheduler in schedulers:
            if any(env in os.environ for env in scheduler["env_vars"]):
                return _detected(scheduler["name"])

        # Pass 2: scheduler client commands available on PATH.
        for scheduler in schedulers:
            check_output = scheduler.get("check_output")
            for command in scheduler["commands"]:
                if shutil.which(command[0]) is None:
                    continue  # Command not installed; try the next one.
                if check_output is None:
                    # Existence of the client is the detection criterion.
                    return _detected(scheduler["name"])
                try:
                    result = subprocess.run(
                        command,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        timeout=probe_timeout,
                    )
                except (OSError, subprocess.TimeoutExpired):
                    continue  # Could not probe; treat as not detected.
                output = result.stdout.decode(errors="replace")
                if check_output(output):
                    return _detected(scheduler["name"])

        # Default case: unknown scheduler
        logger.info("No scheduler detected.")
        return "Unknown Scheduler"

    @classmethod
    def from_servername(cls, servername):
        """
        Obtain server instance from server name.

        Loads server configuration from YAML files based on the server name.
        Falls back to current server if no name is specified.

        Args:
            servername (str): Name of the server configuration to load.

        Returns:
            Server: Configured server instance.
        """
        if servername is None:
            # by default return current server
            logger.warning("No server specified. Using current server.")
            return cls.current()
        return cls._from_server_name(servername)

    @classmethod
    def _from_server_name(cls, server_name):
        """
        Get server settings from YAML file based on server name.

        Looks up ``<server_name>.yaml`` in the user settings server
        directory. Provides detailed error messages if the server is not
        found.

        Args:
            server_name (str): Name of the server
                (with or without .yaml extension).

        Returns:
            Server: Configured server instance.

        Raises:
            ValueError: If no server configuration is found for the given
                name.
        """
        if not server_name.endswith(".yaml"):
            server_name = f"{server_name}.yaml"
        server_name_yaml_path = os.path.join(
            user_settings.user_server_dir, server_name
        )
        user_settings_manager = ServerSettingsManager(
            filename=server_name_yaml_path
        )
        server = cls._from_servers_manager(user_settings_manager)
        if server is not None:
            return server
        # could not find server settings
        templates_path = os.path.join(os.path.dirname(__file__), "templates")
        raise ValueError(
            f"No server implemented for {server_name}.\n\n"
            f"Place new server .yaml file in {user_settings.user_server_dir}.\n\n"
            f"Templates for server settings .yaml files are available at {templates_path}\n\n "
            f"Currently available servers: {user_settings.all_available_servers}"
        )

    @classmethod
    def _from_servers_manager(cls, manager):
        """
        Load server configuration using a settings manager.

        Args:
            manager: Server settings manager instance exposing ``create()``.

        Returns:
            Server or None: Server if loaded;
            None only when the file is missing.

        Raises:
            ValueError: if the YAML is malformed or invalid.
        """
        try:
            return manager.create()
        except FileNotFoundError:
            return None

    def get_submitter(self, job, **kwargs):
        """
        Get a job submitter for this server.

        Creates an appropriate submitter instance based on the server's
        scheduler type and job requirements.

        Args:
            job: Job instance to be submitted.
            **kwargs: Additional submitter configuration parameters.

        Returns:
            Submitter: Configured job submitter for this server.
        """
        submitter = Submitter.from_scheduler_type(
            scheduler_type=self.scheduler, job=job, server=self, **kwargs
        )
        logger.info(f"Obtained Submitter: {submitter}")
        return submitter

    def submit(self, job, test=False, cli_args=None, **kwargs):
        """
        Submit a computational job to the server.

        Handles the complete job submission process including validation,
        script writing, and actual submission to the scheduler.

        Args:
            job (Job): Job instance to be submitted.
            test (bool): If True, only creates scripts without actual
                submission. Defaults to False.
            cli_args: Command line arguments for the job.
            **kwargs: Additional submission parameters.
        """
        # First check that the job to be submitted is not already
        # queued/running
        self._check_running_jobs(job)
        # Then write the submission script
        self._write_submission_script(job=job, cli_args=cli_args, **kwargs)
        # Submit the job
        if not test:
            self._submit_job(job)

    @staticmethod
    def _check_running_jobs(job):
        """
        Check if the job is already running or queued.

        Only applies to GaussianJob instances with a label. If a running job
        with the same label exists, a warning is logged and the process is
        terminated via ``sys.exit``.

        Args:
            job: Job instance to check.
        """
        # Imported lazily (presumably to avoid an import cycle).
        from chemsmart.jobs.gaussian import GaussianJob
        from chemsmart.utils.cluster import ClusterHelper

        if not isinstance(job, GaussianJob) or job.label is None:
            return
        cluster_helper = ClusterHelper()
        _, running_job_names = cluster_helper.get_gaussian_running_jobs()
        if job.label in running_job_names:
            logger.warning(
                f"Warning: submitting job with duplicate name: {job.label}"
            )
            sys.exit(f"Duplicate job NOT submitted: {job.label}")

    def _write_submission_script(self, job, cli_args, **kwargs):
        """
        Write the submission script for the job.

        Args:
            job: Job instance to create submission script for.
            cli_args: Command line arguments for the job.
            **kwargs: Additional submitter parameters.
        """
        submitter = self.get_submitter(job, **kwargs)
        submitter.write(cli_args)

    def _submit_job(self, job):
        """
        Submit the job to the scheduler.

        Runs the submit command with the submitter's script appended, from
        the job's folder. Commands containing shell operators (e.g. LSF's
        ``bsub <``) are run through the shell; the script path is
        shell-quoted in that case.

        Args:
            job: Job instance to submit.

        Returns:
            int: Exit code from the submission command.

        Raises:
            ValueError: If no submission command is defined for this server.
        """
        submitter = self.get_submitter(job)
        base_command = self.submit_command
        if base_command is None:
            raise ValueError(
                f"Cannot submit job on {self} "
                f"since no submit command is defined."
            )
        script = str(submitter.submit_script)
        command = f"{base_command} {shlex.quote(script)}"
        logger.info(f"Submitting job with command: {command}")
        if any(op in base_command for op in ("<", ">", "|")):
            # Shell operators require shell=True; run from the job folder
            # like the non-shell branch so relative script paths resolve.
            p = subprocess.Popen(command, shell=True, cwd=job.folder)
        else:
            # Pass the script as its own argv element so paths containing
            # spaces are not split.
            p = subprocess.Popen(
                shlex.split(base_command) + [script], cwd=job.folder
            )
        return p.wait()
[docs]
class YamlServerSettings(Server):
    """
    YAML-based server settings configuration.

    Extends the base Server class to load server settings from the
    "SERVER" section of a YAML file.

    Attributes:
        NAME (str): Identifier for YAML-based server settings.
        name (str): YAML file path used as this settings' identifier.
        kwargs (dict): Parsed configuration
            values from the YAML under "SERVER".
        _num_hours (int or None): Default job time allocation in hours.
        _queue_name (str or None): Default submission queue name.
    """

    NAME = "yaml"

    def __init__(self, name, **kwargs):
        """
        Initialize YAML-based server settings.

        Args:
            name (str): Server identifier; typically the YAML filename when
                loaded via `from_yaml`, or a logical server name when
                instantiated directly.
            **kwargs: Server configuration parameters (usually values parsed
                from the YAML under the "SERVER" key).
        """
        super().__init__(name, **kwargs)

    @classmethod
    def from_yaml(cls, filename):
        """
        Create server settings from YAML configuration file.

        Args:
            filename (str): Path to YAML configuration file.

        Returns:
            YamlServerSettings: Server settings loaded from YAML.

        Raises:
            ValueError: If the file has no top-level "SERVER" section.
        """
        yaml_file = YAMLFile(filename)
        contents = yaml_file.yaml_contents_dict
        try:
            server_settings = contents["SERVER"]
        except (KeyError, TypeError) as err:
            # KeyError: section missing; TypeError: contents not a mapping
            # (e.g. an empty file). Report both as an invalid settings file.
            raise ValueError(
                f"Server settings file {filename} has no top-level "
                f"'SERVER' section."
            ) from err
        return cls(name=filename, **server_settings)

    def __repr__(self):
        """
        Developer representation of YAML server settings.

        Returns:
            str: Detailed representation for debugging.
        """
        return f"YamlServerSettings(name={self.name})"

    def __str__(self):
        """
        String representation of YAML server settings.

        Returns:
            str: Human-readable server description.
        """
        return f"YamlServerSettings: {self.name}"

    def __eq__(self, other):
        """
        Check equality based on server name.

        Args:
            other: Object to compare against.

        Returns:
            bool: True if ``other`` is a Server with the same name;
            NotImplemented for non-Server objects instead of raising
            AttributeError.
        """
        if not isinstance(other, Server):
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        """
        Generate hash based on server name.

        Returns:
            int: Hash value for server name.
        """
        return hash(self.name)

    def __call__(self):
        """
        Make the server settings callable.

        Returns:
            YamlServerSettings: This server settings instance.
        """
        return self

    def register(self):
        """
        Deliberately a no-op for YAML-backed settings.

        Unlike ``Server.register``, this does not add the instance to the
        server registry; it simply returns ``self`` so call chains keep
        working.

        Returns:
            YamlServerSettings: This server settings instance.
        """
        return self
[docs]
class ServerSettingsManager:
    """
    Manager for server settings from YAML configuration files.

    Validates the settings file path and creates YamlServerSettings from it.

    Attributes:
        filename (str): Absolute path to the YAML configuration file.
    """

    def __init__(self, filename):
        """
        Initialize the server settings manager.

        Args:
            filename (str): Path to YAML configuration file containing
                server settings.

        Raises:
            ValueError: If filename is None or empty (not specified).
        """
        # An empty string would otherwise resolve to the current directory
        # via os.path.abspath and fail much later with a confusing error.
        if not filename:
            raise ValueError("filename is not specified")
        self.filename = os.path.abspath(filename)

    def create(self):
        """
        Create server settings from the specified YAML file.

        Returns:
            YamlServerSettings: Configured server settings loaded from YAML.

        Raises:
            FileNotFoundError: If the specified YAML file does not exist.
            ValueError: If the YAML file is malformed or invalid.
        """
        return YamlServerSettings.from_yaml(self.filename)
[docs]
class SLURMServer(YamlServerSettings):
    """
    SLURM-specific server configuration.

    Server class for SLURM (Simple Linux Utility for Resource Management)
    scheduler environments. Its SCHEDULER_TYPE is what
    ``Server.from_scheduler_type`` matches against the detected scheduler.

    Attributes:
        NAME (str): Server type identifier ('SLURM').
        SCHEDULER_TYPE (str): Scheduler system type ('SLURM').
        name (str): Inherited; 'SLURM.yaml' when constructed directly.
        kwargs (dict): Inherited; configuration parameters.
        _num_hours (int or None): Inherited default job time allocation.
        _queue_name (str or None): Inherited default submission queue.
    """

    NAME = "SLURM"
    SCHEDULER_TYPE = "SLURM"

    def __init__(self, **kwargs):
        """
        Initialize SLURM server configuration.

        Args:
            **kwargs: Additional SLURM-specific configuration parameters.
        """
        # YamlServerSettings.__init__ takes the identifier as ``name``; it
        # has no ``filename`` parameter, so pass it positionally.
        super().__init__(f"{self.NAME}.yaml", **kwargs)
[docs]
class PBSServer(YamlServerSettings):
    """
    PBS-specific server configuration.

    Server class for PBS (Portable Batch System) scheduler environments.
    Its SCHEDULER_TYPE is what ``Server.from_scheduler_type`` matches
    against the detected scheduler.

    Attributes:
        NAME (str): Server type identifier ('PBS').
        SCHEDULER_TYPE (str): Scheduler system type ('PBS').
        name (str): Inherited; set to NAME ('PBS') when constructed
            directly.
        kwargs (dict): Inherited; configuration parameters.
        _num_hours (int or None): Inherited default job time allocation.
        _queue_name (str or None): Inherited default submission queue.
    """

    NAME = "PBS"
    SCHEDULER_TYPE = "PBS"

    def __init__(self, **kwargs):
        """
        Initialize PBS server configuration.

        Args:
            **kwargs: Additional PBS-specific configuration parameters.
        """
        super().__init__(self.NAME, **kwargs)
[docs]
class LSFServer(YamlServerSettings):
    """
    LSF-specific server configuration.

    Server class for LSF (Load Sharing Facility) scheduler environments.
    Its SCHEDULER_TYPE is what ``Server.from_scheduler_type`` matches
    against the detected scheduler.

    Attributes:
        NAME (str): Server type identifier ('LSF').
        SCHEDULER_TYPE (str): Scheduler system type ('LSF').
        name (str): Inherited; set to NAME ('LSF') when constructed
            directly.
        kwargs (dict): Inherited; configuration parameters.
        _num_hours (int or None): Inherited default job time allocation.
        _queue_name (str or None): Inherited default submission queue.
    """

    NAME = "LSF"
    SCHEDULER_TYPE = "LSF"

    def __init__(self, **kwargs):
        """
        Initialize LSF server configuration.

        Args:
            **kwargs: Additional LSF-specific configuration parameters.
        """
        super().__init__(self.NAME, **kwargs)
[docs]
class SGE_Server(YamlServerSettings):
    """
    SGE-specific server configuration.

    Server class for SGE (Sun Grid Engine) scheduler environments.
    Its SCHEDULER_TYPE is what ``Server.from_scheduler_type`` matches
    against the detected scheduler.

    Attributes:
        NAME (str): Server type identifier ('SGE').
        SCHEDULER_TYPE (str): Scheduler system type ('SGE').
        name (str): Inherited; set to NAME ('SGE') when constructed
            directly.
        kwargs (dict): Inherited; configuration parameters.
        _num_hours (int or None): Inherited default job time allocation.
        _queue_name (str or None): Inherited default submission queue.
    """

    NAME = "SGE"
    SCHEDULER_TYPE = "SGE"

    def __init__(self, **kwargs):
        """
        Initialize SGE server configuration.

        Args:
            **kwargs: Additional SGE-specific configuration parameters.
        """
        super().__init__(self.NAME, **kwargs)