Coverage for lasso/dimred/dimred_run.py: 65%

451 statements  

« prev     ^ index     » next       coverage.py v7.2.4, created at 2023-04-28 18:42 +0100

1import argparse 

2import enum 

3import glob 

4import os 

5import re 

6import shutil 

7import sys 

8import time 

9from concurrent.futures.process import ProcessPoolExecutor 

10from typing import Sequence, Tuple, Union 

11 

12import h5py 

13import numpy as np 

14import psutil 

15from rich.console import Console 

16from rich.progress import BarColumn, Progress 

17from rich.table import Table 

18from rich.text import Text 

19 

20from ..utils.rich_progress_bars import PlaceHolderBar, WorkingDots 

21from .svd.clustering_betas import create_cluster_arg_dict, create_detector_arg_dict, group_betas 

22from .svd.plot_beta_clusters import plot_clusters_js 

23from .svd.pod_functions import calculate_v_and_betas 

24from .svd.subsampling_methods import create_reference_subsample, remap_random_subsample 

25 

26# pylint: disable = too-many-lines 

27 

28 

class DimredRunError(Exception):
    """Custom exception for errors during the dimensionality reduction"""

    def __init__(self, msg):
        """Create the exception with a human-readable message.

        Parameters
        ----------
        msg : str
            error description
        """
        # fix: also initialize the Exception base class so that str(exc)
        # and exc.args carry the message instead of being empty
        super().__init__(msg)
        self.message = msg

34 

35 

def get_application_header() -> str:
    """Return the header banner of the command line tool.

    Returns
    -------
    header : str
        application banner containing rich console markup
        (the old docstring claimed this function prints; it only returns)
    """

    return """

        ==== LASSO - AI ====

    visit us: [link=http://www.lasso.de/en]www.lasso.de/en[/link]
    mail: lasso@lasso.de
    """

46 

47 

def timestamp() -> str:
    """Get current timestamp as string

    Returns
    -------
    timestamp : str
        current local time formatted as '[HH:MM:SS]'
    """

    # strftime zero-pads hours/minutes/seconds automatically, which replaces
    # the previous hand-rolled add_zero helper with identical output
    return time.strftime("[%H:%M:%S]")

68 

69 

def parse_dimred_args():
    """Parse the arguments from the command line

    Returns
    -------
    args : `argparse.Namespace`
        parsed arguments

    Notes
    -----
    Prints the help text and exits when no arguments were given.
    """

    parser = argparse.ArgumentParser(
        description="Python utility script for dimensionality reduction written by LASSO GmbH."
    )

    parser.add_argument(
        "simulation_runs",
        type=str,
        nargs="*",
        help="Simulation runs or patterns used to search for simulation runs.",
    )
    parser.add_argument(
        "--reference-run",
        type=str,
        help="Optional. Set the reference run instead of using the first entry in simulation runs.",
    )
    parser.add_argument(
        "--exclude-runs",
        type=str,
        nargs="*",
        default=[],
        help="Optional. Runs to exclude from the analysis.",
    )
    parser.add_argument(
        "--start-stage",
        type=str,
        nargs="?",
        default=DIMRED_STAGES[0],
        help="Optional. "
        f"At which specific stage to start the analysis ({', '.join(DIMRED_STAGES)}).",
    )
    parser.add_argument(
        "--end-stage",
        type=str,
        nargs="?",
        default=DIMRED_STAGES[-1],
        help="Optional. "
        f"At which specific stage to stop the analysis ({', '.join(DIMRED_STAGES)}).",
    )
    parser.add_argument(
        "--project-dir",
        type=str,
        required=True,
        help="Project dir for temporary files. Must be specified to allow"
        + " restart at specific steps",
    )
    parser.add_argument(
        "--embedding-images",
        type=str,
        default="",
        help="Optional. Path to folder containing images of runs. Sample names must be numbers",
    )
    parser.add_argument(
        "--logfile-filepath",
        type=str,
        nargs="?",
        default="",
        help="Optional. Path for the logfile. A file will be created automatically"
        + "in the project dir if not specified.",
    )
    parser.add_argument(
        "--n-processes",
        type=int,
        nargs="?",
        default=max(1, psutil.cpu_count() - 1),
        help="Optional. Number of processes to use (default: n_cpu-1).",
    )
    parser.add_argument(
        "--part-ids",
        type=str,
        nargs="*",
        default=[],
        help="Optional. Part ids to process. By default all are taken.",
    )
    parser.add_argument(
        "--timestep",
        type=int,
        # fix: use an int default directly instead of the string "-1", which
        # only worked through argparse's implicit type conversion of string defaults
        default=-1,
        help="Optional. Sets timestep to analyse. Uses last timestep if not set.",
    )
    parser.add_argument(
        "--html-name",
        type=str,
        default="3d-beta-plot",
        # fix: the help text previously advertised '3d_beta_plot' although the
        # actual default is '3d-beta-plot'
        help="Optional. Sets the name of the generated 3D visualization. "
        + "Default is '3d-beta-plot'",
    )
    parser.add_argument(
        "--html-timestamp",
        action="store_true",
        help="""Optional. If set, the visualization will include a timestamp of yymmdd_hhmmss,
        else the previous file will be overwritten""",
    )
    parser.add_argument(
        "--cluster-args",
        type=str,
        nargs="*",
        help="Optional. Arguments for clustering algorithms. "
        + "If not set, clustering will be skipped.",
    )
    parser.add_argument(
        "--outlier-args",
        type=str,
        nargs="*",
        help="Optional. Arguments for outlier detection before clustering.",
    )

    # no arguments at all -> show usage instead of an argparse error
    if len(sys.argv) < 2:
        parser.print_help()
        sys.exit(0)

    return parser.parse_args(sys.argv[1:])

190 

191 

class DimredStage(enum.Enum):
    """Enum for all stages of the dimensionality reduction"""

    # members are declared in pipeline order; DIMRED_STAGES relies on this
    REFERENCE_RUN = "REFERENCE_RUN"
    IMPORT_RUNS = "IMPORT_RUNS"
    REDUCTION = "REDUCTION"
    CLUSTERING = "CLUSTERING"
    EXPORT_PLOT = "EXPORT_PLOT"

200 

201 

# ordered tuple of all stage names, used for CLI validation and index lookups;
# enum definition order is the pipeline order, so deriving it keeps both in sync
DIMRED_STAGES = tuple(stage.value for stage in DimredStage)

209 

210 

class HDF5FileNames(enum.Enum):
    """Enum for arrays in the hdf5 file"""

    # dataset: subsample of the reference run
    SUBSAMPLE_SAVE_NAME = "subsample"
    # group: one subsampled dataset per simulation run
    SUBSAMPLED_GROUP_NAME = "subsampled_runs"
    # group: reduced coordinates (betas) per run
    BETAS_GROUP_NAME = "betas"
    # dataset: projection basis V_ROB from the SVD
    V_ROB_SAVE_NAME = "v_rob"
    # attribute: time spent loading a run from disk
    PLOT_LOAD_TIME = "t_load"
    # attribute: total processing time of a subsample
    SUBSAMPLE_PROCESS_TIME = "t_total"
    # attribute: number of clusters written by the clustering stage
    NR_CLUSTER = "nr_clusters"
    # attribute: present when outlier detection ran; value is the outlier count
    HAS_OUTLIERS = "has_outliers"
    # key reserved for outlier data (not referenced elsewhere in this file)
    OUTLIERS = "outlier"
    # attribute: cluster index assigned to an individual run
    CLUSTER = "cluster"

224 

225 

class DimredRun:
    """Class to control and run the dimensionality reduction process"""

    # pylint: disable = too-many-instance-attributes

    # filepath of the run used as reference for subsampling/projection
    reference_run: str
    # filepaths of all runs to analyze (reference run removed)
    simulation_runs: Sequence[str]
    # filepaths excluded from the analysis
    exclude_runs: Sequence[str]
    # directory for buffer files; enables restarting individual stages
    project_dir: str
    # optional folder with run images for the html visualization
    img_path: Union[None, str]
    # where the console log is saved as html
    logfile_filepath: str
    # number of worker processes for the pool
    n_processes: int
    # part ids to process; empty means all parts
    part_ids: Sequence[int]
    # timestep to analyze; -1 means the last one
    timestep: int
    # index into DIMRED_STAGES where processing starts
    start_stage_index: int
    # index into DIMRED_STAGES where processing stops
    end_stage_index: int
    skip_valid: bool
    # name of the generated html file
    html_name: str
    # whether the html filename gets a timestamp suffix
    html_set_timestamp: bool
    # whether to open the output html in the browser
    show_output: bool
    # clustering algorithm name, None disables clustering
    cluster_type: Union[None, str]
    # outlier detector name, None disables detection
    detector_type: Union[None, str]
    cluster_args: Union[None, dict]
    detector_args: Union[None, dict]
    # hdf5 buffer file; only open inside a 'with' block
    h5file: Union[None, h5py.File]
    # True when the folder name (not the basename) identifies a run uniquely
    use_folder_name: bool

253 def __init__( 

254 self, 

255 simulation_runs: Sequence[str], 

256 start_stage: str, 

257 end_stage: str, 

258 project_dir: str, 

259 html_name: str = "3d-beta-plot", 

260 html_set_timestamp: bool = False, 

261 reference_run: Union[str, None] = None, 

262 console: Union[Console, None] = None, 

263 img_path: Union[None, str] = None, 

264 exclude_runs: Union[None, Sequence[str]] = None, 

265 logfile_filepath: Union[str, None] = None, 

266 n_processes: int = 1, 

267 part_id_filter: Union[None, Sequence[int]] = None, 

268 timestep: int = -1, 

269 show_output: bool = True, 

270 cluster_args: Union[None, Sequence[str]] = None, 

271 outlier_args: Union[None, Sequence[str]] = None, 

272 ): 

273 """Class handling a dimensionality reduction 

274 

275 Parameters 

276 ---------- 

277 simulation_runs : Sequence[str] 

278 simulation runs to analyze 

279 start_stage: str 

280 where to start 

281 end_stage: str 

282 where to stop 

283 project_dir : Union[None, str] 

284 required project directory for creation of buffer files. Allows restart in between. 

285 html_name: str 

286 Name of the output .html file 

287 html_set_timestamp: bool 

288 If true, the output .html will include a timestamp (hh_mm_ss) at the end of the filename 

289 reference_run : str 

290 filepath to the reference run. 

291 If not set, first entry in simulation_runs will be used as reference run. 

292 console: Union[rich.console.Console, None], default: None 

293 Console for information printing and logging. 

294 Rich offers pretty text printing, syntax highlighting etc. 

295 img_path: Union[None, str] 

296 optional image directory to show images in visualization. 

297 exclude_runs: Union[Sequence[str], None] 

298 optional list of runs to exclude from processing 

299 logfile_filepath : Union[str, None] 

300 path of the log file (always appends) 

301 n_processes: int 

302 number of processes to use during execution 

303 part_id_filter: Union[Sequence[int], None] 

304 which part ids to process 

305 timestep: int, default: -1 

306 specifies timestep to analyze in clustering and show in output visualization 

307 show_output: bool, default: True 

308 Set to false not to show the output html in the browser 

309 cluster_args: Union[None, [str]], default: None 

310 Arguments for cluster algorithm 

311 outlier_args: Union[None, [str]], default: None 

312 Arguments for outlier detection algorithm 

313 

314 Notes 

315 ----- 

316 Using a project directory allows to restart stages of the entire 

317 process. 

318 """ 

319 

320 # pylint: disable = too-many-arguments, too-many-locals 

321 

322 # settings 

323 # Set up Rich Console and Rich logging 

324 self.console = console 

325 if self.console: 

326 self.console.print(get_application_header(), style="success", highlight=True) 

327 

328 self.logfile_filepath = ( 

329 logfile_filepath 

330 if logfile_filepath 

331 else os.path.join(project_dir, "logfile") 

332 if project_dir 

333 else "" 

334 ) 

335 

336 self._msg_option = "{:16s}: {}" 

337 

338 # run variables 

339 # table is a rich format containing information of the variables 

340 table = Table(show_header=False) 

341 self.n_processes = self._parse_n_processes(n_processes, table) 

342 

343 # check for correctly parsed simulation-runs 

344 if len(simulation_runs) == 0: 

345 err_msg = "No entries in positional argument 'simulation-runs'." 

346 err_msg += "\nIt is recommended to set the 'simulation-runs' arguments first!" 

347 self.raise_error(err_msg) 

348 

349 # parse simulation and reference run 

350 # if no reference run was set use first simulation run 

351 ( 

352 self.simulation_runs, 

353 self.reference_run, 

354 self.exclude_runs, 

355 ) = self._parse_simulation_and_reference_runs( 

356 simulation_runs, reference_run, tuple() if not exclude_runs else exclude_runs, table 

357 ) 

358 

359 # check if basename or foldername serves as unique identifier 

360 self.use_folder_name = os.path.basename(self.simulation_runs[0]) == os.path.basename( 

361 self.simulation_runs[1] 

362 ) 

363 

364 # set project dir and simulation runs 

365 self.project_dir = self._parse_project_dir(project_dir, table) 

366 self.part_ids = part_id_filter if part_id_filter is not None else tuple() 

367 if self.part_ids is not None and len(self.part_ids) != 0: 

368 table.add_row("selected parts", ",".join(str(entry) for entry in self.part_ids)) 

369 self.timestep = timestep 

370 if timestep != -1: 

371 table.add_row("Timestep: ", str(timestep)) 

372 

373 # check if start_stage_index and end_stage_index are valid 

374 self.start_stage_index, self.end_stage_index = self._parse_stages(start_stage, end_stage) 

375 if self.console: 

376 self.console.print(table) 

377 

378 # check valid image path 

379 self.img_path = self._check_img_path(img_path) if img_path else None 

380 

381 # set cluster and outlier arguments 

382 self._parse_cluster_and_outlier_args(cluster_args, outlier_args) 

383 

384 self.html_name = self._parse_html_name(html_name) 

385 self.html_set_timestamp = html_set_timestamp 

386 self.show_output = show_output 

387 

388 self.pool = None 

389 

390 def log(self, msg: str, style: Union[str, None] = None, highlight: bool = False): 

391 """Log a message 

392 

393 Parameters 

394 ---------- 

395 msg : str 

396 message to log 

397 style : Union[str, None] 

398 style of the message 

399 highlight : bool 

400 whether to highlight the message or not 

401 """ 

402 if self.console: 

403 self.console.print(timestamp() + msg, style=style, highlight=highlight) 

404 

    def raise_error(self, err_msg: str):
        """Log an error, release open resources and raise a DimredRunError.

        Parameters
        ----------
        err_msg : str
            error message to be raised

        Raises
        ------
        DimredRunError
            always; raises an exception with error msg

        Notes
        -----
        Logs correctly and deals with open file handles.
        """

        # prepare styled text for the console output up front
        err_msg_text = Text(err_msg, style="error")

        # without a console there is nothing to log or save, raise immediately
        if not self.console:
            raise DimredRunError(err_msg)

        # best effort: close the hdf5 buffer if it was ever opened;
        # AttributeError means self.h5file was never assigned or is None
        try:
            self.h5file.close()
            self.console.print("closed hdf5 file")
        except AttributeError:
            self.console.print("no hdf5 file to close")

        self.console.print(err_msg_text, style="error")
        # persist the console transcript so the failure is traceable
        if self.logfile_filepath:
            self.console.save_html(self.logfile_filepath)

        raise DimredRunError(err_msg)

438 

439 # pylint believes this function has different return statements 

440 # whereas it only has one. 

441 # pylint: disable = inconsistent-return-statements 

442 def _check_img_path(self, img_path: str) -> str: 

443 """checks if provided image path is valid""" 

444 

445 if os.path.isdir(img_path): 

446 abs_path = os.path.abspath(img_path) 

447 js_path = re.sub(r"\\", "/", abs_path) 

448 return js_path 

449 

450 err_msg = "provided argument --embedding.images is not a folder" 

451 self.raise_error(err_msg) 

452 

453 def _parse_stages(self, start_stage: str, end_stage: str): 

454 

455 # check validity 

456 if start_stage not in DIMRED_STAGES: 

457 err_msg = f"{start_stage} is not a valid stage. Try: {', '.join(DIMRED_STAGES)}." 

458 self.raise_error(err_msg) 

459 

460 if end_stage not in DIMRED_STAGES: 

461 err_msg = f"{end_stage} is not a valid stage. Try: {', '.join(DIMRED_STAGES)}." 

462 self.raise_error(err_msg) 

463 

464 # get indexes 

465 start_stage_index = DIMRED_STAGES.index(start_stage) 

466 end_stage_index = DIMRED_STAGES.index(end_stage) 

467 

468 # check if start and end are in correct order 

469 if start_stage_index > end_stage_index: 

470 err_msg = ( 

471 f"The specified end stage '{end_stage}' " 

472 f"comes before the start stage ({start_stage}). " 

473 f"Try the order: {', '.join(DIMRED_STAGES)}" 

474 ) 

475 self.raise_error(err_msg) 

476 

477 return start_stage_index, end_stage_index 

478 

479 def _check_valid_stage_skip(self): 

480 # check if stage skip is valid 

481 if self.start_stage_index == DIMRED_STAGES.index(DimredStage.IMPORT_RUNS.value): 

482 self.log("Skipped setup stage", style="warning") 

483 if HDF5FileNames.SUBSAMPLE_SAVE_NAME.value not in self.h5file: # type: ignore 

484 msg = "no reference sample found" 

485 self.raise_error(msg) 

486 elif self.start_stage_index == DIMRED_STAGES.index(DimredStage.REDUCTION.value): 

487 self.log("Skipped import stage", style="warning") 

488 if HDF5FileNames.SUBSAMPLED_GROUP_NAME.value not in self.h5file: # type: ignore 

489 msg = "no subsampled samples found" 

490 self.raise_error(msg) 

491 elif self.start_stage_index == DIMRED_STAGES.index(DimredStage.CLUSTERING.value): 

492 self.log("Skipped reduction stage", style="warning") 

493 if ( 

494 HDF5FileNames.V_ROB_SAVE_NAME.value not in self.h5file # type: ignore 

495 or HDF5FileNames.BETAS_GROUP_NAME.value not in self.h5file 

496 ): # type: ignore 

497 err_msg = "Could not find reduced betas and V_ROB" 

498 self.raise_error(err_msg) 

499 elif self.start_stage_index == DIMRED_STAGES.index(DimredStage.CLUSTERING.value): 

500 self.log("Skipped clustering stage", style="warning") 

501 

502 def _parse_part_ids(self, part_ids: Union[Sequence[int], None]) -> Sequence[int]: 

503 

504 if not part_ids: 

505 return tuple() 

506 

507 assert all(isinstance(pid, int) for pid in part_ids), "All part ids must be of type 'int'" 

508 

509 return part_ids 

510 

511 def _parse_project_dir(self, project_dir: Union[str, None], table: Table): 

512 

513 if not project_dir: 

514 return "" 

515 

516 project_dir = os.path.abspath(project_dir) 

517 

518 if os.path.isfile(project_dir): 

519 err_msg = ( 

520 f"The project path '{project_dir}' is pointing at an existing file." 

521 " Change either the project path or move the file." 

522 ) 

523 self.raise_error(err_msg) 

524 

525 if not os.path.exists(project_dir): 

526 os.makedirs(project_dir, exist_ok=True) 

527 

528 table.add_row("project-dir", project_dir) 

529 return project_dir 

530 

    def _parse_simulation_and_reference_runs(
        self,
        simulation_run_patterns: Sequence[str],
        reference_run_pattern: Union[None, str],
        exclude_runs: Sequence[str],
        table: Table,
    ) -> Tuple[Sequence[str], str, Sequence[str]]:
        """Resolve glob patterns into run filepaths and determine the reference run.

        Parameters
        ----------
        simulation_run_patterns : Sequence[str]
            glob patterns (or direct paths) of the simulation runs
        reference_run_pattern : Union[None, str]
            explicit reference run filepath; falls back to the first
            simulation run when not given
        exclude_runs : Sequence[str]
            glob patterns of runs to exclude
        table : Table
            rich info table that collects run statistics

        Returns
        -------
        simulation_runs, reference_run, runs_to_exclude :
            Tuple[Sequence[str], str, Sequence[str]]
            naturally sorted run filepaths (reference removed), the chosen
            reference run, and the resolved excluded filepaths
        """

        # pylint: disable = too-many-locals

        # search all denoted runs (non-files silently dropped)
        simulation_runs = []
        for pattern in simulation_run_patterns:
            simulation_runs += glob.glob(pattern)
        simulation_runs = [
            os.path.normpath(filepath) for filepath in simulation_runs if os.path.isfile(filepath)
        ]

        # search all excluded runs
        runs_to_exclude = []
        for pattern in exclude_runs:
            runs_to_exclude += glob.glob(pattern)
        runs_to_exclude = [
            os.path.normpath(filepath) for filepath in runs_to_exclude if os.path.isfile(filepath)
        ]

        n_runs_before_filtering = len(simulation_runs)
        simulation_runs = [
            filepath for filepath in simulation_runs if filepath not in runs_to_exclude
        ]
        n_runs_after_filtering = len(simulation_runs)

        # check if simulation runs are valid

        simulation_runs_ok = len(simulation_runs) != 0

        if not simulation_runs_ok:
            err_msg = (
                "No simulation files could be found with the specified patterns. "
                "Check the argument 'simulation_runs'."
            )
            self.raise_error(err_msg)

        table.add_row("# simul.-files", str(len(simulation_runs)))

        table.add_row("# excluded files", f"{n_runs_before_filtering - n_runs_after_filtering}")

        # check for valid reference run
        reference_run = ""
        if reference_run_pattern:

            reference_run_ok = os.path.isfile(reference_run_pattern)
            if not reference_run_ok:
                err_msg = f"Filepath '{reference_run_pattern}' is not a file."
                self.raise_error(err_msg)

            reference_run = os.path.normpath(reference_run_pattern)
        else:
            # use first simulation run if no reference run was provided
            # check if enough simulation runs remain
            if len(simulation_runs) > 1:
                reference_run = simulation_runs[0]
            else:
                err_msg = "Number of Simulation runs after using first as reference run is zero."
                self.raise_error(err_msg)

        # add to table
        table.add_row("reference-run", reference_run)

        # remove the reference run from simulation runs
        if reference_run and reference_run in simulation_runs:
            simulation_runs.remove(reference_run)

        # sort it because we can!
        # natural sort: numeric fragments compare as integers ("run2" < "run10")
        def atoi(text):
            return int(text) if text.isdigit() else text

        def natural_keys(text):
            return [atoi(c) for c in re.split(r"(\d+)", text)]

        simulation_runs = sorted(simulation_runs, key=natural_keys)

        return simulation_runs, reference_run, runs_to_exclude

614 

    def _parse_cluster_and_outlier_args(
        self, cluster_args: Union[Sequence[str], None], outlier_args: Union[Sequence[str], None]
    ):
        """Verifies correct outlier and cluster args, if provided.

        Sets cluster_type/cluster_args and detector_type/detector_args on
        the instance; both helpers return an error string on invalid input,
        which is escalated through raise_error.
        """

        # creates a valid argument dict for clustering arguments
        if cluster_args is None:
            self.cluster_type = None
            self.cluster_args = None
        else:
            result = create_cluster_arg_dict(cluster_args)

            # check for errors (a str result is an error message)
            if isinstance(result, str):
                self.raise_error(result)
            else:
                self.cluster_type, self.cluster_args = result[0], result[1]

        # creates a valid argument dict for outlier detection arguments
        self.detector_type = None
        self.detector_args = {}

        if outlier_args:
            result = create_detector_arg_dict(outlier_args)
            # check for errors (a str result is an error message)
            if isinstance(result, str):
                self.raise_error(result)
            self.detector_type = result[0]
            self.detector_args = result[1]

644 

645 def _parse_n_processes(self, n_processes: int, table: Table) -> int: 

646 

647 if n_processes <= 0: 

648 err_msg = f"n-processes is '{n_processes}' but must be at least 1." 

649 self.raise_error(err_msg) 

650 

651 table.add_row("n-processes", str(n_processes)) 

652 return n_processes 

653 

654 def _parse_html_name(self, html_name_string: str) -> str: 

655 

656 html_name, replace_count = re.subn(r"[!§$%&/()=?\"\[\]{}\\.,;:<>|]", "", html_name_string) 

657 html_name = html_name.replace(" ", "-") 

658 

659 if replace_count > 0: 

660 msg = ( 

661 f"Replaced {replace_count} invalid characters for the html file name. " 

662 f"The new hmtl name is: {html_name}" 

663 ) 

664 self.log(msg) 

665 

666 return html_name 

667 

    def __enter__(self):
        """Open the process pool and hdf5 buffer for use in a 'with' block."""
        self.pool = ProcessPoolExecutor(max_workers=self.n_processes)
        # append mode keeps results of previous stages, enabling stage restarts
        self.h5file = h5py.File(os.path.join(self.project_dir, "project_buffer.hdf5"), "a")
        # verify the buffer contains whatever the skipped stages should have produced
        self._check_valid_stage_skip()
        return self

673 

    def __exit__(self, exception_type, exception_value, traceback):
        """Release the process pool and close the hdf5 buffer file."""
        self.pool = None
        self.h5file.close()
        self.h5file = None

678 

679 def _perform_context_check(self): 

680 if self.pool is None: 

681 msg = "The class function can only be used in a 'with' block on the instance itself." 

682 self.raise_error(msg) 

683 

684 def reset_project_dir(self): 

685 """resets the project directory entirely""" 

686 

687 # delete folder 

688 if os.path.exists(self.project_dir): 

689 shutil.rmtree(self.project_dir) 

690 

691 if self.project_dir: 

692 os.makedirs(self.project_dir, exist_ok=True) 

693 

    def process_reference_run(self):
        """Process the reference run

        Creates the reference subsample and stores it (plus its timing
        attributes) in the hdf5 buffer, replacing any previous one.
        """

        # is a process pool up
        self._perform_context_check()

        msg = "Reference Subsample"
        self.log(msg)

        # init progress bar (placeholder when running without a console)
        if self.console:
            prog = Progress("", WorkingDots())
        else:
            prog = PlaceHolderBar()
        with prog:
            ref_task = prog.add_task("", total=1)

            # Delete existing reference subsample
            if HDF5FileNames.SUBSAMPLE_SAVE_NAME.value in self.h5file:  # type: ignore
                del self.h5file[HDF5FileNames.SUBSAMPLE_SAVE_NAME.value]

            reference_sample = create_reference_subsample(self.reference_run, self.part_ids)

            prog.advance(ref_task)  # type: ignore

            # a str result is an error message from the subsampling helper
            if isinstance(reference_sample, str):
                self.raise_error(reference_sample)

            # create dataset in h5file
            # reference_sample layout: [0] sample data, [1] total time, [2] load time
            h5_ref = self.h5file.create_dataset(
                HDF5FileNames.SUBSAMPLE_SAVE_NAME.value, data=reference_sample[0]
            )
            h5_ref.attrs[HDF5FileNames.PLOT_LOAD_TIME.value] = reference_sample[2]
            h5_ref.attrs[HDF5FileNames.SUBSAMPLE_PROCESS_TIME.value] = reference_sample[1]

        # log time and success
        self.log("Loadtime Reference subsample: " + str(reference_sample[2])[:5])
        self.log("Total time for Reference subsample: " + str(reference_sample[1])[:5])
        self.log("Reference subsample completed", style="success")

733 

    def subsample_to_reference_run(self):
        """Subsamples all runs

        Submits one remap_random_subsample job per simulation run to the
        process pool and stores each finished subsample (plus timing
        attributes) in the hdf5 buffer.
        """

        # pylint: disable = too-many-branches,too-many-locals

        self._perform_context_check()
        self.log("Subsampling")

        # init progress bar
        if self.console:
            prog = Progress(
                "[progress.description]{task.description}",
                WorkingDots(),
                BarColumn(),
                "{task.completed} of {task.total};",
                # SubsamplingWaitTime(self.n_processes)
            )
        else:
            prog = PlaceHolderBar()

        with prog:

            # define progressbar task
            task1 = prog.add_task(
                "[cyan]Subsampling plots [/cyan]", total=len(self.simulation_runs)
            )
            h5_ref = self.h5file[HDF5FileNames.SUBSAMPLE_SAVE_NAME.value]
            # prog.columns[4].update_avrg(h5_ref.attrs[HDF5FileNames.plot_load_time.value])

            submitted_samples = []

            # delete previous subsample entries
            if HDF5FileNames.SUBSAMPLED_GROUP_NAME.value in self.h5file:  # type: ignore
                del self.h5file[HDF5FileNames.SUBSAMPLED_GROUP_NAME.value]

            # submit all simulation runs
            for _, entry in enumerate(self.simulation_runs):
                # unique run id: folder name or file basename (see __init__)
                name = "overwrite_this"
                if self.use_folder_name:
                    name = os.path.basename(os.path.split(entry)[0])
                else:
                    name = os.path.basename(entry)

                try:
                    future = self.pool.submit(
                        remap_random_subsample, entry, self.part_ids, h5_ref[:]
                    )

                    submitted_samples.append(np.array([name, future]))
                except Exception:
                    # a broken pool is diagnosed right below
                    break

            # check if an error occurred
            # pylint: disable = protected-access, undefined-loop-variable
            if self.pool._broken and "entry" in locals():
                msg = f"Failed to load file: {entry}"
                self.raise_error(msg)

            # we measure required time here
            t_cum = 0
            t_cum_io = 0

            # prepare hdf5 file
            self.h5file.create_group(HDF5FileNames.SUBSAMPLED_GROUP_NAME.value)
            # This isn't very elegant, there must be a better way
            # poll futures until the progress task reports all runs done
            while not prog.finished:
                # NOTE(review): popping from submitted_samples while iterating it
                # skips the following element in this pass; the outer while loop
                # retries, so entries are eventually all collected
                for i, sub in enumerate(submitted_samples):
                    if sub[1].done():
                        try:
                            # result layout: [0] data (or error str), [1] total
                            # time, [2] io/load time
                            if isinstance(sub[1].result()[0], str):
                                # NOTE(review): passes the whole result tuple to
                                # raise_error, presumably result()[0] was meant
                                self.raise_error(sub[1].result())
                            h5_sample = self.h5file[
                                HDF5FileNames.SUBSAMPLED_GROUP_NAME.value
                            ].create_dataset(sub[0], data=sub[1].result()[0])
                            h5_sample.attrs[HDF5FileNames.PLOT_LOAD_TIME.value] = sub[1].result()[2]
                            h5_sample.attrs[HDF5FileNames.SUBSAMPLE_PROCESS_TIME.value] = sub[
                                1
                            ].result()[1]
                            submitted_samples.pop(i)
                            prog.advance(task1)  # type: ignore
                            t_cum_io += sub[1].result()[2]
                            t_cum += sub[1].result()[1]
                        except RuntimeError:
                            err_msg = f"Error while loading {sub}"
                            self.raise_error(err_msg)
                time.sleep(0.5)

        # calculate required time
        t_avrg = t_cum / len(self.simulation_runs)
        t_avrg_io = t_cum_io / len(self.simulation_runs)

        # log results
        self.log("Average Time per Subsampling Process: " + str(t_avrg)[0:5])
        self.log("Average Loadtime per sample: " + str(t_avrg_io)[0:5])

        self.log("Subsampling completed", style="success")

        # Finished: We either have all sub-sampled runs in the project_dir,
        # or a list containing all sub-sampled runs
        # Problem: we might be running into issues with available RAM?
        # 1000 runs, 30 timesteps, sub-sampled onto 2000 points -> 1,34GB

835 

    def dimension_reduction_svd(self):
        """Calculate V_ROB and Betas

        Stacks all subsampled runs (excluded runs filtered out), truncates
        them to the shortest common number of timesteps, runs the POD/SVD and
        stores the projection basis V_ROB plus per-run betas in the buffer.
        """

        # pylint: disable = too-many-locals

        # applying pod_functions.py
        # (TODO: lots of stuff in the pod_functions.py has to be overhauled)
        # save if appropriate into project_dir
        self.log("Dimension Reduction")

        if self.console:
            # prog = Progress("", WorkingDots())
            prog = Progress(
                "[progress.description]{task.description}",
                WorkingDots(),
                BarColumn(),
                "{task.completed} of {task.total} timesteps;",
                # SubsamplingWaitTime(self.n_processes)
            )
        else:
            prog = PlaceHolderBar()
        with prog:
            # deletes old files
            if HDF5FileNames.BETAS_GROUP_NAME.value in self.h5file:  # type: ignore
                del self.h5file[HDF5FileNames.BETAS_GROUP_NAME.value]
            if HDF5FileNames.V_ROB_SAVE_NAME.value in self.h5file:  # type: ignore
                del self.h5file[HDF5FileNames.V_ROB_SAVE_NAME.value]

            beta_group = self.h5file.create_group(HDF5FileNames.BETAS_GROUP_NAME.value)

            # run ids of excluded runs (folder name or basename, see __init__)
            excluded_entries = [
                os.path.basename(os.path.split(entry)[0])
                if self.use_folder_name
                else os.path.basename(entry)
                for entry in self.exclude_runs
            ]

            valid_entries = [
                entry
                for entry in self.h5file[HDF5FileNames.SUBSAMPLED_GROUP_NAME.value].keys()
                if entry not in excluded_entries
            ]

            # number of timesteps stored per run
            run_timesteps = np.array(
                [
                    self.h5file[HDF5FileNames.SUBSAMPLED_GROUP_NAME.value][entry].shape[0]
                    for entry in valid_entries
                ]
            )

            min_step = np.min(run_timesteps)
            max_step = np.max(run_timesteps)

            # runs with differing lengths are truncated to the shortest one
            if min_step != max_step:
                warn_msg = (
                    "The timesteps fo the samples don't match, only "
                    + "processing up to timestep {}. Skipped {} timesteps"
                )
                warn_msg = warn_msg.format(min_step, max_step - min_step)
                self.log(warn_msg, style="warning")

            # add task after checking condition, else output looks wonky
            beta_task = prog.add_task("[cyan]Reducing Plots [/cyan]", total=int(min_step))

            sub_displ = np.stack(
                [
                    self.h5file[HDF5FileNames.SUBSAMPLED_GROUP_NAME.value][entry][:min_step, :]
                    for entry in valid_entries
                ]
            )

            result = calculate_v_and_betas(
                sub_displ, progress_bar=prog, task_id=beta_task
            )  # type: ignore
            # returns string if samplesize to small
            if isinstance(result, str):
                self.raise_error(result)

            v_rob, betas = result
            # NOTE(review): betas were computed from valid_entries, but this
            # loop iterates over ALL group keys — if any runs were excluded the
            # beta/run pairing could misalign; confirm against group_betas usage
            for i, sample in enumerate(
                self.h5file[HDF5FileNames.SUBSAMPLED_GROUP_NAME.value].keys()
            ):
                beta_group.create_dataset(sample, data=betas[i])

            self.h5file.create_dataset(HDF5FileNames.V_ROB_SAVE_NAME.value, data=v_rob)

        self.log("Dimension Reduction completed", style="success")

    def clustering_results(self):
        """clustering results

        Groups the reduced betas (at self.timestep) into clusters and/or
        outliers and stores the assignment as hdf5 attributes. Skipped
        entirely when neither cluster nor detector arguments were provided.
        """

        # pylint: disable = too-many-locals

        self._perform_context_check()
        # delete old entries so a re-run starts from a clean state
        betas_group = self.h5file[HDF5FileNames.BETAS_GROUP_NAME.value]
        if HDF5FileNames.HAS_OUTLIERS.value in betas_group.attrs:
            del betas_group.attrs[HDF5FileNames.HAS_OUTLIERS.value]

        if HDF5FileNames.NR_CLUSTER.value in betas_group.attrs:
            del betas_group.attrs[HDF5FileNames.NR_CLUSTER.value]

        if not self.cluster_type and not self.detector_type:
            msg = "No arguments provided for clustering, clustering aborted"
            self.log(msg)
            return

        self.log("Clustering")

        # init progress bar
        if self.console:
            prog = Progress("", WorkingDots())
        else:
            prog = PlaceHolderBar()
        with prog:
            cluster_task = prog.add_task("", total=1)

            # performs clustering with provided arguments

            # run ids of excluded runs (folder name or basename, see __init__)
            excluded_entries = [
                os.path.basename(os.path.split(entry)[0])
                if self.use_folder_name
                else os.path.basename(entry)
                for entry in self.exclude_runs
            ]

            beta_index = np.stack(
                [key for key in betas_group.keys() if key not in excluded_entries]
            )
            try:
                # first three beta components at the requested timestep
                betas = np.stack(
                    [betas_group[entry][self.timestep, :3] for entry in beta_index]
                )  # betas_group.keys()])
            except ValueError:
                # an out-of-range timestep makes the fancy-indexing fail
                log_msg = (
                    "Invalid parameter for timestep. Set a valid timestep with --timestep.\n"
                    "To save time, you can restart the tool with --start-stage CLUSTERING."
                )
                self.log(log_msg, style="warning")
                t_max = betas_group[beta_index[0]][:].shape[0]
                err_msg = (
                    f"Timestep {self.timestep} is not a valid timestep. "
                    f"Samples have {t_max} timesteps. "
                    f"Choose a timestep between 0 and {t_max - 1}"
                )
                self.raise_error(err_msg)

            result = group_betas(
                beta_index,
                betas,
                cluster=self.cluster_type,
                cluster_params=self.cluster_args,
                detector=self.detector_type,
                detector_params=self.detector_args,
            )

            # a str result is an error message
            if isinstance(result, str):
                self.raise_error(result)

            id_cluster = result[1]

            # Save clusters
            if len(id_cluster) > 1:
                betas_group.attrs.create(HDF5FileNames.NR_CLUSTER.value, len(id_cluster))
                if self.detector_type is not None:
                    # if attribute has_outliers is set, the first cluster contains the outliers
                    # so all outliers can be found by searching for the cluster attribute "0"
                    betas_group.attrs.create(HDF5FileNames.HAS_OUTLIERS.value, len(id_cluster[0]))
                for index, cluster in enumerate(id_cluster):
                    for entry in cluster:
                        # Enter appropriate cluster as attribute
                        sample = betas_group[entry]
                        sample.attrs.create(HDF5FileNames.CLUSTER.value, index)

            prog.advance(cluster_task)  # type: ignore

        self.log("Clustering completed", style="success")

    def visualize_results(self):
        """creates an output .html file

        Builds per-cluster beta/id lists from the hdf5 buffer (one pseudo
        cluster when no clustering ran) and hands them to plot_clusters_js.
        """

        self._perform_context_check()
        self.log("Creating .html viz")
        betas_group = self.h5file[HDF5FileNames.BETAS_GROUP_NAME.value]
        mark_outliers = False

        # run ids of excluded runs (folder name or basename, see __init__)
        excluded_entries = [
            os.path.basename(os.path.split(entry)[0])
            if self.use_folder_name
            else os.path.basename(entry)
            for entry in self.exclude_runs
        ]

        # check if clustering was performed, else load all betas into one pseudo-cluster
        if HDF5FileNames.NR_CLUSTER.value not in betas_group.attrs:

            # plotfunction expects list of cluster
            # we have no clusters -> we claim all is in one cluster

            # Create and load ids
            id_cluster = [
                np.stack([key for key in betas_group.keys() if key not in excluded_entries])
            ]

            # Create and load betas (last timestep of each run)
            beta_cluster = [np.stack([betas_group[entry][-1] for entry in id_cluster[0]])]

        else:
            # check if outlier where detected
            if HDF5FileNames.HAS_OUTLIERS.value in betas_group.attrs:
                mark_outliers = True

            # index of all runs
            id_data = np.stack([key for key in betas_group.keys() if key not in excluded_entries])

            # create an index referencing each run to a cluster
            cluster_index = np.stack(
                [betas_group[entry].attrs[HDF5FileNames.CLUSTER.value] for entry in id_data]
            )

            # load betas & ids (last timestep of each run)
            beta_data = np.stack([betas_group[entry][-1] for entry in id_data])

            # create list containing list of clusters
            beta_cluster = []
            id_cluster = []
            for i, cluster in enumerate(range(betas_group.attrs[HDF5FileNames.NR_CLUSTER.value])):
                chosen = np.where(cluster_index == cluster)[0]
                if len(chosen) > 0:
                    beta_cluster.append(beta_data[chosen])
                    id_cluster.append(id_data[chosen])
                elif len(chosen) == 0 and i == 0:
                    # empty first cluster means there are no outliers to mark
                    mark_outliers = False

        plot_clusters_js(
            beta_cluster,
            id_cluster,
            save_path=self.project_dir,
            img_path=self.img_path,
            mark_outliers=mark_outliers,
            mark_timestamp=self.html_set_timestamp,
            filename=self.html_name,
            show_res=self.show_output,
        )
        self.log("Finished creating viz", style="success")