"""Various tools used by MIME-reading or MIME-writing programs."""


import os
import rfc822
import tempfile

__all__ = ["Message","choose_boundary","encode","decode","copyliteral",
           "copybinary"]

class Message(rfc822.Message):
    """A derived class of rfc822.Message that knows about MIME headers and
    contains some hooks for decoding encoded and multipart messages."""

    def __init__(self, fp, seekable=1):
        """Read the headers from fp and pre-parse the MIME-specific ones.

        fp and seekable are handed unchanged to rfc822.Message.__init__.
        The raw Content-Transfer-Encoding and Content-Type header values
        are cached on the instance, then the content type and its
        parameter list are parsed.
        """
        rfc822.Message.__init__(self, fp, seekable)
        self.encodingheader = \
                self.getheader('content-transfer-encoding')
        self.typeheader = \
                self.getheader('content-type')
        self.parsetype()
        self.parseplist()

    def parsetype(self):
        """Split the Content-Type header into type, maintype and subtype.

        A missing header is treated as 'text/plain'.  Everything from the
        first ';' onwards (the ';' included) is stored verbatim in
        self.plisttext for parseplist(); the part before it is split on
        '/', and each piece is stripped and lower-cased.
        """
        ctype = self.typeheader
        if ctype is None:
            ctype = 'text/plain'
        if ';' in ctype:
            cut = ctype.index(';')
            self.plisttext = ctype[cut:]
            ctype = ctype[:cut]
        else:
            self.plisttext = ''
        fields = [field.strip().lower() for field in ctype.split('/')]
        self.type = '/'.join(fields)
        self.maintype = fields[0]
        self.subtype = '/'.join(fields[1:])

    def parseplist(self):
        """Parse self.plisttext into the list self.plist.

        Each ';'-introduced parameter becomes one list entry.  Entries of
        the form name=value are normalised to a lower-cased, stripped name
        and a stripped value; entries without '=' are only stripped.
        """
        rest = self.plisttext
        self.plist = []
        while rest[:1] == ';':
            rest = rest[1:]
            if ';' in rest:
                # XXX Should parse quotes!
                # (a ';' inside a quoted value currently ends the parameter)
                end = rest.index(';')
            else:
                end = len(rest)
            param = rest[:end]
            if '=' in param:
                i = param.index('=')
                param = param[:i].strip().lower() + \
                        '=' + param[i+1:].strip()
            self.plist.append(param.strip())
            rest = rest[end:]

    def getplist(self):
        """Return the list of parsed Content-Type parameters."""
        return self.plist

    def getparam(self, name):
        """Return the value of the Content-Type parameter called name.

        The lookup is case-insensitive on the name.  The value of the
        first matching entry is passed through rfc822.unquote before being
        returned; None is returned when no parameter matches.
        """
        name = name.lower() + '='
        n = len(name)
        for p in self.plist:
            if p[:n] == name:
                return rfc822.unquote(p[n:])
        return None

    def getparamnames(self):
        """Return the lower-cased names of all name=value parameters."""
        result = []
        for p in self.plist:
            i = p.find('=')
            if i >= 0:
                result.append(p[:i].lower())
        return result

    def getencoding(self):
        """Return the lower-cased transfer encoding, '7bit' if unspecified."""
        if self.encodingheader is None:
            return '7bit'
        return self.encodingheader.lower()

    def gettype(self):
        """Return the normalised content type computed by parsetype()."""
        return self.type

    def getmaintype(self):
        """Return the part of the content type before the first '/'."""
        return self.maintype

    def getsubtype(self):
        """Return the part of the content type after the first '/'."""
        return self.subtype




# Utility functions
# -----------------

try:
    import thread
except ImportError:
    import dummy_thread as thread
_counter_lock = thread.allocate_lock()
del thread

_counter = 0
def _get_next_counter():
    """Increment the module-wide counter under its lock; return the new value."""
    global _counter
    _counter_lock.acquire()
    try:
        _counter += 1
        result = _counter
    finally:
        # Release even if something unexpected is raised, so that later
        # callers cannot block forever on the lock.
        _counter_lock.release()
    return result

_prefix = None

def choose_boundary():
    """Return a string usable as a multipart boundary.

    The string chosen is unique within a single program run, and
    incorporates the user id (if available), process id (if available),
    and current time.  So it's very unlikely the returned string appears
    in message text, but there's no guarantee.

    The host-dependent prefix is computed on the first call and cached.
    If the local host name cannot be resolved, '127.0.0.1' is used in
    place of the host address.

    The boundary contains dots so you have to quote it in the header."""

    global _prefix
    import time
    if _prefix is None:
        import socket
        try:
            hostid = socket.gethostbyname(socket.gethostname())
        except socket.gaierror:
            # A host whose own name does not resolve must still be able
            # to generate boundaries.
            hostid = '127.0.0.1'
        try:
            uid = repr(os.getuid())
        except AttributeError:
            uid = '1'
        try:
            pid = repr(os.getpid())
        except AttributeError:
            pid = '1'
        _prefix = hostid + '.' + uid + '.' + pid
    return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter())


# Subroutines for decoding some common content-transfer-types

def decode(input, output, encoding):
    """Decode common content-transfer-encodings (base64, quopri, uuencode).

    input and output are file objects.  '7bit' and '8bit' data is copied
    through unchanged.  An encoding that is not handled directly is looked
    up in decodetab and piped through the external command found there.
    Raises ValueError for an unknown encoding.
    """
    if encoding == 'base64':
        import base64
        return base64.decode(input, output)
    if encoding == 'quoted-printable':
        import quopri
        return quopri.decode(input, output)
    if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
        import uu
        return uu.decode(input, output)
    if encoding in ('7bit', '8bit'):
        return output.write(input.read())
    if encoding in decodetab:
        pipethrough(input, decodetab[encoding], output)
    else:
        raise ValueError(
              'unknown Content-Transfer-Encoding: %s' % encoding)

def encode(input, output, encoding):
    """Encode common content-transfer-encodings (base64, quopri, uuencode).

    input and output are file objects.  '7bit' and '8bit' data is copied
    through unchanged.  An encoding that is not handled directly is looked
    up in encodetab and piped through the external command found there.
    Raises ValueError for an unknown encoding.
    """
    if encoding == 'base64':
        import base64
        return base64.encode(input, output)
    if encoding == 'quoted-printable':
        import quopri
        return quopri.encode(input, output, 0)
    if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
        import uu
        return uu.encode(input, output)
    if encoding in ('7bit', '8bit'):
        return output.write(input.read())
    if encoding in encodetab:
        pipethrough(input, encodetab[encoding], output)
    else:
        raise ValueError(
              'unknown Content-Transfer-Encoding: %s' % encoding)

# The following is no longer used for standard encodings

# XXX This requires that uudecode and mmencode are in $PATH

# NOTE(review): the shell snippet below writes to a predictable name under
# /tmp.  decode() handles every key of decodetab before consulting the
# table, so this command is not reached through decode().
uudecode_pipe = '''(
TEMP=/tmp/@uu.$$
sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode
cat $TEMP
rm $TEMP
)'''

decodetab = {
        'uuencode':             uudecode_pipe,
        'x-uuencode':           uudecode_pipe,
        'uue':                  uudecode_pipe,
        'x-uue':                uudecode_pipe,
        'quoted-printable':     'mmencode -u -q',
        'base64':               'mmencode -u -b',
}

encodetab = {
        'x-uuencode':           'uuencode tempfile',
        'uuencode':             'uuencode tempfile',
        'x-uue':                'uuencode tempfile',
        'uue':                  'uuencode tempfile',
        'quoted-printable':     'mmencode -q',
        'base64':               'mmencode -b',
}

def pipeto(input, command):
    """Feed the lines of input to the standard input of a shell command."""
    pipe = os.popen(command, 'w')
    try:
        copyliteral(input, pipe)
    finally:
        pipe.close()

def pipethrough(input, command, output):
    """Run input through a shell command and copy the result to output.

    input is first spooled to a temporary file, which is then redirected
    into the command's standard input; whatever the command prints is
    copied to output.  The temporary file is removed even when copying
    or running the command fails.
    """
    (fd, tempname) = tempfile.mkstemp()
    try:
        temp = os.fdopen(fd, 'w')
        try:
            copyliteral(input, temp)
        finally:
            temp.close()
        pipe = os.popen(command + ' <' + tempname, 'r')
        try:
            copybinary(pipe, output)
        finally:
            pipe.close()
    finally:
        os.unlink(tempname)

def copyliteral(input, output):
    """Copy input to output one line at a time until input is exhausted."""
    while 1:
        line = input.readline()
        if not line: break
        output.write(line)

def copybinary(input, output):
    """Copy input to output in chunks of up to 8192 until input is exhausted."""
    BUFSIZE = 8192
    while 1:
        line = input.read(BUFSIZE)
        if not line: break
        output.write(line)

# Generated by PyXR 0.9.4