Coverage for sparc/io.py: 73%

481 statements  

coverage.py v7.6.9, created at 2024-12-12 01:13 +0000

1"""Providing a new bundled SPARC file format 

2""" 

3import os 

4import re 

5from pathlib import Path 

6from warnings import warn 

7 

8import numpy as np 

9from ase.atoms import Atoms 

10from ase.calculators.singlepoint import SinglePointDFTCalculator 

11from ase.config import cfg as _cfg 

12 

13# various io formatters 

14from .api import SparcAPI 

15from .common import psp_dir as default_psp_dir 

16from .download_data import is_psp_download_complete 

17from .sparc_parsers.aimd import _read_aimd 

18from .sparc_parsers.atoms import atoms_to_dict, dict_to_atoms 

19from .sparc_parsers.geopt import _read_geopt 

20from .sparc_parsers.inpt import _read_inpt, _write_inpt 

21from .sparc_parsers.ion import _read_ion, _write_ion 

22from .sparc_parsers.out import _read_out 

23from .sparc_parsers.pseudopotential import copy_psp_file, parse_psp8_header 

24from .sparc_parsers.static import _add_cell_info, _read_static 

25from .utils import deprecated, locate_api, sanitize_path, string2index 

26 

27# from .sparc_parsers.ion import read_ion, write_ion 

28defaultAPI = locate_api(cfg=_cfg) 

29 

30 

31class SparcBundle: 

32 """Provide access to a calculation folder of SPARC as a simple bundle 

33 

34 The bundle can optionally be named with a .sparc suffix, following

35 ASE's .bundle format

36

37 Currently the write method only supports 1 image, while the read method supports

38 reading atoms results under the following conditions:

39

40 1) No calculation (minimal): .ion + .inpt file --> 1 image

41 2) Single point calculation: .ion + .inpt + .out + .static --> 1 image with calc

42 3) Multiple SP calculations: chain all .out{digits} and .static{digits} outputs

43 4) Relaxation: read from .geopt and .out (supporting chaining)

44 5) AIMD: read from .aimd and .out (supporting chaining)

45

46

47 

48 

49 Attributes: 

50 directory (Path): Path to the directory containing SPARC files. 

51 mode (str): File access mode ('r', 'w', or 'a'). 

52 label (str): Name of the main SPARC file. 

53 init_atoms (Atoms): Initial atomic configuration. 

54 init_inputs (dict): Initial input parameters. 

55 psp_data (dict): Pseudopotential data. 

56 raw_results (dict): Raw results from SPARC calculations. 

57 psp_dir (Path): Directory containing pseudopotentials. 

58 sorting (dict): Sorting ('sort') and resorting ('resort') order for atoms.

59 last_image (int): Index of the last image in a series of calculations. 

60 validator (SparcAPI): API validator for SPARC calculations. 

61 

62 Methods: 

63 __find_psp_dir(psp_dir=None): Finds the directory for SPARC pseudopotentials. 

64 _find_files(): Finds all files matching the bundle label. 

65 _make_label(label=None): Infers or sets the label for the SPARC bundle. 

66 _indir(ext, label=None, occur=0, d_format="{:02d}"): Finds a file with a specific extension in the bundle. 

67 _read_ion_and_inpt(): Reads .ion and .inpt files together. 

68 _write_ion_and_inpt(): Writes .ion and .inpt files to the bundle. 

69 _read_results_from_index(index, d_format="{:02d}"): Reads results from a specific calculation index. 

70 _make_singlepoint(calc_results, images, raw_results): Converts results and images to SinglePointDFTCalculators. 

71 _extract_static_results(raw_results, index=":"): Extracts results from static calculations. 

72 _extract_geopt_results(raw_results, index=":"): Extracts results from geometric optimization calculations. 

73 _extract_aimd_results(raw_results, index=":"): Extracts results from AIMD calculations. 

74 convert_to_ase(index=-1, include_all_files=False, **kwargs): Converts raw results to ASE Atoms with calculators. 

75 read_raw_results(include_all_files=False): Parses all files in the bundle and merges results. 

76 read_psp_info(): Parses pseudopotential information from the inpt file. 

77 """ 

78 

79 psp_env = ["SPARC_PSP_PATH", "SPARC_PP_PATH"] 

80 

81 def __init__( 

82 self, 

83 directory, 

84 mode="r", 

85 atoms=None, 

86 label=None, 

87 psp_dir=None, 

88 validator=defaultAPI, 

89 cfg=_cfg, 

90 ): 

91 """ 

92 Initializes a SparcBundle for accessing SPARC calculation data. 

93 

94 Args: 

95 directory (str or Path): The path to the directory containing the SPARC files. 

96 mode (str, optional): The file access mode. Can be 'r' (read), 'w' (write), or 'a' (append). Defaults to 'r'. 

97 atoms (Atoms, optional): The initial atomic configuration. Only relevant in write mode. 

98 label (str, optional): A custom label for the bundle. If None, the label is inferred from the directory or files. 

99 psp_dir (str or Path, optional): Path to the directory containing pseudopotentials. If None, the path is inferred. 

100 validator (SparcAPI, optional): An instance of SparcAPI for validating and parsing SPARC parameters. Defaults to a default SparcAPI instance. 

101 

102 Raises: 

103 AssertionError: If an invalid mode is provided. 

104 ValueError: If multiple .ion files are found and no label is specified. 

105 Warning: If no .ion file is found in read-mode, or illegal characters are in the label. 

106 """ 

107 self.directory = Path(directory) 

108 self.mode = mode.lower() 

109 assert self.mode in ( 

110 "r", 

111 "w", 

112 "a", 

113 ), f"Invalid mode {self.mode}! Must be one of 'r', 'w' or 'a'"

114 self.label = self._make_label(label) 

115 self.init_atoms = atoms.copy() if atoms is not None else None 

116 self.init_inputs = {} 

117 self.psp_data = {} 

118 self.raw_results = {} 

119 self.cfg = cfg 

120 self.psp_dir = self.__find_psp_dir(psp_dir) 

121 # Sorting should be consistent across the whole bundle! 

122 self.sorting = None 

123 self.last_image = -1 

124 self.validator = validator 

125 

126 def _find_files(self): 

127 """Find all files matching '{label}.*'""" 

128 return list(self.directory.glob(f"{self.label}.*")) 

129 

130 def _make_label(self, label=None): 

131 """Infer the label from the bundle 

132 

133 Special cases if label is None: 

134 1. read mode --> get the ion file name 

135 2. write mode --> infer from the directory 

136 

137 Arguments: 

138 label (str or None): Label to be used to write the .ion, .inpt files 

139 """ 

140 prefix = self.directory.resolve().with_suffix("").name 

141 

142 illegal_chars = '\\/:*?"<>|' 

143 if label is not None: 

144 label_ = label 

145 elif self.mode == "w": 

146 label_ = prefix 

147 else: 

148 # read 

149 match_ion = list(self.directory.glob("*.ion")) 

150 if len(match_ion) > 1: 

151 raise ValueError( 

152 "Cannot read sparc bundle with multiple ion files without specifying the label!" 

153 ) 

154 elif len(match_ion) == 1: 

155 label_ = match_ion[0].name.split(".")[0] 

156 else: 

157 # No file found, possibly an empty bundle 

158 warn("No .ion file found in the read-mode bundle.") 

159 label_ = prefix 

160 

161 if any([c in label_ for c in illegal_chars]): 

162 warn( 

163 f"Label name {label_} contains illegal characters! I'll make it 'SPARC'" 

164 ) 

165 label_ = "SPARC" 

166 return label_ 

167 

168 def __find_psp_dir(self, psp_dir=None): 

169 """Use environmental variable to find the directory for SPARC 

170 pseudopotentials 

171 

172 Searching priority: 

173 1. User defined psp_dir 

174 2. $SPARC_PSP_PATH 

175 3. $SPARC_PP_PATH 

176 4. psp bundled with sparc-api 

177 

178 Arguments: 

179 psp_dir (str or PosixPath or None): the specific directory to search for the psp files.

180 Each chemical element can only have 1 psp file under psp_dir

181 Returns: 

182 PosixPath: Location of psp files 

183 """ 

184 if psp_dir is not None: 

185 return Path(psp_dir) 

186 else: 

187 for var in self.psp_env: 

188 env_psp_dir = self.cfg.get(var, None) 

189 if env_psp_dir: 

190 return Path(env_psp_dir) 

191 # Use pp_path field in cfg 

192 parser = self.cfg.parser["sparc"] if "sparc" in self.cfg.parser else {} 

193 psp_dir_ini = parser.get("psp_path", None) 

194 if psp_dir_ini: 

195 return sanitize_path(psp_dir_ini) 

196 # At this point, we try to use the psp files bundled with sparc 

197 if is_psp_download_complete(default_psp_dir): 

198 return default_psp_dir 

199 else: 

200 warn( 

201 ( 

202 "PSP directory bundled with SPARC-X-API is broken! " 

203 "Please use `sparc.download_data` to re-download them!" 

204 ) 

205 ) 

206 

207 # Not found 

208 if self.mode == "w": 

209 warn( 

210 ( 

211 "No pseudopotential searching path was set and " 

212 "neither of $SPARC_PSP_PATH nor $SPARC_PP_PATH is set.\n" 

213 "Please explicitly provide the pseudopotentials parameter when writing the sparc bundle." 

214 ) 

215 ) 

216 return None 

217 

218 def _indir(self, ext, label=None, occur=0, d_format="{:02d}"): 

219 """Find the file with {label}.{ext} under current dir, 

220 if label is None, use the default 

221 

222 Arguments: 

223 ext (str): Extension of file, e.g. '.ion' or 'ion' 

224 label (str or None): Label for the file. If None, use the parent directory name for searching 

225 occur (int): Occurrence index of the file; if occur > 0, search for files with a suffix like 'SPARC.out_01'

226 d_format (str): Format for the index 

227 

228 Returns: 

229 PosixPath: Path to the target file under self.directory 

230 """ 

231 label = self.label if label is None else label 

232 if not ext.startswith("."): 

233 ext = "." + ext 

234 if occur == 0: 

235 target = self.directory / f"{label}{ext}" 

236 else: 

237 target = self.directory / f"{label}{ext}_{d_format.format(occur)}" 

238 return target 

239 

240 def _read_ion_and_inpt(self): 

241 """Read the ion and inpt files together to obtain basic atomstic data. 

242 

243 Returns: 

244 Atoms: atoms object from .ion and .inpt file 

245 """ 

246 f_ion, f_inpt = self._indir(".ion"), self._indir(".inpt") 

247 ion_data = _read_ion(f_ion, validator=self.validator) 

248 inpt_data = _read_inpt(f_inpt, validator=self.validator) 

249 merged_data = {**ion_data, **inpt_data} 

250 return dict_to_atoms(merged_data) 

251 

252 def _write_ion_and_inpt( 

253 self, 

254 atoms=None, 

255 label=None, 

256 direct=False, 

257 sort=True, 

258 ignore_constraints=False, 

259 wrap=False, 

260 # Below are the parameters from v1 

261 # scaled -> direct, ignore_constraints --> not add_constraints 

262 scaled=False, 

263 add_constraints=True, 

264 copy_psp=False, 

265 comment="", 

266 input_parameters={}, 

267 # Parameters that do not require type conversion 

268 **kwargs, 

269 ): 

270 """Write the ion and inpt files to a bundle. This method only 

271 supports writing 1 image. If input_parameters are empty, 

272 there will only be .ion writing the positions and .inpt 

273 writing a minimal cell information 

274 

275 Args: 

276 atoms (Atoms, optional): The Atoms object to write. If None, uses initialized atoms associated with SparcBundle. 

277 label (str, optional): Custom label for the written files. 

278 direct (bool, optional): If True, writes positions in direct coordinates. 

279 sort (bool, optional): If True, sorts atoms before writing. 

280 ignore_constraints (bool, optional): If True, ignores constraints on atoms. 

281 wrap (bool, optional): If True, wraps atoms into the unit cell. 

282 **kwargs: Additional keyword arguments for writing. 

283 

284 Raises: 

285 ValueError: If the bundle is not in write mode. 

286 """ 

287 if self.mode != "w": 

288 raise ValueError( 

289 "Cannot write input files while sparc bundle is opened in read or append mode!" 

290 ) 

291 os.makedirs(self.directory, exist_ok=True) 

292 atoms = self.init_atoms.copy() if atoms is None else atoms.copy()

293 pseudopotentials = kwargs.pop("pseudopotentials", {}) 

294 

295 if sort: 

296 if self.sorting is not None: 

297 old_sort = self.sorting.get("sort", None) 

298 if old_sort: 

299 sort = old_sort 

300 

301 data_dict = atoms_to_dict( 

302 atoms, 

303 direct=direct, 

304 sort=sort, 

305 ignore_constraints=ignore_constraints, 

306 psp_dir=self.psp_dir, 

307 pseudopotentials=pseudopotentials, 

308 ) 

309 merged_inputs = input_parameters.copy() 

310 merged_inputs.update(kwargs) 

311 data_dict["inpt"]["params"].update(merged_inputs) 

312 

313 # If copy_psp, change the PSEUDO_POT field and copy the files 

314 if copy_psp: 

315 for block in data_dict["ion"]["atom_blocks"]: 

316 if "PSEUDO_POT" in block: 

317 origin_psp = block["PSEUDO_POT"] 

318 target_dir = self.directory 

319 target_fname = copy_psp_file(origin_psp, target_dir) 

320 block["PSEUDO_POT"] = target_fname 

321 

322 _write_ion(self._indir(".ion"), data_dict, validator=self.validator) 

323 _write_inpt(self._indir(".inpt"), data_dict, validator=self.validator) 

324 # Update the sorting information 

325 ion_dict = _read_ion(self._indir(".ion"))["ion"] 

326 self.sorting = ion_dict.get("sorting", None) 

327 return 
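# A minimal write-mode usage sketch (hedged; `my_atoms` and the bundle name
# are placeholders):
#
#   sb = SparcBundle("Cu_scf.sparc", mode="w", atoms=my_atoms)
#   sb._write_ion_and_inpt(atoms=my_atoms, copy_psp=True)
#
# Extra keyword arguments are merged into the .inpt parameter block.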

328 

329 def read_raw_results(self, include_all_files=False): 

330 """Parse all files using the given self.label. 

331 The results are merged dict from all file formats 

332 

333 Arguments: 

334 include_all_files (bool): Whether to include output files with different suffixes

335 If true: include all files (e.g. SPARC.out, SPARC.out_01, 

336 SPARC.out_02, etc). 

337 Returns: 

338 dict or List: Dict containing all raw results. Only some of them will appear in the calculator's results 

339 

340 Sets: 

341 self.raw_results (dict or List): the same as the return value 

342 

343 #TODO: @TT 2024-11-01 allow accepting indices 

344 #TODO: @TT last_image is a bad name, it should refer to the occurrence of images

345 the same goes with num_calculations 

346 """ 

347 # Find the max output index 

348 out_files = self.directory.glob(f"{self.label}.out*") 

349 valid_out_files = [ 

350 f 

351 for f in out_files 

352 if (re.fullmatch(r"^\.out(?:_\d+)?$", f.suffix) is not None) 

353 ] 

354 # Combine and sort the file lists 

355 last_out = sorted(valid_out_files, reverse=True) 

356 # No output file, only ion / inpt 

357 if len(last_out) == 0: 

358 self.last_image = -1 

359 else: 

360 suffix = last_out[0].suffix 

361 if suffix == ".out": 

362 self.last_image = 0 

363 else: 

364 self.last_image = int(suffix.split("_")[1]) 

365 self.num_calculations = self.last_image + 1 

366 

367 # Always make sure ion / inpt results are parsed regardless of actual calculations 

368 if include_all_files: 

369 if self.num_calculations > 0: 

370 results = [ 

371 self._read_results_from_index(index) 

372 for index in range(self.num_calculations) 

373 ] 

374 else: 

375 results = [self._read_results_from_index(self.last_image)] 

376 else: 

377 results = self._read_results_from_index(self.last_image) 

378 

379 self.raw_results = results 

380 

381 if include_all_files: 

382 init_raw_results = self.raw_results[0] 

383 else: 

384 init_raw_results = self.raw_results.copy() 

385 

386 self.init_atoms = dict_to_atoms(init_raw_results) 

387 self.init_inputs = { 

388 "ion": init_raw_results["ion"], 

389 "inpt": init_raw_results["inpt"], 

390 } 

391 self.psp_data = self.read_psp_info() 

392 return self.raw_results 
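# Sketch of the merged raw results (hedged; keys depend on which files
# actually exist in the bundle):
#
#   {"ion": {...}, "inpt": {...}, "static": [...], "out": {...}}
#
# With include_all_files=True, a list with one such dict per calculation
# index is returned instead.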

393 

394 def _read_results_from_index(self, index, d_format="{:02d}"): 

395 """Read the results from one calculation index, and return a 

396 single raw result dict, e.g. for index=0 --> .static 

397 and index=1 --> .static_01. 

398 

399 Arguments: 

400 index (int): Index of image to return the results 

401 d_format (str): Format for the index suffix 

402 

403 Returns: 

404 dict: Results for single image 

405 

406 #TODO: @TT should we call index --> occurrence?

407 

408 """ 

409 results_dict = {} 

410 

411 for ext in ("ion", "inpt"): 

412 f = self._indir(ext, occur=0) 

413 if f.is_file(): 

414 data_dict = globals()[f"_read_{ext}"](f) 

415 results_dict.update(data_dict) 

416 for ext in ("geopt", "static", "aimd", "out"): 

417 f = self._indir(ext, occur=index, d_format=d_format) 

418 if f.is_file(): 

419 data_dict = globals()[f"_read_{ext}"](f) 

420 results_dict.update(data_dict) 

421 

422 # Must have files: ion, inpt 

423 if ("ion" not in results_dict) or ("inpt" not in results_dict): 

424 raise RuntimeError( 

425 "Either ion or inpt files are missing from the bundle! " 

426 "Your SPARC calculation may be corrupted." 

427 ) 

428 

429 # Copy the sorting information, if not existing 

430 sorting = results_dict["ion"].get("sorting", None) 

431 if sorting is not None: 

432 if self.sorting is None: 

433 self.sorting = sorting 

434 else: 

435 # Compare stored sorting 

436 assert (tuple(self.sorting["sort"]) == tuple(sorting["sort"])) and ( 

437 tuple(self.sorting["resort"]) == tuple(sorting["resort"]) 

438 ), "Sorting information changed!" 

439 return results_dict 
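# Index-to-file mapping sketch (assuming the label is "SPARC" and the
# default d_format):
#
#   index=0 -> SPARC.ion / SPARC.inpt / SPARC.static / SPARC.out
#   index=1 -> SPARC.ion / SPARC.inpt / SPARC.static_01 / SPARC.out_01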

440 

441 def convert_to_ase(self, index=-1, include_all_files=False, **kwargs): 

442 """Read the raw results from the bundle and create atoms with 

443 single point calculators 

444 

445 Arguments: 

446 index (int or str): Index or slice of the image(s) to convert. Uses the same format as ase.io.read 

447 include_all_files (bool): If true, also read results with indexed suffixes

448 

449 Returns: 

450 Atoms or List[Atoms]: ASE-atoms or images with single point results 

451 

452 """ 

453 # Convert to images! 

454 # TODO: @TT 2024-11-01 read_raw_results should implement a more 

455 # robust behavior handling index, as it is the entry point for all readers

456 rs = self.read_raw_results(include_all_files=include_all_files) 

457 if isinstance(rs, dict): 

458 raw_results = [rs] 

459 else: 

460 raw_results = list(rs) 

461 res_images = [] 

462 for entry in raw_results: 

463 if "static" in entry: 

464 calc_results, images = self._extract_static_results(entry, index=":") 

465 elif "geopt" in entry: 

466 calc_results, images = self._extract_geopt_results(entry, index=":") 

467 elif "aimd" in entry: 

468 calc_results, images = self._extract_aimd_results(entry, index=":") 

469 else: 

470 calc_results, images = None, [self.init_atoms.copy()] 

471 

472 if images is not None: 

473 if calc_results is not None: 

474 images = self._make_singlepoint(calc_results, images, entry) 

475 res_images.extend(images) 

476 

477 if isinstance(index, int): 

478 return res_images[index] 

479 else: 

480 return res_images[string2index(index)] 
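# Usage sketch (hypothetical bundle path):
#
#   sb = SparcBundle("Cu_relax.sparc")
#   traj = sb.convert_to_ase(index=":", include_all_files=True)
#   energies = [img.get_potential_energy() for img in traj]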

481 

482 def _make_singlepoint(self, calc_results, images, raw_results): 

483 """Convert a calculator dict and images of Atoms to list of 

484 SinglePointDFTCalculators 

485 

486 The calculator also takes parameters from ion, inpt that exist 

487 in self.raw_results. 

488 

489 Arguments: 

490 calc_results (List): Calculation results for all images 

491 images (List): Corresponding Atoms images 

492 raw_results (List): Full raw results dict to obtain additional information 

493 

494 Returns: 

495 List(Atoms): ASE-atoms images with single point calculators attached 

496 

497 """ 

498 converted_images = [] 

499 for res, _atoms in zip(calc_results, images): 

500 atoms = _atoms.copy() 

501 sp = SinglePointDFTCalculator(atoms) 

502 # Res can be empty at this point, leading to incomplete calc 

503 sp.results.update(res) 

504 sp.name = "sparc" 

505 sp.kpts = raw_results["inpt"].get("params", {}).get("KPOINT_GRID", None) 

506 # There may be a better way handling the parameters... 

507 sp.parameters = raw_results["inpt"].get("params", {}) 

508 sp.raw_parameters = { 

509 "ion": raw_results["ion"], 

510 "inpt": raw_results["inpt"], 

511 } 

512 atoms.calc = sp 

513 converted_images.append(atoms) 

514 return converted_images 

515 

516 def _extract_static_results(self, raw_results, index=":"): 

517 """Extract the static calculation results and atomic 

518 structure(s) Returns: calc_results: dict with at least energy 

519 value atoms: ASE atoms object The priority is to parse 

520 position from static file first, then fallback from ion + inpt 

521 

522 Note: make all energy / forces resorted! 

523 

524 Arguments: 

525 raw_results (dict): Raw results parsed from self.read_raw_results 

526 index (str or int): Index or slice of images 

527 

528 Returns: 

529 List[results], List[Atoms] 

530 

531 """ 

532 static_results = raw_results.get("static", []) 

533 calc_results = [] 

534 # Use extra lattice information to construct the positions 

535 cell = self.init_atoms.cell 

536 # import pdb; pdb.set_trace() 

537 static_results = _add_cell_info(static_results, cell) 

538 

539 if isinstance(index, int): 

540 _images = [static_results[index]] 

541 elif isinstance(index, str): 

542 _images = static_results[string2index(index)] 

543 

544 ase_images = [] 

545 for static_results in _images: 

546 partial_results = {} 

547 if "free energy" in static_results: 

548 partial_results["energy"] = static_results["free energy"] 

549 partial_results["free energy"] = static_results["free energy"] 

550 

551 if "forces" in static_results: 

552 partial_results["forces"] = static_results["forces"][self.resort] 

553 

554 if "atomic_magnetization" in static_results: 

555 partial_results["magmoms"] = static_results["atomic_magnetization"][ 

556 self.resort 

557 ] 

558 

559 if "net_magnetization" in static_results: 

560 partial_results["magmom"] = static_results["net_magnetization"] 

561 

562 if "stress" in static_results: 

563 partial_results["stress"] = static_results["stress"] 

564 

565 if "stress_equiv" in static_results: 

566 partial_results["stress_equiv"] = static_results["stress_equiv"] 

567 

568 atoms = self.init_atoms.copy() 

569 # import pdb; pdb.set_trace() 

570 if "atoms" in static_results: 

571 atoms_dict = static_results["atoms"] 

572 

573 # The socket mode case. Reset all cell and positions 

574 # Be careful, 

575 if "lattice" in static_results: 

576 lat = static_results["lattice"] 

577 atoms.set_cell(lat, scale_atoms=False) 

578 if "coord" not in atoms_dict: 

579 raise KeyError( 

580 "Coordination conversion failed in socket static output!" 

581 ) 

582 atoms.set_positions( 

583 atoms_dict["coord"][self.resort], apply_constraint=False 

584 ) 

585 else: # Do not change cell information (normal static file) 

586 if "coord_frac" in atoms_dict: 

587 atoms.set_scaled_positions( 

588 atoms_dict["coord_frac"][self.resort] 

589 ) 

590 elif "coord" in atoms_dict: 

591 atoms.set_positions( 

592 atoms_dict["coord"][self.resort], apply_constraint=False 

593 ) 

594 ase_images.append(atoms) 

595 calc_results.append(partial_results) 

596 return calc_results, ase_images 

597 

598 def _extract_geopt_results(self, raw_results, index=":"): 

599 """Extract the static calculation results and atomic 

600 structure(s) Returns: calc_results: dict with at least energy 

601 value atoms: ASE atoms object The priority is to parse 

602 position from static file first, then fallback from ion + inpt 

603 

604 Arguments: 

605 raw_results (dict): Raw results parsed from self.read_raw_results 

606 index (str or int): Index or slice of images 

607 

608 Returns: 

609 List[results], List[Atoms] 

610 

611 """ 

612 # print("RAW_RES: ", raw_results) 

613 geopt_results = raw_results.get("geopt", []) 

614 calc_results = [] 

615 if len(geopt_results) == 0: 

616 warn( 

617 "Geopt file is empty! This is not an error if the calculation is continued from restart. " 

618 ) 

619 return None, None 

620 

621 if isinstance(index, int): 

622 _images = [geopt_results[index]] 

623 elif isinstance(index, str): 

624 _images = geopt_results[string2index(index)] 

625 

626 ase_images = [] 

627 for result in _images: 

628 atoms = self.init_atoms.copy() 

629 partial_result = {} 

630 if "energy" in result: 

631 partial_result["energy"] = result["energy"] 

632 partial_result["free energy"] = result["energy"] 

633 

634 if "forces" in result: 

635 partial_result["forces"] = result["forces"][self.resort] 

636 

637 if "stress" in result: 

638 partial_result["stress"] = result["stress"] 

639 

640 # Modify the atoms copy 

641 if "positions" in result: 

642 atoms.set_positions( 

643 result["positions"][self.resort], apply_constraint=False 

644 ) 

645 if "ase_cell" in result: 

646 atoms.set_cell(result["ase_cell"]) 

647 else: 

648 # For geopt and RELAX=2 (cell relaxation), 

649 # the positions may not be written in .geopt file 

650 relax_flag = raw_results["inpt"]["params"].get("RELAX_FLAG", 0) 

651 if relax_flag != 2: 

652 raise ValueError( 

653 ".geopt file missing positions while RELAX!=2. " 

654 "Please check your setup ad output files." 

655 ) 

656 if "ase_cell" not in result: 

657 raise ValueError( 

658 "Cannot recover positions from .geopt file due to missing cell information. " 

659 "Please check your setup ad output files." 

660 ) 

661 atoms.set_cell(result["ase_cell"], scale_atoms=True) 

662 

663 # Unlike low-dimensional stress in static calculations, we need to convert 

664 # stress_1d / stress_2d to stress_equiv using the non-periodic cell dimension(s)

665 # This has to be done when the actual cell information is loaded 

666 if "stress_1d" in result: 

667 stress_1d = result["stress_1d"] 

668 assert ( 

669 np.count_nonzero(atoms.pbc) == 1 

670 ), "Dimension of stress and PBC mismatch!" 

671 for i, bc in enumerate(atoms.pbc): 

672 if not bc: 

673 stress_1d /= atoms.cell.cellpar()[i] 

674 stress_equiv = stress_1d 

675 partial_result["stress_equiv"] = stress_equiv 

676 

677 if "stress_2d" in result: 

678 stress_2d = result["stress_2d"] 

679 assert ( 

680 np.count_nonzero(atoms.pbc) == 2 

681 ), "Dimension of stress and PBC mismatch!" 

682 for i, bc in enumerate(atoms.pbc): 

683 if not bc: 

684 stress_2d /= atoms.cell.cellpar()[i] 

685 stress_equiv = stress_2d 

686 partial_result["stress_equiv"] = stress_equiv 

687 

688 calc_results.append(partial_result) 

689 ase_images.append(atoms) 

690 

691 return calc_results, ase_images 

692 

693 def _extract_aimd_results(self, raw_results, index=":"): 

694 """Extract energy / forces from aimd results 

695 

696 For the calculator, we only need the last image

697

698 We probably want more information from the AIMD calculations,

699 but keep it minimal for now

700 

701 Arguments: 

702 raw_results (dict): Raw results parsed from self.read_raw_results 

703 index (str or int): Index or slice of images 

704 

705 Returns: 

706 List[results], List[Atoms] 

707 

708 """ 

709 aimd_results = raw_results.get("aimd", []) 

710 calc_results = [] 

711 if len(aimd_results) == 0: 

712 warn( 

713 "Aimd file is empty! " 

714 "This is not an error if the calculation " 

715 "is continued from restart. " 

716 ) 

717 return None, None 

718 

719 if isinstance(index, int): 

720 _images = [aimd_results[index]] 

721 elif isinstance(index, str): 

722 _images = aimd_results[string2index(index)] 

723 

724 ase_images = [] 

725 for result in _images: 

726 partial_result = {} 

727 atoms = self.init_atoms.copy() 

728 if "total energy per atom" in result: 

729 partial_result["energy"] = result["total energy per atom"] * len(atoms) 

730 if "free energy per atom" in result: 

731 partial_result["free energy"] = result["free energy per atom"] * len( 

732 atoms 

733 ) 

734 

735 if "forces" in result: 

736 # The forces are already re-sorted! 

737 partial_result["forces"] = result["forces"][self.resort] 

738 

739 # Modify the atoms in-place 

740 if "positions" not in result: 

741 raise ValueError("Cannot have aimd without positions information!") 

742 

743 atoms.set_positions( 

744 result["positions"][self.resort], apply_constraint=False 

745 ) 

746 

747 if "velocities" in result: 

748 atoms.set_velocities(result["velocities"][self.resort]) 

749 

750 ase_images.append(atoms) 

751 calc_results.append(partial_result) 

752 return calc_results, ase_images 

753 

754 @property 

755 def sort(self): 

756 """Wrap the self.sorting dict. If sorting information does not exist, 

757 use the default slicing 

758 """ 

759 

760 if self.sorting is None: 

761 return slice(None, None, None) 

762 sort = self.sorting.get("sort", []) 

763 if len(sort) > 0: 

764 return sort 

765 else: 

766 return slice(None, None, None) 

767 

768 @property 

769 def resort(self): 

770 """Wrap the self.sorting dict. If sorting information does not exist, 

771 use the default slicing 

772 """ 

773 

774 if self.sorting is None: 

775 return slice(None, None, None) 

776 resort = self.sorting.get("resort", []) 

777 if len(resort) > 0: 

778 return resort 

779 else: 

780 return slice(None, None, None) 
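# Hedged example: arrays parsed from SPARC outputs follow SPARC's internal
# atom order, and `resort` maps them back to the original ASE order, e.g.
#
#   forces_ase = forces_sparc[self.resort]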

781 

782 def read_psp_info(self): 

783 """Parse the psp information from inpt file options 

784 The psp file locations are relative to the bundle. 

785 

786 If the files cannot be found, the dict will only contain 

787 the path 

788 """ 

789 ion_data = self.init_inputs.get("ion", {})

790 blocks = ion_data.get("atom_blocks", [])

791 psp_info = {} 

792 for block in blocks: 

793 element = block["ATOM_TYPE"] 

794 pseudo_path = block["PSEUDO_POT"] 

795 real_path = (self.directory / pseudo_path).resolve() 

796 psp_info[element] = {"rel_path": pseudo_path} 

797 if not real_path.is_file(): 

798 warn(f"Cannot locate pseudopotential {pseudo_path}. ") 

799 else: 

800 header = real_path.read_text()

801 psp_data = parse_psp8_header(header) 

802 psp_info[element].update(psp_data) 

803 return psp_info 
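# Sketch of the returned mapping (hypothetical file name; the fields beyond
# "rel_path" come from parse_psp8_header and are not enumerated here):
#
#   {"Cu": {"rel_path": "Cu.psp8", ...}}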

804 

805 

806def read_sparc(filename, index=-1, include_all_files=True, **kwargs): 

807 """Parse a SPARC bundle, return an Atoms object or list of Atoms (image) 

808 with embedded calculator result. 

809 

810 Arguments: 

811 filename (str or PosixPath): Filename to the sparc bundle 

812 index (int or str): Index or slice of the images, following the ase.io.read convention 

813 include_all_files (bool): If true, parse all output files with indexed suffixes

814 **kwargs: Additional parameters 

815 

816 Returns: 

817 Atoms or List[Atoms] 

818 

819 """ 

820 # We rely on the minimal api version choice, i.e. the default or the one set from env

821 api = locate_api() 

822 sb = SparcBundle(directory=filename, validator=api) 

823 atoms_or_images = sb.convert_to_ase( 

824 index=index, include_all_files=include_all_files, **kwargs 

825 ) 

826 return atoms_or_images 
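# Usage sketch (hypothetical bundle path):
#
#   from sparc.io import read_sparc
#   images = read_sparc("Cu_relax.sparc", index=":")
#   last = images[-1]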

827 

828 

829def write_sparc(filename, images, **kwargs): 

830 """Write sparc file. Images can only be Atoms object 

831 or list of length 1 

832 

833 Arguments: 

834 filename (str or PosixPath): Filename to the output sparc directory 

835 images (Atoms or List(Atoms)): Atoms object to be written. Only supports writing 1 Atoms object

836 **kwargs: Additional parameters 

837 """ 

838 if isinstance(images, Atoms): 

839 atoms = images 

840 elif isinstance(images, list): 

841 if len(images) > 1: 

842 raise ValueError("SPARC format only supports writing one atoms object!") 

843 atoms = images[0] 

844 api = locate_api() 

845 sb = SparcBundle(directory=filename, mode="w", validator=api) 

846 sb._write_ion_and_inpt(atoms, **kwargs) 

847 return 
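# Usage sketch (the structure is built with ase.build just for illustration):
#
#   from ase.build import bulk
#   from sparc.io import write_sparc
#   write_sparc("Cu_bulk.sparc", bulk("Cu", cubic=True))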

848 

849 

850@deprecated( 

851 "Reading individual .ion file is not recommended. Please use read_sparc instead." 

852) 

853def read_sparc_ion(filename, **kwargs): 

854 """Parse an .ion file inside the SPARC bundle using a wrapper around SparcBundle 

855 The reader works only when other files (.inpt) exist. 

856 

857 The returned Atoms object of read_ion method only contains the initial positions 

858 

859 Arguments: 

860 filename (str or PosixPath): Filename to the .ion file 

861 index (int or str): Index or slice of the images, following the ase.io.read convention 

862 **kwargs: Additional parameters 

863 

864 Returns: 

865 Atoms or List[Atoms] 

866 """ 

867 api = locate_api() 

868 parent_dir = Path(filename).parent 

869 sb = SparcBundle(directory=parent_dir, validator=api) 

870 atoms = sb._read_ion_and_inpt() 

871 return atoms 

872 

873 

874# Backward compatibility

875read_ion = read_sparc_ion 

876 

877 

878@deprecated( 

879 "Writing individual .ion file is not recommended. Please use write_sparc instead." 

880) 

881def write_sparc_ion(filename, atoms, **kwargs): 

882 """Write .ion file using the SparcBundle wrapper. This method will also create the .inpt file 

883 

884 This is only for backward compatibility 

885 

886 Arguments: 

887 filename (str or PosixPath): Filename to the .ion file 

888 atoms (Atoms): atoms to be written 

889 **kwargs: Additional parameters 

890 """ 

891 label = Path(filename).with_suffix("").name 

892 parent_dir = Path(filename).parent 

893 api = locate_api() 

894 sb = SparcBundle(directory=parent_dir, label=label, mode="w", validator=api) 

895 sb._write_ion_and_inpt(atoms, **kwargs) 

896 return atoms 

897 

898 

899# Backward compatibility 

900write_ion = write_sparc_ion 

901 

902 

903@deprecated( 

904 "Reading individual .static file is not recommended. Please use read_sparc instead." 

905) 

906def read_sparc_static(filename, index=-1, **kwargs): 

907 """Parse a .static file bundle using a wrapper around SparcBundle 

908 The reader works only when other files (.ion, .inpt) exist. 

909 

910 Arguments: 

911 filename (str or PosixPath): Filename to the .static file 

912 index (int or str): Index or slice of the images, following the ase.io.read convention 

913 **kwargs: Additional parameters 

914 

915 Returns: 

916 Atoms or List[Atoms] 

917 """ 

918 parent_dir = Path(filename).parent 

919 api = locate_api() 

920 sb = SparcBundle(directory=parent_dir, validator=api) 

921 # In most of the cases the user wants to inspect all images 

922 kwargs = kwargs.copy() 

923 if "include_all_files" not in kwargs: 

924 kwargs.update(include_all_files=True) 

925 atoms_or_images = sb.convert_to_ase(index=index, **kwargs) 

926 return atoms_or_images 

927 

928 

929# Backward compatibility 

930read_static = read_sparc_static 

931 

932 

933@deprecated( 

934 "Reading individual .geopt file is not recommended. Please use read_sparc instead." 

935) 

936def read_sparc_geopt(filename, index=-1, **kwargs): 

937 """Parse a .geopt file bundle using a wrapper around SparcBundle 

938 The reader works only when other files (.ion, .inpt) exist. 

939 

940 Arguments: 

941 filename (str or PosixPath): Filename to the .geopt file 

942 index (int or str): Index or slice of the images, following the ase.io.read convention 

943 **kwargs: Additional parameters 

944 

945 Returns: 

946 Atoms or List[Atoms] 

947 """ 

948 parent_dir = Path(filename).parent 

949 api = locate_api() 

950 sb = SparcBundle(directory=parent_dir, validator=api) 

951 kwargs = kwargs.copy() 

952 if "include_all_files" not in kwargs: 

953 kwargs.update(include_all_files=True) 

954 atoms_or_images = sb.convert_to_ase(index=index, **kwargs) 

955 return atoms_or_images 

956 

957 

958# Backward compatibility 

959read_geopt = read_sparc_geopt 

960 

961 

962@deprecated( 

963 "Reading individual .aimd file is not recommended. Please use read_sparc instead." 

964) 

965def read_sparc_aimd(filename, index=-1, **kwargs): 

966 """Parse a .static file bundle using a wrapper around SparcBundle 

967 The reader works only when other files (.ion, .inpt) exist. 

968 

969 Arguments: 

970 filename (str or PosixPath): Filename to the .aimd file 

971 index (int or str): Index or slice of the images, following the ase.io.read convention 

972 **kwargs: Additional parameters 

973 

974 Returns: 

975 Atoms or List[Atoms] 

976 """ 

977 parent_dir = Path(filename).parent 

978 api = locate_api() 

979 sb = SparcBundle(directory=parent_dir, validator=api) 

980 kwargs = kwargs.copy() 

981 if "include_all_files" not in kwargs: 

982 kwargs.update(include_all_files=True) 

983 atoms_or_images = sb.convert_to_ase(index=index, **kwargs) 

984 return atoms_or_images 

985 

986 

987# Backward compatibility 

988read_aimd = read_sparc_aimd 

989 

990 

991def __register_new_filetype(): 

992 """Register the filetype() function that allows recognizing .sparc as directory 

993 This method should only be called for ase==3.22 compatibility and for ase-gui 

994 In future versions of ase gui where format is supported, this method should be removed 

995 """ 

996 import sys 

997 

998 from ase.io import formats as hacked_formats 

999 from ase.io.formats import filetype as _old_filetype 

1000 from ase.io.formats import ioformats 

1001 

1002 def _new_filetype(filename, read=True, guess=True): 

1003 """A hacked solution for the auto format recovery""" 

1004 path = Path(filename) 

1005 ext = path.name 

1006 if ".sparc" in ext: 

1007 return "sparc" 

1008 else: 

1009 if path.is_dir(): 

1010 if (len(list(path.glob("*.ion"))) > 0) and ( 

1011 len(list(path.glob("*.inpt"))) > 0 

1012 ): 

1013 return "sparc" 

1014 return _old_filetype(filename, read, guess) 

1015 

1016 hacked_formats.filetype = _new_filetype 

1017 sys.modules["ase.io.formats"] = hacked_formats 

1018 return 

1019 

1020 

1021@deprecated( 

1022 "register_ase_io_sparc will be deprecated for future releases. Please upgrade ase>=3.23." 

1023) 

1024def register_ase_io_sparc(name="sparc"): 

1025 """ 

1026 **Legacy register of io-formats for ase==3.22** 

1027 **For ase>=3.23, use the package entrypoint registration** 

1028 Monkey patching the ase.io and ase.io.formats 

1029 So that the following formats can be used 

1030 after `import sparc` 

1031 

1032 ``` 

1033 from ase.io import sparc 

1034 ase.io.read("test.sparc") 

1035 atoms.write("test.sparc") 

1036 ``` 

1037 

1038 The register method only aims to work for ase 3.22;

1039 the development version of ase provides a much more powerful

1040 registration mechanism, so we can wait.

1041 """ 

1042 import sys 

1043 from warnings import warn 

1044 

1045 import pkg_resources 

1046 from ase.io.formats import define_io_format as F 

1047 from ase.io.formats import ioformats 

1048 

1049 name = name.lower() 

1050 if name in ioformats.keys(): 

1051 return 

1052 desc = "SPARC .sparc bundle" 

1053 

1054 # Step 1: patch the ase.io.sparc module 

1055 try: 

1056 entry_points = next( 

1057 ep for ep in pkg_resources.iter_entry_points("ase.io") if ep.name == "sparc" 

1058 ) 

1059 _monkey_mod = entry_points.load() 

1060 except Exception as e: 

1061 warn( 

1062 ( 

1063 "Failed to load entrypoint `ase.io.sparc`, " 

1064 "you may need to reinstall sparc python api.\n" 

1065 "You may still use `sparc.read_sparc` and " 

1066 "`sparc.write_sparc` methods, " 

1067 "but not `ase.io.read`\n", 

1068 f"The error is {e}", 

1069 ) 

1070 ) 

1071 return 

1072 

1073 sys.modules[f"ase.io.{name}"] = _monkey_mod 

1074 __register_new_filetype() 

1075 

1076 # Step 2: define a new format 

1077 F( 

1078 name, 

1079 desc=desc, 

1080 code="+S", # read_sparc has multi-image support 

1081 ext="sparc", 

1082 ) 

1083 

1084 if name not in ioformats.keys(): 

1085 warn( 

1086 ( 

1087 "Registering .sparc format with ase.io failed. " 

1088 "You may still use `sparc.read_sparc` and " 

1089 "`sparc.write_sparc` methods. \n" 

1090 "Please contact the developer to report this issue." 

1091 ) 

1092 ) 

1093 return 

1094 

1095 import tempfile 

1096 

1097 from ase.io import read 

1098 

1099 with tempfile.TemporaryDirectory(suffix=".sparc") as tmpdir: 

1100 try: 

1101 read(tmpdir)

1102 except Exception as e: 

1103 emsg = str(e).lower() 

1104 if "bundletrajectory" in emsg: 

1105 warn( 

1106 "Atomatic format inference for sparc is not correctly registered. " 

1107 "You may need to use format=sparc in ase.io.read and ase.io.write. " 

1108 ) 

1109 # Add additional formats including .ion (r/w), .static, .geopt, .aimd 

1110 F( 

1111 "ion", 

1112 desc="SPARC .ion file", 

1113 module="sparc", 

1114 code="1S", 

1115 ext="ion", 

1116 ) 

1117 F( 

1118 "static", 

1119 desc="SPARC single point results", 

1120 module="sparc", 

1121 code="+S", 

1122 ext="static", 

1123 ) 

1124 F( 

1125 "geopt", 

1126 desc="SPARC geometric optimization results", 

1127 module="sparc", 

1128 code="+S", 

1129 ext="geopt", 

1130 ) 

1131 F("aimd", desc="SPARC AIMD results", module="sparc", code="+S", ext="aimd") 

1132 

1133 # TODO: remove print options as it may be redundant 

1134 print("Successfully registered sparc formats with ase.io!") 

1135 

1136 

1137# ase>=3.23 uses new ExternalIOFormat as registered entrypoints 

1138# Please do not use from ase.io.formats import ExternalIOFormat! 

1139# This causes circular import 

1140try: 

1141 from ase.utils.plugins import ExternalIOFormat as EIF 

1142except ImportError: 

1143 # Backward Compatibility 

1144 from typing import List, NamedTuple, Optional, Union 

1145 

1146 # Copy definition from 3.23 

1147 # Name is defined in the entry point 

1148 class ExternalIOFormat(NamedTuple): 

1149 desc: str 

1150 code: str 

1151 module: Optional[str] = None 

1152 glob: Optional[Union[str, List[str]]] = None 

1153 ext: Optional[Union[str, List[str]]] = None 

1154 magic: Optional[Union[bytes, List[bytes]]] = None 

1155 magic_regex: Optional[bytes] = None 

1156 

1157 EIF = ExternalIOFormat 

1158 

1159format_sparc = EIF( 

1160 desc="SPARC .sparc bundle", 

1161 module="sparc.io", 

1162 code="+S", # read_sparc has multi-image support 

1163 ext="sparc", 

1164) 

1165format_ion = EIF( 

1166 desc="SPARC .ion file", 

1167 module="sparc.io", 

1168 code="1S", 

1169 ext="ion", 

1170) 

1171format_static = EIF( 

1172 desc="SPARC single point results", 

1173 module="sparc.io", 

1174 code="+S", 

1175 glob=["*.static", "*.static_*"], 

1176) 

1177format_geopt = EIF( 

1178 desc="SPARC geometric optimization results", 

1179 module="sparc.io", 

1180 code="+S", 

1181 glob=["*.geopt", "*.geopt_*"], 

1182) 

1183format_aimd = EIF( 

1184 desc="SPARC AIMD results", 

1185 module="sparc", 

1186 code="+S", 

1187 glob=["*.aimd", "*.aimd_*"],

1188)
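# Assumption (hedged): the ExternalIOFormat objects above are exposed through
# the package's entry-point metadata, so with ase>=3.23 a call such as
#
#   from ase.io import read
#   atoms = read("Cu_bulk.sparc")  # hypothetical bundle path
#
# dispatches to read_sparc defined above without the legacy monkey patching.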