[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/StorageServer.py Rearranged the code a bit, especially rolled up some excess
Jim Fulton
jim at zope.com
Tue Dec 23 14:10:47 EST 2008
Log message for revision 94295:
Rearranged the code a bit, especially rolled up some excess
abstraction to make the code a little more readable while trying to
decipher the storage locking logic.
Changed:
U ZODB/trunk/src/ZEO/StorageServer.py
-=-
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2008-12-23 18:58:25 UTC (rev 94294)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2008-12-23 19:10:46 UTC (rev 94295)
@@ -123,7 +123,7 @@
self.database = database
def notifyConnected(self, conn):
- self.connection = conn # For restart_other() below
+ self.connection = conn
assert conn.peer_protocol_version is not None
if conn.peer_protocol_version < 'Z309':
self.client = ClientStub308(conn)
@@ -143,9 +143,23 @@
# any pending transaction.
if self.transaction is not None:
self.log("disconnected during transaction %s" % self.transaction)
- self._abort()
+ if not self.locked:
+ # Delete (d, zeo_storage) from the _waiting list, if found.
+ waiting = self.storage._waiting
+ for i in range(len(waiting)):
+ d, z = waiting[i]
+ if z is self:
+ del waiting[i]
+ self.log("Closed connection removed from waiting list."
+ " Clients waiting: %d." % len(waiting))
+ break
+
+ if self.transaction:
+ self.tpc_abort(self.transaction.id)
+
else:
self.log("disconnected")
+
if self.stats is not None:
self.stats.clients -= 1
@@ -412,7 +426,6 @@
if not self._check_tid(id):
return
assert self.locked
- self.stats.active_txns -= 1
self.stats.commits += 1
self.storage.tpc_finish(self.transaction)
tid = self.storage.lastTransaction()
@@ -426,7 +439,6 @@
def tpc_abort(self, id):
if not self._check_tid(id):
return
- self.stats.active_txns -= 1
self.stats.aborts += 1
if self.locked:
self.storage.tpc_abort(self.transaction)
@@ -434,6 +446,7 @@
def _clear_transaction(self):
# Common code at end of tpc_finish() and tpc_abort()
+ self.stats.active_txns -= 1
self.transaction = None
self.txnlog.close()
if self.locked:
@@ -441,28 +454,116 @@
self.timeout.end(self)
self.stats.lock_time = None
self.log("Transaction released storage lock", BLATHER)
- # _handle_waiting() can start another transaction (by
- # restarting a waiting one) so must be done last
- self._handle_waiting()
- def _abort(self):
- # called when a connection is closed unexpectedly
- if not self.locked:
- # Delete (d, zeo_storage) from the _waiting list, if found.
- waiting = self.storage._waiting
- for i in range(len(waiting)):
- d, z = waiting[i]
- if z is self:
- del waiting[i]
- self.log("Closed connection removed from waiting list."
- " Clients waiting: %d." % len(waiting))
- break
+ # Restart any client waiting for the storage lock.
+ while self.storage._waiting:
+ delay, zeo_storage = self.storage._waiting.pop(0)
+ try:
+ zeo_storage._restart(delay)
+ except:
+ self.log("Unexpected error handling waiting transaction",
+ level=logging.WARNING, exc_info=True)
+ zeo_storage.connection.close()
+ continue
- if self.transaction:
- self.stats.active_txns -= 1
- self.stats.aborts += 1
- self.tpc_abort(self.transaction.id)
+ if self.storage._waiting:
+ n = len(self.storage._waiting)
+ self.log("Blocked transaction restarted. "
+ "Clients waiting: %d" % n)
+ else:
+ self.log("Blocked transaction restarted.")
+ break
+
+ # The following two methods return values, so they must acquire
+ # the storage lock and begin the transaction before returning.
+
+ # It's a bit vile that undo can cause us to get the lock before vote.
+
+ def undo(self, trans_id, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ if self.locked:
+ return self._undo(trans_id)
+ else:
+ return self._wait(lambda: self._undo(trans_id))
+
+ def vote(self, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ if self.locked:
+ return self._vote()
+ else:
+ return self._wait(lambda: self._vote())
+
+ # When a delayed transaction is restarted, the dance is
+ # complicated. The restart occurs when one ZEOStorage instance
+ # finishes as a transaction and finds another instance is in the
+ # _waiting list.
+
+ # It might be better to have a mechanism to explicitly send
+ # the finishing transaction's reply before restarting the waiting
+ # transaction. If the restart takes a long time, the previous
+ # client will be blocked until it finishes.
+
+ def _wait(self, thunk):
+ # Wait for the storage lock to be acquired.
+ self._thunk = thunk
+ if self.tpc_transaction():
+ d = Delay()
+ self.storage._waiting.append((d, self))
+ self.log("Transaction blocked waiting for storage. "
+ "Clients waiting: %d." % len(self.storage._waiting))
+ return d
+ else:
+ self.log("Transaction acquired storage lock.", BLATHER)
+ return self._restart()
+
+ def _restart(self, delay=None):
+ # Restart when the storage lock is available.
+ if self.txnlog.stores == 1:
+ template = "Preparing to commit transaction: %d object, %d bytes"
+ else:
+ template = "Preparing to commit transaction: %d objects, %d bytes"
+ self.log(template % (self.txnlog.stores, self.txnlog.size()),
+ level=BLATHER)
+
+ self.locked = 1
+ self.timeout.begin(self)
+ self.stats.lock_time = time.time()
+ if (self.tid is not None) or (self.status != ' '):
+ self.storage.tpc_begin(self.transaction, self.tid, self.status)
+ else:
+ self.storage.tpc_begin(self.transaction)
+
+ loads, loader = self.txnlog.get_loader()
+ for i in range(loads):
+ store = loader.load()
+ store_type = store[0]
+ store_args = store[1:]
+
+ if store_type == 'd':
+ do_store = self._delete
+ elif store_type == 's':
+ do_store = self._store
+ elif store_type == 'r':
+ do_store = self._restore
+ else:
+ raise ValueError('Invalid store type: %r' % store_type)
+
+ if not do_store(*store_args):
+ break
+
+ # Blob support
+ while self.blob_log:
+ oid, oldserial, data, blobfilename = self.blob_log.pop()
+ self.storage.storeBlob(oid, oldserial, data, blobfilename,
+ '', self.transaction,)
+
+ resp = self._thunk()
+ if delay is not None:
+ delay.reply(resp)
+ else:
+ return resp
+
# The public methods of the ZEO client API do not do the real work.
# They defer work until after the storage lock has been acquired.
# Most of the real implementations are in methods beginning with
@@ -487,7 +588,7 @@
assert self.blob_tempfile is None
self.blob_tempfile = tempfile.mkstemp(
dir=self.storage.temporaryDirectory())
-
+
def storeBlobChunk(self, chunk):
os.write(self.blob_tempfile[0], chunk)
@@ -506,23 +607,6 @@
def sendBlob(self, oid, serial):
self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
- # The following four methods return values, so they must acquire
- # the storage lock and begin the transaction before returning.
-
- def vote(self, id):
- self._check_tid(id, exc=StorageTransactionError)
- if self.locked:
- return self._vote()
- else:
- return self._wait(lambda: self._vote())
-
- def undo(self, trans_id, id):
- self._check_tid(id, exc=StorageTransactionError)
- if self.locked:
- return self._undo(trans_id)
- else:
- return self._wait(lambda: self._undo(trans_id))
-
def _delete(self, oid, serial):
err = None
try:
@@ -635,102 +719,6 @@
self.invalidated.extend(oids)
return tid, oids
- # When a delayed transaction is restarted, the dance is
- # complicated. The restart occurs when one ZEOStorage instance
- # finishes as a transaction and finds another instance is in the
- # _waiting list.
-
- # It might be better to have a mechanism to explicitly send
- # the finishing transaction's reply before restarting the waiting
- # transaction. If the restart takes a long time, the previous
- # client will be blocked until it finishes.
-
- def _wait(self, thunk):
- # Wait for the storage lock to be acquired.
- self._thunk = thunk
- if self.tpc_transaction():
- d = Delay()
- self.storage._waiting.append((d, self))
- self.log("Transaction blocked waiting for storage. "
- "Clients waiting: %d." % len(self.storage._waiting))
- return d
- else:
- self.log("Transaction acquired storage lock.", BLATHER)
- return self._restart()
-
- def _restart(self, delay=None):
- # Restart when the storage lock is available.
- if self.txnlog.stores == 1:
- template = "Preparing to commit transaction: %d object, %d bytes"
- else:
- template = "Preparing to commit transaction: %d objects, %d bytes"
- self.log(template % (self.txnlog.stores, self.txnlog.size()),
- level=BLATHER)
-
- self.locked = 1
- self.timeout.begin(self)
- self.stats.lock_time = time.time()
- if (self.tid is not None) or (self.status != ' '):
- self.storage.tpc_begin(self.transaction, self.tid, self.status)
- else:
- self.storage.tpc_begin(self.transaction)
-
- loads, loader = self.txnlog.get_loader()
- for i in range(loads):
- store = loader.load()
- store_type = store[0]
- store_args = store[1:]
-
- if store_type == 'd':
- do_store = self._delete
- elif store_type == 's':
- do_store = self._store
- elif store_type == 'r':
- do_store = self._restore
- else:
- raise ValueError('Invalid store type: %r' % store_type)
-
- if not do_store(*store_args):
- break
-
- # Blob support
- while self.blob_log:
- oid, oldserial, data, blobfilename = self.blob_log.pop()
- self.storage.storeBlob(oid, oldserial, data, blobfilename,
- '', self.transaction,)
-
- resp = self._thunk()
- if delay is not None:
- delay.reply(resp)
- else:
- return resp
-
- def _handle_waiting(self):
- # Restart any client waiting for the storage lock.
- while self.storage._waiting:
- delay, zeo_storage = self.storage._waiting.pop(0)
- if self._restart_other(zeo_storage, delay):
- if self.storage._waiting:
- n = len(self.storage._waiting)
- self.log("Blocked transaction restarted. "
- "Clients waiting: %d" % n)
- else:
- self.log("Blocked transaction restarted.")
- return
-
- def _restart_other(self, zeo_storage, delay):
- # Return True if the server restarted.
- # call the restart() method on the appropriate server.
- try:
- zeo_storage._restart(delay)
- except:
- self.log("Unexpected error handling waiting transaction",
- level=logging.WARNING, exc_info=True)
- zeo_storage.connection.close()
- return 0
- else:
- return 1
-
# IStorageIteration support
def iterator_start(self, start, stop):
@@ -809,8 +797,8 @@
def invalidateCache(self):
self.server._invalidateCache(self.storage_id)
-
+
class StorageServer:
"""The server side implementation of ZEO.
@@ -1064,8 +1052,8 @@
p.connection.trigger.pull_trigger()
except ZEO.zrpc.error.DisconnectedError:
pass
-
+
def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients.
@@ -1110,8 +1098,8 @@
#
# b. A connection is closed while we are iterating. We'll need
# to catch and ignore Disconnected errors.
-
+
if invalidated:
invq = self.invq[storage_id]
if len(invq) >= self.invq_bound:
@@ -1138,11 +1126,11 @@
do full cache verification.
"""
-
+
invq = self.invq[storage_id]
# We make a copy of invq because it might be modified by a
- # foreign (other than main thread) calling invalidate above.
+ # foreign (other than main thread) calling invalidate above.
invq = invq[:]
if not invq:
@@ -1421,4 +1409,4 @@
def __getattr__(self, name):
return getattr(self.storage, name)
-
+
More information about the Zodb-checkins
mailing list