[Zope3-checkins] CVS: Zope3/src/zope/server - __init__.py:1.2 adjustments.py:1.2 buffers.py:1.2 dualmodechannel.py:1.2 fixedstreamreceiver.py:1.2 maxsockets.py:1.2 selecttrigger.py:1.2 serverbase.py:1.2 serverchannelbase.py:1.2 taskthreads.py:1.2 utilities.py:1.2 zlogintegration.py:1.2
Jim Fulton
jim@zope.com
Wed, 25 Dec 2002 09:15:54 -0500
Update of /cvs-repository/Zope3/src/zope/server
In directory cvs.zope.org:/tmp/cvs-serv20790/src/zope/server
Added Files:
__init__.py adjustments.py buffers.py dualmodechannel.py
fixedstreamreceiver.py maxsockets.py selecttrigger.py
serverbase.py serverchannelbase.py taskthreads.py utilities.py
zlogintegration.py
Log Message:
Grand renaming:
- Renamed most files (especially python modules) to lower case.
- Moved views and interfaces into separate hierarchies within each
project, where each top-level directory under the zope package
is a separate project.
- Moved everything to src from lib/python.
lib/python will eventually go away. I need access to the cvs
repository to make this happen, however.
There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.
=== Zope3/src/zope/server/__init__.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/__init__.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,24 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import asyncore
+
+from zope.server.interfaces import IDispatcher
+from zope.interface.implements import implements
+
+implements(asyncore.dispatcher, IDispatcher, 0)
=== Zope3/src/zope/server/adjustments.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/adjustments.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,66 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Adjustments are tunable parameters.
+
+$Id$
+"""
+
+from zope.server import maxsockets
+
+
+class Adjustments:
+ """This class contains tunable communication parameters.
+
+ You can either change default_adj to adjust parameters for
+ all sockets, or you can create a new instance of this class,
+ change its attributes, and pass it to the channel constructors.
+ """
+
+ # backlog is the argument to pass to socket.listen().
+ backlog = 1024
+
+ # recv_bytes is the argument to pass to socket.recv().
+ recv_bytes = 8192
+
+    # send_bytes is the number of bytes to pass to socket.send() at a time.
+ send_bytes = 8192
+
+ # copy_bytes is the number of bytes to copy from one file to another.
+ copy_bytes = 65536
+
+ # Create a tempfile if the pending output data gets larger
+ # than outbuf_overflow. With RAM so cheap, this probably
+ # ought to be set to the 16-32 MB range (circa 2001) for
+ # good performance with big transfers. The default is
+ # conservative.
+ outbuf_overflow = 1050000
+
+ # Create a tempfile if the data received gets larger
+ # than inbuf_overflow.
+ inbuf_overflow = 525000
+
+ # Stop accepting new connections if too many are already active.
+ connection_limit = maxsockets.max_select_sockets() - 3 # Safe
+
+ # Minimum seconds between cleaning up inactive channels.
+ cleanup_interval = 300
+
+ # Maximum seconds to leave an inactive connection open.
+ channel_timeout = 900
+
+    # Boolean: set to 0 to avoid logging premature client disconnects.
+ log_socket_errors = 1
+
+
+default_adj = Adjustments()
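
For illustration only, a small sketch of the tuning pattern the docstring
above describes (the values here are arbitrary, and the DualModeChannel
reference assumes dualmodechannel.py below):

    from zope.server.adjustments import Adjustments, default_adj

    # Tune the shared defaults in place...
    default_adj.channel_timeout = 300

    # ...or build a private parameter set and hand it to a channel.
    my_adj = Adjustments()
    my_adj.send_bytes = 16384
    my_adj.outbuf_overflow = 16 * 1024 * 1024
    # e.g. DualModeChannel(conn, addr, adj=my_adj)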
=== Zope3/src/zope/server/buffers.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/buffers.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,230 @@
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+
+# copy_bytes controls the size of temp. strings for shuffling data around.
+COPY_BYTES = 1 << 18 # 256K
+
+# The maximum number of bytes to buffer in a simple string.
+STRBUF_LIMIT = 8192
+
+
+class FileBasedBuffer:
+
+ remain = 0
+
+ def __init__(self, file, from_buffer=None):
+ self.file = file
+ if from_buffer is not None:
+ from_file = from_buffer.getfile()
+ read_pos = from_file.tell()
+ from_file.seek(0)
+ while 1:
+ data = from_file.read(COPY_BYTES)
+ if not data:
+ break
+ file.write(data)
+ self.remain = int(file.tell() - read_pos)
+ from_file.seek(read_pos)
+ file.seek(read_pos)
+
+ def __len__(self):
+ return self.remain
+
+ def append(self, s):
+ file = self.file
+ read_pos = file.tell()
+ file.seek(0, 2)
+ file.write(s)
+ file.seek(read_pos)
+ self.remain = self.remain + len(s)
+
+ def get(self, bytes=-1, skip=0):
+ file = self.file
+ if not skip:
+ read_pos = file.tell()
+ if bytes < 0:
+ # Read all
+ res = file.read()
+ else:
+ res = file.read(bytes)
+ if skip:
+ self.remain -= len(res)
+ else:
+ file.seek(read_pos)
+ return res
+
+ def skip(self, bytes, allow_prune=0):
+ if self.remain < bytes:
+ raise ValueError, (
+ "Can't skip %d bytes in buffer of %d bytes" %
+ (bytes, self.remain))
+ self.file.seek(bytes, 1)
+ self.remain = self.remain - bytes
+
+ def newfile(self):
+        raise NotImplementedError('newfile() must be overridden by subclasses')
+
+ def prune(self):
+ file = self.file
+ if self.remain == 0:
+ read_pos = file.tell()
+ file.seek(0, 2)
+ sz = file.tell()
+ file.seek(read_pos)
+ if sz == 0:
+ # Nothing to prune.
+ return
+ nf = self.newfile()
+ while 1:
+ data = file.read(COPY_BYTES)
+ if not data:
+ break
+ nf.write(data)
+ self.file = nf
+
+ def getfile(self):
+ return self.file
+
+
+
+class TempfileBasedBuffer(FileBasedBuffer):
+
+ def __init__(self, from_buffer=None):
+ FileBasedBuffer.__init__(self, self.newfile(), from_buffer)
+
+ def newfile(self):
+ from tempfile import TemporaryFile
+ return TemporaryFile('w+b')
+
+
+
+class StringIOBasedBuffer(FileBasedBuffer):
+
+ def __init__(self, from_buffer=None):
+ if from_buffer is not None:
+ FileBasedBuffer.__init__(self, StringIO(), from_buffer)
+ else:
+ # Shortcut. :-)
+ self.file = StringIO()
+
+ def newfile(self):
+ return StringIO()
+
+
+
+class OverflowableBuffer:
+ """
+ This buffer implementation has four stages:
+ - No data
+ - String-based buffer
+ - StringIO-based buffer
+ - Temporary file storage
+ The first two stages are fastest for simple transfers.
+ """
+
+ overflowed = 0
+ buf = None
+ strbuf = '' # String-based buffer.
+
+ def __init__(self, overflow):
+ # overflow is the maximum to be stored in a StringIO buffer.
+ self.overflow = overflow
+
+ def __len__(self):
+ buf = self.buf
+ if buf is not None:
+ return len(buf)
+ else:
+ return len(self.strbuf)
+
+ def _create_buffer(self):
+ # print 'creating buffer'
+ strbuf = self.strbuf
+ if len(strbuf) >= self.overflow:
+ self._set_large_buffer()
+ else:
+ self._set_small_buffer()
+ buf = self.buf
+ if strbuf:
+ buf.append(self.strbuf)
+ self.strbuf = ''
+ return buf
+
+ def _set_small_buffer(self):
+ self.buf = StringIOBasedBuffer(self.buf)
+ self.overflowed = 0
+
+ def _set_large_buffer(self):
+ self.buf = TempfileBasedBuffer(self.buf)
+ self.overflowed = 1
+
+ def append(self, s):
+ buf = self.buf
+ if buf is None:
+ strbuf = self.strbuf
+ if len(strbuf) + len(s) < STRBUF_LIMIT:
+ self.strbuf = strbuf + s
+ return
+ buf = self._create_buffer()
+ buf.append(s)
+ sz = len(buf)
+ if not self.overflowed:
+ if sz >= self.overflow:
+ self._set_large_buffer()
+
+ def get(self, bytes=-1, skip=0):
+ buf = self.buf
+ if buf is None:
+ strbuf = self.strbuf
+ if not skip:
+ return strbuf
+ buf = self._create_buffer()
+ return buf.get(bytes, skip)
+
+ def skip(self, bytes, allow_prune=0):
+ buf = self.buf
+ if buf is None:
+ strbuf = self.strbuf
+ if allow_prune and bytes == len(strbuf):
+ # We could slice instead of converting to
+ # a buffer, but that would eat up memory in
+ # large transfers.
+ self.strbuf = ''
+ return
+ buf = self._create_buffer()
+ buf.skip(bytes, allow_prune)
+
+ def prune(self):
+ """
+ A potentially expensive operation that removes all data
+ already retrieved from the buffer.
+ """
+ buf = self.buf
+ if buf is None:
+ self.strbuf = ''
+ return
+ buf.prune()
+ if self.overflowed:
+ sz = len(buf)
+ if sz < self.overflow:
+ # Revert to a faster buffer.
+ self._set_small_buffer()
+
+ def getfile(self):
+ buf = self.buf
+ if buf is None:
+ buf = self._create_buffer()
+ return buf.getfile()
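
A rough usage sketch of OverflowableBuffer's staged behavior (the 64K
threshold is arbitrary):

    from zope.server.buffers import OverflowableBuffer

    buf = OverflowableBuffer(65536)   # spill to a tempfile above ~64K
    buf.append('hello ')
    buf.append('world')
    pending = buf.get()               # peek at pending data without consuming it
    buf.skip(len(pending), 1)         # consume it, allowing the buffer to prune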
=== Zope3/src/zope/server/dualmodechannel.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/dualmodechannel.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,290 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import asyncore
+import socket
+from time import time
+from UserDict import UserDict
+
+from zope.server import selecttrigger
+from zope.server.adjustments import default_adj
+from zope.server.buffers import OverflowableBuffer
+
+
+# Create the main trigger if it doesn't exist yet.
+if selecttrigger.the_trigger is None:
+ selecttrigger.the_trigger = selecttrigger.Trigger()
+
+
+
+class DualModeChannel(asyncore.dispatcher):
+ """Channel that switches between asynchronous and synchronous mode.
+
+ Call set_sync() before using a channel in a thread other than
+ the thread handling the main loop.
+
+ Call set_async() to give the channel back to the thread handling
+ the main loop.
+ """
+
+ __implements__ = asyncore.dispatcher.__implements__
+
+ # will_close is set to 1 to close the socket.
+ will_close = 0
+
+ # boolean: async or sync mode
+ async_mode = 1
+
+ def __init__(self, conn, addr, adj=None):
+ self.addr = addr
+ if adj is None:
+ adj = default_adj
+ self.adj = adj
+ self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
+ self.creation_time = time()
+ asyncore.dispatcher.__init__(self, conn)
+
+ #
+ # ASYNCHRONOUS METHODS
+ #
+
+ def handle_close(self):
+ self.close()
+
+ def writable(self):
+ if not self.async_mode:
+ return 0
+ return self.will_close or self.outbuf
+
+ def handle_write(self):
+ if not self.async_mode:
+ return
+ self.inner_handle_write()
+
+ def inner_handle_write(self):
+ if self.outbuf:
+ try:
+ self._flush_some()
+ except socket.error:
+ self.handle_comm_error()
+ elif self.will_close:
+ self.close()
+
+ def readable(self):
+ if not self.async_mode:
+ return 0
+ return not self.will_close
+
+ def handle_read(self):
+ if not self.async_mode:
+ return
+ self.inner_handle_read()
+
+ def inner_handle_read(self):
+ try:
+ data = self.recv(self.adj.recv_bytes)
+ except socket.error:
+ self.handle_comm_error()
+ return
+ self.received(data)
+
+ def received(self, data):
+ """
+ Override to receive data in async mode.
+ """
+ pass
+
+ def handle_comm_error(self):
+ """
+ Designed for handling communication errors that occur
+ during asynchronous operations *only*. Probably should log
+ this, but in a different place.
+ """
+ self.handle_error()
+
+ def set_sync(self):
+ """Switches to synchronous mode.
+
+ The main thread will stop calling received().
+ """
+ self.async_mode = 0
+
+ #
+ # SYNCHRONOUS METHODS
+ #
+
+ def write(self, data):
+ if data:
+ self.outbuf.append(data)
+ while len(self.outbuf) >= self.adj.send_bytes:
+ # Send what we can without blocking.
+ # We propagate errors to the application on purpose
+ # (to stop the application if the connection closes).
+ if not self._flush_some():
+ break
+
+ def flush(self, block=1):
+ """Sends pending data.
+
+ If block is set, this pauses the application. If it is turned
+ off, only the amount of data that can be sent without blocking
+ is sent.
+ """
+ if not block:
+ while self._flush_some():
+ pass
+ return
+ blocked = 0
+ try:
+ while self.outbuf:
+ # We propagate errors to the application on purpose.
+ if not blocked:
+ self.socket.setblocking(1)
+ blocked = 1
+ self._flush_some()
+ finally:
+ if blocked:
+ self.socket.setblocking(0)
+
+ def set_async(self):
+ """Switches to asynchronous mode.
+
+ The main thread will begin calling received() again.
+ """
+ self.async_mode = 1
+ self.pull_trigger()
+
+ #
+ # METHODS USED IN BOTH MODES
+ #
+
+ def pull_trigger(self):
+ """Wakes up the main loop.
+ """
+ selecttrigger.the_trigger.pull_trigger()
+
+ def _flush_some(self):
+ """Flushes data.
+
+ Returns 1 if some data was sent."""
+ outbuf = self.outbuf
+ if outbuf and self.connected:
+ chunk = outbuf.get(self.adj.send_bytes)
+ num_sent = self.send(chunk)
+ if num_sent:
+ outbuf.skip(num_sent, 1)
+ return 1
+ return 0
+
+ def close_when_done(self):
+        # We might be able to close immediately.
+ while self._flush_some():
+ pass
+ if not self.outbuf:
+ # Quick exit.
+ self.close()
+ else:
+ # Wait until outbuf is flushed.
+ self.will_close = 1
+ if not self.async_mode:
+ self.async_mode = 1
+ self.pull_trigger()
+
+
+allocate_lock = None
+
+
+class SimultaneousModeChannel (DualModeChannel):
+ """Layer on top of DualModeChannel that allows communication in
+ both the main thread and other threads at the same time.
+
+ The channel operates in synchronous mode with an asynchronous
+ helper. The asynchronous callbacks empty the output buffer
+ and fill the input buffer.
+ """
+
+ __implements__ = asyncore.dispatcher.__implements__
+
+
+ def __init__(self, conn, addr, adj=None):
+ global allocate_lock
+ if allocate_lock is None:
+ from thread import allocate_lock
+
+ # writelock protects all accesses to outbuf, since reads and
+ # writes of buffers in this class need to be serialized.
+ writelock = allocate_lock()
+ self._writelock_acquire = writelock.acquire
+ self._writelock_release = writelock.release
+ self._writelock_locked = writelock.locked
+ DualModeChannel.__init__(self, conn, addr, adj)
+
+ #
+ # ASYNCHRONOUS METHODS
+ #
+
+ def writable(self):
+ return self.will_close or (
+ self.outbuf and not self._writelock_locked())
+
+ def handle_write(self):
+ if not self._writelock_acquire(0):
+ # A synchronous method is writing.
+ return
+ try:
+ self.inner_handle_write()
+ finally:
+ self._writelock_release()
+
+ def readable(self):
+ return not self.will_close
+
+ def handle_read(self):
+ self.inner_handle_read()
+
+ def set_sync(self):
+ pass
+
+ #
+ # SYNCHRONOUS METHODS
+ #
+
+ def write(self, data):
+ self._writelock_acquire()
+ try:
+ DualModeChannel.write(self, data)
+ finally:
+ self._writelock_release()
+
+ def flush(self, block=1):
+ self._writelock_acquire()
+ try:
+ DualModeChannel.flush(self, block)
+ finally:
+ self._writelock_release()
+
+ def set_async(self):
+ pass
+
+ #
+ # METHODS USED IN BOTH MODES
+ #
+
+ def close_when_done(self):
+ self.will_close = 1
+ self.pull_trigger()
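
The hand-off pattern the docstrings above describe, sketched as a
hypothetical worker-thread helper (the channel itself is created by the
async main loop):

    def reply_in_worker_thread(channel, payload):
        # channel: a connected DualModeChannel handed off by the main loop.
        channel.set_sync()      # main loop stops calling received()
        channel.write(payload)  # buffered; sends what it can without blocking
        channel.flush()         # block until the output buffer is drained
        channel.set_async()     # give the channel back to the main loop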
=== Zope3/src/zope/server/fixedstreamreceiver.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/fixedstreamreceiver.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,51 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+from zope.server.interfaces import IStreamConsumer
+
+
+class FixedStreamReceiver:
+
+ __implements__ = IStreamConsumer
+
+ # See IStreamConsumer
+ completed = 0
+
+ def __init__(self, cl, buf):
+ self.remain = cl
+ self.buf = buf
+
+ def received(self, data):
+ 'See IStreamConsumer'
+ rm = self.remain
+ if rm < 1:
+ self.completed = 1 # Avoid any chance of spinning
+ return 0
+ datalen = len(data)
+ if rm <= datalen:
+ self.buf.append(data[:rm])
+ self.remain = 0
+ self.completed = 1
+ return rm
+ else:
+ self.buf.append(data)
+ self.remain -= datalen
+ return datalen
+
+ def getfile(self):
+ return self.buf.getfile()
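
A small sketch of the IStreamConsumer contract as implemented here: feed it
chunks and it reports how many bytes it consumed and whether the
fixed-length body is complete.

    from zope.server.buffers import OverflowableBuffer
    from zope.server.fixedstreamreceiver import FixedStreamReceiver

    body = OverflowableBuffer(525000)
    receiver = FixedStreamReceiver(5, body)     # expect exactly 5 bytes
    consumed = receiver.received('helloEXTRA')  # returns 5; the rest is untouched
    assert consumed == 5 and receiver.completed
    body_file = receiver.getfile()              # file-like object holding 'hello'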
=== Zope3/src/zope/server/maxsockets.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/maxsockets.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,66 @@
+# Medusa max_sockets module.
+
+import socket
+import select
+
+# several factors here we might want to test:
+# 1) max we can create
+# 2) max we can bind
+# 3) max we can listen on
+# 4) max we can connect
+
+def max_server_sockets():
+ sl = []
+ while 1:
+ try:
+ s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ s.bind (('',0))
+ s.listen(5)
+ sl.append (s)
+ except:
+ break
+ num = len(sl)
+ for s in sl:
+ s.close()
+ del sl
+ return num
+
+def max_client_sockets():
+ # make a server socket
+ server = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ server.bind (('', 9999))
+ server.listen (5)
+ sl = []
+ while 1:
+ try:
+ s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ s.connect (('', 9999))
+ conn, addr = server.accept()
+ sl.append ((s,conn))
+ except:
+ break
+ num = len(sl)
+ for s,c in sl:
+ s.close()
+ c.close()
+ del sl
+ return num
+
+def max_select_sockets():
+ sl = []
+ while 1:
+ try:
+ num = len(sl)
+            for i in range(1 + int(len(sl) * 0.05)):
+ # Increase exponentially.
+ s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ s.bind (('',0))
+ s.listen(5)
+ sl.append (s)
+ select.select(sl,[],[],0)
+ except:
+ break
+ for s in sl:
+ s.close()
+ del sl
+ return num
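
adjustments.py above only uses max_select_sockets(); note that all three
probes really do open (and then close) as many sockets as they can, so they
are not cheap to call. A trivial sketch:

    from zope.server import maxsockets

    limit = maxsockets.max_select_sockets()
    connection_limit = limit - 3   # the same margin Adjustments uses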
=== Zope3/src/zope/server/selecttrigger.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/selecttrigger.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,310 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+# -*- Mode: Python; tab-width: 4 -*-
+
+VERSION_STRING = "$Id$"
+
+import asyncore
+import asynchat
+
+import os
+import socket
+import string
+import sys
+import thread
+import traceback
+
+if os.name == 'posix':
+
+ class Trigger(asyncore.file_dispatcher):
+
+ "Wake up a call to select() running in the main thread"
+
+ # This is useful in a context where you are using Medusa's I/O
+ # subsystem to deliver data, but the data is generated by another
+ # thread. Normally, if Medusa is in the middle of a call to
+ # select(), new output data generated by another thread will have
+ # to sit until the call to select() either times out or returns.
+ # If the trigger is 'pulled' by another thread, it should immediately
+ # generate a READ event on the trigger object, which will force the
+ # select() invocation to return.
+
+ # A common use for this facility: letting Medusa manage I/O for a
+ # large number of connections; but routing each request through a
+ # thread chosen from a fixed-size thread pool. When a thread is
+ # acquired, a transaction is performed, but output data is
+ # accumulated into buffers that will be emptied more efficiently
+ # by Medusa. [picture a server that can process database queries
+ # rapidly, but doesn't want to tie up threads waiting to send data
+ # to low-bandwidth connections]
+
+ # The other major feature provided by this class is the ability to
+ # move work back into the main thread: if you call pull_trigger()
+ # with a thunk argument, when select() wakes up and receives the
+ # event it will call your thunk from within that thread. The main
+ # purpose of this is to remove the need to wrap thread locks around
+ # Medusa's data structures, which normally do not need them. [To see
+ # why this is true, imagine this scenario: A thread tries to push some
+ # new data onto a channel's outgoing data queue at the same time that
+ # the main thread is trying to remove some]
+
+ def __init__(self):
+ r, w = self._fds = os.pipe()
+ self.trigger = w
+ asyncore.file_dispatcher.__init__(self, r)
+ self.lock = thread.allocate_lock()
+ self.thunks = []
+ self._closed = 0
+
+ # Override the asyncore close() method, because it seems that
+ # it would only close the r file descriptor and not w. The
+ # constructor calls file_dispatcher.__init__ and passes r,
+ # which would get stored in a file_wrapper and get closed by
+ # the default close. But that would leave w open...
+
+ def close(self):
+ if not self._closed:
+ self._closed = 1
+ self.del_channel()
+ for fd in self._fds:
+ os.close(fd)
+ self._fds = []
+
+ def __repr__(self):
+ return '<select-trigger (pipe) at %x>' % id(self)
+
+ def readable(self):
+ return 1
+
+ def writable(self):
+ return 0
+
+ def handle_connect(self):
+ pass
+
+ def pull_trigger(self, thunk=None):
+ if thunk:
+ self.lock.acquire()
+ try:
+ self.thunks.append(thunk)
+ finally:
+ self.lock.release()
+ os.write(self.trigger, 'x')
+
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except socket.error:
+ return
+ self.lock.acquire()
+ try:
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ L = traceback.format_exception(*sys.exc_info())
+ print 'exception in trigger thunk:\n%s' % "".join(L)
+ self.thunks = []
+ finally:
+ self.lock.release()
+
+else:
+ # XXX Should define a base class that has the common methods and
+ # then put the platform-specific in a subclass named trigger.
+
+ # win32-safe version
+
+ HOST = '127.0.0.1'
+ MINPORT = 19950
+ NPORTS = 50
+
+ class Trigger(asyncore.dispatcher):
+
+ portoffset = 0
+
+ def __init__(self):
+ a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+            # set TCP_NODELAY to true to avoid buffering
+            w.setsockopt(socket.IPPROTO_TCP, 1, 1)
+
+            # tricky: get a pair of connected sockets
+            for i in range(NPORTS):
+                Trigger.portoffset = (Trigger.portoffset + 1) % NPORTS
+                port = MINPORT + Trigger.portoffset
+                address = (HOST, port)
+                try:
+                    a.bind(address)
+                except socket.error:
+                    continue
+                else:
+                    break
+            else:
+                raise RuntimeError, 'Cannot bind trigger!'
+
+            a.listen(1)
+            w.setblocking(0)
+            try:
+                w.connect(address)
+            except:
+                pass
+            r, addr = a.accept()
+            a.close()
+            w.setblocking(1)
+            self.trigger = w
+
+            asyncore.dispatcher.__init__(self, r)
+            self.lock = thread.allocate_lock()
+            self.thunks = []
+            self._trigger_connected = 0
+
+ def __repr__(self):
+ return '<select-trigger (loopback) at %x>' % id(self)
+
+ def readable(self):
+ return 1
+
+ def writable(self):
+ return 0
+
+ def handle_connect(self):
+ pass
+
+ def pull_trigger(self, thunk=None):
+ if thunk:
+ self.lock.acquire()
+ try:
+ self.thunks.append(thunk)
+ finally:
+ self.lock.release()
+ self.trigger.send('x')
+
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except socket.error:
+ return
+ self.lock.acquire()
+ try:
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ L = traceback.format_exception(*sys.exc_info())
+ print 'exception in trigger thunk:\n%s' % "".join(L)
+ self.thunks = []
+ finally:
+ self.lock.release()
+
+
+the_trigger = None
+
+class TriggerFile:
+ "A 'triggered' file object"
+
+ buffer_size = 4096
+
+ def __init__ (self, parent):
+ global the_trigger
+ if the_trigger is None:
+            the_trigger = Trigger()
+ self.parent = parent
+ self.buffer = ''
+
+ def write (self, data):
+ self.buffer = self.buffer + data
+ if len(self.buffer) > self.buffer_size:
+ d, self.buffer = self.buffer, ''
+ the_trigger.pull_trigger(lambda: self.parent.push(d))
+
+ def writeline (self, line):
+ self.write(line + '\r\n')
+
+ def writelines (self, lines):
+ self.write("\r\n".join(lines) + "\r\n")
+
+ def flush (self):
+ if self.buffer:
+ d, self.buffer = self.buffer, ''
+ the_trigger.pull_trigger(lambda: self.parent.push(d))
+
+ def softspace (self, *args):
+ pass
+
+ def close (self):
+ # in a derived class, you may want to call trigger_close() instead.
+ self.flush()
+ self.parent = None
+
+ def trigger_close (self):
+ d, self.buffer = self.buffer, ''
+ p, self.parent = self.parent, None
+ the_trigger.pull_trigger(lambda: (p.push(d), p.close_when_done()))
+
+if __name__ == '__main__':
+
+ import time
+
+ def thread_function (output_file, i, n):
+ print 'entering thread_function'
+ while n:
+ time.sleep (5)
+ output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
+ output_file.flush()
+ n = n - 1
+ output_file.close()
+ print 'exiting thread_function'
+
+ class thread_parent (asynchat.async_chat):
+
+ def __init__ (self, conn, addr):
+ self.addr = addr
+ asynchat.async_chat.__init__ (self, conn)
+ self.set_terminator ('\r\n')
+ self.buffer = ''
+ self.count = 0
+
+ def collect_incoming_data (self, data):
+ self.buffer = self.buffer + data
+
+ def found_terminator (self):
+ data, self.buffer = self.buffer, ''
+ if not data:
+ asyncore.close_all()
+ print "done"
+ return
+ n = string.atoi (string.split (data)[0])
+ tf = TriggerFile (self)
+ self.count = self.count + 1
+ thread.start_new_thread (thread_function, (tf, self.count, n))
+
+ class ThreadServer(asyncore.dispatcher):
+
+ def __init__ (self, family=socket.AF_INET, address=('', 9003)):
+ asyncore.dispatcher.__init__ (self)
+ self.create_socket (family, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ self.bind (address)
+ self.listen (5)
+
+ def handle_accept (self):
+ conn, addr = self.accept()
+ tp = thread_parent (conn, addr)
+
+ ThreadServer()
+ #asyncore.loop(1.0, use_poll=1)
+ try:
+ asyncore.loop ()
+ except:
+ asyncore.close_all()
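
A sketch of the thunk mechanism the comments above describe: a worker thread
pulls the trigger with a callable, and the select() loop runs it on its next
pass. This assumes an asyncore loop is running and reuses the module-level
trigger that dualmodechannel.py creates:

    from zope.server import selecttrigger

    if selecttrigger.the_trigger is None:
        selecttrigger.the_trigger = selecttrigger.Trigger()

    def run_in_main_loop():
        # Executed later, inside the thread running asyncore.loop().
        pass

    selecttrigger.the_trigger.pull_trigger(run_in_main_loop)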
=== Zope3/src/zope/server/serverbase.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/serverbase.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,142 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import asyncore
+import logging
+import socket
+
+from zope.server.adjustments import default_adj
+
+from zope.server.interfaces import IServer
+
+
+class ServerBase(asyncore.dispatcher, object):
+ """Async. server base for launching derivatives of ServerChannelBase.
+ """
+
+ __implements__ = asyncore.dispatcher.__implements__, IServer
+
+ channel_class = None # Override with a channel class.
+ SERVER_IDENT = 'zope.server.serverbase' # Override.
+
+ def __init__(self, ip, port, task_dispatcher=None, adj=None, start=1,
+ hit_log=None, verbose=0):
+ if adj is None:
+ adj = default_adj
+ self.adj = adj
+ asyncore.dispatcher.__init__(self)
+ self.port = port
+ self.task_dispatcher = task_dispatcher
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ self.bind((ip, port))
+ self.verbose = verbose
+ self.hit_log = hit_log
+ self.server_name = self.computeServerName(ip)
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ if start:
+ self.accept_connections()
+
+ def log(self, message):
+ # Override asyncore's default log()
+ self.logger.info(message)
+
+ level_mapping = {
+ 'info': logging.INFO,
+ 'error': logging.ERROR,
+ 'warning': logging.WARN,
+ }
+
+ def log_info(self, message, type='info'):
+ self.logger.log(self.level_mapping.get(type, logging.INFO), message)
+
+ def computeServerName(self, ip=''):
+ if ip:
+ server_name = str(ip)
+ else:
+ server_name = str(socket.gethostname())
+ # Convert to a host name if necessary.
+ is_hostname = 0
+ for c in server_name:
+ if c != '.' and not c.isdigit():
+ is_hostname = 1
+ break
+ if not is_hostname:
+ if self.verbose:
+ self.log_info('Computing hostname', 'info')
+ try:
+ server_name = socket.gethostbyaddr(server_name)[0]
+ except socket.error:
+ if self.verbose:
+ self.log_info('Cannot do reverse lookup', 'info')
+ return server_name
+
+ def accept_connections(self):
+ self.accepting = 1
+ self.socket.listen(self.adj.backlog) # Circumvent asyncore's NT limit
+ if self.verbose:
+ self.log_info('%s started.\n'
+ '\tHostname: %s\n\tPort: %d' % (
+ self.SERVER_IDENT,
+ self.server_name,
+ self.port
+ ))
+
+
+ def addTask(self, task):
+ td = self.task_dispatcher
+ if td is not None:
+ td.addTask(task)
+ else:
+ task.service()
+
+ def readable(self):
+ 'See IDispatcher'
+ return (self.accepting and
+ len(asyncore.socket_map) < self.adj.connection_limit)
+
+ def writable(self):
+ 'See IDispatcher'
+ return 0
+
+ def handle_read(self):
+ 'See IDispatcherEventHandler'
+ pass
+
+ def handle_connect(self):
+ 'See IDispatcherEventHandler'
+ pass
+
+ def handle_accept(self):
+ 'See IDispatcherEventHandler'
+ try:
+ v = self.accept()
+ if v is None:
+ return
+ conn, addr = v
+ except socket.error:
+ # Linux: On rare occasions we get a bogus socket back from
+ # accept. socketmodule.c:makesockaddr complains that the
+ # address family is unknown. We don't want the whole server
+ # to shut down because of this.
+ if self.adj.log_socket_errors:
+ self.log_info ('warning: server accept() threw an exception',
+ 'warning')
+ return
+ self.channel_class(self, conn, addr, self.adj)
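
A hypothetical skeleton (EchoChannel and EchoServer are illustrative names
only) showing how a concrete server plugs into ServerBase:

    from zope.server.serverbase import ServerBase
    from zope.server.serverchannelbase import ServerChannelBase

    class EchoChannel(ServerChannelBase):
        # A real channel must also set parser_class and task_class;
        # see serverchannelbase.py below.
        pass

    class EchoServer(ServerBase):
        channel_class = EchoChannel
        SERVER_IDENT = 'zope.server.echo_example'

    # server = EchoServer('127.0.0.1', 8080, task_dispatcher=dispatcher)
    # import asyncore; asyncore.loop()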
=== Zope3/src/zope/server/serverchannelbase.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/serverchannelbase.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,247 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import os
+import time
+import sys
+import asyncore
+from thread import allocate_lock
+
+# Set the ZOPE_SERVER_SIMULT_MODE environment variable to enable the
+# experimental simultaneous channel mode, which may improve or degrade
+# throughput depending on load characteristics.
+if os.environ.get('ZOPE_SERVER_SIMULT_MODE'):
+ from zope.server.dualmodechannel import SimultaneousModeChannel as \
+ ChannelBaseClass
+else:
+ from zope.server.dualmodechannel import DualModeChannel as ChannelBaseClass
+
+from zope.server.interfaces import IServerChannel
+
+# Synchronize access to the "running_tasks" attributes.
+running_lock = allocate_lock()
+
+
+class ServerChannelBase(ChannelBaseClass, object):
+ """Base class for a high-performance, mixed-mode server-side channel.
+ """
+
+ __implements__ = ChannelBaseClass.__implements__, IServerChannel
+
+
+ parser_class = None # Subclasses must provide a parser class
+ task_class = None # ... and a task class.
+
+ active_channels = {} # Class-specific channel tracker
+ next_channel_cleanup = [0] # Class-specific cleanup time
+
+ proto_request = None # A request parser instance
+ ready_requests = None # A list
+ # ready_requests must always be empty when not running tasks.
+ last_activity = 0 # Time of last activity
+ running_tasks = 0 # boolean: true when any task is being executed
+
+ #
+ # ASYNCHRONOUS METHODS (incl. __init__)
+ #
+
+ def __init__(self, server, conn, addr, adj=None):
+ ChannelBaseClass.__init__(self, conn, addr, adj)
+ self.server = server
+ self.last_activity = t = self.creation_time
+ self.check_maintenance(t)
+
+
+ def add_channel(self, map=None):
+ """This hook keeps track of opened HTTP channels.
+ """
+ ChannelBaseClass.add_channel(self, map)
+ self.__class__.active_channels[self._fileno] = self
+
+
+ def del_channel(self, map=None):
+ """This hook keeps track of closed HTTP channels.
+ """
+ ChannelBaseClass.del_channel(self, map)
+ ac = self.__class__.active_channels
+ fd = self._fileno
+ if fd in ac:
+ del ac[fd]
+
+
+ def check_maintenance(self, now):
+ """Performs maintenance if necessary.
+ """
+ ncc = self.__class__.next_channel_cleanup
+ if now < ncc[0]:
+ return
+ ncc[0] = now + self.adj.cleanup_interval
+ self.maintenance()
+
+
+ def maintenance(self):
+ """Kills off dead connections.
+ """
+ self.kill_zombies()
+
+
+ def kill_zombies(self):
+ """Closes connections that have not had any activity in a while.
+
+ The timeout is configured through adj.channel_timeout (seconds).
+ """
+ now = time.time()
+ cutoff = now - self.adj.channel_timeout
+ for channel in self.active_channels.values():
+ if (channel is not self and not channel.running_tasks and
+ channel.last_activity < cutoff):
+ channel.close()
+
+
+ def received(self, data):
+ """Receive input asynchronously and send requests to
+ receivedCompleteRequest().
+ """
+ preq = self.proto_request
+ while data:
+ if preq is None:
+ preq = self.parser_class(self.adj)
+ n = preq.received(data)
+ if preq.completed:
+ # The request is ready to use.
+ if not preq.empty:
+ self.receivedCompleteRequest(preq)
+ preq = None
+ self.proto_request = None
+ else:
+ self.proto_request = preq
+ if n >= len(data):
+ break
+ data = data[n:]
+
+
+ def receivedCompleteRequest(self, req):
+ """If there are tasks running or requests on hold, queue
+ the request, otherwise execute it.
+ """
+ do_now = 0
+ running_lock.acquire()
+ try:
+ if self.running_tasks:
+ # A task thread is working. It will read from the queue
+ # when it is finished.
+ rr = self.ready_requests
+ if rr is None:
+ rr = []
+ self.ready_requests = rr
+ rr.append(req)
+ else:
+ # Do it now.
+ do_now = 1
+ finally:
+ running_lock.release()
+ if do_now:
+ task = self.process_request(req)
+ if task is not None:
+ self.start_task(task)
+
+
+ def start_task(self, task):
+ """Starts the given task.
+
+ *** For thread safety, this should only be called from the main
+ (async) thread. ***"""
+ if self.running_tasks:
+ # Can't start while another task is running!
+ # Otherwise two threads would work on the queue at the same time.
+ raise RuntimeError, 'Already executing tasks'
+ self.running_tasks = 1
+ self.set_sync()
+ self.server.addTask(task)
+
+
+ def handle_error(self):
+ """Handles program errors (not communication errors)
+ """
+ t, v = sys.exc_info()[:2]
+ if t is SystemExit or t is KeyboardInterrupt:
+ raise t, v
+ asyncore.dispatcher.handle_error(self)
+
+
+ def handle_comm_error(self):
+ """Handles communication errors (not program errors)
+ """
+ if self.adj.log_socket_errors:
+ self.handle_error()
+ else:
+ # Ignore socket errors.
+ self.close()
+
+
+ #
+ # SYNCHRONOUS METHODS
+ #
+
+ def end_task(self, close):
+ """Called at the end of a task and may launch another task.
+ """
+ if close:
+ # Note that self.running_tasks is left on, which has the
+ # side effect of preventing further requests from being
+ # serviced even if more appear. A good thing.
+ self.close_when_done()
+ return
+ # Process requests held in the queue, if any.
+ while 1:
+ req = None
+ running_lock.acquire()
+ try:
+ rr = self.ready_requests
+ if rr:
+ req = rr.pop(0)
+ else:
+ # No requests to process.
+ self.running_tasks = 0
+ finally:
+ running_lock.release()
+
+ if req is not None:
+ task = self.process_request(req)
+ if task is not None:
+ # Add the new task. It will service the queue.
+ self.server.addTask(task)
+ break
+ # else check the queue again.
+ else:
+ # Idle -- Wait for another request on this connection.
+ self.set_async()
+ break
+
+
+ #
+ # BOTH MODES
+ #
+
+ def process_request(self, req):
+ """Returns a task to execute or None if the request is quick and
+ can be processed in the main thread.
+
+ Override to handle some requests in the main thread.
+ """
+ return self.task_class(self, req)
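
For illustration, minimal do-nothing parser and task classes (hypothetical
names) satisfying the interface this base class expects of parser_class and
task_class:

    from zope.server.serverchannelbase import ServerChannelBase

    class NullParser:
        # Constructed with the channel's adjustments; received() returns the
        # number of bytes consumed and sets completed/empty when done.
        completed = 1   # every chunk is one complete request...
        empty = 1       # ...which is empty, so it is never serviced

        def __init__(self, adj):
            self.adj = adj

        def received(self, data):
            return len(data)

    class NullTask:
        # Constructed with (channel, request); must offer service(),
        # defer() and cancel() for the task dispatcher.
        def __init__(self, channel, request):
            self.channel = channel

        def service(self):
            self.channel.end_task(0)

        def defer(self):
            pass

        def cancel(self):
            pass

    class NullChannel(ServerChannelBase):
        parser_class = NullParser
        task_class = NullTask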
=== Zope3/src/zope/server/taskthreads.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/taskthreads.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,110 @@
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+
+import sys
+from Queue import Queue, Empty
+from thread import allocate_lock, start_new_thread
+from time import time, sleep
+import logging
+
+from zope.server.interfaces import ITaskDispatcher
+
+
+class ThreadedTaskDispatcher:
+
+ __implements__ = ITaskDispatcher
+
+ stop_count = 0 # Number of threads that will stop soon.
+
+ def __init__(self):
+ self.threads = {} # { thread number -> 1 }
+ self.queue = Queue()
+ self.thread_mgmt_lock = allocate_lock()
+
+ def handlerThread(self, thread_no):
+ threads = self.threads
+ try:
+ while threads.get(thread_no):
+ task = self.queue.get()
+ if task is None:
+ # Special value: kill this thread.
+ break
+ try:
+ task.service()
+ except:
+ logging.exception('Exception during task')
+ finally:
+ mlock = self.thread_mgmt_lock
+ mlock.acquire()
+ try:
+ self.stop_count -= 1
+ try: del threads[thread_no]
+ except KeyError: pass
+ finally:
+ mlock.release()
+
+ def setThreadCount(self, count):
+ mlock = self.thread_mgmt_lock
+ mlock.acquire()
+ try:
+ threads = self.threads
+ thread_no = 0
+ running = len(threads) - self.stop_count
+ while running < count:
+ # Start threads.
+ while thread_no in threads:
+ thread_no = thread_no + 1
+ threads[thread_no] = 1
+ running += 1
+ start_new_thread(self.handlerThread, (thread_no,))
+ thread_no = thread_no + 1
+ if running > count:
+ # Stop threads.
+ to_stop = running - count
+ self.stop_count += to_stop
+ for n in range(to_stop):
+ self.queue.put(None)
+ running -= 1
+ finally:
+ mlock.release()
+
+ def addTask(self, task):
+ if task is None:
+ raise ValueError, "No task passed to addTask()."
+ # assert ITask.isImplementedBy(task)
+ try:
+ task.defer()
+ self.queue.put(task)
+ except:
+ task.cancel()
+ raise
+
+ def shutdown(self, cancel_pending=1, timeout=5):
+ self.setThreadCount(0)
+ # Ensure the threads shut down.
+ threads = self.threads
+ expiration = time() + timeout
+        while threads:
+            if time() >= expiration:
+                logging.error("%d thread(s) still running" % len(threads))
+                break
+            sleep(0.1)
+ if cancel_pending:
+ # Cancel remaining tasks.
+ try:
+ queue = self.queue
+ while not queue.empty():
+ task = queue.get()
+ if task is not None:
+ task.cancel()
+ except Empty:
+ pass
+
+ def getPendingTasksEstimate(self):
+ return self.queue.qsize()
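
A short usage sketch; whatever is handed to addTask() must provide
service(), defer() and cancel(), as the code above shows:

    from zope.server.taskthreads import ThreadedTaskDispatcher

    dispatcher = ThreadedTaskDispatcher()
    dispatcher.setThreadCount(4)    # start four worker threads
    # dispatcher.addTask(some_task) # deferred now, serviced in a worker thread
    dispatcher.shutdown()           # stop the workers, cancel anything queued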
=== Zope3/src/zope/server/utilities.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/utilities.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,26 @@
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+
+def find_double_newline(s):
+ """Returns the position just after a double newline in the given string."""
+ pos1 = s.find('\n\r\n') # One kind of double newline
+ if pos1 >= 0:
+ pos1 += 3
+ pos2 = s.find('\n\n') # Another kind of double newline
+ if pos2 >= 0:
+ pos2 += 2
+
+ if pos1 >= 0:
+ if pos2 >= 0:
+ return min(pos1, pos2)
+ else:
+ return pos1
+ else:
+ return pos2
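
The two blank-line conventions it recognizes, for reference:

    from zope.server.utilities import find_double_newline

    assert find_double_newline('GET / HTTP/1.0\r\n\r\nbody') == 18
    assert find_double_newline('header\n\nbody') == 8
    assert find_double_newline('no blank line yet') == -1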
=== Zope3/src/zope/server/zlogintegration.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:54 2002
+++ Zope3/src/zope/server/zlogintegration.py Wed Dec 25 09:15:23 2002
@@ -0,0 +1,36 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Make asyncore log to the logging module.
+
+As a side effect of importing this module, asyncore's logging will be
+redirected to the logging module.
+
+$Id$
+"""
+
+import logging
+
+logger = logging.getLogger("zope.server")
+
+severity = {
+ 'info': logging.INFO,
+ 'warning': logging.WARN,
+ 'error': logging.ERROR,
+ }
+
+def log_info(self, message, type='info'):
+ logger.log(severity.get(type, logging.INFO), message)
+
+import asyncore
+asyncore.dispatcher.log_info = log_info
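
Usage is just the side-effect import the docstring describes; for example:

    import logging
    logging.basicConfig()               # send log records somewhere visible

    import zope.server.zlogintegration  # asyncore now logs via 'zope.server'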