[Checkins] SVN: zc.zkzeo/trunk/src/zc/zkzeo/ Refactored zookeeper connection logic and added connection edge case

Jim Fulton jim at zope.com
Fri Dec 9 22:45:06 UTC 2011


Log message for revision 123653:
  Refactored zookeeper connection logic and added connection edge case
  tests.
  
  Also simplified main test a little.
  

Changed:
  U   zc.zkzeo/trunk/src/zc/zkzeo/README.txt
  U   zc.zkzeo/trunk/src/zc/zkzeo/_client.py
  U   zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py
  U   zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml
  U   zc.zkzeo/trunk/src/zc/zkzeo/tests.py
  A   zc.zkzeo/trunk/src/zc/zkzeo/wait-for-zookeeper.test

-=-
Modified: zc.zkzeo/trunk/src/zc/zkzeo/README.txt
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/README.txt	2011-12-09 22:36:09 UTC (rev 123652)
+++ zc.zkzeo/trunk/src/zc/zkzeo/README.txt	2011-12-09 22:45:05 UTC (rev 123653)
@@ -54,7 +54,7 @@
 
     >>> import zc.zkzeo.runzeo, zc.zk
     >>> stop = zc.zkzeo.runzeo.test(
-    ...     server_conf, zookeeper='zookeeper.example.com:2181')
+    ...     server_conf)
     >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
     >>> print zk.export_tree('/databases/demo', ephemeral=True),
     /demo
@@ -78,7 +78,7 @@
     >>> import zc.zkzeo
     >>> client = zc.zkzeo.client(
     ...     'zookeeper.example.com:2181', '/databases/demo',
-    ...     read_only=True)
+    ...     max_disconnect_poll=1)
 
 You pass a ZooKeeper connection string and a path.  The ``Client``
 constructor will create a client storage with addresses found as
@@ -101,7 +101,7 @@
        <zkzeoclient>
           zookeeper zookeeper.example.com:2181
           server /databases/demo
-          read-only true
+          max-disconnect-poll 1
        </zkzeoclient>
     </zodb>
 
@@ -118,19 +118,13 @@
 
 .. test
 
-  Double check the clients are working by opening a writable
-  connection and maing sure we see changes:
+  Double check the clients are working by opening a
+  connection and making sure we see changes:
 
-    >>> writable_db = zc.zkzeo.DB('zookeeper.example.com:2181',
-    ...                           '/databases/demo')
-    >>> with writable_db.transaction() as conn:
-    ...     conn.root.x = 1
-
     >>> import ZODB.config
     >>> db_from_config = ZODB.config.databaseFromString(conf)
     >>> with db_from_config.transaction() as conn:
-    ...     print conn.root()
-    {'x': 1}
+    ...     conn.root.x = 1
 
     >>> import ZODB
     >>> db_from_py = ZODB.DB(client)
@@ -138,18 +132,42 @@
     ...     print conn.root()
     {'x': 1}
 
-  Restart the storage server and make sure clients reconnect:
+  When we stop the storage server, we'll get warnings from zc.zkzeo, the
+  clients will disconnect and will have no addresses:
 
+    >>> import zope.testing.loggingsupport
+    >>> handler = zope.testing.loggingsupport.Handler('zc.zkzeo')
+    >>> handler.install()
+
     >>> [old_addr] = zk.get_children('/databases/demo')
     >>> stop().exception
 
     >>> wait_until(lambda : not client.is_connected())
-    >>> wait_until(lambda : not writable_db.storage.is_connected())
     >>> wait_until(lambda : not db_from_config.storage.is_connected())
 
-    >>> stop = zc.zkzeo.runzeo.test(
-    ...     server_conf, zookeeper='zookeeper.example.com:2181')
+    >>> print handler
+    zc.zkzeo WARNING
+      No addresses from <zookeeper.example.com:2181/databases/demo>
+    zc.zkzeo WARNING
+      No addresses from <zookeeper.example.com:2181/databases/demo>
 
+    >>> handler.clear()
+
+  Looking at the client manager, we see that the address list is now empty:
+
+    >>> client._rpc_mgr
+    <ConnectionManager for []>
+
+  Let's sleep for a while to make sure we can wake up.  Of course, we
+  won't sleep *that* long, it's a test.
+
+    >>> import time
+    >>> time.sleep(9)
+
+  Now, we'll restart the server and clients will reconnect
+
+    >>> stop = zc.zkzeo.runzeo.test(server_conf)
+
     >>> [addr] = zk.get_children('/databases/demo')
     >>> addr != old_addr
     True
@@ -158,22 +176,23 @@
       /127.0.0.1:56837
         pid = 88841
 
-
-    >>> wait_until(writable_db.storage.is_connected)
-    >>> with writable_db.transaction() as conn:
-    ...     conn.root.x = 2
-
     >>> wait_until(db_from_config.storage.is_connected)
     >>> with db_from_config.transaction() as conn:
-    ...     print conn.root()
-    {'x': 2}
+    ...     conn.root.x = 2
     >>> wait_until(client.is_connected)
     >>> with db_from_py.transaction() as conn:
     ...     print conn.root()
     {'x': 2}
 
+    >>> print handler
+    zc.zkzeo WARNING
+      New address from <zookeeper.example.com:2181/databases/demo> (CLEAR)
+    zc.zkzeo WARNING
+      New address from <zookeeper.example.com:2181/databases/demo> (CLEAR)
+
+    >>> zk.close()
+    >>> handler.uninstall()
     >>> db_from_py.close()
     >>> db_from_config.close()
-    >>> writable_db.close()
     >>> stop().exception
 

Modified: zc.zkzeo/trunk/src/zc/zkzeo/_client.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/_client.py	2011-12-09 22:36:09 UTC (rev 123652)
+++ zc.zkzeo/trunk/src/zc/zkzeo/_client.py	2011-12-09 22:45:05 UTC (rev 123653)
@@ -11,31 +11,28 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
+import logging
 import time
 import zc.zk
 import ZEO.ClientStorage
 import threading
 
-def client(zk, path, *args, **kw):
-    zk = zc.zk.ZooKeeper(zk)
+logger = logging.getLogger('zc.zkzeo')
+
+def client(zkaddr, path, *args, **kw):
+    zk = zc.zk.ZooKeeper(zkaddr)
     addresses = zk.children(path)
+    wait = kw.get('wait', kw.get('wait_for_server_on_startup', True))
     client = ZEO.ClientStorage.ClientStorage(
-        _wait_addresses(addresses, parse_addr),
+        _wait_addresses(addresses, parse_addr, zkaddr, path, wait),
         *args, **kw)
-    return _client(addresses, client)
+    return _client(addresses, client, zkaddr, path)
 
-def DB(*args, **kw):
-    import ZODB
-    return ZODB.DB(client(*args, **kw))
-
-def connection(*args, **kw):
-    return DB(*args, **kw).open_once()
-
 def parse_addr(addr):
     host, port = addr.split(':')
     return host, int(port)
 
-def _client(addresses, client):
+def _client(addresses, client, zkaddr, path):
 
     new_addr = getattr(client, 'new_addr', None)
     if new_addr is None:
@@ -48,22 +45,41 @@
                 if manager.thread is not None:
                     manager.thread.addrlist = manager.addrlist
 
+    warned = set()
+
     @addresses
     def changed(addresses):
         addrs = map(parse_addr, addresses)
         if addrs:
-            new_addr(addrs)
+            if warned:
+                logger.warning('New address from <%s%s> (CLEAR)', zkaddr, path)
+                warned.clear()
+            else:
+                logger.info('New address from <%s%s>', zkaddr, path)
+        else:
+            logger.warning('No addresses from <%s%s>', zkaddr, path)
+            warned.add(1)
+        new_addr(addrs)
 
     client.zookeeper_addresses = addresses
 
     return client
 
-def _wait_addresses(addresses, transform):
+def _wait_addresses(addresses, transform, zkaddr, path, wait):
+    n = 0
     while 1:
         result = [transform(addr) for addr in addresses]
         if result:
+            if n:
+                logger.warning("Got addresses at <%s%s> (CLEAR)",
+                               zkaddr, path)
             return result
-        time.sleep(1)
+        if (n%30000) == 0: # warn every few minutes
+            logger.warning("No addresses at <%s%s>", zkaddr, path)
+        if not wait:
+            return result
+        time.sleep(.01)
+        n += 1
 
 class ZConfig:
 
@@ -75,7 +91,8 @@
         import ZConfig.datatypes
         import ZODB.config
 
-        zk = zc.zk.ZooKeeper(self.config.zookeeper)
+        zkaddr = self.config.zookeeper
+        zk = zc.zk.ZooKeeper(zkaddr)
         paths = [server.address for server in self.config.server]
         if len(paths) > 1:
             raise TypeError("Only one server option is allowed")
@@ -84,7 +101,8 @@
             raise TypeError("server must be a ZooKeeper path, %r" % path)
         addresses = zk.children(path)
         self.config.server = _wait_addresses(
-            addresses, ZConfig.datatypes.SocketAddress)
+            addresses, ZConfig.datatypes.SocketAddress,
+            zkaddr, path, self.config.wait)
 
         client = ZODB.config.ZEOClient(self.config).open()
-        return _client(addresses, client)
+        return _client(addresses, client, zkaddr, path)

Modified: zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py	2011-12-09 22:36:09 UTC (rev 123652)
+++ zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py	2011-12-09 22:45:05 UTC (rev 123653)
@@ -1,4 +1,5 @@
 import os
+import select
 import sys
 import threading
 import time
@@ -19,6 +20,7 @@
 
         self.add('zkconnection', 'zookeeper.connection')
         self.add('zkpath', 'zookeeper.path')
+        self.add('zookeeper_session_timeout', 'zookeeper.session_timeout')
 
 class ZKServer(ZEO.runzeo.ZEOServer):
 
@@ -27,23 +29,30 @@
         ZEO.runzeo.ZEOServer.create_server(self)
         if not self.options.zkpath:
             return
+
         addr = self.server.dispatcher.socket.getsockname()
-        if self.__using_dynamic_port:
-            self.__zk = zc.zk.ZooKeeper(self.options.zkconnection, timeout=9)
-            if self.__zk.handle is None:
-                raise SystemError("Couldn;'t connect to ZooKeeper at %r"
-                                  % self.options.zkconnection)
-
-        @zc.thread.Thread
-        def register_w_zk():
-            if self.__zk is None:
-                self.__zk = zc.zk.ZooKeeper(self.options.zkconnection)
-            while self.__zk.handle is None:
-                time.sleep(.1)
+        def register():
             self.__zk.register_server(self.options.zkpath, addr)
             if self.__testing is not None:
                 self.__testing()
 
+        if self.__using_dynamic_port:
+            self.__zk = zc.zk.ZooKeeper(
+                self.options.zkconnection,
+                self.options.zookeeper_session_timeout,
+                )
+            register()
+            return
+
+        @zc.thread.Thread
+        def zookeeper_registration_thread():
+            self.__zk = zc.zk.ZooKeeper(
+                self.options.zkconnection,
+                self.options.zookeeper_session_timeout,
+                wait = True,
+                )
+            register()
+
     def clear_socket(self):
         if self.__zk is not None:
             self.__zk.close()
@@ -96,7 +105,7 @@
     for name, storage in self.storages.iteritems():
         storage.close()
 
-def test(config, storage=None, zookeeper='127.0.0.1:2181'):
+def test(config, storage=None, zookeeper=None, threaded=True):
     """Run a server in a thread, mainly for testing.
     """
     import tempfile
@@ -125,13 +134,19 @@
     server = main(['-C', confpath], event.set)
     os.remove(confpath)
 
+    if not threaded:
+        return server.main()
+
     @zc.thread.Thread
     def run_zeo_server_for_testing():
         try:
             server.main()
+        except select.error:
+            pass
         except:
             import logging
-            logging.getLogger(__name__).exception('wtf')
+            logging.getLogger(__name__+'.test').exception(
+                'wtf %r', sys.exc_info()[1])
 
     def stop():
         close = getattr(server.server, 'close', None)

Modified: zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml	2011-12-09 22:36:09 UTC (rev 123652)
+++ zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml	2011-12-09 22:45:05 UTC (rev 123653)
@@ -20,6 +20,12 @@
       </description>
     </key>
 
+    <key name="session-timeout" datatype="integer" required="no">
+      <description>
+        ZooKeeper session timeout in milliseconds.
+      </description>
+    </key>
+
   </sectiontype>
 
 </component>

Modified: zc.zkzeo/trunk/src/zc/zkzeo/tests.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/tests.py	2011-12-09 22:36:09 UTC (rev 123652)
+++ zc.zkzeo/trunk/src/zc/zkzeo/tests.py	2011-12-09 22:45:05 UTC (rev 123653)
@@ -20,9 +20,69 @@
 import re
 import ZEO.zrpc.connection
 import zc.zk.testing
+import zc.zkzeo
 import zope.testing.setupstack
 import zope.testing.renormalizing
 
+
+def client_exception_when_no_zookeeper_running():
+    """If ZooKeeper isn't running, we get an immediate error.
+
+    >>> zc.zkzeo.client('192.0.2.42:2181', '/mydb')
+    Traceback (most recent call last):
+    ...
+    FailedConnect: 192.0.2.42:2181
+
+    >>> import ZODB.config
+    >>> ZODB.config.storageFromString('''
+    ... %import zc.zkzeo
+    ...
+    ... <zkzeoclient>
+    ...   zookeeper 192.0.2.42:2181
+    ...   server /databases/demo
+    ...   max-disconnect-poll 1
+    ... </zkzeoclient>
+    ... ''')
+    Traceback (most recent call last):
+    ...
+    FailedConnect: 192.0.2.42:2181
+    """
+
+def server_exception_when_no_zookeeper_running_and_dynamic_port():
+    """If ZooKeeper isn't running, we get an immediate error.
+
+    >>> import zc.zkzeo.runzeo
+    >>> zc.zkzeo.runzeo.test('''
+    ...   <zeo>
+    ...      address 127.0.0.1
+    ...   </zeo>
+    ...
+    ...   <zookeeper>
+    ...      connection 192.0.2.42:2181
+    ...      path /databases/demo
+    ...   </zookeeper>
+    ...
+    ...   <filestorage>
+    ...      path demo.fs
+    ...   </filestorage>
+    ... ''', threaded=False)
+    Traceback (most recent call last):
+    ...
+    FailedConnect: 192.0.2.42:2181
+    """
+
+# def server_keeps_trying_when_no_zookeeper_running_and_fixed_port():
+#     """Don't keep a server from running if it has a fixed addr.
+#     """
+
+# # General principle is to keep a server running if clients can find it.
+
+# def behavior_of_wait_object_when_there_are_no_addresses():
+#     """
+#     """
+
+
+
 def setUp(test):
     zc.zk.testing.setUp(test, tree='/databases\n  /demo\n')
     test.globs['_server_loop'] = ZEO.zrpc.connection.server_loop
@@ -43,18 +103,23 @@
     ZEO.zrpc.connection.server_loop = test.globs['_server_loop']
 
 def test_suite():
-    return unittest.TestSuite((
-        # doctest.DocFileSuite('README.test'),
-        # doctest.DocTestSuite(),
+    checker = zope.testing.renormalizing.RENormalizing([
+        (re.compile(r'pid = \d+'), 'pid = PID'),
+        (re.compile(r'/127.0.0.1:\d+'), '/127.0.0.1:PORT'),
+        ])
+    suite = unittest.TestSuite((
+        doctest.DocTestSuite(),
         manuel.testing.TestSuite(
-            manuel.doctest.Manuel(
-                checker = zope.testing.renormalizing.RENormalizing([
-                    (re.compile(r'pid = \d+'), 'pid = PID'),
-                    (re.compile(r'/127.0.0.1:\d+'), '/127.0.0.1:PORT'),
-                    ])
-                ) + manuel.capture.Manuel(),
+            manuel.doctest.Manuel(checker=checker) + manuel.capture.Manuel(),
             'README.txt',
             setUp=setUp, tearDown=zc.zk.testing.tearDown,
             ),
         ))
+    if not zc.zk.testing.testing_with_real_zookeeper():
+        suite.addTest(doctest.DocFileSuite(
+            'wait-for-zookeeper.test',
+            setUp=setUp, tearDown=zc.zk.testing.tearDown,
+            checker=checker))
 
+    return suite
+

Added: zc.zkzeo/trunk/src/zc/zkzeo/wait-for-zookeeper.test
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/wait-for-zookeeper.test	                        (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/wait-for-zookeeper.test	2011-12-09 22:45:05 UTC (rev 123653)
@@ -0,0 +1,71 @@
+
+If a storage server is configured with a static port and it can't talk
+to ZooKeeper, it won't error, but will keep trying to talk to
+zookeeper in a thread.
+
+    >>> import ZEO.tests.forker
+    >>> port = ZEO.tests.forker.get_port()
+
+    >>> import zc.zkzeo.runzeo, zope.testing.loggingsupport
+    >>> handle = zope.testing.loggingsupport.InstalledHandler('zc.zk')
+
+    >>> stop = zc.zkzeo.runzeo.test('''
+    ...   <zeo>
+    ...      address 127.0.0.1:%s
+    ...   </zeo>
+    ...
+    ...   <zookeeper>
+    ...      connection 192.0.2.42:2181
+    ...      path /databases/demo
+    ...   </zookeeper>
+    ...
+    ...   <mappingstorage>
+    ...   </mappingstorage>
+    ... ''' % port)
+
+We can connect to the server just fine:
+
+    >>> client = ZEO.client(port)
+    >>> client.is_connected()
+    True
+
+Let's wait a while:
+
+    >>> import time
+    >>> time.sleep(4)
+
+But data isn't in zookeeper yet:
+
+    >>> import zc.zk
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> zk.print_tree('/databases/demo')
+    /demo
+
+And the registration thread is still going:
+
+    >>> import threading
+    >>> [registration_thread] = [
+    ...     t for t in threading.enumerate()
+    ...     if t.name == 'zookeeper_registration_thread'
+    ...     ]
+
+Now, we'll "start" ZooKeeper:
+
+    >>> ZooKeeper._allow_connection('192.0.2.42:2181')
+
+    >>> registration_thread.join(1)
+
+And we now have the entry in the tree:
+
+    >>> zk.print_tree('/databases/demo')
+    /demo
+      /127.0.0.1:24491
+        pid = 1013
+
+And our client is still connected (of course):
+
+    >>> client.is_connected()
+    True
+
+    >>> client.close()
+    >>> _ = stop()


Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/wait-for-zookeeper.test
___________________________________________________________________
Added: svn:eol-style
   + native



More information about the checkins mailing list