[Zodb-checkins] CVS: Zope/lib/python/ZEO/tests - deadlock.py:1.1.6.1 CommitLockTests.py:1.9.8.2 ConnectionTests.py:1.5.2.2
Chris McDonough
chrism@zope.com
Sun, 24 Nov 2002 18:55:57 -0500
Update of /cvs-repository/Zope/lib/python/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv13982/tests
Modified Files:
Tag: chrism-install-branch
CommitLockTests.py ConnectionTests.py
Added Files:
Tag: chrism-install-branch
deadlock.py
Log Message:
Merge with HEAD.
=== Added File Zope/lib/python/ZEO/tests/deadlock.py ===
import ZODB
from ZODB.POSException import ConflictError
from ZEO.ClientStorage import ClientStorage, ClientDisconnected
from ZEO.zrpc.error import DisconnectedError
import os
import random
import time
L = range(1, 100)
def main():
z1 = ClientStorage(('localhost', 2001), wait=1)
z2 = ClientStorage(('localhost', 2002), wait=2)
db1 = ZODB.DB(z1)
db2 = ZODB.DB(z2)
c1 = db1.open()
c2 = db2.open()
r1 = c1.root()
r2 = c2.root()
while 1:
try:
try:
update(r1, r2)
except ConflictError, msg:
print msg
get_transaction().abort()
c1.sync()
c2.sync()
except (ClientDisconnected, DisconnectedError), err:
print "disconnected", err
time.sleep(2)
def update(r1, r2):
k1 = random.choice(L)
k2 = random.choice(L)
updates = [(k1, r1),
(k2, r2)]
random.shuffle(updates)
for key, root in updates:
root[key] = time.time()
get_transaction().commit()
print os.getpid(), k1, k2
if __name__ == "__main__":
main()
=== Zope/lib/python/ZEO/tests/CommitLockTests.py 1.9.8.1 => 1.9.8.2 ===
--- Zope/lib/python/ZEO/tests/CommitLockTests.py:1.9.8.1 Tue Oct 8 20:41:42 2002
+++ Zope/lib/python/ZEO/tests/CommitLockTests.py Sun Nov 24 18:55:26 2002
@@ -124,9 +124,12 @@
# started, but before it finishes. The dowork() function
# executes after the first transaction has completed.
- # Start on transaction normally.
+ # Start on transaction normally and get the lock.
t = Transaction()
self._storage.tpc_begin(t)
+ oid = self._storage.new_oid()
+ self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
+ self._storage.tpc_vote(t)
# Start a second transaction on a different connection without
# blocking the test thread.
@@ -141,9 +144,6 @@
else:
self._storages.append((storage2, t2))
- oid = self._storage.new_oid()
- self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
- self._storage.tpc_vote(t)
if method_name == "tpc_finish":
self._storage.tpc_finish(t)
self._storage.load(oid, '')
=== Zope/lib/python/ZEO/tests/ConnectionTests.py 1.5.2.1 => 1.5.2.2 ===
--- Zope/lib/python/ZEO/tests/ConnectionTests.py:1.5.2.1 Tue Oct 8 20:41:42 2002
+++ Zope/lib/python/ZEO/tests/ConnectionTests.py Sun Nov 24 18:55:26 2002
@@ -18,7 +18,7 @@
import socket
import sys
import tempfile
-import thread
+import threading
import time
import zLOG
@@ -27,30 +27,38 @@
from ZEO.Exceptions import Disconnected
from ZEO.zrpc.marshal import Marshaller
-from ZODB.Transaction import get_transaction
+from ZODB.Transaction import get_transaction, Transaction
from ZODB.POSException import ReadOnlyError
-from ZODB.tests import StorageTestBase
-from ZODB.tests.StorageTestBase import zodb_unpickle, MinPO
+from ZODB.tests.StorageTestBase import StorageTestBase
+from ZODB.tests.MinPO import MinPO
+from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
+from ZODB.tests.StorageTestBase import handle_all_serials, ZERO
class DummyDB:
def invalidate(self, *args):
pass
-class ConnectionTests(StorageTestBase.StorageTestBase):
+class ConnectionTests(StorageTestBase):
"""Tests that explicitly manage the server process.
To test the cache or re-connection, these test cases explicit
start and stop a ZEO storage server.
+
+ This must be subclassed; the subclass must provide implementations
+ of startServer() and shutdownServer().
"""
- __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+ __super_setUp = StorageTestBase.setUp
+ __super_tearDown = StorageTestBase.tearDown
def setUp(self):
- """Start a ZEO server using a Unix domain socket
+ """Test setup for connection tests.
- The ZEO server uses the storage object returned by the
- getStorage() method.
+ This starts only one server; a test may start more servers by
+ calling self._newAddr() and then self.startServer(index=i)
+ for i in 1, 2, ...
"""
+ self.__super_setUp()
zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
self.file = tempfile.mktemp()
self.addr = []
@@ -70,15 +78,17 @@
return 'localhost', random.randrange(25000, 30000, 2)
def openClientStorage(self, cache='', cache_size=200000, wait=1,
- read_only=0, read_only_fallback=0):
- base = ClientStorage(self.addr,
- client=cache,
- cache_size=cache_size,
- wait=wait,
- min_disconnect_poll=0.1,
- read_only=read_only,
- read_only_fallback=read_only_fallback)
- storage = base
+ read_only=0, read_only_fallback=0,
+ addr=None):
+ if addr is None:
+ addr = self.addr
+ storage = ClientStorage(addr,
+ client=cache,
+ cache_size=cache_size,
+ wait=wait,
+ min_disconnect_poll=0.1,
+ read_only=read_only,
+ read_only_fallback=read_only_fallback)
storage.registerDB(DummyDB(), None)
return storage
@@ -376,7 +386,7 @@
self._dostore()
break
except (Disconnected, ReadOnlyError,
- select.error, thread.error, socket.error):
+ select.error, threading.ThreadError, socket.error):
time.sleep(0.1)
else:
self.fail("Couldn't store after starting a read-write server")
@@ -453,7 +463,8 @@
try:
self._dostore(oid, data=obj)
break
- except (Disconnected, select.error, thread.error, socket.error):
+ except (Disconnected, select.error,
+ threading.ThreadError, socket.error):
zLOG.LOG("checkReconnection", zLOG.INFO,
"Error after server restart; retrying.",
error=sys.exc_info())
@@ -503,3 +514,113 @@
self._storage = self.openClientStorage()
self._dostore()
+
+ # Test case for multiple storages participating in a single
+ # transaction. This is not really a connection test, but it needs
+ # about the same infrastructure (several storage servers).
+
+ # XXX WARNING: with the current ZEO code, this occasionally fails.
+ # That's the point of this test. :-)
+
+ def NOcheckMultiStorageTransaction(self):
+ # Configuration parameters (larger values mean more likely deadlocks)
+ N = 2
+ # These don't *have* to be all the same, but it's convenient this way
+ self.nservers = N
+ self.nthreads = N
+ self.ntrans = N
+ self.nobj = N
+
+ # Start extra servers
+ for i in range(1, self.nservers):
+ self._newAddr()
+ self.startServer(index=i)
+
+ # Spawn threads that each do some transactions on all storages
+ threads = []
+ try:
+ for i in range(self.nthreads):
+ t = MSTThread(self, "T%d" % i)
+ threads.append(t)
+ t.start()
+ # Wait for all threads to finish
+ for t in threads:
+ t.join(60)
+ self.failIf(t.isAlive(), "%s didn't die" % t.getName())
+ finally:
+ for t in threads:
+ t.closeclients()
+
+class MSTThread(threading.Thread):
+
+ __super_init = threading.Thread.__init__
+
+ def __init__(self, testcase, name):
+ self.__super_init(name=name)
+ self.testcase = testcase
+ self.clients = []
+
+ def run(self):
+ tname = self.getName()
+ testcase = self.testcase
+
+ # Create client connections to each server
+ clients = self.clients
+ for i in range(len(testcase.addr)):
+ c = testcase.openClientStorage(addr=testcase.addr[i])
+ c.__name = "C%d" % i
+ clients.append(c)
+
+ for i in range(testcase.ntrans):
+ # Because we want a transaction spanning all storages,
+ # we can't use _dostore(). This is several _dostore() calls
+ # expanded in-line (mostly).
+
+ # Create oid->serial mappings
+ for c in clients:
+ c.__oids = []
+ c.__serials = {}
+
+ # Begin a transaction
+ t = Transaction()
+ for c in clients:
+ #print "%s.%s.%s begin\n" % (tname, c.__name, i),
+ c.tpc_begin(t)
+
+ for j in range(testcase.nobj):
+ for c in clients:
+ # Create and store a new object on each server
+ oid = c.new_oid()
+ c.__oids.append(oid)
+ data = MinPO("%s.%s.t%d.o%d" % (tname, c.__name, i, j))
+ #print data.value
+ data = zodb_pickle(data)
+ s = c.store(oid, ZERO, data, '', t)
+ c.__serials.update(handle_all_serials(oid, s))
+
+ # Vote on all servers and handle serials
+ for c in clients:
+ #print "%s.%s.%s vote\n" % (tname, c.__name, i),
+ s = c.tpc_vote(t)
+ c.__serials.update(handle_all_serials(None, s))
+
+ # Finish on all servers
+ for c in clients:
+ #print "%s.%s.%s finish\n" % (tname, c.__name, i),
+ c.tpc_finish(t)
+
+ for c in clients:
+ # Check that we got serials for all oids
+ for oid in c.__oids:
+ testcase.failUnless(c.__serials.has_key(oid))
+ # Check that we got serials for no other oids
+ for oid in c.__serials.keys():
+ testcase.failUnless(oid in c.__oids)
+
+ def closeclients(self):
+ # Close clients opened by run()
+ for c in self.clients:
+ try:
+ c.close()
+ except:
+ pass