[Checkins] SVN: zc.zkzeo/trunk/src/zc/zkzeo/ Refactored zookeeper connection logic and added connection edge case
Jim Fulton
jim at zope.com
Fri Dec 9 22:45:06 UTC 2011
Log message for revision 123653:
Refactored zookeeper connection logic and added connection edge case
tests.
Also simplified main test a little.
Changed:
U zc.zkzeo/trunk/src/zc/zkzeo/README.txt
U zc.zkzeo/trunk/src/zc/zkzeo/_client.py
U zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py
U zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml
U zc.zkzeo/trunk/src/zc/zkzeo/tests.py
A zc.zkzeo/trunk/src/zc/zkzeo/wait-for-zookeeper.test
-=-
Modified: zc.zkzeo/trunk/src/zc/zkzeo/README.txt
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/README.txt 2011-12-09 22:36:09 UTC (rev 123652)
+++ zc.zkzeo/trunk/src/zc/zkzeo/README.txt 2011-12-09 22:45:05 UTC (rev 123653)
@@ -54,7 +54,7 @@
>>> import zc.zkzeo.runzeo, zc.zk
>>> stop = zc.zkzeo.runzeo.test(
- ... server_conf, zookeeper='zookeeper.example.com:2181')
+ ... server_conf)
>>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
>>> print zk.export_tree('/databases/demo', ephemeral=True),
/demo
@@ -78,7 +78,7 @@
>>> import zc.zkzeo
>>> client = zc.zkzeo.client(
... 'zookeeper.example.com:2181', '/databases/demo',
- ... read_only=True)
+ ... max_disconnect_poll=1)
You pass a ZooKeeper connection string and a path. The ``Client``
constructor will create a client storage with addresses found as
@@ -101,7 +101,7 @@
<zkzeoclient>
zookeeper zookeeper.example.com:2181
server /databases/demo
- read-only true
+ max-disconnect-poll 1
</zkzeoclient>
</zodb>
@@ -118,19 +118,13 @@
.. test
- Double check the clients are working by opening a writable
- connection and maing sure we see changes:
+ Double check the clients are working by opening a
+ connection and making sure we see changes:
- >>> writable_db = zc.zkzeo.DB('zookeeper.example.com:2181',
- ... '/databases/demo')
- >>> with writable_db.transaction() as conn:
- ... conn.root.x = 1
-
>>> import ZODB.config
>>> db_from_config = ZODB.config.databaseFromString(conf)
>>> with db_from_config.transaction() as conn:
- ... print conn.root()
- {'x': 1}
+ ... conn.root.x = 1
>>> import ZODB
>>> db_from_py = ZODB.DB(client)
@@ -138,18 +132,42 @@
... print conn.root()
{'x': 1}
- Restart the storage server and make sure clients reconnect:
+ When we stop the storage server, we'll get warnings from zc.zkzeo, the
+ clients will disconnect and will have no addresses:
+ >>> import zope.testing.loggingsupport
+ >>> handler = zope.testing.loggingsupport.Handler('zc.zkzeo')
+ >>> handler.install()
+
>>> [old_addr] = zk.get_children('/databases/demo')
>>> stop().exception
>>> wait_until(lambda : not client.is_connected())
- >>> wait_until(lambda : not writable_db.storage.is_connected())
>>> wait_until(lambda : not db_from_config.storage.is_connected())
- >>> stop = zc.zkzeo.runzeo.test(
- ... server_conf, zookeeper='zookeeper.example.com:2181')
+ >>> print handler
+ zc.zkzeo WARNING
+ No addresses from <zookeeper.example.com:2181/databases/demo>
+ zc.zkzeo WARNING
+ No addresses from <zookeeper.example.com:2181/databases/demo>
+ >>> handler.clear()
+
+ Looking at the client manager, we see that the address list is now empty:
+
+ >>> client._rpc_mgr
+ <ConnectionManager for []>
+
+ Let's sleep for a while to make sure we can wake up. Of course, we
+ won't sleep *that* long, it's a test.
+
+ >>> import time
+ >>> time.sleep(9)
+
+ Now, we'll restart the server and clients will reconnect
+
+ >>> stop = zc.zkzeo.runzeo.test(server_conf)
+
>>> [addr] = zk.get_children('/databases/demo')
>>> addr != old_addr
True
@@ -158,22 +176,23 @@
/127.0.0.1:56837
pid = 88841
-
- >>> wait_until(writable_db.storage.is_connected)
- >>> with writable_db.transaction() as conn:
- ... conn.root.x = 2
-
>>> wait_until(db_from_config.storage.is_connected)
>>> with db_from_config.transaction() as conn:
- ... print conn.root()
- {'x': 2}
+ ... conn.root.x = 2
>>> wait_until(client.is_connected)
>>> with db_from_py.transaction() as conn:
... print conn.root()
{'x': 2}
+ >>> print handler
+ zc.zkzeo WARNING
+ New address from <zookeeper.example.com:2181/databases/demo> (CLEAR)
+ zc.zkzeo WARNING
+ New address from <zookeeper.example.com:2181/databases/demo> (CLEAR)
+
+ >>> zk.close()
+ >>> handler.uninstall()
>>> db_from_py.close()
>>> db_from_config.close()
- >>> writable_db.close()
>>> stop().exception
Modified: zc.zkzeo/trunk/src/zc/zkzeo/_client.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/_client.py 2011-12-09 22:36:09 UTC (rev 123652)
+++ zc.zkzeo/trunk/src/zc/zkzeo/_client.py 2011-12-09 22:45:05 UTC (rev 123653)
@@ -11,31 +11,28 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
+import logging
import time
import zc.zk
import ZEO.ClientStorage
import threading
-def client(zk, path, *args, **kw):
- zk = zc.zk.ZooKeeper(zk)
+logger = logging.getLogger('zc.zkzeo')
+
+def client(zkaddr, path, *args, **kw):
+ zk = zc.zk.ZooKeeper(zkaddr)
addresses = zk.children(path)
+ wait = kw.get('wait', kw.get('wait_for_server_on_startup', True))
client = ZEO.ClientStorage.ClientStorage(
- _wait_addresses(addresses, parse_addr),
+ _wait_addresses(addresses, parse_addr, zkaddr, path, wait),
*args, **kw)
- return _client(addresses, client)
+ return _client(addresses, client, zkaddr, path)
-def DB(*args, **kw):
- import ZODB
- return ZODB.DB(client(*args, **kw))
-
-def connection(*args, **kw):
- return DB(*args, **kw).open_once()
-
def parse_addr(addr):
host, port = addr.split(':')
return host, int(port)
-def _client(addresses, client):
+def _client(addresses, client, zkaddr, path):
new_addr = getattr(client, 'new_addr', None)
if new_addr is None:
@@ -48,22 +45,41 @@
if manager.thread is not None:
manager.thread.addrlist = manager.addrlist
+ warned = set()
+
@addresses
def changed(addresses):
addrs = map(parse_addr, addresses)
if addrs:
- new_addr(addrs)
+ if warned:
+ logger.warning('New address from <%s%s> (CLEAR)', zkaddr, path)
+ warned.clear()
+ else:
+ logger.info('New address from <%s%s>', zkaddr, path)
+ else:
+ logger.warning('No addresses from <%s%s>', zkaddr, path)
+ warned.add(1)
+ new_addr(addrs)
client.zookeeper_addresses = addresses
return client
-def _wait_addresses(addresses, transform):
+def _wait_addresses(addresses, transform, zkaddr, path, wait):
+ n = 0
while 1:
result = [transform(addr) for addr in addresses]
if result:
+ if n:
+ logger.warning("Got addresses at <%s%s> (CLEAR)",
+ zkaddr, path)
return result
- time.sleep(1)
+ if (n%30000) == 0: # warn every few minutes
+ logger.warning("No addresses at <%s%s>", zkaddr, path)
+ if not wait:
+ return result
+ time.sleep(.01)
+ n += 1
class ZConfig:
@@ -75,7 +91,8 @@
import ZConfig.datatypes
import ZODB.config
- zk = zc.zk.ZooKeeper(self.config.zookeeper)
+ zkaddr = self.config.zookeeper
+ zk = zc.zk.ZooKeeper(zkaddr)
paths = [server.address for server in self.config.server]
if len(paths) > 1:
raise TypeError("Only one server option is allowed")
@@ -84,7 +101,8 @@
raise TypeError("server must be a ZooKeeper path, %r" % path)
addresses = zk.children(path)
self.config.server = _wait_addresses(
- addresses, ZConfig.datatypes.SocketAddress)
+ addresses, ZConfig.datatypes.SocketAddress,
+ zkaddr, path, self.config.wait)
client = ZODB.config.ZEOClient(self.config).open()
- return _client(addresses, client)
+ return _client(addresses, client, zkaddr, path)
Modified: zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py 2011-12-09 22:36:09 UTC (rev 123652)
+++ zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py 2011-12-09 22:45:05 UTC (rev 123653)
@@ -1,4 +1,5 @@
import os
+import select
import sys
import threading
import time
@@ -19,6 +20,7 @@
self.add('zkconnection', 'zookeeper.connection')
self.add('zkpath', 'zookeeper.path')
+ self.add('zookeeper_session_timeout', 'zookeeper.session_timeout')
class ZKServer(ZEO.runzeo.ZEOServer):
@@ -27,23 +29,30 @@
ZEO.runzeo.ZEOServer.create_server(self)
if not self.options.zkpath:
return
+
addr = self.server.dispatcher.socket.getsockname()
- if self.__using_dynamic_port:
- self.__zk = zc.zk.ZooKeeper(self.options.zkconnection, timeout=9)
- if self.__zk.handle is None:
- raise SystemError("Couldn;'t connect to ZooKeeper at %r"
- % self.options.zkconnection)
-
- @zc.thread.Thread
- def register_w_zk():
- if self.__zk is None:
- self.__zk = zc.zk.ZooKeeper(self.options.zkconnection)
- while self.__zk.handle is None:
- time.sleep(.1)
+ def register():
self.__zk.register_server(self.options.zkpath, addr)
if self.__testing is not None:
self.__testing()
+ if self.__using_dynamic_port:
+ self.__zk = zc.zk.ZooKeeper(
+ self.options.zkconnection,
+ self.options.zookeeper_session_timeout,
+ )
+ register()
+ return
+
+ @zc.thread.Thread
+ def zookeeper_registration_thread():
+ self.__zk = zc.zk.ZooKeeper(
+ self.options.zkconnection,
+ self.options.zookeeper_session_timeout,
+ wait = True,
+ )
+ register()
+
def clear_socket(self):
if self.__zk is not None:
self.__zk.close()
@@ -96,7 +105,7 @@
for name, storage in self.storages.iteritems():
storage.close()
-def test(config, storage=None, zookeeper='127.0.0.1:2181'):
+def test(config, storage=None, zookeeper=None, threaded=True):
"""Run a server in a thread, mainly for testing.
"""
import tempfile
@@ -125,13 +134,19 @@
server = main(['-C', confpath], event.set)
os.remove(confpath)
+ if not threaded:
+ return server.main()
+
@zc.thread.Thread
def run_zeo_server_for_testing():
try:
server.main()
+ except select.error:
+ pass
except:
import logging
- logging.getLogger(__name__).exception('wtf')
+ logging.getLogger(__name__+'.test').exception(
+ 'wtf %r', sys.exc_info()[1])
def stop():
close = getattr(server.server, 'close', None)
Modified: zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml 2011-12-09 22:36:09 UTC (rev 123652)
+++ zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml 2011-12-09 22:45:05 UTC (rev 123653)
@@ -20,6 +20,12 @@
</description>
</key>
+ <key name="session-timeout" datatype="integer" required="no">
+ <description>
+ ZooKeeper session timeout in milliseconds.
+ </description>
+ </key>
+
</sectiontype>
</component>
Modified: zc.zkzeo/trunk/src/zc/zkzeo/tests.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/tests.py 2011-12-09 22:36:09 UTC (rev 123652)
+++ zc.zkzeo/trunk/src/zc/zkzeo/tests.py 2011-12-09 22:45:05 UTC (rev 123653)
@@ -20,9 +20,69 @@
import re
import ZEO.zrpc.connection
import zc.zk.testing
+import zc.zkzeo
import zope.testing.setupstack
import zope.testing.renormalizing
+
+def client_exception_when_no_zookeeper_running():
+ """If ZooKeeper isn't running, we get an immediate error.
+
+ >>> zc.zkzeo.client('192.0.2.42:2181', '/mydb')
+ Traceback (most recent call last):
+ ...
+ FailedConnect: 192.0.2.42:2181
+
+ >>> import ZODB.config
+ >>> ZODB.config.storageFromString('''
+ ... %import zc.zkzeo
+ ...
+ ... <zkzeoclient>
+ ... zookeeper 192.0.2.42:2181
+ ... server /databases/demo
+ ... max-disconnect-poll 1
+ ... </zkzeoclient>
+ ... ''')
+ Traceback (most recent call last):
+ ...
+ FailedConnect: 192.0.2.42:2181
+ """
+
+def server_exception_when_no_zookeeper_running_and_dynamic_port():
+ """If ZooKeeper isn't running, we get an immediate error.
+
+ >>> import zc.zkzeo.runzeo
+ >>> zc.zkzeo.runzeo.test('''
+ ... <zeo>
+ ... address 127.0.0.1
+ ... </zeo>
+ ...
+ ... <zookeeper>
+ ... connection 192.0.2.42:2181
+ ... path /databases/demo
+ ... </zookeeper>
+ ...
+ ... <filestorage>
+ ... path demo.fs
+ ... </filestorage>
+ ... ''', threaded=False)
+ Traceback (most recent call last):
+ ...
+ FailedConnect: 192.0.2.42:2181
+ """
+
+# def server_keeps_trying_when_no_zookeeper_running_and_fixed_port():
+# """Don't keep a server from running if it has a fixed addr.
+# """
+
+# # General principle is to keep a server running if clients can find it.
+
+# def behavior_of_wait_object_when_there_are_no_addresses():
+# """
+# """
+
+
+
def setUp(test):
zc.zk.testing.setUp(test, tree='/databases\n /demo\n')
test.globs['_server_loop'] = ZEO.zrpc.connection.server_loop
@@ -43,18 +103,23 @@
ZEO.zrpc.connection.server_loop = test.globs['_server_loop']
def test_suite():
- return unittest.TestSuite((
- # doctest.DocFileSuite('README.test'),
- # doctest.DocTestSuite(),
+ checker = zope.testing.renormalizing.RENormalizing([
+ (re.compile(r'pid = \d+'), 'pid = PID'),
+ (re.compile(r'/127.0.0.1:\d+'), '/127.0.0.1:PORT'),
+ ])
+ suite = unittest.TestSuite((
+ doctest.DocTestSuite(),
manuel.testing.TestSuite(
- manuel.doctest.Manuel(
- checker = zope.testing.renormalizing.RENormalizing([
- (re.compile(r'pid = \d+'), 'pid = PID'),
- (re.compile(r'/127.0.0.1:\d+'), '/127.0.0.1:PORT'),
- ])
- ) + manuel.capture.Manuel(),
+ manuel.doctest.Manuel(checker=checker) + manuel.capture.Manuel(),
'README.txt',
setUp=setUp, tearDown=zc.zk.testing.tearDown,
),
))
+ if not zc.zk.testing.testing_with_real_zookeeper():
+ suite.addTest(doctest.DocFileSuite(
+ 'wait-for-zookeeper.test',
+ setUp=setUp, tearDown=zc.zk.testing.tearDown,
+ checker=checker))
+ return suite
+
Added: zc.zkzeo/trunk/src/zc/zkzeo/wait-for-zookeeper.test
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/wait-for-zookeeper.test (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/wait-for-zookeeper.test 2011-12-09 22:45:05 UTC (rev 123653)
@@ -0,0 +1,71 @@
+
+If a storage server is configured with a static poer and it can't talk
+to ZooKeeper, it won't error, but will keep trying to talk to
+zookeeper in a thread.
+
+ >>> import ZEO.tests.forker
+ >>> port = ZEO.tests.forker.get_port()
+
+ >>> import zc.zkzeo.runzeo, zope.testing.loggingsupport
+ >>> handle = zope.testing.loggingsupport.InstalledHandler('zc.zk')
+
+ >>> stop = zc.zkzeo.runzeo.test('''
+ ... <zeo>
+ ... address 127.0.0.1:%s
+ ... </zeo>
+ ...
+ ... <zookeeper>
+ ... connection 192.0.2.42:2181
+ ... path /databases/demo
+ ... </zookeeper>
+ ...
+ ... <mappingstorage>
+ ... </mappingstorage>
+ ... ''' % port)
+
+We can connect to the server just fine:
+
+ >>> client = ZEO.client(port)
+ >>> client.is_connected()
+ True
+
+Let's wait a while:
+
+ >>> import time
+ >>> time.sleep(4)
+
+But data isn't in zookeeper yet:
+
+ >>> import zc.zk
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> zk.print_tree('/databases/demo')
+ /demo
+
+And the registration thread is still going:
+
+ >>> import threading
+ >>> [registration_thread] = [
+ ... t for t in threading.enumerate()
+ ... if t.name == 'zookeeper_registration_thread'
+ ... ]
+
+Now, we'll "start" ZooKeeper:
+
+ >>> ZooKeeper._allow_connection('192.0.2.42:2181')
+
+ >>> registration_thread.join(1)
+
+And we not have the entry in the tree:
+
+ >>> zk.print_tree('/databases/demo')
+ /demo
+ /127.0.0.1:24491
+ pid = 1013
+
+And out client is still connected (of course):
+
+ >>> client.is_connected()
+ True
+
+ >>> client.close()
+ >>> _ = stop()
Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/wait-for-zookeeper.test
___________________________________________________________________
Added: svn:eol-style
+ native
More information about the checkins
mailing list