[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/tests - testHTTPServer.py:1.1.2.2 testHTTPServer2.py:NONE
Shane Hathaway
shane@digicool.com
Tue, 27 Nov 2001 14:29:21 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server/tests
In directory cvs.zope.org:/tmp/cvs-serv32307
Modified Files:
Tag: Zope-3x-branch
testHTTPServer.py
Removed Files:
Tag: Zope-3x-branch
testHTTPServer2.py
Log Message:
Changed testHTTPServer2.py to testHTTPServer.py. The new tests have the
same coverage but they depend on the external interface (the HTTP standard)
rather than internals.
=== Zope3/lib/python/Zope/Server/tests/testHTTPServer.py 1.1.2.1 => 1.1.2.2 ===
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
-""" Unit tests for Zope.Server.HTTPResponse """
-
import unittest
-import Zope.Publisher.HTTP.IPayload
-from urlparse import urlparse
+from asyncore import socket_map, poll
+
+from threading import Thread
+from Zope.Server.TaskThreads import ThreadedTaskDispatcher
+from Zope.Server.HTTPServer import http_task, http_channel, http_server
+from Zope.Server.dual_mode_channel import pull_trigger
+from Zope.Server.Adjustments import Adjustments
+
+
+from httplib import HTTPConnection
+
+from time import sleep
+
+import sys
+sys.setcheckinterval(120)
+
+tasks = ThreadedTaskDispatcher()
+
+LOCALHOST = '127.0.0.1'
+
+my_adj = Adjustments()
+# Reduce overflows to make testing easier.
+my_adj.outbuf_overflow = 10000
+my_adj.inbuf_overflow = 10000
+
+
+class EchoHTTPTask (http_task):
+
+ def execute(self):
+ headers = self.request_data.headers
+ if headers.has_key('CONTENT_LENGTH'):
+ cl = headers['CONTENT_LENGTH']
+ self.response_headers['Content-Length'] = cl
+ instream = self.request_data.getBodyStream()
+ while 1:
+ data = instream.read(8192)
+ if not data:
+ break
+ self.write(data)
+
+
+class EchoHTTPChannel (http_channel):
+
+ task_class = EchoHTTPTask
+
+
+class EchoHTTPServer (http_server):
+
+ channel_class = EchoHTTPChannel
+
+
+
+class Tests(unittest.TestCase):
+
+ def setUp(self):
+ tasks.setThreadCount(4)
+ # Bind to any port on localhost.
+ self.server = EchoHTTPServer(LOCALHOST, 0, tasks=tasks, adj=my_adj)
+ self.port = self.server.socket.getsockname()[1]
+ self.run_loop = 1
+ self.counter = 0
+ self.thread = Thread(target=self.loop)
+ self.thread.start()
+ sleep(0.1) # Give the thread some time to start.
+
+ def tearDown(self):
+ self.run_loop = 0
+ tasks.shutdown()
+ self.server.close()
+ self.thread.join()
+
+ def loop(self):
+ while self.run_loop:
+ self.counter = self.counter + 1
+ #print 'loop', self.counter
+ poll(0.1, socket_map)
+
+ def testEchoResponse(self, h=None, add_headers=None, body=''):
+ if h is None:
+ h = HTTPConnection(LOCALHOST, self.port)
+ h.putrequest('GET', '/')
+ h.putheader('Accept', 'text/plain')
+ if add_headers:
+ for k, v in add_headers.items():
+ h.putheader(k, v)
+ if body:
+ h.putheader('Content-Length', str(int(len(body))))
+ h.endheaders()
+ if body:
+ h.send(body)
+ response = h.getresponse()
+ self.failUnlessEqual(int(response.status), 200)
+ length = int(response.getheader('Content-Length', '0'))
+ response_body = response.read()
+ self.failUnlessEqual(length, len(response_body))
+ self.failUnlessEqual(response_body, body)
+
+ def testMultipleRequestsWithoutBody(self):
+ # Tests the use of multiple requests in a single connection.
+ h = HTTPConnection(LOCALHOST, self.port)
+ for n in range(3):
+ self.testEchoResponse(h)
+ self.testEchoResponse(h, {'Connection': 'close'})
+
+ def testMultipleRequestsWithBody(self):
+ # Tests the use of multiple requests in a single connection.
+ h = HTTPConnection(LOCALHOST, self.port)
+ for n in range(3):
+ self.testEchoResponse(h, body='Hello, world!')
+ self.testEchoResponse(h, {'Connection': 'close'})
+
+ def testLargeBody(self):
+ # Tests the use of multiple requests in a single connection.
+ h = HTTPConnection(LOCALHOST, self.port)
+ s = 'This string has 32 characters.\r\n' * 32 # 1024 characters.
+ self.testEchoResponse(h, body=(s * 1024)) # 1 MB
+ self.testEchoResponse(h, {'Connection': 'close'},
+ body=(s * 100)) # 100 KB
+
+ def testManyClients(self):
+ conns = []
+ for n in range(50): # Linux kernel (2.4.8) doesn't like > 128 ?
+ #print 'open', n, clock()
+ h = HTTPConnection(LOCALHOST, self.port)
+ #h.debuglevel = 1
+ h.putrequest('GET', '/')
+ h.putheader('Accept', 'text/plain')
+ h.endheaders()
+ conns.append(h)
+ # If you uncomment the next line, you can raise the
+ # number of connections much higher without running
+ # into delays.
+ #sleep(0.01)
+ responses = []
+ for h in conns:
+ response = h.getresponse()
+ self.failUnlessEqual(response.status, 200)
+ responses.append(response)
+ for response in responses:
+ response.read()
+
+
+
+def test_suite():
+ loader = unittest.TestLoader()
+ return loader.loadTestsFromTestCase(Tests)
-#
-# Test shims.
-#
-class InitAny:
-
- def __init__( self, **kw ):
- self.__dict__.update( kw )
-
-class FauxHandler( InitAny ):
-
- data = request = None
-
- def continue_request( self, data, request ):
- self.data = data
- self.request = request
-
-class FauxRequest( InitAny ):
-
- collector = DEFAULT_COLLECTOR = []
- uri = 'http://www.zope.org/Resources;special?id=abc#aargh'
-
- def split_uri( self ):
- scheme, nethost, path, parms, query, fragment = urlparse( self.uri )
- if parms:
- parms = ';%s' % parms # Compensate for some stupidity
- if query:
- query = '?%s' % query # Compensate for some stupidity
- return path, parms, query, fragment
-
-class FauxChannel( InitAny ):
-
- terminator = None
- worked = 0
-
- def set_terminator( self, value ):
- self.terminator = value
-
- def work( self ):
- self.worked = 1
-
-class FauxServer( InitAny ):
- pass
-
-class FauxRequestPayload:
-
- __implements__ = Zope.Publisher.HTTP.IPayload.IRequestPayload
-
- def processInputs(self, request):
- """
- Processes request inputs.
- """
- self.body = request.full_instream.read()
-
- def getPublication(self, request):
- """
- Returns the publication object.
- """
-
- def debugInfo(self, request):
- """
- Returns text containing debugging information.
- """
-
-
-class FauxResponsePayload:
-
- __implements__ = Zope.Publisher.HTTP.IPayload.IResponsePayload
-
- def setBody(self, response, body):
- """
- Sets the body of the response.
- """
-
- def handleException(self, response, exc_info):
- """
- Calls setBody() with an error response.
- """
-
-#
-# Test the collector.
-#
-class zhttp_collectorTest( unittest.TestCase ):
-
- def setUp( self ):
- self.handler = FauxHandler()
- self.request = FauxRequest()
- self.request.channel = self.channel = FauxChannel()
-
- def tearDown( self ):
- del self.channel
- del self.request
-
- def testZero( self ):
- """
- Test building a collector with size = 0
- """
- from Zope.Server.HTTPServer import zhttp_collector
- collector = zhttp_collector( self.handler, self.request, 0 )
- self.assertEqual( self.channel.terminator, 0 )
- self.assertEqual( self.request.collector, collector )
- self.failUnless( self.handler.data is None )
- self.failUnless( self.handler.request is None )
-
- collector.found_terminator()
-
- self.assertEqual( self.handler.data.read(), '' )
- self.failUnless( self.handler.request is self.request )
-
- def testSmaller( self ):
- from Zope.Server.HTTPServer import zhttp_collector
- collector = zhttp_collector( self.handler, self.request, 15 )
- self.assertEqual( self.channel.terminator, 15 )
- self.assertEqual( self.request.collector, collector )
- self.failUnless( self.handler.data is None )
- self.failUnless( self.handler.request is None )
-
- junk = 'x' * 15
- collector.collect_incoming_data( junk )
- collector.found_terminator()
-
- self.assertEqual( self.handler.data.read(), junk )
- self.failUnless( self.handler.request is self.request )
-
- def testLarger( self ):
- from Zope.Server.HTTPServer import zhttp_collector
- collector = zhttp_collector( self.handler, self.request, 600000 )
- self.assertEqual( self.channel.terminator, 600000 )
- self.assertEqual( self.request.collector, collector )
- self.failUnless( self.handler.data is None )
- self.failUnless( self.handler.request is None )
-
- junk = 'x' * 1000
- for i in range( 600 ):
- collector.collect_incoming_data( junk )
- collector.found_terminator()
-
- stuff = self.handler.data.read()
- self.assertEqual( stuff, junk*600 )
- self.failUnless( self.handler.request is self.request )
-
-#
-# Test the handler.
-#
-class zhttp_handlerTest( unittest.TestCase ):
-
- def setUp( self ):
- self.request_payload = FauxRequestPayload()
- self.response_payload = FauxResponsePayload()
-
- self.resolver = InitAny( cache={ '123.45.67.89'
- : ( 0, 1, 'my.host.tla' )
- } )
-
- self.logger = InitAny( resolver=self.resolver )
-
- self.server = FauxServer( port=9673
- , server_name='localhost'
- , SERVER_IDENT='XYZZY'
- , logger=self.logger
- )
-
- self.channel = FauxChannel( server=self.server
- , creation_time='2001/01/01 00:00:00 GMT'
- , addr=('123.45.67.89', )
- , queue=[]
- )
-
- self.request = FauxRequest( channel=self.channel
- , command='GET'
- , version='3.14'
- , header= ( 'Foo: foo', 'Bar-baz: bar-baz' )
- )
-
- def testEmpty( self ):
-
- from Zope.Server.HTTPServer import zhttp_handler
-
- handler = zhttp_handler( self.request_payload, self.response_payload )
- self.failUnless( handler.request_payload is self.request_payload )
- self.failUnless( handler.response_payload is self.response_payload )
- self.failIf( handler.env_override )
- self.failIf( handler.hits )
-
- self.failUnless( handler.match( FauxRequest( uri = '/' ) ) )
- self.failUnless( handler.match( FauxRequest( uri = '/abc' ) ) )
- self.failUnless( handler.match( FauxRequest( uri = '/abc/def' ) ) )
- self.failIf( handler.match( FauxRequest( uri = '#' ) ) )
-
- def testInit( self ):
-
- from Zope.Server.HTTPServer import zhttp_handler
- override = { 'foo' : 1, 'bar' : 2 }
-
- handler = zhttp_handler( self.request_payload
- , self.response_payload
- , 'abc'
- , override
- )
- self.assertEqual( handler.env_override, override )
- self.failIf( handler.hits )
-
- self.failIf( handler.match( FauxRequest( uri = '/' ) ) )
- self.failUnless( handler.match( FauxRequest( uri = '/abc' ) ) )
- self.failUnless( handler.match( FauxRequest( uri = '/abc/def' ) ) )
- self.failIf( handler.match( FauxRequest( uri = '#' ) ) )
-
- def testGetEnvironment( self ):
-
- from Zope.Server.HTTPServer import zhttp_handler
-
- #
- # Test default URL, w/ no uri-base
- #
- req = self.request
- req.command = 'sneer'
-
- handler = zhttp_handler( self.request_payload, self.response_payload )
- env = handler.get_environment( req )
-
- self.assertEqual( env['REQUEST_METHOD'], 'SNEER' )
- self.assertEqual( int( env['SERVER_PORT'] ), 9673 )
- self.assertEqual( env['SERVER_NAME'], 'localhost' )
- self.assertEqual( env['SERVER_SOFTWARE'], 'XYZZY' )
- self.assertEqual( env['SERVER_PROTOCOL'], "HTTP/3.14" )
- self.assertEqual( env['channel.creation_time']
- , '2001/01/01 00:00:00 GMT' )
-
- self.assertEqual( env['SCRIPT_NAME'], '' )
- self.assertEqual( env['PATH_INFO'], '/Resources;special' )
- self.assertEqual( env['QUERY_STRING'], 'id=abc' )
- self.assertEqual( env['GATEWAY_INTERFACE'], 'CGI/1.1' )
- self.assertEqual( env['REMOTE_ADDR'], '123.45.67.89' )
- self.assertEqual( env['REMOTE_HOST'], 'my.host.tla' )
- self.assertEqual( env['HTTP_FOO'], 'foo' )
- self.assertEqual( env['HTTP_BAR_BAZ'], 'bar-baz' )
-
- #
- # Test with a different URL, and a uri-base.
- #
- req = FauxRequest( channel=self.channel
- , command='glower'
- , version='3.14'
- , header= ()
- , uri='http://www.zope.org/hmm/WikiCentral/DevWiki'
- )
-
- handler = zhttp_handler( self.request_payload
- , self.response_payload
- , uri_base='hmm'
- )
- env = handler.get_environment( req )
-
- self.assertEqual( env['SCRIPT_NAME'], '/hmm' )
- self.assertEqual( env['PATH_INFO'], '/WikiCentral/DevWiki' )
- self.failIf( env.get( 'QUERY_STRING', None ) is not None )
- self.failIf( env.get( 'HTTP_FOO', None ) is not None )
- self.failIf( env.get( 'HTTP_BAR_BAZ', None ) is not None )
-
- def testHandleNoLength( self ):
-
- from Zope.Server.HTTPServer import zhttp_handler
-
- self.channel.current_request = req = self.request
-
- handler = zhttp_handler( self.request_payload, self.response_payload )
-
- self.failIf( handler.hits.as_long() )
- self.failIf( self.channel.worked )
-
- handler.handle_request( req )
-
- self.assertEqual( handler.hits.as_long(), 1 )
- self.assertEqual( self.channel.current_request, None )
- self.assertEqual( len( self.channel.queue ), 1 )
- self.failUnless( self.channel.worked )
-
- httpreq = self.channel.queue[0]
- self.assertEqual( httpreq[ 'REQUEST_METHOD' ], 'GET' )
- self.assertEqual( int( httpreq['SERVER_PORT'] ), 9673 )
- self.assertEqual( httpreq['SERVER_NAME'], 'localhost' )
- self.assertEqual( httpreq['SERVER_SOFTWARE'], 'XYZZY' )
- self.assertEqual( httpreq['SERVER_PROTOCOL'], "HTTP/3.14" )
- self.assertEqual( httpreq['SCRIPT_NAME'], '' )
- self.assertEqual( httpreq['PATH_INFO'], '/Resources;special' )
- self.assertEqual( httpreq['QUERY_STRING'], 'id=abc' )
- self.assertEqual( httpreq['GATEWAY_INTERFACE'], 'CGI/1.1' )
- self.assertEqual( httpreq['REMOTE_ADDR'], '123.45.67.89' )
- self.assertEqual( httpreq['REMOTE_HOST'], 'my.host.tla' )
- self.assertEqual( httpreq['HTTP_FOO'], 'foo' )
- self.assertEqual( httpreq['HTTP_BAR_BAZ'], 'bar-baz' )
-
- self.assertEqual( httpreq.response._http_version, req.version )
- self.assertEqual( httpreq.response._server_version, 'XYZZY' )
-
- def testHandleWithLength( self ):
-
- from Zope.Server.HTTPServer import zhttp_handler
-
- self.channel.current_request = req = self.request
- req.header = ( 'Content-length: 100', )
-
- handler = zhttp_handler( self.request_payload, self.response_payload )
-
- self.failIf( handler.hits.as_long() )
- self.failIf( self.channel.worked )
-
- handler.handle_request( req )
-
- self.assertEqual( handler.hits.as_long(), 1 )
- self.assertEqual( self.channel.current_request, req )
- self.assertEqual( len( self.channel.queue ), 0 )
- self.failIf( self.channel.worked )
-
- collector = getattr( req, 'collector', None )
- self.failUnless( collector is not None )
-
- junk = 'x' * 100
- collector.collect_incoming_data( junk )
- collector.found_terminator()
-
- self.assertEqual( self.channel.current_request, None )
- self.assertEqual( len( self.channel.queue ), 1 )
- self.failUnless( self.channel.worked )
-
- httpreq = self.channel.queue[0]
- httpreq.processInputs()
- self.assertEqual( self.request_payload.body, junk )
+if __name__=='__main__':
+ unittest.TextTestRunner().run( test_suite() )
=== Removed File Zope3/lib/python/Zope/Server/tests/testHTTPServer2.py ===