[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/tests - testPublisherServer.py:1.1.2.1 testHTTPServer.py:1.1.2.6
Martijn Pieters
mj@zope.com
Wed, 28 Nov 2001 18:18:43 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server/tests
In directory cvs.zope.org:/tmp/cvs-serv7747/lib/python/Zope/Server/tests
Modified Files:
Tag: Zope-3x-branch
testHTTPServer.py
Added Files:
Tag: Zope-3x-branch
testPublisherServer.py
Log Message:
- Clear out sys.setcheckinterval
- Clear out testing code from PublisherServers
- Provide unit tests for PublisherServers
=== Added File Zope3/lib/python/Zope/Server/tests/testPublisherServer.py ===
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
import unittest
from asyncore import socket_map, poll
from threading import Thread
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
from Zope.Server.PublisherServers import PublisherHTTPServer
from Zope.Publisher.HTTP.BrowserPayload import BrowserRequestPayload
from Zope.Publisher.HTTP.BrowserPayload import BrowserResponsePayload
from Zope.Publisher.DefaultPublication import DefaultPublication
from httplib import HTTPConnection
from time import sleep, time
tasks = ThreadedTaskDispatcher()
LOCALHOST = '127.0.0.1'
class Tests(unittest.TestCase):
def setUp(self):
class c:
" "
def __call__(self, URL):
return 'URL invoked: %s' % URL
obj = c()
obj.folder = c()
obj.folder.item = c()
obj._protected = c()
pub = DefaultPublication(obj)
request_payload = BrowserRequestPayload(pub)
response_payload = BrowserResponsePayload()
tasks.setThreadCount(4)
# Bind to any port on localhost.
self.server = PublisherHTTPServer(request_payload, response_payload,
LOCALHOST, 0, tasks=tasks)
self.port = self.server.socket.getsockname()[1]
self.run_loop = 1
self.thread = Thread(target=self.loop)
self.thread.start()
sleep(0.1) # Give the thread some time to start.
def tearDown(self):
self.run_loop = 0
self.thread.join()
tasks.shutdown()
self.server.close()
def loop(self):
while self.run_loop:
poll(0.1, socket_map)
def testResponse(self, path='/', status_expected=200, add_headers=None, request_body=''):
h = HTTPConnection(LOCALHOST, self.port)
h.putrequest('GET', path)
h.putheader('Accept', 'text/plain')
if add_headers:
for k, v in add_headers.items():
h.putheader(k, v)
if request_body:
h.putheader('Content-Length', str(int(len(request_body))))
h.endheaders()
if request_body:
h.send(request_body)
response = h.getresponse()
self.failUnlessEqual(int(response.status), status_expected)
length = int(response.getheader('Content-Length', '0'))
response_body = response.read()
self.failUnlessEqual(length, len(response_body))
if (status_expected == 200):
if path == '/': path = ''
expect_response = 'URL invoked: http://%s:%d%s' % (LOCALHOST,
self.port, path)
self.failUnlessEqual(response_body, expect_response)
def testDeeperPath(self):
self.testResponse(path='/folder/item')
def testNotFound(self):
self.testResponse(path='/foo/bar', status_expected=404)
def testUnauthorized(self):
self.testResponse(path='/_protected', status_expected=401)
def test_suite():
loader = unittest.TestLoader()
return loader.loadTestsFromTestCase(Tests)
if __name__=='__main__':
unittest.TextTestRunner().run( test_suite() )
=== Zope3/lib/python/Zope/Server/tests/testHTTPServer.py 1.1.2.5 => 1.1.2.6 ===
from time import sleep, time
-import sys
-sys.setcheckinterval(120)
-
tasks = ThreadedTaskDispatcher()
LOCALHOST = '127.0.0.1'