Coverage for sparc/io.py: 73%

490 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-18 16:19 +0000

1"""Providing a new bundled SPARC file format 

2""" 

3import os 

4import re 

5from pathlib import Path 

6from warnings import warn 

7 

8import numpy as np 

9from ase.atoms import Atoms 

10from ase.calculators.singlepoint import SinglePointDFTCalculator 

11from ase.config import cfg as _cfg 

12 

13# various io formatters 

14from .api import SparcAPI 

15from .common import psp_dir as default_psp_dir 

16from .download_data import is_psp_download_complete 

17from .sparc_parsers.aimd import _read_aimd 

18from .sparc_parsers.atoms import atoms_to_dict, dict_to_atoms 

19from .sparc_parsers.geopt import _read_geopt 

20from .sparc_parsers.inpt import _read_inpt, _write_inpt 

21from .sparc_parsers.ion import _read_ion, _write_ion 

22from .sparc_parsers.out import _read_out 

23from .sparc_parsers.pseudopotential import copy_psp_file, parse_psp8_header 

24from .sparc_parsers.static import _add_cell_info, _read_static 

25from .utils import deprecated, locate_api, sanitize_path, string2index 

26 

# from .sparc_parsers.ion import read_ion, write_ion

# Module-level default API validator, resolved once from the ASE config at
# import time; used as the default `validator` for SparcBundle.
defaultAPI = locate_api(cfg=_cfg)

29 

30 

class SparcBundle:
    """Provide access to a calculation folder of SPARC as a simple bundle

    The bundle can be optionally named as .sparc following the ASE's
    .bundle format

    Currently the write method only supports 1 image, while read method
    supports reading atoms results in the following conditions

    1) No calculation (minimal): .ion + .inpt file --> 1 image
    2) Single point calculation: .ion + .inpt + .out + .static --> 1
       image with calc
    3) Multiple SP calculations: chain all .out{digits} and
       .static{digits} outputs
    4) Relaxation: read from .geopt and .out (supporting chaining)
    5) AIMD: read from .aimd and .out (support chaining)

    Attributes:
        directory (Path): Path to the directory containing SPARC files.
        mode (str): File access mode ('r', 'w', or 'a').
        label (str): Name of the main SPARC file.
        init_atoms (Atoms): Initial atomic configuration.
        init_inputs (dict): Initial input parameters.
        psp_data (dict): Pseudopotential data.
        raw_results (dict): Raw results from SPARC calculations.
        psp_dir (Path): Directory containing pseudopotentials.
        sorting (dict): Sort/resort order for atoms (shared across the bundle).
        last_image (int): Index of the last image in a series of calculations.
        validator (SparcAPI): API validator for SPARC calculations.

    Methods:
        __find_psp_dir(psp_dir=None): Finds the directory for SPARC pseudopotentials.
        _find_files(): Finds all files matching the bundle label.
        _make_label(label=None): Infers or sets the label for the SPARC bundle.
        _indir(ext, label=None, occur=0, d_format="{:02d}"): Finds a file with a specific extension in the bundle.
        _read_ion_and_inpt(): Reads .ion and .inpt files together.
        _write_ion_and_inpt(): Writes .ion and .inpt files to the bundle.
        _read_results_from_index(index, d_format="{:02d}"): Reads results from a specific calculation index.
        _make_singlepoint(calc_results, images, raw_results): Converts results and images to SinglePointDFTCalculators.
        _extract_static_results(raw_results, index=":"): Extracts results from static calculations.
        _extract_geopt_results(raw_results, index=":"): Extracts results from geometric optimization calculations.
        _extract_aimd_results(raw_results, index=":"): Extracts results from AIMD calculations.
        convert_to_ase(index=-1, include_all_files=False, **kwargs): Converts raw results to ASE Atoms with calculators.
        read_raw_results(include_all_files=False): Parses all files in the bundle and merges results.
        read_psp_info(): Parses pseudopotential information from the ion file.
    """

    # Environment variables searched (in priority order) for the
    # pseudopotential directory; consumed by __find_psp_dir
    psp_env = ["SPARC_PSP_PATH", "SPARC_PP_PATH"]

80 

    def __init__(
        self,
        directory,
        mode="r",
        atoms=None,
        label=None,
        psp_dir=None,
        validator=defaultAPI,
        cfg=_cfg,
    ):
        """
        Initializes a SparcBundle for accessing SPARC calculation data.

        Args:
            directory (str or Path): The path to the directory containing the SPARC files.
            mode (str, optional): The file access mode. Can be 'r' (read), 'w' (write), or 'a' (append). Defaults to 'r'.
            atoms (Atoms, optional): The initial atomic configuration. Only relevant in write mode.
            label (str, optional): A custom label for the bundle. If None, the label is inferred from the directory or files.
            psp_dir (str or Path, optional): Path to the directory containing pseudopotentials. If None, the path is inferred.
            validator (SparcAPI, optional): An instance of SparcAPI for validating and parsing SPARC parameters. Defaults to a default SparcAPI instance.
            cfg: ASE-style config object used for pseudopotential path lookup.

        Raises:
            AssertionError: If an invalid mode is provided.
            ValueError: If multiple .ion files are found and no label is specified.
            Warning: If no .ion file is found in read-mode, or illegal characters are in the label.
        """
        self.directory = Path(directory)
        # Mode is case-insensitive; normalize before validation
        self.mode = mode.lower()
        assert self.mode in (
            "r",
            "w",
            "a",
        ), f"Invalid mode {self.mode}! Must one of 'r', 'w' or 'a'"
        self.label = self._make_label(label)
        # Copy so later caller-side mutation of `atoms` cannot leak in
        self.init_atoms = atoms.copy() if atoms is not None else None
        self.init_inputs = {}
        self.psp_data = {}
        self.raw_results = {}
        # cfg must be set before __find_psp_dir, which reads self.cfg
        self.cfg = cfg
        self.psp_dir = self.__find_psp_dir(psp_dir)
        # Sorting should be consistent across the whole bundle!
        self.sorting = None
        self.last_image = -1
        self.validator = validator

125 

126 def _find_files(self): 

127 """Find all files matching '{label}.*'""" 

128 return list(self.directory.glob(f"{self.label}.*")) 

129 

130 def _make_label(self, label=None): 

131 """Infer the label from the bundle 

132 

133 Special cases if label is None: 

134 1. read mode --> get the ion file name 

135 2. write mode --> infer from the directory 

136 

137 Arguments: 

138 label (str or None): Label to be used to write the .ion, .inpt files 

139 """ 

140 prefix = self.directory.resolve().with_suffix("").name 

141 

142 illegal_chars = '\\/:*?"<>|' 

143 if label is not None: 

144 label_ = label 

145 elif self.mode == "w": 

146 label_ = prefix 

147 else: 

148 # read 

149 match_ion = list(self.directory.glob("*.ion")) 

150 if len(match_ion) > 1: 

151 raise ValueError( 

152 "Cannot read sparc bundle with multiple ion files without specifying the label!" 

153 ) 

154 elif len(match_ion) == 1: 

155 label_ = match_ion[0].name.split(".")[0] 

156 else: 

157 # No file found, possibly an empty bundle 

158 warn("No .ion file found in the read-mode bundle.") 

159 label_ = prefix 

160 

161 if any([c in label_ for c in illegal_chars]): 

162 warn( 

163 f"Label name {label_} contains illegal characters! I'll make it 'SPARC'" 

164 ) 

165 label_ = "SPARC" 

166 return label_ 

167 

    def __find_psp_dir(self, psp_dir=None):
        """Use environmental variable to find the directory for SPARC
        pseudopotentials

        Searching priority:
        1. User defined psp_dir
        2. $SPARC_PSP_PATH
        3. $SPARC_PP_PATH
        4. psp bundled with sparc-api

        Arguments:
            psp_dir (str or PosixPath or None): the specific directory to search the psp files.
                Each element can only have 1 psp file under psp_dir
        Returns:
            PosixPath: Location of psp files, or None if nothing was found
        """
        if psp_dir is not None:
            return Path(psp_dir)
        else:
            # Priority 2/3: environment variables listed in self.psp_env
            for var in self.psp_env:
                env_psp_dir = self.cfg.get(var, None)
                if env_psp_dir:
                    return Path(env_psp_dir)
            # Use pp_path field in cfg ([sparc] section of the ASE config)
            parser = self.cfg.parser["sparc"] if "sparc" in self.cfg.parser else {}
            psp_dir_ini = parser.get("psp_path", None)
            if psp_dir_ini:
                return sanitize_path(psp_dir_ini)
            # At this point, we try to use the psp files bundled with sparc
            if is_psp_download_complete(default_psp_dir):
                return default_psp_dir
            else:
                warn(
                    (
                        "PSP directory bundled with SPARC-X-API is broken! "
                        "Please use `sparc.download_data` to re-download them!"
                    )
                )

            # Not found anywhere; only warn in write mode, where psp files
            # will actually be needed when writing the bundle
            if self.mode == "w":
                warn(
                    (
                        "No pseudopotential searching path was set and "
                        "neither of $SPARC_PSP_PATH nor $SPARC_PP_PATH is set.\n"
                        "Please explicitly provide the pseudopotentials parameter when writing the sparc bundle."
                    )
                )
            return None

217 

218 def _indir(self, ext, label=None, occur=0, d_format="{:02d}"): 

219 """Find the file with {label}.{ext} under current dir, 

220 if label is None, use the default 

221 

222 Arguments: 

223 ext (str): Extension of file, e.g. '.ion' or 'ion' 

224 label (str or None): Label for the file. If None, use the parent directory name for searching 

225 occur (int): Occurance index of the file, if occur > 0, search for files with suffix like 'SPARC.out_01' 

226 d_format (str): Format for the index 

227 

228 Returns: 

229 PosixPath: Path to the target file under self.directory 

230 """ 

231 label = self.label if label is None else label 

232 if not ext.startswith("."): 

233 ext = "." + ext 

234 if occur == 0: 

235 target = self.directory / f"{label}{ext}" 

236 else: 

237 target = self.directory / f"{label}{ext}_{d_format.format(occur)}" 

238 return target 

239 

240 def _read_ion_and_inpt(self): 

241 """Read the ion and inpt files together to obtain basic atomstic data. 

242 

243 Returns: 

244 Atoms: atoms object from .ion and .inpt file 

245 """ 

246 f_ion, f_inpt = self._indir(".ion"), self._indir(".inpt") 

247 ion_data = _read_ion(f_ion, validator=self.validator) 

248 inpt_data = _read_inpt(f_inpt, validator=self.validator) 

249 merged_data = {**ion_data, **inpt_data} 

250 return dict_to_atoms(merged_data) 

251 

252 def _write_ion_and_inpt( 

253 self, 

254 atoms=None, 

255 label=None, 

256 direct=False, 

257 sort=True, 

258 ignore_constraints=False, 

259 wrap=False, 

260 # Below are the parameters from v1 

261 # scaled -> direct, ignore_constraints --> not add_constraints 

262 scaled=False, 

263 add_constraints=True, 

264 copy_psp=False, 

265 comment="", 

266 input_parameters={}, 

267 # Parameters that do not require type conversion 

268 **kwargs, 

269 ): 

270 """Write the ion and inpt files to a bundle. This method only 

271 supports writing 1 image. If input_parameters are empty, 

272 there will only be .ion writing the positions and .inpt 

273 writing a minimal cell information 

274 

275 Args: 

276 atoms (Atoms, optional): The Atoms object to write. If None, uses initialized atoms associated with SparcBundle. 

277 label (str, optional): Custom label for the written files. 

278 direct (bool, optional): If True, writes positions in direct coordinates. 

279 sort (bool, optional): If True, sorts atoms before writing. 

280 ignore_constraints (bool, optional): If True, ignores constraints on atoms. 

281 wrap (bool, optional): If True, wraps atoms into the unit cell. 

282 **kwargs: Additional keyword arguments for writing. 

283 

284 Raises: 

285 ValueError: If the bundle is not in write mode. 

286 """ 

287 if self.mode != "w": 

288 raise ValueError( 

289 "Cannot write input files while sparc bundle is opened in read or append mode!" 

290 ) 

291 os.makedirs(self.directory, exist_ok=True) 

292 atoms = self.atoms.copy() if atoms is None else atoms.copy() 

293 pseudopotentials = kwargs.pop("pseudopotentials", {}) 

294 

295 if sort: 

296 if self.sorting is not None: 

297 old_sort = self.sorting.get("sort", None) 

298 if old_sort: 

299 sort = old_sort 

300 

301 data_dict = atoms_to_dict( 

302 atoms, 

303 direct=direct, 

304 sort=sort, 

305 ignore_constraints=ignore_constraints, 

306 psp_dir=self.psp_dir, 

307 pseudopotentials=pseudopotentials, 

308 ) 

309 merged_inputs = input_parameters.copy() 

310 merged_inputs.update(kwargs) 

311 # @TT 2025.06.04 HUBBARD-U requires some special treatment 

312 # HUBBARD section should be poped and written under 

313 # .ion file (currently done via atoms conversion) 

314 hubbard_u_pairs = merged_inputs.pop("HUBBARD", []) 

315 # TODO: may need consistent naming for info 

316 if int(merged_inputs.get("HUBBARD_FLAG", 0)) > 0: 

317 atoms_info_hubbard_u_pairs = atoms.info.get("hubbard_u (hartree)", []) 

318 # We will overwrite all existing hubbard info 

319 if len(hubbard_u_pairs) > 0: 

320 data_dict["ion"]["extra"]["hubbard"] = hubbard_u_pairs 

321 elif len(atoms_info_hubbard_u_pairs) > 0: 

322 data_dict["ion"]["extra"]["hubbard"] = atoms_info_hubbard_u_pairs 

323 else: 

324 raise ValueError("HUBBARD_FLAG=1 but not U correction provided!") 

325 data_dict["inpt"]["params"].update(merged_inputs) 

326 

327 # If copy_psp, change the PSEUDO_POT field and copy the files 

328 if copy_psp: 

329 for block in data_dict["ion"]["atom_blocks"]: 

330 if "PSEUDO_POT" in block: 

331 origin_psp = block["PSEUDO_POT"] 

332 target_dir = self.directory 

333 target_fname = copy_psp_file(origin_psp, target_dir) 

334 block["PSEUDO_POT"] = target_fname 

335 

336 _write_ion( 

337 self._indir(".ion", label=label), data_dict, validator=self.validator 

338 ) 

339 _write_inpt( 

340 self._indir(".inpt", label=label), data_dict, validator=self.validator 

341 ) 

342 # Update the sorting information 

343 ion_dict = _read_ion(self._indir(".ion", label=label))["ion"] 

344 self.sorting = ion_dict.get("sorting", None) 

345 return 

346 

347 def read_raw_results(self, include_all_files=False): 

348 """Parse all files using the given self.label. 

349 The results are merged dict from all file formats 

350 

351 Arguments: 

352 include_all_files (bool): Whether to include output files with different suffices 

353 If true: include all files (e.g. SPARC.out, SPARC.out_01, 

354 SPARC.out_02, etc). 

355 Returns: 

356 dict or List: Dict containing all raw results. Only some of them will appear in the calculator's results 

357 

358 Sets: 

359 self.raw_results (dict or List): the same as the return value 

360 

361 #TODO: @TT 2024-11-01 allow accepting indices 

362 #TODO: @TT last_image is a bad name, it should refer to the occurance of images 

363 the same goes with num_calculations 

364 """ 

365 # Find the max output index 

366 out_files = self.directory.glob(f"{self.label}.out*") 

367 valid_out_files = [ 

368 f 

369 for f in out_files 

370 if (re.fullmatch(r"^\.out(?:_\d+)?$", f.suffix) is not None) 

371 ] 

372 # Combine and sort the file lists 

373 last_out = sorted(valid_out_files, reverse=True) 

374 # No output file, only ion / inpt 

375 if len(last_out) == 0: 

376 self.last_image = -1 

377 else: 

378 suffix = last_out[0].suffix 

379 if suffix == ".out": 

380 self.last_image = 0 

381 else: 

382 self.last_image = int(suffix.split("_")[1]) 

383 self.num_calculations = self.last_image + 1 

384 

385 # Always make sure ion / inpt results are parsed regardless of actual calculations 

386 if include_all_files: 

387 if self.num_calculations > 0: 

388 results = [ 

389 self._read_results_from_index(index) 

390 for index in range(self.num_calculations) 

391 ] 

392 else: 

393 results = [self._read_results_from_index(self.last_image)] 

394 else: 

395 results = self._read_results_from_index(self.last_image) 

396 

397 self.raw_results = results 

398 

399 if include_all_files: 

400 init_raw_results = self.raw_results[0] 

401 else: 

402 init_raw_results = self.raw_results.copy() 

403 

404 self.init_atoms = dict_to_atoms(init_raw_results) 

405 self.init_inputs = { 

406 "ion": init_raw_results["ion"], 

407 "inpt": init_raw_results["inpt"], 

408 } 

409 self.psp_data = self.read_psp_info() 

410 return self.raw_results 

411 

    def _read_results_from_index(self, index, d_format="{:02d}"):
        """Read the results from one calculation index, and return a
        single raw result dict, e.g. for index=0 --> .static
        and index=1 --> .static_01.

        Arguments:
            index (int): Index of image to return the results
            d_format (str): Format for the index suffix

        Returns:
            dict: Results for single image

        Raises:
            RuntimeError: if either the ion or inpt file is missing
            AssertionError: if the sorting information differs from the
                sorting already stored on the bundle

        #TODO: @TT should we call index --> occurance?

        """
        results_dict = {}

        # ion / inpt are shared by all calculation indices, hence occur=0
        for ext in ("ion", "inpt"):
            f = self._indir(ext, occur=0)
            if f.is_file():
                # Dispatch to the matching module-level parser, e.g. _read_ion
                data_dict = globals()[f"_read_{ext}"](f, validator=self.validator)
                results_dict.update(data_dict)
        for ext in ("geopt", "static", "aimd", "out"):
            f = self._indir(ext, occur=index, d_format=d_format)
            if f.is_file():
                data_dict = globals()[f"_read_{ext}"](f)
                results_dict.update(data_dict)

        # Must have files: ion, inpt
        if ("ion" not in results_dict) or ("inpt" not in results_dict):
            raise RuntimeError(
                "Either ion or inpt files are missing from the bundle! "
                "Your SPARC calculation may be corrupted."
            )

        # Copy the sorting information, if not existing
        sorting = results_dict["ion"].get("sorting", None)
        if sorting is not None:
            if self.sorting is None:
                self.sorting = sorting
            else:
                # Compare stored sorting; it must stay identical across
                # all calculation indices of the bundle
                assert (tuple(self.sorting["sort"]) == tuple(sorting["sort"])) and (
                    tuple(self.sorting["resort"]) == tuple(sorting["resort"])
                ), "Sorting information changed!"
        return results_dict

458 

459 def convert_to_ase(self, index=-1, include_all_files=False, **kwargs): 

460 """Read the raw results from the bundle and create atoms with 

461 single point calculators 

462 

463 Arguments: 

464 index (int or str): Index or slice of the image(s) to convert. Uses the same format as ase.io.read 

465 include_all_files (bool): If true, also read results with indexed suffices 

466 

467 Returns: 

468 Atoms or List[Atoms]: ASE-atoms or images with single point results 

469 

470 """ 

471 # Convert to images! 

472 # TODO: @TT 2024-11-01 read_raw_results should implement a more 

473 # robust behavior handling index, as it is the entry point for all 

474 rs = self.read_raw_results(include_all_files=include_all_files) 

475 if isinstance(rs, dict): 

476 raw_results = [rs] 

477 else: 

478 raw_results = list(rs) 

479 res_images = [] 

480 for entry in raw_results: 

481 if "static" in entry: 

482 calc_results, images = self._extract_static_results(entry, index=":") 

483 elif "geopt" in entry: 

484 calc_results, images = self._extract_geopt_results(entry, index=":") 

485 elif "aimd" in entry: 

486 calc_results, images = self._extract_aimd_results(entry, index=":") 

487 else: 

488 calc_results, images = None, [self.init_atoms.copy()] 

489 

490 if images is not None: 

491 if calc_results is not None: 

492 images = self._make_singlepoint(calc_results, images, entry) 

493 res_images.extend(images) 

494 

495 if isinstance(index, int): 

496 return res_images[index] 

497 else: 

498 return res_images[string2index(index)] 

499 

500 def _make_singlepoint(self, calc_results, images, raw_results): 

501 """Convert a calculator dict and images of Atoms to list of 

502 SinglePointDFTCalculators 

503 

504 The calculator also takes parameters from ion, inpt that exist 

505 in self.raw_results. 

506 

507 Arguments: 

508 calc_results (List): Calculation results for all images 

509 images (List): Corresponding Atoms images 

510 raw_results (List): Full raw results dict to obtain additional information 

511 

512 Returns: 

513 List(Atoms): ASE-atoms images with single point calculators attached 

514 

515 """ 

516 converted_images = [] 

517 for res, _atoms in zip(calc_results, images): 

518 atoms = _atoms.copy() 

519 sp = SinglePointDFTCalculator(atoms) 

520 # Res can be empty at this point, leading to incomplete calc 

521 sp.results.update(res) 

522 sp.name = "sparc" 

523 sp.kpts = raw_results["inpt"].get("params", {}).get("KPOINT_GRID", None) 

524 # There may be a better way handling the parameters... 

525 sp.parameters = raw_results["inpt"].get("params", {}) 

526 sp.raw_parameters = { 

527 "ion": raw_results["ion"], 

528 "inpt": raw_results["inpt"], 

529 } 

530 atoms.calc = sp 

531 converted_images.append(atoms) 

532 return converted_images 

533 

534 def _extract_static_results(self, raw_results, index=":"): 

535 """Extract the static calculation results and atomic 

536 structure(s) Returns: calc_results: dict with at least energy 

537 value atoms: ASE atoms object The priority is to parse 

538 position from static file first, then fallback from ion + inpt 

539 

540 Note: make all energy / forces resorted! 

541 

542 Arguments: 

543 raw_results (dict): Raw results parsed from self.read_raw_results 

544 index (str or int): Index or slice of images 

545 

546 Returns: 

547 List[results], List[Atoms] 

548 

549 """ 

550 static_results = raw_results.get("static", []) 

551 calc_results = [] 

552 # Use extra lattice information to construct the positions 

553 cell = self.init_atoms.cell 

554 # import pdb; pdb.set_trace() 

555 static_results = _add_cell_info(static_results, cell) 

556 

557 if isinstance(index, int): 

558 _images = [static_results[index]] 

559 elif isinstance(index, str): 

560 _images = static_results[string2index(index)] 

561 

562 ase_images = [] 

563 for static_results in _images: 

564 partial_results = {} 

565 if "free energy" in static_results: 

566 partial_results["energy"] = static_results["free energy"] 

567 partial_results["free energy"] = static_results["free energy"] 

568 

569 if "forces" in static_results: 

570 partial_results["forces"] = static_results["forces"][self.resort] 

571 

572 if "atomic_magnetization" in static_results: 

573 partial_results["magmoms"] = static_results["atomic_magnetization"][ 

574 self.resort 

575 ] 

576 

577 if "net_magnetization" in static_results: 

578 partial_results["magmom"] = static_results["net_magnetization"] 

579 

580 if "stress" in static_results: 

581 partial_results["stress"] = static_results["stress"] 

582 

583 if "stress_equiv" in static_results: 

584 partial_results["stress_equiv"] = static_results["stress_equiv"] 

585 

586 atoms = self.init_atoms.copy() 

587 # import pdb; pdb.set_trace() 

588 if "atoms" in static_results: 

589 atoms_dict = static_results["atoms"] 

590 

591 # The socket mode case. Reset all cell and positions 

592 # Be careful, 

593 if "lattice" in static_results: 

594 lat = static_results["lattice"] 

595 atoms.set_cell(lat, scale_atoms=False) 

596 if "coord" not in atoms_dict: 

597 raise KeyError( 

598 "Coordination conversion failed in socket static output!" 

599 ) 

600 atoms.set_positions( 

601 atoms_dict["coord"][self.resort], apply_constraint=False 

602 ) 

603 else: # Do not change cell information (normal static file) 

604 if "coord_frac" in atoms_dict: 

605 atoms.set_scaled_positions( 

606 atoms_dict["coord_frac"][self.resort] 

607 ) 

608 elif "coord" in atoms_dict: 

609 atoms.set_positions( 

610 atoms_dict["coord"][self.resort], apply_constraint=False 

611 ) 

612 ase_images.append(atoms) 

613 calc_results.append(partial_results) 

614 return calc_results, ase_images 

615 

616 def _extract_geopt_results(self, raw_results, index=":"): 

617 """Extract the static calculation results and atomic 

618 structure(s) Returns: calc_results: dict with at least energy 

619 value atoms: ASE atoms object The priority is to parse 

620 position from static file first, then fallback from ion + inpt 

621 

622 Arguments: 

623 raw_results (dict): Raw results parsed from self.read_raw_results 

624 index (str or int): Index or slice of images 

625 

626 Returns: 

627 List[results], List[Atoms] 

628 

629 """ 

630 # print("RAW_RES: ", raw_results) 

631 geopt_results = raw_results.get("geopt", []) 

632 calc_results = [] 

633 if len(geopt_results) == 0: 

634 warn( 

635 "Geopt file is empty! This is not an error if the calculation is continued from restart. " 

636 ) 

637 return None, None 

638 

639 if isinstance(index, int): 

640 _images = [geopt_results[index]] 

641 elif isinstance(index, str): 

642 _images = geopt_results[string2index(index)] 

643 

644 ase_images = [] 

645 for result in _images: 

646 atoms = self.init_atoms.copy() 

647 partial_result = {} 

648 if "energy" in result: 

649 partial_result["energy"] = result["energy"] 

650 partial_result["free energy"] = result["energy"] 

651 

652 if "forces" in result: 

653 partial_result["forces"] = result["forces"][self.resort] 

654 

655 if "stress" in result: 

656 partial_result["stress"] = result["stress"] 

657 

658 # Modify the atoms copy 

659 if "positions" in result: 

660 atoms.set_positions( 

661 result["positions"][self.resort], apply_constraint=False 

662 ) 

663 if "ase_cell" in result: 

664 atoms.set_cell(result["ase_cell"]) 

665 else: 

666 # For geopt and RELAX=2 (cell relaxation), 

667 # the positions may not be written in .geopt file 

668 relax_flag = raw_results["inpt"]["params"].get("RELAX_FLAG", 0) 

669 if relax_flag != 2: 

670 raise ValueError( 

671 ".geopt file missing positions while RELAX!=2. " 

672 "Please check your setup ad output files." 

673 ) 

674 if "ase_cell" not in result: 

675 raise ValueError( 

676 "Cannot recover positions from .geopt file due to missing cell information. " 

677 "Please check your setup ad output files." 

678 ) 

679 atoms.set_cell(result["ase_cell"], scale_atoms=True) 

680 

681 # Unlike low-dimensional stress in static calculations, we need to convert 

682 # stress_1d stress_2d to stress_equiv using the non-period cell dimension(s) 

683 # This has to be done when the actual cell information is loaded 

684 if "stress_1d" in result: 

685 stress_1d = result["stress_1d"] 

686 assert ( 

687 np.count_nonzero(atoms.pbc) == 1 

688 ), "Dimension of stress and PBC mismatch!" 

689 for i, bc in enumerate(atoms.pbc): 

690 if not bc: 

691 stress_1d /= atoms.cell.cellpar()[i] 

692 stress_equiv = stress_1d 

693 partial_result["stress_equiv"] = stress_equiv 

694 

695 if "stress_2d" in result: 

696 stress_2d = result["stress_2d"] 

697 assert ( 

698 np.count_nonzero(atoms.pbc) == 2 

699 ), "Dimension of stress and PBC mismatch!" 

700 for i, bc in enumerate(atoms.pbc): 

701 if not bc: 

702 stress_2d /= atoms.cell.cellpar()[i] 

703 stress_equiv = stress_2d 

704 partial_result["stress_equiv"] = stress_equiv 

705 

706 calc_results.append(partial_result) 

707 ase_images.append(atoms) 

708 

709 return calc_results, ase_images 

710 

711 def _extract_aimd_results(self, raw_results, index=":"): 

712 """Extract energy / forces from aimd results 

713 

714 For calculator, we only need the last image 

715 

716 We probably want more information for the AIMD calculations, 

717 but I'll keep them for now 

718 

719 Arguments: 

720 raw_results (dict): Raw results parsed from self.read_raw_results 

721 index (str or int): Index or slice of images 

722 

723 Returns: 

724 List[results], List[Atoms] 

725 

726 """ 

727 aimd_results = raw_results.get("aimd", []) 

728 calc_results = [] 

729 if len(aimd_results) == 0: 

730 warn( 

731 "Aimd file is empty! " 

732 "This is not an error if the calculation " 

733 "is continued from restart. " 

734 ) 

735 return None, None 

736 

737 if isinstance(index, int): 

738 _images = [aimd_results[index]] 

739 elif isinstance(index, str): 

740 _images = aimd_results[string2index(index)] 

741 

742 ase_images = [] 

743 for result in _images: 

744 partial_result = {} 

745 atoms = self.init_atoms.copy() 

746 if "total energy per atom" in result: 

747 partial_result["energy"] = result["total energy per atom"] * len(atoms) 

748 if "free energy per atom" in result: 

749 partial_result["free energy"] = result["free energy per atom"] * len( 

750 atoms 

751 ) 

752 

753 if "forces" in result: 

754 # The forces are already re-sorted! 

755 partial_result["forces"] = result["forces"][self.resort] 

756 

757 # Modify the atoms in-place 

758 if "positions" not in result: 

759 raise ValueError("Cannot have aimd without positions information!") 

760 

761 atoms.set_positions( 

762 result["positions"][self.resort], apply_constraint=False 

763 ) 

764 

765 if "velocities" in result: 

766 atoms.set_velocities(result["velocities"][self.resort]) 

767 

768 ase_images.append(atoms) 

769 calc_results.append(partial_result) 

770 return calc_results, ase_images 

771 

772 @property 

773 def sort(self): 

774 """Wrap the self.sorting dict. If sorting information does not exist, 

775 use the default slicing 

776 """ 

777 

778 if self.sorting is None: 

779 return slice(None, None, None) 

780 sort = self.sorting.get("sort", []) 

781 if len(sort) > 0: 

782 return sort 

783 else: 

784 return slice(None, None, None) 

785 

786 @property 

787 def resort(self): 

788 """Wrap the self.sorting dict. If sorting information does not exist, 

789 use the default slicing 

790 """ 

791 

792 if self.sorting is None: 

793 return slice(None, None, None) 

794 resort = self.sorting.get("resort", []) 

795 if len(resort) > 0: 

796 return resort 

797 else: 

798 return slice(None, None, None) 

799 

800 def read_psp_info(self): 

801 """Parse the psp information from inpt file options 

802 The psp file locations are relative to the bundle. 

803 

804 If the files cannot be found, the dict will only contain 

805 the path 

806 """ 

807 inpt = self.init_inputs.get("ion", {}) 

808 blocks = inpt.get("atom_blocks", []) 

809 psp_info = {} 

810 for block in blocks: 

811 element = block["ATOM_TYPE"] 

812 pseudo_path = block["PSEUDO_POT"] 

813 real_path = (self.directory / pseudo_path).resolve() 

814 psp_info[element] = {"rel_path": pseudo_path} 

815 if not real_path.is_file(): 

816 warn(f"Cannot locate pseudopotential {pseudo_path}. ") 

817 else: 

818 header = open(real_path, "r").read() 

819 psp_data = parse_psp8_header(header) 

820 psp_info[element].update(psp_data) 

821 return psp_info 

822 

823 

def read_sparc(filename, index=-1, include_all_files=True, **kwargs):
    """Parse a SPARC bundle, return an Atoms object or list of Atoms (image)
    with embedded calculator result.

    Arguments:
        filename (str or PosixPath): Filename to the sparc bundle
        index (int or str): Index or slice of the images, following the ase.io.read convention
        include_all_files (bool): If true, parse all output files with indexed suffices
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]

    """
    # We rely on minimal api version choose, i.e. default or set from env
    validator = locate_api()
    bundle = SparcBundle(directory=filename, validator=validator)
    return bundle.convert_to_ase(
        index=index, include_all_files=include_all_files, **kwargs
    )

845 

846 

def write_sparc(filename, images, **kwargs):
    """Write sparc file. Images can only be Atoms object
    or list of length 1

    Arguments:
        filename (str or PosixPath): Filename to the output sparc directory
        images (Atoms or List(Atoms)): Atoms object to be written. Only supports writing 1 Atoms
        **kwargs: Additional parameters

    Raises:
        ValueError: if `images` is a sequence whose length is not 1
        TypeError: if `images` is neither Atoms nor a list/tuple of Atoms
    """
    if isinstance(images, Atoms):
        atoms = images
    elif isinstance(images, (list, tuple)):
        # Previously an empty list raised an opaque IndexError; reject it
        # with the same message as the multi-image case
        if len(images) != 1:
            raise ValueError("SPARC format only supports writing one atoms object!")
        atoms = images[0]
    else:
        # Previously fell through and raised NameError on `atoms`
        raise TypeError(
            "images must be an Atoms object or a list/tuple containing one Atoms object!"
        )
    api = locate_api()
    sb = SparcBundle(directory=filename, mode="w", validator=api)
    sb._write_ion_and_inpt(atoms, **kwargs)
    return

866 

867 

@deprecated(
    "Reading individual .ion file is not recommended. Please use read_sparc instead."
)
def read_sparc_ion(filename, **kwargs):
    """Parse an .ion file inside the SPARC bundle using a wrapper around SparcBundle
    The reader works only when other files (.inpt) exist.

    The returned Atoms object of read_ion method only contains the initial positions

    Arguments:
        filename (str or PosixPath): Filename to the .ion file
        **kwargs: Additional parameters

    Returns:
        Atoms
    """
    api = locate_api()
    bundle_dir = Path(filename).parent
    bundle = SparcBundle(directory=bundle_dir, validator=api)
    return bundle._read_ion_and_inpt()

890 

891 

892# Backward compatibity 

893read_ion = read_sparc_ion 

894 

895 

@deprecated(
    "Writing individual .ion file is not recommended. Please use write_sparc instead."
)
def write_sparc_ion(filename, atoms, **kwargs):
    """Write a .ion file (and its companion .inpt) via the SparcBundle wrapper.

    Kept only for backward compatibility.

    Arguments:
        filename (str or PosixPath): Filename to the .ion file
        atoms (Atoms): atoms to be written
        **kwargs: Additional parameters
    """
    target = Path(filename)
    bundle = SparcBundle(
        directory=target.parent,
        label=target.with_suffix("").name,
        mode="w",
        validator=locate_api(),
    )
    bundle._write_ion_and_inpt(atoms, **kwargs)
    return atoms


# Backward compatibility
write_ion = write_sparc_ion

919 

920 

@deprecated(
    "Reading individual .static file is not recommended. Please use read_sparc instead."
)
def read_sparc_static(filename, index=-1, **kwargs):
    """Read a .static single-point result through the SparcBundle wrapper.

    Requires the sibling files (.ion, .inpt) to be present.

    Arguments:
        filename (str or PosixPath): Filename to the .static file
        index (int or str): Index or slice of the images, following the ase.io.read convention
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    bundle = SparcBundle(directory=Path(filename).parent, validator=locate_api())
    # In most of the cases the user wants to inspect all images
    options = dict(kwargs)
    options.setdefault("include_all_files", True)
    return bundle.convert_to_ase(index=index, **options)


# Backward compatibility
read_static = read_sparc_static

949 

950 

@deprecated(
    "Reading individual .geopt file is not recommended. Please use read_sparc instead."
)
def read_sparc_geopt(filename, index=-1, **kwargs):
    """Read a .geopt relaxation trajectory through the SparcBundle wrapper.

    Requires the sibling files (.ion, .inpt) to be present.

    Arguments:
        filename (str or PosixPath): Filename to the .geopt file
        index (int or str): Index or slice of the images, following the ase.io.read convention
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    bundle = SparcBundle(directory=Path(filename).parent, validator=locate_api())
    # Default to parsing all indexed output files unless the caller disables it
    options = dict(kwargs)
    options.setdefault("include_all_files", True)
    return bundle.convert_to_ase(index=index, **options)


# Backward compatibility
read_geopt = read_sparc_geopt

978 

979 

@deprecated(
    "Reading individual .aimd file is not recommended. Please use read_sparc instead."
)
def read_sparc_aimd(filename, index=-1, **kwargs):
    """Parse a .aimd file bundle using a wrapper around SparcBundle
    The reader works only when other files (.ion, .inpt) exist.

    Arguments:
        filename (str or PosixPath): Filename to the .aimd file
        index (int or str): Index or slice of the images, following the ase.io.read convention
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    parent_dir = Path(filename).parent
    api = locate_api()
    sb = SparcBundle(directory=parent_dir, validator=api)
    # In most of the cases the user wants to inspect all images;
    # copy kwargs so the caller's dict is never mutated
    kwargs = kwargs.copy()
    if "include_all_files" not in kwargs:
        kwargs.update(include_all_files=True)
    atoms_or_images = sb.convert_to_ase(index=index, **kwargs)
    return atoms_or_images


# Backward compatibility
read_aimd = read_sparc_aimd

1007 

1008 

@deprecated(
    "__register_new_filetype will be deprecated for future releases. Please upgrade ase>=3.23."
)
def __register_new_filetype():
    """Register the filetype() function that allows recognizing .sparc as directory

    This method should only be called for ase==3.22 compatibility and for ase-gui.
    In future versions of ase gui where the format is supported, this method
    should be removed.
    """
    import sys

    from ase.io import formats as hacked_formats
    from ase.io.formats import filetype as _old_filetype
    from ase.io.formats import ioformats

    def _new_filetype(filename, read=True, guess=True):
        """Patched filetype(): detect SPARC bundles, otherwise defer to ase."""
        path = Path(filename)
        if ".sparc" in path.name:
            return "sparc"
        if path.is_dir():
            has_ion = len(list(path.glob("*.ion"))) > 0
            has_inpt = len(list(path.glob("*.inpt"))) > 0
            if has_ion and has_inpt:
                return "sparc"
        return _old_filetype(filename, read, guess)

    hacked_formats.filetype = _new_filetype
    sys.modules["ase.io.formats"] = hacked_formats
    return

1040 

1041 

@deprecated(
    "register_ase_io_sparc will be deprecated for future releases. Please upgrade ase>=3.23."
)
def register_ase_io_sparc(name="sparc"):
    """
    **Legacy register of io-formats for ase==3.22**
    **For ase>=3.23, use the package entrypoint registration**
    Monkey patching the ase.io and ase.io.formats
    So that the following formats can be used
    after `import sparc`

    ```
    from ase.io import sparc
    ase.io.read("test.sparc")
    atoms.write("test.sparc")
    ```

    The register method only aims to work for ase 3.22;
    newer versions of ase provide a much more powerful
    register mechanism (entry points).
    """
    import sys
    from warnings import warn

    import pkg_resources
    from ase.io.formats import define_io_format as F
    from ase.io.formats import ioformats

    name = name.lower()
    if name in ioformats.keys():
        # Already registered (e.g. repeated import); nothing to do
        return
    desc = "SPARC .sparc bundle"

    # Step 1: patch the ase.io.sparc module
    try:
        entry_points = next(
            ep for ep in pkg_resources.iter_entry_points("ase.io") if ep.name == "sparc"
        )
        _monkey_mod = entry_points.load()
    except Exception as e:
        # BUG FIX: a stray trailing comma previously turned this message into
        # a tuple, making the warning print a tuple repr; use implicit string
        # concatenation so a single readable message is emitted.
        warn(
            "Failed to load entrypoint `ase.io.sparc`, "
            "you may need to reinstall sparc python api.\n"
            "You may still use `sparc.read_sparc` and "
            "`sparc.write_sparc` methods, "
            "but not `ase.io.read`\n"
            f"The error is {e}"
        )
        return

    sys.modules[f"ase.io.{name}"] = _monkey_mod
    __register_new_filetype()

    # Step 2: define a new format
    F(
        name,
        desc=desc,
        code="+S",  # read_sparc has multi-image support
        ext="sparc",
    )

    if name not in ioformats.keys():
        warn(
            (
                "Registering .sparc format with ase.io failed. "
                "You may still use `sparc.read_sparc` and "
                "`sparc.write_sparc` methods. \n"
                "Please contact the developer to report this issue."
            )
        )
        return

    import tempfile

    from ase.io import read

    with tempfile.TemporaryDirectory(suffix=".sparc") as tmpdir:
        try:
            # BUG FIX: TemporaryDirectory used as a context manager yields the
            # directory path as a plain str, so the previous `tmpdir.name`
            # raised AttributeError — which the except below silently
            # swallowed, meaning this sanity check never actually ran.
            read(tmpdir)
        except Exception as e:
            emsg = str(e).lower()
            if "bundletrajectory" in emsg:
                warn(
                    "Atomatic format inference for sparc is not correctly registered. "
                    "You may need to use format=sparc in ase.io.read and ase.io.write. "
                )
    # Add additional formats including .ion (r/w), .static, .geopt, .aimd
    F(
        "ion",
        desc="SPARC .ion file",
        module="sparc",
        code="1S",
        ext="ion",
    )
    F(
        "static",
        desc="SPARC single point results",
        module="sparc",
        code="+S",
        ext="static",
    )
    F(
        "geopt",
        desc="SPARC geometric optimization results",
        module="sparc",
        code="+S",
        ext="geopt",
    )
    F("aimd", desc="SPARC AIMD results", module="sparc", code="+S", ext="aimd")

    # TODO: remove print options as it may be redundant
    print("Successfully registered sparc formats with ase.io!")

1156 

1157 

# ase>=3.23 uses new ExternalIOFormat as registered entrypoints
# Please do not use from ase.io.formats import ExternalIOFormat!
# This causes circular import
try:
    from ase.utils.plugins import ExternalIOFormat as EIF
except ImportError:
    # Backward Compatibility
    from typing import List, NamedTuple, Optional, Union

    # Copy definition from 3.23
    # Name is defined in the entry point
    class ExternalIOFormat(NamedTuple):
        """Stand-in for ase.utils.plugins.ExternalIOFormat on ase<3.23.

        Field meanings mirror the ase 3.23 definition; the format name
        itself comes from the entry point, not from this tuple.
        """

        desc: str
        code: str
        module: Optional[str] = None
        glob: Optional[Union[str, List[str]]] = None
        ext: Optional[Union[str, List[str]]] = None
        magic: Optional[Union[bytes, List[bytes]]] = None
        magic_regex: Optional[bytes] = None

    EIF = ExternalIOFormat

1179 

# IO format declarations consumed by ase>=3.23 through entry point
# registration (see the ExternalIOFormat note above).
# code "+S": multi-image reader taking a filename; "1S": single image.
format_sparc = EIF(
    desc="SPARC .sparc bundle",
    module="sparc.io",
    code="+S",  # read_sparc has multi-image support
    ext="sparc",
)
format_ion = EIF(
    desc="SPARC .ion file",
    module="sparc.io",
    code="1S",
    ext="ion",
)
format_static = EIF(
    desc="SPARC single point results",
    module="sparc.io",
    code="+S",
    # match both the bare .static file and indexed .static_* outputs
    glob=["*.static", "*.static_*"],
)
format_geopt = EIF(
    desc="SPARC geometric optimization results",
    module="sparc.io",
    code="+S",
    glob=["*.geopt", "*.geopt_*"],
)

format_aimd = EIF(
    desc="SPARC AIMD results",
    # "sparc.io" for consistency with the other SPARC format declarations;
    # this is the module where read_sparc/write_sparc live.
    module="sparc.io",
    code="+S",
    # Match the bare .aimd file and indexed .aimd_* outputs. The previous
    # "*.geopt_*" entry was a copy-paste from format_geopt and would have
    # claimed geopt files for the aimd reader.
    glob=["*.aimd", "*.aimd_*"],
)