# lasso/dimred/hashing_sphere.py

import os
import typing
import warnings

import h5py
import numpy as np

# scipy is compiled C code, which causes a false-positive linter warning about
# ConvexHull not being present
# pylint: disable = no-name-in-module
from scipy.spatial import ConvexHull
from scipy.stats import binned_statistic_2d
from sklearn.preprocessing import normalize

warnings.simplefilter(action="ignore", category=FutureWarning)

def _create_sphere_mesh(diameter: np.ndarray) -> typing.Tuple[np.ndarray, np.ndarray]:
    """Compute the alpha and beta bin boundaries of a meshed
    sphere used for binning the projected values

    Parameters
    ----------
    diameter : np.ndarray
        sphere diameter

    Returns
    -------
    bin_alpha : np.ndarray
        alpha bin boundaries
    bin_beta : np.ndarray
        beta bin boundaries
    """

    assert diameter.dtype == float

    # number of alpha bin boundaries (144 azimuthal bins)
    n_alpha = 145

    # sphere radius
    r = diameter / 2

    # sphere area
    a_sphere = 4 * np.pi * r**2

    # number of elements
    n_ele = 144**2

    # area of one element
    a_ele = a_sphere / n_ele

    # bin values for alpha and the increment
    bin_alpha, delt_alpha = np.linspace(0, 2 * np.pi, n_alpha, retstep=True)

    # for beta axis binning
    count = np.linspace(0.0, 144.0, 145)

    # compute the required bin boundaries so that the area of each element is the same
    tmp = count * a_ele
    tmp /= r**2 * delt_alpha
    bin_beta = 1 - tmp

    # guard against floating point overshoot (e.g. -1.00000004), which would
    # make arccos return NaN
    if bin_beta[-1] < -1:
        bin_beta[-1] = -1

    bin_beta = np.arccos(bin_beta)

    return bin_alpha, bin_beta
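
# Illustrative usage sketch (not part of the original module): with a diameter
# of, say, 1.0, _create_sphere_mesh returns 145 alpha boundaries spanning
# [0, 2*pi] and 145 beta boundaries spanning [0, pi], so that the resulting
# 144 x 144 patches all cover the same area on the sphere:
#
#     bin_alpha, bin_beta = _create_sphere_mesh(np.float64(1.0))
#     assert bin_alpha.shape == (145,) and bin_beta.shape == (145,)
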

def _project_to_sphere(
    points: np.ndarray, centroid: np.ndarray, axis: str = "Z"
) -> typing.Tuple[np.ndarray, np.ndarray]:
    """Compute the projection vectors from the centroid to each point in spherical coordinates

    Parameters
    ----------
    points : np.ndarray
        coordinates of the point cloud
    centroid : np.ndarray
        centroid of the point cloud
    axis : str
        global axis the sphere's polar axis is aligned with

    Returns
    -------
    proj_alpha : np.ndarray
        alpha angles of all points
    proj_beta : np.ndarray
        beta angles of all points
    """
    # standard global axis
    indexes = [0, 1, 2]

    # correct the indexes based on user input
    if axis == "Z":
        indexes = [0, 1, 2]  # z axis aligned with global z-axis
    elif axis == "Y":
        indexes = [0, 2, 1]  # z axis aligned with global y-axis
    elif axis == "X":
        indexes = [2, 1, 0]  # z axis aligned with global x-axis

    # projection
    vec = points - centroid

    # normalize
    vec = normalize(vec, axis=1, norm="l2")

    # alpha based on sphere axis alignment
    ang = np.arctan2(vec[:, indexes[1]], vec[:, indexes[0]])

    # arctan2 returns negative angles for angles greater than 180 degrees
    neg_indexes = np.where(ang < 0)
    ang[neg_indexes] += 2 * np.pi

    proj_alpha = ang
    proj_beta = np.arccos(vec[:, indexes[2]])

    return proj_alpha, proj_beta
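
# Illustrative usage sketch (not part of the original module): projecting a
# small random point cloud onto the unit sphere around its centroid; the
# returned proj_alpha lies in [0, 2*pi) and proj_beta in [0, pi]:
#
#     pts = np.random.rand(10, 3)
#     alpha, beta = _project_to_sphere(pts, pts.mean(axis=0), axis="Z")
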

def sphere_hashing(
    bin_numbers: np.ndarray, bin_counts: np.ndarray, field: np.ndarray
) -> np.ndarray:
    """Compute the average field value for each bin

    Parameters
    ----------
    bin_numbers : np.ndarray
        bin numbers of each point along the x- and y-axis
    bin_counts : np.ndarray
        number of points that fall into each bin
    field : np.ndarray
        field values (e.g. plastic strain, velocity)

    Returns
    -------
    binned_field : np.ndarray
        the averaged field value of each bin, flattened
    """
    # bin_numbers holds the bin number for its respective index and must have
    # the same length as the number of points
    assert len(bin_numbers[0]) == len(field)

    # check data types
    assert bin_numbers.dtype == int
    assert bin_counts.dtype == float

    n_rows = bin_counts.shape[0]
    n_cols = bin_counts.shape[1]

    # bin x and y indexes for each point in field
    binx = np.asarray(bin_numbers[0]) - 1
    biny = np.asarray(bin_numbers[1]) - 1

    # bin count for averaging
    bin_count = np.zeros((n_rows, n_cols))

    # averaged result to return
    binned_field = np.zeros((n_rows, n_cols))

    # accumulate the field values and the point count per bin; np.add.at is
    # used because a fancy-indexed "+=" does not accumulate repeated bin indices
    np.add.at(binned_field, (binx, biny), field)
    np.add.at(bin_count, (binx, biny), 1)

    binned_field = binned_field.flatten()
    bin_count = bin_count.flatten()

    # exclude all zero entries
    nonzero_inds = np.where(bin_count != 0)

    # average the fields
    binned_field[nonzero_inds] /= bin_count[nonzero_inds]

    return binned_field
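
# Illustrative usage sketch (not part of the original module): the bin numbers
# and bin counts are typically produced by scipy's binned_statistic_2d with
# expand_binnumbers=True, as done in compute_hashes below:
#
#     histo = binned_statistic_2d(
#         alpha, beta, None, "count", bins=[bin_alpha, bin_beta], expand_binnumbers=True
#     )
#     hashes = sphere_hashing(histo.binnumber, histo.statistic, field)
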

def compute_hashes(
    source_path: str,
    target_path: typing.Optional[str] = None,
    n_files: typing.Optional[int] = None,
    ret_vals: bool = False,
):
    """Compute the hashes using a spherical projection of the field values

    Parameters
    ----------
    source_path : str
        path to the source directory from which the displacements/strains are
        loaded; this directory should contain HDF5 files of the data
    target_path : str, optional
        directory to which the hashes are written
    n_files : int, optional
        number of files to process, useful for verification and quick visualization
    ret_vals : bool, optional
        if True, return the hashes; be aware that the hash list can take up a
        lot of RAM

    Returns
    -------
    hashes : np.ndarray
        hashed field values

    Notes
    -----
    HDF5 key for the node displacements of all timesteps: 'xyz'
    HDF5 key for the field values of all timesteps: 'fields'
    """
    # pylint: disable = too-many-locals

    node_displacement_key = "xyz"
    fields_key = "fields"
    file_name = "run_"
    counter = 0

    hashed_data = []

    # if n_files is None, get the total number of files in the directory
    if n_files is None:
        n_files = len(os.listdir(source_path))

    # load the displacements and compute the hashes for each run, considering
    # the last time step only
    for ii in range(n_files):
        with h5py.File(source_path + file_name + str(ii) + ".h5", "r") as hf:
            node_displacements = hf[node_displacement_key]
            fields = hf[fields_key]

            xyz = node_displacements[:, 0, :]

            # centroid of point cloud
            centroid = np.mean(xyz, axis=0)

            # convex hull of point cloud
            hull = ConvexHull(xyz)
            dist = np.linalg.norm(hull.max_bound - hull.min_bound)

            # compute the bin intervals for alpha and beta, split into 144 elements
            bins_a, bins_b = _create_sphere_mesh(dist)

            # compute the point projections
            proj_alpha, proj_beta = _project_to_sphere(xyz, centroid, axis="Y")

            # bin the spherical coordinates in terms of alpha and beta
            histo = binned_statistic_2d(
                proj_alpha, proj_beta, None, "count", bins=[bins_a, bins_b], expand_binnumbers=True
            )

            # get the field values of the last timestep
            p_strains = fields[:, -1]

            # compute hashes
            hashes = sphere_hashing(histo.binnumber, histo.statistic, p_strains)

            if target_path:
                # write the hashes for this run to file
                with h5py.File(target_path + "hashes_sphere_" + str(counter) + ".h5", "w") as hf:
                    hf.create_dataset("hashes", data=hashes)

                # advance the output file index for the next run
                counter += 1

            if ret_vals:
                hashed_data.append(hashes)

    return np.asarray(hashed_data)
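
# Minimal self-contained demo (an illustrative sketch, not part of the original
# module): exercises the hashing pipeline on a random point cloud with a random
# scalar field instead of reading HDF5 result files from disk.
if __name__ == "__main__":
    rng = np.random.default_rng(seed=0)
    demo_points = rng.random((1000, 3))
    demo_field = rng.random(1000)

    demo_centroid = np.mean(demo_points, axis=0)
    demo_hull = ConvexHull(demo_points)
    demo_dist = np.linalg.norm(demo_hull.max_bound - demo_hull.min_bound)

    demo_bins_a, demo_bins_b = _create_sphere_mesh(demo_dist)
    demo_alpha, demo_beta = _project_to_sphere(demo_points, demo_centroid, axis="Y")

    demo_histo = binned_statistic_2d(
        demo_alpha,
        demo_beta,
        None,
        "count",
        bins=[demo_bins_a, demo_bins_b],
        expand_binnumbers=True,
    )
    demo_hashes = sphere_hashing(demo_histo.binnumber, demo_histo.statistic, demo_field)
    print("hash vector length:", demo_hashes.shape[0])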