[Zodb-checkins] SVN: ZODB/branches/3.9/src/ Fixed a serious bug that caused servers to stop committing transactions

Jim Fulton jim at zope.com
Wed Sep 30 14:24:06 EDT 2009


Log message for revision 104657:
  Fixed a serious bug that caused servers to stop committing transactions
  after conflict errors on blobs. :(
  

Changed:
  U   ZODB/branches/3.9/src/CHANGES.txt
  U   ZODB/branches/3.9/src/ZEO/StorageServer.py
  A   ZODB/branches/3.9/src/ZEO/tests/testZEO2.py
  U   ZODB/branches/3.9/src/ZODB/interfaces.py

-=-
Modified: ZODB/branches/3.9/src/CHANGES.txt
===================================================================
--- ZODB/branches/3.9/src/CHANGES.txt	2009-09-30 18:24:04 UTC (rev 104656)
+++ ZODB/branches/3.9/src/CHANGES.txt	2009-09-30 18:24:05 UTC (rev 104657)
@@ -2,6 +2,15 @@
  Change History
 ================
 
+3.9.1 (2009-09-30)
+==================
+
+Bugs Fixed
+----------
+
+- Conflict errors committing blobs caused servers to stop committing
+  transactions.
+
 3.9.0 (2009-09-08)
 ==================
 

Modified: ZODB/branches/3.9/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/3.9/src/ZEO/StorageServer.py	2009-09-30 18:24:04 UTC (rev 104656)
+++ ZODB/branches/3.9/src/ZEO/StorageServer.py	2009-09-30 18:24:05 UTC (rev 104657)
@@ -542,30 +542,34 @@
         else:
             self.storage.tpc_begin(self.transaction)
 
-        loads, loader = self.txnlog.get_loader()
-        for i in range(loads):
-            store = loader.load()
-            store_type = store[0]
-            store_args = store[1:]
+        try:
+            loads, loader = self.txnlog.get_loader()
+            for i in range(loads):
+                store = loader.load()
+                store_type = store[0]
+                store_args = store[1:]
 
-            if store_type == 'd':
-                do_store = self._delete
-            elif store_type == 's':
-                do_store = self._store
-            elif store_type == 'r':
-                do_store = self._restore
-            else:
-                raise ValueError('Invalid store type: %r' % store_type)
+                if store_type == 'd':
+                    do_store = self._delete
+                elif store_type == 's':
+                    do_store = self._store
+                elif store_type == 'r':
+                    do_store = self._restore
+                else:
+                    raise ValueError('Invalid store type: %r' % store_type)
 
-            if not do_store(*store_args):
-                break
+                if not do_store(*store_args):
+                    break
 
-        # Blob support
-        while self.blob_log:
-            oid, oldserial, data, blobfilename = self.blob_log.pop()
-            self.storage.storeBlob(oid, oldserial, data, blobfilename,
-                                   '', self.transaction,)
+            # Blob support
+            while self.blob_log and not self.store_failed:
+                oid, oldserial, data, blobfilename = self.blob_log.pop()
+                self._store(oid, oldserial, data, blobfilename)
 
+        except:
+            self.storage.tpc_abort(self.transaction)
+            raise
+
         resp = self._thunk()
         if delay is not None:
             delay.reply(resp)
@@ -650,11 +654,15 @@
 
         return err is None
 
-    def _store(self, oid, serial, data):
+    def _store(self, oid, serial, data, blobfile=None):
         err = None
         try:
-            newserial = self.storage.store(oid, serial, data, '',
-                                           self.transaction)
+            if blobfile is None:
+                newserial = self.storage.store(
+                    oid, serial, data, '', self.transaction)
+            else:
+                newserial = self.storage.storeBlob(
+                    oid, serial, data, blobfile, '', self.transaction)
         except (SystemExit, KeyboardInterrupt):
             raise
         except Exception, err:

Copied: ZODB/branches/3.9/src/ZEO/tests/testZEO2.py (from rev 104655, ZODB/trunk/src/ZEO/tests/testZEO2.py)
===================================================================
--- ZODB/branches/3.9/src/ZEO/tests/testZEO2.py	                        (rev 0)
+++ ZODB/branches/3.9/src/ZEO/tests/testZEO2.py	2009-09-30 18:24:05 UTC (rev 104657)
@@ -0,0 +1,170 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+from zope.testing import doctest, setupstack, renormalizing
+import logging
+import re
+import sys
+import transaction
+import unittest
+import ZEO.StorageServer
+import ZEO.tests.servertesting
+import ZODB.blob
+import ZODB.FileStorage
+import ZODB.tests.util
+import ZODB.utils
+
+def proper_handling_of_blob_conflicts():
+    r"""
+
+Conflict errors weren't properly handled when storing blobs, the
+result being that the storage was left in a transaction.
+
+We originally saw this when restarting a blocked transaction, although
+it doesn't really matter.
+
+Set up the storage with some initial blob data.
+
+    >>> fs = ZODB.FileStorage.FileStorage('t.fs', blob_dir='t.blobs')
+    >>> db = ZODB.DB(fs)
+    >>> conn = db.open()
+    >>> conn.root.b = ZODB.blob.Blob('x')
+    >>> transaction.commit()
+
+Get the oid and first serial. We'll use the serial later to provide
+out-of-date data.
+
+    >>> oid = conn.root.b._p_oid
+    >>> serial = conn.root.b._p_serial
+    >>> conn.root.b.open('w').write('y')
+    >>> transaction.commit()
+    >>> data = fs.load(oid)[0]
+
+Create the server:
+
+    >>> server = ZEO.tests.servertesting.StorageServer('x', {'1': fs})
+
+And an initial client.
+
+    >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+    >>> conn1 = ZEO.tests.servertesting.Conection(1)
+    >>> zs1.notifyConnected(conn1)
+    >>> zs1.register('1', 0)
+    >>> zs1.tpc_begin('0', '', '', {})
+    >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
+    >>> _ = zs1.vote('0') # doctest: +ELLIPSIS
+    1 callAsync serialnos ...
+
+In a second client, we'll try to commit using the old serial. This
+will conflict. It will be blocked at the vote call.
+
+    >>> zs2 = ZEO.StorageServer.ZEOStorage(server)
+    >>> conn2 = ZEO.tests.servertesting.Conection(2)
+    >>> zs2.notifyConnected(conn2)
+    >>> zs2.register('1', 0)
+    >>> zs2.tpc_begin('1', '', '', {})
+    >>> zs2.storeBlobStart()
+    >>> zs2.storeBlobChunk('z')
+    >>> zs2.storeBlobEnd(oid, serial, data, '1')
+    >>> delay = zs2.vote('1')
+
+    >>> def send_reply(id, reply):
+    ...     print 'reply', id, reply
+    >>> delay.set_sender(1, send_reply, None)
+
+    >>> logger = logging.getLogger('ZEO')
+    >>> handler = logging.StreamHandler(sys.stdout)
+    >>> logger.setLevel(logging.INFO)
+    >>> logger.addHandler(handler)
+
+Now, when we abort the transaction for the first client, the second
+client will be restarted.  It will get a conflict error, which is
+handled correctly:
+
+    >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
+    2 callAsync serialnos ...
+    reply 1 None
+    (511/test-addr) Blocked transaction restarted.
+
+    >>> fs.tpc_transaction() is not None
+    True
+    >>> conn2.connected
+    True
+
+    >>> logger.setLevel(logging.NOTSET)
+    >>> logger.removeHandler(handler)
+    >>> fs.close()
+    """
+
+def proper_handling_of_errors_in_restart():
+    r"""
+
+It's critical that if there is an error in _restart (i.e. vote), the
+storage isn't left in tpc.
+
+    >>> fs = ZODB.FileStorage.FileStorage('t.fs', blob_dir='t.blobs')
+    >>> server = ZEO.tests.servertesting.StorageServer('x', {'1': fs})
+
+And an initial client.
+
+    >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+    >>> conn1 = ZEO.tests.servertesting.Conection(1)
+    >>> zs1.notifyConnected(conn1)
+    >>> zs1.register('1', 0)
+    >>> zs1.tpc_begin('0', '', '', {})
+    >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
+
+Intentionally break zs1:
+
+    >>> zs1._store = lambda : None
+    >>> _ = zs1.vote('0') # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    TypeError: <lambda>() takes no arguments (3 given)
+
+We're not in a transaction:
+
+    >>> fs.tpc_transaction() is None
+    True
+
+We can start another client and get the storage lock.
+
+    >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+    >>> conn1 = ZEO.tests.servertesting.Conection(1)
+    >>> zs1.notifyConnected(conn1)
+    >>> zs1.register('1', 0)
+    >>> zs1.tpc_begin('1', '', '', {})
+    >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1')
+    >>> _ = zs1.vote('1') # doctest: +ELLIPSIS
+    1 callAsync serialnos ...
+
+    >>> zs1.tpc_finish('1') is not None
+    True
+
+    >>> fs.close()
+    """
+
+
+def test_suite():
+    return unittest.TestSuite((
+        doctest.DocTestSuite(
+            setUp=ZODB.tests.util.setUp, tearDown=setupstack.tearDown,
+            checker=renormalizing.RENormalizing([
+                (re.compile('\d+/test-addr'), ''),
+                ]),
+            ),
+        ))
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')
+

Modified: ZODB/branches/3.9/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/3.9/src/ZODB/interfaces.py	2009-09-30 18:24:04 UTC (rev 104656)
+++ ZODB/branches/3.9/src/ZODB/interfaces.py	2009-09-30 18:24:05 UTC (rev 104657)
@@ -316,7 +316,7 @@
 
         This can be necessary if there have been major changes to
         stored data and it is either impractical to enumerate them or
-        there would be so many that it would be inefficient to do so.        
+        there would be so many that it would be inefficient to do so.
         """
 
     def invalidate(transaction_id, oids, version=''):
@@ -326,7 +326,7 @@
 
         The version argument is provided for backward
         compatibility. If passed, it must be an empty string.
-        
+
         """
 
     def references(record, oids=None):
@@ -343,7 +343,7 @@
     """
 
     # TODO: This interface is incomplete.
-    # XXX how is it incomplete? 
+    # XXX how is it incomplete?
 
     databases = Attribute(
         """A mapping from database name to DB (database) object.
@@ -362,8 +362,8 @@
         application code should rarely, if ever, have a need to use
         this attribute.
         """)
-    
 
+
     def open(transaction_manager=None, serial=''):
         """Return an IConnection object for use by application code.
 
@@ -451,7 +451,7 @@
 
     def getSize():
         """An approximate size of the database, in bytes.
-        
+
         This is used soley for informational purposes.
         """
 
@@ -459,7 +459,7 @@
         """Return a sequence of history information dictionaries.
 
         Up to size objects (including no objects) may be returned.
-        
+
         The information provides a log of the changes made to the
         object. Data are reported in reverse chronological order.
 
@@ -468,7 +468,7 @@
         time
             UTC seconds since the epoch (as in time.time) that the
             object revision was committed.
-            
+
         tid
             The transaction identifier of the transaction that
             committed the version.
@@ -489,7 +489,7 @@
 
         If the transaction had extension items, then these items are
         also included if they don't conflict with the keys above.
-        
+
         """
 
     def isReadOnly():
@@ -510,7 +510,7 @@
 
     def __len__():
         """The approximate number of objects in the storage
-        
+
         This is used soley for informational purposes.
         """
 
@@ -623,7 +623,7 @@
         oid
             The object identifier.  This is either a string
             consisting of 8 nulls or a string previously returned by
-            new_oid. 
+            new_oid.
 
         serial
             The serial of the data that was read when the object was
@@ -673,7 +673,7 @@
         StorageError or, more often, a subclass of it
           is raised when an internal error occurs while the storage is
           handling the store() call.
-        
+
         """
 
     def tpc_abort(transaction):
@@ -771,8 +771,8 @@
         # - Incorrect pack garbage-collection algorithms (possibly
         #   including the existing FileStorage implementation), that
         #   failed to take into account records after the pack time.
-        
 
+
     def restore(oid, serial, data, version, prev_txn, transaction):
         """Write data already committed in a separate database
 
@@ -785,7 +785,7 @@
 
         oid
              The object id for the record
-        
+
         serial
              The transaction identifier that originally committed this object.
 
@@ -1071,7 +1071,7 @@
             ...     # do things with oid, tid, and data
             ...     if next is None:
             ...         break
-        
+
         """
 
 class IExternalGC(IStorage):
@@ -1106,7 +1106,7 @@
         The mode 'c' is similar to 'r', except that an orinary file
         object is returned and may be used in a separate transaction
         and after the blob's database connection has been closed.
-        
+
         """
 
     def committed():
@@ -1127,12 +1127,12 @@
         Replace the current data of the blob with the file given under
         filename.
 
-        The blob must not be opened for reading or writing when consuming a 
+        The blob must not be opened for reading or writing when consuming a
         file.
 
         The blob will take over ownership of the file and will either
         rename or copy and remove it.  The file must not be open.
-        
+
         """
 
 
@@ -1147,13 +1147,45 @@
         (or copy and remove it) immediately, or at transaction-commit
         time.  The file must not be open.
 
-        The new serial is returned.
+        The new serial for the object is returned, but not necessarily
+        immediately.  It may be returned directly, or on a subsequent
+        store or tpc_vote call.
+
+        The return value may be:
+
+        - None
+
+        - A new serial (string) for the object, or
+
+        - An iterable of object-id and serial pairs giving new serials
+          for objects.
+
+        A serial, returned as a string or in a sequence of oid/serial
+        pairs, may be the special value
+        ZODB.ConflictResolution.ResolvedSerial to indicate that a
+        conflict occurred and that the object should be invalidated.
+
+        Several different exceptions may be raised when an error occurs.
+
+        ConflictError
+          is raised when serial does not match the most recent serial
+          number for object oid and the conflict was not resolved by
+          the storage.
+
+        StorageTransactionError
+          is raised when transaction does not match the current
+          transaction.
+
+        StorageError or, more often, a subclass of it
+          is raised when an internal error occurs while the storage is
+          handling the store() call.
+
         """
 
     def loadBlob(oid, serial):
         """Return the filename of the Blob data for this OID and serial.
 
-        Returns a filename. 
+        Returns a filename.
 
         Raises POSKeyError if the blobfile cannot be found.
         """



More information about the Zodb-checkins mailing list