Coverage for lasso/dyna/binout.py: 17%

112 statements  

« prev     ^ index     » next       coverage.py v7.2.4, created at 2023-04-28 18:42 +0100

1import glob 

2from typing import List, Union 

3 

4import h5py 

5import numpy as np 

6import pandas as pd 

7 

8from .lsda_py3 import Lsda 

9 

10 

class Binout:
    """This class is meant to read binouts from LS-Dyna

    Parameters
    ----------
    filepath: str
        Path to the binout to read. May contain * (glob) for selecting multiple
        files.

    Attributes
    ----------
    filelist: List[str]
        List of files which are opened (sorted by name).
    lsda: Lsda
        The underlying LS-Dyna binout reader instance from code from LSTC.
    lsda_root: Symbol
        Root lsda symbol which is like a root directory to traverse the
        content of the binout file.

    Notes
    -----
    This class is only a utility wrapper for Lsda from LSTC.

    Examples
    --------
    >>> binout = Binout("path/to/binout")
    """

    def __init__(self, filepath: str):
        """Constructor for a binout

        Parameters
        ----------
        filepath: str
            path to the binout or pattern

        Raises
        ------
        IOError
            if no file matches the given path or pattern

        Notes
        -----
        The class loads the file given in the filepath. By giving a
        search pattern such as: "binout*", all files with that
        pattern will be loaded.

        Examples
        --------
        >>> # reads a single binout
        >>> binout = Binout("path/to/binout0000")
        >>> binout.filelist
        ['path/to/binout0000']

        >>> # reads multiple files
        >>> binout = Binout("path/to/binout*")
        >>> binout.filelist
        ['path/to/binout0000','path/to/binout0001']
        """

        # glob returns matches in arbitrary (OS dependent) order, sort them
        # so that multiple files are always opened in a reproducible order
        self.filelist = sorted(glob.glob(filepath))

        # check file existence
        if not self.filelist:
            raise IOError("No file was found.")

        # open lsda buffer
        self.lsda = Lsda(self.filelist, "r")
        self.lsda_root = self.lsda.root

    def read(self, *path) -> Union[List[str], str, np.ndarray]:
        """Read all data from Binout (top to low level)

        Parameters
        ----------
        path: Union[Tuple[str, ...], List[str], str]
            internal path in the folder structure of the binout

        Returns
        -------
        ret: Union[List[str], str, np.ndarray]
            list of subdata within the folder or data itself (array or string)

        Notes
        -----
        This function is used to read any data from the binout. It has been used
        to make the access to the data more comfortable. The return type depends
        on the given path:

         - `binout.read()`: `List[str]` names of directories (in binout)
         - `binout.read(dir)`: `List[str]` names of variables or subdirs
         - `binout.read(dir1, ..., variable)`: np.array data

        If you have multiple outputs with different ids (e.g. in nodout for
        multiple nodes) then don't forget to read the id array for
        identification or id-labels.

        Examples
        --------
        >>> from lasso.dyna import Binout
        >>> binout = Binout("test/binout")
        >>> # get top dirs
        >>> binout.read()
        ['swforc']
        >>> binout.read("swforc")
        ['title', 'failure', 'ids', 'failure_time', ...]
        >>> binout.read("swforc","shear").shape
        (321, 26)
        >>> binout.read("swforc","ids").shape
        (26,)
        >>> binout.read("swforc","ids")
        array([52890, 52891, 52892, ...])
        >>> # read a string value
        >>> binout.read("swforc","date")
        '11/05/2013'
        """

        return self._decode_path(path)

    def as_df(self, *args) -> Union[pd.DataFrame, pd.Series]:
        """read data and convert to pandas dataframe if possible

        Parameters
        ----------
        *args: Union[Tuple[str, ...], List[str], str]
            internal path in the folder structure of the binout

        Returns
        -------
        df: Union[pandas.DataFrame, pandas.Series]
            data indexed by time. Data which depends on time only is returned
            as a series, data which also depends on ids as a dataframe with
            one column per id.

        Raises
        ------
        ValueError
            if the data cannot be converted to a pandas dataframe

        Examples
        --------
        >>> from lasso.dyna import Binout
        >>> binout = Binout('path/to/binout')

        Read a time-dependent array.

        >>> binout.as_df('glstat', 'eroded_kinetic_energy')
        time
        0.00000        0.000000
        0.19971        0.000000
        0.39942        0.000000
        0.59976        0.000000
        0.79947        0.000000
                        ...
        119.19978    105.220786
        119.39949    105.220786
        119.59983    105.220786
        119.79954    105.220786
        119.99988    105.220786
        Name: eroded_kinetic_energy, Length: 601, dtype: float64

        Read a time and id-dependent array.

        >>> binout.as_df('secforc', 'x_force')
                              1             2             3  ...            33            34
        time                                                 .
        0.00063    2.168547e-16  2.275245e-15 -3.118639e-14  ... -5.126108e-13  4.592941e-16
        0.20034    3.514243e-04  3.797908e-04 -1.701294e-03  ...  2.530416e-11  2.755493e-07
        0.40005    3.052490e-03  3.242951e-02 -2.699926e-02  ...  6.755315e-06 -2.608923e-03
        0.60039   -1.299816e-02  4.930999e-02 -1.632376e-02  ...  8.941705e-05 -2.203455e-02
        0.80010    1.178485e-02  4.904512e-02 -9.740204e-03  ...  5.648263e-05 -6.999854e-02
        ...                 ...           ...           ...  ...           ...           ...
        119.00007  9.737679e-01 -8.833702e+00  1.298964e+01  ... -9.977377e-02  7.883521e+00
        119.20041  7.421170e-01 -8.849411e+00  1.253505e+01  ... -1.845916e-01  7.791409e+00
        119.40012  9.946615e-01 -8.541475e+00  1.188757e+01  ... -3.662228e-02  7.675800e+00
        119.60046  9.677638e-01 -8.566695e+00  1.130774e+01  ...  5.144208e-02  7.273052e+00
        119.80017  1.035165e+00 -8.040828e+00  1.124044e+01  ... -1.213450e-02  7.188395e+00
        """

        data = self.read(*args)

        # validate time-based data
        if not isinstance(data, np.ndarray):
            err_msg = "data is not a numpy array but has type '{0}'"
            raise ValueError(err_msg.format(type(data)))

        time_array = self.read(*args[:-1], "time")
        if data.shape[0] != time_array.shape[0]:
            raise ValueError("data series length does not match time array length")

        time_pdi = pd.Index(time_array, name="time")

        # data depending on time only becomes a series
        if data.ndim <= 1:
            return pd.Series(data, index=time_pdi, name=args[-1])

        # rcforc labels every id with its contact side ('m' if the side
        # flag is truthy, otherwise 's')
        if args[0] == "rcforc":
            ids = [
                (str(i) + "m") if j else (str(i) + "s")
                for i, j in zip(self.read("rcforc", "ids"), self.read("rcforc", "side"))
            ]
        else:
            ids = self.read(*args[:-1], "ids")

        # Build all columns at once instead of inserting them one by one into
        # an empty dataframe. A repeated label keeps its first position and
        # takes the last data column, exactly like repeated assignment would.
        columns = {str(label): data.T[i_column] for i_column, label in enumerate(ids)}

        return pd.DataFrame(columns, index=time_pdi)

    def _decode_path(self, path):
        """Decode a path and get whatever is inside.

        Parameters
        ----------
        path: List[str]
            path within the binout

        Notes
        -----
        Usually returns the folder children. If there are variables in the folder
        (usually also if a subfolder metadata exists), then the variables will
        be printed from these directories.

        Returns
        -------
        ret: Union[List[str], np.ndarray]
            either sub folder list or data array
        """

        # root subfolders
        if not path:
            return self._bstr_to_str(list(self.lsda_root.children.keys()))

        # Try to resolve the path as a directory. Only the lookup is guarded,
        # so that errors raised while listing a directory are not mistaken
        # for "this is a variable".
        try:
            dir_symbol = self._get_symbol(self.lsda_root, path)
        except ValueError:
            # the path is not resolvable as directory, thus the last entry
            # is expected to be a variable name
            return self._get_variable(path)

        # a directory with metadata holds variables (in metadata and states)
        if "metadata" in dir_symbol.children:
            return self._collect_variables(dir_symbol)

        return self._bstr_to_str(list(dir_symbol.children.keys()))

    def _get_symbol(self, symbol, path):
        """Get a symbol from a path via lsda

        Parameters
        ----------
        symbol: Symbol
            current directory which is a Lsda.Symbol
        path: List[str]
            names of the sub symbols to descend into, starting at symbol

        Returns
        -------
        symbol: Symbol
            final symbol after walking down the path

        Raises
        ------
        ValueError
            if symbol is None or an entry of the path cannot be found
        """

        # check
        if symbol is None:
            raise ValueError("Symbol may not be none.")

        # descend one level per path entry, an empty path returns symbol itself
        for next_symbol_name in path:
            next_symbol = symbol.get(next_symbol_name)
            if next_symbol is None:
                raise ValueError(f"Cannot find: {next_symbol_name}")
            symbol = next_symbol

        return symbol

    def _get_variable(self, path):
        """Read a variable from a given path

        Parameters
        ----------
        path: List[str]
            path to the variable

        Returns
        -------
        data: Union[str, np.ndarray]
            string or array for metadata variables, otherwise the data of all
            state directories holding the variable (sorted by time if every
            such state provides exactly one time value)

        Raises
        ------
        ValueError
            if the directory containing the variable cannot be found
        """

        dir_symbol = self._get_symbol(self.lsda_root, path[:-1])
        # variables are somehow binary strings ... dirs not
        variable_name = self._str_to_bstr(path[-1])

        # var in metadata
        if "metadata" in dir_symbol.children:
            metadata_symbol = dir_symbol.get("metadata")
            if variable_name in metadata_symbol.children:
                var_symbol = metadata_symbol.get(variable_name)

                # symbol is a string
                if var_symbol.type == 1:
                    return self._to_string(var_symbol.read())

                # symbol is numeric data
                return np.asarray(var_symbol.read())

        # var in state data ... hopefully
        time = []
        data = []
        for subdir_name, subdir_symbol in dir_symbol.children.items():

            # skip metadata
            if subdir_name == "metadata":
                continue

            # states not holding the variable contribute nothing
            if variable_name not in subdir_symbol.children:
                continue

            # read data, a single entry is unpacked to a scalar
            state_data = subdir_symbol.get(variable_name).read()
            data.append(state_data[0] if len(state_data) == 1 else state_data)

            # Only collect the time of states which contributed data,
            # otherwise time and data get out of sync and sorting is skipped.
            time_symbol = subdir_symbol.get(b"time")
            if time_symbol:
                time.extend(time_symbol.read())

        # return sorted by time (stable, equal times keep their file order)
        if len(time) == len(data):
            return np.array(data)[np.argsort(time, kind="stable")]

        return np.array(data)

    def _collect_variables(self, symbol):
        """Collect all variables from a symbol

        Parameters
        ----------
        symbol: Symbol

        Returns
        -------
        variable_names: List[str]
            unique variable names in order of first appearance

        Notes
        -----
        This function collect all variables from the state dirs and metadata.
        """

        # a dict is used as ordered set, so that the result does not depend
        # on hash randomisation
        var_names = {}
        for subdir_symbol in symbol.children.values():
            for var_name in subdir_symbol.children:
                var_names.setdefault(var_name, None)

        return self._bstr_to_str(list(var_names))

    def _to_string(self, data_array):
        """Convert a data series of numbers (usually ints) to a string

        Parameters
        ----------
        data_array: Union[int, np.ndarray]
            some data array

        Returns
        -------
        string: str
            data array converted to characters

        Notes
        -----
        This is needed for the reason that sometimes the binary data
        within the files are strings.
        """

        # a single character code is not iterable
        if isinstance(data_array, (int, np.integer)):
            return chr(data_array)

        return "".join([chr(entry) for entry in data_array])

    def _bstr_to_str(self, arg):
        """Encodes or decodes a string correctly regarding python version

        Parameters
        ----------
        arg: Union[str, bytes]
            string or list/tuple of strings to convert

        Returns
        -------
        string: str
            converted to python version (a list for list or tuple input)
        """

        # in case of a list call this function with its atomic strings
        if isinstance(arg, (list, tuple)):
            return [self._bstr_to_str(entry) for entry in arg]

        # convert a string (dependent on python version)
        if not isinstance(arg, str):
            return arg.decode("utf-8")

        return arg

    def _str_to_bstr(self, string):
        """Convert a string to a binary string python version independent

        Parameters
        ----------
        string: str

        Returns
        -------
        string: bytes
        """

        if not isinstance(string, bytes):
            return string.encode("utf-8")

        return string

    def save_hdf5(self, filepath, compression="gzip"):
        """Save a binout as HDF5

        Parameters
        ----------
        filepath: str
            path where the HDF5 shall be saved
        compression: str
            compression technique (see h5py docs)

        Examples
        --------
        >>> binout = Binout("path/to/binout")
        >>> binout.save_hdf5("path/to/binout.h5")
        """

        with h5py.File(filepath, "w") as fh:
            self._save_all_variables(fh, compression)

    def _save_all_variables(self, hdf5_grp, compression, *path):
        """Iterates through all variables in the Binout

        Parameters
        ----------
        hdf5_grp: Group
            group object in the HDF5, where all the data
            shall be saved into (of course in a tree like
            manner)
        compression: str
            compression technique (see h5py docs)
        path: Tuple[str, ...]
            entry path in the binout
        """

        ret = self.read(*path)

        # iterate through subdirs
        if isinstance(ret, list):

            # hdf5_grp already is the group of the parent directory, thus
            # only the last path entry is created below it. Using the full
            # path would nest it a second time (e.g. '/a/a/b').
            if path:
                hdf5_grp = hdf5_grp.create_group(path[-1])

            for entry in ret:
                path_child = path + (entry,)
                self._save_all_variables(hdf5_grp, compression, *path_child)
        # children are variables
        else:
            # can not save strings, only list of strings ...
            if isinstance(ret, str):
                ret = np.array([ret], dtype=np.dtype("S"))
            hdf5_grp.create_dataset(path[-1], data=ret, compression=compression)

478 if isinstance(ret, str): 

479 ret = np.array([ret], dtype=np.dtype("S")) 

480 hdf5_grp.create_dataset(path[-1], data=ret, compression=compression)