Skip to content

Commit

Permalink
Merge pull request #175 from dgasmith/mpi
Browse files Browse the repository at this point in the history
MPI/MultiNode support
  • Loading branch information
dgasmith committed Dec 3, 2019
2 parents 42adfa6 + 0cd5aa7 commit f9fea80
Show file tree
Hide file tree
Showing 22 changed files with 50 additions and 41 deletions.
11 changes: 7 additions & 4 deletions qcengine/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def get_global(key: Optional[str] = None) -> Union[str, Dict[str, Any]]:
cpu_cnt = psutil.cpu_count(logical=True)

_global_values["ncores"] = cpu_cnt
_global_values["nnodes"] = 1

_global_values["cpuinfo"] = cpuinfo.get_cpu_info()
_global_values["cpu_brand"] = _global_values["cpuinfo"]["brand"]
Expand Down Expand Up @@ -80,10 +81,11 @@ class Config:
extra = "forbid"


class JobConfig(pydantic.BaseModel):
class TaskConfig(pydantic.BaseModel):

# Specifications
ncores: int # Number of ncores per job
ncores: int # Number of ncores per task
nnodes: int # Number of nodes per task
memory: float # Amount of memory in GiB per node
scratch_directory: Optional[str] # What location to use as scratch
retries: int # Number of retries on random failures
Expand Down Expand Up @@ -198,7 +200,7 @@ def parse_environment(data: Dict[str, Any]) -> Dict[str, Any]:
return ret


def get_config(*, hostname: Optional[str] = None, local_options: Dict[str, Any] = None) -> JobConfig:
def get_config(*, hostname: Optional[str] = None, local_options: Dict[str, Any] = None) -> TaskConfig:
"""
Returns the configuration key for qcengine.
"""
Expand Down Expand Up @@ -233,11 +235,12 @@ def get_config(*, hostname: Optional[str] = None, local_options: Dict[str, Any]
raise KeyError("Number of jobs per node exceeds the number of available cores.")

config["ncores"] = ncores
config["nnodes"] = local_options.pop("nnodes", 1)

if local_options is not None:
config.update(local_options)

return JobConfig(**config)
return TaskConfig(**config)


def get_provenance_augments() -> Dict[str, str]:
Expand Down
2 changes: 1 addition & 1 deletion qcengine/procedures/geometric.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def found(self, raise_error: bool = False) -> bool:
def build_input_model(self, data: Union[Dict[str, Any], "OptimizationInput"]) -> "OptimizationInput":
return self._build_model(data, OptimizationInput)

def compute(self, input_data: "OptimizationInput", config: "JobConfig") -> "OptimizationResult":
def compute(self, input_data: "OptimizationInput", config: "TaskConfig") -> "OptimizationResult":
try:
import geometric
except ModuleNotFoundError:
Expand Down
2 changes: 1 addition & 1 deletion qcengine/procedures/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def build_input_model(self, data: Union[Dict[str, Any], "BaseModel"], raise_erro
"""

@abc.abstractmethod
def compute(self, input_data: "BaseModel", config: "JobConfig") -> "BaseModel":
def compute(self, input_data: "BaseModel", config: "TaskConfig") -> "BaseModel":
pass

@abc.abstractmethod
Expand Down
4 changes: 2 additions & 2 deletions qcengine/programs/cfour/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def get_version(self) -> str:

return self.version_cache[which_prog]

def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_model: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
self.found(raise_error=True)

job_inputs = self.build_input(input_model, config)
Expand All @@ -76,7 +76,7 @@ def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicRes
return self.parse_output(dexe["outfiles"], input_model)

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:
cfourrec = {"infiles": {}, "scratch_directory": config.scratch_directory}

Expand Down
4 changes: 2 additions & 2 deletions qcengine/programs/dftd3.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def get_version(self) -> str:

return self.version_cache[which_prog]

def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_model: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
self.found(raise_error=True)

job_inputs = self.build_input(input_model, config)
Expand Down Expand Up @@ -98,7 +98,7 @@ def execute(
return success, dexe

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:

# strip engine hint
Expand Down
6 changes: 3 additions & 3 deletions qcengine/programs/entos.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Any, Dict, List, Optional, Set, Tuple, TYPE_CHECKING

if TYPE_CHECKING:
from ..config import JobConfig
from ..config import TaskConfig
from qcelemental.models import AtomicInput

from qcelemental.models import AtomicResult
Expand Down Expand Up @@ -109,7 +109,7 @@ def get_version(self) -> str:

return self.version_cache[which_prog]

def compute(self, input_data: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_data: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
"""
Run entos
"""
Expand Down Expand Up @@ -180,7 +180,7 @@ def execute(
return exe_success, proc

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:

# Write the geom xyz file with unit au
Expand Down
4 changes: 2 additions & 2 deletions qcengine/programs/gamess/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def get_version(self) -> str:

return self.version_cache[which_prog]

def compute(self, input_data: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_data: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
self.found(raise_error=True)

job_inputs = self.build_input(input_data, config)
Expand All @@ -83,7 +83,7 @@ def compute(self, input_data: "AtomicInput", config: "JobConfig") -> "AtomicResu
return self.parse_output(dexe["outfiles"], input_data)

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:
gamessrec = {"infiles": {}, "scratch_directory": config.scratch_directory}

Expand Down
4 changes: 2 additions & 2 deletions qcengine/programs/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self, **kwargs):
super().__init__(**{**self._defaults, **kwargs})

@abc.abstractmethod
def compute(self, input_data: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_data: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
pass

@staticmethod
Expand Down Expand Up @@ -57,7 +57,7 @@ def get_version(self) -> str:
## Computers

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:
raise ValueError("build_input is not implemented for {}.", self.__class__)

Expand Down
4 changes: 2 additions & 2 deletions qcengine/programs/molpro.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def get_version(self) -> str:

return self.version_cache[which_prog]

def compute(self, input_data: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_data: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
"""
Run Molpro
"""
Expand Down Expand Up @@ -165,7 +165,7 @@ def execute(
return exe_success, proc

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:
if template is None:
input_file = []
Expand Down
4 changes: 2 additions & 2 deletions qcengine/programs/mopac.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def get_version(self) -> str:
# Not really possible to pull at the moment, MolSSI will add a version ability
return "2016"

def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_model: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
"""
Runs Psi4 in API mode
"""
Expand Down Expand Up @@ -109,7 +109,7 @@ def execute(
return exe_success, proc

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:

if template is not None:
Expand Down
4 changes: 2 additions & 2 deletions qcengine/programs/mp2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def get_version(self) -> str:

return self.version_cache[which_prog]

def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_model: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
from ..testing import is_program_new_enough

self.found(raise_error=True)
Expand Down Expand Up @@ -95,7 +95,7 @@ def execute(
return success, dexe

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:

# strip engine hint
Expand Down
6 changes: 3 additions & 3 deletions qcengine/programs/nwchem/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from qcelemental.models import AtomicResult, Provenance
from qcelemental.util import safe_version, which

from qcengine.config import JobConfig, get_config
from qcengine.config import TaskConfig, get_config
from qcengine.exceptions import UnknownError
from ...exceptions import InputError
from ...util import execute
Expand Down Expand Up @@ -82,7 +82,7 @@ def get_version(self) -> str:

return self.version_cache[which_prog]

def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_model: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
"""
Runs NWChem in executable mode
"""
Expand All @@ -105,7 +105,7 @@ def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicRes
raise UnknownError(dexe["stderr"])

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:
nwchemrec = {"infiles": {}, "scratch_directory": config.scratch_directory}

Expand Down
4 changes: 2 additions & 2 deletions qcengine/programs/openmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def found(raise_error: bool = False) -> bool:

return (rdkit_found and openmm_found)

def compute(self, input_data: 'AtomicInput', config: 'JobConfig') -> 'AtomicResult':
def compute(self, input_data: 'AtomicInput', config: 'TaskConfig') -> 'AtomicResult':
"""
Runs OpenMM on given structure, inputs, in vacuum.
"""
Expand All @@ -131,7 +131,7 @@ def compute(self, input_data: 'AtomicInput', config: 'JobConfig') -> 'AtomicResu
basis = self._generate_basis(input_data)
ret_data['basis'] = basis

# get number of threads to use from `JobConfig.ncores`; otherwise, try environment variable
# get number of threads to use from `TaskConfig.ncores`; otherwise, try environment variable
nthreads = config.ncores
if nthreads is None:
nthreads = os.environ.get('OPENMM_CPU_THREADS')
Expand Down
2 changes: 1 addition & 1 deletion qcengine/programs/psi4.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def get_version(self) -> str:

return candidate_version

def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_model: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
"""
Runs Psi4 in API mode
"""
Expand Down
6 changes: 3 additions & 3 deletions qcengine/programs/qchem.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def found(self, raise_error: bool = False) -> bool:
raise_msg="Please install by visiting the Q-Chem website.",
)

def _get_qc_path(self, config: Optional["JobConfig"] = None):
def _get_qc_path(self, config: Optional["TaskConfig"] = None):
paths = os.environ.copy()
paths["QCSCRATCH"] = tempfile.gettempdir()
if config and config.scratch_directory:
Expand Down Expand Up @@ -67,7 +67,7 @@ def get_version(self) -> str:

return self.version_cache[which_prog]

def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_model: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
"""
Run qchem
"""
Expand Down Expand Up @@ -159,7 +159,7 @@ def execute(
return exe_success, proc

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:

# Check some bounds on what cannot be parsed
Expand Down
2 changes: 1 addition & 1 deletion qcengine/programs/rdkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def found(raise_error: bool = False) -> bool:
raise_msg="Please install via `conda install rdkit -c conda-forge`.",
)

def compute(self, input_data: 'AtomicInput', config: 'JobConfig') -> 'AtomicResult':
def compute(self, input_data: 'AtomicInput', config: 'TaskConfig') -> 'AtomicResult':
"""
Runs RDKit in FF typing
"""
Expand Down
4 changes: 2 additions & 2 deletions qcengine/programs/terachem.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_version(self) -> str:

return self.version_cache[which_prog]

def compute(self, input_data: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_data: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
"""
Run TeraChem
"""
Expand Down Expand Up @@ -81,7 +81,7 @@ def compute(self, input_data: "AtomicInput", config: "JobConfig") -> "AtomicResu
return result

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:
# Write the geom xyz file with unit au
xyz_file = input_model.molecule.to_string(dtype="terachem", units="Bohr")
Expand Down
2 changes: 1 addition & 1 deletion qcengine/programs/torchani.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def ensemble_forward(self, species_input):

return self._CACHE[name]

def compute(self, input_data: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_data: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
"""
Runs TorchANI in FF typing
"""
Expand Down
4 changes: 2 additions & 2 deletions qcengine/programs/turbomole/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def get_version(self) -> str:
self.version_cache[which_prog] = safe_version(version)
return self.version_cache[which_prog]

def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_model: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
self.found(raise_error=True)

job_inputs = self.build_input(input_model, config)
Expand All @@ -71,7 +71,7 @@ def compute(self, input_model: "AtomicInput", config: "JobConfig") -> "AtomicRes
return self.parse_output(dexe["outfiles"], input_model)

def build_input(
self, input_model: "AtomicInput", config: "JobConfig", template: Optional[str] = None
self, input_model: "AtomicInput", config: "TaskConfig", template: Optional[str] = None
) -> Dict[str, Any]:
turbomolrec = {"infiles": {}, "outfiles": {"control": "control"}, "scratch_directory": config.scratch_directory}

Expand Down
2 changes: 1 addition & 1 deletion qcengine/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Config(qcng.programs.ProgramHarness.Config):
def found(raise_error: bool = False) -> bool:
return True

def compute(self, input_data: "AtomicInput", config: "JobConfig") -> "AtomicResult":
def compute(self, input_data: "AtomicInput", config: "TaskConfig") -> "AtomicResult":
self.ncalls += 1
mode = self.iter_modes.pop(0)

Expand Down
6 changes: 6 additions & 0 deletions qcengine/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def test_node_auto():
node1 = qcng.config.NodeDescriptor(**desc)
job1 = qcng.get_config(hostname=node1)
assert job1.ncores == 4
assert job1.nnodes == 1
assert pytest.approx(job1.memory) == 10.0

desc["jobs_per_node"] = 2
Expand Down Expand Up @@ -153,6 +154,11 @@ def test_config_local_njob_ncore_plus_memory(opt_state_basic):
assert pytest.approx(config.memory, 0.1) == 6


def test_config_local_nnodes(opt_state_basic):
config = qcng.config.get_config(hostname="something", local_options={"nnodes": 4})
assert config.nnodes == 4


def test_config_validation(opt_state_basic):
with pytest.raises(pydantic.ValidationError):
config = qcng.config.get_config(hostname="something", local_options={"bad": 10})
Expand Down
4 changes: 2 additions & 2 deletions qcengine/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,13 @@ def popen(


@contextmanager
def environ_context(config: Optional["JobConfig"] = None, env: Optional[Dict[str, str]] = None) -> Dict[str, str]:
def environ_context(config: Optional["TaskConfig"] = None, env: Optional[Dict[str, str]] = None) -> Dict[str, str]:
"""Temporarily set environment variables inside the context manager and
fully restore previous environment afterwards.
Parameters
----------
config : Optional[JobConfig], optional
config : Optional[TaskConfig], optional
Automatically sets MKL/OMP num threads based off the input config.
env : Optional[Dict[str, str]], optional
A dictionary of environment variables to update.
Expand Down

0 comments on commit f9fea80

Please sign in to comment.