[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/tests - testHTTPServer2.py:1.1.2.1
Shane Hathaway
shane@digicool.com
Mon, 26 Nov 2001 09:57:24 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server/tests
In directory cvs.zope.org:/tmp/cvs-serv4654/tests
Added Files:
Tag: Zope-3x-branch
testHTTPServer2.py
Log Message:
- Better in sync with HTTP spec.
- Pipelineable without making unnecessary buffers
- Added ZPL
- Moved task management out of dual_mode_channel
=== Added File Zope3/lib/python/Zope/Server/tests/testHTTPServer2.py ===
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
import unittest
from asyncore import socket_map, poll
from threading import Thread
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
from Zope.Server.HTTPServer2 import http_server
from Zope.Server.dual_mode_channel import pull_trigger
from httplib import HTTPConnection
from time import sleep
import sys
# Set the interpreter's "check interval" (how often it checks for periodic
# things such as thread switches) to 120.
# NOTE(review): presumably chosen to cut down on thread switching while the
# server and client threads run in these tests -- confirm the intent.
sys.setcheckinterval(120)
# One task dispatcher shared by every test in this module: setUp() sets its
# thread count and tearDown() shuts it down.
tasks = ThreadedTaskDispatcher()
class Tests(unittest.TestCase):
def setUp(self):
tasks.setThreadCount(4)
# Select any port on localhost
self.server = http_server('127.0.0.1', 0, tasks=tasks)
self.port = self.server.socket.getsockname()[1]
self.run_loop = 1
self.counter = 0
self.thread = Thread(target=self.loop)
self.thread.start()
sleep(0.1) # Let the thread start.
def tearDown(self):
self.run_loop = 0
tasks.shutdown()
self.server.close()
self.thread.join()
def loop(self):
while self.run_loop:
self.counter = self.counter + 1
#print 'loop', self.counter
poll(2.0, socket_map)
def testOkResponse(self, h=None, add_headers=None):
if h is None:
h = HTTPConnection('127.0.0.1', self.port)
h.putrequest('GET', '/')
h.putheader('Accept', 'text/plain')
if add_headers:
for k, v in add_headers.items():
h.putheader(k, v)
h.endheaders()
response = h.getresponse()
self.failUnlessEqual(int(response.status), 200)
length = int(response.getheader('Content-Length'))
data = response.read()
self.failUnlessEqual(length, len(data))
def testMultipleRequests(self):
h = HTTPConnection('127.0.0.1', self.port)
for n in range(3):
self.testOkResponse(h)
self.testOkResponse(h, {'Connection': 'close'})
def testManyClients(self):
conns = []
for n in range(120): # Linux kernel (2.4.8) doesn't like > 128 ?
#print 'open', n, clock()
h = HTTPConnection('127.0.0.1', self.port)
#h.debuglevel = 1
h.putrequest('GET', '/')
h.putheader('Accept', 'text/plain')
h.endheaders()
conns.append(h)
# If you uncomment the next line, you can raise the
# number of connections much higher without running
# into delays.
#sleep(0.01)
responses = []
for h in conns:
response = h.getresponse()
self.failUnlessEqual(response.status, 200)
responses.append(response)
for response in responses:
response.read()
def test_suite():
    """Return a suite holding every test method defined on Tests."""
    return unittest.TestLoader().loadTestsFromTestCase(Tests)
# Run the suite with the plain-text runner when executed as a script.
if __name__ == '__main__':
    runner = unittest.TextTestRunner()
    runner.run(test_suite())