"""Implements (a subset of) Sun XDR -- eXternal Data Representation.

See: RFC 1014

All packed data is handled as byte strings: callers pass byte strings to the
string/opaque packers and give a byte string to Unpacker.

"""

import struct
try:
    from cStringIO import StringIO as _StringIO
except ImportError:
    try:
        from StringIO import StringIO as _StringIO
    except ImportError:
        # Neither legacy module is available; fall back to the bytes buffer
        # from io, which offers the same write()/getvalue() calls used here.
        from io import BytesIO as _StringIO

__all__ = ["Error", "Packer", "Unpacker", "ConversionError"]


# exceptions
class Error(Exception):
    """Exception class for this module. Use:

    except xdrlib.Error as var:
        # var has the Error instance for the exception

    Public ivars:
        msg -- contains the message

    """
    def __init__(self, msg):
        self.msg = msg

    def __repr__(self):
        return repr(self.msg)

    def __str__(self):
        return str(self.msg)


class ConversionError(Error):
    """Raised when a value cannot be converted to or from its XDR form."""
    pass


class Packer:
    """Pack various data representations into a buffer."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard everything packed so far and start with an empty buffer."""
        self.__buf = _StringIO()

    def get_buffer(self):
        """Return the bytes packed so far."""
        return self.__buf.getvalue()
    # backwards compatibility
    get_buf = get_buffer

    def pack_uint(self, x):
        """Pack x as a 4-byte big-endian unsigned integer."""
        self.__buf.write(struct.pack('>L', x))

    def pack_int(self, x):
        """Pack x as a 4-byte big-endian integer.

        Negative values are packed with the signed format.  Non-negative
        values keep the unsigned format so that every value accepted by
        pack_uint still produces the same bytes here.
        """
        if x < 0:
            fmt = '>l'
        else:
            fmt = '>L'
        self.__buf.write(struct.pack(fmt, x))

    pack_enum = pack_int

    def pack_bool(self, x):
        """Pack the truth value of x as a 4-byte 1 or 0."""
        if x:
            self.__buf.write(b'\0\0\0\1')
        else:
            self.__buf.write(b'\0\0\0\0')

    def pack_uhyper(self, x):
        """Pack x as two 4-byte words, high word first."""
        # Masking each half lets negative values be packed as well, which
        # is why pack_hyper can share this implementation.
        self.pack_uint((x >> 32) & 0xffffffff)
        self.pack_uint(x & 0xffffffff)

    pack_hyper = pack_uhyper

    def pack_float(self, x):
        """Pack x as a 4-byte big-endian float.

        Raises ConversionError if struct cannot convert x.
        """
        try:
            self.__buf.write(struct.pack('>f', x))
        except struct.error as msg:
            raise ConversionError(msg)

    def pack_double(self, x):
        """Pack x as an 8-byte big-endian double.

        Raises ConversionError if struct cannot convert x.
        """
        try:
            self.__buf.write(struct.pack('>d', x))
        except struct.error as msg:
            raise ConversionError(msg)

    def pack_fstring(self, n, s):
        """Pack the first n bytes of s, zero-padded to a multiple of 4 bytes.

        Raises ValueError if n is negative.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        # Slice with the requested size *before* rounding up, so a string
        # longer than n contributes exactly n bytes and the rest is padding.
        data = s[:n]
        padded = ((n + 3) // 4) * 4
        data = data + (padded - len(data)) * b'\0'
        self.__buf.write(data)

    pack_fopaque = pack_fstring

    def pack_string(self, s):
        """Pack the length of s followed by its zero-padded contents."""
        n = len(s)
        self.pack_uint(n)
        self.pack_fstring(n, s)

    pack_opaque = pack_string
    pack_bytes = pack_string

    def pack_list(self, list, pack_item):
        """Pack each item preceded by a 1 marker, then a terminating 0."""
        for item in list:
            self.pack_uint(1)
            pack_item(item)
        self.pack_uint(0)

    def pack_farray(self, n, list, pack_item):
        """Pack exactly n items with pack_item, without a length prefix.

        Raises ValueError if len(list) differs from n.
        """
        if len(list) != n:
            raise ValueError('wrong array size')
        for item in list:
            pack_item(item)

    def pack_array(self, list, pack_item):
        """Pack the item count followed by every item."""
        n = len(list)
        self.pack_uint(n)
        self.pack_farray(n, list, pack_item)


class Unpacker:
    """Unpacks various data representations from the given buffer."""

    def __init__(self, data):
        self.reset(data)

    def reset(self, data):
        """Start unpacking data from its beginning."""
        self.__buf = data
        self.__pos = 0

    def get_position(self):
        """Return the current offset into the buffer."""
        return self.__pos

    def set_position(self, position):
        """Set the offset at which the next unpack call reads."""
        self.__pos = position

    def get_buffer(self):
        """Return the buffer being unpacked."""
        return self.__buf

    def done(self):
        """Raise Error if any data has not been unpacked yet."""
        if self.__pos < len(self.__buf):
            raise Error('unextracted data remains')

    def __take(self, size):
        """Return the next size bytes, raising EOFError if too few remain.

        The position is advanced before the length check, so it has
        already moved forward when EOFError is raised.
        """
        start = self.__pos
        self.__pos = end = start + size
        data = self.__buf[start:end]
        if len(data) < size:
            raise EOFError
        return data

    def unpack_uint(self):
        """Unpack a 4-byte big-endian unsigned integer."""
        x = struct.unpack('>L', self.__take(4))[0]
        try:
            return int(x)
        except OverflowError:
            # Hand back the unconverted value if int() cannot represent it.
            return x

    def unpack_int(self):
        """Unpack a 4-byte big-endian signed integer."""
        return struct.unpack('>l', self.__take(4))[0]

    unpack_enum = unpack_int
    unpack_bool = unpack_int

    def unpack_uhyper(self):
        """Unpack an unsigned 8-byte integer stored as two 4-byte words."""
        hi = self.unpack_uint()
        lo = self.unpack_uint()
        return (hi << 32) | lo

    def unpack_hyper(self):
        """Unpack a signed 8-byte integer stored as two 4-byte words."""
        x = self.unpack_uhyper()
        # Values with the top bit set represent negative numbers.
        if x >= 0x8000000000000000:
            x = x - 0x10000000000000000
        return x

    def unpack_float(self):
        """Unpack a 4-byte big-endian float."""
        return struct.unpack('>f', self.__take(4))[0]

    def unpack_double(self):
        """Unpack an 8-byte big-endian double."""
        return struct.unpack('>d', self.__take(8))[0]

    def unpack_fstring(self, n):
        """Return the next n bytes and skip the padding up to a multiple of 4.

        Raises ValueError if n is negative, and EOFError (leaving the
        position untouched) if the padded field runs past the buffer.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        i = self.__pos
        j = i + (n + 3) // 4 * 4
        if j > len(self.__buf):
            raise EOFError
        self.__pos = j
        return self.__buf[i:i + n]

    unpack_fopaque = unpack_fstring

    def unpack_string(self):
        """Unpack a length word followed by that many padded bytes."""
        n = self.unpack_uint()
        return self.unpack_fstring(n)

    unpack_opaque = unpack_string
    unpack_bytes = unpack_string

    def unpack_list(self, unpack_item):
        """Unpack items, each preceded by a 1 marker, until a 0 marker.

        Raises ConversionError if a marker is neither 0 nor 1.
        """
        items = []
        while 1:
            x = self.unpack_uint()
            if x == 0:
                break
            if x != 1:
                raise ConversionError('0 or 1 expected, got %r' % (x,))
            items.append(unpack_item())
        return items

    def unpack_farray(self, n, unpack_item):
        """Return a list of n items, each produced by calling unpack_item()."""
        return [unpack_item() for _ in range(n)]

    def unpack_array(self, unpack_item):
        """Unpack an item count followed by that many items."""
        n = self.unpack_uint()
        return self.unpack_farray(n, unpack_item)


# test suite
def _test():
    """Pack a series of sample values, unpack them again and print results."""
    p = Packer()
    packtest = [
        (p.pack_uint, (9,)),
        (p.pack_bool, (None,)),
        (p.pack_bool, ('hello',)),
        (p.pack_uhyper, (45,)),
        (p.pack_float, (1.9,)),
        (p.pack_double, (1.9,)),
        (p.pack_string, (b'hello world',)),
        (p.pack_list, (list(range(5)), p.pack_uint)),
        (p.pack_array, ([b'what', b'is', b'hapnin', b'doctor'],
                        p.pack_string)),
        ]
    succeedlist = [1] * len(packtest)
    count = 0
    for method, args in packtest:
        try:
            method(*args)
            status = 'succeeded'
        except ConversionError as var:
            status = 'ConversionError: %s' % (var.msg,)
            succeedlist[count] = 0
        print('pack test %d %s' % (count, status))
        count = count + 1
    data = p.get_buffer()
    # now verify
    up = Unpacker(data)
    unpacktest = [
        (up.unpack_uint, (), lambda x: x == 9),
        (up.unpack_bool, (), lambda x: not x),
        (up.unpack_bool, (), lambda x: x),
        (up.unpack_uhyper, (), lambda x: x == 45),
        (up.unpack_float, (), lambda x: 1.89 < x < 1.91),
        (up.unpack_double, (), lambda x: 1.89 < x < 1.91),
        (up.unpack_string, (), lambda x: x == b'hello world'),
        (up.unpack_list, (up.unpack_uint,), lambda x: x == list(range(5))),
        (up.unpack_array, (up.unpack_string,),
         lambda x: x == [b'what', b'is', b'hapnin', b'doctor']),
        ]
    count = 0
    for method, args, pred in unpacktest:
        try:
            if succeedlist[count]:
                x = method(*args)
                if pred(x):
                    outcome = 'succeeded'
                else:
                    outcome = 'failed'
                status = '%s : %s' % (outcome, x)
            else:
                # The matching pack step failed, so there is nothing to read.
                status = 'skipping'
        except ConversionError as var:
            status = 'ConversionError: %s' % (var.msg,)
        print('unpack test %d %s' % (count, status))
        count = count + 1


if __name__ == '__main__':
    _test()

# Generated by PyXR 0.9.4