#######################################################################
# EMLE-Engine: https://github.com/chemle/emle-engine
#
# Copyright: 2023-2025
#
# Authors: Lester Hedges <lester.hedges@gmail.com>
# Kirill Zinovjev <kzinovjev@gmail.com>
#
# EMLE-Engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# EMLE-Engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with EMLE-Engine. If not, see <http://www.gnu.org/licenses/>.
#####################################################################
"""
EMLE calculator implementation.
"""
__author__ = "Lester Hedges"
__email__ = "lester.hedges@gmail.com"
__all__ = ["EMLECalculator"]
from loguru import logger as _logger
import os as _os
import numpy as _np
import sys as _sys
import yaml as _yaml
import ase as _ase
import ase.io as _ase_io
import torch as _torch
from .models import EMLE as _EMLE
from emle._units import (
_NANOMETER_TO_BOHR,
_BOHR_TO_ANGSTROM,
_HARTREE_TO_KJ_MOL,
_NANOMETER_TO_ANGSTROM,
)
[docs]
class EMLECalculator:
"""
Predicts EMLE energies and gradients allowing QM/MM with ML electrostatic
embedding. Requires the use of a QM (or ML) engine to compute in vacuo
energies and gradients, to which those from the EMLE model are added.
Supported backends are listed in the _supported_backends attribute below.
"""
from . import _supported_backends
# Class attributes.
# List of supported backends.
_supported_backends = _supported_backends.copy() + ["external"]
# List of supported devices.
_supported_devices = ["cpu", "cuda"]
# Default to no interpolation.
_lambda_interpolate = None
# Default to no external callback.
_is_external_backend = False
def __init__(
self,
model=None,
method="electrostatic",
alpha_mode="species",
atomic_numbers=None,
qm_charge=0,
backend="torchani",
external_backend=None,
plugin_path=".",
mm_charges=None,
deepmd_model=None,
deepmd_deviation=None,
deepmd_deviation_threshold=None,
qm_xyz_file="qm.xyz",
pc_xyz_file="pc.xyz",
qm_xyz_frequency=0,
ani2x_model_index=None,
mace_model=None,
ace_model=None,
rascal_model=None,
parm7=None,
qm_indices=None,
orca_path=None,
sqm_theory="DFTB3",
lambda_interpolate=None,
interpolate_steps=None,
restart=False,
device=None,
orca_template=None,
energy_frequency=0,
energy_file="emle_energy.txt",
log_level="ERROR",
log_file=None,
save_settings=False,
):
"""
Constructor
Parameters
----------
model: str
Path to the EMLE embedding model parameter file. If None, then a
default model will be used.
method: str
The desired embedding method. Options are:
"electrostatic":
Full ML electrostatic embedding.
"mechanical":
ML predicted charges for the core, but zero valence charge.
"nonpol":
Non-polarisable ML embedding. Here the induced component of
the potential is zeroed.
"mm":
MM charges are used for the core charge and valence charges
are set to zero. If this option is specified then the user
should also specify the MM charges for atoms in the QM
region.
backend: str, Tuple[str]
The backend to use to compute in vacuo energies and gradients. If None,
then no backend will be used, allowing you to obtain the electrostatic
embedding energy and gradients only. If two backends are specified, then
the second can be used to apply delta-learning corrections to the
energies and gradients.
alpha_mode: str
How atomic polarizabilities are calculated.
"species":
one volume scaling factor is used for each species
"reference":
scaling factors are obtained with GPR using the values learned
for each reference environment
atomic_numbers: List[int], Tuple[int], numpy.ndarray
Atomic numbers for the QM region. This allows use of optimised AEV
symmetry functions from the NNPOps package. Only use this option if
you are using a fixed QM region, i.e. the same QM region for each
call to the calculator.
qm_charge: int
The charge on the QM region. This is required when using an
EMLECalculator instance with the OpenMM interface. When using
the sander interface, the QM charge will be taken from the ORCA
input file.
external_backend: str
The name of an external backend to use to compute in vacuo energies.
This should be a callback function formatted as 'module.function'.
The function should take a single argument, which is an ASE Atoms
object for the QM region, and return the energy in Hartree along with
the gradients in Hartree/Bohr as a numpy.ndarray.
plugin_path: str
The direcory containing any scripts used for external backends.
mm_charges: List, Tuple, numpy.ndarray, str
An array of MM charges for atoms in the QM region. This is required
when the embedding method is "mm". Alternatively, pass the path to
a file containing the charges. The file should contain a single
column. Units are electron charge.
deepmd_model: str
Path to the DeePMD model file to use for in vacuo calculations. This
must be specified if "deepmd" is the selected backend.
deepmd_deviation: str
Path to a file to write the max deviation between forces predicted
with the DeePMD models.
deepmd_deviation_threshold: float
The threshold for the maximum deviation between forces predicted with
the DeePMD models. If the deviation exceeds this value, a ValueError
will be raised and the calculation will be terminated.
qm_xyz_file: str
Path to an output file for writing the xyz trajectory of the QM
region.
pc_xyz_file: str
Path to an output file for writing the charges and positions of the
MM point charges.
qm_xyz_frequency: int
How often to write the xyz trajectory of the QM and MM regions. Zero
turns off writing.
ani2x_model_index: int
The index of the ANI model to use when using the TorchANI backend.
If None, then the full 8 model ensemble is used.
mmace_model: str
Name of the MACE-OFF23 models to use.
Available models are 'mace-off23-small', 'mace-off23-medium', 'mace-off23-large'.
To use a locally trained MACE model, provide the path to the model file.
If None, the MACE-OFF23(S) model will be used by default.
ace_model: str
Path to the ACE model file to use for in vacuo calculations. This
must be specified if 'ace' is the selected backend.
rascal_model: str
Path to the Rascal model file to use for in vacuo calculations. This
must be specified if "rascal" is the selected backend.
lambda_interpolate: float, [float, float]
The value of lambda to use for end-state correction calculations. This
must be between 0 and 1, which is used to interpolate between a full MM
and EMLE potential. If two lambda values are specified, the calculator
will gradually interpolate between them when called multiple times. This
must be used in conjunction with the 'interpolate_steps' argument.
interpolate_steps: int
The number of steps over which lambda is linearly interpolated.
parm7: str
The path to an AMBER parm7 file for the QM region. This is needed to
compute in vacuo MM energies for the QM region when using the Rascal
backend, or when interpolating.
qm_indices: list, str
A list of atom indices for the QM region. This must be specified when
interpolating. Alternatively, a path to a file containing the indices
can be specified. The file should contain a single column with the
indices being zero-based.
orca_path: str
The path to the ORCA executable. This is required when using the ORCA
backend.
sqm_theory: str
The QM theory to use when using the SQM backend. See the AmberTools
manual for the supported theory levels for your version of AmberTools.
restart: bool
Whether this is a restart simulation with sander. If True, then energies
are logged immediately.
device: str
The name of the device to be used by PyTorch. Options are "cpu"
or "cuda".
orca_template: str
The path to a template ORCA input file. This is required when using
the ORCA backend when using emle-engine with Sire. This should be a
full template, including the charge and multiplicity of the QM region,
along with a placeholder name for the xyz file that will be replaced
with the file generated at runtime, e.g.:
%pal nprocs 4 end
!BLYP 6-31G* verytightscf
%method
grid 4
finalgrid 6
end
%scf
maxiter 100
end
%MaxCore 1024
! ENGRAD
! Angs NoUseSym
*xyzfile 0 1 inpfile.xyz
energy_frequency: int
The frequency of logging energies to file. If 0, then no energies are
logged.
energy_file: str
The name of the file to which energies are logged.
log_level: str
The logging level to use. Options are "TRACE", "DEBUG", "INFO", "WARNING",
"ERROR", and "CRITICAL".
log_file: str
The name of the file to which log messages are written.
save_settings: bool
Whether to write a YAML file containing the settings used to initialise
the calculator.
"""
from ._resources import _fetch_resources
# Fetch or update the resources.
if model is None:
_fetch_resources()
# Validate input.
# First handle the logger.
if log_level is None:
log_level = "ERROR"
else:
if not isinstance(log_level, str):
raise TypeError("'log_level' must be of type 'str'")
# Delete whitespace and convert to upper case.
log_level = log_level.upper().replace(" ", "")
# Validate the log level.
if not log_level in _logger._core.levels.keys():
raise ValueError(
f"Unsupported logging level '{log_level}'. Options are: {', '.join(_logger._core.levels.keys())}"
)
self._log_level = log_level
# Validate the log file.
if log_file is not None:
if not isinstance(log_file, str):
raise TypeError("'log_file' must be of type 'str'")
# Try to create the directory.
dirname = _os.path.dirname(log_file)
if dirname != "":
try:
_os.makedirs(dirname, exist_ok=True)
except:
raise IOError(
f"Unable to create directory for log file: {log_file}"
)
self._log_file = _os.path.abspath(log_file)
else:
self._log_file = _sys.stderr
# Update the logger.
_logger.remove()
_logger.add(self._log_file, level=self._log_level)
# Validate the PyTorch device.
if device is not None:
if not isinstance(device, str):
msg = "'device' must be of type 'str'"
_logger.error(msg)
raise TypeError(msg)
# Strip whitespace and convert to lower case.
device = device.lower().replace(" ", "")
# See if the user has specified a GPU index.
if device.startswith("cuda"):
try:
device, index = device.split(":")
except:
index = 0
# Make sure the GPU device index is valid.
try:
index = int(index)
except:
msg = f"Invalid GPU index: {index}"
_logger.error(msg)
raise ValueError(msg)
if not device in self._supported_devices:
msg = f"Unsupported device '{device}'. Options are: {', '.join(self._supported_devices)}"
_logger.error(msg)
raise ValueError(msg)
# Create the full CUDA device string.
if device == "cuda":
device = f"cuda:{index}"
# Set the device.
self._device = _torch.device(device)
else:
# Default to CUDA, if available.
self._device = _torch.device(
"cuda" if _torch.cuda.is_available() else "cpu"
)
# Validate the MM charges.
if mm_charges is not None:
# Convert lists/tuples to NumPy arrays.
if isinstance(mm_charges, (list, tuple)):
mm_charges = _np.array(mm_charges)
if isinstance(mm_charges, _np.ndarray):
if mm_charges.dtype != _np.float64:
msg = "'mm_charges' must have dtype 'float64'"
_logger.error(msg)
raise TypeError(msg)
else:
self._mm_charges = mm_charges
elif isinstance(mm_charges, str):
# Convert to an absolute path.
mm_charges = _os.path.abspath(mm_charges)
if not _os.path.isfile(mm_charges):
msg = f"Unable to locate 'mm_charges' file: {mm_charges}"
_logger.error(msg)
raise IOError(msg)
# Read the charges into a list.
charges = []
with open(mm_charges, "r") as f:
for line in f:
try:
charges.append(float(line.strip()))
except:
msg = f"Unable to read 'mm_charges' from file: {mm_charges}"
_logger.error(msg)
raise ValueError(msg)
self._mm_charges = _np.array(charges)
else:
msg = "'mm_charges' must be of type 'numpy.ndarray' or 'str'"
_logger.error(msg)
raise TypeError(msg)
else:
self._mm_charges = None
if qm_charge is not None:
try:
qm_charge = int(qm_charge)
except:
msg = "'qm_charge' must be of type 'int'"
_logger.error(msg)
raise TypeError(msg)
self._qm_charge = qm_charge
# Create the EMLE model instance.
self._emle = _EMLE(
model=model,
method=method,
alpha_mode=alpha_mode,
atomic_numbers=atomic_numbers,
mm_charges=self._mm_charges,
qm_charge=self._qm_charge,
device=self._device,
)
# Validate the backend(s).
if backend is not None:
if isinstance(backend, (tuple, list)):
if not len(backend) == 2:
msg = "If 'backend' is a list or tuple, it must have length 2"
_logger.error(msg)
raise ValueError(msg)
if not all(isinstance(x, str) for x in backend):
msg = "If 'backend' is a list or tuple, all elements must be of type 'str'"
_logger.error(msg)
raise TypeError(msg)
elif isinstance(backend, str):
# Strip whitespace and convert to lower case.
backend = backend.lower().replace(" ", "")
backend = tuple(backend.split(","))
if len(backend) > 2:
msg = "If 'backend' is a string, it must contain at most two comma-separated values"
_logger.error(msg)
raise ValueError(msg)
else:
msg = "'backend' must be of type 'str', or a tuple of 'str' types"
_logger.error(msg)
raise TypeError(msg)
formatted_backends = []
for b in backend:
# Strip whitespace and convert to lower case.
b = b.lower().replace(" ", "")
if not b in self._supported_backends:
msg = f"Unsupported backend '{b}'. Options are: {', '.join(self._supported_backends)}"
_logger.error(msg)
raise ValueError(msg)
formatted_backends.append(b)
self._backend = formatted_backends
# Validate the external backend.
if external_backend is not None:
if not isinstance(external_backend, str):
msg = "'external_backend' must be of type 'str'"
_logger.error(msg)
raise TypeError(msg)
if plugin_path is None:
plugin_path = "."
if not isinstance(plugin_path, str):
msg = "'plugin_path' must be of type 'str'"
_logger.error(msg)
raise TypeError(msg)
# Convert to an absolute path.
abs_plugin_path = _os.path.abspath(plugin_path)
if not _os.path.isdir(abs_plugin_path):
msg = f"Unable to locate plugin directory: {plugin_path}"
_logger.error(msg)
raise IOError(msg)
self._plugin_path = abs_plugin_path
# Strip whitespace.
external_backend = external_backend.replace(" ", "")
# Split the module and function names.
try:
function = external_backend.split(".")[-1]
module = external_backend.replace("." + function, "")
except:
msg = f"Unable to parse 'external_backend' callback string: {external_backend}"
_logger.error(msg)
raise ValueError(msg)
# Try to import the module.
try:
from importlib import import_module
module = import_module(module)
except:
try:
import sys
# Try adding the plugin directory to the path.
sys.path.append(plugin_path)
module = import_module(module)
sys.path.pop()
except:
msg = f"Unable to import module '{module}'"
_logger.error(msg)
raise ImportError(msg)
# Bind the function to the class.
self._external_backend = getattr(module, function)
# Flag that an external backend is being used.
self._is_external_backend = True
# Set the backed to "external".
self._backend = "external"
if parm7 is not None:
if not isinstance(parm7, str):
msg = "'parm7' must be of type 'str'"
_logger.error(msg)
raise ValueError(msg)
# Convert to an absolute path.
abs_parm7 = _os.path.abspath(parm7)
# Make sure the file exists.
if not _os.path.isfile(abs_parm7):
msg = f"Unable to locate the 'parm7' file: '{parm7}'"
raise IOError(msg)
self._parm7 = abs_parm7
# Validate the QM XYZ file options.
if qm_xyz_file is None:
qm_xyz_file = "qm.xyz"
else:
if not isinstance(qm_xyz_file, str):
msg = "'qm_xyz_file' must be of type 'str'"
_logger.error(msg)
raise TypeError(msg)
self._qm_xyz_file = qm_xyz_file
if pc_xyz_file is None:
pc_xyz_file = "pc.xyz"
else:
if not isinstance(pc_xyz_file, str):
msg = "'pc_xyz_file' must be of type 'str'"
_logger.error(msg)
raise TypeError(msg)
self._pc_xyz_file = pc_xyz_file
if qm_xyz_frequency is None:
qm_xyz_frequency = 0
else:
try:
qm_xyz_frequency = int(qm_xyz_frequency)
except:
msg = "'qm_xyz_frequency' must be of type 'int'"
_logger.error(msg)
raise TypeError(msg)
if qm_xyz_frequency < 0:
msg = "'qm_xyz_frequency' must be greater than or equal to 0"
_logger.error(msg)
raise ValueError(msg)
self._qm_xyz_frequency = qm_xyz_frequency
# Validate and create the backend(s).
self._backends = []
for backend in self._backend:
if backend == "torchani":
from .models import ANI2xEMLE as _ANI2xEMLE
ani2x_emle = _ANI2xEMLE(
emle_model=model,
emle_method=method,
alpha_mode=alpha_mode,
mm_charges=self._mm_charges,
qm_charge=self._qm_charge,
model_index=ani2x_model_index,
atomic_numbers=atomic_numbers,
device=self._device,
)
# Convert to TorchScript.
b = _torch.jit.script(ani2x_emle).eval()
# Store the model index.
self._ani2x_model_index = ani2x_model_index
# Initialise the MACEMLE model.
elif backend == "mace":
from .models import MACEEMLE as _MACEEMLE
mace_emle = _MACEEMLE(
emle_model=model,
emle_method=method,
alpha_mode=alpha_mode,
mm_charges=self._mm_charges,
qm_charge=self._qm_charge,
mace_model=mace_model,
atomic_numbers=atomic_numbers,
device=self._device,
)
# Convert to TorchScript.
b = _torch.jit.script(mace_emle).eval()
# Store the MACE model.
self._mace_model = mace_model
elif backend == "ace":
try:
# Import directly from module since the import is so slow.
from ._backends._ace import ACE
b = ACE(ace_model)
except:
msg = "Unable to create ACE backend. Please ensure PyJulip is installed."
_logger.error(msg)
raise RuntimeError(msg)
elif backend == "orca":
try:
from ._backends import ORCA
b = ORCA(orca_path, template=orca_template)
self._orca_path = b._exe
self._orca_template = b._template
except Exception as e:
msg = "Unable to create ORCA backend: {e}"
_logger.error(msg)
raise RuntimeError(msg)
elif backend == "deepmd":
try:
from ._backends import DeepMD
b = DeepMD(
model=deepmd_model,
deviation=deepmd_deviation,
deviation_threshold=deepmd_deviation_threshold,
)
self._deepmd_model = b._model
self._deepmd_deviation = b._deviation
self._deepmd_deviation_threshold = b._deviation_threshold
except Exception as e:
msg = "Unable to create DeePMD backend: {e}"
_logger.error(msg)
raise RuntimeError(msg)
elif backend == "sqm":
try:
from ._backends import SQM
b = SQM(parm7, theory=sqm_theory)
self._sqm_theory = b._theory
except Exception as e:
msg = "Unable to create SQM backend: {e}"
_logger.error(msg)
raise RuntimeError(msg)
elif backend == "xtb":
try:
from ._backends import XTB
b = XTB()
except Exception as e:
msg = "Unable to create XTB backend: {e}"
_logger.error(msg)
raise RuntimeError(msg)
elif backend == "sander":
try:
from ._backends import Sander
b = Sander(parm7)
except Exception as e:
msg = "Unable to create Sander backend: {e}"
_logger.error(msg)
raise RuntimeError(msg)
elif backend == "rascal":
try:
from ._backends import Rascal
b = Rascal(rascal_model)
except Exception as e:
msg = "Unable to create Rascal backend: {e}"
_logger.error(msg)
raise RuntimeError(msg)
# Append the backend to the list.
self._backends.append(b)
if restart is not None:
if not isinstance(restart, bool):
msg = "'restart' must be of type 'bool'"
_logger.error(msg)
raise TypeError(msg)
else:
restart = False
self._restart = restart
# Validate the interpolation lambda parameter.
if lambda_interpolate is not None:
if self._backend == "rascal":
msg = "'lambda_interpolate' is currently unsupported when using the the Rascal backend!"
_logger.error(msg)
raise ValueError(msg)
self._is_interpolate = True
self.set_lambda_interpolate(lambda_interpolate)
# Make sure a topology file has been set.
if parm7 is None:
msg = "'parm7' must be specified when interpolating"
_logger.error(msg)
raise ValueError(msg)
# Make sure MM charges for the QM region have been set.
if mm_charges is None:
msg = "'mm_charges' are required when interpolating"
_logger.error(msg)
raise ValueError(msg)
# Make sure indices for the QM region have been passed.
if qm_indices is None:
msg = "'qm_indices' must be specified when interpolating"
_logger.error(msg)
raise ValueError(msg)
# Validate the indices. Note that we don't check that the are valid, only
# that they are the correct type.
if isinstance(qm_indices, list):
if not all(isinstance(x, int) for x in qm_indices):
msg = "'qm_indices' must be a list of 'int' types"
_logger.error(msg)
raise TypeError(msg)
self._qm_indices = qm_indices
elif isinstance(qm_indices, str):
# Convert to an absolute path.
qm_indices = _os.path.abspath(qm_indices)
if not _os.path.isfile(qm_indices):
msg = f"Unable to locate 'qm_indices' file: {qm_indices}"
_logger.error(msg)
raise IOError(msg)
# Read the indices into a list.
indices = []
with open(qm_indices, "r") as f:
for line in f:
try:
indices.append(int(line.strip()))
except:
msg = f"Unable to read 'qm_indices' from file: {qm_indices}"
_logger.error(msg)
raise ValueError(msg)
self._qm_indices = indices
else:
msg = "'qm_indices' must be of type 'list' or 'str'"
_logger.error(msg)
raise TypeError(msg)
# Make sure the number of interpolation steps has been set if more
# than one lambda value has been specified.
if len(self._lambda_interpolate) == 2:
if interpolate_steps is None:
msg = "'interpolate_steps' must be specified when interpolating between two lambda values"
_logger.error(msg)
raise ValueError(msg)
else:
try:
interpolate_steps = int(interpolate_steps)
except:
msg = "'interpolate_steps' must be of type 'int'"
_logger.error(msg)
raise TypeError(msg)
if interpolate_steps < 0:
msg = "'interpolate_steps' must be greater than or equal to 0"
_logger.error(msg)
raise ValueError(msg)
self._interpolate_steps = interpolate_steps
# Create an MM EMLE model for interpolation.
self._emle_mm = _EMLE(
model=model,
alpha_mode=alpha_mode,
atomic_numbers=atomic_numbers,
method="mm",
mm_charges=self._mm_charges,
device=self._device,
)
else:
self._is_interpolate = False
if energy_frequency is None:
energy_frequency = 0
if not isinstance(energy_frequency, int):
msg = "'energy_frequency' must be of type 'int'"
_logger.error(msg)
raise TypeError(msg)
else:
self._energy_frequency = energy_frequency
if energy_file is None:
energy_file = "emle_energy.txt"
else:
if not isinstance(energy_file, str):
msg = "'energy_file' must be of type 'str'"
_logger.error(msg)
raise TypeError(msg)
# Try to create the directory.
dirname = _os.path.dirname(energy_file)
if dirname != "":
try:
_os.makedirs(dirname, exist_ok=True)
except:
msg = f"Unable to create directory for energy file: {energy_file}"
_logger.error(msg)
raise IOError(msg)
self._energy_file = _os.path.abspath(energy_file)
if save_settings is None:
save_settings = True
if not isinstance(save_settings, bool):
msg = "'save_settings' must be of type 'bool'"
_logger.error(msg)
raise TypeError(msg)
else:
self._save_settings = save_settings
if orca_template is not None:
if not isinstance(orca_template, str):
msg = "'orca_template' must be of type 'str'"
_logger.error(msg)
raise TypeError(msg)
# Convert to an absolute path.
abs_orca_template = _os.path.abspath(orca_template)
if not _os.path.isfile(abs_orca_template):
msg = f"Unable to locate ORCA template file: '{orca_template}'"
_logger.error(msg)
raise IOError(msg)
self._orca_template = abs_orca_template
else:
self._orca_template = None
# Intialise the maximum number of MM atoms that have been seen.
self._max_mm_atoms = 0
# Initialise the number of steps. (Calls to the calculator.)
self._step = 0
# Flag whether to skip logging the first call to the server. This is
# used to avoid writing duplicate energy records since sander will call
# orca on startup when not performing a restart simulation, i.e. not
# just after each integration step.
self._is_first_step = not self._restart
# Get the settings from the internal EMLE model.
self._model = self._emle._model
self._method = self._emle._method
self._alpha_mode = self._emle._alpha_mode
self._atomic_numbers = self._emle._atomic_numbers
if isinstance(atomic_numbers, _np.ndarray):
atomic_numbers = atomic_numbers.tolist()
# Store the settings as a dictionary.
self._settings = {
"model": None if model is None else self._model,
"method": self._method,
"alpha_mode": self._alpha_mode,
"atomic_numbers": None if atomic_numbers is None else atomic_numbers,
"qm_charge": self._qm_charge,
"backend": self._backend,
"external_backend": None if external_backend is None else external_backend,
"mm_charges": None if mm_charges is None else self._mm_charges.tolist(),
"deepmd_model": deepmd_model,
"deepmd_deviation": deepmd_deviation,
"deepmd_deviation_threshold": deepmd_deviation_threshold,
"qm_xyz_file": qm_xyz_file,
"pc_xyz_file": pc_xyz_file,
"qm_xyz_frequency": qm_xyz_frequency,
"ani2x_model_index": ani2x_model_index,
"mace_model": None if mace_model is None else self._mace_model,
"rascal_model": rascal_model,
"parm7": parm7,
"qm_indices": None if qm_indices is None else self._qm_indices,
"orca_path": orca_path,
"sqm_theory": sqm_theory,
"lambda_interpolate": lambda_interpolate,
"interpolate_steps": interpolate_steps,
"restart": restart,
"device": device,
"orca_template": None if orca_template is None else self._orca_template,
"plugin_path": plugin_path,
"energy_frequency": energy_frequency,
"energy_file": energy_file,
"log_level": self._log_level,
"log_file": log_file,
}
# Write to a YAML file.
if save_settings:
with open("emle_settings.yaml", "w") as f:
_yaml.dump(self._settings, f)
[docs]
def run(self, path=None):
"""
Calculate the energy and gradients.
Parameters
----------
path: str
Path to the working directory of the sander process.
"""
if path is not None:
if not isinstance(path, str):
msg = "'path' must be of type 'str'"
_logger.error(msg)
raise TypeError(msg)
if not _os.path.isdir(path):
msg = f"sander process path does not exist: {path}"
_logger.error(msg)
raise ValueError(msg)
orca_input = f"{path}/orc_job.inp"
else:
orca_input = "orc_job.inp"
# Parse the ORCA input file.
(
dirname,
charge,
multi,
atoms,
atomic_numbers,
xyz_qm,
xyz_mm,
charges_mm,
xyz_file_qm,
) = self._parse_orca_input(orca_input)
# Make sure that the number of QM atoms matches the number of MM charges
# when using mm embedding.
if self._method == "mm":
if len(xyz_qm) != len(self._mm_charges):
msg = (
"MM embedding is specified but the number of atoms in the QM "
f"region ({len(xyz_qm)}) doesn't match the number of MM charges "
f"({len(self._mm_charges)})"
)
_logger.error(msg)
raise ValueError(msg)
# Compute the energy and gradients.
E_vac, grad_vac, E_tot, grad_qm, grad_mm = self._calculate_energy_and_gradients(
atomic_numbers,
charges_mm,
xyz_qm,
xyz_mm,
atoms=atoms,
charge=charge,
)
# Create the file names for the ORCA format output.
filename = _os.path.splitext(orca_input)[0]
engrad = filename + ".engrad"
pcgrad = filename + ".pcgrad"
# Store the number of MM atoms.
num_mm_atoms = len(charges_mm)
with open(engrad, "w") as f:
# Write the energy.
f.write("# The current total energy in Eh\n")
f.write("#\n")
f.write(f"{E_tot:22.12f}\n")
# Write the QM gradients.
f.write("# The current gradient in Eh/bohr\n")
f.write("#\n")
for x, y, z in grad_qm:
f.write(f"{x:16.10f}\n{y:16.10f}\n{z:16.10f}\n")
with open(pcgrad, "w") as f:
# Write the number of MM atoms.
f.write(f"{num_mm_atoms}\n")
# Write the MM gradients.
for x, y, z in grad_mm[:num_mm_atoms]:
f.write(f"{x:17.12f}{y:17.12f}{z:17.12f}\n")
# Log energies to file.
if (
self._energy_frequency > 0
and not self._is_first_step
and self._step % self._energy_frequency == 0
):
with open(self._energy_file, "a+") as f:
# Write the header.
if self._step == 0:
if self._is_interpolate:
f.write(
f"#{'Step':>9}{'λ':>22}{'E(λ) (Eh)':>22}{'E(λ=0) (Eh)':>22}{'E(λ=1) (Eh)':>22}\n"
)
else:
f.write(f"#{'Step':>9}{'E_vac (Eh)':>22}{'E_tot (Eh)':>22}\n")
# Write the record.
if self._is_interpolate:
f.write(
f"{self._step:>10}{lam:22.12f}{E_tot:22.12f}{E_mm:22.12f}{E_emle:22.12f}\n"
)
else:
f.write(f"{self._step:>10}{E_vac:22.12f}{E_tot:22.12f}\n")
# Write out the QM region to the xyz trajectory file.
if self._qm_xyz_frequency > 0 and self._step % self._qm_xyz_frequency == 0:
atoms = _ase.Atoms(positions=xyz_qm, numbers=atomic_numbers)
if hasattr(self._backend, "_max_f_std"):
atoms.info = {"max_f_std": self._max_f_std}
_ase_io.write(self._qm_xyz_file, atoms, append=True)
pc_data = _np.hstack((charges_mm[:, None], xyz_mm))
pc_data = pc_data[pc_data[:, 0] != 0]
with open(self._pc_xyz_file, "a") as f:
f.write(f"{len(pc_data)}\n")
_np.savetxt(f, pc_data, fmt="%14.6f")
f.write("\n")
# Increment the step counter.
if self._is_first_step:
self._is_first_step = False
else:
self._step += 1
_logger.info(f"Step: {self._step}")
def _calculate_energy_and_gradients(
self,
atomic_numbers,
charges_mm,
xyz_qm,
xyz_mm,
atoms=None,
charge=0,
):
"""
Calculate the energy and gradients.
Parameters
----------
atomic_numbers: numpy.ndarray, (N_QM_ATOMS)
Atomic numbers for the QM region.
charges_mm: numpy.ndarray, (N_MM_ATOMS)
The charges on the MM atoms.
xyz_qm: numpy.ndarray, (N_QM_ATOMS, 3)
The QM coordinates.
xyz_mm: numpy.ndarray, (N_MM_ATOMS, 3)
The MM coordinates.
atoms: ase.Atoms
The atoms object for the QM region.
charge: int
The total charge of the QM region.
Returns
-------
E_tot: float
The total energy.
grad_qm: numpy.ndarray, (N_QM_ATOMS, 3)
The QM gradients.
grad_mm: numpy.ndarray, (N_MM_ATOMS, 3)
The MM gradients.
"""
# Update the maximum number of MM atoms if this is the largest seen.
num_mm_atoms = len(charges_mm)
if num_mm_atoms > self._max_mm_atoms:
self._max_mm_atoms = num_mm_atoms
# Pad the MM coordinates and charges arrays to avoid re-jitting.
if self._max_mm_atoms > num_mm_atoms:
num_pad = self._max_mm_atoms - num_mm_atoms
xyz_mm_pad = num_pad * [[0.0, 0.0, 0.0]]
charges_mm_pad = num_pad * [0.0]
xyz_mm = _np.append(xyz_mm, xyz_mm_pad, axis=0)
charges_mm = _np.append(charges_mm, charges_mm_pad)
# First try to use the specified backend to compute in vacuo
# energies and (optionally) gradients.
# Base and delta-learning correction models.
base_model = None
delta_model = None
# Zero the in vacuo energy and gradients.
E_vac = 0.0
grad_vac = _np.zeros_like(xyz_qm)
# Internal backends.
if not self._is_external_backend:
if self._backend is not None:
# Enumerate the backends.
for i, backend in enumerate(self._backends):
# This is an EMLE Torch model.
if isinstance(backend, _torch.nn.Module):
# Base backend is an EMLE Torch model.
if i == 0:
base_model = backend
# Delta-learning correction is applied using an EMLE Torch model.
else:
delta_model = backend
# This is a non-Torch backend.
else:
try:
# Add the in vacuo energy and gradients to the total.
energy, forces = backend(atomic_numbers, xyz_qm)
E_vac += energy[0]
grad_vac += -forces[0]
except Exception as e:
msg = f"Failed to calculate in vacuo energies using {self._backend[i]} backend: {e}"
_logger.error(msg)
raise RuntimeError(msg)
# No backend.
else:
E_vac, grad_vac = 0.0, _np.zeros_like(xyz_qm.detatch().cpu())
# External backend.
else:
try:
if atoms is None:
atoms = _ase.Atoms(positions=xyz_qm, numbers=atomic_numbers)
E_vac, grad_vac = self._external_backend(atoms)
except Exception as e:
msg = (
f"Failed to calculate in vacuo energies using external backend: {e}"
)
_logger.error(msg)
raise RuntimeError(msg)
# Store a copy of the atomic numbers and QM coordinates as NumPy arrays.
atomic_numbers_np = atomic_numbers
xyz_qm_np = xyz_qm
# Convert inputs to Torch tensors.
atomic_numbers = _torch.tensor(
atomic_numbers, dtype=_torch.int64, device=self._device
)
charges_mm = _torch.tensor(
charges_mm, dtype=_torch.float32, device=self._device
)
xyz_qm = _torch.tensor(
xyz_qm, dtype=_torch.float32, device=self._device, requires_grad=True
)
xyz_mm = _torch.tensor(
xyz_mm, dtype=_torch.float32, device=self._device, requires_grad=True
)
# Are there any MM atoms?
allow_unused = len(charges_mm) == 0
# Apply delta-learning corrections using an EMLE model.
if delta_model is not None:
model = delta_model.original_name
try:
# Create null MM inputs.
null_charges_mm = _torch.zeros_like(charges_mm)
null_xyz_mm = _torch.zeros_like(xyz_mm)
# Compute the energy.
E = delta_model(
atomic_numbers, null_charges_mm, xyz_qm, null_xyz_mm, charge
)
# Compute the gradients.
dE_dxyz_qm = _torch.autograd.grad(E.sum(), xyz_qm)
# Compute the delta correction.
delta_E = E.sum().detach().cpu().numpy()
delta_grad = dE_dxyz_qm[0].cpu().numpy() * _BOHR_TO_ANGSTROM
# Apply the correction.
E_vac += delta_E
grad_vac += delta_grad
except Exception as e:
msg = f"Failed to apply delta-learning correction using {model} model: {e}"
_logger.error(msg)
raise RuntimeError(msg)
# Compute embedding energy and gradients.
if base_model is None:
try:
E = self._emle(atomic_numbers, charges_mm, xyz_qm, xyz_mm, charge)
dE_dxyz_qm, dE_dxyz_mm = _torch.autograd.grad(
E.sum(), (xyz_qm, xyz_mm), allow_unused=allow_unused
)
dE_dxyz_qm_bohr = dE_dxyz_qm.cpu().numpy() * _BOHR_TO_ANGSTROM
dE_dxyz_mm_bohr = dE_dxyz_mm.cpu().numpy() * _BOHR_TO_ANGSTROM
# Compute the total energy and gradients.
E_tot = E_vac + E.sum().detach().cpu().numpy()
grad_qm = dE_dxyz_qm_bohr + grad_vac
grad_mm = dE_dxyz_mm_bohr
except Exception as e:
msg = f"Failed to compute EMLE energies and gradients: {e}"
_logger.error(msg)
raise RuntimeError(msg)
# Compute in vacuo and embedding energies and gradients in one go using
# the EMLE Torch models
else:
model = base_model.original_name
try:
with _torch.jit.optimized_execution(False):
E = base_model(atomic_numbers, charges_mm, xyz_qm, xyz_mm, charge)
dE_dxyz_qm, dE_dxyz_mm = _torch.autograd.grad(
E.sum(), (xyz_qm, xyz_mm), allow_unused=allow_unused
)
grad_qm = grad_vac + dE_dxyz_qm.cpu().numpy() * _BOHR_TO_ANGSTROM
grad_mm = dE_dxyz_mm.cpu().numpy() * _BOHR_TO_ANGSTROM
E_tot = E_vac + E.sum().detach().cpu().numpy()
except Exception as e:
msg = f"Failed to compute {model} energies and gradients: {e}"
_logger.error(msg)
raise RuntimeError(msg)
if self._is_interpolate:
# Compute the in vacuo MM energy and gradients for the QM region.
if self._backend != None:
from ._backends import Sander
# Create a Sander backend instance.
backend = Sander(self._parm7)
# Compute the in vacuo MM energy and forces for the QM region.
energy, forces = backend(atomic_numbers_np, xyz_qm_np)
E_mm_qm_vac = energy[0]
grad_mm_qm_vac = -forces[0]
# If no backend is specified, then the MM energy and gradients are zero.
else:
E_mm_qm_vac, grad_mm_qm_vac = 0.0, _np.zeros_like(xyz_qm)
# Compute the embedding contributions.
E = self._emle_mm(atomic_numbers, charges_mm, xyz_qm, xyz_mm, charge)
dE_dxyz_qm, dE_dxyz_mm = _torch.autograd.grad(
E.sum(), (xyz_qm, xyz_mm), allow_unused=allow_unused
)
dE_dxyz_qm_bohr = dE_dxyz_qm.cpu().numpy() * _BOHR_TO_ANGSTROM
dE_dxyz_mm_bohr = dE_dxyz_mm.cpu().numpy() * _BOHR_TO_ANGSTROM
# Store the the MM and EMLE energies. The MM energy is an approximation.
E_mm = E_mm_qm_vac + E.sum().detach().cpu().numpy()
E_emle = E_tot
# Work out the current value of lambda.
if len(self._lambda_interpolate) == 1:
lam = self._lambda_interpolate[0]
else:
offset = int(not self._restart)
lam = self._lambda_interpolate[0] + (
(self._step / (self._interpolate_steps - offset))
) * (self._lambda_interpolate[1] - self._lambda_interpolate[0])
if lam < 0.0:
lam = 0.0
elif lam > 1.0:
lam = 1.0
# Calculate the lambda weighted energy and gradients.
E_tot = lam * E_tot + (1 - lam) * E_mm
grad_qm = lam * grad_qm + (1 - lam) * (grad_mm_qm_vac + dE_dxyz_qm_bohr)
grad_mm = lam * grad_mm + (1 - lam) * dE_dxyz_mm_bohr
return E_vac, grad_vac, E_tot, grad_qm, grad_mm
[docs]
def set_lambda_interpolate(self, lambda_interpolate):
"""
Set the value of the lambda interpolation parameter. Note the server must
already be in 'interpolation' mode, i.e. the user must have specified an
initial value for 'lambda_interpolate' in the constructor.
Parameters
----------
lambda_interpolate: float, [float, float]
The value of lambda to use for interpolating between pure MM
(lambda=0) and ML/MM (lambda=1) potentials.and. If two lambda
values are specified, the calculator will gradually interpolate
between them when called multiple times.
"""
if not self._is_interpolate:
msg = "Server is not in interpolation mode!"
_logger.error(msg)
raise Exception(msg)
elif (
self._lambda_interpolate is not None and len(self._lambda_interpolate) == 2
):
msg = "Cannot set lambda when interpolating between two lambda values!"
_logger.error(msg)
raise Exception(msg)
if isinstance(lambda_interpolate, (list, tuple)):
if len(lambda_interpolate) not in [1, 2]:
msg = "'lambda_interpolate' must be a single value or a list/tuple of two values"
_logger.error(msg)
raise ValueError(msg)
try:
lambda_interpolate = [float(x) for x in lambda_interpolate]
except:
msg = "'lambda_interpolate' must be a single value or a list/tuple of two values"
_logger.error(msg)
raise TypeError(msg)
if not all(0.0 <= x <= 1.0 for x in lambda_interpolate):
msg = "'lambda_interpolate' must be between 0 and 1 for both values"
_logger.error(msg)
raise ValueError(msg)
if len(lambda_interpolate) == 2:
if _np.isclose(lambda_interpolate[0], lambda_interpolate[1], atol=1e-6):
msg = "The two values of 'lambda_interpolate' must be different"
_logger.error(msg)
raise ValueError(msg)
self._lambda_interpolate = lambda_interpolate
elif isinstance(lambda_interpolate, (int, float)):
lambda_interpolate = float(lambda_interpolate)
if not 0.0 <= lambda_interpolate <= 1.0:
msg = "'lambda_interpolate' must be between 0 and 1"
_logger.error(msg)
raise ValueError(msg)
self._lambda_interpolate = [lambda_interpolate]
# Reset the first step flag.
self._is_first_step = not self._restart
def _sire_callback(self, atomic_numbers, charges_mm, xyz_qm, xyz_mm, idx_mm=None):
"""
A callback function to be used with Sire.
Parameters
----------
atomic_numbers: [float]
A list of atomic numbers for the QM region.
charges_mm: [float]
The charges on the MM atoms.
xyz_qm: [[float, float, float]]
The coordinates of the QM atoms in Angstrom.
xyz_mm: [[float, float, float]]
The coordinates of the MM atoms in Angstrom.
idx_mm: [int]
A list of indices of the MM atoms in the QM/MM region.
Note that len(idx_mm) <= len(charges_mm) since it only
contains the indices of true MM atoms, not link atoms
or virtual charges.
Returns
-------
energy: float
The energy in kJ/mol.
force_qm: [[float, float, float]]
The forces on the QM atoms in kJ/mol/nanometer.
force_mm: [[float, float, float]]
The forces on the MM atoms in kJ/mol/nanometer.
"""
# For performance, we assume that the input is already validated.
# Convert to NumPy arrays.
atomic_numbers = _np.array(atomic_numbers)
charges_mm = _np.array(charges_mm)
xyz_qm = _np.array(xyz_qm)
xyz_mm = _np.array(xyz_mm)
# Make sure that the number of QM atoms matches the number of MM charges
# when using mm embedding.
if self._method == "mm":
if len(xyz_qm) != len(self._mm_charges):
msg = (
"MM embedding is specified but the number of atoms in the "
f"QM region ({len(xyz_qm)}) doesn't match the number of MM "
f"charges ({len(self._mm_charges)})"
)
_logger.error(msg)
raise ValueError(msg)
# Compute the energy and gradients.
E_vac, grad_vac, E_tot, grad_qm, grad_mm = self._calculate_energy_and_gradients(
atomic_numbers,
charges_mm,
xyz_qm,
xyz_mm,
)
# Store the number of MM atoms.
num_mm_atoms = len(charges_mm)
# Log energies to file.
if (
self._energy_frequency > 0
and not self._is_first_step
and self._step % self._energy_frequency == 0
):
with open(self._energy_file, "a+") as f:
# Write the header.
if self._step == 0:
if self._is_interpolate:
f.write(
f"#{'Step':>9}{'λ':>22}{'E(λ) (Eh)':>22}{'E(λ=0) (Eh)':>22}{'E(λ=1) (Eh)':>22}\n"
)
else:
f.write(f"#{'Step':>9}{'E_vac (Eh)':>22}{'E_tot (Eh)':>22}\n")
# Write the record.
if self._is_interpolate:
f.write(
f"{self._step:>10}{lam:22.12f}{E_tot:22.12f}{E_mm:22.12f}{E_emle:22.12f}\n"
)
else:
f.write(f"{self._step:>10}{E_vac:22.12f}{E_tot:22.12f}\n")
# Increment the step counter.
if self._is_first_step:
self._is_first_step = False
else:
self._step += 1
# Return the energy and forces in OpenMM units.
return (
E_tot.item() * _HARTREE_TO_KJ_MOL,
(-grad_qm * _HARTREE_TO_KJ_MOL * _NANOMETER_TO_BOHR).tolist(),
(
-grad_mm[:num_mm_atoms] * _HARTREE_TO_KJ_MOL * _NANOMETER_TO_BOHR
).tolist(),
)
@staticmethod
def _parse_orca_input(orca_input):
"""
Internal method to parse an ORCA input file.
Parameters
----------
orca_input: str
The path to the ORCA input file.
Returns
-------
dirname: str
The path to the directory containing the ORCA file.
charge: int
The charge on the QM region.
mult: int
The spin multiplicity of the QM region.
atoms: ase.Atoms
The atoms in the QM region.
atomic_numbers: numpy.array
The atomic numbers of the atoms in the QM region.
xyz_qm: numpy.array
The positions of the atoms in the QM region.
xyz_mm: numpy.array
The positions of the atoms in the MM region.
charges_mm: numpy.array
The charges of the atoms in the MM region.
xyz_file_qm: str
The path to the QM xyz file.
"""
if not isinstance(orca_input, str):
msg = "'orca_input' must be of type 'str'"
_logger.error(msg)
raise TypeError(msg)
if not _os.path.isfile(orca_input):
msg = f"Unable to locate the ORCA input file: {orca_input}"
_logger.error(msg)
raise IOError(msg)
# Store the directory name for the file. Files within the input file
# should be relative to this.
dirname = _os.path.dirname(orca_input)
if dirname:
dirname += "/"
else:
dirname = "./"
# Null the required information from the input file.
charge = None
mult = None
xyz_file_qm = None
xyz_file_mm = None
# Parse the file for the required information.
with open(orca_input, "r") as f:
for line in f:
if line.startswith("%pointcharges"):
xyz_file_mm = str(line.split()[1]).replace('"', "")
elif line.startswith("*xyzfile"):
data = line.split()
charge = int(data[1])
mult = int(data[2])
xyz_file_qm = str(data[3]).replace('"', "")
# Validate that the information was found.
if charge is None:
msg = "Unable to determine QM charge from ORCA input."
_logger.error(msg)
raise ValueError(msg)
if mult is None:
msg = "Unable to determine QM spin multiplicity from ORCA input."
_logger.error(msg)
raise ValueError(msg)
if xyz_file_qm is None:
msg = "Unable to determine QM xyz file from ORCA input."
_logger.error(msg)
raise ValueError(msg)
else:
if not _os.path.isfile(xyz_file_qm):
xyz_file_qm = dirname + xyz_file_qm
if not _os.path.isfile(xyz_file_qm):
msg = f"Unable to locate QM xyz file: {xyz_file_qm}"
_logger.error(msg)
raise ValueError(msg)
if xyz_file_mm is None:
msg = "Unable to determine MM xyz file from ORCA input."
_logger.error(msg)
raise ValueError(msg)
else:
if not _os.path.isfile(xyz_file_mm):
xyz_file_mm = dirname + xyz_file_mm
if not _os.path.isfile(xyz_file_mm):
msg = f"Unable to locate MM xyz file: {xyz_file_mm}"
_logger.error(msg)
raise ValueError(msg)
# Process the QM xyz file.
try:
atoms = _ase_io.read(xyz_file_qm)
except:
msg = f"Unable to read QM xyz file: {xyz_file_qm}"
_logger.error(msg)
raise IOError(msg)
charges_mm = []
xyz_mm = []
# Process the MM xyz file. (Charges plus coordinates.)
with open(xyz_file_mm, "r") as f:
for line in f:
data = line.split()
# MM records have four entries per line.
if len(data) == 4:
try:
charges_mm.append(float(data[0]))
except:
msg = "Unable to parse MM charge."
_logger.error(msg)
raise ValueError(msg)
try:
xyz_mm.append([float(x) for x in data[1:]])
except:
msg = "Unable to parse MM coordinates."
_logger.error(msg)
raise ValueError(msg)
# Convert to NumPy arrays.
charges_mm = _np.array(charges_mm)
xyz_mm = _np.array(xyz_mm)
return (
dirname,
charge,
mult,
atoms,
atoms.get_atomic_numbers(),
atoms.get_positions(),
xyz_mm,
charges_mm,
xyz_file_qm,
)