[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/tests - testHTTPServer.py:1.1.2.7 testPublisherServer.py:1.1.2.2
Shane Hathaway
shane@digicool.com
Fri, 30 Nov 2001 10:26:57 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server/tests
In directory cvs.zope.org:/tmp/cvs-serv29399/lib/python/Zope/Server/tests
Modified Files:
Tag: Zope-3x-branch
testHTTPServer.py testPublisherServer.py
Log Message:
- Added tests of HTTP pipelining and fixed bugs.
- Added tests of publisher retry mechanisms and fixed bugs.
- Rearranged http_task for clarity.
- Moved knowledge of PATH_INFO into BaseRequest only.
- Removed commented code.
- Corrected publisher Retry handling.
All this while riding the VRE. Gotta love trains! ;-)
=== Zope3/lib/python/Zope/Server/tests/testHTTPServer.py 1.1.2.6 => 1.1.2.7 ===
import unittest
from asyncore import socket_map, poll
+import socket
from threading import Thread
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
@@ -18,6 +19,7 @@
from Zope.Server.ITask import ITask
from httplib import HTTPConnection
+from httplib import HTTPResponse as ClientHTTPResponse
from time import sleep, time
@@ -139,6 +141,36 @@
for n in range(3):
self.testEchoResponse(h, body='Hello, world!')
self.testEchoResponse(h, {'Connection': 'close'})
+
+ def testPipelining(self):
+ # Tests the use of several requests issued at once.
+ s = ("GET / HTTP/1.0\r\n"
+ "Connection: %s\r\n"
+ "Content-Length: %d\r\n"
+ "\r\n"
+ "%s")
+ to_send = ''
+ count = 25
+ for n in range(count):
+ body = "Response #%d\r\n" % (n + 1)
+ if n + 1 < count:
+ conn = 'keep-alive'
+ else:
+ conn = 'close'
+ to_send += s % (conn, len(body), body)
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((LOCALHOST, self.port))
+ sock.send(to_send)
+ for n in range(count):
+ expect_body = "Response #%d\r\n" % (n + 1)
+ response = ClientHTTPResponse(sock)
+ response.begin()
+ self.failUnlessEqual(int(response.status), 200)
+ length = int(response.getheader('Content-Length', '0'))
+ response_body = response.read(length)
+ self.failUnlessEqual(length, len(response_body))
+ self.failUnlessEqual(response_body, expect_body)
def testLargeBody(self):
# Tests the use of multiple requests in a single connection.
=== Zope3/lib/python/Zope/Server/tests/testPublisherServer.py 1.1.2.1 => 1.1.2.2 ===
import unittest
from asyncore import socket_map, poll
+import sys
from threading import Thread
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
@@ -16,6 +17,8 @@
from Zope.Publisher.HTTP.BrowserPayload import BrowserRequestPayload
from Zope.Publisher.HTTP.BrowserPayload import BrowserResponsePayload
from Zope.Publisher.DefaultPublication import DefaultPublication
+from Zope.Publisher.Exceptions import Redirect, Retry
+from Zope.Publisher.HTTP import HTTPRequest
from httplib import HTTPConnection
@@ -25,21 +28,64 @@
LOCALHOST = '127.0.0.1'
+HTTPRequest.STAGGER_RETRIES = 0 # Don't pause.
+
+
+class Conflict (Exception):
+ """
+ Pseudo ZODB conflict error.
+ """
+
+
+class PublicationWithConflict(DefaultPublication):
+
+ def handleException(self, request, exc_info, retry_allowed=1):
+ if exc_info[0] is Conflict and retry_allowed:
+ # This simulates a ZODB retry.
+ raise Retry(exc_info)
+ else:
+ DefaultPublication.handleException(self, request, exc_info,
+ retry_allowed)
+
+
+class tested_object:
+ " "
+ tries = 0
+
+ def __call__(self, URL):
+ return 'URL invoked: %s' % URL
+
+ def redirect_method(self, RESPONSE):
+ "Generates a redirect using the redirect() method."
+ RESPONSE.redirect("http://somewhere.com/redirect")
+
+ def redirect_exception(self):
+ "Generates a redirect using an exception."
+ raise Redirect("http://somewhere.com/exception")
+
+ def conflict(self, URL, wait_tries):
+ """
+ Returns 202 status only after (wait_tries) tries.
+ """
+ if self.tries >= int(wait_tries):
+ raise "Accepted"
+ else:
+ self.tries += 1
+ raise Conflict
+
+
+
class Tests(unittest.TestCase):
+
def setUp(self):
- class c:
- " "
- def __call__(self, URL):
- return 'URL invoked: %s' % URL
-
- obj = c()
- obj.folder = c()
- obj.folder.item = c()
+ obj = tested_object()
+ obj.folder = tested_object()
+ obj.folder.item = tested_object()
- obj._protected = c()
+ obj._protected = tested_object()
- pub = DefaultPublication(obj)
+ pub = PublicationWithConflict(obj)
request_payload = BrowserRequestPayload(pub)
response_payload = BrowserResponsePayload()
@@ -76,9 +122,12 @@
if request_body:
h.send(request_body)
response = h.getresponse()
- self.failUnlessEqual(int(response.status), status_expected)
length = int(response.getheader('Content-Length', '0'))
- response_body = response.read()
+ if length:
+ response_body = response.read(length)
+ else:
+ response_body = ''
+ self.failUnlessEqual(int(response.status), status_expected)
self.failUnlessEqual(length, len(response_body))
if (status_expected == 200):
@@ -95,6 +144,25 @@
def testUnauthorized(self):
self.testResponse(path='/_protected', status_expected=401)
+
+ def testRedirectMethod(self):
+ self.testResponse(path='/redirect_method', status_expected=302)
+
+ def testRedirectException(self):
+ self.testResponse(path='/redirect_exception', status_expected=302)
+ self.testResponse(path='/folder/redirect_exception',
+ status_expected=302)
+
+ def testConflictRetry(self):
+ # Expect the "Accepted" response since the retries will succeed.
+ self.testResponse(path='/conflict?wait_tries=2', status_expected=202)
+
+ def testFailedConflictRetry(self):
+ # Expect a "Conflict" response since there will be too many
+ # conflicts.
+ self.testResponse(path='/conflict?wait_tries=10', status_expected=409)
+
+
def test_suite():
loader = unittest.TestLoader()