[Zodb-checkins] CVS: StandaloneZODB/ZODB/tests - MTStorage.py:1.1.2.1 BasicStorage.py:1.14.18.1 StorageTestBase.py:1.9.18.3 testFileStorage.py:1.13.24.2
Jeremy Hylton
jeremy@zope.com
Sun, 6 Jan 2002 19:41:16 -0500
Update of /cvs-repository/StandaloneZODB/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv22215/ZODB/tests
Modified Files:
Tag: Standby-branch
BasicStorage.py StorageTestBase.py testFileStorage.py
Added Files:
Tag: Standby-branch
MTStorage.py
Log Message:
Add tests of storage with multiple client threads.
As part of the tests, change handle_serial and handle_all_serials from
methods on StorageTestBase to functions in the module. They should
have been functions to start with.
Add MTStorage to list of tests run by testFileStorage.
=== Added File StandaloneZODB/ZODB/tests/MTStorage.py ===
import random
import threading
import time
import ZODB
from PersistentMapping import PersistentMapping
from ZODB.tests.StorageTestBase \
import StorageTestBase, zodb_pickle, zodb_unpickle, handle_serials
from ZODB.tests.MinPO import MinPO
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError
SHORT_DELAY = 0.01
def sort(l):
"Sort a list in place and return it."
l.sort()
return l
class ZODBClientThread(threading.Thread):
__super_init = threading.Thread.__init__
def __init__(self, db, test, commits=10, delay=SHORT_DELAY):
self.__super_init()
self.db = db
self.test = test
self.commits = commits
self.delay = delay
def run(self):
conn = self.db.open()
root = conn.root()
d = self.get_thread_dict(root)
if d is None:
self.test.fail()
else:
for i in range(self.commits):
self.commit(d, i)
self.test.assertEqual(sort(d.keys()), range(self.commits))
def commit(self, d, num):
d[num] = time.time()
time.sleep(self.delay)
get_transaction().commit()
time.sleep(self.delay)
def get_thread_dict(self, root):
name = self.getName()
# arbitrarily limit to 10 re-tries
for i in range(10):
try:
m = PersistentMapping()
root[name] = m
get_transaction().commit()
break
except ConflictError:
get_transaction().abort()
return root.get(name)
class StorageClientThread(threading.Thread):
__super_init = threading.Thread.__init__
def __init__(self, storage, test, commits=10, delay=SHORT_DELAY):
self.__super_init()
self.storage = storage
self.test = test
self.commits = commits
self.delay = delay
self.oids = {}
def run(self):
for i in range(self.commits):
self.dostore(i)
self.check()
def check(self):
for oid, revid in self.oids.items():
data, serial = self.storage.load(oid, '')
self.test.assertEqual(serial, revid)
obj = zodb_unpickle(data)
self.test.assertEqual(obj.value[0], self.getName())
def pause(self):
time.sleep(self.delay)
def oid(self):
oid = self.storage.new_oid()
self.oids[oid] = None
return oid
def dostore(self, i):
data = zodb_pickle(MinPO((self.getName(), i)))
t = Transaction()
oid = self.oid()
self.pause()
self.storage.tpc_begin(t)
self.pause()
# Always create a new object, signified by None for revid
r1 = self.storage.store(oid, None, data, '', t)
self.pause()
r2 = self.storage.tpc_vote(t)
self.pause()
self.storage.tpc_finish(t)
self.pause()
revid = handle_serials(oid, r1, r2)
self.oids[oid] = revid
class ExtStorageClientThread(StorageClientThread):
def run(self):
ops = [getattr(self, meth) for meth in dir(self)
if meth.startswith('do_')]
# do a store to guarantee there's at least one oid in self.oids
self.dostore(0)
for i in range(self.commits - 1):
meth = random.choice(ops)
meth()
self.dostore(i)
self.check()
def pick_oid(self):
return random.choice(self.oids.keys())
def do_load(self):
oid = self.pick_oid()
self.storage.load(oid, '')
def do_loadSerial(self):
oid = self.pick_oid()
self.storage.loadSerial(oid, self.oids[oid])
def do_modifiedInVersion(self):
oid = self.pick_oid()
self.storage.modifiedInVersion(oid)
def do_undoLog(self):
self.storage.undoLog()
def do_iterator(self):
iter = self.storage.iterator()
for obj in iter:
pass
class MTStorage:
"Test a storage with multiple client threads executing concurrently."
def _checkNThreads(self, n, constructor, *args):
threads = [constructor(*args) for i in range(n)]
for t in threads:
t.start()
for t in threads:
t.join()
def check2ZODBThreads(self):
db = ZODB.DB(self._storage)
self._checkNThreads(2, ZODBClientThread, db, self)
def check7ZODBThreads(self):
db = ZODB.DB(self._storage)
self._checkNThreads(7, ZODBClientThread, db, self)
def check2StorageThreads(self):
self._checkNThreads(2, StorageClientThread, self._storage, self)
def check7StorageThreads(self):
self._checkNThreads(7, StorageClientThread, self._storage, self)
def check4ExtStorageThread(self):
self._checkNThreads(4, ExtStorageClientThread, self._storage, self)
=== StandaloneZODB/ZODB/tests/BasicStorage.py 1.14 => 1.14.18.1 ===
from ZODB.tests.MinPO import MinPO
-from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
+from ZODB.tests.StorageTestBase \
+ import zodb_unpickle, zodb_pickle, handle_serials
ZERO = '\0'*8
@@ -77,7 +78,7 @@
'', txn)
r2 = self._storage.tpc_vote(txn)
self._storage.tpc_finish(txn)
- newrevid = self._handle_serials(oid, r1, r2)
+ newrevid = handle_serials(oid, r1, r2)
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
eq(value, MinPO(11))
=== StandaloneZODB/ZODB/tests/StorageTestBase.py 1.9.18.2 => 1.9.18.3 ===
return inst
+def handle_all_serials(oid, *args):
+ """Return dict of oid to serialno from store() and tpc_vote().
+
+ Raises an exception if one of the calls raised an exception.
+
+ The storage interface got complicated when ZEO was introduced.
+ Any individual store() call can return None or a sequence of
+ 2-tuples where the 2-tuple is either oid, serialno or an
+ exception to be raised by the client.
+
+ The original interface just returned the serialno for the
+ object.
+ """
+ d = {}
+ for arg in args:
+ if isinstance(arg, types.StringType):
+ d[oid] = arg
+ elif arg is None:
+ pass
+ else:
+ for oid, serial in arg:
+ if not isinstance(serial, types.StringType):
+ raise arg
+ d[oid] = serial
+ return d
+
+def handle_serials(oid, *args):
+ """Return the serialno for oid based on multiple return values.
+
+ A helper for function _handle_all_serials().
+ """
+ args = (oid,) + args
+ return apply(handle_all_serials, args)[oid]
+
def import_helper(name):
mod = __import__(name)
return sys.modules[name]
@@ -86,40 +120,6 @@
def tearDown(self):
self._close()
- def _handle_all_serials(self, oid, *args):
- """Return dict of oid to serialno from store() and tpc_vote().
-
- Raises an exception if one of the calls raised an exception.
-
- The storage interface got complicated when ZEO was introduced.
- Any individual store() call can return None or a sequence of
- 2-tuples where the 2-tuple is either oid, serialno or an
- exception to be raised by the client.
-
- The original interface just returned the serialno for the
- object.
- """
- d = {}
- for arg in args:
- if isinstance(arg, types.StringType):
- d[oid] = arg
- elif arg is None:
- pass
- else:
- for oid, serial in arg:
- if not isinstance(serial, types.StringType):
- raise arg
- d[oid] = serial
- return d
-
- def _handle_serials(self, oid, *args):
- """Return the serialno for oid based on multiple return values.
-
- A helper for function _handle_all_serials().
- """
- args = (oid,) + args
- return apply(self._handle_all_serials, args)[oid]
-
def _dostore(self, oid=None, revid=None, data=None, version=None,
already_pickled=0):
"""Do a complete storage transaction. The defaults are:
@@ -152,7 +152,7 @@
# Finish the transaction
r2 = self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- return self._handle_serials(oid, r1, r2)
+ return handle_serials(oid, r1, r2)
def _dostoreNP(self, oid=None, revid=None, data=None, version=None):
return self._dostore(oid, revid, data, version, already_pickled=1)
=== StandaloneZODB/ZODB/tests/testFileStorage.py 1.13.24.1 => 1.13.24.2 ===
TransactionalUndoVersionStorage, PackableStorage, \
Synchronization, ConflictResolution, HistoryStorage, \
- IteratorStorage, Corruption, RevisionStorage, PersistentStorage
+ IteratorStorage, Corruption, RevisionStorage, PersistentStorage, \
+ MTStorage
class FileStorageTests(
StorageTestBase.StorageTestBase,
@@ -21,6 +22,7 @@
IteratorStorage.IteratorStorage,
IteratorStorage.ExtendedIteratorStorage,
PersistentStorage.PersistentStorage,
+ MTStorage.MTStorage
):
def open(self, **kwargs):