[Zope3-checkins] CVS: Zope3/src/zope/server/http/tests - __init__.py:1.2 test_httprequestparser.py:1.2 test_httpserver.py:1.2 test_publisherserver.py:1.2
Jim Fulton
jim@zope.com
Wed, 25 Dec 2002 09:15:56 -0500
Update of /cvs-repository/Zope3/src/zope/server/http/tests
In directory cvs.zope.org:/tmp/cvs-serv20790/src/zope/server/http/tests
Added Files:
__init__.py test_httprequestparser.py test_httpserver.py
test_publisherserver.py
Log Message:
Grand renaming:
- Renamed most files (especially python modules) to lower case.
- Moved views and interfaces into separate hierarchies within each
project, where each top-level directory under the zope package
is a separate project.
- Moved everything to src from lib/python.
lib/python will eventually go away. I need access to the cvs
repository to make this happen, however.
There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.
=== Zope3/src/zope/server/http/tests/__init__.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:56 2002
+++ Zope3/src/zope/server/http/tests/__init__.py Wed Dec 25 09:15:25 2002
@@ -0,0 +1,2 @@
+#
+# This file is necessary to make this directory a package.
=== Zope3/src/zope/server/http/tests/test_httprequestparser.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:56 2002
+++ Zope3/src/zope/server/http/tests/test_httprequestparser.py Wed Dec 25 09:15:25 2002
@@ -0,0 +1,93 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+Tests for zope.server.http.httprequestparser.HTTPRequestParser.
+$Id$
+"""
+
+import unittest
+from zope.server.http.httprequestparser import HTTPRequestParser
+from zope.server.adjustments import Adjustments
+
+
+my_adj = Adjustments()
+
+class Tests(unittest.TestCase):
+
+ def setUp(self):
+ self.parser = HTTPRequestParser(my_adj)
+
+ def feed(self, data):
+ parser = self.parser
+ for n in xrange(100): # make sure we never loop forever
+ consumed = parser.received(data)
+ data = data[consumed:]
+ if parser.completed:
+ return
+ raise ValueError, 'Looping'
+
+ def testSimpleGET(self):
+ data = """\
+GET /foobar HTTP/8.4
+FirstName: mickey
+lastname: Mouse
+content-length: 7
+
+Hello.
+"""
+ parser = self.parser
+ self.feed(data)
+ self.failUnless(parser.completed)
+ self.assertEqual(parser.version, '8.4')
+ self.failIf(parser.empty)
+ self.assertEqual(parser.headers,
+ {'FIRSTNAME': 'mickey',
+ 'LASTNAME': 'Mouse',
+ 'CONTENT_LENGTH': '7',
+ })
+ self.assertEqual(parser.path, '/foobar')
+ self.assertEqual(parser.command, 'GET')
+ self.assertEqual(parser.query, None)
+ self.assertEqual(parser.getBodyStream().getvalue(), 'Hello.\n')
+
+ def testComplexGET(self):
+ data = """\
+GET /foo/a+%2B%2F%3D%26a%3Aint?d=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6 HTTP/8.4
+FirstName: mickey
+lastname: Mouse
+content-length: 10
+
+Hello mickey.
+"""
+ parser = self.parser
+ self.feed(data)
+ self.assertEqual(parser.command, 'GET')
+ self.assertEqual(parser.version, '8.4')
+ self.failIf(parser.empty)
+ self.assertEqual(parser.headers,
+ {'FIRSTNAME': 'mickey',
+ 'LASTNAME': 'Mouse',
+ 'CONTENT_LENGTH': '10',
+ })
+ self.assertEqual(parser.path, '/foo/a++/=&a:int')
+ self.assertEqual(parser.query, 'd=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6')
+ self.assertEqual(parser.getBodyStream().getvalue(), 'Hello mick')
+
+
+def test_suite():
+ loader = unittest.TestLoader()
+ return loader.loadTestsFromTestCase(Tests)
+
+if __name__=='__main__':
+ unittest.TextTestRunner().run( test_suite() )
=== Zope3/src/zope/server/http/tests/test_httpserver.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:56 2002
+++ Zope3/src/zope/server/http/tests/test_httpserver.py Wed Dec 25 09:15:25 2002
@@ -0,0 +1,279 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+Tests for zope.server.http.httpserver.HTTPServer, using an echoing subclass.
+$Id$
+"""
+
+import unittest
+from asyncore import socket_map, poll
+import socket
+
+from threading import Thread
+from zope.server.taskthreads import ThreadedTaskDispatcher
+from zope.server.http.httpserver import HTTPServer
+from zope.server.adjustments import Adjustments
+from zope.server.interfaces import ITask
+from zope.server.tests.asyncerror import AsyncoreErrorHook
+
+from httplib import HTTPConnection
+from httplib import HTTPResponse as ClientHTTPResponse
+
+from time import sleep, time
+
+td = ThreadedTaskDispatcher()
+
+LOCALHOST = '127.0.0.1'
+SERVER_PORT = 0 # Set these port numbers to 0 to auto-bind, or
+CONNECT_TO_PORT = 0 # use specific numbers to inspect using TCPWatch.
+
+
+my_adj = Adjustments()
+# Reduce overflows to make testing easier.
+my_adj.outbuf_overflow = 10000
+my_adj.inbuf_overflow = 10000
+
+
+class EchoHTTPServer(HTTPServer):
+
+ def executeRequest(self, task):
+ headers = task.request_data.headers
+ if 'CONTENT_LENGTH' in headers:
+ cl = headers['CONTENT_LENGTH']
+ task.response_headers['Content-Length'] = cl
+ instream = task.request_data.getBodyStream()
+ while 1:
+ data = instream.read(8192)
+ if not data:
+ break
+ task.write(data)
+
+
+class SleepingTask:
+
+ __implements__ = ITask
+
+ def service(self):
+ sleep(0.2)
+
+ def cancel(self):
+ pass
+
+ def defer(self):
+ pass
+
+
+class Tests(unittest.TestCase, AsyncoreErrorHook):
+
+ def setUp(self):
+ td.setThreadCount(4)
+ self.orig_map_size = len(socket_map)
+ self.hook_asyncore_error()
+ self.server = EchoHTTPServer(LOCALHOST, SERVER_PORT,
+ task_dispatcher=td, adj=my_adj)
+ if CONNECT_TO_PORT == 0:
+ self.port = self.server.socket.getsockname()[1]
+ else:
+ self.port = CONNECT_TO_PORT
+ self.run_loop = 1
+ self.counter = 0
+ self.thread = Thread(target=self.loop)
+ self.thread.start()
+ sleep(0.1) # Give the thread some time to start.
+
+ def tearDown(self):
+ self.run_loop = 0
+ self.thread.join()
+ td.shutdown()
+ self.server.close()
+ # Make sure all sockets get closed by asyncore normally.
+ timeout = time() + 5
+ while 1:
+ if len(socket_map) == self.orig_map_size:
+ # Clean!
+ break
+ if time() >= timeout:
+ self.fail('Leaked a socket: %s' % `socket_map`)
+ poll(0.1)
+ self.unhook_asyncore_error()
+
+ def loop(self):
+ while self.run_loop:
+ self.counter = self.counter + 1
+ #print 'loop', self.counter
+ poll(0.1)
+
+ def testEchoResponse(self, h=None, add_headers=None, body=''):
+ if h is None:
+ h = HTTPConnection(LOCALHOST, self.port)
+ h.putrequest('GET', '/')
+ h.putheader('Accept', 'text/plain')
+ if add_headers:
+ for k, v in add_headers.items():
+ h.putheader(k, v)
+ if body:
+ h.putheader('Content-Length', str(int(len(body))))
+ h.endheaders()
+ if body:
+ h.send(body)
+ response = h.getresponse()
+ self.failUnlessEqual(int(response.status), 200)
+ length = int(response.getheader('Content-Length', '0'))
+ response_body = response.read()
+ self.failUnlessEqual(length, len(response_body))
+ self.failUnlessEqual(response_body, body)
+
+ def testMultipleRequestsWithoutBody(self):
+ # Tests the use of multiple requests in a single connection.
+ h = HTTPConnection(LOCALHOST, self.port)
+ for n in range(3):
+ self.testEchoResponse(h)
+ self.testEchoResponse(h, {'Connection': 'close'})
+
+ def testMultipleRequestsWithBody(self):
+ # Tests the use of multiple requests in a single connection.
+ h = HTTPConnection(LOCALHOST, self.port)
+ for n in range(3):
+ self.testEchoResponse(h, body='Hello, world!')
+ self.testEchoResponse(h, {'Connection': 'close'})
+
+ def testPipelining(self):
+ # Tests the use of several requests issued at once.
+ s = ("GET / HTTP/1.0\r\n"
+ "Connection: %s\r\n"
+ "Content-Length: %d\r\n"
+ "\r\n"
+ "%s")
+ to_send = ''
+ count = 25
+ for n in range(count):
+ body = "Response #%d\r\n" % (n + 1)
+ if n + 1 < count:
+ conn = 'keep-alive'
+ else:
+ conn = 'close'
+ to_send += s % (conn, len(body), body)
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((LOCALHOST, self.port))
+ sock.send(to_send)
+ for n in range(count):
+ expect_body = "Response #%d\r\n" % (n + 1)
+ response = ClientHTTPResponse(sock)
+ response.begin()
+ self.failUnlessEqual(int(response.status), 200)
+ length = int(response.getheader('Content-Length', '0'))
+ response_body = response.read(length)
+ self.failUnlessEqual(length, len(response_body))
+ self.failUnlessEqual(response_body, expect_body)
+
+ def testWithoutCRLF(self):
+ # Tests the use of just newlines rather than CR/LFs.
+ data = "Echo\nthis\r\nplease"
+ s = ("GET / HTTP/1.0\n"
+ "Connection: close\n"
+ "Content-Length: %d\n"
+ "\n"
+ "%s") % (len(data), data)
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((LOCALHOST, self.port))
+ sock.send(s)
+ response = ClientHTTPResponse(sock)
+ response.begin()
+ self.failUnlessEqual(int(response.status), 200)
+ length = int(response.getheader('Content-Length', '0'))
+ response_body = response.read(length)
+ self.failUnlessEqual(length, len(data))
+ self.failUnlessEqual(response_body, data)
+
+ def testLargeBody(self):
+ # Tests echoing large bodies (1 MB, then 100 KB) over a single connection.
+ h = HTTPConnection(LOCALHOST, self.port)
+ s = 'This string has 32 characters.\r\n' * 32 # 1024 characters.
+ self.testEchoResponse(h, body=(s * 1024)) # 1 MB
+ self.testEchoResponse(h, {'Connection': 'close'},
+ body=(s * 100)) # 100 KB
+
+ def testManyClients(self):
+ conns = []
+ for n in range(50): # Linux kernel (2.4.8) doesn't like > 128 ?
+ #print 'open', n, clock()
+ h = HTTPConnection(LOCALHOST, self.port)
+ #h.debuglevel = 1
+ h.putrequest('GET', '/')
+ h.putheader('Accept', 'text/plain')
+ h.endheaders()
+ conns.append(h)
+ # If you uncomment the next line, you can raise the
+ # number of connections much higher without running
+ # into delays.
+ #sleep(0.01)
+ responses = []
+ for h in conns:
+ response = h.getresponse()
+ self.failUnlessEqual(response.status, 200)
+ responses.append(response)
+ for response in responses:
+ response.read()
+
+ def testThreading(self):
+ # Ensures the correct number of threads keep running.
+ for n in range(4):
+ td.addTask(SleepingTask())
+ # Try to confuse the task manager.
+ td.setThreadCount(2)
+ td.setThreadCount(1)
+ sleep(0.5)
+ # There should be 1 still running.
+ self.failUnlessEqual(len(td.threads), 1)
+
+ def testChunkingRequestWithoutContent(self):
+ h = HTTPConnection(LOCALHOST, self.port)
+ h.putrequest('GET', '/')
+ h.putheader('Accept', 'text/plain')
+ h.putheader('Transfer-Encoding', 'chunked')
+ h.endheaders()
+ h.send("0\r\n\r\n")
+ response = h.getresponse()
+ self.failUnlessEqual(int(response.status), 200)
+ response_body = response.read()
+ self.failUnlessEqual(response_body, '')
+
+ def testChunkingRequestWithContent(self):
+ control_line="20;\r\n" # 20 hex = 32 dec
+ s = 'This string has 32 characters.\r\n'
+ expect = s * 12
+
+ h = HTTPConnection(LOCALHOST, self.port)
+ h.putrequest('GET', '/')
+ h.putheader('Accept', 'text/plain')
+ h.putheader('Transfer-Encoding', 'chunked')
+ h.endheaders()
+ for n in range(12):
+ h.send(control_line)
+ h.send(s)
+ h.send("0\r\n\r\n")
+ response = h.getresponse()
+ self.failUnlessEqual(int(response.status), 200)
+ response_body = response.read()
+ self.failUnlessEqual(response_body, expect)
+
+
+def test_suite():
+ loader = unittest.TestLoader()
+ return loader.loadTestsFromTestCase(Tests)
+
+if __name__=='__main__':
+ unittest.TextTestRunner().run( test_suite() )
=== Zope3/src/zope/server/http/tests/test_publisherserver.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:56 2002
+++ Zope3/src/zope/server/http/tests/test_publisherserver.py Wed Dec 25 09:15:25 2002
@@ -0,0 +1,194 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+##############################################################################
+"""
+Tests for zope.server.http.publisherhttpserver.PublisherHTTPServer.
+$Id$
+"""
+
+import unittest
+from asyncore import socket_map, poll
+import sys
+from threading import Thread
+
+from zope.server.taskthreads import ThreadedTaskDispatcher
+from zope.server.http.publisherhttpserver import PublisherHTTPServer
+
+from zope.component.tests.placelesssetup import PlacelessSetup
+from zope.component.adapter import provideAdapter
+
+from zope.interfaces.i18n import IUserPreferredCharsets
+
+from zope.publisher.http import IHTTPRequest
+from zope.publisher.http import HTTPCharsets
+from zope.publisher.browser import BrowserRequest
+from zope.publisher.base import DefaultPublication
+from zope.publisher.interfaces import Redirect, Retry
+from zope.publisher.http import HTTPRequest
+
+from httplib import HTTPConnection
+
+from time import sleep, time
+
+td = ThreadedTaskDispatcher()
+
+LOCALHOST = '127.0.0.1'
+
+HTTPRequest.STAGGER_RETRIES = 0 # Don't pause.
+
+
+class Conflict (Exception):
+ """
+ Pseudo ZODB conflict error.
+ """
+
+
+class PublicationWithConflict(DefaultPublication):
+
+ def handleException(self, object, request, exc_info, retry_allowed=1):
+ if exc_info[0] is Conflict and retry_allowed:
+ # This simulates a ZODB retry.
+ raise Retry(exc_info)
+ else:
+ DefaultPublication.handleException(self, object, request, exc_info,
+ retry_allowed)
+
+
+class tested_object:
+ " "
+ tries = 0
+
+ def __call__(self, REQUEST):
+ return 'URL invoked: %s' % REQUEST.URL
+
+ def redirect_method(self, REQUEST):
+ "Generates a redirect using the redirect() method."
+ REQUEST.response.redirect("http://somewhere.com/redirect")
+
+ def redirect_exception(self):
+ "Generates a redirect using an exception."
+ raise Redirect("http://somewhere.com/exception")
+
+ def conflict(self, REQUEST, wait_tries):
+ """
+ Returns 202 status only after (wait_tries) tries.
+ """
+ if self.tries >= int(wait_tries):
+ raise "Accepted"
+ else:
+ self.tries += 1
+ raise Conflict
+
+
+
+class Tests(PlacelessSetup, unittest.TestCase):
+
+ def setUp(self):
+ PlacelessSetup.setUp(self)
+ provideAdapter(IHTTPRequest, IUserPreferredCharsets, HTTPCharsets)
+ obj = tested_object()
+ obj.folder = tested_object()
+ obj.folder.item = tested_object()
+
+ obj._protected = tested_object()
+
+ pub = PublicationWithConflict(obj)
+
+ def request_factory(input_stream, output_steam, env):
+ request = BrowserRequest(input_stream, output_steam, env)
+ request.setPublication(pub)
+ return request
+
+ td.setThreadCount(4)
+ # Bind to any port on localhost.
+ self.server = PublisherHTTPServer(request_factory, 'Browser',
+ LOCALHOST, 0, task_dispatcher=td)
+ self.port = self.server.socket.getsockname()[1]
+ self.run_loop = 1
+ self.thread = Thread(target=self.loop)
+ self.thread.start()
+ sleep(0.1) # Give the thread some time to start.
+
+ def tearDown(self):
+ self.run_loop = 0
+ self.thread.join()
+ td.shutdown()
+ self.server.close()
+
+ def loop(self):
+ while self.run_loop:
+ poll(0.1, socket_map)
+
+ def testResponse(self, path='/', status_expected=200,
+ add_headers=None, request_body=''):
+ h = HTTPConnection(LOCALHOST, self.port)
+ h.putrequest('GET', path)
+ h.putheader('Accept', 'text/plain')
+ if add_headers:
+ for k, v in add_headers.items():
+ h.putheader(k, v)
+ if request_body:
+ h.putheader('Content-Length', str(int(len(request_body))))
+ h.endheaders()
+ if request_body:
+ h.send(request_body)
+ response = h.getresponse()
+ length = int(response.getheader('Content-Length', '0'))
+ if length:
+ response_body = response.read(length)
+ else:
+ response_body = ''
+
+ # Please do not disable the status code check. It must work.
+ self.failUnlessEqual(int(response.status), status_expected)
+
+ self.failUnlessEqual(length, len(response_body))
+
+ if (status_expected == 200):
+ if path == '/': path = ''
+ expect_response = 'URL invoked: http://%s:%d%s' % (LOCALHOST,
+ self.port, path)
+ self.failUnlessEqual(response_body, expect_response)
+
+ def testDeeperPath(self):
+ self.testResponse(path='/folder/item')
+
+ def testNotFound(self):
+ self.testResponse(path='/foo/bar', status_expected=404)
+
+ def testUnauthorized(self):
+ self.testResponse(path='/_protected', status_expected=401)
+
+ def testRedirectMethod(self):
+ self.testResponse(path='/redirect_method', status_expected=302)
+
+ def testRedirectException(self):
+ self.testResponse(path='/redirect_exception', status_expected=302)
+ self.testResponse(path='/folder/redirect_exception',
+ status_expected=302)
+
+ def testConflictRetry(self):
+ # Expect the "Accepted" response since the retries will succeed.
+ self.testResponse(path='/conflict?wait_tries=2', status_expected=202)
+
+ def testFailedConflictRetry(self):
+ # Expect a "Conflict" response since there will be too many
+ # conflicts.
+ self.testResponse(path='/conflict?wait_tries=10', status_expected=409)
+
+
+
+def test_suite():
+ loader = unittest.TestLoader()
+ return loader.loadTestsFromTestCase(Tests)
+
+if __name__=='__main__':
+ unittest.TextTestRunner().run( test_suite() )