Coverage for lasso/dimred/hashing_sphere.py: 0%



import os
import typing
import warnings

import h5py
import numpy as np

# scipy is C-code which causes invalid linter warning about ConvexHull not
# being around.
# pylint: disable = no-name-in-module
from scipy.spatial import ConvexHull
from scipy.stats import binned_statistic_2d
from sklearn.preprocessing import normalize

warnings.simplefilter(action="ignore", category=FutureWarning)


def _create_sphere_mesh(diameter: np.ndarray) -> typing.Tuple[np.ndarray, np.ndarray]:
    """Compute the alpha and beta bin boundaries of a
    meshed sphere for binning the projected values

    Parameters
    ----------
    diameter : np.ndarray
        sphere diameter

    Returns
    -------
    bin_alpha : np.ndarray
        alpha bin boundaries
    bin_beta : np.ndarray
        beta bin boundaries
    """

    assert diameter.dtype == float

    # partition latitude
    n_alpha = 145

    # sphere radius
    r = diameter / 2

    # sphere area
    a_sphere = 4 * np.pi * r**2

    # number of elements
    n_ele = 144**2

    # area of one element
    a_ele = a_sphere / n_ele

    # bin values for alpha and the increment
    bin_alpha, delt_alpha = np.linspace(0, 2 * np.pi, n_alpha, retstep=True)

    # for beta axis binning
    count = np.linspace(0.0, 144.0, 145)
    # compute the bin boundaries required so that the area of each element is the same
    tmp = count * a_ele
    tmp /= r**2 * delt_alpha
    bin_beta = 1 - tmp

    # guard against floating point overshoot (-1.00000004 for example)
    if bin_beta[-1] < -1:
        bin_beta[-1] = -1

    bin_beta = np.arccos(bin_beta)

    return bin_alpha, bin_beta
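
# A minimal, illustrative sketch (not part of the original module) of how
# _create_sphere_mesh might be exercised and what its equal-area property looks
# like; the helper name and the diameter value are assumptions chosen purely
# for demonstration.
def _example_create_sphere_mesh() -> None:
    diameter = np.float64(250.0)
    bin_alpha, bin_beta = _create_sphere_mesh(diameter)

    # 145 boundaries per axis -> a 144 x 144 binning of the sphere surface
    assert bin_alpha.shape == (145,) and bin_beta.shape == (145,)

    # every latitude band covers the same surface area per alpha wedge:
    # r**2 * delta_alpha * |cos(beta_i) - cos(beta_i+1)| should be constant
    r = diameter / 2
    delt_alpha = bin_alpha[1] - bin_alpha[0]
    band_areas = r**2 * delt_alpha * np.abs(np.diff(np.cos(bin_beta)))
    assert np.allclose(band_areas, band_areas[0])
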

def _project_to_sphere(
    points: np.ndarray, centroid: np.ndarray, axis: str = "Z"
) -> typing.Tuple[np.ndarray, np.ndarray]:
    """Compute the projection vectors of centroid to each point in terms of spherical coordinates

    Parameters
    ----------
    points : np.ndarray
        coordinates of the point cloud
    centroid : np.ndarray
        centroid of the point cloud
    axis : str
        global axis with which the sphere's z-axis is aligned

    Returns
    -------
    proj_alpha : np.ndarray
        alpha angles of all points
    proj_beta : np.ndarray
        beta angles of all points
    """
    # standard global axis
    indexes = [0, 1, 2]

    # correct the indexes based on user input
    if axis == "Z":
        indexes = [0, 1, 2]  # z axis aligned with global z-axis
    elif axis == "Y":
        indexes = [0, 2, 1]  # z axis aligned with global y-axis
    elif axis == "X":
        indexes = [2, 1, 0]  # z axis aligned with global x-axis

    # projection
    vec = points - centroid

    # normalize
    vec = normalize(vec, axis=1, norm="l2")

    # alpha based on sphere axis alignment
    ang = np.arctan2(vec[:, indexes[1]], vec[:, indexes[0]])

    # atan2 returns negative angles for angles greater than 180 degrees
    neg_indexes = np.where(ang < 0)
    ang[neg_indexes] += 2 * np.pi

    proj_alpha = ang
    proj_beta = np.arccos(vec[:, indexes[2]])

    return proj_alpha, proj_beta
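
# A small, illustrative sketch (not part of the original module): projecting a
# synthetic point cloud onto the sphere around its centroid. The helper name
# and the random data are assumptions used only to show the expected angle
# ranges.
def _example_project_to_sphere() -> None:
    rng = np.random.default_rng(42)
    points = rng.uniform(-1.0, 1.0, size=(100, 3))
    centroid = np.mean(points, axis=0)

    proj_alpha, proj_beta = _project_to_sphere(points, centroid, axis="Z")

    # alpha lies in [0, 2*pi), beta in [0, pi]
    assert np.all((proj_alpha >= 0) & (proj_alpha < 2 * np.pi))
    assert np.all((proj_beta >= 0) & (proj_beta <= np.pi))
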

def sphere_hashing(
    bin_numbers: np.ndarray, bin_counts: np.ndarray, field: np.ndarray
) -> np.ndarray:
    """Compute average field values for all the binned values

    Parameters
    ----------
    bin_numbers : np.ndarray
        bin numbers for the respective index for the x and y-axis
    bin_counts : np.ndarray
        number of points that fall into each bin
    field : np.ndarray
        a field value (plastic strain, velocity, etc.)

    Returns
    -------
    binned_field : np.ndarray
        the averaged field values for each bin
    """

    # bin_numbers holds the bin_number for its respective index and must have
    # the same length as the number of points
    assert len(bin_numbers[0]) == len(field)
    # check data types
    assert bin_numbers.dtype == int
    assert bin_counts.dtype == float

    n_rows = bin_counts.shape[0]
    n_cols = bin_counts.shape[1]

    # bin x and y indexes for each point in field
    binx = np.asarray(bin_numbers[0]) - 1
    biny = np.asarray(bin_numbers[1]) - 1

    # bin count for averaging
    bin_count = np.zeros((n_rows, n_cols))

    # averaged result to return
    binned_field = np.zeros((n_rows, n_cols))

    # accumulate the field values per bin (np.add.at handles repeated indices)
    np.add.at(binned_field, (binx, biny), field)
    # count the points per bin
    np.add.at(bin_count, (binx, biny), 1)

    binned_field = binned_field.flatten()
    bin_count = bin_count.flatten()

    # exclude all zero entries
    nonzero_inds = np.where(bin_count != 0)
    # average the fields
    binned_field[nonzero_inds] /= bin_count[nonzero_inds]

    return binned_field
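
# An end-to-end illustrative sketch (not part of the original module) wiring
# _create_sphere_mesh, _project_to_sphere and sphere_hashing together on
# synthetic data; the helper name, point cloud and field values are
# assumptions chosen only for demonstration.
def _example_sphere_hashing() -> np.ndarray:
    rng = np.random.default_rng(0)
    points = rng.uniform(-10.0, 10.0, size=(5000, 3))
    field = rng.uniform(0.0, 1.0, size=5000)

    centroid = np.mean(points, axis=0)
    diameter = np.float64(np.linalg.norm(points.max(axis=0) - points.min(axis=0)))

    bins_a, bins_b = _create_sphere_mesh(diameter)
    proj_alpha, proj_beta = _project_to_sphere(points, centroid)

    histo = binned_statistic_2d(
        proj_alpha, proj_beta, None, "count", bins=[bins_a, bins_b], expand_binnumbers=True
    )

    # one averaged value per bin -> 144 * 144 hash entries
    return sphere_hashing(histo.binnumber, histo.statistic, field)
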

def compute_hashes(
    source_path: str,
    target_path: typing.Optional[str] = None,
    n_files: typing.Optional[int] = None,
    ret_vals: bool = False,
):
    """Compute the hashes using spherical projection of the field values

    Parameters
    ----------
    source_path : str
        path to the source directory from which the displacements/strains are
        loaded; this directory should contain HDF5 files of the data
    target_path : str (optional)
        directory to which the hashes are written
    n_files : int (optional)
        number of files to process, useful for verification and quick visualization
    ret_vals : bool (optional)
        if True, return the hashes; be aware that the hash list can
        take up a lot of RAM

    Returns
    -------
    hashes : np.ndarray
        hashed field values

    Notes
    -----
    Key for node_displacements for all timesteps: 'xyz'
    Key for field values for all timesteps: 'fields'
    """

    # pylint: disable = too-many-locals

    node_displacement_key = "xyz"
    fields_key = "fields"
    file_name = "run_"
    counter = 0

    hashed_data = []
    # if n_files is None get the total number of files in the directory
    if n_files is None:
        n_files = len(os.listdir(source_path))

    # load the displacements and compute the hashes for each run and consider
    # the last time step only
    for ii in range(n_files):
        with h5py.File(source_path + file_name + str(ii) + ".h5", "r") as hf:
            node_displacements = hf[node_displacement_key]
            fields = hf[fields_key]

            xyz = node_displacements[:, 0, :]

            # centroid of point cloud
            centroid = np.mean(xyz, axis=0)

            # convex hull of point cloud
            hull = ConvexHull(xyz)
            dist = np.linalg.norm(hull.max_bound - hull.min_bound)

            # compute the bin intervals for alpha and beta split into 144 elements
            bins_a, bins_b = _create_sphere_mesh(dist)

            # compute the point projections
            proj_alpha, proj_beta = _project_to_sphere(xyz, centroid, axis="Y")

            # bin the spherical coordinates in terms of alpha and beta
            histo = binned_statistic_2d(
                proj_alpha, proj_beta, None, "count", bins=[bins_a, bins_b], expand_binnumbers=True
            )
            # get the field value
            p_strains = fields[:, -1]

            # compute hashes
            hashes = sphere_hashing(histo.binnumber, histo.statistic, p_strains)

            if target_path:
                # write the hashes for each run to file
                with h5py.File(target_path + "hashes_sphere_" + str(counter) + ".h5", "w") as hf:
                    hf.create_dataset("hashes", data=hashes)
                counter += 1

            if ret_vals:
                hashed_data.append(hashes)

    return np.asarray(hashed_data)
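
# An illustrative usage sketch (not part of the original module): how
# compute_hashes might be driven. The directory layout ("run_0.h5",
# "run_1.h5", ...) mirrors the loop above; the paths and file count are
# assumptions for demonstration only.
if __name__ == "__main__":
    example_hashes = compute_hashes(
        source_path="./runs/",
        target_path="./hashes/",
        n_files=2,
        ret_vals=True,
    )
    print("hash matrix shape:", example_hashes.shape)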