[Zodb-checkins] SVN: ZODB/branches/3.3/ Port from ZODB 3.2.

Tim Peters tim.one at comcast.net
Fri Feb 4 19:36:05 EST 2005


Log message for revision 29052:
  Port from ZODB 3.2.
  
  Fixed several thread and asyncore races in ZEO's connection dance.
  
  ZEO/tests/ConnectionTests.py
      The pollUp() and pollDown() methods were pure busy loops whenever
      the asyncore socket map was empty, and at least on some flavors of
      Linux that starved the other thread(s) trying to do real work.
      This grossly increased the time needed to run tests using these, and
      sometimes caused bogus "timed out" test failures.
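
      A minimal sketch of that repair, assuming a generic helper rather
      than the real test methods.  The names poll_until, predicate, poll
      and pause are illustrative only; poll stands in for the asyncore
      poll call, which returns at once when the socket map is empty:

```python
import time


def poll_until(predicate, timeout=30.0, pause=0.1, poll=lambda: None):
    """Poll until predicate() is true; raise AssertionError on timeout.

    `poll` is a hypothetical stand-in for the asyncore poll call.
    """
    giveup = time.time() + timeout
    while not predicate():
        poll()
        if time.time() > giveup:
            raise AssertionError("timed out waiting for condition")
        # Without this sleep the loop spins whenever poll() returns
        # immediately, and can starve the thread doing the real work.
        time.sleep(pause)
```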
  
  ZEO/zrpc/client.py
  ZEO/zrpc/connection.py
      Renamed class ManagedConnection to ManagedClientConnection, for clarity.
  
      Moved the comment block about protocol negotiation from the guts of
      ManagedClientConnection to before the Connection base class -- the
      Connection constructor can't be understood without this context.  Added
      more words about the delicate protocol negotiation dance.
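
      The version choice at the heart of that negotiation can be sketched
      as below.  This is an illustration only:  negotiate() is a
      hypothetical helper, and the protocol strings used with it are
      made-up values that merely compare in version order.

```python
def negotiate(server_proto, own_proto, oldest_proto):
    """Return the protocol a client would settle on, or raise.

    All three arguments are assumed to compare in version order.
    """
    # The effective protocol is the older of the two preferred versions.
    proto = min(server_proto, own_proto)
    # Anything older than the oldest supported protocol is a bad handshake.
    if not (oldest_proto <= proto <= own_proto):
        raise ValueError("bad handshake %r" % (proto,))
    return proto
```

      For example, a client preferring "Z201" that hears "Z200" from the
      server would settle on "Z200", provided "Z200" is still supported.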
  
      Connection class:  made this an abstract base class.  Derived classes
      _must_ implement the handshake() method.  There was really nothing in
      common between server and client wrt what handshake() needs to do, and
      it was confusing for one of them to use the base class handshake() while
      the other replaced handshake() completely.
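
      The shape of that split, reduced to a toy.  The class names, the
      `sent` list and the "Z201" value are assumptions made for
      illustration; the real classes also derive from the smac/asyncore
      machinery, which is left out here:

```python
class Connection:
    """Abstract base: concrete subclasses must supply handshake()."""

    def __init__(self):
        self.sent = []      # stand-in for bytes written to the socket
        self.handshake()    # called near the end of construction

    def message_output(self, message):
        self.sent.append(message)

    def handshake(self):
        raise NotImplementedError


class ServerConnection(Connection):
    protocol_version = "Z201"  # hypothetical value

    def handshake(self):
        # The server announces its preferred protocol at once.
        self.message_output(self.protocol_version)


class ClientConnection(Connection):
    def handshake(self):
        # The client sends nothing until it has seen the server's handshake.
        pass
```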
  
      Connection.__init__:  It isn't safe to register with asyncore's socket
      map before special-casing for the first (protocol handshake) message is
      set up.  Repaired that.  Also removed the pointless "optionalness" of
      the optional arguments.
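
      A sketch of the ordering fix, using a plain dict and a toy
      Dispatcher class as stand-ins for asyncore's socket map and
      dispatcher (both are assumptions here, so the example runs without
      asyncore).  The object registers in a private map first and is
      published only after the first-message special case is in place:

```python
socket_map = {}  # stand-in for asyncore's global socket map


class Dispatcher:
    """Toy base class that registers itself in its constructor."""

    def __init__(self, fileno, map=None):
        self._fileno = fileno
        if map is None:
            map = socket_map
        map[fileno] = self


class Connection(Dispatcher):
    def __init__(self, fileno):
        private_map = {}
        # Register in a private map, so the event loop can't see us yet.
        super().__init__(fileno, map=private_map)
        # Special-case the first message (the protocol handshake).
        self.message_input = self.recv_handshake
        # Only now is it safe to become visible to the event loop.
        socket_map.update(private_map)

    def recv_handshake(self, proto):
        del self.message_input  # uncover the normal message_input()
        self.peer_protocol_version = proto

    def message_input(self, message):
        return ("dispatched", message)
```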
  
      ManagedClientConnection.__init__:  Added machinery to set up correct
      (thread-safe) message queueing.  There was an unrepairable hole before,
      in the transition between "I'm queueing msgs waiting for the server
      handshake" and "I'm done queueing messages":  it was impossible to know
      whether any calls to the client's "queue a message" method were in
      progress (in other threads), so impossible to make the transition safely
      in all cases.  The client had to grow its own message_output() method,
      with a mutex protecting the transition from thread races.
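
      A stripped-down sketch of that mutex-protected transition.  The
      class QueueingOutput, its attribute names, and the `send` callable
      are illustrative assumptions, not the names used in connection.py:

```python
import threading


class QueueingOutput:
    """Queue outgoing messages until the peer's handshake has been seen."""

    def __init__(self, send):
        self._send = send            # callable that really writes a message
        self._lock = threading.Lock()
        self._queueing = True
        self._queued = []

    def message_output(self, message):
        # The lock makes "check the flag, then queue or send" atomic, so no
        # caller can append to the queue after it has been flushed.
        with self._lock:
            if self._queueing:
                self._queued.append(message)
            else:
                self._send(message)

    def handshake_received(self, proto):
        with self._lock:
            self._send(proto)        # our handshake must go out first
            for message in self._queued:
                self._send(message)
            self._queued = []
            self._queueing = False
```

      Because both methods hold the same lock, a message is either queued
      before the flush or sent directly after it, never lost in between.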
  
      Changed zrpc-conn log messages to include "(S)" for server-side or
      "(C)" for client-side.  This is especially helpful for figuring out
      logs produced while running the test suite (the server and client
      log messages end up in the same file then).
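
      The tagging amounts to picking a per-side logger name.  A small
      sketch follows; connection_logger() is a hypothetical helper, and it
      checks the tag against a tuple because a substring test such as
      `tag in "CS"` would also accept "" and "CS":

```python
import logging


def connection_logger(tag):
    """Return a logger whose name says which side produced the message."""
    if tag not in ("C", "S"):
        raise ValueError("tag must be 'C' or 'S', got %r" % (tag,))
    return logging.getLogger("ZEO.zrpc.Connection(%s)" % tag)
```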
  

Changed:
  U   ZODB/branches/3.3/NEWS.txt
  U   ZODB/branches/3.3/src/ZEO/tests/ConnectionTests.py
  U   ZODB/branches/3.3/src/ZEO/zrpc/client.py
  U   ZODB/branches/3.3/src/ZEO/zrpc/connection.py

-=-
Modified: ZODB/branches/3.3/NEWS.txt
===================================================================
--- ZODB/branches/3.3/NEWS.txt	2005-02-04 23:50:26 UTC (rev 29051)
+++ ZODB/branches/3.3/NEWS.txt	2005-02-05 00:36:05 UTC (rev 29052)
@@ -1,3 +1,34 @@
+What's new in ZODB3 3.3.1a2?
+============================
+Release date: DD-MMM-2005
+
+ZEO
+---
+
+Repaired subtle race conditions in establishing ZEO connections, both client-
+and server-side.  These account for intermittent cases where ZEO failed
+to make a connection (or reconnection), accompanied by a log message showing
+an error caught in ``asyncore`` and having a traceback ending with:
+
+    ``UnpicklingError: invalid load key, 'Z'.``
+
+or:
+
+    ``ZRPCError: bad handshake '(K\x00K\x00U\x0fgetAuthProtocol)t.'``
+
+or:
+
+    ``error: (9, 'Bad file descriptor')``
+
+or an ``AttributeError``.
+
+These were exacerbated when running the test suite, because of an unintended
+busy loop in the test scaffolding, which could starve the thread trying to
+make a connection.  The ZEO reconnection tests may run much faster now,
+depending on platform, and should suffer far fewer (if any) intermittent
+"timed out waiting for storage to connect" failures.
+
+
 What's new in ZODB3 3.3.1a1?
 ============================
 Release date: 11-Jan-2005

Modified: ZODB/branches/3.3/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/branches/3.3/src/ZEO/tests/ConnectionTests.py	2005-02-04 23:50:26 UTC (rev 29051)
+++ ZODB/branches/3.3/src/ZEO/tests/ConnectionTests.py	2005-02-05 00:36:05 UTC (rev 29052)
@@ -216,7 +216,7 @@
     def pollUp(self, timeout=30.0, storage=None):
         if storage is None:
             storage = self._storage
-        # Poll until we're connected
+        # Poll until we're connected.
         now = time.time()
         giveup = now + timeout
         while not storage.is_connected():
@@ -224,9 +224,15 @@
             now = time.time()
             if now > giveup:
                 self.fail("timed out waiting for storage to connect")
+            # When the socket map is empty, poll() returns immediately,
+            # and this is a pure busy-loop then.  At least on some Linux
+            # flavors, that can starve the thread trying to connect,
+            # leading to grossly increased runtime (typical) or bogus
+            # "timed out" failures.  A little sleep here cures both.
+            time.sleep(0.1)
 
     def pollDown(self, timeout=30.0):
-        # Poll until we're disconnected
+        # Poll until we're disconnected.
         now = time.time()
         giveup = now + timeout
         while self._storage.is_connected():
@@ -234,6 +240,8 @@
             now = time.time()
             if now > giveup:
                 self.fail("timed out waiting for storage to disconnect")
+            # See pollUp() for why we sleep a little here.
+            time.sleep(0.1)
 
 
 class ConnectionTests(CommonSetupTearDown):

Modified: ZODB/branches/3.3/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/branches/3.3/src/ZEO/zrpc/client.py	2005-02-04 23:50:26 UTC (rev 29051)
+++ ZODB/branches/3.3/src/ZEO/zrpc/client.py	2005-02-05 00:36:05 UTC (rev 29052)
@@ -27,7 +27,7 @@
 
 from ZEO.zrpc.log import log
 from ZEO.zrpc.trigger import trigger
-from ZEO.zrpc.connection import ManagedConnection
+from ZEO.zrpc.connection import ManagedClientConnection
 
 class ConnectionManager(object):
     """Keeps a connection up over time"""
@@ -476,8 +476,8 @@
         Call the client's testConnection(), giving the client a chance
         to do app-level check of the connection.
         """
-        self.conn = ManagedConnection(self.sock, self.addr,
-                                      self.client, self.mgr)
+        self.conn = ManagedClientConnection(self.sock, self.addr,
+                                            self.client, self.mgr)
         self.sock = None # The socket is now owned by the connection
         try:
             self.preferred = self.client.testConnection(self.conn)

Modified: ZODB/branches/3.3/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/3.3/src/ZEO/zrpc/connection.py	2005-02-04 23:50:26 UTC (rev 29051)
+++ ZODB/branches/3.3/src/ZEO/zrpc/connection.py	2005-02-05 00:36:05 UTC (rev 29052)
@@ -67,6 +67,64 @@
         self.ready.wait()
         Delay.error(self, exc_info)
 
+# PROTOCOL NEGOTIATION
+#
+# The code implementing protocol version 2.0.0 (which is deployed
+# in the field and cannot be changed) *only* talks to peers that
+# send a handshake indicating protocol version 2.0.0.  In that
+# version, both the client and the server immediately send out
+# their protocol handshake when a connection is established,
+# without waiting for their peer, and disconnect when a different
+# handshake is received.
+#
+# The new protocol uses this to enable new clients to talk to
+# 2.0.0 servers.  In the new protocol:
+#
+#    The server sends its protocol handshake to the client at once.
+#
+#    The client waits until it receives the server's protocol handshake
+#    before sending its own handshake.  The client sends the lower of its
+#    own protocol version and the server protocol version, allowing it to
+#    talk to servers using later protocol versions (2.0.2 and higher) as
+#    well:  the effective protocol used will be the lower of the client
+#    and server protocol.
+#
+# [Ugly details:  In order to treat the first received message (protocol
+#  handshake) differently than all later messages, both client and server
+#  start by patching their message_input() method to refer to their
+#  recv_handshake() method instead.  In addition, the client has to arrange
+#  to queue (delay) outgoing messages until it receives the server's
+#  handshake, so that the first message the client sends to the server is
+#  the client's handshake.  This multiply-special treatment of the first
+#  message is delicate, and several asyncore and thread subtleties were
+#  handled unsafely before ZODB 3.2.6.
+# ]
+#
+# The ZEO modules ClientStorage and ServerStub have backwards
+# compatibility code for dealing with the previous version of the
+# protocol.  The client accepts the old version of some messages,
+# and will not send new messages when talking to an old server.
+#
+# As long as the client hasn't sent its handshake, it can't send
+# anything else; output messages are queued during this time.
+# (Output can happen because the connection testing machinery can
+# start sending requests before the handshake is received.)
+#
+# UPGRADING FROM ZEO 2.0.0 TO NEWER VERSIONS:
+#
+# Because a new client can talk to an old server, but not vice
+# versa, all clients should be upgraded before upgrading any
+# servers.  Protocol upgrades beyond 2.0.1 will not have this
+# restriction, because clients using protocol 2.0.1 or later can
+# talk to both older and newer servers.
+#
+# No compatibility with protocol version 1 is provided.
+
+# Connection is abstract (it must be derived from).  ManagedServerConnection
+# and ManagedClientConnection are the concrete subclasses.  They need to
+# supply a handshake() method appropriate for their role in protocol
+# negotiation.
+
 class Connection(smac.SizedMessageAsyncConnection, object):
     """Dispatcher for RPC on object on both sides of socket.
 
@@ -136,18 +194,33 @@
     #             getExtensionMethods().
     #             getInvalidations().
 
-    def __init__(self, sock, addr, obj=None):
+    # Client constructor passes 'C' for tag, server constructor 'S'.  This
+    # is used in log messages.
+    def __init__(self, sock, addr, obj, tag):
         self.obj = None
         self.marshal = Marshaller()
         self.closed = False
-        self.msgid = 0
-        self.peer_protocol_version = None # Set in recv_handshake()
-        self.logger = logging.getLogger('ZEO.zrpc.Connection')
+        self.peer_protocol_version = None # set in recv_handshake()
+
+        assert tag in "CS"
+        self.logger = logging.getLogger('ZEO.zrpc.Connection(%c)' % tag)
         if isinstance(addr, types.TupleType):
             self.log_label = "(%s:%d) " % addr
         else:
             self.log_label = "(%s) " % addr
-        self.__super_init(sock, addr)
+
+        # Supply our own socket map, so that we don't get registered with
+        # the asyncore socket map just yet.  The initial protocol messages
+        # are treated very specially, and we dare not get invoked by asyncore
+        # before that special-case setup is complete.  Some of that setup
+        # occurs near the end of this constructor, and the rest is done by
+        # a concrete subclass's handshake() method.  Unfortunately, because
+        # we ultimately derive from asyncore.dispatcher, it's not possible
+        # to invoke the superclass constructor without asyncore stuffing
+        # us into _some_ socket map.
+        ourmap = {}
+        self.__super_init(sock, addr, map=ourmap)
+
         # A Connection either uses asyncore directly or relies on an
         # asyncore mainloop running in a separate thread.  If
         # thr_async is true, then the mainloop is running in a
@@ -157,24 +230,43 @@
         self.thr_async = False
         self.trigger = None
         self._prepare_async()
+
         # The singleton dict is used in synchronous mode when a method
         # needs to call into asyncore to try to force some I/O to occur.
         # The singleton dict is a socket map containing only this object.
         self._singleton = {self._fileno: self}
+
         # msgid_lock guards access to msgid
+        self.msgid = 0
         self.msgid_lock = threading.Lock()
+
         # replies_cond is used to block when a synchronous call is
         # waiting for a response
         self.replies_cond = threading.Condition()
         self.replies = {}
+
         # waiting_for_reply is used internally to indicate whether
         # a call is in progress.  setting a session key is deferred
         # until after the call returns.
         self.waiting_for_reply = False
         self.delay_sesskey = None
         self.register_object(obj)
+
+        # The first message we see is a protocol handshake.  message_input()
+        # is temporarily replaced by recv_handshake() to treat that message
+        # specially.  recv_handshake() does "del self.message_input", which
+        # uncovers the normal message_input() method thereafter.
+        self.message_input = self.recv_handshake
+
+        # Server and client need to do different things for protocol
+        # negotiation, and handshake() is implemented differently in each.
         self.handshake()
 
+        # Now it's safe to register with asyncore's socket map; it was not
+        # safe before message_input was replaced, or before handshake() was
+        # invoked.
+        asyncore.socket_map.update(ourmap)
+
     def __repr__(self):
         return "<%s %s>" % (self.__class__.__name__, self.addr)
 
@@ -192,7 +284,7 @@
         self.__super_close()
 
     def close_trigger(self):
-        # Overridden by ManagedConnection
+        # Overridden by ManagedClientConnection.
         if self.trigger is not None:
             self.trigger.close()
 
@@ -200,24 +292,26 @@
         """Register obj as the true object to invoke methods on."""
         self.obj = obj
 
-    def handshake(self, proto=None):
-        # Overridden by ManagedConnection
+    # Subclass must implement.  handshake() is called by the constructor,
+    # near its end, but before self is added to asyncore's socket map.
+    # When a connection is created the first message sent is a 4-byte
+    # protocol version.  This allows the protocol to evolve over time, and
+    # lets servers handle clients using multiple versions of the protocol.
+    # In general, the server's handshake() just needs to send the server's
+    # preferred protocol; the client's also needs to queue (delay) outgoing
+    # messages until it sees the handshake from the server.
+    def handshake(self):
+        raise NotImplementedError
 
-        # When a connection is created the first message sent is a
-        # 4-byte protocol version.  This mechanism should allow the
-        # protocol to evolve over time, and let servers handle clients
-        # using multiple versions of the protocol.
-
-        # The mechanism replaces the message_input() method for the
-        # first message received.
-
-        # The client sends the protocol version it is using.
-        self.message_input = self.recv_handshake
-        self.message_output(proto or self.protocol_version)
-
+    # Replaces message_input() for the first message received.  Records the
+    # protocol sent by the peer in `peer_protocol_version`, restores the
+    # normal message_input() method, and raises an exception if the peer's
+    # protocol is unacceptable.  That's all the server needs to do.  The
+    # client needs to do additional work in response to the server's
+    # handshake, and extends this method.
     def recv_handshake(self, proto):
-        # Extended by ManagedConnection
-        del self.message_input
+        # Extended by ManagedClientConnection.
+        del self.message_input  # uncover normal-case message_input()
         self.peer_protocol_version = proto
         if self.oldest_protocol_version <= proto <= self.protocol_version:
             self.log("received handshake %r" % proto, level=logging.INFO)
@@ -227,7 +321,7 @@
             raise ZRPCError("bad handshake %r" % proto)
 
     def message_input(self, message):
-        """Decoding an incoming message and dispatch it"""
+        """Decode an incoming message and dispatch it"""
         # If something goes wrong during decoding, the marshaller
         # will raise an exception.  The exception will ultimately
         # result in asycnore calling handle_error(), which will
@@ -563,82 +657,86 @@
 
     def __init__(self, sock, addr, obj, mgr):
         self.mgr = mgr
-        self.__super_init(sock, addr, obj)
+        self.__super_init(sock, addr, obj, 'S')
         self.obj.notifyConnected(self)
 
+    def handshake(self):
+        # Send the server's preferred protocol to the client.
+        self.message_output(self.protocol_version)
+
     def close(self):
         self.obj.notifyDisconnected()
         self.mgr.close_conn(self)
         self.__super_close()
 
-class ManagedConnection(Connection):
+class ManagedClientConnection(Connection):
     """Client-side Connection subclass."""
     __super_init = Connection.__init__
     __super_close = Connection.close
+    base_message_output = Connection.message_output
 
     def __init__(self, sock, addr, obj, mgr):
         self.mgr = mgr
-        self.__super_init(sock, addr, obj)
+
+        # We can't use the base smac's message_output directly because the
+        # client needs to queue outgoing messages until it's seen the
+        # initial protocol handshake from the server.  So we have our own
+        # message_output() method, and support for initial queueing.  This is
+        # a delicate design, requiring an output mutex to be wholly
+        # thread-safe.
+        # Caution:  we must set this up before calling the base class
+        # constructor, because the latter registers us with asyncore;
+        # we need to guarantee that we'll queue outgoing messages before
+        # asyncore learns about us.
+        self.output_lock = threading.Lock()
+        self.queue_output = True
+        self.queued_messages = []
+
+        self.__super_init(sock, addr, obj, tag='C')
         self.check_mgr_async()
 
-    # PROTOCOL NEGOTIATION:
-    #
-    # The code implementing protocol version 2.0.0 (which is deployed
-    # in the field and cannot be changed) *only* talks to peers that
-    # send a handshake indicating protocol version 2.0.0.  In that
-    # version, both the client and the server immediately send out
-    # their protocol handshake when a connection is established,
-    # without waiting for their peer, and disconnect when a different
-    # handshake is receive.
-    #
-    # The new protocol uses this to enable new clients to talk to
-    # 2.0.0 servers: in the new protocol, the client waits until it
-    # receives the server's protocol handshake before sending its own
-    # handshake.  The client sends the lower of its own protocol
-    # version and the server protocol version, allowing it to talk to
-    # servers using later protocol versions (2.0.2 and higher) as
-    # well: the effective protocol used will be the lower of the
-    # client and server protocol.
-    #
-    # The ZEO modules ClientStorage and ServerStub have backwards
-    # compatibility code for dealing with the previous version of the
-    # protocol.  The client accept the old version of some messages,
-    # and will not send new messages when talking to an old server.
-    #
-    # As long as the client hasn't sent its handshake, it can't send
-    # anything else; output messages are queued during this time.
-    # (Output can happen because the connection testing machinery can
-    # start sending requests before the handshake is received.)
-    #
-    # UPGRADING FROM ZEO 2.0.0 TO NEWER VERSIONS:
-    #
-    # Because a new client can talk to an old server, but not vice
-    # versa, all clients should be upgraded before upgrading any
-    # servers.  Protocol upgrades beyond 2.0.1 will not have this
-    # restriction, because clients using protocol 2.0.1 or later can
-    # talk to both older and newer servers.
-    #
-    # No compatibility with protocol version 1 is provided.
+    # Our message_output() queues messages until recv_handshake() gets the
+    # protocol handshake from the server.
+    def message_output(self, message):
+        self.output_lock.acquire()
+        try:
+            if self.queue_output:
+                self.queued_messages.append(message)
+            else:
+                assert not self.queued_messages
+                self.base_message_output(message)
+        finally:
+            self.output_lock.release()
 
     def handshake(self):
-        self.message_input = self.recv_handshake
-        self.message_output = self.queue_output
-        self.output_queue = []
-        # The handshake is sent by recv_handshake() below
+        # The client waits to see the server's handshake.  Outgoing messages
+        # are queued for the duration.  The client will send its own
+        # handshake after the server's handshake is seen, in recv_handshake()
+        # below.  It will then send any messages queued while waiting.
+        assert self.queue_output # the constructor already set this
 
-    def queue_output(self, message):
-        self.output_queue.append(message)
-
     def recv_handshake(self, proto):
-        del self.message_output
+        # The protocol to use is the older of our and the server's preferred
+        # protocols.
         proto = min(proto, self.protocol_version)
-        Connection.recv_handshake(self, proto) # Raise error if wrong proto
-        self.message_output(proto)
-        queue = self.output_queue
-        del self.output_queue
-        for message in queue:
-            self.message_output(message)
 
+        # Restore the normal message_input method, and raise an exception
+        # if the protocol version is too old.
+        Connection.recv_handshake(self, proto)
+
+        # Tell the server the protocol in use, then send any messages that
+        # were queued while waiting to hear the server's protocol, and stop
+        # queueing messages.
+        self.output_lock.acquire()
+        try:
+            self.base_message_output(proto)
+            for message in self.queued_messages:
+                self.base_message_output(message)
+            self.queued_messages = []
+            self.queue_output = False
+        finally:
+            self.output_lock.release()
+
     # Defer the ThreadedAsync work to the manager.
 
     def close_trigger(self):


