[Zodb-checkins] SVN: ZODB/trunk/ - (3.8a1) ZEO's strategoes for avoiding client cache verification were

Jim Fulton jim at zope.com
Mon Mar 26 18:04:49 EDT 2007


Log message for revision 73655:
  - (3.8a1) ZEO's strategoes for avoiding client cache verification were
    improved in the case that servers are restarted.  Before, if
    transactions were committed after the restart, clients that were up
    to date or nearly up to date at the time of the restart and then
    connected had to verify their caches.  Now, it is far more likely
    that a client that reconnects soon after a server restart won't have
    to verify its cache.
  
  - Fixed a serious bug that could cause clients that disconnect from and
    reconnect to a server to get bad invalidation data if the server
    serves multiple storages with active writes.
  

Changed:
  U   ZODB/trunk/NEWS.txt
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/tests/testZEO.py

-=-
Modified: ZODB/trunk/NEWS.txt
===================================================================
--- ZODB/trunk/NEWS.txt	2007-03-26 22:04:46 UTC (rev 73654)
+++ ZODB/trunk/NEWS.txt	2007-03-26 22:04:48 UTC (rev 73655)
@@ -1,6 +1,21 @@
 What's new on ZODB 3.8a1?
 =========================
 
+ZEO
+---
+
+- (3.8a1) ZEO's strategoes for avoiding client cache verification were
+  improved in the case that servers are restarted.  Before, if
+  transactions were committed after the restart, clients that were up
+  to date or nearly up to date at the time of the restart and then
+  connected had to verify their caches.  Now, it is far more likely
+  that a client that reconnects soon after a server restart won't have
+  to verify its cache.
+
+- Fixed a serious bug that could cause clients that disconnect from and
+  reconnect to a server to get bad invalidation data if the server
+  serves multiple storages with active writes.
+
 Transactions
 ------------
 

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2007-03-26 22:04:46 UTC (rev 73654)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2007-03-26 22:04:48 UTC (rev 73655)
@@ -283,7 +283,7 @@
         return p, s, v, pv, sv
 
     def getInvalidations(self, tid):
-        invtid, invlist = self.server.get_invalidations(tid)
+        invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
         if invtid is None:
             return None
         self.log("Return %d invalidations up to tid %s"
@@ -787,11 +787,20 @@
         self.database = None
         if auth_protocol:
             self._setup_auth(auth_protocol)
-        # A list of at most invalidation_queue_size invalidations.
+        # A list, by server, of at most invalidation_queue_size invalidations.
         # The list is kept in sorted order with the most recent
         # invalidation at the front.  The list never has more than
         # self.invq_bound elements.
-        self.invq = []
+        self.invq = {}
+        for name, storage in storages.items():
+            lastInvalidations = getattr(storage, 'lastInvalidations', None)
+            if lastInvalidations is None:
+                self.invq[name] = [(storage.lastTransaction(), None)]
+            else:
+                self.invq[name] = list(
+                    lastInvalidations(invalidation_queue_size)
+                    )
+                self.invq[name].reverse()
         self.invq_bound = invalidation_queue_size
         self.connections = {}
         self.dispatcher = self.DispatcherClass(addr,
@@ -906,17 +915,20 @@
           the current client.
 
         """
+
         if invalidated:
-            if len(self.invq) >= self.invq_bound:
-                self.invq.pop()
-            self.invq.insert(0, (tid, invalidated))
+            invq = self.invq[storage_id]
+            if len(invq) >= self.invq_bound:
+                invq.pop()
+            invq.insert(0, (tid, invalidated))
+
         for p in self.connections.get(storage_id, ()):
             if invalidated and p is not conn:
                 p.client.invalidateTransaction(tid, invalidated)
             elif info is not None:
                 p.client.info(info)
 
-    def get_invalidations(self, tid):
+    def get_invalidations(self, storage_id, tid):
         """Return a tid and list of all objects invalidation since tid.
 
         The tid is the most recent transaction id seen by the client.
@@ -926,22 +938,23 @@
         do full cache verification.
         """
 
-        if not self.invq:
+        invq = self.invq[storage_id]
+        if not invq:
             log("invq empty")
             return None, []
 
-        earliest_tid = self.invq[-1][0]
+        earliest_tid = invq[-1][0]
         if earliest_tid > tid:
             log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
             return None, []
 
         oids = {}
-        for _tid, L in self.invq:
+        for _tid, L in invq:
             if _tid <= tid:
                 break
             for key in L:
                 oids[key] = 1
-        latest_tid = self.invq[0][0]
+        latest_tid = invq[0][0]
         return latest_tid, oids.keys()
 
     def close_server(self):

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2007-03-26 22:04:46 UTC (rev 73654)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2007-03-26 22:04:48 UTC (rev 73655)
@@ -15,6 +15,7 @@
 
 # System imports
 import asyncore
+import doctest
 import logging
 import os
 import random
@@ -27,6 +28,7 @@
 
 # ZODB test support
 import ZODB
+import ZODB.tests.util
 from ZODB.tests.MinPO import MinPO
 from ZODB.tests.StorageTestBase import zodb_unpickle
 
@@ -46,6 +48,8 @@
 
 import ZEO.tests.ConnectionTests
 
+import ZEO.StorageServer
+
 logger = logging.getLogger('ZEO.tests.testZEO')
 
 class DummyDB:
@@ -580,11 +584,212 @@
         super(BlobWritableCacheTests, self).setUp()
 
 
+class StorageServerClientWrapper:
+
+    def __init__(self):
+        self.serials = []
+
+    def serialnos(self, serials):
+        self.serials.extend(serials)
+
+    def info(self, info):
+        pass
+
+class StorageServerWrapper:
+
+    def __init__(self, server, storage_id):
+        self.storage_id = storage_id
+        self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
+        self.server.register(storage_id, False)
+        self.server._thunk = lambda : None
+        self.server.client = StorageServerClientWrapper()
+
+    def sortKey(self):
+        return self.storage_id
+
+    def __getattr__(self, name):
+        return getattr(self.server, name)
+
+    def registerDB(self, *args):
+        pass
+
+    def supportsUndo(self):
+        return False
+
+    def supportsVersions(self):
+        return False
+
+    def new_oid(self):
+        return self.server.new_oids(1)[0]
+
+    def tpc_begin(self, transaction):
+        self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
+
+    def tpc_vote(self, transaction):
+        self.server._restart()
+        self.server.vote(id(transaction))
+        result = self.server.client.serials[:]
+        del self.server.client.serials[:]
+        return result
+
+    def store(self, oid, serial, data, version, transaction):
+        self.server.storea(oid, serial, data, version, id(transaction))
+
+    def tpc_finish(self, transaction, func = lambda: None):
+        self.server.tpc_finish(id(transaction))
+
+
+def multiple_storages_invalidation_queue_is_not_insane():
+    """
+    >>> from ZEO.StorageServer import StorageServer, ZEOStorage
+    >>> from ZODB.FileStorage import FileStorage
+    >>> from ZODB.DB import DB
+    >>> from persistent.dict import PersistentDict
+    >>> from transaction import commit
+    >>> fs1 = FileStorage('t1.fs')
+    >>> fs2 = FileStorage('t2.fs')
+    >>> server = StorageServer(('', get_port()), dict(fs1=fs1, fs2=fs2))
+
+    >>> s1 = StorageServerWrapper(server, 'fs1')
+    >>> s2 = StorageServerWrapper(server, 'fs2')
+
+    >>> db1 = DB(s1); conn1 = db1.open()
+    >>> db2 = DB(s2); conn2 = db2.open()
+
+    >>> commit()
+    >>> o1 = conn1.root()
+    >>> for i in range(10):
+    ...     o1.x = PersistentDict(); o1 = o1.x
+    ...     commit()
+
+    >>> last = fs1.lastTransaction()
+    >>> for i in range(5):
+    ...     o1.x = PersistentDict(); o1 = o1.x
+    ...     commit()
+
+    >>> o2 = conn2.root()
+    >>> for i in range(20):
+    ...     o2.x = PersistentDict(); o2 = o2.x
+    ...     commit()
+    
+    >>> trans, oids = s1.getInvalidations(last)
+    >>> from ZODB.utils import u64
+    >>> sorted([u64(oid) for (oid, v) in oids])
+    [10L, 11L, 12L, 13L, 14L]
+    
+    >>> server.close_server()
+    """
+
+def getInvalidationsAfterServerRestart():
+    """
+
+Clients were often forced to verify their caches after a server
+restart even if there weren't many transactions between the server
+restart and the client connect.
+
+Let's create a file storage and stuff some data into it:
+
+    >>> from ZEO.StorageServer import StorageServer, ZEOStorage
+    >>> from ZODB.FileStorage import FileStorage
+    >>> from ZODB.DB import DB
+    >>> from persistent.dict import PersistentDict
+    >>> fs = FileStorage('t.fs')
+    >>> db = DB(fs)
+    >>> conn = db.open()
+    >>> from transaction import commit
+    >>> last = []
+    >>> for i in range(100):
+    ...     conn.root()[i] = PersistentDict()
+    ...     commit()
+    ...     last.append(fs.lastTransaction())
+    >>> db.close()
+
+Now we'll open a storage server on the data, simulating a restart:
+    
+    >>> fs = FileStorage('t.fs')
+    >>> sv = StorageServer(('', get_port()), dict(fs=fs))
+    >>> s = ZEOStorage(sv, sv.read_only)
+    >>> s.register('fs', False)
+
+If we ask for the last transaction, we should get the last transaction
+we saved:
+
+    >>> s.lastTransaction() == last[-1]
+    True
+
+If a storage implements the method lastInvalidations, as FileStorage
+does, then the stroage server will populate its invalidation data
+structure using lastTransactions.
+
+
+    >>> tid, oids = s.getInvalidations(last[-10])
+    >>> tid == last[-1]
+    True
+
+
+    >>> from ZODB.utils import u64
+    >>> sorted([u64(oid) for (oid, version) in oids])
+    [0L, 92L, 93L, 94L, 95L, 96L, 97L, 98L, 99L, 100L]
+
+(Note that the fact that we get oids for 92-100 is actually an
+artifact of the fact that the FileStorage lastInvalidations method
+returns all OIDs written by transactions, even if the OIDs were
+created and not modified. FileStorages don't record whether objects
+were created rather than modified. Objects that are just created don't
+need to be invalidated.  This means we'll invalidate objects that
+dont' need to be invalidated, however, that's better than verifying
+caches.)
+
+    >>> sv.close_server()
+    >>> fs.close()
+
+If a storage doesn't implement lastInvalidations, a client can still
+avoid verifying its cache if it was up to date when the server
+restarted.  To illustrate this, we'll create a subclass of FileStorage
+without this method:
+
+    >>> class FS(FileStorage):
+    ...     lastInvalidations = property()
+
+    >>> fs = FS('t.fs')
+    >>> sv = StorageServer(('', get_port()), dict(fs=fs))
+    >>> st = StorageServerWrapper(sv, 'fs')
+    >>> s = st.server
+    
+Now, if we ask fior the invalidations since the last committed
+transaction, we'll get a result:
+
+    >>> tid, oids = s.getInvalidations(last[-1])
+    >>> tid == last[-1]
+    True
+    >>> oids
+    []
+
+    >>> db = DB(st); conn = db.open()
+    >>> ob = conn.root()
+    >>> for i in range(5):
+    ...     ob.x = PersistentDict(); ob = ob.x
+    ...     commit()
+    ...     last.append(fs.lastTransaction())
+
+    >>> ntid, oids = s.getInvalidations(tid)
+    >>> ntid == last[-1]
+    True
+
+    >>> sorted([u64(oid) for (oid, version) in oids])
+    [0L, 101L, 102L, 103L, 104L]
+
+    """
+
+
 test_classes = [FileStorageTests, MappingStorageTests,
                 BlobAdaptedFileStorageTests, BlobWritableCacheTests]
 
+
 def test_suite():
     suite = unittest.TestSuite()
+    suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
+                                       tearDown=ZODB.tests.util.tearDown))
     for klass in test_classes:
         sub = unittest.makeSuite(klass, "check")
         suite.addTest(sub)



More information about the Zodb-checkins mailing list