[Zodb-checkins] SVN: ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py Distributed client- or server-specific methods to appropriate subclasses.
Jim Fulton
jim at zope.com
Wed Jan 27 13:07:57 EST 2010
Log message for revision 108575:
Distributed client- or server-specific methods to appropriate subclasses.
Changed:
U ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py
-=-
Modified: ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py 2010-01-27 18:06:18 UTC (rev 108574)
+++ ZODB/branches/jim-zrpc/src/ZEO/zrpc/connection.py 2010-01-27 18:07:57 UTC (rev 108575)
@@ -189,7 +189,7 @@
def error(self, exc_info):
log("Error raised in delayed method", logging.ERROR, exc_info=True)
- self.return_error(self.msgid, 0, *exc_info[:2])
+ self.return_error(self.msgid, *exc_info[:2])
class MTDelay(Delay):
@@ -541,21 +541,11 @@
short_repr(args)),
level=TRACE)
if name == REPLY:
- self.handle_reply(msgid, async, args)
+ assert not async
+ self.handle_reply(msgid, args)
else:
self.handle_request(msgid, async, name, args)
- def handle_reply(self, msgid, async, args):
- if debug_zrpc:
- self.log("recv reply: %s, %s, %s"
- % (msgid, async, short_repr(args)), level=TRACE)
- self.replies_cond.acquire()
- try:
- self.replies[msgid] = args
- self.replies_cond.notifyAll()
- finally:
- self.replies_cond.release()
-
def handle_request(self, msgid, async, name, args):
obj = self.obj
@@ -587,8 +577,14 @@
self.log("%s() raised exception: %s" % (name, msg),
logging.ERROR, exc_info=True)
error = sys.exc_info()[:2]
- return self.return_error(msgid, async, *error)
+ if async:
+ self.log("Asynchronous call raised exception: %s" % self,
+ level=logging.ERROR, exc_info=True)
+ else:
+ self.return_error(msgid, *error)
+ return
+
if async:
if ret is not None:
raise ZRPCError("async method %s returned value %s" %
@@ -606,35 +602,11 @@
self.__super_setSessionKey(self.delay_sesskey)
self.delay_sesskey = None
- def handle_error(self):
- if sys.exc_info()[0] == SystemExit:
- raise sys.exc_info()
- self.log("Error caught in asyncore",
- level=logging.ERROR, exc_info=True)
- self.close()
+ def return_error(self, msgid, err_type, err_value):
+ # Note that, ideally, this should be defined solely for
+ # servers, but a test arranges to get it called on
+ # a client. Too much trouble to fix it now. :/
- def send_reply(self, msgid, ret):
- # encode() can pass on a wide variety of exceptions from cPickle.
- # While a bare `except` is generally poor practice, in this case
- # it's acceptable -- we really do want to catch every exception
- # cPickle may raise.
- try:
- msg = self.marshal.encode(msgid, 0, REPLY, ret)
- except: # see above
- try:
- r = short_repr(ret)
- except:
- r = "<unreprable>"
- err = ZRPCError("Couldn't pickle return %.100s" % r)
- msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
- self.message_output(msg)
- self.poll()
-
- def return_error(self, msgid, async, err_type, err_value):
- if async:
- self.log("Asynchronous call raised exception: %s" % self,
- level=logging.ERROR, exc_info=True)
- return
if not isinstance(err_value, Exception):
err_value = err_type, err_value
@@ -654,6 +626,13 @@
self.message_output(msg)
self.poll()
+ def handle_error(self):
+ if sys.exc_info()[0] == SystemExit:
+ raise sys.exc_info()
+ self.log("Error caught in asyncore",
+ level=logging.ERROR, exc_info=True)
+ self.close()
+
def setSessionKey(self, key):
if self.waiting_for_reply:
self.delay_sesskey = key
@@ -680,7 +659,7 @@
level=TRACE)
return self.marshal.encode(msgid, async, method, args)
- def send_call(self, method, args, async):
+ def send_call(self, method, args, async=False):
# send a message and return its msgid
msgid = self.__new_msgid()
if debug_zrpc:
@@ -690,39 +669,6 @@
self.message_output(buf)
return msgid
- def call(self, method, *args):
- if self.closed:
- raise DisconnectedError()
- msgid = self.send_call(method, args, 0)
- r_args = self.wait(msgid)
- if (isinstance(r_args, tuple) and len(r_args) > 1
- and type(r_args[0]) == exception_type_type
- and issubclass(r_args[0], Exception)):
- inst = r_args[1]
- raise inst # error raised by server
- else:
- return r_args
-
- # For testing purposes, it is useful to begin a synchronous call
- # but not block waiting for its response.
-
- def _deferred_call(self, method, *args):
- if self.closed:
- raise DisconnectedError()
- msgid = self.send_call(method, args, 0)
- self.trigger.pull_trigger()
- return msgid
-
- def _deferred_wait(self, msgid):
- r_args = self.wait(msgid)
- if (isinstance(r_args, tuple)
- and type(r_args[0]) == exception_type_type
- and issubclass(r_args[0], Exception)):
- inst = r_args[1]
- raise inst # error raised by server
- else:
- return r_args
-
def callAsync(self, method, *args):
if self.closed:
raise DisconnectedError()
@@ -750,33 +696,9 @@
yield self.__call_message(method, args, 1)
- def wait(self, msgid):
- """Invoke asyncore mainloop and wait for reply."""
- if debug_zrpc:
- self.log("wait(%d)" % msgid, level=TRACE)
+ def handle_reply(self, msgid, ret):
+ assert msgid == -1 and ret is None
- self.trigger.pull_trigger()
-
- # Delay used when we call asyncore.poll() directly.
- # Start with a 1 msec delay, double until 1 sec.
- delay = 0.001
-
- self.replies_cond.acquire()
- try:
- while 1:
- if self.closed:
- raise DisconnectedError()
- reply = self.replies.get(msgid, self)
- if reply is not self:
- del self.replies[msgid]
- if debug_zrpc:
- self.log("wait(%d): reply=%s" %
- (msgid, short_repr(reply)), level=TRACE)
- return reply
- self.replies_cond.wait()
- finally:
- self.replies_cond.release()
-
def flush(self):
"""Invoke poll() until the output buffer is empty."""
if debug_zrpc:
@@ -791,7 +713,6 @@
self.trigger.pull_trigger()
-
class ManagedServerConnection(Connection):
"""Server-side Connection subclass."""
@@ -818,6 +739,23 @@
self.obj.notifyDisconnected()
Connection.close(self)
+ def send_reply(self, msgid, ret):
+ # encode() can pass on a wide variety of exceptions from cPickle.
+ # While a bare `except` is generally poor practice, in this case
+ # it's acceptable -- we really do want to catch every exception
+ # cPickle may raise.
+ try:
+ msg = self.marshal.encode(msgid, 0, REPLY, ret)
+ except: # see above
+ try:
+ r = short_repr(ret)
+ except:
+ r = "<unreprable>"
+ err = ZRPCError("Couldn't pickle return %.100s" % r)
+ msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
+ self.message_output(msg)
+ self.poll()
+
class ManagedClientConnection(Connection):
"""Client-side Connection subclass."""
__super_init = Connection.__init__
@@ -887,3 +825,79 @@
self.queue_output = False
finally:
self.output_lock.release()
+
+ def call(self, method, *args):
+ if self.closed:
+ raise DisconnectedError()
+ msgid = self.send_call(method, args)
+ r_args = self.wait(msgid)
+ if (isinstance(r_args, tuple) and len(r_args) > 1
+ and type(r_args[0]) == exception_type_type
+ and issubclass(r_args[0], Exception)):
+ inst = r_args[1]
+ raise inst # error raised by server
+ else:
+ return r_args
+
+ def wait(self, msgid):
+ """Invoke asyncore mainloop and wait for reply."""
+ if debug_zrpc:
+ self.log("wait(%d)" % msgid, level=TRACE)
+
+ self.trigger.pull_trigger()
+
+ # Delay used when we call asyncore.poll() directly.
+ # Start with a 1 msec delay, double until 1 sec.
+ delay = 0.001
+
+ self.replies_cond.acquire()
+ try:
+ while 1:
+ if self.closed:
+ raise DisconnectedError()
+ reply = self.replies.get(msgid, self)
+ if reply is not self:
+ del self.replies[msgid]
+ if debug_zrpc:
+ self.log("wait(%d): reply=%s" %
+ (msgid, short_repr(reply)), level=TRACE)
+ return reply
+ self.replies_cond.wait()
+ finally:
+ self.replies_cond.release()
+
+ # For testing purposes, it is useful to begin a synchronous call
+ # but not block waiting for its response.
+
+ def _deferred_call(self, method, *args):
+ if self.closed:
+ raise DisconnectedError()
+ msgid = self.send_call(method, args)
+ self.trigger.pull_trigger()
+ return msgid
+
+ def _deferred_wait(self, msgid):
+ r_args = self.wait(msgid)
+ if (isinstance(r_args, tuple)
+ and type(r_args[0]) == exception_type_type
+ and issubclass(r_args[0], Exception)):
+ inst = r_args[1]
+ raise inst # error raised by server
+ else:
+ return r_args
+
+ def handle_reply(self, msgid, args):
+ if debug_zrpc:
+ self.log("recv reply: %s, %s"
+ % (msgid, short_repr(args)), level=TRACE)
+ self.replies_cond.acquire()
+ try:
+ self.replies[msgid] = args
+ self.replies_cond.notifyAll()
+ finally:
+ self.replies_cond.release()
+
+ def send_reply(self, msgid, ret):
+ # Whimper. Used to send heartbeat
+ assert msgid == -1 and ret is None
+ self.message_output('(J\xff\xff\xff\xffK\x00U\x06.replyNt.')
More information about the Zodb-checkins
mailing list