[Zope-Checkins] CVS: ZODB3/ZEO - StorageServer.py:1.74.2.3
Jeremy Hylton
jeremy@zope.com
Wed, 30 Oct 2002 16:42:10 -0500
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv1434/ZEO
Modified Files:
Tag: ZODB3-3_1-branch
StorageServer.py
Log Message:
Backport from trunk: New timeout feature.
Before we do a maintenance release, we should also backport the
configuration option to enable / disable the timeout.
=== ZODB3/ZEO/StorageServer.py 1.74.2.2 => 1.74.2.3 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.2 Fri Oct 4 15:49:53 2002
+++ ZODB3/ZEO/StorageServer.py Wed Oct 30 16:42:10 2002
@@ -25,6 +25,7 @@
import os
import sys
import threading
+import time
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
@@ -210,9 +211,12 @@
self.storage_id = "uninitialized"
self.transaction = None
self.read_only = read_only
+ self.timeout = TimeoutThread()
+ self.timeout.start()
def notifyConnected(self, conn):
self.client = self.ClientStorageStubClass(conn)
+ self.timeout.notifyConnected(conn)
def notifyDisconnected(self):
# When this storage closes, we must ensure that it aborts
@@ -222,6 +226,7 @@
self.abort()
else:
self.log("disconnected")
+ self.timeout.notifyDisconnected()
def __repr__(self):
tid = self.transaction and repr(self.transaction.id)
@@ -393,6 +398,7 @@
if self.storage._transaction is None:
self.strategy = self.ImmediateCommitStrategyClass(self.storage,
self.client)
+ self.timeout.begin()
else:
self.strategy = self.DelayedCommitStrategyClass(self.storage,
self.wait)
@@ -409,6 +415,7 @@
def tpc_finish(self, id):
if not self.check_tid(id):
return
+ self.timeout.end()
invalidated = self.strategy.tpc_finish()
if invalidated:
self.server.invalidate(self, self.storage_id,
@@ -420,6 +427,7 @@
def tpc_abort(self, id):
if not self.check_tid(id):
return
+ self.timeout.end()
strategy = self.strategy
strategy.tpc_abort()
self.transaction = None
@@ -440,7 +448,9 @@
def vote(self, id):
self.check_tid(id, exc=StorageTransactionError)
- return self.strategy.tpc_vote()
+ r = self.strategy.tpc_vote()
+ self.timeout.begin()
+ return r
def abortVersion(self, src, id):
self.check_tid(id, exc=StorageTransactionError)
@@ -736,6 +746,79 @@
self.delay.error(sys.exc_info())
else:
self.delay.reply(result)
+
+class TimeoutThread(threading.Thread):
+ # A TimeoutThread is associated with a ZEOStorage. It tracks
+ # how long transactions take to commit. If a transaction takes
+ # too long, it will close the connection.
+
+ TIMEOUT = 30
+
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self._lock = threading.Lock()
+ self._timestamp = None
+ self._conn = None
+
+ def begin(self):
+ self._lock.acquire()
+ try:
+ self._timestamp = time.time()
+ finally:
+ self._lock.release()
+
+ def end(self):
+ self._lock.acquire()
+ try:
+ self._timestamp = None
+ finally:
+ self._lock.release()
+
+ # There's a race here (_conn is set without holding _lock while the thread reads it), but I hope it is harmless.
+
+ def notifyConnected(self, conn):
+ self._conn = conn
+
+ def notifyDisconnected(self):
+ self._conn = None
+
+ def run(self):
+ timeout = self.TIMEOUT
+ while self._conn is not None:
+ time.sleep(timeout)
+
+ self._lock.acquire()
+ try:
+ if self._timestamp is not None:
+ deadline = self._timestamp + self.TIMEOUT
+ else:
+ log("TimeoutThread no current transaction",
+ zLOG.BLATHER)
+ timeout = self.TIMEOUT
+ continue
+ finally:
+ self._lock.release()
+
+ timeout = deadline - time.time()
+ if deadline < time.time():
+ self._abort()
+ break
+ else:
+ elapsed = self.TIMEOUT - timeout
+ log("TimeoutThread transaction has %0.2f sec to complete"
+ " (%.2f elapsed)" % (timeout, elapsed), zLOG.BLATHER)
+ log("TimeoutThread exiting. Connection closed.", zLOG.BLATHER)
+
+ def _abort(self):
+ # It's possible for notifyDisconnected to remove the connection
+ # just before we use it. I think that's harmless, since it means
+ # the connection was closed.
+ log("TimeoutThread aborting transaction", zLOG.WARNING)
+ try:
+ self._conn.close()
+ except AttributeError, msg:
+ log(msg)
+
# Patch up class references
StorageServer.ZEOStorageClass = ZEOStorage