[Zope3-checkins] CVS: Zope3/lib/python/Zope/Server/FTP/tests - testFTPServer.py:1.7
Jeremy Hylton
jeremy@zope.com
Fri, 20 Dec 2002 18:50:09 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server/FTP/tests
In directory cvs.zope.org:/tmp/cvs-serv13078/FTP/tests
Modified Files:
testFTPServer.py
Log Message:
Add a framework for converting asyncore errors to unittest errors.
By default, asyncore handles uncaught exceptions in dispatchers by
printing a message to the console. If a test causes such uncaught
exceptions, the test is not marked as a failure, because asyncore handles
the exception. This framework causes the unit test to fail instead. If code
being tested expects the errors to occur, it can add code to prevent
the error from propagating all the way back to asyncore.
=== Zope3/lib/python/Zope/Server/FTP/tests/testFTPServer.py 1.6 => 1.7 ===
--- Zope3/lib/python/Zope/Server/FTP/tests/testFTPServer.py:1.6 Fri Dec 20 15:42:15 2002
+++ Zope3/lib/python/Zope/Server/FTP/tests/testFTPServer.py Fri Dec 20 18:50:08 2002
@@ -16,16 +16,18 @@
$Id$
"""
+import asyncore
import unittest
import tempfile
import os
-from asyncore import socket_map, poll
import socket
import shutil
+import sys
+import traceback
from types import StringType
from StringIO import StringIO
-from threading import Thread
+from threading import Thread, Event
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
from Zope.Server.FTP.FTPServer import FTPServer
from Zope.Server.FTP.FTPStatusMessages import status_msgs
@@ -35,6 +37,8 @@
from Zope.Server.VFS.OSFileSystem import OSFileSystem
from Zope.Server.FTP.TestFilesystemAccess import TestFilesystemAccess
+from Zope.Server.tests.asyncerror import AsyncoreErrorHook
+
import ftplib
from time import sleep, time
@@ -55,11 +59,13 @@
return ''.join(res)
-class Tests(unittest.TestCase):
+class Tests(unittest.TestCase, AsyncoreErrorHook):
def setUp(self):
td.setThreadCount(1)
- self.orig_map_size = len(socket_map)
+ print "setUp", asyncore.socket_map
+ self.orig_map_size = len(asyncore.socket_map)
+ self.hook_asyncore_error()
self.root_dir = tempfile.mktemp()
os.mkdir(self.root_dir)
@@ -76,10 +82,11 @@
self.port = CONNECT_TO_PORT
self.run_loop = 1
self.counter = 0
+ self.thread_started = Event()
self.thread = Thread(target=self.loop)
self.thread.start()
- sleep(0.1) # Give the thread some time to start.
-
+ self.thread_started.wait(10.0)
+ self.assert_(self.thread_started.isSet())
def tearDown(self):
self.run_loop = 0
@@ -89,25 +96,28 @@
# Make sure all sockets get closed by asyncore normally.
timeout = time() + 2
while 1:
- if len(socket_map) == self.orig_map_size:
+ if len(asyncore.socket_map) == self.orig_map_size:
# Clean!
break
if time() >= timeout:
- self.fail('Leaked a socket: %s' % `socket_map`)
+ self.fail('Leaked a socket: %s' % `asyncore.socket_map`)
break
- poll(0.1, socket_map)
+ asyncore.poll(0.1)
+
+ self.unhook_asyncore_error()
if os.path.exists(self.root_dir):
shutil.rmtree(self.root_dir)
def loop(self):
+ self.thread_started.set()
import select
from errno import EBADF
while self.run_loop:
self.counter = self.counter + 1
# print 'loop', self.counter
try:
- poll(0.1, socket_map)
+ asyncore.poll(0.1)
except select.error, data:
if data[0] == EBADF:
print "exception polling in loop(): ", data