Source code for sparc.io

"""Providing a new bundled SPARC file format
"""
import os
import re
from pathlib import Path
from warnings import warn

import numpy as np
from ase.atoms import Atoms
from ase.calculators.singlepoint import SinglePointDFTCalculator
from ase.config import cfg as _cfg

# various io formatters
from .api import SparcAPI
from .common import psp_dir as default_psp_dir
from .download_data import is_psp_download_complete
from .sparc_parsers.aimd import _read_aimd
from .sparc_parsers.atoms import atoms_to_dict, dict_to_atoms
from .sparc_parsers.geopt import _read_geopt
from .sparc_parsers.inpt import _read_inpt, _write_inpt
from .sparc_parsers.ion import _read_ion, _write_ion
from .sparc_parsers.out import _read_out
from .sparc_parsers.pseudopotential import copy_psp_file, parse_psp8_header
from .sparc_parsers.static import _add_cell_info, _read_static
from .utils import deprecated, locate_api, sanitize_path, string2index

# from .sparc_parsers.ion import read_ion, write_ion
defaultAPI = locate_api(cfg=_cfg)


[docs] class SparcBundle: """Provide access to a calculation folder of SPARC as a simple bundle The bundle can be optionally named as .sparc following the ASE's .bundle format Currently the write method only supports 1 image, while read method support reading atoms results in following conditions 1) No calculation (minimal): .ion + .inpt file --> 1 image 2) Single point calculation: .ion + .inpt + .out + .static --> 1 image with calc 3) Multiple SP calculations: chain all .out{digits} and .static{digitis} outputs 4) Relaxation: read from .geopt and .out (supporting chaining) 5) AIMD: read from .aimd and .out (support chaining) Attributes: directory (Path): Path to the directory containing SPARC files. mode (str): File access mode ('r', 'w', or 'a'). label (str): Name of the main SPARC file. init_atoms (Atoms): Initial atomic configuration. init_inputs (dict): Initial input parameters. psp_data (dict): Pseudopotential data. raw_results (dict): Raw results from SPARC calculations. psp_dir (Path): Directory containing pseudopotentials. sorting (list): Sort order for atoms. last_image (int): Index of the last image in a series of calculations. validator (SparcAPI): API validator for SPARC calculations. Methods: __find_psp_dir(psp_dir=None): Finds the directory for SPARC pseudopotentials. _find_files(): Finds all files matching the bundle label. _make_label(label=None): Infers or sets the label for the SPARC bundle. _indir(ext, label=None, occur=0, d_format="{:02d}"): Finds a file with a specific extension in the bundle. _read_ion_and_inpt(): Reads .ion and .inpt files together. _write_ion_and_inpt(): Writes .ion and .inpt files to the bundle. _read_results_from_index(index, d_format="{:02d}"): Reads results from a specific calculation index. _make_singlepoint(calc_results, images, raw_results): Converts results and images to SinglePointDFTCalculators. _extract_static_results(raw_results, index=":"): Extracts results from static calculations. _extract_geopt_results(raw_results, index=":"): Extracts results from geometric optimization calculations. _extract_aimd_results(raw_results, index=":"): Extracts results from AIMD calculations. convert_to_ase(index=-1, include_all_files=False, **kwargs): Converts raw results to ASE Atoms with calculators. read_raw_results(include_all_files=False): Parses all files in the bundle and merges results. read_psp_info(): Parses pseudopotential information from the inpt file. """ psp_env = ["SPARC_PSP_PATH", "SPARC_PP_PATH"] def __init__( self, directory, mode="r", atoms=None, label=None, psp_dir=None, validator=defaultAPI, cfg=_cfg, ): """ Initializes a SparcBundle for accessing SPARC calculation data. Args: directory (str or Path): The path to the directory containing the SPARC files. mode (str, optional): The file access mode. Can be 'r' (read), 'w' (write), or 'a' (append). Defaults to 'r'. atoms (Atoms, optional): The initial atomic configuration. Only relevant in write mode. label (str, optional): A custom label for the bundle. If None, the label is inferred from the directory or files. psp_dir (str or Path, optional): Path to the directory containing pseudopotentials. If None, the path is inferred. validator (SparcAPI, optional): An instance of SparcAPI for validating and parsing SPARC parameters. Defaults to a default SparcAPI instance. Raises: AssertionError: If an invalid mode is provided. ValueError: If multiple .ion files are found and no label is specified. Warning: If no .ion file is found in read-mode, or illegal characters are in the label. """ self.directory = Path(directory) self.mode = mode.lower() assert self.mode in ( "r", "w", "a", ), f"Invalid mode {self.mode}! Must one of 'r', 'w' or 'a'" self.label = self._make_label(label) self.init_atoms = atoms.copy() if atoms is not None else None self.init_inputs = {} self.psp_data = {} self.raw_results = {} self.cfg = cfg self.psp_dir = self.__find_psp_dir(psp_dir) # Sorting should be consistent across the whole bundle! self.sorting = None self.last_image = -1 self.validator = validator
[docs] def _find_files(self): """Find all files matching '{label}.*'""" return list(self.directory.glob(f"{self.label}.*"))
[docs] def _make_label(self, label=None): """Infer the label from the bundle Special cases if label is None: 1. read mode --> get the ion file name 2. write mode --> infer from the directory Arguments: label (str or None): Label to be used to write the .ion, .inpt files """ prefix = self.directory.resolve().with_suffix("").name illegal_chars = '\\/:*?"<>|' if label is not None: label_ = label elif self.mode == "w": label_ = prefix else: # read match_ion = list(self.directory.glob("*.ion")) if len(match_ion) > 1: raise ValueError( "Cannot read sparc bundle with multiple ion files without specifying the label!" ) elif len(match_ion) == 1: label_ = match_ion[0].name.split(".")[0] else: # No file found, possibly an empty bundle warn("No .ion file found in the read-mode bundle.") label_ = prefix if any([c in label_ for c in illegal_chars]): warn( f"Label name {label_} contains illegal characters! I'll make it 'SPARC'" ) label_ = "SPARC" return label_
def __find_psp_dir(self, psp_dir=None): """Use environmental variable to find the directory for SPARC pseudopotentials Searching priority: 1. User defined psp_dir 2. $SPARC_PSP_PATH 3. $SPARC_PP_PATH 4. psp bundled with sparc-api Arguments: psp_dir (str or PosixPath or None): the specific directory to search the psp files. Each element can only have 1 psp file under psp_dir Returns: PosixPath: Location of psp files """ if psp_dir is not None: return Path(psp_dir) else: for var in self.psp_env: env_psp_dir = self.cfg.get(var, None) if env_psp_dir: return Path(env_psp_dir) # Use pp_path field in cfg parser = self.cfg.parser["sparc"] if "sparc" in self.cfg.parser else {} psp_dir_ini = parser.get("psp_path", None) if psp_dir_ini: return sanitize_path(psp_dir_ini) # At this point, we try to use the psp files bundled with sparc if is_psp_download_complete(default_psp_dir): return default_psp_dir else: warn( ( "PSP directory bundled with SPARC-X-API is broken! " "Please use `sparc.download_data` to re-download them!" ) ) # Not found if self.mode == "w": warn( ( "No pseudopotential searching path was set and " "neither of $SPARC_PSP_PATH nor $SPARC_PP_PATH is set.\n" "Please explicitly provide the pseudopotentials parameter when writing the sparc bundle." ) ) return None def _indir(self, ext, label=None, occur=0, d_format="{:02d}"): """Find the file with {label}.{ext} under current dir, if label is None, use the default Arguments: ext (str): Extension of file, e.g. '.ion' or 'ion' label (str or None): Label for the file. If None, use the parent directory name for searching occur (int): Occurance index of the file, if occur > 0, search for files with suffix like 'SPARC.out_01' d_format (str): Format for the index Returns: PosixPath: Path to the target file under self.directory """ label = self.label if label is None else label if not ext.startswith("."): ext = "." + ext if occur == 0: target = self.directory / f"{label}{ext}" else: target = self.directory / f"{label}{ext}_{d_format.format(occur)}" return target
[docs] def _read_ion_and_inpt(self): """Read the ion and inpt files together to obtain basic atomstic data. Returns: Atoms: atoms object from .ion and .inpt file """ f_ion, f_inpt = self._indir(".ion"), self._indir(".inpt") ion_data = _read_ion(f_ion, validator=self.validator) inpt_data = _read_inpt(f_inpt, validator=self.validator) merged_data = {**ion_data, **inpt_data} return dict_to_atoms(merged_data)
[docs] def _write_ion_and_inpt( self, atoms=None, label=None, direct=False, sort=True, ignore_constraints=False, wrap=False, # Below are the parameters from v1 # scaled -> direct, ignore_constraints --> not add_constraints scaled=False, add_constraints=True, copy_psp=False, comment="", input_parameters={}, # Parameters that do not require type conversion **kwargs, ): """Write the ion and inpt files to a bundle. This method only supports writing 1 image. If input_parameters are empty, there will only be .ion writing the positions and .inpt writing a minimal cell information Args: atoms (Atoms, optional): The Atoms object to write. If None, uses initialized atoms associated with SparcBundle. label (str, optional): Custom label for the written files. direct (bool, optional): If True, writes positions in direct coordinates. sort (bool, optional): If True, sorts atoms before writing. ignore_constraints (bool, optional): If True, ignores constraints on atoms. wrap (bool, optional): If True, wraps atoms into the unit cell. **kwargs: Additional keyword arguments for writing. Raises: ValueError: If the bundle is not in write mode. """ if self.mode != "w": raise ValueError( "Cannot write input files while sparc bundle is opened in read or append mode!" ) os.makedirs(self.directory, exist_ok=True) atoms = self.atoms.copy() if atoms is None else atoms.copy() pseudopotentials = kwargs.pop("pseudopotentials", {}) if sort: if self.sorting is not None: old_sort = self.sorting.get("sort", None) if old_sort: sort = old_sort data_dict = atoms_to_dict( atoms, direct=direct, sort=sort, ignore_constraints=ignore_constraints, psp_dir=self.psp_dir, pseudopotentials=pseudopotentials, ) merged_inputs = input_parameters.copy() merged_inputs.update(kwargs) data_dict["inpt"]["params"].update(merged_inputs) # If copy_psp, change the PSEUDO_POT field and copy the files if copy_psp: for block in data_dict["ion"]["atom_blocks"]: if "PSEUDO_POT" in block: origin_psp = block["PSEUDO_POT"] target_dir = self.directory target_fname = copy_psp_file(origin_psp, target_dir) block["PSEUDO_POT"] = target_fname _write_ion(self._indir(".ion"), data_dict, validator=self.validator) _write_inpt(self._indir(".inpt"), data_dict, validator=self.validator) # Update the sorting information ion_dict = _read_ion(self._indir(".ion"))["ion"] self.sorting = ion_dict.get("sorting", None) return
[docs] def read_raw_results(self, include_all_files=False): """Parse all files using the given self.label. The results are merged dict from all file formats Arguments: include_all_files (bool): Whether to include output files with different suffices If true: include all files (e.g. SPARC.out, SPARC.out_01, SPARC.out_02, etc). Returns: dict or List: Dict containing all raw results. Only some of them will appear in the calculator's results Sets: self.raw_results (dict or List): the same as the return value #TODO: @TT 2024-11-01 allow accepting indices #TODO: @TT last_image is a bad name, it should refer to the occurance of images the same goes with num_calculations """ # Find the max output index out_files = self.directory.glob(f"{self.label}.out*") valid_out_files = [ f for f in out_files if (re.fullmatch(r"^\.out(?:_\d+)?$", f.suffix) is not None) ] # Combine and sort the file lists last_out = sorted(valid_out_files, reverse=True) # No output file, only ion / inpt if len(last_out) == 0: self.last_image = -1 else: suffix = last_out[0].suffix if suffix == ".out": self.last_image = 0 else: self.last_image = int(suffix.split("_")[1]) self.num_calculations = self.last_image + 1 # Always make sure ion / inpt results are parsed regardless of actual calculations if include_all_files: if self.num_calculations > 0: results = [ self._read_results_from_index(index) for index in range(self.num_calculations) ] else: results = [self._read_results_from_index(self.last_image)] else: results = self._read_results_from_index(self.last_image) self.raw_results = results if include_all_files: init_raw_results = self.raw_results[0] else: init_raw_results = self.raw_results.copy() self.init_atoms = dict_to_atoms(init_raw_results) self.init_inputs = { "ion": init_raw_results["ion"], "inpt": init_raw_results["inpt"], } self.psp_data = self.read_psp_info() return self.raw_results
def _read_results_from_index(self, index, d_format="{:02d}"): """Read the results from one calculation index, and return a single raw result dict, e.g. for index=0 --> .static and index=1 --> .static_01. Arguments: index (int): Index of image to return the results d_format (str): Format for the index suffix Returns: dict: Results for single image #TODO: @TT should we call index --> occurance? """ results_dict = {} for ext in ("ion", "inpt"): f = self._indir(ext, occur=0) if f.is_file(): data_dict = globals()[f"_read_{ext}"](f) results_dict.update(data_dict) for ext in ("geopt", "static", "aimd", "out"): f = self._indir(ext, occur=index, d_format=d_format) if f.is_file(): data_dict = globals()[f"_read_{ext}"](f) results_dict.update(data_dict) # Must have files: ion, inpt if ("ion" not in results_dict) or ("inpt" not in results_dict): raise RuntimeError( "Either ion or inpt files are missing from the bundle! " "Your SPARC calculation may be corrupted." ) # Copy the sorting information, if not existing sorting = results_dict["ion"].get("sorting", None) if sorting is not None: if self.sorting is None: self.sorting = sorting else: # Compare stored sorting assert (tuple(self.sorting["sort"]) == tuple(sorting["sort"])) and ( tuple(self.sorting["resort"]) == tuple(sorting["resort"]) ), "Sorting information changed!" return results_dict
[docs] def convert_to_ase(self, index=-1, include_all_files=False, **kwargs): """Read the raw results from the bundle and create atoms with single point calculators Arguments: index (int or str): Index or slice of the image(s) to convert. Uses the same format as ase.io.read include_all_files (bool): If true, also read results with indexed suffices Returns: Atoms or List[Atoms]: ASE-atoms or images with single point results """ # Convert to images! # TODO: @TT 2024-11-01 read_raw_results should implement a more # robust behavior handling index, as it is the entry point for all rs = self.read_raw_results(include_all_files=include_all_files) if isinstance(rs, dict): raw_results = [rs] else: raw_results = list(rs) res_images = [] for entry in raw_results: if "static" in entry: calc_results, images = self._extract_static_results(entry, index=":") elif "geopt" in entry: calc_results, images = self._extract_geopt_results(entry, index=":") elif "aimd" in entry: calc_results, images = self._extract_aimd_results(entry, index=":") else: calc_results, images = None, [self.init_atoms.copy()] if images is not None: if calc_results is not None: images = self._make_singlepoint(calc_results, images, entry) res_images.extend(images) if isinstance(index, int): return res_images[index] else: return res_images[string2index(index)]
[docs] def _make_singlepoint(self, calc_results, images, raw_results): """Convert a calculator dict and images of Atoms to list of SinglePointDFTCalculators The calculator also takes parameters from ion, inpt that exist in self.raw_results. Arguments: calc_results (List): Calculation results for all images images (List): Corresponding Atoms images raw_results (List): Full raw results dict to obtain additional information Returns: List(Atoms): ASE-atoms images with single point calculators attached """ converted_images = [] for res, _atoms in zip(calc_results, images): atoms = _atoms.copy() sp = SinglePointDFTCalculator(atoms) # Res can be empty at this point, leading to incomplete calc sp.results.update(res) sp.name = "sparc" sp.kpts = raw_results["inpt"].get("params", {}).get("KPOINT_GRID", None) # There may be a better way handling the parameters... sp.parameters = raw_results["inpt"].get("params", {}) sp.raw_parameters = { "ion": raw_results["ion"], "inpt": raw_results["inpt"], } atoms.calc = sp converted_images.append(atoms) return converted_images
def _extract_static_results(self, raw_results, index=":"): """Extract the static calculation results and atomic structure(s) Returns: calc_results: dict with at least energy value atoms: ASE atoms object The priority is to parse position from static file first, then fallback from ion + inpt Note: make all energy / forces resorted! Arguments: raw_results (dict): Raw results parsed from self.read_raw_results index (str or int): Index or slice of images Returns: List[results], List[Atoms] """ static_results = raw_results.get("static", []) calc_results = [] # Use extra lattice information to construct the positions cell = self.init_atoms.cell # import pdb; pdb.set_trace() static_results = _add_cell_info(static_results, cell) if isinstance(index, int): _images = [static_results[index]] elif isinstance(index, str): _images = static_results[string2index(index)] ase_images = [] for static_results in _images: partial_results = {} if "free energy" in static_results: partial_results["energy"] = static_results["free energy"] partial_results["free energy"] = static_results["free energy"] if "forces" in static_results: partial_results["forces"] = static_results["forces"][self.resort] if "atomic_magnetization" in static_results: partial_results["magmoms"] = static_results["atomic_magnetization"][ self.resort ] if "net_magnetization" in static_results: partial_results["magmom"] = static_results["net_magnetization"] if "stress" in static_results: partial_results["stress"] = static_results["stress"] if "stress_equiv" in static_results: partial_results["stress_equiv"] = static_results["stress_equiv"] atoms = self.init_atoms.copy() # import pdb; pdb.set_trace() if "atoms" in static_results: atoms_dict = static_results["atoms"] # The socket mode case. Reset all cell and positions # Be careful, if "lattice" in static_results: lat = static_results["lattice"] atoms.set_cell(lat, scale_atoms=False) if "coord" not in atoms_dict: raise KeyError( "Coordination conversion failed in socket static output!" ) atoms.set_positions( atoms_dict["coord"][self.resort], apply_constraint=False ) else: # Do not change cell information (normal static file) if "coord_frac" in atoms_dict: atoms.set_scaled_positions( atoms_dict["coord_frac"][self.resort] ) elif "coord" in atoms_dict: atoms.set_positions( atoms_dict["coord"][self.resort], apply_constraint=False ) ase_images.append(atoms) calc_results.append(partial_results) return calc_results, ase_images def _extract_geopt_results(self, raw_results, index=":"): """Extract the static calculation results and atomic structure(s) Returns: calc_results: dict with at least energy value atoms: ASE atoms object The priority is to parse position from static file first, then fallback from ion + inpt Arguments: raw_results (dict): Raw results parsed from self.read_raw_results index (str or int): Index or slice of images Returns: List[results], List[Atoms] """ # print("RAW_RES: ", raw_results) geopt_results = raw_results.get("geopt", []) calc_results = [] if len(geopt_results) == 0: warn( "Geopt file is empty! This is not an error if the calculation is continued from restart. " ) return None, None if isinstance(index, int): _images = [geopt_results[index]] elif isinstance(index, str): _images = geopt_results[string2index(index)] ase_images = [] for result in _images: atoms = self.init_atoms.copy() partial_result = {} if "energy" in result: partial_result["energy"] = result["energy"] partial_result["free energy"] = result["energy"] if "forces" in result: partial_result["forces"] = result["forces"][self.resort] if "stress" in result: partial_result["stress"] = result["stress"] # Modify the atoms copy if "positions" in result: atoms.set_positions( result["positions"][self.resort], apply_constraint=False ) if "ase_cell" in result: atoms.set_cell(result["ase_cell"]) else: # For geopt and RELAX=2 (cell relaxation), # the positions may not be written in .geopt file relax_flag = raw_results["inpt"]["params"].get("RELAX_FLAG", 0) if relax_flag != 2: raise ValueError( ".geopt file missing positions while RELAX!=2. " "Please check your setup ad output files." ) if "ase_cell" not in result: raise ValueError( "Cannot recover positions from .geopt file due to missing cell information. " "Please check your setup ad output files." ) atoms.set_cell(result["ase_cell"], scale_atoms=True) # Unlike low-dimensional stress in static calculations, we need to convert # stress_1d stress_2d to stress_equiv using the non-period cell dimension(s) # This has to be done when the actual cell information is loaded if "stress_1d" in result: stress_1d = result["stress_1d"] assert ( np.count_nonzero(atoms.pbc) == 1 ), "Dimension of stress and PBC mismatch!" for i, bc in enumerate(atoms.pbc): if not bc: stress_1d /= atoms.cell.cellpar()[i] stress_equiv = stress_1d partial_result["stress_equiv"] = stress_equiv if "stress_2d" in result: stress_2d = result["stress_2d"] assert ( np.count_nonzero(atoms.pbc) == 2 ), "Dimension of stress and PBC mismatch!" for i, bc in enumerate(atoms.pbc): if not bc: stress_2d /= atoms.cell.cellpar()[i] stress_equiv = stress_2d partial_result["stress_equiv"] = stress_equiv calc_results.append(partial_result) ase_images.append(atoms) return calc_results, ase_images def _extract_aimd_results(self, raw_results, index=":"): """Extract energy / forces from aimd results For calculator, we only need the last image We probably want more information for the AIMD calculations, but I'll keep them for now Arguments: raw_results (dict): Raw results parsed from self.read_raw_results index (str or int): Index or slice of images Returns: List[results], List[Atoms] """ aimd_results = raw_results.get("aimd", []) calc_results = [] if len(aimd_results) == 0: warn( "Aimd file is empty! " "This is not an error if the calculation " "is continued from restart. " ) return None, None if isinstance(index, int): _images = [aimd_results[index]] elif isinstance(index, str): _images = aimd_results[string2index(index)] ase_images = [] for result in _images: partial_result = {} atoms = self.init_atoms.copy() if "total energy per atom" in result: partial_result["energy"] = result["total energy per atom"] * len(atoms) if "free energy per atom" in result: partial_result["free energy"] = result["free energy per atom"] * len( atoms ) if "forces" in result: # The forces are already re-sorted! partial_result["forces"] = result["forces"][self.resort] # Modify the atoms in-place if "positions" not in result: raise ValueError("Cannot have aimd without positions information!") atoms.set_positions( result["positions"][self.resort], apply_constraint=False ) if "velocities" in result: atoms.set_velocities(result["velocities"][self.resort]) ase_images.append(atoms) calc_results.append(partial_result) return calc_results, ase_images @property def sort(self): """Wrap the self.sorting dict. If sorting information does not exist, use the default slicing """ if self.sorting is None: return slice(None, None, None) sort = self.sorting.get("sort", []) if len(sort) > 0: return sort else: return slice(None, None, None) @property def resort(self): """Wrap the self.sorting dict. If sorting information does not exist, use the default slicing """ if self.sorting is None: return slice(None, None, None) resort = self.sorting.get("resort", []) if len(resort) > 0: return resort else: return slice(None, None, None)
[docs] def read_psp_info(self): """Parse the psp information from inpt file options The psp file locations are relative to the bundle. If the files cannot be found, the dict will only contain the path """ inpt = self.init_inputs.get("ion", {}) blocks = inpt.get("atom_blocks", []) psp_info = {} for block in blocks: element = block["ATOM_TYPE"] pseudo_path = block["PSEUDO_POT"] real_path = (self.directory / pseudo_path).resolve() psp_info[element] = {"rel_path": pseudo_path} if not real_path.is_file(): warn(f"Cannot locate pseudopotential {pseudo_path}. ") else: header = open(real_path, "r").read() psp_data = parse_psp8_header(header) psp_info[element].update(psp_data) return psp_info
[docs] def read_sparc(filename, index=-1, include_all_files=True, **kwargs): """Parse a SPARC bundle, return an Atoms object or list of Atoms (image) with embedded calculator result. Arguments: filename (str or PosixPath): Filename to the sparc bundle index (int or str): Index or slice of the images, following the ase.io.read convention include_all_files (bool): If true, parse all output files with indexed suffices **kwargs: Additional parameters Returns: Atoms or List[Atoms] """ # We rely on minimal api version choose, i.e. default or set from env api = locate_api() sb = SparcBundle(directory=filename, validator=api) atoms_or_images = sb.convert_to_ase( index=index, include_all_files=include_all_files, **kwargs ) return atoms_or_images
[docs] def write_sparc(filename, images, **kwargs): """Write sparc file. Images can only be Atoms object or list of length 1 Arguments: filename (str or PosixPath): Filename to the output sparc directory images (Atoms or List(Atoms)): Atoms object to be written. Only supports writting 1 Atoms **kwargs: Additional parameters """ if isinstance(images, Atoms): atoms = images elif isinstance(images, list): if len(images) > 1: raise ValueError("SPARC format only supports writing one atoms object!") atoms = images[0] api = locate_api() sb = SparcBundle(directory=filename, mode="w", validator=api) sb._write_ion_and_inpt(atoms, **kwargs) return
@deprecated( "Reading individual .ion file is not recommended. Please use read_sparc instead." ) def read_sparc_ion(filename, **kwargs): """Parse an .ion file inside the SPARC bundle using a wrapper around SparcBundle The reader works only when other files (.inpt) exist. The returned Atoms object of read_ion method only contains the initial positions Arguments: filename (str or PosixPath): Filename to the .ion file index (int or str): Index or slice of the images, following the ase.io.read convention **kwargs: Additional parameters Returns: Atoms or List[Atoms] """ api = locate_api() parent_dir = Path(filename).parent sb = SparcBundle(directory=parent_dir, validator=api) atoms = sb._read_ion_and_inpt() return atoms # Backward compatibity read_ion = read_sparc_ion @deprecated( "Writing individual .ion file is not recommended. Please use write_sparc instead." ) def write_sparc_ion(filename, atoms, **kwargs): """Write .ion file using the SparcBundle wrapper. This method will also create the .inpt file This is only for backward compatibility Arguments: filename (str or PosixPath): Filename to the .ion file atoms (Atoms): atoms to be written **kwargs: Additional parameters """ label = Path(filename).with_suffix("").name parent_dir = Path(filename).parent api = locate_api() sb = SparcBundle(directory=parent_dir, label=label, mode="w", validator=api) sb._write_ion_and_inpt(atoms, **kwargs) return atoms # Backward compatibility write_ion = write_sparc_ion @deprecated( "Reading individual .static file is not recommended. Please use read_sparc instead." ) def read_sparc_static(filename, index=-1, **kwargs): """Parse a .static file bundle using a wrapper around SparcBundle The reader works only when other files (.ion, .inpt) exist. Arguments: filename (str or PosixPath): Filename to the .static file index (int or str): Index or slice of the images, following the ase.io.read convention **kwargs: Additional parameters Returns: Atoms or List[Atoms] """ parent_dir = Path(filename).parent api = locate_api() sb = SparcBundle(directory=parent_dir, validator=api) # In most of the cases the user wants to inspect all images kwargs = kwargs.copy() if "include_all_files" not in kwargs: kwargs.update(include_all_files=True) atoms_or_images = sb.convert_to_ase(index=index, **kwargs) return atoms_or_images # Backward compatibility read_static = read_sparc_static @deprecated( "Reading individual .geopt file is not recommended. Please use read_sparc instead." ) def read_sparc_geopt(filename, index=-1, **kwargs): """Parse a .geopt file bundle using a wrapper around SparcBundle The reader works only when other files (.ion, .inpt) exist. Arguments: filename (str or PosixPath): Filename to the .geopt file index (int or str): Index or slice of the images, following the ase.io.read convention **kwargs: Additional parameters Returns: Atoms or List[Atoms] """ parent_dir = Path(filename).parent api = locate_api() sb = SparcBundle(directory=parent_dir, validator=api) kwargs = kwargs.copy() if "include_all_files" not in kwargs: kwargs.update(include_all_files=True) atoms_or_images = sb.convert_to_ase(index=index, **kwargs) return atoms_or_images # Backward compatibility read_geopt = read_sparc_geopt @deprecated( "Reading individual .aimd file is not recommended. Please use read_sparc instead." ) def read_sparc_aimd(filename, index=-1, **kwargs): """Parse a .static file bundle using a wrapper around SparcBundle The reader works only when other files (.ion, .inpt) exist. Arguments: filename (str or PosixPath): Filename to the .aimd file index (int or str): Index or slice of the images, following the ase.io.read convention **kwargs: Additional parameters Returns: Atoms or List[Atoms] """ parent_dir = Path(filename).parent api = locate_api() sb = SparcBundle(directory=parent_dir, validator=api) kwargs = kwargs.copy() if "include_all_files" not in kwargs: kwargs.update(include_all_files=True) atoms_or_images = sb.convert_to_ase(index=index, **kwargs) return atoms_or_images # Backward compatibility read_aimd = read_sparc_aimd def __register_new_filetype(): """Register the filetype() function that allows recognizing .sparc as directory This method should only be called for ase==3.22 compatibility and for ase-gui In future versions of ase gui where format is supported, this method should be removed """ import sys from ase.io import formats as hacked_formats from ase.io.formats import filetype as _old_filetype from ase.io.formats import ioformats def _new_filetype(filename, read=True, guess=True): """A hacked solution for the auto format recovery""" path = Path(filename) ext = path.name if ".sparc" in ext: return "sparc" else: if path.is_dir(): if (len(list(path.glob("*.ion"))) > 0) and ( len(list(path.glob("*.inpt"))) > 0 ): return "sparc" return _old_filetype(filename, read, guess) hacked_formats.filetype = _new_filetype sys.modules["ase.io.formats"] = hacked_formats return @deprecated( "register_ase_io_sparc will be deprecated for future releases. Please upgrade ase>=3.23." ) def register_ase_io_sparc(name="sparc"): """ **Legacy register of io-formats for ase==3.22** **For ase>=3.23, use the package entrypoint registration** Monkey patching the ase.io and ase.io.formats So that the following formats can be used after `import sparc` ``` from ase.io import sparc ase.io.read("test.sparc") atoms.write("test.sparc") ``` The register method only aims to work for ase 3.22 the develope version of ase provides a much more powerful register mechanism, we can wait. """ import sys from warnings import warn import pkg_resources from ase.io.formats import define_io_format as F from ase.io.formats import ioformats name = name.lower() if name in ioformats.keys(): return desc = "SPARC .sparc bundle" # Step 1: patch the ase.io.sparc module try: entry_points = next( ep for ep in pkg_resources.iter_entry_points("ase.io") if ep.name == "sparc" ) _monkey_mod = entry_points.load() except Exception as e: warn( ( "Failed to load entrypoint `ase.io.sparc`, " "you may need to reinstall sparc python api.\n" "You may still use `sparc.read_sparc` and " "`sparc.write_sparc` methods, " "but not `ase.io.read`\n", f"The error is {e}", ) ) return sys.modules[f"ase.io.{name}"] = _monkey_mod __register_new_filetype() # Step 2: define a new format F( name, desc=desc, code="+S", # read_sparc has multi-image support ext="sparc", ) if name not in ioformats.keys(): warn( ( "Registering .sparc format with ase.io failed. " "You may still use `sparc.read_sparc` and " "`sparc.write_sparc` methods. \n" "Please contact the developer to report this issue." ) ) return import tempfile from ase.io import read with tempfile.TemporaryDirectory(suffix=".sparc") as tmpdir: try: read(tmpdir.name) except Exception as e: emsg = str(e).lower() if "bundletrajectory" in emsg: warn( "Atomatic format inference for sparc is not correctly registered. " "You may need to use format=sparc in ase.io.read and ase.io.write. " ) # Add additional formats including .ion (r/w), .static, .geopt, .aimd F( "ion", desc="SPARC .ion file", module="sparc", code="1S", ext="ion", ) F( "static", desc="SPARC single point results", module="sparc", code="+S", ext="static", ) F( "geopt", desc="SPARC geometric optimization results", module="sparc", code="+S", ext="geopt", ) F("aimd", desc="SPARC AIMD results", module="sparc", code="+S", ext="aimd") # TODO: remove print options as it may be redundant print("Successfully registered sparc formats with ase.io!") # ase>=3.23 uses new ExternalIOFormat as registered entrypoints # Please do not use from ase.io.formats import ExternalIOFormat! # This causes circular import try: from ase.utils.plugins import ExternalIOFormat as EIF except ImportError: # Backward Compatibility from typing import List, NamedTuple, Optional, Union # Copy definition from 3.23 # Name is defined in the entry point class ExternalIOFormat(NamedTuple): desc: str code: str module: Optional[str] = None glob: Optional[Union[str, List[str]]] = None ext: Optional[Union[str, List[str]]] = None magic: Optional[Union[bytes, List[bytes]]] = None magic_regex: Optional[bytes] = None EIF = ExternalIOFormat format_sparc = EIF( desc="SPARC .sparc bundle", module="sparc.io", code="+S", # read_sparc has multi-image support ext="sparc", ) format_ion = EIF( desc="SPARC .ion file", module="sparc.io", code="1S", ext="ion", ) format_static = EIF( desc="SPARC single point results", module="sparc.io", code="+S", glob=["*.static", "*.static_*"], ) format_geopt = EIF( desc="SPARC geometric optimization results", module="sparc.io", code="+S", glob=["*.geopt", "*.geopt_*"], ) format_aimd = EIF( desc="SPARC AIMD results", module="sparc", code="+S", glob=["*.aimd*", "*.geopt_*"], )