[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - TaskThreads.py:1.1.2.4
Shane Hathaway
shane@digicool.com
Tue, 27 Nov 2001 17:33:13 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv21173
Modified Files:
Tag: Zope-3x-branch
TaskThreads.py
Log Message:
Better thread handling: kill the threads that aren't busy, and kill them
right away.
=== Zope3/lib/python/Zope/Server/TaskThreads.py 1.1.2.3 => 1.1.2.4 ===
from Queue import Queue, Empty
from thread import allocate_lock, start_new_thread
+from time import time, sleep
try:
from zLOG import LOG, ERROR
except ImportError:
LOG = None
+ ERROR = None
class ITask: # Interface
@@ -49,17 +51,19 @@
def handlerThread(self, thread_no):
threads = self.threads
- while threads.has_key(thread_no):
- task = self.queue.get()
- try:
- task.service()
- except:
- if LOG is None:
- import traceback
- traceback.print_exc()
- else:
- LOG('ThreadedTaskDispatcher', ERROR,
- 'Exception during task', error=sys.exc_info())
+ try:
+ while threads.get(thread_no):
+ task = self.queue.get()
+ if task is None:
+ # Special value: kill this thread.
+ break
+ try:
+ task.service()
+ except:
+ self.error('Exception during task', sys.exc_info())
+ finally:
+ try: del threads[thread_no]
+ except KeyError: pass
def setThreadCount(self, count):
mlock = self.thread_mgmt_lock
@@ -73,25 +77,40 @@
threads[thread_no] = 1
start_new_thread(self.handlerThread, (thread_no,))
thread_no = thread_no + 1
- while (len(threads) > count):
- if count == 0:
- threads.clear()
- else:
- thread_no = threads.keys()[0]
- del threads[thread_no]
+ if len(threads) > count:
+ to_kill = len(threads) - count
+ for n in range(to_kill):
+ self.queue.put(None)
finally:
mlock.release()
def addTask(self, task):
+ if task is None:
+ raise ValueError, "No task passed to addTask()."
try:
task.defer()
- self.queue.put_nowait(task)
+ self.queue.put(task)
except:
task.cancel()
raise
+ def error(self, msg, exc=None):
+ if LOG is not None:
+ LOG('ThreadedTaskDispatcher', ERROR, msg, error=exc)
+ else:
+ sys.stderr.write(msg + '\n')
+ if exc is not None:
+ import traceback
+ traceback.print_exception(exc[0], exc[1], exc[2])
+
def shutdown(self, cancel_pending=1):
self.setThreadCount(0)
+ threads = self.threads
+ timeout = time() + 5 # Up to 5 seconds.
+ while threads:
+ if time() > timeout:
+ self.error("%d zombie threads still exist" % len(threads))
+ sleep(0.1)
if cancel_pending:
try:
while 1: