[Zope-Checkins] CVS: Zope/lib/python/ZODB/tests - MTStorage.py:1.2.6.1

Fred L. Drake, Jr. fdrake@acm.org
Wed, 20 Feb 2002 09:50:26 -0500


Update of /cvs-repository/Zope/lib/python/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv28816/tests

Added Files:
      Tag: Zope-2_5-branch
	MTStorage.py 
Log Message:
Support for testing a storage from multiple simultaneous client threads.
[Merge from trunk.]


=== Added File Zope/lib/python/ZODB/tests/MTStorage.py ===
import random
import threading
import time

import ZODB
from PersistentMapping import PersistentMapping

from ZODB.tests.StorageTestBase \
     import zodb_pickle, zodb_unpickle, handle_serials
from ZODB.tests.MinPO import MinPO
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError

# Default value, in seconds, of the ``delay`` argument of the client thread
# classes below; each thread passes its delay to time.sleep() between steps.
SHORT_DELAY = 0.01

def sort(l):
    """Sort the list ``l`` in place, then return that same list.

    ``l.sort()`` gives back None, so this wrapper lets the sorted list be
    used directly inside a larger expression.
    """
    l.sort()
    return l

class ZODBClientThread(threading.Thread):
    """Thread that commits changes through its own connection to a database.

    Each thread stores a PersistentMapping in the database root under its
    own thread name, then adds ``commits`` entries to that mapping, one
    commit per entry.
    """

    __super_init = threading.Thread.__init__

    def __init__(self, db, test, commits=10, delay=SHORT_DELAY):
        """Set up the thread.

        db -- object whose open() returns the connection used by run()
        test -- test case used for reporting (fail() and assertEqual())
        commits -- number of entries to commit into the thread's mapping
        delay -- seconds to sleep before and after each commit
        """
        self.__super_init()
        self.db = db
        self.test = test
        self.commits = commits
        self.delay = delay

    def run(self):
        """Commit ``self.commits`` entries and check that all are present.

        The connection opened here is always closed again, whether the
        checks pass or an exception ends the thread.
        """
        conn = self.db.open()
        try:
            root = conn.root()
            d = self.get_thread_dict(root)
            if d is None:
                self.test.fail("no mapping was stored for thread %s"
                               % self.getName())
            else:
                for i in range(self.commits):
                    self.commit(d, i)
                self.test.assertEqual(sort(d.keys()), range(self.commits))
        finally:
            # Drop whatever an exception may have left uncommitted before
            # giving the connection back; after a successful commit there
            # is nothing pending and the abort changes nothing.
            # NOTE(review): get_transaction is neither defined nor imported
            # in this file -- presumably a builtin installed by ZODB.
            get_transaction().abort()
            conn.close()

    def commit(self, d, num):
        """Store the current time under key ``num`` in ``d`` and commit.

        The thread sleeps ``self.delay`` seconds between the assignment and
        the commit, and again after the commit.
        """
        d[num] = time.time()
        time.sleep(self.delay)
        get_transaction().commit()
        time.sleep(self.delay)

    def get_thread_dict(self, root):
        """Store a new mapping under this thread's name and return it.

        Returns whatever ``root.get(name)`` yields, which is None when the
        name is absent, or None when every read attempt raised
        ConflictError.
        """
        name = self.getName()
        # arbitrarily limit to 10 re-tries
        for i in range(10):
            try:
                m = PersistentMapping()
                root[name] = m
                get_transaction().commit()
                break
            except ConflictError:
                get_transaction().abort()
        # Read the mapping back through the root instead of returning ``m``
        # directly; the read is retried on conflicts as well.
        for i in range(10):
            try:
                return root.get(name)
            except ConflictError:
                get_transaction().abort()
        return None

class StorageClientThread(threading.Thread):
    """Thread that stores new objects directly through the storage API.

    run() performs ``commits`` two-phase commits, each storing one new
    MinPO object, and then loads every object back to verify it.
    """

    __super_init = threading.Thread.__init__

    def __init__(self, storage, test, commits=10, delay=SHORT_DELAY):
        """Set up the thread.

        storage -- the storage under test
        test -- test case used for reporting (assertEqual())
        commits -- number of objects to store
        delay -- seconds slept by each pause() call
        """
        self.__super_init()
        self.storage = storage
        self.test = test
        self.commits = commits
        self.delay = delay
        # Maps each oid allocated by this thread to the revision id of its
        # stored data (None until the store has completed).
        self.oids = {}

    def run(self):
        """Store ``self.commits`` objects, then verify all of them."""
        for i in range(self.commits):
            self.dostore(i)
        self.check()

    def check(self):
        """Load every stored object and compare it with what was written.

        For each oid the loaded serial must equal the recorded revision id
        and the first element of the object's value must be this thread's
        name.
        """
        for oid, revid in self.oids.items():
            data, serial = self.storage.load(oid, '')
            self.test.assertEqual(serial, revid)
            obj = zodb_unpickle(data)
            self.test.assertEqual(obj.value[0], self.getName())

    def pause(self):
        """Sleep for ``self.delay`` seconds."""
        time.sleep(self.delay)

    def oid(self):
        """Allocate a new oid from the storage and register it."""
        oid = self.storage.new_oid()
        self.oids[oid] = None
        return oid

    def dostore(self, i):
        """Store one new object holding ``(thread name, i)``.

        The object is written in its own two-phase commit, with a pause
        between the individual steps.  If any step after tpc_begin() raises,
        the transaction is aborted with tpc_abort() and the exception is
        re-raised.
        """
        data = zodb_pickle(MinPO((self.getName(), i)))
        t = Transaction()
        oid = self.oid()
        self.pause()

        self.storage.tpc_begin(t)
        try:
            self.pause()

            # Always create a new object, signified by None for revid
            r1 = self.storage.store(oid, None, data, '', t)
            self.pause()

            r2 = self.storage.tpc_vote(t)
            self.pause()

            self.storage.tpc_finish(t)
        except:
            # Do not leave a begun transaction behind when this thread
            # dies; presumably the other client threads would otherwise
            # block in tpc_begin() (storage-dependent -- TODO confirm).
            # Bare except on purpose: the exception is always re-raised.
            self.storage.tpc_abort(t)
            raise
        self.pause()

        revid = handle_serials(oid, r1, r2)
        self.oids[oid] = revid

class ExtStorageClientThread(StorageClientThread):
    """Storage client thread that mixes other storage calls in with stores.

    Every ``do_*`` method of this class is one such call; run() executes a
    randomly chosen one before each store except the first.
    """

    def run(self):
        """Alternate random ``do_*`` operations with stores, then verify."""
        # pick some other storage ops to execute
        ops = [getattr(self, meth) for meth in dir(ExtStorageClientThread)
               if meth.startswith('do_')]
        assert ops, "Didn't find any storage ops in %s" % self.storage
        # do a store to guarantee there's at least one oid in self.oids
        self.dostore(0)

        for i in range(self.commits - 1):
            meth = random.choice(ops)
            meth()
            self.dostore(i)
        self.check()

    def pick_oid(self):
        """Return a randomly chosen oid among those this thread allocated."""
        return random.choice(self.oids.keys())

    def do_load(self):
        """Load the data of a random oid, discarding the result."""
        oid = self.pick_oid()
        self.storage.load(oid, '')

    def do_loadSerial(self):
        """Load a random oid at the revision id recorded for it."""
        oid = self.pick_oid()
        self.storage.loadSerial(oid, self.oids[oid])

    def do_modifiedInVersion(self):
        """Call modifiedInVersion() for a random oid."""
        oid = self.pick_oid()
        self.storage.modifiedInVersion(oid)

    def do_undoLog(self):
        """Call undoLog(0, -20) on the storage, discarding the result."""
        self.storage.undoLog(0, -20)

    def do_iterator(self):
        """Walk the storage's iterator, if it provides one."""
        try:
            records = self.storage.iterator()
        except AttributeError:
            # XXX It's hard to detect that a ZEO ClientStorage
            # doesn't have this method, but does have all the others.
            return
        # Only the traversal itself is exercised; the items are ignored.
        for obj in records:
            pass

class _RecordingRun:
    """Callable that replaces a thread's run() and records its failure.

    An exception escaping run() ends only that thread; it is not raised in
    the thread that calls join().  Calling an instance invokes the wrapped
    ``run`` and appends any exception it raises to ``failures``.
    """

    def __init__(self, run, failures):
        self.run = run
        self.failures = failures

    def __call__(self):
        import sys  # local import: this module does not import sys

        try:
            self.run()
        except Exception:
            # sys.exc_info() instead of binding the exception in the except
            # clause keeps this valid for old and new Python syntax alike.
            self.failures.append(sys.exc_info()[1])


class MTStorage:
    "Test a storage with multiple client threads executing concurrently."

    def _checkNThreads(self, n, constructor, *args):
        """Run ``n`` threads built by ``constructor(*args)`` to completion.

        All threads are created before the first one is started.  If any
        thread's run() raised an exception (a failed assertion included),
        the first one recorded is re-raised here, in the calling thread,
        after every thread has been joined.
        """
        failures = []
        threads = [constructor(*args) for i in range(n)]
        for t in threads:
            # Shadow run() on the instance so start() executes the wrapper.
            t.run = _RecordingRun(t.run, failures)
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        if failures:
            raise failures[0]

    def check2ZODBThreads(self):
        """Run two ZODBClientThreads against a DB on the storage."""
        db = ZODB.DB(self._storage)
        self._checkNThreads(2, ZODBClientThread, db, self)

    def check7ZODBThreads(self):
        """Run seven ZODBClientThreads against a DB on the storage."""
        db = ZODB.DB(self._storage)
        self._checkNThreads(7, ZODBClientThread, db, self)

    def check2StorageThreads(self):
        """Run two StorageClientThreads directly against the storage."""
        self._checkNThreads(2, StorageClientThread, self._storage, self)

    def check7StorageThreads(self):
        """Run seven StorageClientThreads directly against the storage."""
        self._checkNThreads(7, StorageClientThread, self._storage, self)

    def check4ExtStorageThread(self):
        """Run four ExtStorageClientThreads directly against the storage."""
        self._checkNThreads(4, ExtStorageClientThread, self._storage, self)