PyXR

c:\python24\lib \ xdrlib.py



0001 """Implements (a subset of) Sun XDR -- eXternal Data Representation.
0002 
0003 See: RFC 1014
0004 
0005 """
0006 
0007 import struct
0008 try:
0009     from cStringIO import StringIO as _StringIO
0010 except ImportError:
0011     from StringIO import StringIO as _StringIO
0012 
0013 __all__ = ["Error", "Packer", "Unpacker", "ConversionError"]
0014 
0015 # exceptions
0016 class Error(Exception):
0017     """Exception class for this module. Use:
0018 
0019     except xdrlib.Error, var:
0020         # var has the Error instance for the exception
0021 
0022     Public ivars:
0023         msg -- contains the message
0024 
0025     """
0026     def __init__(self, msg):
0027         self.msg = msg
0028     def __repr__(self):
0029         return repr(self.msg)
0030     def __str__(self):
0031         return str(self.msg)
0032 
0033 
0034 class ConversionError(Error):
0035     pass
0036 
0037 
0038 
0039 class Packer:
0040     """Pack various data representations into a buffer."""
0041 
0042     def __init__(self):
0043         self.reset()
0044 
0045     def reset(self):
0046         self.__buf = _StringIO()
0047 
0048     def get_buffer(self):
0049         return self.__buf.getvalue()
0050     # backwards compatibility
0051     get_buf = get_buffer
0052 
0053     def pack_uint(self, x):
0054         self.__buf.write(struct.pack('>L', x))
0055 
0056     pack_int = pack_uint
0057     pack_enum = pack_int
0058 
0059     def pack_bool(self, x):
0060         if x: self.__buf.write('\0\0\0\1')
0061         else: self.__buf.write('\0\0\0\0')
0062 
0063     def pack_uhyper(self, x):
0064         self.pack_uint(x>>32 & 0xffffffffL)
0065         self.pack_uint(x & 0xffffffffL)
0066 
0067     pack_hyper = pack_uhyper
0068 
0069     def pack_float(self, x):
0070         try: self.__buf.write(struct.pack('>f', x))
0071         except struct.error, msg:
0072             raise ConversionError, msg
0073 
0074     def pack_double(self, x):
0075         try: self.__buf.write(struct.pack('>d', x))
0076         except struct.error, msg:
0077             raise ConversionError, msg
0078 
0079     def pack_fstring(self, n, s):
0080         if n < 0:
0081             raise ValueError, 'fstring size must be nonnegative'
0082         n = ((n+3)/4)*4
0083         data = s[:n]
0084         data = data + (n - len(data)) * '\0'
0085         self.__buf.write(data)
0086 
0087     pack_fopaque = pack_fstring
0088 
0089     def pack_string(self, s):
0090         n = len(s)
0091         self.pack_uint(n)
0092         self.pack_fstring(n, s)
0093 
0094     pack_opaque = pack_string
0095     pack_bytes = pack_string
0096 
0097     def pack_list(self, list, pack_item):
0098         for item in list:
0099             self.pack_uint(1)
0100             pack_item(item)
0101         self.pack_uint(0)
0102 
0103     def pack_farray(self, n, list, pack_item):
0104         if len(list) != n:
0105             raise ValueError, 'wrong array size'
0106         for item in list:
0107             pack_item(item)
0108 
0109     def pack_array(self, list, pack_item):
0110         n = len(list)
0111         self.pack_uint(n)
0112         self.pack_farray(n, list, pack_item)
0113 
0114 
0115 
0116 class Unpacker:
0117     """Unpacks various data representations from the given buffer."""
0118 
0119     def __init__(self, data):
0120         self.reset(data)
0121 
0122     def reset(self, data):
0123         self.__buf = data
0124         self.__pos = 0
0125 
0126     def get_position(self):
0127         return self.__pos
0128 
0129     def set_position(self, position):
0130         self.__pos = position
0131 
0132     def get_buffer(self):
0133         return self.__buf
0134 
0135     def done(self):
0136         if self.__pos < len(self.__buf):
0137             raise Error('unextracted data remains')
0138 
0139     def unpack_uint(self):
0140         i = self.__pos
0141         self.__pos = j = i+4
0142         data = self.__buf[i:j]
0143         if len(data) < 4:
0144             raise EOFError
0145         x = struct.unpack('>L', data)[0]
0146         try:
0147             return int(x)
0148         except OverflowError:
0149             return x
0150 
0151     def unpack_int(self):
0152         i = self.__pos
0153         self.__pos = j = i+4
0154         data = self.__buf[i:j]
0155         if len(data) < 4:
0156             raise EOFError
0157         return struct.unpack('>l', data)[0]
0158 
0159     unpack_enum = unpack_int
0160     unpack_bool = unpack_int
0161 
0162     def unpack_uhyper(self):
0163         hi = self.unpack_uint()
0164         lo = self.unpack_uint()
0165         return long(hi)<<32 | lo
0166 
0167     def unpack_hyper(self):
0168         x = self.unpack_uhyper()
0169         if x >= 0x8000000000000000L:
0170             x = x - 0x10000000000000000L
0171         return x
0172 
0173     def unpack_float(self):
0174         i = self.__pos
0175         self.__pos = j = i+4
0176         data = self.__buf[i:j]
0177         if len(data) < 4:
0178             raise EOFError
0179         return struct.unpack('>f', data)[0]
0180 
0181     def unpack_double(self):
0182         i = self.__pos
0183         self.__pos = j = i+8
0184         data = self.__buf[i:j]
0185         if len(data) < 8:
0186             raise EOFError
0187         return struct.unpack('>d', data)[0]
0188 
0189     def unpack_fstring(self, n):
0190         if n < 0:
0191             raise ValueError, 'fstring size must be nonnegative'
0192         i = self.__pos
0193         j = i + (n+3)/4*4
0194         if j > len(self.__buf):
0195             raise EOFError
0196         self.__pos = j
0197         return self.__buf[i:i+n]
0198 
0199     unpack_fopaque = unpack_fstring
0200 
0201     def unpack_string(self):
0202         n = self.unpack_uint()
0203         return self.unpack_fstring(n)
0204 
0205     unpack_opaque = unpack_string
0206     unpack_bytes = unpack_string
0207 
0208     def unpack_list(self, unpack_item):
0209         list = []
0210         while 1:
0211             x = self.unpack_uint()
0212             if x == 0: break
0213             if x != 1:
0214                 raise ConversionError, '0 or 1 expected, got %r' % (x,)
0215             item = unpack_item()
0216             list.append(item)
0217         return list
0218 
0219     def unpack_farray(self, n, unpack_item):
0220         list = []
0221         for i in range(n):
0222             list.append(unpack_item())
0223         return list
0224 
0225     def unpack_array(self, unpack_item):
0226         n = self.unpack_uint()
0227         return self.unpack_farray(n, unpack_item)
0228 
0229 
0230 # test suite
0231 def _test():
0232     p = Packer()
0233     packtest = [
0234         (p.pack_uint,    (9,)),
0235         (p.pack_bool,    (None,)),
0236         (p.pack_bool,    ('hello',)),
0237         (p.pack_uhyper,  (45L,)),
0238         (p.pack_float,   (1.9,)),
0239         (p.pack_double,  (1.9,)),
0240         (p.pack_string,  ('hello world',)),
0241         (p.pack_list,    (range(5), p.pack_uint)),
0242         (p.pack_array,   (['what', 'is', 'hapnin', 'doctor'], p.pack_string)),
0243         ]
0244     succeedlist = [1] * len(packtest)
0245     count = 0
0246     for method, args in packtest:
0247         print 'pack test', count,
0248         try:
0249             method(*args)
0250             print 'succeeded'
0251         except ConversionError, var:
0252             print 'ConversionError:', var.msg
0253             succeedlist[count] = 0
0254         count = count + 1
0255     data = p.get_buffer()
0256     # now verify
0257     up = Unpacker(data)
0258     unpacktest = [
0259         (up.unpack_uint,   (), lambda x: x == 9),
0260         (up.unpack_bool,   (), lambda x: not x),
0261         (up.unpack_bool,   (), lambda x: x),
0262         (up.unpack_uhyper, (), lambda x: x == 45L),
0263         (up.unpack_float,  (), lambda x: 1.89 < x < 1.91),
0264         (up.unpack_double, (), lambda x: 1.89 < x < 1.91),
0265         (up.unpack_string, (), lambda x: x == 'hello world'),
0266         (up.unpack_list,   (up.unpack_uint,), lambda x: x == range(5)),
0267         (up.unpack_array,  (up.unpack_string,),
0268          lambda x: x == ['what', 'is', 'hapnin', 'doctor']),
0269         ]
0270     count = 0
0271     for method, args, pred in unpacktest:
0272         print 'unpack test', count,
0273         try:
0274             if succeedlist[count]:
0275                 x = method(*args)
0276                 print pred(x) and 'succeeded' or 'failed', ':', x
0277             else:
0278                 print 'skipping'
0279         except ConversionError, var:
0280             print 'ConversionError:', var.msg
0281         count = count + 1
0282 
0283 
0284 if __name__ == '__main__':
0285     _test()
0286 

Generated by PyXR 0.9.4
SourceForge.net Logo