Coverage for lasso/io/binary_buffer.py: 87%
78 statements
« prev ^ index » next coverage.py v7.2.4, created at 2023-04-28 18:42 +0100
« prev ^ index » next coverage.py v7.2.4, created at 2023-04-28 18:42 +0100
1import mmap
2import os
3from typing import Any, List, Union
5import numpy as np
class BinaryBuffer:
    """In-memory byte buffer with helpers to read and write numbers, arrays and text

    Attributes
    ----------
    filepath_:
        path(s) last passed to ``load`` (a list) or ``save`` (a single path),
        ``None`` if the buffer was never bound to a file
    sizes_:
        number of bytes contributed by every loaded or appended part
    mv_:
        memoryview holding the bytes
    """

    def __init__(self, filepath: Union[List[str], str, None] = None, n_bytes: int = 0):
        """Buffer used to read binary files

        Parameters
        ----------
        filepath: Union[List[str], str, None]
            path to a binary file or a list of paths which are concatenated
        n_bytes: int
            how many bytes to load per file (uses memory mapping)

        Returns
        -------
        instance: BinaryBuffer
        """
        self.filepath_ = None
        self.sizes_ = []
        # start with an empty buffer so that len(), size and the read helpers
        # are usable even if no file is loaded
        self.mv_ = memoryview(bytearray())
        self.load(filepath, n_bytes)

    @property
    def memoryview(self) -> memoryview:
        """Get the underlying memoryview of the binary buffer

        Returns
        -------
        mv_: memoryview
            memoryview used to store the data
        """
        return self.mv_

    @memoryview.setter
    def memoryview(self, new_mv):
        """Set the memoryview of the binary buffer manually

        Parameters
        ----------
        new_mv: memoryview
            memoryview used to store the bytes

        Raises
        ------
        AssertionError
            if new_mv is not a memoryview
        """
        assert isinstance(new_mv, memoryview)
        self.mv_ = new_mv
        self.sizes_ = [len(self.mv_)]

    def get_slice(
        self, start: int, end: Union[None, int] = None, step: int = 1
    ) -> "BinaryBuffer":
        """Get a slice of the binary buffer

        Parameters
        ----------
        start: int
            start position in bytes
        end: Union[int, None]
            end position (exclusive), None slices up to the end of the buffer
        step: int
            step for slicing (default 1)

        Returns
        -------
        new_buffer: BinaryBuffer
            the slice as a new buffer

        Raises
        ------
        AssertionError
            if start or end lie outside of the buffer

        Notes
        -----
        The new buffer is a memoryview slice of this one, the bytes are
        not copied.
        """

        assert start < len(self)
        # end is exclusive, thus it may be equal to the buffer length
        assert end is None or end <= len(self)

        end = len(self) if end is None else end

        new_binary_buffer = BinaryBuffer()
        new_binary_buffer.memoryview = self.mv_[start:end:step]

        return new_binary_buffer

    def __len__(self) -> int:
        """Get the length of the byte buffer

        Returns
        -------
        len: int
        """
        return len(self.mv_)

    @property
    def size(self) -> int:
        """Get the size of the byte buffer

        Returns
        -------
        size: int
            size of buffer in bytes
        """
        return len(self.mv_)

    @size.setter
    def size(self, size: int):
        """Set the length of the byte buffer

        Parameters
        ----------
        size: int
            new size of the buffer

        Notes
        -----
        Shrinking truncates the memoryview, growing copies the data into
        a new, larger buffer.
        """

        current_size = len(self.mv_)

        if current_size > size:
            self.mv_ = self.mv_[:size]
        elif current_size < size:
            # NOTE(review): the padding is the ASCII character "0" (0x30),
            # not a NUL byte - kept unchanged, confirm that this is intended
            padding = bytearray(b"0" * (size - current_size))
            self.mv_ = memoryview(bytearray(self.mv_) + padding)

    def read_number(self, start: int, dtype: np.dtype) -> Union[float, int]:
        """Read a number from the buffer

        Parameters
        ----------
        start: int
            at which byte to start reading
        dtype: np.dtype
            type of the number to read

        Returns
        -------
        number: np.dtype
            number with the type specified
        """
        return np.frombuffer(self.mv_, dtype=dtype, count=1, offset=start)[0]

    def write_number(self, start: int, value: Any, dtype: np.dtype):
        """Write a number to the buffer

        Parameters
        ----------
        start: int
            at which byte to start writing
        value: Any
            value to write
        dtype: np.dtype
            type of the number to write
        """

        # wrap only the single item which is written, wrapping the entire
        # tail fails if its byte count is not a multiple of the item size
        wrapper = np.frombuffer(self.mv_, dtype=dtype, count=1, offset=start)
        wrapper[0] = value

    def read_ndarray(self, start: int, length: int, step: int, dtype: np.dtype) -> np.ndarray:
        """Read a numpy array from the buffer

        Parameters
        ----------
        start: int
            at which byte to start reading
        length: int
            length in bytes to read
        step: int
            byte step size (how many bytes to skip)
        dtype: np.dtype
            type of the number to read

        Returns
        -------
        array: np.ndarray
            array wrapping the bytes of the buffer (no copy is made)
        """

        return np.frombuffer(self.mv_[start : start + length : step], dtype=dtype)

    def write_ndarray(self, array: np.ndarray, start: int, step: int):
        """Write a numpy array to the buffer

        Parameters
        ----------
        array: np.ndarray
            array to save to the file
        start: int
            start in bytes
        step: int
            byte step size (how many bytes to skip)
        """

        # restrict the view to the bytes occupied by the array, otherwise
        # a tail which is not a multiple of the item size can not be wrapped
        target = self.mv_[start::step][: array.nbytes]
        wrapper = np.frombuffer(target, dtype=array.dtype)

        np.copyto(wrapper[: array.size], array, casting="no")

    def read_text(self, start: int, length: int, step: int = 1, encoding: str = "utf8") -> str:
        """Read text from the binary buffer

        Parameters
        ----------
        start: int
            start in bytes
        length: int
            length in bytes to read
        step: int
            byte step size
        encoding: str
            encoding used

        Returns
        -------
        text: str
            decoded text
        """
        return self.mv_[start : start + length : step].tobytes().decode(encoding)

    def save(self, filepath: Union[str, None] = None):
        """Save the binary buffer to a file

        Parameters
        ----------
        filepath: str
            path where to save the data

        Notes
        -----
        Overwrites to original file if no filepath
        is specified. If several files were loaded, the
        first one is overwritten. Nothing is written
        if no path is known at all.
        """

        if filepath:
            filepath_parsed = filepath
        elif not self.filepath_:
            return
        elif isinstance(self.filepath_, (str, bytes, os.PathLike)):
            # a previous save stored a single path, indexing it would
            # return its first character instead of the path
            filepath_parsed = self.filepath_
        else:
            filepath_parsed = self.filepath_[0]

        with open(filepath_parsed, "wb") as fp:
            fp.write(self.mv_)

        self.filepath_ = filepath_parsed

    def load(self, filepath: Union[List[str], str, None] = None, n_bytes: int = 0):
        """load a file

        Parameters
        ----------
        filepath: Union[List[str], str, None]
            path to the file to load or a list of paths, the files
            of a list are concatenated in the given order
        n_bytes: int
            number of bytes to load per file (uses memory mapping if nonzero)

        Notes
        -----
        If no filepath is specified, then the opened file is simply
        reloaded. If no file is known at all nothing happens.
        """

        filepath = filepath if filepath else self.filepath_

        if not filepath:
            return

        # convert to a list if only a single file is given
        if isinstance(filepath, (str, bytes, os.PathLike)):
            filepath_parsed = [filepath]
        else:
            filepath_parsed = list(filepath)

        # get size of all files
        sizes = [os.path.getsize(path) for path in filepath_parsed]

        # reduce memory if required
        if n_bytes:
            sizes = [min(n_bytes, entry) for entry in sizes]

        # allocate memory, it is filled from the files below
        buffer = memoryview(bytearray(sum(sizes)))

        # read files and concatenate them
        offset = 0
        for path, n_bytes_file in zip(filepath_parsed, sizes):
            # every file gets exactly its own section of the buffer
            section = buffer[offset : offset + n_bytes_file]
            offset += n_bytes_file

            # nothing to read and empty files can not be memory mapped
            if n_bytes_file == 0:
                continue

            with open(path, "rb") as fp:
                if n_bytes:
                    with mmap.mmap(fp.fileno(), n_bytes_file, access=mmap.ACCESS_READ) as mm:
                        section[:] = mm[:n_bytes_file]
                else:
                    fp.readinto(section)

        self.filepath_ = filepath_parsed
        self.sizes_ = sizes
        self.mv_ = buffer

    def append(self, binary_buffer: "BinaryBuffer"):
        """Append another binary buffer to this one

        Parameters
        ----------
        binary_buffer: BinaryBuffer
            buffer to append

        Raises
        ------
        AssertionError
            if binary_buffer is not a BinaryBuffer

        Notes
        -----
        The bytes of both buffers are copied into a new buffer.
        """

        assert isinstance(binary_buffer, BinaryBuffer)

        self.mv_ = memoryview(bytearray(self.mv_) + bytearray(binary_buffer.mv_))
        self.sizes_.append(len(binary_buffer))