[Zodb-checkins] CVS: ZODB3/ZEO - StorageServer.py:1.86
Jeremy Hylton
jeremy@zope.com
Tue, 7 Jan 2003 17:13:29 -0500
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv27954/ZEO
Modified Files:
StorageServer.py
Log Message:
Add per-storage transaction timeout feature and a couple of tests.
=== ZODB3/ZEO/StorageServer.py 1.85 => 1.86 ===
--- ZODB3/ZEO/StorageServer.py:1.85 Tue Jan 7 14:24:41 2003
+++ ZODB3/ZEO/StorageServer.py Tue Jan 7 17:12:57 2003
@@ -33,6 +33,7 @@
from ZEO.CommitLog import CommitLog
from ZEO.zrpc.server import Dispatcher
from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
+from ZEO.zrpc.trigger import trigger
import zLOG
from ZODB.POSException import StorageError, StorageTransactionError
@@ -72,7 +73,8 @@
ManagedServerConnectionClass = ManagedServerConnection
def __init__(self, addr, storages, read_only=0,
- invalidation_queue_size=100):
+ invalidation_queue_size=100,
+ transaction_timeout=None):
"""StorageServer constructor.
This is typically invoked from the start.py script.
@@ -104,6 +106,11 @@
N == invalidation_queue_size. This queue is used to
speed client cache verification when a client disconnects
for a short period of time.
+
+ transaction_timeout -- The maximum amount of time to wait for
+ a transaction to commit after acquiring the storage lock.
+ If the transaction takes too long, the client connection
+ will be closed and the transaction aborted.
"""
self.addr = addr
@@ -125,6 +132,15 @@
self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection,
reuse_addr=1)
+ self.timeouts = {}
+ for name in self.storages.keys():
+ if transaction_timeout is None:
+ # An object with no-op methods
+ timeout = StubTimeoutThread()
+ else:
+ timeout = TimeoutThread(transaction_timeout)
+ timeout.start()
+ self.timeouts[name] = timeout
def new_connection(self, sock, addr):
"""Internal: factory to create a new connection.
@@ -147,11 +163,14 @@
list of current connections for that storage; this information
is needed to handle invalidation. This function updates this
dictionary.
+
+ Returns the timeout object for the appropriate storage.
"""
l = self.connections.get(storage_id)
if l is None:
l = self.connections[storage_id] = []
l.append(conn)
+ return self.timeouts[storage_id]
def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients.
@@ -216,6 +235,7 @@
This is only called from the test suite, AFAICT.
"""
+ self.timeout.stop()
self.dispatcher.close()
for storage in self.storages.values():
storage.close()
@@ -246,6 +266,7 @@
def __init__(self, server, read_only=0):
self.server = server
+ self.timeout = None
self.connection = None
self.client = None
self.storage = None
@@ -350,7 +371,7 @@
self.storage_id = storage_id
self.storage = storage
self.setup_delegation()
- self.server.register_connection(storage_id, self)
+ self.timeout = self.server.register_connection(storage_id, self)
def get_info(self):
return {'length': len(self.storage),
@@ -512,6 +533,7 @@
self.invalidated, self.get_size_info())
self.transaction = None
self.locked = 0
+ self.timeout.end(self)
# Return the tid, for cache invalidation optimization
self._handle_waiting()
return tid
@@ -523,6 +545,7 @@
self.storage.tpc_abort(self.transaction)
self.transaction = None
self.locked = 0
+ self.timeout.end(self)
self._handle_waiting()
def _abort(self):
@@ -584,6 +607,7 @@
def _tpc_begin(self, txn, tid, status):
self.locked = 1
self.storage.tpc_begin(txn, tid, status)
+ self.timeout.begin(self)
def _store(self, oid, serial, data, version):
try:
@@ -701,6 +725,86 @@
return 0
else:
return 1
+
class StubTimeoutThread:
    """Do-nothing stand-in exposing the TimeoutThread method names.

    The server installs one of these for a storage when no transaction
    timeout is configured, so callers can invoke begin()/end()/stop()
    unconditionally.
    """

    def begin(self, client):
        """Ignore the start of a transaction for `client`."""
        return None

    def end(self, client):
        """Ignore the end of a transaction for `client`."""
        return None

    def stop(self):
        """Nothing is running, so there is nothing to stop."""
        return None
+
class TimeoutThread(threading.Thread):
    """Monitors transaction progress and generates timeouts.

    At most one client is tracked at a time.  begin() records the client
    together with a deadline of "now + timeout"; end() forgets them
    again.  The run() loop sleeps until a client is being tracked and,
    once its deadline has passed, logs the timeout on the client and
    asks the trigger to close the client's connection.
    """

    def __init__(self, timeout):
        """Create the monitor as a daemon thread (not yet started).

        timeout -- number of seconds a tracked transaction may run
            before its client connection is closed.
        """
        threading.Thread.__init__(self)
        self.setDaemon(1)
        self._timeout = timeout
        self._client = None
        self._deadline = None
        # Deliberately not called _stop: in Python 3, threading.Thread
        # has a private _stop() method that join() relies on, and an
        # instance attribute of that name would shadow it.
        self._stopped = 0
        # Set while a client is being tracked (and by stop(), to wake
        # the run() loop).
        self._active = threading.Event()
        # Guards _client, _deadline and the matching _active state.
        self._lock = threading.Lock()
        self._trigger = trigger()

    def stop(self):
        """Ask the run() loop to exit.

        The loop notices the request the next time it wakes up; if it is
        idle (waiting for a transaction) it is woken immediately.
        """
        self._stopped = 1
        # Without this an idle thread would stay blocked in wait()
        # and never see the flag.
        self._active.set()

    def begin(self, client):
        """Start timing a transaction on behalf of `client`."""
        self._lock.acquire()
        try:
            self._active.set()
            self._client = client
            self._deadline = time.time() + self._timeout
        finally:
            self._lock.release()

    def end(self, client):
        """Stop timing `client`; a no-op if it is not the tracked client."""
        # The ZEOStorage will call this method for every aborted
        # transaction, regardless of whether the transaction started
        # the 2PC. Ignore here if 2PC never began.
        self._lock.acquire()
        try:
            # Compare under the lock so a concurrent timeout() cannot
            # change _client between the check and the reset.
            if client is not self._client:
                return
            self._active.clear()
            self._client = None
            self._deadline = None
        finally:
            self._lock.release()

    def run(self):
        """Thread body: wait for a tracked client and enforce its deadline."""
        while not self._stopped:
            self._active.wait()
            if self._stopped:
                break
            self._lock.acquire()
            try:
                deadline = self._deadline
            finally:
                self._lock.release()
            if deadline is None:
                # end() or timeout() cleared the state after wait()
                # returned; there is nothing to time, so wait again.
                continue
            howlong = deadline - time.time()
            if howlong <= 0:
                self.timeout()
            else:
                time.sleep(howlong)

    def timeout(self):
        """Forget the tracked client and close its connection."""
        self._lock.acquire()
        try:
            client = self._client
            deadline = self._deadline
            self._active.clear()
            self._client = None
            self._deadline = None
        finally:
            self._lock.release()
        if client is None:
            return
        # deadline - timeout is the moment begin() was called.
        elapsed = time.time() - (deadline - self._timeout)
        client.log("Transaction timeout after %d seconds" % int(elapsed))
        # NOTE(review): presumably routed through the trigger so the
        # close runs in the server's main loop rather than in this
        # thread -- confirm against ZEO.zrpc.trigger.
        self._trigger.pull_trigger(lambda: client.connection.close())
def run_in_thread(method, *args):
t = SlowMethodThread(method, args)