[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Merged the shane-poll-invalidations branch, which adds RelStorage
Jim Fulton
jim at zope.com
Mon Apr 27 18:25:55 EDT 2009
Log message for revision 99544:
Merged the shane-poll-invalidations branch, which adds RelStorage
support.
Changed:
U ZODB/trunk/src/ZODB/Connection.py
U ZODB/trunk/src/ZODB/DB.py
U ZODB/trunk/src/ZODB/MappingStorage.py
U ZODB/trunk/src/ZODB/interfaces.py
A ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py
A ZODB/trunk/src/ZODB/tests/testMVCCMappingStorage.py
-=-
Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py 2009-04-27 22:07:57 UTC (rev 99543)
+++ ZODB/trunk/src/ZODB/Connection.py 2009-04-27 22:25:55 UTC (rev 99544)
@@ -30,6 +30,7 @@
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.interfaces import IBlobStorage
+from ZODB.interfaces import IMVCCStorage
from ZODB.blob import Blob, rename_or_copy_blob
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
@@ -94,8 +95,16 @@
# Multi-database support
self.connections = {self._db.database_name: self}
- self._normal_storage = self._storage = db.storage
- self.new_oid = db.storage.new_oid
+ storage = db.storage
+ if IMVCCStorage.providedBy(storage):
+ # Use a connection-specific storage instance.
+ self._mvcc_storage = True
+ storage = storage.new_instance()
+ else:
+ self._mvcc_storage = False
+
+ self._normal_storage = self._storage = storage
+ self.new_oid = storage.new_oid
self._savepoint_storage = None
# Do we need to join a txn manager?
@@ -148,7 +157,6 @@
# in the cache on abort and in other connections on finish.
self._modified = []
-
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
# another is processing invalidations. All the invalidations
@@ -179,10 +187,10 @@
# _conflicts).
self._conflicts = {}
- # If MVCC is enabled, then _mvcc is True and _txn_time stores
- # the upper bound on transactions visible to this connection.
- # That is, all object revisions must be written before _txn_time.
- # If it is None, then the current revisions are acceptable.
+ # _txn_time stores the upper bound on transactions visible to
+ # this connection. That is, all object revisions must be
+ # written before _txn_time. If it is None, then the current
+ # revisions are acceptable.
self._txn_time = None
# To support importFile(), implemented in the ExportImport base
@@ -295,6 +303,9 @@
if self.opened:
self.transaction_manager.unregisterSynch(self)
+ if self._mvcc_storage:
+ self._storage.sync(force=False)
+
if primary:
for connection in self.connections.values():
if connection is not self:
@@ -323,6 +334,10 @@
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
+ if self._mvcc_storage:
+ # Inter-connection invalidation is not needed when the
+ # storage provides MVCC.
+ return
if self.before is not None:
# this is an historical connection. Invalidations are irrelevant.
return
@@ -462,6 +477,16 @@
# Process pending invalidations.
def _flush_invalidations(self):
+ if self._mvcc_storage:
+ # Poll the storage for invalidations.
+ invalidated = self._storage.poll_invalidations()
+ if invalidated is None:
+ # special value: the transaction is so old that
+ # we need to flush the whole cache.
+ self._cache.invalidate(self._cache.cache_data.keys())
+ elif invalidated:
+ self._cache.invalidate(invalidated)
+
self._inv_lock.acquire()
try:
# Non-ghostifiable objects may need to read when they are
@@ -1048,6 +1073,11 @@
if getattr(self, '_reader', None) is not None:
self._reader._cache = cache
+    def _releaseStorage(self):
+        """Ask a connection-specific MVCC storage to drop the persistent
+        back-end sessions it holds; shared storages need nothing."""
+        if not self._mvcc_storage:
+            return
+        self._storage.release()
+
##########################################################################
# Python protocol
Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py 2009-04-27 22:07:57 UTC (rev 99543)
+++ ZODB/trunk/src/ZODB/DB.py 2009-04-27 22:25:55 UTC (rev 99544)
@@ -34,6 +34,7 @@
from zope.interface import implements
from ZODB.interfaces import IDatabase
+from ZODB.interfaces import IMVCCStorage
import BTrees.OOBTree
import transaction
@@ -198,6 +199,7 @@
# reclaim `c` now, and `c` would be left in a user-visible
# crazy state.
c._resetCache()
+ c._releaseStorage()
def reduce_size(self):
self._reduce_size()
@@ -452,24 +454,32 @@
DeprecationWarning, 2)
storage.tpc_vote = lambda *args: None
+ if IMVCCStorage.providedBy(storage):
+ temp_storage = storage.new_instance()
+ else:
+ temp_storage = storage
try:
- storage.load(z64, '')
- except KeyError:
- # Create the database's root in the storage if it doesn't exist
- from persistent.mapping import PersistentMapping
- root = PersistentMapping()
- # Manually create a pickle for the root to put in the storage.
- # The pickle must be in the special ZODB format.
- file = cStringIO.StringIO()
- p = cPickle.Pickler(file, 1)
- p.dump((root.__class__, None))
- p.dump(root.__getstate__())
- t = transaction.Transaction()
- t.description = 'initial database creation'
- storage.tpc_begin(t)
- storage.store(z64, None, file.getvalue(), '', t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
+ try:
+ temp_storage.load(z64, '')
+ except KeyError:
+ # Create the database's root in the storage if it doesn't exist
+ from persistent.mapping import PersistentMapping
+ root = PersistentMapping()
+ # Manually create a pickle for the root to put in the storage.
+ # The pickle must be in the special ZODB format.
+ file = cStringIO.StringIO()
+ p = cPickle.Pickler(file, 1)
+ p.dump((root.__class__, None))
+ p.dump(root.__getstate__())
+ t = transaction.Transaction()
+ t.description = 'initial database creation'
+ temp_storage.tpc_begin(t)
+ temp_storage.store(z64, None, file.getvalue(), '', t)
+ temp_storage.tpc_vote(t)
+ temp_storage.tpc_finish(t)
+ finally:
+ if IMVCCStorage.providedBy(temp_storage):
+ temp_storage.release()
# Multi-database setup.
if databases is None:
Modified: ZODB/trunk/src/ZODB/MappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/MappingStorage.py 2009-04-27 22:07:57 UTC (rev 99543)
+++ ZODB/trunk/src/ZODB/MappingStorage.py 2009-04-27 22:25:55 UTC (rev 99544)
@@ -37,7 +37,7 @@
def __init__(self, name='MappingStorage'):
self.__name__ = name
self._data = {} # {oid->{tid->pickle}}
- self._transactions = BTrees.OOBTree.OOBTree() # {tid->transaction}
+ self._transactions = BTrees.OOBTree.OOBTree() # {tid->TransactionRecord}
self._ltid = None
self._last_pack = None
_lock = threading.RLock()
Modified: ZODB/trunk/src/ZODB/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/interfaces.py 2009-04-27 22:07:57 UTC (rev 99543)
+++ ZODB/trunk/src/ZODB/interfaces.py 2009-04-27 22:25:55 UTC (rev 99544)
@@ -953,6 +953,111 @@
# DB pass-through
+class IMVCCStorage(IStorage):
+    """A storage that provides MVCC semantics internally.
+
+    MVCC (multi-version concurrency control) means each user of a
+    database has a snapshot view of the database. The snapshot view
+    does not change, even if concurrent connections commit
+    transactions, until a transaction boundary. Relational databases
+    that support serializable transaction isolation provide MVCC.
+
+    Storages that implement IMVCCStorage, such as RelStorage, provide
+    MVCC semantics at the ZODB storage layer. When ZODB.Connection uses
+    a storage that implements IMVCCStorage, each connection uses a
+    connection-specific storage instance (obtained by calling
+    new_instance), and that storage instance provides a snapshot of the
+    database.
+
+    By contrast, storages that do not implement IMVCCStorage, such as
+    FileStorage, rely on ZODB.Connection to provide MVCC semantics, so
+    in that case, one storage instance is shared by many
+    ZODB.Connections. Applications that use ZODB.Connection always have
+    a snapshot view of the database; IMVCCStorage only modifies which
+    layer of ZODB provides MVCC.
+
+    Furthermore, IMVCCStorage changes the way object invalidation
+    works. An essential feature of ZODB is the propagation of object
+    invalidation messages to keep in-memory caches up to date. Storages
+    like FileStorage and ZEO.ClientStorage send invalidation messages
+    to all other Connection instances at transaction commit time.
+    Storages that implement IMVCCStorage, on the other hand, expect the
+    ZODB.Connection to poll for a list of invalidated objects by calling
+    poll_invalidations.
+
+    Certain methods of IMVCCStorage implementations open persistent
+    back end database sessions and retain the sessions even after the
+    method call finishes::
+
+        load
+        loadEx
+        loadSerial
+        loadBefore
+        store
+        restore
+        new_oid
+        history
+        tpc_begin
+        tpc_vote
+        tpc_abort
+        tpc_finish
+
+    If you know that the storage instance will no longer be used after
+    calling any of these methods, you should call the release method to
+    release the persistent sessions. The persistent sessions will be
+    reopened as necessary if you call one of those methods again.
+
+    Other storage methods open short lived back end sessions and close
+    the back end sessions before returning. These include::
+
+        __len__
+        getSize
+        undoLog
+        undo
+        pack
+        iterator
+
+    These methods do not provide MVCC semantics, so these methods
+    operate on the most current view of the database, rather than the
+    snapshot view that the other methods use.
+    """
+
+    def new_instance():
+        """Creates and returns another storage instance.
+
+        The returned instance provides IMVCCStorage and connects to the
+        same back-end database. The database state visible to the
+        instance will be a snapshot that varies independently of other
+        storage instances.
+        """
+
+    def release():
+        """Release all persistent sessions used by this storage instance.
+
+        After this call, the storage instance can still be used;
+        calling methods that use persistent sessions will cause the
+        persistent sessions to be reopened.
+        """
+
+    def poll_invalidations():
+        """Poll the storage for external changes.
+
+        Returns either a sequence of OIDs that have changed, or None. When a
+        sequence is returned, the corresponding objects should be removed
+        from the ZODB in-memory cache; an empty sequence means nothing has
+        changed since the previous poll. When None is returned, the storage
+        is indicating that so much time has elapsed since the last poll that
+        it is no longer possible to enumerate all of the changed OIDs, since
+        the previous transaction seen by the connection has already been
+        packed. In that case, the ZODB in-memory cache should be cleared.
+        """
+
+    def sync(force=True):
+        """Updates the internal snapshot to the current state of the database.
+
+        If the force parameter is False, the storage may choose to
+        ignore this call. By ignoring this call, a storage can reduce
+        the frequency of database polls, thus reducing database load.
+        """
+
+
class IStorageCurrentRecordIteration(IStorage):
def record_iternext(next=None):
Copied: ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py (from rev 99539, ZODB/branches/shane-poll-invalidations/src/ZODB/tests/MVCCMappingStorage.py)
===================================================================
--- ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py (rev 0)
+++ ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py 2009-04-27 22:25:55 UTC (rev 99544)
@@ -0,0 +1,85 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""An extension of MappingStorage that depends on polling.
+
+Each Connection has its own view of the database. Polling updates each
+connection's view.
+"""
+
+import time
+
+import BTrees
+import BTrees.OOBTree
+from zope.interface import implements
+
+from ZODB.interfaces import IMVCCStorage
+from ZODB.MappingStorage import MappingStorage
+from ZODB.TimeStamp import TimeStamp
+
+
+class MVCCMappingStorage(MappingStorage):
+    """MappingStorage variant whose connection-specific instances copy
+    other connections' commits into their own object data only when
+    poll_invalidations() is called."""
+    implements(IMVCCStorage)
+
+    def __init__(self, name="MVCC Mapping Storage"):
+        MappingStorage.__init__(self, name=name)
+        # _polled_tid contains the transaction ID at the last poll
+        # ('' means this instance has not polled yet).
+        self._polled_tid = ''
+
+    def new_instance(self):
+        """Returns a storage instance that is a view of the same data.
+
+        The new instance shares this instance's transaction log and OID
+        allocator. Its object data starts out empty and is filled in
+        from the shared transaction log by poll_invalidations().
+        """
+        res = MVCCMappingStorage(name=self.__name__)
+        res._transactions = self._transactions
+        # Hand out OIDs from one shared allocator so that objects created
+        # through different connection-specific instances can never be
+        # assigned the same OID.
+        res.new_oid = self.new_oid
+        return res
+
+    def sync(self, force=True):
+        # Nothing to do: this storage refreshes its view in
+        # poll_invalidations() instead. The default matches IMVCCStorage.
+        pass
+
+    def release(self):
+        # Nothing to release: this in-memory storage holds no back-end
+        # sessions.
+        pass
+
+    def poll_invalidations(self):
+        """Poll the storage for changes by other connections.
+
+        Copies every transaction committed since the previous poll
+        (except the last one committed through this instance) from the
+        shared transaction log into this instance's object data.
+
+        Returns a list of the OIDs those transactions changed (empty if
+        nothing changed), or None when the changes can no longer be
+        enumerated because the log no longer covers the previous poll;
+        in that case the caller should clear its whole cache.
+        """
+        try:
+            new_tid = self._transactions.maxKey()
+        except ValueError:
+            # Empty transaction log: nothing has been committed, so
+            # nothing can have changed.
+            return []
+
+        if self._polled_tid and self._polled_tid not in self._transactions:
+            # This connection is so old that we can no longer enumerate
+            # all the changes.
+            self._polled_tid = new_tid
+            return None
+
+        changed_oids = set()
+        for tid, txn in self._transactions.items(
+                self._polled_tid, new_tid, excludemin=True, excludemax=False):
+            if txn.status == 'p':
+                # This transaction has been packed, so it is no longer
+                # possible to enumerate all changed oids.
+                self._polled_tid = new_tid
+                return None
+            if tid == self._ltid:
+                # ignore the transaction committed by this connection
+                continue
+
+            changes = txn.data
+            # pull in changes from the transaction log
+            for oid, value in changes.iteritems():
+                tid_data = self._data.get(oid)
+                if tid_data is None:
+                    tid_data = BTrees.OOBTree.OOBucket()
+                    self._data[oid] = tid_data
+                tid_data[tid] = value
+            changed_oids.update(changes.keys())
+
+        self._polled_tid = new_tid
+        return list(changed_oids)
Copied: ZODB/trunk/src/ZODB/tests/testMVCCMappingStorage.py (from rev 99539, ZODB/branches/shane-poll-invalidations/src/ZODB/tests/testMVCCMappingStorage.py)
===================================================================
--- ZODB/trunk/src/ZODB/tests/testMVCCMappingStorage.py (rev 0)
+++ ZODB/trunk/src/ZODB/tests/testMVCCMappingStorage.py 2009-04-27 22:25:55 UTC (rev 99544)
@@ -0,0 +1,164 @@
+##############################################################################
+#
+# Copyright (c) Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import unittest
+
+from persistent.mapping import PersistentMapping
+import transaction
+from ZODB.DB import DB
+from ZODB.tests.MVCCMappingStorage import MVCCMappingStorage
+
+
+from ZODB.tests import (
+ BasicStorage,
+ HistoryStorage,
+ IteratorStorage,
+ MTStorage,
+ PackableStorage,
+ RevisionStorage,
+ StorageTestBase,
+ Synchronization,
+ )
+
+class MVCCTests:
+    """MVCC checks mixed into a StorageTestBase subclass whose setUp()
+    puts the storage under test in self._storage."""
+
+    def _commit_via_storage(self, conn, description):
+        """Commit conn's pending changes by driving two-phase commit
+        directly on conn's own storage instance.
+
+        NOTE(review): the transaction manager is presumably bypassed so
+        that other connections registered with it are not synced as a
+        side effect of the commit -- confirm.
+        """
+        storage = conn._storage
+        t = transaction.Transaction()
+        t.description = description
+        storage.tpc_begin(t)
+        conn.commit(t)
+        storage.tpc_vote(t)
+        storage.tpc_finish(t)
+
+    def checkCrossConnectionInvalidation(self):
+        # Verify connections see updated state at txn boundaries.
+        # This will fail if the Connection doesn't poll for changes.
+        db = DB(self._storage)
+        try:
+            c1 = db.open()
+            r1 = c1.root()
+            r1['myobj'] = 'yes'
+            c2 = db.open()
+            r2 = c2.root()
+            self.assert_('myobj' not in r2)
+
+            self._commit_via_storage(c1, 'invalidation test')
+
+            # c2 keeps its old snapshot until it syncs.
+            self.assert_('myobj' not in r2)
+            c2.sync()
+            self.assert_('myobj' in r2)
+            self.assertEqual(r2['myobj'], 'yes')
+        finally:
+            db.close()
+
+    def checkCrossConnectionIsolation(self):
+        # Verify MVCC isolates connections.
+        # This will fail if Connection doesn't poll for changes.
+        db = DB(self._storage)
+        try:
+            c1 = db.open()
+            r1 = c1.root()
+            r1['alpha'] = PersistentMapping()
+            r1['gamma'] = PersistentMapping()
+            transaction.commit()
+
+            # Open a second connection but don't load root['alpha'] yet
+            c2 = db.open()
+            r2 = c2.root()
+
+            r1['alpha']['beta'] = 'yes'
+            self._commit_via_storage(c1, 'isolation test 1')
+
+            # The second connection will now load root['alpha'], but due to
+            # MVCC, it should continue to see the old state.
+            self.assert_(r2['alpha']._p_changed is None)  # A ghost
+            self.assert_(not r2['alpha'])
+            self.assertEqual(r2['alpha']._p_changed, 0)
+
+            # make root['alpha'] visible to the second connection
+            c2.sync()
+
+            # Now it should be in sync
+            self.assert_(r2['alpha']._p_changed is None)  # A ghost
+            self.assert_(r2['alpha'])
+            self.assertEqual(r2['alpha']._p_changed, 0)
+            self.assertEqual(r2['alpha']['beta'], 'yes')
+
+            # Repeat the test with root['gamma']
+            r1['gamma']['delta'] = 'yes'
+            self._commit_via_storage(c1, 'isolation test 2')
+
+            # The second connection will now load root['gamma'], but due to
+            # MVCC, it should continue to see the old state.
+            self.assert_(r2['gamma']._p_changed is None)  # A ghost
+            self.assert_(not r2['gamma'])
+            self.assertEqual(r2['gamma']._p_changed, 0)
+
+            # make root['gamma'] visible to the second connection
+            c2.sync()
+
+            # Now it should be in sync
+            self.assert_(r2['gamma']._p_changed is None)  # A ghost
+            self.assert_(r2['gamma'])
+            self.assertEqual(r2['gamma']._p_changed, 0)
+            self.assertEqual(r2['gamma']['delta'], 'yes')
+        finally:
+            db.close()
+
+
+class MVCCMappingStorageTests(
+    StorageTestBase.StorageTestBase,
+    BasicStorage.BasicStorage,
+    HistoryStorage.HistoryStorage,
+    IteratorStorage.ExtendedIteratorStorage,
+    IteratorStorage.IteratorStorage,
+    MTStorage.MTStorage,
+    PackableStorage.PackableStorageWithOptionalGC,
+    RevisionStorage.RevisionStorage,
+    Synchronization.SynchronizedStorage,
+    MVCCTests
+    ):
+    """Generic storage conformance checks plus MVCCTests, run against a
+    fresh MVCCMappingStorage for every test."""
+
+    def setUp(self):
+        self._storage = MVCCMappingStorage()
+
+    def tearDown(self):
+        self._storage.close()
+
+    # Undo is not supported yet, so these inherited undo-based checks
+    # are overridden with no-ops.
+    def checkLoadBeforeUndo(self):
+        pass
+
+    def checkUndoZombie(self):
+        pass
+
+
+def test_suite():
+    """Build a suite from the 'check'-prefixed methods of
+    MVCCMappingStorageTests."""
+    return unittest.makeSuite(MVCCMappingStorageTests, 'check')
+
+if __name__ == "__main__":
+    # The test methods in this module are prefixed "check", not "test".
+    check_loader = unittest.TestLoader()
+    check_loader.testMethodPrefix = "check"
+    unittest.main(testLoader=check_loader)
More information about the Zodb-checkins
mailing list