[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo/src/ZEO/ checkpoint

Jim Fulton jim at zope.com
Wed Sep 9 17:46:11 EDT 2009


Log message for revision 103695:
  checkpoint
  

Changed:
  U   ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py
  U   ZODB/branches/jim-thready-zeo/src/ZEO/tests/testZEO.py
  A   ZODB/branches/jim-thready-zeo/src/ZEO/thready.py
  A   ZODB/branches/jim-thready-zeo/src/ZEO/thready.test
  U   ZODB/branches/jim-thready-zeo/src/ZEO/zrpc/connection.py

-=-
Modified: ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py	2009-09-09 17:24:02 UTC (rev 103694)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py	2009-09-09 21:46:11 UTC (rev 103695)
@@ -34,6 +34,8 @@
 
 import ZODB.serialize
 import ZODB.TimeStamp
+
+import ZEO.thready
 import ZEO.zrpc.error
 
 import zope.interface
@@ -187,10 +189,24 @@
                 raise NotImplementedError
             self.undo = undo
 
-        self.getTid = storage.getTid
-        self.history = storage.history
-        self.load = storage.load
-        self.loadSerial = storage.loadSerial
+        self._getTid = storage.getTid
+        if (self.connection is None or
+            self.connection.peer_protocol_version <  'Z309'
+            ):
+            self.getTid = storage.getTid
+            self.history = storage.history
+            self.load = storage.load
+            self.loadSerial = storage.loadSerial
+            if hasattr(storage, 'loadBefore'):
+                self._loadBefore = storage.loadBefore
+        else:
+            self.getTid = ZEO.thready.delayed(storage.getTid)
+            self.history = ZEO.thready.delayed(storage.history)
+            self.load = ZEO.thready.delayed(storage.load)
+            self.loadSerial = ZEO.thready.delayed(storage.loadSerial)
+            if hasattr(storage, 'loadBefore'):
+                self._loadBefore = ZEO.thready.delayed(storage.loadBefore)
+
         record_iternext = getattr(storage, 'record_iternext', None)
         if record_iternext is not None:
             self.record_iternext = record_iternext
@@ -314,11 +330,11 @@
 
     def loadEx(self, oid):
         self.stats.loads += 1
-        return self.storage.load(oid, '')
+        return self.load(oid, '')
 
     def loadBefore(self, oid, tid):
         self.stats.loads += 1
-        return self.storage.loadBefore(oid, tid)
+        return self._loadBefore(oid, tid)
 
     def getInvalidations(self, tid):
         invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
@@ -330,7 +346,7 @@
 
     def verify(self, oid, tid):
         try:
-            t = self.getTid(oid)
+            t = self._getTid(oid)
         except KeyError:
             self.client.invalidateVerify(oid)
         else:
@@ -342,7 +358,7 @@
             self.verifying = 1
             self.stats.verifying_clients += 1
         try:
-            os = self.getTid(oid)
+            os = self._getTid(oid)
         except KeyError:
             self.client.invalidateVerify((oid, ''))
             # It's not clear what we should do now.  The KeyError

Modified: ZODB/branches/jim-thready-zeo/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/tests/testZEO.py	2009-09-09 17:24:02 UTC (rev 103694)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/tests/testZEO.py	2009-09-09 21:46:11 UTC (rev 103695)
@@ -42,6 +42,7 @@
 import ZEO.ServerStub
 import ZEO.StorageServer
 import ZEO.tests.ConnectionTests
+import ZEO.thready
 import ZEO.zrpc.connection
 import ZODB
 import ZODB.blob
@@ -776,6 +777,11 @@
     >>> from ZODB.DB import DB
     >>> from persistent.mapping import PersistentMapping
     >>> from transaction import commit
+
+    >>> delayed = ZEO.thready.delayed
+    >>> ZEO.thready.delayed = lambda func: func
+
+
     >>> fs1 = FileStorage('t1.fs')
     >>> fs2 = FileStorage('t2.fs')
     >>> server = StorageServer(('', get_port()), dict(fs1=fs1, fs2=fs2))
@@ -808,6 +814,7 @@
     [10, 11, 12, 13, 14]
 
     >>> server.close_server()
+    >>> ZEO.thready.delayed = delayed
     """
 
 def getInvalidationsAfterServerRestart():
@@ -823,6 +830,10 @@
     >>> from ZODB.FileStorage import FileStorage
     >>> from ZODB.DB import DB
     >>> from persistent.mapping import PersistentMapping
+    >>> delayed = ZEO.thready.delayed
+    >>> ZEO.thready.delayed = lambda func: func
+
+
     >>> fs = FileStorage('t.fs')
     >>> db = DB(fs)
     >>> conn = db.open()
@@ -910,6 +921,9 @@
     [0, 101, 102, 103, 104]
 
     >>> fs.close()
+
+    >>> ZEO.thready.delayed = delayed
+
     """
 
 def tpc_finish_error():
@@ -1288,6 +1302,7 @@
             'zeo-fan-out.test', 'zdoptions.test',
             'drop_cache_rather_than_verify.txt', 'client-config.test',
             'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
+            '../thready.test',
             setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
             ),
         )

Added: ZODB/branches/jim-thready-zeo/src/ZEO/thready.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/thready.py	                        (rev 0)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/thready.py	2009-09-09 21:46:11 UTC (rev 103695)
@@ -0,0 +1,86 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Experimental support for thread pools
+"""
+
+import logging
+import Queue
+import sys
+import threading
+import ZEO.zrpc.connection
+
+logger = logging.getLogger(__name__)
+
+queue = Queue.Queue()
+
+stop = object()
+def run():
+    while 1:
+        try:
+            delay = queue.get()
+            if delay is stop:
+                break
+            delay()
+        except:
+            logger.critical('Error in thready job %r', delay,
+                            exc_info=sys.exc_info())
+
+nthread = 0
+def start_thread():
+    global nthread
+    nthread += 1
+    t = threading.Thread(target=run, name='thready-%s' % nthread)
+    t.setDaemon(True)
+    t.start()
+
+for i in range(4):
+    start_thread()
+
+def stop_thread():
+    queue.put(stop)
+
+class delayed(object):
+
+    def __init__(self, func):
+        self.func = func
+
+    def __get__(self, inst, class_):
+        if inst is None:
+            return self
+
+        return lambda *args: Delay(self.func, (inst,)+args)
+
+    def __call__(self, *args):
+        return Delay(self.func, args)
+
+class Delay(ZEO.zrpc.connection.Delay):
+
+    def __init__(self, func, args):
+        self.func = func
+        self.args = args
+
+    def set_sender(self, msgid, send_reply, return_error):
+        ZEO.zrpc.connection.Delay.set_sender(
+            self, msgid, send_reply, return_error)
+        queue.put(self)
+
+    def __call__(self):
+        try:
+            r = self.func(*self.args)
+        except MemoryError:
+            raise
+        except Exception:
+            self.error(sys.exc_info())
+        else:
+            self.reply(r)


Property changes on: ZODB/branches/jim-thready-zeo/src/ZEO/thready.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: ZODB/branches/jim-thready-zeo/src/ZEO/thready.test
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/thready.test	                        (rev 0)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/thready.test	2009-09-09 21:46:11 UTC (rev 103695)
@@ -0,0 +1,119 @@
+thready support
+===============
+
+The experimental thready module provides a decorator that causes
+methods to be run by a thread pool.
+
+    >>> import logging, sys, ZEO.thready
+    >>> handler = logging.StreamHandler(sys.stdout)
+    >>> ZEO.thready.logger.addHandler(handler)
+    >>> ZEO.thready.logger.setLevel(logging.INFO)
+
+By default there are 4 threads.  We process jobs by adding them to the queue:
+
+    >>> import time
+    >>> def sleep():
+    ...     time.sleep(.1)
+    ...     print 'slept'
+
+    >>> for i in range(5):
+    ...     ZEO.thready.queue.put(sleep)
+
+    >>> time.sleep(.11)
+    slept
+    slept
+    slept
+    slept
+
+    >>> time.sleep(.1)
+    slept
+
+We can cause threads to stop (if they aren't busy):
+
+    >>> ZEO.thready.stop_thread()
+
+    >>> for i in range(5):
+    ...     ZEO.thready.queue.put(sleep)
+
+    >>> time.sleep(.11)
+    slept
+    slept
+    slept
+
+    >>> time.sleep(.1)
+    slept
+    slept
+
+And we can start new threads:
+
+    >>> ZEO.thready.start_thread()
+
+    >>> for i in range(5):
+    ...     ZEO.thready.queue.put(sleep)
+
+    >>> time.sleep(.15)
+    slept
+    slept
+    slept
+    slept
+
+    >>> time.sleep(.1)
+    slept
+
+Errors don't kill threads, but errors are logged:
+
+    >>> ZEO.thready.queue.put(lambda: XXX); time.sleep(.1) # doctest: +ELLIPSIS
+    Error in thready job <function <lambda> at ...>
+    Traceback (most recent call last):
+    ...
+    NameError: global name 'XXX' is not defined
+
+    >>> for i in range(5):
+    ...     ZEO.thready.queue.put(sleep)
+
+    >>> time.sleep(.11)
+    slept
+    slept
+    slept
+    slept
+
+    >>> time.sleep(.1)
+    slept
+
+There's a decorator that causes methods to be delayed:
+
+    >>> class C:
+    ...
+    ...     @ZEO.thready.delayed
+    ...     def foo(self, fail=0):
+    ...         time.sleep(0.1)
+    ...         if fail:
+    ...             raise ValueError()
+    ...         print 'foo', fail
+
+    >>> c = C()
+
+    >>> d = c.foo()
+
+We get back a delay object. Nothing happens until we call set_sender on the
+delay to tell it where to send results:
+
+    >>> def win(*args):
+    ...     print 'win', args
+    >>> def lose(*args):
+    ...     print 'lose', args
+
+    >>> d.set_sender(42, win, lose)
+    >>> time.sleep(.15)
+    foo 0
+    win (42, None)
+
+    >>> d = c.foo(1)
+    >>> d.set_sender(42, win, lose)
+    >>> time.sleep(.15)
+    lose (42, 0, <type 'exceptions.ValueError'>, ValueError())
+
+Cleanup:
+
+    >>> ZEO.thready.logger.removeHandler(handler)
+    >>> ZEO.thready.logger.setLevel(logging.NOTSET)


Property changes on: ZODB/branches/jim-thready-zeo/src/ZEO/thready.test
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: ZODB/branches/jim-thready-zeo/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/zrpc/connection.py	2009-09-09 17:24:02 UTC (rev 103694)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/zrpc/connection.py	2009-09-09 21:46:11 UTC (rev 103695)
@@ -34,6 +34,9 @@
 
 exception_type_type = type(Exception)
 
+# Exception types that should not be logged:
+unlogged_exception_types = (ZODB.POSException.POSKeyError, )
+
 ##############################################################################
 # Dedicated Client select loop:
 client_timeout = 30.0
@@ -176,7 +179,8 @@
         self.send_reply(self.msgid, obj)
 
     def error(self, exc_info):
-        log("Error raised in delayed method", logging.ERROR, exc_info=True)
+        if not isinstance(exc_info[1], unlogged_exception_types):
+            log("Error raised in delayed method", logging.ERROR, exc_info=True)
         self.return_error(self.msgid, 0, *exc_info[:2])
 
 class MTDelay(Delay):
@@ -378,9 +382,6 @@
     #         sends Z303 to server
     #     OK, because Z303 is in the server's clients_we_can_talk_to
 
-    # Exception types that should not be logged:
-    unlogged_exception_types = ()
-
     # Client constructor passes 'C' for tag, server constructor 'S'.  This
     # is used in log messages, and to determine whether we can speak with
     # our peer.
@@ -573,7 +574,7 @@
         except (SystemExit, KeyboardInterrupt):
             raise
         except Exception, msg:
-            if not isinstance(msg, self.unlogged_exception_types):
+            if not isinstance(msg, unlogged_exception_types):
                 self.log("%s() raised exception: %s" % (name, msg),
                          logging.INFO, exc_info=True)
             error = sys.exc_info()[:2]
@@ -785,9 +786,6 @@
 class ManagedServerConnection(Connection):
     """Server-side Connection subclass."""
 
-    # Exception types that should not be logged:
-    unlogged_exception_types = (ZODB.POSException.POSKeyError, )
-
     # Servers use a shared server trigger that uses the asyncore socket map
     trigger = trigger()
 



More information about the Zodb-checkins mailing list