[Zodb-checkins] SVN: ZODB/branches/jim-zeo-registerdb/src/Z
checkpointing.
Jim Fulton
jim at zope.com
Thu May 10 18:10:52 EDT 2007
Log message for revision 75679:
checkpointing.
Changed:
U ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py
U ZODB/branches/jim-zeo-registerdb/src/ZEO/interfaces.py
U ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test
U ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py
A ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test
U ZODB/branches/jim-zeo-registerdb/src/ZEO/zrpc/smac.py
U ZODB/branches/jim-zeo-registerdb/src/ZODB/interfaces.py
U ZODB/branches/jim-zeo-registerdb/src/ZODB/tests/testFileStorage.py
-=-
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py 2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py 2007-05-10 22:10:52 UTC (rev 75679)
@@ -32,6 +32,7 @@
import transaction
import ZODB.serialize
+import ZEO.zrpc.error
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer
@@ -630,20 +631,33 @@
else:
if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append((oid, version))
- if newserial == ResolvedSerial:
- self.stats.conflicts_resolved += 1
- self.log("conflict resolved oid=%s" % oid_repr(oid), BLATHER)
- self.serials.append((oid, newserial))
+
+ if isinstance(newserial, str):
+ newserial = (oid, newserial)
+
+ if newserial:
+ for oid, s in newserial:
+
+ if s == ResolvedSerial:
+ self.stats.conflicts_resolved += 1
+ self.log("conflict resolved oid=%s"
+ % oid_repr(oid), BLATHER)
+
+ self.serials.append((oid, s))
+
return err is None
def _vote(self):
+ if not self.store_failed:
+ # Only call tpc_vote if no store call failed. Otherwise,
+ # the serialnos() call will deliver an exception that will be
+ # handled by the client in its tpc_vote() method.
+ serials = self.storage.tpc_vote(self.transaction)
+ if serials:
+ self.serials.extend(serials)
+
self.client.serialnos(self.serials)
- # If a store call failed, then return to the client immediately.
- # The serialnos() call will deliver an exception that will be
- # handled by the client in its tpc_vote() method.
- if self.store_failed:
- return
- return self.storage.tpc_vote(self.transaction)
+ return
def _abortVersion(self, src):
tid, oids = self.storage.abortVersion(src, self.transaction)
@@ -747,7 +761,7 @@
def __init__(self, server, storage_id):
self.server = server
self.storage_id = storage_id
- self.references = ZODB.serial.referencesf
+ self.references = ZODB.serialize.referencesf
def invalidate(self, tid, oids, version=''):
self.server.invalidate(
@@ -756,7 +770,8 @@
)
def invalidateCache(self):
- pass
+ self.server._invalidateCache(self.storage_id)
+
class StorageServer:
@@ -861,17 +876,12 @@
# The list is kept in sorted order with the most recent
# invalidation at the front. The list never has more than
# self.invq_bound elements.
+ self.invq_bound = invalidation_queue_size
self.invq = {}
for name, storage in storages.items():
- lastInvalidations = getattr(storage, 'lastInvalidations', None)
- if lastInvalidations is None:
- self.invq[name] = [(storage.lastTransaction(), None)]
- else:
- self.invq[name] = list(
- lastInvalidations(invalidation_queue_size)
- )
- self.invq[name].reverse()
- self.invq_bound = invalidation_queue_size
+ self._setup_invq(name, storage)
+ storage.registerDB(StorageServerDB(self, name))
+
self.connections = {}
self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection)
@@ -891,6 +901,17 @@
else:
self.monitor = None
+ def _setup_invq(self, name, storage):  # (Re)build self.invq[name], newest transaction first.
+ lastInvalidations = getattr(storage, 'lastInvalidations', None)
+ if lastInvalidations is None:
+ self.invq[name] = [(storage.lastTransaction(), None)]  # Storage lacks lastInvalidations: seed with just its last tid.
+ else:
+ self.invq[name] = list(
+ lastInvalidations(self.invq_bound)  # Up to invq_bound entries, in chronological order per ZEO.interfaces.
+ )
+ self.invq[name].reverse()  # NOTE(review): the list is stored in self.invq before it is reversed; a reader on another thread could briefly see it oldest-first -- consider reversing before assigning.
+
+
def _setup_auth(self, protocol):
# Can't be done in global scope, because of cyclic references
from ZEO.auth import get_module
@@ -963,6 +984,49 @@
stats.clients += 1
return self.timeouts[storage_id], stats
+ def _invalidateCache(self, storage_id):
+ """We need to invalidate any caches we have.
+
+ This basically means telling our clients to
+ invalidate/revalidate their caches. We do this by closing them
+ and making them reconnect.
+ """
+
+ # This method can be called from foreign threads. We have to
+ # worry about interaction with the main thread.
+
+ # 1. We rebuild self.invq[storage_id], which is read by
+ # get_invalidations below. This is why get_invalidations makes
+ # a copy of the queue it reads.
+
+ # 2. We access connections. There are two dangers:
+ #
+ # a. We miss a new connection. This is not a problem because
+ # if a client connects after we get the list of connections,
+ # then it will have to read the invalidation queue, which
+ # has already been reset.
+ #
+ # b. A connection is closed while we are iterating. This
+ # doesn't matter, because we can call should_close on a closed
+ # connection; DisconnectedError is ignored below.
+
+ # Rebuild invq from the storage's recent history.
+ self._setup_invq(storage_id, self.storages[storage_id])
+
+ connections = self.connections.get(storage_id, ())
+
+ # Make a copy since we are going to be mutating the
+ # connections indirectly by closing them. We don't care about
+ # later connections since they will have to validate their
+ # caches anyway.
+ connections = connections[:]
+ for p in connections:
+ try:
+ p.connection.should_close()
+ except ZEO.zrpc.error.DisconnectedError:
+ pass
+
+
def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients.
@@ -988,6 +1052,27 @@
"""
+ # This method can be called from foreign threads. We have to
+ # worry about interaction with the main thread.
+
+ # 1. We modify self.invq which is read by get_invalidations
+ # below. This is why get_invalidations makes a copy of
+ # self.invq.
+
+ # 2. We access connections. There are two dangers:
+ #
+ # a. We miss a new connection. This is not a problem because
+ # we are called while the storage lock is held. A new
+ # connection that tries to read data won't read committed
+ # data without first receiving an invalidation. Also, if a
+ # client connects after getting the list of connections,
+ # then it will have to read the invalidation queue, which
+ # has been updated to reflect the invalidations.
+ #
+ # b. A connection is closed while we are iterating. We'll need
+ # to catch and ignore DisconnectedError.
+
+
if invalidated:
invq = self.invq[storage_id]
if len(invq) >= self.invq_bound:
@@ -996,7 +1081,11 @@
for p in self.connections.get(storage_id, ()):
if invalidated and p is not conn:
- p.client.invalidateTransaction(tid, invalidated)
+ try:
+ p.client.invalidateTransaction(tid, invalidated)
+ except ZEO.zrpc.error.DisconnectedError:
+ pass
+
elif info is not None:
p.client.info(info)
@@ -1010,7 +1099,13 @@
do full cache verification.
"""
+
invq = self.invq[storage_id]
+
+ # We make a copy of invq because it might be modified by a
+ # foreign (non-main) thread calling invalidate above.
+ invq = invq[:]
+
if not invq:
log("invq empty")
return None, []
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/interfaces.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/interfaces.py 2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/interfaces.py 2007-05-10 22:10:52 UTC (rev 75679)
@@ -43,5 +43,5 @@
An iterable of up to size entries must be returned, where each
entry is a transaction id and a sequence of object-id/version
pairs describing the objects and versions written by the
- transaction, ordered starting at the most recent.
+ transaction, in chronological order.
"""
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test 2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test 2007-05-10 22:10:52 UTC (rev 75679)
@@ -1,7 +1,7 @@
Storage Servers should call registerDB on storages to propagate invalidations
=============================================================================
-Storages servers propigate invalidations from their storages. Among
+Storage servers propagate invalidations from their storages. Among
other things, this allows client storages to be used in storage
servers, allowing storage-server fan out, spreading read load over
multiple storage servers.
@@ -23,12 +23,18 @@
... def lastInvalidations(self, size):
... return list(self.invalidations)
-We'll create a storage and a storage server using it:
+We don't want the storage server to try to bind to a socket. We'll
+subclass it and give it a do-nothing dispatcher "class":
- >>> storage = FauxStorage()
>>> import ZEO.StorageServer
- >>> server = ZEO.StorageServer.StorageServer('addr', dict(t=storage))
+ >>> class StorageServer(ZEO.StorageServer.StorageServer):
+ ... DispatcherClass = lambda *a, **k: None
+We'll create a storage instance and a storage server using it:
+
+ >>> storage = FauxStorage()
+ >>> server = StorageServer('addr', dict(t=storage))
+
Our storage now has a db attribute that provides IStorageDB. Its
references method is just the referencesf function from ZODB.serialize
@@ -50,47 +56,50 @@
... def __init__(self, mgr, obj):
... self.mgr = mgr
... self.obj = obj
- ... def close(self):
+ ... def should_close(self):
... print 'closed', self.obj.name
... self.mgr.close_conn(self)
>>> class ZEOStorage:
... def __init__(self, server, name):
+ ... self.name = name
... self.connection = Connection(server, self)
... self.client = Client(name)
Now, we'll register the client with the storage server:
- >>> server.register_connection('t', ZEOStorage(server, 1))
- >>> server.register_connection('t', ZEOStorage(server, 2))
+ >>> _ = server.register_connection('t', ZEOStorage(server, 1))
+ >>> _ = server.register_connection('t', ZEOStorage(server, 2))
Now, if we call invalidate, we'll see it propagate to the clients:
- >>> storage.db.invalidate('trans2, ['ob1', 'ob2'])
- invalidateTransaction trans1 1
+ >>> storage.db.invalidate('trans2', ['ob1', 'ob2'])
+ invalidateTransaction trans2 1
[('ob1', ''), ('ob2', '')]
- invalidateTransaction trans1 2
+ invalidateTransaction trans2 2
[('ob1', ''), ('ob2', '')]
- >>> storage.db.invalidate('trans3, ['ob1', 'ob2'], 'v')
- invalidateTransaction trans2 1
+ >>> storage.db.invalidate('trans3', ['ob1', 'ob2'], 'v')
+ invalidateTransaction trans3 1
[('ob1', 'v'), ('ob2', 'v')]
- invalidateTransaction trans2 2
+ invalidateTransaction trans3 2
[('ob1', 'v'), ('ob2', 'v')]
The storage server's queue will reflect the invalidations:
>>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated
- `trans2` [('ob1', 'v'), ('ob2', 'v')]
- `trans1` [('ob1', ''), ('ob2', '')]
+ 'trans3' [('ob1', 'v'), ('ob2', 'v')]
+ 'trans2' [('ob1', ''), ('ob2', '')]
+ 'trans1' [('ob0', ''), ('ob1', '')]
+ 'trans0' [('ob0', '')]
If we call invalidateCache, the storage server will close each of its
connections:
>>> storage.db.invalidateCache()
- close 1
- close 2
+ closed 1
+ closed 2
The connections will then reopen and revalidate their caches.
@@ -98,3 +107,5 @@
>>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated
+ 'trans1' [('ob0', ''), ('ob1', '')]
+ 'trans0' [('ob0', '')]
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py 2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py 2007-05-10 22:10:52 UTC (rev 75679)
@@ -26,6 +26,8 @@
import unittest
import shutil
+import zope.testing.setupstack
+
# ZODB test support
import ZODB
import ZODB.tests.util
@@ -150,11 +152,13 @@
self._servers = [adminaddr]
self._conf_path = path
if not self.blob_cache_dir:
- self.blob_cache_dir = tempfile.mkdtemp() # This is the blob cache for ClientStorage
- self._storage = ClientStorage(zport, '1', cache_size=20000000,
- min_disconnect_poll=0.5, wait=1,
- wait_timeout=60, blob_dir=self.blob_cache_dir,
- blob_cache_writable=self.blob_cache_writable)
+ # This is the blob cache for ClientStorage
+ self.blob_cache_dir = tempfile.mkdtemp()
+ self._storage = ClientStorage(
+ zport, '1', cache_size=20000000,
+ min_disconnect_poll=0.5, wait=1,
+ wait_timeout=60, blob_dir=self.blob_cache_dir,
+ blob_cache_writable=self.blob_cache_writable)
self._storage.registerDB(DummyDB())
def tearDown(self):
@@ -816,11 +820,20 @@
BlobAdaptedFileStorageTests, BlobWritableCacheTests]
+def zeoFanOutSetup(test):  # setUp for zeo-fan-out.test; paired with zope.testing.setupstack.tearDown in test_suite.
+ zope.testing.setupstack.setUpDirectory(test)  # Presumably runs the test in a fresh temporary directory (the test's filestorage uses the relative path 'fs') -- confirm.
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
tearDown=ZODB.tests.util.tearDown))
suite.addTest(doctest.DocFileSuite('registerDB.test'))
+ suite.addTest(
+ doctest.DocFileSuite('zeo-fan-out.test',
+ setUp=zeoFanOutSetup,
+ tearDown=zope.testing.setupstack.tearDown,
+ ),
+ )
for klass in test_classes:
sub = unittest.makeSuite(klass, "check")
suite.addTest(sub)
Added: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test 2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test 2007-05-10 22:10:52 UTC (rev 75679)
@@ -0,0 +1,138 @@
+ZEO Fan Out
+===========
+
+We should be able to set up ZEO servers whose storages are themselves
+ZEO clients of another ZEO server. Let's see if we can make it work.
+
+We'll use some helper functions: ZEO.tests.forker.start_zeo_server,
+which starts ZEO servers for us, and ZEO.tests.testZEO.get_port, which
+picks ports.
+
+We'll start the first server:
+
+    >>> import ZEO.tests.forker, ZEO.tests.testZEO
+    >>> port0 = ZEO.tests.testZEO.get_port()
+    >>> zconf0 = ZEO.tests.forker.ZEOConfig(('', port0))
+    >>> zport0, adminaddr0, pid0, path0 = ZEO.tests.forker.start_zeo_server(
+    ...     '<filestorage 1>\n path fs\n</filestorage>\n', zconf0, port0)
+
+Then we'll start 2 others that use this one:
+
+    >>> port1 = ZEO.tests.testZEO.get_port()
+    >>> zconf1 = ZEO.tests.forker.ZEOConfig(('', port1))
+    >>> zport1, adminaddr1, pid1, path1 = ZEO.tests.forker.start_zeo_server(
+    ...     '<zeoclient 1>\n server %s\n</zeoclient>\n' % port0,
+    ...     zconf1, port1)
+
+    >>> port2 = ZEO.tests.testZEO.get_port()
+    >>> zconf2 = ZEO.tests.forker.ZEOConfig(('', port2))
+    >>> zport2, adminaddr2, pid2, path2 = ZEO.tests.forker.start_zeo_server(
+    ...     '<zeoclient 1>\n server %s\n</zeoclient>\n' % port0,
+    ...     zconf2, port2)
+
+Now, let's create some client storages that connect to these:
+
+    >>> import ZEO.ClientStorage
+    >>> cs1 = ZEO.ClientStorage.ClientStorage(('', port1), '1')
+    >>> cs2 = ZEO.ClientStorage.ClientStorage(('', port2), '1')
+
+And some databases and connections around these:
+
+    >>> from ZODB.DB import DB
+    >>> import transaction
+
+    >>> db1 = DB(cs1)
+    >>> tm1 = transaction.TransactionManager()
+    >>> c1 = db1.open(transaction_manager=tm1)
+    >>> r1 = c1.root()
+    >>> r1
+    {}
+
+    >>> db2 = DB(cs2)
+    >>> tm2 = transaction.TransactionManager()
+    >>> c2 = db2.open(transaction_manager=tm2)
+    >>> r2 = c2.root()
+    >>> r2
+    {}
+
+If we update c1 and commit, we'll eventually see the change in c2:
+
+    >>> import persistent
+    >>> class P(persistent.Persistent):
+    ...     pass
+
+    >>> r1[1] = P()
+    >>> r1[1].v = 1000
+    >>> r1[2] = P()
+    >>> r1[2].v = -1000
+    >>> tm1.commit()
+
+    >>> import time
+    >>> for i in range(100):
+    ...     c2.sync()
+    ...     if r2:
+    ...         break
+    ...     time.sleep(0.01)
+
+    >>> r2[1].v
+    1000
+
+    >>> r2[2].v
+    -1000
+
+Now, let's see if we can break it. :) A second thread moves one unit
+from r1[1].v to r1[2].v in each of 100 transactions, while we check
+through the other server that the two values always add up to zero:
+
+    >>> def f():
+    ...     for i in range(100):
+    ...         r1[1].v -= 1
+    ...         r1[2].v += 1
+    ...         tm1.commit()
+    ...         time.sleep(0.01)
+    >>> import threading
+    >>> t = threading.Thread(target=f)
+    >>> t.start()
+
+    >>> for i in range(1000):
+    ...     c2.sync()
+    ...     if r2[1].v + r2[2].v:
+    ...         print 'oops', r2[1].v, r2[2].v
+    ...     if r2[1].v == 900:
+    ...         break
+    ...     time.sleep(0.01)
+    >>> t.join()
+
+If we shut down and restart the source server, the objects will be
+invalidated:
+
+    >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr0)
+    >>> zport0, adminaddr0, pid0, path0 = ZEO.tests.forker.start_zeo_server(
+    ...     '<filestorage 1>\n path fs\n</filestorage>\n', zconf0, port0)
+
+    >>> for i in range(1000):
+    ...     c1.sync()
+    ...     c2.sync()
+    ...     if (
+    ...         (r1[1]._p_changed is None)
+    ...         and
+    ...         (r1[2]._p_changed is None)
+    ...         and
+    ...         (r2[1]._p_changed is None)
+    ...         and
+    ...         (r2[2]._p_changed is None)
+    ...         ):
+    ...         print 'Cool'
+    ...         break
+    ...     time.sleep(0.01)
+    ... else:
+    ...     print 'Dang'
+    Cool
+
+Cleanup:
+
+    >>> db1.close()
+    >>> db2.close()
+    >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr0)
+    >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr1)
+    >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr2)
Property changes on: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/zrpc/smac.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/zrpc/smac.py 2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/zrpc/smac.py 2007-05-10 22:10:52 UTC (rev 75679)
@@ -68,6 +68,8 @@
MAC_BIT = 0x80000000L
+_close_marker = object()
+
class SizedMessageAsyncConnection(asyncore.dispatcher):
__super_init = asyncore.dispatcher.__init__
__super_close = asyncore.dispatcher.close
@@ -235,6 +237,9 @@
else:
return True
+ def should_close(self):  # Request a close: queue a marker that handle_write reacts to by closing the connection.
+ self.__output.append(_close_marker)  # NOTE(review): appended without __output_lock (handle_write takes it), and handle_write closes as soon as its sizing loop meets the marker, apparently without first sending messages queued ahead of it -- confirm both are intended.
+
def handle_write(self):
self.__output_lock.acquire()
try:
@@ -250,7 +255,13 @@
l = 0
for i in range(len(output)):
- l += len(output[i])
+ try:
+ l += len(output[i])
+ except TypeError:
+ # We reached the close marker queued by should_close, so close the connection.
+ assert output[i] is _close_marker
+ return self.close()
+
if l > SEND_SIZE:
break
Modified: ZODB/branches/jim-zeo-registerdb/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZODB/interfaces.py 2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZODB/interfaces.py 2007-05-10 22:10:52 UTC (rev 75679)
@@ -614,6 +614,12 @@
- An iterable of object-id and serial pairs giving new serials
for objects.
+
+ A serial, returned as a string or in a sequence of oid/serial
+ pairs, may be the special value
+ ZODB.ConflictResolution.ResolvedSerial to indicate that a
+ conflict occurred and that the object should be invalidated.
+
"""
def tpc_abort(transaction):
@@ -663,8 +669,14 @@
The return value can be either None or a sequence of object-id
and serial pairs giving new serials for objects who's ids were
passed to previous store calls in the same transaction.
- After the tpc_vote call, bew serials must have been returned,
+ After the tpc_vote call, new serials must have been returned,
either from tpc_vote or store for objects passed to store.
+
+ A serial returned in a sequence of oid/serial pairs may be
+ the special value ZODB.ConflictResolution.ResolvedSerial to
+ indicate that a conflict occurred and that the object should be
+ invalidated.
+
"""
class IStorageRestoreable(IStorage):
Modified: ZODB/branches/jim-zeo-registerdb/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZODB/tests/testFileStorage.py 2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZODB/tests/testFileStorage.py 2007-05-10 22:10:52 UTC (rev 75679)
@@ -483,7 +483,7 @@
def lastInvalidations():
"""
-The last invalidations method is used by a storage server to pupulate
+The last invalidations method is used by a storage server to populate
its data structure of recent invalidations. The lastInvalidations
method is passed a count and must return up to count number of the
most recent transactions.
More information about the Zodb-checkins
mailing list