[Zope-Checkins] CVS: Packages/ZEO/zrpc - client.py:1.25.16.7 connection.py:1.49.4.4

Tim Peters tim.one at comcast.net
Fri Feb 4 14:50:14 EST 2005


Update of /cvs-repository/Packages/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv17443/ZEO/zrpc

Modified Files:
      Tag: Zope-2_7-branch
	client.py connection.py 
Log Message:
Fixed several thread and asyncore races in ZEO's connection dance.

ZEO/tests/ConnectionTests.py
    The pollUp() and pollDown() methods were pure busy loops whenever
    the asyncore socket map was empty, and at least on some flavors of
    Linux that starved the other thread(s) trying to do real work.
    This grossly increased the time needed to run tests using these, and
    sometimes caused bogus "timed out" test failures.
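
    The ConnectionTests.py change itself is not part of this diff.  As a
    rough sketch of the idea only (the names poll_until, condition, poll
    and pause are hypothetical, not the test suite's API), a polling
    helper can avoid the busy loop by sleeping whenever the socket map is
    empty and there is nothing to select on:

```python
import time

def poll_until(condition, socket_map, poll, timeout=30.0, pause=0.1):
    """Wait until condition() is true; return False if `timeout` expires.

    `poll(pause, socket_map)` stands in for an asyncore-style poll call.
    All names here are illustrative assumptions.
    """
    deadline = time.monotonic() + timeout
    while not condition():
        if time.monotonic() >= deadline:
            return False
        if socket_map:
            # A non-empty map lets the poll call block for up to `pause`.
            poll(pause, socket_map)
        else:
            # Nothing to select on: sleep so other threads get the CPU,
            # instead of spinning in a pure busy loop.
            time.sleep(pause)
    return True
```

    The sleep in the empty-map branch is the whole point:  without it the
    loop spins and can starve the threads doing the real work.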

ZEO/zrpc/client.py
ZEO/zrpc/connection.py
    Renamed class ManagedConnection to ManagedClientConnection, for clarity.

    Moved the comment block about protocol negotiation from the guts of
    ManagedClientConnection to before the Connection base class -- the
    Connection constructor can't be understood without this context.  Added
    more words about the delicate protocol negotiation dance.

    Connection class:  made this an abstract base class.  Derived classes
    _must_ implement the handshake() method.  There was really nothing in
    common between server and client wrt what handshake() needs to do, and
    it was confusing for one of them to use the base class handshake() while
    the other replaced handshake() completely.
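
    A stripped-down sketch of that shape, written for a current Python
    rather than the Python 2 of the diff.  The Sketch* class names, the
    `sent` list and the protocol string are placeholders for
    illustration, not the real zrpc classes or values:

```python
class SketchConnection:
    """Abstract base: subclasses must supply handshake()."""
    protocol_version = b"XXXX"  # placeholder 4-byte version, not the real one

    def __init__(self):
        self.sent = []          # stands in for the outgoing byte stream
        self.handshake()        # called by the constructor, as in the diff

    def message_output(self, message):
        self.sent.append(message)

    def handshake(self):
        raise NotImplementedError


class SketchServerConnection(SketchConnection):
    def handshake(self):
        # The server sends its preferred protocol at once.
        self.message_output(self.protocol_version)


class SketchClientConnection(SketchConnection):
    def handshake(self):
        # The client sends nothing yet; it waits for the server's handshake.
        pass
```

    Instantiating the base class directly fails with NotImplementedError,
    which makes a missing handshake() show up immediately.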

    Connection.__init__:  It isn't safe to register with asyncore's socket
    map before special-casing for the first (protocol handshake) message is
    set up.  Repaired that.  Also removed the pointless "optionalness" of
    the optional arguments.
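
    The ordering can be illustrated without asyncore itself.  In the
    sketch below, global_map stands in for asyncore.socket_map and
    FakeDispatcher for asyncore.dispatcher (which likewise accepts a map
    argument); both stand-ins, and the DeferredConnection name, are
    assumptions made so the example is self-contained:

```python
global_map = {}  # stand-in for asyncore.socket_map


class FakeDispatcher:
    """Stand-in for asyncore.dispatcher: registers itself on construction."""
    def __init__(self, fileno, sock_map=None):
        self._fileno = fileno
        target = global_map if sock_map is None else sock_map
        target[fileno] = self


class DeferredConnection(FakeDispatcher):
    def __init__(self, fileno):
        private_map = {}
        # Register in a private map so the main loop cannot see us yet.
        super().__init__(fileno, sock_map=private_map)
        # Special-case the first message before anyone can deliver one.
        self.message_input = self.recv_handshake
        self.handshake()
        # Only now is it safe to become visible to the main loop.
        global_map.update(private_map)

    def handshake(self):
        pass  # a concrete subclass would do its negotiation step here

    def recv_handshake(self, proto):
        del self.message_input  # uncover the class-level message_input()
        self.peer_protocol_version = proto

    def message_input(self, message):
        self.last_message = message
```

    Deleting the instance attribute in recv_handshake() is what restores
    the normal method:  the first message goes to recv_handshake(), every
    later one to message_input().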

    ManagedClientConnection.__init__:  Added machinery to set up correct
    (thread-safe) message queueing.  There was an unrepairable hole before,
    in the transition between "I'm queueing msgs waiting for the server
    handshake" and "I'm done queueing messages":  it was impossible to know
    whether any calls to the client's "queue a message" method were in
    progress (in other threads), so impossible to make the transition safely
    in all cases.  The client had to grow its own message_output() method,
    with a mutex protecting the transition from thread races.
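
    The pattern, reduced to its essentials.  QueueingOutput,
    handshake_received and the `send` callable are hypothetical names for
    this sketch; the real code is in the connection.py diff:

```python
import threading


class QueueingOutput:
    """Queue outgoing messages until the peer's handshake has been seen."""

    def __init__(self, send):
        self._send = send              # hypothetical callable that transmits
        self._lock = threading.Lock()  # guards _queueing and _queued
        self._queueing = True
        self._queued = []

    def message_output(self, message):
        with self._lock:
            if self._queueing:
                self._queued.append(message)
            else:
                self._send(message)

    def handshake_received(self, proto):
        # Flush and flip the flag under the same lock, so no other thread
        # can append to the queue between the flush and the transition.
        with self._lock:
            self._send(proto)
            for message in self._queued:
                self._send(message)
            self._queued = []
            self._queueing = False
```

    Because both methods take the same lock, a message is either queued
    before the flush or sent directly after it; it cannot be stranded in
    the queue or overtake the handshake.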

    Changed zrpc-conn log messages to include "(S)" for server-side or
    "(C)" for client-side.  This is especially helpful for figuring out
    logs produced while running the test suite (the server and client
    log messages end up in the same file then).
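
    For reference, the label construction amounts to the following.
    make_log_label is a hypothetical standalone helper, and the address
    in the usage note is made up; in the diff the logic is inline in
    Connection.__init__:

```python
def make_log_label(addr, tag):
    """Build a zrpc-conn log label tagged (C) for client or (S) for server."""
    # Stricter than a substring test: only exactly "C" or "S" is accepted.
    assert tag in ("C", "S")
    prefix = "zrpc-conn(%s):" % tag
    if isinstance(addr, tuple):
        return prefix + "%s:%d" % addr   # (host, port) address
    return prefix + str(addr)            # e.g. a Unix socket path
```

    With this sketch, make_log_label(("localhost", 8100), "C") gives
    "zrpc-conn(C):localhost:8100".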


=== Packages/ZEO/zrpc/client.py 1.25.16.6 => 1.25.16.7 ===
--- Packages/ZEO/zrpc/client.py:1.25.16.6	Mon Oct 18 14:00:23 2004
+++ Packages/ZEO/zrpc/client.py	Fri Feb  4 14:50:13 2005
@@ -26,7 +26,7 @@
 
 from ZEO.zrpc.log import log
 from ZEO.zrpc.trigger import trigger
-from ZEO.zrpc.connection import ManagedConnection
+from ZEO.zrpc.connection import ManagedClientConnection
 
 class ConnectionManager(object):
     """Keeps a connection up over time"""
@@ -475,8 +475,8 @@
         Call the client's testConnection(), giving the client a chance
         to do app-level check of the connection.
         """
-        self.conn = ManagedConnection(self.sock, self.addr,
-                                      self.client, self.mgr)
+        self.conn = ManagedClientConnection(self.sock, self.addr,
+                                            self.client, self.mgr)
         self.sock = None # The socket is now owned by the connection
         try:
             self.preferred = self.client.testConnection(self.conn)


=== Packages/ZEO/zrpc/connection.py 1.49.4.3 => 1.49.4.4 ===
--- Packages/ZEO/zrpc/connection.py:1.49.4.3	Mon Oct 18 14:00:23 2004
+++ Packages/ZEO/zrpc/connection.py	Fri Feb  4 14:50:13 2005
@@ -67,6 +67,64 @@
         self.ready.wait()
         Delay.error(self, exc_info)
 
+# PROTOCOL NEGOTIATION
+#
+# The code implementing protocol version 2.0.0 (which is deployed
+# in the field and cannot be changed) *only* talks to peers that
+# send a handshake indicating protocol version 2.0.0.  In that
+# version, both the client and the server immediately send out
+# their protocol handshake when a connection is established,
+# without waiting for their peer, and disconnect when a different
+# handshake is received.
+#
+# The new protocol uses this to enable new clients to talk to
+# 2.0.0 servers.  In the new protocol:
+#
+#    The server sends its protocol handshake to the client at once.
+#
+#    The client waits until it receives the server's protocol handshake
+#    before sending its own handshake.  The client sends the lower of its
+#    own protocol version and the server protocol version, allowing it to
+#    talk to servers using later protocol versions (2.0.2 and higher) as
+#    well:  the effective protocol used will be the lower of the client
+#    and server protocol.
+#
+# [Ugly details:  In order to treat the first received message (protocol
+#  handshake) differently than all later messages, both client and server
+#  start by patching their message_input() method to refer to their
+#  recv_handshake() method instead.  In addition, the client has to arrange
+#  to queue (delay) outgoing messages until it receives the server's
+#  handshake, so that the first message the client sends to the server is
+#  the client's handshake.  This multiply-special treatment of the first
+#  message is delicate, and several asyncore and thread subtleties were
+#  handled unsafely before ZODB 3.2.6.
+# ]
+#
+# The ZEO modules ClientStorage and ServerStub have backwards
+# compatibility code for dealing with the previous version of the
+# protocol.  The client accepts the old version of some messages,
+# and will not send new messages when talking to an old server.
+#
+# As long as the client hasn't sent its handshake, it can't send
+# anything else; output messages are queued during this time.
+# (Output can happen because the connection testing machinery can
+# start sending requests before the handshake is received.)
+#
+# UPGRADING FROM ZEO 2.0.0 TO NEWER VERSIONS:
+#
+# Because a new client can talk to an old server, but not vice
+# versa, all clients should be upgraded before upgrading any
+# servers.  Protocol upgrades beyond 2.0.1 will not have this
+# restriction, because clients using protocol 2.0.1 or later can
+# talk to both older and newer servers.
+#
+# No compatibility with protocol version 1 is provided.
+
+# Connection is abstract (it must be derived from).  ManagedServerConnection
+# and ManagedClientConnection are the concrete subclasses.  They need to
+# supply a handshake() method appropriate for their role in protocol
+# negotiation.
+
 class Connection(smac.SizedMessageAsyncConnection, object):
     """Dispatcher for RPC on object on both sides of socket.
 
@@ -136,17 +194,33 @@
     #             getExtensionMethods().
     #             getInvalidations().
 
-    def __init__(self, sock, addr, obj=None):
+    # Client constructor passes 'C' for tag, server constructor 'S'.  This
+    # is used in log messages.
+    def __init__(self, sock, addr, obj, tag):
         self.obj = None
         self.marshal = Marshaller()
         self.closed = 0
-        self.msgid = 0
-        self.peer_protocol_version = None # Set in recv_handshake()
+        self.peer_protocol_version = None # set in recv_handshake()
+
+        assert tag in "CS"
+        tag = "zrpc-conn(%c):" % tag
         if isinstance(addr, types.TupleType):
-            self.log_label = "zrpc-conn:%s:%d" % addr
+            self.log_label = tag + "%s:%d" % addr
         else:
-            self.log_label = "zrpc-conn:%s" % addr
-        self.__super_init(sock, addr)
+            self.log_label = tag + str(addr)
+
+        # Supply our own socket map, so that we don't get registered with
+        # the asyncore socket map just yet.  The initial protocol messages
+        # are treated very specially, and we dare not get invoked by asyncore
+        # before that special-case setup is complete.  Some of that setup
+        # occurs near the end of this constructor, and the rest is done by
+        # a concrete subclass's handshake() method.  Unfortunately, because
+        # we ultimately derive from asyncore.dispatcher, it's not possible
+        # to invoke the superclass constructor without asyncore stuffing
+        # us into _some_ socket map.
+        ourmap = {}
+        self.__super_init(sock, addr, map=ourmap)
+
         # A Connection either uses asyncore directly or relies on an
         # asyncore mainloop running in a separate thread.  If
         # thr_async is true, then the mainloop is running in a
@@ -156,24 +230,43 @@
         self.thr_async = 0
         self.trigger = None
         self._prepare_async()
+
         # The singleton dict is used in synchronous mode when a method
         # needs to call into asyncore to try to force some I/O to occur.
         # The singleton dict is a socket map containing only this object.
         self._singleton = {self._fileno: self}
+
         # msgid_lock guards access to msgid
+        self.msgid = 0
         self.msgid_lock = threading.Lock()
+
         # replies_cond is used to block when a synchronous call is
         # waiting for a response
         self.replies_cond = threading.Condition()
         self.replies = {}
+
         # waiting_for_reply is used internally to indicate whether
         # a call is in progress.  setting a session key is deferred
         # until after the call returns.
         self.waiting_for_reply = 0
         self.delay_sesskey = None
         self.register_object(obj)
+
+        # The first message we see is a protocol handshake.  message_input()
+        # is temporarily replaced by recv_handshake() to treat that message
+        # specially.  recv_handshake() does "del self.message_input", which
+        # uncovers the normal message_input() method thereafter.
+        self.message_input = self.recv_handshake
+
+        # Server and client need to do different things for protocol
+        # negotiation, and handshake() is implemented differently in each.
         self.handshake()
 
+        # Now it's safe to register with asyncore's socket map; it was not
+        # safe before message_input was replaced, or before handshake() was
+        # invoked.
+        asyncore.socket_map.update(ourmap)
+
     def __repr__(self):
         return "<%s %s>" % (self.__class__.__name__, self.addr)
 
@@ -191,7 +284,7 @@
         self.__super_close()
 
     def close_trigger(self):
-        # Overridden by ManagedConnection
+        # Overridden by ManagedClientConnection.
         if self.trigger is not None:
             self.trigger.close()
 
@@ -199,24 +292,26 @@
         """Register obj as the true object to invoke methods on"""
         self.obj = obj
 
-    def handshake(self, proto=None):
-        # Overridden by ManagedConnection
-
-        # When a connection is created the first message sent is a
-        # 4-byte protocol version.  This mechanism should allow the
-        # protocol to evolve over time, and let servers handle clients
-        # using multiple versions of the protocol.
-
-        # The mechanism replaces the message_input() method for the
-        # first message received.
-
-        # The client sends the protocol version it is using.
-        self.message_input = self.recv_handshake
-        self.message_output(proto or self.protocol_version)
+    # Subclass must implement.  handshake() is called by the constructor,
+    # near its end, but before self is added to asyncore's socket map.
+    # When a connection is created the first message sent is a 4-byte
+    # protocol version.  This allows the protocol to evolve over time, and
+    # lets servers handle clients using multiple versions of the protocol.
+    # In general, the server's handshake() just needs to send the server's
+    # preferred protocol; the client's also needs to queue (delay) outgoing
+    # messages until it sees the handshake from the server.
+    def handshake(self):
+        raise NotImplementedError
 
+    # Replaces message_input() for the first message received.  Records the
+    # protocol sent by the peer in `peer_protocol_version`, restores the
+    # normal message_input() method, and raises an exception if the peer's
+    # protocol is unacceptable.  That's all the server needs to do.  The
+    # client needs to do additional work in response to the server's
+    # handshake, and extends this method.
     def recv_handshake(self, proto):
-        # Extended by ManagedConnection
-        del self.message_input
+        # Extended by ManagedClientConnection.
+        del self.message_input  # uncover normal-case message_input()
         self.peer_protocol_version = proto
         if self.oldest_protocol_version <= proto <= self.protocol_version:
             self.log("received handshake %r" % proto, level=zLOG.INFO)
@@ -225,7 +320,7 @@
             raise ZRPCError("bad handshake %r" % proto)
 
     def message_input(self, message):
-        """Decoding an incoming message and dispatch it"""
+        """Decode an incoming message and dispatch it"""
         # If something goes wrong during decoding, the marshaller
         # will raise an exception.  The exception will ultimately
         # result in asycnore calling handle_error(), which will
@@ -562,81 +657,85 @@
 
     def __init__(self, sock, addr, obj, mgr):
         self.mgr = mgr
-        self.__super_init(sock, addr, obj)
+        self.__super_init(sock, addr, obj, 'S')
         self.obj.notifyConnected(self)
 
+    def handshake(self):
+        # Send the server's preferred protocol to the client.
+        self.message_output(self.protocol_version)
+
     def close(self):
         self.obj.notifyDisconnected()
         self.mgr.close_conn(self)
         self.__super_close()
 
-class ManagedConnection(Connection):
+class ManagedClientConnection(Connection):
     """Client-side Connection subclass."""
     __super_init = Connection.__init__
     __super_close = Connection.close
+    base_message_output = Connection.message_output
 
     def __init__(self, sock, addr, obj, mgr):
         self.mgr = mgr
-        self.__super_init(sock, addr, obj)
+
+        # We can't use the base smac's message_output directly because the
+        # client needs to queue outgoing messages until it's seen the
+        # initial protocol handshake from the server.  So we have our own
+        # message_output() method, and support for initial queueing.  This is
+        # a delicate design, requiring an output mutex to be wholly
+        # thread-safe.
+        # Caution:  we must set this up before calling the base class
+        # constructor, because the latter registers us with asyncore;
+        # we need to guarantee that we'll queue outgoing messages before
+        # asyncore learns about us.
+        self.output_lock = threading.Lock()
+        self.queue_output = True
+        self.queued_messages = []
+
+        self.__super_init(sock, addr, obj, tag='C')
         self.check_mgr_async()
 
-    # PROTOCOL NEGOTIATION:
-    #
-    # The code implementing protocol version 2.0.0 (which is deployed
-    # in the field and cannot be changed) *only* talks to peers that
-    # send a handshake indicating protocol version 2.0.0.  In that
-    # version, both the client and the server immediately send out
-    # their protocol handshake when a connection is established,
-    # without waiting for their peer, and disconnect when a different
-    # handshake is receive.
-    #
-    # The new protocol uses this to enable new clients to talk to
-    # 2.0.0 servers: in the new protocol, the client waits until it
-    # receives the server's protocol handshake before sending its own
-    # handshake.  The client sends the lower of its own protocol
-    # version and the server protocol version, allowing it to talk to
-    # servers using later protocol versions (2.0.2 and higher) as
-    # well: the effective protocol used will be the lower of the
-    # client and server protocol.
-    #
-    # The ZEO modules ClientStorage and ServerStub have backwards
-    # compatibility code for dealing with the previous version of the
-    # protocol.  The client accept the old version of some messages,
-    # and will not send new messages when talking to an old server.
-    #
-    # As long as the client hasn't sent its handshake, it can't send
-    # anything else; output messages are queued during this time.
-    # (Output can happen because the connection testing machinery can
-    # start sending requests before the handshake is received.)
-    #
-    # UPGRADING FROM ZEO 2.0.0 TO NEWER VERSIONS:
-    #
-    # Because a new client can talk to an old server, but not vice
-    # versa, all clients should be upgraded before upgrading any
-    # servers.  Protocol upgrades beyond 2.0.1 will not have this
-    # restriction, because clients using protocol 2.0.1 or later can
-    # talk to both older and newer servers.
-    #
-    # No compatibility with protocol version 1 is provided.
+    # Our message_output() queues messages until recv_handshake() gets the
+    # protocol handshake from the server.
+    def message_output(self, message):
+        self.output_lock.acquire()
+        try:
+            if self.queue_output:
+                self.queued_messages.append(message)
+            else:
+                assert not self.queued_messages
+                self.base_message_output(message)
+        finally:
+            self.output_lock.release()
 
     def handshake(self):
-        self.message_input = self.recv_handshake
-        self.message_output = self.queue_output
-        self.output_queue = []
-        # The handshake is sent by recv_handshake() below
-
-    def queue_output(self, message):
-        self.output_queue.append(message)
+        # The client waits to see the server's handshake.  Outgoing messages
+        # are queued for the duration.  The client will send its own
+        # handshake after the server's handshake is seen, in recv_handshake()
+        # below.  It will then send any messages queued while waiting.
+        assert self.queue_output # the constructor already set this
 
     def recv_handshake(self, proto):
-        del self.message_output
+        # The protocol to use is the older of our and the server's preferred
+        # protocols.
         proto = min(proto, self.protocol_version)
-        Connection.recv_handshake(self, proto) # Raise error if wrong proto
-        self.message_output(proto)
-        queue = self.output_queue
-        del self.output_queue
-        for message in queue:
-            self.message_output(message)
+
+        # Restore the normal message_input method, and raise an exception
+        # if the protocol version is too old.
+        Connection.recv_handshake(self, proto)
+
+        # Tell the server the protocol in use, then send any messages that
+        # were queued while waiting to hear the server's protocol, and stop
+        # queueing messages.
+        self.output_lock.acquire()
+        try:
+            self.base_message_output(proto)
+            for message in self.queued_messages:
+                self.base_message_output(message)
+            self.queued_messages = []
+            self.queue_output = False
+        finally:
+            self.output_lock.release()
 
     # Defer the ThreadedAsync work to the manager.
 


