[Checkins] SVN: zc.zk/trunk/src/zc/zk/ Fixed bug: Ephemeral nodes weren't recreated when sessions were
Jim Fulton
jim at zope.com
Thu Dec 8 18:39:42 UTC 2011
Log message for revision 123635:
Fixed bug: Ephemeral nodes weren't recreated when sessions were
reestablished.
This required:
- Implementing node-modification methods so we could track changes
to ephemeral nodes.
- Updating the testing framework to implement a lot more methods, to
have more realistic node meta data, to check versions, and to
provide better session emulation.
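
The tracking idea described above can be sketched roughly as follows. This is a minimal illustration, not the code in this revision: ``FakeBackend``, ``TrackingClient``, ``expire_session`` and ``on_session_reestablished`` are hypothetical names, and ``EPHEMERAL`` is a stand-in for ``zookeeper.EPHEMERAL`` so the sketch runs without a ZooKeeper server.

```python
EPHEMERAL = 1  # stand-in for zookeeper.EPHEMERAL (assumption for this sketch)


class FakeBackend:
    """Hypothetical in-memory stand-in for a ZooKeeper server."""

    def __init__(self):
        self.nodes = {}  # path -> (data, acl, flags)

    def create(self, path, data, acl, flags=0):
        self.nodes[path] = (data, acl, flags)
        return path

    def set(self, path, data):
        _, acl, flags = self.nodes[path]
        self.nodes[path] = (data, acl, flags)

    def delete(self, path):
        del self.nodes[path]

    def expire_session(self):
        # A server drops a session's ephemeral nodes when the session expires.
        for path, (_, _, flags) in list(self.nodes.items()):
            if flags & EPHEMERAL:
                del self.nodes[path]


class TrackingClient:
    """Records ephemeral nodes so they can be recreated in a new session."""

    def __init__(self, backend):
        self.backend = backend
        self.ephemeral = {}  # path -> dict(data=..., acl=..., flags=...)

    def create(self, path, data, acl, flags=0):
        result = self.backend.create(path, data, acl, flags)
        if flags & EPHEMERAL:
            # Remember everything needed to recreate the node later.
            self.ephemeral[path] = dict(data=data, acl=acl, flags=flags)
        return result

    def set(self, path, data):
        self.backend.set(path, data)
        if path in self.ephemeral:
            # Keep the record current so the latest data is restored.
            self.ephemeral[path]['data'] = data

    def delete(self, path):
        self.backend.delete(path)
        # A deleted node must not come back after reconnection.
        self.ephemeral.pop(path, None)

    def on_session_reestablished(self):
        # Replay every tracked ephemeral node into the new session.
        for path, info in self.ephemeral.items():
            self.backend.create(
                path, info['data'], info['acl'], info['flags'])
```

Because the record is only updated when changes go through the wrapper, any change made directly against the low-level API would be invisible to it, which is why the README note below asks callers to use the ``zc.zk.ZooKeeper`` object.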
Changed:
U zc.zk/trunk/src/zc/zk/README.txt
U zc.zk/trunk/src/zc/zk/__init__.py
U zc.zk/trunk/src/zc/zk/testing.py
U zc.zk/trunk/src/zc/zk/tests.py
-=-
Modified: zc.zk/trunk/src/zc/zk/README.txt
===================================================================
--- zc.zk/trunk/src/zc/zk/README.txt 2011-12-08 17:07:14 UTC (rev 123634)
+++ zc.zk/trunk/src/zc/zk/README.txt 2011-12-08 18:39:41 UTC (rev 123635)
@@ -175,21 +175,6 @@
from keyword parameters. Keyword parameters take precedent over the
positional data argument.
-ZooKeeper Session Management
-----------------------------
-
-``zc.zk`` takes care of ZooKeeper session management for you. It
-establishes and, if necessary, reestablishes sessions for you. In
-particular, it takes care of reestablishing ZooKeeper watches when a
-session is reestablished.
-
-ZooKeeper logging
------------------
-
-``zc.zk`` bridges the low-level ZooKeeper logging API and the Python
-logging API. ZooKeeper log messages are forwarded to the Python
-``'ZooKeeper'`` logger.
-
Tree-definition format, import, and export
------------------------------------------
@@ -329,7 +314,7 @@
extra path not trimmed: /lb/pools/retail
We got a warning about nodes left over from the old tree. We can see
-this if we export the tree::
+this if we look at the tree::
>>> print zk.export_tree(),
/cms : z4m cms
@@ -558,6 +543,27 @@
It would be bad, in practice, to remove a node that processes are
watching.
+ZooKeeper Session Management
+----------------------------
+
+``zc.zk`` takes care of ZooKeeper session management for you. It
+establishes and, if necessary, reestablishes sessions for you. In
+particular, it takes care of reestablishing ZooKeeper watches and
+ephemeral nodes when a session is reestablished.
+
+Note
+ To reestablish ephemeral nodes, it's necessary for ``zc.zk`` to
+ track node-modification operations, so you have to access the
+ ZooKeeper APIs through the `zc.zk.ZooKeeper`_ object, rather than
+ using the low-level extension directly.
+
+ZooKeeper logging
+-----------------
+
+``zc.zk`` bridges the low-level ZooKeeper logging API and the Python
+logging API. ZooKeeper log messages are forwarded to the Python
+``'ZooKeeper'`` logger.
+
zc.zk.ZooKeeper
---------------
@@ -572,41 +578,20 @@
them up when they are no-longer used. If you only want to get the
list of children once, use ``get_children``.
-``properties(path)``
- Return a `zc.zk.Properties`_ for the path.
+``close()``
+ Close the ZooKeeper session.
- Note that there is a fair bit of machinery in `zc.zk.Properties`_
- objects to support keeping them up to date, callbacks, and cleaning
- them up when they are no-longer used. If you only want to get the
- properties once, use ``get_properties``.
+ This should be called when cleanly shutting down servers to more
+ quickly remove ephemeral nodes.
-``handle``
- The ZooKeeper session handle
+``delete_recursive(path[, dry_run])``
+ Delete a node and all of its sub-nodes.
- This attribute can be used to call the lower-level API provided by
- the ``zookeeper`` extension.
+ Ephemeral nodes or nodes containing them are not deleted.
-``import_tree(text[, path='/'[, trim[, acl[, dry_run]]]])``
- Create tree nodes by importing a textual tree representation.
+ The dry_run option causes a summary of what would be deleted to be
+ printed without actually deleting anything.
- text
- A textual representation of the tree.
-
- path
- The path at which to create the top-level nodes.
-
- trim
- Boolean, defaulting to false, indicating whether nodes not in
- the textual representation should be removed.
-
- acl
- An access control-list to use for imported nodes. If not
- specified, then full access is allowed to everyone.
-
- dry_run
- Boolean, defaulting to false, indicating whether to do a dry
- run of the import, without applying any changes.
-
``export_tree(path[, ephemeral[, name]])``
Export a tree to a text representation.
@@ -627,14 +612,6 @@
Normally, when exporting the root node, ``/``, the root isn't
included, but it is included if a name is given.
-``delete_recursive(path[, dry_run])``
- Delete a node and all of its sub-nodes.
-
- Ephemeral nodes or nodes containing them are not deleted.
-
- The dry_run option causes a summary of what would be deleted to be
- printed without actually deleting anything.
-
``get_children(path)``
Get a list of the names of the children the node at the given path.
@@ -649,6 +626,27 @@
read the properties once, as it doesn't create a
`zc.zk.Properties`_ object.
+``import_tree(text[, path='/'[, trim[, acl[, dry_run]]]])``
+ Create tree nodes by importing a textual tree representation.
+
+ text
+ A textual representation of the tree.
+
+ path
+ The path at which to create the top-level nodes.
+
+ trim
+ Boolean, defaulting to false, indicating whether nodes not in
+ the textual representation should be removed.
+
+ acl
+ An access control-list to use for imported nodes. If not
+ specified, then full access is allowed to everyone.
+
+ dry_run
+ Boolean, defaulting to false, indicating whether to do a dry
+ run of the import, without applying any changes.
+
``ln(source, destination)``
Create a symbolic link at the destination path pointing to the
source path.
@@ -656,10 +654,22 @@
If the destination path ends with ``'/'``, then the source name is
appended to the destination.
-``resolve(path)``
- Find the real path for the given path.
+``print_tree(path='/')``
+ Print the tree at the given path.
-``register_server(path, address, **data)``
+ This is just a short-hand for::
+
+ print zk.export_tree(path, ephemeral=True),
+
+``properties(path)``
+ Return a `zc.zk.Properties`_ for the path.
+
+ Note that there is a fair bit of machinery in `zc.zk.Properties`_
+ objects to support keeping them up to date, callbacks, and cleaning
+ them up when they are no-longer used. If you only want to get the
+ properties once, use ``get_properties``.
+
+``register_server(path, address, acl=zc.zk.READ_ACL_UNSAFE, **data)``
Register a server at a path with the address.
An ephemeral child node of ``path`` will be created with name equal
@@ -667,23 +677,21 @@
``address`` must be a host and port tuple.
+ ``acl`` is a ZooKeeper access control list.
+
Optional node properties can be provided as keyword arguments.
-``close()``
- Close the ZooKeeper session.
+``resolve(path)``
+ Find the real path for the given path.
- This should be called when cleanly shutting down servers to more
- quickly remove ephemeral nodes.
-
In addition, ``ZooKeeper`` instances provide access to the following
ZooKeeper functions as methods: ``acreate``, ``add_auth``,
``adelete``, ``aexists``, ``aget``, ``aget_acl``, ``aget_children``,
-``aset``, ``aset_acl``, ``async``, ``client_id``, ``create``,
-``delete``, ``exists``, ``get``, ``get_acl``, ``is_unrecoverable``,
-``recv_timeout``, ``set``, ``set2``, ``set_acl``, ``set_debug_level``,
-``set_log_stream``, ``set_watcher``, and ``zerror``. When calling
-these as methods on ``ZooKeeper`` instances, it isn't necessary to
-pass a handle, as that is provided automatically.
+``aset``, ``aset_acl``, ``async``, ``create``, ``delete``, ``exists``,
+``get``, ``get_acl``, ``is_unrecoverable``, ``recv_timeout``, ``set``,
+``set2``, ``set_acl``, and ``set_watcher``. When calling these as
+methods on ``ZooKeeper`` instances, it isn't necessary to pass a
+handle, as that is provided automatically.
zc.zk.Children
--------------
@@ -750,6 +758,9 @@
0.3.0 (2011-12-??)
------------------
+- Fixed bug: Ephemeral nodes weren't recreated when sessions were
+ reestablished.
+
- Added a testing module that provides ZooKeeper emulation for
testing complex interactions with zc.zk without needing a running
ZooKeeper server.
Modified: zc.zk/trunk/src/zc/zk/__init__.py
===================================================================
--- zc.zk/trunk/src/zc/zk/__init__.py 2011-12-08 17:07:14 UTC (rev 123634)
+++ zc.zk/trunk/src/zc/zk/__init__.py 2011-12-08 18:39:41 UTC (rev 123635)
@@ -117,14 +117,18 @@
class ZooKeeper:
- def __init__(self, zkaddr=2181, timeout=1):
+ def __init__(self, zkaddr=2181, zktimeout=None, timeout=1):
if isinstance(zkaddr, int):
zkaddr = "127.0.0.1:%s" % zkaddr
self.timeout = timeout
self.zkaddr = zkaddr
self.watches = WatchManager()
+ self.ephemeral = {}
self.connected = threading.Event()
- handle = zookeeper.init(zkaddr, self._watch_session)
+ if zktimeout:
+ handle = zookeeper.init(zkaddr, self._watch_session, zktimeout)
+ else:
+ handle = zookeeper.init(zkaddr, self._watch_session)
self.connected.wait(timeout)
if not self.connected.is_set():
zookeeper.close(handle)
@@ -139,6 +143,9 @@
self.handle = handle
for watch in self.watches.clear():
self._watch(watch, False)
+ for path, data in self.ephemeral.items():
+ zookeeper.create(self.handle, path, data['data'],
+ data['acl'], data['flags'])
else:
assert handle == self.handle
self.connected.set()
@@ -153,15 +160,61 @@
else:
logger.critical('unexpected session event %s %s', handle, state)
- def register_server(self, path, addr, **kw):
+ def register_server(self, path, addr, acl=READ_ACL_UNSAFE, **kw):
kw['pid'] = os.getpid()
if not isinstance(addr, str):
addr = '%s:%s' % addr
self.connected.wait(self.timeout)
path = self.resolve(path)
- zookeeper.create(self.handle, path + '/' + addr, encode(kw),
- [world_permission()], zookeeper.EPHEMERAL)
+ self.create(path + '/' + addr, encode(kw), acl, zookeeper.EPHEMERAL)
+
+ def _async(self, completion, meth, *args):
+ post = getattr(self, '_post_'+meth)
+ if completion is None:
+ result = getattr(zookeeper, meth)(self.handle, *args)
+ post(*args)
+ return result
+
+ def asynccb(handle, status, *cargs):
+ assert handle == self.handle
+ if status == 0:
+ post(*args)
+ completion(handle, status, *cargs)
+
+ return getattr(zookeeper, 'a'+meth)(self.handle, *(args+(asynccb,)))
+
+ def create(self, path, data, acl, flags=0, completion=None):
+ return self._async(completion, 'create', path, data, acl, flags)
+ acreate = create
+
+ def _post_create(self, path, data, acl, flags):
+ if flags & zookeeper.EPHEMERAL:
+ self.ephemeral[path] = dict(data=data, acl=acl, flags=flags)
+
+ def delete(self, path, version=-1, completion=None):
+ return self._async(completion, 'delete', path, version)
+ adelete = delete
+
+ def _post_delete(self, path, version):
+ self.ephemeral.pop(path, None)
+
+ def set(self, path, data, version=-1, completion=None):
+ return self._async(completion, 'set', path, data, version)
+ aset = set2 = set
+
+ def _post_set(self, path, data, version):
+ if path in self.ephemeral:
+ self.ephemeral[path]['data'] = data
+
+ def set_acl(self, path, version, acl, completion=None):
+ return self._async(completion, 'set_acl', path, version, acl)
+ aset_acl = set_acl
+
+ def _post_set_acl(self, path, version, acl):
+ if path in self.ephemeral:
+ self.ephemeral[path]['acl'] = acl
+
def _watch(self, watch, wait=True):
event_type = watch.event_type
if wait:
@@ -437,6 +490,9 @@
export_tree(path, '', name)
return '\n'.join(output)+'\n'
+ def print_tree(self, path='/'):
+ print self.export_tree(path, True),
+
def resolve(self, path, seen=()):
if self.exists(path):
return path
@@ -462,7 +518,7 @@
def _set(self, path, data):
self.connected.wait(self.timeout)
- return zookeeper.set(self.handle, path, data)
+ return self.set(path, data)
def ln(self, target, source):
@@ -483,18 +539,15 @@
return zookeeper.CONNECTING_STATE
return zookeeper.state(self.handle)
-
def _make_method(name):
return (lambda self, *a, **kw:
getattr(zookeeper, name)(self.handle, *a, **kw))
for name in (
- 'acreate', 'add_auth', 'adelete', 'aexists', 'aget', 'aget_acl',
- 'aget_children', 'aset', 'aset_acl', 'async', 'client_id',
- 'create', 'delete', 'exists', 'get', 'get_acl',
- 'get_children', 'is_unrecoverable', 'recv_timeout', 'set',
- 'set2', 'set_acl', 'set_debug_level', 'set_log_stream',
- 'set_watcher', 'zerror',
+ 'add_auth', 'aexists', 'aget', 'aget_acl',
+ 'aget_children', 'async', 'client_id',
+ 'exists', 'get', 'get_acl',
+ 'get_children', 'is_unrecoverable', 'recv_timeout',
):
setattr(ZooKeeper, name, _make_method(name))
@@ -653,7 +706,7 @@
def _set(self, data):
self.data = data
- zookeeper.set(self.session.handle, self.path, encode(data))
+ self.session._set(self.path, encode(data))
def set(self, data=None, **properties):
data = data and dict(data) or {}
Modified: zc.zk/trunk/src/zc/zk/testing.py
===================================================================
--- zc.zk/trunk/src/zc/zk/testing.py 2011-12-08 17:07:14 UTC (rev 123634)
+++ zc.zk/trunk/src/zc/zk/testing.py 2011-12-08 18:39:41 UTC (rev 123635)
@@ -21,18 +21,27 @@
"""
import json
import mock
+import sys
import threading
import time
+import traceback
import zc.zk
+import zc.thread
import zookeeper
__all__ = ['assert_', 'setUp', 'tearDown']
-def assert_(cond, mess=''):
- """A simple assertion function for use in doctests.
+def assert_(cond, mess='', error=True):
+ """A simple assertion function.
+
+ If ``error``, raise an AssertionError if the assertion fails,
+ otherwise, print a message.
"""
if not cond:
- print 'assertion failed: ', mess
+ if error:
+ raise AssertionError(mess)
+ else:
+ print 'assertion failed: ', mess
def wait_until(func=None, timeout=9):
"""Wait until a function returns true.
@@ -97,6 +106,8 @@
globs = getattr(test, 'globs', test.__dict__)
globs['wait_until'] = wait_until
globs['zc.zk.testing'] = teardowns
+ globs['ZooKeeper'] = zk
+ globs.setdefault('assert_', assert_)
def tearDown(test):
"""The matching tearDown for setUp.
@@ -107,6 +118,67 @@
for cm in globs['zc.zk.testing']:
cm()
+class Session:
+
+ def __init__(self, zk, handle, watch=None):
+ self.zk = zk
+ self.handle = handle
+ self.nodes = set()
+ self.add = self.nodes.add
+ self.remove = self.nodes.remove
+ self.watch = watch
+ self.state = zookeeper.CONNECTING_STATE
+
+ def connect(self):
+ self.newstate(zookeeper.CONNECTED_STATE)
+
+ def disconnect(self):
+ self.newstate(zookeeper.CONNECTING_STATE)
+
+ def expire(self):
+ self.zk._clear_session(self)
+ self.newstate(zookeeper.EXPIRED_SESSION_STATE)
+
+ def newstate(self, state):
+ self.state = state
+ if self.watch is not None:
+ self.watch(self.handle, zookeeper.SESSION_EVENT, state, '')
+
+ def check(self):
+ if self.state == zookeeper.CONNECTING_STATE:
+ raise zookeeper.ConnectionLossException()
+ elif self.state == zookeeper.EXPIRED_SESSION_STATE:
+ raise zookeeper.SessionExpiredException()
+ elif self.state != zookeeper.CONNECTED_STATE:
+ raise AssertionError('Invalid state')
+
+exception_codes = {
+ zookeeper.ApiErrorException: zookeeper.APIERROR,
+ zookeeper.AuthFailedException: zookeeper.AUTHFAILED,
+ zookeeper.BadArgumentsException: zookeeper.BADARGUMENTS,
+ zookeeper.BadVersionException: zookeeper.BADVERSION,
+ zookeeper.ClosingException: zookeeper.CLOSING,
+ zookeeper.ConnectionLossException: zookeeper.CONNECTIONLOSS,
+ zookeeper.DataInconsistencyException: zookeeper.DATAINCONSISTENCY,
+ zookeeper.InvalidACLException: zookeeper.INVALIDACL,
+ zookeeper.InvalidCallbackException: zookeeper.INVALIDCALLBACK,
+ zookeeper.InvalidStateException: zookeeper.INVALIDSTATE,
+ zookeeper.MarshallingErrorException: zookeeper.MARSHALLINGERROR,
+ zookeeper.NoAuthException: zookeeper.NOAUTH,
+ zookeeper.NoChildrenForEphemeralsException:
+ zookeeper.NOCHILDRENFOREPHEMERALS,
+ zookeeper.NoNodeException: zookeeper.NONODE,
+ zookeeper.NodeExistsException: zookeeper.NODEEXISTS,
+ zookeeper.NotEmptyException: zookeeper.NOTEMPTY,
+ zookeeper.NothingException: zookeeper.NOTHING,
+ zookeeper.OperationTimeoutException: zookeeper.OPERATIONTIMEOUT,
+ zookeeper.RuntimeInconsistencyException: zookeeper.RUNTIMEINCONSISTENCY,
+ zookeeper.SessionExpiredException: zookeeper.SESSIONEXPIRED,
+ zookeeper.SessionMovedException: zookeeper.SESSIONMOVED,
+ zookeeper.SystemErrorException: zookeeper.SYSTEMERROR,
+ zookeeper.UnimplementedException: zookeeper.UNIMPLEMENTED,
+}
+
class ZooKeeper:
def __init__(self, connection_string, tree):
@@ -114,6 +186,7 @@
self.root = tree
self.sessions = {}
self.lock = threading.RLock()
+ self.connect_immediately = True
def init(self, addr, watch=None):
with self.lock:
@@ -121,41 +194,76 @@
handle = 0
while handle in self.sessions:
handle += 1
- self.sessions[handle] = set()
- if watch:
- watch(handle,
- zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, '')
+ self.sessions[handle] = Session(self, handle, watch)
+ if self.connect_immediately:
+ self.sessions[handle].connect()
- def _check_handle(self, handle):
- with self.lock:
- if handle not in self.sessions:
- raise zookeeper.ZooKeeperException('handle out of range')
+ def _check_handle(self, handle, checkstate=True):
+ try:
+ session = self.sessions[handle]
+ except KeyError:
+ raise zookeeper.ZooKeeperException('handle out of range')
+ if checkstate:
+ session.check()
+ return session
def _traverse(self, path):
+ node = self.root
+ for name in path.split('/')[1:]:
+ if not name:
+ continue
+ try:
+ node = node.children[name]
+ except KeyError:
+ raise zookeeper.NoNodeException('no node')
+
+ return node
+
+ def _clear_session(self, session):
with self.lock:
- node = self.root
- for name in path.split('/')[1:]:
- if not name:
- continue
- try:
- node = node.children[name]
- except KeyError:
- raise zookeeper.NoNodeException('no node')
+ for path in list(session.nodes):
+ self._delete(session.handle, path)
+ self.root.clear_watchers(session.handle)
- return node
+ def _doasync(self, completion, handle, nreturn, func, *args):
+ if completion is None:
+ return func(*args)
+ if isinstance(nreturn, int):
+ nerror = nreturn
+ else:
+ nreturn, nerror = nreturn
+
+ @zc.thread.Thread
+ def doasync():
+ try:
+ # print 'doasync', func, args
+ with self.lock:
+ status = 0
+ try:
+ r = func(*args)
+ except Exception, v:
+ status = exception_codes.get(v.__class__, -1)
+ r = (None, ) * nerror
+ if not isinstance(r, tuple):
+ if nreturn == 1:
+ r = (r, )
+ else:
+ r = ()
+ completion(*((handle, status) + r))
+ except:
+ traceback.print_exc(file=sys.stdout)
+
+ return 0
+
def close(self, handle):
with self.lock:
- self._check_handle(handle)
- for path in list(self.sessions[handle]):
- self.delete(handle, path)
+ self._clear_session(self._check_handle(handle, False))
del self.sessions[handle]
- self.root.clear_watchers(handle)
def state(self, handle):
with self.lock:
- self._check_handle(handle)
- return zookeeper.CONNECTED_STATE
+ return self._check_handle(handle, False).state
def create(self, handle, path, data, acl, flags=0):
with self.lock:
@@ -165,26 +273,43 @@
if name in node.children:
raise zookeeper.NodeExistsException()
node.children[name] = newnode = Node(data)
- newnode.acls = acl
+ newnode.acl = acl
newnode.flags = flags
node.children_changed(handle, zookeeper.CONNECTED_STATE, base)
if flags & zookeeper.EPHEMERAL:
self.sessions[handle].add(path)
return path
- def delete(self, handle, path):
+ def acreate(self, handle, path, data, acl, flags=0, completion=None):
+ return self._doasync(completion, handle, 1,
+ self.create, handle, path, data, acl, flags)
+
+ def _delete(self, handle, path, version=-1):
+ node = self._traverse(path)
+ if version != -1 and node.version != version:
+ raise zookeeper.BadVersionException('bad version')
+ if node.children:
+ raise zookeeper.NotEmptyException('not empty')
+ base, name = path.rsplit('/', 1)
+ bnode = self._traverse(base)
+ del bnode.children[name]
+ node.deleted(handle, zookeeper.CONNECTED_STATE, path)
+ bnode.children_changed(handle, zookeeper.CONNECTED_STATE, base)
+ if path in self.sessions[handle].nodes:
+ self.sessions[handle].remove(path)
+
+ def delete(self, handle, path, version=-1):
with self.lock:
self._check_handle(handle)
- node = self._traverse(path)
- base, name = path.rsplit('/', 1)
- bnode = self._traverse(base)
- del bnode.children[name]
- node.deleted(handle, zookeeper.CONNECTED_STATE, path)
- bnode.children_changed(handle, zookeeper.CONNECTED_STATE, base)
- if path in self.sessions[handle]:
- self.sessions[handle].remove(path)
+ self._delete(handle, path, version)
- def exists(self, handle, path):
+ def adelete(self, handle, path, version=-1, completion=None):
+ return self._doasync(completion, handle, 0,
+ self.delete, handle, path, version)
+
+ def exists(self, handle, path, watch=None):
+ if watch is not None:
+ raise TypeError('exists watch not supported')
with self.lock:
self._check_handle(handle)
try:
@@ -193,6 +318,10 @@
except zookeeper.NoNodeException:
return False
+ def aexists(self, handle, path, watch=None, completion=None):
+ return self._doasync(completion, handle, 1,
+ self.exists, handle, path, watch)
+
def get_children(self, handle, path, watch=None):
with self.lock:
self._check_handle(handle)
@@ -201,29 +330,49 @@
node.child_watchers += ((handle, watch), )
return sorted(node.children)
+ def aget_children(self, handle, path, watch=None, completion=None):
+ return self._doasync(completion, handle, 1,
+ self.get_children, handle, path, watch)
+
def get(self, handle, path, watch=None):
with self.lock:
self._check_handle(handle)
node = self._traverse(path)
if watch:
node.watchers += ((handle, watch), )
- return node.data, dict(
- ephemeralOwner=(1 if node.flags & zookeeper.EPHEMERAL else 0),
- )
+ return node.data, node.meta()
- def set(self, handle, path, data):
+ def aget(self, handle, path, watch=None, completion=None):
+ return self._doasync(completion, handle, 2,
+ self.get, handle, path, watch)
+
+ def set(self, handle, path, data, version=-1, async=False):
with self.lock:
self._check_handle(handle)
node = self._traverse(path)
+ if version != -1 and node.version != version:
+ raise zookeeper.BadVersionException('bad version')
node.data = data
node.changed(handle, zookeeper.CONNECTED_STATE, path)
+ if async:
+ return node.meta()
+ else:
+ return 0
+ def aset(self, handle, path, data, version=-1, completion=None):
+ return self._doasync(completion, handle, 1,
+ self.set, handle, path, data, version, True)
+
def get_acl(self, handle, path):
with self.lock:
self._check_handle(handle)
node = self._traverse(path)
- return dict(aversion=node.aversion), node.acl
+ return node.meta(), node.acl
+ def aget_acl(self, handle, path, completion=None):
+ return self._doasync(completion, handle, 2,
+ self.get_acl, handle, path)
+
def set_acl(self, handle, path, aversion, acl):
with self.lock:
self._check_handle(handle)
@@ -233,27 +382,49 @@
node.aversion += 1
node.acl = acl
+ return 0
+
+ def aset_acl(self, handle, path, aversion, acl, completion=None):
+ return self._doasync(completion, handle, (1, 0),
+ self.set_acl, handle, path, aversion, acl)
+
class Node:
watchers = child_watchers = ()
flags = 0
- aversion = 0
+ version = aversion = cversion = 0
acl = zc.zk.OPEN_ACL_UNSAFE
+ def meta(self):
+ return dict(
+ version = self.version,
+ aversion = self.aversion,
+ cversion = self.cversion,
+ ctime = self.ctime,
+ mtime = self.mtime,
+ numChildren = len(self.children),
+ dataLength = len(self.data),
+ ephemeralOwner=(1 if self.flags & zookeeper.EPHEMERAL else 0),
+ )
+
def __init__(self, data='', **children):
self.data = data
self.children = children
+ self.ctime = self.mtime = time.time()
def children_changed(self, handle, state, path):
watchers = self.child_watchers
self.child_watchers = ()
for h, w in watchers:
w(h, zookeeper.CHILD_EVENT, state, path)
+ self.cversion += 1
def changed(self, handle, state, path):
watchers = self.watchers
self.watchers = ()
for h, w in watchers:
w(h, zookeeper.CHANGED_EVENT, state, path)
+ self.version += 1
+ self.mtime = time.time()
def deleted(self, handle, state, path):
watchers = self.watchers
Modified: zc.zk/trunk/src/zc/zk/tests.py
===================================================================
--- zc.zk/trunk/src/zc/zk/tests.py 2011-12-08 17:07:14 UTC (rev 123634)
+++ zc.zk/trunk/src/zc/zk/tests.py 2011-12-08 18:39:41 UTC (rev 123635)
@@ -23,6 +23,7 @@
import re
import StringIO
import sys
+import threading
import time
import zc.zk
import zc.zk.testing
@@ -293,7 +294,7 @@
self.assertEqual(dict(properties), data)
@side_effect(set)
- def _(handle, path_, data):
+ def _(handle, path_, data, version=-1):
self.__set_data = json.loads(data)
self.assertEqual((handle, path_), (0, path))
@@ -336,7 +337,7 @@
self.assertEqual(dict(properties), dict(string_value='\n{xxx}\n'))
@side_effect(set)
- def _(handle, path_, data):
+ def _(handle, path_, data, version=-1):
self.__set_data = data
self.assertEqual((handle, path_), (0, path))
@@ -572,6 +573,7 @@
>>> zk.set('/test', '{"b": 2}')
3 {u'b': 2}
4 zc.zk.Properties(0, /test)
+ 0
Hack data into the child watcher to verify it's cleared:
@@ -790,17 +792,306 @@
/providers
"""
+def test_recovery_of_servers_on_session_reestablishment():
+ """
+
+First, a basic test:
+
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> zk.register_server('/fooservice/providers', 'test')
+ >>> zk.get_children('/fooservice/providers')
+ ['test']
+
+ >>> ZooKeeper.sessions[zk.handle].disconnect()
+ >>> ZooKeeper.sessions[zk.handle].expire()
+
+ >>> zk.get_children('/fooservice/providers')
+ ['test']
+
+Now, some variations.
+
+If the node is deleted, we don't recreate it:
+
+ >>> zk.delete('/fooservice/providers/test')
+ >>> ZooKeeper.sessions[zk.handle].disconnect()
+ >>> ZooKeeper.sessions[zk.handle].expire()
+ >>> zk.get_children('/fooservice/providers')
+ []
+
+First, some non-standard data and acl:
+
+ >>> acl = [zc.zk.world_permission(3)]
+ >>> zk.register_server('/fooservice/providers', 'test', acl, a=1)
+ >>> zk.print_tree('/fooservice/providers')
+ /providers
+ /test
+ a = 1
+ pid = 362
+ >>> ZooKeeper.sessions[zk.handle].disconnect()
+ >>> ZooKeeper.sessions[zk.handle].expire()
+ >>> zk.print_tree('/fooservice/providers')
+ /providers
+ /test
+ a = 1
+ pid = 362
+
+ >>> zk.get_acl('/fooservice/providers/test')[1] == acl
+ True
+
+Delete again:
+
+ >>> zk.delete('/fooservice/providers/test')
+ >>> ZooKeeper.sessions[zk.handle].disconnect()
+ >>> ZooKeeper.sessions[zk.handle].expire()
+ >>> zk.get_children('/fooservice/providers')
+ []
+
+Let's use the low-level creation api:
+
+ >>> zk.create('/fooservice/providers/test', 'x', acl, zookeeper.EPHEMERAL)
+ '/fooservice/providers/test'
+
+ >>> ZooKeeper.sessions[zk.handle].disconnect()
+ >>> ZooKeeper.sessions[zk.handle].expire()
+ >>> zk.get_acl('/fooservice/providers/test')[1] == acl
+ True
+ >>> zk.get('/fooservice/providers/test')[0]
+ 'x'
+
+We track changes:
+
+ >>> _ = zk.set('/fooservice/providers/test', 'y')
+ >>> acl2 = [zc.zk.world_permission(4)]
+ >>> _ = zk.set_acl('/fooservice/providers/test', 0, acl2)
+ >>> ZooKeeper.sessions[zk.handle].disconnect()
+ >>> ZooKeeper.sessions[zk.handle].expire()
+ >>> zk.get_acl('/fooservice/providers/test')[1] == acl2
+ True
+ >>> zk.get('/fooservice/providers/test')[0]
+ 'y'
+
+Delete again:
+
+ >>> zk.delete('/fooservice/providers/test')
+ >>> ZooKeeper.sessions[zk.handle].disconnect()
+ >>> ZooKeeper.sessions[zk.handle].expire()
+ >>> zk.get_children('/fooservice/providers')
+ []
+
+Let's do it all asynchronously :)
+
+ >>> zk.acreate('/fooservice/providers/test', 'x', acl, zookeeper.EPHEMERAL,
+ ... check_async(0))
+ 0
+ >>> event.wait(1); assert_(event.is_set(), error=False)
+ >>> ZooKeeper.sessions[zk.handle].disconnect()
+ >>> ZooKeeper.sessions[zk.handle].expire()
+ >>> zk.get_acl('/fooservice/providers/test')[1] == acl
+ True
+ >>> zk.get('/fooservice/providers/test')[0]
+ 'x'
+
+ >>> zk.aset('/fooservice/providers/test', 'y', -1, check_async(0))
+ 0
+ >>> event.wait(1); assert_(event.is_set())
+ >>> acl2 = [zc.zk.world_permission(4)]
+ >>> _ = zk.aset_acl('/fooservice/providers/test', 0, acl2, check_async(0))
+ >>> event.wait(1); assert_(event.is_set())
+ >>> ZooKeeper.sessions[zk.handle].disconnect()
+ >>> ZooKeeper.sessions[zk.handle].expire()
+ >>> zk.get_acl('/fooservice/providers/test')[1] == acl2
+ True
+ >>> zk.get('/fooservice/providers/test')[0]
+ 'y'
+
+ >>> _ = zk.adelete('/fooservice/providers/test', -1, check_async(0))
+ >>> event.wait(1); assert_(event.is_set())
+ >>> ZooKeeper.sessions[zk.handle].disconnect()
+ >>> ZooKeeper.sessions[zk.handle].expire()
+ >>> zk.get_children('/fooservice/providers')
+ []
+ """
+
+def test_set():
+ """
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> zk.get('/')[0]
+ ''
+ >>> zk.set('/', 'a'); zk.get('/')[0]
+ 0
+ 'a'
+
+ >>> zk.set('/', 'b', 0)
+ Traceback (most recent call last):
+ ...
+ BadVersionException: bad version
+
+ >>> zk.set('/', 'b'); zk.get('/')[0]
+ 0
+ 'b'
+
+ >>> r = zk.aset('/', 'c', -1, check_async()); event.wait(1)
+ ... # doctest: +ELLIPSIS
+ async callback got (...
+ >>> r
+ 0
+
+ >>> zk.get('/')[0]
+ 'c'
+
+ >>> r = zk.aset('/', 'd', 0,
+ ... check_async(expected_status=zookeeper.BADVERSION)
+ ... ); event.wait(1)
+ async callback got (None,)
+
+ >>> r
+ 0
+
+ >>> r = zk.aset('/', 'd', 3, check_async()); event.wait(1)
+ ... # doctest: +ELLIPSIS
+ async callback got (...
+ >>> r
+ 0
+
+ >>> zk.get('/')[0]
+ 'd'
+ """
+
+def test_delete():
+ """
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> _ = zk.create('/test', '', zc.zk.OPEN_ACL_UNSAFE)
+
+Synchronous variations:
+
+ >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+ >>> _ = zk.set('/test/a', '1')
+ >>> _ = zk.set('/test/a', '2')
+ >>> zk.delete('/test/a', 0)
+ Traceback (most recent call last):
+ ...
+ BadVersionException: bad version
+
+ >>> zk.get_children('/test')
+ ['a']
+ >>> zk.delete('/test/a', 2)
+ >>> zk.get_children('/test')
+ []
+
+ >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+ >>> _ = zk.set('/test/a', '1')
+ >>> _ = zk.set('/test/a', '2')
+ >>> zk.delete('/test/a', -1)
+ >>> zk.get_children('/test')
+ []
+
+ >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+ >>> _ = zk.set('/test/a', '1')
+ >>> _ = zk.set('/test/a', '2')
+ >>> zk.delete('/test/a')
+ >>> zk.get_children('/test')
+ []
+
+
+Asynchronous variations:
+
+ >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+ >>> _ = zk.set('/test/a', '1')
+ >>> _ = zk.set('/test/a', '2')
+ >>> r = zk.adelete('/test/a', 0,
+ ... check_async(expected_status=zookeeper.BADVERSION)
+ ... ); event.wait(1)
+ async callback got ()
+ >>> r
+ 0
+
+ >>> zk.get_children('/test')
+ ['a']
+ >>> r = zk.adelete('/test/a', 2, check_async()); event.wait(1)
+ async callback got ()
+ >>> r, zk.get_children('/test')
+ (0, [])
+
+ >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+ >>> _ = zk.set('/test/a', '1')
+ >>> _ = zk.set('/test/a', '2')
+ >>> r = zk.adelete('/test/a', -1, check_async()); event.wait(1)
+ async callback got ()
+ >>> r, zk.get_children('/test')
+ (0, [])
+
+ >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+ >>> _ = zk.set('/test/a', '1')
+ >>> _ = zk.set('/test/a', '2')
+ >>> r = zk.adelete('/test/a', completion=check_async()); event.wait(1)
+ async callback got ()
+ >>> r, zk.get_children('/test')
+ (0, [])
+
+ """
+
+def test_set_acl():
+ """
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> _ = zk.create('/test', '', zc.zk.OPEN_ACL_UNSAFE)
+ >>> zk.get_acl('/test')[1] == zc.zk.OPEN_ACL_UNSAFE
+ True
+ >>> zk.set_acl('/test', 0, [zc.zk.world_permission(1)])
+ 0
+ >>> zk.get_acl('/test')[1] == [zc.zk.world_permission(1)]
+ True
+
+ >>> zk.set_acl('/test', 0, [zc.zk.world_permission(1)])
+ Traceback (most recent call last):
+ ...
+ BadVersionException: bad version
+
+ >>> zk.set_acl('/test', 1, [zc.zk.world_permission(2)])
+ 0
+ >>> zk.get_acl('/test')[1] == [zc.zk.world_permission(2)]
+ True
+
+ >>> r = zk.aset_acl('/test', 0, [zc.zk.world_permission(3)],
+ ... check_async(expected_status=zookeeper.BADVERSION)
+ ... ); event.wait(1)
+ async callback got ()
+ >>> r
+ 0
+
+ >>> r = zk.aset_acl('/test', 2, [zc.zk.world_permission(3)],
+ ... check_async()); event.wait(1)
+ async callback got (0,)
+ >>> r
+ 0
+ >>> zk.get_acl('/test')[1] == [zc.zk.world_permission(3)]
+ True
+ """
+
+event = threading.Event()
+def check_async(show=True, expected_status=0):
+ event.clear()
+ def check(handle, status, *args):
+ if show:
+ print 'async callback got', args
+ event.set()
+ zc.zk.testing.assert_(
+ status==expected_status,
+ "Bad cb status %s" % status,
+ error=False)
+ return check
+
def test_suite():
+ checker = zope.testing.renormalizing.RENormalizing([
+ (re.compile('pid = \d+'), 'pid = 9999')
+ ])
return unittest.TestSuite((
unittest.makeSuite(Tests),
doctest.DocTestSuite(
setUp=zc.zk.testing.setUp, tearDown=zc.zk.testing.tearDown,
+ checker=checker,
),
manuel.testing.TestSuite(
- manuel.doctest.Manuel(
- checker = zope.testing.renormalizing.RENormalizing([
- (re.compile('pid = \d+'), 'pid = 9999')
- ])) + manuel.capture.Manuel(),
+ manuel.doctest.Manuel(checker=checker) + manuel.capture.Manuel(),
'README.txt',
setUp=zc.zk.testing.setUp, tearDown=zc.zk.testing.tearDown,
),