Coverage for /builds/alexhroom/ase/ase/calculators/genericfileio.py: 86.51%

126 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2024-08-05 14:37 +0000

1from abc import ABC, abstractmethod 

2from contextlib import ExitStack 

3from os import PathLike 

4from pathlib import Path 

5import shlex 

6from typing import Any, Iterable, List, Mapping, Optional, Set 

7 

8from ase.calculators.abc import GetOutputsMixin 

9from ase.calculators.calculator import ( 

10 BaseCalculator, _validate_command, BadConfiguration) 

11from ase.config import cfg as _cfg 

12 

13 

14class BaseProfile(ABC): 

15 configvars: Set[str] = set() 

16 

17 def __init__(self, command): 

18 self.command = _validate_command(command) 

19 

20 @property 

21 def _split_command(self): 

22 return shlex.split(self.command) 

23 

24 def get_command(self, inputfile, calc_command=None) -> List[str]: 

25 """ 

26 Get the command to run. This should be a list of strings. 

27 

28 Parameters 

29 ---------- 

30 inputfile : str 

31 calc_command: list[str]: calculator command (used for sockets) 

32 

33 Returns 

34 ------- 

35 list of str 

36 The command to run. 

37 """ 

38 if calc_command is None: 

39 calc_command = self.get_calculator_command(inputfile) 

40 return [*self._split_command, *calc_command] 

41 

42 @abstractmethod 

43 def get_calculator_command(self, inputfile): 

44 """ 

45 The calculator specific command as a list of strings. 

46 

47 Parameters 

48 ---------- 

49 inputfile : str 

50 

51 Returns 

52 ------- 

53 list of str 

54 The command to run. 

55 """ 

56 

57 def run( 

58 self, directory: Path, inputfile: Optional[str], 

59 outputfile: str, errorfile: Optional[str] = None, 

60 append: bool = False 

61 ) -> None: 

62 """ 

63 Run the command in the given directory. 

64 

65 Parameters 

66 ---------- 

67 directory : pathlib.Path 

68 The directory to run the command in. 

69 inputfile : Optional[str] 

70 The name of the input file. 

71 outputfile : str 

72 The name of the output file. 

73 errorfile: Optional[str] 

74 the stderror file 

75 append: bool 

76 if True then use append mode 

77 """ 

78 

79 import os 

80 from subprocess import check_call 

81 

82 argv_command = self.get_command(inputfile) 

83 mode = 'wb' if not append else 'ab' 

84 

85 with ExitStack() as stack: 

86 output_path = directory / outputfile 

87 fd_out = stack.enter_context(open(output_path, mode)) 

88 if errorfile is not None: 

89 error_path = directory / errorfile 

90 fd_err = stack.enter_context(open(error_path, mode)) 

91 else: 

92 fd_err = None 

93 check_call( 

94 argv_command, 

95 cwd=directory, 

96 stdout=fd_out, 

97 stderr=fd_err, 

98 env=os.environ, 

99 ) 

100 

101 @abstractmethod 

102 def version(self): 

103 """Get the version of the code. 

104 

105 Returns 

106 ------- 

107 str 

108 The version of the code. 

109 """ 

110 

111 @classmethod 

112 def from_config(cls, cfg, section_name): 

113 """Create a profile from a configuration file. 

114 

115 Parameters 

116 ---------- 

117 cfg : ase.config.Config 

118 The configuration object. 

119 section_name : str 

120 The name of the section in the configuration file. E.g. the name 

121 of the template that this profile is for. 

122 

123 Returns 

124 ------- 

125 BaseProfile 

126 The profile object. 

127 """ 

128 section = cfg.parser[section_name] 

129 command = section['command'] 

130 

131 kwargs = { 

132 varname: section[varname] 

133 for varname in cls.configvars if varname in section 

134 } 

135 

136 try: 

137 return cls(command=command, **kwargs) 

138 except TypeError as err: 

139 raise BadConfiguration(*err.args) 

140 

141 

142def read_stdout(args, createfile=None): 

143 """Run command in tempdir and return standard output. 

144 

145 Helper function for getting version numbers of DFT codes. 

146 Most DFT codes don't implement a --version flag, so in order to 

147 determine the code version, we just run the code until it prints 

148 a version number.""" 

149 import tempfile 

150 from subprocess import PIPE, Popen 

151 

152 with tempfile.TemporaryDirectory() as directory: 

153 if createfile is not None: 

154 path = Path(directory) / createfile 

155 path.touch() 

156 proc = Popen( 

157 args, 

158 stdout=PIPE, 

159 stderr=PIPE, 

160 stdin=PIPE, 

161 cwd=directory, 

162 encoding='utf-8', # Make this a parameter if any non-utf8/ascii 

163 ) 

164 stdout, _ = proc.communicate() 

165 # Exit code will be != 0 because there isn't an input file 

166 return stdout 

167 

168 

169class CalculatorTemplate(ABC): 

170 def __init__(self, name: str, implemented_properties: Iterable[str]): 

171 self.name = name 

172 self.implemented_properties = frozenset(implemented_properties) 

173 

174 @abstractmethod 

175 def write_input(self, profile, directory, atoms, parameters, properties): 

176 ... 

177 

178 @abstractmethod 

179 def execute(self, directory, profile): 

180 ... 

181 

182 @abstractmethod 

183 def read_results(self, directory: PathLike) -> Mapping[str, Any]: 

184 ... 

185 

186 @abstractmethod 

187 def load_profile(self, cfg): 

188 ... 

189 

190 def socketio_calculator( 

191 self, 

192 profile, 

193 parameters, 

194 directory, 

195 # We may need quite a few socket kwargs here 

196 # if we want to expose all the timeout etc. from 

197 # SocketIOCalculator. 

198 unixsocket=None, 

199 port=None, 

200 ): 

201 import os 

202 from subprocess import Popen 

203 

204 from ase.calculators.socketio import SocketIOCalculator 

205 

206 if port and unixsocket: 

207 raise TypeError( 

208 'For the socketio_calculator only a UNIX ' 

209 '(unixsocket) or INET (port) socket can be used' 

210 ' not both.' 

211 ) 

212 

213 if not port and not unixsocket: 

214 raise TypeError( 

215 'For the socketio_calculator either a ' 

216 'UNIX (unixsocket) or INET (port) socket ' 

217 'must be used' 

218 ) 

219 

220 if not ( 

221 hasattr(self, 'socketio_argv') 

222 and hasattr(self, 'socketio_parameters') 

223 ): 

224 raise TypeError( 

225 f'Template {self} does not implement mandatory ' 

226 'socketio_argv() and socketio_parameters()' 

227 ) 

228 

229 # XXX need socketio ABC or something 

230 argv = profile.get_command( 

231 inputfile=None, 

232 calc_command=self.socketio_argv(profile, unixsocket, port) 

233 ) 

234 parameters = { 

235 **self.socketio_parameters(unixsocket, port), 

236 **parameters, 

237 } 

238 

239 # Not so elegant that socket args are passed to this function 

240 # via socketiocalculator when we could make a closure right here. 

241 def launch(atoms, properties, port, unixsocket): 

242 directory.mkdir(exist_ok=True, parents=True) 

243 

244 self.write_input( 

245 atoms=atoms, 

246 profile=profile, 

247 parameters=parameters, 

248 properties=properties, 

249 directory=directory, 

250 ) 

251 

252 with open(directory / self.outputname, 'w') as out_fd: 

253 return Popen(argv, stdout=out_fd, cwd=directory, env=os.environ) 

254 

255 return SocketIOCalculator( 

256 launch_client=launch, unixsocket=unixsocket, port=port 

257 ) 

258 

259 

260class GenericFileIOCalculator(BaseCalculator, GetOutputsMixin): 

261 cfg = _cfg 

262 

263 def __init__( 

264 self, 

265 *, 

266 template, 

267 profile, 

268 directory, 

269 parameters=None, 

270 ): 

271 self.template = template 

272 if profile is None: 

273 if template.name not in self.cfg.parser: 

274 raise BadConfiguration(f'No configuration of {template.name}') 

275 try: 

276 profile = template.load_profile(self.cfg) 

277 except Exception as err: 

278 configvars = self.cfg.as_dict() 

279 raise BadConfiguration( 

280 f'Failed to load section [{template.name}] ' 

281 f'from configuration: {configvars}' 

282 ) from err 

283 

284 self.profile = profile 

285 

286 # Maybe we should allow directory to be a factory, so 

287 # calculators e.g. produce new directories on demand. 

288 self.directory = Path(directory) 

289 super().__init__(parameters) 

290 

291 def set(self, *args, **kwargs): 

292 raise RuntimeError( 

293 'No setting parameters for now, please. ' 

294 'Just create new calculators.' 

295 ) 

296 

297 def __repr__(self): 

298 return f'{type(self).__name__}({self.template.name})' 

299 

300 @property 

301 def implemented_properties(self): 

302 return self.template.implemented_properties 

303 

304 @property 

305 def name(self): 

306 return self.template.name 

307 

308 def write_inputfiles(self, atoms, properties): 

309 # SocketIOCalculators like to write inputfiles 

310 # without calculating. 

311 self.directory.mkdir(exist_ok=True, parents=True) 

312 self.template.write_input( 

313 profile=self.profile, 

314 atoms=atoms, 

315 parameters=self.parameters, 

316 properties=properties, 

317 directory=self.directory, 

318 ) 

319 

320 def calculate(self, atoms, properties, system_changes): 

321 self.write_inputfiles(atoms, properties) 

322 self.template.execute(self.directory, self.profile) 

323 self.results = self.template.read_results(self.directory) 

324 # XXX Return something useful? 

325 

326 def _outputmixin_get_results(self): 

327 return self.results 

328 

329 def socketio(self, **socketkwargs): 

330 return self.template.socketio_calculator( 

331 directory=self.directory, 

332 parameters=self.parameters, 

333 profile=self.profile, 

334 **socketkwargs, 

335 )