[Zope-Checkins] CVS: ZODB3/ZEO - StorageServer.py:1.90
Guido van Rossum
guido@python.org
Mon, 20 Jan 2003 14:39:01 -0500
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv5070
Modified Files:
StorageServer.py
Log Message:
More cleanup and refactoring:
- Get rid of _lock() and _unlock(); _lock() is inlined into
_tpc_begin(), and so is _unlock(), after factoring out the common
code at the end of tpc_finish() and tpc_abort() into
_clear_transaction().
- In the refactored _unlock() code, only call self.timeout.end() if
self.locked was actually set.
In the TimeoutThread class:
- Add some comments.
- Add some assertions.
- Get rid of the stop() method; we're a daemon thread so we'll be
killed anyway; close_server() is only used from the test suite.
- Switch from using a lock + an event to a condition variable. Be
religious about doing stuff only while holding the lock.
- Inline the timeout() function; it shouldn't reacquire the lock
anyway.
--Guido & Jeremy
=== ZODB3/ZEO/StorageServer.py 1.89 => 1.90 ===
--- ZODB3/ZEO/StorageServer.py:1.89 Thu Jan 9 16:50:18 2003
+++ ZODB3/ZEO/StorageServer.py Mon Jan 20 14:38:59 2003
@@ -157,18 +157,6 @@
return 0
return 1
- # _lock() and _unlock() control the locked flag
-
- def _lock(self):
- self.locked = 1
- self.timeout.begin(self)
- self.stats.lock_time = time.time()
-
- def _unlock(self):
- self.locked = 0
- self.timeout.end(self)
- self.stats.lock_time = None
-
def register(self, storage_id, read_only):
"""Select the storage that this client will use
@@ -360,10 +348,8 @@
if self.invalidated:
self.server.invalidate(self, self.storage_id, tid,
self.invalidated, self.get_size_info())
- self.transaction = None
- self._unlock()
+ self._clear_transaction()
# Return the tid, for cache invalidation optimization
- self._handle_waiting()
return tid
def tpc_abort(self, id):
@@ -373,8 +359,17 @@
self.stats.aborts += 1
if self.locked:
self.storage.tpc_abort(self.transaction)
+ self._clear_transaction()
+
+ def _clear_transaction(self):
+ # Common code at end of tpc_finish() and tpc_abort()
self.transaction = None
- self._unlock()
+ if self.locked:
+ self.locked = 0
+ self.timeout.end(self)
+ self.stats.lock_time = None
+ # _handle_waiting() can start another transaction (by
+ # restarting a waiting one) so must be done last
self._handle_waiting()
def _abort(self):
@@ -437,7 +432,9 @@
return self._wait(lambda: self._transactionalUndo(trans_id))
def _tpc_begin(self, txn, tid, status):
- self._lock()
+ self.locked = 1
+ self.timeout.begin(self)
+ self.stats.lock_time = time.time()
self.storage.tpc_begin(txn, tid, status)
def _store(self, oid, serial, data, version):
@@ -753,8 +750,6 @@
This is only called from the test suite, AFAICT.
"""
- for timeout in self.timeouts.values():
- timeout.stop()
self.dispatcher.close()
if self.monitor is not None:
self.monitor.close()
@@ -786,81 +781,62 @@
def end(self, client):
pass
- def stop(self):
- pass
-
class TimeoutThread(threading.Thread):
"""Monitors transaction progress and generates timeouts."""
+ # There is one TimeoutThread per storage, because there's one
+ # transaction lock per storage.
+
def __init__(self, timeout):
threading.Thread.__init__(self)
self.setDaemon(1)
self._timeout = timeout
self._client = None
self._deadline = None
- self._stop = 0
- self._active = threading.Event()
- self._lock = threading.Lock()
+ self._cond = threading.Condition() # Protects _client and _deadline
self._trigger = trigger()
- def stop(self):
- self._stop = 1
-
def begin(self, client):
- self._lock.acquire()
+ # Called from the restart code the "main" thread, whenever the
+ # storage lock is being acquired. (Serialized by asyncore.)
+ self._cond.acquire()
try:
- self._active.set()
+ assert self._client is None
self._client = client
self._deadline = time.time() + self._timeout
+ self._cond.notify()
finally:
- self._lock.release()
+ self._cond.release()
def end(self, client):
- # The ZEOStorage will call this message for every aborted
- # transaction, regardless of whether the transaction started
- # the 2PC. Ignore here if 2PC never began.
- if client is not self._client:
- return
- self._lock.acquire()
+ # Called from the "main" thread whenever the storage lock is
+ # being released. (Serialized by asyncore.)
+ self._cond.acquire()
try:
- self._active.clear()
+ assert self._client is not None
self._client = None
self._deadline = None
finally:
- self._lock.release()
+ self._cond.release()
def run(self):
- while not self._stop:
- self._active.wait()
- self._lock.acquire()
+ # Code running in the thread.
+ while 1:
+ self._cond.acquire()
try:
- deadline = self._deadline
- if deadline is None:
- continue
- howlong = deadline - time.time()
+ while self._client is None:
+ self._cond.wait()
+ howlong = self._deadline - time.time()
+ client = self._client # For the howlong <= 0 branch below
finally:
- self._lock.release()
+ self._cond.release()
if howlong <= 0:
- self.timeout()
+ client.log("Transaction timeout after %s seconds" %
+ self._timeout)
+ self._trigger.pull_trigger(lambda: client.connection.close())
else:
time.sleep(howlong)
self.trigger.close()
-
- def timeout(self):
- self._lock.acquire()
- try:
- client = self._client
- deadline = self._deadline
- self._active.clear()
- self._client = None
- self._deadline = None
- finally:
- self._lock.release()
- if client is None:
- return
- elapsed = time.time() - (deadline - self._timeout)
- client.log("Transaction timeout after %d seconds" % int(elapsed))
- self._trigger.pull_trigger(lambda: client.connection.close())
def run_in_thread(method, *args):
t = SlowMethodThread(method, args)