[Checkins] SVN: ZODB/trunk/src/ZODB/ Fixed intermittent failures by making MVCCMappingStorage hold a per-connection snapshot of the database.

Shane Hathaway shane at hathawaymix.org
Sun May 17 08:14:01 EDT 2009


Log message for revision 100024:
  Fixed intermittent failures by making MVCCMappingStorage hold a per-connection snapshot of the database.
  

Changed:
  U   ZODB/trunk/src/ZODB/MappingStorage.py
  U   ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py

-=-
Modified: ZODB/trunk/src/ZODB/MappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/MappingStorage.py	2009-05-17 12:00:09 UTC (rev 100023)
+++ ZODB/trunk/src/ZODB/MappingStorage.py	2009-05-17 12:14:00 UTC (rev 100024)
@@ -231,7 +231,8 @@
                     if transactions[tid].pack(oid):
                         del transactions[tid]
 
-            self._data = new_data
+            self._data.clear()
+            self._data.update(new_data)
 
     # ZODB.interfaces.IStorage
     def registerDB(self, db):
@@ -307,6 +308,7 @@
         self._ltid = tid
         self._transactions[tid] = TransactionRecord(tid, transaction, tdata)
         self._transaction = None
+        del self._tdata
         self._commit_lock.release()
  
     # ZEO.interfaces.IServeable

Modified: ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py	2009-05-17 12:00:09 UTC (rev 100023)
+++ ZODB/trunk/src/ZODB/tests/MVCCMappingStorage.py	2009-05-17 12:14:00 UTC (rev 100024)
@@ -20,6 +20,8 @@
 import time
 
 import BTrees
+import ZODB.utils
+import ZODB.POSException
 from ZODB.interfaces import IMVCCStorage
 from ZODB.MappingStorage import MappingStorage
 from ZODB.TimeStamp import TimeStamp
@@ -33,56 +35,102 @@
         MappingStorage.__init__(self, name=name)
         # _polled_tid contains the transaction ID at the last poll.
         self._polled_tid = ''
+        self._data_snapshot = None  # {oid->(state, tid)}
+        self._main_lock_acquire = self._lock_acquire
+        self._main_lock_release = self._lock_release
 
     def new_instance(self):
         """Returns a storage instance that is a view of the same data.
         """
-        res = MVCCMappingStorage(name=self.__name__)
-        res._transactions = self._transactions
-        return res
+        inst = MVCCMappingStorage(name=self.__name__)
+        # All instances share the same OID data, transaction log, commit lock,
+        # and OID sequence.
+        inst._data = self._data
+        inst._transactions = self._transactions
+        inst._commit_lock = self._commit_lock
+        inst.new_oid = self.new_oid
+        inst.pack = self.pack
+        inst._main_lock_acquire = self._lock_acquire
+        inst._main_lock_release = self._lock_release
+        return inst
 
+    @ZODB.utils.locked(MappingStorage.opened)
     def sync(self, force=False):
-        pass
+        self._data_snapshot = None
 
     def release(self):
         pass
 
+    @ZODB.utils.locked(MappingStorage.opened)
+    def load(self, oid, version=''):
+        assert not version, "Versions are not supported"
+        if self._data_snapshot is None:
+            self.poll_invalidations()
+        info = self._data_snapshot.get(oid)
+        if info:
+            return info
+        raise ZODB.POSException.POSKeyError(oid)
+
     def poll_invalidations(self):
         """Poll the storage for changes by other connections.
         """
-        if self._transactions:
-            new_tid = self._transactions.maxKey()
-        else:
-            new_tid = ''
+        # prevent changes to _transactions and _data during analysis
+        self._main_lock_acquire()
+        try:
 
-        if self._polled_tid:
-            if not self._transactions.has_key(self._polled_tid):
-                # This connection is so old that we can no longer enumerate
-                # all the changes.
-                self._polled_tid = new_tid
-                return None
+            if self._transactions:
+                new_tid = self._transactions.maxKey()
+            else:
+                new_tid = ''
 
-        changed_oids = set()
-        for tid, txn in self._transactions.items(
-                self._polled_tid, new_tid, excludemin=True, excludemax=False):
-            if txn.status == 'p':
-                # This transaction has been packed, so it is no longer
-                # possible to enumerate all changed oids.
-                self._polled_tid = new_tid
-                return None
-            if tid == self._ltid:
-                # ignore the transaction committed by this connection
-                continue
+            # Copy the current data into a snapshot. This is obviously
+            # very inefficient for large storages, but it's good for
+            # tests.
+            self._data_snapshot = {}
+            for oid, tid_data in self._data.items():
+                if tid_data:
+                    tid = tid_data.maxKey()
+                    self._data_snapshot[oid] = tid_data[tid], tid
 
-            changes = txn.data
-            # pull in changes from the transaction log
-            for oid, value in changes.iteritems():
-                tid_data = self._data.get(oid)
-                if tid_data is None:
-                    tid_data = BTrees.OOBTree.OOBucket()
-                    self._data[oid] = tid_data
-                tid_data[tid] = changes[oid]
-            changed_oids.update(changes.keys())
+            if self._polled_tid:
+                if not self._transactions.has_key(self._polled_tid):
+                    # This connection is so old that we can no longer enumerate
+                    # all the changes.
+                    self._polled_tid = new_tid
+                    return None
 
+            changed_oids = set()
+            for tid, txn in self._transactions.items(
+                    self._polled_tid, new_tid,
+                    excludemin=True, excludemax=False):
+                if txn.status == 'p':
+                    # This transaction has been packed, so it is no longer
+                    # possible to enumerate all changed oids.
+                    self._polled_tid = new_tid
+                    return None
+                if tid == self._ltid:
+                    # ignore the transaction committed by this connection
+                    continue
+                changed_oids.update(txn.data.keys())
+
+        finally:
+            self._main_lock_release()
+
         self._polled_tid = new_tid
         return list(changed_oids)
+
+    def tpc_finish(self, transaction, func = lambda tid: None):
+        self._data_snapshot = None
+        MappingStorage.tpc_finish(self, transaction, func)
+
+    def tpc_abort(self, transaction):
+        self._data_snapshot = None
+        MappingStorage.tpc_abort(self, transaction)
+
+    def pack(self, t, referencesf, gc=True):
+        # prevent all concurrent commits during packing
+        self._commit_lock.acquire()
+        try:
+            MappingStorage.pack(self, t, referencesf, gc)
+        finally:
+            self._commit_lock.release()



More information about the Checkins mailing list