Coverage for sparc/sparc_parsers/out.py: 91%

140 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 01:13 +0000

1""" 

2Created on Thu Oct 18 14:16:21 2018 

3 

4Ben Comer (Georgia Tech) 

5 

6This file has been heavily modified since SPARC 0.1 

7 

8TODO: more descriptions about this file io parser 

9""" 

10import re 

11from datetime import datetime 

12from warnings import warn 

13 

14import numpy as np 

15from ase.units import Bohr, GPa, Hartree 

16 

17# Safe wrappers for both string and fd 

18from ase.utils import reader, writer 

19 

20from ..api import SparcAPI 

21from .utils import bisect_and_strip, read_block_input 

22 

23# TODO: should allow user to select the api 

24defaultAPI = SparcAPI() 

25 

26 

27@reader 

28def _read_out(fileobj): 

29 """ 

30 Read the .out file content 

31 

32 The output file is just stdout. The blocks are read using re-patterns rather than the way .static / .geopt or .aimd are parsed 

33 """ 

34 contents = fileobj.read() 

35 sparc_version = _read_sparc_version(contents[:4096]) 

36 # print(sparc_version) 

37 # TODO: use the sparc version to construct the API 

38 output_dict = {"sparc_version": sparc_version} 

39 # Combine the input parameters and parallelization sections 

40 output_dict["parameters"] = _read_input_params(contents) 

41 

42 # Parse the Initialization and timing info, and if calculation 

43 # successfully finished 

44 # Note: not all information are converted! 

45 output_dict["run_info"] = _read_run_info(contents) 

46 # List of scf information, 

47 # including scf convergence, energy etc 

48 output_dict["ionic_steps"] = _read_scfs(contents) 

49 return {"out": output_dict} 

50 

51 

52def _read_sparc_version(header): 

53 """Read the sparc version from the output file header. 

54 

55 This function should live outside the _read_output since some other functions may use it 

56 

57 TODO: combine it with the version from initialization.c 

58 """ 

59 pattern_version = r"SPARC\s+\(\s*?version(.*?)\)" 

60 match = re.findall(pattern_version, header) 

61 if len(match) != 1: 

62 warn("Header does not contain SPARC version information!") 

63 return None 

64 date_str = match[0].strip().replace(",", " ") 

65 # Accept both abbreviate and full month name 

66 try: 

67 date_version = datetime.strptime(date_str, "%B %d %Y").strftime("%Y.%m.%d") 

68 except ValueError: 

69 try: 

70 date_version = datetime.strptime(date_str, "%b %d %Y").strftime("%Y.%m.%d") 

71 except ValueError: 

72 warn("Cannot fetch SPARC version information!") 

73 date_version = None 

74 return date_version 

75 

76 

77def _read_input_params(contents, validator=defaultAPI): 

78 """Parse the Input parameters and Paral""" 

79 lines = "\n".join( 

80 _get_block_text(contents, "Input parameters") 

81 + _get_block_text(contents, "Parallelization") 

82 ).split("\n") 

83 # print(lines) 

84 params = read_block_input(lines, validator=validator) 

85 return params 

86 

87 

88def _read_run_info(contents): 

89 """Parse the run info sections 

90 Note due to the complexity of the run info, 

91 the types are not directly converted 

92 """ 

93 lines = "\n".join( 

94 _get_block_text(contents, "Timing info") 

95 + _get_block_text(contents, "Initialization") 

96 ).split("\n") 

97 block_dict = {"raw_info": lines} 

98 # Select key fields to store 

99 for line in lines: 

100 if ":" not in line: 

101 continue 

102 key, value = bisect_and_strip(line, ":") 

103 key = key.lower() 

104 if key in block_dict: 

105 if key not in ("pseudopotential",): 

106 warn( 

107 f"Key {key} from run information appears multiple times in your outputfile!" 

108 ) 

109 # For keys like pseudopotential, we make it a list 

110 else: 

111 origin_value = list(block_dict[key]) 

112 value = origin_value + [value] 

113 

114 block_dict[key] = value 

115 return block_dict 

116 

117 

118def _read_scfs(contents): 

119 """Parse the ionic steps 

120 

121 Return: 

122 List of ionic steps information 

123 

124 

125 """ 

126 convergence_info = _get_block_text(contents, r"Self Consistent Field \(SCF.*?\)") 

127 results_info = _get_block_text(contents, "Energy and force calculation") 

128 

129 # Should not happen 

130 if len(convergence_info) > len(results_info) + 1: 

131 raise ValueError( 

132 "Error, length of convergence information and energy calculation mismatch!" 

133 ) 

134 elif len(convergence_info) == len(results_info) + 1: 

135 warn("Last ionic SCF has not finished! The results may be incomplete") 

136 else: 

137 pass 

138 

139 # Stick to the convergence information as the main section 

140 n_steps = len(convergence_info) 

141 steps = [] 

142 # for i, step in enumerate(zip(convergence_info, results_info)): 

143 for i in range(n_steps): 

144 conv = convergence_info[i] 

145 # Solution for incomplete calculations 

146 if i >= len(results_info): 

147 res = "" # Empty lines 

148 else: 

149 res = results_info[i] 

150 current_step = {"scf_step": i} 

151 # TODO: add support for convergence fields 

152 conv_lines = conv.splitlines() 

153 # conv_header is normally 4-column table 

154 conv_header = re.split(r"\s{3,}", conv_lines[0]) 

155 

156 scf_sub_steps = [] 

157 # For ground-state calculations, the output will be only 1 block 

158 # For hybrid (HSE/PBE0) calculations the EXX loops will also be included 

159 # General rule: we search for the line "Total number of SCF: N", read back N(+1) lines 

160 for lino, line in enumerate(conv_lines): 

161 if "Total number of SCF:" not in line: 

162 continue 

163 scf_num = int(line.split(":")[-1]) 

164 conv_array = np.genfromtxt( 

165 [ 

166 l 

167 for l in conv_lines[lino - scf_num : lino] 

168 if l.split()[0].isdigit() 

169 ], 

170 dtype=float, 

171 ndmin=2, 

172 ) 

173 conv_dict = {} 

174 for i, field in enumerate(conv_header): 

175 field = field.strip() 

176 value = conv_array[:, i] 

177 # TODO: re-use the value conversion function in res part 

178 if "Ha/atom" in field: 

179 value *= Hartree 

180 field.replace("Ha/atom", "eV/atom") 

181 if "Iteration" in field: 

182 value = value.astype(int) 

183 conv_dict[field] = value 

184 # Determine if the current block is a ground-state or EXX 

185 name_line = conv_lines[lino - scf_num - 1] 

186 if "Iteration" in name_line: 

187 name = "ground state" 

188 else: 

189 name = name_line 

190 

191 conv_dict["name"] = name 

192 scf_sub_steps.append(conv_dict) 

193 

194 current_step["convergence"] = scf_sub_steps 

195 

196 res = res.splitlines() 

197 for line in res: 

198 if ":" not in line: 

199 continue 

200 key, value = bisect_and_strip(line, ":") 

201 key = key.lower() 

202 if key in current_step: 

203 warn( 

204 f"Key {key} appears multiples in one energy / force calculation, your output file may be incorrect." 

205 ) 

206 # Conversion of values are relatively easy 

207 pattern_value = r"([+\-\d.Ee]+)\s+\((.*?)\)" 

208 match = re.findall(pattern_value, value) 

209 raw_value, unit = float(match[0][0]), match[0][1] 

210 if unit == "Ha": 

211 converted_value = raw_value * Hartree 

212 converted_unit = "eV" 

213 elif unit == "Ha/atom": 

214 converted_value = raw_value * Hartree 

215 converted_unit = "eV/atom" 

216 elif unit == "Ha/Bohr": 

217 converted_value = raw_value * Hartree / Bohr 

218 converted_unit = "eV/Angstrom" 

219 elif unit == "GPa": 

220 converted_value = raw_value * GPa 

221 converted_unit = "eV/Angstrom^3" 

222 elif unit == "sec": 

223 converted_value = raw_value * 1 

224 converted_unit = "sec" 

225 elif unit == "Bohr magneton": 

226 converted_value = raw_value 

227 converted_unit = "Bohr magneton" 

228 else: 

229 warn(f"Conversion for unit {unit} unknown! Treat as unit") 

230 converted_value = raw_value 

231 converted_unit = unit 

232 current_step[key] = { 

233 "value": converted_value, 

234 "unit": converted_unit, 

235 } 

236 steps.append(current_step) 

237 return steps 

238 

239 

240def _get_block_text(text, block_name): 

241 """Get an output 'block' with a specific block name 

242 

243 the outputs are not line-split 

244 """ 

245 # Add the ending separator so matching is possible for partial-complete 

246 # .out file from socket calculations 

247 text = text + ("=" * 68) + "\n" 

248 pattern_block = ( 

249 r"[\*=]{50,}\s*?\n\s*?BLOCK_NAME\s*?\n[\*=]{50,}\s*\n(.*?)[\*=]{50,}" 

250 ) 

251 pattern = pattern_block.replace("BLOCK_NAME", block_name) 

252 match = re.findall(pattern, text, re.DOTALL | re.MULTILINE) 

253 if len(match) == 0: 

254 warn(f"Block {block_name} cannot be parsed from current text!") 

255 return match 

256 

257 

258@writer 

259def _write_out( 

260 fileobj, 

261 data_dict, 

262): 

263 raise NotImplementedError("Writing output file from SPARC-X-API not supported!")