[Zodb-checkins] CVS: ZODB3/ZEO - StorageServer.py:1.56

Jeremy Hylton jeremy@zope.com
Wed, 11 Sep 2002 17:40:39 -0400


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv6396

Modified Files:
	StorageServer.py 
Log Message:
Implement synchronous pack() invocation with new SlowMethodThread class.


=== ZODB3/ZEO/StorageServer.py 1.55 => 1.56 ===
--- ZODB3/ZEO/StorageServer.py:1.55	Thu Sep  5 13:51:10 2002
+++ ZODB3/ZEO/StorageServer.py	Wed Sep 11 17:40:39 2002
@@ -240,32 +240,20 @@
     def endZeoVerify(self):
         self.client.endVerify()
 
-    def pack(self, t, wait=None):
+    def pack(self, time, wait=None):
         if wait is not None:
-            wait = MTDelay()
-        t = threading.Thread(target=self._pack, args=(t, wait))
-        t.start()
-        if wait is not None:
-            return wait
+            return run_in_thread(self._pack, time)
         else:
+            # If the client isn't waiting for a reply, start a thread
+            # and forget about it.
+            t = threading.Thread(target=self._pack, args=(time,))
+            t.start()
             return None
 
-    def _pack(self, t, delay):
-        try:
-            self.__storage.pack(t, referencesf)
-        except:
-            self._log('Pack failed for %s' % self.__storage_id,
-                      zLOG.ERROR,
-                      error=sys.exc_info())
-            if delay is not None:
-                raise
-        else:
-            if delay is None:
-                # Broadcast new size statistics
-                self.server.invalidate(0, self.__storage_id, (),
-                                       self.get_size_info())
-            else:
-                delay.reply(None)
+    def _pack(self, time):
+        self.__storage.pack(time, referencesf)
+        # Broadcast new size statistics
+        self.server.invalidate(0, self.__storage_id, (), self.get_size_info())
 
     def new_oids(self, n=100):
         """Return a sequence of n new oids, where n defaults to 100"""
@@ -583,3 +571,35 @@
             new_strategy.store(oid, serial, data, version)
         meth = getattr(new_strategy, self.name)
         return meth(*self.args)
+
+def run_in_thread(method, *args):
+    t = SlowMethodThread(method, args)
+    t.start()
+    return t.delay
+
+class SlowMethodThread(threading.Thread):
+    """Thread to run potentially slow storage methods.
+
+    Clients can use the delay attribute to access the MTDelay object
+    used to send a zrpc response at the right time.
+    """
+
+    # Some storage methods can take a long time to complete.  If we
+    # run these methods via a standard asyncore read handler, they
+    # will block all other server activity until they complete.  To
+    # avoid blocking, we spawn a separate thread, return an MTDelay()
+    # object, and have the thread reply() when it finishes.
+
+    def __init__(self, method, args):
+        threading.Thread.__init__(self)
+        self._method = method
+        self._args = args
+        self.delay = MTDelay()
+
+    def run(self):
+        try:
+            result = self._method(*self._args)
+        except Exception:
+            self.delay.error(sys.exc_info())
+        else:
+            self.delay.reply(result)