[Checkins] SVN: relstorage/trunk/ Fixed issues:
Shane Hathaway
shane at hathawaymix.org
Sat Jul 31 05:31:34 EDT 2010
Log message for revision 115251:
Fixed issues:
- Always update the RelStorage cache when opening a database connection for
loading, even when no ZODB Connection is using the storage. Otherwise,
code that used the storage interface directly could cause the cache
to fall out of sync; the effects would be seen in the next
ZODB.Connection.
- Added a ZODB monkey patch that passes the "force" parameter to the
sync method. This should help the poll-interval option do its job
better.
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/relstorage/__init__.py
U relstorage/trunk/relstorage/adapters/poller.py
U relstorage/trunk/relstorage/cache.py
U relstorage/trunk/relstorage/options.py
U relstorage/trunk/relstorage/storage.py
U relstorage/trunk/relstorage/tests/reltestbase.py
U relstorage/trunk/setup.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/CHANGES.txt 2010-07-31 09:31:33 UTC (rev 115251)
@@ -1,4 +1,18 @@
+1.4.0c3 (2010-07-31)
+--------------------
+
+- Always update the RelStorage cache when opening a database connection for
+ loading, even when no ZODB Connection is using the storage. Otherwise,
+ code that used the storage interface directly could cause the cache
+ to fall out of sync; the effects would be seen in the next
+ ZODB.Connection.
+
+- Added a ZODB monkey patch that passes the "force" parameter to the
+ sync method. This should help the poll-interval option do its job
+ better.
+
+
1.4.0c2 (2010-07-28)
--------------------
Modified: relstorage/trunk/relstorage/__init__.py
===================================================================
--- relstorage/trunk/relstorage/__init__.py 2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/__init__.py 2010-07-31 09:31:33 UTC (rev 115251)
@@ -28,3 +28,34 @@
pass
check_compatible()
+
+
+def patch_zodb_sync():
+ """Patch Connection.sync() and afterCompletion() to pass the 'force' flag.
+ """
+
+ def _storage_sync(self, *ignored, **kw):
+ sync = getattr(self._storage, 'sync', 0)
+ if sync:
+ # By default, do not force the sync, allowing RelStorage
+ # to ignore sync requests for a while.
+ force = kw.get('force', False)
+ try:
+ sync(force=force)
+ except TypeError:
+ # The 'force' parameter is not accepted.
+ sync()
+ self._flush_invalidations()
+
+ def sync(self):
+ """Manually update the view on the database."""
+ self.transaction_manager.abort()
+ self._storage_sync(force=True)
+
+ from ZODB.Connection import Connection
+ Connection._storage_sync = _storage_sync
+ Connection.afterCompletion = _storage_sync
+ Connection.newTransaction = _storage_sync
+ Connection.sync = sync
+
+patch_zodb_sync()
Modified: relstorage/trunk/relstorage/adapters/poller.py
===================================================================
--- relstorage/trunk/relstorage/adapters/poller.py 2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/adapters/poller.py 2010-07-31 09:31:33 UTC (rev 115251)
@@ -37,13 +37,19 @@
Returns (changes, new_polled_tid), where changes is either
a list of (oid, tid) that have changed, or None to indicate
- that the changes are too complex to list. new_polled_tid is
- never None.
+ that the changes are too complex to list. new_polled_tid can be
+ 0 if there is no data in the database.
"""
# find out the tid of the most recent transaction.
cursor.execute(self.poll_query)
- new_polled_tid = cursor.fetchone()[0]
- assert new_polled_tid is not None
+ rows = list(cursor)
+ if not rows:
+ # No data.
+ return None, 0
+ new_polled_tid = rows[0][0]
+ if not new_polled_tid:
+ # No data.
+ return None, 0
if prev_polled_tid is None:
# This is the first time the connection has polled.
Modified: relstorage/trunk/relstorage/cache.py
===================================================================
--- relstorage/trunk/relstorage/cache.py 2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/cache.py 2010-07-31 09:31:33 UTC (rev 115251)
@@ -102,8 +102,11 @@
def new_instance(self):
"""Return a copy of this instance sharing the same local client"""
- local_client = self.clients_local_first[0]
- return StorageCache(self.adapter, self.options, local_client)
+ if self.options.share_local_cache:
+ local_client = self.clients_local_first[0]
+ return StorageCache(self.adapter, self.options, local_client)
+ else:
+ return StorageCache(self.adapter, self.options)
def clear(self):
"""Remove all data from the cache. Called by speed tests."""
Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py 2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/options.py 2010-07-31 09:31:33 UTC (rev 115251)
@@ -46,6 +46,11 @@
self.commit_lock_id = 0
self.strict_tpc = default_strict_tpc
+ # If share_local_cache is off, each storage instance has a private
+ # cache rather than a shared cache. This option exists mainly for
+ # simulating disconnected caches in tests.
+ self.share_local_cache = True
+
for key, value in kwoptions.iteritems():
if key in self.__dict__:
setattr(self, key, value)
Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py 2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/storage.py 2010-07-31 09:31:33 UTC (rev 115251)
@@ -215,31 +215,21 @@
raise
self._load_transaction_open = False
- def _restart_load(self):
- """Restart the load connection, creating a new connection if needed"""
- if self._load_cursor is None:
- self._open_load_connection()
- return
- try:
- self._adapter.connmanager.restart_load(
- self._load_conn, self._load_cursor)
- self._load_transaction_open = True
- except self._adapter.connmanager.disconnected_exceptions, e:
- log.warning("Reconnecting load_conn: %s", e)
- self._drop_load_connection()
- try:
- self._open_load_connection()
- except:
- log.exception("Reconnect failed.")
- raise
- else:
- log.info("Reconnected.")
+ def _restart_load_and_call(self, f, *args, **kw):
+ """Restart the load connection and call a function.
- def _with_load(self, f, *args, **kw):
- """Call a function with the load connection and cursor."""
+ The first two function parameters are the load connection and cursor.
+ """
if self._load_cursor is None:
+ need_restart = False
self._open_load_connection()
+ else:
+ need_restart = True
try:
+ if need_restart:
+ self._adapter.connmanager.restart_load(
+ self._load_conn, self._load_cursor)
+ self._load_transaction_open = True
return f(self._load_conn, self._load_cursor, *args, **kw)
except self._adapter.connmanager.disconnected_exceptions, e:
log.warning("Reconnecting load_conn: %s", e)
@@ -452,7 +442,7 @@
self._lock_acquire()
try:
if not self._load_transaction_open:
- self._restart_load()
+ self._restart_load_and_poll()
cursor = self._load_cursor
state, tid_int = cache.load(cursor, oid_int)
finally:
@@ -489,7 +479,7 @@
self._lock_acquire()
try:
if not self._load_transaction_open:
- self._restart_load()
+ self._restart_load_and_poll()
state = self._adapter.mover.load_revision(
self._load_cursor, oid_int, tid_int)
if state is None and self._store_cursor is not None:
@@ -520,7 +510,7 @@
cursor = self._store_cursor
else:
if not self._load_transaction_open:
- self._restart_load()
+ self._restart_load_and_poll()
cursor = self._load_cursor
if not self._adapter.mover.exists(cursor, u64(oid)):
raise POSKeyError(oid)
@@ -740,8 +730,7 @@
self._adapter.mover.replace_temp(
cursor, oid_int, prev_tid_int, data)
resolved.add(oid)
- if cache is not None:
- cache.store_temp(oid_int, data)
+ cache.store_temp(oid_int, data)
# Move the new states into the permanent table
tid_int = u64(self._tid)
@@ -1220,6 +1209,27 @@
return False
+ def _restart_load_and_poll(self):
+ """Call _restart_load, poll for changes, and update self._cache.
+ """
+ # Ignore changes made by the last transaction committed
+ # by this connection.
+ if self._ltid is not None:
+ ignore_tid = u64(self._ltid)
+ else:
+ ignore_tid = None
+ prev = self._prev_polled_tid
+
+ # get a list of changed OIDs and the most recent tid
+ changes, new_polled_tid = self._restart_load_and_call(
+ self._adapter.poller.poll_invalidations, prev, ignore_tid)
+
+ # Inform the cache of the changes.
+ self._cache.after_poll(
+ self._load_cursor, prev, new_polled_tid, changes)
+
+ return changes, new_polled_tid
+
def poll_invalidations(self):
"""Looks for OIDs of objects that changed since _prev_polled_tid
@@ -1238,23 +1248,8 @@
# reset the timeout
self._poll_at = time.time() + self._options.poll_interval
- self._restart_load()
+ changes, new_polled_tid = self._restart_load_and_poll()
- # Ignore changes made by the last transaction committed
- # by this connection.
- if self._ltid is not None:
- ignore_tid = u64(self._ltid)
- else:
- ignore_tid = None
-
- # get a list of changed OIDs and the most recent tid
- prev = self._prev_polled_tid
- changes, new_polled_tid = self._with_load(
- self._adapter.poller.poll_invalidations, prev, ignore_tid)
-
- self._cache.after_poll(
- self._load_cursor, prev, new_polled_tid, changes)
-
self._prev_polled_tid = new_polled_tid
if changes is None:
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2010-07-31 09:31:33 UTC (rev 115251)
@@ -334,65 +334,113 @@
finally:
db.close()
- def checkPollInterval(self, using_cache=True):
+ def checkPollInterval(self, shared_cache=True):
# Verify the poll_interval parameter causes RelStorage to
# delay invalidation polling.
self._storage._options.poll_interval = 3600
db = DB(self._storage)
try:
- c1 = db.open()
+ tm1 = transaction.TransactionManager()
+ c1 = db.open(transaction_manager=tm1)
r1 = c1.root()
r1['alpha'] = 1
- transaction.commit()
+ tm1.commit()
- c2 = db.open()
+ tm2 = transaction.TransactionManager()
+ c2 = db.open(transaction_manager=tm2)
r2 = c2.root()
self.assertEqual(r2['alpha'], 1)
+ self.assertFalse(c2._storage.need_poll())
+ self.assertTrue(c2._storage._poll_at > 0)
r1['alpha'] = 2
- # commit c1 without triggering c2.afterCompletion().
- storage = c1._storage
- t = transaction.Transaction()
- storage.tpc_begin(t)
- c1.commit(t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
+ # commit c1 without committing c2.
+ tm1.commit()
- # flush invalidations to c2, but the poll timer has not
- # yet expired, so the change to r2 should not be seen yet.
- self.assertTrue(c2._storage._poll_at > 0)
- if using_cache:
+ if shared_cache:
# The cache reveals that a poll is needed even though
# the poll timeout has not expired.
self.assertTrue(c2._storage.need_poll())
- c2._flush_invalidations()
+ tm2.commit()
r2 = c2.root()
self.assertEqual(r2['alpha'], 2)
self.assertFalse(c2._storage.need_poll())
else:
- # Now confirm that no poll is needed
+ # The poll timeout has not expired, so no poll should occur
+ # yet, even after a commit.
self.assertFalse(c2._storage.need_poll())
- c2._flush_invalidations()
+ tm2.commit()
r2 = c2.root()
self.assertEqual(r2['alpha'], 1)
# expire the poll timer and verify c2 sees the change
c2._storage._poll_at -= 3601
- c2._flush_invalidations()
+ tm2.commit()
r2 = c2.root()
self.assertEqual(r2['alpha'], 2)
- transaction.abort()
c2.close()
c1.close()
finally:
db.close()
- def checkPollIntervalWithoutCache(self):
- self._storage._options.cache_local_mb = 0
- self.checkPollInterval(using_cache=True)
+ def checkPollIntervalWithUnsharedCache(self):
+ self._storage._options.share_local_cache = False
+ self.checkPollInterval(shared_cache=False)
+ def checkCachePolling(self):
+ self._storage._options.poll_interval = 3600
+ self._storage._options.share_local_cache = False
+ db = DB(self._storage)
+ try:
+ # Set up the database.
+ tm1 = transaction.TransactionManager()
+ c1 = db.open(transaction_manager=tm1)
+ r1 = c1.root()
+ r1['obj'] = obj1 = PersistentMapping({'change': 0})
+ tm1.commit()
+
+ # Load and change the object in an independent connection.
+ tm2 = transaction.TransactionManager()
+ c2 = db.open(transaction_manager=tm2)
+ r2 = c2.root()
+ r2['obj']['change'] = 1
+ tm2.commit()
+ # Now c2 has delta_after0.
+ self.assertEqual(len(c2._storage._cache.delta_after0), 1)
+ c2.close()
+
+ # Change the object in the original connection.
+ c1.sync()
+ obj1['change'] = 2
+ tm1.commit()
+
+ # Close the database connection to c2.
+ c2._storage._drop_load_connection()
+
+ # Make the database connection to c2 reopen without polling.
+ c2._storage.load('\0' * 8, '')
+ self.assertTrue(c2._storage._load_transaction_open)
+
+ # Open a connection, which should be the same connection
+ # as c2.
+ c3 = db.open(transaction_manager=tm2)
+ self.assertTrue(c3 is c2)
+ self.assertEqual(len(c2._storage._cache.delta_after0), 1)
+
+ # Clear the caches (but not delta_after*)
+ c3._resetCache()
+ for client in c3._storage._cache.clients_local_first:
+ client.flush_all()
+
+ obj3 = c3.root()['obj']
+ # Should have loaded the new object.
+ self.assertEqual(obj3['change'], 2)
+
+ finally:
+ db.close()
+
def checkDoubleCommitter(self):
# Verify we can store an object that gets committed twice in
# a single transaction.
Modified: relstorage/trunk/setup.py
===================================================================
--- relstorage/trunk/setup.py 2010-07-30 22:36:11 UTC (rev 115250)
+++ relstorage/trunk/setup.py 2010-07-31 09:31:33 UTC (rev 115251)
@@ -13,7 +13,7 @@
##############################################################################
"""A backend for ZODB that stores pickles in a relational database."""
-VERSION = "1.4.0c2"
+VERSION = "1.4.0c3"
# The choices for the Trove Development Status line:
# Development Status :: 5 - Production/Stable
More information about the checkins
mailing list