[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/ Provide shorter code path for loads, which are most common operation.
Jim Fulton
jim at zope.com
Wed Apr 6 10:23:24 EDT 2011
Log message for revision 121306:
Provide a shorter code path for loads, which are the most common operation.
Simplified and optimized marshalling code.
Changed:
U ZODB/trunk/src/ZEO/tests/ConnectionTests.py
U ZODB/trunk/src/ZEO/zrpc/connection.py
U ZODB/trunk/src/ZEO/zrpc/marshal.py
-=-
Modified: ZODB/trunk/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/ConnectionTests.py 2011-04-06 13:50:21 UTC (rev 121305)
+++ ZODB/trunk/src/ZEO/tests/ConnectionTests.py 2011-04-06 14:23:24 UTC (rev 121306)
@@ -24,7 +24,7 @@
import ZEO.ServerStub
from ZEO.ClientStorage import ClientStorage
from ZEO.Exceptions import ClientDisconnected
-from ZEO.zrpc.marshal import Marshaller
+from ZEO.zrpc.marshal import encode
from ZEO.tests import forker
from ZODB.DB import DB
@@ -475,7 +475,7 @@
class Hack:
pass
- msg = Marshaller().encode(1, 0, "foo", (Hack(),))
+ msg = encode(1, 0, "foo", (Hack(),))
self._bad_message(msg)
del Hack
Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py 2011-04-06 13:50:21 UTC (rev 121305)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py 2011-04-06 14:23:24 UTC (rev 121306)
@@ -13,11 +13,13 @@
##############################################################################
import asyncore
import atexit
+import cPickle
import errno
import select
import sys
import threading
import logging
+import ZEO.zrpc.marshal
import traceback, time
@@ -25,7 +27,6 @@
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
-from ZEO.zrpc.marshal import Marshaller, ServerMarshaller
from ZEO.zrpc.log import short_repr, log
from ZODB.loglevels import BLATHER, TRACE
import ZODB.POSException
@@ -287,7 +288,10 @@
# our peer.
def __init__(self, sock, addr, obj, tag, map=None):
self.obj = None
- self.marshal = Marshaller()
+ self.decode = ZEO.zrpc.marshal.decode
+ self.encode = ZEO.zrpc.marshal.encode
+ self.fast_encode = ZEO.zrpc.marshal.fast_encode
+
self.closed = False
self.peer_protocol_version = None # set in recv_handshake()
@@ -413,13 +417,34 @@
# will raise an exception. The exception will ultimately
# result in asycnore calling handle_error(), which will
# close the connection.
- msgid, async, name, args = self.marshal.decode(message)
+ msgid, async, name, args = self.decode(message)
if debug_zrpc:
self.log("recv msg: %s, %s, %s, %s" % (msgid, async, name,
short_repr(args)),
level=TRACE)
- if name == REPLY:
+
+ if name == 'loadEx':
+
+ # Special case and inline the heck out of load case:
+ try:
+ ret = self.obj.loadEx(*args)
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except Exception, msg:
+ if not isinstance(msg, self.unlogged_exception_types):
+ self.log("%s() raised exception: %s" % (name, msg),
+ logging.ERROR, exc_info=True)
+ self.return_error(msgid, *sys.exc_info()[:2])
+ else:
+ try:
+ self.message_output(self.fast_encode(msgid, 0, REPLY, ret))
+ self.poll()
+ except:
+ # Fall back to normal version for better error handling
+ self.send_reply(msgid, ret)
+
+ elif name == REPLY:
assert not async
self.handle_reply(msgid, args)
else:
@@ -493,14 +518,14 @@
# it's acceptable -- we really do want to catch every exception
# cPickle may raise.
try:
- msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
+ msg = self.encode(msgid, 0, REPLY, (err_type, err_value))
except: # see above
try:
r = short_repr(err_value)
except:
r = "<unreprable>"
err = ZRPCError("Couldn't pickle error %.100s" % r)
- msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
+ msg = self.encode(msgid, 0, REPLY, (ZRPCError, err))
self.message_output(msg)
self.poll()
@@ -527,7 +552,7 @@
if debug_zrpc:
self.log("send msg: %d, %d, %s, ..." % (msgid, async, method),
level=TRACE)
- buf = self.marshal.encode(msgid, async, method, args)
+ buf = self.encode(msgid, async, method, args)
self.message_output(buf)
return msgid
@@ -560,7 +585,7 @@
The calls will not be interleaved with other calls from the same
client.
"""
- self.message_output(self.marshal.encode(0, 1, method, args)
+ self.message_output(self.encode(0, 1, method, args)
for method, args in iterator)
def handle_reply(self, msgid, ret):
@@ -573,6 +598,8 @@
self.trigger.pull_trigger()
+# import cProfile, time
+
class ManagedServerConnection(Connection):
"""Server-side Connection subclass."""
@@ -583,7 +610,9 @@
self.mgr = mgr
map = {}
Connection.__init__(self, sock, addr, obj, 'S', map=map)
- self.marshal = ServerMarshaller()
+
+ self.decode = ZEO.zrpc.marshal.server_decode
+
self.trigger = ZEO.zrpc.trigger.trigger(map)
self.call_from_thread = self.trigger.pull_trigger
@@ -591,6 +620,15 @@
t.setDaemon(True)
t.start()
+ # self.profile = cProfile.Profile()
+
+ # def message_input(self, message):
+ # self.profile.enable()
+ # try:
+ # Connection.message_input(self, message)
+ # finally:
+ # self.profile.disable()
+
def handshake(self):
# Send the server's preferred protocol to the client.
self.message_output(self.current_protocol)
@@ -602,6 +640,7 @@
def close(self):
self.obj.notifyDisconnected()
Connection.close(self)
+ # self.profile.dump_stats(str(time.time())+'.stats')
def send_reply(self, msgid, ret, immediately=True):
# encode() can pass on a wide variety of exceptions from cPickle.
@@ -609,14 +648,14 @@
# it's acceptable -- we really do want to catch every exception
# cPickle may raise.
try:
- msg = self.marshal.encode(msgid, 0, REPLY, ret)
+ msg = self.encode(msgid, 0, REPLY, ret)
except: # see above
try:
r = short_repr(ret)
except:
r = "<unreprable>"
err = ZRPCError("Couldn't pickle return %.100s" % r)
- msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
+ msg = self.encode(msgid, 0, REPLY, (ZRPCError, err))
self.message_output(msg)
if immediately:
self.poll()
Modified: ZODB/trunk/src/ZEO/zrpc/marshal.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/marshal.py 2011-04-06 13:50:21 UTC (rev 121305)
+++ ZODB/trunk/src/ZEO/zrpc/marshal.py 2011-04-06 14:23:24 UTC (rev 121306)
@@ -11,61 +11,66 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
-import cPickle
+from cPickle import Unpickler, Pickler
from cStringIO import StringIO
import logging
from ZEO.zrpc.error import ZRPCError
from ZEO.zrpc.log import log, short_repr
-class Marshaller:
- """Marshal requests and replies to second across network"""
+def encode(*args): # args: (msgid, flags, name, args)
+ # (We used to have a global pickler, but that's not thread-safe. :-( )
- def encode(self, msgid, flags, name, args):
- """Returns an encoded message"""
- # (We used to have a global pickler, but that's not thread-safe. :-( )
- # Note that args may contain very large binary pickles already; for
- # this reason, it's important to use proto 1 (or higher) pickles here
- # too. For a long time, this used proto 0 pickles, and that can
- # bloat our pickle to 4x the size (due to high-bit and control bytes
- # being represented by \xij escapes in proto 0).
- # Undocumented: cPickle.Pickler accepts a lone protocol argument;
- # pickle.py does not.
- pickler = cPickle.Pickler(1)
- pickler.fast = 1
+ # It's not thread safe if, in the course of pickling, we call the
+ # Python interpreter, which releases the GIL.
- # Undocumented: pickler.dump(), for a cPickle.Pickler, takes
- # an optional boolean argument. When true, it returns the pickle;
- # when false or unspecified, it returns the pickler object itself.
- # pickle.py does none of this.
- return pickler.dump((msgid, flags, name, args), 1)
+ # Note that args may contain very large binary pickles already; for
+ # this reason, it's important to use proto 1 (or higher) pickles here
+ # too. For a long time, this used proto 0 pickles, and that can
+ # bloat our pickle to 4x the size (due to high-bit and control bytes
+ # being represented by \xij escapes in proto 0).
+ # Undocumented: cPickle.Pickler accepts a lone protocol argument;
+ # pickle.py does not.
+ pickler = Pickler(1)
+ pickler.fast = 1
+ return pickler.dump(args, 1)
- def decode(self, msg):
- """Decodes msg and returns its parts"""
- unpickler = cPickle.Unpickler(StringIO(msg))
- unpickler.find_global = find_global
- try:
- return unpickler.load() # msgid, flags, name, args
- except:
- log("can't decode message: %s" % short_repr(msg),
- level=logging.ERROR)
- raise
+@apply
+def fast_encode():
+ # Only use in cases where you *know* the data contains only basic
+ # Python objects
+ pickler = Pickler(1)
+ pickler.fast = 1
+ dump = pickler.dump
+ def fast_encode(*args):
+ return dump(args, 1)
+ return fast_encode
-class ServerMarshaller(Marshaller):
+def decode(msg):
+ """Decodes msg and returns its parts"""
+ unpickler = Unpickler(StringIO(msg))
+ unpickler.find_global = find_global
- def decode(self, msg):
- """Decodes msg and returns its parts"""
- unpickler = cPickle.Unpickler(StringIO(msg))
- unpickler.find_global = server_find_global
+ try:
+ return unpickler.load() # msgid, flags, name, args
+ except:
+ log("can't decode message: %s" % short_repr(msg),
+ level=logging.ERROR)
+ raise
- try:
- return unpickler.load() # msgid, flags, name, args
- except:
- log("can't decode message: %s" % short_repr(msg),
- level=logging.ERROR)
- raise
+def server_decode(msg):
+ """Decodes msg and returns its parts"""
+ unpickler = Unpickler(StringIO(msg))
+ unpickler.find_global = server_find_global
+ try:
+ return unpickler.load() # msgid, flags, name, args
+ except:
+ log("can't decode message: %s" % short_repr(msg),
+ level=logging.ERROR)
+ raise
+
_globals = globals()
_silly = ('__doc__',)
More information about the Zodb-checkins
mailing list