[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/tests - testHTTPServer.py:1.1.2.8 testPublisherServer.py:1.1.2.4
Shane Hathaway
shane@digicool.com
Mon, 7 Jan 2002 10:39:04 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server/tests
In directory cvs.zope.org:/tmp/cvs-serv31823/tests
Modified Files:
Tag: Zope-3x-branch
testHTTPServer.py testPublisherServer.py
Log Message:
- Made next_channel_cleanup a list so that it can be more easily overridden
by subclasses.
- Changed the name "channel_type" to "channel_base_class" for clarity.
- Renamed the "tasks" parameter to "task_dispatcher".
- Separated out the default server name computation into a method.
- Removed comments whose content is better described in the interfaces.
- Provided a way to use an alternate socket map.
=== Zope3/lib/python/Zope/Server/tests/testHTTPServer.py 1.1.2.7 => 1.1.2.8 ===
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
from Zope.Server.HTTPServer import http_task, http_channel, http_server
-from Zope.Server.dual_mode_channel import pull_trigger
from Zope.Server.Adjustments import Adjustments
from Zope.Server.ITask import ITask
@@ -23,7 +22,7 @@
from time import sleep, time
-tasks = ThreadedTaskDispatcher()
+td = ThreadedTaskDispatcher()
LOCALHOST = '127.0.0.1'
@@ -76,10 +75,11 @@
class Tests(unittest.TestCase):
def setUp(self):
- tasks.setThreadCount(4)
+ td.setThreadCount(4)
# Bind to any port on localhost.
self.orig_map_size = len(socket_map)
- self.server = EchoHTTPServer(LOCALHOST, 0, tasks=tasks, adj=my_adj)
+ self.server = EchoHTTPServer(LOCALHOST, 0, task_dispatcher=td,
+ adj=my_adj)
self.port = self.server.socket.getsockname()[1]
self.run_loop = 1
self.counter = 0
@@ -90,7 +90,7 @@
def tearDown(self):
self.run_loop = 0
self.thread.join()
- tasks.shutdown()
+ td.shutdown()
self.server.close()
# Make sure all sockets get closed by asyncore normally.
timeout = time() + 5
@@ -205,13 +205,13 @@
def testThreading(self):
# Ensures the correct number of threads keep running.
for n in range(4):
- tasks.addTask(SleepingTask())
+ td.addTask(SleepingTask())
# Try to confuse the task manager.
- tasks.setThreadCount(2)
- tasks.setThreadCount(1)
+ td.setThreadCount(2)
+ td.setThreadCount(1)
sleep(0.5)
# There should be 1 still running.
- self.failUnlessEqual(len(tasks.threads), 1)
+ self.failUnlessEqual(len(td.threads), 1)
def testChunkingRequestWithoutContent(self):
h = HTTPConnection(LOCALHOST, self.port)
=== Zope3/lib/python/Zope/Server/tests/testPublisherServer.py 1.1.2.3 => 1.1.2.4 ===
from time import sleep, time
-tasks = ThreadedTaskDispatcher()
+td = ThreadedTaskDispatcher()
LOCALHOST = '127.0.0.1'
@@ -89,10 +89,10 @@
request_payload = BrowserRequestPayload(pub)
response_payload = BrowserResponsePayload()
- tasks.setThreadCount(4)
+ td.setThreadCount(4)
# Bind to any port on localhost.
self.server = PublisherHTTPServer(request_payload, response_payload,
- LOCALHOST, 0, tasks=tasks)
+ LOCALHOST, 0, task_dispatcher=td)
self.port = self.server.socket.getsockname()[1]
self.run_loop = 1
self.thread = Thread(target=self.loop)
@@ -102,7 +102,7 @@
def tearDown(self):
self.run_loop = 0
self.thread.join()
- tasks.shutdown()
+ td.shutdown()
self.server.close()
def loop(self):