[Zope-Checkins] CVS: ZODB3/ZEO/tests - InvalidationTests.py:1.1.4.7

Jeremy Hylton jeremy at zope.com
Fri Aug 22 17:45:29 EDT 2003


Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv32661/ZEO/tests

Modified Files:
      Tag: ZODB3-3_1-branch
	InvalidationTests.py 
Log Message:
Apply several [hacks] from the ZODB3-vote-backoff-branch.

At least one of these is really an important fix.  The ZEO server was
holding onto ConflictErrors until after tpc_vote() was executed on the
storage.  That is far too expensive for a large transaction, so this
change makes the server fail immediately instead.

Also, make two changes to tpc_vote().  

1. Add a mechanism to return a "VOTE_BACKOFF" message that instructs
   the client to try again later. This mechanism prevents the vote()
   call from blocking reads while the client waits.

2. If the actual transaction being voted on is large, run the vote in
   a separate thread.


=== ZODB3/ZEO/tests/InvalidationTests.py 1.1.4.6 => 1.1.4.7 ===
--- ZODB3/ZEO/tests/InvalidationTests.py:1.1.4.6	Thu Jun 12 21:33:52 2003
+++ ZODB3/ZEO/tests/InvalidationTests.py	Fri Aug 22 16:45:28 2003
@@ -85,6 +85,73 @@
             key += self.step
         cn.close()
 
+class LargeUpdatesThread(TestThread):
+
+    # A thread that performs a lot of updates.  It attempts to modify
+    # more than 25 objects so that it can test code that runs vote
+    # in a separate thread when it modifies more than 25 objects.
+
+    keys = range(3000)
+
+    def __init__(self, testcase, db, stop, threadnum, startnum,
+                 step=2, sleep=None):
+        TestThread.__init__(self, testcase)
+        self.db = db
+        self.stop = stop
+        self.threadnum = threadnum
+        self.startnum = startnum
+        self.step = step
+        self.sleep = sleep
+        self.added_keys = []
+
+    def testrun(self):
+        cn = self.db.open()
+        while not self.stop.isSet():
+            try:
+                tree = cn.root()["tree"]
+                break
+            except (ConflictError, KeyError):
+                get_transaction().abort()
+                cn.sync()
+
+        tkeys = range(self.threadnum, 3000)
+        while not self.stop.isSet():
+
+            # The test picks 50 keys spread across many buckets.  The
+            # three threads start with different offsets to minimize
+            # conflict errors.
+            
+            nkeys = len(tkeys)
+            if nkeys < 50:
+                tkeys = range(self.threadnum, 3000)
+                nkeys = len(tkeys)
+            step = max(nkeys / 50, 1)
+            keys = [tkeys[i] for i in range(0, nkeys - 1, step)]
+            for key in keys:
+                try:
+                    tree[key] = self.threadnum
+                except (ReadConflictError, ConflictError), msg:
+                    cn.sync()
+                    break
+            else:
+                get_transaction().note("keys %s" % ", ".join(map(str, keys)))
+                try:
+                    get_transaction().commit()
+                    if self.sleep:
+                        time.sleep(self.sleep)
+                except ConflictError, msg:
+                    get_transaction().abort()
+                    continue
+                for k in keys:
+                    tkeys.remove(k)
+                self.added_keys += keys
+                # sync() is necessary here to process invalidations
+                # if we get a read conflict.  In the read conflict case,
+                # no objects were modified so cn never got registered
+                # with the transaction.
+                cn.sync()
+        cn.close()
+
 class VersionStressThread(TestThread):
 
     def __init__(self, testcase, db, stop, threadnum, startnum,
@@ -162,6 +229,7 @@
 
     level = 2
     DELAY = 15  # number of seconds the main thread lets the workers run
+    StressThread = StressThread
 
     def _check_tree(self, cn, tree):
         # Make sure the BTree is sane and that all the updates persisted.
@@ -215,8 +283,8 @@
         get_transaction().commit()
 
         # Run two threads that update the BTree
-        t1 = StressThread(self, db1, stop, 1, 1)
-        t2 = StressThread(self, db2, stop, 2, 2)
+        t1 = self.StressThread(self, db1, stop, 1, 1)
+        t2 = self.StressThread(self, db2, stop, 2, 2)
         self.go(stop, t1, t2)
 
         cn.sync()
@@ -237,8 +305,8 @@
         get_transaction().commit()
 
         # Run two threads that update the BTree
-        t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
-        t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
+        t1 = self.StressThread(self, db1, stop, 1, 1, sleep=0.001)
+        t2 = self.StressThread(self, db1, stop, 2, 2, sleep=0.001)
         self.go(stop, t1, t2)
 
         cn.sync()
@@ -263,9 +331,9 @@
         # is possible for both threads to read the same object
         # at the same time.
 
-        t1 = StressThread(self, db1, stop, 1, 1, 3)
-        t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
-        t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
+        t1 = self.StressThread(self, db1, stop, 1, 1, 3)
+        t2 = self.StressThread(self, db2, stop, 2, 2, 3, 0.001)
+        t3 = self.StressThread(self, db2, stop, 3, 3, 3, 0.001)
         self.go(stop, t1, t2, t3)
 
         cn.sync()
@@ -299,6 +367,46 @@
         cn.sync()
         self._check_tree(cn, tree)
         self._check_threads(tree, t1, t2, t3)
+
+        cn.close()
+        db1.close()
+        db2.close()
+
+    def checkConcurrentLargeUpdates(self):
+        # Use 3 threads like the 2StorageMT test above.
+        self._storage = storage1 = self.openClientStorage()
+        db1 = DB(storage1)
+        db2 = DB(self.openClientStorage())
+        stop = threading.Event()
+
+        cn = db1.open()
+        tree = cn.root()["tree"] = OOBTree()
+        for i in range(0, 3000, 2):
+            tree[i] = 0
+        get_transaction().commit()
+        
+        # Run three threads that update the BTree.
+        # Two of the threads share a single storage so that it
+        # is possible for both threads to read the same object
+        # at the same time.
+
+        t1 = LargeUpdatesThread(self, db1, stop, 1, 1, 3, 0.001)
+        t2 = LargeUpdatesThread(self, db2, stop, 2, 2, 3, 0.001)
+        t3 = LargeUpdatesThread(self, db2, stop, 3, 3, 3, 0.001)
+        self.go(stop, t1, t2, t3)
+
+        cn.sync()
+        self._check_tree(cn, tree)
+
+        # check the threads differently here than in the other tests
+        L = [None, t1, t2, t3]
+        for t in t1, t2, t3:
+            self.assert_(t.added_keys)
+            for k in t.added_keys:
+                self.assert_(tree[k] != 0)
+                if tree[k] != t.threadnum:
+                    self.assert_(k in L[tree[k]].added_keys)
+                    
 
         cn.close()
         db1.close()




More information about the Zope-Checkins mailing list