[Zodb-checkins] SVN: ZODB/trunk/src/ Bug fixed

Jim Fulton jim at zope.com
Mon Sep 27 17:38:46 EDT 2010


Log message for revision 116991:
  Bug fixed
  
  - A file storage bug could cause ZEO clients to have incorrect
    information about current object revisions after reconnecting to a
    database server.
  
  Also added a locking/transaction ordering test.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZODB/BaseStorage.py
  U   ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/trunk/src/ZODB/interfaces.py
  U   ZODB/trunk/src/ZODB/tests/BasicStorage.py
  U   ZODB/trunk/src/ZODB/tests/test_storage.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2010-09-27 18:48:12 UTC (rev 116990)
+++ ZODB/trunk/src/CHANGES.txt	2010-09-27 21:38:46 UTC (rev 116991)
@@ -12,6 +12,10 @@
   2.7 broke the object/connection cache implementation.
   (https://bugs.launchpad.net/zodb/+bug/641481)
 
+- A file storage bug could cause ZEO clients to have incorrect
+  information about current object revisions after reconnecting to a
+  database server.
+
 - Updated the 'repozo --kill-old-on-full' option to remove any '.index'
   files corresponding to backups being removed.
 

Modified: ZODB/trunk/src/ZODB/BaseStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/BaseStorage.py	2010-09-27 18:48:12 UTC (rev 116990)
+++ ZODB/trunk/src/ZODB/BaseStorage.py	2010-09-27 21:38:46 UTC (rev 116991)
@@ -11,10 +11,14 @@
 # FOR A PARTICULAR PURPOSE
 #
 ##############################################################################
-"""Handy standard storage machinery
+"""Storage base class that is mostly a mistake
 
-$Id$
+The base class here is tightly coupled with its subclasses and
+its use is not recommended.  It's still here for historical reasons.
 """
+
+from __future__ import with_statement
+
 import cPickle
 import threading
 import time
@@ -306,6 +310,10 @@
         """
         pass
 
+    def lastTransaction(self):
+        with self._lock:
+            return self._ltid
+
     def getTid(self, oid):
         self._lock_acquire()
         try:

Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2010-09-27 18:48:12 UTC (rev 116990)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2010-09-27 21:38:46 UTC (rev 116991)
@@ -1221,10 +1221,6 @@
     def iterator(self, start=None, stop=None):
         return FileIterator(self._file_name, start, stop)
 
-    def lastTransaction(self):
-        """Return transaction id for last committed transaction"""
-        return self._ltid
-
     def lastInvalidations(self, count):
         file = self._file
         seek = file.seek

Modified: ZODB/trunk/src/ZODB/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/interfaces.py	2010-09-27 18:48:12 UTC (rev 116990)
+++ ZODB/trunk/src/ZODB/interfaces.py	2010-09-27 21:38:46 UTC (rev 116991)
@@ -458,6 +458,36 @@
 
 class IStorage(Interface):
     """A storage is responsible for storing and retrieving data of objects.
+
+    Consistency and locking
+    -----------------------
+
+    When transactions are committed, a storage assigns monotonically
+    increasing transaction identifiers (tids) to the transactions and
+    to the object versions written by the transactions.  ZODB relies
+    on this to decide if data in object caches are up to date and to
+    implement multi-version concurrency control.
+
+    There are methods in IStorage and in derived interfaces that
+    provide information about the current revisions (tids) for objects
+    or for the database as a whole.  It is critical for the proper
+    working of ZODB that the resulting tids are increasing with
+    respect to the object identifier given or to the databases.  That
+    is, if there are 2 results for an object or for the database, R1
+    and R2, such that R1 is returned before R2, then the tid returned
+    by R2 must be greater than or equal to the tid returned by R1.
+    (When thinking about results for the database, think of these as
+    results for all objects in the database.)
+
+    This implies some sort of locking strategy.  The key method is
+    tcp_finish, which causes new tids to be generated and also,
+    through the callback passed to it, returns new current tids for
+    the objects stored in a transaction and for the database as a whole.
+
+    The IStorage methods affected are lastTransaction, load, store,
+    and tpc_finish.  Derived interfaces may introduce additional
+    methods.
+
     """
 
     def close():
@@ -1294,7 +1324,6 @@
     __Broken_initargs__ = Attribute("Arguments passed to __init__.")
     __Broken_state__ = Attribute("Value passed to __setstate__.")
 
-
 class BlobError(Exception):
     pass
 

Modified: ZODB/trunk/src/ZODB/tests/BasicStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/BasicStorage.py	2010-09-27 18:48:12 UTC (rev 116990)
+++ ZODB/trunk/src/ZODB/tests/BasicStorage.py	2010-09-27 21:38:46 UTC (rev 116991)
@@ -19,6 +19,8 @@
 All storages should be able to pass these tests.
 """
 
+from __future__ import with_statement
+
 from ZODB import POSException
 from ZODB.tests.MinPO import MinPO
 from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
@@ -298,3 +300,101 @@
             tid4 = self._storage.load(oid)[1]
             self.assert_(tid4 > self._storage.load('\0\0\0\0\0\0\0\xf4')[1])
 
+
+    def check_tid_ordering_w_commit(self):
+
+        # It's important that storages always give a consistent
+        # ordering for revisions, tids.  This is most likely to fail
+        # around commit.  Here we'll do some basic tests to check this.
+
+        # We'll use threads to arrange for ordering to go wrong and
+        # verify that a storage gets it right.
+
+        # First, some initial data.
+        t = transaction.get()
+        self._storage.tpc_begin(t)
+        self._storage.store(ZERO, ZERO, 'x', '', t)
+        self._storage.tpc_vote(t)
+        tids = []
+        self._storage.tpc_finish(t, lambda tid: tids.append(tid))
+
+        # OK, now we'll start a new transaction, take it to finish,
+        # and then block finish while we do some other operations.
+
+        t = transaction.get()
+        self._storage.tpc_begin(t)
+        self._storage.store(ZERO, tids[0], 'y', '', t)
+        self._storage.tpc_vote(t)
+
+        to_join = []
+        def run_in_thread(func):
+            t = threading.Thread(target=func)
+            t.setDaemon(True)
+            t.start()
+            to_join.append(t)
+
+        started = threading.Event()
+        finish = threading.Event()
+        @run_in_thread
+        def commit():
+            def callback(tid):
+                started.set()
+                tids.append(tid)
+                finish.wait()
+
+            self._storage.tpc_finish(t, callback)
+
+        results = {}
+        started.wait()
+        attempts = []
+        attempts_cond = threading.Condition()
+
+        def update_attempts():
+            with attempts_cond:
+                attempts.append(1)
+                attempts_cond.notifyAll()
+
+
+        @run_in_thread
+        def lastTransaction():
+            update_attempts()
+            results['lastTransaction'] = self._storage.lastTransaction()
+
+        @run_in_thread
+        def load():
+            update_attempts()
+            results['load'] = self._storage.load(ZERO, '')[1]
+
+        expected_attempts = 2
+
+        if hasattr(self._storage, 'getTid'):
+            expected_attempts += 1
+            @run_in_thread
+            def getTid():
+                update_attempts()
+                results['getTid'] = self._storage.getTid(ZERO)
+
+        if hasattr(self._storage, 'lastInvalidations'):
+            expected_attempts += 1
+            @run_in_thread
+            def lastInvalidations():
+                update_attempts()
+                invals = self._storage.lastInvalidations(1)
+                if invals:
+                    results['lastInvalidations'] = invals[0][0]
+
+        with attempts_cond:
+            while len(attempts) < expected_attempts:
+                attempts_cond.wait()
+
+        time.sleep(.01) # for good measure :)
+        finish.set()
+
+        for t in to_join:
+            t.join(1)
+
+        self.assertEqual(results.pop('load'), tids[1])
+        self.assertEqual(results.pop('lastTransaction'), tids[1])
+        for m, tid in results.items():
+            self.assertEqual(tid, tids[1])
+

Modified: ZODB/trunk/src/ZODB/tests/test_storage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/test_storage.py	2010-09-27 18:48:12 UTC (rev 116990)
+++ ZODB/trunk/src/ZODB/tests/test_storage.py	2010-09-27 21:38:46 UTC (rev 116991)
@@ -18,6 +18,7 @@
 Since even a minimal storage has some complexity, we run standard
 storage tests against the test storage.
 """
+from __future__ import with_statement
 
 import bisect
 import threading
@@ -77,14 +78,11 @@
 
     def load(self, oid, version=''):
         assert version == ''
-        self._lock_acquire()
-        try:
+        with self._lock:
             assert not version
             tid = self._cur[oid]
             self.hook(oid, tid, '')
             return self._index[(oid, tid)], tid
-        finally:
-            self._lock_release()
 
     def _begin(self, tid, u, d, e):
         self._txn = Transaction(tid)
@@ -104,22 +102,15 @@
         del self._txn
 
     def _finish(self, tid, u, d, e):
-        self._lock_acquire()
-        try:
+        with self._lock:
             self._index.update(self._txn.index)
             self._cur.update(self._txn.cur())
             self._ltid = self._tid
-        finally:
-            self._lock_release()
 
-    def lastTransaction(self):
-        return self._ltid
-
     def loadBefore(self, the_oid, the_tid):
         # It's okay if loadBefore() is really expensive, because this
         # storage is just used for testing.
-        self._lock_acquire()
-        try:
+        with self._lock:
             tids = [tid for oid, tid in self._index if oid == the_oid]
             if not tids:
                 raise KeyError(the_oid)
@@ -134,15 +125,9 @@
             else:
                 end_tid = tids[j]
             return self._index[(the_oid, tid)], tid, end_tid
-        finally:
-            self._lock_release()
 
     def loadSerial(self, oid, serial):
-        self._lock_acquire()
-        try:
-            return self._index[(oid, serial)]
-        finally:
-            self._lock_release()
+        return self._index[(oid, serial)]
 
     def close(self):
         pass



More information about the Zodb-checkins mailing list