[Zope-Checkins] CVS: ZODB3/ZEO/zrpc - connection.py:1.38.2.1.4.2
Guido van Rossum
guido@python.org
Wed, 18 Dec 2002 22:37:10 -0500
Update of /cvs-repository/ZODB3/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv23586/zrpc
Modified Files:
Tag: ZODB3-fast-restart-branch
connection.py
Log Message:
Add protocol negotiation. After an insight provided by Tim, I figured
out a way to do protocol negotiation at the connection level in the
client, by waiting until the server handshake is received before
sending the client handshake. Clients running protocol 2.0.1 can now
talk to servers running 2.0.0 or 2.0.1; however, servers running
protocol 2.0.1 can only talk to clients running 2.0.1. This means
that clients must be upgraded before servers. See extensive comments
in connection.py, in class ManagedConnection.
Some backward compatibility code was added to ClientStorage.py and
ServerStub.py.
=== ZODB3/ZEO/zrpc/connection.py 1.38.2.1.4.1 => 1.38.2.1.4.2 ===
--- ZODB3/ZEO/zrpc/connection.py:1.38.2.1.4.1 Tue Dec 17 16:36:29 2002
+++ ZODB3/ZEO/zrpc/connection.py Wed Dec 18 22:37:09 2002
@@ -115,13 +115,28 @@
__super_init = smac.SizedMessageAsyncConnection.__init__
__super_close = smac.SizedMessageAsyncConnection.close
- protocol_version = "Z200"
+ # Protocol variables:
+ #
+ # oldest_protocol_version -- the oldest protocol version we support
+ # protocol_version -- the newest protocol version we support; preferred
+
+ oldest_protocol_version = "Z200"
+ protocol_version = "Z201"
+
+ # Protocol history:
+ #
+ # Z200 -- original ZEO 2.0 protocol
+ #
+ # Z201 -- added invalidateTransaction() to client;
+ # renamed several client methods;
+ # added lastTransaction() to server
def __init__(self, sock, addr, obj=None):
self.obj = None
self.marshal = Marshaller()
self.closed = 0
self.msgid = 0
+ self.peer_protocol_version = None # Set in recv_handshake()
if isinstance(addr, types.TupleType):
self.log_label = "zrpc-conn:%s:%d" % addr
else:
@@ -163,7 +178,7 @@
self.__super_close()
def close_trigger(self):
- # overridden by ManagedConnection
+ # Overridden by ManagedConnection
if self.trigger is not None:
self.trigger.close()
@@ -171,7 +186,9 @@
"""Register obj as the true object to invoke methods on"""
self.obj = obj
- def handshake(self):
+ def handshake(self, proto=None):
+ # Overridden by ManagedConnection
+
# When a connection is created the first message sent is a
# 4-byte protocol version. This mechanism should allow the
# protocol to evolve over time, and let servers handle clients
@@ -181,17 +198,18 @@
# first message received.
# The client sends the protocol version it is using.
- self._message_input = self.message_input
self.message_input = self.recv_handshake
- self.message_output(self.protocol_version)
+ self.message_output(proto or self.protocol_version)
- def recv_handshake(self, message):
- if message == self.protocol_version:
- self.message_input = self._message_input
+ def recv_handshake(self, proto):
+ # Extended by ManagedConnection
+ del self.message_input
+ self.peer_protocol_version = proto
+ if self.oldest_protocol_version <= proto <= self.protocol_version:
+ self.log("received handshake %r" % proto, level=zLOG.INFO)
else:
- self.log("recv_handshake: bad handshake %s" % short_repr(message),
- level=zLOG.ERROR)
- # otherwise do something else...
+ self.log("bad handshake %s" % short_repr(proto), level=zLOG.ERROR)
+ raise ZRPCError("bad handshake %r" % proto)
def message_input(self, message):
"""Decoding an incoming message and dispatch it"""
@@ -350,7 +368,7 @@
self.thr_async = 1
def is_async(self):
- # overridden for ManagedConnection
+ # Overridden by ManagedConnection
if self.thr_async:
return 1
else:
@@ -472,6 +490,64 @@
self.mgr = mgr
self.__super_init(sock, addr, obj)
self.check_mgr_async()
+
+ # PROTOCOL NEGOTIATION:
+ #
+ # The code implementing protocol version 2.0.0 (which is deployed
+ # in the field and cannot be changed) *only* talks to peers that
+ # send a handshake indicating protocol version 2.0.0. In that
+ # version, both the client and the server immediately send out
+ # their protocol handshake when a connection is established,
+ # without waiting for their peer, and disconnect when a different
+ # handshake is received.
+ #
+ # The new protocol uses this to enable new clients to talk to
+ # 2.0.0 servers: in the new protocol, the client waits until it
+ # receives the server's protocol handshake before sending its own
+ # handshake. The client sends the lower of its own protocol
+ # version and the server protocol version, allowing it to talk to
+ # servers using later protocol versions (2.0.2 and higher) as
+ # well: the effective protocol used will be the lower of the
+ # client and server protocol.
+ #
+ # The ZEO modules ClientStorage and ServerStub have backwards
+ # compatibility code for dealing with the previous version of the
+ # protocol. The client accepts the old version of some messages,
+ # and will not send new messages when talking to an old server.
+ #
+ # As long as the client hasn't sent its handshake, it can't send
+ # anything else; output messages are queued during this time.
+ # (Output can happen because the connection testing machinery can
+ # start sending requests before the handshake is received.)
+ #
+ # UPGRADING FROM ZEO 2.0.0 TO NEWER VERSIONS:
+ #
+ # Because a new client can talk to an old server, but not vice
+ # versa, all clients should be upgraded before upgrading any
+ # servers. Protocol upgrades beyond 2.0.1 will not have this
+ # restriction, because clients using protocol 2.0.1 or later can
+ # talk to both older and newer servers.
+ #
+ # No compatibility with protocol version 1 is provided.
+
+ def handshake(self):
+ self.message_input = self.recv_handshake
+ self.message_output = self.queue_output
+ self.output_queue = []
+ # The handshake is sent by recv_handshake() below
+
+ def queue_output(self, message):
+ self.output_queue.append(message)
+
+ def recv_handshake(self, proto):
+ del self.message_output
+ proto = min(proto, self.protocol_version)
+ Connection.recv_handshake(self, proto) # Raise error if wrong proto
+ self.message_output(proto)
+ queue = self.output_queue
+ del self.output_queue
+ for message in queue:
+ self.message_output(message)
# Defer the ThreadedAsync work to the manager.