[Checkins] SVN: ZODB/trunk/src/ZODB/ Fixed intermittent failures by making MVCCMappingStorage hold a per-connection snapshot of the database.
Shane Hathaway
shane at hathawaymix.org
Sun May 17 08:14:01 EDT 2009
Log message for revision 100024:
Fixed intermittent failures by making MVCCMappingStorage hold a per-connection snapshot of the database.
Changed:
U ZODB/trunk/src/ZODB/MappingStorage.py
U ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py
-=-
Modified: ZODB/trunk/src/ZODB/MappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/MappingStorage.py 2009-05-17 12:00:09 UTC (rev 100023)
+++ ZODB/trunk/src/ZODB/MappingStorage.py 2009-05-17 12:14:00 UTC (rev 100024)
@@ -231,7 +231,8 @@
if transactions[tid].pack(oid):
del transactions[tid]
- self._data = new_data
+ self._data.clear()
+ self._data.update(new_data)
# ZODB.interfaces.IStorage
def registerDB(self, db):
@@ -307,6 +308,7 @@
self._ltid = tid
self._transactions[tid] = TransactionRecord(tid, transaction, tdata)
self._transaction = None
+ del self._tdata
self._commit_lock.release()
# ZEO.interfaces.IServeable
Modified: ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py 2009-05-17 12:00:09 UTC (rev 100023)
+++ ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py 2009-05-17 12:14:00 UTC (rev 100024)
@@ -20,6 +20,8 @@
import time
import BTrees
+import ZODB.utils
+import ZODB.POSException
from ZODB.interfaces import IMVCCStorage
from ZODB.MappingStorage import MappingStorage
from ZODB.TimeStamp import TimeStamp
@@ -33,56 +35,102 @@
MappingStorage.__init__(self, name=name)
# _polled_tid contains the transaction ID at the last poll.
self._polled_tid = ''
+ self._data_snapshot = None # {oid->(state, tid)}
+ self._main_lock_acquire = self._lock_acquire
+ self._main_lock_release = self._lock_release
def new_instance(self):
"""Returns a storage instance that is a view of the same data.
"""
- res = MVCCMappingStorage(name=self.__name__)
- res._transactions = self._transactions
- return res
+ inst = MVCCMappingStorage(name=self.__name__)
+ # All instances share the same OID data, transaction log, commit lock,
+ # and OID sequence.
+ inst._data = self._data
+ inst._transactions = self._transactions
+ inst._commit_lock = self._commit_lock
+ inst.new_oid = self.new_oid
+ inst.pack = self.pack
+ inst._main_lock_acquire = self._lock_acquire
+ inst._main_lock_release = self._lock_release
+ return inst
+ @ZODB.utils.locked(MappingStorage.opened)
def sync(self, force=False):
- pass
+ self._data_snapshot = None
def release(self):
pass
+ @ZODB.utils.locked(MappingStorage.opened)
+ def load(self, oid, version=''):
+ assert not version, "Versions are not supported"
+ if self._data_snapshot is None:
+ self.poll_invalidations()
+ info = self._data_snapshot.get(oid)
+ if info:
+ return info
+ raise ZODB.POSException.POSKeyError(oid)
+
def poll_invalidations(self):
"""Poll the storage for changes by other connections.
"""
- if self._transactions:
- new_tid = self._transactions.maxKey()
- else:
- new_tid = ''
+ # prevent changes to _transactions and _data during analysis
+ self._main_lock_acquire()
+ try:
- if self._polled_tid:
- if not self._transactions.has_key(self._polled_tid):
- # This connection is so old that we can no longer enumerate
- # all the changes.
- self._polled_tid = new_tid
- return None
+ if self._transactions:
+ new_tid = self._transactions.maxKey()
+ else:
+ new_tid = ''
- changed_oids = set()
- for tid, txn in self._transactions.items(
- self._polled_tid, new_tid, excludemin=True, excludemax=False):
- if txn.status == 'p':
- # This transaction has been packed, so it is no longer
- # possible to enumerate all changed oids.
- self._polled_tid = new_tid
- return None
- if tid == self._ltid:
- # ignore the transaction committed by this connection
- continue
+ # Copy the current data into a snapshot. This is obviously
+ # very inefficient for large storages, but it's good for
+ # tests.
+ self._data_snapshot = {}
+ for oid, tid_data in self._data.items():
+ if tid_data:
+ tid = tid_data.maxKey()
+ self._data_snapshot[oid] = tid_data[tid], tid
- changes = txn.data
- # pull in changes from the transaction log
- for oid, value in changes.iteritems():
- tid_data = self._data.get(oid)
- if tid_data is None:
- tid_data = BTrees.OOBTree.OOBucket()
- self._data[oid] = tid_data
- tid_data[tid] = changes[oid]
- changed_oids.update(changes.keys())
+ if self._polled_tid:
+ if not self._transactions.has_key(self._polled_tid):
+ # This connection is so old that we can no longer enumerate
+ # all the changes.
+ self._polled_tid = new_tid
+ return None
+ changed_oids = set()
+ for tid, txn in self._transactions.items(
+ self._polled_tid, new_tid,
+ excludemin=True, excludemax=False):
+ if txn.status == 'p':
+ # This transaction has been packed, so it is no longer
+ # possible to enumerate all changed oids.
+ self._polled_tid = new_tid
+ return None
+ if tid == self._ltid:
+ # ignore the transaction committed by this connection
+ continue
+ changed_oids.update(txn.data.keys())
+
+ finally:
+ self._main_lock_release()
+
self._polled_tid = new_tid
return list(changed_oids)
+
+ def tpc_finish(self, transaction, func = lambda tid: None):
+ self._data_snapshot = None
+ MappingStorage.tpc_finish(self, transaction, func)
+
+ def tpc_abort(self, transaction):
+ self._data_snapshot = None
+ MappingStorage.tpc_abort(self, transaction)
+
+ def pack(self, t, referencesf, gc=True):
+ # prevent all concurrent commits during packing
+ self._commit_lock.acquire()
+ try:
+ MappingStorage.pack(self, t, referencesf, gc)
+ finally:
+ self._commit_lock.release()
More information about the Checkins mailing list