"""A multi-producer, multi-consumer queue."""

try:
    # Prefer a monotonic clock for timeout deadlines so that adjustments to
    # the system clock cannot shorten or lengthen a timed put()/get().
    from time import monotonic as _time
except ImportError:
    # Interpreters without time.monotonic fall back to wall-clock time.
    from time import time as _time
from collections import deque

__all__ = ['Empty', 'Full', 'Queue']


class Empty(Exception):
    "Exception raised by Queue.get(block=0)/get_nowait()."
    pass


class Full(Exception):
    "Exception raised by Queue.put(block=0)/put_nowait()."
    pass


class Queue:
    """FIFO queue whose public methods serialize access through one mutex.

    The mutex is shared by two condition variables: 'not_empty' is notified
    after every put, 'not_full' after every get.  Subclasses can change the
    queue organization by overriding the _init/_qsize/_empty/_full/_put/_get
    hooks; the public methods call those hooks with the mutex held.
    """

    def __init__(self, maxsize=0):
        """Initialize a queue object with a given maximum size.

        If maxsize is <= 0, the queue size is infinite.
        """
        # Imported here so that a missing threading module can be replaced
        # by dummy_threading without failing at module import time.
        try:
            import threading
        except ImportError:
            import dummy_threading as threading
        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the two conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        try:
            # try/finally: the mutex must be released even if an overridden
            # _qsize() raises, otherwise every later operation would block.
            return self._qsize()
        finally:
            self.mutex.release()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._empty()
        finally:
            self.mutex.release()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._full()
        finally:
            self.mutex.release()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        """
        self.not_full.acquire()
        try:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                # Loop rather than test once: another producer may have
                # taken the free slot between the notify and our wake-up.
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                endtime = _time() + timeout
                while self._full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item)
            self.not_empty.notify()
        finally:
            self.not_full.release()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        """
        self.not_empty.acquire()
        try:
            if not block:
                if self._empty():
                    raise Empty
            elif timeout is None:
                # Loop rather than test once: another consumer may have
                # taken the item between the notify and our wake-up.
                while self._empty():
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                endtime = _time() + timeout
                while self._empty():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item
        finally:
            self.not_empty.release()

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = deque()

    # Return the number of items currently stored
    def _qsize(self):
        return len(self.queue)

    # Check whether the queue is empty
    def _empty(self):
        return not self.queue

    # Check whether the queue is full
    def _full(self):
        # '>=' rather than '==': a queue that somehow holds more than
        # maxsize items must still report full instead of accepting more.
        return self.maxsize > 0 and len(self.queue) >= self.maxsize

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()

# Generated by PyXR 0.9.4