Coverage for /builds/alexhroom/ase/ase/calculators/genericfileio.py: 86.51%
126 statements
« prev ^ index » next coverage.py v7.5.3, created at 2024-08-05 14:37 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2024-08-05 14:37 +0000
1from abc import ABC, abstractmethod
2from contextlib import ExitStack
3from os import PathLike
4from pathlib import Path
5import shlex
6from typing import Any, Iterable, List, Mapping, Optional, Set
8from ase.calculators.abc import GetOutputsMixin
9from ase.calculators.calculator import (
10 BaseCalculator, _validate_command, BadConfiguration)
11from ase.config import cfg as _cfg
14class BaseProfile(ABC):
15 configvars: Set[str] = set()
17 def __init__(self, command):
18 self.command = _validate_command(command)
20 @property
21 def _split_command(self):
22 return shlex.split(self.command)
24 def get_command(self, inputfile, calc_command=None) -> List[str]:
25 """
26 Get the command to run. This should be a list of strings.
28 Parameters
29 ----------
30 inputfile : str
31 calc_command: list[str]: calculator command (used for sockets)
33 Returns
34 -------
35 list of str
36 The command to run.
37 """
38 if calc_command is None:
39 calc_command = self.get_calculator_command(inputfile)
40 return [*self._split_command, *calc_command]
42 @abstractmethod
43 def get_calculator_command(self, inputfile):
44 """
45 The calculator specific command as a list of strings.
47 Parameters
48 ----------
49 inputfile : str
51 Returns
52 -------
53 list of str
54 The command to run.
55 """
57 def run(
58 self, directory: Path, inputfile: Optional[str],
59 outputfile: str, errorfile: Optional[str] = None,
60 append: bool = False
61 ) -> None:
62 """
63 Run the command in the given directory.
65 Parameters
66 ----------
67 directory : pathlib.Path
68 The directory to run the command in.
69 inputfile : Optional[str]
70 The name of the input file.
71 outputfile : str
72 The name of the output file.
73 errorfile: Optional[str]
74 the stderror file
75 append: bool
76 if True then use append mode
77 """
79 import os
80 from subprocess import check_call
82 argv_command = self.get_command(inputfile)
83 mode = 'wb' if not append else 'ab'
85 with ExitStack() as stack:
86 output_path = directory / outputfile
87 fd_out = stack.enter_context(open(output_path, mode))
88 if errorfile is not None:
89 error_path = directory / errorfile
90 fd_err = stack.enter_context(open(error_path, mode))
91 else:
92 fd_err = None
93 check_call(
94 argv_command,
95 cwd=directory,
96 stdout=fd_out,
97 stderr=fd_err,
98 env=os.environ,
99 )
101 @abstractmethod
102 def version(self):
103 """Get the version of the code.
105 Returns
106 -------
107 str
108 The version of the code.
109 """
111 @classmethod
112 def from_config(cls, cfg, section_name):
113 """Create a profile from a configuration file.
115 Parameters
116 ----------
117 cfg : ase.config.Config
118 The configuration object.
119 section_name : str
120 The name of the section in the configuration file. E.g. the name
121 of the template that this profile is for.
123 Returns
124 -------
125 BaseProfile
126 The profile object.
127 """
128 section = cfg.parser[section_name]
129 command = section['command']
131 kwargs = {
132 varname: section[varname]
133 for varname in cls.configvars if varname in section
134 }
136 try:
137 return cls(command=command, **kwargs)
138 except TypeError as err:
139 raise BadConfiguration(*err.args)
142def read_stdout(args, createfile=None):
143 """Run command in tempdir and return standard output.
145 Helper function for getting version numbers of DFT codes.
146 Most DFT codes don't implement a --version flag, so in order to
147 determine the code version, we just run the code until it prints
148 a version number."""
149 import tempfile
150 from subprocess import PIPE, Popen
152 with tempfile.TemporaryDirectory() as directory:
153 if createfile is not None:
154 path = Path(directory) / createfile
155 path.touch()
156 proc = Popen(
157 args,
158 stdout=PIPE,
159 stderr=PIPE,
160 stdin=PIPE,
161 cwd=directory,
162 encoding='utf-8', # Make this a parameter if any non-utf8/ascii
163 )
164 stdout, _ = proc.communicate()
165 # Exit code will be != 0 because there isn't an input file
166 return stdout
169class CalculatorTemplate(ABC):
170 def __init__(self, name: str, implemented_properties: Iterable[str]):
171 self.name = name
172 self.implemented_properties = frozenset(implemented_properties)
174 @abstractmethod
175 def write_input(self, profile, directory, atoms, parameters, properties):
176 ...
178 @abstractmethod
179 def execute(self, directory, profile):
180 ...
182 @abstractmethod
183 def read_results(self, directory: PathLike) -> Mapping[str, Any]:
184 ...
186 @abstractmethod
187 def load_profile(self, cfg):
188 ...
190 def socketio_calculator(
191 self,
192 profile,
193 parameters,
194 directory,
195 # We may need quite a few socket kwargs here
196 # if we want to expose all the timeout etc. from
197 # SocketIOCalculator.
198 unixsocket=None,
199 port=None,
200 ):
201 import os
202 from subprocess import Popen
204 from ase.calculators.socketio import SocketIOCalculator
206 if port and unixsocket:
207 raise TypeError(
208 'For the socketio_calculator only a UNIX '
209 '(unixsocket) or INET (port) socket can be used'
210 ' not both.'
211 )
213 if not port and not unixsocket:
214 raise TypeError(
215 'For the socketio_calculator either a '
216 'UNIX (unixsocket) or INET (port) socket '
217 'must be used'
218 )
220 if not (
221 hasattr(self, 'socketio_argv')
222 and hasattr(self, 'socketio_parameters')
223 ):
224 raise TypeError(
225 f'Template {self} does not implement mandatory '
226 'socketio_argv() and socketio_parameters()'
227 )
229 # XXX need socketio ABC or something
230 argv = profile.get_command(
231 inputfile=None,
232 calc_command=self.socketio_argv(profile, unixsocket, port)
233 )
234 parameters = {
235 **self.socketio_parameters(unixsocket, port),
236 **parameters,
237 }
239 # Not so elegant that socket args are passed to this function
240 # via socketiocalculator when we could make a closure right here.
241 def launch(atoms, properties, port, unixsocket):
242 directory.mkdir(exist_ok=True, parents=True)
244 self.write_input(
245 atoms=atoms,
246 profile=profile,
247 parameters=parameters,
248 properties=properties,
249 directory=directory,
250 )
252 with open(directory / self.outputname, 'w') as out_fd:
253 return Popen(argv, stdout=out_fd, cwd=directory, env=os.environ)
255 return SocketIOCalculator(
256 launch_client=launch, unixsocket=unixsocket, port=port
257 )
260class GenericFileIOCalculator(BaseCalculator, GetOutputsMixin):
261 cfg = _cfg
263 def __init__(
264 self,
265 *,
266 template,
267 profile,
268 directory,
269 parameters=None,
270 ):
271 self.template = template
272 if profile is None:
273 if template.name not in self.cfg.parser:
274 raise BadConfiguration(f'No configuration of {template.name}')
275 try:
276 profile = template.load_profile(self.cfg)
277 except Exception as err:
278 configvars = self.cfg.as_dict()
279 raise BadConfiguration(
280 f'Failed to load section [{template.name}] '
281 f'from configuration: {configvars}'
282 ) from err
284 self.profile = profile
286 # Maybe we should allow directory to be a factory, so
287 # calculators e.g. produce new directories on demand.
288 self.directory = Path(directory)
289 super().__init__(parameters)
291 def set(self, *args, **kwargs):
292 raise RuntimeError(
293 'No setting parameters for now, please. '
294 'Just create new calculators.'
295 )
297 def __repr__(self):
298 return f'{type(self).__name__}({self.template.name})'
300 @property
301 def implemented_properties(self):
302 return self.template.implemented_properties
304 @property
305 def name(self):
306 return self.template.name
308 def write_inputfiles(self, atoms, properties):
309 # SocketIOCalculators like to write inputfiles
310 # without calculating.
311 self.directory.mkdir(exist_ok=True, parents=True)
312 self.template.write_input(
313 profile=self.profile,
314 atoms=atoms,
315 parameters=self.parameters,
316 properties=properties,
317 directory=self.directory,
318 )
320 def calculate(self, atoms, properties, system_changes):
321 self.write_inputfiles(atoms, properties)
322 self.template.execute(self.directory, self.profile)
323 self.results = self.template.read_results(self.directory)
324 # XXX Return something useful?
326 def _outputmixin_get_results(self):
327 return self.results
329 def socketio(self, **socketkwargs):
330 return self.template.socketio_calculator(
331 directory=self.directory,
332 parameters=self.parameters,
333 profile=self.profile,
334 **socketkwargs,
335 )