Coverage for lasso/femzip/femzip_api.py: 35%
417 statements
« prev ^ index » next coverage.py v7.2.4, created at 2023-04-28 18:42 +0100
« prev ^ index » next coverage.py v7.2.4, created at 2023-04-28 18:42 +0100
1import logging
2import os
3import re
4import stat
5import sys
6import time
7from ctypes import (
8 CDLL,
9 POINTER,
10 Structure,
11 byref,
12 c_char_p,
13 c_float,
14 c_int,
15 c_int32,
16 c_int64,
17 c_uint64,
18 sizeof,
19)
20from typing import Any, Dict, List, Set, Tuple, Union
22import numpy as np
24from .fz_config import FemzipArrayType, FemzipVariableCategory, get_last_int_of_line
26# During next refactoring we should take a look at reducing the file size.
27# pylint: disable = too-many-lines
29# The c-structs python wrappers set variables outside of the init method which
30# is okay.
31# pylint: disable = attribute-defined-outside-init
class FemzipException(Exception):
    """Error raised by the Python wrapper when femzip reports a failure.

    ``FemzipAPI._check_femzip_error`` raises it whenever a ``FemzipError``
    struct carries a non-zero error code.
    """
class FemzipError(Structure):
    """C struct used by the femzip library to report the outcome of a call.

    Attributes
    ----------
    ier: c_int32
        Error code. The Python wrapper treats any non-zero value as failure.
    msg: c_char_p
        Error message
    """

    _fields_ = [("ier", c_int32), ("msg", c_char_p)]
class VariableInfo(Structure):
    """C struct describing one variable stored in a femzip file.

    Attributes
    ----------
    var_index: c_int32
        Index of the variable
    name: c_char_p
        Name of the variable as reported by femzip
    var_type: c_int32
        Variable type. See FemzipVariableCategory for translation.
    var_size: c_int32
        Array size of the field variable.
    """

    _fields_ = [
        (field_name, field_type)
        for field_name, field_type in (
            ("var_index", c_int32),
            ("name", c_char_p),
            ("var_type", c_int32),
            ("var_size", c_int32),
        )
    ]
class FemzipFileMetadata(Structure):
    """C struct holding the metadata of a femzip file.

    Attributes
    ----------
    version_zip: c_float
    activity_flag: c_int32
    number_of_variables: c_int32
    number_of_nodes: c_int32
    number_of_solid_elements: c_int32
    number_of_thick_shell_elements: c_int32
    number_of_1D_elements: c_int32
    number_of_tool_elements: c_int32
    number_of_shell_elements: c_int32
    number_of_solid_element_neighbors: c_int32
    number_of_rbe_element_neighbors: c_int32
    number_of_bar_elements: c_int32
    number_of_beam_elements: c_int32
    number_of_plotel_elements: c_int32
    number_of_spring_elements: c_int32
    number_of_damper_elements: c_int32
    number_of_joint_elements: c_int32
    number_of_joint_element_neighbors: c_int32
    number_of_bar_element_neighbors: c_int32
    number_of_beamcross_elements: c_int32
    number_of_spotweld_elements: c_int32
    number_of_rbe_elements: c_int32
    number_of_hexa20_elements: c_int32
    number_of_rigid_shell_elements: c_int32
    number_of_timesteps: c_int32
    variable_infos: POINTER(VariableInfo)
        Array with one VariableInfo entry per variable.
    """

    # The order of the entries defines the memory layout shared with the C
    # library and must not be changed.
    _fields_ = (
        [("version_zip", c_float)]
        + [
            (counter_name, c_int32)
            for counter_name in (
                "activity_flag",
                "number_of_variables",
                "number_of_nodes",
                "number_of_solid_elements",
                "number_of_thick_shell_elements",
                "number_of_1D_elements",
                "number_of_tool_elements",
                "number_of_shell_elements",
                "number_of_solid_element_neighbors",
                "number_of_rbe_element_neighbors",
                "number_of_bar_elements",
                "number_of_beam_elements",
                "number_of_plotel_elements",  # NOTE typo?
                "number_of_spring_elements",
                "number_of_damper_elements",
                "number_of_joint_elements",
                "number_of_joint_element_neighbors",
                "number_of_bar_element_neighbors",
                "number_of_beamcross_elements",
                "number_of_spotweld_elements",
                "number_of_rbe_elements",
                "number_of_hexa20_elements",
                "number_of_rigid_shell_elements",
                "number_of_timesteps",
            )
        ]
        + [("variable_infos", POINTER(VariableInfo))]
    )
class FemzipBufferInfo(Structure):
    """C struct with the buffer sizes required for reading a femzip file.

    Attributes
    ----------
    n_timesteps: c_uint64
        Number of timesteps
    timesteps: POINTER(c_float)
        Time for each timestep
    size_geometry: c_uint64
        Size of the geometry buffer
    size_state: c_uint64
        Size of the state buffer
    size_displacement: c_uint64
        Size for displacement array
    size_activity: c_uint64
        Size for activity array (element deletion info)
    size_post: c_uint64
        Size of the post region (its purpose is not documented).
    size_titles: c_uint64
        Size of the titles region behind the geometry.
    """

    # The order of the entries defines the memory layout shared with the C
    # library and must not be changed.
    _fields_ = [
        ("n_timesteps", c_uint64),
        ("timesteps", POINTER(c_float)),
    ] + [
        (size_name, c_uint64)
        for size_name in (
            "size_geometry",
            "size_state",
            "size_displacement",
            "size_activity",
            "size_post",
            "size_titles",
        )
    ]
class FemzipAPIStatus(Structure):
    """C struct summarizing the state of the femzip API library.

    The library has a shared, global state which is stored in static
    variables. The state of these global variables is tracked by this struct.

    Attributes
    ----------
    is_file_open: c_int32
        Whether a femzip file is opened and being processed.
    is_geometry_read: c_int32
        Whether the geometry was already read.
    is_states_open: c_int32
        Whether processing of the states was started.
    i_timestep_state: c_int32
        Counter of timestep processing.
    i_timestep_activity: c_int32
        Counter of activity data for timesteps.
    """

    _fields_ = [
        (status_name, c_int32)
        for status_name in (
            "is_file_open",
            "is_geometry_read",
            "is_states_open",
            "i_timestep_state",
            "i_timestep_activity",
        )
    ]
205class FemzipAPI:
206 """FemzipAPI contains wrapper functions around the femzip library."""
208 _api: Union[None, CDLL] = None
210 @staticmethod
def load_dynamic_library(path: str) -> CDLL:
    """Load a shared library, granting read/execute rights first if needed

    Parameters
    ----------
    path: str
        path to the library

    Returns
    -------
    library: CDLL
        loaded library

    Raises
    ------
    RuntimeError
        If the file is still not readable and executable after the
        attempt to change its mode.
    """

    def _is_readable_and_executable() -> bool:
        return os.access(path, os.X_OK) and os.access(path, os.R_OK)

    if _is_readable_and_executable():
        return CDLL(path)

    # try to grant the missing rights ourselves, then check again
    current_mode = os.stat(path).st_mode
    os.chmod(path, current_mode | stat.S_IEXEC | stat.S_IREAD)
    if _is_readable_and_executable():
        return CDLL(path)

    err_msg = "Library '{0}' is not executable and couldn't change execution rights."
    raise RuntimeError(err_msg.format(path))
234 @property
235 def api(self) -> CDLL:
236 """Returns the loaded, shared object library of the native interface
238 Returns
239 -------
240 shared_object_lib: CDLL
241 Loaded shared object library.
242 """
244 # pylint: disable = too-many-statements
246 if self._api is None:
248 bin_dirpath = (
249 os.path.abspath(os.path.dirname(sys.executable))
250 if hasattr(sys, "frozen")
251 else os.path.dirname(os.path.abspath(__file__))
252 )
254 # Flexlm Settings
255 # prevent flexlm gui to pop up
256 os.environ["FLEXLM_BATCH"] = "1"
257 # set a low timeout from originally 10 seconds
258 if "FLEXLM_TIMEOUT" not in os.environ:
259 os.environ["FLEXLM_TIMEOUT"] = "200000"
261 # windows
262 if "win32" in sys.platform:
264 shared_lib_name = "api_extended.dll"
265 self.load_dynamic_library(os.path.join(bin_dirpath, "libmmd.dll"))
266 self.load_dynamic_library(os.path.join(bin_dirpath, "libifcoremd.dll"))
267 self.load_dynamic_library(os.path.join(bin_dirpath, "libifportmd.dll"))
268 self.load_dynamic_library(os.path.join(bin_dirpath, "libiomp5md.dll"))
269 self.load_dynamic_library(
270 os.path.join(bin_dirpath, "femzip_a_dyna_sidact_generic.dll")
271 )
272 self.load_dynamic_library(
273 os.path.join(bin_dirpath, "libfemzip_post_licgenerator_ext_flexlm.dll")
274 )
275 # linux hopefully
276 else:
277 shared_lib_name = "api_extended.so"
278 self.load_dynamic_library(os.path.join(bin_dirpath, "libiomp5.so"))
279 self.load_dynamic_library(os.path.join(bin_dirpath, "libintlc.so.5"))
280 self.load_dynamic_library(os.path.join(bin_dirpath, "libirng.so"))
281 self.load_dynamic_library(os.path.join(bin_dirpath, "libimf.so"))
282 self.load_dynamic_library(os.path.join(bin_dirpath, "libsvml.so"))
283 self.load_dynamic_library(
284 os.path.join(bin_dirpath, "libfemzip_a_dyna_sidact_generic.so")
285 )
286 self.load_dynamic_library(
287 os.path.join(bin_dirpath, "libfemzip_post_licgenerator_ext_flexlm.so")
288 )
290 filepath = os.path.join(bin_dirpath, shared_lib_name)
291 self._api = self.load_dynamic_library(filepath)
293 # license check
294 self._api.has_femunziplib_license.restype = c_int
296 # file check
297 self._api.is_sidact_file.argtypes = (c_char_p,)
298 self._api.is_sidact_file.restype = c_int
300 # content infos
301 self._api.get_file_metadata.argtypes = (c_char_p, POINTER(FemzipFileMetadata))
302 self._api.get_file_metadata.restype = FemzipError
304 # free
305 self._api.free_variable_array.argtypes = (POINTER(FemzipFileMetadata),)
306 self._api.free_variable_array.restype = c_int32
308 # get buffer dims
309 self._api.get_buffer_info.argtypes = (c_char_p, POINTER(FemzipBufferInfo))
310 self._api.get_buffer_info.restype = FemzipError
312 # read geom
313 self._api.read_geometry.argtypes = (
314 c_char_p,
315 POINTER(FemzipBufferInfo),
316 POINTER(c_int32),
317 c_int32,
318 )
319 self._api.read_geometry.restype = FemzipError
321 # read var
322 self._api.read_variables.argtypes = (
323 POINTER(c_float),
324 c_int,
325 c_int,
326 POINTER(FemzipFileMetadata),
327 )
328 self._api.read_variables.restype = FemzipError
330 # femunzip version
331 self._api.is_femunzip_version_ok.argtypes = (c_char_p, POINTER(c_int))
332 self._api.is_femunzip_version_ok.restype = FemzipError
334 # femzip status
335 self._api.get_femzip_status.argtypes = tuple()
336 self._api.get_femzip_status.restype = FemzipAPIStatus
338 # get part titles
339 self._api.get_part_titles.argtypes = (c_char_p, POINTER(c_int32), c_int32)
340 self._api.get_part_titles.restype = FemzipError
342 # finish reading states
343 self._api.finish_reading_states.argtypes = (POINTER(c_int32), c_int64)
344 self._api.finish_reading_states.restype = FemzipError
346 # close file
347 self._api.close_current_file.argtypes = tuple()
348 self._api.close_current_file.restype = FemzipError
350 # read single state
351 self._api.read_single_state.argtypes = (c_int32, c_int32, POINTER(c_float), c_int64)
352 self._api.read_single_state.restype = FemzipError
354 # read state activity
355 self._api.read_activity.argtypes = (c_int32, c_int32, POINTER(c_float))
356 self._api.read_activity.restype = FemzipError
358 # free buffer info
359 self._api.free_buffer_info.argtypes = (POINTER(FemzipBufferInfo),)
360 self._api.free_buffer_info.restype = c_int32
362 return self._api
364 @staticmethod
365 def _parse_state_filter(state_filter: Union[Set[int], None], n_timesteps: int) -> Set[int]:
367 # convert negative indexes
368 state_filter_parsed = (
369 {entry if entry >= 0 else entry + n_timesteps for entry in state_filter}
370 if state_filter is not None
371 else set(range(n_timesteps))
372 )
374 # filter invalid indexes
375 state_filter_valid = {entry for entry in state_filter_parsed if 0 <= entry < n_timesteps}
377 return state_filter_valid
379 @staticmethod
380 def _check_femzip_error(err: FemzipError) -> None:
381 """Checks a femzip error coming from C (usually)
383 Parameters
384 ----------
385 err: FemzipError
386 c struct error
388 Raises
389 ------
390 FemzipException
391 If the error flag is set with the corresponding
392 error message.
393 """
394 if err.ier != 0:
395 fz_error_msg = "Unknown"
396 try:
397 fz_error_msg = err.msg.decode("ascii")
398 except ValueError:
399 pass
401 err_msg = "Error Code '{0}': {1}"
402 raise FemzipException(err_msg.format(err.ier, fz_error_msg))
404 @staticmethod
405 def struct_to_dict(struct: Structure) -> Dict[str, Any]:
406 """Converts a ctypes struct into a dict
408 Parameters
409 ----------
410 struct: Structure
412 Returns
413 -------
414 fields: Dict[str, Any]
415 struct as dict
417 Examples
418 --------
419 >>> api.struct_to_dict(api.get_femzip_status())
420 {'is_file_open': 1, 'is_geometry_read': 1, 'is_states_open': 0,
421 'i_timestep_state': -1, 'i_timestep_activity': -1}
422 """
423 # We access some internal members to do some magic.
424 # pylint: disable = protected-access
425 return {field_name: getattr(struct, field_name) for field_name, _ in struct._fields_}
427 @staticmethod
428 def copy_struct(src: Structure, dest: Structure):
429 """Copies all fields from src struct to dest
431 Parameters
432 ----------
433 src: Structure
434 src struct
435 src: Structure
436 destination struct
438 Examples
439 --------
440 >>> err1 = FemzipError()
441 >>> err1.ier = -1
442 >>> err1.msg = b"Oops"
443 >>> err2 = FemzipError()
444 >>> api.copy_struct(err1, err2)
445 >>> err2.ier
446 -1
447 >>> err2.msg
448 b'Oops'
449 """
450 # We access some internal members to do some magic.
451 # pylint: disable = protected-access
452 assert src._fields_ == dest._fields_
454 for field_name, _ in src._fields_:
455 setattr(dest, field_name, getattr(src, field_name))
457 def get_part_titles(
458 self, filepath: str, buffer_info: Union[None, FemzipBufferInfo] = None
459 ) -> memoryview:
460 """Get the part title section
462 Parameters
463 ----------
464 filepath: str
465 path to femzip file
466 buffer_info: Union[None, FemzipBufferInfo]
467 buffer info if previously fetched
469 Returns
470 -------
471 mview: memoryview
472 memory of the part title section
473 """
475 # find out how much memory to allocate
476 buffer_info_parsed = self.get_buffer_info(filepath) if buffer_info is None else buffer_info
478 # allocate memory
479 # pylint: disable = invalid-name
480 BufferType = c_int32 * (buffer_info_parsed.size_titles)
481 buffer = BufferType()
483 # do the thing
484 err = self.api.get_part_titles(
485 filepath.encode("utf-8"),
486 buffer,
487 buffer_info_parsed.size_titles,
488 )
489 self._check_femzip_error(err)
491 return memoryview(buffer).cast("B")
493 def read_state_deletion_info(
494 self, buffer_info: FemzipBufferInfo, state_filter: Union[Set[int], None] = None
495 ) -> np.ndarray:
496 """Get information which elements are alive
498 Parameters
499 ----------
500 buffer_info: FemzipBufferInfo
501 infos about buffer sizes
502 state_filter: Union[Set[int], None]
503 usable to read only specific states
505 Notes
506 -----
507 The `buffer` must have the size of at least
508 `buffer_info.size_activity`.
510 Examples
511 --------
512 >>> # get info about required memory
513 >>> buffer_info = api.get_buffer_info(filepath)
515 >>> # first read geometry and leave file open!
516 >>> mview_geom = api.read_geometry(filepath, buffer_info, False)
518 >>> # now read deletion info
519 >>> array_deletion = api.read_state_activity(buffer_info)
521 >>> # close file
522 >>> api.close_current_file()
523 """
525 logging.debug("FemzipAPI.read_state_deletion_info start")
527 # filter timesteps
528 state_filter_valid = self._parse_state_filter(state_filter, buffer_info.n_timesteps)
529 logging.debug("state filter: %s", state_filter_valid)
531 # allocate memory
532 # pylint: disable = invalid-name
533 StateBufferType = c_float * buffer_info.size_activity
534 BufferType = c_float * (buffer_info.size_activity * len(state_filter_valid))
535 buffer_c = BufferType()
537 # major looping
538 n_timesteps_read = 0
539 for i_timestep in range(buffer_info.n_timesteps):
540 logging.debug("i_timestep %d", i_timestep)
542 # walk forward in buffer
543 state_buffer_ptr = StateBufferType.from_buffer(
544 buffer_c, sizeof(c_float) * buffer_info.size_activity * n_timesteps_read
545 )
547 # do the thing
548 err = self.api.read_activity(i_timestep, buffer_info.size_activity, state_buffer_ptr)
549 self._check_femzip_error(err)
551 # increment buffer ptr if we needed this one
552 if i_timestep in state_filter_valid:
553 logging.debug("saved")
554 n_timesteps_read += 1
555 state_filter_valid.remove(i_timestep)
557 # we processe what we need
558 if not state_filter_valid:
559 break
561 # convert buffer into array
562 array = np.frombuffer(buffer_c, dtype=np.float32).reshape(
563 (n_timesteps_read, buffer_info.size_activity)
564 )
566 logging.debug("FemzipAPI.read_state_deletion_info end")
568 return array
570 # return memoryview(buffer_c).cast('B')
572 def read_single_state(
573 self,
574 i_timestep: int,
575 buffer_info: FemzipBufferInfo,
576 state_buffer: Union[None, memoryview] = None,
577 ) -> memoryview:
578 """Read a single state
580 Parameters
581 ----------
582 i_timestep: int
583 timestep to be read
584 buffer_info: FemzipBufferInfo
585 infos about buffer sizes
586 state_buffer: Union[None, memoryview]
587 buffer in which the states are stored
589 Notes
590 -----
591 It is unclear to us why the state buffer needs to be given
592 in order to terminate state reading.
594 Examples
595 --------
596 >>> # get info about required memory
597 >>> buffer_info = api.get_buffer_info(filepath)
599 >>> # first read geometry and leave file open
600 >>> mview_geom = api.read_geometry(filepath, buffer_info, False)
602 >>> # now read a state
603 >>> mview_state = api.read_single_state(0, buffer_info=buffer_info)
605 >>> # close file
606 >>> api.close_current_file()
607 """
609 if state_buffer is not None and "f" not in state_buffer.format:
610 err_msg = "The state buffer must have a float format '<f' instead of '{0}'."
611 raise ValueError(err_msg.format(state_buffer.format))
613 # pylint: disable = invalid-name
614 StateBufferType = c_float * buffer_info.size_state
615 state_buffer_c = (
616 StateBufferType() if state_buffer is None else StateBufferType.from_buffer(state_buffer)
617 )
619 err = self.api.read_single_state(
620 i_timestep, buffer_info.n_timesteps, state_buffer_c, buffer_info.size_state
621 )
622 self._check_femzip_error(err)
624 return memoryview(state_buffer_c).cast("B")
626 def close_current_file(self) -> None:
627 """Closes the current file handle(use not recommended)
629 Notes
630 -----
631 Closes a currently opened file by the API. There
632 is no arg because femzip can process only one file
633 at a time.
634 This can also be used in case of bugs.
636 Examples
637 --------
638 >>> api.close_current_file()
639 """
640 err = self.api.close_current_file()
641 self._check_femzip_error(err)
643 def get_femzip_status(self) -> FemzipAPIStatus:
644 """Check the status of the femzip api
646 Returns
647 -------
648 femzip_status: FemzipAPIStatus
649 c struct with info about femzip API
651 Notes
652 -----
653 This reports whether a file is currently
654 opened and how far it was processed. This
655 internal state is used to avoid internal
656 conflicts and crashes, thus is useful for
657 debugging.
659 Examples
660 --------
661 >>> print(api.struct_to_dict(api.get_femzip_status()))
662 {'is_file_open': 0, 'is_geometry_read': 0, 'is_states_open': 0,
663 'i_timestep_state': -1, 'i_timestep_activity': -1}
664 """
665 return self.api.get_femzip_status()
667 def is_femunzip_version_ok(self, filepath: str) -> bool:
668 """Checks if the femunzip version can be handled
670 Parameters
671 ----------
672 filepath: str
673 path to the femzpi file
675 Returns
676 -------
677 version_ok: bool
679 Examples
680 --------
681 >>> api.is_femunzip_version_ok("path/to/d3plot.fz")
682 True
683 """
684 is_ok = c_int(-1)
685 err = self.api.is_femunzip_version_ok(filepath.encode("ascii"), byref(is_ok))
686 self._check_femzip_error(err)
687 return is_ok.value == 1
689 def has_femunziplib_license(self) -> bool:
690 """Checks whether the extended libraries are available
692 Returns
693 -------
694 has_license: bool
696 Examples
697 --------
698 >>> api.has_femunziplib_license()
699 False
700 """
701 start_time = time.time()
702 has_license = self.api.has_femunziplib_license() == 1
703 logging.debug("License check duration: %fs", (time.time() - start_time))
704 return has_license
706 def is_sidact_file(self, filepath: str) -> bool:
707 """Tests if a filepath points at a sidact file
709 Parameters
710 ----------
711 filepath: path to file
713 Returns
714 -------
715 is_sidact_file: bool
717 Examples
718 --------
719 >>> api.is_sidact_file("path/to/d3plot.fz")
720 True
721 >>> api.is_sidact_file("path/to/d3plot")
722 False
723 >>> api.is_sidact_file("path/to/non/existing/file")
724 False
725 """
726 return self.api.is_sidact_file(filepath.encode("ascii")) == 1
def get_buffer_info(self, filepath: str) -> FemzipBufferInfo:
    """Get the dimensions of the buffers for femzip

    Parameters
    ----------
    filepath: str
        path to femzip file

    Returns
    -------
    buffer_info: FemzipBufferInfo
        c struct with infos about the memory required by femzip. Its
        timesteps are a copy owned by Python.

    Raises
    ------
    FemzipException
        If the library reports an error.

    Examples
    --------
        >>> # read memory demand info first
        >>> buffer_info = api.get_buffer_info(filepath)
        >>> # buffer info is a c struct, but we can print it
        >>> api.struct_to_dict(buffer_info)
        {'n_timesteps': 12,
        'timesteps': <lasso.femzip.femzip_api.LP_c_float object at 0x0000028A8F6B21C0>,
        'size_geometry': 537125, 'size_state': 1462902, 'size_displacement': 147716,
        'size_activity': 47385, 'size_post': 1266356, 'size_titles': 1448}
        >>> for i_timestep in range(buffer_info.n_timesteps):
        >>>     print(buffer_info.timesteps[i_timestep])
        0.0
        0.9998100399971008
        1.9998900890350342
        2.9999701976776123
        3.9997801780700684
    """
    buffer_info = FemzipBufferInfo()

    err = self.api.get_buffer_info(
        filepath.encode("ascii"),
        byref(buffer_info),
    )
    self._check_femzip_error(err)

    try:
        # we need to copy the timesteps from C to Python
        n_timesteps = buffer_info.n_timesteps
        # pylint: disable = invalid-name
        TimestepsType = c_float * n_timesteps
        timesteps_buffer = TimestepsType(*buffer_info.timesteps[:n_timesteps])

        # copy_struct also copies the C-owned timesteps pointer, so it has
        # to be replaced by the Python-owned copy afterwards
        buffer_info_2 = FemzipBufferInfo()
        self.copy_struct(buffer_info, buffer_info_2)
        buffer_info_2.timesteps = timesteps_buffer
    finally:
        # free C controlled memory, even if copying failed
        self.api.free_buffer_info(byref(buffer_info))

    return buffer_info_2
785 def read_geometry(
786 self,
787 filepath: str,
788 buffer_info: Union[FemzipBufferInfo, None] = None,
789 close_file: bool = True,
790 ) -> memoryview:
791 """Read the geometry buffer from femzip
793 Parameters
794 ----------
795 filepath: str
796 path to femzpi file
797 buffer_info: Union[FemzipBufferInfo, None]
798 struct with info regarding required memory for femzip
799 close_file: bool
800 it is useful to leave the file open if
801 states are processed right afterwards
803 Returns
804 -------
805 buffer: memoryview
806 memoryview of buffer
808 Notes
809 -----
810 If the file isn't closed appropriately bugs and crashes
811 might occur.
813 Examples
814 --------
815 >>> mview = api.read_geometry(filepath, buffer_info)
816 """
818 # find out how much memory to allocate
819 buffer_info = self.get_buffer_info(filepath) if buffer_info is None else buffer_info
821 # allocate memory
822 # pylint: disable = invalid-name
823 GeomBufferType = c_int * (buffer_info.size_geometry + buffer_info.size_titles)
824 buffer = GeomBufferType()
826 # read geometry
827 err = self.api.read_geometry(
828 filepath.encode("ascii"),
829 byref(buffer_info),
830 buffer,
831 c_int32(close_file),
832 )
834 self._check_femzip_error(err)
836 return memoryview(buffer).cast("B")
838 def read_states(
839 self,
840 filepath: str,
841 buffer_info: Union[FemzipBufferInfo, None] = None,
842 state_filter: Union[Set[int], None] = None,
843 ) -> np.ndarray:
844 """Reads all femzip state information
846 Parameters
847 ----------
848 filepath: str
849 path to femzip file
850 buffer_info: Union[FemzipBufferInfo, None]
851 struct with info regarding required memory for femzip
852 state_filter: Union[Set[int], None]
853 usable to load only specific states
855 Returns
856 -------
857 buffer: memoryview
858 buffer containing all state data
860 Examples
861 --------
862 >>> buffer_info = api.get_buffer_info("path/to/d3plot.fz")
863 >>> array_states = api.read_states("path/to/d3plot.fz", buffer_info)
864 """
866 buffer_info_parsed = self.get_buffer_info(filepath) if buffer_info is None else buffer_info
868 # filter invalid indexes
869 state_filter_valid = self._parse_state_filter(state_filter, buffer_info_parsed.n_timesteps)
871 n_states_to_allocate = (
872 buffer_info_parsed.n_timesteps if state_filter is None else len(state_filter_valid)
873 )
875 # allocate buffer
876 # pylint: disable = invalid-name
877 BufferType = c_float * (buffer_info_parsed.size_state * n_states_to_allocate)
878 buffer = BufferType()
880 n_timesteps_read = 0
881 for i_timestep in range(buffer_info_parsed.n_timesteps):
883 # forward pointer in buffer
884 buffer_state = buffer[buffer_info.size_state * n_timesteps_read]
886 # read state data
887 self.read_single_state(i_timestep, buffer_info_parsed, buffer_state)
889 if i_timestep in state_filter_valid:
890 n_timesteps_read += 1
891 state_filter_valid.remove(i_timestep)
893 if not state_filter_valid:
894 break
896 array = np.from_buffer(buffer, dtype=np.float32).reshape(
897 (n_timesteps_read, buffer_info_parsed.size_state)
898 )
900 return array
def get_file_metadata(self, filepath: str) -> FemzipFileMetadata:
    """Get infos about the femzip variables in the file

    Parameters
    ----------
    filepath: str
        path to femzip file

    Returns
    -------
    file_metadata: FemzipFileMetadata
        c struct with infos about the femzip file

    Raises
    ------
    FemzipException
        If the library reports an error.

    Notes
    -----
        This is for direct interaction with the C-API, thus should
        not be used by users.

    Examples
    --------
        >>> file_metadata = api.get_file_metadata("path/to/d3plot.fz")
        >>> # print general internals
        >>> api.struct_to_dict(file_metadata)
        {'version_zip': 605.0, 'activity_flag': 1, 'number_of_variables': 535, ...}

        >>> # We can iterate the variable names contained in the file
        >>> print(
            [file_metadata.variable_infos[i_var].name.decode("utf8").strip()
            for i_var in range(file_metadata.number_of_variables)]
        )
        ['global', 'Parts: Energies and others', 'coordinates', 'velocities', ...]
    """
    file_metadata = FemzipFileMetadata()

    # get variable infos
    err = self.api.get_file_metadata(filepath.encode("ascii"), byref(file_metadata))
    self._check_femzip_error(err)

    try:
        # transfer memory to python
        file_metadata2 = self._copy_variable_info_array(file_metadata)
    finally:
        # release c memory, even if copying failed
        self.api.free_variable_array(byref(file_metadata))

    return file_metadata2
def _get_variables_state_buffer_size(
    self,
    n_parts: int,
    n_rigid_walls: int,
    n_rigid_wall_vars: int,
    n_airbag_particles: int,
    n_airbags: int,
    file_metadata: FemzipFileMetadata,
) -> int:
    """Computes the number of entries a state buffer needs for all variables

    Besides returning the total, the computed array size of every
    supported variable is written into
    `file_metadata.variable_infos[i].var_size`.

    Parameters
    ----------
    n_parts: int
        number of parts, part variables take 7 entries per part
    n_rigid_walls: int
        number of rigid walls
    n_rigid_wall_vars: int
        number of variables per rigid wall
    n_airbag_particles: int
        number of airbag particles
    n_airbags: int
        number of airbags, airbag variables take 2 entries per airbag
    file_metadata: FemzipFileMetadata
        metadata of the file, modified in place

    Returns
    -------
    buffer_size_state: int
        summed array size of all supported variables plus one entry for
        the end marker

    Notes
    -----
        Variables of an unsupported category are skipped with a warning
        and their `var_size` is left untouched.
    """
    # pylint: disable=too-many-arguments
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches

    # node variables with these names take three entries per node
    vector_node_array_types = (
        FemzipArrayType.NODE_DISPLACEMENT,
        FemzipArrayType.NODE_VELOCITIES,
        FemzipArrayType.NODE_ACCELERATIONS,
    )

    buffer_size_state = 0
    for i_var in range(file_metadata.number_of_variables):
        var_info = file_metadata.variable_infos[i_var]
        variable_name = var_info.name.decode("utf-8")
        variable_category = FemzipVariableCategory.from_int(var_info.var_type)

        if variable_category == FemzipVariableCategory.NODE:
            is_vector = any(
                array_type.value in variable_name for array_type in vector_node_array_types
            )
            array_size = file_metadata.number_of_nodes * (3 if is_vector else 1)
        elif variable_category == FemzipVariableCategory.SHELL:
            array_size = (
                file_metadata.number_of_shell_elements
                - file_metadata.number_of_rigid_shell_elements
            )
        elif variable_category == FemzipVariableCategory.SOLID:
            array_size = file_metadata.number_of_solid_elements
        elif variable_category == FemzipVariableCategory.BEAM:
            array_size = file_metadata.number_of_1D_elements
        elif variable_category == FemzipVariableCategory.THICK_SHELL:
            array_size = file_metadata.number_of_thick_shell_elements
        elif variable_category == FemzipVariableCategory.GLOBAL:
            array_size = 6
        elif variable_category == FemzipVariableCategory.PART:
            logging.debug("n_parts: %d", n_parts)
            array_size = n_parts * 7 + n_rigid_walls * n_rigid_wall_vars
        elif variable_category in (
            FemzipVariableCategory.CPM_FLOAT_VAR,
            FemzipVariableCategory.CPM_INT_VAR,
        ):
            array_size = n_airbag_particles
        elif variable_category == FemzipVariableCategory.CPM_AIRBAG:
            array_size = n_airbags * 2
        else:
            warn_msg = "Femzip variable category '%s' is not supported"
            logging.warning(warn_msg, variable_category)
            continue

        file_metadata.variable_infos[i_var].var_size = array_size
        buffer_size_state += array_size

    # one more for end marker
    buffer_size_state += 1

    return buffer_size_state
def _decompose_read_variables_array(
    self,
    n_parts: int,
    n_rigid_walls: int,
    n_rigid_wall_vars: int,
    n_airbag_particles: int,
    n_airbags: int,
    all_vars_array: np.ndarray,
    n_timesteps_read: int,
    file_metadata: FemzipFileMetadata,
) -> Dict[Tuple[int, str, FemzipVariableCategory], np.ndarray]:
    """Split the array holding all variables into one array per variable

    The variables are taken from the second axis of ``all_vars_array`` in the
    order in which they are listed in ``file_metadata.variable_infos``. The
    number of entries consumed by a variable depends on its category.

    Parameters
    ----------
    n_parts: int
        number of parts, every part contributes 7 entries to a PART variable
    n_rigid_walls: int
        number of rigid walls
    n_rigid_wall_vars: int
        number of variables per rigid wall
    n_airbag_particles: int
        number of airbag particles
    n_airbags: int
        number of airbags, every airbag contributes 2 entries
    all_vars_array: np.ndarray
        two-dimensional array, the first axis are the timesteps and the
        second axis contains all variables one after another
    n_timesteps_read: int
        number of timesteps, used as first dimension when reshaping the
        nodal vector variables
    file_metadata: FemzipFileMetadata
        metadata listing the variables contained in ``all_vars_array``

    Returns
    -------
    result_arrays: Dict[Tuple[int, str, FemzipVariableCategory], np.ndarray]
        one array per variable, the key consists of the variable index,
        the variable name and the variable category

    Raises
    ------
    RuntimeError
        If a variable has a category which is not handled here.
    """

    # pylint: disable=too-many-arguments
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    # pylint: disable=too-many-statements

    # nodal variables with three components (x, y, z) per node
    node_vector_types = (
        FemzipArrayType.NODE_DISPLACEMENT.value,
        FemzipArrayType.NODE_VELOCITIES.value,
        FemzipArrayType.NODE_ACCELERATIONS.value,
    )
    n_airbag_vars = 2

    # decompose array
    result_arrays: Dict[Tuple[int, str, FemzipVariableCategory], np.ndarray] = {}
    var_pos = 0
    for i_var in range(file_metadata.number_of_variables):

        var_info: VariableInfo = file_metadata.variable_infos[i_var]
        variable_name: str = var_info.name.decode("utf-8")
        variable_index: int = var_info.var_index
        variable_type = FemzipArrayType.from_string(variable_name)
        variable_category = FemzipVariableCategory.from_int(var_info.var_type)

        # Every branch only decides how many entries the variable occupies
        # and whether the slice needs a different shape or dtype. The slicing
        # itself is identical for all categories and done below.
        new_shape = None
        reinterpret_as_int = False

        if variable_category == FemzipVariableCategory.NODE:
            if variable_type.value in node_vector_types:
                array_size = file_metadata.number_of_nodes * 3
                new_shape = (n_timesteps_read, file_metadata.number_of_nodes, 3)
            else:
                array_size = file_metadata.number_of_nodes
        elif variable_category == FemzipVariableCategory.SHELL:
            # rigid shells are not part of the shell variables
            array_size = (
                file_metadata.number_of_shell_elements
                - file_metadata.number_of_rigid_shell_elements
            )
        elif variable_category == FemzipVariableCategory.SOLID:
            array_size = file_metadata.number_of_solid_elements
        elif variable_category == FemzipVariableCategory.BEAM:
            array_size = file_metadata.number_of_1D_elements
        elif variable_category == FemzipVariableCategory.THICK_SHELL:
            array_size = file_metadata.number_of_thick_shell_elements
        elif variable_category == FemzipVariableCategory.GLOBAL:
            array_size = 6
        elif variable_category == FemzipVariableCategory.PART:
            array_size = n_parts * 7 + n_rigid_walls * n_rigid_wall_vars
        elif variable_category == FemzipVariableCategory.CPM_FLOAT_VAR:
            array_size = n_airbag_particles
        elif variable_category == FemzipVariableCategory.CPM_INT_VAR:
            array_size = n_airbag_particles
            # the buffer is a float buffer, thus the bytes of integer
            # variables must be reinterpreted
            reinterpret_as_int = True
        elif variable_category == FemzipVariableCategory.CPM_AIRBAG:
            array_size = n_airbags * n_airbag_vars
            new_shape = (all_vars_array.shape[0], n_airbags, n_airbag_vars)
        else:
            err_msg = f"Femzip variable category '{variable_category}' is not supported"
            raise RuntimeError(err_msg)

        var_array = all_vars_array[:, var_pos : var_pos + array_size]
        if reinterpret_as_int:
            var_array = var_array.view(np.int32)
        if new_shape is not None:
            var_array = var_array.reshape(new_shape)
        var_pos += array_size

        result_arrays[(variable_index, variable_name, variable_category)] = var_array

    return result_arrays
def read_variables(
    self,
    file_metadata: FemzipFileMetadata,
    n_parts: int,
    n_rigid_walls: int,
    n_rigid_wall_vars: int,
    n_airbag_particles: int,
    n_airbags: int,
    state_filter: Union[Set[int], None] = None,
) -> Dict[Tuple[int, str, FemzipVariableCategory], np.ndarray]:
    """Read specific variables from Femzip

    The timesteps are read one after another, starting with the first one
    and stopping after the last requested one. Timesteps which were not
    requested are read too but overwritten by the next timestep.

    Parameters
    ----------
    file_metadata: FemzipFileMetadata
        metadata of file including which variables to read
    n_parts: int
        number of parts in the file
    n_rigid_walls: int
        number of rigid walls
    n_rigid_wall_vars: int
        number of rigid wall variables
    n_airbag_particles: int
        number of airbag particles in the file
    n_airbags: int
        number of airbags in the file
    state_filter: Union[Set[int], None]
        zero-based indexes of the timesteps to read. Indexes outside of
        the range ``0 <= index < number_of_timesteps`` are ignored.
        If None, all timesteps are read.

    Returns
    -------
    arrays: dict
        dictionary with one array per variable, the key consists of the
        variable index, the variable name and the variable category

    Notes
    -----
        The error returned by every femzip read call is passed to
        ``_check_femzip_error``, which presumably raises in case of an
        error (not visible from this method).
    """

    # pylint: disable = too-many-arguments
    # pylint: disable = too-many-locals

    n_timesteps = file_metadata.number_of_timesteps
    logging.info("file_metadata: %s", self.struct_to_dict(file_metadata))

    # log variable names
    for i_var in range(file_metadata.number_of_variables):
        var_info = file_metadata.variable_infos[i_var]
        logging.debug("%s", self.struct_to_dict(var_info))

    # estimate float buffer size
    buffer_size_state = self._get_variables_state_buffer_size(
        n_parts=n_parts,
        n_rigid_walls=n_rigid_walls,
        n_rigid_wall_vars=n_rigid_wall_vars,
        n_airbag_particles=n_airbag_particles,
        n_airbags=n_airbags,
        file_metadata=file_metadata,
    )
    logging.info("buffer_size_state: %s", buffer_size_state)

    # specify which states to read
    # Only indexes of existing timesteps may be kept. Any other index is
    # never visited by the reading loop below and would thus reserve
    # buffer space which is never filled.
    states_to_copy = (
        {i_timestep for i_timestep in state_filter if 0 <= i_timestep < n_timesteps}
        if state_filter is not None
        else set(range(n_timesteps))
    )
    logging.info("states_to_copy: %s", states_to_copy)

    # take timesteps into account
    buffer_size = len(states_to_copy) * buffer_size_state
    logging.info("buffer_size: %s", buffer_size)

    # allocate memory
    # pylint: disable = invalid-name
    BufferType = c_float * buffer_size
    buffer = BufferType()

    # do the thing
    # pylint: disable = invalid-name
    BufferStateType = c_float * buffer_size_state
    # femzip counts the timesteps starting from 1
    fortran_offset = 1
    n_timesteps_read = 0
    # without any state to copy there is no buffer space to read into
    if states_to_copy:
        for i_timestep in range(n_timesteps):
            logging.info("timestep: %d", i_timestep)

            # The state is written behind the states copied so far. If
            # the state is not wanted, the position is not advanced
            # and the next state overwrites it.
            buffer_ptr_state = BufferStateType.from_buffer(
                buffer, sizeof(c_float) * n_timesteps_read * buffer_size_state
            )

            # read the variables into the buffer
            err = self.api.read_variables(
                buffer_ptr_state,
                buffer_size_state,
                i_timestep + fortran_offset,
                byref(file_metadata),
            )
            self._check_femzip_error(err)

            # check if there is nothing to read anymore
            # thus we can terminate earlier
            if i_timestep in states_to_copy:
                states_to_copy.remove(i_timestep)
                n_timesteps_read += 1

            if not states_to_copy:
                logging.info("All states processed")
                break

    if n_timesteps_read:
        array = np.ctypeslib.as_array(buffer, shape=(buffer_size,))
    else:
        # nothing was read, do not wrap the zero-sized ctypes buffer
        array = np.empty((0,), dtype=np.float32)

    # The row width is stated explicitly since a wildcard dimension cannot
    # be resolved for an array without any timestep.
    array = array.reshape((n_timesteps_read, buffer_size_state))

    # decompose total array into array pieces again
    result_arrays = self._decompose_read_variables_array(
        n_parts=n_parts,
        n_rigid_walls=n_rigid_walls,
        n_rigid_wall_vars=n_rigid_wall_vars,
        n_airbag_particles=n_airbag_particles,
        n_airbags=n_airbags,
        all_vars_array=array,
        n_timesteps_read=n_timesteps_read,
        file_metadata=file_metadata,
    )

    return result_arrays
def _copy_variable_info_array(self, file_metadata: FemzipFileMetadata) -> FemzipFileMetadata:
    """Creates a metadata copy whose variable infos live in python memory

    Parameters
    ----------
    file_metadata: FemzipFileMetadata
        metadata object for femzip file

    Returns
    -------
    file_metadata2: FemzipFileMetadata
        new metadata object with the same content, but its
        variable_infos array was allocated by python and is
        not managed by C anymore
    """
    n_variables = file_metadata.number_of_variables

    # array allocated on python side receiving the variable infos
    python_var_infos = (VariableInfo * n_variables)()
    for i_var in range(n_variables):
        self.copy_struct(file_metadata.variable_infos[i_var], python_var_infos[i_var])

    # copy the metadata itself and redirect it to the python array
    metadata_copy = FemzipFileMetadata()
    self.copy_struct(file_metadata, metadata_copy)
    metadata_copy.variable_infos = python_var_infos

    return metadata_copy
class FemzipD3plotArrayMapping:
    """Contains information about how to map femzip arrays to d3plot arrays

    Attributes
    ----------
    d3plot_array_type: str
        d3plot array type the femzip array is mapped to
    fz_array_slices: Tuple[slice, ...]
        slices belonging to the femzip array, by default one slice
        selecting everything
    i_integration_point: Union[int, None]
        integration point index, None if not set
    i_var_index: Union[int, None]
        variable index, None if not set
    """

    d3plot_array_type: str
    i_integration_point: Union[int, None]
    i_var_index: Union[int, None]

    # This is an annotation and not an assignment. An assignment would bind
    # the typing object itself as class attribute.
    fz_array_slices: Tuple[slice, ...]

    def __init__(
        self,
        d3plot_array_type: str,
        fz_array_slices: Tuple[slice, ...] = (slice(None),),
        i_integration_point: Union[int, None] = None,
        i_var_index: Union[int, None] = None,
    ):
        self.d3plot_array_type = d3plot_array_type
        self.fz_array_slices = fz_array_slices
        self.i_integration_point = i_integration_point
        self.i_var_index = i_var_index
class FemzipArrayMetadata:
    """Describes a femzip array and the d3plot arrays it is mapped to"""

    array_type: FemzipArrayType
    category: FemzipVariableCategory
    d3plot_mappings: List[FemzipD3plotArrayMapping]
    # stays None until parse() was called
    fz_var_index: Union[int, None] = None

    def __init__(
        self,
        array_type: FemzipArrayType,
        category: FemzipVariableCategory,
        d3plot_mappings: List[FemzipD3plotArrayMapping],
    ):
        self.array_type = array_type
        self.category = category
        self.d3plot_mappings = d3plot_mappings

    def match(self, fz_name: str) -> bool:
        """Tells whether a femzip array name belongs to this array

        Parameters
        ----------
        fz_name: str
            femzip array name

        Returns
        -------
        match: bool
            True if the value of the array type is contained in the name
        """
        expected_name = self.array_type.value
        return expected_name in fz_name

    def parse(self, fz_var_name: str, fz_var_index: int) -> None:
        """Extracts the indexes from a femzip variable name and stores them

        The femzip variable index is stored on this instance, the parsed
        variable index and integration point are stored on every mapping
        in d3plot_mappings.

        Parameters
        ----------
        fz_var_name: str
            variable name from femzip
        fz_var_index: int
            variable index from femzip

        Raises
        ------
        RuntimeError
            If the variable name does not fit the expected pattern.
        """
        # group 1: everything up to the first opening bracket
        # group 2: an optional expression in brackets
        name_pattern = re.compile(r"(^[^\(\n]+)(\([^\)]+\))*")
        found = name_pattern.findall(fz_var_name)

        if len(found) != 1 or len(found[0]) != 2:
            raise RuntimeError(f"Could not match femzip array name: {fz_var_name}")

        # the name part holds the variable name and, if existing, the
        # variable index, the bracket part holds the integration layer
        name_part, bracket_part = found[0]
        _, var_index = get_last_int_of_line(name_part)

        # strip the enclosing brackets '(' and ')'
        bracket_content = bracket_part[1:-1]
        if "inner" in bracket_content:
            i_integration_point = 0
        elif "outer" in bracket_content:
            i_integration_point = 1
        else:
            _, i_integration_point = get_last_int_of_line(bracket_content)

        # store the results
        self.fz_var_index = fz_var_index
        for d3plot_mapping in self.d3plot_mappings:
            d3plot_mapping.i_integration_point = i_integration_point
            d3plot_mapping.i_var_index = var_index