[Zope3-checkins] SVN: Zope3/trunk/src/zope/server/ Simplified
serverchannelbase, hopefully fixing intermittent bugs.
Shane Hathaway
shane at zope.com
Fri Sep 3 04:16:56 EDT 2004
Log message for revision 27442:
Simplified serverchannelbase, hopefully fixing intermittent bugs.
The FTP server needed a way to queue a task in the channel. It used
start_task, but start_task had no way to queue tasks correctly if tasks
already happened to be running. So an assertion failure resulted
occasionally.
Now, there is a queue of tasks rather than a queue of requests.
Anything that needs to can send a task to a channel. The task will be
executed in synchronous mode.
Also, the basic request parsing is now done in the asyncore main thread.
When SimultaneousModeChannel was dropped, it became impossible to receive
data in application threads anyway.
Changed:
U Zope3/trunk/src/zope/server/ftp/server.py
U Zope3/trunk/src/zope/server/http/httptask.py
U Zope3/trunk/src/zope/server/interfaces/__init__.py
U Zope3/trunk/src/zope/server/linereceiver/lineserverchannel.py
U Zope3/trunk/src/zope/server/linereceiver/linetask.py
U Zope3/trunk/src/zope/server/serverchannelbase.py
-=-
Modified: Zope3/trunk/src/zope/server/ftp/server.py
===================================================================
--- Zope3/trunk/src/zope/server/ftp/server.py 2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/ftp/server.py 2004-09-03 08:16:55 UTC (rev 27442)
@@ -751,7 +751,7 @@
task = FinishedRecvTask(c, self.inbuf, self.finish_args)
self.complete_transfer = 1
self.close()
- c.start_task(task)
+ c.queue_task(task)
def close(self, *reply_args):
try:
@@ -791,14 +791,13 @@
if c.adj.log_socket_errors:
raise
finally:
- c.end_task(close_on_finish)
+ if close_on_finish:
+ c.close_when_done()
-
def cancel(self):
'See ITask'
self.control_channel.close_when_done()
-
def defer(self):
'See ITask'
pass
Modified: Zope3/trunk/src/zope/server/http/httptask.py
===================================================================
--- Zope3/trunk/src/zope/server/http/httptask.py 2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/http/httptask.py 2004-09-03 08:16:55 UTC (rev 27442)
@@ -76,7 +76,8 @@
if self.channel.adj.log_socket_errors:
raise
finally:
- self.channel.end_task(self.close_on_finish)
+ if self.close_on_finish:
+ self.channel.close_when_done()
def cancel(self):
"""See zope.server.interfaces.ITask"""
Modified: Zope3/trunk/src/zope/server/interfaces/__init__.py
===================================================================
--- Zope3/trunk/src/zope/server/interfaces/__init__.py 2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/interfaces/__init__.py 2004-09-03 08:16:55 UTC (rev 27442)
@@ -280,30 +280,11 @@
task_class = Attribute("""Specifies the ITask class to be used for
generating tasks.""")
- active_channels = Attribute("Class-specific channel tracker")
- next_channel_cleanup = Attribute("Class-specific cleanup time")
-
- proto_request = Attribute("A request parser instance")
- ready_requests = Attribute("A list of requests to be processed.")
- last_activity = Attribute("Time of last activity")
- running_tasks = Attribute("boolean")
-
-
- def queue_request(self, req):
- """Queues a request to be processed in sequence by a task.
+ def queue_task(task):
+ """Queues a channel-related task to be processed in sequence.
"""
- def end_task(self, close):
- """Called at the end of a task, may launch another task.
- """
- def create_task(self, req):
- """Creates a new task and queues it for execution.
-
- The task may get executed in another thread.
- """
-
-
class IDispatcher(ISocket, IDispatcherEventHandler, IDispatcherLogging):
"""The dispatcher is the most low-level component of a server.
Modified: Zope3/trunk/src/zope/server/linereceiver/lineserverchannel.py
===================================================================
--- Zope3/trunk/src/zope/server/linereceiver/lineserverchannel.py 2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/linereceiver/lineserverchannel.py 2004-09-03 08:16:55 UTC (rev 27442)
@@ -65,7 +65,7 @@
}
- def process_request(self, command):
+ def handle_request(self, command):
"""Processes a command.
Some commands use an alternate thread.
@@ -79,7 +79,8 @@
elif method in self.thread_commands:
# Process in another thread.
- return self.task_class(self, command, method)
+ task = self.task_class(self, command, method)
+ self.queue_task(task)
elif hasattr(self, method):
try:
@@ -88,7 +89,6 @@
self.exception()
else:
self.reply(self.unknown_reply, cmd.upper())
- return None
def reply(self, code, args=(), flush=1):
Modified: Zope3/trunk/src/zope/server/linereceiver/linetask.py
===================================================================
--- Zope3/trunk/src/zope/server/linereceiver/linetask.py 2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/linereceiver/linetask.py 2004-09-03 08:16:55 UTC (rev 27442)
@@ -49,7 +49,8 @@
except:
self.channel.exception()
finally:
- self.channel.end_task(self.close_on_finish)
+ if self.close_on_finish:
+ self.channel.close_when_done()
def cancel(self):
'See ITask'
Modified: Zope3/trunk/src/zope/server/serverchannelbase.py
===================================================================
--- Zope3/trunk/src/zope/server/serverchannelbase.py 2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/serverchannelbase.py 2004-09-03 08:16:55 UTC (rev 27442)
@@ -26,16 +26,16 @@
from zope.interface import implements
from zope.server.dualmodechannel import DualModeChannel
-from zope.server.interfaces import IServerChannel
+from zope.server.interfaces import IServerChannel, ITask
-# Synchronize access to the "running_tasks" attributes.
-running_lock = allocate_lock()
+# task_lock is useful for synchronizing access to task-related attributes.
+task_lock = allocate_lock()
class ServerChannelBase(DualModeChannel, object):
"""Base class for a high-performance, mixed-mode server-side channel."""
- implements(IServerChannel)
+ implements(IServerChannel, ITask)
# See zope.server.interfaces.IServerChannel
parser_class = None # Subclasses must provide a parser class
@@ -43,12 +43,10 @@
active_channels = {} # Class-specific channel tracker
next_channel_cleanup = [0] # Class-specific cleanup time
-
proto_request = None # A request parser instance
- ready_requests = None # A list
- # ready_requests must always be empty when not running tasks.
last_activity = 0 # Time of last activity
- running_tasks = 0 # boolean: true when any task is being executed
+ tasks = None # List of channel-related tasks to execute
+ running_tasks = False # True when another thread is running tasks
#
# ASYNCHRONOUS METHODS (including __init__)
@@ -115,8 +113,8 @@
def received(self, data):
"""See async.dispatcher
- Receive input asynchronously and send requests to
- receivedCompleteRequest().
+ Receives input asynchronously and send requests to
+ handle_request().
"""
preq = self.proto_request
while data:
@@ -125,58 +123,25 @@
n = preq.received(data)
if preq.completed:
# The request is ready to use.
+ self.proto_request = None
if not preq.empty:
- self.receivedCompleteRequest(preq)
+ self.handle_request(preq)
preq = None
- self.proto_request = None
else:
self.proto_request = preq
if n >= len(data):
break
data = data[n:]
- def receivedCompleteRequest(self, req):
- """See async.dispatcher
+ def handle_request(self, req):
+ """Creates and queues a task for processing a request.
- If there are tasks running or requests on hold, queue
- the request, otherwise execute it.
+ Subclasses may override this method to handle some requests
+ immediately in the main async thread.
"""
- do_now = 0
- running_lock.acquire()
- try:
- if self.running_tasks:
- # A task thread is working. It will read from the queue
- # when it is finished.
- rr = self.ready_requests
- if rr is None:
- rr = []
- self.ready_requests = rr
- rr.append(req)
- else:
- # Do it now.
- do_now = 1
- finally:
- running_lock.release()
- if do_now:
- task = self.process_request(req)
- if task is not None:
- self.start_task(task)
+ task = self.task_class(self, req)
+ self.queue_task(task)
- def start_task(self, task):
- """See async.dispatcher
-
- Starts the given task.
-
- *** For thread safety, this should only be called from the main
- (async) thread. ***"""
- if self.running_tasks:
- # Can't start while another task is running!
- # Otherwise two threads would work on the queue at the same time.
- raise RuntimeError, 'Already executing tasks'
- self.running_tasks = 1
- self.set_sync()
- self.server.addTask(task)
-
def handle_error(self):
"""See async.dispatcher
@@ -199,52 +164,69 @@
self.close()
#
- # SYNCHRONOUS METHODS
+ # BOTH MODES
#
- def end_task(self, close):
- """Called at the end of a task and may launch another task.
- """
- if close:
- # Note that self.running_tasks is left on, which has the
- # side effect of preventing further requests from being
- # serviced even if more appear. A good thing.
- self.close_when_done()
- return
- # Process requests held in the queue, if any.
- while 1:
- req = None
- running_lock.acquire()
+ def queue_task(self, task):
+ """Queue a channel-related task to be executed in another thread."""
+ start = False
+ task_lock.acquire()
+ try:
+ if self.tasks is None:
+ self.tasks = []
+ self.tasks.append(task)
+ if not self.running_tasks:
+ self.running_tasks = True
+ start = True
+ finally:
+ task_lock.release()
+ if start:
+ self.set_sync()
+ self.server.addTask(self)
+
+ #
+ # ITask implementation. Delegates to the queued tasks.
+ #
+
+ def service(self):
+ """Execute all pending tasks"""
+ while True:
+ task = None
+ task_lock.acquire()
try:
- rr = self.ready_requests
- if rr:
- req = rr.pop(0)
+ if self.tasks:
+ task = self.tasks.pop(0)
else:
- # No requests to process.
- self.running_tasks = 0
+ # No more tasks
+ self.running_tasks = False
+ self.set_async()
+ break
finally:
- running_lock.release()
+ task_lock.release()
+ try:
+ task.service()
+ except:
+ # propagate the exception, but keep executing tasks
+ self.server.addTask(self)
+ raise
- if req is not None:
- task = self.process_request(req)
- if task is not None:
- # Add the new task. It will service the queue.
- self.server.addTask(task)
- break
- # else check the queue again.
+ def cancel(self):
+ task_lock.acquire()
+ try:
+ if self.tasks:
+ old = self.tasks[:]
else:
- # Idle -- Wait for another request on this connection.
- self.set_async()
- break
+ old = []
+ self.tasks = []
+ self.running_tasks = False
+ finally:
+ task_lock.release()
+ try:
+ for task in old:
+ task.cancel()
+ finally:
+ self.set_async()
- #
- # BOTH MODES
- #
+ def defer(self):
+ pass
- def process_request(self, req):
- """Returns a task to execute or None if the request is quick and
- can be processed in the main thread.
-
- Override to handle some requests in the main thread.
- """
- return self.task_class(self, req)
More information about the Zope3-Checkins
mailing list