[Zodb-checkins] CVS: StandaloneZODB/ZEO/tests - CommitLockTests.py:1.1.2.1 testZEO.py:1.16.4.4.2.5
Jeremy Hylton
jeremy@zope.com
Mon, 6 May 2002 17:08:22 -0400
Update of /cvs-repository/StandaloneZODB/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv20463/tests
Modified Files:
Tag: ZEO2-branch
testZEO.py
Added Files:
Tag: ZEO2-branch
CommitLockTests.py
Log Message:
Factor out the commit lock tests into a separate module.
=== Added File StandaloneZODB/ZEO/tests/CommitLockTests.py ===
"""Tests of the distributed commit lock."""
from ZODB.Transaction import Transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage
ZERO = '\0'*8
class DummyDB:
def invalidate(self, *args):
pass
class CommitLockTests:
def checkCommitLockOnCommit(self):
self._storages = []
try:
self._checkCommitLock("tpc_finish")
finally:
self._cleanup()
def checkCommitLockOnAbort(self):
self._storages = []
try:
self._checkCommitLock("tpc_abort")
finally:
self._cleanup()
def _cleanup(self):
for store, trans in self._storages:
store.tpc_abort(trans)
store.close()
self._storages = []
def _checkCommitLock(self, method_name):
# check the commit lock when a client attemps a transaction,
# but fails/exits before finishing the commit.
# Start on transaction normally.
t = Transaction()
self._storage.tpc_begin(t)
# Start a second transaction on a different connection without
# blocking the test thread.
self._storages = []
for i in range(3):
storage2 = self._duplicate_client()
t2 = Transaction()
# ???
tid = `ZEO.ClientStorage.get_timestamp()`
storage2.tpc_begin(t2, tid)
if i == 0:
storage2.close()
else:
print "storage", i, "opened"
self._storages.append((storage2, t2))
print "finishing original commit"
oid = self._storage.new_oid()
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
self._storage.tpc_vote(t)
if method_name == "tpc_finish":
self._storage.tpc_finish(t)
self._storage.load(oid, '')
else:
self._storage.tpc_abort(t)
print "done"
self._dowork()
# Make sure the server is still responsive
self._dostore()
def _dowork(self):
for store, trans in self._storages:
oid = store.new_oid()
store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
store.tpc_vote(trans)
store.tpc_abort(trans)
def _duplicate_client(self):
"Open another ClientStorage to the same server."
# XXX argh it's hard to find the actual address
# The rpc mgr addr attribute is a list. Each element in the
# list is a socket domain (AF_INET, AF_UNIX, etc.) and an
# address.
addr = self._storage._rpc_mgr.addr[0][1]
new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
new.registerDB(DummyDB(), None)
return new
def _get_timestamp(self):
t = time.time()
t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
return `t`
=== StandaloneZODB/ZEO/tests/testZEO.py 1.16.4.4.2.4 => 1.16.4.4.2.5 ===
import sys
import tempfile
+import thread
import time
import types
import unittest
@@ -28,10 +29,10 @@
import ThreadedAsync, ZEO.trigger
from ZODB.FileStorage import FileStorage
from ZODB.Transaction import Transaction
-import thread
+from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import zLOG
-from ZEO.tests import forker, Cache
+from ZEO.tests import forker, Cache, CommitLockTests
from ZEO.smac import Disconnected
# Sorry Jim...
@@ -42,8 +43,6 @@
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
-ZERO = '\0'*8
-
class DummyDB:
def invalidate(self, *args):
pass
@@ -72,6 +71,7 @@
Synchronization.SynchronizedStorage,
MTStorage.MTStorage,
ReadOnlyStorage.ReadOnlyStorage,
+ CommitLockTests.CommitLockTests,
):
"""An abstract base class for ZEO tests
@@ -102,75 +102,6 @@
def checkLargeUpdate(self):
obj = MinPO("X" * (10 * 128 * 1024))
self._dostore(data=obj)
-
- def checkCommitLockOnCommit(self):
- try:
- self._checkCommitLock("tpc_finish")
- finally:
- self._cleanup()
-
- def checkCommitLockOnAbort(self):
- try:
- self._checkCommitLock("tpc_abort")
- finally:
- self._cleanup()
-
- def _cleanup(self):
- for store, trans in self._storages:
- store.tpc_abort(trans)
- store.close()
- self._storages = []
-
- def _checkCommitLock(self, method_name):
- # check the commit lock when a client attemps a transaction,
- # but fails/exits before finishing the commit.
-
- # Start on transaction normally.
- t = Transaction()
- self._storage.tpc_begin(t)
-
- # Start a second transaction on a different connection without
- # blocking the test thread.
- self._storages = []
- for i in range(3):
- storage2 = self._duplicate_client()
- t2 = Transaction()
- tid = `ZEO.ClientStorage.get_timestamp()`
- try:
- storage2._server.tpc_begin(tid, t2.user, t2.description,
- t2._extension, None, ' ')
- except Disconnected:
- self.fail("client %d disconnected!" % i)
- if i == 0:
- storage2.close()
- else:
- self._storages.append((storage2, t2))
-
- oid = self._storage.new_oid()
- self._storage.store(oid, None, '', '', t)
- self._storage.tpc_vote(t)
- self._storage.tpc_finish(t)
-
- self._cleanup()
-
- # Make sure the server is still responsive
- self._dostore()
-
- def _duplicate_client(self):
- "Open another ClientStorage to the same server."
- # XXX argh it's hard to find the actual address
- # The rpc mgr addr attribute is a list. Each element in the
- # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
- # address.
- addr = self._storage._rpc_mgr.addr[0][1]
- new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
- new.registerDB(DummyDB(), None)
- return new
-
- def _get_timestamp(self):
- t = time.time()
- t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
- return `t`
class ZEOFileStorageTests(GenericTests):
__super_setUp = GenericTests.setUp