[Zodb-checkins] SVN: ZODB/trunk/src/Z Fixed the "potential ZODB
cache inconsistency after client reconnect"
Jim Fulton
jim at zope.com
Tue Aug 15 20:24:32 EDT 2006
Log message for revision 69551:
Fixed the "potential ZODB cache inconsistency after client reconnect"
problem reported on zodb-dev:
http://mail.zope.org/pipermail/zodb-dev/2006-August/010343.html
Added a new invalidateCache protocol for DBs and Connections to
invalidate the entire in-memory caches. This is used when ZEO clients
reconnect.
Changed:
U ZODB/trunk/src/ZEO/ClientStorage.py
U ZODB/trunk/src/ZEO/tests/ConnectionTests.py
U ZODB/trunk/src/ZEO/tests/testZEO.py
U ZODB/trunk/src/ZODB/Connection.py
U ZODB/trunk/src/ZODB/DB.py
U ZODB/trunk/src/ZODB/interfaces.py
U ZODB/trunk/src/ZODB/tests/testConnection.py
U ZODB/trunk/src/ZODB/tests/testDB.py
-=-
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py 2006-08-16 00:08:00 UTC (rev 69550)
+++ ZODB/trunk/src/ZEO/ClientStorage.py 2006-08-16 00:24:25 UTC (rev 69551)
@@ -460,6 +460,10 @@
# this method before it was stopped.
return
+ # invalidate our db cache
+ if self._db is not None:
+ self._db.invalidateCache()
+
# TODO: report whether we get a read-only connection.
if self._connection is not None:
reconnect = 1
Modified: ZODB/trunk/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/ConnectionTests.py 2006-08-16 00:08:00 UTC (rev 69550)
+++ ZODB/trunk/src/ZEO/tests/ConnectionTests.py 2006-08-16 00:24:25 UTC (rev 69551)
@@ -79,6 +79,9 @@
def invalidate(self, *args, **kwargs):
pass
+ def invalidateCache(self):
+ pass
+
class CommonSetupTearDown(StorageTestBase):
"""Common boilerplate"""
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2006-08-16 00:08:00 UTC (rev 69550)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2006-08-16 00:24:25 UTC (rev 69551)
@@ -306,6 +306,53 @@
self.assert_('exc_info' in log[0][1])
self.assertEqual(log[1][0], "Couldn't close a dispatcher.")
self.assert_('exc_info' in log[1][1])
+
+class ConnectionInvalidationOnReconnect(
+ ZEO.tests.ConnectionTests.CommonSetupTearDown):
+ """Test what happens when the client loop falls over
+ """
+
+ def getConfig(self, path, create, read_only):
+ return """<mappingstorage 1/>"""
+
+ def checkConnectionInvalidationOnReconnect(self):
+
+ storage = ClientStorage(self.addr, wait=1, min_disconnect_poll=0.1)
+ self._storage = storage
+
+ # First, we'll wait for the storage to be connected:
+ for i in range(100):
+ if storage.is_connected():
+ break
+ time.sleep(0.1)
+ else:
+ raise AssertionError("Couldn't connect to server")
+
+ class DummyDB:
+ _invalidatedCache = 0
+ def invalidateCache(self):
+ self._invalidatedCache += 1
+ def invalidate(*a, **k):
+ pass
+
+ db = DummyDB()
+ storage.registerDB(db, None)
+
+ base = db._invalidatedCache
+
+ # Now we'll force a disconnection and reconnection
+ storage._connection.close()
+
+ # and we'll wait for the storage to be reconnected:
+ for i in range(100):
+ if storage.is_connected():
+ break
+ time.sleep(0.1)
+ else:
+ raise AssertionError("Couldn't connect to server")
+
+ # Now, the root object in the connection should have been invalidated:
+ self.assertEqual(db._invalidatedCache, base+1)
class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase):
@@ -345,6 +392,7 @@
DemoStorageWrappedAroundClientStorage,
HeartbeatTests,
CatastrophicClientLoopFailure,
+ ConnectionInvalidationOnReconnect,
]
def test_suite():
Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py 2006-08-16 00:08:00 UTC (rev 69550)
+++ ZODB/trunk/src/ZODB/Connection.py 2006-08-16 00:24:25 UTC (rev 69551)
@@ -159,6 +159,9 @@
self._inv_lock = threading.Lock()
self._invalidated = {}
+ # Flag indicating whether the cache has been invalidated:
+ self._invalidatedCache = False
+
# We intend to prevent committing a transaction in which
# ReadConflictError occurs. _conflicts is the set of oids that
# experienced ReadConflictError. Any time we raise ReadConflictError,
@@ -311,6 +314,14 @@
finally:
self._inv_lock.release()
+ def invalidateCache(self):
+ self._inv_lock.acquire()
+ try:
+ self._invalidatedCache = True
+ finally:
+ self._inv_lock.release()
+
+
def root(self):
"""Return the database root object."""
return self.get(z64)
@@ -473,6 +484,9 @@
invalidated = self._invalidated
self._invalidated = {}
self._txn_time = None
+ if self._invalidatedCache:
+ self._invalidatedCache = False
+ invalidated = self._cache.cache_data.copy()
finally:
self._inv_lock.release()
@@ -524,6 +538,9 @@
self._added_during_commit = []
+ if self._invalidatedCache:
+ raise ConflictError()
+
for obj in self._registered_objects:
oid = obj._p_oid
assert oid
@@ -782,6 +799,10 @@
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
+
+ if self._invalidatedCache:
+ raise ReadConflictError()
+
if (obj._p_oid in self._invalidated and
not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
@@ -970,6 +991,7 @@
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
+ self._invalidatedCache = False
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py 2006-08-16 00:08:00 UTC (rev 69550)
+++ ZODB/trunk/src/ZODB/DB.py 2006-08-16 00:24:25 UTC (rev 69551)
@@ -480,6 +480,12 @@
c.invalidate(tid, oids)
self._connectionMap(inval)
+ def invalidateCache(self):
+ """Invalidate each of the connection caches
+ """
+ self._miv_cache.clear()
+ self._connectionMap(lambda c: c.invalidateCache())
+
def modifiedInVersion(self, oid):
h = hash(oid) % 131
cache = self._miv_cache
Modified: ZODB/trunk/src/ZODB/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/interfaces.py 2006-08-16 00:08:00 UTC (rev 69550)
+++ ZODB/trunk/src/ZODB/interfaces.py 2006-08-16 00:24:25 UTC (rev 69551)
@@ -283,40 +283,51 @@
If clear is True, reset the counters.
"""
+ def invalidateCache():
+ """Invalidate the connection cache
+
+ This invalidates *all* objects in the cache. If the connection
+ is open, subsequent reads will fail until a new transaction
+ begins or until the connection is reopened.
+
+ """
+
class IDatabase(Interface):
"""ZODB DB.
TODO: This interface is incomplete.
"""
- def __init__(storage,
- pool_size=7,
- cache_size=400,
- version_pool_size=3,
- version_cache_size=100,
- database_name='unnamed',
- databases=None,
- ):
- """Create an object database.
+## __init__ methods don't belong in interfaces:
+##
+## def __init__(storage,
+## pool_size=7,
+## cache_size=400,
+## version_pool_size=3,
+## version_cache_size=100,
+## database_name='unnamed',
+## databases=None,
+## ):
+## """Create an object database.
- storage: the storage used by the database, e.g. FileStorage
- pool_size: expected maximum number of open connections
- cache_size: target size of Connection object cache, in number of
- objects
- version_pool_size: expected maximum number of connections (per
- version)
- version_cache_size: target size of Connection object cache for
- version connections, in number of objects
- database_name: when using a multi-database, the name of this DB
- within the database group. It's a (detected) error if databases
- is specified too and database_name is already a key in it.
- This becomes the value of the DB's database_name attribute.
- databases: when using a multi-database, a mapping to use as the
- binding of this DB's .databases attribute. It's intended
- that the second and following DB's added to a multi-database
- pass the .databases attribute set on the first DB added to the
- collection.
- """
+## storage: the storage used by the database, e.g. FileStorage
+## pool_size: expected maximum number of open connections
+## cache_size: target size of Connection object cache, in number of
+## objects
+## version_pool_size: expected maximum number of connections (per
+## version)
+## version_cache_size: target size of Connection object cache for
+## version connections, in number of objects
+## database_name: when using a multi-database, the name of this DB
+## within the database group. It's a (detected) error if databases
+## is specified too and database_name is already a key in it.
+## This becomes the value of the DB's database_name attribute.
+## databases: when using a multi-database, a mapping to use as the
+## binding of this DB's .databases attribute. It's intended
+## that the second and following DB's added to a multi-database
+## pass the .databases attribute set on the first DB added to the
+## collection.
+## """
databases = Attribute("""\
A mapping from database name to DB (database) object.
@@ -328,6 +339,12 @@
entry.
""")
+ def invalidateCache():
+ """Invalidate all objects in the database object caches
+
+ invalidateCache will be called on each of the database's connections.
+ """
+
class IStorage(Interface):
"""A storage is responsible for storing and retrieving data of objects.
"""
Modified: ZODB/trunk/src/ZODB/tests/testConnection.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testConnection.py 2006-08-16 00:08:00 UTC (rev 69550)
+++ ZODB/trunk/src/ZODB/tests/testConnection.py 2006-08-16 00:24:25 UTC (rev 69551)
@@ -478,6 +478,80 @@
"""
+def test_invalidateCache():
+ """\
+
+The invalidateCache method invalidates a connection's cache. It also
+prevents reads until the end of a transaction.
+
+ >>> from ZODB.tests.util import DB
+ >>> import transaction
+ >>> db = DB()
+ >>> tm = transaction.TransactionManager()
+ >>> connection = db.open(transaction_manager=tm)
+ >>> connection.root()['a'] = StubObject()
+ >>> connection.root()['a'].x = 1
+ >>> connection.root()['b'] = StubObject()
+ >>> connection.root()['b'].x = 1
+ >>> connection.root()['c'] = StubObject()
+ >>> connection.root()['c'].x = 1
+ >>> tm.commit()
+ >>> connection.root()['b']._p_deactivate()
+ >>> connection.root()['c'].x = 2
+
+So we have a connection and an active transaction with some
+modifications. Let's call invalidateCache:
+
+ >>> connection.invalidateCache()
+
+Now, if we try to load an object, we'll get a read conflict:
+
+ >>> connection.root()['b'].x
+ Traceback (most recent call last):
+ ...
+ ReadConflictError: database read conflict error
+
+If we try to commit the transaction, we'll get a conflict error:
+
+ >>> tm.commit()
+ Traceback (most recent call last):
+ ...
+ ConflictError: database conflict error
+
+and the cache will have been cleared:
+
+ >>> print connection.root()['a']._p_changed
+ None
+ >>> print connection.root()['b']._p_changed
+ None
+ >>> print connection.root()['c']._p_changed
+ None
+
+But we'll be able to access data again:
+
+ >>> connection.root()['b'].x
+ 1
+
+Aborting a transaction after a read conflict also lets us read data
+and go on about our business:
+
+ >>> connection.invalidateCache()
+
+ >>> connection.root()['c'].x
+ Traceback (most recent call last):
+ ...
+ ReadConflictError: database read conflict error
+
+ >>> tm.abort()
+ >>> connection.root()['c'].x
+ 1
+
+ >>> connection.root()['c'].x = 2
+ >>> tm.commit()
+
+ >>> db.close()
+ """
+
# ---- stubs
class StubObject(Persistent):
Modified: ZODB/trunk/src/ZODB/tests/testDB.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testDB.py 2006-08-16 00:08:00 UTC (rev 69550)
+++ ZODB/trunk/src/ZODB/tests/testDB.py 2006-08-16 00:24:25 UTC (rev 69551)
@@ -18,6 +18,8 @@
import transaction
+from zope.testing import doctest
+
import ZODB
import ZODB.FileStorage
@@ -130,5 +132,49 @@
self.assertEqual(nconn(pools), 3)
+def test_invalidateCache():
+ """\
+
+The invalidateCache method invalidates the connection caches for all of the connections attached to a database.
+
+ >>> from ZODB.tests.util import DB
+ >>> import transaction
+ >>> db = DB()
+ >>> tm1 = transaction.TransactionManager()
+ >>> c1 = db.open(transaction_manager=tm1)
+ >>> c1.root()['a'] = MinPO(1)
+ >>> tm1.commit()
+ >>> tm2 = transaction.TransactionManager()
+ >>> c2 = db.open(transaction_manager=tm2)
+ >>> c1.root()['a']._p_deactivate()
+ >>> tm3 = transaction.TransactionManager()
+ >>> c3 = db.open(transaction_manager=tm3)
+ >>> c3.root()['a'].value
+ 1
+ >>> c3.close()
+ >>> db.invalidateCache()
+
+ >>> c1.root()['a'].value
+ Traceback (most recent call last):
+ ...
+ ReadConflictError: database read conflict error
+
+ >>> c2.root()['a'].value
+ Traceback (most recent call last):
+ ...
+ ReadConflictError: database read conflict error
+
+ >>> c3 is db.open(transaction_manager=tm3)
+ True
+ >>> print c3.root()['a']._p_changed
+ None
+
+ >>> db.close()
+ """
+
+
+
def test_suite():
- return unittest.makeSuite(DBTests)
+ s = unittest.makeSuite(DBTests)
+ s.addTest(doctest.DocTestSuite())
+ return s
More information about the Zodb-checkins
mailing list