[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo2/src/ZEO/ Fixed a threading bug introduced when switching to having
Jim Fulton
jim at zope.com
Mon Sep 21 16:58:30 EDT 2009
Log message for revision 104411:
Fixed a threading bug introduced when switching to having
per-connection threads.
Simplified locking logic somewhat -- I think. :)
It's still more complicated than I'd like it to be, in part because of
undo, which isn't a one-way call but should be.
Changed:
U ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
U ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
-=-
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py 2009-09-21 19:47:54 UTC (rev 104410)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py 2009-09-21 20:58:29 UTC (rev 104411)
@@ -32,6 +32,7 @@
import transaction
+import ZODB.blob
import ZODB.serialize
import ZODB.TimeStamp
import ZEO.zrpc.error
@@ -95,14 +96,14 @@
self.storage_id = "uninitialized"
self.transaction = None
self.read_only = read_only
- self.locked = 0
+ self.locked = False # Don't have storage lock
+ self.locked_lock = threading.Lock() # mediate locked access
self.verifying = 0
self.store_failed = 0
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
self.blob_tempfile = None
- self.blob_log = []
# The authentication protocol may define extra methods.
self._extensions = {}
for func in self.extensions:
@@ -138,25 +139,37 @@
label = str(host) + ":" + str(port)
self.log_label = _label + "/" + label
+ def notifyLocked(self):
+ # We don't want to give a lock to a disconnected client, and we
+ # need to avoid a race of giving a lock to a client while it's
+ # disconnecting. We check self.connection and set self.locked while
+ # the locked_lock is held, preventing self.connection from being
+ # set to None between the check and setting self.locked.
+ self.locked_lock.acquire()
+ try:
+ if self.connection is None:
+ return False # We've been disconnected. Don't take the lock
+ self.locked = True
+ # What happens if, before the trigger is processed, we disconnect,
+ # reconnect, and start a new transaction?
+ # That isn't possible, because we never reconnect!
+ self.connection.trigger.pull_trigger(self._restart)
+ return True
+ finally:
+ self.locked_lock.release()
+
def notifyDisconnected(self):
+ self.locked_lock.acquire()
+ try:
+ self.connection = None
+ finally:
+ self.locked_lock.release()
+
# When this storage closes, we must ensure that it aborts
# any pending transaction.
if self.transaction is not None:
self.log("disconnected during transaction %s" % self.transaction)
- if not self.locked:
- # Delete (d, zeo_storage) from the _waiting list, if found.
- waiting = self.storage._waiting
- for i in range(len(waiting)):
- d, z = waiting[i]
- if z is self:
- del waiting[i]
- self.log("Closed connection removed from waiting list."
- " Clients waiting: %d." % len(waiting))
- break
-
- if self.transaction:
- self.tpc_abort(self.transaction.id)
-
+ self.tpc_abort(self.transaction.id)
else:
self.log("disconnected")
@@ -415,6 +428,7 @@
self.serials = []
self.invalidated = []
self.txnlog = CommitLog()
+ self.blob_log = []
self.tid = tid
self.status = status
self.store_failed = 0
@@ -434,109 +448,99 @@
if not self._check_tid(id):
return
assert self.locked
+
self.stats.commits += 1
- self.storage.tpc_finish(self.transaction)
+ self.storage.tpc_finish(self.transaction, self._invalidate)
+ # Note that the tid is still current because we still hold the
+ # commit lock. We'll relinquish it in _clear_transaction.
tid = self.storage.lastTransaction()
- if self.invalidated:
- self.server.invalidate(self, self.storage_id, tid,
- self.invalidated, self.get_size_info())
self._clear_transaction()
# Return the tid, for cache invalidation optimization
return tid
- def tpc_abort(self, id):
- if not self._check_tid(id):
+ def _invalidate(self, tid):
+ if self.invalidated:
+ self.server.invalidate(self, self.storage_id, tid,
+ self.invalidated, self.get_size_info())
+
+ def tpc_abort(self, tid):
+ if not self._check_tid(tid):
return
self.stats.aborts += 1
+
+ # Is there a race here? What if notifyLocked is called after
+ # the check? Well, we still won't have started committing to the actual
+ # storage. That wouldn't happen until _restart is called, and that
+ # can't happen while this method is executing, as both are only
+ # run by the client thread. So there's no race.
if self.locked:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()
def _clear_transaction(self):
# Common code at end of tpc_finish() and tpc_abort()
+ self.server.unlock_storage(self)
+ self.locked = 0
+ self.transaction = None
self.stats.active_txns -= 1
- self.transaction = None
- self.txnlog.close()
- if self.locked:
- self.locked = 0
- self.timeout.end(self)
- self.stats.lock_time = None
- self.log("Transaction released storage lock", BLATHER)
+ if self.txnlog is not None:
+ self.txnlog.close()
+ self.txnlog = None
+ for oid, oldserial, data, blobfilename in self.blob_log:
+ ZODB.blob.remove_committed(blobfilename)
+ del self.blob_log
- # Restart any client waiting for the storage lock.
- while self.storage._waiting:
- delay, zeo_storage = self.storage._waiting.pop(0)
- try:
- zeo_storage._restart(delay)
- except:
- self.log("Unexpected error handling waiting transaction",
- level=logging.WARNING, exc_info=True)
- zeo_storage.connection.close()
- continue
-
- if self.storage._waiting:
- n = len(self.storage._waiting)
- self.log("Blocked transaction restarted. "
- "Clients waiting: %d" % n)
- else:
- self.log("Blocked transaction restarted.")
-
- break
-
# The following two methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
# It's a bit vile that undo can cause us to get the lock before vote.
- def undo(self, trans_id, id):
- self._check_tid(id, exc=StorageTransactionError)
- if self.locked:
+ def undo(self, trans_id, tid):
+ self._check_tid(tid, exc=StorageTransactionError)
+
+ if self.txnlog is not None:
+ return self._wait(lambda: self._undo(trans_id))
+ else:
return self._undo(trans_id)
+
+ def vote(self, tid):
+ self._check_tid(tid, exc=StorageTransactionError)
+
+ if self.txnlog is not None:
+ return self._wait(lambda: self._vote())
else:
- return self._wait(lambda: self._undo(trans_id))
-
- def vote(self, id):
- self._check_tid(id, exc=StorageTransactionError)
- if self.locked:
return self._vote()
- else:
- return self._wait(lambda: self._vote())
- # When a delayed transaction is restarted, the dance is
- # complicated. The restart occurs when one ZEOStorage instance
- # finishes as a transaction and finds another instance is in the
- # _waiting list.
- # It might be better to have a mechanism to explicitly send
- # the finishing transaction's reply before restarting the waiting
- # transaction. If the restart takes a long time, the previous
- # client will be blocked until it finishes.
-
+ _thunk = _delay = None
def _wait(self, thunk):
# Wait for the storage lock to be acquired.
+ assert self._thunk == self._delay == None
self._thunk = thunk
- if self.tpc_transaction():
- d = Delay()
- self.storage._waiting.append((d, self))
- self.log("Transaction blocked waiting for storage. "
- "Clients waiting: %d." % len(self.storage._waiting))
- return d
- else:
+ if self.server.lock_storage(self):
+ assert not self.tpc_transaction()
self.log("Transaction acquired storage lock.", BLATHER)
+ self.locked = True
return self._restart()
- def _restart(self, delay=None):
+ self._delay = d = Delay()
+ return d
+
+ def _restart(self):
+ if not self.locked:
+ # Must have been disconnected after locking
+ assert self.connection is None
+ return
+
# Restart when the storage lock is available.
if self.txnlog.stores == 1:
template = "Preparing to commit transaction: %d object, %d bytes"
else:
template = "Preparing to commit transaction: %d objects, %d bytes"
+
self.log(template % (self.txnlog.stores, self.txnlog.size()),
level=BLATHER)
- self.locked = 1
- self.timeout.begin(self)
- self.stats.lock_time = time.time()
if (self.tid is not None) or (self.status != ' '):
self.storage.tpc_begin(self.transaction, self.tid, self.status)
else:
@@ -561,17 +565,23 @@
break
# Blob support
- while self.blob_log:
- oid, oldserial, data, blobfilename = self.blob_log.pop()
+ for oid, oldserial, data, blobfilename in self.blob_log:
self.storage.storeBlob(oid, oldserial, data, blobfilename,
'', self.transaction,)
- resp = self._thunk()
+ thunk = self._thunk
+ delay = self._delay
+ self._thunk = self._delay = None
+
+ resp = thunk()
if delay is not None:
delay.reply(resp)
- else:
- return resp
+ self.txnlog.close()
+ self.txnlog = None
+ del self.blob_log
+ return resp
+
# The public methods of the ZEO client API do not do the real work.
# They defer work until after the storage lock has been acquired.
# Most of the real implementations are in methods beginning with
@@ -601,14 +611,18 @@
os.write(self.blob_tempfile[0], chunk)
def storeBlobEnd(self, oid, serial, data, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ assert self.txnlog is not None # effectively not allowed after undo
fd, tempname = self.blob_tempfile
self.blob_tempfile = None
os.close(fd)
self.blob_log.append((oid, serial, data, tempname))
def storeBlobShared(self, oid, serial, data, filename, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ assert self.txnlog is not None # effectively not allowed after undo
+
# Reconstruct the full path from the filename in the OID directory
-
if (os.path.sep in filename
or not (filename.endswith('.tmp')
or filename[:-1].endswith('.tmp')
@@ -722,6 +736,7 @@
return error
def _vote(self):
+ assert self.locked
if not self.store_failed:
# Only call tpc_vote of no store call failed, otherwise
# the serialnos() call will deliver an exception that will be
@@ -916,8 +931,10 @@
for name, storage in storages.items()])
log("%s created %s with storages: %s" %
(self.__class__.__name__, read_only and "RO" or "RW", msg))
- for s in storages.values():
- s._waiting = []
+
+ self.lockers = dict((name, []) for name in storages)
+ self.lockers_lock = threading.Lock()
+
self.read_only = read_only
self.auth_protocol = auth_protocol
self.auth_database = auth_database
@@ -1203,7 +1220,51 @@
if conn.obj in cl:
cl.remove(conn.obj)
+ def lock_storage(self, zeostore):
+ self.lockers_lock.acquire()
+ try:
+ storage_id = zeostore.storage_id
+ lockers = self.lockers[storage_id]
+ lockers.append(zeostore)
+ if len(lockers) == 1:
+ self.timeouts[storage_id].begin(zeostore)
+ self.stats[storage_id].lock_time = time.time()
+ return True
+ else:
+ zeostore.log("(%r) queue lock: transactions waiting: %s"
+ % (storage_id, len(lockers)-1))
+ finally:
+ self.lockers_lock.release()
+ def unlock_storage(self, zeostore):
+ self.lockers_lock.acquire()
+ try:
+ storage_id = zeostore.storage_id
+ lockers = self.lockers[storage_id]
+ if zeostore in lockers:
+ if lockers[0] == zeostore:
+ self.timeouts[storage_id].end(zeostore)
+ self.stats[storage_id].lock_time = None
+ lockers.pop(0)
+ while lockers:
+ zeostore.log("(%r) unlock: transactions waiting: %s"
+ % (storage_id, len(lockers)-1))
+ zeostore = lockers[0]
+ if zeostore.notifyLocked():
+ self.timeouts[storage_id].begin(zeostore)
+ self.stats[storage_id].lock_time = time.time()
+ break
+ else:
+ # The queued client was closed, so dequeue it
+ lockers.pop(0)
+ else:
+ lockers.remove(zeostore)
+ if lockers:
+ zeostore.log("(%r) dequeue: transactions waiting: %s"
+ % (storage_id, len(lockers)-1))
+ finally:
+ self.lockers_lock.release()
+
class StubTimeoutThread:
def begin(self, client):
@@ -1225,7 +1286,6 @@
self._client = None
self._deadline = None
self._cond = threading.Condition() # Protects _client and _deadline
- self._trigger = trigger()
def begin(self, client):
# Called from the restart code the "main" thread, whenever the
@@ -1268,7 +1328,8 @@
if howlong <= 0:
client.log("Transaction timeout after %s seconds" %
self._timeout)
- self._trigger.pull_trigger(lambda: client.connection.close())
+ client.connection.trigger.pull_trigger(
+ lambda: client.connection.close())
else:
time.sleep(howlong)
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py 2009-09-21 19:47:54 UTC (rev 104410)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py 2009-09-21 20:58:29 UTC (rev 104411)
@@ -734,7 +734,6 @@
self.storage_id = storage_id
self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
self.server.register(storage_id, False)
- self.server._thunk = lambda : None
self.server.client = StorageServerClientWrapper()
def sortKey(self):
@@ -756,7 +755,6 @@
self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
def tpc_vote(self, transaction):
- self.server._restart()
self.server.vote(id(transaction))
result = self.server.client.serials[:]
del self.server.client.serials[:]
More information about the Zodb-checkins
mailing list