[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - chunking.py:1.1.2.1 HTTPServer2.py:1.1.2.3 PublisherServers.py:1.1.2.2 dual_mode_channel.py:1.1.2.3
Shane Hathaway
shane@digicool.com
Mon, 26 Nov 2001 11:16:08 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv23433
Modified Files:
Tag: Zope-3x-branch
HTTPServer2.py PublisherServers.py dual_mode_channel.py
Added Files:
Tag: Zope-3x-branch
chunking.py
Log Message:
Closer to full integration with Publisher.
=== Added File Zope3/lib/python/Zope/Server/chunking.py ===
class ChunkedReceiver:

    chunk_remainder = 0
    control_line = ''
    all_chunks_received = 0
    trailer = ''
    finished = 0

    # max_control_line = 1024
    # max_trailer = 65536

    def __init__(self, buf):
        self.buf = buf

    def received(self, s):
        # Returns the number of bytes consumed.
        if self.finished:
            return 0
        orig_size = len(s)
        while s:
            rm = self.chunk_remainder
            if rm > 0:
                # Receive the remainder of a chunk.
                to_write = s[:rm]
                self.buf.append(to_write)
                written = len(to_write)
                s = s[written:]
                self.chunk_remainder -= written
            elif not self.all_chunks_received:
                # Receive a control line.
                s = self.control_line + s
                pos = s.find('\r\n')
                if pos < 0:
                    # Control line not finished.
                    self.control_line = s
                    s = ''
                else:
                    # Control line finished.
                    line = s[:pos]
                    s = s[pos + 2:]
                    self.control_line = ''
                    if line:
                        # Begin a new chunk.
                        semi = line.find(';')
                        if semi >= 0:
                            # Discard extension info.
                            line = line[:semi]
                        # Chunk sizes are hexadecimal.
                        sz = int(line, 16)
                        if sz > 0:
                            # Start a new chunk.
                            self.chunk_remainder = sz
                        else:
                            # Finished chunks.
                            self.all_chunks_received = 1
                    # else expect a control line.
            else:
                # Receive the trailer.
                s = self.trailer + s
                if s[:2] == '\r\n':
                    # No trailer.
                    self.finished = 1
                    return orig_size - (len(s) - 2)
                pos = s.find('\r\n\r\n')
                if pos >= 0:
                    # Finished the trailer.
                    self.finished = 1
                    self.trailer = s[:pos + 2]
                    return orig_size - (len(s) - (pos + 4))
                # Trailer not finished; save it and wait for more data.
                self.trailer = s
                s = ''
        return orig_size

    def getfile(self):
        return self.buf.getfile()
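
For reference, a minimal usage sketch for the receiver above (not part of
chunking.py). SimpleBuffer is a hypothetical stand-in for the
OverflowableBuffer that HTTPServer2.py actually passes in:

from StringIO import StringIO

class SimpleBuffer:
    # Hypothetical stand-in for OverflowableBuffer, for this sketch only.
    def __init__(self):
        self.parts = []
    def append(self, data):
        self.parts.append(data)
    def getfile(self):
        return StringIO(''.join(self.parts))

receiver = ChunkedReceiver(SimpleBuffer())
# Two chunks ('Hello, ' and 'world'), a zero-size chunk, then the
# terminating blank line.
receiver.received('7\r\nHello, \r\n5\r\nworld\r\n0\r\n\r\n')
print receiver.finished          # 1
print receiver.getfile().read()  # 'Hello, world'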
=== Zope3/lib/python/Zope/Server/HTTPServer2.py 1.1.2.2 => 1.1.2.3 ===
# FOR A PARTICULAR PURPOSE.
-# This server uses asyncore to accept connections and do initial
-# processing but threads to do work.
+"""
+This server uses asyncore to accept connections and do initial
+processing but threads to do work.
+"""
SIMULT_MODE = 0
@@ -78,7 +80,6 @@
"""
try:
try:
- self.init()
self.execute()
self.finish()
finally:
@@ -156,10 +157,6 @@
res = '%s\r\n\r\n' % '\r\n'.join(lines)
return res
- def init(self):
- """
- """
-
def execute(self):
"""
Override this.
@@ -191,9 +188,9 @@
completed = 0
- def __init__(self, cl):
+ def __init__(self, cl, buf):
self.remain = cl
- self.buf = OverflowableBuffer(525000) # TODO: make configurable
+ self.buf = buf
def received(self, data):
rm = self.remain
@@ -217,17 +214,21 @@
class http_request_data:
- completed = 0
- bad = 0
+ completed = 0 # Set once request is completed.
+ empty = 0 # Set if no request was made.
header_plus = ''
chunked = 0
content_length = 0
body_rcv = None
- # Other attributes: first_line, header, headers, command, uri, version
+ # Other attributes: first_line, header, headers, command, uri, version,
+ # path, query, fragment
+
+ # headers is a mapping containing keys translated to uppercase
+ # with dashes turned into underscores.
def received(self, data):
"""
- Receives the HTTP stream.
+ Receives the HTTP stream for one request.
Returns the number of bytes consumed.
"""
if self.completed:
@@ -246,17 +247,12 @@
# Remove preceding blank lines.
header_plus = header_plus.lstrip()
if not header_plus:
- # No request was made.
- self.handle_error('No request')
- return datalen
+ self.empty = 1
+ self.completed = 1
else:
- try:
- self.parse_header(header_plus)
- if self.body_rcv is None:
- self.completed = 1
- except:
- self.handle_error()
- return datalen
+ self.parse_header(header_plus)
+ if self.body_rcv is None:
+ self.completed = 1
return consumed
else:
# Header not finished yet.
@@ -264,21 +260,11 @@
return datalen
else:
# In body.
- try:
- consumed = br.received(data)
- except:
- self.handle_error()
- return datalen
+ consumed = br.received(data)
if br.completed:
self.completed = 1
return consumed
- def handle_error(self, msg=None):
- # TODO: generate a response saying bad header?
- import traceback
- traceback.print_exc()
- self.bad = 1
- self.completed = 1
def parse_header(self, header_plus):
index = header_plus.find('\r\n')
@@ -308,17 +294,22 @@
uri = unquote(uri)
self.uri = str(uri)
self.version = version
+ self.split_uri()
if version == '1.1':
te = headers.get('TRANSFER_ENCODING', '')
if te == 'chunked':
+ from chunking import ChunkedReceiver
self.chunked = 1
- self.body_rcv = ChunkedReceiver()
+ buf = OverflowableBuffer(525000) # TODO: make configurable
+ self.body_rcv = ChunkedReceiver(buf)
if not self.chunked:
cl = int(headers.get('CONTENT_LENGTH', 0))
self.content_length = cl
if cl > 0:
- self.body_rcv = StreamedReceiver(cl)
+ buf = OverflowableBuffer(525000) # TODO: make configurable
+ self.body_rcv = StreamedReceiver(cl, buf)
+
def get_header_lines(self):
"""
@@ -348,8 +339,32 @@
else:
return None, None, None
+ path_regex = re.compile (
+ # path query fragment
+ r'([^?#]*)(\?[^#]*)?(#.*)?'
+ )
+
+ def split_uri(self):
+ m = self.path_regex.match (self.uri)
+ if m.end() != len(self.uri):
+ raise ValueError, "Broken URI"
+ else:
+ self.path, query, self.fragment = m.groups()
+ if query:
+ query = query[1:]
+ self.query = query
+
+ def getBodyStream(self):
+ body_rcv = self.body_rcv
+ if body_rcv is not None:
+ return body_rcv.getfile()
+ else:
+ return StringIO('')
+
+
-tasklock = allocate_lock()
+# Synchronize access to request queues.
+request_queue_lock = allocate_lock()
class http_channel (channel_type):
@@ -359,13 +374,21 @@
active_channels = {} # Class-specific channel counter
proto_request = None
ready_requests = None # A list
- task_running = 0
+ wedged = 0
+
def add_channel(self, map=None):
+ """
+ Keeps track of opened HTTP channels.
+ """
channel_type.add_channel(self, map)
self.active_channels[self._fileno] = self
+
def del_channel(self, map=None):
+ """
+ Keeps track of opened HTTP channels.
+ """
channel_type.del_channel(self, map)
ac = self.active_channels
fd = self._fileno
@@ -373,15 +396,31 @@
del ac[fd]
# print 'active HTTP channels:', len(ac)
+
def received(self, data):
+ """
+ Receives input asynchronously and launches or queues requests.
+ """
+ if self.wedged:
+ # Ignore input after a bad request.
+ return
preq = self.proto_request
while data:
if preq is None:
preq = http_request_data()
- n = preq.received(data)
+ try:
+ n = preq.received(data)
+ except:
+ # Bad header format or request. Can't accept more requests.
+ # TODO: use logging.
+ import traceback
+ traceback.print_exc()
+ self.wedged = 1
+ return
if preq.completed:
- # Ready.
- self.queue_request(preq)
+ # The request is ready to use.
+ if not preq.empty:
+ self.queue_request(preq)
preq = None
self.proto_request = None
else:
@@ -390,43 +429,54 @@
break
data = data[n:]
+
def queue_request(self, req):
rr = self.ready_requests
+ do_now = 1
if rr is None:
# First request--no need to lock.
self.ready_requests = []
else:
- tasklock.acquire()
+ request_queue_lock.acquire()
try:
if rr:
# The request will be executed when the current
# task is finished.
rr.append(req)
- return
+ do_now = 0
# else no task is running.
finally:
- tasklock.release()
- self.set_sync()
- self.start_task(req)
+ request_queue_lock.release()
+ if do_now:
+ self.set_sync()
+ self.create_task(req)
+
- def start_task(self, req):
+ def create_task(self, req):
task = self.task_class(self, req)
self.server.addTask(task)
+
def end_task(self, close):
if close:
self.close_when_done()
else:
- tasklock.acquire()
+ new_task = 0
+ req = None
+ request_queue_lock.acquire()
try:
rr = self.ready_requests
if rr:
req = rr.pop(0)
- self.start_task(req)
- else:
- self.set_async()
+ new_task = 1
finally:
- tasklock.release()
+ request_queue_lock.release()
+ if new_task:
+ # Respond to the next request.
+ self.create_task(req)
+ else:
+ # Wait for another request on this connection.
+ self.set_async()
@@ -434,14 +484,29 @@
channel_class = http_channel
+ SERVER_IDENT = 'Zope.Server.HTTPServer.http_server'
+
def __init__(self, ip, port, backlog=1024, tasks=None):
# Assumes sock is already bound.
asyncore.dispatcher.__init__(self)
+ self.port = port
self.tasks = tasks
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((ip, port))
+
+ host, port = self.socket.getsockname()
+ if not ip:
+ self.log_info('Computing default hostname', 'info')
+ ip = socket.gethostbyname (socket.gethostname())
+ try:
+ self.server_name = socket.gethostbyaddr (ip)[0]
+ except socket.error:
+ self.log_info('Cannot do reverse lookup', 'info')
+ self.server_name = ip # use the IP address as the "hostname"
+
self.listen(backlog)
+
def writable (self):
return 0
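
For reference, the URI splitting added above behaves roughly like this
standalone sketch (same pattern as path_regex, written as a plain function
rather than the http_request_data method):

import re

#                        path      query     fragment
path_regex = re.compile(r'([^?#]*)(\?[^#]*)?(#.*)?')

def split_uri(uri):
    # Standalone version of http_request_data.split_uri, for illustration.
    m = path_regex.match(uri)
    if m.end() != len(uri):
        raise ValueError, "Broken URI"
    path, query, fragment = m.groups()
    if query:
        query = query[1:]  # drop the leading '?'
    return path, query, fragment

print split_uri('/folder/doc?x=1&y=2#top')
# ('/folder/doc', 'x=1&y=2', '#top')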
=== Zope3/lib/python/Zope/Server/PublisherServers.py 1.1.2.1 => 1.1.2.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+from os import path as ospath
+
from HTTPServer2 import http_task, http_channel, http_server
from Zope.Publisher.Publish import publish
from Zope.Publisher.HTTP.HTTPRequest import HTTPRequest
from Zope.Publisher.HTTP.HTTPResponse import HTTPResponse
+rename_headers = {
+ 'CONTENT_LENGTH' : 'CONTENT_LENGTH',
+ 'CONTENT_TYPE' : 'CONTENT_TYPE',
+ 'CONNECTION' : 'CONNECTION_TYPE',
+ }
+
class PublisherHTTPTask (http_task):
def execute(self):
server = self.channel.server
+ env = self.create_environment()
+ instream = self.request_data.getBodyStream()
resp = HTTPResponse(server.response_payload, self, self)
- req = HTTPRequest(server.request_payload, self.instream,
- self.request_headers, resp)
+ req = HTTPRequest(server.request_payload, instream,
+ env, resp)
publish(req)
+
+ def create_environment(self):
+ request_data = self.request_data
+ path = request_data.path
+ channel = self.channel
+ server = channel.server
+
+ while path and path[0] == '/':
+ path = path[1:]
+ # already unquoted!
+ # if '%' in path:
+ # path = unquote(path)
+
+ env = {}
+ env['REQUEST_METHOD'] = request_data.command.upper()
+ env['SERVER_PORT'] = str(server.port)
+ env['SERVER_NAME'] = server.server_name
+ env['SERVER_SOFTWARE'] = server.SERVER_IDENT
+ env['SERVER_PROTOCOL'] = "HTTP/%s" % self.version
+ env['channel.creation_time'] = channel.creation_time
+## if self.uri_base=='/':
+ env['SCRIPT_NAME']=''
+ env['PATH_INFO']='/' + path
+## else:
+## env['SCRIPT_NAME'] = self.uri_base
+## try:
+## path_info = '/'.split(self.uri_base[1:],1)[1]
+## except:
+## path_info=''
+## env['PATH_INFO']=path_info
+ #env['PATH_TRANSLATED'] = ospath.normpath(ospath.join(
+ # workdir, env['PATH_INFO']))
+ query = request_data.query
+ if query:
+ env['QUERY_STRING'] = query
+ env['GATEWAY_INTERFACE'] = 'CGI/1.1'
+ env['REMOTE_ADDR'] = channel.addr[0]
+
+ # If we're using a resolving logger, try to get the
+ # remote host from the resolver's cache.
+## if hasattr(server.logger, 'resolver'):
+## dns_cache=server.logger.resolver.cache
+## if dns_cache.has_key(env['REMOTE_ADDR']):
+## remote_host=dns_cache[env['REMOTE_ADDR']][2]
+## if remote_host is not None:
+## env['REMOTE_HOST']=remote_host
+
+ env_has = env.has_key
+
+ for key, value in request_data.headers.items():
+ value = value.strip()
+ mykey = rename_headers.get(key, None)
+ if mykey is None:
+ mykey = 'HTTP_%s' % key
+ if not env_has(mykey):
+ env[mykey] = value
+ return env
+
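
For reference, the header-to-CGI translation at the end of
create_environment works roughly like this standalone sketch
(add_headers_to_env is a hypothetical helper name; header keys are assumed
to arrive already uppercased with dashes turned into underscores, as noted
in the HTTPServer2.py changes):

rename_headers = {
    'CONTENT_LENGTH' : 'CONTENT_LENGTH',
    'CONTENT_TYPE'   : 'CONTENT_TYPE',
    'CONNECTION'     : 'CONNECTION_TYPE',
    }

def add_headers_to_env(env, headers):
    # Mirrors the loop at the end of create_environment.
    for key, value in headers.items():
        value = value.strip()
        mykey = rename_headers.get(key, None)
        if mykey is None:
            mykey = 'HTTP_%s' % key
        if not env.has_key(mykey):
            env[mykey] = value
    return env

env = add_headers_to_env({}, {
    'CONTENT_TYPE': 'text/plain',
    'CONNECTION': 'keep-alive',
    'USER_AGENT': 'Mozilla/4.77',
    })
# env now maps CONTENT_TYPE -> 'text/plain',
# CONNECTION_TYPE -> 'keep-alive' and HTTP_USER_AGENT -> 'Mozilla/4.77'.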
=== Zope3/lib/python/Zope/Server/dual_mode_channel.py 1.1.2.2 => 1.1.2.3 ===
# Create a tempfile if the pending output data gets larger
# than outbuf_overflow.
- outbuf_overflow = 4100000 # About 4 MB
+ outbuf_overflow = 1050000 # A little over 1 MB
# will_close is set to 1 to close the socket.
will_close = 0
@@ -114,16 +114,13 @@
raise t
asyncore.dispatcher.handle_error(self)
- def handle_comm_error(self, msg=None):
+ def handle_comm_error(self):
"""
Designed for handling communication errors that occur
during asynchronous transfers *only*. Probably should log
this, but in a different place.
"""
- print '--- communication error ---'
- import traceback
- traceback.print_exc()
- self.close()
+ self.handle_error()
def set_sync(self):
self.async_mode = 0
@@ -135,11 +132,12 @@
def sync_write(self, data):
if data:
self.outbuf.append(data)
- if len(self.outbuf) >= self.send_bytes:
- while self._flush_some():
- # Send what we can without blocking.
- # We propogate errors to the application on purpose.
- pass
+ while len(self.outbuf) >= self.send_bytes:
+ # Send what we can without blocking.
+ # We propagate errors to the application on purpose
+ # (to prevent unnecessary work).
+ if not self._flush_some():
+ break
return len(data)
def sync_flush(self):
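
The sync_write change above replaces a one-shot conditional flush with a
loop that keeps flushing while the buffer holds at least send_bytes,
stopping as soon as _flush_some reports that nothing could be sent. A
minimal standalone sketch of that control flow (the string buffer and
_flush_some below are simplified stand-ins, not the real dual_mode_channel
code):

class sync_write_sketch:

    send_bytes = 8192

    def __init__(self):
        self.outbuf = ''   # stand-in for OverflowableBuffer
        self.sent = ''     # stand-in for the socket

    def _flush_some(self):
        # Pretend the socket accepted up to send_bytes; return true only
        # if something was actually sent.
        chunk = self.outbuf[:self.send_bytes]
        if not chunk:
            return 0
        self.sent = self.sent + chunk
        self.outbuf = self.outbuf[len(chunk):]
        return 1

    def sync_write(self, data):
        if data:
            self.outbuf = self.outbuf + data
        while len(self.outbuf) >= self.send_bytes:
            # Send what we can without blocking.  Errors propagate to the
            # application on purpose.
            if not self._flush_some():
                break
        return len(data)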