[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - HTTPServer2.py:1.1.2.1 PublisherServers.py:1.1.2.1 TaskThreads.py:1.1.2.1 dual_mode_channel.py:1.1.2.1 HTTPResponse.py:1.1.2.7 HTTPServer.py:1.1.2.7
Shane Hathaway
shane@digicool.com
Wed, 21 Nov 2001 19:22:41 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv31156/Server
Modified Files:
Tag: Zope-3x-branch
HTTPResponse.py HTTPServer.py
Added Files:
Tag: Zope-3x-branch
HTTPServer2.py PublisherServers.py TaskThreads.py
dual_mode_channel.py
Log Message:
- Created new HTTP server based on Medusa and ZServer.
- Made some corresponding changes to Zope.Publication.
- Got minitest working again.
=== Added File Zope3/lib/python/Zope/Server/HTTPServer2.py ===
# This server uses asyncore to accept connections and do initial
# processing but threads to do work.
SIMULT_MODE = 0
import asyncore
import re
import socket
import sys
import time
from urllib import unquote
from medusa.http_date import build_http_date
if SIMULT_MODE:
from dual_mode_channel import simultaneous_mode_channel as channel_type
else:
from dual_mode_channel import dual_mode_channel as channel_type
from dual_mode_channel import synchronous_instream
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
# Patch asyncore for speed: drop dispatcher.__getattr__ when the
# installed asyncore defines one.
if hasattr(asyncore.dispatcher, '__getattr__'):
    del asyncore.dispatcher.__getattr__
class http_task:
    """
    A single HTTP request/response exchange carried out on a channel.

    The task is created with the raw request header text; service()
    parses it (init), produces the response (execute) and makes sure
    a response header has gone out (finish).  Subclasses override
    execute().
    """
    # __implements__ = ITask

    instream = None
    # Passed to channel.end_task() when the task is done; cleared by
    # prepareResponseHeaders() when the connection can be kept open.
    close_on_finish = 1
    status_str = '200 Ok'
    wrote_header = 0
    accumulated_headers = None

    def __init__(self, channel, request_header_plus, body_start):
        """
        channel -- the channel the request arrived on.
        request_header_plus -- request line plus headers, without the
            terminating blank line.
        body_start -- offset of the request body in the channel's
            input buffer.
        """
        self.channel = channel
        self.request_header_plus = request_header_plus
        self.body_start = body_start
        self.response_headers = {
            'Server' : 'Zope.Server.HTTPServer',
            'Date' : build_http_date (time.time())
            }

    def defer(self):
        """
        Called when the task will be serviced in a different thread.
        """
        pass

    def service(self):
        """
        Runs the task: init(), execute(), then finish().

        channel.end_task() is always called, whether or not the steps
        succeeded.  A socket.error is reported to the channel through
        handle_comm_error(); any other exception propagates.
        """
        try:
            try:
                self.init()
                self.execute()
                self.finish()
            finally:
                self.channel.end_task(self.close_on_finish)
        except socket.error:
            self.channel.handle_comm_error()

    def cancel(self):
        """
        Called when shutting down the server.
        """
        self.channel.kill_task()

    def init(self):
        """
        Parses the request line and headers and sets up the body stream.

        Sets first_line, header, request_headers, request_body_len,
        instream, command, uri and version.  Header names are stored
        upper-cased with '-' replaced by '_'.
        """
        rhp = self.request_header_plus
        index = rhp.find('\r\n')
        if index >= 0:
            first_line = rhp[:index]
            header = rhp[index + 2:]
        else:
            first_line = rhp
            header = ''
        self.first_line = first_line
        self.header = header

        self.request_headers = request_headers = {}
        for line in get_header_lines(header):
            index = line.find(':')
            if index > 0:
                key = line[:index].upper().replace('-', '_')
                request_headers[key] = line[index + 1:].strip()
            # else there's garbage in the headers?

        request_body_len = int(request_headers.get('CONTENT_LENGTH', 0))
        self.request_body_len = request_body_len
        if request_body_len > 0:
            body_start = self.body_start
            body_end = body_start + request_body_len
            self.instream = synchronous_instream(
                self.channel, body_start, body_end)
        else:
            self.instream = StringIO('')

        # crack_first_line() yields None for parts it cannot parse, so
        # command and uri become the string 'None' in that case.
        command, uri, version = crack_first_line(self.first_line)
        self.command = str(command)
        if uri and '%' in uri:
            uri = unquote(uri)
        self.uri = str(uri)
        if version not in ('1.0', '1.1'):
            # fall back to a version we support.
            version = '1.0'
        self.version = version

    # setResponseStatus(), setResponseHeaders(), appendResponseHeaders(),
    # and wroteResponseHeader() are part of the IHeaderOutput interface
    # used by Zope.Publisher.HTTP.HTTPResponse.

    def setResponseStatus(self, s):
        """
        Sets the status text placed after the version on the status line.
        """
        self.status_str = s

    def setResponseHeaders(self, mapping):
        """
        Merges a mapping of header name -> value into the response headers.
        """
        self.response_headers.update(mapping)

    def appendResponseHeaders(self, lst):
        """
        Takes a list of strings.

        Each string is emitted as one complete header line after the
        headers held in response_headers.
        """
        accum = self.accumulated_headers
        if accum is None:
            self.accumulated_headers = accum = []
        accum.extend(lst)

    def wroteResponseHeader(self):
        """
        Returns true once the response header has been written.
        """
        return self.wrote_header

    def prepareResponseHeaders(self):
        """
        Decides whether the connection stays open and records the
        decision in the Connection response header and close_on_finish.

        The request's Connection header is treated as a comma-separated
        list of case-insensitive tokens, so 'Keep-Alive' and 'Close'
        are recognised as well as their lower-case spellings.
        """
        version = self.version
        connection = self.request_headers.get('CONNECTION', '')
        tokens = [token.strip().lower() for token in connection.split(',')]
        response_headers = self.response_headers
        have_length = 'Content-Length' in response_headers

        # Figure out whether the connection should be closed.
        close_on_finish = 1
        if version == '1.0':
            # HTTP/1.0 keeps the connection only on request, and only
            # when the response length is known.
            if 'keep-alive' in tokens and have_length:
                close_on_finish = 0
                response_headers['Connection'] = 'Keep-Alive'
        elif version == '1.1':
            # HTTP/1.1 keeps the connection unless asked to close it,
            # provided the end of the response can be detected.
            if 'close' not in tokens:
                if have_length:
                    close_on_finish = 0
                elif response_headers.get('Transfer-Encoding') == 'chunked':
                    close_on_finish = 0

        if close_on_finish:
            response_headers['Connection'] = 'close'
        else:
            self.close_on_finish = 0

    def buildResponseHeader(self):
        """
        Returns the complete response header as a string, including
        the blank line that terminates it.
        """
        self.prepareResponseHeaders()
        lines = ['HTTP/%s %s' % (self.version, self.status_str)]
        lines.extend(
            ['%s: %s' % hv for hv in self.response_headers.items()])
        accum = self.accumulated_headers
        if accum is not None:
            lines.extend(accum)
        return '%s\r\n\r\n' % '\r\n'.join(lines)

    def execute(self):
        """
        Override this.

        The default implementation writes a small plain-text page
        that repeats the request URI.
        """
        body = ("The uri was %s\r\n" % self.uri) * 10
        self.response_headers['Content-Type'] = 'text/plain'
        self.response_headers['Content-Length'] = str(len(body))
        self.write(body)

    def finish(self):
        """
        Makes sure a response header is sent even if execute() wrote
        nothing; such a response gets Content-Length 0 unless a length
        was already set.
        """
        if not self.wrote_header:
            if 'Content-Length' not in self.response_headers:
                self.response_headers['Content-Length'] = '0'
            self.write('')

    def write(self, data):
        """
        Writes body data to the channel, preceded by the response
        header on the first call.  Returns what channel.sync_write()
        returns for the data.
        """
        channel = self.channel
        if not self.wrote_header:
            rh = self.buildResponseHeader()
            channel.sync_write(rh)
            self.wrote_header = 1
        return channel.sync_write(data)

    def flush(self):
        """
        Asks the channel to flush its output.
        """
        self.channel.sync_flush()
class http_channel (channel_type):
    """
    Channel that waits for a complete HTTP request header in its
    input buffer and then starts a task for the request.
    """

    task_class = http_task
    max_header_size = 65536    # Ought to be enough

    def after_read_no_task(self):
        """
        Examines the input buffer while no task is active.

        Once the blank line ending a request header has arrived, a
        task_class instance is created, started on this channel and
        handed to the server.  The channel is closed instead when the
        buffered data reaches max_header_size or the header is blank.
        """
        self.inbuf.seek(0)
        buffered = self.inbuf.read()
        if len(buffered) >= self.max_header_size:
            # Header too big.  DoS?
            self.close_when_done()
            return

        # reading headers for the next task.
        header_end = buffered.find('\r\n\r\n')
        if header_end < 0:
            # The header is not complete yet.
            return

        body_start = header_end + 4
        self.inbuf.seek(body_start)
        # Remove preceding blank lines.
        request_text = buffered[:header_end].lstrip()
        if not request_text:
            # No request was made.
            self.close_when_done()
            return

        new_task = self.task_class(self, request_text, body_start)
        self.start_task(new_task)
        self.server.addTask(new_task)
class http_server (asyncore.dispatcher):
    """
    Listening socket that accepts connections and wraps each one in
    a channel_class instance.

    Tasks are passed to the dispatcher given as `tasks`; without a
    dispatcher they are serviced directly inside addTask().
    """

    channel_class = http_channel

    def __init__(self, ip, port, backlog=5, tasks=None):
        # Creates its own TCP socket, binds it to (ip, port) and
        # starts listening.
        asyncore.dispatcher.__init__(self)
        self.tasks = tasks
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((ip, port))
        self.listen(backlog)

    def writable (self):
        return 0

    def handle_read (self):
        pass

    def readable (self):
        return self.accepting

    def handle_connect (self):
        pass

    def handle_error(self):
        # Let Ctrl-C through; everything else gets the default handling.
        exc_type = sys.exc_info()[0]
        if exc_type is KeyboardInterrupt:
            raise exc_type
        asyncore.dispatcher.handle_error(self)

    def handle_accept (self):
        try:
            conn, addr = self.accept()
        except socket.error:
            # linux: on rare occasions we get a bogus socket back from
            # accept.  socketmodule.c:makesockaddr complains that the
            # address family is unknown.  We don't want the whole server
            # to shut down because of this.
            self.log_info ('warning: server accept() threw an exception',
                           'warning')
        except TypeError:
            # unpack non-sequence.  this can happen when a read event
            # fires on a listening socket, but when we call accept()
            # we get EWOULDBLOCK, so dispatcher.accept() returns None.
            # Seen on FreeBSD3.
            self.log_info ('warning: server accept() threw EWOULDBLOCK',
                           'warning')
        else:
            self.channel_class(self, conn, addr)

    def addTask(self, task):
        """
        Queues the task on the dispatcher, or services it right away
        when the server was created without one.
        """
        if self.tasks is None:
            task.service()
        else:
            self.tasks.addTask(task)
# Request line: a command, a URI (an optional scheme://host prefix is
# skipped) and an optional " HTTP/x.y" suffix.
first_line_re = re.compile (
    '([^ ]+) (?:[^ :?#]+://[^ ?#/]*)?([^ ]+)(( HTTP/([0-9.]+))$|$)')

def crack_first_line (r):
    """
    Splits a request line into (command, uri, version).

    The command is upper-cased.  version is None when the line has
    no HTTP/x.y part.  A line that does not match in full gives
    (None, None, None).
    """
    found = first_line_re.match (r)
    # '$' also matches before a trailing newline, hence the end() test.
    if found is None or found.end() != len(r):
        return None, None, None
    command, uri, tail = found.group(1, 2, 3)
    version = None
    if tail:
        version = found.group(5)
    return command.upper(), uri, version
def get_header_lines(header):
    """
    Splits the header into lines, putting multi-line headers together.

    A line that begins with a space or tab continues the line before
    it: its first character is dropped and the rest is appended.  A
    continuation line with no line before it is ignored rather than
    raising IndexError.  Returns a list of strings; an empty header
    gives [''].
    """
    r = []
    for line in header.split('\r\n'):
        if line and line[0] in ' \t':
            if r:
                r[-1] = r[-1] + line[1:]
            # else: nothing to continue, drop the line.
        else:
            r.append(line)
    return r
if __name__ == '__main__':
    # Demo: serve the default http_task on port 8080 with four
    # worker threads until interrupted.
    from TaskThreads import ThreadedTaskDispatcher

    dispatcher = ThreadedTaskDispatcher()
    dispatcher.setThreadCount(4)
    http_server('', 8080, tasks=dispatcher)
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        print('shutting down...')
        dispatcher.shutdown()
=== Added File Zope3/lib/python/Zope/Server/PublisherServers.py ===
from HTTPServer2 import http_task, http_channel, http_server
from Zope.Publisher.Publish import publish
from Zope.Publisher.HTTP.HTTPRequest import HTTPRequest
from Zope.Publisher.HTTP.HTTPResponse import HTTPResponse
class PublisherHTTPTask (http_task):
    """
    HTTP task that answers the request by running the Zope publisher.
    """

    def execute(self):
        """
        Builds a publisher request/response pair from the server's
        payloads and publishes the request.  The task itself is passed
        to HTTPResponse twice, as its second and third argument.
        """
        owner = self.channel.server
        response = HTTPResponse(owner.response_payload, self, self)
        request = HTTPRequest(owner.request_payload, self.instream,
                              self.request_headers, response)
        publish(request)
class PublisherHTTPChannel (http_channel):
    """
    HTTP channel whose requests are handled by PublisherHTTPTask.
    """

    task_class = PublisherHTTPTask
class PublisherHTTPServer (http_server):
    """
    HTTP server that keeps the publisher payloads its tasks need.
    """

    channel_class = PublisherHTTPChannel

    def __init__(self, request_payload, response_payload,
                 ip, port, backlog=5, tasks=None):
        # Store the payloads first, then let http_server open the
        # listening socket.
        self.request_payload = request_payload
        self.response_payload = response_payload
        http_server.__init__(self, ip, port, backlog=backlog, tasks=tasks)
if __name__ == '__main__':
    # Demo: publish a tiny object tree (/, /x, /x/y) on port 8080
    # with four worker threads until interrupted.
    import asyncore
    from Zope.Publisher.HTTP.BrowserPayload import BrowserRequestPayload, \
         BrowserResponsePayload
    from Zope.Publisher.DefaultPublication import DefaultPublication
    from TaskThreads import ThreadedTaskDispatcher

    class c:
        " "
        # NOTE(review): the blank docstring looks deliberate --
        # presumably the publisher wants one; confirm before removing.
        def __call__(self, URL):
            return 'You invoked URL %s just now.\n' % URL

    ob = c()
    ob.x = c()
    ob.x.y = c()

    request_payload = BrowserRequestPayload(DefaultPublication(ob))
    response_payload = BrowserResponsePayload()

    tasks = ThreadedTaskDispatcher()
    tasks.setThreadCount(4)
    PublisherHTTPServer(request_payload, response_payload,
                        '', 8080, tasks=tasks)
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        print('shutting down...')
        tasks.shutdown()
=== Added File Zope3/lib/python/Zope/Server/TaskThreads.py ===
from Queue import Queue, Empty
from thread import allocate_lock, start_new_thread
class ITask: # Interface
    """
    Interface of the tasks accepted by ThreadedTaskDispatcher.
    """

    def service():
        """
        Carries out the task.  Every queued task gets exactly one of
        service() or cancel() called on it.
        """

    def cancel():
        """
        Called in place of service() during shutdown, or when an
        exception keeps the task from being serviced.  Must return
        quickly and should not throw exceptions.
        """

    def defer():
        """
        Called just before the task is put on the queue to be
        executed in a different thread.
        """
class ThreadedTaskDispatcher:
    """
    Hands queued tasks to a pool of worker threads.
    """

    def __init__(self):
        self.queue = Queue()
        self.thread_mgmt_lock = allocate_lock()
        self.threads = {}  # { thread number -> 1 }

    def handlerThread(self, thread_no):
        """
        Body of one worker thread: services tasks from the queue for
        as long as thread_no is registered in self.threads.  The
        registration is only re-checked after a task has been fetched
        and serviced.
        """
        registered = self.threads
        while thread_no in registered:
            task = self.queue.get()
            try:
                task.service()
            except:
                # Log somewhere?
                import traceback
                traceback.print_exc()

    def setThreadCount(self, count):
        """
        Starts new worker threads or unregisters existing ones until
        `count` thread numbers are registered.
        """
        lock = self.thread_mgmt_lock
        lock.acquire()
        try:
            registered = self.threads
            candidate = 0
            while len(registered) < count:
                # Find the next unused thread number.
                while candidate in registered:
                    candidate += 1
                registered[candidate] = 1
                start_new_thread(self.handlerThread, (candidate,))
                candidate += 1
            while len(registered) > count:
                if count == 0:
                    registered.clear()
                else:
                    victim = list(registered.keys())[0]
                    del registered[victim]
        finally:
            lock.release()

    def addTask(self, task):
        """
        Defers the task and puts it on the queue.  If either step
        fails the task is cancelled and the exception is re-raised.
        """
        try:
            task.defer()
            self.queue.put_nowait(task)
        except:
            task.cancel()
            raise

    def shutdown(self, cancel_pending=1):
        """
        Unregisters all worker threads and, unless cancel_pending is
        false, cancels every task still waiting in the queue.
        """
        self.setThreadCount(0)
        if not cancel_pending:
            return
        waiting = self.queue
        try:
            while 1:
                waiting.get_nowait().cancel()
        except Empty:
            pass

    def hasTasks(self):
        # Inherently non-thread-safe.
        return not self.queue.empty()
=== Added File Zope3/lib/python/Zope/Server/dual_mode_channel.py === (424/524 lines abridged)
import asyncore
import socket
import sys
import time
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from medusa.thread.select_trigger import trigger
pull_trigger = trigger().pull_trigger
class dual_mode_channel (asyncore.dispatcher):
"""
The channel switches between asynchronous and synchronous mode.
"""
# recv_bytes is the argument to pass to socket.recv().
recv_bytes = 8192
# outbuf_maxsize specifies maximum outbuf is allowed to hold
# before the application starts blocking on output.
# Raising outbuf_maxsize will improve throughput if you have
# files larger than outbuf_maxsize being transferred over
# more concurrent, slow connections than your worker thread count.
# Expect maximum RAM consumption by outbufs to be
# at most (number_of_concurrent_connections * outbuf_maxsize),
# but if you're using ZODB and everyone is downloading the
# same file then the normal RAM consumption is only a little more
# than (number of ZODB threads * outbuf_maxsize) because of
# ConservingStringBuffer. Also, if ZODB is changed to
# share strings among threads, normal RAM consumption by outbufs
# will decrease significantly.
outbuf_maxsize = 4200000 # About 4 MB
# Create a tempfile if the input data gets larger than inbuf_overflow.
inbuf_overflow = 525000 # About 0.5 MB
overflowed = 0
# will_close is set to 1 to close the socket.
will_close = 0
async_mode = 1
[-=- -=- -=- 424 lines omitted -=- -=- -=-]
Adds bytes to the end of the buffer.
"""
self.data.append(s)
self.len = self.len + len(s)
def get_chunk(self, minbytes=4096, delete=0):
    """
    Returns a string taken from the start of the buffer.

    Whole stored strings are joined until at least minbytes bytes
    are collected or the buffer runs out.  With a true `delete` the
    returned part is also removed from the buffer.
    """
    pieces = self.data
    if not pieces:
        return ''
    taken = 0
    size = 0
    while taken < len(pieces):
        size = size + len(pieces[taken])
        taken = taken + 1
        if size >= minbytes:
            break
    chunk = ''.join(pieces[:taken])
    if delete:
        del pieces[:taken]
        self.len = self.len - size
    return chunk
def del_bytes(self, bytes):
    """
    Deletes the given number of bytes from the start of the buffer.

    Deleting zero bytes is a no-op, also on an empty buffer.
    Raises ValueError, leaving the buffer untouched, when `bytes`
    is negative or exceeds what the buffer holds.
    """
    if bytes < 0:
        raise ValueError(
            "Can't delete %d bytes from buffer" % bytes)
    if bytes == 0:
        return
    gotbytes = 0
    data = self.data
    for index in range(len(data)):
        s = data[index]
        gotbytes = gotbytes + len(s)
        if gotbytes >= bytes:
            leftover = gotbytes - bytes
            if leftover:
                # The cut falls inside s: keep only its tail.
                del data[:index]
                data[0] = s[len(s) - leftover:]
            else:
                # The cut falls exactly on the end of s.
                del data[:index + 1]
            self.len = self.len - bytes
            return
    # Hmm, too many!
    raise ValueError(
        "Can't delete %d bytes from buffer of %d bytes" %
        (bytes, gotbytes))
=== Zope3/lib/python/Zope/Server/HTTPResponse.py 1.1.2.6 => 1.1.2.7 ===
outstream=self.outstream
- if not self._wrote:
+ if not self._wrote_headers:
l=self.headers.get('content-length', None)
if l is not None:
try:
@@ -158,7 +158,7 @@
self._streaming=1
outstream.write(str(self))
- self._wrote=1
+ self._wrote_headers=1
if not data: return
=== Zope3/lib/python/Zope/Server/HTTPServer.py 1.1.2.6 => 1.1.2.7 ===
env['SERVER_NAME']=server.server_name
env['SERVER_SOFTWARE']=server.SERVER_IDENT
- env['SERVER_PROTOCOL']="HTTP/"+request.version
+ env['SERVER_PROTOCOL']="HTTP/%s" % request.version
env['channel.creation_time']=request.channel.creation_time
if self.uri_base=='/':
env['SCRIPT_NAME']=''