[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - Buffers.py:1.1.2.1 Chunking.py:1.1.2.1 DualModeChannel.py:1.1.2.1 Utilities.py:1.1.2.1 HTTPServer.py:1.1.2.15 chunking.py:NONE dual_mode_channel.py:NONE
Shane Hathaway
shane@cvs.zope.org
Thu, 31 Jan 2002 11:33:47 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv14226
Modified Files:
Tag: Zope-3x-branch
HTTPServer.py
Added Files:
Tag: Zope-3x-branch
Buffers.py Chunking.py DualModeChannel.py Utilities.py
Removed Files:
Tag: Zope-3x-branch
chunking.py dual_mode_channel.py
Log Message:
- Parse bare LF line endings, rather than requiring CRLFs, in the HTTP server.
- Fixed chunking to use hexadecimal.
- Renamed modules and classes to fit with Zope 3 naming conventions.
- Put buffer classes in their own module.
- Provided a way to insert TCPWatch in the HTTP server tests.
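For reference (not part of the checkin): the chunk-size field in HTTP/1.1
chunked transfer coding is hexadecimal (RFC 2616, section 3.6.1), which is
what the chunking fix addresses:

  int('1a', 16)   # 26, the size ChunkedReceiver now computes
  int('20', 16)   # 32; a decimal parse would have read this as 20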
=== Added File Zope3/lib/python/Zope/Server/Buffers.py ===
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
# COPY_BYTES controls the size of temporary strings for shuffling data around.
COPY_BYTES = 1 << 18 # 256K
# The maximum number of bytes to buffer in a simple string.
STRBUF_LIMIT = 8192
class FileBasedBuffer:
remain = 0
def __init__(self, file, from_buffer=None):
self.file = file
if from_buffer is not None:
from_file = from_buffer.getfile()
read_pos = from_file.tell()
from_file.seek(0)
while 1:
data = from_file.read(COPY_BYTES)
if not data:
break
file.write(data)
self.remain = int(file.tell() - read_pos)
from_file.seek(read_pos)
file.seek(read_pos)
def __len__(self):
return self.remain
def append(self, s):
file = self.file
read_pos = file.tell()
file.seek(0, 2)
file.write(s)
file.seek(read_pos)
self.remain = self.remain + len(s)
def get(self, bytes=-1, skip=0):
file = self.file
if not skip:
read_pos = file.tell()
if bytes < 0:
# Read all
res = file.read()
else:
res = file.read(bytes)
if skip:
self.remain -= len(res)
else:
file.seek(read_pos)
return res
def skip(self, bytes, allow_prune=0):
if self.remain < bytes:
raise ValueError, (
"Can't skip %d bytes in buffer of %d bytes" %
(bytes, self.remain))
self.file.seek(bytes, 1)
self.remain = self.remain - bytes
def newfile(self):
raise NotImplementedError()
def prune(self):
file = self.file
if self.remain == 0:
read_pos = file.tell()
file.seek(0, 2)
sz = file.tell()
file.seek(read_pos)
if sz == 0:
# Nothing to prune.
return
nf = self.newfile()
while 1:
data = file.read(COPY_BYTES)
if not data:
break
nf.write(data)
self.file = nf
def getfile(self):
return self.file
class TempfileBasedBuffer(FileBasedBuffer):
def __init__(self, from_buffer=None):
FileBasedBuffer.__init__(self, self.newfile(), from_buffer)
def newfile(self):
from tempfile import TemporaryFile
return TemporaryFile('w+b')
class StringIOBasedBuffer(FileBasedBuffer):
def __init__(self, from_buffer=None):
if from_buffer is not None:
FileBasedBuffer.__init__(self, StringIO(), from_buffer)
else:
# Shortcut. :-)
self.file = StringIO()
def newfile(self):
return StringIO()
class OverflowableBuffer:
"""
This buffer implementation has four stages:
- No data
- String-based buffer
- StringIO-based buffer
- Temporary file storage
The first two stages are fastest for simple transfers.
"""
overflowed = 0
buf = None
strbuf = '' # String-based buffer.
def __init__(self, overflow):
# overflow is the maximum to be stored in a StringIO buffer.
self.overflow = overflow
def __len__(self):
buf = self.buf
if buf is not None:
return len(buf)
else:
return len(self.strbuf)
def _create_buffer(self):
# print 'creating buffer'
strbuf = self.strbuf
if len(strbuf) >= self.overflow:
self._set_large_buffer()
else:
self._set_small_buffer()
buf = self.buf
if strbuf:
buf.append(self.strbuf)
self.strbuf = ''
return buf
def _set_small_buffer(self):
self.buf = StringIOBasedBuffer(self.buf)
self.overflowed = 0
def _set_large_buffer(self):
self.buf = TempfileBasedBuffer(self.buf)
self.overflowed = 1
def append(self, s):
buf = self.buf
if buf is None:
strbuf = self.strbuf
if len(strbuf) + len(s) < STRBUF_LIMIT:
self.strbuf = strbuf + s
return
buf = self._create_buffer()
buf.append(s)
sz = len(buf)
if not self.overflowed:
if sz >= self.overflow:
self._set_large_buffer()
def get(self, bytes=-1, skip=0):
buf = self.buf
if buf is None:
strbuf = self.strbuf
if not skip:
return strbuf
buf = self._create_buffer()
return buf.get(bytes, skip)
def skip(self, bytes, allow_prune=0):
buf = self.buf
if buf is None:
strbuf = self.strbuf
if allow_prune and bytes == len(strbuf):
# We could slice instead of converting to
# a buffer, but that would eat up memory in
# large transfers.
self.strbuf = ''
return
buf = self._create_buffer()
buf.skip(bytes, allow_prune)
def prune(self):
"""
A potentially expensive operation that removes all data
already retrieved from the buffer.
"""
buf = self.buf
if buf is None:
self.strbuf = ''
return
buf.prune()
if self.overflowed:
sz = len(buf)
if sz < self.overflow:
# Revert to a faster buffer.
self._set_small_buffer()
def getfile(self):
buf = self.buf
if buf is None:
buf = self._create_buffer()
return buf.getfile()
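A minimal usage sketch of OverflowableBuffer (illustrative only; the
overflow limit below is an arbitrary number, not a value taken from
Adjustments):

  from Buffers import OverflowableBuffer

  buf = OverflowableBuffer(1048576)  # overflow to a temp file past ~1 MB
  buf.append('x' * 100)              # under STRBUF_LIMIT: kept as a plain string
  buf.append('y' * 10000)            # crosses STRBUF_LIMIT: moves to a StringIO
  first = buf.get(50)                # peek at 50 bytes; nothing is consumed
  buf.skip(50)                       # now actually consume them
  print len(buf)                     # 10050 bytes still buffered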
=== Added File Zope3/lib/python/Zope/Server/Chunking.py ===
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
from Utilities import find_double_newline
class ChunkedReceiver:
chunk_remainder = 0
control_line = ''
all_chunks_received = 0
trailer = ''
completed = 0
# max_control_line = 1024
# max_trailer = 65536
def __init__(self, buf):
self.buf = buf
def received(self, s):
# Returns the number of bytes consumed.
if self.completed:
return 0
orig_size = len(s)
while s:
rm = self.chunk_remainder
if rm > 0:
# Receive the remainder of a chunk.
to_write = s[:rm]
self.buf.append(to_write)
written = len(to_write)
s = s[written:]
self.chunk_remainder -= written
elif not self.all_chunks_received:
# Receive a control line.
s = self.control_line + s
pos = s.find('\n')
if pos < 0:
# Control line not finished.
self.control_line = s
s = ''
else:
# Control line finished.
line = s[:pos]
s = s[pos + 1:]
self.control_line = ''
line = line.strip()
if line:
# Begin a new chunk.
semi = line.find(';')
if semi >= 0:
# discard extension info.
line = line[:semi]
sz = int(line.strip(), 16) # hexadecimal
if sz > 0:
# Start a new chunk.
self.chunk_remainder = sz
else:
# Finished chunks.
self.all_chunks_received = 1
# else expect a control line.
else:
# Receive the trailer.
trailer = self.trailer + s
if trailer[:2] == '\r\n':
# No trailer.
self.completed = 1
return orig_size - (len(trailer) - 2)
elif trailer[:1] == '\n':
# No trailer.
self.completed = 1
return orig_size - (len(trailer) - 1)
pos = find_double_newline(trailer)
if pos < 0:
# Trailer not finished.
self.trailer = trailer
s = ''
else:
# Finished the trailer.
self.completed = 1
self.trailer = trailer[:pos]
return orig_size - (len(trailer) - pos)
return orig_size
def getfile(self):
return self.buf.getfile()
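A small sketch of how a channel is expected to drive ChunkedReceiver
(illustrative; the overflow limit is arbitrary):

  from Buffers import OverflowableBuffer
  from Chunking import ChunkedReceiver

  buf = OverflowableBuffer(262144)
  rcv = ChunkedReceiver(buf)
  # Two chunks of 0x1a (26) and 0x5 bytes, then the terminating zero chunk.
  body = '1a\r\n' + 'x' * 26 + '\r\n' + '5\r\nhello\r\n' + '0\r\n\r\n'
  consumed = rcv.received(body)      # consumes the whole string
  print rcv.completed                # 1
  print rcv.getfile().read()         # 26 'x' characters followed by 'hello'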
=== Added File Zope3/lib/python/Zope/Server/DualModeChannel.py ===
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
import asyncore
import socket
from time import time
from UserDict import UserDict
from medusa.thread import select_trigger
from Adjustments import default_adj
from Buffers import OverflowableBuffer
# Create the main trigger if it doesn't exist yet.
if select_trigger.the_trigger is None:
select_trigger.the_trigger = select_trigger.trigger()
class AlternateSocketMapMixin:
"""Mixin for asyncore.dispatcher to more easily support
alternate socket maps"""
socket_map = None
def add_channel(self, map=None):
if map is None:
map = self.socket_map
asyncore.dispatcher.add_channel(self, map)
def del_channel(self, map=None):
if map is None:
map = self.socket_map
asyncore.dispatcher.del_channel(self, map)
def pull_trigger(self):
pull_trigger = getattr(self.socket_map, 'pull_trigger', None)
if pull_trigger is not None:
# Use the trigger from the socket map.
pull_trigger()
else:
select_trigger.the_trigger.pull_trigger()
class ASMTrigger (AlternateSocketMapMixin, select_trigger.trigger):
"""Trigger for an alternate socket map"""
def __init__(self, socket_map):
self.socket_map = socket_map
select_trigger.trigger.__init__(self)
pull_trigger = select_trigger.trigger.pull_trigger
class SocketMapWithTrigger (UserDict):
def __init__(self):
UserDict.__init__(self)
self.pull_trigger = ASMTrigger(self).pull_trigger
class DualModeChannel (AlternateSocketMapMixin, asyncore.dispatcher):
"""
The channel switches between asynchronous and synchronous mode.
"""
# Set will_close to 1 to close the socket once the output buffer is flushed.
will_close = 0
# boolean: async or sync mode
async_mode = 1
def __init__(self, server, conn, addr, adj=None, socket_map=None):
self.server = server
self.addr = addr
if adj is None:
adj = default_adj
self.adj = adj
self.socket_map = socket_map
self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
self.creation_time = time()
asyncore.dispatcher.__init__(self, conn)
def get_sync_streams(self):
return synchronous_streams(self)
#
# ASYNCHRONOUS METHODS
#
def handle_close(self):
self.close()
def writable(self):
if not self.async_mode:
return 0
return self.will_close or self.outbuf
def handle_write(self):
if not self.async_mode:
return
self.inner_handle_write()
def inner_handle_write(self):
if self.outbuf:
try:
self._flush_some()
except socket.error:
self.handle_comm_error()
elif self.will_close:
self.close()
def readable(self):
if not self.async_mode:
return 0
return not self.will_close
def handle_read(self):
if not self.async_mode:
return
self.inner_handle_read()
def inner_handle_read(self):
try:
data = self.recv(self.adj.recv_bytes)
except socket.error:
self.handle_comm_error()
return
self.received(data)
def received(self, data):
"""
Override to receive data in async mode.
"""
pass
def handle_comm_error(self):
"""
Designed for handling communication errors that occur
during asynchronous operations *only*. Probably should log
this, but in a different place.
"""
self.handle_error()
def set_sync(self):
self.async_mode = 0
#
# SYNCHRONOUS METHODS
#
def sync_write(self, data):
if data:
self.outbuf.append(data)
while len(self.outbuf) >= self.adj.send_bytes:
# Send what we can without blocking.
# We propagate errors to the application on purpose
# (to prevent unnecessary work).
if not self._flush_some():
break
def sync_flush(self):
"""
Pauses the application while outbuf is flushed.
Normally not a good thing to do.
"""
blocked = 0
try:
while self.outbuf:
# We propagate errors to the application on purpose.
if not blocked:
self.socket.setblocking(1)
blocked = 1
self._flush_some()
finally:
if blocked:
self.socket.setblocking(0)
def set_async(self):
self.async_mode = 1
self.pull_trigger()
#
# METHODS USED IN BOTH MODES
#
def _flush_some(self):
outbuf = self.outbuf
if outbuf:
chunk = outbuf.get(self.adj.send_bytes)
num_sent = self.send(chunk)
if num_sent:
outbuf.skip(num_sent, 1)
return 1
return 0
def close_when_done(self):
if self.async_mode:
self.will_close = 1
self.pull_trigger()
else:
# We might be able to close immediately.
while self._flush_some():
pass
if not self.outbuf:
# Quick exit.
self.close()
else:
# Wait until outbuf is flushed.
self.will_close = 1
self.async_mode = 1
self.pull_trigger()
allocate_lock = None
class SimultaneousModeChannel (DualModeChannel):
"""
The channel operates in synchronous mode with an asynchronous
helper. The asynchronous callbacks empty the output buffer
and fill the input buffer.
"""
def __init__(self, server, conn, addr, adj=None, socket_map=None):
global allocate_lock
if allocate_lock is None:
from thread import allocate_lock
writelock = allocate_lock()
self._writelock_acquire = writelock.acquire
self._writelock_release = writelock.release
self._writelock_locked = writelock.locked
DualModeChannel.__init__(self, server, conn, addr, adj, socket_map)
#
# ASYNCHRONOUS METHODS
#
def writable(self):
return self.will_close or (
self.outbuf and not self._writelock_locked())
def handle_write(self):
if not self._writelock_acquire(0):
# A synchronous method is writing.
return
try:
self.inner_handle_write()
finally:
self._writelock_release()
def readable(self):
return not self.will_close
def handle_read(self):
self.inner_handle_read()
def set_sync(self):
pass
#
# SYNCHRONOUS METHODS
#
def sync_write(self, data):
self._writelock_acquire()
try:
DualModeChannel.sync_write(self, data)
finally:
self._writelock_release()
def sync_flush(self):
self._writelock_acquire()
try:
DualModeChannel.sync_flush(self)
finally:
self._writelock_release()
def set_async(self):
pass
#
# METHODS USED IN BOTH MODES
#
def close_when_done(self):
self.will_close = 1
self.pull_trigger()
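A hypothetical subclass (EchoChannel is not part of this checkin) showing
the intended mode-switching protocol around application work:

  from DualModeChannel import DualModeChannel

  class EchoChannel(DualModeChannel):

      def received(self, data):
          # Called from the async loop.  Switch to sync mode for the
          # application work, then hand control back to asyncore.
          self.set_sync()
          try:
              self.sync_write(data)  # queues data; sends what it can without blocking
              self.sync_flush()      # blocks until outbuf is fully sent
          finally:
              self.set_async()       # re-enables async callbacks and pulls the trigger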
=== Added File Zope3/lib/python/Zope/Server/Utilities.py ===
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
def find_double_newline(s):
"""Returns the position just after a double newline in the given string."""
pos1 = s.find('\n\r\n') # One kind of double newline
if pos1 >= 0:
pos1 += 3
pos2 = s.find('\n\n') # Another kind of double newline
if pos2 >= 0:
pos2 += 2
if pos1 >= 0:
if pos2 >= 0:
return min(pos1, pos2)
else:
return pos1
else:
return pos2
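A few illustrative calls; the position returned is just past the blank line
in either newline convention:

  from Utilities import find_double_newline

  print find_double_newline('GET / HTTP/1.0\r\n\r\nbody')  # 18
  print find_double_newline('GET / HTTP/1.0\n\nbody')      # 16
  print find_double_newline('GET / HTTP/1.0\r\n')          # -1, header not complete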
=== Zope3/lib/python/Zope/Server/HTTPServer.py 1.1.2.14 => 1.1.2.15 ===
"""
-SIMULT_MODE = 0
+SIMULT_MODE = 0 # Turn on to enable experimental simultaneous channel mode.
import asyncore
import re
@@ -25,12 +25,14 @@
from medusa import logger
if SIMULT_MODE:
- from dual_mode_channel import simultaneous_mode_channel as \
+ from DualModeChannel import SimultaneousModeChannel as \
channel_base_class
else:
- from dual_mode_channel import dual_mode_channel as channel_base_class
+ from DualModeChannel import DualModeChannel as channel_base_class
-from dual_mode_channel import AlternateSocketMapMixin, OverflowableBuffer
+from DualModeChannel import AlternateSocketMapMixin
+from Buffers import OverflowableBuffer
+from Utilities import find_double_newline
from Adjustments import default_adj
from IHeaderOutput import IHeaderOutput
from ITask import ITask
@@ -238,7 +240,7 @@
-class http_request_data:
+class HTTPRequestParser:
"""
A structure that collects the HTTP request.
"""
@@ -259,6 +261,7 @@
"""
adj is an Adjustments object.
"""
+ self.headers = {}
self.adj = adj
def received(self, data):
@@ -275,11 +278,11 @@
if br is None:
# In header.
s = self.header_plus + data
- index = s.find('\r\n\r\n')
+ index = find_double_newline(s)
if index >= 0:
# Header finished.
header_plus = s[:index]
- consumed = len(data) - (len(s) - (index + 4))
+ consumed = len(data) - (len(s) - index)
self.in_header = 0
# Remove preceeding blank lines.
header_plus = header_plus.lstrip()
@@ -308,18 +311,18 @@
Parses the header_plus block of text (the headers plus the
first line of the request).
"""
- index = header_plus.find('\r\n')
+ index = header_plus.find('\n')
if index >= 0:
- first_line = header_plus[:index]
- header = header_plus[index + 2:]
+ first_line = header_plus[:index].rstrip()
+ header = header_plus[index + 1:]
else:
- first_line = header_plus
+ first_line = header_plus.rstrip()
header = ''
self.first_line = first_line
self.header = header
lines = self.get_header_lines()
- self.headers = headers = {}
+ headers = self.headers
for line in lines:
index = line.find(':')
if index > 0:
@@ -340,7 +343,7 @@
if version == '1.1':
te = headers.get('TRANSFER_ENCODING', '')
if te == 'chunked':
- from chunking import ChunkedReceiver
+ from Chunking import ChunkedReceiver
self.chunked = 1
buf = OverflowableBuffer(self.adj.inbuf_overflow)
self.body_rcv = ChunkedReceiver(buf)
@@ -357,7 +360,7 @@
Splits the header into lines, putting multi-line headers together.
"""
r = []
- lines = self.header.split('\r\n')
+ lines = self.header.split('\n')
for line in lines:
if line and line[0] in ' \t':
r[-1] = r[-1] + line[1:]
@@ -418,7 +421,7 @@
active_channels = {} # Class-specific channel tracker
next_channel_cleanup = [0] # Class-specific cleanup time
- proto_request = None # An http_request_data instance
+ proto_request = None # An HTTPRequestParser instance
ready_requests = None # A list
last_activity = 0 # Time of last activity
running_tasks = 0 # boolean
@@ -478,7 +481,7 @@
preq = self.proto_request
while data:
if preq is None:
- preq = http_request_data(self.adj)
+ preq = HTTPRequestParser(self.adj)
n = preq.received(data)
if preq.completed:
# The request is ready to use.
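For illustration (the request below is hypothetical, not taken from the
tests), the new first-line handling splits on '\n' and strips any trailing
'\r', so both CRLF and bare-LF requests parse the same way:

  header_plus = 'GET /index.html HTTP/1.1\r\nHost: example.com'
  index = header_plus.find('\n')
  first_line = header_plus[:index].rstrip()  # 'GET /index.html HTTP/1.1'
  header = header_plus[index + 1:]           # 'Host: example.com'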
=== Removed File Zope3/lib/python/Zope/Server/chunking.py ===
=== Removed File Zope3/lib/python/Zope/Server/dual_mode_channel.py ===