[Zope-Checkins] CVS: StandaloneZODB/ZODB/tests - MTStorage.py:1.2 ReadOnlyStorage.py:1.2
Jeremy Hylton
jeremy@zope.com
Mon, 21 Jan 2002 11:45:41 -0500
Update of /cvs-repository/StandaloneZODB/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv9778
Added Files:
MTStorage.py ReadOnlyStorage.py
Log Message:
Add new tests from the Standby-branch branch
=== StandaloneZODB/ZODB/tests/MTStorage.py 1.1 => 1.2 ===
+import threading
+import time
+
+import ZODB
+from PersistentMapping import PersistentMapping
+
+from ZODB.tests.StorageTestBase \
+ import StorageTestBase, zodb_pickle, zodb_unpickle, handle_serials
+from ZODB.tests.MinPO import MinPO
+from ZODB.Transaction import Transaction
+from ZODB.POSException import ConflictError
+
+SHORT_DELAY = 0.01
+
+def sort(l):
+ "Sort a list in place and return it."
+ l.sort()
+ return l
+
+class ZODBClientThread(threading.Thread):
+
+ __super_init = threading.Thread.__init__
+
+ def __init__(self, db, test, commits=10, delay=SHORT_DELAY):
+ self.__super_init()
+ self.db = db
+ self.test = test
+ self.commits = commits
+ self.delay = delay
+
+ def run(self):
+ conn = self.db.open()
+ root = conn.root()
+ d = self.get_thread_dict(root)
+ if d is None:
+ self.test.fail()
+ else:
+ for i in range(self.commits):
+ self.commit(d, i)
+ self.test.assertEqual(sort(d.keys()), range(self.commits))
+
+ def commit(self, d, num):
+ d[num] = time.time()
+ time.sleep(self.delay)
+ get_transaction().commit()
+ time.sleep(self.delay)
+
+ def get_thread_dict(self, root):
+ name = self.getName()
+ # arbitrarily limit to 10 re-tries
+ for i in range(10):
+ try:
+ m = PersistentMapping()
+ root[name] = m
+ get_transaction().commit()
+ break
+ except ConflictError:
+ get_transaction().abort()
+ for i in range(10):
+ try:
+ return root.get(name)
+ except ConflictError:
+ get_transaction().abort()
+
+class StorageClientThread(threading.Thread):
+
+ __super_init = threading.Thread.__init__
+
+ def __init__(self, storage, test, commits=10, delay=SHORT_DELAY):
+ self.__super_init()
+ self.storage = storage
+ self.test = test
+ self.commits = commits
+ self.delay = delay
+ self.oids = {}
+
+ def run(self):
+ for i in range(self.commits):
+ self.dostore(i)
+ self.check()
+
+ def check(self):
+ for oid, revid in self.oids.items():
+ data, serial = self.storage.load(oid, '')
+ self.test.assertEqual(serial, revid)
+ obj = zodb_unpickle(data)
+ self.test.assertEqual(obj.value[0], self.getName())
+
+ def pause(self):
+ time.sleep(self.delay)
+
+ def oid(self):
+ oid = self.storage.new_oid()
+ self.oids[oid] = None
+ return oid
+
+ def dostore(self, i):
+ data = zodb_pickle(MinPO((self.getName(), i)))
+ t = Transaction()
+ oid = self.oid()
+ self.pause()
+
+ self.storage.tpc_begin(t)
+ self.pause()
+
+ # Always create a new object, signified by None for revid
+ r1 = self.storage.store(oid, None, data, '', t)
+ self.pause()
+
+ r2 = self.storage.tpc_vote(t)
+ self.pause()
+
+ self.storage.tpc_finish(t)
+ self.pause()
+
+ revid = handle_serials(oid, r1, r2)
+ self.oids[oid] = revid
+
+class ExtStorageClientThread(StorageClientThread):
+
+ def run(self):
+ # pick some other storage ops to execute
+ ops = [getattr(self, meth) for meth in dir(ExtStorageClientThread)
+ if meth.startswith('do_')]
+ assert ops, "Didn't find an storage ops in %s" % self.storage
+ # do a store to guarantee there's at least one oid in self.oids
+ self.dostore(0)
+
+ for i in range(self.commits - 1):
+ meth = random.choice(ops)
+ meth()
+ self.dostore(i)
+ self.check()
+
+ def pick_oid(self):
+ return random.choice(self.oids.keys())
+
+ def do_load(self):
+ oid = self.pick_oid()
+ self.storage.load(oid, '')
+
+ def do_loadSerial(self):
+ oid = self.pick_oid()
+ self.storage.loadSerial(oid, self.oids[oid])
+
+ def do_modifiedInVersion(self):
+ oid = self.pick_oid()
+ self.storage.modifiedInVersion(oid)
+
+ def do_undoLog(self):
+ self.storage.undoLog(0, -20)
+
+ def do_iterator(self):
+ try:
+ iter = self.storage.iterator()
+ except AttributeError:
+ # XXX It's hard to detect that a ZEO ClientStorage
+ # doesn't have this method, but does have all the others.
+ return
+ for obj in iter:
+ pass
+
+class MTStorage:
+ "Test a storage with multiple client threads executing concurrently."
+
+ def _checkNThreads(self, n, constructor, *args):
+ threads = [constructor(*args) for i in range(n)]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+ def check2ZODBThreads(self):
+ db = ZODB.DB(self._storage)
+ self._checkNThreads(2, ZODBClientThread, db, self)
+
+ def check7ZODBThreads(self):
+ db = ZODB.DB(self._storage)
+ self._checkNThreads(7, ZODBClientThread, db, self)
+
+ def check2StorageThreads(self):
+ self._checkNThreads(2, StorageClientThread, self._storage, self)
+
+ def check7StorageThreads(self):
+ self._checkNThreads(7, StorageClientThread, self._storage, self)
+
+ def check4ExtStorageThread(self):
+ self._checkNThreads(4, ExtStorageClientThread, self._storage, self)
+
=== StandaloneZODB/ZODB/tests/ReadOnlyStorage.py 1.1 => 1.2 ===
+from ZODB.Transaction import Transaction
+
+class ReadOnlyStorage:
+
+ def _create_data(self):
+ # test a read-only storage that already has some data
+ self.oids = {}
+ for i in range(10):
+ oid = self._storage.new_oid()
+ revid = self._dostore(oid)
+ self.oids[oid] = revid
+
+ def _make_readonly(self):
+ self._storage.close()
+ self.open(read_only=1)
+ self.assert_(self._storage.isReadOnly())
+
+ def checkReadMethods(self):
+ self._create_data()
+ self._make_readonly()
+ # XXX not going to bother checking all read methods
+ for oid in self.oids.keys():
+ data, revid = self._storage.load(oid, '')
+ self.assertEqual(revid, self.oids[oid])
+ self.assert_(not self._storage.modifiedInVersion(oid))
+ _data = self._storage.loadSerial(oid, revid)
+ self.assertEqual(data, _data)
+
+ def checkWriteMethods(self):
+ self._make_readonly()
+ self.assertRaises(ReadOnlyError, self._storage.new_oid)
+ self.assertRaises(ReadOnlyError, self._storage.undo,
+ '\000' * 8)
+
+ t = Transaction()
+ self._storage.tpc_begin(t)
+ self.assertRaises(ReadOnlyError, self._storage.abortVersion,
+ '', t)
+ self._storage.tpc_abort(t)
+
+ t = Transaction()
+ self._storage.tpc_begin(t)
+ self.assertRaises(ReadOnlyError, self._storage.commitVersion,
+ '', '', t)
+ self._storage.tpc_abort(t)
+
+ t = Transaction()
+ self._storage.tpc_begin(t)
+ self.assertRaises(ReadOnlyError, self._storage.store,
+ '\000' * 8, None, '', '', t)
+ self._storage.tpc_abort(t)
+
+ if self._storage.supportsTransactionalUndo():
+ t = Transaction()
+ self._storage.tpc_begin(t)
+ self.assertRaises(ReadOnlyError, self._storage.transactionalUndo,
+ '\000' * 8, t)
+ self._storage.tpc_abort(t)
+
+