[Zope-Checkins] CVS: ZODB3/ZEO/zrpc - client.py:1.24 connection.py:1.41 trigger.py:1.14

Jeremy Hylton jeremy@zope.com
Fri, 3 Jan 2003 17:08:15 -0500


Update of /cvs-repository/ZODB3/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv21352/ZEO/zrpc

Modified Files:
	client.py connection.py trigger.py 
Log Message:
Merge ZODB3-fast-restart-branch to the trunk


=== ZODB3/ZEO/zrpc/client.py 1.23 => 1.24 ===
--- ZODB3/ZEO/zrpc/client.py:1.23	Fri Dec 13 17:03:11 2002
+++ ZODB3/ZEO/zrpc/client.py	Fri Jan  3 17:07:41 2003
@@ -119,7 +119,7 @@
 
         # XXX need each connection started with async==0 to have a
         # callback
-        log("CM.set_async(%s)" % repr(map))
+        log("CM.set_async(%s)" % repr(map), level=zLOG.DEBUG)
         if not self.closed and self.trigger is None:
             log("CM.set_async(): first call")
             self.trigger = trigger()
@@ -294,6 +294,9 @@
             if success > 0:
                 break
             time.sleep(delay)
+            if self.mgr.is_connected():
+                log("CT: still trying to replace fallback connection",
+                    level=zLOG.INFO)
             delay = min(delay*2, self.tmax)
         log("CT: exiting thread: %s" % self.getName())
 


=== ZODB3/ZEO/zrpc/connection.py 1.40 => 1.41 ===
--- ZODB3/ZEO/zrpc/connection.py:1.40	Fri Dec 13 17:03:28 2002
+++ ZODB3/ZEO/zrpc/connection.py	Fri Jan  3 17:07:41 2003
@@ -21,7 +21,7 @@
 import ThreadedAsync
 from ZEO.zrpc import smac
 from ZEO.zrpc.error import ZRPCError, DisconnectedError
-from ZEO.zrpc.log import log, short_repr
+from ZEO.zrpc.log import short_repr, log
 from ZEO.zrpc.marshal import Marshaller
 from ZEO.zrpc.trigger import trigger
 import zLOG
@@ -115,13 +115,32 @@
     __super_init = smac.SizedMessageAsyncConnection.__init__
     __super_close = smac.SizedMessageAsyncConnection.close
 
-    protocol_version = "Z200"
+    # Protocol variables:
+    #
+    # oldest_protocol_version -- the oldest protocol version we support
+    # protocol_version -- the newest protocol version we support; preferred
+
+    oldest_protocol_version = "Z200"
+    protocol_version = "Z201"
+
+    # Protocol history:
+    #
+    # Z200 -- original ZEO 2.0 protocol
+    #
+    # Z201 -- added invalidateTransaction() to client;
+    #         renamed several client methods;
+    #         added lastTransaction() to server
 
     def __init__(self, sock, addr, obj=None):
         self.obj = None
         self.marshal = Marshaller()
         self.closed = 0
         self.msgid = 0
+        self.peer_protocol_version = None # Set in recv_handshake()
+        if isinstance(addr, types.TupleType):
+            self.log_label = "zrpc-conn:%s:%d" % addr
+        else:
+            self.log_label = "zrpc-conn:%s" % addr
         self.__super_init(sock, addr)
         # A Connection either uses asyncore directly or relies on an
         # asyncore mainloop running in a separate thread.  If
@@ -147,6 +166,9 @@
 
     __str__ = __repr__ # Defeat asyncore's dreaded __getattr__
 
+    def log(self, message, level=zLOG.BLATHER, error=None):
+        zLOG.LOG(self.log_label, level, message, error=error)
+
     def close(self):
         if self.closed:
             return
@@ -156,7 +178,7 @@
         self.__super_close()
 
     def close_trigger(self):
-        # overridden by ManagedConnection
+        # Overridden by ManagedConnection
         if self.trigger is not None:
             self.trigger.close()
 
@@ -164,7 +186,9 @@
         """Register obj as the true object to invoke methods on"""
         self.obj = obj
 
-    def handshake(self):
+    def handshake(self, proto=None):
+        # Overridden by ManagedConnection
+
         # When a connection is created the first message sent is a
         # 4-byte protocol version.  This mechanism should allow the
         # protocol to evolve over time, and let servers handle clients
@@ -174,17 +198,18 @@
         # first message received.
 
         # The client sends the protocol version it is using.
-        self._message_input = self.message_input
         self.message_input = self.recv_handshake
-        self.message_output(self.protocol_version)
+        self.message_output(proto or self.protocol_version)
 
-    def recv_handshake(self, message):
-        if message == self.protocol_version:
-            self.message_input = self._message_input
+    def recv_handshake(self, proto):
+        # Extended by ManagedConnection
+        del self.message_input
+        self.peer_protocol_version = proto
+        if self.oldest_protocol_version <= proto <= self.protocol_version:
+            self.log("received handshake %r" % proto, level=zLOG.INFO)
         else:
-            log("recv_handshake: bad handshake %s" % short_repr(message),
-                level=zLOG.ERROR)
-        # otherwise do something else...
+            self.log("bad handshake %s" % short_repr(proto), level=zLOG.ERROR)
+            raise ZRPCError("bad handshake %r" % proto)
 
     def message_input(self, message):
         """Decoding an incoming message and dispatch it"""
@@ -195,9 +220,9 @@
         msgid, flags, name, args = self.marshal.decode(message)
 
         if __debug__:
-            log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
-                                              short_repr(args)),
-                level=zLOG.TRACE)
+            self.log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
+                                                   short_repr(args)),
+                     level=zLOG.TRACE)
         if name == REPLY:
             self.handle_reply(msgid, flags, args)
         else:
@@ -205,8 +230,8 @@
 
     def handle_reply(self, msgid, flags, args):
         if __debug__:
-            log("recv reply: %s, %s, %s" % (msgid, flags, short_repr(args)),
-                level=zLOG.DEBUG)
+            self.log("recv reply: %s, %s, %s"
+                     % (msgid, flags, short_repr(args)), level=zLOG.DEBUG)
         self.replies_cond.acquire()
         try:
             self.replies[msgid] = flags, args
@@ -219,7 +244,8 @@
             msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
             raise ZRPCError(msg)
         if __debug__:
-            log("calling %s%s" % (name, short_repr(args)), level=zLOG.BLATHER)
+            self.log("calling %s%s" % (name, short_repr(args)),
+                     level=zLOG.BLATHER)
 
         meth = getattr(self.obj, name)
         try:
@@ -228,8 +254,8 @@
             raise
         except Exception, msg:
             error = sys.exc_info()
-            log("%s() raised exception: %s" % (name, msg), zLOG.INFO,
-                error=error)
+            self.log("%s() raised exception: %s" % (name, msg), zLOG.INFO,
+                     error=error)
             error = error[:2]
             return self.return_error(msgid, flags, *error)
 
@@ -239,7 +265,7 @@
                                 (name, short_repr(ret)))
         else:
             if __debug__:
-                log("%s returns %s" % (name, short_repr(ret)), zLOG.DEBUG)
+                self.log("%s returns %s" % (name, short_repr(ret)), zLOG.DEBUG)
             if isinstance(ret, Delay):
                 ret.set_sender(msgid, self.send_reply, self.return_error)
             else:
@@ -252,7 +278,7 @@
         self.close()
 
     def log_error(self, msg="No error message supplied"):
-        log(msg, zLOG.ERROR, error=sys.exc_info())
+        self.log(msg, zLOG.ERROR, error=sys.exc_info())
 
     def check_method(self, name):
         # XXX Is this sufficient "security" for now?
@@ -304,8 +330,8 @@
         finally:
             self.msgid_lock.release()
         if __debug__:
-            log("send msg: %d, %d, %s, ..." % (msgid, flags, method),
-                zLOG.TRACE)
+            self.log("send msg: %d, %d, %s, ..." % (msgid, flags, method),
+                     zLOG.TRACE)
         buf = self.marshal.encode(msgid, flags, method, args)
         self.message_output(buf)
         return msgid
@@ -342,7 +368,7 @@
         self.thr_async = 1
 
     def is_async(self):
-        # overridden for ManagedConnection
+        # Overridden by ManagedConnection
         if self.thr_async:
             return 1
         else:
@@ -360,8 +386,8 @@
     def wait(self, msgid):
         """Invoke asyncore mainloop and wait for reply."""
         if __debug__:
-            log("wait(%d), async=%d" % (msgid, self.is_async()),
-                level=zLOG.TRACE)
+            self.log("wait(%d), async=%d" % (msgid, self.is_async()),
+                     level=zLOG.TRACE)
         if self.is_async():
             self._pull_trigger()
 
@@ -378,8 +404,8 @@
                 if reply is not None:
                     del self.replies[msgid]
                     if __debug__:
-                        log("wait(%d): reply=%s" % (msgid, short_repr(reply)),
-                            level=zLOG.DEBUG)
+                        self.log("wait(%d): reply=%s" %
+                                 (msgid, short_repr(reply)), level=zLOG.DEBUG)
                     return reply
                 if self.is_async():
                     self.replies_cond.wait(10.0)
@@ -388,14 +414,14 @@
                     try:
                         try:
                             if __debug__:
-                                log("wait(%d): asyncore.poll(%s)" %
-                                    (msgid, delay), level=zLOG.TRACE)
+                                self.log("wait(%d): asyncore.poll(%s)" %
+                                         (msgid, delay), level=zLOG.TRACE)
                             asyncore.poll(delay, self._map)
                             if delay < 1.0:
                                 delay += delay
                         except select.error, err:
-                            log("Closing.  asyncore.poll() raised %s." % err,
-                                level=zLOG.BLATHER)
+                            self.log("Closing.  asyncore.poll() raised %s."
+                                     % err, level=zLOG.BLATHER)
                             self.close()
                     finally:
                         self.replies_cond.acquire()
@@ -405,7 +431,7 @@
     def poll(self):
         """Invoke asyncore mainloop to get pending message out."""
         if __debug__:
-            log("poll(), async=%d" % self.is_async(), level=zLOG.TRACE)
+            self.log("poll(), async=%d" % self.is_async(), level=zLOG.TRACE)
         if self.is_async():
             self._pull_trigger()
         else:
@@ -414,7 +440,7 @@
     def pending(self):
         """Invoke mainloop until any pending messages are handled."""
         if __debug__:
-            log("pending(), async=%d" % self.is_async(), level=zLOG.TRACE)
+            self.log("pending(), async=%d" % self.is_async(), level=zLOG.TRACE)
         if self.is_async():
             return
         # Inline the asyncore poll() function to know whether any input
@@ -464,6 +490,64 @@
         self.mgr = mgr
         self.__super_init(sock, addr, obj)
         self.check_mgr_async()
+
+    # PROTOCOL NEGOTIATION:
+    #
+    # The code implementing protocol version 2.0.0 (which is deployed
+    # in the field and cannot be changed) *only* talks to peers that
+    # send a handshake indicating protocol version 2.0.0.  In that
+    # version, both the client and the server immediately send out
+    # their protocol handshake when a connection is established,
+    # without waiting for their peer, and disconnect when a different
+    # handshake is receive.
+    #
+    # The new protocol uses this to enable new clients to talk to
+    # 2.0.0 servers: in the new protocol, the client waits until it
+    # receives the server's protocol handshake before sending its own
+    # handshake.  The client sends the lower of its own protocol
+    # version and the server protocol version, allowing it to talk to
+    # servers using later protocol versions (2.0.2 and higher) as
+    # well: the effective protocol used will be the lower of the
+    # client and server protocol.
+    #
+    # The ZEO modules ClientStorage and ServerStub have backwards
+    # compatibility code for dealing with the previous version of the
+    # protocol.  The client accept the old version of some messages,
+    # and will not send new messages when talking to an old server.
+    #
+    # As long as the client hasn't sent its handshake, it can't send
+    # anything else; output messages are queued during this time.
+    # (Output can happen because the connection testing machinery can
+    # start sending requests before the handshake is received.)
+    #
+    # UPGRADING FROM ZEO 2.0.0 TO NEWER VERSIONS:
+    #
+    # Because a new client can talk to an old server, but not vice
+    # versa, all clients should be upgraded before upgrading any
+    # servers.  Protocol upgrades beyond 2.0.1 will not have this
+    # restriction, because clients using protocol 2.0.1 or later can
+    # talk to both older and newer servers.
+    #
+    # No compatibility with protocol version 1 is provided.
+
+    def handshake(self):
+        self.message_input = self.recv_handshake
+        self.message_output = self.queue_output
+        self.output_queue = []
+        # The handshake is sent by recv_handshake() below
+
+    def queue_output(self, message):
+        self.output_queue.append(message)
+
+    def recv_handshake(self, proto):
+        del self.message_output
+        proto = min(proto, self.protocol_version)
+        Connection.recv_handshake(self, proto) # Raise error if wrong proto
+        self.message_output(proto)
+        queue = self.output_queue
+        del self.output_queue
+        for message in queue:
+            self.message_output(message)
 
     # Defer the ThreadedAsync work to the manager.
 


=== ZODB3/ZEO/zrpc/trigger.py 1.13 => 1.14 ===
--- ZODB3/ZEO/zrpc/trigger.py:1.13	Fri Dec 13 17:03:41 2002
+++ ZODB3/ZEO/zrpc/trigger.py	Fri Jan  3 17:07:41 2003
@@ -16,6 +16,7 @@
 import os
 import socket
 import thread
+import errno
 
 if os.name == 'posix':