[Zodb-checkins] CVS: ZODB3/ZEO - StorageServer.py:1.74.2.10.8.2
Jeremy Hylton
jeremy at zope.com
Wed Aug 20 17:29:52 EDT 2003
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv21966/ZEO
Modified Files:
Tag: ZODB3-vote-backoff-branch
StorageServer.py
Log Message:
Run prepare phase of 2PC in a separate thread for large txns.
Rename one of the restart() methods to prepare(). prepare() starts a
transaction from its commit log. It runs from tpc_begin() through to
the call that blocked the txn, usually vote().
Look for a magic number of store calls in the txn (hard-coded to 25
for now). If the number of store calls is bigger than the magic
number, then the prepare is considered "slow" and is run in a separate
thread. The use of a thread allows the mainloop to handle other
requests.
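
The threshold check described above can be sketched roughly as follows. This is a minimal, standalone illustration in modern Python 3, not the code from this checkin: the names SLOW_STORE_THRESHOLD, is_slow() and dispatch_prepare() are hypothetical, and a plain threading.Thread stands in for the server's own thread class.

```python
import threading

# The hard-coded "magic number" from the log message.
SLOW_STORE_THRESHOLD = 25


def is_slow(store_count, threshold=SLOW_STORE_THRESHOLD):
    """Return True when a txn has more store calls than the threshold."""
    return store_count > threshold


def dispatch_prepare(store_count, work):
    """Run work() inline for small txns, in a worker thread for large ones.

    Returns (mode, result, thread); thread is None in the inline case.
    The result dict is filled in under the key "value" once work() finishes.
    """
    result = {}

    def target():
        result["value"] = work()

    if is_slow(store_count):
        # Large txn: hand the work to a thread so the caller
        # (the mainloop, in the server's case) is not blocked.
        t = threading.Thread(target=target)
        t.start()
        return "thread", result, t
    # Small txn: cheap enough to run directly.
    target()
    return "inline", result, None
```

Note that a txn with exactly 25 stores still runs inline, since the comparison is strictly greater-than, matching the `self.log.stores > 25` test in the diff.
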
Change the signature of prepare() so that it explicitly takes a delay
argument, which can be None. This simplifies the control flow a
little, but it is still too complicated.
Extend run_in_thread() so that you can pass it a delay. When a client
has an already blocked txn with a delay, just use that delay for the
restarted thread.
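
The delay-passing idea can be illustrated with a small standalone sketch in modern Python 3. Everything here is an assumption made for illustration: FakeDelay is a hypothetical stand-in for the server's MTDelay, and this run_in_thread() uses a keyword-only argument rather than the **kw handling in the diff below.

```python
import threading


class FakeDelay:
    """Hypothetical stand-in for MTDelay: records a single reply."""

    def __init__(self):
        self._done = threading.Event()
        self.value = None

    def reply(self, value):
        self.value = value
        self._done.set()

    def wait(self, timeout=None):
        # Returns True once reply() has been called.
        return self._done.wait(timeout)


def run_in_thread(method, *args, delay=None):
    """Run method(*args) in a thread and reply through a delay object.

    If the caller already holds a delay (a blocked txn), reuse it;
    otherwise create a fresh one.
    """
    if delay is None:
        delay = FakeDelay()

    def target():
        delay.reply(method(*args))

    threading.Thread(target=target).start()
    return delay
```

A caller that already has a delay gets the same object back, so whoever is waiting on it receives the reply from the restarted thread:

```python
existing = FakeDelay()
d = run_in_thread(lambda x: x * 2, 21, delay=existing)
d.wait(5)
print(d is existing, d.value)
```
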
=== ZODB3/ZEO/StorageServer.py 1.74.2.10.8.1 => 1.74.2.10.8.2 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.10.8.1 Wed Aug 20 13:41:13 2003
+++ ZODB3/ZEO/StorageServer.py Wed Aug 20 16:29:51 2003
@@ -569,11 +569,10 @@
         self.timeout.begin(self)
         self.strategy = self.ImmediateCommitStrategyClass(
             self.storage, self.client, self.log)
-        resp = old_strategy.restart(self.strategy)
-        if delay is not None:
-            delay.reply(resp)
-        else:
-            return resp
+        resp = old_strategy.prepare(self.strategy, delay)
+        # If delay is None, then we were called from restart_other()
+        # and the return value doesn't matter.
+        return resp

 # A ZEOStorage instance can use different strategies to commit a
 # transaction. The current implementation uses different strategies
@@ -760,11 +759,26 @@
         self.args = trans_id,
         return self.block()

-    def restart(self, new_strategy):
-        # called by the storage when the storage is available
+    def prepare(self, new_strategy, delay):
+        # Called when the storage is available: acquires the lock
+        # and performs all the stores up to the method that actually
+        # blocked. The prepare() method is a wrapper that decides
+        # whether to use another thread. The real work is done in
+        # _prepare().
         assert isinstance(new_strategy, ImmediateCommitStrategy)
+        if self.log.stores > 25:
+            # If there are a lot of stores, fire off a separate thread
+            # to avoid blocking other clients.
+            return run_in_thread(self._prepare, new_strategy, delay=delay)
+        else:
+            r = self._prepare(new_strategy)
+            if delay is not None:
+                delay.reply(r)
+            else:
+                return r
+
+    def _prepare(self, new_strategy):
         new_strategy.tpc_begin(self.txn, self.tid, self.status)
-        print "restarting blocked txn", self.log.stores
         loads, loader = self.log.get_loader()
         for i in range(loads):
             oid, serial, data, version = loader.load()
@@ -850,8 +864,12 @@
                 time.sleep(howlong)
         self.trigger.close()

-def run_in_thread(method, *args):
-    t = SlowMethodThread(method, args)
+def run_in_thread(method, *args, **kw):
+    # support kw only to allow delay keyword arg to be used
+    delay = None
+    if kw:
+        delay = kw["delay"]
+    t = SlowMethodThread(method, args, delay)
     t.start()
     return t.delay
@@ -868,11 +886,11 @@
     # avoid blocking, we spawn a separate thread, return an MTDelay()
     # object, and have the thread reply() when it finishes.

-    def __init__(self, method, args):
+    def __init__(self, method, args, delay=None):
         threading.Thread.__init__(self)
         self._method = method
         self._args = args
-        self.delay = MTDelay()
+        self.delay = delay or MTDelay()

     def run(self):
         try: