PyXR

c:\python24\lib\mailbox.py



0001 #! /usr/bin/env python
0002 
0003 """Classes to handle Unix style, MMDF style, and MH style mailboxes."""
0004 
0005 
0006 import rfc822
0007 import os
0008 
# Names exported by "from mailbox import *".  The underscore-prefixed helper
# classes (_Mailbox, _Subfile) are not listed here.
__all__ = ["UnixMailbox","MmdfMailbox","MHMailbox","Maildir","BabylMailbox",
           "PortableUnixMailbox"]
0011 
class _Mailbox:
    """Common iteration logic for mailboxes kept in one seekable file.

    Subclasses provide two hooks that move self.fp:

    _search_start() -- leave fp at the first byte of the next message,
                       or raise EOFError when no further message exists.
    _search_end()   -- leave fp just past the last byte of that message.

    Each message found is wrapped in a _Subfile covering its byte range
    and handed to `factory`, whose result is what next() returns.
    """

    def __init__(self, fp, factory=rfc822.Message):
        self.factory = factory
        self.fp = fp
        # Offset in fp at which the next search for a message begins.
        self.seekp = 0

    def __iter__(self):
        # next() signals exhaustion by returning None, so use it as the
        # sentinel of the two-argument form of iter().
        return iter(self.next, None)

    def next(self):
        """Return factory(<next message>) or None when the file is exhausted."""
        source = self.fp
        while True:
            source.seek(self.seekp)
            try:
                self._search_start()
            except EOFError:
                # Remember how far we got so a later call resumes from here.
                self.seekp = source.tell()
                return None
            first = source.tell()
            self._search_end()
            last = source.tell()
            self.seekp = last
            # A zero-length span is not a message; keep scanning.
            if first != last:
                return self.factory(_Subfile(source, first, last))
0036 
0037 
class _Subfile:
    """Read-only, file-like window onto the range [start, stop) of fp.

    Positions reported by tell() and accepted by seek() are relative to
    `start`.  The underlying file is shared: every read first seeks fp
    to this object's own position, so several _Subfile objects may be
    used alternately on the same fp.
    """

    def __init__(self, fp, start, stop):
        self.fp = fp
        self.start = start
        self.stop = stop
        # Absolute offset in fp of the next byte to be read.
        self.pos = self.start

    def _read(self, length, read_function):
        """Read via read_function, never past self.stop.

        length of None or a negative value means "as much as is left".
        Returns '' once the window is exhausted.
        """
        if self.pos >= self.stop:
            return ''
        remaining = self.stop - self.pos
        if length is None or length < 0 or length > remaining:
            length = remaining
        self.fp.seek(self.pos)
        data = read_function(length)
        self.pos = self.fp.tell()
        return data

    def read(self, length = None):
        """Return up to `length` bytes (all remaining ones by default)."""
        return self._read(length, self.fp.read)

    def readline(self, length = None):
        """Return the next line, truncated to `length` and to the window."""
        return self._read(length, self.fp.readline)

    def readlines(self, sizehint = -1):
        """Return the remaining lines as a list.

        A positive sizehint stops reading once the lines collected total
        at least that many characters.  None, zero and negative values
        mean "no limit", matching ordinary file objects (previously a
        sizehint of 0 returned only the first line).
        """
        limited = sizehint is not None and sizehint > 0
        lines = []
        while 1:
            line = self.readline()
            if not line:
                break
            lines.append(line)
            if limited:
                sizehint = sizehint - len(line)
                if sizehint <= 0:
                    break
        return lines

    def tell(self):
        """Return the current position relative to the start of the window."""
        return self.pos - self.start

    def seek(self, pos, whence=0):
        """Move the read position.

        whence is 0 (relative to window start), 1 (relative to the current
        position) or 2 (relative to window end).  Any other whence leaves
        the position unchanged, as it always has.  A target before the
        start of the window is clamped to the start so that subsequent
        reads cannot return data belonging to whatever precedes this
        window in fp.
        """
        if whence == 0:
            target = self.start + pos
        elif whence == 1:
            target = self.pos + pos
        elif whence == 2:
            target = self.stop + pos
        else:
            return
        if target < self.start:
            target = self.start
        self.pos = target

    def close(self):
        """Drop the reference to fp (fp itself is not closed).

        Safe to call more than once.
        """
        if hasattr(self, 'fp'):
            del self.fp
0090 
0091 
# Recommended to use PortableUnixMailbox instead!
class UnixMailbox(_Mailbox):
    """Unix (mbox) mailbox: messages are introduced by a "From " line."""

    def _search_start(self):
        # Scan forward to the next accepted "From " line and leave fp
        # positioned at the beginning of that line.
        fp = self.fp
        while True:
            mark = fp.tell()
            line = fp.readline()
            if not line:
                raise EOFError
            if line.startswith('From ') and self._isrealfromline(line):
                fp.seek(mark)
                return

    def _search_end(self):
        # The current line is this message's own "From " line: skip it,
        # then stop at end of file or in front of the next accepted
        # "From " line.
        fp = self.fp
        fp.readline()
        mark = fp.tell()
        line = fp.readline()
        while line:
            if line.startswith('From ') and self._isrealfromline(line):
                fp.seek(mark)
                return
            mark = fp.tell()
            line = fp.readline()

    # Deciding whether a line is a genuine From_ delimiter is pluggable:
    # override _fromlinepattern with another regular expression, or replace
    # _isrealfromline() altogether.  The hook is only consulted for lines
    # that already begin with the five characters "From ".
    #
    # BAW: According to
    #http://home.netscape.com/eng/mozilla/2.0/relnotes/demo/content-length.html
    # the only portable, reliable way to locate message delimiters in a BSD
    # (i.e. Unix mailbox) style folder is to look for "\n\nFrom .*\n", or
    # for "^From .*\n" at the very beginning of the file.  Stricter parsing
    # such as _fromlinepattern below looks attractive, but real mailboxes
    # contain too many variations for it to be fully accurate.
    #
    # _strict_isrealfromline() is the older check that applies the regular
    # expression.  _portable_isrealfromline() simply returns true, which is
    # sufficient because it is never reached for a line that does not start
    # with "From ".
    #
    # Neither variant, combined with _search_start() and _search_end(), is
    # completely correct: nothing verifies that "From " is preceded by
    # "\n\n" or by the beginning of the file.  Fixing that would need a
    # larger rewrite than is warranted.  For convenience there is a
    # PortableUnixMailbox class that uses the lenient
    # _portable_isrealfromline() check instead of the regular expression.

    _fromlinepattern = (r"From \s*[^\s]+\s+\w\w\w\s+\w\w\w\s+\d?\d\s+"
                        r"\d?\d:\d\d(:\d\d)?(\s+[^\s]+)?\s+\d\d\d\d\s*$")
    # Compiled form of _fromlinepattern, created on first use.
    _regexp = None

    def _strict_isrealfromline(self, line):
        # Compile lazily and cache the compiled pattern on the instance.
        compiled = self._regexp
        if not compiled:
            import re
            compiled = self._regexp = re.compile(self._fromlinepattern)
        return compiled.match(line)

    def _portable_isrealfromline(self, line):
        return True

    _isrealfromline = _strict_isrealfromline
0155 
0156 
class PortableUnixMailbox(UnixMailbox):
    """UnixMailbox variant that treats every "From " line as a delimiter."""

    def _isrealfromline(self, line):
        # Use the lenient check inherited from UnixMailbox rather than the
        # strict regular-expression one.
        return UnixMailbox._portable_isrealfromline(self, line)
0159 
0160 
class MmdfMailbox(_Mailbox):
    """MMDF mailbox: messages sit between lines of four 0x01 characters."""

    def _search_start(self):
        # Consume lines up to and including the next delimiter line, so fp
        # ends up on the first line of the message itself.
        readline = self.fp.readline
        line = readline()
        while line:
            if line[:5] == '\001\001\001\001\n':
                return
            line = readline()
        raise EOFError

    def _search_end(self):
        # Stop at end of file, or just in front of the closing delimiter.
        fp = self.fp
        mark = fp.tell()
        line = fp.readline()
        while line:
            if line == '\001\001\001\001\n':
                fp.seek(mark)
                return
            mark = fp.tell()
            line = fp.readline()
0180 
0181 
class MHMailbox:
    """MH mailbox: a directory with one file per message, named 1, 2, 3...

    Only directory entries whose name is a positive decimal number without
    leading zeros are treated as messages; they are delivered by next() in
    increasing numeric order.
    """

    def __init__(self, dirname, factory=rfc822.Message):
        import re
        pat = re.compile('^[1-9][0-9]*$')
        self.dirname = dirname
        # Sort numerically rather than lexically so that "10" follows "9".
        # int() yields a long automatically for very large numbers, and
        # str() of it carries no trailing 'L' (Python 1.6 or later).
        numbers = [int(name) for name in os.listdir(self.dirname)
                   if pat.match(name)]
        numbers.sort()
        # Kept in reverse so that next() can pop() the smallest number
        # off the end of the list.
        self.boxes = [str(number) for number in numbers]
        self.boxes.reverse()
        self.factory = factory

    def __iter__(self):
        # next() returns None when there are no messages left.
        return iter(self.next, None)

    def next(self):
        """Return factory(<open file of next message>), or None at the end.

        The message number (file name) is stored on the result as
        _mh_msgno when the result allows attribute assignment.  Ownership
        of the open file passes to the factory's result; if the factory
        raises, the file is closed before the exception propagates.
        """
        if not self.boxes:
            return None
        fn = self.boxes.pop()
        fp = open(os.path.join(self.dirname, fn))
        try:
            msg = self.factory(fp)
        except:
            # Nothing took ownership of fp; close it and re-raise as is.
            fp.close()
            raise
        try:
            msg._mh_msgno = fn
        except (AttributeError, TypeError):
            pass
        return msg
0214 
0215 
class Maildir:
    """Qmail directory mailbox: one file per message under new/ and cur/.

    Entries whose name starts with '.' are ignored.  Messages from new/
    are delivered before those from cur/; within each directory the order
    is whatever os.listdir() returned.
    """

    def __init__(self, dirname, factory=rfc822.Message):
        self.dirname = dirname
        self.factory = factory

        # Collect new mail first, then the current mail in this maildir.
        boxes = []
        for subdir in ('new', 'cur'):
            path = os.path.join(self.dirname, subdir)
            boxes += [os.path.join(path, f)
                      for f in os.listdir(path) if not f.startswith('.')]
        # Reversed so that next() can pop() messages off the end in order.
        boxes.reverse()
        self.boxes = boxes

    def __iter__(self):
        # next() returns None when there are no messages left.
        return iter(self.next, None)

    def next(self):
        """Return factory(<open file of next message>), or None at the end.

        Ownership of the open file passes to the factory's result; if the
        factory raises, the file is closed before the exception propagates.
        """
        if not self.boxes:
            return None
        fn = self.boxes.pop()
        fp = open(fn)
        try:
            return self.factory(fp)
        except:
            # Nothing took ownership of fp; close it and re-raise as is.
            fp.close()
            raise
0244 
0245 
class BabylMailbox(_Mailbox):
    """Babyl (Rmail) mailbox.

    A message's text starts on the line after an "*** EOOH ***" line and
    runs up to a line consisting of 0x1F 0x0C, or of a lone 0x1F with no
    newline.
    """

    def _search_start(self):
        # Consume lines up to and including the EOOH marker line.
        readline = self.fp.readline
        line = readline()
        while line:
            if line == '*** EOOH ***\n':
                return
            line = readline()
        raise EOFError

    def _search_end(self):
        # Stop at end of file, or just in front of the terminator line.
        fp = self.fp
        while True:
            mark = fp.tell()
            line = fp.readline()
            if not line:
                return
            if line in ('\037\014\n', '\037'):
                fp.seek(mark)
                return
0265 
0266 
def _test():
    """Command-line self-test: summarise a mailbox or print one message.

    Arguments are taken from sys.argv: [mailbox [message-number]].

    Without a mailbox argument the first of $MAILDIR, $MAIL, $LOGNAME and
    $USER that is set is used.  A name starting with '+' is looked up
    under $HOME/Mail/; a name without any '/' is looked up in /var/mail/,
    falling back to /usr/mail/.  A directory is read as a Maildir when it
    has a 'cur' subdirectory and as an MH folder otherwise; a plain file
    is read with PortableUnixMailbox.

    With a message number (1-based) the body of that message is written
    to stdout; otherwise one From/Date/Subject line is written per message.
    """
    import sys

    args = sys.argv[1:]
    if not args:
        for key in 'MAILDIR', 'MAIL', 'LOGNAME', 'USER':
            if key in os.environ:
                mbox = os.environ[key]
                break
        else:
            # Name every variable that was actually consulted above.
            sys.stdout.write(
                "$MAILDIR, $MAIL, $LOGNAME nor $USER set -- who are you?\n")
            return
    else:
        mbox = args[0]
    if mbox[:1] == '+':
        mbox = os.environ['HOME'] + '/Mail/' + mbox[1:]
    elif '/' not in mbox:
        if os.path.isfile('/var/mail/' + mbox):
            mbox = '/var/mail/' + mbox
        else:
            mbox = '/usr/mail/' + mbox
    if os.path.isdir(mbox):
        if os.path.isdir(os.path.join(mbox, 'cur')):
            mb = Maildir(mbox)
        else:
            mb = MHMailbox(mbox)
    else:
        fp = open(mbox, 'r')
        mb = PortableUnixMailbox(fp)

    msgs = []
    while 1:
        msg = mb.next()
        if msg is None:
            break
        msgs.append(msg)
        if len(args) <= 1:
            # Only the headers are needed for the summary listing, so drop
            # the reference to the message's file object.
            msg.fp = None
    if len(args) > 1:
        num = int(args[1])
        # Message numbers are 1-based.  Reject anything else instead of
        # raising IndexError or silently counting from the end.
        if not 1 <= num <= len(msgs):
            sys.stdout.write('Message %d does not exist; mailbox %s has %d '
                             'messages\n' % (num, mbox, len(msgs)))
            return
        sys.stdout.write('Message %d body:\n' % num)
        msg = msgs[num-1]
        msg.rewindbody()
        sys.stdout.write(msg.fp.read())
    else:
        sys.stdout.write('Mailbox %s has %d messages:\n' % (mbox, len(msgs)))
        for msg in msgs:
            f = msg.getheader('from') or ""
            s = msg.getheader('subject') or ""
            d = msg.getheader('date') or ""
            # d[5:] skips the first five characters of the Date header.
            sys.stdout.write('-%20.20s   %20.20s   %-30.30s\n' % (f, d[5:], s))
0318 
0319 
# Run the command-line self-test when this file is executed as a script
# rather than imported.
if __name__ == '__main__':
    _test()
0322 

Generated by PyXR 0.9.4
SourceForge.net Logo