[Zope-Checkins] CVS: ZODB3/ZEO/tests - zeoserver.py:1.19.6.1 testZEO.py:1.72.6.1 testConnection.py:1.16.4.1 testClientCache.py:1.9.4.1 InvalidationTests.py:1.4.4.1 ConnectionTests.py:1.40.4.1 CommitLockTests.py:1.12.18.2

Jeremy Hylton jeremy at zope.com
Mon Sep 15 14:03:30 EDT 2003


Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv13599/ZEO/tests

Modified Files:
      Tag: Zope-2_7-branch
	zeoserver.py testZEO.py testConnection.py testClientCache.py 
	InvalidationTests.py ConnectionTests.py CommitLockTests.py 
Log Message:
Take two: Merge changes from ZODB3-3_2-branch to Zope-2_7-branch.

Please make all future changes on the Zope-2_7-branch instead.

The previous attempt used "cvs up -j ZODB3-3_2-branch", but appeared
to get only a small fraction of the changes.  This attempt is based on
copying a checkout of ZODB3-3_2-branch over top of a checkout of
Zope-2_7-branch.


=== ZODB3/ZEO/tests/zeoserver.py 1.19 => 1.19.6.1 ===
--- ZODB3/ZEO/tests/zeoserver.py:1.19	Thu Jun  5 18:39:01 2003
+++ ZODB3/ZEO/tests/zeoserver.py	Mon Sep 15 14:02:59 2003
@@ -122,9 +122,9 @@
         self._adminaddr = addr
 
     def run(self):
-        # If this process doesn't exit in 100 seconds, commit suicide
-        for i in range(20):
-            time.sleep(5)
+        # If this process doesn't exit in 300 seconds, commit suicide
+        time.sleep(300)
+        log("zeoserver", "suicide thread invoking shutdown")
         from ZEO.tests.forker import shutdown_zeo_server
         # XXX If the -k option was given to zeoserver, then the process will
         # go away but the temp files won't get cleaned up.
@@ -174,7 +174,7 @@
         transaction_timeout=zo.transaction_timeout,
         monitor_address=mon_addr,
         auth_protocol=zo.auth_protocol,
-        auth_filename=zo.auth_database,
+        auth_database=zo.auth_database,
         auth_realm=zo.auth_realm)
 
     try:


=== ZODB3/ZEO/tests/testZEO.py 1.72 => 1.72.6.1 ===
--- ZODB3/ZEO/tests/testZEO.py:1.72	Fri May 30 15:20:56 2003
+++ ZODB3/ZEO/tests/testZEO.py	Mon Sep 15 14:02:59 2003
@@ -162,6 +162,10 @@
         if hasattr(ZODB, "__version__"):
             ReadOnlyStorage.ReadOnlyStorage.checkWriteMethods(self)
 
+    def checkSortKey(self):
+        key = '%s:%s' % (self._storage._storage, self._storage._server_addr)
+        self.assertEqual(self._storage.sortKey(), key)
+
 
 class FileStorageTests(GenericTests):
     """Test ZEO backed by a FileStorage."""
@@ -183,7 +187,7 @@
         self._envdir = tempfile.mktemp()
         return """\
         <fullstorage 1>
-        name %s
+        envdir %s
         </fullstorage>
         """ % self._envdir
 


=== ZODB3/ZEO/tests/testConnection.py 1.16 => 1.16.4.1 ===
--- ZODB3/ZEO/tests/testConnection.py:1.16	Fri Jun 13 18:27:33 2003
+++ ZODB3/ZEO/tests/testConnection.py	Mon Sep 15 14:02:59 2003
@@ -38,7 +38,7 @@
     def getConfig(self, path, create, read_only):
         return """\
         <fullstorage 1>
-        name %s
+        envdir %s
         read-only %s
         </fullstorage>""" % (path, read_only and "yes" or "no")
 
@@ -57,19 +57,25 @@
 
 class FileStorageReconnectionTests(
     FileStorageConfig,
-    ConnectionTests.ReconnectionTests
+    ConnectionTests.ReconnectionTests,
     ):
     """FileStorage-specific re-connection tests."""
     # Run this at level 1 because MappingStorage can't do reconnection tests
     level = 1
 
+class FileStorageInvqTests(
+    FileStorageConfig,
+    ConnectionTests.InvqTests
+    ):
+    """FileStorage-specific invalidation queue tests."""
+    level = 1
+
 class FileStorageTimeoutTests(
     FileStorageConfig,
     ConnectionTests.TimeoutTests
     ):
     level = 2
 
-
 class BDBConnectionTests(
     BerkeleyStorageConfig,
     ConnectionTests.ConnectionTests,
@@ -85,6 +91,13 @@
     """Berkeley storage re-connection tests."""
     level = 2
 
+class BDBInvqTests(
+    BerkeleyStorageConfig,
+    ConnectionTests.InvqTests
+    ):
+    """Berkeley storage invalidation queue tests."""
+    level = 2
+
 class BDBTimeoutTests(
     BerkeleyStorageConfig,
     ConnectionTests.TimeoutTests
@@ -112,22 +125,19 @@
 
 test_classes = [FileStorageConnectionTests,
                 FileStorageReconnectionTests,
+                FileStorageInvqTests,
                 FileStorageTimeoutTests,
                 MappingStorageConnectionTests,
                 MappingStorageTimeoutTests]
 
 import BDBStorage
 if BDBStorage.is_available:
-    test_classes.append(BDBConnectionTests)
-    test_classes.append(BDBReconnectionTests)
-    test_classes.append(BDBTimeoutTests)
-
+    test_classes += [BDBConnectionTests,
+                     BDBReconnectionTests,
+                     BDBInvqTests,
+                     BDBTimeoutTests]
 
 def test_suite():
-    # shutup warnings about mktemp
-    import warnings
-    warnings.filterwarnings("ignore", "mktemp")
-
     suite = unittest.TestSuite()
     for klass in test_classes:
         sub = unittest.makeSuite(klass, 'check')


=== ZODB3/ZEO/tests/testClientCache.py 1.9 => 1.9.4.1 ===
--- ZODB3/ZEO/tests/testClientCache.py:1.9	Mon Jun 16 14:50:06 2003
+++ ZODB3/ZEO/tests/testClientCache.py	Mon Sep 15 14:02:59 2003
@@ -32,14 +32,12 @@
     _oid3 = 'cdefghij'
 
     def setUp(self):
-        unittest.TestCase.setUp(self)
         self.cachesize = 10*1000*1000
         self.cache = ClientCache(size=self.cachesize)
         self.cache.open()
 
     def tearDown(self):
         self.cache.close()
-        unittest.TestCase.tearDown(self)
 
     def testOpenClose(self):
         pass # All the work is done by setUp() / tearDown()
@@ -281,9 +279,10 @@
 class PersistentClientCacheTests(unittest.TestCase):
 
     _oid = 'abcdefgh'
+    _oid2 = 'bcdefghi'
+    _oid3 = 'cdefghij'
 
     def setUp(self):
-        unittest.TestCase.setUp(self)
         self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
         self.cachesize = 10*1000*1000
         self.storagename = 'foo'
@@ -319,7 +318,6 @@
                     os.unlink(filename)
                 except os.error:
                     pass
-        unittest.TestCase.tearDown(self)
 
     def testCacheFileSelection(self):
         # A bug in __init__ read the wrong slice of the file to determine
@@ -388,7 +386,42 @@
         cache.checkSize(10*self.cachesize) # Force a file flip
         self.failUnless(cache.getLastTid() is None)
 
-
+    def testLoadNonversionWithVersionInFlippedCache(self):
+        # This test provokes an error seen once in an unrelated test.
+        # The object is stored in the old cache file with version data,
+        # then a load for non-version data occurs.  The attempt to copy the
+        # non-version data to the new file fails.
+        nvdata = "Mend your speech a little, lest it may mar your fortunes."
+        nvserial = "12345678"
+        version = "folio"
+        vdata = "Mend your speech a little, lest you may mar your fortunes."
+        vserial = "12346789"
+        
+        self.cache.store(self._oid, nvdata, nvserial, version, vdata, vserial)
+        self.cache.checkSize(10 * self.cachesize) # force a cache flip
+
+        for i in 1, 2: # check that we can load before and after copying
+            for xversion, xdata, xserial in [("", nvdata, nvserial),
+                                          (version, vdata, vserial)]:
+                data, serial = self.cache.load(self._oid, xversion)
+                self.assertEqual(data, xdata)
+                self.assertEqual(serial, xserial)
+
+        # now cause two more cache flips and make sure the data is still there
+        self.cache.store(self._oid2, "", "", "foo", "bar", "23456789")
+        self.cache.checkSize(10 * self.cachesize) # force a cache flip
+        self.cache.load(self._oid, "")
+        self.cache.store(self._oid3, "bar", "34567890", "", "", "")
+        self.cache.checkSize(10 * self.cachesize) # force a cache flip
+        self.cache.load(self._oid, "")
+
+        for i in 1, 2: # check that we can load before and after copying
+            for xversion, xdata, xserial in [("", nvdata, nvserial),
+                                          (version, vdata, vserial)]:
+                data, serial = self.cache.load(self._oid, xversion)
+                self.assertEqual(data, xdata)
+                self.assertEqual(serial, xserial)
+                
 class ClientCacheLongOIDTests(ClientCacheTests):
     _oid  = 'abcdefghijklmnop' * 2
     _oid2 = 'bcdefghijklmnopq' * 2
@@ -397,7 +430,8 @@
 
 class PersistentClientCacheLongOIDTests(PersistentClientCacheTests):
     _oid = 'abcdefghijklmnop' * 2
-
+    _oid2 = 'bcdefghijklmnopq' * 2
+    _oid3 = 'cdefghijklmnopqr' * 2
 
 def test_suite():
     suite = unittest.TestSuite()


=== ZODB3/ZEO/tests/InvalidationTests.py 1.4 => 1.4.4.1 ===
--- ZODB3/ZEO/tests/InvalidationTests.py:1.4	Fri Jun 13 19:09:30 2003
+++ ZODB3/ZEO/tests/InvalidationTests.py	Mon Sep 15 14:02:59 2003
@@ -12,7 +12,6 @@
 #
 ##############################################################################
 
-from thread import get_ident
 import threading
 import time
 
@@ -20,12 +19,10 @@
 from BTrees.OOBTree import OOBTree
 
 from ZEO.tests.TestThread import TestThread
-from ZEO.tests.ConnectionTests import CommonSetupTearDown
 
 from ZODB.DB import DB
 from ZODB.POSException \
      import ReadConflictError, ConflictError, VersionLockError
-import zLOG
 
 # The tests here let several threads have a go at one or more database
 # instances simultaneously.  Each thread appends a disjoint (from the
@@ -48,8 +45,8 @@
     # to 'tree' until Event stop is set.  If sleep is given, sleep
     # that long after each append.  At the end, instance var .added_keys
     # is a list of the ints the thread believes it added successfully.
-    def __init__(self, testcase, db, stop, threadnum, startnum,
-                 step=2, sleep=None):
+    def __init__(self, testcase, db, stop, threadnum, commitdict,
+                 startnum, step=2, sleep=None):
         TestThread.__init__(self, testcase)
         self.db = db
         self.stop = stop
@@ -58,6 +55,7 @@
         self.step = step
         self.sleep = sleep
         self.added_keys = []
+        self.commitdict = commitdict
 
     def testrun(self):
         cn = self.db.open()
@@ -74,6 +72,7 @@
                 tree[key] = self.threadnum
                 get_transaction().note("add key %s" % key)
                 get_transaction().commit()
+                self.commitdict[self] = 1
                 if self.sleep:
                     time.sleep(self.sleep)
             except (ReadConflictError, ConflictError), msg:
@@ -88,9 +87,13 @@
             key += self.step
         cn.close()
 
-class VersionStressThread(TestThread):
+class LargeUpdatesThread(TestThread):
+
+    # A thread that performs a lot of updates.  It attempts to modify
+    # more than 25 objects so that it can test code that runs vote
+    # in a separate thread when it modifies more than 25 objects.
 
-    def __init__(self, testcase, db, stop, threadnum, startnum,
+    def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
                  step=2, sleep=None):
         TestThread.__init__(self, testcase)
         self.db = db
@@ -100,21 +103,88 @@
         self.step = step
         self.sleep = sleep
         self.added_keys = []
+        self.commitdict = commitdict
+
+    def testrun(self):
+        cn = self.db.open()
+        while not self.stop.isSet():
+            try:
+                tree = cn.root()["tree"]
+                break
+            except (ConflictError, KeyError):
+                # print "%d getting tree abort" % self.threadnum
+                get_transaction().abort()
+                cn.sync()
+
+        keys_added = {} # set of keys we commit
+        tkeys = []
+        while not self.stop.isSet():
+
+            # The test picks 50 keys spread across many buckets.
+            # self.startnum and self.step ensure that all threads use
+            # disjoint key sets, to minimize conflict errors.
+
+            nkeys = len(tkeys)
+            if nkeys < 50:
+                tkeys = range(self.startnum, 3000, self.step)
+                nkeys = len(tkeys)
+            step = max(int(nkeys / 50), 1)
+            keys = [tkeys[i] for i in range(0, nkeys, step)]
+            for key in keys:
+                try:
+                    tree[key] = self.threadnum
+                except (ReadConflictError, ConflictError), msg:
+                    # print "%d setting key %s" % (self.threadnum, msg)
+                    get_transaction().abort()
+                    cn.sync()
+                    break
+            else:
+                # print "%d set #%d" % (self.threadnum, len(keys))
+                get_transaction().note("keys %s" % ", ".join(map(str, keys)))
+                try:
+                    get_transaction().commit()
+                    self.commitdict[self] = 1
+                    if self.sleep:
+                        time.sleep(self.sleep)
+                except ConflictError, msg:
+                    # print "%d commit %s" % (self.threadnum, msg)
+                    get_transaction().abort()
+                    cn.sync()
+                    continue
+                for k in keys:
+                    tkeys.remove(k)
+                    keys_added[k] = 1
+                # sync() is necessary here to process invalidations
+                # if we get a read conflict.  In the read conflict case,
+                # no objects were modified so cn never got registered
+                # with the transaction.
+                cn.sync()
+        self.added_keys = keys_added.keys()
+        cn.close()
 
-    def log(self, msg):
-        zLOG.LOG("thread %d" % get_ident(), 0, msg)
+class VersionStressThread(TestThread):
+
+    def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
+                 step=2, sleep=None):
+        TestThread.__init__(self, testcase)
+        self.db = db
+        self.stop = stop
+        self.threadnum = threadnum
+        self.startnum = startnum
+        self.step = step
+        self.sleep = sleep
+        self.added_keys = []
+        self.commitdict = commitdict
 
     def testrun(self):
-        self.log("thread begin")
         commit = 0
         key = self.startnum
         while not self.stop.isSet():
             version = "%s:%s" % (self.threadnum, key)
             commit = not commit
-            self.log("attempt to add key=%s version=%s commit=%d" %
-                     (key, version, commit))
             if self.oneupdate(version, key, commit):
                 self.added_keys.append(key)
+                self.commitdict[self] = 1
             key += self.step
 
     def oneupdate(self, version, key, commit=1):
@@ -134,13 +204,11 @@
         while not self.stop.isSet():
             try:
                 tree[key] = self.threadnum
-                get_transaction().note("add key %d" % key)
                 get_transaction().commit()
                 if self.sleep:
                     time.sleep(self.sleep)
                 break
             except (VersionLockError, ReadConflictError, ConflictError), msg:
-                self.log(msg)
                 get_transaction().abort()
                 # sync() is necessary here to process invalidations
                 # if we get a read conflict.  In the read conflict case,
@@ -163,20 +231,30 @@
                         time.sleep(self.sleep)
                     return commit
                 except ConflictError, msg:
-                    self.log(msg)
                     get_transaction().abort()
                     cn.sync()
         finally:
             cn.close()
         return 0
 
-class InvalidationTests(CommonSetupTearDown):
+class InvalidationTests:
 
     level = 2
-    DELAY = 15  # number of seconds the main thread lets the workers run
+
+    # Minimum # of seconds the main thread lets the workers run.  The
+    # test stops as soon as this much time has elapsed, and all threads
+    # have managed to commit a change.
+    MINTIME = 10
+
+    # Maximum # of seconds the main thread lets the workers run.  We
+    # stop after this long has elapsed regardless of whether all threads
+    # have managed to commit a change.
+    MAXTIME = 300
+
+    StressThread = StressThread
 
     def _check_tree(self, cn, tree):
-        # Make sure the BTree is sane and that all the updates persisted.
+        # Make sure the BTree is sane at the C level.
         retries = 3
         while retries:
             retries -= 1
@@ -196,28 +274,46 @@
     def _check_threads(self, tree, *threads):
         # Make sure the thread's view of the world is consistent with
         # the actual database state.
-        all_keys = []
+        expected_keys = []
+        errormsgs = []
+        err = errormsgs.append
         for t in threads:
-            # If the test didn't add any keys, it didn't do what we expected.
-            self.assert_(t.added_keys)
-            for key in t.added_keys:
-                self.assert_(tree.has_key(key), key)
-            all_keys.extend(t.added_keys)
-        all_keys.sort()
-        self.assertEqual(all_keys, list(tree.keys()))
+            if not t.added_keys:
+                err("thread %d didn't add any keys" % t.threadnum)
+            expected_keys.extend(t.added_keys)
+        expected_keys.sort()
+        actual_keys = list(tree.keys())
+        if expected_keys != actual_keys:
+            err("expected keys != actual keys")
+            for k in expected_keys:
+                if k not in actual_keys:
+                    err("key %s expected but not in tree" % k)
+            for k in actual_keys:
+                if k not in expected_keys:
+                    err("key %s in tree but not expected" % k)
+        if errormsgs:
+            display(tree)
+            self.fail('\n'.join(errormsgs))
 
-    def go(self, stop, *threads):
+    def go(self, stop, commitdict, *threads):
         # Run the threads
         for t in threads:
             t.start()
-        time.sleep(self.DELAY)
+        delay = self.MINTIME
+        start = time.time()
+        while time.time() - start <= self.MAXTIME:
+            time.sleep(delay)
+            delay = 2.0
+            if len(commitdict) >= len(threads):
+                break
+            # Some thread still hasn't managed to commit anything.
         stop.set()
         for t in threads:
             t.cleanup()
 
     def checkConcurrentUpdates2Storages(self):
         self._storage = storage1 = self.openClientStorage()
-        storage2 = self.openClientStorage(cache="2")
+        storage2 = self.openClientStorage()
         db1 = DB(storage1)
         db2 = DB(storage2)
         stop = threading.Event()
@@ -227,9 +323,10 @@
         get_transaction().commit()
 
         # Run two threads that update the BTree
-        t1 = StressThread(self, db1, stop, 1, 1)
-        t2 = StressThread(self, db2, stop, 2, 2)
-        self.go(stop, t1, t2)
+        cd = {}
+        t1 = self.StressThread(self, db1, stop, 1, cd, 1)
+        t2 = self.StressThread(self, db2, stop, 2, cd, 2)
+        self.go(stop, cd, t1, t2)
 
         cn.sync()
         self._check_tree(cn, tree)
@@ -249,9 +346,10 @@
         get_transaction().commit()
 
         # Run two threads that update the BTree
-        t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
-        t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
-        self.go(stop, t1, t2)
+        cd = {}
+        t1 = self.StressThread(self, db1, stop, 1, cd, 1, sleep=0.001)
+        t2 = self.StressThread(self, db1, stop, 2, cd, 2, sleep=0.001)
+        self.go(stop, cd, t1, t2)
 
         cn.sync()
         self._check_tree(cn, tree)
@@ -263,22 +361,23 @@
     def checkConcurrentUpdates2StoragesMT(self):
         self._storage = storage1 = self.openClientStorage()
         db1 = DB(storage1)
+        db2 = DB(self.openClientStorage())
         stop = threading.Event()
 
         cn = db1.open()
         tree = cn.root()["tree"] = OOBTree()
         get_transaction().commit()
 
-        db2 = DB(self.openClientStorage(cache="2"))
         # Run three threads that update the BTree.
         # Two of the threads share a single storage so that it
         # is possible for both threads to read the same object
         # at the same time.
 
-        t1 = StressThread(self, db1, stop, 1, 1, 3)
-        t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
-        t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
-        self.go(stop, t1, t2, t3)
+        cd = {}
+        t1 = self.StressThread(self, db1, stop, 1, cd, 1, 3)
+        t2 = self.StressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+        t3 = self.StressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+        self.go(stop, cd, t1, t2, t3)
 
         cn.sync()
         self._check_tree(cn, tree)
@@ -291,7 +390,7 @@
     def checkConcurrentUpdatesInVersions(self):
         self._storage = storage1 = self.openClientStorage()
         db1 = DB(storage1)
-        db2 = DB(self.openClientStorage(cache="2"))
+        db2 = DB(self.openClientStorage())
         stop = threading.Event()
 
         cn = db1.open()
@@ -303,10 +402,11 @@
         # is possible for both threads to read the same object
         # at the same time.
 
-        t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
-        t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
-        t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
-        self.go(stop, t1, t2, t3)
+        cd = {}
+        t1 = VersionStressThread(self, db1, stop, 1, cd, 1, 3)
+        t2 = VersionStressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+        t3 = VersionStressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+        self.go(stop, cd, t1, t2, t3)
 
         cn.sync()
         self._check_tree(cn, tree)
@@ -316,3 +416,41 @@
         db1.close()
         db2.close()
 
+    def checkConcurrentLargeUpdates(self):
+        # Use 3 threads like the 2StorageMT test above.
+        self._storage = storage1 = self.openClientStorage()
+        db1 = DB(storage1)
+        db2 = DB(self.openClientStorage())
+        stop = threading.Event()
+
+        cn = db1.open()
+        tree = cn.root()["tree"] = OOBTree()
+        for i in range(0, 3000, 2):
+            tree[i] = 0
+        get_transaction().commit()
+
+        # Run three threads that update the BTree.
+        # Two of the threads share a single storage so that it
+        # is possible for both threads to read the same object
+        # at the same time.
+
+        cd = {}
+        t1 = LargeUpdatesThread(self, db1, stop, 1, cd, 1, 3, 0.001)
+        t2 = LargeUpdatesThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+        t3 = LargeUpdatesThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+        self.go(stop, cd, t1, t2, t3)
+
+        cn.sync()
+        self._check_tree(cn, tree)
+
+        # Purge the tree of the dummy entries mapping to 0.
+        losers = [k for k, v in tree.items() if v == 0]
+        for k in losers:
+            del tree[k]
+        get_transaction().commit()
+
+        self._check_threads(tree, t1, t2, t3)
+
+        cn.close()
+        db1.close()
+        db2.close()


=== ZODB3/ZEO/tests/ConnectionTests.py 1.40 => 1.40.4.1 ===
--- ZODB3/ZEO/tests/ConnectionTests.py:1.40	Mon Jun 16 17:04:38 2003
+++ ZODB3/ZEO/tests/ConnectionTests.py	Mon Sep 15 14:02:59 2003
@@ -29,15 +29,17 @@
 from ZEO.ClientStorage import ClientStorage
 from ZEO.Exceptions import ClientDisconnected
 from ZEO.zrpc.marshal import Marshaller
+from ZEO.zrpc.error import DisconnectedError
 from ZEO.tests import forker
 
 from ZODB.DB import DB
 from ZODB.Transaction import get_transaction, Transaction
-from ZODB.POSException import ReadOnlyError
+from ZODB.POSException import ReadOnlyError, ConflictError
 from ZODB.tests.StorageTestBase import StorageTestBase
 from ZODB.tests.MinPO import MinPO
 from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
 from ZODB.tests.StorageTestBase import handle_all_serials, ZERO
+from ZODB.tests.StorageTestBase import handle_serials
 
 class TestClientStorage(ClientStorage):
 
@@ -98,6 +100,8 @@
         if getattr(self, '_storage', None) is not None:
             self._storage.close()
             if hasattr(self._storage, 'cleanup'):
+                zLOG.LOG("testZEO", zLOG.DEBUG, "cleanup storage %s" %
+                         self._storage.__name__)
                 self._storage.cleanup()
         for adminaddr in self._servers:
             if adminaddr is not None:
@@ -139,9 +143,14 @@
     def getConfig(self, path, create, read_only):
         raise NotImplementedError
 
-    def openClientStorage(self, cache='', cache_size=200000, wait=1,
+    cache_id = 1
+
+    def openClientStorage(self, cache=None, cache_size=200000, wait=1,
                           read_only=0, read_only_fallback=0,
                           username=None, password=None, realm=None):
+        if cache is None:
+            cache = str(self.__class__.cache_id)
+            self.__class__.cache_id += 1
         self.caches.append(cache)
         storage = TestClientStorage(self.addr,
                                     client=cache,
@@ -564,6 +573,70 @@
         db2.close()
         db1.close()
 
+class InvqTests(CommonSetupTearDown):
+    invq = 2
+
+    def checkQuickVerificationWith2Clients(self):
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+
+        self._storage = self.openClientStorage()
+        oid = self._storage.new_oid()
+        # When we create a new storage, it should always do a full
+        # verification
+        self.assertEqual(self._storage.verify_result, "full verification")
+        # do two stores of the object to make sure an invalidation
+        # message is generated
+        revid = self._dostore(oid)
+        revid = self._dostore(oid, revid)
+
+        perstorage.load(oid, '')
+        perstorage.close()
+
+        revid = self._dostore(oid, revid)
+
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "quick verification")
+
+        self.assertEqual(perstorage.load(oid, ''),
+                         self._storage.load(oid, ''))
+        perstorage.close()
+
+    def checkVerificationWith2ClientsInvqOverflow(self):
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+
+        self._storage = self.openClientStorage()
+        oid = self._storage.new_oid()
+        # When we create a new storage, it should always do a full
+        # verification
+        self.assertEqual(self._storage.verify_result, "full verification")
+        # do two stores of the object to make sure an invalidation
+        # message is generated
+        revid = self._dostore(oid)
+        revid = self._dostore(oid, revid)
+
+        perstorage.load(oid, '')
+        perstorage.close()
+
+        # the test code sets invq bound to 2
+        for i in range(5):
+            revid = self._dostore(oid, revid)
+
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+        t = time.time() + 30
+        while not perstorage.end_verify.isSet():
+            perstorage.sync()
+            if time.time() > t:
+                self.fail("timed out waiting for endVerify")
+
+        self.assertEqual(self._storage.load(oid, '')[1], revid)
+        self.assertEqual(perstorage.load(oid, ''),
+                         self._storage.load(oid, ''))
+
+        perstorage.close()
+
 class ReconnectionTests(CommonSetupTearDown):
     # The setUp() starts a server automatically.  In order for its
     # state to persist, we set the class variable keep to 1.  In
@@ -686,7 +759,7 @@
         self._newAddr()
 
         # Start a read-only server
-        self.startServer(create=0, index=0, read_only=1)
+        self.startServer(create=0, index=0, read_only=1, keep=0)
         # Start a client in fallback mode
         self._storage = self.openClientStorage(read_only_fallback=1)
         # Stores should fail here
@@ -754,69 +827,6 @@
         perstorage.close()
         self._storage.close()
 
-    def checkQuickVerificationWith2Clients(self):
-        perstorage = self.openClientStorage(cache="test")
-        self.assertEqual(perstorage.verify_result, "full verification")
-
-        self._storage = self.openClientStorage()
-        oid = self._storage.new_oid()
-        # When we create a new storage, it should always do a full
-        # verification
-        self.assertEqual(self._storage.verify_result, "full verification")
-        # do two storages of the object to make sure an invalidation
-        # message is generated
-        revid = self._dostore(oid)
-        revid = self._dostore(oid, revid)
-
-        perstorage.load(oid, '')
-        perstorage.close()
-
-        revid = self._dostore(oid, revid)
-
-        perstorage = self.openClientStorage(cache="test")
-        self.assertEqual(perstorage.verify_result, "quick verification")
-
-        self.assertEqual(perstorage.load(oid, ''),
-                         self._storage.load(oid, ''))
-        perstorage.close()
-
-
-
-    def checkVerificationWith2ClientsInvqOverflow(self):
-        perstorage = self.openClientStorage(cache="test")
-        self.assertEqual(perstorage.verify_result, "full verification")
-
-        self._storage = self.openClientStorage()
-        oid = self._storage.new_oid()
-        # When we create a new storage, it should always do a full
-        # verification
-        self.assertEqual(self._storage.verify_result, "full verification")
-        # do two storages of the object to make sure an invalidation
-        # message is generated
-        revid = self._dostore(oid)
-        revid = self._dostore(oid, revid)
-
-        perstorage.load(oid, '')
-        perstorage.close()
-
-        # the test code sets invq bound to 2
-        for i in range(5):
-            revid = self._dostore(oid, revid)
-
-        perstorage = self.openClientStorage(cache="test")
-        self.assertEqual(perstorage.verify_result, "full verification")
-        t = time.time() + 30
-        while not perstorage.end_verify.isSet():
-            perstorage.sync()
-            if time.time() > t:
-                self.fail("timed out waiting for endVerify")
-
-        self.assertEqual(self._storage.load(oid, '')[1], revid)
-        self.assertEqual(perstorage.load(oid, ''),
-                         self._storage.load(oid, ''))
-
-        perstorage.close()
-
 class TimeoutTests(CommonSetupTearDown):
     timeout = 1
 
@@ -843,6 +853,103 @@
         storage.tpc_begin(txn)
         storage.tpc_abort(txn)
         storage.close()
+
+    def checkTimeoutAfterVote(self):
+        raises = self.assertRaises
+        unless = self.failUnless
+        self._storage = storage = self.openClientStorage()
+        # Assert that the zeo cache is empty
+        unless(not storage._cache._index)
+        # Create the object
+        oid = storage.new_oid()
+        obj = MinPO(7)
+        ZERO = '\0'*8
+        # Now do a store, sleeping before the finish so as to cause a timeout
+        t = Transaction()
+        storage.tpc_begin(t)
+        revid1 = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
+        storage.tpc_vote(t)
+        # Now sleep long enough for the storage to time out
+        time.sleep(3)
+        storage.sync()
+        unless(not storage.is_connected())
+        storage._wait()
+        unless(storage.is_connected())
+        # We expect finish to fail
+        raises(ClientDisconnected, storage.tpc_finish, t)
+        # The cache should still be empty
+        unless(not storage._cache._index)
+        # Load should fail since the object should not be in either the cache
+        # or the server.
+        raises(KeyError, storage.load, oid, '')
+
+    def checkTimeoutProvokingConflicts(self):
+        eq = self.assertEqual
+        raises = self.assertRaises
+        unless = self.failUnless
+        self._storage = storage = self.openClientStorage()
+        # Assert that the zeo cache is empty
+        unless(not storage._cache._index)
+        # Create the object
+        oid = storage.new_oid()
+        obj = MinPO(7)
+        ZERO = '\0'*8
+        # We need to successfully commit an object now so we have something to
+        # conflict about.
+        t = Transaction()
+        storage.tpc_begin(t)
+        revid1a = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
+        revid1b = storage.tpc_vote(t)
+        revid1 = handle_serials(oid, revid1a, revid1b)
+        storage.tpc_finish(t)
+        # Now do a store, sleeping before the finish so as to cause a timeout
+        obj.value = 8
+        t = Transaction()
+        storage.tpc_begin(t)
+        revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
+        revid2b = storage.tpc_vote(t)
+        revid2 = handle_serials(oid, revid2a, revid2b)
+        # Now sleep long enough for the storage to time out
+        time.sleep(3)
+        storage.sync()
+        unless(not storage.is_connected())
+        storage._wait()
+        unless(storage.is_connected())
+        # We expect finish to fail
+        raises(ClientDisconnected, storage.tpc_finish, t)
+        # Now we think we've committed the second transaction, but we really
+        # haven't.  A third one should produce a POSKeyError on the server,
+        # which manifests as a ConflictError on the client.
+        obj.value = 9
+        t = Transaction()
+        storage.tpc_begin(t)
+        storage.store(oid, revid2, zodb_pickle(obj), '', t)
+        raises(ConflictError, storage.tpc_vote, t)
+        # Even aborting won't help
+        storage.tpc_abort(t)
+        storage.tpc_finish(t)
+        # Try again
+        obj.value = 10
+        t = Transaction()
+        storage.tpc_begin(t)
+        storage.store(oid, revid2, zodb_pickle(obj), '', t)
+        # Even aborting won't help
+        raises(ConflictError, storage.tpc_vote, t)
+        # Abort this one and try a transaction that should succeed
+        storage.tpc_abort(t)
+        storage.tpc_finish(t)
+        # Now do a store based on revid1 without sleeping; this one should succeed
+        obj.value = 11
+        t = Transaction()
+        storage.tpc_begin(t)
+        revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
+        revid2b = storage.tpc_vote(t)
+        revid2 = handle_serials(oid, revid2a, revid2b)
+        storage.tpc_finish(t)
+        # Now load the object and verify that it has a value of 11
+        data, revid = storage.load(oid, '')
+        eq(zodb_unpickle(data), MinPO(11))
+        eq(revid, revid2)
 
 class MSTThread(threading.Thread):
 


=== ZODB3/ZEO/tests/CommitLockTests.py 1.12.18.1 => 1.12.18.2 ===
--- ZODB3/ZEO/tests/CommitLockTests.py:1.12.18.1	Mon Jul 21 12:37:15 2003
+++ ZODB3/ZEO/tests/CommitLockTests.py	Mon Sep 15 14:02:59 2003
@@ -27,7 +27,7 @@
 ZERO = '\0'*8
 
 class DummyDB:
-    def invalidate(self, *args):
+    def invalidate(self, *args, **kwargs):
         pass
 
 class WorkerThread(TestThread):
@@ -78,12 +78,11 @@
 
 class CommitLockTests:
 
-    NUM_CLIENTS = 5
+    NUM_CLIENTS = 20
 
     # The commit lock tests verify that the storage successfully
     # blocks and restarts transactions when there is contention for a
-    # single storage.  There are a lot of cases to cover.  transaction
-    # has finished.
+    # single storage.  There are a lot of cases to cover.
 
     # The general flow of these tests is to start a transaction by
     # getting far enough into 2PC to acquire the commit lock.  Then
@@ -205,7 +204,8 @@
         
     def _begin_threads(self):
         # Start a second transaction on a different connection without
-        # blocking the test thread.
+        # blocking the test thread.  Returns only after each thread has
+        # set its ready event.
         self._storages = []
         self._threads = []
         
@@ -242,5 +242,5 @@
 
     def _get_timestamp(self):
         t = time.time()
-        t = TimeStamp(*time.gmtime(t)[:5]+(t%60,))
+        t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
         return `t`




More information about the Zope-Checkins mailing list