Coverage for lasso/dyna/lsda_py3.py: 14%

424 statements  

« prev     ^ index     » next       coverage.py v7.2.4, created at 2023-04-28 18:42 +0100

1import glob 

2import struct 

3 

4# We disable pylint here since this code is ancient code from LSTC and has the 

5# respective quality. I tried rewriting it but could not understand it at all 

6# in time. 

7# pylint: skip-file 

8 

9 

10class LsdaError(Exception): 

11 """This is only here, so I can raise an error in case the data type 

12 sizes are not what I expect""" 

13 

14 

15class _Diskfile: 

16 """ 

17 Handles all the low level file I/O. Nothing here should be 

18 called directly by a user. 

19 """ 

20 

21 packsize = [0, "b", "h", 0, "i", 0, 0, 0, "q"] 

22 packtype = [0, "b", "h", "i", "q", "B", "H", "I", "Q", "f", "d", "s"] 

23 sizeof = [0, 1, 2, 4, 8, 1, 2, 4, 8, 4, 8, 1] 

24 

25 def __init__(self, name, mode): 

26 self.mode = mode # file open mode (r,r+,w,w+) 

27 self.name = name # file name 

28 self.ateof = 0 # 1 if the file pointer is at EOF 

29 self.fp = open(name, mode + "b") 

30 if mode[0] == "r": 

31 s = self.fp.read(8) 

32 header = struct.unpack("BBBBBBBB", s) 

33 if header[0] > 8: 

34 self.fp.seek(header[0]) 

35 else: 

36 header = [8, 8, 8, 1, 1, 0, 0, 0] 

37 # Determine if my native ordering is big or little endian.... 

38 b = struct.unpack("bbbb", struct.pack("i", 1)) 

39 if b[0]: 

40 header[5] = 1 

41 else: 

42 header[5] = 0 

43 self.lengthsize = header[1] 

44 self.offsetsize = header[2] 

45 self.commandsize = header[3] 

46 self.typesize = header[4] 

47 if header[5] == 0: 

48 self.ordercode = ">" 

49 else: 

50 self.ordercode = "<" 

51 self.ounpack = self.ordercode + _Diskfile.packsize[self.offsetsize] 

52 self.lunpack = self.ordercode + _Diskfile.packsize[self.lengthsize] 

53 self.lcunpack = ( 

54 self.ordercode 

55 + _Diskfile.packsize[self.lengthsize] 

56 + _Diskfile.packsize[self.commandsize] 

57 ) 

58 self.tolunpack = ( 

59 self.ordercode 

60 + _Diskfile.packsize[self.typesize] 

61 + _Diskfile.packsize[self.offsetsize] 

62 + _Diskfile.packsize[self.lengthsize] 

63 ) 

64 self.comp1 = self.typesize + self.offsetsize + self.lengthsize 

65 self.comp2 = self.lengthsize + self.commandsize + self.typesize + 1 

66 if mode[0] != "r": 

67 # Write initial header and ST offset command. 

68 s = bytes("", "UTF-8") 

69 for h in header: 

70 s = s + struct.pack("B", h) 

71 self.fp.write(s) 

72 self.writecommand(17, Lsda.SYMBOLTABLEOFFSET) 

73 self.writeoffset(17, 0) 

74 self.lastoffset = 17 

75 

76 def readcommand(self): 

77 """Read a LENGTH,COMMAND pair from the file at the current location""" 

78 s = self.fp.read(self.lengthsize + self.commandsize) 

79 return struct.unpack(self.lcunpack, s) 

80 

81 def writecommand(self, length, cmd): 

82 """Write a LENGTH,COMMAND pair to the file at the current location""" 

83 s = struct.pack(self.lcunpack, length, cmd) 

84 self.fp.write(s) 

85 

86 def readoffset(self): 

87 """Read an OFFSET from the file at the current location""" 

88 s = self.fp.read(self.offsetsize) 

89 return struct.unpack(self.ounpack, s)[0] 

90 

91 def writeoffset(self, offset, value): 

92 """Write an OFFSET to the file at the given location""" 

93 self.fp.seek(offset, 0) 

94 s = struct.pack(self.ounpack, value) 

95 self.fp.write(s) 

96 self.ateof = 0 

97 

98 def writelength(self, length): 

99 """Write a LENGTH to the file at the current location""" 

100 s = struct.pack(self.lunpack, length) 

101 self.fp.write(s) 

102 

103 def writecd(self, dir): 

104 """Write a whole CD command to the file at the current location""" 

105 length = self.lengthsize + self.commandsize + len(dir) 

106 s = struct.pack(self.lcunpack, length, Lsda.CD) 

107 self.fp.write(s) 

108 if type(dir) is str: 

109 self.fp.write(bytes(dir, "utf-8")) 

110 else: 

111 self.fp.write(dir) 

112 

113 def writestentry(self, r): 

114 """Write a VARIABLE command (symbol table entry) to the file at 

115 the current location""" 

116 length = ( 

117 2 * self.lengthsize + self.commandsize + len(r.name) + self.typesize + self.offsetsize 

118 ) 

119 s = struct.pack(self.lcunpack, length, Lsda.VARIABLE) 

120 self.fp.write(s) 

121 if type(r.name) is str: 

122 self.fp.write(bytes(r.name, "utf-8")) 

123 else: 

124 self.fp.write(r.name) 

125 s = struct.pack(self.tolunpack, r.type, r.offset, r.length) 

126 self.fp.write(s) 

127 

128 def writedata(self, sym, data): 

129 """Write a DATA command to the file at the current location""" 

130 nlen = len(sym.name) 

131 length = ( 

132 self.lengthsize 

133 + self.commandsize 

134 + self.typesize 

135 + 1 

136 + nlen 

137 + self.sizeof[sym.type] * sym.length 

138 ) 

139 sym.offset = self.fp.tell() 

140 self.fp.write(struct.pack(self.lcunpack, length, Lsda.DATA)) 

141 self.fp.write(struct.pack("bb", sym.type, nlen) + bytes(sym.name, "utf-8")) 

142 # fmt=self.ordercode+self.packtype[sym.type]*sym.length 

143 fmt = "%c%d%c" % (self.ordercode, sym.length, self.packtype[sym.type]) 

144 self.fp.write(struct.pack(fmt, *data)) 

145 sym.file = self 

146 

147 

148class Symbol: 

149 """ 

150 A directory tree structure. A Symbol can be a directory (type==0) 

151 or data 

152 """ 

153 

154 def __init__(self, name="", parent=0): 

155 self.name = name # name of var or directory 

156 self.type = 0 # data type 

157 self.offset = 0 # offset of DATA record in file 

158 self.length = 0 # number of data entries, or # of children 

159 self.file = 0 # which file the data is in 

160 self.children = {} # directory contents 

161 self.parent = parent # directory that holds me 

162 if parent: 

163 parent.children[name] = self 

164 parent.length = len(parent.children) 

165 

166 def path(self): 

167 """Return absolute path for this Symbol""" 

168 if not self.parent: 

169 return "/" 

170 sym = self 

171 ret = "/" + sym.name 

172 while sym.parent and sym.parent.name != "/": 

173 sym = sym.parent 

174 ret = "/" + sym.name + ret 

175 return ret 

176 

177 def get(self, name): 

178 """Return the Symbol with the indicated name. The name can be 

179 prefixed with a relative or absolute path""" 

180 # If I am just a variable, let my parent handle this 

181 if self.type != 0: 

182 return self.parent.get(name) 

183 # If I have this variable, return it 

184 if name in self.children: 

185 return self.children[name] 

186 # If name has a path component, then look for it there 

187 if name[0] == "/": # absolute path 

188 parts = name.split("/")[1:] 

189 sym = self 

190 while sym.parent: 

191 sym = sym.parent 

192 for i in range(len(parts)): 

193 if parts[i] in sym.children: 

194 sym = sym.children[parts[i]] 

195 else: 

196 return None 

197 return sym 

198 if name[0] == ".": # relative path 

199 parts = name.split("/")[1:] 

200 # Throw out any "." in the path -- those are just useless.... 

201 parts = filter(lambda p: p != ".", parts) 

202 if len(parts) == 0: 

203 return self 

204 sym = self 

205 for i in range(parts): 

206 if parts[i] == "..": 

207 if sym.parent: 

208 sym = sym.parent 

209 elif parts[i] in sym: 

210 sym = sym.children[parts[i]] 

211 else: 

212 return None 

213 return sym 

214 # Not found 

215 return None 

216 

217 def lread(self, start=0, end=2000000000): 

218 """Read data from the file. 

219 If this symbol is a DIRECTORY, this returns a sorted list of the 

220 contents of the directory, and "start" and "end" are ignored. 

221 Otherwise, read and return data[start:end] (including start but 

222 not including end -- standard Python slice behavior). 

223 This routine does NOT follow links.""" 

224 if self.type == 0: # directory -- return listing 

225 return sorted(self.children.keys()) 

226 if end > self.length: 

227 end = self.length 

228 if end < 0: 

229 end = self.length + end 

230 if start > self.length: 

231 return () 

232 if start < 0: 

233 start = self.length + start 

234 if start >= end: 

235 return () 

236 size = _Diskfile.sizeof[self.type] 

237 pos = self.offset + self.file.comp2 + len(self.name) + start * size 

238 self.file.fp.seek(pos) 

239 self.file.ateof = 0 

240 # format = self.file.ordercode + _Diskfile.packtype[self.type]*(end-start) 

241 # return struct.unpack(format,self.file.fp.read(size*(end-start))) 

242 format = "%c%d%c" % (self.file.ordercode, (end - start), _Diskfile.packtype[self.type]) 

243 if self.type == Lsda.LINK: 

244 return struct.unpack(format, self.file.fp.read(size * (end - start)))[0] 

245 else: 

246 return struct.unpack(format, self.file.fp.read(size * (end - start))) 

247 

248 def read(self, start=0, end=2000000000): 

249 """Read data from the file. Same as lread, but follows links""" 

250 return _resolve_link(self).lread(start, end) 

251 

252 def read_raw(self, start=0, end=2000000000): 

253 """Read data from the file and return as bytestring""" 

254 if self.type == 0: # directory -- return listing 

255 return sorted(self.children.keys()) 

256 if end > self.length: 

257 end = self.length 

258 if end < 0: 

259 end = self.length + end 

260 if start > self.length: 

261 return () 

262 if start < 0: 

263 start = self.length + start 

264 if start >= end: 

265 return () 

266 size = _Diskfile.sizeof[self.type] 

267 pos = self.offset + self.file.comp2 + len(self.name) + start * size 

268 self.file.fp.seek(pos) 

269 self.file.ateof = 0 

270 size = size * (end - start) 

271 return self.file.fp.read(size) 

272 

273 

274def _resolve_link(var): 

275 """Follow a link to find what it finally resolves to""" 

276 ret = var 

277 while ret.type == Lsda.LINK: 

278 ret = ret.get(ret.lread()) 

279 return ret 

280 

281 

282def _readentry(f, reclen, parent): 

283 """ 

284 Read a VARIABLE record from the file, and construct the proper Symbol 

285 Users should never call this. 

286 """ 

287 s = f.fp.read(reclen) 

288 n = reclen - f.comp1 

289 name = s[:n] 

290 # If parent already has a symbol by this name, orphan it.... 

291 # if parent.children.has_key(name)): 

292 if name in parent.children: 

293 var = parent.children[name] 

294 else: 

295 var = Symbol(name, parent) 

296 (var.type, var.offset, var.length) = struct.unpack(f.tolunpack, s[n:]) 

297 var.file = f 

298 

299 

300def _readsymboltable(lsda, f): 

301 """ 

302 Read all the SYMBOLTABLEs in the current file 

303 Users should never call this. 

304 """ 

305 f.ateof = 0 

306 while 1: 

307 f.lastoffset = f.fp.tell() 

308 offset = f.readoffset() 

309 if offset == 0: 

310 return 

311 f.fp.seek(offset) 

312 (clen, cmd) = f.readcommand() 

313 if cmd != Lsda.BEGINSYMBOLTABLE: 

314 return 

315 while 1: 

316 (clen, cmd) = f.readcommand() 

317 clen = clen - f.commandsize - f.lengthsize 

318 if cmd == Lsda.CD: 

319 path = f.fp.read(clen) 

320 lsda.cd(path, 1) 

321 elif cmd == Lsda.VARIABLE: 

322 _readentry(f, clen, lsda.cwd) 

323 else: # is end of symbol table...get next part if there is one 

324 break 

325 

326 

327def _writesymboltable(lsda, f): 

328 """ 

329 Collect all the symbols we want to write out, and sort 

330 them by path. This is a bit strange: the symbols don't store 

331 the path, but build it when needed. So build it, and store 

332 (symbol,path) pairs, then sort by path. "path" returns the full 

333 path to the symbol, and we only want the directory it is in, so 

334 get the path of its parent instead. 

335 """ 

336 if len(lsda.dirty_symbols) == 0: 

337 return 

338 

339 slist = [] 

340 for s in lsda.dirty_symbols: 

341 p = s.parent.path() 

342 slist.append((s, p)) 

343 # slist.sort(key = lambda r1,r2: cmp(r1[1],r2[1])) 

344 slist.sort(key=lambda x: x[1]) 

345 lsda.dirty_symbols = set() 

346 

347 # Move to end of the file and write the symbol table 

348 if not f.ateof: 

349 f.fp.seek(0, 2) 

350 f.ateof = 1 

351 start_st_at = f.fp.tell() 

352 f.writecommand(0, Lsda.BEGINSYMBOLTABLE) 

353 cwd = None 

354 

355 # Write all records 

356 for (s, path) in slist: 

357 if path != cwd: 

358 cdcmd = _get_min_cd(cwd, path) 

359 f.writecd(cdcmd) 

360 cwd = path 

361 f.writestentry(s) 

362 

363 # Finish ST: write END record, and patch up ST length 

364 cmdlen = f.offsetsize + f.lengthsize + f.commandsize 

365 f.writecommand(cmdlen, Lsda.ENDSYMBOLTABLE) 

366 nextoffset = f.fp.tell() 

367 f.writeoffset(nextoffset, 0) 

368 cmdlen = nextoffset + f.offsetsize - start_st_at 

369 f.fp.seek(start_st_at) 

370 f.writelength(cmdlen) 

371 

372 # Purge symbol table, if we are only writing 

373 if f.mode == "w": 

374 cwd = lsda.cwd 

375 cwd.children = {} 

376 while cwd.parent: 

377 cwd.parent.children = {cwd.name: cwd} 

378 cwd = cwd.parent 

379 

380 # And add link from previous ST 

381 f.writeoffset(f.lastoffset, start_st_at) 

382 f.lastoffset = nextoffset 

383 f.ateof = 0 

384 

385 

386def _get_min_cd(cwd, cd): 

387 """ 

388 Given two absolute paths, return the shortest "cd" string that 

389 gets from the first (cwd) to the second (cd) 

390 """ 

391 if cwd is None: 

392 return cd 

393 

394 # Find common part of path 

395 have = cwd.split("/")[1:] 

396 want = cd.split("/")[1:] 

397 nhave = len(have) 

398 nwant = len(want) 

399 n = min(nhave, nwant) 

400 head = 0 

401 head_length = 0 

402 for i in range(n): 

403 if have[i] != want[i]: 

404 break 

405 head = i + 1 

406 head_length = head_length + len(have[i]) 

407 if head == 0: 

408 return cd 

409 

410 # head = # of common components. 

411 # head_length = string length of common part of path (sans "/" separators) 

412 # tail1 = Number of components we would need ".." leaders for 

413 tail1 = nhave - head 

414 

415 # Now see if "cd" is shorter than "../../tail_part" 

416 if 2 * tail1 >= head_length: 

417 return cd 

418 

419 # nope, the ".." version is shorter.... 

420 return tail1 * "../" + "/".join(want[head:]) 

421 

422 

423class Lsda: 

424 """ 

425 Main class: holds all the Symbols for an LSDA file, and has methods 

426 for reading data from and writing data to the file 

427 """ 

428 

429 CD = 2 

430 DATA = 3 

431 VARIABLE = 4 

432 BEGINSYMBOLTABLE = 5 

433 ENDSYMBOLTABLE = 6 

434 SYMBOLTABLEOFFSET = 7 

435 I1 = 1 

436 I2 = 2 

437 I4 = 3 

438 I8 = 4 

439 U1 = 5 

440 U2 = 6 

441 U4 = 7 

442 U8 = 8 

443 R4 = 9 

444 R8 = 10 

445 LINK = 11 

446 

447 def __init__(self, files, mode="r"): 

448 """Creates the LSDA structure, opens the file and reads the 

449 SYMBOLTABLE (if reading), or creates the initial file contents 

450 (if writing). "files" is a tuple of file names to be opened 

451 and treated as a single file. All the %XXX continuation files 

452 will be automatically included. "mode" is the file open mode: 

453 "r", "r+", "w", or "w+". If a "w" mode is selected, "files" 

454 must contain only a single file name""" 

455 

456 # If they only input a single name, put it in a tuple, so I can 

457 # accept input of either kind 

458 if not types_ok: 

459 raise LsdaError 

460 if not isinstance(files, (tuple, list)): 

461 files = (files,) 

462 self.files = [] 

463 

464 if mode[0] == "r": 

465 # Open all the files in the list that is input, and anything 

466 # that looks like a continuation of one of them. 

467 nameset = set() 

468 for name in files: 

469 nameset.add(name) 

470 nameset = nameset.union(set(glob.glob(name + "%[0-9][0-9]*"))) 

471 

472 # Convert to a list and sort, because if I'm going to be writing, 

473 # I want the last one in the list to be the last one of its family 

474 namelist = list(nameset) 

475 namelist.sort() 

476 for file in namelist: 

477 self.files.append(_Diskfile(file, mode)) 

478 self.root = Symbol("/") 

479 for f in self.files: 

480 # We are already positioned to read the SYMBOLTABLEOFFSET record 

481 _, cmd = f.readcommand() 

482 self.cwd = self.root 

483 if cmd == Lsda.SYMBOLTABLEOFFSET: 

484 _readsymboltable(self, f) 

485 else: 

486 if len(files) > 1: 

487 return None # can't open multiple files for WRITING 

488 self.files.append(_Diskfile(files[0], mode)) 

489 self.root = Symbol("/") 

490 self.cwd = self.root 

491 self.dirty_symbols = set() 

492 self.lastpath = None 

493 self.mode = mode 

494 

495 # writing will always be to the last one of the files 

496 if mode == "r": 

497 self.fw = None 

498 self.make_dirs = 0 

499 else: 

500 self.fw = self.files[-1] 

501 self.make_dirs = 1 

502 

503 def __del__(self): # close files 

504 self.flush() 

505 for f in self.files: 

506 if not f.fp.closed: 

507 f.fp.close() 

508 

509 def cd(self, path, create=2): # change CWD 

510 """Change the current working directory in the file. The optional 

511 argument "create" is for internal use only""" 

512 # DEBUG FIX qd-codie: 

513 # Someone forgot to decode bytes into str here. Due to this the 

514 # following comparisons always went wrong (of course they were) 

515 # not similar ... 

516 if isinstance(path, bytes): 

517 path = path.decode("utf-8") 

518 if path == "/": 

519 self.cwd = self.root 

520 return self.root 

521 if path[-1] == "/": # remove trailing / 

522 path = path[:-1] 

523 if path[0] == "/": # absolute path 

524 path = path[1:] 

525 self.cwd = self.root 

526 # path = string.split(path,"/") 

527 # print(type(path)) 

528 if type(path) is bytes: 

529 path = str(path, "utf-8").split("/") 

530 else: 

531 path = path.split("/") 

532 

533 for part in path: 

534 if part == "..": 

535 if self.cwd.parent: 

536 self.cwd = self.cwd.parent 

537 else: 

538 # if self.cwd.children.has_key(part)): 

539 if part in self.cwd.children: 

540 self.cwd = self.cwd.children[part] 

541 if self.cwd.type != 0: # component is a variable, not a directory! 

542 self.cwd = self.cwd.parent 

543 break 

544 elif create == 1 or (create == 2 and self.make_dirs == 1): 

545 self.cwd = Symbol(part, self.cwd) # Create directory on the fly 

546 else: # component in path is missing 

547 break 

548 return self.cwd 

549 

550 def write(self, name, type, data): 

551 """Write a new DATA record to the file. Creates and returns 

552 the Symbol for the data written""" 

553 if self.fw is None: 

554 return None 

555 

556 # want a tuple, but if they hand us a single value that should work too... 

557 try: 

558 _ = data[0] 

559 except TypeError: 

560 data = (data,) 

561 pwd = self.cwd.path() 

562 if not self.fw.ateof: 

563 self.fw.fp.seek(0, 2) 

564 self.fw.ateof = 1 

565 if pwd != self.lastpath: 

566 cdcmd = _get_min_cd(self.lastpath, pwd) 

567 self.fw.writecd(cdcmd) 

568 self.lastpath = pwd 

569 

570 # Overwrite existing symbol if there is one 

571 if name in self.cwd.children: 

572 sym = self.cwd.children[name] 

573 else: 

574 sym = Symbol(name, self.cwd) 

575 sym.type = type 

576 sym.length = len(data) 

577 self.fw.writedata(sym, data) 

578 self.dirty_symbols.add(sym) 

579 return sym 

580 

581 def close(self): 

582 """Close the file""" 

583 self.flush() 

584 for f in self.files: 

585 if not f.fp.closed: 

586 f.fp.close() 

587 self.files = [] 

588 

589 def get(self, path): 

590 """Return the Symbol with the indicated name. The name can be 

591 prefixed with a relative or absolute path""" 

592 return self.cwd.get(path) 

593 

594 def flush(self): # write ST and flush file 

595 """Write a SYMBOLTABLE as needed for any new DATA, and flush the file""" 

596 if self.fw is None or self.fw.fp.closed: 

597 return 

598 _writesymboltable(self, self.fw) 

599 self.fw.fp.flush() 

600 

601 def filesize(self): 

602 """Returns the current size, on disk, of the file we are currently 

603 writing to. Returns 0 for files that are opened readonly""" 

604 if self.fw is None: 

605 return 0 

606 if not self.fw.ateof: 

607 self.fw.fp.seek(0, 2) 

608 self.fw.ateof = 1 

609 return self.fw.fp.tell() 

610 

611 def nextfile(self): # Open next file in sequence 

612 """Flush the current output file and open the next file in the 

613 sequence""" 

614 if self.fw is None: 

615 return None 

616 if not self.fw.fp.closed: 

617 _writesymboltable(self, self.fw) 

618 self.fw.fp.flush() 

619 parts = self.fw.name.split("%") 

620 if len(parts) == 1: 

621 ret = 1 

622 newname = parts[0] + "%001" 

623 else: 

624 ret = int(parts[1]) + 1 

625 newname = "%s%%%3.3d" % (parts[0], ret) 

626 if self.mode == "w": 

627 self.fw = _Diskfile(newname, "w") 

628 else: 

629 self.fw = _Diskfile(newname, "w+") 

630 self.files.append(self.fw) 

631 self.lastpath = None 

632 return ret 

633 

634 

635types = [("b", 1), ("h", 2), ("i", 4), ("q", 8), ("f", 4), ("d", 8)] 

636x = 17 

637types_ok = 1 

638for (a, b) in types: 

639 s = struct.pack(a, x) 

640 if len(s) != b: 

641 print("LSDA: initialization error") 

642 print("Data type %s has length %d instead of %d" % (a, len(s), b)) 

643 types_ok = 0