[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Merged the shane-poll-invalidations branch, which adds RelStorage support

Jim Fulton jim at zope.com
Mon Apr 27 18:25:55 EDT 2009


Log message for revision 99544:
  Merged the shane-poll-invalidations branch, which adds RelStorage
  support.
  

Changed:
  U   ZODB/trunk/src/ZODB/Connection.py
  U   ZODB/trunk/src/ZODB/DB.py
  U   ZODB/trunk/src/ZODB/MappingStorage.py
  U   ZODB/trunk/src/ZODB/interfaces.py
  A   ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py
  A   ZODB/trunk/src/ZODB/tests/testMVCCMappingStorage.py

-=-
Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py	2009-04-27 22:07:57 UTC (rev 99543)
+++ ZODB/trunk/src/ZODB/Connection.py	2009-04-27 22:25:55 UTC (rev 99544)
@@ -30,6 +30,7 @@
 from persistent.interfaces import IPersistentDataManager
 from ZODB.interfaces import IConnection
 from ZODB.interfaces import IBlobStorage
+from ZODB.interfaces import IMVCCStorage
 from ZODB.blob import Blob, rename_or_copy_blob
 from transaction.interfaces import ISavepointDataManager
 from transaction.interfaces import IDataManagerSavepoint
@@ -94,8 +95,16 @@
         # Multi-database support
         self.connections = {self._db.database_name: self}
 
-        self._normal_storage = self._storage = db.storage
-        self.new_oid = db.storage.new_oid
+        storage = db.storage
+        if IMVCCStorage.providedBy(storage):
+            # Use a connection-specific storage instance.
+            self._mvcc_storage = True
+            storage = storage.new_instance()
+        else:
+            self._mvcc_storage = False
+
+        self._normal_storage = self._storage = storage
+        self.new_oid = storage.new_oid
         self._savepoint_storage = None
 
         # Do we need to join a txn manager?
@@ -148,7 +157,6 @@
         # in the cache on abort and in other connections on finish.
         self._modified = []
 
-
         # _invalidated queues invalidate messages delivered from the DB
         # _inv_lock prevents one thread from modifying the set while
         # another is processing invalidations.  All the invalidations
@@ -179,10 +187,10 @@
         # _conflicts).
         self._conflicts = {}
 
-        # If MVCC is enabled, then _mvcc is True and _txn_time stores
-        # the upper bound on transactions visible to this connection.
-        # That is, all object revisions must be written before _txn_time.
-        # If it is None, then the current revisions are acceptable.
+        # _txn_time stores the upper bound on transactions visible to
+        # this connection. That is, all object revisions must be
+        # written before _txn_time. If it is None, then the current
+        # revisions are acceptable.
         self._txn_time = None
 
         # To support importFile(), implemented in the ExportImport base
@@ -295,6 +303,9 @@
         if self.opened:
             self.transaction_manager.unregisterSynch(self)
 
+        if self._mvcc_storage:
+            self._storage.sync(force=False)
+
         if primary:
             for connection in self.connections.values():
                 if connection is not self:
@@ -323,6 +334,10 @@
 
     def invalidate(self, tid, oids):
         """Notify the Connection that transaction 'tid' invalidated oids."""
+        if self._mvcc_storage:
+            # Inter-connection invalidation is not needed when the
+            # storage provides MVCC.
+            return
         if self.before is not None:
             # this is an historical connection.  Invalidations are irrelevant.
             return
@@ -462,6 +477,16 @@
 
     # Process pending invalidations.
     def _flush_invalidations(self):
+        if self._mvcc_storage:
+            # Poll the storage for invalidations.
+            invalidated = self._storage.poll_invalidations()
+            if invalidated is None:
+                # special value: the transaction is so old that
+                # we need to flush the whole cache.
+                self._cache.invalidate(self._cache.cache_data.keys())
+            elif invalidated:
+                self._cache.invalidate(invalidated)
+
         self._inv_lock.acquire()
         try:
             # Non-ghostifiable objects may need to read when they are
@@ -1048,6 +1073,11 @@
         if getattr(self, '_reader', None) is not None:
             self._reader._cache = cache
 
+    def _releaseStorage(self):
+        """Tell the storage to release resources it's using"""
+        if self._mvcc_storage:
+            self._storage.release()
+
     ##########################################################################
     # Python protocol
 

Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py	2009-04-27 22:07:57 UTC (rev 99543)
+++ ZODB/trunk/src/ZODB/DB.py	2009-04-27 22:25:55 UTC (rev 99544)
@@ -34,6 +34,7 @@
 
 from zope.interface import implements
 from ZODB.interfaces import IDatabase
+from ZODB.interfaces import IMVCCStorage
 
 import BTrees.OOBTree
 import transaction
@@ -198,6 +199,7 @@
             # reclaim `c` now, and `c` would be left in a user-visible
             # crazy state.
             c._resetCache()
+            c._releaseStorage()
 
     def reduce_size(self):
         self._reduce_size()
@@ -452,24 +454,32 @@
                 DeprecationWarning, 2)
             storage.tpc_vote = lambda *args: None
 
+        if IMVCCStorage.providedBy(storage):
+            temp_storage = storage.new_instance()
+        else:
+            temp_storage = storage
         try:
-            storage.load(z64, '')
-        except KeyError:
-            # Create the database's root in the storage if it doesn't exist
-            from persistent.mapping import PersistentMapping
-            root = PersistentMapping()
-            # Manually create a pickle for the root to put in the storage.
-            # The pickle must be in the special ZODB format.
-            file = cStringIO.StringIO()
-            p = cPickle.Pickler(file, 1)
-            p.dump((root.__class__, None))
-            p.dump(root.__getstate__())
-            t = transaction.Transaction()
-            t.description = 'initial database creation'
-            storage.tpc_begin(t)
-            storage.store(z64, None, file.getvalue(), '', t)
-            storage.tpc_vote(t)
-            storage.tpc_finish(t)
+            try:
+                temp_storage.load(z64, '')
+            except KeyError:
+                # Create the database's root in the storage if it doesn't exist
+                from persistent.mapping import PersistentMapping
+                root = PersistentMapping()
+                # Manually create a pickle for the root to put in the storage.
+                # The pickle must be in the special ZODB format.
+                file = cStringIO.StringIO()
+                p = cPickle.Pickler(file, 1)
+                p.dump((root.__class__, None))
+                p.dump(root.__getstate__())
+                t = transaction.Transaction()
+                t.description = 'initial database creation'
+                temp_storage.tpc_begin(t)
+                temp_storage.store(z64, None, file.getvalue(), '', t)
+                temp_storage.tpc_vote(t)
+                temp_storage.tpc_finish(t)
+        finally:
+            if IMVCCStorage.providedBy(temp_storage):
+                temp_storage.release()
 
         # Multi-database setup.
         if databases is None:

Modified: ZODB/trunk/src/ZODB/MappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/MappingStorage.py	2009-04-27 22:07:57 UTC (rev 99543)
+++ ZODB/trunk/src/ZODB/MappingStorage.py	2009-04-27 22:25:55 UTC (rev 99544)
@@ -37,7 +37,7 @@
     def __init__(self, name='MappingStorage'):
         self.__name__ = name
         self._data = {}                               # {oid->{tid->pickle}}
-        self._transactions = BTrees.OOBTree.OOBTree() # {tid->transaction}
+        self._transactions = BTrees.OOBTree.OOBTree() # {tid->TransactionRecord}
         self._ltid = None
         self._last_pack = None
         _lock = threading.RLock()

Modified: ZODB/trunk/src/ZODB/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/interfaces.py	2009-04-27 22:07:57 UTC (rev 99543)
+++ ZODB/trunk/src/ZODB/interfaces.py	2009-04-27 22:25:55 UTC (rev 99544)
@@ -953,6 +953,111 @@
         # DB pass-through
 
 
+class IMVCCStorage(IStorage):
+    """A storage that provides MVCC semantics internally.
+
+    MVCC (multi-version concurrency control) means each user of a
+    database has a snapshot view of the database. The snapshot view
+    does not change, even if concurrent connections commit
+    transactions, until a transaction boundary. Relational databases
+    that support serializable transaction isolation provide MVCC.
+
+    Storages that implement IMVCCStorage, such as RelStorage, provide
+    MVCC semantics at the ZODB storage layer. When ZODB.Connection uses
+    a storage that implements IMVCCStorage, each connection uses a
+    connection-specific storage instance, and that storage instance
+    provides a snapshot of the database.
+
+    By contrast, storages that do not implement IMVCCStorage, such as
+    FileStorage, rely on ZODB.Connection to provide MVCC semantics, so
+    in that case, one storage instance is shared by many
+    ZODB.Connections. Applications that use ZODB.Connection always have
+    a snapshot view of the database; IMVCCStorage only modifies which
+    layer of ZODB provides MVCC.
+
+    Furthermore, IMVCCStorage changes the way object invalidation
+    works. An essential feature of ZODB is the propagation of object
+    invalidation messages to keep in-memory caches up to date. Storages
+    like FileStorage and ZEO.ClientStorage send invalidation messages
+    to all other Connection instances at transaction commit time.
+    Storages that implement IMVCCStorage, on the other hand, expect the
+    ZODB.Connection to poll for a list of invalidated objects.
+
+    Certain methods of IMVCCStorage implementations open persistent
+    back end database sessions and retain the sessions even after the
+    method call finishes::
+
+        load
+        loadEx
+        loadSerial
+        loadBefore
+        store
+        restore
+        new_oid
+        history
+        tpc_begin
+        tpc_vote
+        tpc_abort
+        tpc_finish
+
+    If you know that the storage instance will no longer be used after
+    calling any of these methods, you should call the release method to
+    release the persistent sessions. The persistent sessions will be
+    reopened as necessary if you call one of those methods again.
+
+    Other storage methods open short lived back end sessions and close
+    the back end sessions before returning. These include::
+
+        __len__
+        getSize
+        undoLog
+        undo
+        pack
+        iterator
+
+    These methods do not provide MVCC semantics, so they operate on
+    the most current view of the database, rather than the snapshot
+    view that the other methods use.
+    """
+
+    def new_instance():
+        """Creates and returns another storage instance.
+
+        The returned instance provides IMVCCStorage and connects to the
+        same back-end database. The database state visible to the
+        instance will be a snapshot that varies independently of other
+        storage instances.
+        """
+
+    def release():
+        """Release all persistent sessions used by this storage instance.
+
+        After this call, the storage instance can still be used;
+        calling methods that use persistent sessions will cause the
+        persistent sessions to be reopened.
+        """
+
+    def poll_invalidations():
+        """Poll the storage for external changes.
+
+        Returns either a sequence of OIDs that have changed, or None.  When a
+        sequence is returned, the corresponding objects should be removed
+        from the ZODB in-memory cache.  When None is returned, the storage is
+        indicating that so much time has elapsed since the last poll that it
+        is no longer possible to enumerate all of the changed OIDs, since the
+        previous transaction seen by the connection has already been packed.
+        In that case, the ZODB in-memory cache should be cleared.
+        """
+
+    def sync(force=True):
+        """Updates the internal snapshot to the current state of the database.
+
+        If the force parameter is False, the storage may choose to
+        ignore this call. By ignoring this call, a storage can reduce
+        the frequency of database polls, thus reducing database load.
+        """
+
+
 class IStorageCurrentRecordIteration(IStorage):
 
     def record_iternext(next=None):

Copied: ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py (from rev 99539, ZODB/branches/shane-poll-invalidations/src/ZODB/tests/MVCCMappingStorage.py)
===================================================================
--- ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py	                        (rev 0)
+++ ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py	2009-04-27 22:25:55 UTC (rev 99544)
@@ -0,0 +1,85 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""An extension of MappingStorage that depends on polling.
+
+Each Connection has its own view of the database.  Polling updates each
+connection's view.
+"""
+
+import time
+
+import BTrees
+from ZODB.interfaces import IMVCCStorage
+from ZODB.MappingStorage import MappingStorage
+from ZODB.TimeStamp import TimeStamp
+from zope.interface import implements
+
+
+class MVCCMappingStorage(MappingStorage):
+    implements(IMVCCStorage)
+
+    def __init__(self, name="MVCC Mapping Storage"):
+        MappingStorage.__init__(self, name=name)
+        # _polled_tid contains the transaction ID at the last poll.
+        self._polled_tid = ''
+
+    def new_instance(self):
+        """Returns a storage instance that is a view of the same data.
+        """
+        res = MVCCMappingStorage(name=self.__name__)
+        res._transactions = self._transactions
+        return res
+
+    def sync(self, force=False):
+        pass
+
+    def release(self):
+        pass
+
+    def poll_invalidations(self):
+        """Poll the storage for changes by other connections.
+        """
+        new_tid = self._transactions.maxKey()
+
+        if self._polled_tid:
+            if not self._transactions.has_key(self._polled_tid):
+                # This connection is so old that we can no longer enumerate
+                # all the changes.
+                self._polled_tid = new_tid
+                return None
+
+        changed_oids = set()
+        for tid, txn in self._transactions.items(
+                self._polled_tid, new_tid, excludemin=True, excludemax=False):
+            if txn.status == 'p':
+                # This transaction has been packed, so it is no longer
+                # possible to enumerate all changed oids.
+                self._polled_tid = new_tid
+                return None
+            if tid == self._ltid:
+                # ignore the transaction committed by this connection
+                continue
+
+            changes = txn.data
+            # pull in changes from the transaction log
+            for oid, value in changes.iteritems():
+                tid_data = self._data.get(oid)
+                if tid_data is None:
+                    tid_data = BTrees.OOBTree.OOBucket()
+                    self._data[oid] = tid_data
+                tid_data[tid] = changes[oid]
+            changed_oids.update(changes.keys())
+
+        self._polled_tid = new_tid
+        return list(changed_oids)

Copied: ZODB/trunk/src/ZODB/tests/testMVCCMappingStorage.py (from rev 99539, ZODB/branches/shane-poll-invalidations/src/ZODB/tests/testMVCCMappingStorage.py)
===================================================================
--- ZODB/trunk/src/ZODB/tests/testMVCCMappingStorage.py	                        (rev 0)
+++ ZODB/trunk/src/ZODB/tests/testMVCCMappingStorage.py	2009-04-27 22:25:55 UTC (rev 99544)
@@ -0,0 +1,164 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import unittest
+
+from persistent.mapping import PersistentMapping
+import transaction
+from ZODB.DB import DB
+from ZODB.tests.MVCCMappingStorage import MVCCMappingStorage
+
+
+from ZODB.tests import (
+    BasicStorage,
+    HistoryStorage,
+    IteratorStorage,
+    MTStorage,
+    PackableStorage,
+    RevisionStorage,
+    StorageTestBase,
+    Synchronization,
+    )
+
+class MVCCTests:
+
+    def checkCrossConnectionInvalidation(self):
+        # Verify connections see updated state at txn boundaries.
+        # This will fail if the Connection doesn't poll for changes.
+        db = DB(self._storage)
+        try:
+            c1 = db.open()
+            r1 = c1.root()
+            r1['myobj'] = 'yes'
+            c2 = db.open()
+            r2 = c2.root()
+            self.assert_('myobj' not in r2)
+
+            storage = c1._storage
+            t = transaction.Transaction()
+            t.description = 'invalidation test'
+            storage.tpc_begin(t)
+            c1.commit(t)
+            storage.tpc_vote(t)
+            storage.tpc_finish(t)
+
+            self.assert_('myobj' not in r2)
+            c2.sync()
+            self.assert_('myobj' in r2)
+            self.assert_(r2['myobj'] == 'yes')
+        finally:
+            db.close()
+
+    def checkCrossConnectionIsolation(self):
+        # Verify MVCC isolates connections.
+        # This will fail if Connection doesn't poll for changes.
+        db = DB(self._storage)
+        try:
+            c1 = db.open()
+            r1 = c1.root()
+            r1['alpha'] = PersistentMapping()
+            r1['gamma'] = PersistentMapping()
+            transaction.commit()
+
+            # Open a second connection but don't load root['alpha'] yet
+            c2 = db.open()
+            r2 = c2.root()
+
+            r1['alpha']['beta'] = 'yes'
+
+            storage = c1._storage
+            t = transaction.Transaction()
+            t.description = 'isolation test 1'
+            storage.tpc_begin(t)
+            c1.commit(t)
+            storage.tpc_vote(t)
+            storage.tpc_finish(t)
+
+            # The second connection will now load root['alpha'], but due to
+            # MVCC, it should continue to see the old state.
+            self.assert_(r2['alpha']._p_changed is None)  # A ghost
+            self.assert_(not r2['alpha'])
+            self.assert_(r2['alpha']._p_changed == 0)
+
+            # make root['alpha'] visible to the second connection
+            c2.sync()
+
+            # Now it should be in sync
+            self.assert_(r2['alpha']._p_changed is None)  # A ghost
+            self.assert_(r2['alpha'])
+            self.assert_(r2['alpha']._p_changed == 0)
+            self.assert_(r2['alpha']['beta'] == 'yes')
+
+            # Repeat the test with root['gamma']
+            r1['gamma']['delta'] = 'yes'
+
+            storage = c1._storage
+            t = transaction.Transaction()
+            t.description = 'isolation test 2'
+            storage.tpc_begin(t)
+            c1.commit(t)
+            storage.tpc_vote(t)
+            storage.tpc_finish(t)
+
+            # The second connection will now load root['gamma'], but due to
+            # MVCC, it should continue to see the old state.
+            self.assert_(r2['gamma']._p_changed is None)  # A ghost
+            self.assert_(not r2['gamma'])
+            self.assert_(r2['gamma']._p_changed == 0)
+
+            # make root['gamma'] visible to the second connection
+            c2.sync()
+
+            # Now it should be in sync
+            self.assert_(r2['gamma']._p_changed is None)  # A ghost
+            self.assert_(r2['gamma'])
+            self.assert_(r2['gamma']._p_changed == 0)
+            self.assert_(r2['gamma']['delta'] == 'yes')
+        finally:
+            db.close()
+    
+
+class MVCCMappingStorageTests(
+    StorageTestBase.StorageTestBase,
+    BasicStorage.BasicStorage,
+
+    HistoryStorage.HistoryStorage,
+    IteratorStorage.ExtendedIteratorStorage,
+    IteratorStorage.IteratorStorage,
+    MTStorage.MTStorage,
+    PackableStorage.PackableStorageWithOptionalGC,
+    RevisionStorage.RevisionStorage,
+    Synchronization.SynchronizedStorage,
+    MVCCTests
+    ):
+
+    def setUp(self):
+        self._storage = MVCCMappingStorage()
+
+    def tearDown(self):
+        self._storage.close()
+
+    def checkLoadBeforeUndo(self):
+        pass # we don't support undo yet
+    checkUndoZombie = checkLoadBeforeUndo
+
+
+def test_suite():
+    suite = unittest.makeSuite(MVCCMappingStorageTests, 'check')
+    return suite
+
+if __name__ == "__main__":
+    loader = unittest.TestLoader()
+    loader.testMethodPrefix = "check"
+    unittest.main(testLoader=loader)



More information about the Zodb-checkins mailing list