[Zope-Checkins] CVS: Zope/lib/python/ZServer/medusa/thread - __init__.py:1.1.2.1 pi_module.py:1.1.2.1 select_trigger.py:1.1.2.1 test_module.py:1.1.2.1 thread_channel.py:1.1.2.1 thread_handler.py:1.1.2.1
Chris McDonough
chrism@zope.com
Tue, 17 Sep 2002 01:16:11 -0400
Update of /cvs-repository/Zope/lib/python/ZServer/medusa/thread
In directory cvs.zope.org:/tmp/cvs-serv12650/lib/python/ZServer/medusa/thread
Added Files:
Tag: chrism-install-branch
__init__.py pi_module.py select_trigger.py test_module.py
thread_channel.py thread_handler.py
Log Message:
Moved ZServer into lib/python.
=== Added File Zope/lib/python/ZServer/medusa/thread/__init__.py ===
# make 'thread' appear as a package
=== Added File Zope/lib/python/ZServer/medusa/thread/pi_module.py ===
# -*- Mode: Python; tab-width: 4 -*-
# [reworking of the version in Python-1.5.1/Demo/scripts/pi.py]
# Print digits of pi forever.
#
# The algorithm, using Python's 'long' integers ("bignums"), works
# with continued fractions, and was conceived by Lambert Meertens.
#
# See also the ABC Programmer's Handbook, by Geurts, Meertens & Pemberton,
# published by Prentice-Hall (UK) Ltd., 1990.
import string
StopException = "Stop!"
def go (file):
    try:
        k, a, b, a1, b1 = 2L, 4L, 1L, 12L, 4L
        while 1:
            # Next approximation
            p, q, k = k*k, 2L*k+1L, k+1L
            a, b, a1, b1 = a1, b1, p*a+q*a1, p*b+q*b1
            # Print common digits
            d, d1 = a/b, a1/b1
            while d == d1:
                if file.write (str(int(d))):
                    raise StopException
                a, a1 = 10L*(a%b), 10L*(a1%b1)
                d, d1 = a/b, a1/b1
    except StopException:
        return

class line_writer:

    "partition the endless line into 80-character ones"

    def __init__ (self, file, digit_limit=10000):
        self.file = file
        self.buffer = ''
        self.count = 0
        self.digit_limit = digit_limit

    def write (self, data):
        self.buffer = self.buffer + data
        if len(self.buffer) > 80:
            line, self.buffer = self.buffer[:80], self.buffer[80:]
            self.file.write (line+'\r\n')
            self.count = self.count + 80
            if self.count > self.digit_limit:
                return 1
            else:
                return 0

def main (env, stdin, stdout):
    parts = string.split (env['REQUEST_URI'], '/')
    if len(parts) >= 3:
        ndigits = string.atoi (parts[2])
    else:
        ndigits = 5000
    stdout.write ('Content-Type: text/plain\r\n\r\n')
    go (line_writer (stdout, ndigits))
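
# Usage sketch (added for illustration; not part of the original file):
# drive main() by hand with a fake CGI environment.  StringIO stands in
# for the socket-backed stdout that the thread handler normally supplies.
if __name__ == '__main__':
    import StringIO
    out = StringIO.StringIO()
    main ({'REQUEST_URI': '/pi/200'}, None, out)
    # the first line is the Content-Type header, then digits of pi
    print out.getvalue()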
=== Added File Zope/lib/python/ZServer/medusa/thread/select_trigger.py ===
# -*- Mode: Python; tab-width: 4 -*-
VERSION_STRING = "$Id: select_trigger.py,v 1.1.2.1 2002/09/17 05:16:10 chrism Exp $"
import asyncore
import asynchat
import os
import socket
import string
import thread
if os.name == 'posix':

    class trigger (asyncore.file_dispatcher):

        "Wake up a call to select() running in the main thread"

        # This is useful in a context where you are using Medusa's I/O
        # subsystem to deliver data, but the data is generated by another
        # thread.  Normally, if Medusa is in the middle of a call to
        # select(), new output data generated by another thread will have
        # to sit until the call to select() either times out or returns.
        # If the trigger is 'pulled' by another thread, it should immediately
        # generate a READ event on the trigger object, which will force the
        # select() invocation to return.

        # A common use for this facility: letting Medusa manage I/O for a
        # large number of connections, while routing each request through a
        # thread chosen from a fixed-size thread pool.  When a thread is
        # acquired, a transaction is performed, but output data is
        # accumulated into buffers that will be emptied more efficiently
        # by Medusa. [picture a server that can process database queries
        # rapidly, but doesn't want to tie up threads waiting to send data
        # to low-bandwidth connections]

        # The other major feature provided by this class is the ability to
        # move work back into the main thread: if you call pull_trigger()
        # with a thunk argument, when select() wakes up and receives the
        # event it will call your thunk from within that thread.  The main
        # purpose of this is to remove the need to wrap thread locks around
        # Medusa's data structures, which normally do not need them.  [To see
        # why this is true, imagine this scenario: A thread tries to push some
        # new data onto a channel's outgoing data queue at the same time that
        # the main thread is trying to remove some]

        def __init__ (self):
            r, w = os.pipe()
            self.trigger = w
            asyncore.file_dispatcher.__init__ (self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []

        def __repr__ (self):
            return '<select-trigger (pipe) at %x>' % id(self)

        def readable (self):
            return 1

        def writable (self):
            return 0

        def handle_connect (self):
            pass

        def pull_trigger (self, thunk=None):
            # print 'PULL_TRIGGER: ', len(self.thunks)
            if thunk:
                try:
                    self.lock.acquire()
                    self.thunks.append (thunk)
                finally:
                    self.lock.release()
            os.write (self.trigger, 'x')

        def handle_read (self):
            self.recv (8192)
            try:
                self.lock.acquire()
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
                        print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
                self.thunks = []
            finally:
                self.lock.release()
else:

    # win32-safe version

    class trigger (asyncore.dispatcher):

        address = ('127.9.9.9', 19999)

        def __init__ (self):
            a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
            w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)

            # set TCP_NODELAY to true to avoid buffering
            w.setsockopt (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            # tricky: get a pair of connected sockets
            host = '127.0.0.1'
            port = 19999
            while 1:
                try:
                    self.address = (host, port)
                    a.bind (self.address)
                    break
                except:
                    if port <= 19950:
                        raise 'Bind Error', 'Cannot bind trigger!'
                    port = port - 1

            a.listen (1)
            w.setblocking (0)
            try:
                w.connect (self.address)
            except:
                pass
            r, addr = a.accept()
            a.close()
            w.setblocking (1)
            self.trigger = w

            asyncore.dispatcher.__init__ (self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._trigger_connected = 0

        def __repr__ (self):
            return '<select-trigger (loopback) at %x>' % id(self)

        def readable (self):
            return 1

        def writable (self):
            return 0

        def handle_connect (self):
            pass

        def pull_trigger (self, thunk=None):
            if thunk:
                try:
                    self.lock.acquire()
                    self.thunks.append (thunk)
                finally:
                    self.lock.release()
            self.trigger.send ('x')

        def handle_read (self):
            self.recv (8192)
            try:
                self.lock.acquire()
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
                        print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
                self.thunks = []
            finally:
                self.lock.release()
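
# Usage sketch (illustrative; not part of the original file): a worker
# thread hands data back to the main (select) thread by pulling the
# trigger with a thunk.  The thunk runs later inside the main thread's
# event loop, so no locking is needed around the channel's data queue.
def _example_pull (trigger_obj, channel, data):
    # 'channel' is assumed to be an async_chat-style channel owned by
    # the main thread; push() queues 'data' for output there.
    trigger_obj.pull_trigger (lambda c=channel, d=data: c.push (d))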
the_trigger = None

class trigger_file:

    "A 'triggered' file object"

    buffer_size = 4096

    def __init__ (self, parent):
        global the_trigger
        if the_trigger is None:
            the_trigger = trigger()
        self.parent = parent
        self.buffer = ''

    def write (self, data):
        self.buffer = self.buffer + data
        if len(self.buffer) > self.buffer_size:
            d, self.buffer = self.buffer, ''
            the_trigger.pull_trigger (
                    lambda d=d, p=self.parent: p.push (d)
                    )

    def writeline (self, line):
        self.write (line+'\r\n')

    def writelines (self, lines):
        self.write (
                string.joinfields (
                        lines,
                        '\r\n'
                        ) + '\r\n'
                )

    def flush (self):
        if self.buffer:
            d, self.buffer = self.buffer, ''
            the_trigger.pull_trigger (
                    lambda p=self.parent, d=d: p.push (d)
                    )

    def softspace (self, *args):
        pass

    def close (self):
        # in a derived class, you may want to call trigger_close() instead.
        self.flush()
        self.parent = None

    def trigger_close (self):
        d, self.buffer = self.buffer, ''
        p, self.parent = self.parent, None
        the_trigger.pull_trigger (
                lambda p=p, d=d: (p.push (d), p.close_when_done())
                )
if __name__ == '__main__':

    import time

    def thread_function (output_file, i, n):
        print 'entering thread_function'
        while n:
            time.sleep (5)
            output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
            output_file.flush()
            n = n - 1
        output_file.close()
        print 'exiting thread_function'

    class thread_parent (asynchat.async_chat):

        def __init__ (self, conn, addr):
            self.addr = addr
            asynchat.async_chat.__init__ (self, conn)
            self.set_terminator ('\r\n')
            self.buffer = ''
            self.count = 0

        def collect_incoming_data (self, data):
            self.buffer = self.buffer + data

        def found_terminator (self):
            data, self.buffer = self.buffer, ''
            if not data:
                asyncore.close_all()
                print "done"
                return
            n = string.atoi (string.split (data)[0])
            tf = trigger_file (self)
            self.count = self.count + 1
            thread.start_new_thread (thread_function, (tf, self.count, n))

    class thread_server (asyncore.dispatcher):

        def __init__ (self, family=socket.AF_INET, address=('', 9003)):
            asyncore.dispatcher.__init__ (self)
            self.create_socket (family, socket.SOCK_STREAM)
            self.set_reuse_addr()
            self.bind (address)
            self.listen (5)

        def handle_accept (self):
            conn, addr = self.accept()
            tp = thread_parent (conn, addr)

    thread_server()
    #asyncore.loop(1.0, use_poll=1)
    try:
        asyncore.loop()
    except:
        asyncore.close_all()
=== Added File Zope/lib/python/ZServer/medusa/thread/test_module.py ===
# -*- Mode: Python; tab-width: 4 -*-
import pprint
def main (env, stdin, stdout):
    stdout.write (
            '<html><body><h1>Test CGI Module</h1>\r\n'
            '<br>The Environment:<pre>\r\n'
            )
    pprint.pprint (env, stdout)
    stdout.write ('</pre></body></html>\r\n')
=== Added File Zope/lib/python/ZServer/medusa/thread/thread_channel.py ===
# -*- Mode: Python; tab-width: 4 -*-
VERSION_STRING = "$Id: thread_channel.py,v 1.1.2.1 2002/09/17 05:16:10 chrism Exp $"
# This will probably only work on Unix.
# The disadvantage to this technique is that it wastes file
# descriptors (especially when compared to select_trigger.py)
# May be possible to do it on Win32, using TCP localhost sockets.
# [does winsock support 'socketpair'?]
import asyncore
import asynchat
import fcntl
import os
import socket
import string
import thread
try:
    from fcntl import F_GETFL, F_SETFL, O_NDELAY
except ImportError:
    from FCNTL import F_GETFL, F_SETFL, O_NDELAY
# this channel slaves off of another one. it starts a thread which
# pumps its output through the 'write' side of the pipe. The 'read'
# side of the pipe will then notify us when data is ready. We push
# this data on the owning data channel's output queue.
class thread_channel (asyncore.file_dispatcher):

    buffer_size = 8192

    def __init__ (self, channel, function, *args):
        self.parent = channel
        self.function = function
        self.args = args
        self.pipe = rfd, wfd = os.pipe()
        asyncore.file_dispatcher.__init__ (self, rfd)

    def start (self):
        rfd, wfd = self.pipe

        # The read side of the pipe is set to non-blocking I/O; it is
        # 'owned' by medusa.

        flags = fcntl.fcntl (rfd, F_GETFL, 0)
        fcntl.fcntl (rfd, F_SETFL, flags | O_NDELAY)

        # The write side of the pipe is left in blocking mode; it is
        # 'owned' by the thread.  However, we wrap it up as a file object.
        # [who wants to 'write()' to a number?]

        of = os.fdopen (wfd, 'w')

        thread.start_new_thread (
                self.function,
                # put the output file in front of the other arguments
                (of,) + self.args
                )

    def writable (self):
        return 0

    def readable (self):
        return 1

    def handle_read (self):
        data = self.recv (self.buffer_size)
        self.parent.push (data)

    def handle_close (self):
        # Depending on your intentions, you may want to close
        # the parent channel here.
        self.close()
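
# Usage sketch (illustrative; not part of the original file): wire a
# worker function to an existing channel.  The function receives a
# blocking file object as its first argument; whatever it writes is
# pumped back through the pipe and pushed onto the owning channel.
#
#     tc = thread_channel (channel, worker_function, extra_arg)
#     tc.start()   # worker_function (file, extra_arg) runs in a new thread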
# Yeah, it's bad when the test code is bigger than the library code.
if __name__ == '__main__':

    import time

    def thread_function (output_file, i, n):
        print 'entering thread_function'
        while n:
            time.sleep (5)
            output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
            output_file.flush()
            n = n - 1
        output_file.close()
        print 'exiting thread_function'

    class thread_parent (asynchat.async_chat):

        def __init__ (self, conn, addr):
            self.addr = addr
            asynchat.async_chat.__init__ (self, conn)
            self.set_terminator ('\r\n')
            self.buffer = ''
            self.count = 0

        def collect_incoming_data (self, data):
            self.buffer = self.buffer + data

        def found_terminator (self):
            data, self.buffer = self.buffer, ''
            n = string.atoi (string.split (data)[0])
            tc = thread_channel (self, thread_function, self.count, n)
            self.count = self.count + 1
            tc.start()

    class thread_server (asyncore.dispatcher):

        def __init__ (self, family=socket.AF_INET, address=('127.0.0.1', 9003)):
            asyncore.dispatcher.__init__ (self)
            self.create_socket (family, socket.SOCK_STREAM)
            self.set_reuse_addr()
            self.bind (address)
            self.listen (5)

        def handle_accept (self):
            conn, addr = self.accept()
            tp = thread_parent (conn, addr)

    thread_server()
    #asyncore.loop(1.0, use_poll=1)
    asyncore.loop()
=== Added File Zope/lib/python/ZServer/medusa/thread/thread_handler.py ===
# -*- Mode: Python; tab-width: 4 -*-
import re
import string
import StringIO
import sys
import os
import time
import counter
import select_trigger
import producers
from default_handler import split_path, unquote, get_header
import fifo
import threading
class request_queue:

    def __init__ (self):
        self.mon = threading.RLock()
        self.cv = threading.Condition (self.mon)
        self.queue = fifo.fifo()

    def put (self, item):
        self.cv.acquire()
        self.queue.push (item)
        self.cv.notify()
        self.cv.release()

    def get (self):
        self.cv.acquire()
        while not self.queue:
            self.cv.wait()
        result = self.queue.pop()
        self.cv.release()
        return result
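
# Usage sketch (illustrative; not part of the original file): the main
# thread put()s work items and worker threads block in get() until one
# arrives; see request_loop_thread below for the consumer side.
#
#     q = request_queue()
#     q.put ((module.main, (env, stdin, stdout)))     # main thread
#     function, (env, stdin, stdout) = q.get()        # worker thread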
header2env = {
        'Content-Length'    : 'CONTENT_LENGTH',
        'Content-Type'      : 'CONTENT_TYPE',
        'Referer'           : 'HTTP_REFERER',
        'User-Agent'        : 'HTTP_USER_AGENT',
        'Accept'            : 'HTTP_ACCEPT',
        'Accept-Charset'    : 'HTTP_ACCEPT_CHARSET',
        'Accept-Language'   : 'HTTP_ACCEPT_LANGUAGE',
        'Host'              : 'HTTP_HOST',
        'Connection'        : 'CONNECTION_TYPE',
        'Authorization'     : 'HTTP_AUTHORIZATION',
        'Cookie'            : 'HTTP_COOKIE',
        }

# convert keys to lower case for case-insensitive matching
for (key, value) in header2env.items():
    del header2env[key]
    key = string.lower (key)
    header2env[key] = value
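
# e.g. header2env['user-agent'] == 'HTTP_USER_AGENT'; headers with no
# entry here are given an HTTP_ prefix in handle_request() below.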
class thread_output_file (select_trigger.trigger_file):

    def close (self):
        self.trigger_close()
class script_handler:

    def __init__ (self, queue, document_root=""):
        self.modules = {}
        self.document_root = document_root
        self.queue = queue

    def add_module (self, module, *names):
        if not names:
            names = [module.__name__]
        for name in names:
            self.modules['/'+name] = module

    def match (self, request):
        uri = request.uri

        i = string.find (uri, "/", 1)
        if i != -1:
            uri = uri[:i]

        i = string.find (uri, "?", 1)
        if i != -1:
            uri = uri[:i]

        if self.modules.has_key (uri):
            request.module = self.modules[uri]
            return 1
        else:
            return 0

    def handle_request (self, request):

        [path, params, query, fragment] = split_path (request.uri)

        while path and path[0] == '/':
            path = path[1:]

        if '%' in path:
            path = unquote (path)

        env = {}
        env['REQUEST_URI'] = "/" + path
        env['REQUEST_METHOD'] = string.upper (request.command)
        env['SERVER_PORT'] = str (request.channel.server.port)
        env['SERVER_NAME'] = request.channel.server.server_name
        env['SERVER_SOFTWARE'] = request['Server']
        env['DOCUMENT_ROOT'] = self.document_root

        parts = string.split (path, "/")

        # are script_name and path_info ok?
        env['SCRIPT_NAME'] = "/" + parts[0]

        if query and query[0] == "?":
            query = query[1:]

        env['QUERY_STRING'] = query

        try:
            path_info = "/" + string.join (parts[1:], "/")
        except:
            path_info = ''

        env['PATH_INFO'] = path_info
        env['GATEWAY_INTERFACE'] = 'CGI/1.1'          # what should this really be?
        env['REMOTE_ADDR'] = request.channel.addr[0]
        env['REMOTE_HOST'] = request.channel.addr[0]  # TODO: connect to resolver

        for header in request.header:
            [key, value] = string.split (header, ": ", 1)
            key = string.lower (key)

            if header2env.has_key (key):
                if header2env[key]:
                    env[header2env[key]] = value
            else:
                key = 'HTTP_' + string.upper (
                        string.join (
                                string.split (key, "-"),
                                "_"
                                )
                        )
                env[key] = value

        ## remove empty environment variables
        for key in env.keys():
            if env[key] == "" or env[key] is None:
                del env[key]

        try:
            httphost = env['HTTP_HOST']
            parts = string.split (httphost, ":")
            env['HTTP_HOST'] = parts[0]
        except KeyError:
            pass

        if request.command in ('put', 'post'):
            # PUT data requires a correct Content-Length: header
            # (though I bet with http/1.1 we can expect chunked encoding)
            request.collector = collector (self, request, env)
            request.channel.set_terminator (None)
        else:
            sin = StringIO.StringIO ('')
            self.continue_request (sin, request, env)

    def continue_request (self, stdin, request, env):
        stdout = header_scanning_file (
                request,
                thread_output_file (request.channel)
                )
        self.queue.put (
                (request.module.main, (env, stdin, stdout))
                )
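
# Note (added): the (function, (env, stdin, stdout)) tuple queued above
# is exactly what request_loop_thread.run() unpacks and calls.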
HEADER_LINE = re.compile ('([A-Za-z0-9-]+): ([^\r\n]+)')
# A file wrapper that handles the CGI 'Status:' header hack
# by scanning the output.
class header_scanning_file:

    def __init__ (self, request, file):
        self.buffer = ''
        self.request = request
        self.file = file
        self.got_header = 0
        self.bytes_out = counter.counter()

    def write (self, data):
        if self.got_header:
            self._write (data)
        else:
            # CGI scripts may optionally provide extra headers.
            #
            # If they do not, then the output is assumed to be
            # text/html, with an HTTP reply code of '200 OK'.
            #
            # If they do, we need to scan those headers for one in
            # particular: the 'Status:' header, which will tell us
            # to use a different HTTP reply code [like '302 Moved']
            #
            self.buffer = self.buffer + data
            lines = string.split (self.buffer, '\n')
            # ignore the last piece, it is either empty, or a partial line
            lines = lines[:-1]
            # look for something un-header-like
            for i in range (len(lines)):
                li = lines[i]
                if (not li) or (HEADER_LINE.match (li) is None):
                    # this is either the header separator, or it
                    # is not a header line.
                    self.got_header = 1
                    h = self.build_header (lines[:i])
                    self._write (h)
                    # rejoin the rest of the data
                    d = string.join (lines[i:], '\n')
                    self._write (d)
                    self.buffer = ''
                    break

    def build_header (self, lines):
        status = '200 OK'
        saw_content_type = 0
        hl = HEADER_LINE
        for line in lines:
            mo = hl.match (line)
            if mo is not None:
                h = string.lower (mo.group(1))
                if h == 'status':
                    status = mo.group(2)
                elif h == 'content-type':
                    saw_content_type = 1
        lines.insert (0, 'HTTP/1.0 %s' % status)
        lines.append ('Server: ' + self.request['Server'])
        lines.append ('Date: ' + self.request['Date'])
        if not saw_content_type:
            lines.append ('Content-Type: text/html')
        lines.append ('Connection: close')
        return string.join (lines, '\r\n') + '\r\n\r\n'
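
    # Example (illustrative; not part of the original file): a CGI
    # script that starts its output with
    #
    #     Status: 302 Moved\r\n
    #     Location: /elsewhere\r\n
    #     \r\n
    #
    # gets an 'HTTP/1.0 302 Moved' response line instead of the default
    # '200 OK', with Server:, Date: and Connection: headers appended.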
    def _write (self, data):
        self.bytes_out.increment (len(data))
        self.file.write (data)

    def writelines (self, list):
        self.write (string.join (list, ''))

    def flush (self):
        pass

    def close (self):
        if not self.got_header:
            # managed to slip through our header detectors
            self._write (self.build_header (['Status: 502', 'Content-Type: text/html']))
            self._write (
                    '<html><h1>Server Error</h1>\r\n'
                    '<b>Bad Gateway:</b> No Header from CGI Script\r\n'
                    '<pre>Data: %s</pre>'
                    '</html>\r\n' % (repr(self.buffer))
                    )
        self.request.log (int(self.bytes_out.as_long()))
        self.file.close()
        self.request.channel.current_request = None
class collector:

    "gathers input for PUT requests"

    def __init__ (self, handler, request, env):
        self.handler = handler
        self.env = env
        self.request = request
        self.data = StringIO.StringIO()

        # make sure there's a content-length header
        self.cl = request.get_header ('content-length')

        if not self.cl:
            request.error (411)
            return
        else:
            self.cl = string.atoi (self.cl)

    def collect_incoming_data (self, data):
        self.data.write (data)
        if self.data.tell() >= self.cl:
            self.data.seek (0)

            h = self.handler
            r = self.request

            # set the terminator back to the default
            self.request.channel.set_terminator ('\r\n\r\n')
            del self.handler
            del self.request

            h.continue_request (self.data, r, self.env)
class request_loop_thread (threading.Thread):

    def __init__ (self, queue):
        threading.Thread.__init__ (self)
        self.setDaemon (1)
        self.queue = queue

    def run (self):
        while 1:
            function, (env, stdin, stdout) = self.queue.get()
            function (env, stdin, stdout)
            stdout.close()
# ===========================================================================
# Testing
# ===========================================================================
if __name__ == '__main__':

    import sys

    if len(sys.argv) < 2:
        print 'Usage: %s <worker_threads>' % sys.argv[0]
    else:
        nthreads = string.atoi (sys.argv[1])

        import asyncore
        import http_server
        # create a generic web server
        hs = http_server.http_server ('', 7080)

        # create a request queue
        q = request_queue()

        # create a script handler
        sh = script_handler (q)

        # install the script handler on the web server
        hs.install_handler (sh)

        # get a couple of CGI modules
        import test_module
        import pi_module

        # install the modules on the script handler
        sh.add_module (test_module, 'test')
        sh.add_module (pi_module, 'pi')

        # fire up the worker threads
        for i in range (nthreads):
            rt = request_loop_thread (q)
            rt.start()

        # start the main event loop
        asyncore.loop()
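
        # To try this by hand (note added; not in the original checkin):
        #     python thread_handler.py 4
        # then fetch http://localhost:7080/test or http://localhost:7080/pi/1000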