[Zodb-checkins] SVN: ZODB/trunk/ - (3.8a1) ZEO's strategoes for
avoiding client cache verification were
Jim Fulton
jim at zope.com
Mon Mar 26 18:04:49 EDT 2007
Log message for revision 73655:
- (3.8a1) ZEO's strategoes for avoiding client cache verification were
improved in the case that servers are restarted. Before, if
transactions were committed after the restart, clients that were up
to date or nearly up to date at the time of the restart and then
connected had to verify their caches. Now, it is far more likely
that a client that reconnects soon after a server restart won't have
to verify its cache.
- Fixed a serious bug that could cause clients that disconnect from and
reconnect to a server to get bad invalidation data if the server
serves multiple storages with active writes.
Changed:
U ZODB/trunk/NEWS.txt
U ZODB/trunk/src/ZEO/StorageServer.py
U ZODB/trunk/src/ZEO/tests/testZEO.py
-=-
Modified: ZODB/trunk/NEWS.txt
===================================================================
--- ZODB/trunk/NEWS.txt 2007-03-26 22:04:46 UTC (rev 73654)
+++ ZODB/trunk/NEWS.txt 2007-03-26 22:04:48 UTC (rev 73655)
@@ -1,6 +1,21 @@
What's new on ZODB 3.8a1?
=========================
+ZEO
+---
+
+- (3.8a1) ZEO's strategoes for avoiding client cache verification were
+ improved in the case that servers are restarted. Before, if
+ transactions were committed after the restart, clients that were up
+ to date or nearly up to date at the time of the restart and then
+ connected had to verify their caches. Now, it is far more likely
+ that a client that reconnects soon after a server restart won't have
+ to verify its cache.
+
+- Fixed a serious bug that could cause clients that disconnect from and
+ reconnect to a server to get bad invalidation data if the server
+ serves multiple storages with active writes.
+
Transactions
------------
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2007-03-26 22:04:46 UTC (rev 73654)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2007-03-26 22:04:48 UTC (rev 73655)
@@ -283,7 +283,7 @@
return p, s, v, pv, sv
def getInvalidations(self, tid):
- invtid, invlist = self.server.get_invalidations(tid)
+ invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
if invtid is None:
return None
self.log("Return %d invalidations up to tid %s"
@@ -787,11 +787,20 @@
self.database = None
if auth_protocol:
self._setup_auth(auth_protocol)
- # A list of at most invalidation_queue_size invalidations.
+ # A list, by server, of at most invalidation_queue_size invalidations.
# The list is kept in sorted order with the most recent
# invalidation at the front. The list never has more than
# self.invq_bound elements.
- self.invq = []
+ self.invq = {}
+ for name, storage in storages.items():
+ lastInvalidations = getattr(storage, 'lastInvalidations', None)
+ if lastInvalidations is None:
+ self.invq[name] = [(storage.lastTransaction(), None)]
+ else:
+ self.invq[name] = list(
+ lastInvalidations(invalidation_queue_size)
+ )
+ self.invq[name].reverse()
self.invq_bound = invalidation_queue_size
self.connections = {}
self.dispatcher = self.DispatcherClass(addr,
@@ -906,17 +915,20 @@
the current client.
"""
+
if invalidated:
- if len(self.invq) >= self.invq_bound:
- self.invq.pop()
- self.invq.insert(0, (tid, invalidated))
+ invq = self.invq[storage_id]
+ if len(invq) >= self.invq_bound:
+ invq.pop()
+ invq.insert(0, (tid, invalidated))
+
for p in self.connections.get(storage_id, ()):
if invalidated and p is not conn:
p.client.invalidateTransaction(tid, invalidated)
elif info is not None:
p.client.info(info)
- def get_invalidations(self, tid):
+ def get_invalidations(self, storage_id, tid):
"""Return a tid and list of all objects invalidation since tid.
The tid is the most recent transaction id seen by the client.
@@ -926,22 +938,23 @@
do full cache verification.
"""
- if not self.invq:
+ invq = self.invq[storage_id]
+ if not invq:
log("invq empty")
return None, []
- earliest_tid = self.invq[-1][0]
+ earliest_tid = invq[-1][0]
if earliest_tid > tid:
log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
return None, []
oids = {}
- for _tid, L in self.invq:
+ for _tid, L in invq:
if _tid <= tid:
break
for key in L:
oids[key] = 1
- latest_tid = self.invq[0][0]
+ latest_tid = invq[0][0]
return latest_tid, oids.keys()
def close_server(self):
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2007-03-26 22:04:46 UTC (rev 73654)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2007-03-26 22:04:48 UTC (rev 73655)
@@ -15,6 +15,7 @@
# System imports
import asyncore
+import doctest
import logging
import os
import random
@@ -27,6 +28,7 @@
# ZODB test support
import ZODB
+import ZODB.tests.util
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
@@ -46,6 +48,8 @@
import ZEO.tests.ConnectionTests
+import ZEO.StorageServer
+
logger = logging.getLogger('ZEO.tests.testZEO')
class DummyDB:
@@ -580,11 +584,212 @@
super(BlobWritableCacheTests, self).setUp()
+class StorageServerClientWrapper:
+
+ def __init__(self):
+ self.serials = []
+
+ def serialnos(self, serials):
+ self.serials.extend(serials)
+
+ def info(self, info):
+ pass
+
+class StorageServerWrapper:
+
+ def __init__(self, server, storage_id):
+ self.storage_id = storage_id
+ self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
+ self.server.register(storage_id, False)
+ self.server._thunk = lambda : None
+ self.server.client = StorageServerClientWrapper()
+
+ def sortKey(self):
+ return self.storage_id
+
+ def __getattr__(self, name):
+ return getattr(self.server, name)
+
+ def registerDB(self, *args):
+ pass
+
+ def supportsUndo(self):
+ return False
+
+ def supportsVersions(self):
+ return False
+
+ def new_oid(self):
+ return self.server.new_oids(1)[0]
+
+ def tpc_begin(self, transaction):
+ self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
+
+ def tpc_vote(self, transaction):
+ self.server._restart()
+ self.server.vote(id(transaction))
+ result = self.server.client.serials[:]
+ del self.server.client.serials[:]
+ return result
+
+ def store(self, oid, serial, data, version, transaction):
+ self.server.storea(oid, serial, data, version, id(transaction))
+
+ def tpc_finish(self, transaction, func = lambda: None):
+ self.server.tpc_finish(id(transaction))
+
+
+def multiple_storages_invalidation_queue_is_not_insane():
+ """
+ >>> from ZEO.StorageServer import StorageServer, ZEOStorage
+ >>> from ZODB.FileStorage import FileStorage
+ >>> from ZODB.DB import DB
+ >>> from persistent.dict import PersistentDict
+ >>> from transaction import commit
+ >>> fs1 = FileStorage('t1.fs')
+ >>> fs2 = FileStorage('t2.fs')
+ >>> server = StorageServer(('', get_port()), dict(fs1=fs1, fs2=fs2))
+
+ >>> s1 = StorageServerWrapper(server, 'fs1')
+ >>> s2 = StorageServerWrapper(server, 'fs2')
+
+ >>> db1 = DB(s1); conn1 = db1.open()
+ >>> db2 = DB(s2); conn2 = db2.open()
+
+ >>> commit()
+ >>> o1 = conn1.root()
+ >>> for i in range(10):
+ ... o1.x = PersistentDict(); o1 = o1.x
+ ... commit()
+
+ >>> last = fs1.lastTransaction()
+ >>> for i in range(5):
+ ... o1.x = PersistentDict(); o1 = o1.x
+ ... commit()
+
+ >>> o2 = conn2.root()
+ >>> for i in range(20):
+ ... o2.x = PersistentDict(); o2 = o2.x
+ ... commit()
+
+ >>> trans, oids = s1.getInvalidations(last)
+ >>> from ZODB.utils import u64
+ >>> sorted([u64(oid) for (oid, v) in oids])
+ [10L, 11L, 12L, 13L, 14L]
+
+ >>> server.close_server()
+ """
+
+def getInvalidationsAfterServerRestart():
+ """
+
+Clients were often forced to verify their caches after a server
+restart even if there weren't many transactions between the server
+restart and the client connect.
+
+Let's create a file storage and stuff some data into it:
+
+ >>> from ZEO.StorageServer import StorageServer, ZEOStorage
+ >>> from ZODB.FileStorage import FileStorage
+ >>> from ZODB.DB import DB
+ >>> from persistent.dict import PersistentDict
+ >>> fs = FileStorage('t.fs')
+ >>> db = DB(fs)
+ >>> conn = db.open()
+ >>> from transaction import commit
+ >>> last = []
+ >>> for i in range(100):
+ ... conn.root()[i] = PersistentDict()
+ ... commit()
+ ... last.append(fs.lastTransaction())
+ >>> db.close()
+
+Now we'll open a storage server on the data, simulating a restart:
+
+ >>> fs = FileStorage('t.fs')
+ >>> sv = StorageServer(('', get_port()), dict(fs=fs))
+ >>> s = ZEOStorage(sv, sv.read_only)
+ >>> s.register('fs', False)
+
+If we ask for the last transaction, we should get the last transaction
+we saved:
+
+ >>> s.lastTransaction() == last[-1]
+ True
+
+If a storage implements the method lastInvalidations, as FileStorage
+does, then the stroage server will populate its invalidation data
+structure using lastTransactions.
+
+
+ >>> tid, oids = s.getInvalidations(last[-10])
+ >>> tid == last[-1]
+ True
+
+
+ >>> from ZODB.utils import u64
+ >>> sorted([u64(oid) for (oid, version) in oids])
+ [0L, 92L, 93L, 94L, 95L, 96L, 97L, 98L, 99L, 100L]
+
+(Note that the fact that we get oids for 92-100 is actually an
+artifact of the fact that the FileStorage lastInvalidations method
+returns all OIDs written by transactions, even if the OIDs were
+created and not modified. FileStorages don't record whether objects
+were created rather than modified. Objects that are just created don't
+need to be invalidated. This means we'll invalidate objects that
+dont' need to be invalidated, however, that's better than verifying
+caches.)
+
+ >>> sv.close_server()
+ >>> fs.close()
+
+If a storage doesn't implement lastInvalidations, a client can still
+avoid verifying its cache if it was up to date when the server
+restarted. To illustrate this, we'll create a subclass of FileStorage
+without this method:
+
+ >>> class FS(FileStorage):
+ ... lastInvalidations = property()
+
+ >>> fs = FS('t.fs')
+ >>> sv = StorageServer(('', get_port()), dict(fs=fs))
+ >>> st = StorageServerWrapper(sv, 'fs')
+ >>> s = st.server
+
+Now, if we ask fior the invalidations since the last committed
+transaction, we'll get a result:
+
+ >>> tid, oids = s.getInvalidations(last[-1])
+ >>> tid == last[-1]
+ True
+ >>> oids
+ []
+
+ >>> db = DB(st); conn = db.open()
+ >>> ob = conn.root()
+ >>> for i in range(5):
+ ... ob.x = PersistentDict(); ob = ob.x
+ ... commit()
+ ... last.append(fs.lastTransaction())
+
+ >>> ntid, oids = s.getInvalidations(tid)
+ >>> ntid == last[-1]
+ True
+
+ >>> sorted([u64(oid) for (oid, version) in oids])
+ [0L, 101L, 102L, 103L, 104L]
+
+ """
+
+
test_classes = [FileStorageTests, MappingStorageTests,
BlobAdaptedFileStorageTests, BlobWritableCacheTests]
+
def test_suite():
suite = unittest.TestSuite()
+ suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
+ tearDown=ZODB.tests.util.tearDown))
for klass in test_classes:
sub = unittest.makeSuite(klass, "check")
suite.addTest(sub)
More information about the Zodb-checkins
mailing list