[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - Adjustments.py:1.2 Buffers.py:1.2 DualModeChannel.py:1.2 FixedStreamReceiver.py:1.2 IDispatcher.py:1.2 IDispatcherEventHandler.py:1.2 IDispatcherLogging.py:1.2 IHeaderOutput.py:1.2 IRequestFactory.py:1.2 IServer.py:1.2 IServerChannel.py:1.2 ISocket.py:1.2 IStreamConsumer.py:1.2 ITask.py:1.2 ITaskDispatcher.py:1.2 MaxSockets.py:1.2 ServerBase.py:1.2 ServerChannelBase.py:1.2 TaskThreads.py:1.2 Utilities.py:1.2 ZLogIntegration.py:1.2 __init__.py:1.2
Jim Fulton
jim@zope.com
Mon, 10 Jun 2002 19:30:07 -0400
Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv20468/lib/python/Zope/Server
Added Files:
Adjustments.py Buffers.py DualModeChannel.py
FixedStreamReceiver.py IDispatcher.py
IDispatcherEventHandler.py IDispatcherLogging.py
IHeaderOutput.py IRequestFactory.py IServer.py
IServerChannel.py ISocket.py IStreamConsumer.py ITask.py
ITaskDispatcher.py MaxSockets.py ServerBase.py
ServerChannelBase.py TaskThreads.py Utilities.py
ZLogIntegration.py __init__.py
Log Message:
Merged Zope-3x-branch into newly forked Zope3 CVS Tree.
=== Zope3/lib/python/Zope/Server/Adjustments.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+import MaxSockets
+
+
+class Adjustments:
+ """This class contains tunable communication parameters.
+
+ You can either change default_adj to adjust parameters for
+ all sockets, or you can create a new instance of this class,
+ change its attributes, and pass it to the channel constructors.
+ """
+
+ # backlog is the argument to pass to socket.listen().
+ backlog = 1024
+
+ # recv_bytes is the argument to pass to socket.recv().
+ recv_bytes = 8192
+
+ # send_bytes is the number of bytes to send to socket.send().
+ send_bytes = 8192
+
+ # copy_bytes is the number of bytes to copy from one file to another.
+ copy_bytes = 65536
+
+ # Create a tempfile if the pending output data gets larger
+ # than outbuf_overflow. With RAM so cheap, this probably
+ # ought to be set to the 16-32 MB range (circa 2001) for
+ # good performance with big transfers. The default is
+ # conservative.
+ outbuf_overflow = 1050000
+
+ # Create a tempfile if the data received gets larger
+ # than inbuf_overflow.
+ inbuf_overflow = 525000
+
+ # Stop accepting new connections if too many are already active.
+ connection_limit = MaxSockets.max_select_sockets() - 3 # Safe
+
+ # Minimum seconds between cleaning up inactive channels.
+ cleanup_interval = 300
+
+ # Maximum seconds to leave an inactive connection open.
+ channel_timeout = 900
+
+ # Boolean: turn off to not log premature client disconnects.
+ log_socket_errors = 1
+
+
+default_adj = Adjustments()
+
=== Zope3/lib/python/Zope/Server/Buffers.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+
+# copy_bytes controls the size of temp. strings for shuffling data around.
+COPY_BYTES = 1 << 18 # 256K
+
+# The maximum number of bytes to buffer in a simple string.
+STRBUF_LIMIT = 8192
+
+
+class FileBasedBuffer:
+
+ remain = 0
+
+ def __init__(self, file, from_buffer=None):
+ self.file = file
+ if from_buffer is not None:
+ from_file = from_buffer.getfile()
+ read_pos = from_file.tell()
+ from_file.seek(0)
+ while 1:
+ data = from_file.read(COPY_BYTES)
+ if not data:
+ break
+ file.write(data)
+ self.remain = int(file.tell() - read_pos)
+ from_file.seek(read_pos)
+ file.seek(read_pos)
+
+ def __len__(self):
+ return self.remain
+
+ def append(self, s):
+ file = self.file
+ read_pos = file.tell()
+ file.seek(0, 2)
+ file.write(s)
+ file.seek(read_pos)
+ self.remain = self.remain + len(s)
+
+ def get(self, bytes=-1, skip=0):
+ file = self.file
+ if not skip:
+ read_pos = file.tell()
+ if bytes < 0:
+ # Read all
+ res = file.read()
+ else:
+ res = file.read(bytes)
+ if skip:
+ self.remain -= len(res)
+ else:
+ file.seek(read_pos)
+ return res
+
+ def skip(self, bytes, allow_prune=0):
+ if self.remain < bytes:
+ raise ValueError, (
+ "Can't skip %d bytes in buffer of %d bytes" %
+ (bytes, self.remain))
+ self.file.seek(bytes, 1)
+ self.remain = self.remain - bytes
+
+ def newfile(self):
+ raise 'NotImplemented'
+
+ def prune(self):
+ file = self.file
+ if self.remain == 0:
+ read_pos = file.tell()
+ file.seek(0, 2)
+ sz = file.tell()
+ file.seek(read_pos)
+ if sz == 0:
+ # Nothing to prune.
+ return
+ nf = self.newfile()
+ while 1:
+ data = file.read(COPY_BYTES)
+ if not data:
+ break
+ nf.write(data)
+ self.file = nf
+
+ def getfile(self):
+ return self.file
+
+
+
+class TempfileBasedBuffer(FileBasedBuffer):
+
+ def __init__(self, from_buffer=None):
+ FileBasedBuffer.__init__(self, self.newfile(), from_buffer)
+
+ def newfile(self):
+ from tempfile import TemporaryFile
+ return TemporaryFile('w+b')
+
+
+
+class StringIOBasedBuffer(FileBasedBuffer):
+
+ def __init__(self, from_buffer=None):
+ if from_buffer is not None:
+ FileBasedBuffer.__init__(self, StringIO(), from_buffer)
+ else:
+ # Shortcut. :-)
+ self.file = StringIO()
+
+ def newfile(self):
+ return StringIO()
+
+
+
+class OverflowableBuffer:
+ """
+ This buffer implementation has four stages:
+ - No data
+ - String-based buffer
+ - StringIO-based buffer
+ - Temporary file storage
+ The first two stages are fastest for simple transfers.
+ """
+
+ overflowed = 0
+ buf = None
+ strbuf = '' # String-based buffer.
+
+ def __init__(self, overflow):
+ # overflow is the maximum to be stored in a StringIO buffer.
+ self.overflow = overflow
+
+ def __len__(self):
+ buf = self.buf
+ if buf is not None:
+ return len(buf)
+ else:
+ return len(self.strbuf)
+
+ def _create_buffer(self):
+ # print 'creating buffer'
+ strbuf = self.strbuf
+ if len(strbuf) >= self.overflow:
+ self._set_large_buffer()
+ else:
+ self._set_small_buffer()
+ buf = self.buf
+ if strbuf:
+ buf.append(self.strbuf)
+ self.strbuf = ''
+ return buf
+
+ def _set_small_buffer(self):
+ self.buf = StringIOBasedBuffer(self.buf)
+ self.overflowed = 0
+
+ def _set_large_buffer(self):
+ self.buf = TempfileBasedBuffer(self.buf)
+ self.overflowed = 1
+
+ def append(self, s):
+ buf = self.buf
+ if buf is None:
+ strbuf = self.strbuf
+ if len(strbuf) + len(s) < STRBUF_LIMIT:
+ self.strbuf = strbuf + s
+ return
+ buf = self._create_buffer()
+ buf.append(s)
+ sz = len(buf)
+ if not self.overflowed:
+ if sz >= self.overflow:
+ self._set_large_buffer()
+
+ def get(self, bytes=-1, skip=0):
+ buf = self.buf
+ if buf is None:
+ strbuf = self.strbuf
+ if not skip:
+ return strbuf
+ buf = self._create_buffer()
+ return buf.get(bytes, skip)
+
+ def skip(self, bytes, allow_prune=0):
+ buf = self.buf
+ if buf is None:
+ strbuf = self.strbuf
+ if allow_prune and bytes == len(strbuf):
+ # We could slice instead of converting to
+ # a buffer, but that would eat up memory in
+ # large transfers.
+ self.strbuf = ''
+ return
+ buf = self._create_buffer()
+ buf.skip(bytes, allow_prune)
+
+ def prune(self):
+ """
+ A potentially expensive operation that removes all data
+ already retrieved from the buffer.
+ """
+ buf = self.buf
+ if buf is None:
+ self.strbuf = ''
+ return
+ buf.prune()
+ if self.overflowed:
+ sz = len(buf)
+ if sz < self.overflow:
+ # Revert to a faster buffer.
+ self._set_small_buffer()
+
+ def getfile(self):
+ buf = self.buf
+ if buf is None:
+ buf = self._create_buffer()
+ return buf.getfile()
=== Zope3/lib/python/Zope/Server/DualModeChannel.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import asyncore
+import socket
+from time import time
+from UserDict import UserDict
+
+from Thread import SelectTrigger
+from Adjustments import default_adj
+from Buffers import OverflowableBuffer
+
+
+# Create the main trigger if it doesn't exist yet.
+if SelectTrigger.the_trigger is None:
+ SelectTrigger.the_trigger = SelectTrigger.Trigger()
+
+
+
+class DualModeChannel(asyncore.dispatcher):
+ """Channel that switches between asynchronous and synchronous mode.
+
+ Call set_sync() before using a channel in a thread other than
+ the thread handling the main loop.
+
+ Call set_async() to give the channel back to the thread handling
+ the main loop.
+ """
+
+ __implements__ = asyncore.dispatcher.__implements__
+
+ # will_close is set to 1 to close the socket.
+ will_close = 0
+
+ # boolean: async or sync mode
+ async_mode = 1
+
+ def __init__(self, conn, addr, adj=None):
+ self.addr = addr
+ if adj is None:
+ adj = default_adj
+ self.adj = adj
+ self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
+ self.creation_time = time()
+ asyncore.dispatcher.__init__(self, conn)
+
+ #
+ # ASYNCHRONOUS METHODS
+ #
+
+ def handle_close(self):
+ self.close()
+
+ def writable(self):
+ if not self.async_mode:
+ return 0
+ return self.will_close or self.outbuf
+
+ def handle_write(self):
+ if not self.async_mode:
+ return
+ self.inner_handle_write()
+
+ def inner_handle_write(self):
+ if self.outbuf:
+ try:
+ self._flush_some()
+ except socket.error:
+ self.handle_comm_error()
+ elif self.will_close:
+ self.close()
+
+ def readable(self):
+ if not self.async_mode:
+ return 0
+ return not self.will_close
+
+ def handle_read(self):
+ if not self.async_mode:
+ return
+ self.inner_handle_read()
+
+ def inner_handle_read(self):
+ try:
+ data = self.recv(self.adj.recv_bytes)
+ except socket.error:
+ self.handle_comm_error()
+ return
+ self.received(data)
+
+ def received(self, data):
+ """
+ Override to receive data in async mode.
+ """
+ pass
+
+ def handle_comm_error(self):
+ """
+ Designed for handling communication errors that occur
+ during asynchronous operations *only*. Probably should log
+ this, but in a different place.
+ """
+ self.handle_error()
+
+ def set_sync(self):
+ """Switches to synchronous mode.
+
+ The main thread will stop calling received().
+ """
+ self.async_mode = 0
+
+ #
+ # SYNCHRONOUS METHODS
+ #
+
+ def write(self, data):
+ if data:
+ self.outbuf.append(data)
+ while len(self.outbuf) >= self.adj.send_bytes:
+ # Send what we can without blocking.
+ # We propagate errors to the application on purpose
+ # (to stop the application if the connection closes).
+ if not self._flush_some():
+ break
+
+ def flush(self, block=1):
+ """Sends pending data.
+
+ If block is set, this pauses the application. If it is turned
+ off, only the amount of data that can be sent without blocking
+ is sent.
+ """
+ if not block:
+ while self._flush_some():
+ pass
+ return
+ blocked = 0
+ try:
+ while self.outbuf:
+ # We propagate errors to the application on purpose.
+ if not blocked:
+ self.socket.setblocking(1)
+ blocked = 1
+ self._flush_some()
+ finally:
+ if blocked:
+ self.socket.setblocking(0)
+
+ def set_async(self):
+ """Switches to asynchronous mode.
+
+ The main thread will begin calling received() again.
+ """
+ self.async_mode = 1
+ self.pull_trigger()
+
+ #
+ # METHODS USED IN BOTH MODES
+ #
+
+ def pull_trigger(self):
+ """Wakes up the main loop.
+ """
+ SelectTrigger.the_trigger.pull_trigger()
+
+ def _flush_some(self):
+ """Flushes data.
+
+ Returns 1 if some data was sent."""
+ outbuf = self.outbuf
+ if outbuf and self.connected:
+ chunk = outbuf.get(self.adj.send_bytes)
+ num_sent = self.send(chunk)
+ if num_sent:
+ outbuf.skip(num_sent, 1)
+ return 1
+ return 0
+
+ def close_when_done(self):
+ # We might be able close immediately.
+ while self._flush_some():
+ pass
+ if not self.outbuf:
+ # Quick exit.
+ self.close()
+ else:
+ # Wait until outbuf is flushed.
+ self.will_close = 1
+ if not self.async_mode:
+ self.async_mode = 1
+ self.pull_trigger()
+
+
+allocate_lock = None
+
+
+class SimultaneousModeChannel (DualModeChannel):
+ """Layer on top of DualModeChannel that allows communication in
+ both the main thread and other threads at the same time.
+
+ The channel operates in synchronous mode with an asynchronous
+ helper. The asynchronous callbacks empty the output buffer
+ and fill the input buffer.
+ """
+
+ __implements__ = asyncore.dispatcher.__implements__
+
+
+ def __init__(self, conn, addr, adj=None):
+ global allocate_lock
+ if allocate_lock is None:
+ from thread import allocate_lock
+
+ # writelock protects all accesses to outbuf, since reads and
+ # writes of buffers in this class need to be serialized.
+ writelock = allocate_lock()
+ self._writelock_acquire = writelock.acquire
+ self._writelock_release = writelock.release
+ self._writelock_locked = writelock.locked
+ DualModeChannel.__init__(self, conn, addr, adj)
+
+ #
+ # ASYNCHRONOUS METHODS
+ #
+
+ def writable(self):
+ return self.will_close or (
+ self.outbuf and not self._writelock_locked())
+
+ def handle_write(self):
+ if not self._writelock_acquire(0):
+ # A synchronous method is writing.
+ return
+ try:
+ self.inner_handle_write()
+ finally:
+ self._writelock_release()
+
+ def readable(self):
+ return not self.will_close
+
+ def handle_read(self):
+ self.inner_handle_read()
+
+ def set_sync(self):
+ pass
+
+ #
+ # SYNCHRONOUS METHODS
+ #
+
+ def write(self, data):
+ self._writelock_acquire()
+ try:
+ DualModeChannel.write(self, data)
+ finally:
+ self._writelock_release()
+
+ def flush(self, block=1):
+ self._writelock_acquire()
+ try:
+ DualModeChannel.flush(self, block)
+ finally:
+ self._writelock_release()
+
+ def set_async(self):
+ pass
+
+ #
+ # METHODS USED IN BOTH MODES
+ #
+
+ def close_when_done(self):
+ self.will_close = 1
+ self.pull_trigger()
+
+
=== Zope3/lib/python/Zope/Server/FixedStreamReceiver.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+from IStreamConsumer import IStreamConsumer
+
+
+class FixedStreamReceiver:
+
+ __implements__ = IStreamConsumer
+
+ # See Zope.Server.IStreamConsumer.IStreamConsumer
+ completed = 0
+
+ def __init__(self, cl, buf):
+ self.remain = cl
+ self.buf = buf
+
+ ############################################################
+ # Implementation methods for interface
+ # Zope.Server.IStreamConsumer
+
+ def received(self, data):
+ 'See Zope.Server.IStreamConsumer.IStreamConsumer'
+ rm = self.remain
+ if rm < 1:
+ self.completed = 1 # Avoid any chance of spinning
+ return 0
+ datalen = len(data)
+ if rm <= datalen:
+ self.buf.append(data[:rm])
+ self.remain = 0
+ self.completed = 1
+ return rm
+ else:
+ self.buf.append(data)
+ self.remain -= datalen
+ return datalen
+
+ #
+ ############################################################
+
+ def getfile(self):
+ return self.buf.getfile()
=== Zope3/lib/python/Zope/Server/IDispatcher.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+from ISocket import ISocket
+from IDispatcherEventHandler import IDispatcherEventHandler
+from IDispatcherLogging import IDispatcherLogging
+
+
+class IDispatcher(ISocket, IDispatcherEventHandler, IDispatcherLogging):
+ """The dispatcher is the most low-level component of a server.
+
+ 1. It manages the socket connections and distributes the
+ request to the appropriate channel.
+
+ 2. It handles the events passed to it, such as reading input,
+ writing output and handling errors. More about this
+ functionality can be found in IDispatcherEventHandler.
+
+ 3. It handles logging of the requests passed to the server as
+ well as other informational messages and erros. Please see
+ IDispatcherLogging for more details.
+
+ Note: Most of this documentation is taken from the Python
+ Library Reference.
+ """
+
+ def add_channel(map=None):
+ """After the low-level socket connection negotiation is
+ completed, a channel is created that handles all requests
+ and responses until the end of the connection.
+ """
+
+ def del_channel(map=None):
+ """Delete a channel. This should include also closing the
+ socket to the client.
+ """
+
+ def create_socket(family, type):
+ """This is identical to the creation of a normal socket, and
+ will use the same options for creation. Refer to the socket
+ documentation for information on creating sockets.
+ """
+
+ def readable():
+ """Each time through the select() loop, the set of sockets is
+ scanned, and this method is called to see if there is any
+ interest in reading. The default method simply returns 1,
+ indicating that by default, all channels will be
+ interested.
+ """
+
+ def writable():
+ """Each time through the select() loop, the set of sockets is
+ scanned, and this method is called to see if there is any
+ interest in writing. The default method simply returns 1,
+ indicating that by default, all channels will be
+ interested.
+ """
=== Zope3/lib/python/Zope/Server/IDispatcherEventHandler.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+from Interface import Interface
+
+
+class IDispatcherEventHandler(Interface):
+ """The Dispatcher can receive several different types of events. This
+ interface describes the necessary methods that handle these common
+ event types.
+ """
+
+ def handle_read_event():
+ """Given a read event, a server has to handle the event and
+ read the input from the client.
+ """
+
+ def handle_write_event():
+ """Given a write event, a server has to handle the event and
+ write the output to the client.
+ """
+
+ def handle_expt_event():
+ """An exception event was handed to the server.
+ """
+
+ def handle_error():
+ """An error occured, but we are still trying to fix it.
+ """
+
+ def handle_expt():
+ """Handle unhandled exceptions. This is usually a time to log.
+ """
+
+ def handle_read():
+ """Read output from client.
+ """
+
+ def handle_write():
+ """Write output via the socket to the client.
+ """
+
+ def handle_connect():
+ """A client requests a connection, now we need to do soemthing.
+ """
+
+ def handle_accept():
+ """A connection is accepted.
+ """
+
+ def handle_close():
+ """A connection is being closed.
+ """
=== Zope3/lib/python/Zope/Server/IDispatcherLogging.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+from Interface import Interface
+
+
+class IDispatcherLogging(Interface):
+ """This interface provides methods through which the Dispatcher will
+ write its logs. A distinction is made between hit and message logging,
+ since they often go to different output types and can have very
+ different structure.
+ """
+
+ def log (message):
+ """Logs general requests made to the server.
+ """
+
+ def log_info(message, type='info'):
+ """Logs informational messages, warnings and errors.
+ """
=== Zope3/lib/python/Zope/Server/IHeaderOutput.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+
+from Interface import Interface
+
+
+class IHeaderOutput (Interface):
+ """Interface for setting HTTP response headers.
+
+ This allows the HTTP server and the application to both set response
+ headers.
+
+ Zope.Publisher.HTTP.HTTPResponse is optionally passed an
+ object which implements this interface in order to intermingle
+ its headers with the HTTP server's response headers,
+ and for the purpose of better logging.
+ """
+
+ def setResponseStatus(status, reason):
+ """Sets the status code and the accompanying message.
+ """
+
+ def setResponseHeaders(mapping):
+ """Sets headers. The headers must be Correctly-Cased.
+ """
+
+ def appendResponseHeaders(lst):
+ """Sets headers that can potentially repeat.
+
+ Takes a list of strings.
+ """
+
+ def wroteResponseHeader():
+ """Returns a flag indicating whether the response
+
+ header has already been sent.
+ """
+
+ def setAuthUserName(name):
+ """Sets the name of the authenticated user so the name can be logged.
+ """
=== Zope3/lib/python/Zope/Server/IRequestFactory.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""
+
+Revision information:
+$Id$
+"""
+
+from Interface import Interface
+
+class IRequestFactory:
+
+ def __call__(input_stream, output_steam, environment):
+ """Create a request object *with* a publication
+
+ Factories that support multiple request/response/publication
+ types may look at the environment (headers) or the stream to
+ determine which request/response/publication to create.
+ """
+
+
+
=== Zope3/lib/python/Zope/Server/IServer.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+from Interface import Interface
+from Interface.Attribute import Attribute
+
+
+class IServer(Interface):
+ """This interface describes the basic base server.
+
+ The most unusual part about the Zope servers (since they all
+ implement this interface or inherit its base class) is that it
+ uses a mix of asynchronous and thread-based mechanism to
+ serve. While the low-level socket listener uses async, the
+ actual request is executed in a thread. This has the huge
+ advantage that if a request takes really long to process, the
+ server does not hang at that point to wait for the request to
+ finish.
+ """
+
+ channel_class = Attribute("""
+ The channel class defines the type of channel
+ to be used by the server. See IServerChannel
+ for more information.
+ """)
+
+ SERVER_IDENT = Attribute("""
+ This string identifies the server. By default
+ this is 'Zope.Server.' and should be
+ overridden.
+ """)
+
=== Zope3/lib/python/Zope/Server/IServerChannel.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+from Interface import Interface
+from Interface.Attribute import Attribute
+
+class IServerChannel(Interface):
+ """
+ """
+
+ parser_class = Attribute("Subclasses must provide a parser class")
+ task_class = Attribute("Subclasses must provide a task class.")
+
+ active_channels = Attribute("Class-specific channel tracker")
+ next_channel_cleanup = Attribute("Class-specific cleanup time")
+
+ proto_request = Attribute("A request parser instance")
+ ready_requests = Attribute("A list of requests to be processed.")
+ last_activity = Attribute("Time of last activity")
+ running_tasks = Attribute("boolean")
+
+
+ def queue_request(self, req):
+ """Queues a request to be processed in sequence by a task.
+ """
+
+ def end_task(self, close):
+ """Called at the end of a task, may launch another task.
+ """
+
+ def create_task(self, req):
+ """Creates a new task and queues it for execution.
+
+ The task may get executed in another thread.
+ """
+
=== Zope3/lib/python/Zope/Server/ISocket.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+from Interface import Interface
+
+
+class ISocket(Interface):
+ """Represents a socket.
+
+ Note: Most of this documentation is taken from the Python Library
+ Reference.
+ """
+
+ def listen(num):
+ """Listen for connections made to the socket. The backlog argument
+ specifies the maximum number of queued connections and should
+ be at least 1; the maximum value is system-dependent (usually
+ 5).
+ """
+
+ def bind(addr):
+ """Bind the socket to address. The socket must not already be bound.
+ """
+
+ def connect(address):
+ """Connect to a remote socket at address.
+ """
+
+ def accept():
+ """Accept a connection. The socket must be bound to an address and
+ listening for connections. The return value is a pair (conn,
+ address) where conn is a new socket object usable to send and
+ receive data on the connection, and address is the address
+ bound to the socket on the other end of the connection.
+ """
+
+ def recv(buffer_size):
+ """Receive data from the socket. The return value is a string
+ representing the data received. The maximum amount of data
+ to be received at once is specified by bufsize. See the
+ Unix manual page recv(2) for the meaning of the optional
+ argument flags; it defaults to zero.
+ """
+
+ def send(data):
+ """Send data to the socket. The socket must be connected to a
+ remote socket. The optional flags argument has the same
+ meaning as for recv() above. Returns the number of bytes
+ sent. Applications are responsible for checking that all
+ data has been sent; if only some of the data was
+ transmitted, the application needs to attempt delivery of
+ the remaining data.
+ """
+
+ def close():
+ """Close the socket. All future operations on the socket
+ object will fail. The remote end will receive no more data
+ (after queued data is flushed). Sockets are automatically
+ closed when they are garbage-collected.
+ """
=== Zope3/lib/python/Zope/Server/IStreamConsumer.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+
+from Interface import Interface
+from Interface.Attribute import Attribute
+
+class IStreamConsumer (Interface):
+ """Consumes a data stream until reaching a completion point.
+
+ The actual amount to be consumed might not be known ahead of time.
+ """
+
+ def received(data):
+ """Accepts data, returning the number of bytes consumed."""
+
+ completed = Attribute(
+ 'completed', 'Set to a true value when finished consuming data.')
=== Zope3/lib/python/Zope/Server/ITask.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+
+from Interface import Interface
+
+
+class ITask (Interface):
+ """
+ The interface expected of an object placed in the queue of
+ a ThreadedTaskDispatcher. Provides facilities for executing
+ or canceling the task.
+ """
+
+ def service():
+ """
+ Services the task. Either service() or cancel() is called
+ for every task queued.
+ """
+
+ def cancel():
+ """
+ Called instead of service() during shutdown or if an
+ exception occurs that prevents the task from being
+ serviced. Must return quickly and should not throw exceptions.
+ """
+
+ def defer():
+ """
+ Called just before the task is queued to be executed in
+ a different thread.
+ """
=== Zope3/lib/python/Zope/Server/ITaskDispatcher.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+
+from Interface import Interface
+
+class ITaskDispatcher (Interface):
+ """An object that accepts tasks and dispatches them to threads.
+ """
+
+ def setThreadCount(count):
+ """Sets the number of handler threads.
+ """
+
+ def addTask(task):
+ """Receives a task and dispatches it to a thread.
+
+ Note that, depending on load, a task may have to wait a
+ while for its turn.
+ """
+
+ def shutdown(cancel_pending=1, timeout=5):
+ """Shuts down all handler threads and may cancel pending tasks.
+ """
+
+ def getPendingTasksEstimate():
+ """Returns an estimate of the number of tasks waiting to be serviced.
+
+ This method may be useful for monitoring purposes. If the
+ number of pending tasks is continually climbing, your server
+ is becoming overloaded and the operator should be notified.
+ """
+
=== Zope3/lib/python/Zope/Server/MaxSockets.py 1.1 => 1.2 ===
+
+import socket
+import select
+
+# several factors here we might want to test:
+# 1) max we can create
+# 2) max we can bind
+# 3) max we can listen on
+# 4) max we can connect
+
+def max_server_sockets():
+ sl = []
+ while 1:
+ try:
+ s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ s.bind (('',0))
+ s.listen(5)
+ sl.append (s)
+ except:
+ break
+ num = len(sl)
+ for s in sl:
+ s.close()
+ del sl
+ return num
+
+def max_client_sockets():
+ # make a server socket
+ server = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ server.bind (('', 9999))
+ server.listen (5)
+ sl = []
+ while 1:
+ try:
+ s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ s.connect (('', 9999))
+ conn, addr = server.accept()
+ sl.append ((s,conn))
+ except:
+ break
+ num = len(sl)
+ for s,c in sl:
+ s.close()
+ c.close()
+ del sl
+ return num
+
+def max_select_sockets():
+ sl = []
+ while 1:
+ try:
+ num = len(sl)
+ for i in range(1 + len(sl) * 0.05):
+ # Increase exponentially.
+ s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ s.bind (('',0))
+ s.listen(5)
+ sl.append (s)
+ select.select(sl,[],[],0)
+ except:
+ break
+ for s in sl:
+ s.close()
+ del sl
+ return num
=== Zope3/lib/python/Zope/Server/ServerBase.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import asyncore
+import socket
+
+from Adjustments import default_adj
+
+from IServer import IServer
+
+
+class ServerBase(asyncore.dispatcher, object):
+ """Async. server base for launching derivatives of ServerChannelBase.
+ """
+
+ __implements__ = asyncore.dispatcher.__implements__, IServer
+
+ channel_class = None # Override with a channel class.
+ SERVER_IDENT = 'Zope.Server.ServerBase' # Override.
+
+ def __init__(self, ip, port, task_dispatcher=None, adj=None, start=1,
+ hit_log=None, verbose=0):
+ if adj is None:
+ adj = default_adj
+ self.adj = adj
+ asyncore.dispatcher.__init__(self)
+ self.port = port
+ self.task_dispatcher = task_dispatcher
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ self.bind((ip, port))
+ self.verbose = verbose
+ self.hit_log = hit_log
+ self.server_name = self.computeServerName(ip)
+
+ if start:
+ self.accept_connections()
+
+ def computeServerName(self, ip=''):
+ if ip:
+ server_name = str(ip)
+ else:
+ server_name = str(socket.gethostname())
+ # Convert to a host name if necessary.
+ is_hostname = 0
+ for c in server_name:
+ if c != '.' and not c.isdigit():
+ is_hostname = 1
+ break
+ if not is_hostname:
+ if self.verbose:
+ self.log_info('Computing hostname', 'info')
+ try:
+ server_name = socket.gethostbyaddr(server_name)[0]
+ except socket.error:
+ if self.verbose:
+ self.log_info('Cannot do reverse lookup', 'info')
+ return server_name
+
+ def accept_connections(self):
+ self.accepting = 1
+ self.socket.listen(self.adj.backlog) # Circumvent asyncore's NT limit
+ if self.verbose:
+ self.log_info('%s started.\n'
+ '\tHostname: %s\n\tPort: %d' % (
+ self.SERVER_IDENT,
+ self.server_name,
+ self.port
+ ))
+
+
+ def addTask(self, task):
+ td = self.task_dispatcher
+ if td is not None:
+ td.addTask(task)
+ else:
+ task.service()
+
+ ############################################################
+ # Implementation methods for interface
+ # Zope.Server.IDispatcher.IDispatcher
+
+ def readable(self):
+ 'See Zope.Server.IDispatcher.IDispatcher'
+ return (self.accepting and
+ len(asyncore.socket_map) < self.adj.connection_limit)
+
+ def writable(self):
+ 'See Zope.Server.IDispatcher.IDispatcher'
+ return 0
+
+ ######################################
+ # from: Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler
+
+ def handle_read(self):
+ 'See Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler'
+ pass
+
+ def handle_connect(self):
+ 'See Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler'
+ pass
+
+ def handle_accept(self):
+ 'See Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler'
+ try:
+ v = self.accept()
+ if v is None:
+ return
+ conn, addr = v
+ except socket.error:
+ # Linux: On rare occasions we get a bogus socket back from
+ # accept. socketmodule.c:makesockaddr complains that the
+ # address family is unknown. We don't want the whole server
+ # to shut down because of this.
+ if self.adj.log_socket_errors:
+ self.log_info ('warning: server accept() threw an exception',
+ 'warning')
+ return
+ self.channel_class(self, conn, addr, self.adj)
+
+ #
+ ############################################################
+
=== Zope3/lib/python/Zope/Server/ServerChannelBase.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import os
+import time
+import sys
+import asyncore
+from thread import allocate_lock
+
+# Enable ZOPE_SERVER_SIMULT_MODE to enable experimental
+# simultaneous channel mode, which may improve or degrade
+# throughput depending on load characteristics.
+if os.environ.get('ZOPE_SERVER_SIMULT_MODE'):
+ from DualModeChannel import SimultaneousModeChannel as \
+ ChannelBaseClass
+else:
+ from DualModeChannel import DualModeChannel as ChannelBaseClass
+
+from IServerChannel import IServerChannel
+
+# Synchronize access to the "running_tasks" attributes.
+running_lock = allocate_lock()
+
+
+class ServerChannelBase(ChannelBaseClass, object):
+ """Base class for a high-performance, mixed-mode server-side channel.
+ """
+
+ __implements__ = ChannelBaseClass.__implements__, IServerChannel
+
+
+ parser_class = None # Subclasses must provide a parser class
+ task_class = None # ... and a task class.
+
+ active_channels = {} # Class-specific channel tracker
+ next_channel_cleanup = [0] # Class-specific cleanup time
+
+ proto_request = None # A request parser instance
+ ready_requests = None # A list
+ # ready_requests must always be empty when not running tasks.
+ last_activity = 0 # Time of last activity
+ running_tasks = 0 # boolean: true when any task is being executed
+
+ #
+ # ASYNCHRONOUS METHODS (incl. __init__)
+ #
+
+ def __init__(self, server, conn, addr, adj=None):
+ ChannelBaseClass.__init__(self, conn, addr, adj)
+ self.server = server
+ self.last_activity = t = self.creation_time
+ self.check_maintenance(t)
+
+
+ def add_channel(self, map=None):
+ """This hook keeps track of opened HTTP channels.
+ """
+ ChannelBaseClass.add_channel(self, map)
+ self.__class__.active_channels[self._fileno] = self
+
+
+ def del_channel(self, map=None):
+ """This hook keeps track of closed HTTP channels.
+ """
+ ChannelBaseClass.del_channel(self, map)
+ ac = self.__class__.active_channels
+ fd = self._fileno
+ if fd in ac:
+ del ac[fd]
+
+
+ def check_maintenance(self, now):
+ """Performs maintenance if necessary.
+ """
+ ncc = self.__class__.next_channel_cleanup
+ if now < ncc[0]:
+ return
+ ncc[0] = now + self.adj.cleanup_interval
+ self.maintenance()
+
+
+ def maintenance(self):
+ """Kills off dead connections.
+ """
+ self.kill_zombies()
+
+
+ def kill_zombies(self):
+ """Closes connections that have not had any activity in a while.
+
+ The timeout is configured through adj.channel_timeout (seconds).
+ """
+ now = time.time()
+ cutoff = now - self.adj.channel_timeout
+ for channel in self.active_channels.values():
+ if (channel is not self and not channel.running_tasks and
+ channel.last_activity < cutoff):
+ channel.close()
+
+
+ def received(self, data):
+ """Receive input asynchronously and send requests to
+ receivedCompleteRequest().
+ """
+ preq = self.proto_request
+ while data:
+ if preq is None:
+ preq = self.parser_class(self.adj)
+ n = preq.received(data)
+ if preq.completed:
+ # The request is ready to use.
+ if not preq.empty:
+ self.receivedCompleteRequest(preq)
+ preq = None
+ self.proto_request = None
+ else:
+ self.proto_request = preq
+ if n >= len(data):
+ break
+ data = data[n:]
+
+
+ def receivedCompleteRequest(self, req):
+ """If there are tasks running or requests on hold, queue
+ the request, otherwise execute it.
+ """
+ do_now = 0
+ running_lock.acquire()
+ try:
+ if self.running_tasks:
+ # A task thread is working. It will read from the queue
+ # when it is finished.
+ rr = self.ready_requests
+ if rr is None:
+ rr = []
+ self.ready_requests = rr
+ rr.append(req)
+ else:
+ # Do it now.
+ do_now = 1
+ finally:
+ running_lock.release()
+ if do_now:
+ task = self.process_request(req)
+ if task is not None:
+ self.start_task(task)
+
+
+ def start_task(self, task):
+ """Starts the given task.
+
+ *** For thread safety, this should only be called from the main
+ (async) thread. ***"""
+ if self.running_tasks:
+ # Can't start while another task is running!
+ # Otherwise two threads would work on the queue at the same time.
+ raise RuntimeError, 'Already executing tasks'
+ self.running_tasks = 1
+ self.set_sync()
+ self.server.addTask(task)
+
+
+ def handle_error(self):
+ """Handles program errors (not communication errors)
+ """
+ t, v = sys.exc_info()[:2]
+ if t is SystemExit or t is KeyboardInterrupt:
+ raise t, v
+ asyncore.dispatcher.handle_error(self)
+
+
+ def handle_comm_error(self):
+ """Handles communication errors (not program errors)
+ """
+ if self.adj.log_socket_errors:
+ self.handle_error()
+ else:
+ # Ignore socket errors.
+ self.close()
+
+
+ #
+ # SYNCHRONOUS METHODS
+ #
+
+ def end_task(self, close):
+ """Called at the end of a task and may launch another task.
+ """
+ if close:
+ # Note that self.running_tasks is left on, which has the
+ # side effect of preventing further requests from being
+ # serviced even if more appear. A good thing.
+ self.close_when_done()
+ return
+ # Process requests held in the queue, if any.
+ while 1:
+ req = None
+ running_lock.acquire()
+ try:
+ rr = self.ready_requests
+ if rr:
+ req = rr.pop(0)
+ else:
+ # No requests to process.
+ self.running_tasks = 0
+ finally:
+ running_lock.release()
+
+ if req is not None:
+ task = self.process_request(req)
+ if task is not None:
+ # Add the new task. It will service the queue.
+ self.server.addTask(task)
+ break
+ # else check the queue again.
+ else:
+ # Idle -- Wait for another request on this connection.
+ self.set_async()
+ break
+
+
+ #
+ # BOTH MODES
+ #
+
+ def process_request(self, req):
+ """Returns a task to execute or None if the request is quick and
+ can be processed in the main thread.
+
+ Override to handle some requests in the main thread.
+ """
+ return self.task_class(self, req)
+
+
=== Zope3/lib/python/Zope/Server/TaskThreads.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+
+import sys
+from Queue import Queue, Empty
+from thread import allocate_lock, start_new_thread
+from time import time, sleep
+
+from ITaskDispatcher import ITaskDispatcher
+
+try:
+ from zLOG import LOG, ERROR
+except ImportError:
+ LOG = None
+ ERROR = None
+
+
+
+class ThreadedTaskDispatcher:
+
+ __implements__ = ITaskDispatcher
+
+ stop_count = 0 # Number of threads that will stop soon.
+
+ def __init__(self):
+ self.threads = {} # { thread number -> 1 }
+ self.queue = Queue()
+ self.thread_mgmt_lock = allocate_lock()
+
+ def handlerThread(self, thread_no):
+ threads = self.threads
+ try:
+ while threads.get(thread_no):
+ task = self.queue.get()
+ if task is None:
+ # Special value: kill this thread.
+ break
+ try:
+ task.service()
+ except:
+ self.error('Exception during task', sys.exc_info())
+ finally:
+ mlock = self.thread_mgmt_lock
+ mlock.acquire()
+ try:
+ self.stop_count -= 1
+ try: del threads[thread_no]
+ except KeyError: pass
+ finally:
+ mlock.release()
+
+ def setThreadCount(self, count):
+ mlock = self.thread_mgmt_lock
+ mlock.acquire()
+ try:
+ threads = self.threads
+ thread_no = 0
+ running = len(threads) - self.stop_count
+ while running < count:
+ # Start threads.
+ while thread_no in threads:
+ thread_no = thread_no + 1
+ threads[thread_no] = 1
+ running += 1
+ start_new_thread(self.handlerThread, (thread_no,))
+ thread_no = thread_no + 1
+ if running > count:
+ # Stop threads.
+ to_stop = running - count
+ self.stop_count += to_stop
+ for n in range(to_stop):
+ self.queue.put(None)
+ running -= 1
+ finally:
+ mlock.release()
+
+ def addTask(self, task):
+ if task is None:
+ raise ValueError, "No task passed to addTask()."
+ # assert ITask.isImplementedBy(task)
+ try:
+ task.defer()
+ self.queue.put(task)
+ except:
+ task.cancel()
+ raise
+
+ def error(self, msg, exc=None):
+ if LOG is not None:
+ LOG('ThreadedTaskDispatcher', ERROR, msg, error=exc)
+ else:
+ sys.stderr.write(msg + '\n')
+ if exc is not None:
+ import traceback
+ traceback.print_exception(exc[0], exc[1], exc[2])
+
+ def shutdown(self, cancel_pending=1, timeout=5):
+ self.setThreadCount(0)
+ # Ensure the threads shut down.
+ threads = self.threads
+ expiration = time() + timeout
+ while threads:
+ if time() >= expiration:
+ self.error("%d thread(s) still running" % len(threads))
+ sleep(0.1)
+ if cancel_pending:
+ # Cancel remaining tasks.
+ try:
+ queue = self.queue
+ while not queue.empty():
+ task = queue.get()
+ if task is not None:
+ task.cancel()
+ except Empty:
+ pass
+
+ def getPendingTasksEstimate(self):
+ return self.queue.qsize()
+
=== Zope3/lib/python/Zope/Server/Utilities.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+
+def find_double_newline(s):
+ """Returns the position just after a double newline in the given string."""
+ pos1 = s.find('\n\r\n') # One kind of double newline
+ if pos1 >= 0:
+ pos1 += 3
+ pos2 = s.find('\n\n') # Another kind of double newline
+ if pos2 >= 0:
+ pos2 += 2
+
+ if pos1 >= 0:
+ if pos2 >= 0:
+ return min(pos1, pos2)
+ else:
+ return pos1
+ else:
+ return pos2
+
+
=== Zope3/lib/python/Zope/Server/ZLogIntegration.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+"""Makes asyncore log to zLOG.
+"""
+
+from zLOG import LOG, register_subsystem, BLATHER, INFO, WARNING, ERROR
+register_subsystem('ZServer')
+severity={'info':INFO, 'warning':WARNING, 'error': ERROR}
+
+def log_info(self, message, type='info'):
+ LOG('Zope.Server', severity[type], message)
+
+import asyncore
+asyncore.dispatcher.log_info=log_info
=== Zope3/lib/python/Zope/Server/__init__.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+Zope.Server package.
+
+$Id$
+"""
+
+from IDispatcher import IDispatcher
+from Interface.Implements import implements
+
+import asyncore
+
+implements(asyncore.dispatcher, IDispatcher, 0)
+