PyXR

c:\python24\lib\site-packages \ isapi \ threaded_extension.py



0001 """An ISAPI extension base class implemented using a thread-pool."""
0002 # $Id: threaded_extension.py,v 1.1 2004/10/06 05:11:52 mhammond Exp $
0003 
0004 import sys
0005 from isapi import isapicon
0006 import isapi.simple
0007 from win32file import GetQueuedCompletionStatus, CreateIoCompletionPort, \
0008                       PostQueuedCompletionStatus, CloseHandle
0009 from win32security import SetThreadToken
0010 from win32event import INFINITE
0011 from pywintypes import OVERLAPPED
0012 
# Locale workaround for Python 2.3 and earlier.  Those versions insist on the
# "C" locale; under any other locale subtle things break, such as
# floating-point constants loaded from .pyc files.  The threading module
# passes such constants to sleep(), so tiny intervals turn into extremely
# long sleeps.  Resetting the numeric locale *before* threading is imported
# avoids the problem.
if sys.version_info < (2, 4):
    import locale
    locale.setlocale(locale.LC_NUMERIC, "C")

# These imports must stay below the locale reset above.
import threading
import traceback

# Completion keys used for packets posted to the IO completion port.
ISAPI_REQUEST = 1   # a connection request to hand to the extension
ISAPI_SHUTDOWN = 2  # tells a worker thread to exit its loop
0027 
class WorkerThread(threading.Thread):
    """Pool thread that services packets posted to an IO completion port.

    Every packet's completion key selects a handler from the owning
    extension's ``dispatch_map``; the handler is called with the four values
    returned by GetQueuedCompletionStatus.
    """

    def __init__(self, extension, io_req_port):
        """Remember the owning extension and the port to wait on.

        extension -- object providing ``dispatch_map`` and ``Dispatch``.
        io_req_port -- port handle passed to GetQueuedCompletionStatus.
        """
        self.running = False
        self.io_req_port = io_req_port
        self.extension = extension
        threading.Thread.__init__(self)

    def run(self):
        """Dequeue and dispatch packets until told to shut down.

        The loop ends when a packet keyed ISAPI_SHUTDOWN with no overlapped
        object is dequeued, or, once a packet has been handled, when
        ``running`` has been cleared.

        A failure while handling one packet (unknown key, or an exception
        escaping the handler) is reported with a traceback and the thread
        carries on, so a single bad request cannot permanently shrink the
        pool.
        """
        self.running = True
        while self.running:
            # Deliberately outside the try block below: if waiting on the
            # port itself fails (for example because the handle has been
            # closed) the thread should end rather than spin on the error.
            err_code, nbytes, key, overlapped = \
                GetQueuedCompletionStatus(self.io_req_port, INFINITE)
            if key == ISAPI_SHUTDOWN and overlapped is None:
                break

            try:
                # Let the parent extension handle the command.
                dispatcher = self.extension.dispatch_map.get(key)
                if dispatcher is None:
                    raise RuntimeError("Bad request '%s'" % (key,))
                dispatcher(err_code, nbytes, key, overlapped)
            except Exception:
                # Keep this worker alive; just record what went wrong.
                traceback.print_exc()

    def call_handler(self, cblock):
        """Pass ``cblock`` straight to the extension's Dispatch method."""
        self.extension.Dispatch(cblock)
0052 
0053 # A generic thread-pool based extension, using IO Completion Ports.
0054 # Sub-classes can override one method to implement a simple extension, or
0055 # may leverage the CompletionPort to queue their own requests, and implement a
0056 # fully asynch extension.
class ThreadPoolExtension(isapi.simple.SimpleExtension):
    """Base class for an ISAPI extension based around a thread-pool.

    Requests are posted to an IO completion port and serviced by a pool of
    WorkerThread objects.  Sub-classes normally only override Dispatch();
    they may also add entries to ``dispatch_map`` and post their own packets
    to ``io_req_port``.
    """
    # Number of worker threads started by GetExtensionVersion.
    max_workers = 20
    # Total time, in milliseconds, that TerminateExtension waits for the
    # whole pool to quit (15 seconds).
    worker_shutdown_wait = 15000

    def __init__(self):
        self.workers = []
        # extensible dispatch map, for sub-classes that need to post their
        # own requests to the completion port.
        # Each of these functions is called with the result of
        # GetQueuedCompletionStatus for our port.
        self.dispatch_map = {
            ISAPI_REQUEST: self.DispatchConnection,
        }

    def GetExtensionVersion(self, vi):
        """Run the base-class handler, then create the port and the workers."""
        isapi.simple.SimpleExtension.GetExtensionVersion(self, vi)
        # As per Q192800, the CompletionPort should be created with the number
        # of processors, even if the number of worker threads is much larger.
        # Passing 0 means the system picks the number.
        self.io_req_port = CreateIoCompletionPort(-1, None, 0, 0)
        # start up the workers
        self.workers = []
        for _ in range(self.max_workers):
            worker = WorkerThread(self, self.io_req_port)
            worker.start()
            self.workers.append(worker)

    def HttpExtensionProc(self, control_block):
        """Queue the request for a worker and return HSE_STATUS_PENDING.

        The control block travels in the ``object`` attribute of an
        OVERLAPPED, which is where DispatchConnection looks for it.
        """
        overlapped = OVERLAPPED()
        overlapped.object = control_block
        PostQueuedCompletionStatus(self.io_req_port, 0, ISAPI_REQUEST, overlapped)
        return isapicon.HSE_STATUS_PENDING

    def TerminateExtension(self, status):
        """Ask every worker to stop, wait for them, then close the port.

        At most ``worker_shutdown_wait`` milliseconds are spent waiting, in
        total, for the whole pool.  The port is closed afterwards whether or
        not every worker has finished.
        """
        import time

        for worker in self.workers:
            worker.running = False
        # One shutdown packet per worker; a worker exits its loop as soon as
        # it dequeues one, so each packet stops exactly one thread.
        for worker in self.workers:
            PostQueuedCompletionStatus(self.io_req_port, 0, ISAPI_SHUTDOWN, None)
        # Thread.join() takes its timeout in seconds, while
        # worker_shutdown_wait is in milliseconds.  A single deadline is
        # shared by all workers so the limit applies to the pool as a whole
        # rather than to each thread in turn.
        deadline = time.time() + self.worker_shutdown_wait / 1000.0
        for worker in self.workers:
            worker.join(max(0.0, deadline - time.time()))
        self.dispatch_map = {} # break circles
        CloseHandle(self.io_req_port)

    # This is the one operation the base class supports - a simple
    # Connection request.  We setup the thread-token, and dispatch to the
    # sub-class's 'Dispatch' method.
    def DispatchConnection(self, errCode, bytes, key, overlapped):
        """Handle an ISAPI_REQUEST packet taken from the completion port.

        The arguments are the values returned by GetQueuedCompletionStatus;
        only ``overlapped.object`` (the control block) is used here.  Any
        exception raised by Dispatch() is routed to HandleDispatchError().
        """
        control_block = overlapped.object
        # setup the correct user for this request
        hRequestToken = control_block.GetImpersonationToken()
        SetThreadToken(None, hRequestToken)
        try:
            try:
                self.Dispatch(control_block)
            except:
                # Deliberate catch-all: whatever the sub-class raised is
                # reported by HandleDispatchError.
                self.HandleDispatchError(control_block)
        finally:
            # reset the security context
            SetThreadToken(None, None)

    def Dispatch(self, ecb):
        """Overridden by the sub-class to handle connection requests.

        This class creates a thread-pool using a Windows completion port,
        and dispatches requests via this port.  Sub-classes can generally
        implement each connection request using blocking reads and writes,
        and the thread-pool will still provide decent response to the end
        user.

        The sub-class can set a max_workers attribute (default is 20).  Note
        that this generally does *not* mean 20 threads will all be concurrently
        running, via the magic of Windows completion ports.

        There is no default implementation - sub-classes must implement this.
        """
        raise NotImplementedError("sub-classes should override Dispatch")

    def HandleDispatchError(self, ecb):
        """Handles errors in the Dispatch method.

        When a Dispatch method call fails, this method is called to handle
        the exception.  The default implementation formats the traceback
        in the browser.

        Must be called while the exception is being handled, as the details
        come from sys.exc_info().  If the error page itself cannot be sent,
        both that failure and the original error are printed instead.
        ecb.DoneWithSession() is always called before returning.
        """
        # Grab the exception details before doing anything else that could
        # raise and replace them.
        exc_typ, exc_val, exc_tb = sys.exc_info()
        try:
            try:
                import cgi
                # Setting the status and sending the headers happens inside
                # the guarded region so that a failure here (for example the
                # client has already gone away) still reaches the 'finally'
                # clause below and the session is released.
                ecb.HttpStatusCode = isapicon.HSE_STATUS_ERROR
                #control_block.LogData = "we failed!"
                ecb.SendResponseHeaders("200 OK",
                                        "Content-type: text/html\r\n\r\n",
                                        False)
                tb_lines = traceback.format_tb(exc_tb) + \
                           traceback.format_exception_only(exc_typ, exc_val)
                ecb.write("\n")
                ecb.write("<H3>Traceback (most recent call last):</H3>\n")
                # Everything but the last line is the stack; the last line is
                # the exception itself and is shown in bold.
                ecb.write("<PRE>%s<B>%s</B></PRE>\n" % (
                    cgi.escape("".join(tb_lines[:-1])),
                    cgi.escape(tb_lines[-1]),))
            except:
                sys.stdout.write("FAILED to render the error message!\n")
                traceback.print_exc()
                sys.stdout.write("ORIGINAL extension error:\n")
                traceback.print_exception(exc_typ, exc_val, exc_tb)
        finally:
            # holding tracebacks in a local of a frame that may itself be
            # part of a traceback used to be evil and cause leaks!
            exc_tb = None
            ecb.DoneWithSession()
0165 

Generated by PyXR 0.9.4
SourceForge.net Logo