[Zodb-checkins] CVS: StandaloneZODB/ZEO/zrpc - connection.py:1.1.2.2
Jeremy Hylton
jeremy@zope.com
Wed, 16 Jan 2002 20:40:40 -0500
Update of /cvs-repository/StandaloneZODB/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv13774/zrpc
Modified Files:
Tag: Standby-branch
connection.py
Log Message:
Many small cleanups and improvements.
Get rid of Handler and set_caller() mechanism. It was unused.
Replace _do_io() with two specialized methods, one of which is only
called by _call(). This should have been more obvious because the
behavior of _do_io() depended entirely on whether the wait kwarg was
0 or 1.
Use short_repr() in conjunction with low-level log() calls.
Extend doc string on Connection.
Simplify error/logging for return val from async call.
Add a leading-underscore (_) check to check_method(), as suggested by Jim.
Extend ManagedServerConnection protocol to use notifyConnected().
This simplifies the newConnection() dance in StorageServer.
=== StandaloneZODB/ZEO/zrpc/connection.py 1.1.2.1 => 1.1.2.2 ===
from ZEO import smac # XXX put smac in zrpc?
from ZEO.zrpc.error import ZRPCError, DisconnectedError, DecodingError
-from ZEO.zrpc.log import log
+from ZEO.zrpc.log import log, short_repr
from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.trigger import trigger
import zLOG
@@ -15,19 +15,6 @@
REPLY = ".reply" # message name used for replies
ASYNC = 1
-# XXX get rid of this class and use hasattr()
-class Handler:
- """Base class used to handle RPC caller discovery"""
-
- def set_caller(self, addr):
- self.__caller = addr
-
- def get_caller(self):
- return self.__caller
-
- def clear_caller(self):
- self.__caller = None
-
class Delay:
"""Used to delay response to client for synchronous calls
@@ -44,7 +31,7 @@
self.send_reply(self.msgid, obj)
class Connection(smac.SizedMessageAsyncConnection):
- """Dispatcher for RPC on object
+ """Dispatcher for RPC on object on both sides of socket.
The connection supports synchronous calls, which expect a return,
and asynchronous calls that do not.
@@ -55,7 +42,11 @@
A Connection is designed for use in a multithreaded application,
where a synchronous call must block until a response is ready.
The current design only allows a single synchronous call to be
- outstanding.
+ outstanding.
+
+ A socket connection between a client and a server allows either
+ side to invoke methods on the other side. The processes on each
+ end of the socket use a Connection object to manage communication.
"""
__super_init = smac.SizedMessageAsyncConnection.__init__
@@ -63,7 +54,7 @@
__super_writable = smac.SizedMessageAsyncConnection.writable
def __init__(self, sock, addr, obj=None):
- self.obj = obj
+ self.obj = None
self.marshal = Marshaller()
self.closed = 0
self.msgid = 0
@@ -83,19 +74,12 @@
# waiting for a response
self.__reply_lock = threading.Lock()
self.__reply_lock.acquire()
- # If the object implements the Handler interface (XXX checked
- # by isinstance), it wants to know who the caller is.
- if isinstance(obj, Handler):
- self.set_caller = 1
- else:
- self.set_caller = 0
+ self.register_object(obj)
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.addr)
def close(self):
- caller = sys._getframe(1).f_code.co_name
- log("close() caller=%s" % caller)
if self.closed:
return
self.closed = 1
@@ -103,6 +87,7 @@
self.__super_close()
def close_trigger(self):
+ # overridden by ManagedConnection
if self.trigger is not None:
self.trigger.close()
@@ -122,7 +107,7 @@
if __debug__:
log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
- repr(args)[:40]),
+ short_repr(args)),
level=zLOG.DEBUG)
if name == REPLY:
self.handle_reply(msgid, flags, args)
@@ -137,28 +122,13 @@
self.__reply_lock.release() # will fail if lock is unlocked
def handle_request(self, msgid, flags, name, args):
- if __debug__:
- log("call %s%s on %s" % (name, repr(args)[:40], repr(self.obj)),
- zLOG.DEBUG)
if not self.check_method(name):
raise ZRPCError("Invalid method name: %s on %s" % (name,
`self.obj`))
meth = getattr(self.obj, name)
try:
- if self.set_caller:
- self.obj.set_caller(self)
- try:
- ret = meth(*args)
- finally:
- self.obj.clear_caller()
- else:
- ret = meth(*args)
- except (POSException.UndoError,
- POSException.VersionCommitError), msg:
- error = sys.exc_info()[:2]
- log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
- return self.return_error(msgid, flags, error[0], error[1])
+ ret = meth(*args)
except Exception, msg:
error = sys.exc_info()[:2]
log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
@@ -166,12 +136,11 @@
if flags & ASYNC:
if ret is not None:
- log("async method %s returned value %s" % (name, repr(ret)),
- zLOG.ERROR)
- raise ZRPCError("async method returned value")
+ raise ZRPCError("async method %s returned value %s" %
+ (name, repr(ret))
else:
if __debug__:
- log("%s return %s" % (name, repr(ret)[:40]), zLOG.DEBUG)
+ log("%s return %s" % (name, short_repr(ret)), zLOG.DEBUG)
if isinstance(ret, Delay):
ret.set_sender(msgid, self.send_reply)
else:
@@ -182,12 +151,12 @@
self.close()
def log_error(self, msg="No error message supplied"):
- error = sys.exc_info()
- log(msg, zLOG.ERROR, error=error)
- del error
+ log(msg, zLOG.ERROR, error=sys.exc_info())
def check_method(self, name):
- # XXX minimal security check should go here: Is name exported?
+ # XXX Is this sufficient "security" for now?
+ if name.startswith('_'):
+ return None
return hasattr(self.obj, name)
def send_reply(self, msgid, ret):
@@ -210,13 +179,10 @@
err = ZRPCError("Couldn't pickle error %s" % `err_value`)
msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
self.message_output(msg)
- self._do_io()
+ self._do_async_poll()
- # The next two methods are used by clients to invoke methods on
- # remote objects
-
- # XXX Should revise design to allow multiple outstanding
- # synchronous calls
+ # The next two public methods (call and callAsync) are used by
+ # clients to invoke methods on remote objects
def call(self, method, *args):
self.__call_lock.acquire()
@@ -234,10 +200,12 @@
log("send msg: %d, 0, %s, ..." % (msgid, method))
self.message_output(self.marshal.encode(msgid, 0, method, args))
+ # XXX implementation of promises starts here
+
self.__reply = None
- # lock is currently held
- self._do_io(wait=1)
- # lock is held again...
+ # reply lock is currently held
+ self._do_async_loop()
+ # reply lock is held again...
r_msgid, r_flags, r_args = self.__reply
self.__reply_lock.acquire()
assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
@@ -263,7 +231,10 @@
if __debug__:
log("send msg: %d, %d, %s, ..." % (msgid, ASYNC, method))
self.message_output(self.marshal.encode(msgid, ASYNC, method, args))
- self._do_io()
+ # XXX The message won't go out right away in this case. It
+ # will wait for the asyncore loop to get control again. Seems
+ # okay to comment out for now, but need to understand better.
+## self._do_async_poll()
# handle IO, possibly in async mode
@@ -274,8 +245,6 @@
# Connections to be leaked.
def set_async(self, map):
- # XXX do we need a lock around this? I'm not sure there is
- # any harm to a race with _do_io().
self.trigger = trigger()
self.thr_async = 1
@@ -284,41 +253,45 @@
return 1
else:
return 0
-
- def _do_io(self, wait=0): # XXX need better name
- # XXX invariant? lock must be held when calling with wait==1
- # otherwise, in non-async mode, there will be no poll
+ def _do_async_loop(self):
+ "Invoke asyncore mainloop and wait for reply."
if __debug__:
- log("_do_io(wait=%d), async=%d" % (wait, self.is_async()),
+ log("_do_async_loop() async=%d" % self.is_async(),
level=zLOG.DEBUG)
if self.is_async():
self.trigger.pull_trigger()
- if wait:
- self.__reply_lock.acquire()
- # wait until reply...
- self.__reply_lock.release()
+ self.__reply_lock.acquire()
+ # wait until reply...
else:
- if wait:
- # do loop only if lock is already acquired
- while not self.__reply_lock.acquire(0):
- asyncore.poll(10.0, self._map)
- if self.closed:
- raise DisconnectedError()
- self.__reply_lock.release()
- else:
- asyncore.poll(0.0, self._map)
+ # Do loop only if lock is already acquired. XXX But can't
+ # we already guarantee that the lock is already acquired?
+ while not self.__reply_lock.acquire(0):
+ asyncore.poll(10.0, self._map)
+ if self.closed:
+ raise DisconnectedError()
+ self.__reply_lock.release()
+
+ def _do_async_poll(self, wait_for_reply=0):
+ "Invoke asyncore mainloop to get pending message out."
- # XXX it seems that we need to release before returning if
- # called with wait==1. perhaps the caller need not acquire
- # upon return...
+ if __debug__:
+ log("_do_async_poll(), async=%d" % self.is_async(),
+ level=zLOG.DEBUG)
+ if self.is_async():
+ self.trigger.pull_trigger()
+ else:
+ asyncore.poll(0.0, self._map)
class ServerConnection(Connection):
- # XXX this is a hack
- def _do_io(self, wait=0):
+
+ def _do_async_poll(self, wait=0):
"""If this is a server, there is no explicit IO to do"""
pass
+ # XXX _do_async_loop is never called. Should it be defined as
+ # above anyway?
+
class ManagedServerConnection(ServerConnection):
"""A connection that notifies its ConnectionManager of closing"""
__super_init = Connection.__init__
@@ -327,6 +300,7 @@
def __init__(self, sock, addr, obj, mgr):
self.__mgr = mgr
self.__super_init(sock, addr, obj)
+ obj.notifyConnected(self)
def close(self):
self.__super_close()