[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/HTTP/tests - __init__.py:1.2 testHTTPServer.py:1.2 testPublisherServer.py:1.2
Jim Fulton
jim@zope.com
Mon, 10 Jun 2002 19:30:07 -0400
Update of /cvs-repository/Zope3/lib/python/Zope/Server/HTTP/tests
In directory cvs.zope.org:/tmp/cvs-serv20468/lib/python/Zope/Server/HTTP/tests
Added Files:
__init__.py testHTTPServer.py testPublisherServer.py
Log Message:
Merged Zope-3x-branch into newly forked Zope3 CVS Tree.
=== Zope3/lib/python/Zope/Server/HTTP/tests/__init__.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+""" Unit tests for Zope.Server """
=== Zope3/lib/python/Zope/Server/HTTP/tests/testHTTPServer.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import unittest
+from asyncore import socket_map, poll
+import socket
+
+from threading import Thread
+from Zope.Server.TaskThreads import ThreadedTaskDispatcher
+from Zope.Server.HTTP.HTTPServer import HTTPServer
+from Zope.Server.Adjustments import Adjustments
+from Zope.Server.ITask import ITask
+
+from httplib import HTTPConnection
+from httplib import HTTPResponse as ClientHTTPResponse
+
+from time import sleep, time
+
+# Task dispatcher shared by every test in this module: setUp() sizes its
+# thread pool and tearDown() shuts it down.
+td = ThreadedTaskDispatcher()
+
+# Loopback address the test server binds to and the clients connect to.
+LOCALHOST = '127.0.0.1'
+SERVER_PORT = 0 # Set these port numbers to 0 to auto-bind, or
+CONNECT_TO_PORT = 0 # use specific numbers to inspect using TCPWatch.
+
+
+# Adjustments instance handed to the test server through its `adj` argument.
+my_adj = Adjustments()
+# Reduce overflows to make testing easier.
+# NOTE(review): presumably these are the buffer sizes, in bytes, past which
+# the server changes how it buffers data -- confirm in Zope.Server.Adjustments.
+my_adj.outbuf_overflow = 10000
+my_adj.inbuf_overflow = 10000
+
+
+class EchoHTTPServer(HTTPServer):
+
+ def executeRequest(self, task):
+ headers = task.request_data.headers
+ if 'CONTENT_LENGTH' in headers:
+ cl = headers['CONTENT_LENGTH']
+ task.response_headers['Content-Length'] = cl
+ instream = task.request_data.getBodyStream()
+ while 1:
+ data = instream.read(8192)
+ if not data:
+ break
+ task.write(data)
+
+
+class SleepingTask:
+
+ __implements__ = ITask
+
+ def service(self):
+ sleep(0.2)
+
+ def cancel(self):
+ pass
+
+ def defer(self):
+ pass
+
+
+class Tests(unittest.TestCase):
+
+ def setUp(self):
+ td.setThreadCount(4)
+ self.orig_map_size = len(socket_map)
+ self.server = EchoHTTPServer(LOCALHOST, SERVER_PORT,
+ task_dispatcher=td, adj=my_adj)
+ if CONNECT_TO_PORT == 0:
+ self.port = self.server.socket.getsockname()[1]
+ else:
+ self.port = CONNECT_TO_PORT
+ self.run_loop = 1
+ self.counter = 0
+ self.thread = Thread(target=self.loop)
+ self.thread.start()
+ sleep(0.1) # Give the thread some time to start.
+
+ def tearDown(self):
+ self.run_loop = 0
+ self.thread.join()
+ td.shutdown()
+ self.server.close()
+ # Make sure all sockets get closed by asyncore normally.
+ timeout = time() + 5
+ while 1:
+ if len(socket_map) == self.orig_map_size:
+ # Clean!
+ break
+ if time() >= timeout:
+ self.fail('Leaked a socket: %s' % `socket_map`)
+ poll(0.1, socket_map)
+
+ def loop(self):
+ while self.run_loop:
+ self.counter = self.counter + 1
+ #print 'loop', self.counter
+ poll(0.1, socket_map)
+
+ def testEchoResponse(self, h=None, add_headers=None, body=''):
+ if h is None:
+ h = HTTPConnection(LOCALHOST, self.port)
+ h.putrequest('GET', '/')
+ h.putheader('Accept', 'text/plain')
+ if add_headers:
+ for k, v in add_headers.items():
+ h.putheader(k, v)
+ if body:
+ h.putheader('Content-Length', str(int(len(body))))
+ h.endheaders()
+ if body:
+ h.send(body)
+ response = h.getresponse()
+ self.failUnlessEqual(int(response.status), 200)
+ length = int(response.getheader('Content-Length', '0'))
+ response_body = response.read()
+ self.failUnlessEqual(length, len(response_body))
+ self.failUnlessEqual(response_body, body)
+
+ def testMultipleRequestsWithoutBody(self):
+ # Tests the use of multiple requests in a single connection.
+ h = HTTPConnection(LOCALHOST, self.port)
+ for n in range(3):
+ self.testEchoResponse(h)
+ self.testEchoResponse(h, {'Connection': 'close'})
+
+ def testMultipleRequestsWithBody(self):
+ # Tests the use of multiple requests in a single connection.
+ h = HTTPConnection(LOCALHOST, self.port)
+ for n in range(3):
+ self.testEchoResponse(h, body='Hello, world!')
+ self.testEchoResponse(h, {'Connection': 'close'})
+
+ def testPipelining(self):
+ # Tests the use of several requests issued at once.
+ s = ("GET / HTTP/1.0\r\n"
+ "Connection: %s\r\n"
+ "Content-Length: %d\r\n"
+ "\r\n"
+ "%s")
+ to_send = ''
+ count = 25
+ for n in range(count):
+ body = "Response #%d\r\n" % (n + 1)
+ if n + 1 < count:
+ conn = 'keep-alive'
+ else:
+ conn = 'close'
+ to_send += s % (conn, len(body), body)
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((LOCALHOST, self.port))
+ sock.send(to_send)
+ for n in range(count):
+ expect_body = "Response #%d\r\n" % (n + 1)
+ response = ClientHTTPResponse(sock)
+ response.begin()
+ self.failUnlessEqual(int(response.status), 200)
+ length = int(response.getheader('Content-Length', '0'))
+ response_body = response.read(length)
+ self.failUnlessEqual(length, len(response_body))
+ self.failUnlessEqual(response_body, expect_body)
+
+ def testWithoutCRLF(self):
+ # Tests the use of just newlines rather than CR/LFs.
+ data = "Echo\nthis\r\nplease"
+ s = ("GET / HTTP/1.0\n"
+ "Connection: close\n"
+ "Content-Length: %d\n"
+ "\n"
+ "%s") % (len(data), data)
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((LOCALHOST, self.port))
+ sock.send(s)
+ response = ClientHTTPResponse(sock)
+ response.begin()
+ self.failUnlessEqual(int(response.status), 200)
+ length = int(response.getheader('Content-Length', '0'))
+ response_body = response.read(length)
+ self.failUnlessEqual(length, len(data))
+ self.failUnlessEqual(response_body, data)
+
+ def testLargeBody(self):
+ # Tests the use of multiple requests in a single connection.
+ h = HTTPConnection(LOCALHOST, self.port)
+ s = 'This string has 32 characters.\r\n' * 32 # 1024 characters.
+ self.testEchoResponse(h, body=(s * 1024)) # 1 MB
+ self.testEchoResponse(h, {'Connection': 'close'},
+ body=(s * 100)) # 100 KB
+
+ def testManyClients(self):
+ conns = []
+ for n in range(50): # Linux kernel (2.4.8) doesn't like > 128 ?
+ #print 'open', n, clock()
+ h = HTTPConnection(LOCALHOST, self.port)
+ #h.debuglevel = 1
+ h.putrequest('GET', '/')
+ h.putheader('Accept', 'text/plain')
+ h.endheaders()
+ conns.append(h)
+ # If you uncomment the next line, you can raise the
+ # number of connections much higher without running
+ # into delays.
+ #sleep(0.01)
+ responses = []
+ for h in conns:
+ response = h.getresponse()
+ self.failUnlessEqual(response.status, 200)
+ responses.append(response)
+ for response in responses:
+ response.read()
+
+ def testThreading(self):
+ # Ensures the correct number of threads keep running.
+ for n in range(4):
+ td.addTask(SleepingTask())
+ # Try to confuse the task manager.
+ td.setThreadCount(2)
+ td.setThreadCount(1)
+ sleep(0.5)
+ # There should be 1 still running.
+ self.failUnlessEqual(len(td.threads), 1)
+
+ def testChunkingRequestWithoutContent(self):
+ h = HTTPConnection(LOCALHOST, self.port)
+ h.putrequest('GET', '/')
+ h.putheader('Accept', 'text/plain')
+ h.putheader('Transfer-Encoding', 'chunked')
+ h.endheaders()
+ h.send("0\r\n\r\n")
+ response = h.getresponse()
+ self.failUnlessEqual(int(response.status), 200)
+ response_body = response.read()
+ self.failUnlessEqual(response_body, '')
+
+ def testChunkingRequestWithContent(self):
+ control_line="20;\r\n" # 20 hex = 32 dec
+ s = 'This string has 32 characters.\r\n'
+ expect = s * 12
+
+ h = HTTPConnection(LOCALHOST, self.port)
+ h.putrequest('GET', '/')
+ h.putheader('Accept', 'text/plain')
+ h.putheader('Transfer-Encoding', 'chunked')
+ h.endheaders()
+ for n in range(12):
+ h.send(control_line)
+ h.send(s)
+ h.send("0\r\n\r\n")
+ response = h.getresponse()
+ self.failUnlessEqual(int(response.status), 200)
+ response_body = response.read()
+ self.failUnlessEqual(response_body, expect)
+
+
+def test_suite():
+ loader = unittest.TestLoader()
+ return loader.loadTestsFromTestCase(Tests)
+
+if __name__=='__main__':
+ unittest.TextTestRunner().run( test_suite() )
=== Zope3/lib/python/Zope/Server/HTTP/tests/testPublisherServer.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+##############################################################################
+"""
+
+$Id$
+"""
+
+import unittest
+from asyncore import socket_map, poll
+import sys
+
+from threading import Thread
+from Zope.Server.TaskThreads import ThreadedTaskDispatcher
+from Zope.Server.HTTP.PublisherHTTPServer import PublisherHTTPServer
+from Zope.Publisher.Browser.BrowserRequest import BrowserRequest
+
+from Zope.Publisher.DefaultPublication import DefaultPublication
+from Zope.Publisher.Exceptions import Redirect, Retry
+from Zope.Publisher.HTTP import HTTPRequest
+
+from httplib import HTTPConnection
+
+from time import sleep, time
+
+# Task dispatcher shared by every test in this module: setUp() sizes its
+# thread pool and tearDown() shuts it down.
+td = ThreadedTaskDispatcher()
+
+# Loopback address the test server binds to and the clients connect to.
+LOCALHOST = '127.0.0.1'
+
+# NOTE(review): presumably this turns off the delay HTTPRequest inserts
+# between retries so the conflict tests run quickly -- confirm in
+# Zope.Publisher.HTTP.
+HTTPRequest.STAGGER_RETRIES = 0 # Don't pause.
+
+
+class Conflict (Exception):
+ """
+ Pseudo ZODB conflict error.
+ """
+
+
+class PublicationWithConflict(DefaultPublication):
+
+ def handleException(self, request, exc_info, retry_allowed=1):
+ if exc_info[0] is Conflict and retry_allowed:
+ # This simulates a ZODB retry.
+ raise Retry(exc_info)
+ else:
+ DefaultPublication.handleException(self, request, exc_info,
+ retry_allowed)
+
+
+class tested_object:
+ " "
+ tries = 0
+
+ def __call__(self, REQUEST):
+ return 'URL invoked: %s' % REQUEST.URL
+
+ def redirect_method(self, REQUEST):
+ "Generates a redirect using the redirect() method."
+ REQUEST.getResponse().redirect("http://somewhere.com/redirect")
+
+ def redirect_exception(self):
+ "Generates a redirect using an exception."
+ raise Redirect("http://somewhere.com/exception")
+
+ def conflict(self, REQUEST, wait_tries):
+ """
+ Returns 202 status only after (wait_tries) tries.
+ """
+ if self.tries >= int(wait_tries):
+ raise "Accepted"
+ else:
+ self.tries += 1
+ raise Conflict
+
+
+
+class Tests(unittest.TestCase):
+
+ def setUp(self):
+
+ obj = tested_object()
+ obj.folder = tested_object()
+ obj.folder.item = tested_object()
+
+ obj._protected = tested_object()
+
+ pub = PublicationWithConflict(obj)
+
+ def request_factory(input_stream, output_steam, env):
+ request = BrowserRequest(input_stream, output_steam, env)
+ request.setPublication(pub)
+ return request
+
+ td.setThreadCount(4)
+ # Bind to any port on localhost.
+ self.server = PublisherHTTPServer(request_factory, 'Browser',
+ LOCALHOST, 0, task_dispatcher=td)
+ self.port = self.server.socket.getsockname()[1]
+ self.run_loop = 1
+ self.thread = Thread(target=self.loop)
+ self.thread.start()
+ sleep(0.1) # Give the thread some time to start.
+
+ def tearDown(self):
+ self.run_loop = 0
+ self.thread.join()
+ td.shutdown()
+ self.server.close()
+
+ def loop(self):
+ while self.run_loop:
+ poll(0.1, socket_map)
+
+ def testResponse(self, path='/', status_expected=200,
+ add_headers=None, request_body=''):
+ h = HTTPConnection(LOCALHOST, self.port)
+ h.putrequest('GET', path)
+ h.putheader('Accept', 'text/plain')
+ if add_headers:
+ for k, v in add_headers.items():
+ h.putheader(k, v)
+ if request_body:
+ h.putheader('Content-Length', str(int(len(request_body))))
+ h.endheaders()
+ if request_body:
+ h.send(request_body)
+ response = h.getresponse()
+ length = int(response.getheader('Content-Length', '0'))
+ if length:
+ response_body = response.read(length)
+ else:
+ response_body = ''
+
+ # Please do not disable the status code check. It must work.
+ self.failUnlessEqual(int(response.status), status_expected)
+
+ self.failUnlessEqual(length, len(response_body))
+
+ if (status_expected == 200):
+ if path == '/': path = ''
+ expect_response = 'URL invoked: http://%s:%d%s' % (LOCALHOST,
+ self.port, path)
+ self.failUnlessEqual(response_body, expect_response)
+
+ def testDeeperPath(self):
+ self.testResponse(path='/folder/item')
+
+ def testNotFound(self):
+ self.testResponse(path='/foo/bar', status_expected=404)
+
+ def testUnauthorized(self):
+ self.testResponse(path='/_protected', status_expected=401)
+
+ def testRedirectMethod(self):
+ self.testResponse(path='/redirect_method', status_expected=302)
+
+ def testRedirectException(self):
+ self.testResponse(path='/redirect_exception', status_expected=302)
+ self.testResponse(path='/folder/redirect_exception',
+ status_expected=302)
+
+ def testConflictRetry(self):
+ # Expect the "Accepted" response since the retries will succeed.
+ self.testResponse(path='/conflict?wait_tries=2', status_expected=202)
+
+ def testFailedConflictRetry(self):
+ # Expect a "Conflict" response since there will be too many
+ # conflicts.
+ self.testResponse(path='/conflict?wait_tries=10', status_expected=409)
+
+
+
+def test_suite():
+ loader = unittest.TestLoader()
+ return loader.loadTestsFromTestCase(Tests)
+
+if __name__=='__main__':
+ unittest.TextTestRunner().run( test_suite() )