[Zope-Checkins] CVS: ZODB/src/ZODB/tests - test_storage.py:1.1 testmvcc.py:1.1

Jeremy Hylton jeremy at zope.com
Thu Mar 11 15:11:28 EST 2004


Update of /cvs-repository/ZODB/src/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv17010/ZODB/tests

Added Files:
	test_storage.py testmvcc.py 
Log Message:
First, minimal MVCC tests.


=== Added File ZODB/src/ZODB/tests/test_storage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import bisect
import threading
import unittest

from ZODB.BaseStorage import BaseStorage
from ZODB import POSException
from ZODB.utils import z64

from ZODB.tests import StorageTestBase
from ZODB.tests \
     import BasicStorage, MTStorage, Synchronization, PackableStorage, \
     RevisionStorage

class Transaction(object):
    """Buffer of the records written by one in-progress transaction.

    MinimalMemoryStorage creates one of these per transaction and merges
    its contents into its own tables when the transaction finishes.
    """

    def __init__(self, tid):
        # tid: identifier of the transaction this buffer belongs to.
        self.tid = tid
        # Maps (oid, tid) pairs to data records, the same layout the
        # storage uses for its own index.
        self.index = {}

    def store(self, oid, data):
        """Record data as the revision of oid written by this transaction."""
        key = (oid, self.tid)
        self.index[key] = data

    def cur(self):
        """Return a new dict mapping each stored oid to this transaction's tid."""
        current = {}
        for oid, _tid in self.index:
            current[oid] = self.tid
        return current

class MinimalMemoryStorage(BaseStorage, object):
    """Simple in-memory storage that supports revisions.

    This storage is needed to test multi-version concurrency control.
    It is similar to MappingStorage, but keeps multiple revisions.
    It does not support versions.
    """

    def __init__(self):
        super(MinimalMemoryStorage, self).__init__("name")
        # _index maps oid, tid pairs to data records
        self._index = {}
        # _cur maps oid to current tid
        self._cur = {}
        # _ltid is the tid of the last finished transaction.  Start it
        # at z64 so lastTransaction() has a value to return before the
        # first commit, rather than relying on _finish() having run.
        self._ltid = z64

    def isCurrent(self, oid, serial):
        """Return true if serial is the current tid recorded for oid.

        Raises KeyError if oid has never been committed.
        """
        return serial == self._cur[oid]

    def __len__(self):
        # Counts stored revisions, i.e. (oid, tid) records, not objects.
        return len(self._index)

    def _clear_temp(self):
        # Nothing to clear here; pending data lives in self._txn.
        pass

    def loadEx(self, oid, version):
        """Return (data, tid, "") for the current revision of oid.

        version must be false because versions are not supported.
        Raises KeyError if oid has never been committed.
        """
        self._lock_acquire()
        try:
            assert not version
            tid = self._cur[oid]
            return self._index[(oid, tid)], tid, ""
        finally:
            self._lock_release()

    def load(self, oid, version):
        """Return (data, tid) for the current revision of oid."""
        return self.loadEx(oid, version)[:2]

    def _begin(self, tid, u, d, e):
        # Start buffering writes for the new transaction; u, d and e
        # are not used by this storage.
        self._txn = Transaction(tid)

    def store(self, oid, serial, data, v, txn):
        """Buffer data as a new revision of oid and return the pending tid.

        Raises StorageTransactionError if txn is not the transaction
        this storage is currently committing.  Raises ConflictError if
        serial differs from the current tid of oid, unless serial is
        None or the object has no current tid (None or z64).
        """
        if txn is not self._transaction:
            raise POSException.StorageTransactionError(self, txn)
        assert not v
        if self._cur.get(oid) != serial:
            if not (serial is None or self._cur.get(oid) in [None, z64]):
                raise POSException.ConflictError(
                    oid=oid, serials=(self._cur.get(oid), serial), data=data)
        self._txn.store(oid, data)
        return self._tid

    def _abort(self):
        # Drop the buffered writes; nothing reached _index or _cur.
        del self._txn

    def _finish(self, tid, u, d, e):
        # Publish the buffered records and make them the current
        # revisions.  The tid recorded as "last" is self._tid, not the
        # tid argument.
        self._lock_acquire()
        try:
            self._index.update(self._txn.index)
            self._cur.update(self._txn.cur())
            self._ltid = self._tid
        finally:
            self._lock_release()

    def lastTransaction(self):
        """Return the tid of the last finished transaction (z64 if none)."""
        return self._ltid

    def loadBefore(self, the_oid, the_tid):
        """Return the newest revision of the_oid written before the_tid.

        The result is (data, tid, end_tid), where end_tid is the tid of
        the next revision or None if the returned revision is the
        newest one stored.  Returns None if every stored revision has a
        tid >= the_tid.  Raises KeyError if the_oid has no revisions.
        """
        # It's okay if loadBefore() is really expensive, because this
        # storage is just used for testing.
        self._lock_acquire()
        try:
            tids = [tid for oid, tid in self._index if oid == the_oid]
            if not tids:
                # Call form of raise: valid on both Python 2 and 3.
                raise KeyError(the_oid)
            tids.sort()
            # bisect_left gives the position of the first tid >= the_tid,
            # so the entry just before it is the newest earlier revision.
            i = bisect.bisect_left(tids, the_tid) - 1
            if i == -1:
                return None
            tid = tids[i]
            j = i + 1
            if j == len(tids):
                end_tid = None
            else:
                end_tid = tids[j]
            return self._index[(the_oid, tid)], tid, end_tid
        finally:
            self._lock_release()

    def loadSerial(self, oid, serial):
        """Return the data record stored for exactly (oid, serial).

        Raises KeyError if that revision does not exist.
        """
        self._lock_acquire()
        try:
            return self._index[(oid, serial)]
        finally:
            self._lock_release()

class MinimalTestSuite(
        StorageTestBase.StorageTestBase,
        BasicStorage.BasicStorage,
        MTStorage.MTStorage,
        PackableStorage.PackableStorage,
        Synchronization.SynchronizedStorage,
        RevisionStorage.RevisionStorage):
    """Run the shared storage test mixins against MinimalMemoryStorage."""

    def setUp(self):
        # Each test starts from a brand-new, empty in-memory storage.
        storage = MinimalMemoryStorage()
        self._storage = storage

    def checkLoadBeforeUndo(self):
        # MinimalMemoryStorage doesn't implement undo, so this check is
        # deliberately a no-op (presumably it overrides a check supplied
        # by one of the mixins -- confirm against RevisionStorage).
        return None

def test_suite():
    """Return a suite of the MinimalTestSuite methods named "check*".

    The suite is built with a TestLoader whose method prefix is "check".
    This is what unittest.makeSuite(cls, "check") did, but makeSuite is
    deprecated and has been removed from recent Python releases.
    """
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "check"
    return loader.loadTestsFromTestCase(MinimalTestSuite)


=== Added File ZODB/src/ZODB/tests/testmvcc.py ===
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
r"""
Multi-version concurrency control tests
=======================================

Multi-version concurrency control (MVCC) exploits storages that store
multiple revisions of an object to avoid read conflicts.  Normally
when an object is read from the storage, its most recent revision is
read.  Under MVCC, an older revision is read so that the transaction
sees a consistent view of the database.

ZODB guarantees execution-time consistency: A single transaction will
always see a consistent view of the database while it is executing.
If transaction A is running, has already read an object O1, and an
external transaction B modifies object O2, then transaction A can no
longer read the current revision of O2.  It must either read the
revision of O2 that is consistent with O1 or raise a ReadConflictError.

This note includes doctests that explain how MVCC is implemented (and
test that the implementation is correct).  The tests use a
MinimalMemoryStorage that implements MVCC support, but not much else.

>>> from ZODB.tests.test_storage import MinimalMemoryStorage
>>> from ZODB import DB
>>> db = DB(MinimalMemoryStorage())

We will use two different connections with the experimental
setLocalTransaction() method to make sure that the connections act
independently, even though they'll be run from a single thread.

>>> cn1 = db.open()
>>> txn1 = cn1.setLocalTransaction()

The test will just use some MinPO objects.  The next few lines just
set up an initial database state.

>>> from ZODB.tests.MinPO import MinPO
>>> r = cn1.root()
>>> r["a"] = MinPO(1)
>>> r["b"] = MinPO(1)
>>> txn1.commit()

Now open a second connection.

>>> cn2 = db.open()
>>> txn2 = cn2.setLocalTransaction()

The ZODB Connection tracks a transaction high-water mark, which
represents the latest transaction id that can be read by the current
transaction and still present a consistent view of the database.  When
a transaction commits, the database sends invalidations to all the
other transactions; the invalidation contains the transaction id and
the oids of modified objects.  The Connection stores the high-water
mark in _txn_time, which is set to None until an invalidation arrives.

>>> cn = db.open()

>>> cn._txn_time
>>> cn.invalidate(1, dict.fromkeys([1, 2]))
>>> cn._txn_time
1
>>> cn.invalidate(2, dict.fromkeys([1, 2]))
>>> cn._txn_time
1

The high-water mark is set to the transaction id of the first
transaction, because transaction ids must be monotonically increasing.
It is reset at transaction boundaries.

XXX We'd like simple abort and commit calls to make txn boundaries,
but that doesn't work unless an object is modified.  sync() will abort
a transaction and process invalidations.

>>> cn.sync()
>>> cn._txn_time

The next bit of code includes a simple MVCC test.  One transaction
will begin and modify "a."  The other transaction will then modify "b"
and commit.

>>> r1 = cn1.root()
>>> r1["a"].value = 2

>>> cn1.getTransaction().commit()
>>> txn = db.lastTransaction()

The second connection has its high-water mark set now.

>>> cn2._txn_time == txn
True

It is safe to read "b," because it was not modified by the concurrent
transaction. 

>>> r2 = cn2.root()
>>> r2["b"]._p_serial < cn2._txn_time
True
>>> r2["b"].value = 2

It is not safe, however, to read the current revision of "a," because it
was modified at the high-water mark.  If we read it, we'll get a
non-current revision.

>>> r2["a"].value
1
>>> r2["a"]._p_serial < cn2._txn_time
True

We can confirm that we have a non-current revision by asking the
storage.

>>> db._storage.isCurrent(r2["a"]._p_oid, r2["a"]._p_serial)
False

It's possible to modify "a," but we get a conflict error when we
commit the transaction.

>>> r2["a"].value = 3
>>> txn2.commit()
Traceback (most recent call last):
 ...
ConflictError: database conflict error (oid 0000000000000001, class ZODB.tests.MinPO.MinPO)

"""

import doctest

def test_suite():
    # Collect the doctests embedded in this module's docstrings.
    # DocTestSuite() must be called directly from this function: with
    # no argument it inspects the calling module.
    suite = doctest.DocTestSuite()
    return suite




More information about the Zope-Checkins mailing list