[Zodb-checkins] CVS: Packages/ZEO - zrpc2.py:1.1.2.5
jeremy@digicool.com
jeremy@digicool.com
Fri, 30 Mar 2001 16:19:37 -0500 (EST)
Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv14682
Modified Files:
Tag: ZEO-ZRPC-Dev
zrpc2.py
Log Message:
XXX Update zrpc2 to work with Zope.
Still doesn't work quite right.
Change default log() output to zLOG.BLATHER.
Define DisconnectedError. Raise it if call() or callAsync() is called
when the connection is closed.
DebugLock:
replace builtin object id with simple counter
print the thread id too
Move async handling, including the ThreadedAsync callback, into methods
that can be overridden by ManagedConnection. A ManagedConnection uses
the trigger of its manager.
handle_error() -- print the actual exception too
Add __reply_lock.release() in _do_io() for the async==1, wait==1 case.
I don't think this is right, but it works for now.
--- Updated File zrpc2.py in package Packages/ZEO --
--- zrpc2.py 2001/03/30 04:47:30 1.1.2.4
+++ zrpc2.py 2001/03/30 21:19:36 1.1.2.5
@@ -34,15 +34,18 @@
REPLY = ".reply" # message name used for replies
ASYNC = 1
-def log(message, level=zLOG.INFO, label="zrpc:%s" % os.getpid()):
+def log(message, level=zLOG.BLATHER, label="zrpc:%s" % os.getpid()):
zLOG.LOG(label, level, message)
-class ZRPCError(Exception):
+class ZRPCError(POSException.StorageError):
pass
class DecodingError(ZRPCError):
"""A ZRPC message could not be decoded."""
+class DisconnectedError(ZRPCError):
+ """The database storage is disconnected from the storage server."""
+
# Export the mainloop function from asycnore to zrpc clients
loop = asyncore.loop
@@ -84,7 +87,8 @@
try:
msgid, flags, name, args = unpickler.load()
- except (cPickle.UnpicklingError, IndexError), msg:
+ except (cPickle.UnpicklingError, IndexError), err_msg:
+ log("can't decode %s" % repr(msg), level=zLOG.ERROR)
raise DecodingError(msg)
return msgid, flags, name, args
@@ -104,18 +108,26 @@
self.send_reply(self.msgid, obj)
class DebugLock:
+
+ __locks = 0
+
def __init__(self):
self.lock = thread.allocate_lock()
+ # XXX this actually needs to be locked too
+ self.__lock_id = self.__locks
+ self.__locks += 1
def _debug(self):
method = sys._getframe().f_back
caller = method.f_back
filename = os.path.split(caller.f_code.co_filename)[1]
- log("LOCK %s: %s called by %s, %s, line %s" % (id(self.lock),
- method.f_code.co_name,
- caller.f_code.co_name,
- filename,
- caller.f_lineno))
+ log("LOCK %s (tid %s): " \
+ "%s called by %s, %s, line %s" % (self.__lock_id,
+ thread.get_ident(),
+ method.f_code.co_name,
+ caller.f_code.co_name,
+ filename,
+ caller.f_lineno))
def acquire(self, wait=None):
self._debug()
@@ -127,6 +139,10 @@
def release(self):
self._debug()
return self.lock.release()
+
+ def locked(self):
+ self._debug()
+ return self.lock.locked()
class Connection(smac.SizedMessageAsyncConnection):
"""Dispatcher for RPC on object
@@ -153,12 +169,11 @@
self.async = 0
# The reply lock is used to block when a synchronous call is
# waiting for a response
- self.__reply_lock = thread.allocate_lock()
- self.__reply_lock.acquire()
- # The async mode lock is used to prevent a race between the
- # write of self.async by set_async() and its use in _do_io()
- self.__async_mode_lock = thread.allocate_lock()
self.__super_init(sock, addr)
+ self._prepare_async()
+ self.__reply_lock = DebugLock()
+## self.__reply_lock = thread.allocate_lock()
+ self.__reply_lock.acquire()
if isinstance(obj, Handler):
self.set_caller = 1
else:
@@ -195,7 +210,7 @@
self.__reply_lock.release() # will fail if lock is unlocked
def handle_request(self, msgid, flags, name, args):
- log("%s%s" % (name, repr(args)[:40]), zLOG.BLATHER)
+ log("%s%s" % (name, repr(args)[:40]), zLOG.TRACE)
if not self.check_method(name):
raise ZRPCError("Invalid method name: %s" % name)
@@ -221,7 +236,7 @@
if ret is not None:
raise ZRPCError("async method returned value")
else:
- log("%s reply %s" % (name, repr(ret)[:40]), zLOG.BLATHER)
+ log("%s reply %s" % (name, repr(ret)[:40]), zLOG.TRACE)
if isinstance(ret, Delay):
ret.set_sender(msgid, self.send_reply)
else:
@@ -229,6 +244,7 @@
def handle_error(self):
t, v, tb = sys.exc_info()
+ print t, v
traceback.print_tb(tb)
def check_method(self, name):
@@ -287,19 +303,24 @@
# The next two methods are used by clients to invoke methods on
# remote objects
- # XXX these two methods should raise an nice exception if there
- # are called after the connection is closed
+ # XXX Should revise design to allow multiple outstanding
+ # synchronous calls
def call(self, method, *args):
+ if self.closed:
+ raise DisconnectedError("This action is temporarily unavailable")
msgid = self.msgid
self.msgid += 1
log("call %s %s" % (msgid, method))
self.message_output(self.marshal.encode(msgid, 0, method, args))
self.__reply = None
+ # lock is currently held
self._do_io(wait=1)
+ # lock is held again...
r_msgid, r_flags, r_args = self.__reply
self.__reply_lock.acquire()
+ log("call acquired lock")
assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
if type(r_args) == types.TupleType \
@@ -311,6 +332,8 @@
return r_args
def callAsync(self, method, *args):
+ if self.closed:
+ raise DisconnectedError("This action is temporarily unavailable")
msgid = self.msgid
self.msgid += 1
log("async %s %s" % (msgid, method))
@@ -319,16 +342,33 @@
# handle IO, possibly in async mode
+ def _prepare_async(self):
+ self._async = 0
+ ThreadedAsync.register_loop_callback(self.set_async)
+ # XXX If we are not in async mode, this will cause dead
+ # Connections to be leaked.
+
+ def set_async(self, map):
+ # XXX do we need a lock around this? I'm not sure there is
+ # any harm to a race with _do_io().
+ self._async = 1
+ self.trigger = trigger.trigger()
+
+ def is_async(self):
+ return self._async
+
def _do_io(self, wait=0): # XXX need better name
# XXX invariant? lock must be held when calling with wait==1
# otherwise, in non-async mode, there will be no poll
- log("_do_io(wait=%d), async=%d" % (wait, self.async),
- level=zLOG.BLATHER)
- if self.async:
+ log("_do_io(wait=%d), async=%d" % (wait, self.is_async()),
+ level=zLOG.TRACE)
+ if self.is_async():
self.trigger.pull_trigger()
if wait:
self.__reply_lock.acquire()
+ # wait until reply...
+ self.__reply_lock.release()
else:
if wait:
# do loop only if lock is already acquired
@@ -338,7 +378,12 @@
else:
asyncore.poll(0.0, self)
+ # XXX it seems that we need to release before returning if
+ # called with wait==1. perhaps the caller need not acquire
+ # upon return...
+
class ServerConnection(Connection):
+ # XXX this is a hack
def _do_io(self, wait=0):
"""If this is a server, there is no explicit IO to do"""
pass
@@ -346,6 +391,9 @@
class ConnectionManager:
"""Keeps a connection up over time"""
+ # XXX requires that obj implement notifyConnected and
+ # notifyDisconnected. make this optional?
+
def __init__(self, addr, obj=None, debug=1, tmin=None, tmax=None):
self.addr = addr
self.obj = obj
@@ -354,29 +402,34 @@
self.debug = debug
self.connected = 0
self._thread = None
- self._callback = None
+ self._connect_lock = threading.Lock()
+ self.trigger = None
+ self.async = 0
ThreadedAsync.register_loop_callback(self.set_async)
def register_object(self, obj):
self.obj = obj
- def set_async(self):
+ def set_async(self, map):
+ # XXX need each connection started with async==0 to have a callback
self.async = 1 # XXX needs to be set on the Connection
self.trigger = trigger.trigger()
- def connect(self, sync=0, callback=None):
+ def connect(self, sync=0):
if self.connected == 1:
return
- self._callback = callback
- # XXX probably need lock around self._thread
- if self._thread is None:
- self._thread = threading.Thread(target=self.__connect, args=(1,))
- self._thread.start()
+ self._connect_lock.acquire()
+ try:
+ if self._thread is None:
+ self._thread = threading.Thread(target=self.__connect,
+ args=(1,))
+ self._thread.start()
+ finally:
+ self._connect_lock.release()
if sync:
self._thread.join()
- def attempt_connect(self, callback=None):
- self._callback = callback
+ def attempt_connect(self):
self.__connect(repeat=0)
return self.connected
@@ -402,12 +455,9 @@
log("Connected to server", level=zLOG.DEBUG)
self.connected = 1
if self.connected:
- # XXX how do we get here with s being defined?
c = ManagedConnection(s, self.addr, self.obj, self)
log("Connection created: %s" % c)
- log("callback = %s" % self._callback)
- if self._callback:
- self._callback(c)
+ self.obj.notifyConnected(c)
self.__thread = None
def _wait(self, t):
@@ -419,7 +469,7 @@
def closed(self, conn):
self.connected = 0
- # perhaps call ClientStorage(XXX) and tell it we closed
+ self.obj.notifyDisconnected(None)
self.connect()
class ManagedServerConnection(ServerConnection):
@@ -436,13 +486,35 @@
self.__mgr.closed(self)
class ManagedConnection(Connection):
- """A connection that notifies its ConnectionManager of closing"""
+ """A connection that notifies its ConnectionManager of closing.
+
+ A managed connection also defers the ThreadedAsync work to its
+ manager.
+ """
__super_init = Connection.__init__
__super_close = Connection.close
def __init__(self, sock, addr, obj, mgr, pickle=None):
self.__mgr = mgr
+ if self.__mgr.async:
+ self.__async = 1
+ self.trigger = self.__mgr.trigger
+ else:
+ self.__async = None
self.__super_init(sock, addr, obj, pickle)
+
+ def _prepare_async(self):
+ # Don't do the register_loop_callback that the superclass does
+ pass
+
+ def is_async(self):
+ if self.__async:
+ return 1
+ async = self.__mgr.async
+ if async:
+ self.__async = 1
+ self.trigger = self.__mgr.trigger
+ return async
def close(self):
self.__super_close()