Coverage for sparc/io.py: 73%
490 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-18 16:19 +0000
1"""Providing a new bundled SPARC file format
2"""
3import os
4import re
5from pathlib import Path
6from warnings import warn
8import numpy as np
9from ase.atoms import Atoms
10from ase.calculators.singlepoint import SinglePointDFTCalculator
11from ase.config import cfg as _cfg
13# various io formatters
14from .api import SparcAPI
15from .common import psp_dir as default_psp_dir
16from .download_data import is_psp_download_complete
17from .sparc_parsers.aimd import _read_aimd
18from .sparc_parsers.atoms import atoms_to_dict, dict_to_atoms
19from .sparc_parsers.geopt import _read_geopt
20from .sparc_parsers.inpt import _read_inpt, _write_inpt
21from .sparc_parsers.ion import _read_ion, _write_ion
22from .sparc_parsers.out import _read_out
23from .sparc_parsers.pseudopotential import copy_psp_file, parse_psp8_header
24from .sparc_parsers.static import _add_cell_info, _read_static
25from .utils import deprecated, locate_api, sanitize_path, string2index
# from .sparc_parsers.ion import read_ion, write_ion

# Module-level default SparcAPI validator, resolved once at import time
# from the ASE configuration object.
defaultAPI = locate_api(cfg=_cfg)
class SparcBundle:
    """Provide access to a calculation folder of SPARC as a simple bundle.

    The bundle can be optionally named as .sparc following the ASE's
    .bundle format.

    Currently the write method only supports 1 image, while the read
    method supports reading atoms results in the following conditions:

    1) No calculation (minimal): .ion + .inpt file --> 1 image
    2) Single point calculation: .ion + .inpt + .out + .static --> 1 image with calc
    3) Multiple SP calculations: chain all .out{digits} and .static{digits} outputs
    4) Relaxation: read from .geopt and .out (supporting chaining)
    5) AIMD: read from .aimd and .out (supporting chaining)

    Attributes:
        directory (Path): Path to the directory containing SPARC files.
        mode (str): File access mode ('r', 'w', or 'a').
        label (str): Name of the main SPARC file.
        init_atoms (Atoms): Initial atomic configuration.
        init_inputs (dict): Initial input parameters.
        psp_data (dict): Pseudopotential data.
        raw_results (dict or list): Raw results from SPARC calculations.
        psp_dir (Path): Directory containing pseudopotentials.
        sorting (dict): "sort"/"resort" index mappings for atoms.
        last_image (int): Index of the last image in a series of calculations.
        validator (SparcAPI): API validator for SPARC calculations.
    """

    # Environment variables searched (in priority order) for the psp path
    psp_env = ["SPARC_PSP_PATH", "SPARC_PP_PATH"]

    def __init__(
        self,
        directory,
        mode="r",
        atoms=None,
        label=None,
        psp_dir=None,
        validator=defaultAPI,
        cfg=_cfg,
    ):
        """Initialize a SparcBundle for accessing SPARC calculation data.

        Args:
            directory (str or Path): Path to the directory containing the SPARC files.
            mode (str, optional): File access mode: 'r' (read), 'w' (write) or
                'a' (append). Defaults to 'r'.
            atoms (Atoms, optional): Initial atomic configuration. Only relevant
                in write mode.
            label (str, optional): Custom label for the bundle. If None, the label
                is inferred from the directory or the .ion file.
            psp_dir (str or Path, optional): Directory containing pseudopotentials.
                If None, the path is inferred (see __find_psp_dir).
            validator (SparcAPI, optional): Instance of SparcAPI for validating
                and parsing SPARC parameters.
            cfg: ASE configuration object used to look up psp search paths.

        Raises:
            AssertionError: If an invalid mode is provided.
            ValueError: If multiple .ion files are found and no label is specified.
        """
        self.directory = Path(directory)
        self.mode = mode.lower()
        assert self.mode in (
            "r",
            "w",
            "a",
        ), f"Invalid mode {self.mode}! Must one of 'r', 'w' or 'a'"
        self.label = self._make_label(label)
        self.init_atoms = atoms.copy() if atoms is not None else None
        self.init_inputs = {}
        self.psp_data = {}
        self.raw_results = {}
        self.cfg = cfg
        self.psp_dir = self.__find_psp_dir(psp_dir)
        # Sorting should be consistent across the whole bundle!
        self.sorting = None
        self.last_image = -1
        self.validator = validator

    def _find_files(self):
        """Find all files matching '{label}.*' in the bundle directory."""
        return list(self.directory.glob(f"{self.label}.*"))

    def _make_label(self, label=None):
        """Infer the label from the bundle.

        Special cases if label is None:
        1. read mode --> get the ion file name
        2. write mode --> infer from the directory

        Arguments:
            label (str or None): Label to be used to write the .ion, .inpt files
        """
        prefix = self.directory.resolve().with_suffix("").name
        illegal_chars = '\\/:*?"<>|'
        if label is not None:
            label_ = label
        elif self.mode == "w":
            label_ = prefix
        else:
            # read
            match_ion = list(self.directory.glob("*.ion"))
            if len(match_ion) > 1:
                raise ValueError(
                    "Cannot read sparc bundle with multiple ion files without specifying the label!"
                )
            elif len(match_ion) == 1:
                label_ = match_ion[0].name.split(".")[0]
            else:
                # No file found, possibly an empty bundle
                warn("No .ion file found in the read-mode bundle.")
                label_ = prefix

        if any([c in label_ for c in illegal_chars]):
            warn(
                f"Label name {label_} contains illegal characters! I'll make it 'SPARC'"
            )
            label_ = "SPARC"
        return label_

    def __find_psp_dir(self, psp_dir=None):
        """Use environment variables to find the directory for SPARC
        pseudopotentials.

        Searching priority:
        1. User defined psp_dir
        2. $SPARC_PSP_PATH
        3. $SPARC_PP_PATH
        4. "psp_path" entry in the [sparc] section of the ASE config
        5. psp bundled with sparc-api

        Arguments:
            psp_dir (str or PosixPath or None): the specific directory to search
                the psp files. Each element can only have 1 psp file under psp_dir

        Returns:
            PosixPath or None: Location of psp files, or None when nothing found
        """
        if psp_dir is not None:
            return Path(psp_dir)
        else:
            for var in self.psp_env:
                env_psp_dir = self.cfg.get(var, None)
                if env_psp_dir:
                    return Path(env_psp_dir)
            # Use pp_path field in cfg
            parser = self.cfg.parser["sparc"] if "sparc" in self.cfg.parser else {}
            psp_dir_ini = parser.get("psp_path", None)
            if psp_dir_ini:
                return sanitize_path(psp_dir_ini)
            # At this point, we try to use the psp files bundled with sparc
            if is_psp_download_complete(default_psp_dir):
                return default_psp_dir
            else:
                warn(
                    (
                        "PSP directory bundled with SPARC-X-API is broken! "
                        "Please use `sparc.download_data` to re-download them!"
                    )
                )

            # Not found
            if self.mode == "w":
                warn(
                    (
                        "No pseudopotential searching path was set and "
                        "neither of $SPARC_PSP_PATH nor $SPARC_PP_PATH is set.\n"
                        "Please explicitly provide the pseudopotentials parameter when writing the sparc bundle."
                    )
                )
            return None

    def _indir(self, ext, label=None, occur=0, d_format="{:02d}"):
        """Find the file with {label}.{ext} under the current directory.
        If label is None, use the default.

        Arguments:
            ext (str): Extension of file, e.g. '.ion' or 'ion'
            label (str or None): Label for the file. If None, use self.label
            occur (int): Occurrence index of the file; if occur > 0, search for
                files with suffix like 'SPARC.out_01'
            d_format (str): Format for the occurrence index

        Returns:
            PosixPath: Path to the target file under self.directory
        """
        label = self.label if label is None else label
        if not ext.startswith("."):
            ext = "." + ext
        if occur == 0:
            target = self.directory / f"{label}{ext}"
        else:
            target = self.directory / f"{label}{ext}_{d_format.format(occur)}"
        return target

    def _read_ion_and_inpt(self):
        """Read the ion and inpt files together to obtain basic atomistic data.

        Returns:
            Atoms: atoms object from .ion and .inpt file
        """
        f_ion, f_inpt = self._indir(".ion"), self._indir(".inpt")
        ion_data = _read_ion(f_ion, validator=self.validator)
        inpt_data = _read_inpt(f_inpt, validator=self.validator)
        merged_data = {**ion_data, **inpt_data}
        return dict_to_atoms(merged_data)

    def _write_ion_and_inpt(
        self,
        atoms=None,
        label=None,
        direct=False,
        sort=True,
        ignore_constraints=False,
        wrap=False,
        # Below are the parameters from v1
        # scaled -> direct, ignore_constraints --> not add_constraints
        scaled=False,
        add_constraints=True,
        copy_psp=False,
        comment="",
        input_parameters=None,
        # Parameters that do not require type conversion
        **kwargs,
    ):
        """Write the ion and inpt files to a bundle. This method only
        supports writing 1 image. If input_parameters are empty,
        there will only be .ion writing the positions and .inpt
        writing a minimal cell information.

        Args:
            atoms (Atoms, optional): The Atoms object to write. If None, uses the
                initial atoms associated with this SparcBundle.
            label (str, optional): Custom label for the written files.
            direct (bool, optional): If True, writes positions in direct coordinates.
            sort (bool, optional): If True, sorts atoms before writing.
            ignore_constraints (bool, optional): If True, ignores constraints on atoms.
            wrap (bool, optional): If True, wraps atoms into the unit cell.
            input_parameters (dict, optional): Parameters written to the .inpt file.
                None is treated as an empty dict (a mutable default argument is
                deliberately avoided here).
            **kwargs: Additional keyword arguments for writing.

        Raises:
            ValueError: If the bundle is not in write mode, if no atoms are
                available, or if HUBBARD_FLAG is set without U corrections.
        """
        if self.mode != "w":
            raise ValueError(
                "Cannot write input files while sparc bundle is opened in read or append mode!"
            )
        os.makedirs(self.directory, exist_ok=True)
        # BUGFIX: previously read `self.atoms`, an attribute that is never
        # defined (the constructor stores `init_atoms`), which raised
        # AttributeError whenever atoms=None was passed.
        if atoms is None:
            if self.init_atoms is None:
                raise ValueError(
                    "No atoms object available for writing the sparc bundle!"
                )
            atoms = self.init_atoms.copy()
        else:
            atoms = atoms.copy()
        pseudopotentials = kwargs.pop("pseudopotentials", {})

        if sort:
            # Reuse the bundle-wide sorting, if it already exists, so that
            # repeated writes keep a consistent atom order
            if self.sorting is not None:
                old_sort = self.sorting.get("sort", None)
                if old_sort:
                    sort = old_sort

        data_dict = atoms_to_dict(
            atoms,
            direct=direct,
            sort=sort,
            ignore_constraints=ignore_constraints,
            psp_dir=self.psp_dir,
            pseudopotentials=pseudopotentials,
        )
        merged_inputs = {} if input_parameters is None else dict(input_parameters)
        merged_inputs.update(kwargs)
        # @TT 2025.06.04 HUBBARD-U requires some special treatment
        # HUBBARD section should be popped and written under the
        # .ion file (currently done via atoms conversion)
        hubbard_u_pairs = merged_inputs.pop("HUBBARD", [])
        # TODO: may need consistent naming for info
        if int(merged_inputs.get("HUBBARD_FLAG", 0)) > 0:
            atoms_info_hubbard_u_pairs = atoms.info.get("hubbard_u (hartree)", [])
            # We will overwrite all existing hubbard info
            if len(hubbard_u_pairs) > 0:
                data_dict["ion"]["extra"]["hubbard"] = hubbard_u_pairs
            elif len(atoms_info_hubbard_u_pairs) > 0:
                data_dict["ion"]["extra"]["hubbard"] = atoms_info_hubbard_u_pairs
            else:
                raise ValueError("HUBBARD_FLAG=1 but no U correction provided!")
        data_dict["inpt"]["params"].update(merged_inputs)

        # If copy_psp, change the PSEUDO_POT field and copy the files
        if copy_psp:
            for block in data_dict["ion"]["atom_blocks"]:
                if "PSEUDO_POT" in block:
                    origin_psp = block["PSEUDO_POT"]
                    target_dir = self.directory
                    target_fname = copy_psp_file(origin_psp, target_dir)
                    block["PSEUDO_POT"] = target_fname

        _write_ion(
            self._indir(".ion", label=label), data_dict, validator=self.validator
        )
        _write_inpt(
            self._indir(".inpt", label=label), data_dict, validator=self.validator
        )
        # Update the sorting information from the file just written
        ion_dict = _read_ion(self._indir(".ion", label=label))["ion"]
        self.sorting = ion_dict.get("sorting", None)
        return

    def read_raw_results(self, include_all_files=False):
        """Parse all files using the given self.label.
        The results are a merged dict from all file formats.

        Arguments:
            include_all_files (bool): Whether to include output files with
                different suffices. If true: include all files (e.g. SPARC.out,
                SPARC.out_01, SPARC.out_02, etc).

        Returns:
            dict or List: Dict containing all raw results. Only some of them
                will appear in the calculator's results.

        Sets:
            self.raw_results (dict or List): the same as the return value

        #TODO: @TT 2024-11-01 allow accepting indices
        #TODO: @TT last_image is a bad name, it should refer to the occurrence
              of images; the same goes with num_calculations
        """
        # Find the max output index among {label}.out / {label}.out_NN files
        out_files = self.directory.glob(f"{self.label}.out*")
        valid_out_files = [
            f
            for f in out_files
            if (re.fullmatch(r"^\.out(?:_\d+)?$", f.suffix) is not None)
        ]
        if len(valid_out_files) == 0:
            # No output file, only ion / inpt
            self.last_image = -1
        else:
            # BUGFIX: use the numeric occurrence index instead of lexically
            # sorting the paths -- lexical order breaks once the suffix
            # exceeds two digits (".out_99" sorts after ".out_100")
            def _occurrence(f):
                suffix = f.suffix
                return 0 if suffix == ".out" else int(suffix.split("_")[1])

            self.last_image = max(_occurrence(f) for f in valid_out_files)
        self.num_calculations = self.last_image + 1

        # Always make sure ion / inpt results are parsed regardless of actual calculations
        if include_all_files:
            if self.num_calculations > 0:
                results = [
                    self._read_results_from_index(index)
                    for index in range(self.num_calculations)
                ]
            else:
                results = [self._read_results_from_index(self.last_image)]
        else:
            results = self._read_results_from_index(self.last_image)

        self.raw_results = results

        if include_all_files:
            init_raw_results = self.raw_results[0]
        else:
            init_raw_results = self.raw_results.copy()

        self.init_atoms = dict_to_atoms(init_raw_results)
        self.init_inputs = {
            "ion": init_raw_results["ion"],
            "inpt": init_raw_results["inpt"],
        }
        self.psp_data = self.read_psp_info()
        return self.raw_results

    def _read_results_from_index(self, index, d_format="{:02d}"):
        """Read the results from one calculation index, and return a
        single raw result dict, e.g. for index=0 --> .static
        and index=1 --> .static_01.

        Arguments:
            index (int): Index of image to return the results
            d_format (str): Format for the index suffix

        Returns:
            dict: Results for single image

        #TODO: @TT should we call index --> occurrence?
        """
        results_dict = {}

        for ext in ("ion", "inpt"):
            f = self._indir(ext, occur=0)
            if f.is_file():
                # Dispatch to the matching module-level parser (_read_ion / _read_inpt)
                data_dict = globals()[f"_read_{ext}"](f, validator=self.validator)
                results_dict.update(data_dict)
        for ext in ("geopt", "static", "aimd", "out"):
            f = self._indir(ext, occur=index, d_format=d_format)
            if f.is_file():
                data_dict = globals()[f"_read_{ext}"](f)
                results_dict.update(data_dict)

        # Must have files: ion, inpt
        if ("ion" not in results_dict) or ("inpt" not in results_dict):
            raise RuntimeError(
                "Either ion or inpt files are missing from the bundle! "
                "Your SPARC calculation may be corrupted."
            )

        # Copy the sorting information, if not existing
        sorting = results_dict["ion"].get("sorting", None)
        if sorting is not None:
            if self.sorting is None:
                self.sorting = sorting
            else:
                # Compare stored sorting
                assert (tuple(self.sorting["sort"]) == tuple(sorting["sort"])) and (
                    tuple(self.sorting["resort"]) == tuple(sorting["resort"])
                ), "Sorting information changed!"
        return results_dict

    def convert_to_ase(self, index=-1, include_all_files=False, **kwargs):
        """Read the raw results from the bundle and create atoms with
        single point calculators.

        Arguments:
            index (int or str): Index or slice of the image(s) to convert.
                Uses the same format as ase.io.read
            include_all_files (bool): If true, also read results with indexed suffices

        Returns:
            Atoms or List[Atoms]: ASE-atoms or images with single point results
        """
        # Convert to images!
        # TODO: @TT 2024-11-01 read_raw_results should implement a more
        # robust behavior handling index, as it is the entry point for all
        rs = self.read_raw_results(include_all_files=include_all_files)
        if isinstance(rs, dict):
            raw_results = [rs]
        else:
            raw_results = list(rs)
        res_images = []
        for entry in raw_results:
            if "static" in entry:
                calc_results, images = self._extract_static_results(entry, index=":")
            elif "geopt" in entry:
                calc_results, images = self._extract_geopt_results(entry, index=":")
            elif "aimd" in entry:
                calc_results, images = self._extract_aimd_results(entry, index=":")
            else:
                # No output section at all: fall back to the initial atoms
                calc_results, images = None, [self.init_atoms.copy()]

            if images is not None:
                if calc_results is not None:
                    images = self._make_singlepoint(calc_results, images, entry)
                res_images.extend(images)

        if isinstance(index, int):
            return res_images[index]
        else:
            return res_images[string2index(index)]

    def _make_singlepoint(self, calc_results, images, raw_results):
        """Convert a calculator dict and images of Atoms to a list of
        SinglePointDFTCalculators.

        The calculator also takes parameters from ion, inpt that exist
        in self.raw_results.

        Arguments:
            calc_results (List): Calculation results for all images
            images (List): Corresponding Atoms images
            raw_results (dict): Full raw results dict to obtain additional information

        Returns:
            List(Atoms): ASE-atoms images with single point calculators attached
        """
        converted_images = []
        for res, _atoms in zip(calc_results, images):
            atoms = _atoms.copy()
            sp = SinglePointDFTCalculator(atoms)
            # Res can be empty at this point, leading to incomplete calc
            sp.results.update(res)
            sp.name = "sparc"
            sp.kpts = raw_results["inpt"].get("params", {}).get("KPOINT_GRID", None)
            # There may be a better way handling the parameters...
            sp.parameters = raw_results["inpt"].get("params", {})
            sp.raw_parameters = {
                "ion": raw_results["ion"],
                "inpt": raw_results["inpt"],
            }
            atoms.calc = sp
            converted_images.append(atoms)
        return converted_images

    def _extract_static_results(self, raw_results, index=":"):
        """Extract the static calculation results and atomic structure(s).

        The priority is to parse positions from the static file first, then
        fall back to ion + inpt. Note: all energies / forces are resorted!

        Arguments:
            raw_results (dict): Raw results parsed from self.read_raw_results
            index (str or int): Index or slice of images

        Returns:
            List[results], List[Atoms]
        """
        static_results = raw_results.get("static", [])
        calc_results = []
        # Use extra lattice information to construct the positions
        cell = self.init_atoms.cell
        static_results = _add_cell_info(static_results, cell)

        if isinstance(index, int):
            _images = [static_results[index]]
        elif isinstance(index, str):
            _images = static_results[string2index(index)]

        ase_images = []
        # NOTE: loop variable renamed from `static_results` to avoid
        # shadowing the list being iterated
        for img_result in _images:
            partial_results = {}
            if "free energy" in img_result:
                partial_results["energy"] = img_result["free energy"]
                partial_results["free energy"] = img_result["free energy"]

            if "forces" in img_result:
                partial_results["forces"] = img_result["forces"][self.resort]

            if "atomic_magnetization" in img_result:
                partial_results["magmoms"] = img_result["atomic_magnetization"][
                    self.resort
                ]

            if "net_magnetization" in img_result:
                partial_results["magmom"] = img_result["net_magnetization"]

            if "stress" in img_result:
                partial_results["stress"] = img_result["stress"]

            if "stress_equiv" in img_result:
                partial_results["stress_equiv"] = img_result["stress_equiv"]

            atoms = self.init_atoms.copy()
            if "atoms" in img_result:
                atoms_dict = img_result["atoms"]

                # The socket mode case. Reset all cell and positions
                # Be careful,
                if "lattice" in img_result:
                    lat = img_result["lattice"]
                    atoms.set_cell(lat, scale_atoms=False)
                    if "coord" not in atoms_dict:
                        raise KeyError(
                            "Coordination conversion failed in socket static output!"
                        )
                    atoms.set_positions(
                        atoms_dict["coord"][self.resort], apply_constraint=False
                    )
                else:  # Do not change cell information (normal static file)
                    if "coord_frac" in atoms_dict:
                        atoms.set_scaled_positions(
                            atoms_dict["coord_frac"][self.resort]
                        )
                    elif "coord" in atoms_dict:
                        atoms.set_positions(
                            atoms_dict["coord"][self.resort], apply_constraint=False
                        )
            ase_images.append(atoms)
            calc_results.append(partial_results)
        return calc_results, ase_images

    def _extract_geopt_results(self, raw_results, index=":"):
        """Extract the geometry-optimization results and atomic structure(s).

        Arguments:
            raw_results (dict): Raw results parsed from self.read_raw_results
            index (str or int): Index or slice of images

        Returns:
            List[results], List[Atoms] (or (None, None) for an empty geopt file)
        """
        geopt_results = raw_results.get("geopt", [])
        calc_results = []
        if len(geopt_results) == 0:
            warn(
                "Geopt file is empty! This is not an error if the calculation is continued from restart. "
            )
            return None, None

        if isinstance(index, int):
            _images = [geopt_results[index]]
        elif isinstance(index, str):
            _images = geopt_results[string2index(index)]

        ase_images = []
        for result in _images:
            atoms = self.init_atoms.copy()
            partial_result = {}
            if "energy" in result:
                partial_result["energy"] = result["energy"]
                partial_result["free energy"] = result["energy"]

            if "forces" in result:
                partial_result["forces"] = result["forces"][self.resort]

            if "stress" in result:
                partial_result["stress"] = result["stress"]

            # Modify the atoms copy
            if "positions" in result:
                atoms.set_positions(
                    result["positions"][self.resort], apply_constraint=False
                )
                if "ase_cell" in result:
                    atoms.set_cell(result["ase_cell"])
            else:
                # For geopt and RELAX=2 (cell relaxation),
                # the positions may not be written in .geopt file
                relax_flag = raw_results["inpt"]["params"].get("RELAX_FLAG", 0)
                if relax_flag != 2:
                    raise ValueError(
                        ".geopt file missing positions while RELAX!=2. "
                        "Please check your setup and output files."
                    )
                if "ase_cell" not in result:
                    raise ValueError(
                        "Cannot recover positions from .geopt file due to missing cell information. "
                        "Please check your setup and output files."
                    )
                atoms.set_cell(result["ase_cell"], scale_atoms=True)

            # Unlike low-dimensional stress in static calculations, we need to convert
            # stress_1d stress_2d to stress_equiv using the non-period cell dimension(s)
            # This has to be done when the actual cell information is loaded
            if "stress_1d" in result:
                stress_1d = result["stress_1d"]
                assert (
                    np.count_nonzero(atoms.pbc) == 1
                ), "Dimension of stress and PBC mismatch!"
                for i, bc in enumerate(atoms.pbc):
                    if not bc:
                        stress_1d /= atoms.cell.cellpar()[i]
                stress_equiv = stress_1d
                partial_result["stress_equiv"] = stress_equiv

            if "stress_2d" in result:
                stress_2d = result["stress_2d"]
                assert (
                    np.count_nonzero(atoms.pbc) == 2
                ), "Dimension of stress and PBC mismatch!"
                for i, bc in enumerate(atoms.pbc):
                    if not bc:
                        stress_2d /= atoms.cell.cellpar()[i]
                stress_equiv = stress_2d
                partial_result["stress_equiv"] = stress_equiv

            calc_results.append(partial_result)
            ase_images.append(atoms)

        return calc_results, ase_images

    def _extract_aimd_results(self, raw_results, index=":"):
        """Extract energy / forces from aimd results.

        For the calculator, we only need the last image.

        Arguments:
            raw_results (dict): Raw results parsed from self.read_raw_results
            index (str or int): Index or slice of images

        Returns:
            List[results], List[Atoms] (or (None, None) for an empty aimd file)
        """
        aimd_results = raw_results.get("aimd", [])
        calc_results = []
        if len(aimd_results) == 0:
            warn(
                "Aimd file is empty! "
                "This is not an error if the calculation "
                "is continued from restart. "
            )
            return None, None

        if isinstance(index, int):
            _images = [aimd_results[index]]
        elif isinstance(index, str):
            _images = aimd_results[string2index(index)]

        ase_images = []
        for result in _images:
            partial_result = {}
            atoms = self.init_atoms.copy()
            if "total energy per atom" in result:
                partial_result["energy"] = result["total energy per atom"] * len(atoms)
            if "free energy per atom" in result:
                partial_result["free energy"] = result["free energy per atom"] * len(
                    atoms
                )

            if "forces" in result:
                # The forces are already re-sorted!
                partial_result["forces"] = result["forces"][self.resort]

            # Modify the atoms in-place
            if "positions" not in result:
                raise ValueError("Cannot have aimd without positions information!")

            atoms.set_positions(
                result["positions"][self.resort], apply_constraint=False
            )

            if "velocities" in result:
                atoms.set_velocities(result["velocities"][self.resort])

            ase_images.append(atoms)
            calc_results.append(partial_result)
        return calc_results, ase_images

    @property
    def sort(self):
        """Wrap the self.sorting dict. If sorting information does not exist,
        use the default slicing.
        """
        if self.sorting is None:
            return slice(None, None, None)
        sort = self.sorting.get("sort", [])
        if len(sort) > 0:
            return sort
        else:
            return slice(None, None, None)

    @property
    def resort(self):
        """Wrap the self.sorting dict. If sorting information does not exist,
        use the default slicing.
        """
        if self.sorting is None:
            return slice(None, None, None)
        resort = self.sorting.get("resort", [])
        if len(resort) > 0:
            return resort
        else:
            return slice(None, None, None)

    def read_psp_info(self):
        """Parse the psp information from the ion file's atom blocks.
        The psp file locations are relative to the bundle.

        If the files cannot be found, the dict will only contain
        the path.
        """
        # NOTE: this reads the "ion" section (local previously mis-named `inpt`)
        ion_data = self.init_inputs.get("ion", {})
        blocks = ion_data.get("atom_blocks", [])
        psp_info = {}
        for block in blocks:
            element = block["ATOM_TYPE"]
            pseudo_path = block["PSEUDO_POT"]
            real_path = (self.directory / pseudo_path).resolve()
            psp_info[element] = {"rel_path": pseudo_path}
            if not real_path.is_file():
                warn(f"Cannot locate pseudopotential {pseudo_path}. ")
            else:
                # Use a context manager so the file handle is closed promptly
                # (previously open(...).read() leaked the handle)
                with open(real_path, "r") as fd:
                    header = fd.read()
                psp_data = parse_psp8_header(header)
                psp_info[element].update(psp_data)
        return psp_info
def read_sparc(filename, index=-1, include_all_files=True, **kwargs):
    """Parse a SPARC bundle, return an Atoms object or list of Atoms (images)
    with embedded calculator results.

    Arguments:
        filename (str or PosixPath): Filename of the sparc bundle
        index (int or str): Index or slice of the images, following the
            ase.io.read convention
        include_all_files (bool): If true, parse all output files with
            indexed suffices
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    # Rely on the minimal api version choice, i.e. default or set from env
    api = locate_api()
    bundle = SparcBundle(directory=filename, validator=api)
    return bundle.convert_to_ase(
        index=index, include_all_files=include_all_files, **kwargs
    )
def write_sparc(filename, images, **kwargs):
    """Write a SPARC bundle. Images can only be an Atoms object
    or a list of length 1.

    Arguments:
        filename (str or PosixPath): Filename of the output sparc directory
        images (Atoms or List[Atoms]): Atoms object to be written. Only
            supports writing 1 Atoms object
        **kwargs: Additional parameters passed to _write_ion_and_inpt

    Raises:
        ValueError: If not exactly one Atoms object is provided, or if
            `images` is neither an Atoms object nor a list/tuple.
    """
    if isinstance(images, Atoms):
        atoms = images
    elif isinstance(images, (list, tuple)):
        # BUGFIX: an empty list previously raised a bare IndexError;
        # give the caller the same explicit ValueError instead
        if len(images) != 1:
            raise ValueError("SPARC format only supports writing one atoms object!")
        atoms = images[0]
    else:
        # BUGFIX: unsupported types previously fell through and raised
        # NameError on the unbound `atoms` variable
        raise ValueError(
            "SPARC format only supports writing one atoms object!"
        )
    api = locate_api()
    sb = SparcBundle(directory=filename, mode="w", validator=api)
    sb._write_ion_and_inpt(atoms, **kwargs)
    return
@deprecated(
    "Reading individual .ion file is not recommended. Please use read_sparc instead."
)
def read_sparc_ion(filename, **kwargs):
    """Parse an .ion file inside a SPARC bundle via a SparcBundle wrapper.
    The reader works only when other files (.inpt) exist.

    The returned Atoms object contains only the initial positions.

    Arguments:
        filename (str or PosixPath): Filename of the .ion file
        **kwargs: Additional parameters (unused)

    Returns:
        Atoms
    """
    api = locate_api()
    bundle = SparcBundle(directory=Path(filename).parent, validator=api)
    return bundle._read_ion_and_inpt()
# Backward compatibility: pre-1.0 public name for read_sparc_ion
read_ion = read_sparc_ion
@deprecated(
    "Writing individual .ion file is not recommended. Please use write_sparc instead."
)
def write_sparc_ion(filename, atoms, **kwargs):
    """Write an .ion file using the SparcBundle wrapper. This method will
    also create the .inpt file.

    This is only for backward compatibility.

    Arguments:
        filename (str or PosixPath): Filename of the .ion file
        atoms (Atoms): atoms to be written
        **kwargs: Additional parameters
    """
    filepath = Path(filename)
    api = locate_api()
    bundle = SparcBundle(
        directory=filepath.parent,
        label=filepath.with_suffix("").name,
        mode="w",
        validator=api,
    )
    bundle._write_ion_and_inpt(atoms, **kwargs)
    return atoms
# Backward compatibility: pre-1.0 public name for write_sparc_ion
write_ion = write_sparc_ion
@deprecated(
    "Reading individual .static file is not recommended. Please use read_sparc instead."
)
def read_sparc_static(filename, index=-1, **kwargs):
    """Parse a .static file bundle using a wrapper around SparcBundle.
    The reader works only when other files (.ion, .inpt) exist.

    Arguments:
        filename (str or PosixPath): Filename of the .static file
        index (int or str): Index or slice of the images, following the
            ase.io.read convention
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    api = locate_api()
    bundle = SparcBundle(directory=Path(filename).parent, validator=api)
    # In most cases the user wants to inspect all images
    options = dict(kwargs)
    options.setdefault("include_all_files", True)
    return bundle.convert_to_ase(index=index, **options)
947# Backward compatibility
948read_static = read_sparc_static
@deprecated(
    "Reading individual .geopt file is not recommended. Please use read_sparc instead."
)
def read_sparc_geopt(filename, index=-1, **kwargs):
    """Parse a .geopt file bundle using a wrapper around SparcBundle
    The reader works only when other files (.ion, .inpt) exist.

    Arguments:
        filename (str or PosixPath): Filename to the .geopt file
        index (int or str): Index or slice of the images, following the ase.io.read convention
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    bundle = SparcBundle(directory=Path(filename).parent, validator=locate_api())
    # Default to reading all relaxation images unless the caller overrides
    options = dict(kwargs)
    options.setdefault("include_all_files", True)
    return bundle.convert_to_ase(index=index, **options)


# Backward compatibility
read_geopt = read_sparc_geopt
@deprecated(
    "Reading individual .aimd file is not recommended. Please use read_sparc instead."
)
def read_sparc_aimd(filename, index=-1, **kwargs):
    """Parse an .aimd file bundle using a wrapper around SparcBundle
    The reader works only when other files (.ion, .inpt) exist.

    Arguments:
        filename (str or PosixPath): Filename to the .aimd file
        index (int or str): Index or slice of the images, following the ase.io.read convention
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    bundle = SparcBundle(directory=Path(filename).parent, validator=locate_api())
    # Default to reading the full MD trajectory unless the caller overrides
    options = dict(kwargs)
    options.setdefault("include_all_files", True)
    return bundle.convert_to_ase(index=index, **options)


# Backward compatibility
read_aimd = read_sparc_aimd
@deprecated(
    "__register_new_filetype will be deprecated for future releases. Please upgrade ase>=3.23."
)
def __register_new_filetype():
    """Register the filetype() function that allows recognizing .sparc as directory
    This method should only be called for ase==3.22 compatibility and for ase-gui
    In future versions of ase gui where format is supported, this method should be removed
    """
    import sys

    from ase.io import formats as hacked_formats
    from ase.io.formats import filetype as _old_filetype
    from ase.io.formats import ioformats

    def _new_filetype(filename, read=True, guess=True):
        """A hacked solution for the auto format recovery"""
        path = Path(filename)
        # Any basename mentioning .sparc is treated as a sparc bundle
        if ".sparc" in path.name:
            return "sparc"
        # A plain directory holding both .ion and .inpt files is also a bundle
        if path.is_dir() and any(path.glob("*.ion")) and any(path.glob("*.inpt")):
            return "sparc"
        # Fall back to ase's stock detection
        return _old_filetype(filename, read, guess)

    # Monkey-patch the module object so subsequent imports see the new filetype
    hacked_formats.filetype = _new_filetype
    sys.modules["ase.io.formats"] = hacked_formats
    return
@deprecated(
    "register_ase_io_sparc will be deprecated for future releases. Please upgrade ase>=3.23."
)
def register_ase_io_sparc(name="sparc"):
    """
    **Legacy register of io-formats for ase==3.22**
    **For ase>=3.23, use the package entrypoint registration**
    Monkey patching the ase.io and ase.io.formats
    So that the following formats can be used
    after `import sparc`

    ```
    from ase.io import sparc
    ase.io.read("test.sparc")
    atoms.write("test.sparc")
    ```

    The register method only aims to work for ase 3.22
    the develop version of ase provides a much more powerful
    register mechanism, we can wait.
    """
    import sys
    from warnings import warn

    import pkg_resources
    from ase.io.formats import define_io_format as F
    from ase.io.formats import ioformats

    name = name.lower()
    if name in ioformats.keys():
        return
    desc = "SPARC .sparc bundle"

    # Step 1: patch the ase.io.sparc module
    try:
        entry_points = next(
            ep for ep in pkg_resources.iter_entry_points("ase.io") if ep.name == "sparc"
        )
        _monkey_mod = entry_points.load()
    except Exception as e:
        # BUG FIX: warn() used to receive a *tuple* of strings (trailing comma),
        # which printed as a garbled tuple repr; emit one joined message instead
        warn(
            "Failed to load entrypoint `ase.io.sparc`, "
            "you may need to reinstall sparc python api.\n"
            "You may still use `sparc.read_sparc` and "
            "`sparc.write_sparc` methods, "
            "but not `ase.io.read`\n"
            f"The error is {e}"
        )
        return

    sys.modules[f"ase.io.{name}"] = _monkey_mod
    __register_new_filetype()

    # Step 2: define a new format
    F(
        name,
        desc=desc,
        code="+S",  # read_sparc has multi-image support
        ext="sparc",
    )

    if name not in ioformats.keys():
        warn(
            (
                "Registering .sparc format with ase.io failed. "
                "You may still use `sparc.read_sparc` and "
                "`sparc.write_sparc` methods. \n"
                "Please contact the developer to report this issue."
            )
        )
        return

    import tempfile

    from ase.io import read

    # Sanity check: reading an empty *.sparc directory should NOT fall back to
    # ase's BundleTrajectory reader once our filetype patch is in place
    with tempfile.TemporaryDirectory(suffix=".sparc") as tmpdir:
        try:
            # BUG FIX: TemporaryDirectory's context manager yields the path
            # *string*; the old `tmpdir.name` raised AttributeError, which the
            # broad except swallowed and silently skipped this check
            read(tmpdir)
        except Exception as e:
            emsg = str(e).lower()
            if "bundletrajectory" in emsg:
                warn(
                    "Automatic format inference for sparc is not correctly registered. "
                    "You may need to use format=sparc in ase.io.read and ase.io.write. "
                )
    # Add additional formats including .ion (r/w), .static, .geopt, .aimd
    F(
        "ion",
        desc="SPARC .ion file",
        module="sparc",
        code="1S",
        ext="ion",
    )
    F(
        "static",
        desc="SPARC single point results",
        module="sparc",
        code="+S",
        ext="static",
    )
    F(
        "geopt",
        desc="SPARC geometric optimization results",
        module="sparc",
        code="+S",
        ext="geopt",
    )
    F("aimd", desc="SPARC AIMD results", module="sparc", code="+S", ext="aimd")

    # TODO: remove print options as it may be redundant
    print("Successfully registered sparc formats with ase.io!")
# ase>=3.23 uses new ExternalIOFormat as registered entrypoints
# Please do not use from ase.io.formats import ExternalIOFormat!
# This causes circular import
try:
    from ase.utils.plugins import ExternalIOFormat as EIF
except ImportError:
    # Backward Compatibility with ase<3.23, where ExternalIOFormat is absent
    from typing import List, NamedTuple, Optional, Union

    # Copy definition from 3.23
    # Name is defined in the entry point
    class ExternalIOFormat(NamedTuple):
        desc: str
        code: str
        module: Optional[str] = None
        glob: Optional[Union[str, List[str]]] = None
        ext: Optional[Union[str, List[str]]] = None
        magic: Optional[Union[bytes, List[bytes]]] = None
        magic_regex: Optional[bytes] = None

    EIF = ExternalIOFormat

# Entry-point format declarations: each points ase at the read_/write_
# functions defined in this module (sparc.io)
format_sparc = EIF(
    desc="SPARC .sparc bundle",
    module="sparc.io",
    code="+S",  # read_sparc has multi-image support
    ext="sparc",
)
format_ion = EIF(
    desc="SPARC .ion file",
    module="sparc.io",
    code="1S",
    ext="ion",
)
format_static = EIF(
    desc="SPARC single point results",
    module="sparc.io",
    code="+S",
    glob=["*.static", "*.static_*"],
)
format_geopt = EIF(
    desc="SPARC geometric optimization results",
    module="sparc.io",
    code="+S",
    glob=["*.geopt", "*.geopt_*"],
)
format_aimd = EIF(
    desc="SPARC AIMD results",
    # BUG FIX: was module="sparc", inconsistent with every other format whose
    # reader lives in sparc.io
    module="sparc.io",
    code="+S",
    # BUG FIX: glob was ["*.aimd*", "*.geopt_*"] — the "*.geopt_*" entry is a
    # copy-paste leftover that routed chained geopt outputs to the AIMD reader
    glob=["*.aimd", "*.aimd_*"],
)