Coverage for sparc/io.py: 73%
481 statements
1"""Providing a new bundled SPARC file format
2"""
3import os
4import re
5from pathlib import Path
6from warnings import warn
8import numpy as np
9from ase.atoms import Atoms
10from ase.calculators.singlepoint import SinglePointDFTCalculator
11from ase.config import cfg as _cfg
13# various io formatters
14from .api import SparcAPI
15from .common import psp_dir as default_psp_dir
16from .download_data import is_psp_download_complete
17from .sparc_parsers.aimd import _read_aimd
18from .sparc_parsers.atoms import atoms_to_dict, dict_to_atoms
19from .sparc_parsers.geopt import _read_geopt
20from .sparc_parsers.inpt import _read_inpt, _write_inpt
21from .sparc_parsers.ion import _read_ion, _write_ion
22from .sparc_parsers.out import _read_out
23from .sparc_parsers.pseudopotential import copy_psp_file, parse_psp8_header
24from .sparc_parsers.static import _add_cell_info, _read_static
25from .utils import deprecated, locate_api, sanitize_path, string2index
27# from .sparc_parsers.ion import read_ion, write_ion
28defaultAPI = locate_api(cfg=_cfg)


class SparcBundle:
    """Provide access to a calculation folder of SPARC as a simple bundle

    The bundle can be optionally named as .sparc following ASE's
    .bundle format

    Currently the write method only supports 1 image, while the read method
    supports reading atoms results under the following conditions:

    1) No calculation (minimal): .ion + .inpt file --> 1 image
    2) Single point calculation: .ion + .inpt + .out + .static --> 1 image with calc
    3) Multiple SP calculations: chain all .out{digits} and .static{digits} outputs
    4) Relaxation: read from .geopt and .out (supports chaining)
    5) AIMD: read from .aimd and .out (supports chaining)

    Attributes:
        directory (Path): Path to the directory containing SPARC files.
        mode (str): File access mode ('r', 'w', or 'a').
        label (str): Name of the main SPARC file.
        init_atoms (Atoms): Initial atomic configuration.
        init_inputs (dict): Initial input parameters.
        psp_data (dict): Pseudopotential data.
        raw_results (dict): Raw results from SPARC calculations.
        psp_dir (Path): Directory containing pseudopotentials.
        sorting (list): Sort order for atoms.
        last_image (int): Index of the last image in a series of calculations.
        validator (SparcAPI): API validator for SPARC calculations.

    Methods:
        __find_psp_dir(psp_dir=None): Finds the directory for SPARC pseudopotentials.
        _find_files(): Finds all files matching the bundle label.
        _make_label(label=None): Infers or sets the label for the SPARC bundle.
        _indir(ext, label=None, occur=0, d_format="{:02d}"): Finds a file with a specific extension in the bundle.
        _read_ion_and_inpt(): Reads .ion and .inpt files together.
        _write_ion_and_inpt(): Writes .ion and .inpt files to the bundle.
        _read_results_from_index(index, d_format="{:02d}"): Reads results from a specific calculation index.
        _make_singlepoint(calc_results, images, raw_results): Converts results and images to SinglePointDFTCalculators.
        _extract_static_results(raw_results, index=":"): Extracts results from static calculations.
        _extract_geopt_results(raw_results, index=":"): Extracts results from geometric optimization calculations.
        _extract_aimd_results(raw_results, index=":"): Extracts results from AIMD calculations.
        convert_to_ase(index=-1, include_all_files=False, **kwargs): Converts raw results to ASE Atoms with calculators.
        read_raw_results(include_all_files=False): Parses all files in the bundle and merges results.
        read_psp_info(): Parses pseudopotential information from the inpt file.
    """

    psp_env = ["SPARC_PSP_PATH", "SPARC_PP_PATH"]

    def __init__(
        self,
        directory,
        mode="r",
        atoms=None,
        label=None,
        psp_dir=None,
        validator=defaultAPI,
        cfg=_cfg,
    ):
        """
        Initializes a SparcBundle for accessing SPARC calculation data.

        Args:
            directory (str or Path): The path to the directory containing the SPARC files.
            mode (str, optional): The file access mode. Can be 'r' (read), 'w' (write), or 'a' (append). Defaults to 'r'.
            atoms (Atoms, optional): The initial atomic configuration. Only relevant in write mode.
            label (str, optional): A custom label for the bundle. If None, the label is inferred from the directory or files.
            psp_dir (str or Path, optional): Path to the directory containing pseudopotentials. If None, the path is inferred.
            validator (SparcAPI, optional): An instance of SparcAPI for validating and parsing SPARC parameters. Defaults to a default SparcAPI instance.

        Raises:
            AssertionError: If an invalid mode is provided.
            ValueError: If multiple .ion files are found and no label is specified.
            Warning: If no .ion file is found in read mode, or illegal characters are in the label.
        """
        self.directory = Path(directory)
        self.mode = mode.lower()
        assert self.mode in (
            "r",
            "w",
            "a",
        ), f"Invalid mode {self.mode}! Must be one of 'r', 'w' or 'a'"
        self.label = self._make_label(label)
        self.init_atoms = atoms.copy() if atoms is not None else None
        self.init_inputs = {}
        self.psp_data = {}
        self.raw_results = {}
        self.cfg = cfg
        self.psp_dir = self.__find_psp_dir(psp_dir)
        # Sorting should be consistent across the whole bundle!
        self.sorting = None
        self.last_image = -1
        self.validator = validator

    def _find_files(self):
        """Find all files matching '{label}.*'"""
        return list(self.directory.glob(f"{self.label}.*"))

    def _make_label(self, label=None):
        """Infer the label from the bundle

        Special cases if label is None:
        1. read mode --> get the ion file name
        2. write mode --> infer from the directory

        Arguments:
            label (str or None): Label to be used to write the .ion, .inpt files
        """
        prefix = self.directory.resolve().with_suffix("").name

        illegal_chars = '\\/:*?"<>|'
        if label is not None:
            label_ = label
        elif self.mode == "w":
            label_ = prefix
        else:
            # read
            match_ion = list(self.directory.glob("*.ion"))
            if len(match_ion) > 1:
                raise ValueError(
                    "Cannot read sparc bundle with multiple ion files without specifying the label!"
                )
            elif len(match_ion) == 1:
                label_ = match_ion[0].name.split(".")[0]
            else:
                # No file found, possibly an empty bundle
                warn("No .ion file found in the read-mode bundle.")
                label_ = prefix

        if any([c in label_ for c in illegal_chars]):
            warn(
                f"Label name {label_} contains illegal characters! I'll make it 'SPARC'"
            )
            label_ = "SPARC"
        return label_

    def __find_psp_dir(self, psp_dir=None):
        """Use environment variables to find the directory for SPARC
        pseudopotentials

        Searching priority:
        1. User defined psp_dir
        2. $SPARC_PSP_PATH
        3. $SPARC_PP_PATH
        4. psp bundled with sparc-api

        Arguments:
            psp_dir (str or PosixPath or None): the specific directory to search for the psp files.
                Each element can only have 1 psp file under psp_dir

        Returns:
            PosixPath: Location of psp files
        """
        if psp_dir is not None:
            return Path(psp_dir)
        else:
            for var in self.psp_env:
                env_psp_dir = self.cfg.get(var, None)
                if env_psp_dir:
                    return Path(env_psp_dir)
            # Use psp_path field in cfg
            parser = self.cfg.parser["sparc"] if "sparc" in self.cfg.parser else {}
            psp_dir_ini = parser.get("psp_path", None)
            if psp_dir_ini:
                return sanitize_path(psp_dir_ini)
            # At this point, we try to use the psp files bundled with sparc
            if is_psp_download_complete(default_psp_dir):
                return default_psp_dir
            else:
                warn(
                    (
                        "PSP directory bundled with SPARC-X-API is broken! "
                        "Please use `sparc.download_data` to re-download them!"
                    )
                )

            # Not found
            if self.mode == "w":
                warn(
                    (
                        "No pseudopotential searching path was set and "
                        "neither $SPARC_PSP_PATH nor $SPARC_PP_PATH is set.\n"
                        "Please explicitly provide the pseudopotentials parameter when writing the sparc bundle."
                    )
                )
            return None
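    # Configuration sketch (illustrative only; the paths below are hypothetical).
    # The search priority described above can be satisfied either through the
    # shell environment:
    #
    #   export SPARC_PSP_PATH=/path/to/psp8_files
    #
    # or through the ASE configuration read by ase.config.cfg (typically
    # ~/.config/ase/config.ini):
    #
    #   [sparc]
    #   psp_path = /path/to/psp8_files
    #
    # If neither is provided, the pseudopotentials fetched by
    # `sparc.download_data` are used when the download is complete.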

    def _indir(self, ext, label=None, occur=0, d_format="{:02d}"):
        """Find the file with {label}.{ext} under the current dir;
        if label is None, use the default

        Arguments:
            ext (str): Extension of file, e.g. '.ion' or 'ion'
            label (str or None): Label for the file. If None, use the parent directory name for searching
            occur (int): Occurrence index of the file; if occur > 0, search for files with suffix like 'SPARC.out_01'
            d_format (str): Format for the index

        Returns:
            PosixPath: Path to the target file under self.directory
        """
        label = self.label if label is None else label
        if not ext.startswith("."):
            ext = "." + ext
        if occur == 0:
            target = self.directory / f"{label}{ext}"
        else:
            target = self.directory / f"{label}{ext}_{d_format.format(occur)}"
        return target

    def _read_ion_and_inpt(self):
        """Read the ion and inpt files together to obtain basic atomistic data.

        Returns:
            Atoms: atoms object from .ion and .inpt file
        """
        f_ion, f_inpt = self._indir(".ion"), self._indir(".inpt")
        ion_data = _read_ion(f_ion, validator=self.validator)
        inpt_data = _read_inpt(f_inpt, validator=self.validator)
        merged_data = {**ion_data, **inpt_data}
        return dict_to_atoms(merged_data)

    def _write_ion_and_inpt(
        self,
        atoms=None,
        label=None,
        direct=False,
        sort=True,
        ignore_constraints=False,
        wrap=False,
        # Below are the parameters from v1
        # scaled -> direct, ignore_constraints --> not add_constraints
        scaled=False,
        add_constraints=True,
        copy_psp=False,
        comment="",
        input_parameters={},
        # Parameters that do not require type conversion
        **kwargs,
    ):
        """Write the ion and inpt files to a bundle. This method only
        supports writing 1 image. If input_parameters are empty,
        there will only be a .ion file writing the positions and a .inpt
        file writing minimal cell information

        Args:
            atoms (Atoms, optional): The Atoms object to write. If None, uses initialized atoms associated with SparcBundle.
            label (str, optional): Custom label for the written files.
            direct (bool, optional): If True, writes positions in direct coordinates.
            sort (bool, optional): If True, sorts atoms before writing.
            ignore_constraints (bool, optional): If True, ignores constraints on atoms.
            wrap (bool, optional): If True, wraps atoms into the unit cell.
            **kwargs: Additional keyword arguments for writing.

        Raises:
            ValueError: If the bundle is not in write mode.
        """
        if self.mode != "w":
            raise ValueError(
                "Cannot write input files while sparc bundle is opened in read or append mode!"
            )
        os.makedirs(self.directory, exist_ok=True)
        # Fall back to the atoms provided at construction time
        atoms = self.init_atoms.copy() if atoms is None else atoms.copy()
        pseudopotentials = kwargs.pop("pseudopotentials", {})

        if sort:
            if self.sorting is not None:
                old_sort = self.sorting.get("sort", None)
                if old_sort:
                    sort = old_sort

        data_dict = atoms_to_dict(
            atoms,
            direct=direct,
            sort=sort,
            ignore_constraints=ignore_constraints,
            psp_dir=self.psp_dir,
            pseudopotentials=pseudopotentials,
        )
        merged_inputs = input_parameters.copy()
        merged_inputs.update(kwargs)
        data_dict["inpt"]["params"].update(merged_inputs)

        # If copy_psp, change the PSEUDO_POT field and copy the files
        if copy_psp:
            for block in data_dict["ion"]["atom_blocks"]:
                if "PSEUDO_POT" in block:
                    origin_psp = block["PSEUDO_POT"]
                    target_dir = self.directory
                    target_fname = copy_psp_file(origin_psp, target_dir)
                    block["PSEUDO_POT"] = target_fname

        _write_ion(self._indir(".ion"), data_dict, validator=self.validator)
        _write_inpt(self._indir(".inpt"), data_dict, validator=self.validator)
        # Update the sorting information
        ion_dict = _read_ion(self._indir(".ion"))["ion"]
        self.sorting = ion_dict.get("sorting", None)
        return

    def read_raw_results(self, include_all_files=False):
        """Parse all files using the given self.label.
        The results are a merged dict from all file formats

        Arguments:
            include_all_files (bool): Whether to include output files with different suffixes.
                If true: include all files (e.g. SPARC.out, SPARC.out_01,
                SPARC.out_02, etc).

        Returns:
            dict or List: Dict containing all raw results. Only some of them will appear in the calculator's results

        Sets:
            self.raw_results (dict or List): the same as the return value

        #TODO: @TT 2024-11-01 allow accepting indices
        #TODO: @TT last_image is a bad name, it should refer to the occurrence of images
              the same goes with num_calculations
        """
        # Find the max output index
        out_files = self.directory.glob(f"{self.label}.out*")
        valid_out_files = [
            f
            for f in out_files
            if (re.fullmatch(r"^\.out(?:_\d+)?$", f.suffix) is not None)
        ]
        # Combine and sort the file lists
        last_out = sorted(valid_out_files, reverse=True)
        # No output file, only ion / inpt
        if len(last_out) == 0:
            self.last_image = -1
        else:
            suffix = last_out[0].suffix
            if suffix == ".out":
                self.last_image = 0
            else:
                self.last_image = int(suffix.split("_")[1])
        self.num_calculations = self.last_image + 1

        # Always make sure ion / inpt results are parsed regardless of actual calculations
        if include_all_files:
            if self.num_calculations > 0:
                results = [
                    self._read_results_from_index(index)
                    for index in range(self.num_calculations)
                ]
            else:
                results = [self._read_results_from_index(self.last_image)]
        else:
            results = self._read_results_from_index(self.last_image)

        self.raw_results = results

        if include_all_files:
            init_raw_results = self.raw_results[0]
        else:
            init_raw_results = self.raw_results.copy()

        self.init_atoms = dict_to_atoms(init_raw_results)
        self.init_inputs = {
            "ion": init_raw_results["ion"],
            "inpt": init_raw_results["inpt"],
        }
        self.psp_data = self.read_psp_info()
        return self.raw_results

    def _read_results_from_index(self, index, d_format="{:02d}"):
        """Read the results from one calculation index, and return a
        single raw result dict, e.g. for index=0 --> .static
        and index=1 --> .static_01.

        Arguments:
            index (int): Index of image to return the results
            d_format (str): Format for the index suffix

        Returns:
            dict: Results for single image

        #TODO: @TT should we call index --> occurrence?
        """
        results_dict = {}

        for ext in ("ion", "inpt"):
            f = self._indir(ext, occur=0)
            if f.is_file():
                data_dict = globals()[f"_read_{ext}"](f)
                results_dict.update(data_dict)
        for ext in ("geopt", "static", "aimd", "out"):
            f = self._indir(ext, occur=index, d_format=d_format)
            if f.is_file():
                data_dict = globals()[f"_read_{ext}"](f)
                results_dict.update(data_dict)

        # Must have files: ion, inpt
        if ("ion" not in results_dict) or ("inpt" not in results_dict):
            raise RuntimeError(
                "Either ion or inpt files are missing from the bundle! "
                "Your SPARC calculation may be corrupted."
            )

        # Copy the sorting information, if not existing
        sorting = results_dict["ion"].get("sorting", None)
        if sorting is not None:
            if self.sorting is None:
                self.sorting = sorting
            else:
                # Compare stored sorting
                assert (tuple(self.sorting["sort"]) == tuple(sorting["sort"])) and (
                    tuple(self.sorting["resort"]) == tuple(sorting["resort"])
                ), "Sorting information changed!"
        return results_dict

    def convert_to_ase(self, index=-1, include_all_files=False, **kwargs):
        """Read the raw results from the bundle and create atoms with
        single point calculators

        Arguments:
            index (int or str): Index or slice of the image(s) to convert. Uses the same format as ase.io.read
            include_all_files (bool): If true, also read results with indexed suffixes

        Returns:
            Atoms or List[Atoms]: ASE-atoms or images with single point results
        """
        # Convert to images!
        # TODO: @TT 2024-11-01 read_raw_results should implement a more
        # robust behavior handling index, as it is the entry point for all
        rs = self.read_raw_results(include_all_files=include_all_files)
        if isinstance(rs, dict):
            raw_results = [rs]
        else:
            raw_results = list(rs)
        res_images = []
        for entry in raw_results:
            if "static" in entry:
                calc_results, images = self._extract_static_results(entry, index=":")
            elif "geopt" in entry:
                calc_results, images = self._extract_geopt_results(entry, index=":")
            elif "aimd" in entry:
                calc_results, images = self._extract_aimd_results(entry, index=":")
            else:
                calc_results, images = None, [self.init_atoms.copy()]

            if images is not None:
                if calc_results is not None:
                    images = self._make_singlepoint(calc_results, images, entry)
                res_images.extend(images)

        if isinstance(index, int):
            return res_images[index]
        else:
            return res_images[string2index(index)]

    def _make_singlepoint(self, calc_results, images, raw_results):
        """Convert a calculator dict and images of Atoms to a list of
        SinglePointDFTCalculators

        The calculator also takes parameters from ion, inpt that exist
        in self.raw_results.

        Arguments:
            calc_results (List): Calculation results for all images
            images (List): Corresponding Atoms images
            raw_results (List): Full raw results dict to obtain additional information

        Returns:
            List(Atoms): ASE-atoms images with single point calculators attached
        """
        converted_images = []
        for res, _atoms in zip(calc_results, images):
            atoms = _atoms.copy()
            sp = SinglePointDFTCalculator(atoms)
            # Res can be empty at this point, leading to incomplete calc
            sp.results.update(res)
            sp.name = "sparc"
            sp.kpts = raw_results["inpt"].get("params", {}).get("KPOINT_GRID", None)
            # There may be a better way of handling the parameters...
            sp.parameters = raw_results["inpt"].get("params", {})
            sp.raw_parameters = {
                "ion": raw_results["ion"],
                "inpt": raw_results["inpt"],
            }
            atoms.calc = sp
            converted_images.append(atoms)
        return converted_images

    def _extract_static_results(self, raw_results, index=":"):
        """Extract the static calculation results and atomic structure(s).
        The priority is to parse positions from the static file first,
        then fall back to ion + inpt.

        Note: make all energy / forces resorted!

        Arguments:
            raw_results (dict): Raw results parsed from self.read_raw_results
            index (str or int): Index or slice of images

        Returns:
            List[results], List[Atoms]
        """
        static_results = raw_results.get("static", [])
        calc_results = []
        # Use extra lattice information to construct the positions
        cell = self.init_atoms.cell
        static_results = _add_cell_info(static_results, cell)

        if isinstance(index, int):
            _images = [static_results[index]]
        elif isinstance(index, str):
            _images = static_results[string2index(index)]

        ase_images = []
        for static_results in _images:
            partial_results = {}
            if "free energy" in static_results:
                partial_results["energy"] = static_results["free energy"]
                partial_results["free energy"] = static_results["free energy"]

            if "forces" in static_results:
                partial_results["forces"] = static_results["forces"][self.resort]

            if "atomic_magnetization" in static_results:
                partial_results["magmoms"] = static_results["atomic_magnetization"][
                    self.resort
                ]

            if "net_magnetization" in static_results:
                partial_results["magmom"] = static_results["net_magnetization"]

            if "stress" in static_results:
                partial_results["stress"] = static_results["stress"]

            if "stress_equiv" in static_results:
                partial_results["stress_equiv"] = static_results["stress_equiv"]

            atoms = self.init_atoms.copy()
            if "atoms" in static_results:
                atoms_dict = static_results["atoms"]

                # The socket mode case. Reset all cell and positions
                if "lattice" in static_results:
                    lat = static_results["lattice"]
                    atoms.set_cell(lat, scale_atoms=False)
                    if "coord" not in atoms_dict:
                        raise KeyError(
                            "Coordination conversion failed in socket static output!"
                        )
                    atoms.set_positions(
                        atoms_dict["coord"][self.resort], apply_constraint=False
                    )
                else:  # Do not change cell information (normal static file)
                    if "coord_frac" in atoms_dict:
                        atoms.set_scaled_positions(
                            atoms_dict["coord_frac"][self.resort]
                        )
                    elif "coord" in atoms_dict:
                        atoms.set_positions(
                            atoms_dict["coord"][self.resort], apply_constraint=False
                        )
            ase_images.append(atoms)
            calc_results.append(partial_results)
        return calc_results, ase_images

    def _extract_geopt_results(self, raw_results, index=":"):
        """Extract the geopt calculation results and atomic structure(s).
        The priority is to parse positions from the geopt file first,
        then fall back to ion + inpt.

        Arguments:
            raw_results (dict): Raw results parsed from self.read_raw_results
            index (str or int): Index or slice of images

        Returns:
            List[results], List[Atoms]
        """
        geopt_results = raw_results.get("geopt", [])
        calc_results = []
        if len(geopt_results) == 0:
            warn(
                "Geopt file is empty! This is not an error if the calculation is continued from restart. "
            )
            return None, None

        if isinstance(index, int):
            _images = [geopt_results[index]]
        elif isinstance(index, str):
            _images = geopt_results[string2index(index)]

        ase_images = []
        for result in _images:
            atoms = self.init_atoms.copy()
            partial_result = {}
            if "energy" in result:
                partial_result["energy"] = result["energy"]
                partial_result["free energy"] = result["energy"]

            if "forces" in result:
                partial_result["forces"] = result["forces"][self.resort]

            if "stress" in result:
                partial_result["stress"] = result["stress"]

            # Modify the atoms copy
            if "positions" in result:
                atoms.set_positions(
                    result["positions"][self.resort], apply_constraint=False
                )
                if "ase_cell" in result:
                    atoms.set_cell(result["ase_cell"])
            else:
                # For geopt and RELAX=2 (cell relaxation),
                # the positions may not be written in the .geopt file
                relax_flag = raw_results["inpt"]["params"].get("RELAX_FLAG", 0)
                if relax_flag != 2:
                    raise ValueError(
                        ".geopt file missing positions while RELAX!=2. "
                        "Please check your setup and output files."
                    )
                if "ase_cell" not in result:
                    raise ValueError(
                        "Cannot recover positions from .geopt file due to missing cell information. "
                        "Please check your setup and output files."
                    )
                atoms.set_cell(result["ase_cell"], scale_atoms=True)

            # Unlike low-dimensional stress in static calculations, we need to convert
            # stress_1d / stress_2d to stress_equiv using the non-periodic cell dimension(s).
            # This has to be done when the actual cell information is loaded
            if "stress_1d" in result:
                stress_1d = result["stress_1d"]
                assert (
                    np.count_nonzero(atoms.pbc) == 1
                ), "Dimension of stress and PBC mismatch!"
                for i, bc in enumerate(atoms.pbc):
                    if not bc:
                        stress_1d /= atoms.cell.cellpar()[i]
                stress_equiv = stress_1d
                partial_result["stress_equiv"] = stress_equiv

            if "stress_2d" in result:
                stress_2d = result["stress_2d"]
                assert (
                    np.count_nonzero(atoms.pbc) == 2
                ), "Dimension of stress and PBC mismatch!"
                for i, bc in enumerate(atoms.pbc):
                    if not bc:
                        stress_2d /= atoms.cell.cellpar()[i]
                stress_equiv = stress_2d
                partial_result["stress_equiv"] = stress_equiv

            calc_results.append(partial_result)
            ase_images.append(atoms)

        return calc_results, ase_images

    def _extract_aimd_results(self, raw_results, index=":"):
        """Extract energy / forces from aimd results

        For the calculator, we only need the last image

        We probably want more information for the AIMD calculations,
        but we'll keep it like this for now

        Arguments:
            raw_results (dict): Raw results parsed from self.read_raw_results
            index (str or int): Index or slice of images

        Returns:
            List[results], List[Atoms]
        """
        aimd_results = raw_results.get("aimd", [])
        calc_results = []
        if len(aimd_results) == 0:
            warn(
                "Aimd file is empty! "
                "This is not an error if the calculation "
                "is continued from restart. "
            )
            return None, None

        if isinstance(index, int):
            _images = [aimd_results[index]]
        elif isinstance(index, str):
            _images = aimd_results[string2index(index)]

        ase_images = []
        for result in _images:
            partial_result = {}
            atoms = self.init_atoms.copy()
            if "total energy per atom" in result:
                partial_result["energy"] = result["total energy per atom"] * len(atoms)
            if "free energy per atom" in result:
                partial_result["free energy"] = result["free energy per atom"] * len(
                    atoms
                )

            if "forces" in result:
                # The forces are already re-sorted!
                partial_result["forces"] = result["forces"][self.resort]

            # Modify the atoms in-place
            if "positions" not in result:
                raise ValueError("Cannot have aimd without positions information!")

            atoms.set_positions(
                result["positions"][self.resort], apply_constraint=False
            )

            if "velocities" in result:
                atoms.set_velocities(result["velocities"][self.resort])

            ase_images.append(atoms)
            calc_results.append(partial_result)
        return calc_results, ase_images

    @property
    def sort(self):
        """Wrap the self.sorting dict. If sorting information does not exist,
        use the default slicing
        """
        if self.sorting is None:
            return slice(None, None, None)
        sort = self.sorting.get("sort", [])
        if len(sort) > 0:
            return sort
        else:
            return slice(None, None, None)

    @property
    def resort(self):
        """Wrap the self.sorting dict. If sorting information does not exist,
        use the default slicing
        """
        if self.sorting is None:
            return slice(None, None, None)
        resort = self.sorting.get("resort", [])
        if len(resort) > 0:
            return resort
        else:
            return slice(None, None, None)

    def read_psp_info(self):
        """Parse the psp information from the input (.ion) file options.
        The psp file locations are relative to the bundle.

        If the files cannot be found, the dict will only contain
        the path
        """
        inpt = self.init_inputs.get("ion", {})
        blocks = inpt.get("atom_blocks", [])
        psp_info = {}
        for block in blocks:
            element = block["ATOM_TYPE"]
            pseudo_path = block["PSEUDO_POT"]
            real_path = (self.directory / pseudo_path).resolve()
            psp_info[element] = {"rel_path": pseudo_path}
            if not real_path.is_file():
                warn(f"Cannot locate pseudopotential {pseudo_path}. ")
            else:
                with open(real_path, "r") as fd:
                    header = fd.read()
                psp_data = parse_psp8_header(header)
                psp_info[element].update(psp_data)
        return psp_info
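

# Minimal usage sketch of the SparcBundle API above. This helper is
# illustrative only and not part of the public interface; the "Cu.sparc"
# default path is a hypothetical bundle directory.
def _example_read_bundle(path="Cu.sparc"):
    """Read every image in a SPARC bundle and return the final one."""
    bundle = SparcBundle(directory=path, mode="r")
    images = bundle.convert_to_ase(index=":", include_all_files=True)
    return images[-1]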


def read_sparc(filename, index=-1, include_all_files=True, **kwargs):
    """Parse a SPARC bundle, return an Atoms object or a list of Atoms (images)
    with embedded calculator results.

    Arguments:
        filename (str or PosixPath): Filename of the sparc bundle
        index (int or str): Index or slice of the images, following the ase.io.read convention
        include_all_files (bool): If true, parse all output files with indexed suffixes
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    # We rely on the minimal api version choice, i.e. default or set from env
    api = locate_api()
    sb = SparcBundle(directory=filename, validator=api)
    atoms_or_images = sb.convert_to_ase(
        index=index, include_all_files=include_all_files, **kwargs
    )
    return atoms_or_images
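

# Usage sketch for read_sparc (illustrative; "TiO2.sparc" is a placeholder
# bundle directory). The index argument follows the ase.io.read convention:
# -1 returns the final image, ":" returns every image as a list.
def _example_read_sparc(bundle_path="TiO2.sparc"):
    final_image = read_sparc(bundle_path, index=-1)
    all_images = read_sparc(bundle_path, index=":", include_all_files=True)
    return final_image, all_images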


def write_sparc(filename, images, **kwargs):
    """Write a sparc bundle. Images can only be an Atoms object
    or a list of length 1

    Arguments:
        filename (str or PosixPath): Filename of the output sparc directory
        images (Atoms or List(Atoms)): Atoms object to be written. Only supports writing 1 Atoms
        **kwargs: Additional parameters
    """
    if isinstance(images, Atoms):
        atoms = images
    elif isinstance(images, list):
        if len(images) > 1:
            raise ValueError("SPARC format only supports writing one atoms object!")
        atoms = images[0]
    api = locate_api()
    sb = SparcBundle(directory=filename, mode="w", validator=api)
    sb._write_ion_and_inpt(atoms, **kwargs)
    return
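

# Usage sketch for write_sparc (illustrative; the Cu bulk cell and the output
# directory name are placeholder values). Extra keyword arguments such as
# copy_psp=True would be forwarded to SparcBundle._write_ion_and_inpt.
def _example_write_sparc(directory="Cu_single.sparc"):
    from ase.build import bulk

    atoms = bulk("Cu", "fcc", a=3.6, cubic=True)
    write_sparc(directory, atoms)
    return directory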


@deprecated(
    "Reading an individual .ion file is not recommended. Please use read_sparc instead."
)
def read_sparc_ion(filename, **kwargs):
    """Parse an .ion file inside the SPARC bundle using a wrapper around SparcBundle.
    The reader works only when other files (.inpt) exist.

    The returned Atoms object of the read_ion method only contains the initial positions

    Arguments:
        filename (str or PosixPath): Filename of the .ion file
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    api = locate_api()
    parent_dir = Path(filename).parent
    sb = SparcBundle(directory=parent_dir, validator=api)
    atoms = sb._read_ion_and_inpt()
    return atoms


# Backward compatibility
read_ion = read_sparc_ion


@deprecated(
    "Writing an individual .ion file is not recommended. Please use write_sparc instead."
)
def write_sparc_ion(filename, atoms, **kwargs):
    """Write an .ion file using the SparcBundle wrapper. This method will also create the .inpt file.

    This is only for backward compatibility

    Arguments:
        filename (str or PosixPath): Filename of the .ion file
        atoms (Atoms): atoms to be written
        **kwargs: Additional parameters
    """
    label = Path(filename).with_suffix("").name
    parent_dir = Path(filename).parent
    api = locate_api()
    sb = SparcBundle(directory=parent_dir, label=label, mode="w", validator=api)
    sb._write_ion_and_inpt(atoms, **kwargs)
    return atoms


# Backward compatibility
write_ion = write_sparc_ion


@deprecated(
    "Reading an individual .static file is not recommended. Please use read_sparc instead."
)
def read_sparc_static(filename, index=-1, **kwargs):
    """Parse a .static file bundle using a wrapper around SparcBundle.
    The reader works only when other files (.ion, .inpt) exist.

    Arguments:
        filename (str or PosixPath): Filename of the .static file
        index (int or str): Index or slice of the images, following the ase.io.read convention
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    parent_dir = Path(filename).parent
    api = locate_api()
    sb = SparcBundle(directory=parent_dir, validator=api)
    # In most of the cases the user wants to inspect all images
    kwargs = kwargs.copy()
    if "include_all_files" not in kwargs:
        kwargs.update(include_all_files=True)
    atoms_or_images = sb.convert_to_ase(index=index, **kwargs)
    return atoms_or_images


# Backward compatibility
read_static = read_sparc_static


@deprecated(
    "Reading an individual .geopt file is not recommended. Please use read_sparc instead."
)
def read_sparc_geopt(filename, index=-1, **kwargs):
    """Parse a .geopt file bundle using a wrapper around SparcBundle.
    The reader works only when other files (.ion, .inpt) exist.

    Arguments:
        filename (str or PosixPath): Filename of the .geopt file
        index (int or str): Index or slice of the images, following the ase.io.read convention
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    parent_dir = Path(filename).parent
    api = locate_api()
    sb = SparcBundle(directory=parent_dir, validator=api)
    kwargs = kwargs.copy()
    if "include_all_files" not in kwargs:
        kwargs.update(include_all_files=True)
    atoms_or_images = sb.convert_to_ase(index=index, **kwargs)
    return atoms_or_images


# Backward compatibility
read_geopt = read_sparc_geopt


@deprecated(
    "Reading an individual .aimd file is not recommended. Please use read_sparc instead."
)
def read_sparc_aimd(filename, index=-1, **kwargs):
    """Parse an .aimd file bundle using a wrapper around SparcBundle.
    The reader works only when other files (.ion, .inpt) exist.

    Arguments:
        filename (str or PosixPath): Filename of the .aimd file
        index (int or str): Index or slice of the images, following the ase.io.read convention
        **kwargs: Additional parameters

    Returns:
        Atoms or List[Atoms]
    """
    parent_dir = Path(filename).parent
    api = locate_api()
    sb = SparcBundle(directory=parent_dir, validator=api)
    kwargs = kwargs.copy()
    if "include_all_files" not in kwargs:
        kwargs.update(include_all_files=True)
    atoms_or_images = sb.convert_to_ase(index=index, **kwargs)
    return atoms_or_images


# Backward compatibility
read_aimd = read_sparc_aimd


def __register_new_filetype():
    """Register the filetype() function that allows recognizing .sparc as a directory.
    This method should only be called for ase==3.22 compatibility and for ase-gui.
    In future versions of ase-gui where the format is supported, this method should be removed.
    """
    import sys

    from ase.io import formats as hacked_formats
    from ase.io.formats import filetype as _old_filetype
    from ase.io.formats import ioformats

    def _new_filetype(filename, read=True, guess=True):
        """A hacked solution for the auto format recovery"""
        path = Path(filename)
        ext = path.name
        if ".sparc" in ext:
            return "sparc"
        else:
            if path.is_dir():
                if (len(list(path.glob("*.ion"))) > 0) and (
                    len(list(path.glob("*.inpt"))) > 0
                ):
                    return "sparc"
            return _old_filetype(filename, read, guess)

    hacked_formats.filetype = _new_filetype
    sys.modules["ase.io.formats"] = hacked_formats
    return


@deprecated(
    "register_ase_io_sparc will be deprecated in future releases. Please upgrade to ase>=3.23."
)
def register_ase_io_sparc(name="sparc"):
    """
    **Legacy register of io-formats for ase==3.22**
    **For ase>=3.23, use the package entrypoint registration**

    Monkey patching the ase.io and ase.io.formats
    so that the following formats can be used
    after `import sparc`

    ```
    from ase.io import sparc
    ase.io.read("test.sparc")
    atoms.write("test.sparc")
    ```

    The register method only aims to work for ase 3.22;
    the development version of ase provides a much more powerful
    register mechanism, we can wait.
    """
    import sys
    from warnings import warn

    import pkg_resources
    from ase.io.formats import define_io_format as F
    from ase.io.formats import ioformats

    name = name.lower()
    if name in ioformats.keys():
        return
    desc = "SPARC .sparc bundle"

    # Step 1: patch the ase.io.sparc module
    try:
        entry_points = next(
            ep for ep in pkg_resources.iter_entry_points("ase.io") if ep.name == "sparc"
        )
        _monkey_mod = entry_points.load()
    except Exception as e:
        warn(
            (
                "Failed to load entrypoint `ase.io.sparc`, "
                "you may need to reinstall the sparc python api.\n"
                "You may still use `sparc.read_sparc` and "
                "`sparc.write_sparc` methods, "
                "but not `ase.io.read`\n"
                f"The error is {e}"
            )
        )
        return

    sys.modules[f"ase.io.{name}"] = _monkey_mod
    __register_new_filetype()

    # Step 2: define a new format
    F(
        name,
        desc=desc,
        code="+S",  # read_sparc has multi-image support
        ext="sparc",
    )

    if name not in ioformats.keys():
        warn(
            (
                "Registering .sparc format with ase.io failed. "
                "You may still use `sparc.read_sparc` and "
                "`sparc.write_sparc` methods. \n"
                "Please contact the developer to report this issue."
            )
        )
        return

    import tempfile

    from ase.io import read

    with tempfile.TemporaryDirectory(suffix=".sparc") as tmpdir:
        try:
            read(tmpdir)
        except Exception as e:
            emsg = str(e).lower()
            if "bundletrajectory" in emsg:
                warn(
                    "Automatic format inference for sparc is not correctly registered. "
                    "You may need to use format=sparc in ase.io.read and ase.io.write. "
                )

    # Add additional formats including .ion (r/w), .static, .geopt, .aimd
    F(
        "ion",
        desc="SPARC .ion file",
        module="sparc",
        code="1S",
        ext="ion",
    )
    F(
        "static",
        desc="SPARC single point results",
        module="sparc",
        code="+S",
        ext="static",
    )
    F(
        "geopt",
        desc="SPARC geometric optimization results",
        module="sparc",
        code="+S",
        ext="geopt",
    )
    F("aimd", desc="SPARC AIMD results", module="sparc", code="+S", ext="aimd")

    # TODO: remove print options as it may be redundant
    print("Successfully registered sparc formats with ase.io!")


# ase>=3.23 uses the new ExternalIOFormat registered as entry points.
# Please do not use `from ase.io.formats import ExternalIOFormat`!
# This causes a circular import
try:
    from ase.utils.plugins import ExternalIOFormat as EIF
except ImportError:
    # Backward compatibility
    from typing import List, NamedTuple, Optional, Union

    # Copy of the definition from ase 3.23
    # Name is defined in the entry point
    class ExternalIOFormat(NamedTuple):
        desc: str
        code: str
        module: Optional[str] = None
        glob: Optional[Union[str, List[str]]] = None
        ext: Optional[Union[str, List[str]]] = None
        magic: Optional[Union[bytes, List[bytes]]] = None
        magic_regex: Optional[bytes] = None

    EIF = ExternalIOFormat

format_sparc = EIF(
    desc="SPARC .sparc bundle",
    module="sparc.io",
    code="+S",  # read_sparc has multi-image support
    ext="sparc",
)
format_ion = EIF(
    desc="SPARC .ion file",
    module="sparc.io",
    code="1S",
    ext="ion",
)
format_static = EIF(
    desc="SPARC single point results",
    module="sparc.io",
    code="+S",
    glob=["*.static", "*.static_*"],
)
format_geopt = EIF(
    desc="SPARC geometric optimization results",
    module="sparc.io",
    code="+S",
    glob=["*.geopt", "*.geopt_*"],
)
format_aimd = EIF(
    desc="SPARC AIMD results",
    module="sparc.io",
    code="+S",
    glob=["*.aimd", "*.aimd_*"],
)
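

# Sketch of how the entry-point formats above are consumed with ase>=3.23
# (illustrative only; "Cu.sparc" is a placeholder path). With the plugin
# installed, ase.io.read recognizes the bundle directory directly.
def _example_ase_io_read(bundle_path="Cu.sparc"):
    from ase.io import read

    return read(bundle_path, format="sparc", index=-1)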