PyXR

c:\python24\lib \ mimetools.py



0001 """Various tools used by MIME-reading or MIME-writing programs."""
0002 
0003 
0004 import os
0005 import rfc822
0006 import tempfile
0007 
0008 __all__ = ["Message","choose_boundary","encode","decode","copyliteral",
0009            "copybinary"]
0010 
0011 class Message(rfc822.Message):
0012     """A derived class of rfc822.Message that knows about MIME headers and
0013     contains some hooks for decoding encoded and multipart messages."""
0014 
0015     def __init__(self, fp, seekable = 1):
0016         rfc822.Message.__init__(self, fp, seekable)
0017         self.encodingheader = \
0018                 self.getheader('content-transfer-encoding')
0019         self.typeheader = \
0020                 self.getheader('content-type')
0021         self.parsetype()
0022         self.parseplist()
0023 
0024     def parsetype(self):
0025         str = self.typeheader
0026         if str is None:
0027             str = 'text/plain'
0028         if ';' in str:
0029             i = str.index(';')
0030             self.plisttext = str[i:]
0031             str = str[:i]
0032         else:
0033             self.plisttext = ''
0034         fields = str.split('/')
0035         for i in range(len(fields)):
0036             fields[i] = fields[i].strip().lower()
0037         self.type = '/'.join(fields)
0038         self.maintype = fields[0]
0039         self.subtype = '/'.join(fields[1:])
0040 
0041     def parseplist(self):
0042         str = self.plisttext
0043         self.plist = []
0044         while str[:1] == ';':
0045             str = str[1:]
0046             if ';' in str:
0047                 # XXX Should parse quotes!
0048                 end = str.index(';')
0049             else:
0050                 end = len(str)
0051             f = str[:end]
0052             if '=' in f:
0053                 i = f.index('=')
0054                 f = f[:i].strip().lower() + \
0055                         '=' + f[i+1:].strip()
0056             self.plist.append(f.strip())
0057             str = str[end:]
0058 
0059     def getplist(self):
0060         return self.plist
0061 
0062     def getparam(self, name):
0063         name = name.lower() + '='
0064         n = len(name)
0065         for p in self.plist:
0066             if p[:n] == name:
0067                 return rfc822.unquote(p[n:])
0068         return None
0069 
0070     def getparamnames(self):
0071         result = []
0072         for p in self.plist:
0073             i = p.find('=')
0074             if i >= 0:
0075                 result.append(p[:i].lower())
0076         return result
0077 
0078     def getencoding(self):
0079         if self.encodingheader is None:
0080             return '7bit'
0081         return self.encodingheader.lower()
0082 
0083     def gettype(self):
0084         return self.type
0085 
0086     def getmaintype(self):
0087         return self.maintype
0088 
0089     def getsubtype(self):
0090         return self.subtype
0091 
0092 
0093 
0094 
0095 # Utility functions
0096 # -----------------
0097 
0098 try:
0099     import thread
0100 except ImportError:
0101     import dummy_thread as thread
0102 _counter_lock = thread.allocate_lock()
0103 del thread
0104 
0105 _counter = 0
0106 def _get_next_counter():
0107     global _counter
0108     _counter_lock.acquire()
0109     _counter += 1
0110     result = _counter
0111     _counter_lock.release()
0112     return result
0113 
0114 _prefix = None
0115 
0116 def choose_boundary():
0117     """Return a string usable as a multipart boundary.
0118 
0119     The string chosen is unique within a single program run, and
0120     incorporates the user id (if available), process id (if available),
0121     and current time.  So it's very unlikely the returned string appears
0122     in message text, but there's no guarantee.
0123 
0124     The boundary contains dots so you have to quote it in the header."""
0125 
0126     global _prefix
0127     import time
0128     if _prefix is None:
0129         import socket
0130         hostid = socket.gethostbyname(socket.gethostname())
0131         try:
0132             uid = repr(os.getuid())
0133         except AttributeError:
0134             uid = '1'
0135         try:
0136             pid = repr(os.getpid())
0137         except AttributeError:
0138             pid = '1'
0139         _prefix = hostid + '.' + uid + '.' + pid
0140     return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter())
0141 
0142 
0143 # Subroutines for decoding some common content-transfer-types
0144 
0145 def decode(input, output, encoding):
0146     """Decode common content-transfer-encodings (base64, quopri, uuencode)."""
0147     if encoding == 'base64':
0148         import base64
0149         return base64.decode(input, output)
0150     if encoding == 'quoted-printable':
0151         import quopri
0152         return quopri.decode(input, output)
0153     if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
0154         import uu
0155         return uu.decode(input, output)
0156     if encoding in ('7bit', '8bit'):
0157         return output.write(input.read())
0158     if encoding in decodetab:
0159         pipethrough(input, decodetab[encoding], output)
0160     else:
0161         raise ValueError, \
0162               'unknown Content-Transfer-Encoding: %s' % encoding
0163 
0164 def encode(input, output, encoding):
0165     """Encode common content-transfer-encodings (base64, quopri, uuencode)."""
0166     if encoding == 'base64':
0167         import base64
0168         return base64.encode(input, output)
0169     if encoding == 'quoted-printable':
0170         import quopri
0171         return quopri.encode(input, output, 0)
0172     if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
0173         import uu
0174         return uu.encode(input, output)
0175     if encoding in ('7bit', '8bit'):
0176         return output.write(input.read())
0177     if encoding in encodetab:
0178         pipethrough(input, encodetab[encoding], output)
0179     else:
0180         raise ValueError, \
0181               'unknown Content-Transfer-Encoding: %s' % encoding
0182 
0183 # The following is no longer used for standard encodings
0184 
0185 # XXX This requires that uudecode and mmencode are in $PATH
0186 
0187 uudecode_pipe = '''(
0188 TEMP=/tmp/@uu.$$
0189 sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode
0190 cat $TEMP
0191 rm $TEMP
0192 )'''
0193 
0194 decodetab = {
0195         'uuencode':             uudecode_pipe,
0196         'x-uuencode':           uudecode_pipe,
0197         'uue':                  uudecode_pipe,
0198         'x-uue':                uudecode_pipe,
0199         'quoted-printable':     'mmencode -u -q',
0200         'base64':               'mmencode -u -b',
0201 }
0202 
0203 encodetab = {
0204         'x-uuencode':           'uuencode tempfile',
0205         'uuencode':             'uuencode tempfile',
0206         'x-uue':                'uuencode tempfile',
0207         'uue':                  'uuencode tempfile',
0208         'quoted-printable':     'mmencode -q',
0209         'base64':               'mmencode -b',
0210 }
0211 
0212 def pipeto(input, command):
0213     pipe = os.popen(command, 'w')
0214     copyliteral(input, pipe)
0215     pipe.close()
0216 
0217 def pipethrough(input, command, output):
0218     (fd, tempname) = tempfile.mkstemp()
0219     temp = os.fdopen(fd, 'w')
0220     copyliteral(input, temp)
0221     temp.close()
0222     pipe = os.popen(command + ' <' + tempname, 'r')
0223     copybinary(pipe, output)
0224     pipe.close()
0225     os.unlink(tempname)
0226 
0227 def copyliteral(input, output):
0228     while 1:
0229         line = input.readline()
0230         if not line: break
0231         output.write(line)
0232 
0233 def copybinary(input, output):
0234     BUFSIZE = 8192
0235     while 1:
0236         line = input.read(BUFSIZE)
0237         if not line: break
0238         output.write(line)
0239 

Generated by PyXR 0.9.4
SourceForge.net Logo