Coverage for lasso/io/binary_buffer.py: 87%

78 statements  

« prev     ^ index     » next       coverage.py v7.2.4, created at 2023-04-28 18:42 +0100

1import mmap 

2import os 

3from typing import Any, List, Union 

4 

5import numpy as np 

6 

7 

class BinaryBuffer:
    """This class is used to handle binary data

    The bytes are held in one ``memoryview`` (attribute ``mv_``).
    ``filepath_`` is either ``None`` or the list of file paths the buffer
    was last loaded from / saved to, and ``sizes_`` lists the number of
    bytes contributed by every loaded file or appended buffer.
    """

    def __init__(self, filepath: Union[str, None] = None, n_bytes: int = 0):
        """Buffer used to read binary files

        Parameters
        ----------
        filepath: Union[str, None]
            path to a binary file
        n_bytes: int
            how many bytes to load (uses memory mapping)

        Returns
        -------
        instance: BinaryBuffer
        """
        self.filepath_ = None
        self.sizes_ = []
        # Start with an empty buffer so that len(), size, memoryview and
        # append() work on an instance that was created without a file.
        self.mv_ = memoryview(bytearray())
        self.load(filepath, n_bytes)

    @property
    def memoryview(self) -> memoryview:
        """Get the underlying memoryview of the binary buffer

        Returns
        -------
        mv_: memoryview
            memoryview used to store the data
        """
        return self.mv_

    @memoryview.setter
    def memoryview(self, new_mv):
        """Set the memoryview of the binary buffer manually

        Parameters
        ----------
        new_mv: memoryview
            memoryview used to store the bytes

        Raises
        ------
        AssertionError
            if ``new_mv`` is not a memoryview
        """
        assert isinstance(new_mv, memoryview)
        self.mv_ = new_mv
        self.sizes_ = [len(self.mv_)]

    def get_slice(
        self, start: int, end: Union[None, int] = None, step: int = 1
    ) -> "BinaryBuffer":
        """Get a slice of the binary buffer

        Parameters
        ----------
        start: int
            start position in bytes
        end: Union[int, None]
            end position (exclusive), ``None`` slices up to the end of
            the buffer
        step: int
            step for slicing (default 1)

        Returns
        -------
        new_buffer: BinaryBuffer
            the slice as a new buffer (a view on the same memory, the
            bytes are not copied)

        Raises
        ------
        AssertionError
            if ``start`` or ``end`` lie outside of the buffer
        """

        assert start < len(self)
        # end is exclusive, thus end == len(self) is a valid position
        assert end is None or end <= len(self)

        end = len(self) if end is None else end

        new_binary_buffer = BinaryBuffer()
        new_binary_buffer.memoryview = self.mv_[start:end:step]

        return new_binary_buffer

    def __len__(self) -> int:
        """Get the length of the byte buffer

        Returns
        -------
        len: int
        """
        return len(self.mv_)

    @property
    def size(self) -> int:
        """Get the size of the byte buffer

        Returns
        -------
        size: int
            size of buffer in bytes
        """
        return len(self.mv_)

    @size.setter
    def size(self, size: int):
        """Set the length of the byte buffer

        Parameters
        ----------
        size: int
            new size of the buffer

        Notes
        -----
            Shrinking keeps a view on the first ``size`` bytes. Growing
            copies the data into a new buffer which is padded with zero
            bytes.
        """

        current_size = len(self.mv_)

        if current_size > size:
            self.mv_ = self.mv_[:size]
        elif current_size < size:
            # bytearray(n) yields n zero bytes
            buffer = bytearray(self.mv_) + bytearray(size - current_size)
            self.mv_ = memoryview(buffer)

    def read_number(self, start: int, dtype: np.dtype) -> Union[float, int]:
        """Read a number from the buffer

        Parameters
        ----------
        start: int
            at which byte to start reading
        dtype: np.dtype
            type of the number to read

        Returns
        -------
        number: np.dtype
            number with the type specified
        """
        return np.frombuffer(self.mv_, dtype=dtype, count=1, offset=start)[0]

    def write_number(self, start: int, value: Any, dtype: np.dtype):
        """Write a number to the buffer

        Parameters
        ----------
        start: int
            at which byte to start writing
        value: Any
            value to write
        dtype: np.dtype
            type of the number to write
        """

        # count=1 wraps exactly one item. Without it numpy demands that the
        # whole remaining buffer is a multiple of the item size.
        wrapper = np.frombuffer(self.mv_[start:], dtype=dtype, count=1)
        wrapper[0] = value

    def read_ndarray(self, start: int, length: int, step: int, dtype: np.dtype) -> np.ndarray:
        """Read a numpy array from the buffer

        Parameters
        ----------
        start: int
            at which byte to start reading
        length: int
            length in bytes to read
        step: int
            byte step size (how many bytes to skip)
        dtype: np.dtype
            type of the number to read

        Returns
        -------
        array: np.ndarray
            a view on the buffer memory if the selected bytes are
            contiguous, otherwise a copy of the selected bytes
        """

        selection = self.mv_[start : start + length : step]

        if selection.contiguous:
            return np.frombuffer(selection, dtype=dtype)

        # numpy can not wrap a strided memoryview, thus the selected bytes
        # are gathered into a fresh buffer first
        return np.frombuffer(bytearray(selection.tobytes()), dtype=dtype)

    def write_ndarray(self, array: np.ndarray, start: int, step: int):
        """Write a numpy array to the buffer

        Parameters
        ----------
        array: np.ndarray
            array to save to the file
        start: int
            start in bytes
        step: int
            byte step size (how many bytes to skip)

        Notes
        -----
            Arrays with more than one dimension are written flattened
            in C order.
        """

        # count limits the wrapper to the written items. Without it numpy
        # demands that the whole remaining buffer is a multiple of the
        # item size.
        wrapper = np.frombuffer(self.mv_[start::step], dtype=array.dtype, count=array.size)

        np.copyto(wrapper, array.reshape(-1), casting="no")

    def read_text(self, start: int, length: int, step: int = 1, encoding: str = "utf8") -> str:
        """Read text from the binary buffer

        Parameters
        ----------
        start: int
            start in bytes
        length: int
            length in bytes to read
        step: int
            byte step size
        encoding: str
            encoding used

        Returns
        -------
        text: str
            the decoded bytes
        """
        return self.mv_[start : start + length : step].tobytes().decode(encoding)

    def save(self, filepath: Union[str, None] = None):
        """Save the binary buffer to a file

        Parameters
        ----------
        filepath: str
            path where to save the data

        Notes
        -----
            Overwrites to original file if no filepath
            is specified. If the buffer was loaded from several
            files, the first one is used. Nothing is written if
            no path is known at all.
        """

        if filepath:
            filepath_parsed = filepath
        elif isinstance(self.filepath_, (str, os.PathLike)):
            # tolerate a single path which was assigned from outside
            filepath_parsed = self.filepath_
        elif self.filepath_:
            filepath_parsed = self.filepath_[0]
        else:
            return

        # a strided view can not be handed to the file object directly
        data = self.mv_ if self.mv_.contiguous else self.mv_.tobytes()

        with open(filepath_parsed, "wb") as fp:
            fp.write(data)

        # stored as a list like load() does, otherwise the next save()
        # without a path would use the first character of the path
        self.filepath_ = [filepath_parsed]

    def load(self, filepath: Union[List[str], str, None] = None, n_bytes: int = 0):
        """load a file

        Parameters
        ----------
        filepath: Union[List[str], str, None]
            path to the file to load or a list of paths, whose contents
            are concatenated in the given order
        n_bytes: int
            number of bytes to load from every file (uses memory mapping
            if nonzero)

        Notes
        -----
            If not filepath is specified, then the opened file is simply
            reloaded. If no file was opened before, nothing happens.
        """

        filepath = filepath if filepath else self.filepath_

        if not filepath:
            return

        # convert to a list if only a single file is given
        if isinstance(filepath, (str, os.PathLike)):
            filepath_parsed = [filepath]
        else:
            filepath_parsed = list(filepath)

        # get size of all files
        sizes = [os.path.getsize(path) for path in filepath_parsed]

        # reduce memory if required
        if n_bytes != 0:
            sizes = [min(n_bytes, entry) for entry in sizes]

        # allocate memory (zero-initialized)
        buffer = memoryview(bytearray(sum(sizes)))

        # read files and concatenate them, offset is the running sum of
        # the sizes of all files read so far
        offset = 0
        for path, size in zip(filepath_parsed, sizes):
            # every file may only fill its own part of the buffer
            target = buffer[offset : offset + size]

            with open(path, "rb") as fp:
                # an empty file can not be memory mapped
                if n_bytes and size:
                    with mmap.mmap(fp.fileno(), size, access=mmap.ACCESS_READ) as mm:
                        target[:] = mm[:size]
                else:
                    fp.readinto(target)

            offset += size

        self.filepath_ = filepath_parsed
        self.sizes_ = sizes
        self.mv_ = buffer

    def append(self, binary_buffer: "BinaryBuffer"):
        """Append another binary buffer to this one

        Parameters
        ----------
        binary_buffer: BinaryBuffer
            buffer to append

        Raises
        ------
        AssertionError
            if ``binary_buffer`` is not a BinaryBuffer

        Notes
        -----
            The bytes of both buffers are copied into a new buffer.
        """

        assert isinstance(binary_buffer, BinaryBuffer)

        self.mv_ = memoryview(bytearray(self.mv_) + bytearray(binary_buffer.mv_))
        self.sizes_.append(len(binary_buffer))

287 

288 self.mv_ = memoryview(bytearray(self.mv_) + bytearray(binary_buffer.mv_)) 

289 self.sizes_.append(len(binary_buffer))