Coverage for lasso/dyna/lsda_py3.py: 14%
424 statements
« prev ^ index » next coverage.py v7.2.4, created at 2023-04-28 18:42 +0100
« prev ^ index » next coverage.py v7.2.4, created at 2023-04-28 18:42 +0100
1import glob
2import struct
4# We disable pylint here since this code is ancient code from LSTC and has the
5# respective quality. I tried rewriting it but could not understand it at all
6# in time.
7# pylint: skip-file
class LsdaError(Exception):
    """Error raised by the LSDA module.

    It exists so that a distinct exception can be raised when the sizes
    of the basic data types are not the expected ones.
    """
class _Diskfile:
    """
    Handles all the low level file I/O. Nothing here should be
    called directly by a user.

    The file starts with an 8 byte header. As used by this class the
    header bytes are: [0] header length, [1] size of a LENGTH field,
    [2] size of an OFFSET field, [3] size of a COMMAND field, [4] size
    of a TYPE field, [5] byte order flag (0 = big endian, otherwise
    little endian). Bytes [6] and [7] are not interpreted here.
    """

    # struct format character for an integer field, indexed by its byte size
    packsize = [0, "b", "h", 0, "i", 0, 0, 0, "q"]
    # struct format character and item size, indexed by the LSDA type code
    packtype = [0, "b", "h", "i", "q", "B", "H", "I", "Q", "f", "d", "s"]
    sizeof = [0, 1, 2, 4, 8, 1, 2, 4, 8, 4, 8, 1]

    def __init__(self, name, mode):
        """Open ``name`` with file mode ``mode`` (r, r+, w, w+).

        In a read mode the header is parsed from the file, in any other
        mode a fresh header and an initial SYMBOLTABLEOFFSET command are
        written. The file handle is closed again if this setup fails.
        """
        self.mode = mode  # file open mode (r,r+,w,w+)
        self.name = name  # file name
        self.ateof = 0  # 1 if the file pointer is at EOF
        self.fp = open(name, mode + "b")
        try:
            self._configure(mode)
        except BaseException:
            # Do not leak the handle when the header is truncated/invalid
            self.fp.close()
            raise

    def _configure(self, mode):
        """Read (or create and write) the header and derive the struct formats"""
        if mode[0] == "r":
            header = struct.unpack("BBBBBBBB", self.fp.read(8))
            if header[0] > 8:
                # Header is longer than the 8 bytes we understand: skip the rest
                self.fp.seek(header[0])
        else:
            header = [8, 8, 8, 1, 1, 0, 0, 0]
            # Determine if my native ordering is big or little endian....
            native = struct.unpack("bbbb", struct.pack("i", 1))
            header[5] = 1 if native[0] else 0

        self.lengthsize = header[1]
        self.offsetsize = header[2]
        self.commandsize = header[3]
        self.typesize = header[4]
        self.ordercode = ">" if header[5] == 0 else "<"

        length_code = _Diskfile.packsize[self.lengthsize]
        offset_code = _Diskfile.packsize[self.offsetsize]
        command_code = _Diskfile.packsize[self.commandsize]
        type_code = _Diskfile.packsize[self.typesize]
        self.ounpack = self.ordercode + offset_code
        self.lunpack = self.ordercode + length_code
        self.lcunpack = self.ordercode + length_code + command_code
        self.tolunpack = self.ordercode + type_code + offset_code + length_code

        # Size of the TYPE,OFFSET,LENGTH tail of a VARIABLE record
        self.comp1 = self.typesize + self.offsetsize + self.lengthsize
        # Size of a DATA record head (LENGTH, COMMAND, TYPE, 1 byte name length)
        self.comp2 = self.lengthsize + self.commandsize + self.typesize + 1

        if mode[0] != "r":
            # Write initial header and ST offset command.
            self.fp.write(struct.pack("8B", *header))
            self.writecommand(17, Lsda.SYMBOLTABLEOFFSET)
            self.writeoffset(17, 0)
            self.lastoffset = 17

    @staticmethod
    def _as_bytes(text):
        """Return ``text`` UTF-8 encoded if it is a str, otherwise unchanged"""
        if isinstance(text, str):
            return text.encode("utf-8")
        return text

    def readcommand(self):
        """Read a LENGTH,COMMAND pair from the file at the current location"""
        raw = self.fp.read(self.lengthsize + self.commandsize)
        return struct.unpack(self.lcunpack, raw)

    def writecommand(self, length, cmd):
        """Write a LENGTH,COMMAND pair to the file at the current location"""
        self.fp.write(struct.pack(self.lcunpack, length, cmd))

    def readoffset(self):
        """Read an OFFSET from the file at the current location"""
        raw = self.fp.read(self.offsetsize)
        return struct.unpack(self.ounpack, raw)[0]

    def writeoffset(self, offset, value):
        """Write an OFFSET to the file at the given location"""
        self.fp.seek(offset, 0)
        self.fp.write(struct.pack(self.ounpack, value))
        self.ateof = 0

    def writelength(self, length):
        """Write a LENGTH to the file at the current location"""
        self.fp.write(struct.pack(self.lunpack, length))

    def writecd(self, dir):
        """Write a whole CD command to the file at the current location"""
        # Encode first: the record length must count bytes, not characters
        payload = self._as_bytes(dir)
        length = self.lengthsize + self.commandsize + len(payload)
        self.fp.write(struct.pack(self.lcunpack, length, Lsda.CD))
        self.fp.write(payload)

    def writestentry(self, r):
        """Write a VARIABLE command (symbol table entry) to the file at
        the current location"""
        # Encode first: the record length must count bytes, not characters
        name = self._as_bytes(r.name)
        length = (
            2 * self.lengthsize + self.commandsize + len(name) + self.typesize + self.offsetsize
        )
        self.fp.write(struct.pack(self.lcunpack, length, Lsda.VARIABLE))
        self.fp.write(name)
        self.fp.write(struct.pack(self.tolunpack, r.type, r.offset, r.length))

    def writedata(self, sym, data):
        """Write a DATA command to the file at the current location

        ``sym.offset`` and ``sym.file`` are updated to point at the new
        record. ``sym.name`` may be str or bytes.
        """
        name = self._as_bytes(sym.name)
        nlen = len(name)
        length = (
            self.lengthsize
            + self.commandsize
            + self.typesize
            + 1
            + nlen
            + self.sizeof[sym.type] * sym.length
        )
        sym.offset = self.fp.tell()
        self.fp.write(struct.pack(self.lcunpack, length, Lsda.DATA))
        self.fp.write(struct.pack("bb", sym.type, nlen) + name)
        code = self.packtype[sym.type]
        fmt = "%c%d%c" % (self.ordercode, sym.length, code)
        if code == "s" and isinstance(data, (str, bytes, bytearray)):
            # A string is ONE struct item; unpacking it with "*" would pass
            # every character as a separate argument and fail.
            self.fp.write(struct.pack(fmt, bytes(self._as_bytes(data))))
        else:
            self.fp.write(struct.pack(fmt, *data))
        sym.file = self
class Symbol:
    """
    A directory tree structure. A Symbol can be a directory (type==0)
    or data
    """

    def __init__(self, name="", parent=0):
        """Create a Symbol called ``name`` and register it in ``parent``
        (if a parent is given)"""
        self.name = name  # name of var or directory
        self.type = 0  # data type
        self.offset = 0  # offset of DATA record in file
        self.length = 0  # number of data entries, or # of children
        self.file = 0  # which file the data is in
        self.children = {}  # directory contents
        self.parent = parent  # directory that holds me
        if parent:
            parent.children[name] = self
            parent.length = len(parent.children)

    @staticmethod
    def _text(name):
        """Return ``name`` as str (bytes names are decoded as UTF-8)"""
        if isinstance(name, str):
            return name
        return name.decode("utf-8", "replace")

    def path(self):
        """Return absolute path for this Symbol"""
        if not self.parent:
            return "/"
        names = [self._text(self.name)]
        node = self
        # Climb until the parent is the directory named "/" (or there is none)
        while node.parent and node.parent.name != "/":
            node = node.parent
            names.append(self._text(node.name))
        return "/" + "/".join(reversed(names))

    def _walk(self, parts):
        """Follow the path components ``parts`` starting at this Symbol.

        ".." moves to the parent (and stays put at the top of the tree).
        Returns None as soon as a component cannot be found.
        """
        node = self
        for part in parts:
            if part == "..":
                if node.parent:
                    node = node.parent
            elif part in node.children:
                node = node.children[part]
            else:
                return None
        return node

    def get(self, name):
        """Return the Symbol with the indicated name. The name can be
        prefixed with a relative or absolute path. Returns None if
        nothing is found. Empty and "." path components are ignored."""
        # If I am just a variable, let my parent handle this
        if self.type != 0:
            return self.parent.get(name)
        # If I have this variable, return it
        if name in self.children:
            return self.children[name]
        # Only non-empty str names can carry a path component
        if not isinstance(name, str) or not name:
            return None
        if name[0] not in "/.":
            return None
        # Throw out any "." or empty parts -- those are just useless....
        parts = [p for p in name.split("/") if p and p != "."]
        if name[0] == "/":  # absolute path: start from the top of the tree
            start = self
            while start.parent:
                start = start.parent
            return start._walk(parts)
        # relative path ("." or ".." prefix): start from here
        return self._walk(parts)

    def _clamp_range(self, start, end):
        """Normalise a [start, end) slice against ``self.length``.

        Negative values count from the end and everything is clamped to
        the valid range, like a Python slice. Returns (start, end), or
        None when the resulting range is empty.
        """
        length = self.length
        if end > length:
            end = length
        elif end < 0:
            end = max(end + length, 0)
        if start < 0:
            start = max(start + length, 0)
        if start >= end:
            return None
        return start, end

    def _seek_to_item(self, start, size):
        """Position the file at data item number ``start`` of this symbol"""
        name = self.name
        # The record stores the name as bytes, so count bytes here as well
        name_len = len(name.encode("utf-8")) if isinstance(name, str) else len(name)
        pos = self.offset + self.file.comp2 + name_len + start * size
        self.file.fp.seek(pos)
        self.file.ateof = 0

    def _listing(self):
        """Return the sorted names of the children of this directory"""
        # Keys may be a mix of str and bytes, which cannot be compared
        # directly, so sort by their text form.
        return sorted(self.children.keys(), key=self._text)

    def lread(self, start=0, end=2000000000):
        """Read data from the file.
        If this symbol is a DIRECTORY, this returns a sorted list of the
        contents of the directory, and "start" and "end" are ignored.
        Otherwise, read and return data[start:end] (including start but
        not including end -- standard Python slice behavior). An empty
        range gives an empty tuple.
        This routine does NOT follow links."""
        if self.type == 0:  # directory -- return listing
            return self._listing()
        bounds = self._clamp_range(start, end)
        if bounds is None:
            return ()
        start, end = bounds
        count = end - start
        size = _Diskfile.sizeof[self.type]
        self._seek_to_item(start, size)
        fmt = "%c%d%c" % (self.file.ordercode, count, _Diskfile.packtype[self.type])
        values = struct.unpack(fmt, self.file.fp.read(size * count))
        if self.type == Lsda.LINK:
            # A link is one string item: return it directly, not as a tuple
            return values[0]
        return values

    def read(self, start=0, end=2000000000):
        """Read data from the file. Same as lread, but follows links"""
        return _resolve_link(self).lread(start, end)

    def read_raw(self, start=0, end=2000000000):
        """Read data from the file and return as bytestring.
        A directory returns its sorted listing instead, and an empty
        range returns an empty tuple (same as lread)."""
        if self.type == 0:  # directory -- return listing
            return self._listing()
        bounds = self._clamp_range(start, end)
        if bounds is None:
            return ()
        start, end = bounds
        size = _Diskfile.sizeof[self.type]
        self._seek_to_item(start, size)
        return self.file.fp.read(size * (end - start))
def _resolve_link(var):
    """Chase links starting at ``var`` and return the first Symbol that
    is not itself a link"""
    target = var
    while True:
        if target.type != Lsda.LINK:
            return target
        # The data of a link is the name of the Symbol it points to
        target = target.get(target.lread())
def _readentry(f, reclen, parent):
    """
    Parse one VARIABLE record of ``reclen`` bytes from ``f`` and attach the
    resulting Symbol to ``parent``.
    Internal helper -- not meant to be called by users.
    """
    record = f.fp.read(reclen)
    # The record is the name followed by a fixed size TYPE,OFFSET,LENGTH tail
    split_at = reclen - f.comp1
    name, tail = record[:split_at], record[split_at:]
    # A symbol of that name which is already in the parent gets reused
    # (and overwritten), otherwise a new one is created.
    var = parent.children[name] if name in parent.children else Symbol(name, parent)
    var.type, var.offset, var.length = struct.unpack(f.tolunpack, tail)
    var.file = f
def _readsymboltable(lsda, f):
    """
    Walk the chain of SYMBOLTABLEs of file ``f`` and load their entries
    into ``lsda``.
    Internal helper -- not meant to be called by users.
    """
    f.ateof = 0
    while True:
        # Remember where the link to the next table is stored
        f.lastoffset = f.fp.tell()
        table_at = f.readoffset()
        if table_at == 0:  # no further symbol table
            return
        f.fp.seek(table_at)
        _, command = f.readcommand()
        if command != Lsda.BEGINSYMBOLTABLE:
            return
        while True:
            record_len, command = f.readcommand()
            # What is left of the record after its LENGTH,COMMAND head
            payload_len = record_len - f.commandsize - f.lengthsize
            if command == Lsda.CD:
                lsda.cd(f.fp.read(payload_len), 1)
            elif command == Lsda.VARIABLE:
                _readentry(f, payload_len, lsda.cwd)
            else:
                # End of this symbol table: go on with the next one, if any
                break
def _writesymboltable(lsda, f):
    """
    Append a symbol table holding all dirty symbols of ``lsda`` to file ``f``.

    Symbols do not store their path but build it on demand, so each
    symbol is paired with the path of the directory it lives in (the
    path of its parent) and the pairs are sorted by that directory.
    Nothing is written if there are no dirty symbols.
    """
    if not lsda.dirty_symbols:
        return

    entries = sorted(
        [(sym, sym.parent.path()) for sym in lsda.dirty_symbols],
        key=lambda pair: pair[1],
    )
    lsda.dirty_symbols = set()

    # The table goes to the end of the file
    if not f.ateof:
        f.fp.seek(0, 2)
        f.ateof = 1
    table_start = f.fp.tell()
    # The length is not known yet: write 0 now and patch it further down
    f.writecommand(0, Lsda.BEGINSYMBOLTABLE)

    # One entry per symbol, with a CD whenever the directory changes
    current_dir = None
    for sym, directory in entries:
        if directory != current_dir:
            f.writecd(_get_min_cd(current_dir, directory))
            current_dir = directory
        f.writestentry(sym)

    # Close the table with an END record, then patch the table length
    f.writecommand(f.offsetsize + f.lengthsize + f.commandsize, Lsda.ENDSYMBOLTABLE)
    next_link = f.fp.tell()
    f.writeoffset(next_link, 0)
    f.fp.seek(table_start)
    f.writelength(next_link + f.offsetsize - table_start)

    # When only writing, drop everything except the chain of directories
    # that leads from the root to the current working directory
    if f.mode == "w":
        node = lsda.cwd
        node.children = {}
        while node.parent:
            node.parent.children = {node.name: node}
            node = node.parent

    # Finally link the previous symbol table to this one
    f.writeoffset(f.lastoffset, table_start)
    f.lastoffset = next_link
    f.ateof = 0
def _get_min_cd(cwd, cd):
    """
    Return the "cd" argument used to go from the absolute path ``cwd`` to
    the absolute path ``cd``: either ``cd`` itself or a relative path,
    whichever is estimated to be shorter. ``cwd`` may be None, in which
    case ``cd`` is returned.
    """
    if cwd is None:
        return cd

    current = cwd.split("/")[1:]
    target = cd.split("/")[1:]

    # Count the leading components both paths share, and the number of
    # characters in them (not counting the "/" separators)
    shared = 0
    shared_chars = 0
    for mine, theirs in zip(current, target):
        if mine != theirs:
            break
        shared += 1
        shared_chars += len(mine)
    if not shared:
        return cd

    # Number of ".." steps needed to get back to the shared part
    ups = len(current) - shared

    # Keep the absolute path unless the ".." version is shorter
    if 2 * ups >= shared_chars:
        return cd
    return "../" * ups + "/".join(target[shared:])
class Lsda:
    """
    Main class: holds all the Symbols for an LSDA file, and has methods
    for reading data from and writing data to the file.

    An instance can be used as a context manager, which closes the
    file(s) on exit.
    """

    # Command codes of the records in the file
    CD = 2
    DATA = 3
    VARIABLE = 4
    BEGINSYMBOLTABLE = 5
    ENDSYMBOLTABLE = 6
    SYMBOLTABLEOFFSET = 7
    # Data type codes (index into _Diskfile.packtype / _Diskfile.sizeof)
    I1 = 1
    I2 = 2
    I4 = 3
    I8 = 4
    U1 = 5
    U2 = 6
    U4 = 7
    U8 = 8
    R4 = 9
    R8 = 10
    LINK = 11

    def __init__(self, files, mode="r"):
        """Creates the LSDA structure, opens the file and reads the
        SYMBOLTABLE (if reading), or creates the initial file contents
        (if writing). "files" is a file name or a tuple/list of file
        names to be opened and treated as a single file. All the %XXX
        continuation files will be automatically included. "mode" is the
        file open mode: "r", "r+", "w", or "w+". If a "w" mode is
        selected, "files" must contain only a single file name.

        Raises LsdaError if the data type sizes are not as expected, or
        if several files are given together with a "w" mode."""
        # Give every attribute used by close()/flush() a value first, so
        # that cleanup (also via __del__) works if anything below raises.
        self.files = []
        self.fw = None
        self.make_dirs = 0
        self.dirty_symbols = set()
        self.lastpath = None
        self.mode = mode

        if not types_ok:
            raise LsdaError("unexpected size of a basic data type")
        # If they only input a single name, put it in a tuple, so I can
        # accept input of either kind
        if not isinstance(files, (tuple, list)):
            files = (files,)

        if mode[0] == "r":
            # Open all the files in the list that is input, and anything
            # that looks like a continuation of one of them.
            nameset = set()
            for name in files:
                nameset.add(name)
                # Escape the name: only the %NN suffix is meant as a pattern
                nameset.update(glob.glob(glob.escape(name) + "%[0-9][0-9]*"))

            # Sort, because if I'm going to be writing, I want the last
            # one in the list to be the last one of its family
            for file in sorted(nameset):
                self.files.append(_Diskfile(file, mode))
            self.root = Symbol("/")
            self.cwd = self.root
            for f in self.files:
                # We are already positioned to read the SYMBOLTABLEOFFSET record
                _, cmd = f.readcommand()
                self.cwd = self.root
                if cmd == Lsda.SYMBOLTABLEOFFSET:
                    _readsymboltable(self, f)
        else:
            if len(files) > 1:
                # Fail loudly instead of returning a half built object
                raise LsdaError("can't open multiple files for WRITING")
            self.files.append(_Diskfile(files[0], mode))
            self.root = Symbol("/")
            self.cwd = self.root

        # writing will always be to the last one of the files
        if mode != "r":
            self.fw = self.files[-1]
            self.make_dirs = 1

    def __enter__(self):
        """Return self, so the object can be used in a "with" statement"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the file(s) when the "with" block is left"""
        self.close()
        return False

    def __del__(self):  # close files
        self.close()

    def cd(self, path, create=2):  # change CWD
        """Change the current working directory in the file and return
        it. "path" may be str or bytes; an empty path changes nothing,
        and empty or "." components are skipped. The optional argument
        "create" is for internal use only"""
        # Paths read from a file arrive as bytes, but the directory names
        # are kept as str, so decode before comparing anything.
        if isinstance(path, bytes):
            path = path.decode("utf-8")
        if not path:
            return self.cwd
        if path == "/":
            self.cwd = self.root
            return self.root
        if path[-1] == "/":  # remove trailing /
            path = path[:-1]
        if path[0] == "/":  # absolute path
            path = path[1:]
            self.cwd = self.root

        for part in path.split("/"):
            if not part or part == ".":
                continue
            if part == "..":
                if self.cwd.parent:
                    self.cwd = self.cwd.parent
            elif part in self.cwd.children:
                self.cwd = self.cwd.children[part]
                if self.cwd.type != 0:  # component is a variable, not a directory!
                    self.cwd = self.cwd.parent
                    break
            elif create == 1 or (create == 2 and self.make_dirs == 1):
                self.cwd = Symbol(part, self.cwd)  # Create directory on the fly
            else:  # component in path is missing
                break
        return self.cwd

    def write(self, name, type, data):
        """Write a new DATA record to the file. Creates and returns
        the Symbol for the data written. Returns None if the file is
        not open for writing. "data" may be a sequence or a single
        value."""
        if self.fw is None:
            return None

        # want a sequence, but if they hand us a single value that should work too...
        try:
            len(data)
        except TypeError:
            data = (data,)
        if type == Lsda.LINK and isinstance(data, str):
            # Links are stored as bytes; encode now so the length is in bytes
            data = data.encode("utf-8")

        pwd = self.cwd.path()
        if not self.fw.ateof:
            self.fw.fp.seek(0, 2)
            self.fw.ateof = 1
        if pwd != self.lastpath:
            self.fw.writecd(_get_min_cd(self.lastpath, pwd))
            self.lastpath = pwd

        # Overwrite existing symbol if there is one
        if name in self.cwd.children:
            sym = self.cwd.children[name]
        else:
            sym = Symbol(name, self.cwd)
        sym.type = type
        sym.length = len(data)
        self.fw.writedata(sym, data)
        self.dirty_symbols.add(sym)
        return sym

    def close(self):
        """Flush pending data and close all the files. Can be called
        more than once."""
        self.flush()
        for f in self.files:
            if not f.fp.closed:
                f.fp.close()
        self.files = []

    def get(self, path):
        """Return the Symbol with the indicated name. The name can be
        prefixed with a relative or absolute path"""
        return self.cwd.get(path)

    def flush(self):  # write ST and flush file
        """Write a SYMBOLTABLE as needed for any new DATA, and flush the file"""
        if self.fw is None or self.fw.fp.closed:
            return
        _writesymboltable(self, self.fw)
        self.fw.fp.flush()

    def filesize(self):
        """Returns the current size, on disk, of the file we are currently
        writing to. Returns 0 for files that are opened readonly"""
        if self.fw is None:
            return 0
        if not self.fw.ateof:
            self.fw.fp.seek(0, 2)
            self.fw.ateof = 1
        return self.fw.fp.tell()

    def nextfile(self):  # Open next file in sequence
        """Flush the current output file and open the next file in the
        sequence. Returns the number of the new file, or None if the
        file is not open for writing"""
        if self.fw is None:
            return None
        self.flush()

        # Only a trailing "%<digits>" is a sequence number; a "%" anywhere
        # else is simply part of the file name.
        base, sep, suffix = self.fw.name.rpartition("%")
        if sep and suffix.isdecimal():
            ret = int(suffix) + 1
        else:
            base = self.fw.name
            ret = 1
        newname = "%s%%%3.3d" % (base, ret)

        self.fw = _Diskfile(newname, "w" if self.mode == "w" else "w+")
        self.files.append(self.fw)
        self.lastpath = None
        return ret
# Check at import time that every struct format code used has the size
# the rest of the module expects. The result is left in "types_ok"
# (1 = all fine, 0 = at least one mismatch).
types = [("b", 1), ("h", 2), ("i", 4), ("q", 8), ("f", 4), ("d", 8)]
x = 17
types_ok = 1
for a, b in types:
    s = struct.pack(a, x)
    if len(s) == b:
        continue
    print("LSDA: initialization error")
    print("Data type %s has length %d instead of %d" % (a, len(s), b))
    types_ok = 0