[Zope-Checkins] CVS: StandaloneZODB/ZODB/tests - MTStorage.py:1.1.2.1 BasicStorage.py:1.14.18.1 StorageTestBase.py:1.9.18.3 testFileStorage.py:1.13.24.2

Jeremy Hylton jeremy@zope.com
Sun, 6 Jan 2002 19:41:16 -0500


Update of /cvs-repository/StandaloneZODB/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv22215/ZODB/tests

Modified Files:
      Tag: Standby-branch
	BasicStorage.py StorageTestBase.py testFileStorage.py 
Added Files:
      Tag: Standby-branch
	MTStorage.py 
Log Message:
Add tests of storage with multiple client threads.

As part of the tests, change handle_serial and handle_all_serials from
methods on StorageTestBase to functions in the module.  They should
have been functions to start with.

Add MTStorage to list of tests run by testFileStorage.



=== Added File StandaloneZODB/ZODB/tests/MTStorage.py ===
import random
import threading
import time

import ZODB
from PersistentMapping import PersistentMapping

from ZODB.tests.StorageTestBase \
     import StorageTestBase, zodb_pickle, zodb_unpickle, handle_serials
from ZODB.tests.MinPO import MinPO
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError

SHORT_DELAY = 0.01

def sort(l):
    "Sort a list in place and return it."
    l.sort()
    return l

class ZODBClientThread(threading.Thread):

    __super_init = threading.Thread.__init__

    def __init__(self, db, test, commits=10, delay=SHORT_DELAY):
        self.__super_init()
        self.db = db
        self.test = test
        self.commits = commits
        self.delay = delay

    def run(self):
        conn = self.db.open()
        root = conn.root()
        d = self.get_thread_dict(root)
        if d is None:
            self.test.fail()
        else:
            for i in range(self.commits):
                self.commit(d, i)
        self.test.assertEqual(sort(d.keys()), range(self.commits))

    def commit(self, d, num):
        d[num] = time.time()
        time.sleep(self.delay)
        get_transaction().commit()
        time.sleep(self.delay)

    def get_thread_dict(self, root):
        name = self.getName()
        # arbitrarily limit to 10 re-tries
        for i in range(10):
            try:
                m = PersistentMapping()
                root[name] = m
                get_transaction().commit()
                break
            except ConflictError:
                get_transaction().abort()
        return root.get(name)

class StorageClientThread(threading.Thread):

    __super_init = threading.Thread.__init__

    def __init__(self, storage, test, commits=10, delay=SHORT_DELAY):
        self.__super_init()
        self.storage = storage
        self.test = test
        self.commits = commits
        self.delay = delay
        self.oids = {}

    def run(self):
        for i in range(self.commits):
            self.dostore(i)
        self.check()

    def check(self):
        for oid, revid in self.oids.items():
            data, serial = self.storage.load(oid, '')
            self.test.assertEqual(serial, revid)
            obj = zodb_unpickle(data)
            self.test.assertEqual(obj.value[0], self.getName())

    def pause(self):
        time.sleep(self.delay)

    def oid(self):
        oid = self.storage.new_oid()
        self.oids[oid] = None
        return oid

    def dostore(self, i):
        data = zodb_pickle(MinPO((self.getName(), i)))
        t = Transaction()
        oid = self.oid()
        self.pause()

        self.storage.tpc_begin(t)
        self.pause()

        # Always create a new object, signified by None for revid
        r1 = self.storage.store(oid, None, data, '', t)
        self.pause()

        r2 = self.storage.tpc_vote(t)
        self.pause()

        self.storage.tpc_finish(t)
        self.pause()

        revid = handle_serials(oid, r1, r2)
        self.oids[oid] = revid

class ExtStorageClientThread(StorageClientThread):

    def run(self):
        ops = [getattr(self, meth) for meth in dir(self)
               if meth.startswith('do_')]
        # do a store to guarantee there's at least one oid in self.oids
        self.dostore(0)

        for i in range(self.commits - 1):
            meth = random.choice(ops)
            meth()
            self.dostore(i)
        self.check()

    def pick_oid(self):
        return random.choice(self.oids.keys())

    def do_load(self):
        oid = self.pick_oid()
        self.storage.load(oid, '')

    def do_loadSerial(self):
        oid = self.pick_oid()
        self.storage.loadSerial(oid, self.oids[oid])

    def do_modifiedInVersion(self):
        oid = self.pick_oid()
        self.storage.modifiedInVersion(oid)

    def do_undoLog(self):
        self.storage.undoLog()

    def do_iterator(self):
        iter = self.storage.iterator()
        for obj in iter:
            pass

class MTStorage:
    "Test a storage with multiple client threads executing concurrently."

    def _checkNThreads(self, n, constructor, *args):
        threads = [constructor(*args) for i in range(n)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    
    def check2ZODBThreads(self):
        db = ZODB.DB(self._storage)
        self._checkNThreads(2, ZODBClientThread, db, self)

    def check7ZODBThreads(self):
        db = ZODB.DB(self._storage)
        self._checkNThreads(7, ZODBClientThread, db, self)

    def check2StorageThreads(self):
        self._checkNThreads(2, StorageClientThread, self._storage, self)
    
    def check7StorageThreads(self):
        self._checkNThreads(7, StorageClientThread, self._storage, self)

    def check4ExtStorageThread(self):
        self._checkNThreads(4, ExtStorageClientThread, self._storage, self)
        


=== StandaloneZODB/ZODB/tests/BasicStorage.py 1.14 => 1.14.18.1 ===
 
 from ZODB.tests.MinPO import MinPO
-from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
+from ZODB.tests.StorageTestBase \
+     import zodb_unpickle, zodb_pickle, handle_serials
 
 ZERO = '\0'*8
 
@@ -77,7 +78,7 @@
                                        '', txn)
         r2 = self._storage.tpc_vote(txn)
         self._storage.tpc_finish(txn)
-        newrevid = self._handle_serials(oid, r1, r2)
+        newrevid = handle_serials(oid, r1, r2)
         data, revid = self._storage.load(oid, '')
         value = zodb_unpickle(data)
         eq(value, MinPO(11))


=== StandaloneZODB/ZODB/tests/StorageTestBase.py 1.9.18.2 => 1.9.18.3 ===
     return inst
 
+def handle_all_serials(oid, *args):
+    """Return dict of oid to serialno from store() and tpc_vote().
+
+    Raises an exception if one of the calls raised an exception.
+
+    The storage interface got complicated when ZEO was introduced.
+    Any individual store() call can return None or a sequence of
+    2-tuples where the 2-tuple is either oid, serialno or an
+    exception to be raised by the client.
+
+    The original interface just returned the serialno for the
+    object.
+    """
+    d = {}
+    for arg in args:
+        if isinstance(arg, types.StringType):
+            d[oid] = arg
+        elif arg is None:
+            pass
+        else:
+            for oid, serial in arg:
+                if not isinstance(serial, types.StringType):
+                    raise arg
+                d[oid] = serial
+    return d
+
+def handle_serials(oid, *args):
+    """Return the serialno for oid based on multiple return values.
+
+    A helper for function _handle_all_serials().
+    """
+    args = (oid,) + args
+    return apply(handle_all_serials, args)[oid]
+
 def import_helper(name):
     mod = __import__(name)
     return sys.modules[name]
@@ -86,40 +120,6 @@
     def tearDown(self):
         self._close()
 
-    def _handle_all_serials(self, oid, *args):
-        """Return dict of oid to serialno from store() and tpc_vote().
-
-        Raises an exception if one of the calls raised an exception.
-
-        The storage interface got complicated when ZEO was introduced.
-        Any individual store() call can return None or a sequence of
-        2-tuples where the 2-tuple is either oid, serialno or an
-        exception to be raised by the client.
-
-        The original interface just returned the serialno for the
-        object.
-        """
-        d = {}
-        for arg in args:
-            if isinstance(arg, types.StringType):
-                d[oid] = arg
-            elif arg is None:
-                pass
-            else:
-                for oid, serial in arg:
-                    if not isinstance(serial, types.StringType):
-                        raise arg
-                    d[oid] = serial
-        return d
-
-    def _handle_serials(self, oid, *args):
-        """Return the serialno for oid based on multiple return values.
-
-        A helper for function _handle_all_serials().
-        """
-        args = (oid,) + args
-        return apply(self._handle_all_serials, args)[oid]
-
     def _dostore(self, oid=None, revid=None, data=None, version=None,
                  already_pickled=0):
         """Do a complete storage transaction.  The defaults are:
@@ -152,7 +152,7 @@
         # Finish the transaction
         r2 = self._storage.tpc_vote(self._transaction)
         self._storage.tpc_finish(self._transaction)
-        return self._handle_serials(oid, r1, r2)
+        return handle_serials(oid, r1, r2)
         
     def _dostoreNP(self, oid=None, revid=None, data=None, version=None):
         return self._dostore(oid, revid, data, version, already_pickled=1)


=== StandaloneZODB/ZODB/tests/testFileStorage.py 1.13.24.1 => 1.13.24.2 ===
      TransactionalUndoVersionStorage, PackableStorage, \
      Synchronization, ConflictResolution, HistoryStorage, \
-     IteratorStorage, Corruption, RevisionStorage, PersistentStorage
+     IteratorStorage, Corruption, RevisionStorage, PersistentStorage, \
+     MTStorage
 
 class FileStorageTests(
     StorageTestBase.StorageTestBase,
@@ -21,6 +22,7 @@
     IteratorStorage.IteratorStorage,
     IteratorStorage.ExtendedIteratorStorage,
     PersistentStorage.PersistentStorage,
+    MTStorage.MTStorage
     ):
 
     def open(self, **kwargs):