[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/HTTP/tests - __init__.py:1.2 testHTTPServer.py:1.2 testPublisherServer.py:1.2

Jim Fulton jim@zope.com
Mon, 10 Jun 2002 19:30:07 -0400


Update of /cvs-repository/Zope3/lib/python/Zope/Server/HTTP/tests
In directory cvs.zope.org:/tmp/cvs-serv20468/lib/python/Zope/Server/HTTP/tests

Added Files:
	__init__.py testHTTPServer.py testPublisherServer.py 
Log Message:
Merged Zope-3x-branch into newly forked Zope3 CVS Tree.

=== Zope3/lib/python/Zope/Server/HTTP/tests/__init__.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+""" Unit tests for Zope.Server.HTTP """


=== Zope3/lib/python/Zope/Server/HTTP/tests/testHTTPServer.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import unittest
+from asyncore import socket_map, poll
+import socket
+
+from threading import Thread
+from Zope.Server.TaskThreads import ThreadedTaskDispatcher
+from Zope.Server.HTTP.HTTPServer import HTTPServer
+from Zope.Server.Adjustments import Adjustments
+from Zope.Server.ITask import ITask
+
+from httplib import HTTPConnection
+from httplib import HTTPResponse as ClientHTTPResponse
+
+from time import sleep, time
+
+td = ThreadedTaskDispatcher()  # Shared by all tests; setUp sizes it, tearDown shuts it down.
+# Port settings: 0 auto-binds; use specific numbers to inspect traffic using TCPWatch.
+LOCALHOST = '127.0.0.1'
+SERVER_PORT = 0      # Port the server is created with (0 = auto-bind).
+CONNECT_TO_PORT = 0  # Port clients connect to; 0 means the server's bound port (see Tests.setUp).
+
+
+my_adj = Adjustments()
+# Reduce the overflow settings to make testing easier (testLargeBody sends far more than 10000 bytes).
+my_adj.outbuf_overflow = 10000
+my_adj.inbuf_overflow = 10000
+
+
+class EchoHTTPServer(HTTPServer):
+    # Test server: answers each request by writing the request body back as the response.
+    def executeRequest(self, task):
+        headers = task.request_data.headers
+        if 'CONTENT_LENGTH' in headers:
+            cl = headers['CONTENT_LENGTH']
+            task.response_headers['Content-Length'] = cl  # Response length mirrors the request's.
+        instream = task.request_data.getBodyStream()
+        while 1:
+            data = instream.read(8192)  # Copy the body in chunks of up to 8192 bytes.
+            if not data:
+                break  # Empty read: nothing left to echo.
+            task.write(data)
+
+
+class SleepingTask:
+    # Minimal ITask used by Tests.testThreading; its only work is a short sleep.
+    __implements__ = ITask
+
+    def service(self):  # Sleep for 0.2 seconds (presumably called by a dispatcher thread -- confirm).
+        sleep(0.2)
+
+    def cancel(self):  # Intentionally a no-op.
+        pass
+
+    def defer(self):  # Intentionally a no-op.
+        pass
+
+
+class Tests(unittest.TestCase):
+    # Runs an EchoHTTPServer polled by a background thread and talks to it over loopback sockets.
+    def setUp(self):  # Start the echo server and the polling thread.
+        td.setThreadCount(4)
+        self.orig_map_size = len(socket_map)  # Baseline for tearDown's leak check.
+        self.server = EchoHTTPServer(LOCALHOST, SERVER_PORT,
+                                     task_dispatcher=td, adj=my_adj)
+        if CONNECT_TO_PORT == 0:
+            self.port = self.server.socket.getsockname()[1]  # Port the server actually bound.
+        else:
+            self.port = CONNECT_TO_PORT
+        self.run_loop = 1  # loop() keeps polling while this is true.
+        self.counter = 0  # Number of iterations loop() has started.
+        self.thread = Thread(target=self.loop)
+        self.thread.start()
+        sleep(0.1)  # Give the thread some time to start.
+
+    def tearDown(self):  # Stop the thread, dispatcher and server, then check for leaked sockets.
+        self.run_loop = 0
+        self.thread.join()
+        td.shutdown()
+        self.server.close()
+        # Make sure all sockets get closed by asyncore normally.
+        timeout = time() + 5  # Give the socket map up to 5 seconds to drain.
+        while 1:
+            if len(socket_map) == self.orig_map_size:
+                # Clean!
+                break
+            if time() >= timeout:
+                self.fail('Leaked a socket: %s' % `socket_map`)
+            poll(0.1, socket_map)  # Poll here: the loop thread has already been joined.
+
+    def loop(self):  # Thread target: poll asyncore until tearDown clears run_loop.
+        while self.run_loop:
+            self.counter = self.counter + 1
+            #print 'loop', self.counter
+            poll(0.1, socket_map)
+
+    def testEchoResponse(self, h=None, add_headers=None, body=''):  # A test itself and a helper for others.
+        if h is None:
+            h = HTTPConnection(LOCALHOST, self.port)  # No connection supplied: open a new one.
+        h.putrequest('GET', '/')
+        h.putheader('Accept', 'text/plain')
+        if add_headers:
+            for k, v in add_headers.items():
+                h.putheader(k, v)
+        if body:
+            h.putheader('Content-Length', str(int(len(body))))
+        h.endheaders()
+        if body:
+            h.send(body)
+        response = h.getresponse()
+        self.failUnlessEqual(int(response.status), 200)
+        length = int(response.getheader('Content-Length', '0'))
+        response_body = response.read()
+        self.failUnlessEqual(length, len(response_body))
+        self.failUnlessEqual(response_body, body)  # The response must be the request body, unchanged.
+
+    def testMultipleRequestsWithoutBody(self):
+        # Tests the use of multiple requests in a single connection.
+        h = HTTPConnection(LOCALHOST, self.port)
+        for n in range(3):
+            self.testEchoResponse(h)
+        self.testEchoResponse(h, {'Connection': 'close'})  # Final request asks to close the connection.
+
+    def testMultipleRequestsWithBody(self):
+        # Tests the use of multiple requests in a single connection.
+        h = HTTPConnection(LOCALHOST, self.port)
+        for n in range(3):
+            self.testEchoResponse(h, body='Hello, world!')
+        self.testEchoResponse(h, {'Connection': 'close'})  # Final request has no body.
+
+    def testPipelining(self):
+        # Tests the use of several requests issued at once.
+        s = ("GET / HTTP/1.0\r\n"
+             "Connection: %s\r\n"
+             "Content-Length: %d\r\n"
+             "\r\n"
+             "%s")
+        to_send = ''
+        count = 25
+        for n in range(count):
+            body = "Response #%d\r\n" % (n + 1)
+            if n + 1 < count:
+                conn = 'keep-alive'
+            else:
+                conn = 'close'  # Only the last request asks for the connection to be closed.
+            to_send += s % (conn, len(body), body)
+
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.connect((LOCALHOST, self.port))
+        sock.send(to_send)  # All requests are handed to a single send() before any response is read.
+        for n in range(count):
+            expect_body = "Response #%d\r\n" % (n + 1)  # Responses must arrive in request order.
+            response = ClientHTTPResponse(sock)  # A new response parser per reply, same socket.
+            response.begin()
+            self.failUnlessEqual(int(response.status), 200)
+            length = int(response.getheader('Content-Length', '0'))
+            response_body = response.read(length)
+            self.failUnlessEqual(length, len(response_body))
+            self.failUnlessEqual(response_body, expect_body)
+
+    def testWithoutCRLF(self):
+        # Tests the use of just newlines rather than CR/LFs.
+        data = "Echo\nthis\r\nplease"  # The body itself mixes LF and CRLF and must come back unchanged.
+        s = ("GET / HTTP/1.0\n"
+             "Connection: close\n"
+             "Content-Length: %d\n"
+             "\n"
+             "%s") % (len(data), data)
+
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.connect((LOCALHOST, self.port))
+        sock.send(s)
+        response = ClientHTTPResponse(sock)
+        response.begin()
+        self.failUnlessEqual(int(response.status), 200)
+        length = int(response.getheader('Content-Length', '0'))
+        response_body = response.read(length)
+        self.failUnlessEqual(length, len(data))
+        self.failUnlessEqual(response_body, data)
+
+    def testLargeBody(self):
+        # Tests echoing a 1 MB body and then a 100 KB body over a single connection.
+        h = HTTPConnection(LOCALHOST, self.port)
+        s = 'This string has 32 characters.\r\n' * 32  # 1024 characters.
+        self.testEchoResponse(h, body=(s * 1024))  # 1 MB
+        self.testEchoResponse(h, {'Connection': 'close'},
+                              body=(s * 100))  # 100 KB
+
+    def testManyClients(self):  # Open 50 connections with a request each, then read every response.
+        conns = []
+        for n in range(50):  # Linux kernel (2.4.8) doesn't like > 128 ?
+            #print 'open', n, clock()
+            h = HTTPConnection(LOCALHOST, self.port)
+            #h.debuglevel = 1
+            h.putrequest('GET', '/')
+            h.putheader('Accept', 'text/plain')
+            h.endheaders()
+            conns.append(h)
+            # If you uncomment the next line, you can raise the
+            # number of connections much higher without running
+            # into delays.
+            #sleep(0.01)
+        responses = []
+        for h in conns:
+            response = h.getresponse()
+            self.failUnlessEqual(response.status, 200)
+            responses.append(response)
+        for response in responses:
+            response.read()  # Bodies are read but not checked.
+
+    def testThreading(self):
+        # Ensures the correct number of threads keep running.
+        for n in range(4):
+            td.addTask(SleepingTask())
+        # Try to confuse the task manager.
+        td.setThreadCount(2)
+        td.setThreadCount(1)
+        sleep(0.5)  # Presumably long enough for the 0.2 s tasks and surplus threads to finish -- confirm.
+        # There should be 1 still running.
+        self.failUnlessEqual(len(td.threads), 1)
+
+    def testChunkingRequestWithoutContent(self):  # A chunked request holding only the terminating chunk.
+        h = HTTPConnection(LOCALHOST, self.port)
+        h.putrequest('GET', '/')
+        h.putheader('Accept', 'text/plain')
+        h.putheader('Transfer-Encoding', 'chunked')
+        h.endheaders()
+        h.send("0\r\n\r\n")  # Zero-size chunk: end of the chunked body.
+        response = h.getresponse()
+        self.failUnlessEqual(int(response.status), 200)
+        response_body = response.read()
+        self.failUnlessEqual(response_body, '')
+
+    def testChunkingRequestWithContent(self):  # Twelve 32-byte chunks must be echoed as one 384-byte body.
+        control_line="20;\r\n"  # 20 hex = 32 dec
+        s = 'This string has 32 characters.\r\n'
+        expect = s * 12
+
+        h = HTTPConnection(LOCALHOST, self.port)
+        h.putrequest('GET', '/')
+        h.putheader('Accept', 'text/plain')
+        h.putheader('Transfer-Encoding', 'chunked')
+        h.endheaders()
+        for n in range(12):
+            h.send(control_line)  # Chunk-size line ...
+            h.send(s)  # ... followed by the chunk data.
+        h.send("0\r\n\r\n")  # Zero-size chunk: end of the chunked body.
+        response = h.getresponse()
+        self.failUnlessEqual(int(response.status), 200)
+        response_body = response.read()
+        self.failUnlessEqual(response_body, expect)
+
+
+def test_suite():  # Return a suite holding every test method of Tests.
+    loader = unittest.TestLoader()
+    return loader.loadTestsFromTestCase(Tests)
+
+if __name__=='__main__':  # Run the suite with the text runner when executed as a script.
+    unittest.TextTestRunner().run( test_suite() )


=== Zope3/lib/python/Zope/Server/HTTP/tests/testPublisherServer.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+##############################################################################
+"""
+
+$Id$
+"""
+
+import unittest
+from asyncore import socket_map, poll
+import sys
+
+from threading import Thread
+from Zope.Server.TaskThreads import ThreadedTaskDispatcher
+from Zope.Server.HTTP.PublisherHTTPServer import PublisherHTTPServer
+from Zope.Publisher.Browser.BrowserRequest import BrowserRequest
+
+from Zope.Publisher.DefaultPublication import DefaultPublication
+from Zope.Publisher.Exceptions import Redirect, Retry
+from Zope.Publisher.HTTP import HTTPRequest
+
+from httplib import HTTPConnection
+
+from time import sleep, time
+
+td = ThreadedTaskDispatcher()  # Shared by all tests; setUp sizes it, tearDown shuts it down.
+
+LOCALHOST = '127.0.0.1'  # Address the test server binds to and clients connect to.
+
+HTTPRequest.STAGGER_RETRIES = 0  # Don't pause.  NOTE: sets the attribute on the HTTPRequest class itself.
+
+
+class Conflict (Exception):  # Raised by tested_object.conflict; PublicationWithConflict turns it into Retry.
+    """
+    Pseudo ZODB conflict error.
+    """
+
+
+class PublicationWithConflict(DefaultPublication):
+    # DefaultPublication variant that answers a Conflict error by raising Retry while retries are allowed.
+    def handleException(self, request, exc_info, retry_allowed=1):
+        if exc_info[0] is Conflict and retry_allowed:  # Assumes a sys.exc_info()-style tuple -- confirm.
+            # This simulates a ZODB retry.
+            raise Retry(exc_info)
+        else:
+            DefaultPublication.handleException(self, request, exc_info,
+                                               retry_allowed)  # Anything else: inherited handling.
+
+
+class tested_object:  # Object published by Tests; each method backs one of the tested URL paths.
+    " "
+    tries = 0  # Conflicts raised so far; class-level default, shadowed per instance by "self.tries += 1".
+
+    def __call__(self, REQUEST):  # Default view: report the URL that was invoked.
+        return 'URL invoked: %s' % REQUEST.URL
+
+    def redirect_method(self, REQUEST):
+        "Generates a redirect using the redirect() method."
+        REQUEST.getResponse().redirect("http://somewhere.com/redirect")
+
+    def redirect_exception(self):
+        "Generates a redirect using an exception."
+        raise Redirect("http://somewhere.com/exception")
+
+    def conflict(self, REQUEST, wait_tries):
+        """
+        Returns 202 status only after (wait_tries) tries.
+        """
+        if self.tries >= int(wait_tries):  # wait_tries is passed through int() (the tests supply it in the query string).
+            raise "Accepted"  # String exception; testConflictRetry expects a 202 response from this.
+        else:
+            self.tries += 1
+            raise Conflict
+
+
+
+class Tests(unittest.TestCase):
+    # Publishes a small tested_object tree through PublisherHTTPServer and checks HTTP status codes.
+    def setUp(self):
+
+        obj = tested_object()  # Root of the published tree ('/').
+        obj.folder = tested_object()
+        obj.folder.item = tested_object()
+
+        obj._protected = tested_object()  # testUnauthorized expects 401 for '/_protected'.
+
+        pub = PublicationWithConflict(obj)
+
+        def request_factory(input_stream, output_steam, env):  # "output_steam" (sic): the output stream.
+            request = BrowserRequest(input_stream, output_steam, env)
+            request.setPublication(pub)  # Every request is published against the tree above.
+            return request
+
+        td.setThreadCount(4)
+        # Bind to any port on localhost.
+        self.server = PublisherHTTPServer(request_factory, 'Browser',
+                                          LOCALHOST, 0, task_dispatcher=td)
+        self.port = self.server.socket.getsockname()[1]  # Port the server actually bound.
+        self.run_loop = 1  # loop() keeps polling while this is true.
+        self.thread = Thread(target=self.loop)
+        self.thread.start()
+        sleep(0.1)  # Give the thread some time to start.
+
+    def tearDown(self):  # Stop the polling thread, the dispatcher and the server.
+        self.run_loop = 0
+        self.thread.join()
+        td.shutdown()
+        self.server.close()
+
+    def loop(self):  # Thread target: poll asyncore until tearDown clears run_loop.
+        while self.run_loop:
+            poll(0.1, socket_map)
+
+    def testResponse(self, path='/', status_expected=200,
+                     add_headers=None, request_body=''):  # A test itself (defaults) and a helper for others.
+        h = HTTPConnection(LOCALHOST, self.port)  # A new connection for every call.
+        h.putrequest('GET', path)
+        h.putheader('Accept', 'text/plain')
+        if add_headers:
+            for k, v in add_headers.items():
+                h.putheader(k, v)
+        if request_body:
+            h.putheader('Content-Length', str(int(len(request_body))))
+        h.endheaders()
+        if request_body:
+            h.send(request_body)
+        response = h.getresponse()
+        length = int(response.getheader('Content-Length', '0'))
+        if length:
+            response_body = response.read(length)
+        else:
+            response_body = ''  # Missing or zero Content-Length: treat the body as empty.
+
+        # Please do not disable the status code check.  It must work.
+        self.failUnlessEqual(int(response.status), status_expected)
+
+        self.failUnlessEqual(length, len(response_body))
+
+        if (status_expected == 200):  # The body text is only verified for 200 responses.
+            if path == '/': path = ''  # The expected URL for the root has no trailing slash.
+            expect_response = 'URL invoked: http://%s:%d%s' % (LOCALHOST,
+                self.port, path)
+            self.failUnlessEqual(response_body, expect_response)
+
+    def testDeeperPath(self):  # Traversal through nested attributes.
+        self.testResponse(path='/folder/item')
+
+    def testNotFound(self):  # A path that does not exist on the tree.
+        self.testResponse(path='/foo/bar', status_expected=404)
+
+    def testUnauthorized(self):  # The underscore-prefixed attribute set in setUp must yield 401.
+        self.testResponse(path='/_protected', status_expected=401)
+
+    def testRedirectMethod(self):  # Redirect issued through the response's redirect() method.
+        self.testResponse(path='/redirect_method', status_expected=302)
+
+    def testRedirectException(self):  # Redirect issued by raising Redirect, at two depths.
+        self.testResponse(path='/redirect_exception', status_expected=302)
+        self.testResponse(path='/folder/redirect_exception',
+                          status_expected=302)
+
+    def testConflictRetry(self):
+        # Expect the "Accepted" response since the retries will succeed.
+        self.testResponse(path='/conflict?wait_tries=2', status_expected=202)
+
+    def testFailedConflictRetry(self):
+        # Expect a "Conflict" response since there will be too many
+        # conflicts.
+        self.testResponse(path='/conflict?wait_tries=10', status_expected=409)
+
+
+
+def test_suite():  # Return a suite holding every test method of Tests.
+    loader = unittest.TestLoader()
+    return loader.loadTestsFromTestCase(Tests)
+
+if __name__=='__main__':  # Run the suite with the text runner when executed as a script.
+    unittest.TextTestRunner().run( test_suite() )