[Zope3-checkins] CVS: Zope3/src/zope/server - __init__.py:1.1.2.1 adjustments.py:1.1.2.1 buffers.py:1.1.2.1 dualmodechannel.py:1.1.2.1 fixedstreamreceiver.py:1.1.2.1 maxsockets.py:1.1.2.1 serverbase.py:1.1.2.1 serverchannelbase.py:1.1.2.1 taskthreads.py:1.1.2.1 utilities.py:1.1.2.1 zlogintegration.py:1.1.2.1
Jim Fulton
jim@zope.com
Mon, 23 Dec 2002 14:33:20 -0500
Update of /cvs-repository/Zope3/src/zope/server
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/server
Added Files:
Tag: NameGeddon-branch
__init__.py adjustments.py buffers.py dualmodechannel.py
fixedstreamreceiver.py maxsockets.py serverbase.py
serverchannelbase.py taskthreads.py utilities.py
zlogintegration.py
Log Message:
Initial renaming before debugging
=== Added File Zope3/src/zope/server/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Zope.Server package.
$Id: __init__.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""
from zope.server.interfaces.interfaces import IDispatcher
from zope.interface.implements import implements
import asyncore
# Register IDispatcher against the stock asyncore.dispatcher class as a side
# effect of importing this package.
# NOTE(review): the meaning of the trailing 0 argument is not visible from
# here -- confirm against zope.interface.implements.
implements(asyncore.dispatcher, IDispatcher, 0)
=== Added File Zope3/src/zope/server/adjustments.py ===
# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
import zope.server.maxsockets
class Adjustments:
    """Tunable communication parameters.

    Either change ``default_adj`` to adjust the parameters for all
    sockets, or create a new instance of this class, change its
    attributes, and pass it to the channel constructors.
    """

    # backlog is the argument to pass to socket.listen().
    backlog = 1024

    # recv_bytes is the argument to pass to socket.recv().
    recv_bytes = 8192

    # send_bytes is the number of bytes to hand to socket.send() at a time.
    send_bytes = 8192

    # copy_bytes is the number of bytes to copy from one file to another.
    copy_bytes = 65536

    # Create a tempfile if the pending output data gets larger
    # than outbuf_overflow.  With RAM so cheap, this probably
    # ought to be set to the 16-32 MB range (circa 2001) for
    # good performance with big transfers.  The default is
    # conservative.
    outbuf_overflow = 1050000

    # Create a tempfile if the data received gets larger
    # than inbuf_overflow.
    inbuf_overflow = 525000

    # Stop accepting new connections if too many are already active.
    # The probe runs once, when this class body is executed.  It is reached
    # through the module that is actually imported by this file; the bare
    # name ``MaxSockets`` is not defined here and raised NameError.
    connection_limit = zope.server.maxsockets.max_select_sockets() - 3  # Safe

    # Minimum seconds between cleaning up inactive channels.
    cleanup_interval = 300

    # Maximum seconds to leave an inactive connection open.
    channel_timeout = 900

    # Boolean: turn off to not log premature client disconnects.
    log_socket_errors = 1
# Shared module-level instance carrying the class-level default values.
default_adj = Adjustments()
=== Added File Zope3/src/zope/server/buffers.py ===
# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
# COPY_BYTES controls the size of the temporary strings used when shuffling
# data from one file object to another.
COPY_BYTES = 1 << 18 # 256K
# The maximum number of bytes to buffer in a simple string.
STRBUF_LIMIT = 8192
class FileBasedBuffer:
    """Buffer that stores its data in a seekable file-like object.

    The file's current position doubles as the read position; ``remain``
    holds the number of unread bytes between that position and the end of
    the file.  Subclasses supply ``newfile()`` to create the backing file.
    """

    # Number of bytes that have been appended but not yet consumed.
    remain = 0

    def __init__(self, file, from_buffer=None):
        """Wrap *file*, optionally taking over the data of *from_buffer*.

        When *from_buffer* is given, the complete content of its file is
        copied into *file* and both files are positioned at the source's
        read position, so data already consumed there stays consumed.
        """
        self.file = file
        if from_buffer is not None:
            from_file = from_buffer.getfile()
            read_pos = from_file.tell()
            from_file.seek(0)
            while 1:
                data = from_file.read(COPY_BYTES)
                if not data:
                    break
                file.write(data)
            self.remain = int(file.tell() - read_pos)
            from_file.seek(read_pos)
            file.seek(read_pos)

    def __len__(self):
        """Return the number of unread bytes."""
        return self.remain

    def append(self, s):
        """Add *s* at the end of the file without moving the read position."""
        file = self.file
        read_pos = file.tell()
        file.seek(0, 2)
        file.write(s)
        file.seek(read_pos)
        self.remain = self.remain + len(s)

    def get(self, bytes=-1, skip=0):
        """Return up to *bytes* unread bytes (everything if negative).

        With a true *skip* the returned data is consumed; otherwise the
        read position is restored so the same data can be fetched again.
        """
        file = self.file
        if not skip:
            read_pos = file.tell()
        if bytes < 0:
            # Read all
            res = file.read()
        else:
            res = file.read(bytes)
        if skip:
            self.remain -= len(res)
        else:
            file.seek(read_pos)
        return res

    def skip(self, bytes, allow_prune=0):
        """Consume *bytes* bytes without returning them.

        *allow_prune* is accepted for interface compatibility and is not
        used by this implementation.

        Raises ValueError if fewer than *bytes* bytes are unread.
        """
        if self.remain < bytes:
            raise ValueError(
                "Can't skip %d bytes in buffer of %d bytes" %
                (bytes, self.remain))
        self.file.seek(bytes, 1)
        self.remain = self.remain - bytes

    def newfile(self):
        """Return a fresh, empty backing file.  Subclasses must override."""
        # A real exception class instead of a string exception, which
        # newer interpreters refuse to raise.
        raise NotImplementedError

    def prune(self):
        """Drop already-consumed data by copying the unread rest to a new file."""
        file = self.file
        if self.remain == 0:
            read_pos = file.tell()
            file.seek(0, 2)
            sz = file.tell()
            file.seek(read_pos)
            if sz == 0:
                # Nothing to prune.
                return
        nf = self.newfile()
        while 1:
            data = file.read(COPY_BYTES)
            if not data:
                break
            nf.write(data)
        # The new file holds only unread data, so reading must restart at
        # its beginning; left at the end, the copied bytes were unreachable.
        nf.seek(0)
        self.file = nf

    def getfile(self):
        """Return the backing file object."""
        return self.file
class TempfileBasedBuffer(FileBasedBuffer):
    """File-based buffer whose backing store is an anonymous temporary file."""

    def __init__(self, from_buffer=None):
        """Create the temporary file and optionally copy *from_buffer* into it."""
        backing = self.newfile()
        FileBasedBuffer.__init__(self, backing, from_buffer)

    def newfile(self):
        """Return a new temporary file opened in binary read/write mode."""
        import tempfile
        return tempfile.TemporaryFile('w+b')
class StringIOBasedBuffer(FileBasedBuffer):
    """File-based buffer whose backing store is an in-memory StringIO."""

    def __init__(self, from_buffer=None):
        """Create the StringIO and optionally copy *from_buffer* into it."""
        if from_buffer is None:
            # Nothing to copy: skip the base constructor entirely and rely
            # on the class-level default for ``remain``.
            self.file = StringIO()
            return
        FileBasedBuffer.__init__(self, StringIO(), from_buffer)

    def newfile(self):
        """Return a new, empty StringIO."""
        return StringIO()
class OverflowableBuffer:
    """
    This buffer implementation has four stages:
    - No data
    - String-based buffer
    - StringIO-based buffer
    - Temporary file storage
    The first two stages are fastest for simple transfers.
    """

    overflowed = 0  # True once the data lives in a temporary file.
    buf = None      # The StringIO- or tempfile-based buffer, once created.
    strbuf = ''     # String-based buffer.

    def __init__(self, overflow):
        """Remember *overflow*, the maximum to be stored in a StringIO buffer."""
        self.overflow = overflow

    def __len__(self):
        """Return the number of buffered bytes in the active stage."""
        buf = self.buf
        if buf is not None:
            return len(buf)
        else:
            return len(self.strbuf)

    def _create_buffer(self):
        """Leave the string stage: create a buffer object and move strbuf into it."""
        strbuf = self.strbuf
        if len(strbuf) >= self.overflow:
            self._set_large_buffer()
        else:
            self._set_small_buffer()
        buf = self.buf
        if strbuf:
            buf.append(self.strbuf)
            self.strbuf = ''
        return buf

    def _set_small_buffer(self):
        """Switch to a StringIO-based buffer, carrying over any current buffer."""
        self.buf = StringIOBasedBuffer(self.buf)
        self.overflowed = 0

    def _set_large_buffer(self):
        """Switch to a tempfile-based buffer, carrying over any current buffer."""
        self.buf = TempfileBasedBuffer(self.buf)
        self.overflowed = 1

    def append(self, s):
        """Add *s* to the buffer, promoting to a larger stage when needed."""
        buf = self.buf
        if buf is None:
            strbuf = self.strbuf
            if len(strbuf) + len(s) < STRBUF_LIMIT:
                # Still small enough for plain string concatenation.
                self.strbuf = strbuf + s
                return
            buf = self._create_buffer()
        buf.append(s)
        sz = len(buf)
        if not self.overflowed:
            if sz >= self.overflow:
                self._set_large_buffer()

    def get(self, bytes=-1, skip=0):
        """Return up to *bytes* buffered bytes (everything if negative).

        With a true *skip* the returned data is also consumed.
        """
        buf = self.buf
        if buf is None:
            strbuf = self.strbuf
            if not skip:
                # Honour the size limit in the string stage too, so the
                # result never exceeds what the file-based stages return.
                if bytes < 0:
                    return strbuf
                return strbuf[:bytes]
            buf = self._create_buffer()
        return buf.get(bytes, skip)

    def skip(self, bytes, allow_prune=0):
        """Consume *bytes* bytes without returning them."""
        buf = self.buf
        if buf is None:
            strbuf = self.strbuf
            if allow_prune and bytes == len(strbuf):
                # We could slice instead of converting to
                # a buffer, but that would eat up memory in
                # large transfers.
                self.strbuf = ''
                return
            buf = self._create_buffer()
        buf.skip(bytes, allow_prune)

    def prune(self):
        """
        A potentially expensive operation that removes all data
        already retrieved from the buffer.
        """
        buf = self.buf
        if buf is None:
            self.strbuf = ''
            return
        buf.prune()
        if self.overflowed:
            sz = len(buf)
            if sz < self.overflow:
                # Revert to a faster buffer.
                self._set_small_buffer()

    def getfile(self):
        """Return the backing file object, creating a buffer if necessary."""
        buf = self.buf
        if buf is None:
            buf = self._create_buffer()
        return buf.getfile()
=== Added File Zope3/src/zope/server/dualmodechannel.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: dualmodechannel.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""
import asyncore
import socket
from time import time
from UserDict import UserDict
from Thread import SelectTrigger
from zope.server.adjustments import default_adj
from zope.server.buffers import OverflowableBuffer
# Create the main trigger if it doesn't exist yet.  This runs as a side
# effect of importing this module; the trigger is stored on the
# SelectTrigger module itself and used by DualModeChannel.pull_trigger().
if SelectTrigger.the_trigger is None:
SelectTrigger.the_trigger = SelectTrigger.Trigger()
class DualModeChannel(asyncore.dispatcher):
    """Channel that can be driven asynchronously or synchronously.

    Call set_sync() before using a channel in a thread other than
    the thread handling the main loop.

    Call set_async() to give the channel back to the thread handling
    the main loop.
    """

    __implements__ = asyncore.dispatcher.__implements__

    # Set to 1 to have the socket closed.
    will_close = 0

    # Boolean: 1 while in async mode, 0 while in sync mode.
    async_mode = 1

    def __init__(self, conn, addr, adj=None):
        """Set up the output buffer and register *conn* with asyncore.

        *adj* supplies the tunable parameters; ``default_adj`` is used
        when it is omitted.
        """
        if adj is None:
            adj = default_adj
        self.addr = addr
        self.adj = adj
        self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
        self.creation_time = time()
        asyncore.dispatcher.__init__(self, conn)

    #
    # ASYNCHRONOUS METHODS
    #

    def handle_close(self):
        """Close the channel."""
        self.close()

    def writable(self):
        """Report whether the main loop should watch for writability."""
        if self.async_mode:
            return self.will_close or self.outbuf
        return 0

    def handle_write(self):
        """Write event callback; does nothing while in sync mode."""
        if self.async_mode:
            self.inner_handle_write()

    def inner_handle_write(self):
        """Send pending output, or close once a requested close is due."""
        if not self.outbuf:
            if self.will_close:
                self.close()
            return
        try:
            self._flush_some()
        except socket.error:
            self.handle_comm_error()

    def readable(self):
        """Report whether the main loop should watch for readability."""
        if self.async_mode:
            return not self.will_close
        return 0

    def handle_read(self):
        """Read event callback; does nothing while in sync mode."""
        if self.async_mode:
            self.inner_handle_read()

    def inner_handle_read(self):
        """Receive up to adj.recv_bytes and pass the data to received()."""
        try:
            data = self.recv(self.adj.recv_bytes)
        except socket.error:
            self.handle_comm_error()
        else:
            self.received(data)

    def received(self, data):
        """
        Override to receive data in async mode.
        """
        pass

    def handle_comm_error(self):
        """
        Designed for handling communication errors that occur
        during asynchronous operations *only*.  Probably should log
        this, but in a different place.
        """
        self.handle_error()

    def set_sync(self):
        """Switch to synchronous mode.

        The main thread will stop calling received().
        """
        self.async_mode = 0

    #
    # SYNCHRONOUS METHODS
    #

    def write(self, data):
        """Queue *data* and send whatever can go out without blocking."""
        if data:
            self.outbuf.append(data)
        while len(self.outbuf) >= self.adj.send_bytes:
            # Send what we can without blocking.
            # We propagate errors to the application on purpose
            # (to stop the application if the connection closes).
            if not self._flush_some():
                break

    def flush(self, block=1):
        """Send pending data.

        If block is set, this pauses the application.  If it is turned
        off, only the amount of data that can be sent without blocking
        is sent.
        """
        if not block:
            while self._flush_some():
                pass
            return
        made_blocking = 0
        try:
            while self.outbuf:
                # We propagate errors to the application on purpose.
                if not made_blocking:
                    self.socket.setblocking(1)
                    made_blocking = 1
                self._flush_some()
        finally:
            if made_blocking:
                self.socket.setblocking(0)

    def set_async(self):
        """Switch to asynchronous mode.

        The main thread will begin calling received() again.
        """
        self.async_mode = 1
        self.pull_trigger()

    #
    # METHODS USED IN BOTH MODES
    #

    def pull_trigger(self):
        """Wake up the main loop."""
        SelectTrigger.the_trigger.pull_trigger()

    def _flush_some(self):
        """Send one chunk of pending data.

        Returns 1 if some data was sent, 0 otherwise.
        """
        pending = self.outbuf
        if pending and self.connected:
            chunk = pending.get(self.adj.send_bytes)
            sent = self.send(chunk)
            if sent:
                pending.skip(sent, 1)
                return 1
        return 0

    def close_when_done(self):
        """Close now if the output buffer can be emptied, else once it is."""
        # We might be able to close immediately.
        while self._flush_some():
            pass
        if not self.outbuf:
            # Quick exit.
            self.close()
        else:
            # Wait until outbuf is flushed.
            self.will_close = 1
            if not self.async_mode:
                self.async_mode = 1
                self.pull_trigger()
# Placeholder for thread.allocate_lock; SimultaneousModeChannel.__init__
# imports the real function into this global the first time it is needed.
allocate_lock = None
class SimultaneousModeChannel(DualModeChannel):
    """Layer on top of DualModeChannel that allows communication in
    both the main thread and other threads at the same time.

    The channel operates in synchronous mode with an asynchronous
    helper.  The asynchronous callbacks empty the output buffer
    and fill the input buffer.
    """

    __implements__ = asyncore.dispatcher.__implements__

    def __init__(self, conn, addr, adj=None):
        """Create the write lock, then initialise the underlying channel."""
        global allocate_lock
        if allocate_lock is None:
            # Deferred import: only needed once a channel of this kind exists.
            from thread import allocate_lock
        # The lock protects all accesses to outbuf, since reads and
        # writes of buffers in this class need to be serialized.
        lock = allocate_lock()
        self._writelock_acquire = lock.acquire
        self._writelock_release = lock.release
        self._writelock_locked = lock.locked
        DualModeChannel.__init__(self, conn, addr, adj)

    #
    # ASYNCHRONOUS METHODS
    #

    def writable(self):
        """Writable when closing, or when output is pending and unlocked."""
        return self.will_close or (
            self.outbuf and not self._writelock_locked())

    def handle_write(self):
        """Write event callback; skipped while a synchronous writer holds the lock."""
        if self._writelock_acquire(0):
            try:
                self.inner_handle_write()
            finally:
                self._writelock_release()
        # else: a synchronous method is writing.

    def readable(self):
        """Readable until a close has been requested."""
        return not self.will_close

    def handle_read(self):
        """Read event callback; always reads, regardless of mode."""
        self.inner_handle_read()

    def set_sync(self):
        """No-op: this channel does not switch modes."""
        pass

    #
    # SYNCHRONOUS METHODS
    #

    def write(self, data):
        """Like DualModeChannel.write(), performed under the write lock."""
        self._writelock_acquire()
        try:
            DualModeChannel.write(self, data)
        finally:
            self._writelock_release()

    def flush(self, block=1):
        """Like DualModeChannel.flush(), performed under the write lock."""
        self._writelock_acquire()
        try:
            DualModeChannel.flush(self, block)
        finally:
            self._writelock_release()

    def set_async(self):
        """No-op: this channel does not switch modes."""
        pass

    #
    # METHODS USED IN BOTH MODES
    #

    def close_when_done(self):
        """Request a close and wake the main loop to carry it out."""
        self.will_close = 1
        self.pull_trigger()
=== Added File Zope3/src/zope/server/fixedstreamreceiver.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: fixedstreamreceiver.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""
from zope.server.interfaces.interfaces import IStreamConsumer
class FixedStreamReceiver:
    """Stream consumer that accepts a fixed number of bytes into a buffer."""

    __implements__ = IStreamConsumer

    # See Zope.Server.IStreamConsumer.IStreamConsumer
    completed = 0

    def __init__(self, cl, buf):
        """*cl* is the number of bytes to accept; *buf* collects them."""
        self.remain = cl
        self.buf = buf

    ############################################################
    # Implementation methods for interface
    # Zope.Server.IStreamConsumer

    def received(self, data):
        """Consume *data* and return the number of bytes accepted.

        See Zope.Server.IStreamConsumer.IStreamConsumer
        """
        wanted = self.remain
        if wanted < 1:
            self.completed = 1  # Avoid any chance of spinning
            return 0
        size = len(data)
        if size < wanted:
            # Everything is accepted and more is still expected.
            self.buf.append(data)
            self.remain -= size
            return size
        # This chunk completes the stream; surplus bytes are not consumed.
        self.buf.append(data[:wanted])
        self.remain = 0
        self.completed = 1
        return wanted

    #
    ############################################################

    def getfile(self):
        """Return the file object of the underlying buffer."""
        return self.buf.getfile()
=== Added File Zope3/src/zope/server/maxsockets.py ===
# Medusa max_sockets module.
import socket
import select
# several factors here we might want to test:
# 1) max we can create
# 2) max we can bind
# 3) max we can listen on
# 4) max we can connect
def max_server_sockets():
    """Return how many listening sockets can be open at the same time.

    Sockets are created, bound to an ephemeral port and put into listen
    mode until one of those steps fails; all of them are closed again
    before returning.
    """
    sl = []
    while 1:
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.bind(('', 0))
            s.listen(5)
        except socket.error:
            # Limit reached.  Release the half set-up socket, which is not
            # part of the count, instead of leaking it.
            if s is not None:
                s.close()
            break
        sl.append(s)
    num = len(sl)
    for s in sl:
        s.close()
    del sl
    return num
def max_client_sockets(port=9999):
    """Return how many client connections can be open at the same time.

    A listening socket is bound to *port* and connections to it are
    opened and accepted until that fails.  Every socket, including the
    listening one, is closed before returning.  Errors while setting up
    the listening socket (for example a port already in use) propagate.
    """
    # make a server socket
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sl = []
    try:
        server.bind(('', port))
        server.listen(5)
        while 1:
            s = None
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect(('', port))
                conn, addr = server.accept()
            except socket.error:
                # Limit reached; drop the client socket that has no
                # accepted counterpart.
                if s is not None:
                    s.close()
                break
            sl.append((s, conn))
        num = len(sl)
    finally:
        for s, c in sl:
            s.close()
            c.close()
        # The listening socket used to be left open.
        server.close()
    return num
def max_select_sockets():
    """Return how many sockets can be handed to select() at the same time.

    The set of listening sockets is grown by roughly 5% (at least one
    socket) per round and passed to select() after each round.  The
    result is the size of the set before the round that failed.
    """
    sl = []
    num = 0
    while 1:
        try:
            num = len(sl)
            # Increase exponentially.  range() needs an integer; a float
            # argument raises TypeError on newer interpreters, which the
            # old bare except turned into a result of 0.
            for i in range(1 + int(len(sl) * 0.05)):
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                # Track the socket before bind/listen so that it is closed
                # below even if one of those calls fails.
                sl.append(s)
                s.bind(('', 0))
                s.listen(5)
            select.select(sl, [], [], 0)
        except (socket.error, select.error, ValueError):
            # Out of descriptors, or too many/too large for select().
            break
    for s in sl:
        s.close()
    del sl
    return num
=== Added File Zope3/src/zope/server/serverbase.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: serverbase.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""
import asyncore
import logging
import socket
from zope.server.adjustments import default_adj
from zope.server.interfaces.interfaces import IServer
class ServerBase(asyncore.dispatcher, object):
    """Async. server base for launching derivatives of ServerChannelBase.
    """

    __implements__ = asyncore.dispatcher.__implements__, IServer

    channel_class = None    # Override with a channel class.
    SERVER_IDENT = 'Zope.Server.ServerBase'  # Override.

    def __init__(self, ip, port, task_dispatcher=None, adj=None, start=1,
                 hit_log=None, verbose=0):
        """Create the listening socket and bind it to (*ip*, *port*).

        *task_dispatcher* receives the tasks passed to addTask(); without
        one, tasks are serviced immediately.  *adj* supplies the tunable
        parameters (``default_adj`` if omitted).  Listening begins right
        away unless *start* is false.  *hit_log* is stored as given and
        *verbose* enables informational log messages.
        """
        if adj is None:
            adj = default_adj
        self.adj = adj
        asyncore.dispatcher.__init__(self)
        self.port = port
        self.task_dispatcher = task_dispatcher
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((ip, port))
        self.verbose = verbose
        self.hit_log = hit_log
        # The logger must exist before computeServerName() runs, because
        # that method logs through it when verbose is on.
        self.logger = logging.getLogger(self.__class__.__name__)
        self.server_name = self.computeServerName(ip)
        if start:
            self.accept_connections()

    def log(self, message):
        """Log *message* at INFO level (overrides asyncore's default log())."""
        self.logger.info(message)

    # Maps asyncore's message type names to logging levels.
    level_mapping = {
        'info': logging.INFO,
        'error': logging.ERROR,
        'warning': logging.WARN,
        }

    def log_info(self, message, type='info'):
        """Log *message* at the level mapped from *type* (INFO if unknown)."""
        self.logger.log(self.level_mapping.get(type, logging.INFO), message)

    def computeServerName(self, ip=''):
        """Return a host name for *ip*, or for this machine if *ip* is empty.

        A name consisting only of digits and dots is treated as a numeric
        address and resolved with a reverse lookup; if the lookup fails
        the numeric form is returned unchanged.
        """
        if ip:
            server_name = str(ip)
        else:
            server_name = str(socket.gethostname())
        # Convert to a host name if necessary.
        is_hostname = 0
        for c in server_name:
            if c != '.' and not c.isdigit():
                is_hostname = 1
                break
        if not is_hostname:
            if self.verbose:
                self.log_info('Computing hostname', 'info')
            try:
                server_name = socket.gethostbyaddr(server_name)[0]
            except socket.error:
                if self.verbose:
                    self.log_info('Cannot do reverse lookup', 'info')
        return server_name

    def accept_connections(self):
        """Start listening for incoming connections."""
        self.accepting = 1
        self.socket.listen(self.adj.backlog)  # Circumvent asyncore's NT limit
        if self.verbose:
            self.log_info('%s started.\n'
                          '\tHostname: %s\n\tPort: %d' % (
                self.SERVER_IDENT,
                self.server_name,
                self.port
                ))

    def addTask(self, task):
        """Hand *task* to the task dispatcher, or service it right away."""
        td = self.task_dispatcher
        if td is not None:
            td.addTask(task)
        else:
            task.service()

    ############################################################
    # Implementation methods for interface
    # Zope.Server.IDispatcher.IDispatcher

    def readable(self):
        'See Zope.Server.IDispatcher.IDispatcher'
        # Stop accepting while the socket map is at the connection limit.
        return (self.accepting and
                len(asyncore.socket_map) < self.adj.connection_limit)

    def writable(self):
        'See Zope.Server.IDispatcher.IDispatcher'
        return 0

    ######################################
    # from: Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler

    def handle_read(self):
        'See Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler'
        pass

    def handle_connect(self):
        'See Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler'
        pass

    def handle_accept(self):
        'See Zope.Server.IDispatcherEventHandler.IDispatcherEventHandler'
        try:
            v = self.accept()
            if v is None:
                return
            conn, addr = v
        except socket.error:
            # Linux: On rare occasions we get a bogus socket back from
            # accept.  socketmodule.c:makesockaddr complains that the
            # address family is unknown.  We don't want the whole server
            # to shut down because of this.
            if self.adj.log_socket_errors:
                self.log_info ('warning: server accept() threw an exception',
                               'warning')
            return
        # The channel object is created for its side effects only.
        self.channel_class(self, conn, addr, self.adj)

    #
    ############################################################
#
############################################################
=== Added File Zope3/src/zope/server/serverchannelbase.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: serverchannelbase.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""
import os
import time
import sys
import asyncore
from thread import allocate_lock
# Enable ZOPE_SERVER_SIMULT_MODE to enable experimental
# simultaneous channel mode, which may improve or degrade
# throughput depending on load characteristics.
if os.environ.get('ZOPE_SERVER_SIMULT_MODE'):
from zope.server.dualmodechannel import SimultaneousModeChannel as \
ChannelBaseClass
else:
from zope.server.dualmodechannel import DualModeChannel as ChannelBaseClass
from zope.server.interfaces.interfaces import IServerChannel
# Synchronize access to the "running_tasks" attributes.  One module-level
# lock is shared by all channels; it is also held while a channel's
# "ready_requests" queue is read or modified.
running_lock = allocate_lock()
class ServerChannelBase(ChannelBaseClass, object):
    """Base class for a high-performance, mixed-mode server-side channel.
    """

    __implements__ = ChannelBaseClass.__implements__, IServerChannel

    parser_class = None         # Subclasses must provide a parser class
    task_class = None           # ... and a task class.

    active_channels = {}        # Class-specific channel tracker
    next_channel_cleanup = [0]  # Class-specific cleanup time

    proto_request = None        # A request parser instance
    ready_requests = None       # A list
    # ready_requests must always be empty when not running tasks.

    last_activity = 0           # Time of last activity
    running_tasks = 0           # boolean: true when any task is being executed

    #
    # ASYNCHRONOUS METHODS (incl. __init__)
    #

    def __init__(self, server, conn, addr, adj=None):
        """Initialise the channel for *server* and run due maintenance."""
        ChannelBaseClass.__init__(self, conn, addr, adj)
        self.server = server
        self.last_activity = t = self.creation_time
        self.check_maintenance(t)

    def add_channel(self, map=None):
        """This hook keeps track of opened HTTP channels.
        """
        ChannelBaseClass.add_channel(self, map)
        self.__class__.active_channels[self._fileno] = self

    def del_channel(self, map=None):
        """This hook keeps track of closed HTTP channels.
        """
        # Remember the descriptor first: asyncore implementations that
        # reset self._fileno inside del_channel() would otherwise leave
        # this channel in active_channels forever.
        fd = self._fileno
        ChannelBaseClass.del_channel(self, map)
        ac = self.__class__.active_channels
        if fd in ac:
            del ac[fd]

    def check_maintenance(self, now):
        """Performs maintenance if necessary.

        Maintenance runs at most once per adj.cleanup_interval seconds.
        """
        ncc = self.__class__.next_channel_cleanup
        if now < ncc[0]:
            return
        ncc[0] = now + self.adj.cleanup_interval
        self.maintenance()

    def maintenance(self):
        """Kills off dead connections.
        """
        self.kill_zombies()

    def kill_zombies(self):
        """Closes connections that have not had any activity in a while.

        The timeout is configured through adj.channel_timeout (seconds).
        """
        now = time.time()
        cutoff = now - self.adj.channel_timeout
        # Iterate over a snapshot: close() removes entries from the
        # tracker while the loop is running.
        for channel in list(self.active_channels.values()):
            if (channel is not self and not channel.running_tasks and
                    channel.last_activity < cutoff):
                channel.close()

    def received(self, data):
        """Receive input asynchronously and send requests to
        receivedCompleteRequest().
        """
        preq = self.proto_request
        while data:
            if preq is None:
                preq = self.parser_class(self.adj)
            n = preq.received(data)
            if preq.completed:
                # The request is ready to use.
                if not preq.empty:
                    self.receivedCompleteRequest(preq)
                preq = None
                self.proto_request = None
            else:
                # Keep the partial request around for the next chunk.
                self.proto_request = preq
            if n >= len(data):
                break
            data = data[n:]

    def receivedCompleteRequest(self, req):
        """If there are tasks running or requests on hold, queue
        the request, otherwise execute it.
        """
        do_now = 0
        running_lock.acquire()
        try:
            if self.running_tasks:
                # A task thread is working.  It will read from the queue
                # when it is finished.
                rr = self.ready_requests
                if rr is None:
                    rr = []
                    self.ready_requests = rr
                rr.append(req)
            else:
                # Do it now.
                do_now = 1
        finally:
            running_lock.release()
        if do_now:
            task = self.process_request(req)
            if task is not None:
                self.start_task(task)

    def start_task(self, task):
        """Starts the given task.

        *** For thread safety, this should only be called from the main
        (async) thread. ***

        Raises RuntimeError if a task is already running on this channel.
        """
        if self.running_tasks:
            # Can't start while another task is running!
            # Otherwise two threads would work on the queue at the same time.
            raise RuntimeError('Already executing tasks')
        self.running_tasks = 1
        self.set_sync()
        self.server.addTask(task)

    def handle_error(self):
        """Handles program errors (not communication errors)
        """
        t, v = sys.exc_info()[:2]
        if t is SystemExit or t is KeyboardInterrupt:
            # Let these through; a bare raise re-raises the exception
            # being handled together with its original traceback.
            raise
        asyncore.dispatcher.handle_error(self)

    def handle_comm_error(self):
        """Handles communication errors (not program errors)
        """
        if self.adj.log_socket_errors:
            self.handle_error()
        else:
            # Ignore socket errors.
            self.close()

    #
    # SYNCHRONOUS METHODS
    #

    def end_task(self, close):
        """Called at the end of a task and may launch another task.
        """
        if close:
            # Note that self.running_tasks is left on, which has the
            # side effect of preventing further requests from being
            # serviced even if more appear.  A good thing.
            self.close_when_done()
            return
        # Process requests held in the queue, if any.
        while 1:
            req = None
            running_lock.acquire()
            try:
                rr = self.ready_requests
                if rr:
                    req = rr.pop(0)
                else:
                    # No requests to process.
                    self.running_tasks = 0
            finally:
                running_lock.release()

            if req is not None:
                task = self.process_request(req)
                if task is not None:
                    # Add the new task.  It will service the queue.
                    self.server.addTask(task)
                    break
                # else check the queue again.
            else:
                # Idle -- Wait for another request on this connection.
                self.set_async()
                break

    #
    # BOTH MODES
    #

    def process_request(self, req):
        """Returns a task to execute or None if the request is quick and
        can be processed in the main thread.

        Override to handle some requests in the main thread.
        """
        return self.task_class(self, req)
=== Added File Zope3/src/zope/server/taskthreads.py ===
# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
import sys
from Queue import Queue, Empty
from thread import allocate_lock, start_new_thread
from time import time, sleep
import logging
from zope.server.interfaces.interfaces import ITaskDispatcher
class ThreadedTaskDispatcher:
    """Task dispatcher that services queued tasks in a pool of threads."""

    __implements__ = ITaskDispatcher

    stop_count = 0  # Number of threads that will stop soon.

    def __init__(self):
        self.threads = {}  # { thread number -> 1 }
        self.queue = Queue()
        self.thread_mgmt_lock = allocate_lock()

    def handlerThread(self, thread_no):
        """Body of a worker thread: service tasks until told to stop.

        A ``None`` item taken from the queue ends the thread.  On exit the
        thread removes its entry from ``self.threads`` and decrements
        ``stop_count``.
        """
        threads = self.threads
        try:
            while threads.get(thread_no):
                task = self.queue.get()
                if task is None:
                    # Special value: kill this thread.
                    break
                try:
                    task.service()
                except:
                    # Outermost frame of the worker: log and keep serving.
                    logging.exception('Exception during task')
        finally:
            mlock = self.thread_mgmt_lock
            mlock.acquire()
            try:
                self.stop_count -= 1
                try:
                    del threads[thread_no]
                except KeyError:
                    pass
            finally:
                mlock.release()

    def setThreadCount(self, count):
        """Start or stop worker threads so that *count* keep running.

        Surplus threads are stopped by queueing one ``None`` per thread,
        so they finish only after the tasks queued ahead of it.
        """
        mlock = self.thread_mgmt_lock
        mlock.acquire()
        try:
            threads = self.threads
            thread_no = 0
            running = len(threads) - self.stop_count
            while running < count:
                # Start threads.
                while thread_no in threads:
                    thread_no = thread_no + 1
                threads[thread_no] = 1
                running += 1
                start_new_thread(self.handlerThread, (thread_no,))
                thread_no = thread_no + 1
            if running > count:
                # Stop threads.
                to_stop = running - count
                self.stop_count += to_stop
                for n in range(to_stop):
                    self.queue.put(None)
                    running -= 1
        finally:
            mlock.release()

    def addTask(self, task):
        """Defer *task* and queue it for a worker thread.

        Raises ValueError if *task* is None.  If deferring or queueing
        fails, the task is cancelled and the error re-raised.
        """
        if task is None:
            raise ValueError("No task passed to addTask().")
        # assert ITask.isImplementedBy(task)
        try:
            task.defer()
            self.queue.put(task)
        except:
            task.cancel()
            raise

    def shutdown(self, cancel_pending=1, timeout=5):
        """Stop all worker threads, waiting up to *timeout* seconds.

        If *cancel_pending* is true, tasks still queued afterwards are
        cancelled.
        """
        self.setThreadCount(0)
        # Ensure the threads shut down.
        threads = self.threads
        expiration = time() + timeout
        while threads:
            if time() >= expiration:
                logging.error("%d thread(s) still running" % len(threads))
                # Give up waiting; without this the loop never ended
                # while a thread refused to stop.
                break
            sleep(0.1)
        if cancel_pending:
            # Cancel remaining tasks.
            try:
                queue = self.queue
                while not queue.empty():
                    # Non-blocking: a surviving worker may empty the queue
                    # between the check and the get, which raises Empty.
                    task = queue.get_nowait()
                    if task is not None:
                        task.cancel()
            except Empty:
                pass

    def getPendingTasksEstimate(self):
        """Return the approximate number of queued items."""
        return self.queue.qsize()
=== Added File Zope3/src/zope/server/utilities.py ===
# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
def find_double_newline(s):
    """Return the position just after the first double newline in *s*.

    Both LF CR LF and LF LF count as a double newline.  When both kinds
    occur, the smaller end position is returned; -1 means that neither
    was found.
    """
    crlf_end = s.find('\n\r\n')  # One kind of double newline
    if crlf_end >= 0:
        crlf_end += 3
    lf_end = s.find('\n\n')      # Another kind of double newline
    if lf_end >= 0:
        lf_end += 2

    if crlf_end < 0:
        return lf_end
    if lf_end < 0:
        return crlf_end
    return min(crlf_end, lf_end)
=== Added File Zope3/src/zope/server/zlogintegration.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Make asyncore log to the logging module.
As a side effect of importing this module, asyncore's logging will be
redirected to the logging module.
$Id: zlogintegration.py,v 1.1.2.1 2002/12/23 19:33:18 jim Exp $
"""
import logging
# Logger that receives every message asyncore emits through log_info().
logger = logging.getLogger("Zope.Server")
# Maps asyncore's message type names to logging levels.
severity = {
'info': logging.INFO,
'warning': logging.WARN,
'error': logging.ERROR,
}
def log_info(self, message, type='info'):
    """Log *message* at the level mapped from *type* (INFO if unknown).

    Written with a ``self`` parameter so that it can be installed as a
    method on asyncore.dispatcher.
    """
    level = severity.get(type, logging.INFO)
    logger.log(level, message)
import asyncore
# Replace asyncore.dispatcher.log_info with the function above, so that
# dispatchers which do not override log_info log through the logging module.
asyncore.dispatcher.log_info = log_info