[Zodb-checkins] CVS: Packages/ZEO - testZEO.py:1.1.2.2
jeremy@digicool.com
jeremy@digicool.com
Wed, 18 Apr 2001 17:02:39 -0400 (EDT)
Update of /cvs-repository/Packages/ZEO/tests
In directory korak:/tmp/cvs-serv9000
Modified Files:
Tag: ZEO-ZRPC-Dev
testZEO.py
Log Message:
Replace the multithreaded test approach with a forked child process approach.
Most of the tests run successfully, provided that you include some
local hacks to the ZODB.tests package that work around limitations in
the handling of ConflictErrors by the ZEO ClientStorage.
--- Updated File testZEO.py in package Packages/ZEO --
--- testZEO.py 2001/04/18 15:49:56 1.1.2.1
+++ testZEO.py 2001/04/18 21:02:39 1.1.2.2
@@ -2,11 +2,13 @@
import asyncore
import os
+import sys
import tempfile
-import threading
+import time
import unittest
import ZEO.ClientStorage, ZEO.StorageServer
+from ZEO.zrpc2 import ManagedServerConnection
import ThreadedAsync, ZEO.trigger
import zLOG
@@ -17,24 +19,20 @@
from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage
-class MainloopThread(threading.Thread):
- __super_init = threading.Thread.__init__
-
- def __init__(self, *args, **kw):
- self.ready_lock = threading.Lock()
- self.ready_lock.acquire()
- self.__super_init(*args, **kw)
-
- def run(self):
- ThreadedAsync.register_loop_callback(self.callback)
- asyncore.loop(timeout=1.0)
-
- def callback(self, map):
- self.ready_lock.release()
-
+class TestStorageServer(ZEO.StorageServer.StorageServer):
+ def newConnection(self, sock, addr, nil):
+ c = ManagedServerConnection(sock, addr, None, self)
+ c.register_object(TestStorageProxy(self, c))
+ zLOG.LOG("TSS", zLOG.INFO, "connected: %s" % c)
+
+class TestStorageProxy(ZEO.StorageServer.StorageProxy):
+ def shutdown(self):
+ self._StorageProxy__storage.close()
+ os._exit(0)
+
class GenericTests(StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
-## VersionStorage.VersionStorage
+ VersionStorage.VersionStorage
):
"""An abstract base class for ZEO tests
@@ -42,11 +40,6 @@
StorageServer provides access to. The GenericTests must be
subclassed to provide an implementation of getStorage() that
returns a specific storage, e.g. FileStorage.
-
- The storage server mainloop is run in a separate thread so that
- it does not interfere with the thread running the tests. The
- tearDown() method clears asyncore's socket_map dict, which causes
- asyncore.loop() to return and the thread to exit.
"""
__super_setUp = StorageTestBase.StorageTestBase.setUp
@@ -59,40 +52,33 @@
getStorage() method.
"""
self.running = 1
- self.__trigger = ZEO.trigger.trigger()
- self.__map = asyncore.socket_map
- ThreadedAsync.register_loop_callback(self.start_loop)
- self.__storage = self.getStorage()
self.__sock = tempfile.mktemp()
- self.__server = ZEO.StorageServer.StorageServer(self.__sock,
- {'1': self.__storage})
- t = MainloopThread()
- t.start()
- # wait for mainloop to start
- t.ready_lock.acquire()
+ self.startServer()
self._storage = ZEO.ClientStorage.ClientStorage(self.__sock)
+ while not self._storage.is_connected():
+ time.sleep(0.1)
self.__super_setUp()
def tearDown(self):
"""Try to cause the tests to halt"""
zLOG.LOG("test", zLOG.INFO, "tearDown()")
self.running = 0
- self.stop_loop()
- self.__storage.close()
+ self._storage._server.rpc.callAsync('shutdown')
+ self._storage._rpc_mgr.close()
+ os.waitpid(self.__pid, 0)
os.unlink(self.__sock)
self.__super_tearDown()
-
- def start_loop(self, map):
-## zLOG.LOG("test", zLOG.INFO, "start_loop(%s)" % repr(map))
- self.__map = map
- if not self.running:
- self.stop_loop()
-
- def stop_loop(self):
-## zLOG.LOG("test", zLOG.INFO, "map = %s" % id(self.__map))
- self.__map.clear()
- self.__trigger.pull_trigger()
+ def startServer(self):
+ self.__pid = os.fork()
+ if self.__pid == 0:
+ self.__storage = self.getStorage()
+ d = {'1': self.__storage}
+ self.__server = TestStorageServer(self.__sock, d)
+ asyncore.loop()
+ else:
+ zLOG.LOG("test", zLOG.INFO, "forked %s" % self.__pid)
+
def checkFirst(self):
self._storage.tpc_begin(self._transaction)
self._storage.tpc_abort(self._transaction)