[Zodb-checkins] SVN: ZODB/branches/jim-async-client/src/ZEO/ Provide clients with their own asyncore main loop eliminating the need

Jim Fulton jim at zope.com
Wed Jul 12 06:17:37 EDT 2006


Log message for revision 69098:
  Provide clients with their own asyncore main loop eliminating the need
  for a separate sync mode.
  
  All of the level one tests pass and there is only one failing level 2
  test.  We may be able to remove additional vestiges of the sync mode
  from the client code.  We ought to be able to remove it from the server
  code too, although it's not clear that we'll have time for that in
  this release cycle.
  
  Before doing more, I need to test this on mac os x and windows.
  In particular, I want to see if this actually addresses the motivating
  test failures on mac os x.
  

Changed:
  U   ZODB/branches/jim-async-client/src/ZEO/ClientStorage.py
  U   ZODB/branches/jim-async-client/src/ZEO/ServerStub.py
  U   ZODB/branches/jim-async-client/src/ZEO/tests/ConnectionTests.py
  U   ZODB/branches/jim-async-client/src/ZEO/tests/testZEO.py
  U   ZODB/branches/jim-async-client/src/ZEO/zrpc/client.py
  U   ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py
  U   ZODB/branches/jim-async-client/src/ZEO/zrpc/trigger.py

-=-
Modified: ZODB/branches/jim-async-client/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-async-client/src/ZEO/ClientStorage.py	2006-07-11 19:51:48 UTC (rev 69097)
+++ ZODB/branches/jim-async-client/src/ZEO/ClientStorage.py	2006-07-12 10:17:34 UTC (rev 69098)
@@ -339,43 +339,16 @@
         # still be going on.  This code must wait until validation
         # finishes, but if the connection isn't a zrpc async
         # connection it also needs to poll for input.
-        if self._connection.is_async():
-            while 1:
-                self._ready.wait(30)
-                if self._ready.isSet():
-                    break
-                if timeout and time.time() > deadline:
-                    log2("Timed out waiting for connection",
-                         level=logging.WARNING)
-                    break
-                log2("Waiting for cache verification to finish")
-        else:
-            self._wait_sync(deadline)
-
-    def _wait_sync(self, deadline=None):
-        # Log no more than one "waiting" message per LOG_THROTTLE seconds.
-        LOG_THROTTLE = 300 # 5 minutes
-        next_log_time = time.time()
-
-        while not self._ready.isSet():
-            now = time.time()
-            if deadline and now > deadline:
-                log2("Timed out waiting for connection", level=logging.WARNING)
+        assert self._connection.is_async()
+        while 1:
+            self._ready.wait(30)
+            if self._ready.isSet():
                 break
-            if now >= next_log_time:
-                log2("Waiting for cache verification to finish")
-                next_log_time = now + LOG_THROTTLE
-            if self._connection is None:
-                # If the connection was closed while we were
-                # waiting for it to become ready, start over.
-                if deadline is None:
-                    timeout = None
-                else:
-                    timeout = deadline - now
-                return self._wait(timeout)
-            # No mainloop ia running, so we need to call something fancy to
-            # handle asyncore events.
-            self._connection.pending(30)
+            if timeout and time.time() > deadline:
+                log2("Timed out waiting for connection",
+                     level=logging.WARNING)
+                break
+            log2("Waiting for cache verification to finish")
 
     def close(self):
         """Storage API: finalize the storage, releasing external resources."""
@@ -403,18 +376,9 @@
         return self._ready.isSet()
 
     def sync(self):
-        """Handle any pending invalidation messages.
+        # The separate async thread should keep us up to date
+        pass
 
-        This is called by the sync method in ZODB.Connection.
-        """
-        # If there is no connection, return immediately.  Technically,
-        # there are no pending invalidations so they are all handled.
-        # There doesn't seem to be much benefit to raising an exception.
-
-        cn = self._connection
-        if cn is not None:
-            cn.pending()
-
     def doAuth(self, protocol, stub):
         if not (self._username and self._password):
             raise AuthError("empty username or password")
@@ -517,11 +481,17 @@
 
         stub = self.StorageServerStubClass(conn)
         self._oids = []
-        self._info.update(stub.get_info())
         self.verify_cache(stub)
-        if not conn.is_async():
-            log2("Waiting for cache verification to finish")
-            self._wait_sync()
+
+        # It's important to call get_info after calling verify_cache.
+        # If we end up doing a full-verification, we need to wait till
+        # it's done.  By doing a synchonous call, we are guarenteed
+        # that the verification will be done because operations are
+        # handled in order.        
+        self._info.update(stub.get_info())
+
+        assert conn.is_async()
+
         self._handle_extensions()
 
     def _handle_extensions(self):

Modified: ZODB/branches/jim-async-client/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/jim-async-client/src/ZEO/ServerStub.py	2006-07-11 19:51:48 UTC (rev 69097)
+++ ZODB/branches/jim-async-client/src/ZEO/ServerStub.py	2006-07-12 10:17:34 UTC (rev 69098)
@@ -13,6 +13,8 @@
 ##############################################################################
 """RPC stubs for interface exported by StorageServer."""
 
+import time
+
 ##
 # ZEO storage server.
 # <p>
@@ -44,9 +46,11 @@
         zrpc.connection.Connection class.
         """
         self.rpc = rpc
+        
         # Wait until we know what version the other side is using.
         while rpc.peer_protocol_version is None:
-            rpc.pending()
+            time.sleep(0.1)
+
         if rpc.peer_protocol_version == 'Z200':
             self.lastTransaction = lambda: None
             self.getInvalidations = lambda tid: None

Modified: ZODB/branches/jim-async-client/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/branches/jim-async-client/src/ZEO/tests/ConnectionTests.py	2006-07-11 19:51:48 UTC (rev 69097)
+++ ZODB/branches/jim-async-client/src/ZEO/tests/ConnectionTests.py	2006-07-12 10:17:34 UTC (rev 69098)
@@ -55,6 +55,12 @@
 
     StorageServerStubClass = TestServerStub
 
+    connection_count_for_tests = 0
+
+    def notifyConnected(self, conn):
+        ClientStorage.notifyConnected(self, conn)
+        self.connection_count_for_tests += 1
+
     def verify_cache(self, stub):
         self.end_verify = threading.Event()
         self.verify_result = ClientStorage.verify_cache(self, stub)
@@ -959,40 +965,39 @@
         storage.close()
 
     def checkTimeoutAfterVote(self):
-        raises = self.assertRaises
-        unless = self.failUnless
         self._storage = storage = self.openClientStorage()
         # Assert that the zeo cache is empty
-        unless(not list(storage._cache.contents()))
+        self.assert_(not list(storage._cache.contents()))
         # Create the object
         oid = storage.new_oid()
         obj = MinPO(7)
         # Now do a store, sleeping before the finish so as to cause a timeout
         t = Transaction()
+        old_connection_count = storage.connection_count_for_tests
         storage.tpc_begin(t)
         revid1 = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
         storage.tpc_vote(t)
         # Now sleep long enough for the storage to time out
         time.sleep(3)
-        storage.sync()
-        unless(not storage.is_connected())
+        self.assert_(
+            (not storage.is_connected())
+            or
+            (storage.connection_count_for_tests > old_connection_count)
+            )
         storage._wait()
-        unless(storage.is_connected())
+        self.assert_(storage.is_connected())
         # We expect finish to fail
-        raises(ClientDisconnected, storage.tpc_finish, t)
+        self.assertRaises(ClientDisconnected, storage.tpc_finish, t)
         # The cache should still be empty
-        unless(not list(storage._cache.contents()))
+        self.assert_(not list(storage._cache.contents()))
         # Load should fail since the object should not be in either the cache
         # or the server.
-        raises(KeyError, storage.load, oid, '')
+        self.assertRaises(KeyError, storage.load, oid, '')
 
     def checkTimeoutProvokingConflicts(self):
-        eq = self.assertEqual
-        raises = self.assertRaises
-        require = self.assert_
         self._storage = storage = self.openClientStorage()
         # Assert that the zeo cache is empty.
-        require(not list(storage._cache.contents()))
+        self.assert_(not list(storage._cache.contents()))
         # Create the object
         oid = storage.new_oid()
         obj = MinPO(7)
@@ -1007,6 +1012,7 @@
         # Now do a store, sleeping before the finish so as to cause a timeout.
         obj.value = 8
         t = Transaction()
+        old_connection_count = storage.connection_count_for_tests
         storage.tpc_begin(t)
         revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
         revid2b = storage.tpc_vote(t)
@@ -1020,17 +1026,21 @@
         # of 3).
         deadline = time.time() + 60 # wait up to a minute
         while time.time() < deadline:
-            if storage.is_connected():
+            if (storage.is_connected() and
+                (storage.connection_count_for_tests == old_connection_count)
+                ):
                 time.sleep(self.timeout / 1.8)
-                storage.sync()
             else:
                 break
-        storage.sync()
-        require(not storage.is_connected())
+        self.assert_(
+            (not storage.is_connected())
+            or
+            (storage.connection_count_for_tests > old_connection_count)
+            )
         storage._wait()
-        require(storage.is_connected())
+        self.assert_(storage.is_connected())
         # We expect finish to fail.
-        raises(ClientDisconnected, storage.tpc_finish, t)
+        self.assertRaises(ClientDisconnected, storage.tpc_finish, t)
         # Now we think we've committed the second transaction, but we really
         # haven't.  A third one should produce a POSKeyError on the server,
         # which manifests as a ConflictError on the client.
@@ -1038,7 +1048,7 @@
         t = Transaction()
         storage.tpc_begin(t)
         storage.store(oid, revid2, zodb_pickle(obj), '', t)
-        raises(ConflictError, storage.tpc_vote, t)
+        self.assertRaises(ConflictError, storage.tpc_vote, t)
         # Even aborting won't help.
         storage.tpc_abort(t)
         storage.tpc_finish(t)
@@ -1048,7 +1058,7 @@
         storage.tpc_begin(t)
         storage.store(oid, revid2, zodb_pickle(obj), '', t)
         # Even aborting won't help.
-        raises(ConflictError, storage.tpc_vote, t)
+        self.assertRaises(ConflictError, storage.tpc_vote, t)
         # Abort this one and try a transaction that should succeed.
         storage.tpc_abort(t)
         storage.tpc_finish(t)
@@ -1062,8 +1072,8 @@
         storage.tpc_finish(t)
         # Now load the object and verify that it has a value of 11.
         data, revid = storage.load(oid, '')
-        eq(zodb_unpickle(data), MinPO(11))
-        eq(revid, revid2)
+        self.assertEqual(zodb_unpickle(data), MinPO(11))
+        self.assertEqual(revid, revid2)
 
 class MSTThread(threading.Thread):
 

Modified: ZODB/branches/jim-async-client/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-async-client/src/ZEO/tests/testZEO.py	2006-07-11 19:51:48 UTC (rev 69097)
+++ ZODB/branches/jim-async-client/src/ZEO/tests/testZEO.py	2006-07-12 10:17:34 UTC (rev 69098)
@@ -19,6 +19,7 @@
 import socket
 import asyncore
 import tempfile
+import time
 import unittest
 import logging
 
@@ -70,14 +71,19 @@
             self.assertEqual(zodb_unpickle(data), MinPO('first'))
             self.assertEqual(serial, revid1)
             revid2 = self._dostore(oid, data=MinPO('second'), revid=revid1)
-            for n in range(3):
-                # Let the server and client talk for a moment.
-                # Is there a better way to do this?
-                asyncore.poll(0.1)
-            data, serial = storage2.load(oid, '')
-            self.assertEqual(zodb_unpickle(data), MinPO('second'),
-                             'Invalidation message was not sent!')
-            self.assertEqual(serial, revid2)
+
+            # Now, storage 2 should eventually get the new data. It
+            # will take some time, although hopefully not much.
+            # We'll poll till we get it and whine if we time out:
+            for n in range(30):
+                time.sleep(.1)
+                data, serial = storage2.load(oid, '')
+                if (serial == revid2 and
+                    zodb_unpickle(data) == MinPO('second')
+                    ):
+                    break
+            else:
+                raise AssertionError('Invalidation message was not sent!')
         finally:
             storage2.close()
 

Modified: ZODB/branches/jim-async-client/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/branches/jim-async-client/src/ZEO/zrpc/client.py	2006-07-11 19:51:48 UTC (rev 69097)
+++ ZODB/branches/jim-async-client/src/ZEO/zrpc/client.py	2006-07-12 10:17:34 UTC (rev 69098)
@@ -11,6 +11,7 @@
 # FOR A PARTICULAR PURPOSE
 #
 ##############################################################################
+import asyncore
 import errno
 import select
 import socket
@@ -20,13 +21,11 @@
 import types
 import logging
 
-import ThreadedAsync
-
 from ZODB.POSException import ReadOnlyError
 from ZODB.loglevels import BLATHER
 
 from ZEO.zrpc.log import log
-from ZEO.zrpc.trigger import trigger
+import ZEO.zrpc.trigger
 from ZEO.zrpc.connection import ManagedClientConnection
 
 class ConnectionManager(object):
@@ -43,9 +42,6 @@
         # If thread is not None, then there is a helper thread
         # attempting to connect.
         self.thread = None # Protected by self.cond
-        self.trigger = None
-        self.thr_async = 0
-        ThreadedAsync.register_loop_callback(self.set_async)
 
     def __repr__(self):
         return "<%s for %s>" % (self.__class__.__name__, self.addrlist)
@@ -85,7 +81,6 @@
     def close(self):
         """Prevent ConnectionManager from opening new connections"""
         self.closed = 1
-        ThreadedAsync.remove_loop_callback(self.set_async)
         self.cond.acquire()
         try:
             t = self.thread
@@ -103,30 +98,7 @@
         if conn is not None:
             # This will call close_conn() below which clears self.connection
             conn.close()
-        if self.trigger is not None:
-            self.trigger.close()
-            self.trigger = None
-        ThreadedAsync.remove_loop_callback(self.set_async)
 
-    def set_async(self, map):
-        # This is the callback registered with ThreadedAsync.  The
-        # callback might be called multiple times, so it shouldn't
-        # create a trigger every time and should never do anything
-        # after it's closed.
-
-        # It may be that the only case where it is called multiple
-        # times is in the test suite, where ThreadedAsync's loop can
-        # be started in a child process after a fork.  Regardless,
-        # it's good to be defensive.
-
-        # We need each connection started with async==0 to have a
-        # callback.
-        log("CM.set_async(%s)" % repr(map), level=logging.DEBUG)
-        if not self.closed and self.trigger is None:
-            log("CM.set_async(): first call")
-            self.trigger = trigger()
-            self.thr_async = 1 # needs to be set on the Connection
-
     def attempt_connect(self):
         """Attempt a connection to the server without blocking too long.
 

Modified: ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py	2006-07-11 19:51:48 UTC (rev 69097)
+++ ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py	2006-07-12 10:17:34 UTC (rev 69098)
@@ -30,6 +30,27 @@
 REPLY = ".reply" # message name used for replies
 ASYNC = 1
 
+##############################################################################
+# Dedicated Client select loop:
+client_map = {}
+client_trigger = trigger(client_map)
+
+def client_loop():
+    map = client_map
+    poll_fun = asyncore.poll
+    logger = logging.getLogger('ZEO.zrpc.client_loop')
+    while map:
+        try:
+            poll_fun(30.0, map)
+        except:
+            logger.exception('poll failure')
+
+client_thread = threading.Thread(target=client_loop)
+client_thread.setDaemon(True)
+client_thread.start()
+#
+##############################################################################
+
 class Delay:
     """Used to delay response to client for synchronous calls.
 
@@ -235,7 +256,7 @@
     # Client constructor passes 'C' for tag, server constructor 'S'.  This
     # is used in log messages, and to determine whether we can speak with
     # our peer.
-    def __init__(self, sock, addr, obj, tag):
+    def __init__(self, sock, addr, obj, tag, map=None):
         self.obj = None
         self.marshal = Marshaller()
         self.closed = False
@@ -315,8 +336,10 @@
         # isn't necessary before Python 2.4, but doesn't hurt then (it just
         # gives us an unused attribute in 2.3); updating the global socket
         # map is necessary regardless of Python version.
-        self._map = asyncore.socket_map
-        asyncore.socket_map.update(ourmap)
+        if map is None:
+            map = asyncore.socket_map
+        self._map = map
+        map.update(ourmap)
 
     def __repr__(self):
         return "<%s %s>" % (self.__class__.__name__, self.addr)
@@ -663,7 +686,7 @@
         else:
             asyncore.poll(0.0, self._singleton)
 
-    def pending(self, timeout=0):
+    def _pending(self, timeout=0):
         """Invoke mainloop until any pending messages are handled."""
         if __debug__:
             self.log("pending(), async=%d" % self.is_async(), level=TRACE)
@@ -758,8 +781,10 @@
         self.queue_output = True
         self.queued_messages = []
 
-        self.__super_init(sock, addr, obj, tag='C')
-        self.check_mgr_async()
+        self.__super_init(sock, addr, obj, tag='C', map=client_map)
+        self.thr_async = True
+        self.trigger = client_trigger
+        client_trigger.pull_trigger()
 
     # Our message_ouput() queues messages until recv_handshake() gets the
     # protocol handshake from the server.
@@ -806,9 +831,10 @@
     # Defer the ThreadedAsync work to the manager.
 
     def close_trigger(self):
-        # the manager should actually close the trigger
-        # TODO: what is that comment trying to say?  What 'manager'?
-        del self.trigger
+        # We are using a shared trigger for all client connections.
+        # We never want to close it.
+        #del self.trigger
+        pass
 
     def set_async(self, map):
         pass
@@ -817,20 +843,8 @@
         # Don't do the register_loop_callback that the superclass does
         pass
 
-    def check_mgr_async(self):
-        if not self.thr_async and self.mgr.thr_async:
-            assert self.mgr.trigger is not None, \
-                   "manager (%s) has no trigger" % self.mgr
-            self.thr_async = True
-            self.trigger = self.mgr.trigger
-            return 1
-        return 0
-
     def is_async(self):
-        # TODO: could the check_mgr_async() be avoided on each test?
-        if self.thr_async:
-            return 1
-        return self.check_mgr_async()
+        return True
 
     def close(self):
         self.mgr.close_conn(self)

Modified: ZODB/branches/jim-async-client/src/ZEO/zrpc/trigger.py
===================================================================
--- ZODB/branches/jim-async-client/src/ZEO/zrpc/trigger.py	2006-07-11 19:51:48 UTC (rev 69097)
+++ ZODB/branches/jim-async-client/src/ZEO/zrpc/trigger.py	2006-07-12 10:17:34 UTC (rev 69098)
@@ -135,10 +135,10 @@
     class trigger(_triggerbase, asyncore.file_dispatcher):
         kind = "pipe"
 
-        def __init__(self):
+        def __init__(self, map=None):
             _triggerbase.__init__(self)
             r, self.trigger = self._fds = os.pipe()
-            asyncore.file_dispatcher.__init__(self, r)
+            asyncore.file_dispatcher.__init__(self, r, map)
 
         def _close(self):
             for fd in self._fds:
@@ -155,7 +155,7 @@
     class trigger(_triggerbase, asyncore.dispatcher):
         kind = "loopback"
 
-        def __init__(self):
+        def __init__(self, map=None):
             _triggerbase.__init__(self)
 
             # Get a pair of connected sockets.  The trigger is the 'w'
@@ -208,7 +208,7 @@
             r, addr = a.accept()  # r becomes asyncore's (self.)socket
             a.close()
             self.trigger = w
-            asyncore.dispatcher.__init__(self, r)
+            asyncore.dispatcher.__init__(self, r, map)
 
         def _close(self):
             # self.socket is r, and self.trigger is w, from __init__



More information about the Zodb-checkins mailing list