[Zodb-checkins] CVS: Zope3/src/zodb/zeo/zrpc - interfaces.py:1.2.6.1 smac.py:1.2.12.1 marshal.py:1.2.12.1 connection.py:1.2.12.1 client.py:1.3.4.1 error.py:NONE

Jeremy Hylton jeremy@zope.com
Wed, 12 Mar 2003 16:41:53 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo/zrpc
In directory cvs.zope.org:/tmp/cvs-serv29357/zeo/zrpc

Modified Files:
      Tag: opaque-pickles-branch
	smac.py marshal.py connection.py client.py 
Added Files:
      Tag: opaque-pickles-branch
	interfaces.py 
Removed Files:
      Tag: opaque-pickles-branch
	error.py 
Log Message:
Update from trunk.

Resolve a few import conflicts.


=== Added File Zope3/src/zodb/zeo/zrpc/interfaces.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from zodb.storage.interfaces import StorageError
from zodb.zeo.interfaces import ClientDisconnected

class ZRPCError(StorageError):
    """Error raised by the zrpc layer; a specialization of StorageError."""

class DisconnectedError(ZRPCError, ClientDisconnected):
    """The database storage is disconnected from the storage server.

    The error occurred because of a problem in the low-level RPC
    connection, or because the connection was closed.
    """

    # This subclass is raised when zrpc catches the error.  Because it
    # derives from both ZRPCError and ClientDisconnected, a handler for
    # either base class will catch it.


=== Zope3/src/zodb/zeo/zrpc/smac.py 1.2 => 1.2.12.1 ===
--- Zope3/src/zodb/zeo/zrpc/smac.py:1.2	Wed Dec 25 09:12:23 2002
+++ Zope3/src/zodb/zeo/zrpc/smac.py	Wed Mar 12 16:41:15 2003
@@ -18,7 +18,7 @@
 import socket, errno
 from types import StringType
 
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.zrpc.interfaces import DisconnectedError
 from zodb.zeo.zrpc import log
 
 # Use the dictionary to make sure we get the minimum number of errno
@@ -206,7 +206,7 @@
                           len(message), log.short_repr(message))
 
         if self.__closed:
-            raise Disconnected("Action is temporarily unavailable")
+            raise DisconnectedError("Action is temporarily unavailable")
         self.__output_lock.acquire()
         try:
             # do two separate appends to avoid copying the message string


=== Zope3/src/zodb/zeo/zrpc/marshal.py 1.2 => 1.2.12.1 ===
--- Zope3/src/zodb/zeo/zrpc/marshal.py:1.2	Wed Dec 25 09:12:23 2002
+++ Zope3/src/zodb/zeo/zrpc/marshal.py	Wed Mar 12 16:41:15 2003
@@ -15,7 +15,7 @@
 from cStringIO import StringIO
 import types
 
-from zodb.zeo.zrpc.error import ZRPCError
+from zodb.zeo.zrpc.interfaces import ZRPCError
 from zodb.zeo.zrpc import log
 
 class Marshaller:


=== Zope3/src/zodb/zeo/zrpc/connection.py 1.2 => 1.2.12.1 ===
--- Zope3/src/zodb/zeo/zrpc/connection.py:1.2	Wed Dec 25 09:12:23 2002
+++ Zope3/src/zodb/zeo/zrpc/connection.py	Wed Mar 12 16:41:15 2003
@@ -21,7 +21,7 @@
 
 from zodb.zeo import threadedasync
 from zodb.zeo.zrpc import smac
-from zodb.zeo.zrpc.error import ZRPCError, DisconnectedError
+from zodb.zeo.zrpc.interfaces import ZRPCError, DisconnectedError
 from zodb.zeo.zrpc import log
 from zodb.zeo.zrpc.marshal import Marshaller
 from zodb.zeo.zrpc.trigger import trigger
@@ -114,13 +114,17 @@
     __super_init = smac.SizedMessageAsyncConnection.__init__
     __super_close = smac.SizedMessageAsyncConnection.close
 
-    protocol_version = "Z200"
+    oldest_protocol_version = "Z400"
+    protocol_version = "Z400"
+
+    # Z400 -- the first protocol defined for ZODB4
 
     def __init__(self, sock, addr, obj=None):
         self.obj = None
         self.marshal = Marshaller()
         self.closed = 0
         self.msgid = 0
+        self.peer_protocol_version = None # set in recv_handshake()
         if isinstance(addr, types.TupleType):
             self.log_label = "zrpc-conn:%s:%d" % addr
         else:
@@ -167,7 +171,7 @@
         """Register obj as the true object to invoke methods on"""
         self.obj = obj
 
-    def handshake(self):
+    def handshake(self, protocol=None):
         # When a connection is created the first message sent is a
         # 4-byte protocol version.  This mechanism should allow the
         # protocol to evolve over time, and let servers handle clients
@@ -179,16 +183,19 @@
         # The client sends the protocol version it is using.
         self._message_input = self.message_input
         self.message_input = self.recv_handshake
-        self.message_output(self.protocol_version)
+        self.message_output(protocol or self.protocol_version)
 
-    def recv_handshake(self, message):
-        if message == self.protocol_version:
-            self.message_input = self._message_input
-        else:
-            log.error("%s: recv_handshake: bad handshake %s",
-                      self.log_label,
-                      log.short_repr(message))
-        # otherwise do something else...
+    def recv_handshake(self, protocol):
+         # Extended by ManagedConnection
+         del self.message_input
+         self.peer_protocol_version = protocol
+         if self.oldest_protocol_version <= protocol <= self.protocol_version:
+             log.warn("%s: received handshake %r", self.log_label,
+                      protocol)
+         else:
+             log.error("%s: recv_handshake: bad handshake %s", self.log_label,
+                       protocol)
+             raise ZRPCError("bad handshake %r" % protocol)
 
     def message_input(self, message):
         """Decoding an incoming message and dispatch it"""
@@ -329,6 +336,29 @@
         else:
             return r_args
 
+    # For testing purposes, it is useful to begin a synchronous call
+    # but not block waiting for its response.  Since these methods are
+    # used for testing they can assume they are not in async mode and
+    # call asyncore.poll() directly to get the message out without
+    # also waiting for the reply.
+ 
+    def _deferred_call(self, method, *args):
+        if self.closed:
+            raise DisconnectedError()
+        msgid = self.send_call(method, args, 0)
+        asyncore.poll(0.01, self._map)
+        return msgid
+
+    def _deferred_wait(self, msgid):
+        r_flags, r_args = self.wait(msgid)
+        if (isinstance(r_args, types.TupleType)
+            and type(r_args[0]) == types.ClassType
+            and issubclass(r_args[0], Exception)):
+            inst = r_args[1]
+            raise inst # error raised by server
+        else:
+            return r_args
+
     def callAsync(self, method, *args):
         if self.closed:
             raise DisconnectedError()
@@ -418,7 +448,7 @@
         else:
             asyncore.poll(0.0, self._map)
 
-    def pending(self):
+    def pending(self, timeout=0):
         """Invoke mainloop until any pending messages are handled."""
         if __debug__:
             log.debug("%s: pending(), async=%d", self.log_label,
@@ -427,26 +457,50 @@
             return
         # Inline the asyncore poll() function to know whether any input
         # was actually read.  Repeat until no input is ready.
-        # XXX This only does reads.
-        r_in = [self._fileno]
-        w_in = []
+
+        # Pending does reads and writes.  In the case of server
+        # startup, we may need to write out zeoVerify() messages.
+        # Always check for read status, but check for write status
+        # only when there is output to do.  Only continue in this loop as
+        # long as there is data to read.
+        r = r_in = [self._fileno]
         x_in = []
-        while 1:
+        while r and not self.closed:
+            if self.writable():
+                w_in = [self._fileno]
+            else:
+                w_in = []
             try:
-                r, w, x = select.select(r_in, w_in, x_in, 0)
+                r, w, x = select.select(r_in, w_in, x_in, timeout)
             except select.error, err:
                 if err[0] == errno.EINTR:
+                    timeout = 0
                     continue
                 else:
                     raise
-            if not r:
-                break
-            try:
-                self.handle_read_event()
-            except asyncore.ExitNow:
-                raise
-            except:
-                self.handle_error()
+            else:
+                # Make sure any subsequent select does not block.  The
+                # loop is only intended to make sure all incoming data is
+                # returned.
+
+                # XXX What if the server sends a lot of invalidations,
+                # such that pending never finishes?  Seems unlikely, but
+                # not impossible.
+                timeout = 0
+            if r:
+                try:
+                    self.handle_read_event()
+                except asyncore.ExitNow:
+                    raise
+                except:
+                    self.handle_error()
+            if w:
+                try:
+                    self.handle_write_event()
+                except asyncore.ExitNow:
+                    raise
+                except:
+                    self.handle_error()
 
 class ManagedServerConnection(Connection):
     """Server-side Connection subclass."""
@@ -472,6 +526,54 @@
         self.mgr = mgr
         self.__super_init(sock, addr, obj)
         self.check_mgr_async()
+
+    # PROTOCOL NEGOTIATION:
+    #
+    # The code implementing protocol version 2.0.0 (which is deployed
+    # in the field and cannot be changed) *only* talks to peers that
+    # send a handshake indicating protocol version 2.0.0.  In that
+    # version, both the client and the server immediately send out
+    # their protocol handshake when a connection is established,
+    # without waiting for their peer, and disconnect when a different
+    # handshake is received.
+    #
+    # The new protocol uses this to enable new clients to talk to
+    # 2.0.0 servers: in the new protocol, the client waits until it
+    # receives the server's protocol handshake before sending its own
+    # handshake.  The client sends the lower of its own protocol
+    # version and the server protocol version, allowing it to talk to
+    # servers using later protocol versions (2.0.2 and higher) as
+    # well: the effective protocol used will be the lower of the
+    # client and server protocol.
+    #
+    # The ZEO modules ClientStorage and ServerStub have backwards
+    # compatibility code for dealing with the previous version of the
+    # protocol.  The client accepts the old version of some messages,
+    # and will not send new messages when talking to an old server.
+    #
+    # As long as the client hasn't sent its handshake, it can't send
+    # anything else; output messages are queued during this time.
+    # (Output can happen because the connection testing machinery can
+    # start sending requests before the handshake is received.)
+
+    def handshake(self):
+        self.message_input = self.recv_handshake
+        self.message_output = self.queue_output
+        self.output_queue = []
+        # The handshake is sent by recv_handshake() below
+
+    def queue_output(self, message):
+        self.output_queue.append(message)
+
+    def recv_handshake(self, proto):
+        del self.message_output
+        proto = min(proto, self.protocol_version)
+        Connection.recv_handshake(self, proto) # Raise error if wrong proto
+        self.message_output(proto)
+        queue = self.output_queue
+        del self.output_queue
+        for message in queue:
+            self.message_output(message)
 
     # Defer the ThreadedAsync work to the manager.
 


=== Zope3/src/zodb/zeo/zrpc/client.py 1.3 => 1.3.4.1 ===
--- Zope3/src/zodb/zeo/zrpc/client.py:1.3	Wed Feb  5 18:28:20 2003
+++ Zope3/src/zodb/zeo/zrpc/client.py	Wed Mar 12 16:41:15 2003
@@ -335,10 +335,9 @@
             wrap = ConnectWrapper(domain, addr, self.mgr, self.client)
             wrap.connect_procedure()
             if wrap.state == "notified":
-                for wrap in wrappers.keys():
-                    wrap.close()
-                wrappers[wrap] = wrap
-                return wrappers
+                for w in wrappers.keys():
+                    w.close()
+                return {wrap: wrap}
             if wrap.state != "closed":
                 wrappers[wrap] = wrap
         return wrappers

=== Removed File Zope3/src/zodb/zeo/zrpc/error.py ===