[Zodb-checkins] SVN: ZODB/trunk/src/ - Storage servers now emit Serving and Closed events so subscribers
Jim Fulton
jim at zope.com
Mon Nov 21 12:33:31 UTC 2011
Log message for revision 123454:
- Storage servers now emit Serving and Closed events so subscribers
can discover addresses when dynamic port assignment (bind to port 0)
is used. This could, for example, be used to update address
information in a ZooKeeper database.
- Client storages have a method, new_addr, that can be used to change
the server address(es). This can be used, for example, to update a
dynamically determined server address from information in a
ZooKeeper database.
- Moved some responsibility from runzeo to StorageServer to make it
easier to use storage servers without runzeo.
Changed:
U ZODB/trunk/src/CHANGES.txt
U ZODB/trunk/src/ZEO/ClientStorage.py
U ZODB/trunk/src/ZEO/StorageServer.py
U ZODB/trunk/src/ZEO/runzeo.py
A ZODB/trunk/src/ZEO/tests/dynamic_server_ports.test
A ZODB/trunk/src/ZEO/tests/new_addr.test
U ZODB/trunk/src/ZEO/tests/registerDB.test
U ZODB/trunk/src/ZEO/tests/servertesting.py
U ZODB/trunk/src/ZEO/tests/testZEO.py
U ZODB/trunk/src/ZEO/tests/zeoserver.py
U ZODB/trunk/src/ZEO/zrpc/client.py
U ZODB/trunk/src/ZEO/zrpc/connection.py
U ZODB/trunk/src/ZEO/zrpc/server.py
-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/CHANGES.txt 2011-11-21 12:33:31 UTC (rev 123454)
@@ -2,8 +2,8 @@
Change History
================
-3.11.0 (2010-??-??)
-===================
+3.11.0a1 (2011-??-??)
+=====================
New Features
------------
@@ -19,6 +19,16 @@
comparison inherited from object. (This doesn't apply to old-style
class instances.)
+- Storage servers now emit Serving and Closed events so subscribers
+ can discover addresses when dynamic port assignment (bind to port 0)
+ is used. This could, for example, be used to update address
+ information in a ZooKeeper database.
+
+- Client storages have a method, new_addr, that can be used to change
+ the server address(es). This can be used, for example, to update a
+ dynamically determined server address from information in a
+ ZooKeeper database.
+
3.10.5 (2011-11-19)
===================
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/ClientStorage.py 2011-11-21 12:33:31 UTC (rev 123454)
@@ -424,8 +424,10 @@
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
+ def new_addr(self, addr):
+ self._addr = addr
+ self._rpc_mgr.new_addrs(addr)
-
def _wait(self, timeout=None):
if timeout is not None:
deadline = time.time() + timeout
@@ -503,10 +505,15 @@
"""
self._db = db
- def is_connected(self):
+ def is_connected(self, test=False):
"""Return whether the storage is currently connected to a server."""
# This function is used by clients, so we only report that a
# connection exists when the connection is ready to use.
+ if test:
+ try:
+ self._server.lastTransaction()
+ except Exception:
+ pass
return self._ready.isSet()
def sync(self):
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2011-11-21 12:33:31 UTC (rev 123454)
@@ -46,6 +46,7 @@
import warnings
import ZEO.zrpc.error
import ZODB.blob
+import ZODB.event
import ZODB.serialize
import ZODB.TimeStamp
import zope.interface
@@ -782,18 +783,20 @@
# Classes we instantiate. A subclass might override.
- DispatcherClass = Dispatcher
+ DispatcherClass = ZEO.zrpc.server.Dispatcher
ZEOStorageClass = ZEOStorage
ManagedServerConnectionClass = ManagedServerConnection
- def __init__(self, addr, storages, read_only=0,
+ def __init__(self, addr, storages,
+ read_only=0,
invalidation_queue_size=100,
invalidation_age=None,
transaction_timeout=None,
monitor_address=None,
auth_protocol=None,
auth_database=None,
- auth_realm=None):
+ auth_realm=None,
+ ):
"""StorageServer constructor.
This is typically invoked from the start.py script.
@@ -891,8 +894,13 @@
storage.registerDB(StorageServerDB(self, name))
self.invalidation_age = invalidation_age
self.connections = {}
- self.dispatcher = self.DispatcherClass(addr,
- factory=self.new_connection)
+ self.socket_map = {}
+ self.dispatcher = self.DispatcherClass(
+ addr, factory=self.new_connection, map=self.socket_map)
+ if len(self.addr) == 2 and self.addr[1] == 0 and self.addr[0]:
+ self.addr = self.dispatcher.socket.getsockname()
+ ZODB.event.notify(
+ Serving(self, address=self.dispatcher.socket.getsockname()))
self.stats = {}
self.timeouts = {}
for name in self.storages.keys():
@@ -1137,26 +1145,53 @@
return latest_tid, list(oids)
- def close_server(self):
+ def loop(self):
+ try:
+ asyncore.loop(map=self.socket_map)
+ except Exception:
+ if not self.__closed:
+ raise # Unexpected exc
+
+ __thread = None
+ def start_thread(self, daemon=True):
+ self.__thread = thread = threading.Thread(target=self.loop)
+ thread.setDaemon(daemon)
+ thread.start()
+
+ __closed = False
+ def close(self, join_timeout=1):
"""Close the dispatcher so that there are no new connections.
This is only called from the test suite, AFAICT.
"""
+ if self.__closed:
+ return
+ self.__closed = True
+
+ # Stop accepting connections
self.dispatcher.close()
if self.monitor is not None:
self.monitor.close()
- # Force the asyncore mainloop to exit by hackery, i.e. close
- # every socket in the map. loop() will return when the map is
- # empty.
- for s in asyncore.socket_map.values():
- try:
- s.close()
- except:
- pass
- asyncore.socket_map.clear()
- for storage in self.storages.values():
+
+ ZODB.event.notify(Closed(self))
+
+ # Close open client connections
+ for sid, connections in self.connections.items():
+ for conn in connections[:]:
+ try:
+ conn.connection.close()
+ except:
+ pass
+
+ for name, storage in self.storages.iteritems():
+ logger.info("closing storage %r", name)
storage.close()
+ if self.__thread is not None:
+ self.__thread.join(join_timeout)
+
+ close_server = close
+
def close_conn(self, conn):
"""Internal: remove the given connection from self.connections.
@@ -1570,3 +1605,16 @@
if self.file:
self.file.close()
self.file = None
+
+class ServerEvent:
+
+ def __init__(self, server, **kw):
+ self.__dict__.update(kw)
+ self.server = server
+
+class Serving(ServerEvent):
+ pass
+
+class Closed(ServerEvent):
+ pass
+
Modified: ZODB/trunk/src/ZEO/runzeo.py
===================================================================
--- ZODB/trunk/src/ZEO/runzeo.py 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/runzeo.py 2011-11-21 12:33:31 UTC (rev 123454)
@@ -160,7 +160,7 @@
self.create_server()
self.loop_forever()
finally:
- self.close_storages()
+ self.server.close_server()
self.clear_socket()
self.remove_pidfile()
@@ -178,6 +178,10 @@
root.addHandler(handler)
def check_socket(self):
+ if (isinstance(self.options.address, tuple) and
+ self.options.address[1] is None):
+ self.options.address = self.options.address[0], 0
+ return
if self.can_connect(self.options.family, self.options.address):
self.options.usage("address %s already in use" %
repr(self.options.address))
@@ -254,7 +258,7 @@
if self.options.testing_exit_immediately:
print "testing exit immediately"
else:
- asyncore.loop()
+ self.server.loop()
def handle_sigterm(self):
log("terminated by SIGTERM")
@@ -289,20 +293,6 @@
handler.rotate()
log("Log files rotation complete", level=logging.INFO)
-
-
-
-
-
- def close_storages(self):
- for name, storage in self.storages.items():
- log("closing storage %r" % name)
- try:
- storage.close()
- except: # Keep going
- log("failed to close storage %r" % name,
- level=logging.ERROR, exc_info=True)
-
def _get_pidfile(self):
pidfile = self.options.pid_file
# 'pidfile' is marked as not required.
Added: ZODB/trunk/src/ZEO/tests/dynamic_server_ports.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/dynamic_server_ports.test (rev 0)
+++ ZODB/trunk/src/ZEO/tests/dynamic_server_ports.test 2011-11-21 12:33:31 UTC (rev 123454)
@@ -0,0 +1,106 @@
+The storage server can be told to bind to port 0, allowing the OS to
+pick a port dynamically. For this to be useful, there needs to be a
+way to tell clients which port was picked. For this reason, the server
+posts events to ZODB.event.notify.
+
+ >>> import ZODB.event
+ >>> old_notify = ZODB.event.notify
+
+ >>> last_event = None
+ >>> def notify(event):
+ ... global last_event
+ ... last_event = event
+ >>> ZODB.event.notify = notify
+
+Now, let's start a server and verify that we get a serving event:
+
+ >>> import ZEO.StorageServer, ZODB.MappingStorage
+ >>> server = ZEO.StorageServer.StorageServer(
+ ... ('127.0.0.1', 0), {'1': ZODB.MappingStorage.MappingStorage()})
+
+ >>> isinstance(last_event, ZEO.StorageServer.Serving)
+ True
+ >>> last_event.server is server
+ True
+
+ >>> last_event.address[0], last_event.address[1] > 0
+ ('127.0.0.1', True)
+
+If the host part of the address passed to the constructor is not an
+empty string, then the server addr attribute is the same as the
+address attribute of the event:
+
+ >>> server.addr == last_event.address
+ True
+
+Let's run the server in a thread, to make sure we can connect.
+
+ >>> server.start_thread()
+
+ >>> client = ZEO.client(last_event.address)
+ >>> client.is_connected()
+ True
+
+If we close the server, we'll get a closed event:
+
+ >>> server.close()
+ >>> isinstance(last_event, ZEO.StorageServer.Closed)
+ True
+ >>> last_event.server is server
+ True
+
+ >>> wait_until(lambda : not client.is_connected(test=True))
+ >>> client.close()
+
+If we pass an empty string as the host part of the server address, we
+can't really assign a single address, so the server addr attribute is
+left alone:
+
+ >>> server = ZEO.StorageServer.StorageServer(
+ ... ('', 0), {'1': ZODB.MappingStorage.MappingStorage()})
+
+ >>> isinstance(last_event, ZEO.StorageServer.Serving)
+ True
+ >>> last_event.server is server
+ True
+
+ >>> last_event.address[1] > 0
+ True
+
+Because the host part of the address passed to the constructor is an
+empty string, the server addr attribute is not updated to the
+address attribute of the event:
+
+ >>> server.addr
+ ('', 0)
+
+ >>> server.close()
+
+The runzeo module provides some process support, including getting the
+server configuration via a ZConfig configuration file. To spell a
+dynamic port using ZConfig, you'd use a hostname by itself. In this
+case, ZConfig passes None as the port.
+
+ >>> import ZEO.runzeo
+ >>> open('conf', 'w').write("""
+ ... <zeo>
+ ... address 127.0.0.1
+ ... </zeo>
+ ... <mappingstorage>
+ ... </mappingstorage>
+ ... """)
+ >>> options = ZEO.runzeo.ZEOOptions()
+ >>> options.realize('-C conf'.split())
+ >>> options.address
+ ('127.0.0.1', None)
+
+ >>> rs = ZEO.runzeo.ZEOServer(options)
+ >>> rs.check_socket()
+ >>> options.address
+ ('127.0.0.1', 0)
+
+
+.. cleanup
+
+ >>> ZODB.event.notify = old_notify
+
Property changes on: ZODB/trunk/src/ZEO/tests/dynamic_server_ports.test
___________________________________________________________________
Added: svn:eol-style
+ native
Added: ZODB/trunk/src/ZEO/tests/new_addr.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/new_addr.test (rev 0)
+++ ZODB/trunk/src/ZEO/tests/new_addr.test 2011-11-21 12:33:31 UTC (rev 123454)
@@ -0,0 +1,52 @@
+You can change the address(es) of a client storage.
+
+We'll start by setting up a server and connecting to it:
+
+ >>> import ZEO, ZEO.StorageServer, ZODB.FileStorage, transaction
+ >>> server = ZEO.StorageServer.StorageServer(
+ ... ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
+ >>> server.start_thread()
+
+ >>> conn = ZEO.connection(server.addr)
+ >>> client = conn.db().storage
+ >>> client.is_connected()
+ True
+ >>> conn.root()
+ {}
+ >>> conn.root.x = 1
+ >>> transaction.commit()
+
+Now we'll close the server:
+
+ >>> server.close()
+
+And wait for the connection to notice it's disconnected:
+
+ >>> wait_until(lambda : not client.is_connected())
+
+Now, we'll restart the server and update the connection:
+
+ >>> server = ZEO.StorageServer.StorageServer(
+ ... ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
+ >>> server.start_thread()
+ >>> client.new_addr(server.addr)
+
+
+Update with another client:
+
+ >>> conn2 = ZEO.connection(server.addr)
+ >>> conn2.root.x += 1
+ >>> transaction.commit()
+
+Wait for connect:
+
+ >>> wait_until(lambda : client.is_connected())
+ >>> _ = transaction.begin()
+ >>> conn.root()
+ {'x': 2}
+
+.. cleanup
+
+ >>> conn.close()
+ >>> conn2.close()
+ >>> server.close()
Property changes on: ZODB/trunk/src/ZEO/tests/new_addr.test
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: ZODB/trunk/src/ZEO/tests/registerDB.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/registerDB.test 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/tests/registerDB.test 2011-11-21 12:33:31 UTC (rev 123454)
@@ -9,7 +9,7 @@
We'll create a Faux storage that has a registerDB method.
>>> class FauxStorage:
- ... invalidations = [('trans0', ['ob0']),
+ ... invalidations = [('trans0', ['ob0']),
... ('trans1', ['ob0', 'ob1']),
... ]
... def registerDB(self, db):
@@ -28,7 +28,10 @@
>>> import ZEO.StorageServer
>>> class StorageServer(ZEO.StorageServer.StorageServer):
- ... DispatcherClass = lambda *a, **k: None
+ ... class DispatcherClass:
+ ... __init__ = lambda *a, **kw: None
+ ... class socket:
+ ... getsockname = staticmethod(lambda : 'socket')
We'll create a storage instance and a storage server using it:
@@ -80,7 +83,7 @@
>>> _ = server.register_connection('t', ZEOStorage(server, 1))
>>> _ = server.register_connection('t', ZEOStorage(server, 2))
-
+
Now, if we call invalidate, we'll see it propigate to the client:
>>> storage.db.invalidate('trans2', ['ob1', 'ob2'])
@@ -112,7 +115,7 @@
closed 2
The connections will then reopen and revalidate their caches.
-
+
The servers's invalidation queue will get reset
>>> for tid, invalidated in server.invq['t']:
Modified: ZODB/trunk/src/ZEO/tests/servertesting.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/servertesting.py 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/tests/servertesting.py 2011-11-21 12:33:31 UTC (rev 123454)
@@ -40,9 +40,12 @@
storages = {'1': ZODB.MappingStorage.MappingStorage()}
ZEO.StorageServer.StorageServer.__init__(self, addr, storages, **kw)
- def DispatcherClass(*args, **kw):
- pass
+ class DispatcherClass:
+ __init__ = lambda *a, **kw: None
+ class socket:
+ getsockname = staticmethod(lambda : 'socket')
+
class Connection:
peer_protocol_version = ZEO.zrpc.connection.Connection.current_protocol
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2011-11-21 12:33:31 UTC (rev 123454)
@@ -898,7 +898,7 @@
>>> sorted([int(u64(oid)) for oid in oids])
[10, 11, 12, 13, 14]
- >>> server.close_server()
+ >>> server.close()
"""
def getInvalidationsAfterServerRestart():
@@ -962,7 +962,7 @@
dont' need to be invalidated, however, that's better than verifying
caches.)
- >>> sv.close_server()
+ >>> sv.close()
>>> fs.close()
If a storage doesn't implement lastInvalidations, a client can still
@@ -1249,7 +1249,7 @@
------
--T INFO ZEO.zrpc () listening on ...
------
- --T INFO ZEO.runzeo () closing storage '1'
+ --T INFO ZEO.StorageServer closing storage '1'
testing exit immediately
"""
@@ -1761,6 +1761,7 @@
'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt', 'client-config.test',
'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
+ 'dynamic_server_ports.test', 'new_addr.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
),
)
Modified: ZODB/trunk/src/ZEO/tests/zeoserver.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeoserver.py 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/tests/zeoserver.py 2011-11-21 12:33:31 UTC (rev 123454)
@@ -94,7 +94,7 @@
# the ack character until the storage is finished closing.
if self._count <= 0:
self.log('closing the storage')
- self._server.close_server()
+ self._server.close()
if not self._keep:
for storage in self._server.storages.values():
cleanup(storage)
@@ -206,6 +206,7 @@
d.start()
# Loop for socket events
log(label, 'entering asyncore loop')
+ server.start_thread()
asyncore.loop()
Modified: ZODB/trunk/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/client.py 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/zrpc/client.py 2011-11-21 12:33:31 UTC (rev 123454)
@@ -146,6 +146,9 @@
# attempting to connect.
self.thread = None # Protected by self.cond
+ def new_addrs(self, addrs):
+ self.addrlist = self._parse_addrs(addrs)
+
def _start_asyncore_loop(self):
self.map = {}
self.trigger = ZEO.zrpc.trigger.trigger(self.map)
@@ -269,9 +272,7 @@
t = self.thread
if t is None:
log("CM.connect(): starting ConnectThread")
- self.thread = t = ConnectThread(self, self.client,
- self.addrlist,
- self.tmin, self.tmax)
+ self.thread = t = ConnectThread(self, self.client)
t.setDaemon(1)
t.start()
if sync:
@@ -362,13 +363,10 @@
# We don't expect clients to call any methods of this Thread other
# than close() and those defined by the Thread API.
- def __init__(self, mgr, client, addrlist, tmin, tmax):
- self.__super_init(name="Connect(%s)" % addrlist)
+ def __init__(self, mgr, client):
+ self.__super_init(name="Connect(%s)" % mgr.addrlist)
self.mgr = mgr
self.client = client
- self.addrlist = addrlist
- self.tmin = tmin
- self.tmax = tmax
self.stopped = 0
self.one_attempt = threading.Event()
# A ConnectThread keeps track of whether it has finished a
@@ -380,7 +378,7 @@
self.stopped = 1
def run(self):
- delay = self.tmin
+ delay = self.mgr.tmin
success = 0
# Don't wait too long the first time.
# TODO: make timeout configurable?
@@ -396,11 +394,11 @@
if self.mgr.is_connected():
log("CT: still trying to replace fallback connection",
level=logging.INFO)
- delay = min(delay*2, self.tmax)
+ delay = min(delay*2, self.mgr.tmax)
log("CT: exiting thread: %s" % self.getName())
def try_connecting(self, timeout):
- """Try connecting to all self.addrlist addresses.
+ """Try connecting to all self.mgr.addrlist addresses.
Return 1 if a preferred connection was found; 0 if no
connection was found; and -1 if a fallback connection was
@@ -408,7 +406,7 @@
If no connection is found within timeout seconds, return 0.
"""
- log("CT: attempting to connect on %d sockets" % len(self.addrlist))
+ log("CT: attempting to connect on %d sockets" % len(self.mgr.addrlist))
deadline = time.time() + timeout
wrappers = self._create_wrappers()
for wrap in wrappers.keys():
@@ -434,7 +432,7 @@
return 0
def _expand_addrlist(self):
- for domain, addr in self.addrlist:
+ for domain, addr in self.mgr.addrlist:
# AF_INET really means either IPv4 or IPv6, possibly
# indirected by DNS. By design, DNS lookup is deferred
# until connections get established, so that DNS
Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py 2011-11-21 12:33:31 UTC (rev 123454)
@@ -12,6 +12,7 @@
#
##############################################################################
import asyncore
+import errno
import sys
import threading
import logging
@@ -658,7 +659,11 @@
def server_loop(map):
while len(map) > 1:
- asyncore.poll(30.0, map)
+ try:
+ asyncore.poll(30.0, map)
+ except Exception, v:
+ if v.args[0] != errno.EBADF:
+ raise
for o in map.values():
o.close()
Modified: ZODB/trunk/src/ZEO/zrpc/server.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/server.py 2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/zrpc/server.py 2011-11-21 12:33:31 UTC (rev 123454)
@@ -43,8 +43,8 @@
"""A server that accepts incoming RPC connections"""
__super_init = asyncore.dispatcher.__init__
- def __init__(self, addr, factory=Connection):
- self.__super_init()
+ def __init__(self, addr, factory=Connection, map=None):
+ self.__super_init(map=map)
self.addr = addr
self.factory = factory
self._open_socket()
More information about the Zodb-checkins
mailing list