[Zodb-checkins] CVS: StandaloneZODB/ZEO - StorageServer.py:1.28.2.7
Jeremy Hylton
jeremy@zope.com
Thu, 7 Mar 2002 21:16:55 -0500
Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv9224
Modified Files:
      Tag: zeo-1_0-branch
	StorageServer.py
Log Message:
Refactor and fix distributed commit lock implementation.
This is a candidate for ZEO 1.0b6.
This fixes a bug reported by Chris McDonough on zodb-dev. The bug
caused the storage server to stop committing transactions for a
storage. The details of the bug are in the checkCommitLock test
cases.
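
In outline, the failure mode was this: the old resume loop invoked each
queued callback directly, so an exception raised during resume (say, from
a client that had disconnected while it was queued) escaped the loop and
stranded every remaining waiter, leaving the commit lock held forever.
Here is a minimal sketch of the old and new drain loops; the function
names are illustrative, not the exact server code:

    # Old drain loop: one failing waiter aborts the whole drain.
    def old_resume(waiting):
        while waiting:
            f, args = waiting.pop(0)
            if apply(f, args):     # an exception here propagates out...
                break              # ...and the rest of the queue is stranded

    # New drain loop: errors are handled per waiter and the drain goes on.
    def new_resume(waiting):
        while waiting:
            resume, args, onerror = waiting.pop(0)
            try:
                if apply(resume, args):
                    break
            except:
                onerror()          # e.g. close the dead connection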
The following comment explains the new code.
    # distributed commit lock support methods

    # Only one client at a time can commit a transaction on a
    # storage. If one client is committing a transaction, and a
    # second client sends a tpc_begin(), then the second client is queued.
    # When the first transaction finishes, either by abort or commit,
    # the request from the queued client must be handled.

    # It is important that this code be robust. If a queued
    # transaction is not restarted, the server will stop processing
    # new transactions.

    # This lock is implemented by storing the queued requests in a
    # list on the storage object. The list contains:
    #     a callable object to resume the request
    #     arguments to that object
    #     a callable object to handle errors during resume

    # XXX I am not sure that the commitlock_resume() method is
    # sufficiently paranoid.
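
To make the new protocol concrete, here is a small self-contained sketch
of the suspend/resume cycle; ToyStorage and ToyClient are illustrative
stand-ins, not the actual ZEO classes:

    class ToyStorage:
        def __init__(self):
            self._transaction = None
            self._waiting = []  # entries are (resume, args, onerror)

    class ToyClient:
        def __init__(self, name, storage):
            self.name = name
            self.storage = storage

        def tpc_begin(self, txn):
            if self.storage._transaction is not None:
                # Lock is held: suspend this request instead of failing.
                self.storage._waiting.append((self.unlock, (), self.close))
                return 1  # flag a lock condition to the caller
            self.storage._transaction = txn

        def unlock(self):
            # Resume callback: the queued client may retry tpc_begin().
            print '%s resumed' % self.name
            return 1  # non-zero stops the drain; the lock is handed over

        def close(self):
            print '%s dropped during resume' % self.name

        def tpc_finish(self):
            self.storage._transaction = None
            waiting = self.storage._waiting
            while waiting:  # hand the lock to the next live waiter
                resume, args, onerror = waiting.pop(0)
                try:
                    if apply(resume, args):
                        break
                except:
                    onerror()

    s = ToyStorage()
    a, b = ToyClient('A', s), ToyClient('B', s)
    a.tpc_begin('t1')   # A takes the commit lock
    b.tpc_begin('t2')   # B is queued behind A
    a.tpc_finish()      # prints 'B resumed'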
=== StandaloneZODB/ZEO/StorageServer.py 1.28.2.6 => 1.28.2.7 ===
 from ZEO import trigger
 from ZEO import asyncwrap
+from ZEO.smac import Disconnected
 from types import StringType

 class StorageServerError(POSException.StorageError): pass
@@ -133,6 +134,8 @@
         self.__storages=storages
         for n, s in storages.items():
             init_storage(s)
+            # Create a waiting list to support the distributed commit lock.
+            s._waiting = []

         self.__connections={}
         self.__get_connections=self.__connections.get
@@ -280,6 +283,7 @@
             # This is the first communication from the client
             self.__storage, self.__storage_id = (
                 self.__server.register_connection(self, message))
+
             # Send info back asynchronously, so client need not ask
             self.message_output('S'+dump(self.get_info(), 1))
             return
@@ -501,39 +505,76 @@
             return oids
         return ()

-    def tpc_abort(self, id):
-        t=self._transaction
-        if t is None or id != t.id: return
-        r=self.__storage.tpc_abort(t)
+    # distributed commit lock support methods

-        storage=self.__storage
-        try: waiting=storage.__waiting
-        except: waiting=storage.__waiting=[]
+    # Only one client at a time can commit a transaction on a
+    # storage. If one client is committing a transaction, and a
+    # second client sends a tpc_begin(), then the second client is queued.
+    # When the first transaction finishes, either by abort or commit,
+    # the request from the queued client must be handled.
+
+    # It is important that this code be robust. If a queued
+    # transaction is not restarted, the server will stop processing
+    # new transactions.
+
+    # This lock is implemented by storing the queued requests in a
+    # list on the storage object. The list contains:
+    #     a callable object to resume the request
+    #     arguments to that object
+    #     a callable object to handle errors during resume
+
+    # XXX I am not sure that the commitlock_resume() method is
+    # sufficiently paranoid.
+
+    def commitlock_suspend(self, resume, args, onerror):
+        self.__storage._waiting.append((resume, args, onerror))
+
+    def commitlock_resume(self):
+        waiting = self.__storage._waiting
         while waiting:
-            f, args = waiting.pop(0)
-            if apply(f,args): break
+            resume, args, onerror = waiting.pop(0)
+            try:
+                if apply(resume, args):
+                    break
+            except Disconnected:
+                # A disconnected error isn't an unexpected error.
+                # There should be no need to log it, because the
+                # disconnect will have generated its own log event.
+                onerror()
+            except:
+                LOG('ZEO Server', ERROR,
+                    "Unexpected error handling queued tpc_begin()",
+                    error=sys.exc_info())
+                onerror()

-        self._transaction=None
-        self.__invalidated=[]
+    def tpc_abort(self, id):
+        t = self._transaction
+        if t is None or id != t.id:
+            return
+        r = self.__storage.tpc_abort(t)
+
+        self._transaction = None
+        self.__invalidated = []
+        self.commitlock_resume()

     def unlock(self):
-        if self.__closed: return
+        if self.__closed:
+            return
         self.message_output('UN.')

     def tpc_begin(self, id, user, description, ext):
-        t=self._transaction
+        t = self._transaction
         if t is not None:
-            if id == t.id: return
+            if id == t.id:
+                return
             else:
                 raise StorageServerError(
                     "Multiple simultaneous tpc_begin requests from the same "
                     "client."
                     )
-        storage=self.__storage
+        storage = self.__storage
         if storage._transaction is not None:
-            try: waiting=storage.__waiting
-            except: waiting=storage.__waiting=[]
-            waiting.append((self.unlock, ()))
+            self.commitlock_suspend(self.unlock, (), self.close)
             return 1 # Return a flag indicating a lock condition.

         self._transaction=t=Transaction()
@@ -552,9 +593,9 @@
         if storage._transaction is None:
             self.try_again_sync(id, user, description, ext)
         else:
-            try: waiting=storage.__waiting
-            except: waiting=storage.__waiting=[]
-            waiting.append((self.try_again_sync, (id, user, description, ext)))
+            self.commitlock_suspend(self.try_again_sync,
+                                    (id, user, description, ext),
+                                    self.close)

         return _noreturn
@@ -572,24 +613,21 @@
         return 1

     def tpc_finish(self, id, user, description, ext):
-        t=self._transaction
-        if id != t.id: return
+        t = self._transaction
+        if id != t.id:
+            return

-        storage=self.__storage
-        r=storage.tpc_finish(t)
-
-        try: waiting=storage.__waiting
-        except: waiting=storage.__waiting=[]
-        while waiting:
-            f, args = waiting.pop(0)
-            if apply(f,args): break
+        storage = self.__storage
+        r = storage.tpc_finish(t)

-        self._transaction=None
+        self._transaction = None
         if self.__invalidated:
             self.__server.invalidate(self, self.__storage_id,
                                      self.__invalidated,
                                      self.get_size_info())
-        self.__invalidated=[]
+        self.__invalidated = []
+
+        self.commitlock_resume()

 def init_storage(storage):
     if not hasattr(storage,'tpc_vote'): storage.tpc_vote=lambda *args: None