"""An ISAPI extension base class implemented using a thread-pool."""
# $Id: threaded_extension.py,v 1.1 2004/10/06 05:11:52 mhammond Exp $

import sys
from isapi import isapicon
import isapi.simple
from win32file import GetQueuedCompletionStatus, CreateIoCompletionPort, \
                      PostQueuedCompletionStatus, CloseHandle
from win32security import SetThreadToken
from win32event import INFINITE
from pywintypes import OVERLAPPED

# Python 2.3 and earlier insists on "C" locale - if it isn't, subtle things
# break, such as floating point constants loaded from .pyc files.
# The threading module uses such floating-points as an argument to sleep(),
# resulting in extremely long sleeps when tiny intervals are specified.
# We can work around this by resetting the C locale before the import.
if sys.hexversion < 0x02040000:
    import locale
    locale.setlocale(locale.LC_NUMERIC, "C")

import threading
import traceback

# Completion keys posted to the IO completion port.  The key selects the
# handler in the extension's dispatch_map; ISAPI_SHUTDOWN (posted with no
# OVERLAPPED object) asks a worker thread to exit.
ISAPI_REQUEST = 1
ISAPI_SHUTDOWN = 2


class WorkerThread(threading.Thread):
    """A pool thread that services packets queued to an IO completion port.

    extension   -- the owning extension object; its ``dispatch_map`` maps a
                   completion key to the callable that handles that packet.
    io_req_port -- the completion port handle this thread waits on.
    """

    def __init__(self, extension, io_req_port):
        self.running = False
        self.io_req_port = io_req_port
        self.extension = extension
        threading.Thread.__init__(self)
        # Shutdown only waits a bounded time for each worker.  A worker that
        # is still stuck in a request after that must not keep the process
        # alive at exit, so mark the thread as a daemon.
        self.setDaemon(True)

    def run(self):
        """Dequeue completion packets and dispatch them until told to stop.

        The loop ends when ``running`` has been cleared (checked between
        packets) or when an ISAPI_SHUTDOWN packet with no OVERLAPPED object
        is dequeued.

        Raises RuntimeError if a packet arrives with a key that has no entry
        in the extension's dispatch_map; that exception ends this thread.
        """
        self.running = True
        while self.running:
            # Blocks (no timeout) until a packet is available.
            errCode, num_bytes, key, overlapped = \
                GetQueuedCompletionStatus(self.io_req_port, INFINITE)
            if key == ISAPI_SHUTDOWN and overlapped is None:
                break

            # Let the parent extension handle the command.
            dispatcher = self.extension.dispatch_map.get(key)
            if dispatcher is None:
                raise RuntimeError("Bad request '%s'" % (key,))

            dispatcher(errCode, num_bytes, key, overlapped)

    def call_handler(self, cblock):
        """Pass *cblock* directly to the extension's Dispatch method."""
        self.extension.Dispatch(cblock)


# A generic thread-pool based extension, using IO Completion Ports.
# Sub-classes can override one method to implement a simple extension, or
# may leverage the CompletionPort to queue their own requests, and implement a
# fully asynch extension.
class ThreadPoolExtension(isapi.simple.SimpleExtension):
    """Base class for an ISAPI extension based around a thread-pool.

    Requests are queued to a Windows IO completion port and serviced by a
    pool of WorkerThread objects.  Sub-classes normally only need to
    override Dispatch().
    """
    # Number of worker threads started by GetExtensionVersion().
    max_workers = 20
    # Milliseconds TerminateExtension() waits, in total, for all workers
    # to quit (15 seconds).
    worker_shutdown_wait = 15000

    def __init__(self):
        self.workers = []
        # extensible dispatch map, for sub-classes that need to post their
        # own requests to the completion port.
        # Each of these functions is called with the result of
        # GetQueuedCompletionStatus for our port.
        self.dispatch_map = {
            ISAPI_REQUEST: self.DispatchConnection,
        }

    def GetExtensionVersion(self, vi):
        """Create the completion port and start the worker threads."""
        isapi.simple.SimpleExtension.GetExtensionVersion(self, vi)
        # As per Q192800, the CompletionPort should be created with the number
        # of processors, even if the number of worker threads is much larger.
        # Passing 0 means the system picks the number.
        self.io_req_port = CreateIoCompletionPort(-1, None, 0, 0)
        # start up the workers
        self.workers = []
        for _ in range(self.max_workers):
            worker = WorkerThread(self, self.io_req_port)
            worker.start()
            self.workers.append(worker)

    def HttpExtensionProc(self, control_block):
        """Queue *control_block* for a worker thread.

        The control block travels to the worker as the ``object`` attribute
        of an OVERLAPPED posted with the ISAPI_REQUEST key.  Returns
        HSE_STATUS_PENDING, as the request is completed later by a worker.
        """
        overlapped = OVERLAPPED()
        overlapped.object = control_block
        PostQueuedCompletionStatus(self.io_req_port, 0, ISAPI_REQUEST,
                                   overlapped)
        return isapicon.HSE_STATUS_PENDING

    def TerminateExtension(self, status):
        """Stop the worker threads and close the completion port.

        Every worker is flagged to stop and one ISAPI_SHUTDOWN packet is
        posted per worker.  The workers are then given a total of
        ``worker_shutdown_wait`` milliseconds to exit before the port is
        closed regardless.
        """
        import time  # local import: only needed at shutdown

        for worker in self.workers:
            worker.running = False
        for worker in self.workers:
            PostQueuedCompletionStatus(self.io_req_port, 0, ISAPI_SHUTDOWN,
                                       None)
        # Thread.join() takes seconds while worker_shutdown_wait is in
        # milliseconds, and the wait is a budget shared by all workers rather
        # than a per-thread allowance - so track one deadline.
        deadline = time.time() + self.worker_shutdown_wait / 1000.0
        for worker in self.workers:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            worker.join(remaining)
        self.dispatch_map = {}  # break circles
        CloseHandle(self.io_req_port)

    # This is the one operation the base class supports - a simple
    # Connection request.  We setup the thread-token, and dispatch to the
    # sub-class's 'Dispatch' method.
    def DispatchConnection(self, errCode, bytes, key, overlapped):
        """Handle an ISAPI_REQUEST packet on a worker thread.

        The arguments are the values returned by GetQueuedCompletionStatus;
        only ``overlapped.object`` (the control block) is used here.
        """
        control_block = overlapped.object
        # setup the correct user for this request
        hRequestToken = control_block.GetImpersonationToken()
        SetThreadToken(None, hRequestToken)
        # Nested try blocks: a combined try/except/finally is not available
        # on the oldest Python versions this module supports.
        try:
            try:
                self.Dispatch(control_block)
            except:
                # Deliberately bare: any failure in the sub-class's handler
                # is reported via HandleDispatchError.
                self.HandleDispatchError(control_block)
        finally:
            # reset the security context
            SetThreadToken(None, None)

    def Dispatch(self, ecb):
        """Overridden by the sub-class to handle connection requests.

        This class creates a thread-pool using a Windows completion port,
        and dispatches requests via this port.  Sub-classes can generally
        implement each connection request using blocking reads and writes, and
        the thread-pool will still provide decent response to the end user.

        The sub-class can set a max_workers attribute (default is 20).  Note
        that this generally does *not* mean 20 threads will all be concurrently
        running, via the magic of Windows completion ports.

        There is no default implementation - sub-classes must implement this.
        """
        raise NotImplementedError("sub-classes should override Dispatch")

    def HandleDispatchError(self, ecb):
        """Handles errors in the Dispatch method.

        When a Dispatch method call fails, this method is called to handle
        the exception.  The default implementation formats the traceback
        in the browser.

        Must be called while the exception is being handled, as the details
        are taken from sys.exc_info().  DoneWithSession() is always called
        on *ecb*, even if the error page itself cannot be sent.
        """
        ecb.HttpStatusCode = isapicon.HSE_STATUS_ERROR
        #control_block.LogData = "we failed!"
        exc_typ, exc_val, exc_tb = sys.exc_info()
        try:
            try:
                try:
                    from html import escape
                except ImportError:
                    # Older Pythons have no 'html' module.
                    from cgi import escape
                # Sending the headers can itself fail (eg, the client has
                # gone away), so it must be inside the try block to ensure
                # the session is still released below.
                ecb.SendResponseHeaders("200 OK",
                                        "Content-type: text/html\r\n\r\n",
                                        False)
                ecb.write("\n")
                ecb.write("<H3>Traceback (most recent call last):</H3>\n")
                tb_lines = traceback.format_tb(exc_tb) + \
                           traceback.format_exception_only(exc_typ, exc_val)
                # Everything but the last entry is the stack; the last entry
                # is the exception itself, which is shown in bold.
                ecb.write("<PRE>%s<B>%s</B></PRE>\n" % (
                    escape("".join(tb_lines[:-1]), quote=False),
                    escape(tb_lines[-1], quote=False)))
            except:
                print("FAILED to render the error message!")
                traceback.print_exc()
                print("ORIGINAL extension error:")
                traceback.print_exception(exc_typ, exc_val, exc_tb)
        finally:
            # holding tracebacks in a local of a frame that may itself be
            # part of a traceback used to be evil and cause leaks!
            exc_tb = None
            ecb.DoneWithSession()
# Generated by PyXR 0.9.4