[Zope3-checkins] CVS: Zope3/src/zodb/zeo/zrpc - interfaces.py:1.2.6.1 smac.py:1.2.12.1 marshal.py:1.2.12.1 connection.py:1.2.12.1 client.py:1.3.4.1 error.py:NONE
Jeremy Hylton
jeremy@zope.com
Wed, 12 Mar 2003 16:41:53 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo/zrpc
In directory cvs.zope.org:/tmp/cvs-serv29357/zeo/zrpc
Modified Files:
Tag: opaque-pickles-branch
smac.py marshal.py connection.py client.py
Added Files:
Tag: opaque-pickles-branch
interfaces.py
Removed Files:
Tag: opaque-pickles-branch
error.py
Log Message:
Update from trunk.
Resolve a few import conflicts.
=== Added File Zope3/src/zodb/zeo/zrpc/interfaces.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from zodb.storage.interfaces import StorageError
from zodb.zeo.interfaces import ClientDisconnected
class ZRPCError(StorageError):
pass
class DisconnectedError(ZRPCError, ClientDisconnected):
"""The database storage is disconnected from the storage server.
The error occurred because a problem in the low-level RPC connection,
or because the connection was closed.
"""
# This subclass is raised when zrpc catches the error.
=== Zope3/src/zodb/zeo/zrpc/smac.py 1.2 => 1.2.12.1 ===
--- Zope3/src/zodb/zeo/zrpc/smac.py:1.2 Wed Dec 25 09:12:23 2002
+++ Zope3/src/zodb/zeo/zrpc/smac.py Wed Mar 12 16:41:15 2003
@@ -18,7 +18,7 @@
import socket, errno
from types import StringType
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.zrpc.interfaces import DisconnectedError
from zodb.zeo.zrpc import log
# Use the dictionary to make sure we get the minimum number of errno
@@ -206,7 +206,7 @@
len(message), log.short_repr(message))
if self.__closed:
- raise Disconnected("Action is temporarily unavailable")
+ raise DisconnectedError("Action is temporarily unavailable")
self.__output_lock.acquire()
try:
# do two separate appends to avoid copying the message string
=== Zope3/src/zodb/zeo/zrpc/marshal.py 1.2 => 1.2.12.1 ===
--- Zope3/src/zodb/zeo/zrpc/marshal.py:1.2 Wed Dec 25 09:12:23 2002
+++ Zope3/src/zodb/zeo/zrpc/marshal.py Wed Mar 12 16:41:15 2003
@@ -15,7 +15,7 @@
from cStringIO import StringIO
import types
-from zodb.zeo.zrpc.error import ZRPCError
+from zodb.zeo.zrpc.interfaces import ZRPCError
from zodb.zeo.zrpc import log
class Marshaller:
=== Zope3/src/zodb/zeo/zrpc/connection.py 1.2 => 1.2.12.1 ===
--- Zope3/src/zodb/zeo/zrpc/connection.py:1.2 Wed Dec 25 09:12:23 2002
+++ Zope3/src/zodb/zeo/zrpc/connection.py Wed Mar 12 16:41:15 2003
@@ -21,7 +21,7 @@
from zodb.zeo import threadedasync
from zodb.zeo.zrpc import smac
-from zodb.zeo.zrpc.error import ZRPCError, DisconnectedError
+from zodb.zeo.zrpc.interfaces import ZRPCError, DisconnectedError
from zodb.zeo.zrpc import log
from zodb.zeo.zrpc.marshal import Marshaller
from zodb.zeo.zrpc.trigger import trigger
@@ -114,13 +114,17 @@
__super_init = smac.SizedMessageAsyncConnection.__init__
__super_close = smac.SizedMessageAsyncConnection.close
- protocol_version = "Z200"
+ oldest_protocol_version = "Z400"
+ protocol_version = "Z400"
+
+ # Z400 -- the first protocol defined for ZODB4
def __init__(self, sock, addr, obj=None):
self.obj = None
self.marshal = Marshaller()
self.closed = 0
self.msgid = 0
+ self.peer_protocol_version = None # set in recv_handshake()
if isinstance(addr, types.TupleType):
self.log_label = "zrpc-conn:%s:%d" % addr
else:
@@ -167,7 +171,7 @@
"""Register obj as the true object to invoke methods on"""
self.obj = obj
- def handshake(self):
+ def handshake(self, protocol=None):
# When a connection is created the first message sent is a
# 4-byte protocol version. This mechanism should allow the
# protocol to evolve over time, and let servers handle clients
@@ -179,16 +183,19 @@
# The client sends the protocol version it is using.
self._message_input = self.message_input
self.message_input = self.recv_handshake
- self.message_output(self.protocol_version)
+ self.message_output(protocol or self.protocol_version)
- def recv_handshake(self, message):
- if message == self.protocol_version:
- self.message_input = self._message_input
- else:
- log.error("%s: recv_handshake: bad handshake %s",
- self.log_label,
- log.short_repr(message))
- # otherwise do something else...
+ def recv_handshake(self, protocol):
+ # Extended by ManagedConnection
+ del self.message_input
+ self.peer_protocol_version = protocol
+ if self.oldest_protocol_version <= protocol <= self.protocol_version:
+ log.warn("%s: received handshake %r", self.log_label,
+ protocol)
+ else:
+ log.error("%s: recv_handshake: bad handshake %s", self.log_label,
+ protocol)
+ raise ZRPCError("bad handshake %r" % protocol)
def message_input(self, message):
"""Decoding an incoming message and dispatch it"""
@@ -329,6 +336,29 @@
else:
return r_args
+ # For testing purposes, it is useful to begin a synchronous call
+ # but not block waiting for its response. Since these methods are
+ # used for testing they can assume they are not in async mode and
+ # call asyncore.poll() directly to get the message out without
+ # also waiting for the reply.
+
+ def _deferred_call(self, method, *args):
+ if self.closed:
+ raise DisconnectedError()
+ msgid = self.send_call(method, args, 0)
+ asyncore.poll(0.01, self._map)
+ return msgid
+
+ def _deferred_wait(self, msgid):
+ r_flags, r_args = self.wait(msgid)
+ if (isinstance(r_args, types.TupleType)
+ and type(r_args[0]) == types.ClassType
+ and issubclass(r_args[0], Exception)):
+ inst = r_args[1]
+ raise inst # error raised by server
+ else:
+ return r_args
+
def callAsync(self, method, *args):
if self.closed:
raise DisconnectedError()
@@ -418,7 +448,7 @@
else:
asyncore.poll(0.0, self._map)
- def pending(self):
+ def pending(self, timeout=0):
"""Invoke mainloop until any pending messages are handled."""
if __debug__:
log.debug("%s: pending(), async=%d", self.log_label,
@@ -427,26 +457,50 @@
return
# Inline the asyncore poll() function to know whether any input
# was actually read. Repeat until no input is ready.
- # XXX This only does reads.
- r_in = [self._fileno]
- w_in = []
+
+ # Pending does reads and writes. In the case of server
+ # startup, we may need to write out zeoVerify() messages.
+ # Always check for read status, but don't check for write status
+ # only there is output to do. Only continue in this loop as
+ # long as there is data to read.
+ r = r_in = [self._fileno]
x_in = []
- while 1:
+ while r and not self.closed:
+ if self.writable():
+ w_in = [self._fileno]
+ else:
+ w_in = []
try:
- r, w, x = select.select(r_in, w_in, x_in, 0)
+ r, w, x = select.select(r_in, w_in, x_in, timeout)
except select.error, err:
if err[0] == errno.EINTR:
+ timeout = 0
continue
else:
raise
- if not r:
- break
- try:
- self.handle_read_event()
- except asyncore.ExitNow:
- raise
- except:
- self.handle_error()
+ else:
+ # Make sure any subsequent select does not block. The
+ # loop is only intended to make sure all incoming data is
+ # returned.
+
+ # XXX What if the server sends a lot of invalidations,
+ # such that pending never finishes? Seems unlikely, but
+ # not impossible.
+ timeout = 0
+ if r:
+ try:
+ self.handle_read_event()
+ except asyncore.ExitNow:
+ raise
+ except:
+ self.handle_error()
+ if w:
+ try:
+ self.handle_write_event()
+ except asyncore.ExitNow:
+ raise
+ except:
+ self.handle_error()
class ManagedServerConnection(Connection):
"""Server-side Connection subclass."""
@@ -472,6 +526,54 @@
self.mgr = mgr
self.__super_init(sock, addr, obj)
self.check_mgr_async()
+
+ # PROTOCOL NEGOTIATION:
+ #
+ # The code implementing protocol version 2.0.0 (which is deployed
+ # in the field and cannot be changed) *only* talks to peers that
+ # send a handshake indicating protocol version 2.0.0. In that
+ # version, both the client and the server immediately send out
+ # their protocol handshake when a connection is established,
+ # without waiting for their peer, and disconnect when a different
+ # handshake is receive.
+ #
+ # The new protocol uses this to enable new clients to talk to
+ # 2.0.0 servers: in the new protocol, the client waits until it
+ # receives the server's protocol handshake before sending its own
+ # handshake. The client sends the lower of its own protocol
+ # version and the server protocol version, allowing it to talk to
+ # servers using later protocol versions (2.0.2 and higher) as
+ # well: the effective protocol used will be the lower of the
+ # client and server protocol.
+ #
+ # The ZEO modules ClientStorage and ServerStub have backwards
+ # compatibility code for dealing with the previous version of the
+ # protocol. The client accept the old version of some messages,
+ # and will not send new messages when talking to an old server.
+ #
+ # As long as the client hasn't sent its handshake, it can't send
+ # anything else; output messages are queued during this time.
+ # (Output can happen because the connection testing machinery can
+ # start sending requests before the handshake is received.)
+
+ def handshake(self):
+ self.message_input = self.recv_handshake
+ self.message_output = self.queue_output
+ self.output_queue = []
+ # The handshake is sent by recv_handshake() below
+
+ def queue_output(self, message):
+ self.output_queue.append(message)
+
+ def recv_handshake(self, proto):
+ del self.message_output
+ proto = min(proto, self.protocol_version)
+ Connection.recv_handshake(self, proto) # Raise error if wrong proto
+ self.message_output(proto)
+ queue = self.output_queue
+ del self.output_queue
+ for message in queue:
+ self.message_output(message)
# Defer the ThreadedAsync work to the manager.
=== Zope3/src/zodb/zeo/zrpc/client.py 1.3 => 1.3.4.1 ===
--- Zope3/src/zodb/zeo/zrpc/client.py:1.3 Wed Feb 5 18:28:20 2003
+++ Zope3/src/zodb/zeo/zrpc/client.py Wed Mar 12 16:41:15 2003
@@ -335,10 +335,9 @@
wrap = ConnectWrapper(domain, addr, self.mgr, self.client)
wrap.connect_procedure()
if wrap.state == "notified":
- for wrap in wrappers.keys():
- wrap.close()
- wrappers[wrap] = wrap
- return wrappers
+ for w in wrappers.keys():
+ w.close()
+ return {wrap: wrap}
if wrap.state != "closed":
wrappers[wrap] = wrap
return wrappers
=== Removed File Zope3/src/zodb/zeo/zrpc/error.py ===