[Zodb-checkins] CVS: Zope3/src/zodb/zeo/zrpc - interfaces.py:1.2 connection.py:1.3 marshal.py:1.3 smac.py:1.3 error.py:NONE

Jeremy Hylton jeremy@zope.com
Tue, 25 Feb 2003 13:55:07 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo/zrpc
In directory cvs.zope.org:/tmp/cvs-serv23205/src/zodb/zeo/zrpc

Modified Files:
	connection.py marshal.py smac.py 
Added Files:
	interfaces.py 
Removed Files:
	error.py 
Log Message:
Merge the ZODB3-2-integration branch to the trunk.

The primary changes here are to port the many new ZEO features from
ZODB 3.2.  There are also many changes to the test suite.

python2.2 tells me: Ran 3755 tests in 560.277s


=== Zope3/src/zodb/zeo/zrpc/interfaces.py 1.1 => 1.2 ===
--- /dev/null	Tue Feb 25 13:55:07 2003
+++ Zope3/src/zodb/zeo/zrpc/interfaces.py	Tue Feb 25 13:55:06 2003
@@ -0,0 +1,27 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+from zodb.storage.interfaces import StorageError
+from zodb.zeo.interfaces import ClientDisconnected
+
+class ZRPCError(StorageError):
+    pass
+
+class DisconnectedError(ZRPCError, ClientDisconnected):
+    """The database storage is disconnected from the storage server.
+
+    The error occurred because of a problem in the low-level RPC connection,
+    or because the connection was closed.
+    """
+
+    # This subclass is raised when zrpc catches the error.


=== Zope3/src/zodb/zeo/zrpc/connection.py 1.2 => 1.3 ===
--- Zope3/src/zodb/zeo/zrpc/connection.py:1.2	Wed Dec 25 09:12:23 2002
+++ Zope3/src/zodb/zeo/zrpc/connection.py	Tue Feb 25 13:55:06 2003
@@ -21,7 +21,7 @@
 
 from zodb.zeo import threadedasync
 from zodb.zeo.zrpc import smac
-from zodb.zeo.zrpc.error import ZRPCError, DisconnectedError
+from zodb.zeo.zrpc.interfaces import ZRPCError, DisconnectedError
 from zodb.zeo.zrpc import log
 from zodb.zeo.zrpc.marshal import Marshaller
 from zodb.zeo.zrpc.trigger import trigger
@@ -114,13 +114,17 @@
     __super_init = smac.SizedMessageAsyncConnection.__init__
     __super_close = smac.SizedMessageAsyncConnection.close
 
-    protocol_version = "Z200"
+    oldest_protocol_version = "Z400"
+    protocol_version = "Z400"
+
+    # Z400 -- the first protocol defined for ZODB4
 
     def __init__(self, sock, addr, obj=None):
         self.obj = None
         self.marshal = Marshaller()
         self.closed = 0
         self.msgid = 0
+        self.peer_protocol_version = None # set in recv_handshake()
         if isinstance(addr, types.TupleType):
             self.log_label = "zrpc-conn:%s:%d" % addr
         else:
@@ -167,7 +171,7 @@
         """Register obj as the true object to invoke methods on"""
         self.obj = obj
 
-    def handshake(self):
+    def handshake(self, protocol=None):
         # When a connection is created the first message sent is a
         # 4-byte protocol version.  This mechanism should allow the
         # protocol to evolve over time, and let servers handle clients
@@ -179,16 +183,19 @@
         # The client sends the protocol version it is using.
         self._message_input = self.message_input
         self.message_input = self.recv_handshake
-        self.message_output(self.protocol_version)
+        self.message_output(protocol or self.protocol_version)
 
-    def recv_handshake(self, message):
-        if message == self.protocol_version:
-            self.message_input = self._message_input
-        else:
-            log.error("%s: recv_handshake: bad handshake %s",
-                      self.log_label,
-                      log.short_repr(message))
-        # otherwise do something else...
+    def recv_handshake(self, protocol):
+         # Extended by ManagedConnection
+         del self.message_input
+         self.peer_protocol_version = protocol
+         if self.oldest_protocol_version <= protocol <= self.protocol_version:
+             log.warn("%s: received handshake %r", self.log_label,
+                      protocol)
+         else:
+             log.error("%s: recv_handshake: bad handshake %s", self.log_label,
+                       protocol)
+             raise ZRPCError("bad handshake %r" % protocol)
 
     def message_input(self, message):
         """Decoding an incoming message and dispatch it"""
@@ -329,6 +336,29 @@
         else:
             return r_args
 
+    # For testing purposes, it is useful to begin a synchronous call
+    # but not block waiting for its response.  Since these methods are
+    # used for testing they can assume they are not in async mode and
+    # call asyncore.poll() directly to get the message out without
+    # also waiting for the reply.
+ 
+    def _deferred_call(self, method, *args):
+        if self.closed:
+            raise DisconnectedError()
+        msgid = self.send_call(method, args, 0)
+        asyncore.poll(0.01, self._map)
+        return msgid
+
+    def _deferred_wait(self, msgid):
+        r_flags, r_args = self.wait(msgid)
+        if (isinstance(r_args, types.TupleType)
+            and type(r_args[0]) == types.ClassType
+            and issubclass(r_args[0], Exception)):
+            inst = r_args[1]
+            raise inst # error raised by server
+        else:
+            return r_args
+
     def callAsync(self, method, *args):
         if self.closed:
             raise DisconnectedError()
@@ -418,7 +448,7 @@
         else:
             asyncore.poll(0.0, self._map)
 
-    def pending(self):
+    def pending(self, timeout=0):
         """Invoke mainloop until any pending messages are handled."""
         if __debug__:
             log.debug("%s: pending(), async=%d", self.log_label,
@@ -427,26 +457,50 @@
             return
         # Inline the asyncore poll() function to know whether any input
         # was actually read.  Repeat until no input is ready.
-        # XXX This only does reads.
-        r_in = [self._fileno]
-        w_in = []
+
+        # Pending does reads and writes.  In the case of server
+        # startup, we may need to write out zeoVerify() messages.
+        # Always check for read status, but check for write status
+        # only when there is output to do.  Only continue in this loop as
+        # long as there is data to read.
+        r = r_in = [self._fileno]
         x_in = []
-        while 1:
+        while r and not self.closed:
+            if self.writable():
+                w_in = [self._fileno]
+            else:
+                w_in = []
             try:
-                r, w, x = select.select(r_in, w_in, x_in, 0)
+                r, w, x = select.select(r_in, w_in, x_in, timeout)
             except select.error, err:
                 if err[0] == errno.EINTR:
+                    timeout = 0
                     continue
                 else:
                     raise
-            if not r:
-                break
-            try:
-                self.handle_read_event()
-            except asyncore.ExitNow:
-                raise
-            except:
-                self.handle_error()
+            else:
+                # Make sure any subsequent select does not block.  The
+                # loop is only intended to make sure all incoming data is
+                # returned.
+
+                # XXX What if the server sends a lot of invalidations,
+                # such that pending never finishes?  Seems unlikely, but
+                # not impossible.
+                timeout = 0
+            if r:
+                try:
+                    self.handle_read_event()
+                except asyncore.ExitNow:
+                    raise
+                except:
+                    self.handle_error()
+            if w:
+                try:
+                    self.handle_write_event()
+                except asyncore.ExitNow:
+                    raise
+                except:
+                    self.handle_error()
 
 class ManagedServerConnection(Connection):
     """Server-side Connection subclass."""
@@ -472,6 +526,54 @@
         self.mgr = mgr
         self.__super_init(sock, addr, obj)
         self.check_mgr_async()
+
+    # PROTOCOL NEGOTIATION:
+    #
+    # The code implementing protocol version 2.0.0 (which is deployed
+    # in the field and cannot be changed) *only* talks to peers that
+    # send a handshake indicating protocol version 2.0.0.  In that
+    # version, both the client and the server immediately send out
+    # their protocol handshake when a connection is established,
+    # without waiting for their peer, and disconnect when a different
+    # handshake is received.
+    #
+    # The new protocol uses this to enable new clients to talk to
+    # 2.0.0 servers: in the new protocol, the client waits until it
+    # receives the server's protocol handshake before sending its own
+    # handshake.  The client sends the lower of its own protocol
+    # version and the server protocol version, allowing it to talk to
+    # servers using later protocol versions (2.0.2 and higher) as
+    # well: the effective protocol used will be the lower of the
+    # client and server protocol.
+    #
+    # The ZEO modules ClientStorage and ServerStub have backwards
+    # compatibility code for dealing with the previous version of the
+    # protocol.  The client accepts the old version of some messages,
+    # and will not send new messages when talking to an old server.
+    #
+    # As long as the client hasn't sent its handshake, it can't send
+    # anything else; output messages are queued during this time.
+    # (Output can happen because the connection testing machinery can
+    # start sending requests before the handshake is received.)
+
+    def handshake(self):
+        self.message_input = self.recv_handshake
+        self.message_output = self.queue_output
+        self.output_queue = []
+        # The handshake is sent by recv_handshake() below
+
+    def queue_output(self, message):
+        self.output_queue.append(message)
+
+    def recv_handshake(self, proto):
+        del self.message_output
+        proto = min(proto, self.protocol_version)
+        Connection.recv_handshake(self, proto) # Raise error if wrong proto
+        self.message_output(proto)
+        queue = self.output_queue
+        del self.output_queue
+        for message in queue:
+            self.message_output(message)
 
     # Defer the ThreadedAsync work to the manager.
 


=== Zope3/src/zodb/zeo/zrpc/marshal.py 1.2 => 1.3 ===
--- Zope3/src/zodb/zeo/zrpc/marshal.py:1.2	Wed Dec 25 09:12:23 2002
+++ Zope3/src/zodb/zeo/zrpc/marshal.py	Tue Feb 25 13:55:06 2003
@@ -15,7 +15,7 @@
 from cStringIO import StringIO
 import types
 
-from zodb.zeo.zrpc.error import ZRPCError
+from zodb.zeo.zrpc.interfaces import ZRPCError
 from zodb.zeo.zrpc import log
 
 class Marshaller:


=== Zope3/src/zodb/zeo/zrpc/smac.py 1.2 => 1.3 ===
--- Zope3/src/zodb/zeo/zrpc/smac.py:1.2	Wed Dec 25 09:12:23 2002
+++ Zope3/src/zodb/zeo/zrpc/smac.py	Tue Feb 25 13:55:06 2003
@@ -18,7 +18,7 @@
 import socket, errno
 from types import StringType
 
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.zrpc.interfaces import DisconnectedError
 from zodb.zeo.zrpc import log
 
 # Use the dictionary to make sure we get the minimum number of errno
@@ -206,7 +206,7 @@
                           len(message), log.short_repr(message))
 
         if self.__closed:
-            raise Disconnected("Action is temporarily unavailable")
+            raise DisconnectedError("Action is temporarily unavailable")
         self.__output_lock.acquire()
         try:
             # do two separate appends to avoid copying the message string

=== Removed File Zope3/src/zodb/zeo/zrpc/error.py ===