PyXR

c:\python24\lib \ test \ test_logging.py



0001 #!/usr/bin/env python
0002 #
0003 # Copyright 2001-2004 by Vinay Sajip. All Rights Reserved.
0004 #
0005 # Permission to use, copy, modify, and distribute this software and its
0006 # documentation for any purpose and without fee is hereby granted,
0007 # provided that the above copyright notice appear in all copies and that
0008 # both that copyright notice and this permission notice appear in
0009 # supporting documentation, and that the name of Vinay Sajip
0010 # not be used in advertising or publicity pertaining to distribution
0011 # of the software without specific, written prior permission.
0012 # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
0013 # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
0014 # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
0015 # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
0016 # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
0017 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
0018 #
0019 # This file is part of the Python logging distribution. See
0020 # http://www.red-dove.com/python_logging.html
0021 #
0022 """Test harness for the logging module. Run all tests.
0023 
0024 Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
0025 """
0026 
0027 import select
0028 import os, sys, string, struct, types, cPickle, cStringIO
0029 import socket, threading, time
0030 import logging, logging.handlers, logging.config
0031 
0032 BANNER = "-- %-10s %-6s ---------------------------------------------------\n"
0033 
0034 FINISH_UP = "Finish up, it's closing time. Messages should bear numbers 0 through 24."
0035 
0036 #----------------------------------------------------------------------------
0037 # Log receiver
0038 #----------------------------------------------------------------------------
0039 
0040 TIMEOUT         = 10
0041 
0042 from SocketServer import ThreadingTCPServer, StreamRequestHandler
0043 
0044 class LogRecordStreamHandler(StreamRequestHandler):
0045     """
0046     Handler for a streaming logging request. It basically logs the record
0047     using whatever logging policy is configured locally.
0048     """
0049 
0050     def handle(self):
0051         """
0052         Handle multiple requests - each expected to be a 4-byte length,
0053         followed by the LogRecord in pickle format. Logs the record
0054         according to whatever policy is configured locally.
0055         """
0056         while 1:
0057             try:
0058                 chunk = self.connection.recv(4)
0059                 if len(chunk) < 4:
0060                     break
0061                 slen = struct.unpack(">L", chunk)[0]
0062                 chunk = self.connection.recv(slen)
0063                 while len(chunk) < slen:
0064                     chunk = chunk + self.connection.recv(slen - len(chunk))
0065                 obj = self.unPickle(chunk)
0066                 record = logging.makeLogRecord(obj)
0067                 self.handleLogRecord(record)
0068             except:
0069                 raise
0070 
0071     def unPickle(self, data):
0072         return cPickle.loads(data)
0073 
0074     def handleLogRecord(self, record):
0075         logname = "logrecv.tcp." + record.name
0076         #If the end-of-messages sentinel is seen, tell the server to terminate
0077         if record.msg == FINISH_UP:
0078             self.server.abort = 1
0079         record.msg = record.msg + " (via " + logname + ")"
0080         logger = logging.getLogger(logname)
0081         logger.handle(record)
0082 
0083 # The server sets socketDataProcessed when it's done.
0084 socketDataProcessed = threading.Event()
0085 
0086 class LogRecordSocketReceiver(ThreadingTCPServer):
0087     """
0088     A simple-minded TCP socket-based logging receiver suitable for test
0089     purposes.
0090     """
0091 
0092     allow_reuse_address = 1
0093 
0094     def __init__(self, host='localhost',
0095                              port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
0096                      handler=LogRecordStreamHandler):
0097         ThreadingTCPServer.__init__(self, (host, port), handler)
0098         self.abort = 0
0099         self.timeout = 1
0100 
0101     def serve_until_stopped(self):
0102         abort = 0
0103         while not abort:
0104             rd, wr, ex = select.select([self.socket.fileno()],
0105                                        [], [],
0106                                        self.timeout)
0107             if rd:
0108                 self.handle_request()
0109             abort = self.abort
0110         #notify the main thread that we're about to exit
0111         socketDataProcessed.set()
0112 
0113     def process_request(self, request, client_address):
0114         #import threading
0115         t = threading.Thread(target = self.finish_request,
0116                              args = (request, client_address))
0117         t.start()
0118 
0119 def runTCP(tcpserver):
0120     tcpserver.serve_until_stopped()
0121 
0122 #----------------------------------------------------------------------------
0123 # Test 0
0124 #----------------------------------------------------------------------------
0125 
0126 msgcount = 0
0127 
0128 def nextmessage():
0129     global msgcount
0130     rv = "Message %d" % msgcount
0131     msgcount = msgcount + 1
0132     return rv
0133 
0134 def test0():
0135     ERR = logging.getLogger("ERR")
0136     ERR.setLevel(logging.ERROR)
0137     INF = logging.getLogger("INF")
0138     INF.setLevel(logging.INFO)
0139     INF_ERR  = logging.getLogger("INF.ERR")
0140     INF_ERR.setLevel(logging.ERROR)
0141     DEB = logging.getLogger("DEB")
0142     DEB.setLevel(logging.DEBUG)
0143 
0144     INF_UNDEF = logging.getLogger("INF.UNDEF")
0145     INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
0146     UNDEF = logging.getLogger("UNDEF")
0147 
0148     GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
0149     CHILD = logging.getLogger("INF.BADPARENT")
0150 
0151     #These should log
0152     ERR.log(logging.FATAL, nextmessage())
0153     ERR.error(nextmessage())
0154 
0155     INF.log(logging.FATAL, nextmessage())
0156     INF.error(nextmessage())
0157     INF.warn(nextmessage())
0158     INF.info(nextmessage())
0159 
0160     INF_UNDEF.log(logging.FATAL, nextmessage())
0161     INF_UNDEF.error(nextmessage())
0162     INF_UNDEF.warn (nextmessage())
0163     INF_UNDEF.info (nextmessage())
0164 
0165     INF_ERR.log(logging.FATAL, nextmessage())
0166     INF_ERR.error(nextmessage())
0167 
0168     INF_ERR_UNDEF.log(logging.FATAL, nextmessage())
0169     INF_ERR_UNDEF.error(nextmessage())
0170 
0171     DEB.log(logging.FATAL, nextmessage())
0172     DEB.error(nextmessage())
0173     DEB.warn (nextmessage())
0174     DEB.info (nextmessage())
0175     DEB.debug(nextmessage())
0176 
0177     UNDEF.log(logging.FATAL, nextmessage())
0178     UNDEF.error(nextmessage())
0179     UNDEF.warn (nextmessage())
0180     UNDEF.info (nextmessage())
0181 
0182     GRANDCHILD.log(logging.FATAL, nextmessage())
0183     CHILD.log(logging.FATAL, nextmessage())
0184 
0185     #These should not log
0186     ERR.warn(nextmessage())
0187     ERR.info(nextmessage())
0188     ERR.debug(nextmessage())
0189 
0190     INF.debug(nextmessage())
0191     INF_UNDEF.debug(nextmessage())
0192 
0193     INF_ERR.warn(nextmessage())
0194     INF_ERR.info(nextmessage())
0195     INF_ERR.debug(nextmessage())
0196     INF_ERR_UNDEF.warn(nextmessage())
0197     INF_ERR_UNDEF.info(nextmessage())
0198     INF_ERR_UNDEF.debug(nextmessage())
0199 
0200     INF.info(FINISH_UP)
0201 
0202 #----------------------------------------------------------------------------
0203 # Test 1
0204 #----------------------------------------------------------------------------
0205 
0206 #
0207 #   First, we define our levels. There can be as many as you want - the only
0208 #     limitations are that they should be integers, the lowest should be > 0 and
0209 #   larger values mean less information being logged. If you need specific
0210 #   level values which do not fit into these limitations, you can use a
0211 #   mapping dictionary to convert between your application levels and the
0212 #   logging system.
0213 #
0214 SILENT      = 10
0215 TACITURN    = 9
0216 TERSE       = 8
0217 EFFUSIVE    = 7
0218 SOCIABLE    = 6
0219 VERBOSE     = 5
0220 TALKATIVE   = 4
0221 GARRULOUS   = 3
0222 CHATTERBOX  = 2
0223 BORING      = 1
0224 
0225 LEVEL_RANGE = range(BORING, SILENT + 1)
0226 
0227 #
0228 #   Next, we define names for our levels. You don't need to do this - in which
0229 #   case the system will use "Level n" to denote the text for the level.
0230 #
0231 my_logging_levels = {
0232     SILENT      : 'Silent',
0233     TACITURN    : 'Taciturn',
0234     TERSE       : 'Terse',
0235     EFFUSIVE    : 'Effusive',
0236     SOCIABLE    : 'Sociable',
0237     VERBOSE     : 'Verbose',
0238     TALKATIVE   : 'Talkative',
0239     GARRULOUS   : 'Garrulous',
0240     CHATTERBOX  : 'Chatterbox',
0241     BORING      : 'Boring',
0242 }
0243 
0244 #
0245 #   Now, to demonstrate filtering: suppose for some perverse reason we only
0246 #   want to print out all except GARRULOUS messages. Let's create a filter for
0247 #   this purpose...
0248 #
0249 class SpecificLevelFilter(logging.Filter):
0250     def __init__(self, lvl):
0251         self.level = lvl
0252 
0253     def filter(self, record):
0254         return self.level != record.levelno
0255 
0256 class GarrulousFilter(SpecificLevelFilter):
0257     def __init__(self):
0258         SpecificLevelFilter.__init__(self, GARRULOUS)
0259 
0260 #
0261 #   Now, let's demonstrate filtering at the logger. This time, use a filter
0262 #   which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events
0263 #   are still excluded.
0264 #
0265 class VerySpecificFilter(logging.Filter):
0266     def filter(self, record):
0267         return record.levelno not in [SOCIABLE, TACITURN]
0268 
0269 def message(s):
0270     sys.stdout.write("%s\n" % s)
0271 
0272 SHOULD1 = "This should only be seen at the '%s' logging level (or lower)"
0273 
0274 def test1():
0275 #
0276 #   Now, tell the logging system to associate names with our levels.
0277 #
0278     for lvl in my_logging_levels.keys():
0279         logging.addLevelName(lvl, my_logging_levels[lvl])
0280 
0281 #
0282 #   Now, define a test function which logs an event at each of our levels.
0283 #
0284 
0285     def doLog(log):
0286         for lvl in LEVEL_RANGE:
0287             log.log(lvl, SHOULD1, logging.getLevelName(lvl))
0288 
0289     log = logging.getLogger("")
0290     hdlr = log.handlers[0]
0291 #
0292 #   Set the logging level to each different value and call the utility
0293 #   function to log events.
0294 #   In the output, you should see that each time round the loop, the number of
0295 #   logging events which are actually output decreases.
0296 #
0297     for lvl in LEVEL_RANGE:
0298         message("-- setting logging level to '%s' -----" %
0299                         logging.getLevelName(lvl))
0300         log.setLevel(lvl)
0301         doLog(log)
0302   #
0303   #   Now, we demonstrate level filtering at the handler level. Tell the
0304   #   handler defined above to filter at level 'SOCIABLE', and repeat the
0305   #   above loop. Compare the output from the two runs.
0306   #
0307     hdlr.setLevel(SOCIABLE)
0308     message("-- Filtering at handler level to SOCIABLE --")
0309     for lvl in LEVEL_RANGE:
0310         message("-- setting logging level to '%s' -----" %
0311                       logging.getLevelName(lvl))
0312         log.setLevel(lvl)
0313         doLog(log)
0314 
0315     hdlr.setLevel(0)    #turn off level filtering at the handler
0316 
0317     garr = GarrulousFilter()
0318     hdlr.addFilter(garr)
0319     message("-- Filtering using GARRULOUS filter --")
0320     for lvl in LEVEL_RANGE:
0321         message("-- setting logging level to '%s' -----" %
0322                         logging.getLevelName(lvl))
0323         log.setLevel(lvl)
0324         doLog(log)
0325     spec = VerySpecificFilter()
0326     log.addFilter(spec)
0327     message("-- Filtering using specific filter for SOCIABLE, TACITURN --")
0328     for lvl in LEVEL_RANGE:
0329         message("-- setting logging level to '%s' -----" %
0330                       logging.getLevelName(lvl))
0331         log.setLevel(lvl)
0332         doLog(log)
0333 
0334     log.removeFilter(spec)
0335     hdlr.removeFilter(garr)
0336     #Undo the one level which clashes...for regression tests
0337     logging.addLevelName(logging.DEBUG, "DEBUG")
0338 
0339 #----------------------------------------------------------------------------
0340 # Test 2
0341 #----------------------------------------------------------------------------
0342 
0343 MSG = "-- logging %d at INFO, messages should be seen every 10 events --"
0344 def test2():
0345     logger = logging.getLogger("")
0346     sh = logger.handlers[0]
0347     sh.close()
0348     logger.removeHandler(sh)
0349     mh = logging.handlers.MemoryHandler(10,logging.WARNING, sh)
0350     logger.setLevel(logging.DEBUG)
0351     logger.addHandler(mh)
0352     message("-- logging at DEBUG, nothing should be seen yet --")
0353     logger.debug("Debug message")
0354     message("-- logging at INFO, nothing should be seen yet --")
0355     logger.info("Info message")
0356     message("-- logging at WARNING, 3 messages should be seen --")
0357     logger.warn("Warn message")
0358     for i in xrange(102):
0359         message(MSG % i)
0360         logger.info("Info index = %d", i)
0361     mh.close()
0362     logger.removeHandler(mh)
0363     logger.addHandler(sh)
0364 
0365 #----------------------------------------------------------------------------
0366 # Test 3
0367 #----------------------------------------------------------------------------
0368 
0369 FILTER = "a.b"
0370 
0371 def doLog3():
0372     logging.getLogger("a").info("Info 1")
0373     logging.getLogger("a.b").info("Info 2")
0374     logging.getLogger("a.c").info("Info 3")
0375     logging.getLogger("a.b.c").info("Info 4")
0376     logging.getLogger("a.b.c.d").info("Info 5")
0377     logging.getLogger("a.bb.c").info("Info 6")
0378     logging.getLogger("b").info("Info 7")
0379     logging.getLogger("b.a").info("Info 8")
0380     logging.getLogger("c.a.b").info("Info 9")
0381     logging.getLogger("a.bb").info("Info 10")
0382 
0383 def test3():
0384     root = logging.getLogger()
0385     root.setLevel(logging.DEBUG)
0386     hand = root.handlers[0]
0387     message("Unfiltered...")
0388     doLog3()
0389     message("Filtered with '%s'..." % FILTER)
0390     filt = logging.Filter(FILTER)
0391     hand.addFilter(filt)
0392     doLog3()
0393     hand.removeFilter(filt)
0394 
0395 #----------------------------------------------------------------------------
0396 # Test Harness
0397 #----------------------------------------------------------------------------
0398 def banner(nm, typ):
0399     sep = BANNER % (nm, typ)
0400     sys.stdout.write(sep)
0401     sys.stdout.flush()
0402 
0403 def test_main_inner():
0404     rootLogger = logging.getLogger("")
0405     rootLogger.setLevel(logging.DEBUG)
0406     hdlr = logging.StreamHandler(sys.stdout)
0407     fmt = logging.Formatter(logging.BASIC_FORMAT)
0408     hdlr.setFormatter(fmt)
0409     rootLogger.addHandler(hdlr)
0410 
0411     #Set up a handler such that all events are sent via a socket to the log
0412     #receiver (logrecv).
0413     #The handler will only be added to the rootLogger for some of the tests
0414     shdlr = logging.handlers.SocketHandler('localhost',
0415                                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
0416 
0417     #Configure the logger for logrecv so events do not propagate beyond it.
0418     #The sockLogger output is buffered in memory until the end of the test,
0419     #and printed at the end.
0420     sockOut = cStringIO.StringIO()
0421     sockLogger = logging.getLogger("logrecv")
0422     sockLogger.setLevel(logging.DEBUG)
0423     sockhdlr = logging.StreamHandler(sockOut)
0424     sockhdlr.setFormatter(logging.Formatter(
0425                                    "%(name)s -> %(levelname)s: %(message)s"))
0426     sockLogger.addHandler(sockhdlr)
0427     sockLogger.propagate = 0
0428 
0429     #Set up servers
0430     threads = []
0431     tcpserver = LogRecordSocketReceiver()
0432     #sys.stdout.write("About to start TCP server...\n")
0433     threads.append(threading.Thread(target=runTCP, args=(tcpserver,)))
0434 
0435     for thread in threads:
0436         thread.start()
0437     try:
0438         banner("log_test0", "begin")
0439 
0440         rootLogger.addHandler(shdlr)
0441         test0()
0442         shdlr.close()
0443         rootLogger.removeHandler(shdlr)
0444 
0445         banner("log_test0", "end")
0446 
0447         banner("log_test1", "begin")
0448         test1()
0449         banner("log_test1", "end")
0450 
0451         banner("log_test2", "begin")
0452         test2()
0453         banner("log_test2", "end")
0454 
0455         banner("log_test3", "begin")
0456         test3()
0457         banner("log_test3", "end")
0458 
0459     finally:
0460         #wait for TCP receiver to terminate
0461         socketDataProcessed.wait()
0462         for thread in threads:
0463             thread.join()
0464         banner("logrecv output", "begin")
0465         sys.stdout.write(sockOut.getvalue())
0466         sockOut.close()
0467         sockLogger.removeHandler(sockhdlr)
0468         sockhdlr.close()
0469         banner("logrecv output", "end")
0470         sys.stdout.flush()
0471         try:
0472             hdlr.close()
0473         except:
0474             pass
0475         rootLogger.removeHandler(hdlr)
0476 
0477 def test_main():
0478     import locale
0479     # Set the locale to the platform-dependent default.  I have no idea
0480     # why the test does this, but in any case we save the current locale
0481     # first so we can restore it at the end.
0482     try:
0483         original_locale = locale.setlocale(locale.LC_ALL)
0484         locale.setlocale(locale.LC_ALL, '')
0485     except (ValueError, locale.Error):
0486         # this happens on a Solaris box which only supports "C" locale
0487         # or a Mac OS X box which supports very little locale stuff at all
0488         original_locale = None
0489 
0490     try:
0491         test_main_inner()
0492     finally:
0493         if original_locale is not None:
0494             locale.setlocale(locale.LC_ALL, original_locale)
0495 
0496 if __name__ == "__main__":
0497     sys.stdout.write("test_logging\n")
0498     test_main()
0499 

Generated by PyXR 0.9.4
SourceForge.net Logo