Coverage for lasso/dimred/test_dimred_run.py: 99%

73 statements  

« prev     ^ index     » next       coverage.py v7.2.4, created at 2023-04-28 18:42 +0100

1import os 

2import tempfile 

3from unittest import TestCase 

4 

5import h5py 

6import numpy as np 

7 

8from lasso.dimred.dimred_run import DIMRED_STAGES, DimredRun, DimredRunError, HDF5FileNames 

9from lasso.dimred.test_plot_creator import create_50_fake_plots 

10 

11 

class TestDimredRun(TestCase):
    """Tests for ``DimredRun``: a staged run on generated plots and constructor validation."""

    # Directory handed to DimredRun as project_dir; it also holds the stored verification file.
    _PROJECT_DIR = "test/DimredRunTest"
    # The only file in _PROJECT_DIR that must survive the cleanup in tearDown.
    _VERIFICATION_FILE_NAME = "verificationFile.hdf5"

    def _assert_init_raises(self, **overrides):
        """Assert that constructing ``DimredRun`` raises ``DimredRunError``.

        The constructor is called with the shared baseline keyword arguments
        defined below, updated with ``overrides``.
        """
        kwargs = {
            "start_stage": DIMRED_STAGES[0],
            "end_stage": DIMRED_STAGES[-1],
            "console": None,
            "project_dir": self._PROJECT_DIR,
            "n_processes": 5,
        }
        kwargs.update(overrides)
        with self.assertRaises(DimredRunError):
            DimredRun(**kwargs)

    def test_run(self):
        """Verifies correct function of DimredRun.py"""
        # expected sizes:
        # we have 50 samples, minus the reference run is 49
        n_samples = 49
        # we have 5 timesteps
        n_timesteps = 5
        # we subsample to 2000 nodes
        n_nodes = 2000
        # we always have 3 spatial dimensions
        n_dims = 3
        # we save the first 10 betas
        n_betas = 10

        verification_hdf5_file = h5py.File(
            os.path.join(self._PROJECT_DIR, self._VERIFICATION_FILE_NAME), "r"
        )
        # make sure the handle is released even if an assertion below fails
        self.addCleanup(verification_hdf5_file.close)

        with tempfile.TemporaryDirectory() as tmpdir:
            # create simulation runs
            create_50_fake_plots(folder=tmpdir, n_nodes_x=500, n_nodes_y=10)

            # collect all simulation runs
            sim_runs = [os.path.join(tmpdir, sim, "plot") for sim in os.listdir(tmpdir)]
            reference_run = os.path.join(tmpdir, "SVDTestPlot00/plot")

            test_run = DimredRun(
                reference_run=reference_run,
                simulation_runs=sim_runs,
                start_stage=DIMRED_STAGES[0],
                end_stage="CLUSTERING",
                console=None,
                project_dir=self._PROJECT_DIR,
                n_processes=5,
                cluster_args=["kmeans"],
            )

            with test_run:
                # verify creation of reference_subsample
                # to be able to reproduce results, each DimredRun must return same results
                test_run.process_reference_run()

                # check if reference subsamples match
                subsample_name = HDF5FileNames.SUBSAMPLE_SAVE_NAME.value
                test_refsample = test_run.h5file[subsample_name]
                verification_refsample = verification_hdf5_file[subsample_name]
                self.assertEqual(test_refsample.shape, verification_refsample.shape)
                # element-wise comparison; a plain max() of the difference would
                # silently accept negative deviations
                self.assertTrue(np.array_equal(test_refsample[:], verification_refsample[:]))

                # check if the expected reference run is chosen
                self.assertEqual(os.path.abspath(reference_run), test_run.reference_run)

                # check if subsampled samples match
                test_run.subsample_to_reference_run()

                # get subsampled samples
                test_sub_group = test_run.h5file[HDF5FileNames.SUBSAMPLED_GROUP_NAME.value]
                test_subs = np.stack([test_sub_group[key][:] for key in test_sub_group.keys()])

                # check if shape is equal to (n_samples, timesteps, subsampled nodes, dims)
                self.assertEqual(test_subs.shape, (n_samples, n_timesteps, n_nodes, n_dims))

                # check if svd yields consistent results
                test_run.dimension_reduction_svd()

                # get test betas
                test_betas_group = test_run.h5file[HDF5FileNames.BETAS_GROUP_NAME.value]
                beta_keys = list(test_betas_group.keys())
                test_ids = np.stack(beta_keys)
                test_betas = np.stack([test_betas_group[key][:] for key in beta_keys])

                # we check if test_ids and test_betas are of correct shape
                # we have 49 samples, 5 timesteps and save the first 10 betas
                self.assertEqual(test_ids.shape, (n_samples,))
                self.assertEqual(test_betas.shape, (n_samples, n_timesteps, n_betas))

                test_v_rob = test_run.h5file[HDF5FileNames.V_ROB_SAVE_NAME.value][:]
                # shape of v_rob must be (eigen, timesteps, nodes)
                self.assertEqual(test_v_rob.shape, (n_betas, n_timesteps, n_nodes * n_dims))

                # verify that calculated betas are reproducible as expected
                # first, create displ mat containing difference in displ over time
                verify_displ_stacked = test_subs.reshape(n_samples, n_timesteps, n_nodes * n_dims)
                # np.stack puts the repeated first-timestep slice on a new leading axis,
                # giving (timesteps, samples, n); the reshape then reinterprets that
                # memory as (samples, timesteps, n) without swapping axes.
                # NOTE(review): presumably this mirrors the computation inside DimredRun;
                # confirm before replacing it with broadcasting.
                verify_diff_mat = np.stack(
                    [verify_displ_stacked[:, 0, :] for _ in range(n_timesteps)]
                ).reshape(n_samples, n_timesteps, n_nodes * n_dims)
                verify_displ_stacked = verify_displ_stacked - verify_diff_mat

                # calculate betas and check if they are similar
                verify_betas = np.einsum("stn, ktn -> stk", verify_displ_stacked, test_v_rob)
                self.assertTrue(np.allclose(verify_betas, test_betas))

                # recalculate displ
                recalc_displ_stacked = np.einsum("stk, ktn -> stn", test_betas, test_v_rob)

                # Due to projection into eigenspace and back not using all available
                # eigenvectors, a small error margin is inevitable.
                # The margin is checked on the absolute error so that large negative
                # deviations are caught as well.
                reconstruction_error = np.abs(verify_displ_stacked - recalc_displ_stacked)
                self.assertLessEqual(reconstruction_error.max(), 1e-5)

                # checking clustering and html output makes little sense here,
                # but we know how the created plots are laid out: 25 bending up, 25 bending down
                # this should be presented in the betas
                # We will only look at the last timestep
                # We only check the first beta

                # first 24 betas point one direction (reference run is run 0 and points up)
                betas_up = test_betas[:24, -1]
                # other 25 betas point down
                betas_down = test_betas[24:, -1]

                # check that first beta has the same sign as others bending up
                up_is_positive = betas_up[:, 0] > 0
                self.assertTrue(np.all(up_is_positive == up_is_positive[0]))

                # check that 25th beta has same sign as others bending down
                down_is_positive = betas_down[:, 0] > 0
                self.assertTrue(np.all(down_is_positive == down_is_positive[0]))

                # verify that one group has negative and other group positive direction
                # (a check for "not both positive" would also accept two negative groups)
                self.assertNotEqual(bool(up_is_positive[0]), bool(down_is_positive[0]))

                test_run.clustering_results()

            # check if glob pattern works correctly
            DimredRun(
                simulation_runs=os.path.join(tmpdir, "SVDTestPlot*/plot"),
                start_stage=DIMRED_STAGES[0],
                end_stage=DIMRED_STAGES[0],
                project_dir=self._PROJECT_DIR,
                console=None,
            )

    def test_for_errors(self):
        """Verifies correct error behaviour when facing incorrect parser arguments"""

        with tempfile.TemporaryDirectory() as tmpdir:
            # collect all simulation runs
            # The temporary directory has just been created, so this list is empty.
            # NOTE(review): with no simulation runs, some cases below may raise for a
            # different reason than the one they name; confirm against DimredRun.
            sim_runs = [os.path.join(tmpdir, sim, "plot") for sim in os.listdir(tmpdir)]
            reference_run = "test/dimredTestPlots/SVDTestPlot0/plot"

            # check invalid start_stage
            self._assert_init_raises(
                reference_run=reference_run,
                simulation_runs=sim_runs,
                start_stage="INVALID_START",
            )

            # check invalid end_stage
            self._assert_init_raises(
                reference_run=reference_run,
                simulation_runs=sim_runs,
                end_stage="INVALID_END",
            )

            # check invalid start_stage after end_stage
            self._assert_init_raises(
                reference_run=reference_run,
                simulation_runs=sim_runs,
                start_stage=DIMRED_STAGES[-1],
                end_stage=DIMRED_STAGES[0],
            )

            # check invalid simulation runs
            self._assert_init_raises(simulation_runs="test/dimredTestPlots200/plot")

            # check invalid cluster_args
            self._assert_init_raises(simulation_runs=sim_runs, cluster_args=["noMeans"])

            # check invalid outlier-args
            self._assert_init_raises(
                simulation_runs=sim_runs,
                cluster_args=["kmeans"],
                outlier_args=["DoesNotExist"],
            )

            # check inexistent reference run
            self._assert_init_raises(
                reference_run=os.path.join(tmpdir, "IDontExist"),
                simulation_runs=sim_runs,
            )

            # check for empty simulation runs
            self._assert_init_raises(simulation_runs="")

    def tearDown(self):
        """Remove everything the tests wrote to the project directory.

        The stored verification file is kept. It is filtered out by name, so a
        missing verification file does not make the cleanup itself fail.
        """
        for entry in os.listdir(self._PROJECT_DIR):
            if entry != self._VERIFICATION_FILE_NAME:
                os.remove(os.path.join(self._PROJECT_DIR, entry))