[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo/src/ZEO/ checkpoint
Jim Fulton
jim at zope.com
Wed Sep 9 17:46:11 EDT 2009
Log message for revision 103695:
checkpoint
Changed:
U ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py
U ZODB/branches/jim-thready-zeo/src/ZEO/tests/testZEO.py
A ZODB/branches/jim-thready-zeo/src/ZEO/thready.py
A ZODB/branches/jim-thready-zeo/src/ZEO/thready.test
U ZODB/branches/jim-thready-zeo/src/ZEO/zrpc/connection.py
-=-
Modified: ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py 2009-09-09 17:24:02 UTC (rev 103694)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/StorageServer.py 2009-09-09 21:46:11 UTC (rev 103695)
@@ -34,6 +34,8 @@
import ZODB.serialize
import ZODB.TimeStamp
+
+import ZEO.thready
import ZEO.zrpc.error
import zope.interface
@@ -187,10 +189,24 @@
raise NotImplementedError
self.undo = undo
- self.getTid = storage.getTid
- self.history = storage.history
- self.load = storage.load
- self.loadSerial = storage.loadSerial
+ self._getTid = storage.getTid
+ if (self.connection is None or
+ self.connection.peer_protocol_version < 'Z309'
+ ):
+ self.getTid = storage.getTid
+ self.history = storage.history
+ self.load = storage.load
+ self.loadSerial = storage.loadSerial
+ if hasattr(storage, 'loadBefore'):
+ self._loadBefore = storage.loadBefore
+ else:
+ self.getTid = ZEO.thready.delayed(storage.getTid)
+ self.history = ZEO.thready.delayed(storage.history)
+ self.load = ZEO.thready.delayed(storage.load)
+ self.loadSerial = ZEO.thready.delayed(storage.loadSerial)
+ if hasattr(storage, 'loadBefore'):
+ self._loadBefore = ZEO.thready.delayed(storage.loadBefore)
+
record_iternext = getattr(storage, 'record_iternext', None)
if record_iternext is not None:
self.record_iternext = record_iternext
@@ -314,11 +330,11 @@
def loadEx(self, oid):
self.stats.loads += 1
- return self.storage.load(oid, '')
+ return self.load(oid, '')
def loadBefore(self, oid, tid):
self.stats.loads += 1
- return self.storage.loadBefore(oid, tid)
+ return self._loadBefore(oid, tid)
def getInvalidations(self, tid):
invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
@@ -330,7 +346,7 @@
def verify(self, oid, tid):
try:
- t = self.getTid(oid)
+ t = self._getTid(oid)
except KeyError:
self.client.invalidateVerify(oid)
else:
@@ -342,7 +358,7 @@
self.verifying = 1
self.stats.verifying_clients += 1
try:
- os = self.getTid(oid)
+ os = self._getTid(oid)
except KeyError:
self.client.invalidateVerify((oid, ''))
# It's not clear what we should do now. The KeyError
Modified: ZODB/branches/jim-thready-zeo/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/tests/testZEO.py 2009-09-09 17:24:02 UTC (rev 103694)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/tests/testZEO.py 2009-09-09 21:46:11 UTC (rev 103695)
@@ -42,6 +42,7 @@
import ZEO.ServerStub
import ZEO.StorageServer
import ZEO.tests.ConnectionTests
+import ZEO.thready
import ZEO.zrpc.connection
import ZODB
import ZODB.blob
@@ -776,6 +777,11 @@
>>> from ZODB.DB import DB
>>> from persistent.mapping import PersistentMapping
>>> from transaction import commit
+
+ >>> delayed = ZEO.thready.delayed
+ >>> ZEO.thready.delayed = lambda func: func
+
+
>>> fs1 = FileStorage('t1.fs')
>>> fs2 = FileStorage('t2.fs')
>>> server = StorageServer(('', get_port()), dict(fs1=fs1, fs2=fs2))
@@ -808,6 +814,7 @@
[10, 11, 12, 13, 14]
>>> server.close_server()
+ >>> ZEO.thready.delayed = delayed
"""
def getInvalidationsAfterServerRestart():
@@ -823,6 +830,10 @@
>>> from ZODB.FileStorage import FileStorage
>>> from ZODB.DB import DB
>>> from persistent.mapping import PersistentMapping
+ >>> delayed = ZEO.thready.delayed
+ >>> ZEO.thready.delayed = lambda func: func
+
+
>>> fs = FileStorage('t.fs')
>>> db = DB(fs)
>>> conn = db.open()
@@ -910,6 +921,9 @@
[0, 101, 102, 103, 104]
>>> fs.close()
+
+ >>> ZEO.thready.delayed = delayed
+
"""
def tpc_finish_error():
@@ -1288,6 +1302,7 @@
'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt', 'client-config.test',
'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
+ '../thready.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
),
)
Added: ZODB/branches/jim-thready-zeo/src/ZEO/thready.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/thready.py (rev 0)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/thready.py 2009-09-09 21:46:11 UTC (rev 103695)
@@ -0,0 +1,86 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Experimental support for thread pools
+"""
+
+import logging
+import Queue
+import sys
+import threading
+import ZEO.zrpc.connection
+
+logger = logging.getLogger(__name__)
+
+queue = Queue.Queue()
+
+stop = object()
+def run():
+ while 1:
+ try:
+ delay = queue.get()
+ if delay is stop:
+ break
+ delay()
+ except:
+ logger.critical('Error in thready job %r', delay,
+ exc_info=sys.exc_info())
+
+nthread = 0
+def start_thread():
+ global nthread
+ nthread += 1
+ t = threading.Thread(target=run, name='thready-%s' % nthread)
+ t.setDaemon(True)
+ t.start()
+
+for i in range(4):
+ start_thread()
+
+def stop_thread():
+ queue.put(stop)
+
+class delayed(object):
+
+ def __init__(self, func):
+ self.func = func
+
+ def __get__(self, inst, class_):
+ if inst is None:
+ return self
+
+ return lambda *args: Delay(self.func, (inst,)+args)
+
+ def __call__(self, *args):
+ return Delay(self.func, args)
+
+class Delay(ZEO.zrpc.connection.Delay):
+
+ def __init__(self, func, args):
+ self.func = func
+ self.args = args
+
+ def set_sender(self, msgid, send_reply, return_error):
+ ZEO.zrpc.connection.Delay.set_sender(
+ self, msgid, send_reply, return_error)
+ queue.put(self)
+
+ def __call__(self):
+ try:
+ r = self.func(*self.args)
+ except MemoryError:
+ raise
+ except Exception:
+ self.error(sys.exc_info())
+ else:
+ self.reply(r)
Property changes on: ZODB/branches/jim-thready-zeo/src/ZEO/thready.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: ZODB/branches/jim-thready-zeo/src/ZEO/thready.test
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/thready.test (rev 0)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/thready.test 2009-09-09 21:46:11 UTC (rev 103695)
@@ -0,0 +1,119 @@
+thready support
+===============
+
+The experimental thready module provides a decorator that causes
+methods to be run by a thread pool.
+
+ >>> import logging, sys, ZEO.thready
+ >>> handler = logging.StreamHandler(sys.stdout)
+ >>> ZEO.thready.logger.addHandler(handler)
+ >>> ZEO.thready.logger.setLevel(logging.INFO)
+
+By default there are 4 threads. We process jobs by adding them to the queue:
+
+ >>> import time
+ >>> def sleep():
+ ... time.sleep(.1)
+ ... print 'slept'
+
+ >>> for i in range(5):
+ ... ZEO.thready.queue.put(sleep)
+
+ >>> time.sleep(.11)
+ slept
+ slept
+ slept
+ slept
+
+ >>> time.sleep(.1)
+ slept
+
+We can cause threads to stop (if they aren't busy):
+
+ >>> ZEO.thready.stop_thread()
+
+ >>> for i in range(5):
+ ... ZEO.thready.queue.put(sleep)
+
+ >>> time.sleep(.11)
+ slept
+ slept
+ slept
+
+ >>> time.sleep(.1)
+ slept
+ slept
+
+And we can start new threads:
+
+ >>> ZEO.thready.start_thread()
+
+ >>> for i in range(5):
+ ... ZEO.thready.queue.put(sleep)
+
+ >>> time.sleep(.15)
+ slept
+ slept
+ slept
+ slept
+
+ >>> time.sleep(.1)
+ slept
+
+Errors don't kill threads, but errors are logged:
+
+ >>> ZEO.thready.queue.put(lambda: XXX); time.sleep(.1) # doctest: +ELLIPSIS
+ Error in thready job <function <lambda> at ...>
+ Traceback (most recent call last):
+ ...
+ NameError: global name 'XXX' is not defined
+
+ >>> for i in range(5):
+ ... ZEO.thready.queue.put(sleep)
+
+ >>> time.sleep(.11)
+ slept
+ slept
+ slept
+ slept
+
+ >>> time.sleep(.1)
+ slept
+
+There's a decorator that causes methods to be delayed:
+
+ >>> class C:
+ ...
+ ... @ZEO.thready.delayed
+ ... def foo(self, fail=0):
+ ... time.sleep(0.1)
+ ... if fail:
+ ... raise ValueError()
+ ... print 'foo', fail
+
+ >>> c = C()
+
+ >>> d = c.foo()
+
+We get back a delay object. Nothing happens until we call set_sender on the
+delay to tell it where to send results:
+
+ >>> def win(*args):
+ ... print 'win', args
+ >>> def lose(*args):
+ ... print 'lose', args
+
+ >>> d.set_sender(42, win, lose)
+ >>> time.sleep(.15)
+ foo 0
+ win (42, None)
+
+ >>> d = c.foo(1)
+ >>> d.set_sender(42, win, lose)
+ >>> time.sleep(.15)
+ lose (42, 0, <type 'exceptions.ValueError'>, ValueError())
+
+Cleanup:
+
+ >>> ZEO.thready.logger.removeHandler(handler)
+ >>> ZEO.thready.logger.setLevel(logging.NOTSET)
Property changes on: ZODB/branches/jim-thready-zeo/src/ZEO/thready.test
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: ZODB/branches/jim-thready-zeo/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZEO/zrpc/connection.py 2009-09-09 17:24:02 UTC (rev 103694)
+++ ZODB/branches/jim-thready-zeo/src/ZEO/zrpc/connection.py 2009-09-09 21:46:11 UTC (rev 103695)
@@ -34,6 +34,9 @@
exception_type_type = type(Exception)
+# Exception types that should not be logged:
+unlogged_exception_types = (ZODB.POSException.POSKeyError, )
+
##############################################################################
# Dedicated Client select loop:
client_timeout = 30.0
@@ -176,7 +179,8 @@
self.send_reply(self.msgid, obj)
def error(self, exc_info):
- log("Error raised in delayed method", logging.ERROR, exc_info=True)
+ if not isinstance(exc_info[1], unlogged_exception_types):
+ log("Error raised in delayed method", logging.ERROR, exc_info=True)
self.return_error(self.msgid, 0, *exc_info[:2])
class MTDelay(Delay):
@@ -378,9 +382,6 @@
# sends Z303 to server
# OK, because Z303 is in the server's clients_we_can_talk_to
- # Exception types that should not be logged:
- unlogged_exception_types = ()
-
# Client constructor passes 'C' for tag, server constructor 'S'. This
# is used in log messages, and to determine whether we can speak with
# our peer.
@@ -573,7 +574,7 @@
except (SystemExit, KeyboardInterrupt):
raise
except Exception, msg:
- if not isinstance(msg, self.unlogged_exception_types):
+ if not isinstance(msg, unlogged_exception_types):
self.log("%s() raised exception: %s" % (name, msg),
logging.INFO, exc_info=True)
error = sys.exc_info()[:2]
@@ -785,9 +786,6 @@
class ManagedServerConnection(Connection):
"""Server-side Connection subclass."""
- # Exception types that should not be logged:
- unlogged_exception_types = (ZODB.POSException.POSKeyError, )
-
# Servers use a shared server trigger that uses the asyncore socket map
trigger = trigger()
More information about the Zodb-checkins
mailing list