[Zope-Checkins] CVS: ZODB3/ZEO/tests - InvalidationTests.py:

Jeremy Hylton jeremy at zope.com
Wed Aug 20 17:38:20 EDT 2003

Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv23383/ZEO/tests

Modified Files:
      Tag: ZODB3-vote-backoff-branch
Log Message:
Add a new concurrent updates test that modifies a large BTree.

The test is specifically designed to have competing updates large
enough to trigger the "run-vote-in-separate-thread" code in
StorageServer.py.  I confirmed that it exercises that code by adding
some debugging prints.

=== ZODB3/ZEO/tests/InvalidationTests.py => ===
--- ZODB3/ZEO/tests/InvalidationTests.py:	Thu Jun 12 21:33:52 2003
+++ ZODB3/ZEO/tests/InvalidationTests.py	Wed Aug 20 16:38:19 2003
@@ -85,6 +85,73 @@
             key += self.step
+class LargeUpdatesThread(TestThread):
+    # A thread that performs a lot of updates.  It attempts to modify
+    # more than 25 objects so that it can test code that runs vote
+    # in a separate thread when it modifies more than 25 objects.
+    keys = range(3000)
+    def __init__(self, testcase, db, stop, threadnum, startnum,
+                 step=2, sleep=None):
+        TestThread.__init__(self, testcase)
+        self.db = db
+        self.stop = stop
+        self.threadnum = threadnum
+        self.startnum = startnum
+        self.step = step
+        self.sleep = sleep
+        self.added_keys = []
+    def testrun(self):
+        cn = self.db.open()
+        while not self.stop.isSet():
+            try:
+                tree = cn.root()["tree"]
+                break
+            except (ConflictError, KeyError):
+                get_transaction().abort()
+                cn.sync()
+        tkeys = range(self.threadnum, 3000)
+        while not self.stop.isSet():
+            # The test picks 50 keys spread across many buckets.  The
+            # three threads start with different offsets to minimize
+            # conflict errors.
+            nkeys = len(tkeys)
+            if nkeys < 50:
+                tkeys = range(self.threadnum, 3000)
+                nkeys = len(tkeys)
+            step = max(nkeys / 50, 1)
+            keys = [tkeys[i] for i in range(0, nkeys - 1, step)]
+            for key in keys:
+                try:
+                    tree[key] = self.threadnum
+                except (ReadConflictError, ConflictError), msg:
+                    cn.sync()
+                    break
+            else:
+                get_transaction().note("keys %s" % ", ".join(map(str, keys)))
+                try:
+                    get_transaction().commit()
+                    if self.sleep:
+                        time.sleep(self.sleep)
+                except ConflictError, msg:
+                    get_transaction().abort()
+                    continue
+                for k in keys:
+                    tkeys.remove(k)
+                self.added_keys += keys
+                # sync() is necessary here to process invalidations
+                # if we get a read conflict.  In the read conflict case,
+                # no objects were modified so cn never got registered
+                # with the transaction.
+                cn.sync()
+        cn.close()
 class VersionStressThread(TestThread):
     def __init__(self, testcase, db, stop, threadnum, startnum,
@@ -162,6 +229,7 @@
     level = 2
     DELAY = 15  # number of seconds the main thread lets the workers run
+    StressThread = StressThread
     def _check_tree(self, cn, tree):
         # Make sure the BTree is sane and that all the updates persisted.
@@ -215,8 +283,8 @@
         # Run two threads that update the BTree
-        t1 = StressThread(self, db1, stop, 1, 1)
-        t2 = StressThread(self, db2, stop, 2, 2)
+        t1 = self.StressThread(self, db1, stop, 1, 1)
+        t2 = self.StressThread(self, db2, stop, 2, 2)
         self.go(stop, t1, t2)
@@ -237,8 +305,8 @@
         # Run two threads that update the BTree
-        t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
-        t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
+        t1 = self.StressThread(self, db1, stop, 1, 1, sleep=0.001)
+        t2 = self.StressThread(self, db1, stop, 2, 2, sleep=0.001)
         self.go(stop, t1, t2)
@@ -263,9 +331,9 @@
         # is possible for both threads to read the same object
         # at the same time.
-        t1 = StressThread(self, db1, stop, 1, 1, 3)
-        t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
-        t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
+        t1 = self.StressThread(self, db1, stop, 1, 1, 3)
+        t2 = self.StressThread(self, db2, stop, 2, 2, 3, 0.001)
+        t3 = self.StressThread(self, db2, stop, 3, 3, 3, 0.001)
         self.go(stop, t1, t2, t3)
@@ -299,6 +367,46 @@
         self._check_tree(cn, tree)
         self._check_threads(tree, t1, t2, t3)
+        cn.close()
+        db1.close()
+        db2.close()
+    def checkConcurrentLargeUpdates(self):
+        # Use 3 threads like the 2StorageMT test above.
+        self._storage = storage1 = self.openClientStorage()
+        db1 = DB(storage1)
+        db2 = DB(self.openClientStorage())
+        stop = threading.Event()
+        cn = db1.open()
+        tree = cn.root()["tree"] = OOBTree()
+        for i in range(0, 3000, 2):
+            tree[i] = 0
+        get_transaction().commit()
+        # Run three threads that update the BTree.
+        # Two of the threads share a single storage so that it
+        # is possible for both threads to read the same object
+        # at the same time.
+        t1 = LargeUpdatesThread(self, db1, stop, 1, 1, 3, 0.001)
+        t2 = LargeUpdatesThread(self, db2, stop, 2, 2, 3, 0.001)
+        t3 = LargeUpdatesThread(self, db2, stop, 3, 3, 3, 0.001)
+        self.go(stop, t1, t2, t3)
+        cn.sync()
+        self._check_tree(cn, tree)
+        # check the threads differently here than in the other tests
+        L = [None, t1, t2, t3]
+        for t in t1, t2, t3:
+            self.assert_(t.added_keys)
+            for k in t.added_keys:
+                self.assert_(tree[k] != 0)
+                if tree[k] != t.threadnum:
+                    self.assert_(k in L[tree[k]].added_keys)

More information about the Zope-Checkins mailing list