[Zope3-checkins] SVN: Zope3/branches/ZopeX3-3.0/src/zope/server/
Merged from trunk:
Jim Fulton
jim at zope.com
Mon Sep 13 11:03:01 EDT 2004
Log message for revision 27510:
Merged from trunk:
r27442 | shane | 2004-09-03 04:16:55 -0400 (Fri, 03 Sep 2004) | 16 lines
Simplified serverchannelbase, hopefully fixing intermittent bugs.
The FTP server needed a way to queue a task in the channel. It used
start_task, but start_task had no way to queue tasks correctly if tasks
already happened to be running. So an assertion failure resulted
occasionally.
Now, there is a queue of tasks rather than a queue of requests.
Anything that needs to can send a task to a channel. The task will be
executed in synchronous mode.
Also, the basic request parsing is now done in the asyncore main thread.
When SimultaneousModeChannel was dropped, it became impossible to receive
data in application threads anyway.
------------------------------------------------------------------------
r27441 | shane | 2004-09-03 04:07:01 -0400 (Fri, 03 Sep 2004) | 2 lines
After the timeout, stop waiting for threads to exit.
------------------------------------------------------------------------
r27440 | shane | 2004-09-03 04:06:17 -0400 (Fri, 03 Sep 2004) | 5 lines
Allow old asyncore connections to close before testing.
This seems to fix the spurious messages about leaking sockets. However,
other tests shouldn't be leaving open sockets around.
Changed:
U Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/server.py
U Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/tests/test_ftpserver.py
U Zope3/branches/ZopeX3-3.0/src/zope/server/http/httptask.py
U Zope3/branches/ZopeX3-3.0/src/zope/server/http/tests/test_httpserver.py
U Zope3/branches/ZopeX3-3.0/src/zope/server/interfaces/__init__.py
U Zope3/branches/ZopeX3-3.0/src/zope/server/linereceiver/lineserverchannel.py
U Zope3/branches/ZopeX3-3.0/src/zope/server/linereceiver/linetask.py
U Zope3/branches/ZopeX3-3.0/src/zope/server/serverchannelbase.py
U Zope3/branches/ZopeX3-3.0/src/zope/server/taskthreads.py
-=-
Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/server.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/server.py 2004-09-13 14:41:01 UTC (rev 27509)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/server.py 2004-09-13 15:03:01 UTC (rev 27510)
@@ -751,7 +751,7 @@
task = FinishedRecvTask(c, self.inbuf, self.finish_args)
self.complete_transfer = 1
self.close()
- c.start_task(task)
+ c.queue_task(task)
def close(self, *reply_args):
try:
@@ -791,14 +791,13 @@
if c.adj.log_socket_errors:
raise
finally:
- c.end_task(close_on_finish)
+ if close_on_finish:
+ c.close_when_done()
-
def cancel(self):
'See ITask'
self.control_channel.close_when_done()
-
def defer(self):
'See ITask'
pass
Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/tests/test_ftpserver.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/tests/test_ftpserver.py 2004-09-13 14:41:01 UTC (rev 27509)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/tests/test_ftpserver.py 2004-09-13 15:03:01 UTC (rev 27510)
@@ -51,6 +51,10 @@
def setUp(self):
td.setThreadCount(1)
+ if len(asyncore.socket_map) != 1:
+ # Let sockets die off.
+ # XXX tests should be more careful to clear the socket map.
+ asyncore.poll(0.1)
self.orig_map_size = len(asyncore.socket_map)
self.hook_asyncore_error()
Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/http/httptask.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/http/httptask.py 2004-09-13 14:41:01 UTC (rev 27509)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/http/httptask.py 2004-09-13 15:03:01 UTC (rev 27510)
@@ -76,7 +76,8 @@
if self.channel.adj.log_socket_errors:
raise
finally:
- self.channel.end_task(self.close_on_finish)
+ if self.close_on_finish:
+ self.channel.close_when_done()
def cancel(self):
"""See zope.server.interfaces.ITask"""
Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/http/tests/test_httpserver.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/http/tests/test_httpserver.py 2004-09-13 14:41:01 UTC (rev 27509)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/http/tests/test_httpserver.py 2004-09-13 15:03:01 UTC (rev 27510)
@@ -19,7 +19,7 @@
from asyncore import socket_map, poll
import socket
-from threading import Thread
+from threading import Thread, Event
from zope.server.taskthreads import ThreadedTaskDispatcher
from zope.server.http.httpserver import HTTPServer
from zope.server.adjustments import Adjustments
@@ -78,6 +78,10 @@
def setUp(self):
td.setThreadCount(4)
+ if len(socket_map) != 1:
+ # Let sockets die off.
+ # XXX tests should be more careful to clear the socket map.
+ asyncore.poll(0.1)
self.orig_map_size = len(socket_map)
self.hook_asyncore_error()
self.server = EchoHTTPServer(LOCALHOST, SERVER_PORT,
@@ -88,9 +92,11 @@
self.port = CONNECT_TO_PORT
self.run_loop = 1
self.counter = 0
+ self.thread_started = Event()
self.thread = Thread(target=self.loop)
self.thread.start()
- sleep(0.1) # Give the thread some time to start.
+ self.thread_started.wait(10.0)
+ self.assert_(self.thread_started.isSet())
def tearDown(self):
self.run_loop = 0
@@ -109,6 +115,7 @@
self.unhook_asyncore_error()
def loop(self):
+ self.thread_started.set()
while self.run_loop:
self.counter = self.counter + 1
#print 'loop', self.counter
Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/interfaces/__init__.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/interfaces/__init__.py 2004-09-13 14:41:01 UTC (rev 27509)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/interfaces/__init__.py 2004-09-13 15:03:01 UTC (rev 27510)
@@ -280,30 +280,11 @@
task_class = Attribute("""Specifies the ITask class to be used for
generating tasks.""")
- active_channels = Attribute("Class-specific channel tracker")
- next_channel_cleanup = Attribute("Class-specific cleanup time")
-
- proto_request = Attribute("A request parser instance")
- ready_requests = Attribute("A list of requests to be processed.")
- last_activity = Attribute("Time of last activity")
- running_tasks = Attribute("boolean")
-
-
- def queue_request(self, req):
- """Queues a request to be processed in sequence by a task.
+ def queue_task(task):
+ """Queues a channel-related task to be processed in sequence.
"""
- def end_task(self, close):
- """Called at the end of a task, may launch another task.
- """
- def create_task(self, req):
- """Creates a new task and queues it for execution.
-
- The task may get executed in another thread.
- """
-
-
class IDispatcher(ISocket, IDispatcherEventHandler, IDispatcherLogging):
"""The dispatcher is the most low-level component of a server.
Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/linereceiver/lineserverchannel.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/linereceiver/lineserverchannel.py 2004-09-13 14:41:01 UTC (rev 27509)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/linereceiver/lineserverchannel.py 2004-09-13 15:03:01 UTC (rev 27510)
@@ -65,7 +65,7 @@
}
- def process_request(self, command):
+ def handle_request(self, command):
"""Processes a command.
Some commands use an alternate thread.
@@ -79,7 +79,8 @@
elif method in self.thread_commands:
# Process in another thread.
- return self.task_class(self, command, method)
+ task = self.task_class(self, command, method)
+ self.queue_task(task)
elif hasattr(self, method):
try:
@@ -88,7 +89,6 @@
self.exception()
else:
self.reply(self.unknown_reply, cmd.upper())
- return None
def reply(self, code, args=(), flush=1):
Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/linereceiver/linetask.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/linereceiver/linetask.py 2004-09-13 14:41:01 UTC (rev 27509)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/linereceiver/linetask.py 2004-09-13 15:03:01 UTC (rev 27510)
@@ -49,7 +49,8 @@
except:
self.channel.exception()
finally:
- self.channel.end_task(self.close_on_finish)
+ if self.close_on_finish:
+ self.channel.close_when_done()
def cancel(self):
'See ITask'
Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/serverchannelbase.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/serverchannelbase.py 2004-09-13 14:41:01 UTC (rev 27509)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/serverchannelbase.py 2004-09-13 15:03:01 UTC (rev 27510)
@@ -26,16 +26,16 @@
from zope.interface import implements
from zope.server.dualmodechannel import DualModeChannel
-from zope.server.interfaces import IServerChannel
+from zope.server.interfaces import IServerChannel, ITask
-# Synchronize access to the "running_tasks" attributes.
-running_lock = allocate_lock()
+# task_lock is useful for synchronizing access to task-related attributes.
+task_lock = allocate_lock()
class ServerChannelBase(DualModeChannel, object):
"""Base class for a high-performance, mixed-mode server-side channel."""
- implements(IServerChannel)
+ implements(IServerChannel, ITask)
# See zope.server.interfaces.IServerChannel
parser_class = None # Subclasses must provide a parser class
@@ -43,12 +43,10 @@
active_channels = {} # Class-specific channel tracker
next_channel_cleanup = [0] # Class-specific cleanup time
-
proto_request = None # A request parser instance
- ready_requests = None # A list
- # ready_requests must always be empty when not running tasks.
last_activity = 0 # Time of last activity
- running_tasks = 0 # boolean: true when any task is being executed
+ tasks = None # List of channel-related tasks to execute
+ running_tasks = False # True when another thread is running tasks
#
# ASYNCHRONOUS METHODS (including __init__)
@@ -115,8 +113,8 @@
def received(self, data):
"""See async.dispatcher
- Receive input asynchronously and send requests to
- receivedCompleteRequest().
+ Receives input asynchronously and send requests to
+ handle_request().
"""
preq = self.proto_request
while data:
@@ -125,58 +123,25 @@
n = preq.received(data)
if preq.completed:
# The request is ready to use.
+ self.proto_request = None
if not preq.empty:
- self.receivedCompleteRequest(preq)
+ self.handle_request(preq)
preq = None
- self.proto_request = None
else:
self.proto_request = preq
if n >= len(data):
break
data = data[n:]
- def receivedCompleteRequest(self, req):
- """See async.dispatcher
+ def handle_request(self, req):
+ """Creates and queues a task for processing a request.
- If there are tasks running or requests on hold, queue
- the request, otherwise execute it.
+ Subclasses may override this method to handle some requests
+ immediately in the main async thread.
"""
- do_now = 0
- running_lock.acquire()
- try:
- if self.running_tasks:
- # A task thread is working. It will read from the queue
- # when it is finished.
- rr = self.ready_requests
- if rr is None:
- rr = []
- self.ready_requests = rr
- rr.append(req)
- else:
- # Do it now.
- do_now = 1
- finally:
- running_lock.release()
- if do_now:
- task = self.process_request(req)
- if task is not None:
- self.start_task(task)
+ task = self.task_class(self, req)
+ self.queue_task(task)
- def start_task(self, task):
- """See async.dispatcher
-
- Starts the given task.
-
- *** For thread safety, this should only be called from the main
- (async) thread. ***"""
- if self.running_tasks:
- # Can't start while another task is running!
- # Otherwise two threads would work on the queue at the same time.
- raise RuntimeError, 'Already executing tasks'
- self.running_tasks = 1
- self.set_sync()
- self.server.addTask(task)
-
def handle_error(self):
"""See async.dispatcher
@@ -199,52 +164,69 @@
self.close()
#
- # SYNCHRONOUS METHODS
+ # BOTH MODES
#
- def end_task(self, close):
- """Called at the end of a task and may launch another task.
- """
- if close:
- # Note that self.running_tasks is left on, which has the
- # side effect of preventing further requests from being
- # serviced even if more appear. A good thing.
- self.close_when_done()
- return
- # Process requests held in the queue, if any.
- while 1:
- req = None
- running_lock.acquire()
+ def queue_task(self, task):
+ """Queue a channel-related task to be executed in another thread."""
+ start = False
+ task_lock.acquire()
+ try:
+ if self.tasks is None:
+ self.tasks = []
+ self.tasks.append(task)
+ if not self.running_tasks:
+ self.running_tasks = True
+ start = True
+ finally:
+ task_lock.release()
+ if start:
+ self.set_sync()
+ self.server.addTask(self)
+
+ #
+ # ITask implementation. Delegates to the queued tasks.
+ #
+
+ def service(self):
+ """Execute all pending tasks"""
+ while True:
+ task = None
+ task_lock.acquire()
try:
- rr = self.ready_requests
- if rr:
- req = rr.pop(0)
+ if self.tasks:
+ task = self.tasks.pop(0)
else:
- # No requests to process.
- self.running_tasks = 0
+ # No more tasks
+ self.running_tasks = False
+ self.set_async()
+ break
finally:
- running_lock.release()
+ task_lock.release()
+ try:
+ task.service()
+ except:
+ # propagate the exception, but keep executing tasks
+ self.server.addTask(self)
+ raise
- if req is not None:
- task = self.process_request(req)
- if task is not None:
- # Add the new task. It will service the queue.
- self.server.addTask(task)
- break
- # else check the queue again.
+ def cancel(self):
+ task_lock.acquire()
+ try:
+ if self.tasks:
+ old = self.tasks[:]
else:
- # Idle -- Wait for another request on this connection.
- self.set_async()
- break
+ old = []
+ self.tasks = []
+ self.running_tasks = False
+ finally:
+ task_lock.release()
+ try:
+ for task in old:
+ task.cancel()
+ finally:
+ self.set_async()
- #
- # BOTH MODES
- #
+ def defer(self):
+ pass
- def process_request(self, req):
- """Returns a task to execute or None if the request is quick and
- can be processed in the main thread.
-
- Override to handle some requests in the main thread.
- """
- return self.task_class(self, req)
Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/taskthreads.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/taskthreads.py 2004-09-13 14:41:01 UTC (rev 27509)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/taskthreads.py 2004-09-13 15:03:01 UTC (rev 27510)
@@ -105,6 +105,7 @@
while threads:
if time() >= expiration:
logging.error("%d thread(s) still running" % len(threads))
+ break
sleep(0.1)
if cancel_pending:
# Cancel remaining tasks.
More information about the Zope3-Checkins
mailing list