[Zope-Checkins] CVS: ZODB3/ZODB/tests - PackableStorage.py:1.13.8.3

Tim Peters tim.one at comcast.net
Thu Aug 14 16:11:49 EDT 2003


Update of /cvs-repository/ZODB3/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv23925/ZODB/tests

Modified Files:
      Tag: ZODB3-3_1-branch
	PackableStorage.py 
Log Message:
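Refactored checkPackWhileWriting() into a shared helper,
_PackWhileWriting(pack_now), which is called by checkPackWhileWriting()
(packs to packt, as before) and by the new checkPackNowWhileWriting()
(packs to time.time()).  This is a backport from the ZODB4 tests; if I
recall correctly, the distinction was important when running a Berkeley
storage.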


=== ZODB3/ZODB/tests/PackableStorage.py 1.13.8.2 => 1.13.8.3 ===
--- ZODB3/ZODB/tests/PackableStorage.py:1.13.8.2	Fri May 16 16:38:58 2003
+++ ZODB3/ZODB/tests/PackableStorage.py	Thu Aug 14 15:11:43 2003
@@ -2,14 +2,14 @@
 #
 # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE.
-# 
+#
 ##############################################################################
 """Run some tests relevant for storages that support pack()."""
 
@@ -381,7 +381,7 @@
 
         eq(root['obj'].value, 7)
 
-    def checkPackWhileWriting(self):
+    def _PackWhileWriting(self, pack_now=0):
         # A storage should allow some reading and writing during
         # a pack.  This test attempts to exercise locking code
         # in the storage to test that it is safe.  It generates
@@ -406,28 +406,42 @@
         threads = [ClientThread(db) for i in range(4)]
         for t in threads:
             t.start()
-        db.pack(packt)
+
+        if pack_now:
+            db.pack(time.time())
+        else:
+            db.pack(packt)
+
         for t in threads:
             t.join(30)
         for t in threads:
             t.join(1)
             self.assert_(not t.isAlive())
 
-        # iterator over the storage to make sure it's sane
+        # Iterate over the storage to make sure it's sane, but not every
+        # storage supports iterators.
         if not hasattr(self._storage, "iterator"):
             return
+
         iter = self._storage.iterator()
         for txn in iter:
             for data in txn:
                 pass
         iter.close()
 
+    def checkPackWhileWriting(self):
+        self._PackWhileWriting(pack_now=0)
+
+    def checkPackNowWhileWriting(self):
+        self._PackWhileWriting(pack_now=1)
+
+
 class ClientThread(threading.Thread):
 
     def __init__(self, db):
         threading.Thread.__init__(self)
         self.root = db.open().root()
-        
+
     def run(self):
         for j in range(50):
             try:




More information about the Zope-Checkins mailing list