[Zodb-checkins] SVN: ZODB/branches/3.9/src/ Fixed a serious bug that caused servers to stop committing transactions
Jim Fulton
jim at zope.com
Wed Sep 30 14:24:06 EDT 2009
Log message for revision 104657:
Fixed a serious bug that caused servers to stop committing transactions
after conflict errors on blobs. :(
Changed:
U ZODB/branches/3.9/src/CHANGES.txt
U ZODB/branches/3.9/src/ZEO/StorageServer.py
A ZODB/branches/3.9/src/ZEO/tests/testZEO2.py
U ZODB/branches/3.9/src/ZODB/interfaces.py
-=-
Modified: ZODB/branches/3.9/src/CHANGES.txt
===================================================================
--- ZODB/branches/3.9/src/CHANGES.txt 2009-09-30 18:24:04 UTC (rev 104656)
+++ ZODB/branches/3.9/src/CHANGES.txt 2009-09-30 18:24:05 UTC (rev 104657)
@@ -2,6 +2,15 @@
Change History
================
+3.9.1 (2009-09-30)
+==================
+
+Bugs Fixed
+----------
+
+- Conflict errors committing blobs caused servers to stop committing
+ transactions.
+
3.9.0 (2009-09-08)
==================
Modified: ZODB/branches/3.9/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/3.9/src/ZEO/StorageServer.py 2009-09-30 18:24:04 UTC (rev 104656)
+++ ZODB/branches/3.9/src/ZEO/StorageServer.py 2009-09-30 18:24:05 UTC (rev 104657)
@@ -542,30 +542,34 @@
else:
self.storage.tpc_begin(self.transaction)
- loads, loader = self.txnlog.get_loader()
- for i in range(loads):
- store = loader.load()
- store_type = store[0]
- store_args = store[1:]
+ try:
+ loads, loader = self.txnlog.get_loader()
+ for i in range(loads):
+ store = loader.load()
+ store_type = store[0]
+ store_args = store[1:]
- if store_type == 'd':
- do_store = self._delete
- elif store_type == 's':
- do_store = self._store
- elif store_type == 'r':
- do_store = self._restore
- else:
- raise ValueError('Invalid store type: %r' % store_type)
+ if store_type == 'd':
+ do_store = self._delete
+ elif store_type == 's':
+ do_store = self._store
+ elif store_type == 'r':
+ do_store = self._restore
+ else:
+ raise ValueError('Invalid store type: %r' % store_type)
- if not do_store(*store_args):
- break
+ if not do_store(*store_args):
+ break
- # Blob support
- while self.blob_log:
- oid, oldserial, data, blobfilename = self.blob_log.pop()
- self.storage.storeBlob(oid, oldserial, data, blobfilename,
- '', self.transaction,)
+ # Blob support
+ while self.blob_log and not self.store_failed:
+ oid, oldserial, data, blobfilename = self.blob_log.pop()
+ self._store(oid, oldserial, data, blobfilename)
+ except:
+ self.storage.tpc_abort(self.transaction)
+ raise
+
resp = self._thunk()
if delay is not None:
delay.reply(resp)
@@ -650,11 +654,15 @@
return err is None
- def _store(self, oid, serial, data):
+ def _store(self, oid, serial, data, blobfile=None):
err = None
try:
- newserial = self.storage.store(oid, serial, data, '',
- self.transaction)
+ if blobfile is None:
+ newserial = self.storage.store(
+ oid, serial, data, '', self.transaction)
+ else:
+ newserial = self.storage.storeBlob(
+ oid, serial, data, blobfile, '', self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
Copied: ZODB/branches/3.9/src/ZEO/tests/testZEO2.py (from rev 104655, ZODB/trunk/src/ZEO/tests/testZEO2.py)
===================================================================
--- ZODB/branches/3.9/src/ZEO/tests/testZEO2.py (rev 0)
+++ ZODB/branches/3.9/src/ZEO/tests/testZEO2.py 2009-09-30 18:24:05 UTC (rev 104657)
@@ -0,0 +1,170 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+from zope.testing import doctest, setupstack, renormalizing
+import logging
+import re
+import sys
+import transaction
+import unittest
+import ZEO.StorageServer
+import ZEO.tests.servertesting
+import ZODB.blob
+import ZODB.FileStorage
+import ZODB.tests.util
+import ZODB.utils
+
+def proper_handling_of_blob_conflicts():
+ r"""
+
+Conflict errors weren't properly handled when storing blobs, the
+result being that the storage was left in a transaction.
+
+We originally saw this when restarting a blocked transaction, although
+it doesn't really matter.
+
+Set up the storage with some initial blob data.
+
+ >>> fs = ZODB.FileStorage.FileStorage('t.fs', blob_dir='t.blobs')
+ >>> db = ZODB.DB(fs)
+ >>> conn = db.open()
+ >>> conn.root.b = ZODB.blob.Blob('x')
+ >>> transaction.commit()
+
+Get the oid and first serial. We'll use the serial later to provide
+out-of-date data.
+
+ >>> oid = conn.root.b._p_oid
+ >>> serial = conn.root.b._p_serial
+ >>> conn.root.b.open('w').write('y')
+ >>> transaction.commit()
+ >>> data = fs.load(oid)[0]
+
+Create the server:
+
+ >>> server = ZEO.tests.servertesting.StorageServer('x', {'1': fs})
+
+And an initial client.
+
+ >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+ >>> conn1 = ZEO.tests.servertesting.Conection(1)
+ >>> zs1.notifyConnected(conn1)
+ >>> zs1.register('1', 0)
+ >>> zs1.tpc_begin('0', '', '', {})
+ >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
+ >>> _ = zs1.vote('0') # doctest: +ELLIPSIS
+ 1 callAsync serialnos ...
+
+In a second client, we'll try to commit using the old serial. This
+will conflict. It will be blocked at the vote call.
+
+ >>> zs2 = ZEO.StorageServer.ZEOStorage(server)
+ >>> conn2 = ZEO.tests.servertesting.Conection(2)
+ >>> zs2.notifyConnected(conn2)
+ >>> zs2.register('1', 0)
+ >>> zs2.tpc_begin('1', '', '', {})
+ >>> zs2.storeBlobStart()
+ >>> zs2.storeBlobChunk('z')
+ >>> zs2.storeBlobEnd(oid, serial, data, '1')
+ >>> delay = zs2.vote('1')
+
+ >>> def send_reply(id, reply):
+ ... print 'reply', id, reply
+ >>> delay.set_sender(1, send_reply, None)
+
+ >>> logger = logging.getLogger('ZEO')
+ >>> handler = logging.StreamHandler(sys.stdout)
+ >>> logger.setLevel(logging.INFO)
+ >>> logger.addHandler(handler)
+
+Now, when we abort the transaction for the first client, the second
+client will be restarted. It will get a conflict error that is
+handled correctly:
+
+ >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
+ 2 callAsync serialnos ...
+ reply 1 None
+ (511/test-addr) Blocked transaction restarted.
+
+ >>> fs.tpc_transaction() is not None
+ True
+ >>> conn2.connected
+ True
+
+ >>> logger.setLevel(logging.NOTSET)
+ >>> logger.removeHandler(handler)
+ >>> fs.close()
+ """
+
+def proper_handling_of_errors_in_restart():
+ r"""
+
+It's critical that if there is an error in _restart (ie vote) that the
+storage isn't left in tpc.
+
+ >>> fs = ZODB.FileStorage.FileStorage('t.fs', blob_dir='t.blobs')
+ >>> server = ZEO.tests.servertesting.StorageServer('x', {'1': fs})
+
+And an initial client.
+
+ >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+ >>> conn1 = ZEO.tests.servertesting.Conection(1)
+ >>> zs1.notifyConnected(conn1)
+ >>> zs1.register('1', 0)
+ >>> zs1.tpc_begin('0', '', '', {})
+ >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
+
+Intentionally break zs1:
+
+ >>> zs1._store = lambda : None
+ >>> _ = zs1.vote('0') # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ TypeError: <lambda>() takes no arguments (3 given)
+
+We're not in a transaction:
+
+ >>> fs.tpc_transaction() is None
+ True
+
+We can start another client and get the storage lock.
+
+ >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+ >>> conn1 = ZEO.tests.servertesting.Conection(1)
+ >>> zs1.notifyConnected(conn1)
+ >>> zs1.register('1', 0)
+ >>> zs1.tpc_begin('1', '', '', {})
+ >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1')
+ >>> _ = zs1.vote('1') # doctest: +ELLIPSIS
+ 1 callAsync serialnos ...
+
+ >>> zs1.tpc_finish('1') is not None
+ True
+
+ >>> fs.close()
+ """
+
+
+def test_suite():
+ return unittest.TestSuite((
+ doctest.DocTestSuite(
+ setUp=ZODB.tests.util.setUp, tearDown=setupstack.tearDown,
+ checker=renormalizing.RENormalizing([
+ (re.compile('\d+/test-addr'), ''),
+ ]),
+ ),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
+
Modified: ZODB/branches/3.9/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/3.9/src/ZODB/interfaces.py 2009-09-30 18:24:04 UTC (rev 104656)
+++ ZODB/branches/3.9/src/ZODB/interfaces.py 2009-09-30 18:24:05 UTC (rev 104657)
@@ -316,7 +316,7 @@
This can be necessary if there have been major changes to
stored data and it is either impractical to enumerate them or
- there would be so many that it would be inefficient to do so.
+ there would be so many that it would be inefficient to do so.
"""
def invalidate(transaction_id, oids, version=''):
@@ -326,7 +326,7 @@
The version argument is provided for backward
compatibility. If passed, it must be an empty string.
-
+
"""
def references(record, oids=None):
@@ -343,7 +343,7 @@
"""
# TODO: This interface is incomplete.
- # XXX how is it incomplete?
+ # XXX how is it incomplete?
databases = Attribute(
"""A mapping from database name to DB (database) object.
@@ -362,8 +362,8 @@
application code should rarely, if ever, have a need to use
this attribute.
""")
-
+
def open(transaction_manager=None, serial=''):
"""Return an IConnection object for use by application code.
@@ -451,7 +451,7 @@
def getSize():
"""An approximate size of the database, in bytes.
-
+
This is used solely for informational purposes.
"""
@@ -459,7 +459,7 @@
"""Return a sequence of history information dictionaries.
Up to size objects (including no objects) may be returned.
-
+
The information provides a log of the changes made to the
object. Data are reported in reverse chronological order.
@@ -468,7 +468,7 @@
time
UTC seconds since the epoch (as in time.time) that the
object revision was committed.
-
+
tid
The transaction identifier of the transaction that
committed the version.
@@ -489,7 +489,7 @@
If the transaction had extension items, then these items are
also included if they don't conflict with the keys above.
-
+
"""
def isReadOnly():
@@ -510,7 +510,7 @@
def __len__():
"""The approximate number of objects in the storage
-
+
This is used solely for informational purposes.
"""
@@ -623,7 +623,7 @@
oid
The object identifier. This is either a string
consisting of 8 nulls or a string previously returned by
- new_oid.
+ new_oid.
serial
The serial of the data that was read when the object was
@@ -673,7 +673,7 @@
StorageError or, more often, a subclass of it
is raised when an internal error occurs while the storage is
handling the store() call.
-
+
"""
def tpc_abort(transaction):
@@ -771,8 +771,8 @@
# - Incorrect pack garbage-collection algorithms (possibly
# including the existing FileStorage implementation), that
# failed to take into account records after the pack time.
-
+
def restore(oid, serial, data, version, prev_txn, transaction):
"""Write data already committed in a separate database
@@ -785,7 +785,7 @@
oid
The object id for the record
-
+
serial
The transaction identifier that originally committed this object.
@@ -1071,7 +1071,7 @@
... # do things with oid, tid, and data
... if next is None:
... break
-
+
"""
class IExternalGC(IStorage):
@@ -1106,7 +1106,7 @@
The mode 'c' is similar to 'r', except that an ordinary file
object is returned and may be used in a separate transaction
and after the blob's database connection has been closed.
-
+
"""
def committed():
@@ -1127,12 +1127,12 @@
Replace the current data of the blob with the file given under
filename.
- The blob must not be opened for reading or writing when consuming a
+ The blob must not be opened for reading or writing when consuming a
file.
The blob will take over ownership of the file and will either
rename or copy and remove it. The file must not be open.
-
+
"""
@@ -1147,13 +1147,45 @@
(or copy and remove it) immediately, or at transaction-commit
time. The file must not be open.
- The new serial is returned.
+ The new serial for the object is returned, but not necessarily
+ immediately. It may be returned directly, or on a subsequent
+ store or tpc_vote call.
+
+ The return value may be:
+
+ - None
+
+ - A new serial (string) for the object, or
+
+ - An iterable of object-id and serial pairs giving new serials
+ for objects.
+
+ A serial, returned as a string or in a sequence of oid/serial
+ pairs, may be the special value
+ ZODB.ConflictResolution.ResolvedSerial to indicate that a
+ conflict occurred and that the object should be invalidated.
+
+ Several different exceptions may be raised when an error occurs.
+
+ ConflictError
+ is raised when serial does not match the most recent serial
+ number for object oid and the conflict was not resolved by
+ the storage.
+
+ StorageTransactionError
+ is raised when transaction does not match the current
+ transaction.
+
+ StorageError or, more often, a subclass of it
+ is raised when an internal error occurs while the storage is
+ handling the store() call.
+
"""
def loadBlob(oid, serial):
"""Return the filename of the Blob data for this OID and serial.
- Returns a filename.
+ Returns a filename.
Raises POSKeyError if the blobfile cannot be found.
"""
More information about the Zodb-checkins
mailing list