[Zodb-checkins] CVS: ZODB3/ZEO - StorageServer.py:1.56
Jeremy Hylton
jeremy@zope.com
Wed, 11 Sep 2002 17:40:39 -0400
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv6396
Modified Files:
StorageServer.py
Log Message:
Implement synchronous pack() invocation with new SlowMethodThread class.
=== ZODB3/ZEO/StorageServer.py 1.55 => 1.56 ===
--- ZODB3/ZEO/StorageServer.py:1.55 Thu Sep 5 13:51:10 2002
+++ ZODB3/ZEO/StorageServer.py Wed Sep 11 17:40:39 2002
@@ -240,32 +240,20 @@
def endZeoVerify(self):
self.client.endVerify()
- def pack(self, t, wait=None):
+ def pack(self, time, wait=None):
if wait is not None:
- wait = MTDelay()
- t = threading.Thread(target=self._pack, args=(t, wait))
- t.start()
- if wait is not None:
- return wait
+ return run_in_thread(self._pack, time)
else:
+ # If the client isn't waiting for a reply, start a thread
+ # and forget about it.
+ t = threading.Thread(target=self._pack, args=(time,))
+ t.start()
return None
- def _pack(self, t, delay):
- try:
- self.__storage.pack(t, referencesf)
- except:
- self._log('Pack failed for %s' % self.__storage_id,
- zLOG.ERROR,
- error=sys.exc_info())
- if delay is not None:
- raise
- else:
- if delay is None:
- # Broadcast new size statistics
- self.server.invalidate(0, self.__storage_id, (),
- self.get_size_info())
- else:
- delay.reply(None)
+ def _pack(self, time):
+ self.__storage.pack(time, referencesf)
+ # Broadcast new size statistics
+ self.server.invalidate(0, self.__storage_id, (), self.get_size_info())
def new_oids(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100"""
@@ -583,3 +571,35 @@
new_strategy.store(oid, serial, data, version)
meth = getattr(new_strategy, self.name)
return meth(*self.args)
+
+def run_in_thread(method, *args):
+ t = SlowMethodThread(method, args)
+ t.start()
+ return t.delay
+
+class SlowMethodThread(threading.Thread):
+ """Thread to run potentially slow storage methods.
+
+ Clients can use the delay attribute to access the MTDelay object
+ used to send a zrpc response at the right time.
+ """
+
+ # Some storage methods can take a long time to complete. If we
+ # run these methods via a standard asyncore read handler, they
+ # will block all other server activity until they complete. To
+ # avoid blocking, we spawn a separate thread, return an MTDelay()
+ # object, and have the thread reply() when it finishes.
+
+ def __init__(self, method, args):
+ threading.Thread.__init__(self)
+ self._method = method
+ self._args = args
+ self.delay = MTDelay()
+
+ def run(self):
+ try:
+ result = self._method(*self._args)
+ except Exception:
+ self.delay.error(sys.exc_info())
+ else:
+ self.delay.reply(result)