[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo2/src/ZEO/ Fixed bug that caused server to stop committing transactions when
Jim Fulton
jim at zope.com
Fri Oct 2 13:18:18 EDT 2009
Log message for revision 104753:
Fixed bug that caused server to stop committing transactions when
there was a conflict error on blobs.
Refactored an optimization that speeds server output by writing ahead
of select loop.
Changed:
U ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
U ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py
U ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
A ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO2.py
U ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py
-=-
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py 2009-10-02 17:18:16 UTC (rev 104752)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py 2009-10-02 17:18:18 UTC (rev 104753)
@@ -273,6 +273,8 @@
if self.auth_realm and not self.authenticated:
raise AuthError("Client was never authenticated with server!")
+ self.connection.auth_done()
+
if self.storage is not None:
self.log("duplicate register() call")
raise ValueError("duplicate register() call")
@@ -290,7 +292,6 @@
self.storage = storage
self.setup_delegation()
self.stats = self.server.register_connection(storage_id, self)
- self.connection.thread_ident = self.connection.unregistered_thread_ident
def get_info(self):
storage = self.storage
@@ -546,29 +547,35 @@
else:
self.storage.tpc_begin(self.transaction)
- loads, loader = self.txnlog.get_loader()
- for i in range(loads):
- store = loader.load()
- store_type = store[0]
- store_args = store[1:]
+ try:
+ loads, loader = self.txnlog.get_loader()
+ for i in range(loads):
+ store = loader.load()
+ store_type = store[0]
+ store_args = store[1:]
- if store_type == 'd':
- do_store = self._delete
- elif store_type == 's':
- do_store = self._store
- elif store_type == 'r':
- do_store = self._restore
- else:
- raise ValueError('Invalid store type: %r' % store_type)
+ if store_type == 'd':
+ do_store = self._delete
+ elif store_type == 's':
+ do_store = self._store
+ elif store_type == 'r':
+ do_store = self._restore
+ else:
+ raise ValueError('Invalid store type: %r' % store_type)
- if not do_store(*store_args):
- break
+ if not do_store(*store_args):
+ break
- # Blob support
- for oid, oldserial, data, blobfilename in self.blob_log:
- self.storage.storeBlob(oid, oldserial, data, blobfilename,
- '', self.transaction,)
+ # Blob support
+ while self.blob_log and not self.store_failed:
+ oid, oldserial, data, blobfilename = self.blob_log.pop()
+ self._store(oid, oldserial, data, blobfilename)
+ except:
+ self.storage.tpc_abort(self.transaction)
+ self._clear_transaction()
+ raise
+
thunk = self._thunk
delay = self._delay
self._thunk = self._delay = None
@@ -664,11 +671,15 @@
return err is None
- def _store(self, oid, serial, data):
+ def _store(self, oid, serial, data, blobfile=None):
err = None
try:
- newserial = self.storage.store(oid, serial, data, '',
- self.transaction)
+ if blobfile is None:
+ newserial = self.storage.store(
+ oid, serial, data, '', self.transaction)
+ else:
+ newserial = self.storage.storeBlob(
+ oid, serial, data, blobfile, '', self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py 2009-10-02 17:18:16 UTC (rev 104752)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py 2009-10-02 17:18:18 UTC (rev 104753)
@@ -57,6 +57,9 @@
peer_protocol_version = (
ZEO.zrpc.connection.Connection.current_protocol)
+ def auth_done(self):
+ pass
+
def test_server_record_iternext():
"""
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py 2009-10-02 17:18:16 UTC (rev 104752)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py 2009-10-02 17:18:18 UTC (rev 104753)
@@ -722,10 +722,12 @@
class FauxConn:
addr = 'x'
- thread_ident = unregistered_thread_ident = None
peer_protocol_version = (
ZEO.zrpc.connection.Connection.current_protocol)
+ def auth_done(self):
+ pass
+
class StorageServerClientWrapper:
def __init__(self):
Copied: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO2.py (from rev 104716, ZODB/trunk/src/ZEO/tests/testZEO2.py)
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO2.py (rev 0)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO2.py 2009-10-02 17:18:18 UTC (rev 104753)
@@ -0,0 +1,171 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+from zope.testing import doctest, setupstack, renormalizing
+import logging
+import re
+import sys
+import transaction
+import unittest
+import ZEO.StorageServer
+import ZEO.tests.servertesting
+import ZODB.blob
+import ZODB.FileStorage
+import ZODB.tests.util
+import ZODB.utils
+
+def proper_handling_of_blob_conflicts():
+ r"""
+
+Conflict errors weren't properly handled when storing blobs, the
+result being that the storage was left in a transaction.
+
+We originally saw this when restarting a block transaction, although
+it doesn't really matter.
+
+Set up the storage with some initial blob data.
+
+ >>> fs = ZODB.FileStorage.FileStorage('t.fs', blob_dir='t.blobs')
+ >>> db = ZODB.DB(fs)
+ >>> conn = db.open()
+ >>> conn.root.b = ZODB.blob.Blob('x')
+ >>> transaction.commit()
+
+Get the iod and first serial. We'll use the serial later to provide
+out-of-date data.
+
+ >>> oid = conn.root.b._p_oid
+ >>> serial = conn.root.b._p_serial
+ >>> conn.root.b.open('w').write('y')
+ >>> transaction.commit()
+ >>> data = fs.load(oid)[0]
+
+Create the server:
+
+ >>> server = ZEO.tests.servertesting.StorageServer('x', {'1': fs})
+
+And an initial client.
+
+ >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+ >>> conn1 = ZEO.tests.servertesting.Connection(1)
+ >>> zs1.notifyConnected(conn1)
+ >>> zs1.register('1', 0)
+ >>> zs1.tpc_begin('0', '', '', {})
+ >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
+ >>> _ = zs1.vote('0') # doctest: +ELLIPSIS
+ 1 callAsync serialnos ...
+
+In a second client, we'll try to commit using the old serial. This
+will conflict. It will be blocked at the vote call.
+
+ >>> zs2 = ZEO.StorageServer.ZEOStorage(server)
+ >>> conn2 = ZEO.tests.servertesting.Connection(2)
+ >>> zs2.notifyConnected(conn2)
+ >>> zs2.register('1', 0)
+ >>> zs2.tpc_begin('1', '', '', {})
+ >>> zs2.storeBlobStart()
+ >>> zs2.storeBlobChunk('z')
+ >>> zs2.storeBlobEnd(oid, serial, data, '1')
+ >>> delay = zs2.vote('1')
+
+ >>> def send_reply(id, reply):
+ ... print 'reply', id, reply
+ >>> delay.set_sender(1, send_reply, None)
+
+ >>> logger = logging.getLogger('ZEO')
+ >>> handler = logging.StreamHandler(sys.stdout)
+ >>> logger.setLevel(logging.INFO)
+ >>> logger.addHandler(handler)
+
+Now, when we abort the transaction for the first client. the second
+client will be restarted. It will get a conflict error, that is
+handled correctly:
+
+ >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
+ (511/test-addr) ('1') unlock: transactions waiting: 0
+ 2 callAsync serialnos ...
+ reply 1 None
+
+ >>> fs.tpc_transaction() is not None
+ True
+ >>> conn2.connected
+ True
+
+ >>> logger.setLevel(logging.NOTSET)
+ >>> logger.removeHandler(handler)
+ >>> zs2.tpc_abort('1')
+ >>> fs.close()
+ """
+
+def proper_handling_of_errors_in_restart():
+ r"""
+
+It's critical that if there is an error in _restart (ie vote) that the
+storage isn't left in tpc.
+
+ >>> fs = ZODB.FileStorage.FileStorage('t.fs', blob_dir='t.blobs')
+ >>> server = ZEO.tests.servertesting.StorageServer('x', {'1': fs})
+
+And an initial client.
+
+ >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+ >>> conn1 = ZEO.tests.servertesting.Connection(1)
+ >>> zs1.notifyConnected(conn1)
+ >>> zs1.register('1', 0)
+ >>> zs1.tpc_begin('0', '', '', {})
+ >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
+
+Intentionally break zs1:
+
+ >>> zs1._store = lambda : None
+ >>> _ = zs1.vote('0') # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ TypeError: <lambda>() takes no arguments (3 given)
+
+We're not in a transaction:
+
+ >>> fs.tpc_transaction() is None
+ True
+
+We can start another client and get the storage lock.
+
+ >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+ >>> conn1 = ZEO.tests.servertesting.Connection(1)
+ >>> zs1.notifyConnected(conn1)
+ >>> zs1.register('1', 0)
+ >>> zs1.tpc_begin('1', '', '', {})
+ >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1')
+ >>> _ = zs1.vote('1') # doctest: +ELLIPSIS
+ 1 callAsync serialnos ...
+
+ >>> zs1.tpc_finish('1') is not None
+ True
+
+ >>> fs.close()
+ """
+
+
+def test_suite():
+ return unittest.TestSuite((
+ doctest.DocTestSuite(
+ setUp=ZODB.tests.util.setUp, tearDown=setupstack.tearDown,
+ checker=renormalizing.RENormalizing([
+ (re.compile('\d+/test-addr'), ''),
+ ]),
+ ),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
+
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py 2009-10-02 17:18:16 UTC (rev 104752)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py 2009-10-02 17:18:18 UTC (rev 104753)
@@ -783,8 +783,13 @@
else:
self.trigger.pull_trigger()
+ def auth_done(self):
+ # We're done with the auth dance. We can be fast now.
+ self.thread_ident = self.unregistered_thread_ident
+
def server_loop(map, conn):
conn.unregistered_thread_ident = thread.get_ident()
+
while len(map) > 1:
asyncore.poll(30.0, map)
for o in map.values():
More information about the Zodb-checkins
mailing list