[Zodb-checkins] CVS: ZODB3/ZODB/tests - PackableStorage.py:1.20.4.1 RecoveryStorage.py:1.8.4.1
Jeremy Hylton
jeremy at zope.com
Tue Sep 9 12:06:24 EDT 2003
Update of /cvs-repository/ZODB3/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv3493/ZODB/tests
Modified Files:
Tag: ZODB3-3_2-branch
PackableStorage.py RecoveryStorage.py
Log Message:
Port new tests and fix to old test.
=== ZODB3/ZODB/tests/PackableStorage.py 1.20 => 1.20.4.1 ===
--- ZODB3/ZODB/tests/PackableStorage.py:1.20 Fri May 30 15:04:54 2003
+++ ZODB3/ZODB/tests/PackableStorage.py Tue Sep 9 11:06:12 2003
@@ -381,6 +381,60 @@
eq(root['obj'].value, 7)
+ def _PackWhileWriting(self, pack_now=0):
+ # A storage should allow some reading and writing during
+ # a pack. This test attempts to exercise locking code
+ # in the storage to test that it is safe. It generates
+ # a lot of revisions, so that pack takes a long time.
+
+ db = DB(self._storage)
+ conn = db.open()
+ root = conn.root()
+
+ for i in range(10):
+ root[i] = MinPO(i)
+ get_transaction().commit()
+
+ snooze()
+ packt = time.time()
+
+ for j in range(10):
+ for i in range(10):
+ root[i].value = MinPO(i)
+ get_transaction().commit()
+
+ threads = [ClientThread(db) for i in range(4)]
+ for t in threads:
+ t.start()
+
+ if pack_now:
+ db.pack(time.time())
+ else:
+ db.pack(packt)
+
+ for t in threads:
+ t.join(30)
+ for t in threads:
+ t.join(1)
+ self.assert_(not t.isAlive())
+
+ # Iterate over the storage to make sure it's sane, but not every
+ # storage supports iterators.
+ if not hasattr(self._storage, "iterator"):
+ return
+
+ iter = self._storage.iterator()
+ for txn in iter:
+ for data in txn:
+ pass
+ iter.close()
+
+ def checkPackWhileWriting(self):
+ self._PackWhileWriting(pack_now=0)
+
+ def checkPackNowWhileWriting(self):
+ self._PackWhileWriting(pack_now=1)
+
def checkPackUndoLog(self):
self._initroot()
eq = self.assertEqual
@@ -449,47 +503,6 @@
print '\nafter packing undoLog was'
for r in self._storage.undoLog(): print r
# what can we assert about that?
-
- def checkPackWhileWriting(self):
- # A storage should allow some reading and writing during
- # a pack. This test attempts to exercise locking code
- # in the storage to test that it is safe. It generates
- # a lot of revisions, so that pack takes a long time.
-
- db = DB(self._storage)
- conn = db.open()
- root = conn.root()
-
- for i in range(10):
- root[i] = MinPO(i)
- get_transaction().commit()
-
- snooze()
- packt = time.time()
-
- for j in range(10):
- for i in range(10):
- root[i].value = MinPO(i)
- get_transaction().commit()
-
- threads = [ClientThread(db) for i in range(4)]
- for t in threads:
- t.start()
- db.pack(packt)
- for t in threads:
- t.join(30)
- for t in threads:
- t.join(1)
- self.assert_(not t.isAlive())
-
- # iterator over the storage to make sure it's sane
- if not hasattr(self._storage, "iterator"):
- return
- iter = self._storage.iterator()
- for txn in iter:
- for data in txn:
- pass
- iter.close()
class ClientThread(threading.Thread):
=== ZODB3/ZODB/tests/RecoveryStorage.py 1.8 => 1.8.4.1 ===
--- ZODB3/ZODB/tests/RecoveryStorage.py:1.8 Fri May 30 16:05:46 2003
+++ ZODB3/ZODB/tests/RecoveryStorage.py Tue Sep 9 11:06:12 2003
@@ -15,7 +15,7 @@
from ZODB.Transaction import Transaction
from ZODB.tests.IteratorStorage import IteratorDeepCompare
-from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle
+from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
from ZODB import DB
from ZODB.referencesf import referencesf
@@ -154,3 +154,31 @@
it.close()
self._dst.tpc_vote(final)
self._dst.tpc_finish(final)
+
+ def checkPackWithGCOnDestinationAfterRestore(self):
+ raises = self.assertRaises
+ db = DB(self._storage)
+ conn = db.open()
+ root = conn.root()
+ root.obj = obj1 = MinPO(1)
+ txn = get_transaction()
+ txn.note('root -> obj')
+ txn.commit()
+ root.obj.obj = obj2 = MinPO(2)
+ txn = get_transaction()
+ txn.note('root -> obj -> obj')
+ txn.commit()
+ del root.obj
+ txn = get_transaction()
+ txn.note('root -X->')
+ txn.commit()
+ # Now copy the transactions to the destination
+ self._dst.copyTransactionsFrom(self._storage)
+ # Now pack the destination.
+ snooze()
+ self._dst.pack(time.time(), referencesf)
+ # And check to see that the root object exists, but not the other
+ # objects.
+ data, serial = self._dst.load(root._p_oid, '')
+ raises(KeyError, self._dst.load, obj1._p_oid, '')
+ raises(KeyError, self._dst.load, obj2._p_oid, '')
More information about the Zodb-checkins mailing list