[Zodb-checkins] SVN: ZODB/trunk/ Removed the "sync" mode for ClientStorage. Previously, a

Jim Fulton jim at zope.com
Tue Jul 18 14:15:58 EDT 2006


Log message for revision 69179:
  Removed the "sync" mode for ClientStorage.  Previously, a
  ClientStorage could be in either "sync" mode or "async" mode.  Now
  there is just "async" mode.  There is now an asyncore main loop
  dedicated to ZEO clients.
  
  This addresses a test failure on Mac OS X,
  http://www.zope.org/Collectors/Zope3-dev/650, that I believe was due
  to a bug in sync mode. Some asyncore-based code was being called from
  multiple threads, which it was not written to handle.
  
  Converting to always-async mode revealed some bugs that weren't caught
  before because the tests ran in sync mode.  These bugs could
  explain some problems we've seen at times with clients taking a long
  time to reconnect after a disconnect.
  
  Added a partial heartbeat to try to detect lost connections that
  aren't otherwise caught,
  http://mail.zope.org/pipermail/zodb-dev/2005-June/008951.html, by
  periodically writing to all connections during periods of inactivity.
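
The idea behind the dedicated loop and the heartbeat can be sketched in isolation. The following is a minimal illustration only, written for Python 3 with plain `select` and `socket.socketpair()` rather than asyncore or zrpc. The names `run_loop`, `demo`, `HEARTBEAT`, and the `stats` dictionary are hypothetical and do not appear in the commit, and the one-byte heartbeat is a stand-in for the real message, which the diff sends as a reply to a non-existent message id. A daemon thread owns its own set of sockets, a wake-up socket plays the role of the shared trigger, and when `select()` times out with nothing ready the loop writes to every connection so that a dead peer shows up as a send error.

```python
import select
import socket
import threading
import time

# Hypothetical one-byte keepalive; NOT the zrpc wire format.
HEARTBEAT = b"\x00"


def run_loop(conns, wakeup, idle_timeout, stats):
    """Serve the sockets in `conns` until `wakeup` becomes readable.

    If select() times out with nothing ready, write a heartbeat to every
    connection so that a lost peer surfaces as a send error.
    """
    while True:
        readable, _, _ = select.select([wakeup] + list(conns), [], [],
                                       idle_timeout)
        if wakeup in readable:
            return  # another thread asked the loop to stop
        if not readable:
            # Idle period: count it and send a heartbeat on each connection.
            stats["idle_ticks"] += 1
            for sock in list(conns):
                try:
                    sock.send(HEARTBEAT)
                except OSError:
                    conns.remove(sock)
                    stats["dropped"] += 1
            continue
        for sock in readable:
            data = sock.recv(4096)
            if not data:  # peer closed the connection
                conns.remove(sock)
                sock.close()
                stats["dropped"] += 1


def demo():
    client, server = socket.socketpair()  # stand-in for a client/server link
    wake_r, wake_w = socket.socketpair()  # plays the role of the trigger
    stats = {"idle_ticks": 0, "dropped": 0}
    thread = threading.Thread(target=run_loop,
                              args=([client], wake_r, 0.05, stats),
                              daemon=True)
    thread.start()
    time.sleep(0.3)       # stay idle long enough for several timeouts
    wake_w.send(b"x")     # wake the loop and ask it to exit
    thread.join(2)
    server.setblocking(False)
    received = server.recv(4096)  # heartbeats written during the idle period
    for sock in (client, server, wake_r, wake_w):
        sock.close()
    return stats, received


if __name__ == "__main__":
    stats, received = demo()
    print(stats, len(received))
```

Because the loop runs in its own thread with its own sockets, application threads never call into it directly. They only wake it, which is the role `pull_trigger()` plays in the diff below.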
  

Changed:
  U   ZODB/trunk/NEWS.txt
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/ServerStub.py
  U   ZODB/trunk/src/ZEO/tests/CommitLockTests.py
  U   ZODB/trunk/src/ZEO/tests/ConnectionTests.py
  U   ZODB/trunk/src/ZEO/tests/testAuth.py
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  U   ZODB/trunk/src/ZEO/zrpc/client.py
  U   ZODB/trunk/src/ZEO/zrpc/connection.py
  U   ZODB/trunk/src/ZEO/zrpc/trigger.py

-=-
Modified: ZODB/trunk/NEWS.txt
===================================================================
--- ZODB/trunk/NEWS.txt	2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/NEWS.txt	2006-07-18 18:15:57 UTC (rev 69179)
@@ -1,13 +1,35 @@
-What's new in ZODB3 3.7a1?
-==========================
-Release date: DD-MMM-200Y
+What's new on ZODB 3.7b2?
+=========================
 
+ClientStorage
+-------------
 
-Following is combined news from internal releases (to support ongoing
-Zope development).  These are the dates of the internal releases:
+- (3.7b2) Removed the "sync" mode for ClientStorage.  
 
-- 3.7a1 DD-MMM-200Y
+  Previously, a ClientStorage could be in either "sync" mode or "async"
+  mode.  Now there is just "async" mode.  There is now a dedicicated
+  asyncore main loop dedicated to ZEO clients.
 
+  Applications no-longer need to run an asyncore main loop to cause
+  client storages to run in async mode.  Even if an application runs an
+  asyncore main loop, it is independent of the loop used by client
+  storages. 
+
+  This addresses a test failure on Mac OS X,
+  http://www.zope.org/Collectors/Zope3-dev/650, that I believe was due
+  to a bug in sync mode. Some asyncore-based code was being called from
+  multiple threads that didn't expect to be.
+
+  Converting to always-async mode revealed some bugs that weren't caught
+  before because the tests ran in sync mode.  These problems could
+  explain some problems we've seen at times with clients taking a long
+  time to reconnect after a disconnect.
+
+  Added a partial heart beat to try to detect lost connections that
+  aren't otherwise caught,
+  http://mail.zope.org/pipermail/zodb-dev/2005-June/008951.html, by
+  perioidically writing to all connections during periods of inactivity.
+
 Connection management
 ---------------------
 
@@ -34,6 +56,14 @@
 - (3.7a1) The documentation for ``_p_oid`` now specifies the concrete
   type of oids (in short, an oid is either None or a non-empty string).
 
+Testing
+-------
+
+- (3.7b2) Fixed test-runner output truncation.
+
+  A bug was fixed in the test runner that caused result summaries to be
+  omitted when running on Windows.
+
 Tools
 -----
 

Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2006-07-18 18:15:57 UTC (rev 69179)
@@ -339,43 +339,16 @@
         # still be going on.  This code must wait until validation
         # finishes, but if the connection isn't a zrpc async
         # connection it also needs to poll for input.
-        if self._connection.is_async():
-            while 1:
-                self._ready.wait(30)
-                if self._ready.isSet():
-                    break
-                if timeout and time.time() > deadline:
-                    log2("Timed out waiting for connection",
-                         level=logging.WARNING)
-                    break
-                log2("Waiting for cache verification to finish")
-        else:
-            self._wait_sync(deadline)
-
-    def _wait_sync(self, deadline=None):
-        # Log no more than one "waiting" message per LOG_THROTTLE seconds.
-        LOG_THROTTLE = 300 # 5 minutes
-        next_log_time = time.time()
-
-        while not self._ready.isSet():
-            now = time.time()
-            if deadline and now > deadline:
-                log2("Timed out waiting for connection", level=logging.WARNING)
+        assert self._connection.is_async()
+        while 1:
+            self._ready.wait(30)
+            if self._ready.isSet():
                 break
-            if now >= next_log_time:
-                log2("Waiting for cache verification to finish")
-                next_log_time = now + LOG_THROTTLE
-            if self._connection is None:
-                # If the connection was closed while we were
-                # waiting for it to become ready, start over.
-                if deadline is None:
-                    timeout = None
-                else:
-                    timeout = deadline - now
-                return self._wait(timeout)
-            # No mainloop ia running, so we need to call something fancy to
-            # handle asyncore events.
-            self._connection.pending(30)
+            if timeout and time.time() > deadline:
+                log2("Timed out waiting for connection",
+                     level=logging.WARNING)
+                break
+            log2("Waiting for cache verification to finish")
 
     def close(self):
         """Storage API: finalize the storage, releasing external resources."""
@@ -403,18 +376,9 @@
         return self._ready.isSet()
 
     def sync(self):
-        """Handle any pending invalidation messages.
+        # The separate async thread should keep us up to date
+        pass
 
-        This is called by the sync method in ZODB.Connection.
-        """
-        # If there is no connection, return immediately.  Technically,
-        # there are no pending invalidations so they are all handled.
-        # There doesn't seem to be much benefit to raising an exception.
-
-        cn = self._connection
-        if cn is not None:
-            cn.pending()
-
     def doAuth(self, protocol, stub):
         if not (self._username and self._password):
             raise AuthError("empty username or password")
@@ -517,11 +481,17 @@
 
         stub = self.StorageServerStubClass(conn)
         self._oids = []
-        self._info.update(stub.get_info())
         self.verify_cache(stub)
-        if not conn.is_async():
-            log2("Waiting for cache verification to finish")
-            self._wait_sync()
+
+        # It's important to call get_info after calling verify_cache.
+        # If we end up doing a full-verification, we need to wait till
+        # it's done.  By doing a synchonous call, we are guarenteed
+        # that the verification will be done because operations are
+        # handled in order.        
+        self._info.update(stub.get_info())
+
+        assert conn.is_async()
+
         self._handle_extensions()
 
     def _handle_extensions(self):

Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py	2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/ServerStub.py	2006-07-18 18:15:57 UTC (rev 69179)
@@ -13,6 +13,8 @@
 ##############################################################################
 """RPC stubs for interface exported by StorageServer."""
 
+import time
+
 ##
 # ZEO storage server.
 # <p>
@@ -44,9 +46,11 @@
         zrpc.connection.Connection class.
         """
         self.rpc = rpc
+        
         # Wait until we know what version the other side is using.
         while rpc.peer_protocol_version is None:
-            rpc.pending()
+            time.sleep(0.1)
+
         if rpc.peer_protocol_version == 'Z200':
             self.lastTransaction = lambda: None
             self.getInvalidations = lambda tid: None

Modified: ZODB/trunk/src/ZEO/tests/CommitLockTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/CommitLockTests.py	2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/tests/CommitLockTests.py	2006-07-18 18:15:57 UTC (rev 69179)
@@ -35,10 +35,9 @@
     # run the entire test in a thread so that the blocking call for
     # tpc_vote() doesn't hang the test suite.
 
-    def __init__(self, storage, trans, method="tpc_finish"):
+    def __init__(self, storage, trans):
         self.storage = storage
         self.trans = trans
-        self.method = method
         self.ready = threading.Event()
         TestThread.__init__(self)
 
@@ -52,10 +51,7 @@
             p = zodb_pickle(MinPO("c"))
             self.storage.store(oid, ZERO, p, '', self.trans)
             self.myvote()
-            if self.method == "tpc_finish":
-                self.storage.tpc_finish(self.trans)
-            else:
-                self.storage.tpc_abort(self.trans)
+            self.storage.tpc_finish(self.trans)
         except ClientDisconnected:
             pass
 
@@ -120,7 +116,7 @@
             t.start()
             t.ready.wait()
 
-            # Close on the connections abnormally to test server response
+            # Close one of the connections abnormally to test server response
             if i == 0:
                 storage.close()
             else:
@@ -237,7 +233,6 @@
         trans_id = self._get_trans_id()
         oid, txn = self._start_txn()
         msgid = self._begin_undo(trans_id, txn)
-
         self._begin_threads()
 
         self._finish_undo(msgid)

Modified: ZODB/trunk/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/ConnectionTests.py	2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/tests/ConnectionTests.py	2006-07-18 18:15:57 UTC (rev 69179)
@@ -55,6 +55,12 @@
 
     StorageServerStubClass = TestServerStub
 
+    connection_count_for_tests = 0
+
+    def notifyConnected(self, conn):
+        ClientStorage.notifyConnected(self, conn)
+        self.connection_count_for_tests += 1
+
     def verify_cache(self, stub):
         self.end_verify = threading.Event()
         self.verify_result = ClientStorage.verify_cache(self, stub)
@@ -959,40 +965,39 @@
         storage.close()
 
     def checkTimeoutAfterVote(self):
-        raises = self.assertRaises
-        unless = self.failUnless
         self._storage = storage = self.openClientStorage()
         # Assert that the zeo cache is empty
-        unless(not list(storage._cache.contents()))
+        self.assert_(not list(storage._cache.contents()))
         # Create the object
         oid = storage.new_oid()
         obj = MinPO(7)
         # Now do a store, sleeping before the finish so as to cause a timeout
         t = Transaction()
+        old_connection_count = storage.connection_count_for_tests
         storage.tpc_begin(t)
         revid1 = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
         storage.tpc_vote(t)
         # Now sleep long enough for the storage to time out
         time.sleep(3)
-        storage.sync()
-        unless(not storage.is_connected())
+        self.assert_(
+            (not storage.is_connected())
+            or
+            (storage.connection_count_for_tests > old_connection_count)
+            )
         storage._wait()
-        unless(storage.is_connected())
+        self.assert_(storage.is_connected())
         # We expect finish to fail
-        raises(ClientDisconnected, storage.tpc_finish, t)
+        self.assertRaises(ClientDisconnected, storage.tpc_finish, t)
         # The cache should still be empty
-        unless(not list(storage._cache.contents()))
+        self.assert_(not list(storage._cache.contents()))
         # Load should fail since the object should not be in either the cache
         # or the server.
-        raises(KeyError, storage.load, oid, '')
+        self.assertRaises(KeyError, storage.load, oid, '')
 
     def checkTimeoutProvokingConflicts(self):
-        eq = self.assertEqual
-        raises = self.assertRaises
-        require = self.assert_
         self._storage = storage = self.openClientStorage()
         # Assert that the zeo cache is empty.
-        require(not list(storage._cache.contents()))
+        self.assert_(not list(storage._cache.contents()))
         # Create the object
         oid = storage.new_oid()
         obj = MinPO(7)
@@ -1007,6 +1012,7 @@
         # Now do a store, sleeping before the finish so as to cause a timeout.
         obj.value = 8
         t = Transaction()
+        old_connection_count = storage.connection_count_for_tests
         storage.tpc_begin(t)
         revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
         revid2b = storage.tpc_vote(t)
@@ -1020,17 +1026,21 @@
         # of 3).
         deadline = time.time() + 60 # wait up to a minute
         while time.time() < deadline:
-            if storage.is_connected():
+            if (storage.is_connected() and
+                (storage.connection_count_for_tests == old_connection_count)
+                ):
                 time.sleep(self.timeout / 1.8)
-                storage.sync()
             else:
                 break
-        storage.sync()
-        require(not storage.is_connected())
+        self.assert_(
+            (not storage.is_connected())
+            or
+            (storage.connection_count_for_tests > old_connection_count)
+            )
         storage._wait()
-        require(storage.is_connected())
+        self.assert_(storage.is_connected())
         # We expect finish to fail.
-        raises(ClientDisconnected, storage.tpc_finish, t)
+        self.assertRaises(ClientDisconnected, storage.tpc_finish, t)
         # Now we think we've committed the second transaction, but we really
         # haven't.  A third one should produce a POSKeyError on the server,
         # which manifests as a ConflictError on the client.
@@ -1038,7 +1048,7 @@
         t = Transaction()
         storage.tpc_begin(t)
         storage.store(oid, revid2, zodb_pickle(obj), '', t)
-        raises(ConflictError, storage.tpc_vote, t)
+        self.assertRaises(ConflictError, storage.tpc_vote, t)
         # Even aborting won't help.
         storage.tpc_abort(t)
         storage.tpc_finish(t)
@@ -1048,7 +1058,7 @@
         storage.tpc_begin(t)
         storage.store(oid, revid2, zodb_pickle(obj), '', t)
         # Even aborting won't help.
-        raises(ConflictError, storage.tpc_vote, t)
+        self.assertRaises(ConflictError, storage.tpc_vote, t)
         # Abort this one and try a transaction that should succeed.
         storage.tpc_abort(t)
         storage.tpc_finish(t)
@@ -1062,8 +1072,8 @@
         storage.tpc_finish(t)
         # Now load the object and verify that it has a value of 11.
         data, revid = storage.load(oid, '')
-        eq(zodb_unpickle(data), MinPO(11))
-        eq(revid, revid2)
+        self.assertEqual(zodb_unpickle(data), MinPO(11))
+        self.assertEqual(revid, revid2)
 
 class MSTThread(threading.Thread):
 

Modified: ZODB/trunk/src/ZEO/tests/testAuth.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testAuth.py	2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/tests/testAuth.py	2006-07-18 18:15:57 UTC (rev 69179)
@@ -95,10 +95,11 @@
     def testUnauthenticatedMessage(self):
         # Test that an unauthenticated message is rejected by the server
         # if it was sent after the connection was authenticated.
+
+        self._storage = self.openClientStorage(wait=0, username="foo",
+                                              password="bar", realm=self.realm)
         # Sleep for 0.2 seconds to give the server some time to start up
         # seems to be needed before and after creating the storage
-        self._storage = self.openClientStorage(wait=0, username="foo",
-                                              password="bar", realm=self.realm)
         self.wait()
         self._storage.versions()
         # Manually clear the state of the hmac connection

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2006-07-18 18:15:57 UTC (rev 69179)
@@ -14,13 +14,15 @@
 """Test suite for ZEO based on ZODB.tests."""
 
 # System imports
+import asyncore
+import logging
 import os
 import random
+import signal
 import socket
-import asyncore
 import tempfile
+import time
 import unittest
-import logging
 
 # ZODB test support
 import ZODB
@@ -36,8 +38,13 @@
 from ZODB.tests.testDemoStorage import DemoStorageWrappedBase
 
 from ZEO.ClientStorage import ClientStorage
+
+import ZEO.zrpc.connection
+
 from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
 
+import ZEO.tests.ConnectionTests
+
 logger = logging.getLogger('ZEO.tests.testZEO')
 
 class DummyDB:
@@ -70,14 +77,19 @@
             self.assertEqual(zodb_unpickle(data), MinPO('first'))
             self.assertEqual(serial, revid1)
             revid2 = self._dostore(oid, data=MinPO('second'), revid=revid1)
-            for n in range(3):
-                # Let the server and client talk for a moment.
-                # Is there a better way to do this?
-                asyncore.poll(0.1)
-            data, serial = storage2.load(oid, '')
-            self.assertEqual(zodb_unpickle(data), MinPO('second'),
-                             'Invalidation message was not sent!')
-            self.assertEqual(serial, revid2)
+
+            # Now, storage 2 should eventually get the new data. It
+            # will take some time, although hopefully not much.
+            # We'll poll till we get it and whine if we time out:
+            for n in range(30):
+                time.sleep(.1)
+                data, serial = storage2.load(oid, '')
+                if (serial == revid2 and
+                    zodb_unpickle(data) == MinPO('second')
+                    ):
+                    break
+            else:
+                raise AssertionError('Invalidation message was not sent!')
         finally:
             storage2.close()
 
@@ -198,6 +210,67 @@
     def getConfig(self):
         return """<mappingstorage 1/>"""
 
+
+class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
+    """Make sure a heartbeat is being sent and that it does no harm
+
+    This is really hard to test properly because we can't see the data
+    flow between the client and server and we can't really tell what's
+    going on in the server very well. :(
+
+    """
+
+    def setUp(self):
+        # Crank down the select frequency
+        self.__old_client_timeout = ZEO.zrpc.connection.client_timeout
+        ZEO.zrpc.connection.client_timeout = 0.1
+        ZEO.zrpc.connection.client_trigger.pull_trigger()
+        ZEO.tests.ConnectionTests.CommonSetupTearDown.setUp(self)
+
+    def tearDown(self):
+        ZEO.zrpc.connection.client_timeout = self.__old_client_timeout
+        ZEO.zrpc.connection.client_trigger.pull_trigger()
+        ZEO.tests.ConnectionTests.CommonSetupTearDown.tearDown(self)
+
+    def getConfig(self, path, create, read_only):
+        return """<mappingstorage 1/>"""
+
+    def checkHeartbeatWithServerClose(self):
+        # This is a minimal test that mainly tests that the heartbeat
+        # function does no harm.
+        client_timeout_count = ZEO.zrpc.connection.client_timeout_count
+        self._storage = self.openClientStorage()
+        time.sleep(1) # allow some time for the select loop to fire a few times
+        self.assert_(ZEO.zrpc.connection.client_timeout_count
+                     > client_timeout_count)
+        self._dostore()
+
+        if hasattr(os, 'kill'):
+            # Kill server violently, in hopes of provoking problem
+            os.kill(self._pids[0], signal.SIGKILL)
+            self._servers[0] = None
+        else:
+            self.shutdownServer()
+
+        for i in range(91):
+            # wait for disconnection
+            if not self._storage.is_connected():
+                break
+            time.sleep(0.1)
+        else:
+            raise AssertionError("Didn't detect server shutdown in 5 seconds")
+
+    def checkHeartbeatWithClientClose(self):
+        # This is a minimal test that mainly tests that the heartbeat
+        # function does no harm.
+        client_timeout_count = ZEO.zrpc.connection.client_timeout_count
+        self._storage = self.openClientStorage()
+        self._storage.close()
+        time.sleep(1) # allow some time for the select loop to fire a few times
+        self.assert_(ZEO.zrpc.connection.client_timeout_count
+                     > client_timeout_count)
+
+
 class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase):
 
     def getConfig(self):
@@ -233,6 +306,7 @@
                 FileStorageTests,
                 MappingStorageTests,
                 DemoStorageWrappedAroundClientStorage,
+                HeartbeatTests,
                ]
 
 def test_suite():

Modified: ZODB/trunk/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/client.py	2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/zrpc/client.py	2006-07-18 18:15:57 UTC (rev 69179)
@@ -11,6 +11,7 @@
 # FOR A PARTICULAR PURPOSE
 #
 ##############################################################################
+import asyncore
 import errno
 import select
 import socket
@@ -20,13 +21,11 @@
 import types
 import logging
 
-import ThreadedAsync
-
 from ZODB.POSException import ReadOnlyError
 from ZODB.loglevels import BLATHER
 
 from ZEO.zrpc.log import log
-from ZEO.zrpc.trigger import trigger
+import ZEO.zrpc.trigger
 from ZEO.zrpc.connection import ManagedClientConnection
 
 class ConnectionManager(object):
@@ -43,9 +42,6 @@
         # If thread is not None, then there is a helper thread
         # attempting to connect.
         self.thread = None # Protected by self.cond
-        self.trigger = None
-        self.thr_async = 0
-        ThreadedAsync.register_loop_callback(self.set_async)
 
     def __repr__(self):
         return "<%s for %s>" % (self.__class__.__name__, self.addrlist)
@@ -85,7 +81,6 @@
     def close(self):
         """Prevent ConnectionManager from opening new connections"""
         self.closed = 1
-        ThreadedAsync.remove_loop_callback(self.set_async)
         self.cond.acquire()
         try:
             t = self.thread
@@ -103,30 +98,7 @@
         if conn is not None:
             # This will call close_conn() below which clears self.connection
             conn.close()
-        if self.trigger is not None:
-            self.trigger.close()
-            self.trigger = None
-        ThreadedAsync.remove_loop_callback(self.set_async)
 
-    def set_async(self, map):
-        # This is the callback registered with ThreadedAsync.  The
-        # callback might be called multiple times, so it shouldn't
-        # create a trigger every time and should never do anything
-        # after it's closed.
-
-        # It may be that the only case where it is called multiple
-        # times is in the test suite, where ThreadedAsync's loop can
-        # be started in a child process after a fork.  Regardless,
-        # it's good to be defensive.
-
-        # We need each connection started with async==0 to have a
-        # callback.
-        log("CM.set_async(%s)" % repr(map), level=logging.DEBUG)
-        if not self.closed and self.trigger is None:
-            log("CM.set_async(): first call")
-            self.trigger = trigger()
-            self.thr_async = 1 # needs to be set on the Connection
-
     def attempt_connect(self):
         """Attempt a connection to the server without blocking too long.
 

Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py	2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py	2006-07-18 18:15:57 UTC (rev 69179)
@@ -19,6 +19,8 @@
 import types
 import logging
 
+import traceback, time
+
 import ThreadedAsync
 from ZEO.zrpc import smac
 from ZEO.zrpc.error import ZRPCError, DisconnectedError
@@ -30,6 +32,89 @@
 REPLY = ".reply" # message name used for replies
 ASYNC = 1
 
+##############################################################################
+# Dedicated Client select loop:
+client_map = {}
+client_trigger = trigger(client_map)
+client_timeout = 30.0
+client_timeout_count = 0 # for testing
+
+def client_loop():
+    map = client_map
+    logger = logging.getLogger('ZEO.zrpc.client_loop')
+    logger.addHandler(logging.StreamHandler())
+
+    read = asyncore.read
+    write = asyncore.write
+    _exception = asyncore._exception
+    
+    while map:
+        try:
+            r = e = list(client_map)
+            w = [fd for (fd, obj) in map.iteritems() if obj.writable()]
+
+            try:
+                r, w, e = select.select(r, w, e, client_timeout)
+            except select.error, err:
+                if err[0] != errno.EINTR:
+                    if err[0] == errno.EBADF:
+
+                        # If a connection is closed while we are
+                        # calling select on it, we can get a bad
+                        # file-descriptor error.  We'll check for this
+                        # case by looking for entries in r and w that
+                        # are not in the socket map.
+
+                        if [fd for fd in r if fd not in client_map]:
+                            continue
+                        if [fd for fd in w if fd not in client_map]:
+                            continue
+                        
+                    raise
+                else:
+                    continue
+
+            if not (r or w or e):
+                for obj in client_map.itervalues():
+                    if isinstance(obj, Connection):
+                        # Send a heartbeat message as a reply to a
+                        # non-existent message id.
+                        try:
+                            obj.send_reply(-1, None)
+                        except DisconnectedError:
+                            pass
+                global client_timeout_count
+                client_timeout_count += 1
+                continue
+
+            for fd in r:
+                obj = map.get(fd)
+                if obj is None:
+                    continue
+                read(obj)
+
+            for fd in w:
+                obj = map.get(fd)
+                if obj is None:
+                    continue
+                write(obj)
+
+            for fd in e:
+                obj = map.get(fd)
+                if obj is None:
+                    continue
+                _exception(obj)
+
+        except:
+            logger.exception('poll failure')
+            raise
+
+client_thread = threading.Thread(target=client_loop)
+client_thread.setDaemon(True)
+client_thread.start()
+#
+##############################################################################
+
 class Delay:
     """Used to delay response to client for synchronous calls.
 
@@ -235,7 +320,7 @@
     # Client constructor passes 'C' for tag, server constructor 'S'.  This
     # is used in log messages, and to determine whether we can speak with
     # our peer.
-    def __init__(self, sock, addr, obj, tag):
+    def __init__(self, sock, addr, obj, tag, map=None):
         self.obj = None
         self.marshal = Marshaller()
         self.closed = False
@@ -315,8 +400,10 @@
         # isn't necessary before Python 2.4, but doesn't hurt then (it just
         # gives us an unused attribute in 2.3); updating the global socket
         # map is necessary regardless of Python version.
-        self._map = asyncore.socket_map
-        asyncore.socket_map.update(ourmap)
+        if map is None:
+            map = asyncore.socket_map
+        self._map = map
+        map.update(ourmap)
 
     def __repr__(self):
         return "<%s %s>" % (self.__class__.__name__, self.addr)
@@ -331,12 +418,13 @@
             return
         self._singleton.clear()
         self.closed = True
+        self.__super_close()
         self.close_trigger()
-        self.__super_close()
 
     def close_trigger(self):
         # Overridden by ManagedClientConnection.
         if self.trigger is not None:
+            self.trigger.pull_trigger()
             self.trigger.close()
 
     def register_object(self, obj):
@@ -538,16 +626,16 @@
             return r_args
 
     # For testing purposes, it is useful to begin a synchronous call
-    # but not block waiting for its response.  Since these methods are
-    # used for testing they can assume they are not in async mode and
-    # call asyncore.poll() directly to get the message out without
-    # also waiting for the reply.
+    # but not block waiting for its response.
 
     def _deferred_call(self, method, *args):
         if self.closed:
             raise DisconnectedError()
         msgid = self.send_call(method, args, 0)
-        asyncore.poll(0.01, self._singleton)
+        if self.is_async():
+            self.trigger.pull_trigger()
+        else:
+            asyncore.poll(0.01, self._singleton)
         return msgid
 
     def _deferred_wait(self, msgid):
@@ -663,7 +751,7 @@
         else:
             asyncore.poll(0.0, self._singleton)
 
-    def pending(self, timeout=0):
+    def _pending(self, timeout=0):
         """Invoke mainloop until any pending messages are handled."""
         if __debug__:
             self.log("pending(), async=%d" % self.is_async(), level=TRACE)
@@ -758,8 +846,10 @@
         self.queue_output = True
         self.queued_messages = []
 
-        self.__super_init(sock, addr, obj, tag='C')
-        self.check_mgr_async()
+        self.__super_init(sock, addr, obj, tag='C', map=client_map)
+        self.thr_async = True
+        self.trigger = client_trigger
+        client_trigger.pull_trigger()
 
     # Our message_ouput() queues messages until recv_handshake() gets the
     # protocol handshake from the server.
@@ -806,10 +896,13 @@
     # Defer the ThreadedAsync work to the manager.
 
     def close_trigger(self):
-        # the manager should actually close the trigger
-        # TODO: what is that comment trying to say?  What 'manager'?
-        del self.trigger
+        # We are using a shared trigger for all client connections.
+        # We never want to close it.
 
+        # We do want to pull it to make sure the select loop detects that
+        # we're closed.
+        self.trigger.pull_trigger()
+
     def set_async(self, map):
         pass
 
@@ -817,20 +910,8 @@
         # Don't do the register_loop_callback that the superclass does
         pass
 
-    def check_mgr_async(self):
-        if not self.thr_async and self.mgr.thr_async:
-            assert self.mgr.trigger is not None, \
-                   "manager (%s) has no trigger" % self.mgr
-            self.thr_async = True
-            self.trigger = self.mgr.trigger
-            return 1
-        return 0
-
     def is_async(self):
-        # TODO: could the check_mgr_async() be avoided on each test?
-        if self.thr_async:
-            return 1
-        return self.check_mgr_async()
+        return True
 
     def close(self):
         self.mgr.close_conn(self)

Modified: ZODB/trunk/src/ZEO/zrpc/trigger.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/trigger.py	2006-07-18 17:25:40 UTC (rev 69178)
+++ ZODB/trunk/src/ZEO/zrpc/trigger.py	2006-07-18 18:15:57 UTC (rev 69179)
@@ -135,10 +135,10 @@
     class trigger(_triggerbase, asyncore.file_dispatcher):
         kind = "pipe"
 
-        def __init__(self):
+        def __init__(self, map=None):
             _triggerbase.__init__(self)
             r, self.trigger = self._fds = os.pipe()
-            asyncore.file_dispatcher.__init__(self, r)
+            asyncore.file_dispatcher.__init__(self, r, map)
 
         def _close(self):
             for fd in self._fds:
@@ -155,7 +155,7 @@
     class trigger(_triggerbase, asyncore.dispatcher):
         kind = "loopback"
 
-        def __init__(self):
+        def __init__(self, map=None):
             _triggerbase.__init__(self)
 
             # Get a pair of connected sockets.  The trigger is the 'w'
@@ -208,7 +208,7 @@
             r, addr = a.accept()  # r becomes asyncore's (self.)socket
             a.close()
             self.trigger = w
-            asyncore.dispatcher.__init__(self, r)
+            asyncore.dispatcher.__init__(self, r, map)
 
         def _close(self):
             # self.socket is r, and self.trigger is w, from __init__


