[Zodb-checkins] CVS: ZODB3/ZEO - StorageServer.py:1.74.2.8 start.py:1.45.6.4
Guido van Rossum
guido@python.org
Mon, 20 Jan 2003 16:55:00 -0500
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv6796/ZEO
Modified Files:
Tag: ZODB3-3_1-branch
StorageServer.py start.py
Log Message:
Sneak a new feature in the ZODB 3.1.1 release branch: transaction
timeouts. This is a backport of the same feature in ZODB 3.2.
Why backport a feature? We absolutely need this for a customer
project that's stuck on ZODB 3.1.x indefinitely; maintaining a branch
off the 3.1.1 release branch indefinitely would be a branching
nightmare. The feature is not enabled by default, and when not
enabled it doesn't change anything. So this should be safe.
=== ZODB3/ZEO/StorageServer.py 1.74.2.7 => 1.74.2.8 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.7 Wed Dec 18 16:11:22 2002
+++ ZODB3/ZEO/StorageServer.py Mon Jan 20 16:54:26 2003
@@ -24,12 +24,14 @@
import cPickle
import os
import sys
+import time
import threading
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
from ZEO.zrpc.server import Dispatcher
from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
+from ZEO.zrpc.trigger import trigger
import zLOG
from ZODB.POSException import StorageError, StorageTransactionError
@@ -67,7 +69,8 @@
ZEOStorageClass = None # patched up later
ManagedServerConnectionClass = ManagedServerConnection
- def __init__(self, addr, storages, read_only=0):
+ def __init__(self, addr, storages, read_only=0,
+ transaction_timeout=None):
"""StorageServer constructor.
@@ -95,6 +98,11 @@
allowed, even if the storages are writable. Note that
pack() is considered a read-only operation.
+ transaction_timeout -- The maximum amount of time to wait for
+ a transaction to commit after acquiring the storage lock.
+ If the transaction takes too long, the client connection
+ will be closed and the transaction aborted.
+
"""
self.addr = addr
@@ -112,6 +120,15 @@
self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection,
reuse_addr=1)
+ self.timeouts = {}
+ for name in self.storages.keys():
+ if transaction_timeout is None:
+ # An object with no-op methods
+ timeout = StubTimeoutThread()
+ else:
+ timeout = TimeoutThread(transaction_timeout)
+ timeout.start()
+ self.timeouts[name] = timeout
def new_connection(self, sock, addr):
"""Internal: factory to create a new connection.
@@ -139,6 +156,7 @@
if l is None:
l = self.connections[storage_id] = []
l.append(conn)
+ return self.timeouts[storage_id]
def invalidate(self, conn, storage_id, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients.
@@ -204,6 +222,7 @@
ImmediateCommitStrategyClass = None # patched up later
def __init__(self, server, read_only=0):
+ self.timeout = None # Will be initialized in register()
self.server = server
self.connection = None
self.client = None
@@ -211,6 +230,7 @@
self.storage_id = "uninitialized"
self.transaction = None
self.read_only = read_only
+ self.strategy = None
self.log_label = _label
def notifyConnected(self, conn):
@@ -295,7 +315,7 @@
self.storage_id = storage_id
self.storage = storage
self.setup_delegation()
- self.server.register_connection(storage_id, self)
+ self.timeout = self.server.register_connection(storage_id, self)
def get_info(self):
return {'length': len(self.storage),
@@ -439,23 +459,29 @@
if invalidated:
self.server.invalidate(self, self.storage_id,
invalidated, self.get_size_info())
- self.transaction = None
- self.strategy = None
- self.handle_waiting()
+ self._clear_transaction()
def tpc_abort(self, id):
if not self.check_tid(id):
return
strategy = self.strategy
strategy.tpc_abort()
+ self._clear_transaction()
+
+ def _clear_transaction(self):
+ # Common code at end of tpc_finish() and tpc_abort()
self.transaction = None
self.strategy = None
+ self.timeout.end(self)
+ # handle_waiting() can start another transaction (by
+ # restarting a waiting one) so must be done last
self.handle_waiting()
def abort(self):
strategy = self.strategy
self.transaction = None
self.strategy = None
+ self.timeout.end(self)
strategy.abort(self)
# XXX handle new serialnos
@@ -531,9 +557,9 @@
def restart(self, delay=None):
old_strategy = self.strategy
assert isinstance(old_strategy, DelayedCommitStrategy)
- self.strategy = ImmediateCommitStrategy(self.storage,
- self.client,
- self.log)
+ self.timeout.begin(self)
+ self.strategy = self.ImmediateCommitStrategyClass(
+ self.storage, self.client, self.log)
resp = old_strategy.restart(self.strategy)
if delay is not None:
delay.reply(resp)
@@ -738,6 +764,75 @@
if z is zeo_storage:
del waiting[i]
break
+
+class StubTimeoutThread:
+    """No-op stand-in offering the same begin()/end() calls as TimeoutThread.
+
+    StorageServer installs one of these per storage when no
+    transaction_timeout is given, so callers never have to test whether
+    timeouts are enabled.
+    """
+
+    def begin(self, client):
+        """Ignore the notification that client started a transaction."""
+
+    def end(self, client):
+        """Ignore the notification that client finished a transaction."""
+
+class TimeoutThread(threading.Thread):
+    """Monitors transaction progress and generates timeouts.
+
+    At most one client is tracked at a time.  begin() records the client
+    together with a deadline of "now + timeout"; end() forgets both.  If
+    the deadline passes first, run() logs the timeout through the client
+    and hands the trigger a callback that closes the client's connection.
+    """
+
+    # There is one TimeoutThread per storage, because there's one
+    # transaction lock per storage.
+
+    def __init__(self, timeout):
+        """Set up the monitor as a daemon thread.
+
+        timeout -- number of seconds (added to time.time()) a client may
+        hold the storage lock before it is timed out.
+        """
+        threading.Thread.__init__(self)
+        self.setDaemon(1)
+        self._timeout = timeout
+        self._client = None
+        self._deadline = None
+        self._cond = threading.Condition() # Protects _client and _deadline
+        # NOTE(review): presumably the trigger makes the callback run in
+        # the asyncore main loop -- confirm against ZEO.zrpc.trigger.
+        self._trigger = trigger()
+
+    def begin(self, client):
+        """Start timing a transaction for client and wake up run()."""
+        # Called from the restart code in the "main" thread, whenever the
+        # storage lock is being acquired.  (Serialized by asyncore.)
+        self._cond.acquire()
+        try:
+            assert self._client is None
+            self._client = client
+            self._deadline = time.time() + self._timeout
+            self._cond.notify()
+        finally:
+            self._cond.release()
+
+    def end(self, client):
+        """Stop timing, but only if client is the one being timed."""
+        # Called from the "main" thread whenever the storage lock is
+        # being released.  (Serialized by asyncore.)
+        self._cond.acquire()
+        try:
+            if self._client is client:
+                self._client = None
+                self._deadline = None
+        finally:
+            self._cond.release()
+
+    def run(self):
+        """Thread body: wait for a deadline, then sleep or report it."""
+        # Code running in the thread.
+        while 1:
+            self._cond.acquire()
+            try:
+                # Block until begin() has set a deadline.
+                while self._deadline is None:
+                    self._cond.wait()
+                howlong = self._deadline - time.time()
+                if howlong <= 0:
+                    # Prevent reporting timeout more than once
+                    self._deadline = None
+                # Snapshot taken under the lock; it is used by the
+                # howlong <= 0 branch after the lock has been released.
+                client = self._client
+            finally:
+                self._cond.release()
+            if howlong <= 0:
+                client.log("Transaction timeout after %s seconds" %
+                           self._timeout)
+                self._trigger.pull_trigger(
+                    lambda client=client: client.connection.close())
+            else:
+                time.sleep(howlong)
+        # Not reached while the loop above has no exit.  The attribute is
+        # _trigger (see __init__); the previous spelling "self.trigger"
+        # would have raised AttributeError had this line ever run.
+        self._trigger.close()
def run_in_thread(method, *args):
t = SlowMethodThread(method, args)
=== ZODB3/ZEO/start.py 1.45.6.3 => 1.45.6.4 ===
--- ZODB3/ZEO/start.py:1.45.6.3 Fri Dec 20 15:04:01 2002
+++ ZODB3/ZEO/start.py Mon Jan 20 16:54:26 2003
@@ -152,6 +152,8 @@
attr_name -- This is the name to which the storage object
is assigned in the module.
+ -t timeout -- transaction timeout in seconds (default no timeout)
+
-P file -- Run under profile and dump output to file. Implies the
-s flag.
@@ -159,7 +161,7 @@
""" % (me, env.fs)
try:
- opts, args = getopt.getopt(args, 'p:Dh:U:sS:u:P:d')
+ opts, args = getopt.getopt(args, 'p:Dh:U:sS:t:u:P:d')
except getopt.error, msg:
print usage
print msg
@@ -174,6 +176,7 @@
prof = None
detailed = 0
fs = None
+ transaction_timeout = None
for o, v in opts:
if o =='-p':
port = int(v)
@@ -189,6 +192,8 @@
detailed = 1
elif o =='-s':
Z = 0
+ elif o =='-t':
+ transaction_timeout = float(v)
elif o =='-P':
prof = v
@@ -263,7 +268,8 @@
if not unix:
unix = host, port
- ZEO.StorageServer.StorageServer(unix, storages)
+ ZEO.StorageServer.StorageServer(
+ unix, storages, transaction_timeout=transaction_timeout)
if not Z:
try: