[Zope-Checkins] CVS: Zope2 - pi_module.py:1.3 select_trigger.py:1.3 test_module.py:1.3 thread_channel.py:1.3 thread_handler.py:1.3
andreas@serenade.digicool.com
andreas@serenade.digicool.com
Tue, 1 May 2001 07:45:28 -0400
Update of /cvs-repository/Zope2/ZServer/medusa/thread
In directory serenade.digicool.com:/tmp/cvs-serv12359/thread
Modified Files:
pi_module.py select_trigger.py test_module.py
thread_channel.py thread_handler.py
Log Message:
we *hate* tabs - let's get rid of them
--- Updated File pi_module.py in package Zope2 --
--- pi_module.py 2001/04/25 19:09:56 1.2
+++ pi_module.py 2001/05/01 11:45:27 1.3
@@ -15,48 +15,48 @@
StopException = "Stop!"
def go (file):
- try:
- k, a, b, a1, b1 = 2L, 4L, 1L, 12L, 4L
- while 1:
- # Next approximation
- p, q, k = k*k, 2L*k+1L, k+1L
- a, b, a1, b1 = a1, b1, p*a+q*a1, p*b+q*b1
- # Print common digits
- d, d1 = a/b, a1/b1
- while d == d1:
- if file.write (str(int(d))):
- raise StopException
- a, a1 = 10L*(a%b), 10L*(a1%b1)
- d, d1 = a/b, a1/b1
- except StopException:
- return
-
+ try:
+ k, a, b, a1, b1 = 2L, 4L, 1L, 12L, 4L
+ while 1:
+ # Next approximation
+ p, q, k = k*k, 2L*k+1L, k+1L
+ a, b, a1, b1 = a1, b1, p*a+q*a1, p*b+q*b1
+ # Print common digits
+ d, d1 = a/b, a1/b1
+ while d == d1:
+ if file.write (str(int(d))):
+ raise StopException
+ a, a1 = 10L*(a%b), 10L*(a1%b1)
+ d, d1 = a/b, a1/b1
+ except StopException:
+ return
+
class line_writer:
-
- "partition the endless line into 80-character ones"
-
- def __init__ (self, file, digit_limit=10000):
- self.file = file
- self.buffer = ''
- self.count = 0
- self.digit_limit = digit_limit
-
- def write (self, data):
- self.buffer = self.buffer + data
- if len(self.buffer) > 80:
- line, self.buffer = self.buffer[:80], self.buffer[80:]
- self.file.write (line+'\r\n')
- self.count = self.count + 80
- if self.count > self.digit_limit:
- return 1
- else:
- return 0
+ "partition the endless line into 80-character ones"
+
+ def __init__ (self, file, digit_limit=10000):
+ self.file = file
+ self.buffer = ''
+ self.count = 0
+ self.digit_limit = digit_limit
+
+ def write (self, data):
+ self.buffer = self.buffer + data
+ if len(self.buffer) > 80:
+ line, self.buffer = self.buffer[:80], self.buffer[80:]
+ self.file.write (line+'\r\n')
+ self.count = self.count + 80
+ if self.count > self.digit_limit:
+ return 1
+ else:
+ return 0
+
def main (env, stdin, stdout):
- parts = string.split (env['REQUEST_URI'], '/')
- if len(parts) >= 3:
- ndigits = string.atoi (parts[2])
- else:
- ndigits = 5000
- stdout.write ('Content-Type: text/plain\r\n\r\n')
- go (line_writer (stdout, ndigits))
+ parts = string.split (env['REQUEST_URI'], '/')
+ if len(parts) >= 3:
+ ndigits = string.atoi (parts[2])
+ else:
+ ndigits = 5000
+ stdout.write ('Content-Type: text/plain\r\n\r\n')
+ go (line_writer (stdout, ndigits))
--- Updated File select_trigger.py in package Zope2 --
--- select_trigger.py 2001/04/25 19:09:56 1.2
+++ select_trigger.py 2001/05/01 11:45:27 1.3
@@ -9,259 +9,259 @@
import socket
import string
import thread
-
-if os.name == 'posix':
-
- class trigger (asyncore.file_dispatcher):
-
- "Wake up a call to select() running in the main thread"
-
- # This is useful in a context where you are using Medusa's I/O
- # subsystem to deliver data, but the data is generated by another
- # thread. Normally, if Medusa is in the middle of a call to
- # select(), new output data generated by another thread will have
- # to sit until the call to select() either times out or returns.
- # If the trigger is 'pulled' by another thread, it should immediately
- # generate a READ event on the trigger object, which will force the
- # select() invocation to return.
-
- # A common use for this facility: letting Medusa manage I/O for a
- # large number of connections; but routing each request through a
- # thread chosen from a fixed-size thread pool. When a thread is
- # acquired, a transaction is performed, but output data is
- # accumulated into buffers that will be emptied more efficiently
- # by Medusa. [picture a server that can process database queries
- # rapidly, but doesn't want to tie up threads waiting to send data
- # to low-bandwidth connections]
-
- # The other major feature provided by this class is the ability to
- # move work back into the main thread: if you call pull_trigger()
- # with a thunk argument, when select() wakes up and receives the
- # event it will call your thunk from within that thread. The main
- # purpose of this is to remove the need to wrap thread locks around
- # Medusa's data structures, which normally do not need them. [To see
- # why this is true, imagine this scenario: A thread tries to push some
- # new data onto a channel's outgoing data queue at the same time that
- # the main thread is trying to remove some]
-
- def __init__ (self):
- r, w = os.pipe()
- self.trigger = w
- asyncore.file_dispatcher.__init__ (self, r)
- self.lock = thread.allocate_lock()
- self.thunks = []
-
- def __repr__ (self):
- return '<select-trigger (pipe) at %x>' % id(self)
-
- def readable (self):
- return 1
-
- def writable (self):
- return 0
- def handle_connect (self):
- pass
-
- def pull_trigger (self, thunk=None):
- # print 'PULL_TRIGGER: ', len(self.thunks)
- if thunk:
- try:
- self.lock.acquire()
- self.thunks.append (thunk)
- finally:
- self.lock.release()
- os.write (self.trigger, 'x')
-
- def handle_read (self):
- self.recv (8192)
- try:
- self.lock.acquire()
- for thunk in self.thunks:
- try:
- thunk()
- except:
- (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
- print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
- self.thunks = []
- finally:
- self.lock.release()
+if os.name == 'posix':
+ class trigger (asyncore.file_dispatcher):
+
+ "Wake up a call to select() running in the main thread"
+
+ # This is useful in a context where you are using Medusa's I/O
+ # subsystem to deliver data, but the data is generated by another
+ # thread. Normally, if Medusa is in the middle of a call to
+ # select(), new output data generated by another thread will have
+ # to sit until the call to select() either times out or returns.
+ # If the trigger is 'pulled' by another thread, it should immediately
+ # generate a READ event on the trigger object, which will force the
+ # select() invocation to return.
+
+ # A common use for this facility: letting Medusa manage I/O for a
+ # large number of connections; but routing each request through a
+ # thread chosen from a fixed-size thread pool. When a thread is
+ # acquired, a transaction is performed, but output data is
+ # accumulated into buffers that will be emptied more efficiently
+ # by Medusa. [picture a server that can process database queries
+ # rapidly, but doesn't want to tie up threads waiting to send data
+ # to low-bandwidth connections]
+
+ # The other major feature provided by this class is the ability to
+ # move work back into the main thread: if you call pull_trigger()
+ # with a thunk argument, when select() wakes up and receives the
+ # event it will call your thunk from within that thread. The main
+ # purpose of this is to remove the need to wrap thread locks around
+ # Medusa's data structures, which normally do not need them. [To see
+ # why this is true, imagine this scenario: A thread tries to push some
+ # new data onto a channel's outgoing data queue at the same time that
+ # the main thread is trying to remove some]
+
+ def __init__ (self):
+ r, w = os.pipe()
+ self.trigger = w
+ asyncore.file_dispatcher.__init__ (self, r)
+ self.lock = thread.allocate_lock()
+ self.thunks = []
+
+ def __repr__ (self):
+ return '<select-trigger (pipe) at %x>' % id(self)
+
+ def readable (self):
+ return 1
+
+ def writable (self):
+ return 0
+
+ def handle_connect (self):
+ pass
+
+ def pull_trigger (self, thunk=None):
+ # print 'PULL_TRIGGER: ', len(self.thunks)
+ if thunk:
+ try:
+ self.lock.acquire()
+ self.thunks.append (thunk)
+ finally:
+ self.lock.release()
+ os.write (self.trigger, 'x')
+
+ def handle_read (self):
+ self.recv (8192)
+ try:
+ self.lock.acquire()
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
+ print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+ self.thunks = []
+ finally:
+ self.lock.release()
+
else:
-
- # win32-safe version
-
- class trigger (asyncore.dispatcher):
-
- address = ('127.9.9.9', 19999)
-
- def __init__ (self):
- a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
- w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
-
- # tricky: get a pair of connected sockets
- a.bind (self.address)
- a.listen (1)
- w.setblocking (0)
- try:
- w.connect (self.address)
- except:
- pass
- r, addr = a.accept()
- a.close()
- w.setblocking (1)
- self.trigger = w
-
- asyncore.dispatcher.__init__ (self, r)
- self.lock = thread.allocate_lock()
- self.thunks = []
- self._trigger_connected = 0
-
- def __repr__ (self):
- return '<select-trigger (loopback) at %x>' % id(self)
- def readable (self):
- return 1
+ # win32-safe version
- def writable (self):
- return 0
-
- def handle_connect (self):
- pass
-
- def pull_trigger (self, thunk=None):
- if thunk:
- try:
- self.lock.acquire()
- self.thunks.append (thunk)
- finally:
- self.lock.release()
- self.trigger.send ('x')
-
- def handle_read (self):
- self.recv (8192)
- try:
- self.lock.acquire()
- for thunk in self.thunks:
- try:
- thunk()
- except:
- (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
- print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
- self.thunks = []
- finally:
- self.lock.release()
-
-
+ class trigger (asyncore.dispatcher):
+
+ address = ('127.9.9.9', 19999)
+
+ def __init__ (self):
+ a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+
+ # tricky: get a pair of connected sockets
+ a.bind (self.address)
+ a.listen (1)
+ w.setblocking (0)
+ try:
+ w.connect (self.address)
+ except:
+ pass
+ r, addr = a.accept()
+ a.close()
+ w.setblocking (1)
+ self.trigger = w
+
+ asyncore.dispatcher.__init__ (self, r)
+ self.lock = thread.allocate_lock()
+ self.thunks = []
+ self._trigger_connected = 0
+
+ def __repr__ (self):
+ return '<select-trigger (loopback) at %x>' % id(self)
+
+ def readable (self):
+ return 1
+
+ def writable (self):
+ return 0
+
+ def handle_connect (self):
+ pass
+
+ def pull_trigger (self, thunk=None):
+ if thunk:
+ try:
+ self.lock.acquire()
+ self.thunks.append (thunk)
+ finally:
+ self.lock.release()
+ self.trigger.send ('x')
+
+ def handle_read (self):
+ self.recv (8192)
+ try:
+ self.lock.acquire()
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
+ print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+ self.thunks = []
+ finally:
+ self.lock.release()
+
+
the_trigger = None
class trigger_file:
-
- "A 'triggered' file object"
-
- buffer_size = 4096
-
- def __init__ (self, parent):
- global the_trigger
- if the_trigger is None:
- the_trigger = trigger()
- self.parent = parent
- self.buffer = ''
-
- def write (self, data):
- self.buffer = self.buffer + data
- if len(self.buffer) > self.buffer_size:
- d, self.buffer = self.buffer, ''
- the_trigger.pull_trigger (
- lambda d=d,p=self.parent: p.push (d)
- )
-
- def writeline (self, line):
- self.write (line+'\r\n')
-
- def writelines (self, lines):
- self.write (
- string.joinfields (
- lines,
- '\r\n'
- ) + '\r\n'
- )
-
- def flush (self):
- if self.buffer:
- d, self.buffer = self.buffer, ''
- the_trigger.pull_trigger (
- lambda p=self.parent,d=d: p.push (d)
- )
- def softspace (self, *args):
- pass
-
- def close (self):
- # in a derived class, you may want to call trigger_close() instead.
- self.flush()
- self.parent = None
-
- def trigger_close (self):
- d, self.buffer = self.buffer, ''
- p, self.parent = self.parent, None
- the_trigger.pull_trigger (
- lambda p=p,d=d: (p.push(d), p.close_when_done())
- )
-
+ "A 'triggered' file object"
+
+ buffer_size = 4096
+
+ def __init__ (self, parent):
+ global the_trigger
+ if the_trigger is None:
+ the_trigger = trigger()
+ self.parent = parent
+ self.buffer = ''
+
+ def write (self, data):
+ self.buffer = self.buffer + data
+ if len(self.buffer) > self.buffer_size:
+ d, self.buffer = self.buffer, ''
+ the_trigger.pull_trigger (
+ lambda d=d,p=self.parent: p.push (d)
+ )
+
+ def writeline (self, line):
+ self.write (line+'\r\n')
+
+ def writelines (self, lines):
+ self.write (
+ string.joinfields (
+ lines,
+ '\r\n'
+ ) + '\r\n'
+ )
+
+ def flush (self):
+ if self.buffer:
+ d, self.buffer = self.buffer, ''
+ the_trigger.pull_trigger (
+ lambda p=self.parent,d=d: p.push (d)
+ )
+
+ def softspace (self, *args):
+ pass
+
+ def close (self):
+ # in a derived class, you may want to call trigger_close() instead.
+ self.flush()
+ self.parent = None
+
+ def trigger_close (self):
+ d, self.buffer = self.buffer, ''
+ p, self.parent = self.parent, None
+ the_trigger.pull_trigger (
+ lambda p=p,d=d: (p.push(d), p.close_when_done())
+ )
+
if __name__ == '__main__':
-
- import time
-
- def thread_function (output_file, i, n):
- print 'entering thread_function'
- while n:
- time.sleep (5)
- output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
- output_file.flush()
- n = n - 1
- output_file.close()
- print 'exiting thread_function'
-
- class thread_parent (asynchat.async_chat):
-
- def __init__ (self, conn, addr):
- self.addr = addr
- asynchat.async_chat.__init__ (self, conn)
- self.set_terminator ('\r\n')
- self.buffer = ''
- self.count = 0
-
- def collect_incoming_data (self, data):
- self.buffer = self.buffer + data
-
- def found_terminator (self):
- data, self.buffer = self.buffer, ''
- if not data:
- asyncore.close_all()
- print "done"
- return
- n = string.atoi (string.split (data)[0])
- tf = trigger_file (self)
- self.count = self.count + 1
- thread.start_new_thread (thread_function, (tf, self.count, n))
-
- class thread_server (asyncore.dispatcher):
-
- def __init__ (self, family=socket.AF_INET, address=('', 9003)):
- asyncore.dispatcher.__init__ (self)
- self.create_socket (family, socket.SOCK_STREAM)
- self.set_reuse_addr()
- self.bind (address)
- self.listen (5)
-
- def handle_accept (self):
- conn, addr = self.accept()
- tp = thread_parent (conn, addr)
- thread_server()
- #asyncore.loop(1.0, use_poll=1)
- try:
- asyncore.loop ()
- except:
- asyncore.close_all()
+ import time
+
+ def thread_function (output_file, i, n):
+ print 'entering thread_function'
+ while n:
+ time.sleep (5)
+ output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
+ output_file.flush()
+ n = n - 1
+ output_file.close()
+ print 'exiting thread_function'
+
+ class thread_parent (asynchat.async_chat):
+
+ def __init__ (self, conn, addr):
+ self.addr = addr
+ asynchat.async_chat.__init__ (self, conn)
+ self.set_terminator ('\r\n')
+ self.buffer = ''
+ self.count = 0
+
+ def collect_incoming_data (self, data):
+ self.buffer = self.buffer + data
+
+ def found_terminator (self):
+ data, self.buffer = self.buffer, ''
+ if not data:
+ asyncore.close_all()
+ print "done"
+ return
+ n = string.atoi (string.split (data)[0])
+ tf = trigger_file (self)
+ self.count = self.count + 1
+ thread.start_new_thread (thread_function, (tf, self.count, n))
+
+ class thread_server (asyncore.dispatcher):
+
+ def __init__ (self, family=socket.AF_INET, address=('', 9003)):
+ asyncore.dispatcher.__init__ (self)
+ self.create_socket (family, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ self.bind (address)
+ self.listen (5)
+
+ def handle_accept (self):
+ conn, addr = self.accept()
+ tp = thread_parent (conn, addr)
+
+ thread_server()
+ #asyncore.loop(1.0, use_poll=1)
+ try:
+ asyncore.loop ()
+ except:
+ asyncore.close_all()
--- Updated File test_module.py in package Zope2 --
--- test_module.py 2001/04/25 19:09:56 1.2
+++ test_module.py 2001/05/01 11:45:27 1.3
@@ -4,10 +4,10 @@
def main (env, stdin, stdout):
- stdout.write (
- '<html><body><h1>Test CGI Module</h1>\r\n'
- '<br>The Environment:<pre>\r\n'
- )
- pprint.pprint (env, stdout)
- stdout.write ('</pre></body></html>\r\n')
-
+ stdout.write (
+ '<html><body><h1>Test CGI Module</h1>\r\n'
+ '<br>The Environment:<pre>\r\n'
+ )
+ pprint.pprint (env, stdout)
+ stdout.write ('</pre></body></html>\r\n')
+
--- Updated File thread_channel.py in package Zope2 --
--- thread_channel.py 2001/04/25 19:09:56 1.2
+++ thread_channel.py 2001/05/01 11:45:27 1.3
@@ -27,99 +27,99 @@
class thread_channel (asyncore.file_dispatcher):
- buffer_size = 8192
-
- def __init__ (self, channel, function, *args):
- self.parent = channel
- self.function = function
- self.args = args
- self.pipe = rfd, wfd = os.pipe()
- asyncore.file_dispatcher.__init__ (self, rfd)
-
- def start (self):
- rfd, wfd = self.pipe
-
- # The read side of the pipe is set to non-blocking I/O; it is
- # 'owned' by medusa.
-
- flags = fcntl.fcntl (rfd, FCNTL.F_GETFL, 0)
- fcntl.fcntl (rfd, FCNTL.F_SETFL, flags | FCNTL.O_NDELAY)
-
- # The write side of the pipe is left in blocking mode; it is
- # 'owned' by the thread. However, we wrap it up as a file object.
- # [who wants to 'write()' to a number?]
-
- of = os.fdopen (wfd, 'w')
-
- thread.start_new_thread (
- self.function,
- # put the output file in front of the other arguments
- (of,) + self.args
- )
-
- def writable (self):
- return 0
-
- def readable (self):
- return 1
-
- def handle_read (self):
- data = self.recv (self.buffer_size)
- self.parent.push (data)
-
- def handle_close (self):
- # Depending on your intentions, you may want to close
- # the parent channel here.
- self.close()
-
-# Yeah, it's bad when the test code is bigger than the library code.
-
+ buffer_size = 8192
+
+ def __init__ (self, channel, function, *args):
+ self.parent = channel
+ self.function = function
+ self.args = args
+ self.pipe = rfd, wfd = os.pipe()
+ asyncore.file_dispatcher.__init__ (self, rfd)
+
+ def start (self):
+ rfd, wfd = self.pipe
+
+ # The read side of the pipe is set to non-blocking I/O; it is
+ # 'owned' by medusa.
+
+ flags = fcntl.fcntl (rfd, FCNTL.F_GETFL, 0)
+ fcntl.fcntl (rfd, FCNTL.F_SETFL, flags | FCNTL.O_NDELAY)
+
+ # The write side of the pipe is left in blocking mode; it is
+ # 'owned' by the thread. However, we wrap it up as a file object.
+ # [who wants to 'write()' to a number?]
+
+ of = os.fdopen (wfd, 'w')
+
+ thread.start_new_thread (
+ self.function,
+ # put the output file in front of the other arguments
+ (of,) + self.args
+ )
+
+ def writable (self):
+ return 0
+
+ def readable (self):
+ return 1
+
+ def handle_read (self):
+ data = self.recv (self.buffer_size)
+ self.parent.push (data)
+
+ def handle_close (self):
+ # Depending on your intentions, you may want to close
+ # the parent channel here.
+ self.close()
+
+ # Yeah, it's bad when the test code is bigger than the library code.
+
if __name__ == '__main__':
-
- import time
-
- def thread_function (output_file, i, n):
- print 'entering thread_function'
- while n:
- time.sleep (5)
- output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
- output_file.flush()
- n = n - 1
- output_file.close()
- print 'exiting thread_function'
-
- class thread_parent (asynchat.async_chat):
-
- def __init__ (self, conn, addr):
- self.addr = addr
- asynchat.async_chat.__init__ (self, conn)
- self.set_terminator ('\r\n')
- self.buffer = ''
- self.count = 0
-
- def collect_incoming_data (self, data):
- self.buffer = self.buffer + data
-
- def found_terminator (self):
- data, self.buffer = self.buffer, ''
- n = string.atoi (string.split (data)[0])
- tc = thread_channel (self, thread_function, self.count, n)
- self.count = self.count + 1
- tc.start()
-
- class thread_server (asyncore.dispatcher):
-
- def __init__ (self, family=socket.AF_INET, address=('127.0.0.1', 9003)):
- asyncore.dispatcher.__init__ (self)
- self.create_socket (family, socket.SOCK_STREAM)
- self.set_reuse_addr()
- self.bind (address)
- self.listen (5)
-
- def handle_accept (self):
- conn, addr = self.accept()
- tp = thread_parent (conn, addr)
- thread_server()
- #asyncore.loop(1.0, use_poll=1)
- asyncore.loop ()
+ import time
+
+ def thread_function (output_file, i, n):
+ print 'entering thread_function'
+ while n:
+ time.sleep (5)
+ output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
+ output_file.flush()
+ n = n - 1
+ output_file.close()
+ print 'exiting thread_function'
+
+ class thread_parent (asynchat.async_chat):
+
+ def __init__ (self, conn, addr):
+ self.addr = addr
+ asynchat.async_chat.__init__ (self, conn)
+ self.set_terminator ('\r\n')
+ self.buffer = ''
+ self.count = 0
+
+ def collect_incoming_data (self, data):
+ self.buffer = self.buffer + data
+
+ def found_terminator (self):
+ data, self.buffer = self.buffer, ''
+ n = string.atoi (string.split (data)[0])
+ tc = thread_channel (self, thread_function, self.count, n)
+ self.count = self.count + 1
+ tc.start()
+
+ class thread_server (asyncore.dispatcher):
+
+ def __init__ (self, family=socket.AF_INET, address=('127.0.0.1', 9003)):
+ asyncore.dispatcher.__init__ (self)
+ self.create_socket (family, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ self.bind (address)
+ self.listen (5)
+
+ def handle_accept (self):
+ conn, addr = self.accept()
+ tp = thread_parent (conn, addr)
+
+ thread_server()
+ #asyncore.loop(1.0, use_poll=1)
+ asyncore.loop ()
--- Updated File thread_handler.py in package Zope2 --
--- thread_handler.py 2001/04/25 19:09:56 1.2
+++ thread_handler.py 2001/05/01 11:45:27 1.3
@@ -21,344 +21,344 @@
class request_queue:
- def __init__ (self):
- self.mon = threading.RLock()
- self.cv = threading.Condition (self.mon)
- self.queue = fifo.fifo()
-
- def put (self, item):
- self.cv.acquire()
- self.queue.push (item)
- self.cv.notify()
- self.cv.release()
-
- def get(self):
- self.cv.acquire()
- while not self.queue:
- self.cv.wait()
- result = self.queue.pop()
- self.cv.release()
- return result
-
+ def __init__ (self):
+ self.mon = threading.RLock()
+ self.cv = threading.Condition (self.mon)
+ self.queue = fifo.fifo()
+
+ def put (self, item):
+ self.cv.acquire()
+ self.queue.push (item)
+ self.cv.notify()
+ self.cv.release()
+
+ def get(self):
+ self.cv.acquire()
+ while not self.queue:
+ self.cv.wait()
+ result = self.queue.pop()
+ self.cv.release()
+ return result
+
header2env= {
- 'Content-Length' : 'CONTENT_LENGTH',
- 'Content-Type' : 'CONTENT_TYPE',
- 'Referer' : 'HTTP_REFERER',
- 'User-Agent' : 'HTTP_USER_AGENT',
- 'Accept' : 'HTTP_ACCEPT',
- 'Accept-Charset' : 'HTTP_ACCEPT_CHARSET',
- 'Accept-Language' : 'HTTP_ACCEPT_LANGUAGE',
- 'Host' : 'HTTP_HOST',
- 'Connection' : 'CONNECTION_TYPE',
- 'Authorization' : 'HTTP_AUTHORIZATION',
- 'Cookie' : 'HTTP_COOKIE',
- }
+ 'Content-Length' : 'CONTENT_LENGTH',
+ 'Content-Type' : 'CONTENT_TYPE',
+ 'Referer' : 'HTTP_REFERER',
+ 'User-Agent' : 'HTTP_USER_AGENT',
+ 'Accept' : 'HTTP_ACCEPT',
+ 'Accept-Charset' : 'HTTP_ACCEPT_CHARSET',
+ 'Accept-Language' : 'HTTP_ACCEPT_LANGUAGE',
+ 'Host' : 'HTTP_HOST',
+ 'Connection' : 'CONNECTION_TYPE',
+ 'Authorization' : 'HTTP_AUTHORIZATION',
+ 'Cookie' : 'HTTP_COOKIE',
+ }
# convert keys to lower case for case-insensitive matching
for (key,value) in header2env.items():
- del header2env[key]
- key=string.lower(key)
- header2env[key]=value
-
+ del header2env[key]
+ key=string.lower(key)
+ header2env[key]=value
+
class thread_output_file (select_trigger.trigger_file):
- def close (self):
- self.trigger_close()
-
+ def close (self):
+ self.trigger_close()
+
class script_handler:
-
- def __init__ (self, queue, document_root=""):
- self.modules = {}
- self.document_root = document_root
- self.queue = queue
-
- def add_module (self, module, *names):
- if not names:
- names = ["/%s" % module.__name__]
- for name in names:
- self.modules['/'+name] = module
-
- def match (self, request):
- uri = request.uri
-
- i = string.find(uri, "/", 1)
- if i != -1:
- uri = uri[:i]
-
- i = string.find(uri, "?", 1)
- if i != -1:
- uri = uri[:i]
-
- if self.modules.has_key (uri):
- request.module = self.modules[uri]
- return 1
- else:
- return 0
-
- def handle_request (self, request):
-
- [path, params, query, fragment] = split_path (request.uri)
-
- while path and path[0] == '/':
- path = path[1:]
-
- if '%' in path:
- path = unquote (path)
-
- env = {}
-
- env['REQUEST_URI'] = "/" + path
- env['REQUEST_METHOD'] = string.upper(request.command)
- env['SERVER_PORT'] = str(request.channel.server.port)
- env['SERVER_NAME'] = request.channel.server.server_name
- env['SERVER_SOFTWARE'] = request['Server']
- env['DOCUMENT_ROOT'] = self.document_root
-
- parts = string.split(path, "/")
-
- # are script_name and path_info ok?
-
- env['SCRIPT_NAME'] = "/" + parts[0]
-
- if query and query[0] == "?":
- query = query[1:]
-
- env['QUERY_STRING'] = query
-
- try:
- path_info = "/" + string.join(parts[1:], "/")
- except:
- path_info = ''
-
- env['PATH_INFO'] = path_info
- env['GATEWAY_INTERFACE']='CGI/1.1' # what should this really be?
- env['REMOTE_ADDR'] =request.channel.addr[0]
- env['REMOTE_HOST'] =request.channel.addr[0] # TODO: connect to resolver
-
- for header in request.header:
- [key,value]=string.split(header,": ",1)
- key=string.lower(key)
-
- if header2env.has_key(key):
- if header2env[key]:
- env[header2env[key]]=value
- else:
- key = 'HTTP_' + string.upper(
- string.join(
- string.split (key,"-"),
- "_"
- )
- )
- env[key]=value
-
- ## remove empty environment variables
- for key in env.keys():
- if env[key]=="" or env[key]==None:
- del env[key]
-
- try:
- httphost = env['HTTP_HOST']
- parts = string.split(httphost,":")
- env['HTTP_HOST'] = parts[0]
- except KeyError:
- pass
-
- if request.command in ('put', 'post'):
- # PUT data requires a correct Content-Length: header
- # (though I bet with http/1.1 we can expect chunked encoding)
- request.collector = collector (self, request, env)
- request.channel.set_terminator (None)
- else:
- sin = StringIO.StringIO ('')
- self.continue_request (sin, request, env)
- def continue_request (self, stdin, request, env):
- stdout = header_scanning_file (
- request,
- thread_output_file (request.channel)
- )
- self.queue.put (
- (request.module.main, (env, stdin, stdout))
- )
-
+ def __init__ (self, queue, document_root=""):
+ self.modules = {}
+ self.document_root = document_root
+ self.queue = queue
+
+ def add_module (self, module, *names):
+ if not names:
+ names = ["/%s" % module.__name__]
+ for name in names:
+ self.modules['/'+name] = module
+
+ def match (self, request):
+ uri = request.uri
+
+ i = string.find(uri, "/", 1)
+ if i != -1:
+ uri = uri[:i]
+
+ i = string.find(uri, "?", 1)
+ if i != -1:
+ uri = uri[:i]
+
+ if self.modules.has_key (uri):
+ request.module = self.modules[uri]
+ return 1
+ else:
+ return 0
+
+ def handle_request (self, request):
+
+ [path, params, query, fragment] = split_path (request.uri)
+
+ while path and path[0] == '/':
+ path = path[1:]
+
+ if '%' in path:
+ path = unquote (path)
+
+ env = {}
+
+ env['REQUEST_URI'] = "/" + path
+ env['REQUEST_METHOD'] = string.upper(request.command)
+ env['SERVER_PORT'] = str(request.channel.server.port)
+ env['SERVER_NAME'] = request.channel.server.server_name
+ env['SERVER_SOFTWARE'] = request['Server']
+ env['DOCUMENT_ROOT'] = self.document_root
+
+ parts = string.split(path, "/")
+
+ # are script_name and path_info ok?
+
+ env['SCRIPT_NAME'] = "/" + parts[0]
+
+ if query and query[0] == "?":
+ query = query[1:]
+
+ env['QUERY_STRING'] = query
+
+ try:
+ path_info = "/" + string.join(parts[1:], "/")
+ except:
+ path_info = ''
+
+ env['PATH_INFO'] = path_info
+ env['GATEWAY_INTERFACE']='CGI/1.1' # what should this really be?
+ env['REMOTE_ADDR'] =request.channel.addr[0]
+ env['REMOTE_HOST'] =request.channel.addr[0] # TODO: connect to resolver
+
+ for header in request.header:
+ [key,value]=string.split(header,": ",1)
+ key=string.lower(key)
+
+ if header2env.has_key(key):
+ if header2env[key]:
+ env[header2env[key]]=value
+ else:
+ key = 'HTTP_' + string.upper(
+ string.join(
+ string.split (key,"-"),
+ "_"
+ )
+ )
+ env[key]=value
+
+ ## remove empty environment variables
+ for key in env.keys():
+ if env[key]=="" or env[key]==None:
+ del env[key]
+
+ try:
+ httphost = env['HTTP_HOST']
+ parts = string.split(httphost,":")
+ env['HTTP_HOST'] = parts[0]
+ except KeyError:
+ pass
+
+ if request.command in ('put', 'post'):
+ # PUT data requires a correct Content-Length: header
+ # (though I bet with http/1.1 we can expect chunked encoding)
+ request.collector = collector (self, request, env)
+ request.channel.set_terminator (None)
+ else:
+ sin = StringIO.StringIO ('')
+ self.continue_request (sin, request, env)
+
+ def continue_request (self, stdin, request, env):
+ stdout = header_scanning_file (
+ request,
+ thread_output_file (request.channel)
+ )
+ self.queue.put (
+ (request.module.main, (env, stdin, stdout))
+ )
+
HEADER_LINE = re.compile ('([A-Za-z0-9-]+): ([^\r\n]+)')
# A file wrapper that handles the CGI 'Status:' header hack
# by scanning the output.
class header_scanning_file:
-
- def __init__ (self, request, file):
- self.buffer = ''
- self.request = request
- self.file = file
- self.got_header = 0
- self.bytes_out = counter.counter()
-
- def write (self, data):
- if self.got_header:
- self._write (data)
- else:
- # CGI scripts may optionally provide extra headers.
- #
- # If they do not, then the output is assumed to be
- # text/html, with an HTTP reply code of '200 OK'.
- #
- # If they do, we need to scan those headers for one in
- # particular: the 'Status:' header, which will tell us
- # to use a different HTTP reply code [like '302 Moved']
- #
- self.buffer = self.buffer + data
- lines = string.split (self.buffer, '\n')
- # ignore the last piece, it is either empty, or a partial line
- lines = lines[:-1]
- # look for something un-header-like
- for i in range(len(lines)):
- li = lines[i]
- if (not li) or (HEADER_LINE.match (li) is None):
- # this is either the header separator, or it
- # is not a header line.
- self.got_header = 1
- h = self.build_header (lines[:i])
- self._write (h)
- # rejoin the rest of the data
- d = string.join (lines[i:], '\n')
- self._write (d)
- self.buffer = ''
- break
-
- def build_header (self, lines):
- status = '200 OK'
- saw_content_type = 0
- hl = HEADER_LINE
- for line in lines:
- mo = hl.match (line)
- if mo is not None:
- h = string.lower (mo.group(1))
- if h == 'status':
- status = mo.group(2)
- elif h == 'content-type':
- saw_content_type = 1
- lines.insert (0, 'HTTP/1.0 %s' % status)
- lines.append ('Server: ' + self.request['Server'])
- lines.append ('Date: ' + self.request['Date'])
- if not saw_content_type:
- lines.append ('Content-Type: text/html')
- lines.append ('Connection: close')
- return string.join (lines, '\r\n')+'\r\n\r\n'
-
- def _write (self, data):
- self.bytes_out.increment (len(data))
- self.file.write (data)
-
- def writelines(self, list):
- self.write (string.join (list, ''))
-
- def flush(self):
- pass
- def close (self):
- if not self.got_header:
- # managed to slip through our header detectors
- self._write (self.build_header (['Status: 502', 'Content-Type: text/html']))
- self._write (
- '<html><h1>Server Error</h1>\r\n'
- '<b>Bad Gateway:</b> No Header from CGI Script\r\n'
- '<pre>Data: %s</pre>'
- '</html>\r\n' % (repr(self.buffer))
- )
- self.request.log (int(self.bytes_out.as_long()))
- self.file.close()
- self.request.channel.current_request = None
-
-
+ def __init__ (self, request, file):
+ self.buffer = ''
+ self.request = request
+ self.file = file
+ self.got_header = 0
+ self.bytes_out = counter.counter()
+
+ def write (self, data):
+ if self.got_header:
+ self._write (data)
+ else:
+ # CGI scripts may optionally provide extra headers.
+ #
+ # If they do not, then the output is assumed to be
+ # text/html, with an HTTP reply code of '200 OK'.
+ #
+ # If they do, we need to scan those headers for one in
+ # particular: the 'Status:' header, which will tell us
+ # to use a different HTTP reply code [like '302 Moved']
+ #
+ self.buffer = self.buffer + data
+ lines = string.split (self.buffer, '\n')
+ # ignore the last piece, it is either empty, or a partial line
+ lines = lines[:-1]
+ # look for something un-header-like
+ for i in range(len(lines)):
+ li = lines[i]
+ if (not li) or (HEADER_LINE.match (li) is None):
+ # this is either the header separator, or it
+ # is not a header line.
+ self.got_header = 1
+ h = self.build_header (lines[:i])
+ self._write (h)
+ # rejoin the rest of the data
+ d = string.join (lines[i:], '\n')
+ self._write (d)
+ self.buffer = ''
+ break
+
+ def build_header (self, lines):
+ status = '200 OK'
+ saw_content_type = 0
+ hl = HEADER_LINE
+ for line in lines:
+ mo = hl.match (line)
+ if mo is not None:
+ h = string.lower (mo.group(1))
+ if h == 'status':
+ status = mo.group(2)
+ elif h == 'content-type':
+ saw_content_type = 1
+ lines.insert (0, 'HTTP/1.0 %s' % status)
+ lines.append ('Server: ' + self.request['Server'])
+ lines.append ('Date: ' + self.request['Date'])
+ if not saw_content_type:
+ lines.append ('Content-Type: text/html')
+ lines.append ('Connection: close')
+ return string.join (lines, '\r\n')+'\r\n\r\n'
+
+ def _write (self, data):
+ self.bytes_out.increment (len(data))
+ self.file.write (data)
+
+ def writelines(self, list):
+ self.write (string.join (list, ''))
+
+ def flush(self):
+ pass
+
+ def close (self):
+ if not self.got_header:
+ # managed to slip through our header detectors
+ self._write (self.build_header (['Status: 502', 'Content-Type: text/html']))
+ self._write (
+ '<html><h1>Server Error</h1>\r\n'
+ '<b>Bad Gateway:</b> No Header from CGI Script\r\n'
+ '<pre>Data: %s</pre>'
+ '</html>\r\n' % (repr(self.buffer))
+ )
+ self.request.log (int(self.bytes_out.as_long()))
+ self.file.close()
+ self.request.channel.current_request = None
+
+
class collector:
-
- "gathers input for PUT requests"
-
- def __init__ (self, handler, request, env):
- self.handler = handler
- self.env = env
- self.request = request
- self.data = StringIO.StringIO()
-
- # make sure there's a content-length header
- self.cl = request.get_header ('content-length')
-
- if not self.cl:
- request.error (411)
- return
- else:
- self.cl = string.atoi(self.cl)
-
- def collect_incoming_data (self, data):
- self.data.write (data)
- if self.data.tell() >= self.cl:
- self.data.seek(0)
-
- h=self.handler
- r=self.request
- # set the terminator back to the default
- self.request.channel.set_terminator ('\r\n\r\n')
- del self.handler
- del self.request
-
- h.continue_request (self.data, r, self.env)
-
-
+ "gathers input for PUT requests"
+
+ def __init__ (self, handler, request, env):
+ self.handler = handler
+ self.env = env
+ self.request = request
+ self.data = StringIO.StringIO()
+
+ # make sure there's a content-length header
+ self.cl = request.get_header ('content-length')
+
+ if not self.cl:
+ request.error (411)
+ return
+ else:
+ self.cl = string.atoi(self.cl)
+
+ def collect_incoming_data (self, data):
+ self.data.write (data)
+ if self.data.tell() >= self.cl:
+ self.data.seek(0)
+
+ h=self.handler
+ r=self.request
+
+ # set the terminator back to the default
+ self.request.channel.set_terminator ('\r\n\r\n')
+ del self.handler
+ del self.request
+
+ h.continue_request (self.data, r, self.env)
+
+
class request_loop_thread (threading.Thread):
-
- def __init__ (self, queue):
- threading.Thread.__init__ (self)
- self.setDaemon(1)
- self.queue = queue
-
- def run (self):
- while 1:
- function, (env, stdin, stdout) = self.queue.get()
- function (env, stdin, stdout)
- stdout.close()
-# ===========================================================================
-# Testing
-# ===========================================================================
-
+ def __init__ (self, queue):
+ threading.Thread.__init__ (self)
+ self.setDaemon(1)
+ self.queue = queue
+
+ def run (self):
+ while 1:
+ function, (env, stdin, stdout) = self.queue.get()
+ function (env, stdin, stdout)
+ stdout.close()
+
+ # ===========================================================================
+ # Testing
+ # ===========================================================================
+
if __name__ == '__main__':
-
- import sys
-
- if len(sys.argv) < 2:
- print 'Usage: %s <worker_threads>' % sys.argv[0]
- else:
- nthreads = string.atoi (sys.argv[1])
-
- import asyncore
- import http_server
- # create a generic web server
- hs = http_server.http_server ('', 7080)
-
- # create a request queue
- q = request_queue()
-
- # create a script handler
- sh = script_handler (q)
-
- # install the script handler on the web server
- hs.install_handler (sh)
-
- # get a couple of CGI modules
- import test_module
- import pi_module
-
- # install the module on the script handler
- sh.add_module (test_module, 'test')
- sh.add_module (pi_module, 'pi')
-
- # fire up the worker threads
- for i in range (nthreads):
- rt = request_loop_thread (q)
- rt.start()
- # start the main event loop
- asyncore.loop()
+ import sys
+
+ if len(sys.argv) < 2:
+ print 'Usage: %s <worker_threads>' % sys.argv[0]
+ else:
+ nthreads = string.atoi (sys.argv[1])
+
+ import asyncore
+ import http_server
+ # create a generic web server
+ hs = http_server.http_server ('', 7080)
+
+ # create a request queue
+ q = request_queue()
+
+ # create a script handler
+ sh = script_handler (q)
+
+ # install the script handler on the web server
+ hs.install_handler (sh)
+
+ # get a couple of CGI modules
+ import test_module
+ import pi_module
+
+ # install the module on the script handler
+ sh.add_module (test_module, 'test')
+ sh.add_module (pi_module, 'pi')
+
+ # fire up the worker threads
+ for i in range (nthreads):
+ rt = request_loop_thread (q)
+ rt.start()
+
+ # start the main event loop
+ asyncore.loop()