[Zodb-checkins]
SVN: ZODB/branches/jim-3.8-connection/src/ZEO/ClientStorage.py
Fixed seevral invalidation-during connection bugs that could cause
Jim Fulton
jim at zope.com
Fri Jul 11 17:41:42 EDT 2008
Log message for revision 88271:
Fixed seevral invalidation-during connection bugs that could cause
cache corruption or inconsistencies.
Changed:
U ZODB/branches/jim-3.8-connection/src/ZEO/ClientStorage.py
-=-
Modified: ZODB/branches/jim-3.8-connection/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-3.8-connection/src/ZEO/ClientStorage.py 2008-07-11 21:41:34 UTC (rev 88270)
+++ ZODB/branches/jim-3.8-connection/src/ZEO/ClientStorage.py 2008-07-11 21:41:40 UTC (rev 88271)
@@ -491,6 +491,7 @@
# If we are upgrading from a read-only fallback connection,
# we must close the old connection to prevent it from being
# used while the cache is verified against the new connection.
+ self._connection.register_object(None) # Don't call me!
self._connection.close()
self._connection = None
self._ready.clear()
@@ -558,54 +559,6 @@
else:
return '%s:%s' % (self._storage, self._server_addr)
- def verify_cache(self, server):
- """Internal routine called to verify the cache.
-
- The return value (indicating which path we took) is used by
- the test suite.
- """
-
- # If verify_cache() finishes the cache verification process,
- # it should set self._server. If it goes through full cache
- # verification, then endVerify() should self._server.
-
- last_inval_tid = self._cache.getLastTid()
- if last_inval_tid is not None:
- ltid = server.lastTransaction()
- if ltid == last_inval_tid:
- log2("No verification necessary (last_inval_tid up-to-date)")
- self._server = server
- self._ready.set()
- return "no verification"
-
- # log some hints about last transaction
- log2("last inval tid: %r %s\n"
- % (last_inval_tid, tid2time(last_inval_tid)))
- log2("last transaction: %r %s" %
- (ltid, ltid and tid2time(ltid)))
-
- pair = server.getInvalidations(last_inval_tid)
- if pair is not None:
- log2("Recovering %d invalidations" % len(pair[1]))
- self.invalidateTransaction(*pair)
- self._server = server
- self._ready.set()
- return "quick verification"
-
- log2("Verifying cache")
- # setup tempfile to hold zeoVerify results
- self._tfile = tempfile.TemporaryFile(suffix=".inv")
- self._pickler = cPickle.Pickler(self._tfile, 1)
- self._pickler.fast = 1 # Don't use the memo
-
- # TODO: should batch these operations for efficiency; would need
- # to acquire lock ...
- for oid, tid, version in self._cache.contents():
- server.verify(oid, version, tid)
- self._pending_server = server
- server.endZeoVerify()
- return "full verification"
-
### Is there a race condition between notifyConnected and
### notifyDisconnected? In Particular, what if we get
### notifyDisconnected in the middle of notifyConnected?
@@ -1224,8 +1177,6 @@
"""Storage API: return a sequence of versions in the storage."""
return self._server.versions(max)
- # Below are methods invoked by the StorageServer
-
def serialnos(self, args):
"""Server callback to pass a list of changed (oid, serial) pairs."""
self._serials.extend(args)
@@ -1234,6 +1185,57 @@
"""Server callback to update the info dictionary."""
self._info.update(dict)
+ def verify_cache(self, server):
+ """Internal routine called to verify the cache.
+
+ The return value (indicating which path we took) is used by
+ the test suite.
+ """
+
+ self._pending_server = server
+
+ # setup tempfile to hold zeoVerify results and interim
+ # invalidation results
+ self._tfile = tempfile.TemporaryFile(suffix=".inv")
+ self._pickler = cPickle.Pickler(self._tfile, 1)
+ self._pickler.fast = 1 # Don't use the memo
+
+ # allow incoming invalidations:
+ self._connection.register_object(self)
+
+ # If verify_cache() finishes the cache verification process,
+ # it should set self._server. If it goes through full cache
+ # verification, then endVerify() should self._server.
+
+ last_inval_tid = self._cache.getLastTid()
+ if last_inval_tid is not None:
+ ltid = server.lastTransaction()
+ if ltid == last_inval_tid:
+ log2("No verification necessary (last_inval_tid up-to-date)")
+ self.finish_verification()
+ return "no verification"
+
+ # log some hints about last transaction
+ log2("last inval tid: %r %s\n"
+ % (last_inval_tid, tid2time(last_inval_tid)))
+ log2("last transaction: %r %s" %
+ (ltid, ltid and tid2time(ltid)))
+
+ pair = server.getInvalidations(last_inval_tid)
+ if pair is not None:
+ log2("Recovering %d invalidations" % len(pair[1]))
+ self.finish_verification(pair)
+ return "quick verification"
+
+ log2("Verifying cache")
+
+ # TODO: should batch these operations for efficiency; would need
+ # to acquire lock ...
+ for oid, tid, version in self._cache.contents():
+ server.verify(oid, version, tid)
+ server.endZeoVerify()
+ return "full verification"
+
def invalidateVerify(self, args):
"""Server callback to invalidate an (oid, version) pair.
@@ -1245,68 +1247,93 @@
# This should never happen. TODO: assert it doesn't, or log
# if it does.
return
- self._pickler.dump(args)
+ oid, version = args
+ self._pickler.dump((oid, version, None))
- def _process_invalidations(self, invs):
- # Invalidations are sent by the ZEO server as a sequence of
- # oid, version pairs. The DB's invalidate() method expects a
- # dictionary of oids.
+ def endVerify(self):
+ """Server callback to signal end of cache validation."""
+ log2("endVerify finishing")
+ self.finish_verification()
+ log2("endVerify finished")
+
+ def finish_verification(self, catch_up=None):
self._lock.acquire()
try:
- # versions maps version names to dictionary of invalidations
- versions = {}
- for oid, version, tid in invs:
- if oid == self._load_oid:
- self._load_status = 0
- self._cache.invalidate(oid, version, tid)
- oids = versions.get((version, tid))
- if not oids:
- versions[(version, tid)] = [oid]
- else:
- oids.append(oid)
+ if catch_up:
+ # process catch-up invalidations
+ tid, invalidations = catch_up
+ self._process_invalidations(
+ (oid, version, tid)
+ for oid, version in invalidations
+ )
+
+ if self._pickler is None:
+ return
+ # write end-of-data marker
+ self._pickler.dump((None, None, None))
+ self._pickler = None
+ self._tfile.seek(0)
+ unpickler = cPickle.Unpickler(self._tfile)
+ min_tid = self._cache.getLastTid()
+ def InvalidationLogIterator():
+ while 1:
+ oid, version, tid = unpickler.load()
+ if oid is None:
+ break
+ if ((tid is None)
+ or (min_tid is None)
+ or (tid > min_tid)
+ ):
+ yield oid, version, tid
- if self._db is not None:
- for (version, tid), d in versions.items():
- self._db.invalidate(tid, d, version=version)
+ self._process_invalidations(InvalidationLogIterator())
+ self._tfile.close()
+ self._tfile = None
finally:
self._lock.release()
- def endVerify(self):
- """Server callback to signal end of cache validation."""
- if self._pickler is None:
- return
- # write end-of-data marker
- self._pickler.dump((None, None))
- self._pickler = None
- self._tfile.seek(0)
- f = self._tfile
- self._tfile = None
- self._process_invalidations(InvalidationLogIterator(f))
- f.close()
-
- log2("endVerify finishing")
self._server = self._pending_server
self._ready.set()
- self._pending_conn = None
- log2("endVerify finished")
+ self._pending_server = None
+
def invalidateTransaction(self, tid, args):
- """Invalidate objects modified by tid."""
+ """Server callback: Invalidate objects modified by tid."""
self._lock.acquire()
try:
- self._cache.setLastTid(tid)
+ if self._pickler is not None:
+ log2("Transactional invalidation during cache verification",
+ level=BLATHER)
+ for oid, version in args:
+ self._pickler.dump((oid, version, tid))
+ return
+ self._process_invalidations([(oid, version, tid)
+ for oid, version in args])
finally:
self._lock.release()
- if self._pickler is not None:
- log2("Transactional invalidation during cache verification",
- level=BLATHER)
- for t in args:
- self._pickler.dump(t)
- return
- self._process_invalidations([(oid, version, tid)
- for oid, version in args])
+ def _process_invalidations(self, invs):
+ # Invalidations are sent by the ZEO server as a sequence of
+ # oid, version, tid triples. The DB's invalidate() method expects a
+ # dictionary of oids.
+
+ # versions maps version names to dictionary of invalidations
+ versions = {}
+ for oid, version, tid in invs:
+ if oid == self._load_oid:
+ self._load_status = 0
+ self._cache.invalidate(oid, version, tid)
+ oids = versions.get((version, tid))
+ if not oids:
+ versions[(version, tid)] = [oid]
+ else:
+ oids.append(oid)
+
+ if self._db is not None:
+ for (version, tid), d in versions.items():
+ self._db.invalidate(tid, d, version=version)
+
# The following are for compatibility with protocol version 2.0.0
def invalidateTrans(self, args):
@@ -1315,11 +1342,3 @@
invalidate = invalidateVerify
end = endVerify
Invalidate = invalidateTrans
-
-def InvalidationLogIterator(fileobj):
- unpickler = cPickle.Unpickler(fileobj)
- while 1:
- oid, version = unpickler.load()
- if oid is None:
- break
- yield oid, version, None
More information about the Zodb-checkins
mailing list