[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - IStreamConsumer.py:1.1.2.1 ITaskDispatcher.py:1.1.2.1 ServerBase.py:1.1.2.1 Adjustments.py:1.1.2.3 Buffers.py:1.1.2.2 Chunking.py:1.1.2.2 DualModeChannel.py:1.1.2.2 HTTPServer.py:1.1.2.16 IHeaderOutput.py:1.1.2.2 ITask.py:1.1.2.2 PublisherServers.py:1.1.2.8 TaskThreads.py:1.1.2.7 Utilities.py:1.1.2.2 ZLogIntegration.py:1.1.2.2 __init__.py:1.1.2.4
Shane Hathaway
shane@cvs.zope.org
Fri, 8 Feb 2002 10:06:04 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv11304
Modified Files:
Tag: Zope-3x-branch
Adjustments.py Buffers.py Chunking.py DualModeChannel.py
HTTPServer.py IHeaderOutput.py ITask.py PublisherServers.py
TaskThreads.py Utilities.py ZLogIntegration.py __init__.py
Added Files:
Tag: Zope-3x-branch
IStreamConsumer.py ITaskDispatcher.py ServerBase.py
Log Message:
- Factored the non-HTTP-specific code into ServerBase, in preparation for
an FTP server.
- Added explicit interfaces and docstrings.
- Updated licenses.
=== Added File Zope3/lib/python/Zope/Server/IStreamConsumer.py ===
# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
from Interface import Interface, Attribute
class IStreamConsumer (Interface):
    """Consumes a data stream until reaching a completion point.

    The actual amount to be consumed might not be known ahead of time,
    so callers feed data in pieces and check 'completed' after each call.
    """

    def received(data):
        """Accepts data, returning the number of bytes consumed.

        The return value may be smaller than len(data); any bytes past
        that count were not consumed by this object.
        """

    # Declared after the method, as in the original file; implementations
    # expose it as a plain attribute.
    completed = Attribute(
        'completed', 'Set to a true value when finished consuming data.')
=== Added File Zope3/lib/python/Zope/Server/ITaskDispatcher.py ===
# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
from Interface import Interface
class ITaskDispatcher (Interface):
    """An object that accepts tasks and dispatches them to threads.
    """

    def setThreadCount(count):
        """Sets the number of handler threads.
        """

    def addTask(task):
        """Receives a task and dispatches it to a thread.

        Note that, depending on load, a task may have to wait a
        while for its turn.
        """

    def shutdown(cancel_pending=1, timeout=5):
        """Shuts down all handler threads and may cancel pending tasks.

        NOTE(review): presumably 'cancel_pending' selects whether queued
        tasks are cancelled and 'timeout' is a wait limit in seconds;
        confirm against the implementation in TaskThreads.py.
        """

    def getPendingTasksEstimate():
        """Returns an estimate of the number of tasks waiting to be serviced.

        This method may be useful for monitoring purposes.  If the
        number of pending tasks is continually climbing, your server
        is becoming overloaded and the operator should be notified.
        """
=== Added File Zope3/lib/python/Zope/Server/ServerBase.py ===
# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
import asyncore
import os
import socket
import sys
import time
from thread import allocate_lock

from DualModeChannel import AlternateSocketMapMixin
from IStreamConsumer import IStreamConsumer
from Adjustments import default_adj
# Set the ZOPE_SERVER_SIMULT_MODE environment variable to a non-empty
# value to select the experimental simultaneous channel mode, which may
# improve or degrade throughput depending on load characteristics.
# Either way, the selected class is bound to the name channel_base_class.
if os.environ.get('ZOPE_SERVER_SIMULT_MODE'):
    from DualModeChannel import SimultaneousModeChannel as \
         channel_base_class
else:
    from DualModeChannel import DualModeChannel as channel_base_class
class FixedStreamReceiver:
    """Stream consumer that collects a fixed number of bytes into a buffer.

    The receiver is created with the number of bytes still expected
    ('cl') and a buffer object offering append() and getfile().  It marks
    itself completed once that many bytes have been appended.
    """

    __implements__ = IStreamConsumer

    completed = 0

    def __init__(self, cl, buf):
        self.buf = buf
        self.remain = cl

    def received(self, data):
        """Appends up to the remaining byte count; returns bytes consumed."""
        remaining = self.remain
        if remaining < 1:
            # Nothing left to read: flag completion so callers cannot spin.
            self.completed = 1
            return 0

        size = len(data)
        if size < remaining:
            # The whole piece belongs to this stream and more is expected.
            self.buf.append(data)
            self.remain = remaining - size
            return size

        # This piece reaches (or passes) the end; take only what is owed.
        self.buf.append(data[:remaining])
        self.remain = 0
        self.completed = 1
        return remaining

    def getfile(self):
        """Returns the file object supplied by the underlying buffer."""
        return self.buf.getfile()
# Module-level lock shared by every channel.  ServerChannelBase holds it
# while reading or changing a channel's "running_tasks" flag and its
# "ready_requests" queue, because queue_request() and end_task() can run
# in different threads.
running_lock = allocate_lock()
class ServerChannelBase (channel_base_class):
    """Base class for a high-performance, mixed-mode server-side channel.

    Incoming data is handed to an instance of 'parser_class'.  Each
    completed, non-empty request is wrapped in a 'task_class' instance
    and passed to the server's addTask().  Requests arriving while a
    task is still running are queued and serviced in order.
    """

    parser_class = None         # Subclasses must provide a parser class
    task_class = None           # ... and a task class.

    # Both containers are mutable class attributes shared by every
    # instance of a class; a subclass gets its own tracker by rebinding
    # them in its class body.
    active_channels = {}        # Class-specific channel tracker
    next_channel_cleanup = [0]  # Class-specific cleanup time

    proto_request = None        # A request parser instance
    ready_requests = None       # A list
    last_activity = 0           # Time of last activity
    running_tasks = 0           # boolean

    #
    # ASYNCHRONOUS METHODS (incl. __init__)
    #

    def __init__(self, server, conn, addr, adj=None, socket_map=None):
        """Sets up the channel and runs maintenance if it is due."""
        channel_base_class.__init__(self, server, conn, addr, adj, socket_map)
        self.last_activity = t = self.creation_time
        self.check_maintenance(t)

    def add_channel(self, map=None):
        """This hook keeps track of opened channels.
        """
        channel_base_class.add_channel(self, map)
        self.active_channels[self._fileno] = self

    def del_channel(self, map=None):
        """This hook keeps track of closed channels.
        """
        # Remember the descriptor first: some asyncore versions reset
        # _fileno to None inside del_channel(), which would otherwise
        # leave a stale entry in active_channels.
        fd = self._fileno
        channel_base_class.del_channel(self, map)
        try:
            del self.active_channels[fd]
        except KeyError:
            pass

    def check_maintenance(self, now):
        """Performs maintenance if necessary.

        'now' is the current time; maintenance runs at most once per
        adj.cleanup_interval for all channels sharing the tracker.
        """
        if now < self.next_channel_cleanup[0]:
            return
        self.next_channel_cleanup[0] = now + self.adj.cleanup_interval
        self.maintenance()

    def maintenance(self):
        """Kills off dead connections.
        """
        self.kill_zombies()

    def kill_zombies(self):
        """Closes connections that have not had any activity in a while.

        The timeout is configured through adj.channel_timeout (seconds).
        Channels with a running task, and this channel itself, are spared.
        """
        now = time.time()
        cutoff = now - self.adj.channel_timeout
        # Iterate over a snapshot: close() may remove entries from
        # active_channels through del_channel().
        for channel in list(self.active_channels.values()):
            if (channel is not self and not channel.running_tasks and
                channel.last_activity < cutoff):
                channel.close()

    def received(self, data):
        """Receives input asynchronously and launches or queues requests.

        'data' may hold part of a request, exactly one request, or
        several pipelined requests; a partially parsed request is kept in
        proto_request until more data arrives.
        """
        preq = self.proto_request
        while data:
            if preq is None:
                preq = self.parser_class(self.adj)
            n = preq.received(data)
            if preq.completed:
                # The request is ready to use.
                if not preq.empty:
                    self.queue_request(preq)
                preq = None
                self.proto_request = None
            else:
                self.proto_request = preq
            if n >= len(data):
                break
            # The parser left some bytes unconsumed; offer them again.
            data = data[n:]

    def queue_request(self, req):
        """Queues a request to be processed in sequence by a task.

        If no task is running the request is started immediately and the
        channel switches to synchronous mode; otherwise it is appended to
        ready_requests for end_task() to pick up.
        """
        do_now = 0
        running_lock.acquire()
        try:
            if self.running_tasks:
                # Wait for the current tasks to finish.
                rr = self.ready_requests
                if rr is None:
                    rr = []
                    self.ready_requests = rr
                rr.append(req)
            else:
                # Do it now.
                self.running_tasks = 1
                do_now = 1
        finally:
            running_lock.release()
        # Start the task only after the lock has been released.
        if do_now:
            self.set_sync()
            self.create_task(req)

    def handle_error(self):
        """Handles program errors (not communication errors)

        SystemExit and KeyboardInterrupt are allowed to propagate; every
        other exception is passed to asyncore's default error handler.
        """
        t = sys.exc_info()[0]
        if t is SystemExit or t is KeyboardInterrupt:
            # Bare raise re-raises the active exception with its
            # original traceback intact.
            raise
        asyncore.dispatcher.handle_error(self)

    def handle_comm_error(self):
        """Handles communication errors (not program errors)
        """
        if self.adj.log_socket_errors:
            self.handle_error()
        else:
            # Ignore socket errors.
            self.close()

    #
    # SYNCHRONOUS METHODS
    #

    def end_task(self, close):
        """Called at the end of a task, may launch another task.

        If 'close' is true the channel is scheduled to close and no
        further request is started.  Otherwise the next queued request
        (if any) is started, or the channel returns to asynchronous mode.
        """
        if close:
            self.close_when_done()
            return
        new_req = None
        running_lock.acquire()
        try:
            rr = self.ready_requests
            if rr:
                new_req = rr.pop(0)
            else:
                self.running_tasks = 0
        finally:
            running_lock.release()
        if new_req:
            # Respond to the next request.
            self.create_task(new_req)
        else:
            # Wait for another request on this connection.
            self.set_async()

    #
    # BOTH MODES
    #

    def create_task(self, req):
        """Creates a new task and queues it for execution.

        The task may get executed in another thread.
        """
        task = self.task_class(self, req)
        self.server.addTask(task)
class ServerBase (AlternateSocketMapMixin, asyncore.dispatcher):
    """Async. server base for launching derivatives of ServerChannelBase.

    The server listens on a TCP socket and creates one 'channel_class'
    instance per accepted connection.  Tasks produced by the channels are
    handed to the task dispatcher, or run inline when there is none.
    """

    channel_class = None    # Override with a channel class.
    SERVER_IDENT = 'Zope.Server.ServerBase'  # Override.

    def __init__(self, ip, port, task_dispatcher=None, adj=None, start=1,
                 hit_log=None, verbose=0, socket_map=None):
        """Creates and binds the listening socket.

        ip, port -- address to bind; an empty ip makes computeServerName()
            fall back to the local host name.
        task_dispatcher -- object whose addTask() receives tasks; when
            None, addTask() services tasks immediately in the caller.
        adj -- tuning settings; defaults to default_adj.
        start -- when true, start accepting connections right away.
        hit_log -- stored on the instance; not used by this class.
        verbose -- when true, log informational messages.
        socket_map -- alternate asyncore socket map, passed on to every
            channel this server creates.
        """
        if adj is None:
            adj = default_adj
        self.adj = adj
        self.socket_map = socket_map
        asyncore.dispatcher.__init__(self)
        self.port = port
        self.task_dispatcher = task_dispatcher
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((ip, port))
        # verbose must be set before computeServerName(), which reads it.
        self.verbose = verbose
        self.hit_log = hit_log
        self.server_name = self.computeServerName(ip)

        if start:
            self.accept_connections()

    def computeServerName(self, ip=''):
        """Returns a host name for 'ip', or for this machine if ip is empty.

        A value made up only of digits and dots is treated as a numeric
        address and resolved with a reverse lookup; if that lookup fails
        the numeric form is returned unchanged.
        """
        if ip:
            server_name = str(ip)
        else:
            server_name = str(socket.gethostname())
        # Convert to a host name if necessary.
        is_hostname = 0
        for c in server_name:
            if c != '.' and not c.isdigit():
                is_hostname = 1
                break
        if not is_hostname:
            if self.verbose:
                self.log_info('Computing hostname', 'info')
            try:
                server_name = socket.gethostbyaddr(server_name)[0]
            except socket.error:
                if self.verbose:
                    self.log_info('Cannot do reverse lookup', 'info')
        return server_name

    def accept_connections(self):
        """Starts listening for incoming connections."""
        self.accepting = 1
        self.socket.listen(self.adj.backlog)  # Circumvent asyncore's NT limit
        if self.verbose:
            self.log_info('%s started.\n'
                          '\tHostname: %s\n\tPort: %d' % (
                self.SERVER_IDENT,
                self.server_name,
                self.port
                ))

    def readable(self):
        """Tells asyncore whether new connections may be accepted.

        Returns a false value while the server is not accepting or while
        the socket map already holds adj.connection_limit entries.  This
        is the only definition of readable(); a second one that ignored
        the limit used to override it.
        """
        if not self.accepting:
            return 0
        # Count the map this server's channels are registered in.
        socket_map = self.socket_map
        if socket_map is None:
            socket_map = asyncore.socket_map
        return len(socket_map) < self.adj.connection_limit

    def writable (self):
        """A listening socket never has anything to write."""
        return 0

    def handle_read (self):
        pass

    def handle_connect (self):
        pass

    def handle_accept (self):
        """Accepts a pending connection and wraps it in a channel."""
        try:
            v = self.accept()
            if v is None:
                return
            conn, addr = v
        except socket.error:
            # Linux: On rare occasions we get a bogus socket back from
            # accept.  socketmodule.c:makesockaddr complains that the
            # address family is unknown.  We don't want the whole server
            # to shut down because of this.
            if self.adj.log_socket_errors:
                self.log_info ('warning: server accept() threw an exception',
                               'warning')
            return
        self.channel_class(self, conn, addr, self.adj, self.socket_map)

    def addTask(self, task):
        """Hands 'task' to the dispatcher, or services it right here."""
        td = self.task_dispatcher
        if td is not None:
            td.addTask(task)
        else:
            task.service()
=== Zope3/lib/python/Zope/Server/Adjustments.py 1.1.2.2 => 1.1.2.3 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
=== Zope3/lib/python/Zope/Server/Buffers.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
=== Zope3/lib/python/Zope/Server/Chunking.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
@@ -9,9 +9,12 @@
from Utilities import find_double_newline
+from IStreamConsumer import IStreamConsumer
class ChunkedReceiver:
+
+ __implements__ = IStreamConsumer
chunk_remainder = 0
control_line = ''
=== Zope3/lib/python/Zope/Server/DualModeChannel.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
@@ -65,8 +65,7 @@
class DualModeChannel (AlternateSocketMapMixin, asyncore.dispatcher):
- """
- The channel switches between asynchronous and synchronous mode.
+ """Channel that switches between asynchronous and synchronous mode.
"""
# will_close is set to 1 to close the socket.
@@ -148,6 +147,10 @@
self.handle_error()
def set_sync(self):
+ """Switches to synchronous mode.
+
+ The main thread will stop calling received().
+ """
self.async_mode = 0
#
@@ -182,6 +185,10 @@
self.socket.setblocking(0)
def set_async(self):
+ """Switches to asynchronous mode.
+
+ The main thread will begin calling received() again.
+ """
self.async_mode = 1
self.pull_trigger()
@@ -221,7 +228,9 @@
class SimultaneousModeChannel (DualModeChannel):
- """
+ """Layer on top of DualModeChannel that allows communication in
+ both the main thread and other threads at the same time.
+
The channel operates in synchronous mode with an asynchronous
helper. The asynchronous callbacks empty the output buffer
and fill the input buffer.
=== Zope3/lib/python/Zope/Server/HTTPServer.py 1.1.2.15 => 1.1.2.16 ===
"""
-SIMULT_MODE = 0 # Turn on to enable experimental simultaneous channel mode.
-
import asyncore
import re
import socket
@@ -24,16 +22,10 @@
from medusa.http_date import build_http_date, monthname
from medusa import logger
-if SIMULT_MODE:
- from DualModeChannel import SimultaneousModeChannel as \
- channel_base_class
-else:
- from DualModeChannel import DualModeChannel as channel_base_class
-
-from DualModeChannel import AlternateSocketMapMixin
+from ServerBase import ServerBase, ServerChannelBase, FixedStreamReceiver
from Buffers import OverflowableBuffer
from Utilities import find_double_newline
-from Adjustments import default_adj
+from IStreamConsumer import IStreamConsumer
from IHeaderOutput import IHeaderOutput
from ITask import ITask
@@ -43,19 +35,11 @@
except ImportError:
from StringIO import StringIO
-from thread import allocate_lock
-
-if 1:
- # Patch asyncore for speed.
- if hasattr(asyncore.dispatcher, '__getattr__'):
- del asyncore.dispatcher.__getattr__
+class HTTPTask:
+ """An HTTP task accepts a request and writes to a channel.
-class http_task:
- """
- An HTTP task receives a parsed request and an HTTP channel
- and is expected to write its response to that channel.
Subclass this and override the execute() method.
"""
@@ -74,7 +58,7 @@
self.channel = channel
self.request_data = request_data
self.response_headers = {
- 'Server': 'Zope.Server.HTTPServer',
+ 'Server': channel.server.SERVER_IDENT,
}
version = request_data.version
if version not in ('1.0', '1.1'):
@@ -83,14 +67,12 @@
self.version = version
def defer(self):
- """
- Called when the task will be serviced in a different thread.
+ """Called when the task will be serviced in a different thread.
"""
pass
def service(self):
- """
- Called to execute the task.
+ """Called to execute the task.
"""
try:
try:
@@ -105,31 +87,32 @@
self.channel.end_task(self.close_on_finish)
def cancel(self):
- """
- Called when shutting down the server.
+ """Called when shutting down the server.
"""
self.channel.close_when_done()
def setResponseStatus(self, status, reason):
+ """See the IHeaderOutput interface."""
self.status = status
self.reason = reason
def setResponseHeaders(self, mapping):
+ """See the IHeaderOutput interface."""
self.response_headers.update(mapping)
def appendResponseHeaders(self, lst):
- """
- Takes a list of strings.
- """
+ """See the IHeaderOutput interface."""
accum = self.accumulated_headers
if accum is None:
self.accumulated_headers = accum = []
accum.extend(lst)
def wroteResponseHeader(self):
+ """See the IHeaderOutput interface."""
return self.wrote_header
def setAuthUserName(self, name):
+ """See the IHeaderOutput interface."""
self.auth_user_name = name
def prepareResponseHeaders(self):
@@ -153,6 +136,9 @@
elif response_headers.has_key ('Transfer-Encoding'):
if not response_headers['Transfer-Encoding'] == 'chunked':
close_it = 1
+ elif self.status == '304':
+ # Replying with headers only.
+ pass
elif not response_headers.has_key ('Content-Length'):
close_it = 1
else:
@@ -211,39 +197,14 @@
-class StreamedReceiver:
-
- completed = 0
-
- def __init__(self, cl, buf):
- self.remain = cl
- self.buf = buf
-
- def received(self, data):
- rm = self.remain
- if rm < 1:
- self.completed = 1 # Avoid any chance of spinning
- return 0
- datalen = len(data)
- if rm <= datalen:
- self.buf.append(data[:rm])
- self.remain = 0
- self.completed = 1
- return rm
- else:
- self.buf.append(data)
- self.remain -= datalen
- return datalen
-
- def getfile(self):
- return self.buf.getfile()
-
-
-
class HTTPRequestParser:
+ """A structure that collects the HTTP request.
+
+ Once the stream is completed, the instance is passed to
+ a server task constructor.
"""
- A structure that collects the HTTP request.
- """
+
+ __implements__ = IStreamConsumer
completed = 0 # Set once request is completed.
empty = 0 # Set if no request was made.
@@ -352,7 +313,7 @@
self.content_length = cl
if cl > 0:
buf = OverflowableBuffer(self.adj.inbuf_overflow)
- self.body_rcv = StreamedReceiver(cl, buf)
+ self.body_rcv = FixedStreamReceiver(cl, buf)
def get_header_lines(self):
@@ -407,170 +368,8 @@
-# Synchronize access to the "running_tasks" attribute.
-running_lock = allocate_lock()
-
-
-
-class http_channel (channel_base_class):
- # Note: this class is very reusable for other protocols like FTP
- # and should probably be turned into an abstract base class
- # when we approach FTP.
-
- task_class = http_task
- active_channels = {} # Class-specific channel tracker
- next_channel_cleanup = [0] # Class-specific cleanup time
-
- proto_request = None # An HTTPRequestParser instance
- ready_requests = None # A list
- last_activity = 0 # Time of last activity
- running_tasks = 0 # boolean
-
- #
- # ASYNCHRONOUS METHODS (incl. __init__)
- #
-
- def __init__(self, server, conn, addr, adj=None, socket_map=None):
- channel_base_class.__init__(self, server, conn, addr, adj, socket_map)
- self.last_activity = t = self.creation_time
- self.check_maintenance(t)
-
- def add_channel(self, map=None):
- """
- This hook keeps track of opened HTTP channels.
- """
- channel_base_class.add_channel(self, map)
- self.active_channels[self._fileno] = self
-
- def del_channel(self, map=None):
- """
- This hook keeps track of closed HTTP channels.
- """
- channel_base_class.del_channel(self, map)
- ac = self.active_channels
- fd = self._fileno
- if ac.has_key(fd):
- del ac[fd]
-
- def check_maintenance(self, now):
- if now < self.next_channel_cleanup[0]:
- return
- self.next_channel_cleanup[0] = now + self.adj.cleanup_interval
- self.maintenance()
-
- def maintenance(self):
- # Kill off dead connections.
- self.kill_zombies()
-
- def kill_zombies(self):
- """
- Closes connections that have not had any activity in
- (channel_timeout) seconds.
- """
- now = time.time()
- cutoff = now - self.adj.channel_timeout
- for channel in self.active_channels.values():
- if (channel is not self and not channel.running_tasks and
- channel.last_activity < cutoff):
- channel.close()
-
- def received(self, data):
- """
- Receives input asynchronously and launches or queues requests.
- """
- preq = self.proto_request
- while data:
- if preq is None:
- preq = HTTPRequestParser(self.adj)
- n = preq.received(data)
- if preq.completed:
- # The request is ready to use.
- if not preq.empty:
- self.queue_request(preq)
- preq = None
- self.proto_request = None
- else:
- self.proto_request = preq
- if n >= len(data):
- break
- data = data[n:]
-
- def queue_request(self, req):
- """
- Queues requests to be processed in sequence by tasks.
- """
- do_now = 0
- running_lock.acquire()
- try:
- if self.running_tasks:
- # Wait for the current tasks to finish.
- rr = self.ready_requests
- if rr is None:
- rr = []
- self.ready_requests = rr
- rr.append(req)
- else:
- # Do it now.
- self.running_tasks = 1
- do_now = 1
- finally:
- running_lock.release()
- if do_now:
- self.set_sync()
- self.create_task(req)
-
- def handle_error(self):
- # Program error
- t, v = sys.exc_info()[:2]
- if t is SystemExit or t is KeyboardInterrupt:
- raise t, v
- asyncore.dispatcher.handle_error(self)
-
- def handle_comm_error(self):
- if self.adj.log_socket_errors:
- self.handle_error()
- else:
- # Ignore socket errors.
- self.close()
-
- #
- # SYNCHRONOUS METHODS
- #
-
- def end_task(self, close):
- if close:
- self.close_when_done()
- return
- new_req = None
- running_lock.acquire()
- try:
- rr = self.ready_requests
- if rr:
- new_req = rr.pop(0)
- else:
- self.running_tasks = 0
- finally:
- running_lock.release()
- if new_req:
- # Respond to the next request.
- self.create_task(new_req)
- else:
- # Wait for another request on this connection.
- self.set_async()
-
- #
- # BOTH MODES
- #
-
- def create_task(self, req):
- task = self.task_class(self, req)
- self.server.addTask(task)
-
-
-
class CommonHitLogger:
- """
- Outputs hits in common HTTP log format.
+ """Outputs hits in common HTTP log format.
"""
def __init__(self, logger_object=None, resolver=None):
@@ -644,104 +443,19 @@
)
-class http_server (AlternateSocketMapMixin, asyncore.dispatcher):
-
- channel_class = http_channel
- SERVER_IDENT = 'Zope.Server.HTTPServer.http_server'
+class HTTPServerChannel (ServerChannelBase):
+ task_class = HTTPTask
+ parser_class = HTTPRequestParser
- def __init__(self, ip, port, task_dispatcher=None, adj=None, start=1,
- hit_log=None, verbose=0, socket_map=None):
- if adj is None:
- adj = default_adj
- self.adj = adj
- self.socket_map = socket_map
- asyncore.dispatcher.__init__(self)
- self.port = port
- self.task_dispatcher = task_dispatcher
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.set_reuse_addr()
- self.bind((ip, port))
- self.verbose = verbose
- self.hit_log = hit_log
- self.server_name = self.computeServerName(ip)
-
- if start:
- self.accept_connections()
-
- def computeServerName(self, ip=''):
- if ip:
- server_name = str(ip)
- else:
- server_name = str(socket.gethostname())
- # Convert to a host name if necessary.
- is_hostname = 0
- for c in server_name:
- if c != '.' and not c.isdigit():
- is_hostname = 1
- break
- if not is_hostname:
- if self.verbose:
- self.log_info('Computing hostname', 'info')
- try:
- server_name = socket.gethostbyaddr(server_name)[0]
- except socket.error:
- if self.verbose:
- self.log_info('Cannot do reverse lookup', 'info')
- return server_name
-
- def accept_connections(self):
- self.accepting = 1
- self.socket.listen(self.adj.backlog) # Circumvent asyncore's NT limit
- if self.verbose:
- self.log_info('HTTP server started.\n'
- '\tHostname: %s\n\tPort: %d' % (
- self.server_name,
- self.port
- ))
-
- def readable(self):
- return (self.accepting and
- len(asyncore.socket_map) < self.adj.connection_limit)
-
- def writable (self):
- return 0
-
- def handle_read (self):
- pass
-
- def readable (self):
- return self.accepting
-
- def handle_connect (self):
- pass
-
- def handle_accept (self):
- try:
- v = self.accept()
- if v is None:
- return
- conn, addr = v
- except socket.error:
- # Linux: On rare occasions we get a bogus socket back from
- # accept. socketmodule.c:makesockaddr complains that the
- # address family is unknown. We don't want the whole server
- # to shut down because of this.
- if self.adj.log_socket_errors:
- self.log_info ('warning: server accept() threw an exception',
- 'warning')
- return
- self.channel_class(self, conn, addr, self.adj, self.socket_map)
-
- def addTask(self, task):
- td = self.task_dispatcher
- if td is not None:
- td.addTask(task)
- else:
- task.service()
+ active_channels = {} # Class-specific channel tracker
+ next_channel_cleanup = [0] # Class-specific cleanup time
+class HTTPServer (ServerBase):
+ channel_class = HTTPServerChannel
+ SERVER_IDENT = 'Zope.Server.HTTPServer'
@@ -749,7 +463,7 @@
from TaskThreads import ThreadedTaskDispatcher
td = ThreadedTaskDispatcher()
td.setThreadCount(4)
- http_server('', 8080, task_dispatcher=td)
+ HTTPServer('', 8080, task_dispatcher=td)
try:
while 1:
asyncore.poll(5)
=== Zope3/lib/python/Zope/Server/IHeaderOutput.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
@@ -12,7 +12,11 @@
class IHeaderOutput (Interface):
- """
+ """Interface for setting HTTP response headers.
+
+ This allows the HTTP server and the application to both set response
+ headers.
+
Zope.Publisher.HTTP.HTTPResponse is optionally passed an
object which implements this interface in order to intermingle
its headers with the HTTP server's response headers,
@@ -20,29 +24,25 @@
"""
def setResponseStatus(status, reason):
- """
- Sets the status code and the accompanying message.
+ """Sets the status code and the accompanying message.
"""
def setResponseHeaders(mapping):
- """
- Sets headers. The headers must be Correctly-Cased.
+ """Sets headers. The headers must be Correctly-Cased.
"""
def appendResponseHeaders(lst):
- """
- Sets headers that can potentially repeat.
+ """Sets headers that can potentially repeat.
+
Takes a list of strings.
"""
def wroteResponseHeader():
- """
- Returns a flag indicating whether the response
+ """Returns a flag indicating whether the response
+
header has already been sent.
"""
def setAuthUserName(name):
- """
- Sets the name of the authenticated user so it can
- be logged.
+ """Sets the name of the authenticated user so the name can be logged.
"""
=== Zope3/lib/python/Zope/Server/ITask.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
=== Zope3/lib/python/Zope/Server/PublisherServers.py 1.1.2.7 => 1.1.2.8 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
@@ -10,7 +10,7 @@
from os import path as ospath
-from HTTPServer import http_task, http_channel, http_server
+from HTTPServer import HTTPTask, HTTPServerChannel, HTTPServer
from Zope.Publisher.Publish import publish
from Zope.Publisher.HTTP.HTTPRequest import HTTPRequest
@@ -23,7 +23,7 @@
}
-class PublisherHTTPTask (http_task):
+class PublisherHTTPTask (HTTPTask):
def execute(self):
server = self.channel.server
@@ -95,18 +95,18 @@
-class PublisherHTTPChannel (http_channel):
+class PublisherHTTPChannel (HTTPServerChannel):
task_class = PublisherHTTPTask
-class PublisherHTTPServer (http_server):
+class PublisherHTTPServer (HTTPServer):
channel_class = PublisherHTTPChannel
def __init__(self, request_payload, response_payload, *args, **kw):
self.request_payload = request_payload
self.response_payload = response_payload
- http_server.__init__(self, *args, **kw)
+ HTTPServer.__init__(self, *args, **kw)
=== Zope3/lib/python/Zope/Server/TaskThreads.py 1.1.2.6 => 1.1.2.7 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
@@ -13,6 +13,8 @@
from thread import allocate_lock, start_new_thread
from time import time, sleep
+from ITaskDispatcher import ITaskDispatcher
+
try:
from zLOG import LOG, ERROR
except ImportError:
@@ -20,8 +22,11 @@
ERROR = None
+
class ThreadedTaskDispatcher:
+ __implements__ = ITaskDispatcher
+
stop_count = 0 # Number of threads that will stop soon.
def __init__(self):
@@ -116,8 +121,6 @@
except Empty:
pass
- def hasTasks(self):
- # Inherently non-thread-safe.
- return not self.queue.empty()
-
+ def getPendingTasksEstimate(self):
+ return self.queue.qsize()
=== Zope3/lib/python/Zope/Server/Utilities.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
=== Zope3/lib/python/Zope/Server/ZLogIntegration.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
-"""
-Pokes zLOG default logging into asyncore.
+"""Makes asyncore log to zLOG.
"""
from zLOG import LOG, register_subsystem, BLATHER, INFO, WARNING, ERROR
=== Zope3/lib/python/Zope/Server/__init__.py 1.1.2.3 => 1.1.2.4 ===
+# Copyright 2001-2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS