[Zodb-checkins] CVS: Packages/ZEO - zrpc2.py:1.1.2.5

jeremy@digicool.com jeremy@digicool.com
Fri, 30 Mar 2001 16:19:37 -0500 (EST)


Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv14682

Modified Files:
      Tag: ZEO-ZRPC-Dev
	zrpc2.py 
Log Message:
XXX Update zrpc2 to work with Zope.  

Still doesn't work quite right.

Change default log() output to zLOG.BLATHER.
Define DisconnectedError.  Raise it if call() or callAsync() is called
    when the connection is closed.

DebugLock: 
    replace builtin object id with simple counter
    print the thread id too

Move async handling, including the ThreadedAsync callback, into methods
that can be overridden by ManagedConnection.  A ManagedConnection uses
the trigger of its manager.

handle_error() -- print the actual exception too

Add __reply_lock.release() in _do_io() for the async==1, wait==1 case.
I don't think this is right, but it works for now.



--- Updated File zrpc2.py in package Packages/ZEO --
--- zrpc2.py	2001/03/30 04:47:30	1.1.2.4
+++ zrpc2.py	2001/03/30 21:19:36	1.1.2.5
@@ -34,15 +34,18 @@
 REPLY = ".reply" # message name used for replies
 ASYNC = 1
 
-def log(message, level=zLOG.INFO, label="zrpc:%s" % os.getpid()):
+def log(message, level=zLOG.BLATHER, label="zrpc:%s" % os.getpid()):
     zLOG.LOG(label, level, message)
 
-class ZRPCError(Exception):
+class ZRPCError(POSException.StorageError):
     pass
 
 class DecodingError(ZRPCError):
     """A ZRPC message could not be decoded."""
 
+class DisconnectedError(ZRPCError):
+    """The database storage is disconnected from the storage server."""
+
 # Export the mainloop function from asycnore to zrpc clients
 loop = asyncore.loop
 
@@ -84,7 +87,8 @@
 
         try:
             msgid, flags, name, args = unpickler.load()
-        except (cPickle.UnpicklingError, IndexError), msg:
+        except (cPickle.UnpicklingError, IndexError), err_msg:
+            log("can't decode %s" % repr(msg), level=zLOG.ERROR)
             raise DecodingError(msg)
         return msgid, flags, name, args
 
@@ -104,18 +108,26 @@
         self.send_reply(self.msgid, obj)
 
 class DebugLock:
+
+    __locks = 0
+    
     def __init__(self):
         self.lock = thread.allocate_lock()
+        # XXX this actually needs to be locked too
+        self.__lock_id = self.__locks
+        self.__locks += 1
 
     def _debug(self):
         method = sys._getframe().f_back
         caller = method.f_back
         filename = os.path.split(caller.f_code.co_filename)[1]
-        log("LOCK %s: %s called by %s, %s, line %s" % (id(self.lock),
-                                                         method.f_code.co_name,
-                                                         caller.f_code.co_name,
-                                                         filename,
-                                                         caller.f_lineno))
+        log("LOCK %s (tid %s): " \
+            "%s called by %s, %s, line %s" % (self.__lock_id,
+                                              thread.get_ident(),
+                                              method.f_code.co_name,
+                                              caller.f_code.co_name,
+                                              filename,
+                                              caller.f_lineno))
 
     def acquire(self, wait=None):
         self._debug()
@@ -127,6 +139,10 @@
     def release(self):
         self._debug()
         return self.lock.release()
+
+    def locked(self):
+        self._debug()
+        return self.lock.locked()
     
 class Connection(smac.SizedMessageAsyncConnection):
     """Dispatcher for RPC on object
@@ -153,12 +169,11 @@
         self.async = 0
         # The reply lock is used to block when a synchronous call is
         # waiting for a response
-        self.__reply_lock = thread.allocate_lock()
-        self.__reply_lock.acquire()
-        # The async mode lock is used to prevent a race between the
-        # write of self.async by set_async() and its use in _do_io()
-        self.__async_mode_lock = thread.allocate_lock()
         self.__super_init(sock, addr)
+        self._prepare_async()
+        self.__reply_lock = DebugLock()
+##        self.__reply_lock = thread.allocate_lock()
+        self.__reply_lock.acquire()
         if isinstance(obj, Handler):
             self.set_caller = 1
         else:
@@ -195,7 +210,7 @@
         self.__reply_lock.release() # will fail if lock is unlocked
 
     def handle_request(self, msgid, flags, name, args):
-        log("%s%s" % (name, repr(args)[:40]), zLOG.BLATHER)
+        log("%s%s" % (name, repr(args)[:40]), zLOG.TRACE)
         if not self.check_method(name):
             raise ZRPCError("Invalid method name: %s" % name)
 
@@ -221,7 +236,7 @@
             if ret is not None:
                 raise ZRPCError("async method returned value")
         else:
-            log("%s reply %s" % (name, repr(ret)[:40]), zLOG.BLATHER)
+            log("%s reply %s" % (name, repr(ret)[:40]), zLOG.TRACE)
             if isinstance(ret, Delay):
                 ret.set_sender(msgid, self.send_reply)
             else:
@@ -229,6 +244,7 @@
 
     def handle_error(self):
         t, v, tb = sys.exc_info()
+        print t, v
         traceback.print_tb(tb)
 
     def check_method(self, name):
@@ -287,19 +303,24 @@
     # The next two methods are used by clients to invoke methods on
     # remote objects  
 
-    # XXX these two methods should raise an nice exception if there
-    # are called after the connection is closed
+    # XXX Should revise design to allow multiple outstanding
+    # synchronous calls
 
     def call(self, method, *args):
+        if self.closed:
+            raise DisconnectedError("This action is temporarily unavailable")
         msgid = self.msgid
         self.msgid += 1
         log("call %s %s" % (msgid, method))
         self.message_output(self.marshal.encode(msgid, 0, method, args))
 
         self.__reply = None
+        # lock is currently held
         self._do_io(wait=1)
+        # lock is held again...
         r_msgid, r_flags, r_args = self.__reply
         self.__reply_lock.acquire()
+        log("call acquired lock")
         assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
 
         if type(r_args) == types.TupleType \
@@ -311,6 +332,8 @@
         return r_args
 
     def callAsync(self, method, *args):
+        if self.closed:
+            raise DisconnectedError("This action is temporarily unavailable")
         msgid = self.msgid
         self.msgid += 1
         log("async %s %s" % (msgid, method))
@@ -319,16 +342,33 @@
 
     # handle IO, possibly in async mode
 
+    def _prepare_async(self):
+        self._async = 0
+        ThreadedAsync.register_loop_callback(self.set_async)
+        # XXX If we are not in async mode, this will cause dead
+        # Connections to be leaked.
+
+    def set_async(self, map):
+        # XXX do we need a lock around this?  I'm not sure there is
+        # any harm to a race with _do_io().
+        self._async = 1
+        self.trigger = trigger.trigger()
+
+    def is_async(self):
+        return self._async
+            
     def _do_io(self, wait=0): # XXX need better name
         # XXX invariant? lock must be held when calling with wait==1
         # otherwise, in non-async mode, there will be no poll
         
-        log("_do_io(wait=%d), async=%d" % (wait, self.async),
-            level=zLOG.BLATHER)
-        if self.async:
+        log("_do_io(wait=%d), async=%d" % (wait, self.is_async()),
+            level=zLOG.TRACE)
+        if self.is_async():
             self.trigger.pull_trigger()
             if wait:
                 self.__reply_lock.acquire()
+                # wait until reply...
+                self.__reply_lock.release()
         else:
             if wait:
                 # do loop only if lock is already acquired
@@ -338,7 +378,12 @@
             else:
                 asyncore.poll(0.0, self)
 
+        # XXX it seems that we need to release before returning if
+        # called with wait==1.  perhaps the caller need not acquire
+        # upon return...
+
 class ServerConnection(Connection):
+    # XXX this is a hack
     def _do_io(self, wait=0):
         """If this is a server, there is no explicit IO to do"""
         pass
@@ -346,6 +391,9 @@
 class ConnectionManager:
     """Keeps a connection up over time"""
 
+    # XXX requires that obj implement notifyConnected and
+    # notifyDisconnected.   make this optional?
+
     def __init__(self, addr, obj=None, debug=1, tmin=None, tmax=None):
         self.addr = addr
         self.obj = obj
@@ -354,29 +402,34 @@
         self.debug = debug
         self.connected = 0
         self._thread = None
-        self._callback = None
+        self._connect_lock = threading.Lock()
+        self.trigger = None
+        self.async = 0
         ThreadedAsync.register_loop_callback(self.set_async)
 
     def register_object(self, obj):
         self.obj = obj
 
-    def set_async(self):
+    def set_async(self, map):
+        # XXX need each connection started with async==0 to have a callback
         self.async = 1 # XXX needs to be set on the Connection
         self.trigger = trigger.trigger()
 
-    def connect(self, sync=0, callback=None):
+    def connect(self, sync=0):
         if self.connected == 1:
             return
-        self._callback = callback
-        # XXX probably need lock around self._thread
-        if self._thread is None:
-            self._thread = threading.Thread(target=self.__connect, args=(1,))
-            self._thread.start()
+        self._connect_lock.acquire()
+        try:
+            if self._thread is None:
+                self._thread = threading.Thread(target=self.__connect,
+                                                args=(1,))
+                self._thread.start()
+        finally:
+            self._connect_lock.release()
         if sync:
             self._thread.join()
 
-    def attempt_connect(self, callback=None):
-        self._callback = callback
+    def attempt_connect(self):
         self.__connect(repeat=0)
         return self.connected
 
@@ -402,12 +455,9 @@
                     log("Connected to server", level=zLOG.DEBUG)
                 self.connected = 1
         if self.connected:
-            # XXX how do we get here with s being defined?
             c = ManagedConnection(s, self.addr, self.obj, self)
             log("Connection created: %s" % c)
-            log("callback = %s" % self._callback)
-            if self._callback:
-                self._callback(c)
+            self.obj.notifyConnected(c)
         self.__thread = None
 
     def _wait(self, t):
@@ -419,7 +469,7 @@
 
     def closed(self, conn):
         self.connected = 0
-        # perhaps call ClientStorage(XXX) and tell it we closed
+        self.obj.notifyDisconnected(None)
         self.connect()
 
 class ManagedServerConnection(ServerConnection):
@@ -436,13 +486,35 @@
         self.__mgr.closed(self)
 
 class ManagedConnection(Connection):
-    """A connection that notifies its ConnectionManager of closing"""
+    """A connection that notifies its ConnectionManager of closing.
+
+    A managed connection also defers the ThreadedAsync work to its
+    manager. 
+    """
     __super_init = Connection.__init__
     __super_close = Connection.close
 
     def __init__(self, sock, addr, obj, mgr, pickle=None):
         self.__mgr = mgr
+        if self.__mgr.async:
+            self.__async = 1
+            self.trigger = self.__mgr.trigger
+        else:
+            self.__async = None
         self.__super_init(sock, addr, obj, pickle)
+
+    def _prepare_async(self):
+        # Don't do the register_loop_callback that the superclass does
+        pass
+
+    def is_async(self):
+        if self.__async:
+            return 1
+        async = self.__mgr.async
+        if async:
+            self.__async = 1
+            self.trigger = self.__mgr.trigger
+        return async
 
     def close(self):
         self.__super_close()