[Zodb-checkins] CVS: Zope/lib/python/ZEO/zrpc - client.py:1.20.2.4 connection.py:1.38.4.4
Chris McDonough
chrism@zope.com
Sat, 4 Jan 2003 13:19:01 -0500
Update of /cvs-repository/Zope/lib/python/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv3944/zrpc
Modified Files:
Tag: chrism-install-branch
client.py connection.py
Log Message:
Merge with HEAD.
=== Zope/lib/python/ZEO/zrpc/client.py 1.20.2.3 => 1.20.2.4 ===
--- Zope/lib/python/ZEO/zrpc/client.py:1.20.2.3 Fri Jan 3 01:36:35 2003
+++ Zope/lib/python/ZEO/zrpc/client.py Sat Jan 4 13:18:58 2003
@@ -119,7 +119,7 @@
# XXX need each connection started with async==0 to have a
# callback
- log("CM.set_async(%s)" % repr(map))
+ log("CM.set_async(%s)" % repr(map), level=zLOG.DEBUG)
if not self.closed and self.trigger is None:
log("CM.set_async(): first call")
self.trigger = trigger()
@@ -294,6 +294,9 @@
if success > 0:
break
time.sleep(delay)
+ if self.mgr.is_connected():
+ log("CT: still trying to replace fallback connection",
+ level=zLOG.INFO)
delay = min(delay*2, self.tmax)
log("CT: exiting thread: %s" % self.getName())
=== Zope/lib/python/ZEO/zrpc/connection.py 1.38.4.3 => 1.38.4.4 ===
--- Zope/lib/python/ZEO/zrpc/connection.py:1.38.4.3 Fri Jan 3 01:36:35 2003
+++ Zope/lib/python/ZEO/zrpc/connection.py Sat Jan 4 13:18:58 2003
@@ -21,7 +21,7 @@
import ThreadedAsync
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
-from ZEO.zrpc.log import log, short_repr
+from ZEO.zrpc.log import short_repr, log
from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.trigger import trigger
import zLOG
@@ -115,13 +115,32 @@
__super_init = smac.SizedMessageAsyncConnection.__init__
__super_close = smac.SizedMessageAsyncConnection.close
- protocol_version = "Z200"
+ # Protocol variables:
+ #
+ # oldest_protocol_version -- the oldest protocol version we support
+ # protocol_version -- the newest protocol version we support; preferred
+
+ oldest_protocol_version = "Z200"
+ protocol_version = "Z201"
+
+ # Protocol history:
+ #
+ # Z200 -- original ZEO 2.0 protocol
+ #
+ # Z201 -- added invalidateTransaction() to client;
+ # renamed several client methods;
+ # added lastTransaction() to server
def __init__(self, sock, addr, obj=None):
self.obj = None
self.marshal = Marshaller()
self.closed = 0
self.msgid = 0
+ self.peer_protocol_version = None # Set in recv_handshake()
+ if isinstance(addr, types.TupleType):
+ self.log_label = "zrpc-conn:%s:%d" % addr
+ else:
+ self.log_label = "zrpc-conn:%s" % addr
self.__super_init(sock, addr)
# A Connection either uses asyncore directly or relies on an
# asyncore mainloop running in a separate thread. If
@@ -147,6 +166,9 @@
__str__ = __repr__ # Defeat asyncore's dreaded __getattr__
+ def log(self, message, level=zLOG.BLATHER, error=None):
+ zLOG.LOG(self.log_label, level, message, error=error)
+
def close(self):
if self.closed:
return
@@ -156,7 +178,7 @@
self.__super_close()
def close_trigger(self):
- # overridden by ManagedConnection
+ # Overridden by ManagedConnection
if self.trigger is not None:
self.trigger.close()
@@ -164,7 +186,9 @@
"""Register obj as the true object to invoke methods on"""
self.obj = obj
- def handshake(self):
+ def handshake(self, proto=None):
+ # Overridden by ManagedConnection
+
# When a connection is created the first message sent is a
# 4-byte protocol version. This mechanism should allow the
# protocol to evolve over time, and let servers handle clients
@@ -174,17 +198,18 @@
# first message received.
# The client sends the protocol version it is using.
- self._message_input = self.message_input
self.message_input = self.recv_handshake
- self.message_output(self.protocol_version)
+ self.message_output(proto or self.protocol_version)
- def recv_handshake(self, message):
- if message == self.protocol_version:
- self.message_input = self._message_input
+ def recv_handshake(self, proto):
+ # Extended by ManagedConnection
+ del self.message_input
+ self.peer_protocol_version = proto
+ if self.oldest_protocol_version <= proto <= self.protocol_version:
+ self.log("received handshake %r" % proto, level=zLOG.INFO)
else:
- log("recv_handshake: bad handshake %s" % short_repr(message),
- level=zLOG.ERROR)
- # otherwise do something else...
+ self.log("bad handshake %s" % short_repr(proto), level=zLOG.ERROR)
+ raise ZRPCError("bad handshake %r" % proto)
def message_input(self, message):
"""Decoding an incoming message and dispatch it"""
@@ -195,9 +220,9 @@
msgid, flags, name, args = self.marshal.decode(message)
if __debug__:
- log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
- short_repr(args)),
- level=zLOG.TRACE)
+ self.log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
+ short_repr(args)),
+ level=zLOG.TRACE)
if name == REPLY:
self.handle_reply(msgid, flags, args)
else:
@@ -205,8 +230,8 @@
def handle_reply(self, msgid, flags, args):
if __debug__:
- log("recv reply: %s, %s, %s" % (msgid, flags, short_repr(args)),
- level=zLOG.DEBUG)
+ self.log("recv reply: %s, %s, %s"
+ % (msgid, flags, short_repr(args)), level=zLOG.DEBUG)
self.replies_cond.acquire()
try:
self.replies[msgid] = flags, args
@@ -219,7 +244,8 @@
msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
raise ZRPCError(msg)
if __debug__:
- log("calling %s%s" % (name, short_repr(args)), level=zLOG.BLATHER)
+ self.log("calling %s%s" % (name, short_repr(args)),
+ level=zLOG.BLATHER)
meth = getattr(self.obj, name)
try:
@@ -228,8 +254,8 @@
raise
except Exception, msg:
error = sys.exc_info()
- log("%s() raised exception: %s" % (name, msg), zLOG.INFO,
- error=error)
+ self.log("%s() raised exception: %s" % (name, msg), zLOG.INFO,
+ error=error)
error = error[:2]
return self.return_error(msgid, flags, *error)
@@ -239,7 +265,7 @@
(name, short_repr(ret)))
else:
if __debug__:
- log("%s returns %s" % (name, short_repr(ret)), zLOG.DEBUG)
+ self.log("%s returns %s" % (name, short_repr(ret)), zLOG.DEBUG)
if isinstance(ret, Delay):
ret.set_sender(msgid, self.send_reply, self.return_error)
else:
@@ -252,7 +278,7 @@
self.close()
def log_error(self, msg="No error message supplied"):
- log(msg, zLOG.ERROR, error=sys.exc_info())
+ self.log(msg, zLOG.ERROR, error=sys.exc_info())
def check_method(self, name):
# XXX Is this sufficient "security" for now?
@@ -304,8 +330,8 @@
finally:
self.msgid_lock.release()
if __debug__:
- log("send msg: %d, %d, %s, ..." % (msgid, flags, method),
- zLOG.TRACE)
+ self.log("send msg: %d, %d, %s, ..." % (msgid, flags, method),
+ zLOG.TRACE)
buf = self.marshal.encode(msgid, flags, method, args)
self.message_output(buf)
return msgid
@@ -342,7 +368,7 @@
self.thr_async = 1
def is_async(self):
- # overridden for ManagedConnection
+ # Overridden by ManagedConnection
if self.thr_async:
return 1
else:
@@ -360,8 +386,8 @@
def wait(self, msgid):
"""Invoke asyncore mainloop and wait for reply."""
if __debug__:
- log("wait(%d), async=%d" % (msgid, self.is_async()),
- level=zLOG.TRACE)
+ self.log("wait(%d), async=%d" % (msgid, self.is_async()),
+ level=zLOG.TRACE)
if self.is_async():
self._pull_trigger()
@@ -378,8 +404,8 @@
if reply is not None:
del self.replies[msgid]
if __debug__:
- log("wait(%d): reply=%s" % (msgid, short_repr(reply)),
- level=zLOG.DEBUG)
+ self.log("wait(%d): reply=%s" %
+ (msgid, short_repr(reply)), level=zLOG.DEBUG)
return reply
if self.is_async():
self.replies_cond.wait(10.0)
@@ -388,14 +414,14 @@
try:
try:
if __debug__:
- log("wait(%d): asyncore.poll(%s)" %
- (msgid, delay), level=zLOG.TRACE)
+ self.log("wait(%d): asyncore.poll(%s)" %
+ (msgid, delay), level=zLOG.TRACE)
asyncore.poll(delay, self._map)
if delay < 1.0:
delay += delay
except select.error, err:
- log("Closing. asyncore.poll() raised %s." % err,
- level=zLOG.BLATHER)
+ self.log("Closing. asyncore.poll() raised %s."
+ % err, level=zLOG.BLATHER)
self.close()
finally:
self.replies_cond.acquire()
@@ -405,7 +431,7 @@
def poll(self):
"""Invoke asyncore mainloop to get pending message out."""
if __debug__:
- log("poll(), async=%d" % self.is_async(), level=zLOG.TRACE)
+ self.log("poll(), async=%d" % self.is_async(), level=zLOG.TRACE)
if self.is_async():
self._pull_trigger()
else:
@@ -414,7 +440,7 @@
def pending(self):
"""Invoke mainloop until any pending messages are handled."""
if __debug__:
- log("pending(), async=%d" % self.is_async(), level=zLOG.TRACE)
+ self.log("pending(), async=%d" % self.is_async(), level=zLOG.TRACE)
if self.is_async():
return
# Inline the asyncore poll() function to know whether any input
@@ -464,6 +490,64 @@
self.mgr = mgr
self.__super_init(sock, addr, obj)
self.check_mgr_async()
+
+ # PROTOCOL NEGOTIATION:
+ #
+ # The code implementing protocol version 2.0.0 (which is deployed
+ # in the field and cannot be changed) *only* talks to peers that
+ # send a handshake indicating protocol version 2.0.0. In that
+ # version, both the client and the server immediately send out
+ # their protocol handshake when a connection is established,
+ # without waiting for their peer, and disconnect when a different
+ # handshake is receive.
+ #
+ # The new protocol uses this to enable new clients to talk to
+ # 2.0.0 servers: in the new protocol, the client waits until it
+ # receives the server's protocol handshake before sending its own
+ # handshake. The client sends the lower of its own protocol
+ # version and the server protocol version, allowing it to talk to
+ # servers using later protocol versions (2.0.2 and higher) as
+ # well: the effective protocol used will be the lower of the
+ # client and server protocol.
+ #
+ # The ZEO modules ClientStorage and ServerStub have backwards
+ # compatibility code for dealing with the previous version of the
+ # protocol. The client accept the old version of some messages,
+ # and will not send new messages when talking to an old server.
+ #
+ # As long as the client hasn't sent its handshake, it can't send
+ # anything else; output messages are queued during this time.
+ # (Output can happen because the connection testing machinery can
+ # start sending requests before the handshake is received.)
+ #
+ # UPGRADING FROM ZEO 2.0.0 TO NEWER VERSIONS:
+ #
+ # Because a new client can talk to an old server, but not vice
+ # versa, all clients should be upgraded before upgrading any
+ # servers. Protocol upgrades beyond 2.0.1 will not have this
+ # restriction, because clients using protocol 2.0.1 or later can
+ # talk to both older and newer servers.
+ #
+ # No compatibility with protocol version 1 is provided.
+
+ def handshake(self):
+ self.message_input = self.recv_handshake
+ self.message_output = self.queue_output
+ self.output_queue = []
+ # The handshake is sent by recv_handshake() below
+
+ def queue_output(self, message):
+ self.output_queue.append(message)
+
+ def recv_handshake(self, proto):
+ del self.message_output
+ proto = min(proto, self.protocol_version)
+ Connection.recv_handshake(self, proto) # Raise error if wrong proto
+ self.message_output(proto)
+ queue = self.output_queue
+ del self.output_queue
+ for message in queue:
+ self.message_output(message)
# Defer the ThreadedAsync work to the manager.