[Zodb-checkins] CVS: ZODB3/ZEO/tests - test_cache.py:1.2
testZEO.py:1.77 testAuth.py:1.6 InvalidationTests.py:1.7
ConnectionTests.py:1.48 CommitLockTests.py:1.17 Cache.py:1.10
Jeremy Hylton
jeremy at zope.com
Wed Dec 24 11:02:42 EST 2003
Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv27465/ZEO/tests
Modified Files:
testZEO.py testAuth.py InvalidationTests.py ConnectionTests.py
CommitLockTests.py Cache.py
Added Files:
test_cache.py
Log Message:
Merge MVCC branch to the HEAD.
=== ZODB3/ZEO/tests/test_cache.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 24 11:02:42 2003
+++ ZODB3/ZEO/tests/test_cache.py Wed Dec 24 11:02:00 2003
@@ -0,0 +1,155 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Basic unit tests for a multi-version client cache."""
+
+import os
+import tempfile
+import unittest
+
+import ZEO.cache
+from ZODB.utils import p64
+
+n1 = p64(1)
+n2 = p64(2)
+n3 = p64(3)
+n4 = p64(4)
+n5 = p64(5)
+
+class CacheTests(unittest.TestCase):
+
+ def setUp(self):
+ self.cache = ZEO.cache.ClientCache()
+ self.cache.open()
+
+ def tearDown(self):
+ if self.cache.path:
+ os.remove(self.cache.path)
+
+ def testLastTid(self):
+ self.assertEqual(self.cache.getLastTid(), None)
+ self.cache.setLastTid(n2)
+ self.assertEqual(self.cache.getLastTid(), n2)
+ self.cache.invalidate(None, "", n1)
+ self.assertEqual(self.cache.getLastTid(), n2)
+ self.cache.invalidate(None, "", n3)
+ self.assertEqual(self.cache.getLastTid(), n3)
+ self.assertRaises(ValueError, self.cache.setLastTid, n2)
+
+ def testLoad(self):
+ data1 = "data for n1"
+ self.assertEqual(self.cache.load(n1, ""), None)
+ self.assertEqual(self.cache.load(n1, "version"), None)
+ self.cache.store(n1, "", n3, None, data1)
+ self.assertEqual(self.cache.load(n1, ""), (data1, n3, ""))
+ # The cache doesn't know whether version exists, because it
+ # only has non-version data.
+ self.assertEqual(self.cache.load(n1, "version"), None)
+ self.assertEqual(self.cache.modifiedInVersion(n1), None)
+
+ def testInvalidate(self):
+ data1 = "data for n1"
+ self.cache.store(n1, "", n3, None, data1)
+ self.cache.invalidate(n1, "", n4)
+ self.cache.invalidate(n2, "", n2)
+ self.assertEqual(self.cache.load(n1, ""), None)
+ self.assertEqual(self.cache.loadBefore(n1, n4),
+ (data1, n3, n4))
+
+ def testVersion(self):
+ data1 = "data for n1"
+ data1v = "data for n1 in version"
+ self.cache.store(n1, "version", n3, None, data1v)
+ self.assertEqual(self.cache.load(n1, ""), None)
+ self.assertEqual(self.cache.load(n1, "version"),
+ (data1v, n3, "version"))
+ self.assertEqual(self.cache.load(n1, "random"), None)
+ self.assertEqual(self.cache.modifiedInVersion(n1), "version")
+ self.cache.invalidate(n1, "version", n4)
+ self.assertEqual(self.cache.load(n1, "version"), None)
+
+ def testNonCurrent(self):
+ data1 = "data for n1"
+ data2 = "data for n2"
+ self.cache.store(n1, "", n4, None, data1)
+ self.cache.store(n1, "", n2, n3, data2)
+ # can't say anything about state before n2
+ self.assertEqual(self.cache.loadBefore(n1, n2), None)
+ # n3 is the upper bound of non-current record n2
+ self.assertEqual(self.cache.loadBefore(n1, n3), (data2, n2, n3))
+ # no data for the interval between n3 and n4
+ self.assertEqual(self.cache.loadBefore(n1, n4), None)
+ self.cache.invalidate(n1, "", n5)
+ self.assertEqual(self.cache.loadBefore(n1, n5), (data1, n4, n5))
+ self.assertEqual(self.cache.loadBefore(n2, n4), None)
+
+ def testException(self):
+ self.assertRaises(ValueError,
+ self.cache.store,
+ n1, "version", n2, n3, "data")
+ self.cache.store(n1, "", n2, None, "data")
+ self.assertRaises(ValueError,
+ self.cache.store,
+ n1, "", n3, None, "data")
+
+ def testEviction(self):
+ # Manually override the current maxsize
+ maxsize = self.cache.size = self.cache.fc.maxsize = 3395 # 1245
+ self.cache.fc = ZEO.cache.FileCache(3395, None, self.cache)
+
+ # Trivial test of eviction code. Doesn't test non-current
+ # eviction.
+ data = ["z" * i for i in range(100)]
+ for i in range(50):
+ n = p64(i)
+ self.cache.store(n, "", n, None, data[i])
+ self.assertEquals(len(self.cache), i + 1)
+ self.assert_(self.cache.fc.currentsize < maxsize)
+ # The cache now uses 1225 bytes. The next insert
+ # should delete some objects.
+ n = p64(50)
+ self.cache.store(n, "", n, None, data[51])
+ self.assert_(len(self.cache) < 51)
+ self.assert_(self.cache.fc.currentsize <= maxsize)
+
+ # XXX Need to make sure eviction of non-current data
+ # and of version data are handled correctly.
+
+ def testSerialization(self):
+ self.cache.store(n1, "", n2, None, "data for n1")
+ self.cache.store(n2, "version", n2, None, "version data for n2")
+ self.cache.store(n3, "", n3, n4, "non-current data for n3")
+ self.cache.store(n3, "", n4, n5, "more non-current data for n3")
+
+ path = tempfile.mktemp()
+ # Copy data from self.cache into path, reaching into the cache
+ # guts to make the copy.
+ dst = open(path, "wb+")
+ src = self.cache.fc.f
+ src.seek(0)
+ dst.write(src.read(self.cache.fc.maxsize))
+ dst.close()
+ copy = ZEO.cache.ClientCache(path)
+ copy.open()
+
+ # Verify that internals of both objects are the same.
+ # Could also test that external API produces the same results.
+ eq = self.assertEqual
+ eq(copy.tid, self.cache.tid)
+ eq(len(copy), len(self.cache))
+ eq(copy.version, self.cache.version)
+ eq(copy.current, self.cache.current)
+ eq(copy.noncurrent, self.cache.noncurrent)
+
+def test_suite():
+ return unittest.makeSuite(CacheTests)
=== ZODB3/ZEO/tests/testZEO.py 1.76 => 1.77 ===
--- ZODB3/ZEO/tests/testZEO.py:1.76 Fri Nov 28 11:44:48 2003
+++ ZODB3/ZEO/tests/testZEO.py Wed Dec 24 11:02:00 2003
@@ -101,20 +101,12 @@
StorageTestBase.StorageTestBase,
# ZODB test mixin classes (in the same order as imported)
BasicStorage.BasicStorage,
- VersionStorage.VersionStorage,
- TransactionalUndoStorage.TransactionalUndoStorage,
- TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage,
- ConflictResolution.ConflictResolvingStorage,
- ConflictResolution.ConflictResolvingTransUndoStorage,
- RevisionStorage.RevisionStorage,
MTStorage.MTStorage,
ReadOnlyStorage.ReadOnlyStorage,
# ZEO test mixin classes (in the same order as imported)
- Cache.StorageWithCache,
- Cache.TransUndoStorageWithCache,
- CommitLockTests.CommitLockTests,
+ CommitLockTests.CommitLockVoteTests,
ThreadTests.ThreadTests,
# Locally defined (see above)
MiscZEOTests
@@ -167,8 +159,22 @@
key = '%s:%s' % (self._storage._storage, self._storage._server_addr)
self.assertEqual(self._storage.sortKey(), key)
+class FullGenericTests(
+ GenericTests,
+ Cache.StorageWithCache,
+ Cache.TransUndoStorageWithCache,
+ CommitLockTests.CommitLockUndoTests,
+ ConflictResolution.ConflictResolvingStorage,
+ ConflictResolution.ConflictResolvingTransUndoStorage,
+ PackableStorage.PackableUndoStorage,
+ RevisionStorage.RevisionStorage,
+ TransactionalUndoStorage.TransactionalUndoStorage,
+ TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
+ VersionStorage.VersionStorage,
+ ):
+ """Extend GenericTests with tests that MappingStorage can't pass."""
-class FileStorageTests(GenericTests):
+class FileStorageTests(FullGenericTests):
"""Test ZEO backed by a FileStorage."""
level = 2
@@ -180,7 +186,7 @@
</filestorage>
""" % filename
-class BDBTests(FileStorageTests):
+class BDBTests(FullGenericTests):
"""ZEO backed by a Berkeley full storage."""
level = 2
@@ -192,67 +198,14 @@
</fullstorage>
""" % self._envdir
-class MappingStorageTests(FileStorageTests):
+class MappingStorageTests(GenericTests):
"""ZEO backed by a Mapping storage."""
def getConfig(self):
return """<mappingstorage 1/>"""
- # Tests which MappingStorage can't possibly pass, because it doesn't
- # support versions or undo.
- def checkVersions(self): pass
- def checkVersionedStoreAndLoad(self): pass
- def checkVersionedLoadErrors(self): pass
- def checkVersionLock(self): pass
- def checkVersionEmpty(self): pass
- def checkUndoUnresolvable(self): pass
- def checkUndoInvalidation(self): pass
- def checkUndoInVersion(self): pass
- def checkUndoCreationBranch2(self): pass
- def checkUndoCreationBranch1(self): pass
- def checkUndoConflictResolution(self): pass
- def checkUndoCommitVersion(self): pass
- def checkUndoAbortVersion(self): pass
- def checkPackUndoLog(self): pass
- def checkUndoLogMetadata(self): pass
- def checkTwoObjectUndoAtOnce(self): pass
- def checkTwoObjectUndoAgain(self): pass
- def checkTwoObjectUndo(self): pass
- def checkTransactionalUndoAfterPackWithObjectUnlinkFromRoot(self): pass
- def checkTransactionalUndoAfterPack(self): pass
- def checkSimpleTransactionalUndo(self): pass
- def checkReadMethods(self): pass
- def checkPackAfterUndoDeletion(self): pass
- def checkPackAfterUndoManyTimes(self): pass
- def checkPackVersions(self): pass
- def checkPackUnlinkedFromRoot(self): pass
- def checkPackOnlyOneObject(self): pass
- def checkPackJustOldRevisions(self): pass
- def checkPackEmptyStorage(self): pass
- def checkPackAllRevisions(self): pass
- def checkPackVersionsInPast(self): pass
- def checkPackVersionReachable(self): pass
- def checkNotUndoable(self): pass
- def checkNewSerialOnCommitVersionToVersion(self): pass
- def checkModifyAfterAbortVersion(self): pass
- def checkLoadSerial(self): pass
- def checkCreateObjectInVersionWithAbort(self): pass
- def checkCommitVersionSerialno(self): pass
- def checkCommitVersionInvalidation(self): pass
- def checkCommitToOtherVersion(self): pass
- def checkCommitToNonVersion(self): pass
- def checkCommitLockUndoFinish(self): pass
- def checkCommitLockUndoClose(self): pass
- def checkCommitLockUndoAbort(self): pass
- def checkCommitEmptyVersionInvalidation(self): pass
- def checkCreationUndoneGetSerial(self): pass
- def checkAbortVersionSerialno(self): pass
- def checkAbortVersionInvalidation(self): pass
- def checkAbortVersionErrors(self): pass
- def checkAbortVersion(self): pass
- def checkAbortOneVersionCommitTheOther(self): pass
- def checkResolve(self): pass
- def check4ExtStorageThread(self): pass
+ # XXX There are still a bunch of tests that fail. Are there
+ # still test classes in GenericTests that shouldn't be there?
test_classes = [FileStorageTests, MappingStorageTests]
=== ZODB3/ZEO/tests/testAuth.py 1.5 => 1.6 ===
--- ZODB3/ZEO/tests/testAuth.py:1.5 Tue Dec 23 07:06:29 2003
+++ ZODB3/ZEO/tests/testAuth.py Wed Dec 24 11:02:01 2003
@@ -28,7 +28,6 @@
from ZEO.tests.ConnectionTests import CommonSetupTearDown
from ZODB.FileStorage import FileStorage
-from ZODB.tests.StorageTestBase import removefs
class AuthTest(CommonSetupTearDown):
__super_getServerConfig = CommonSetupTearDown.getServerConfig
=== ZODB3/ZEO/tests/InvalidationTests.py 1.6 => 1.7 ===
--- ZODB3/ZEO/tests/InvalidationTests.py:1.6 Thu Oct 2 14:17:21 2003
+++ ZODB3/ZEO/tests/InvalidationTests.py Wed Dec 24 11:02:01 2003
@@ -39,7 +39,23 @@
# thought they added (i.e., the keys for which get_transaction().commit()
# did not raise any exception).
-class StressThread(TestThread):
+class FailableThread(TestThread):
+
+ # mixin class
+ # subclass must provide
+ # - self.stop attribute (an event)
+ # - self._testrun() method
+
+ def testrun(self):
+ try:
+ self._testrun()
+ except:
+ # Report the failure here to all the other threads, so
+ # that they stop quickly.
+ self.stop.set()
+ raise
+
+class StressThread(FailableThread):
# Append integers startnum, startnum + step, startnum + 2*step, ...
# to 'tree' until Event stop is set. If sleep is given, sleep
@@ -57,7 +73,7 @@
self.added_keys = []
self.commitdict = commitdict
- def testrun(self):
+ def _testrun(self):
cn = self.db.open()
while not self.stop.isSet():
try:
@@ -87,7 +103,7 @@
key += self.step
cn.close()
-class LargeUpdatesThread(TestThread):
+class LargeUpdatesThread(FailableThread):
# A thread that performs a lot of updates. It attempts to modify
# more than 25 objects so that it can test code that runs vote
@@ -106,6 +122,15 @@
self.commitdict = commitdict
def testrun(self):
+ try:
+ self._testrun()
+ except:
+ # Report the failure here to all the other threads, so
+ # that they stop quickly.
+ self.stop.set()
+ raise
+
+ def _testrun(self):
cn = self.db.open()
while not self.stop.isSet():
try:
@@ -162,7 +187,7 @@
self.added_keys = keys_added.keys()
cn.close()
-class VersionStressThread(TestThread):
+class VersionStressThread(FailableThread):
def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
step=2, sleep=None):
@@ -177,6 +202,15 @@
self.commitdict = commitdict
def testrun(self):
+ try:
+ self._testrun()
+ except:
+ # Report the failure here to all the other threads, so
+ # that they stop quickly.
+ self.stop.set()
+ raise
+
+ def _testrun(self):
commit = 0
key = self.startnum
while not self.stop.isSet():
@@ -302,7 +336,10 @@
delay = self.MINTIME
start = time.time()
while time.time() - start <= self.MAXTIME:
- time.sleep(delay)
+ stop.wait(delay)
+ if stop.isSet():
+ # Some thread failed. Stop right now.
+ break
delay = 2.0
if len(commitdict) >= len(threads):
break
@@ -406,6 +443,7 @@
t1 = VersionStressThread(self, db1, stop, 1, cd, 1, 3)
t2 = VersionStressThread(self, db2, stop, 2, cd, 2, 3, 0.01)
t3 = VersionStressThread(self, db2, stop, 3, cd, 3, 3, 0.01)
+## t1 = VersionStressThread(self, db2, stop, 3, cd, 1, 3, 0.01)
self.go(stop, cd, t1, t2, t3)
cn.sync()
=== ZODB3/ZEO/tests/ConnectionTests.py 1.47 => 1.48 ===
--- ZODB3/ZEO/tests/ConnectionTests.py:1.47 Thu Oct 9 10:50:54 2003
+++ ZODB3/ZEO/tests/ConnectionTests.py Wed Dec 24 11:02:01 2003
@@ -109,7 +109,7 @@
os.waitpid(pid, 0)
for c in self.caches:
for i in 0, 1:
- path = "c1-%s-%d.zec" % (c, i)
+ path = "%s-%s.zec" % (c, "1")
# On Windows before 2.3, we don't have a way to wait for
# the spawned server(s) to close, and they inherited
# file descriptors for our open files. So long as those
@@ -584,6 +584,9 @@
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
+ # sync() is needed to prevent invalidation for oid from arriving
+ # in the middle of the load() call.
+ perstorage.sync()
perstorage.load(oid, '')
perstorage.close()
@@ -853,7 +856,7 @@
unless = self.failUnless
self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty
- unless(not storage._cache._index)
+ unless(not list(storage._cache.contents()))
# Create the object
oid = storage.new_oid()
obj = MinPO(7)
@@ -872,7 +875,7 @@
# We expect finish to fail
raises(ClientDisconnected, storage.tpc_finish, t)
# The cache should still be empty
- unless(not storage._cache._index)
+ unless(not list(storage._cache.contents()))
# Load should fail since the object should not be in either the cache
# or the server.
raises(KeyError, storage.load, oid, '')
@@ -883,7 +886,7 @@
unless = self.failUnless
self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty
- unless(not storage._cache._index)
+ unless(not list(storage._cache.contents()))
# Create the object
oid = storage.new_oid()
obj = MinPO(7)
=== ZODB3/ZEO/tests/CommitLockTests.py 1.16 => 1.17 ===
--- ZODB3/ZEO/tests/CommitLockTests.py:1.16 Fri Nov 28 11:44:48 2003
+++ ZODB3/ZEO/tests/CommitLockTests.py Wed Dec 24 11:02:01 2003
@@ -71,7 +71,7 @@
# self.storage.tpc_vote(self.trans)
rpc = self.storage._server.rpc
- msgid = rpc._deferred_call('vote', self.storage._serial)
+ msgid = rpc._deferred_call('vote', id(self.trans))
self.ready.set()
rpc._deferred_wait(msgid)
self.storage._check_serials()
@@ -103,6 +103,51 @@
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
return oid, txn
+ def _begin_threads(self):
+ # Start a second transaction on a different connection without
+ # blocking the test thread. Returns only after each thread has
+ # set its ready event.
+ self._storages = []
+ self._threads = []
+
+ for i in range(self.NUM_CLIENTS):
+ storage = self._duplicate_client()
+ txn = Transaction()
+ tid = self._get_timestamp()
+
+ t = WorkerThread(self, storage, txn)
+ self._threads.append(t)
+ t.start()
+ t.ready.wait()
+
+ # Close one of the connections abnormally to test the server's response
+ if i == 0:
+ storage.close()
+ else:
+ self._storages.append((storage, txn))
+
+ def _finish_threads(self):
+ for t in self._threads:
+ t.cleanup()
+
+ def _duplicate_client(self):
+ "Open another ClientStorage to the same server."
+ # XXX argh it's hard to find the actual address
+ # The rpc mgr addr attribute is a list. Each element in the
+ # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
+ # address.
+ addr = self._storage._addr
+ new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
+ new.registerDB(DummyDB(), None)
+ return new
+
+ def _get_timestamp(self):
+ t = time.time()
+ t = TimeStamp(*time.gmtime(t)[:5]+(t%60,))
+ return `t`
+
+class CommitLockVoteTests(CommitLockTests):
+
def checkCommitLockVoteFinish(self):
oid, txn = self._start_txn()
self._storage.tpc_vote(txn)
@@ -141,15 +186,16 @@
self._finish_threads()
self._cleanup()
+class CommitLockUndoTests(CommitLockTests):
+
def _get_trans_id(self):
self._dostore()
L = self._storage.undoInfo()
return L[0]['id']
- def _begin_undo(self, trans_id):
+ def _begin_undo(self, trans_id, txn):
rpc = self._storage._server.rpc
- return rpc._deferred_call('transactionalUndo', trans_id,
- self._storage._serial)
+ return rpc._deferred_call('transactionalUndo', trans_id, id(txn))
def _finish_undo(self, msgid):
return self._storage._server.rpc._deferred_wait(msgid)
@@ -157,7 +203,7 @@
def checkCommitLockUndoFinish(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
- msgid = self._begin_undo(trans_id)
+ msgid = self._begin_undo(trans_id, txn)
self._begin_threads()
@@ -174,7 +220,7 @@
def checkCommitLockUndoAbort(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
- msgid = self._begin_undo(trans_id)
+ msgid = self._begin_undo(trans_id, txn)
self._begin_threads()
@@ -190,7 +236,7 @@
def checkCommitLockUndoClose(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
- msgid = self._begin_undo(trans_id)
+ msgid = self._begin_undo(trans_id, txn)
self._begin_threads()
@@ -201,46 +247,3 @@
self._finish_threads()
self._cleanup()
-
- def _begin_threads(self):
- # Start a second transaction on a different connection without
- # blocking the test thread. Returns only after each thread has
- # set it's ready event.
- self._storages = []
- self._threads = []
-
- for i in range(self.NUM_CLIENTS):
- storage = self._duplicate_client()
- txn = Transaction()
- tid = self._get_timestamp()
-
- t = WorkerThread(self, storage, txn)
- self._threads.append(t)
- t.start()
- t.ready.wait()
-
- # Close on the connections abnormally to test server response
- if i == 0:
- storage.close()
- else:
- self._storages.append((storage, txn))
-
- def _finish_threads(self):
- for t in self._threads:
- t.cleanup()
-
- def _duplicate_client(self):
- "Open another ClientStorage to the same server."
- # XXX argh it's hard to find the actual address
- # The rpc mgr addr attribute is a list. Each element in the
- # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
- # address.
- addr = self._storage._addr
- new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
- new.registerDB(DummyDB(), None)
- return new
-
- def _get_timestamp(self):
- t = time.time()
- t = TimeStamp(*time.gmtime(t)[:5]+(t%60,))
- return `t`
=== ZODB3/ZEO/tests/Cache.py 1.9 => 1.10 ===
--- ZODB3/ZEO/tests/Cache.py:1.9 Thu Aug 29 12:32:51 2002
+++ ZODB3/ZEO/tests/Cache.py Wed Dec 24 11:02:01 2003
@@ -42,7 +42,7 @@
t.note('undo1')
self._storage.tpc_begin(t)
- oids = self._storage.transactionalUndo(tid, t)
+ tid, oids = self._storage.transactionalUndo(tid, t)
# Make sure this doesn't load invalid data into the cache
self._storage.load(oid, '')
More information about the Zodb-checkins
mailing list