[Checkins] SVN: relstorage/branches/1. Updated the ZODB 3.7 and 3.8 polling patch to fix issues with blobs
Shane Hathaway
shane at hathawaymix.org
Fri Sep 25 18:19:34 EDT 2009
Log message for revision 104558:
Updated the ZODB 3.7 and 3.8 polling patch to fix issues with blobs
and subtransactions.
Changed:
U relstorage/branches/1.1/CHANGES.txt
D relstorage/branches/1.1/poll-invalidation-1-zodb-3-7-1.patch
D relstorage/branches/1.1/poll-invalidation-1-zodb-3-8-0.patch
A relstorage/branches/1.1/poll-invalidation-zodb-3-7.patch
A relstorage/branches/1.1/poll-invalidation-zodb-3-8.patch
U relstorage/branches/1.2/CHANGES.txt
D relstorage/branches/1.2/poll-invalidation-1-zodb-3-7-1.patch
D relstorage/branches/1.2/poll-invalidation-1-zodb-3-8-0.patch
D relstorage/branches/1.2/poll-invalidation-1-zodb-3-9-0a12.patch
A relstorage/branches/1.2/poll-invalidation-zodb-3-7.patch
A relstorage/branches/1.2/poll-invalidation-zodb-3-8.patch
-=-
Modified: relstorage/branches/1.1/CHANGES.txt
===================================================================
--- relstorage/branches/1.1/CHANGES.txt 2009-09-25 22:18:24 UTC (rev 104557)
+++ relstorage/branches/1.1/CHANGES.txt 2009-09-25 22:19:33 UTC (rev 104558)
@@ -2,7 +2,8 @@
Next Release
------------
-- ...
+- Updated the ZODB 3.7 and 3.8 polling patch to fix issues with blobs
+ and subtransactions.
Version 1.1.3 (2009-02-04)
Deleted: relstorage/branches/1.1/poll-invalidation-1-zodb-3-7-1.patch
===================================================================
--- relstorage/branches/1.1/poll-invalidation-1-zodb-3-7-1.patch 2009-09-25 22:18:24 UTC (rev 104557)
+++ relstorage/branches/1.1/poll-invalidation-1-zodb-3-7-1.patch 2009-09-25 22:19:33 UTC (rev 104558)
@@ -1,96 +0,0 @@
-Index: Connection.py
-===================================================================
---- Connection.py (revision 87280)
-+++ Connection.py (working copy)
-@@ -75,8 +75,14 @@
- """Create a new Connection."""
-
- self._db = db
-- self._normal_storage = self._storage = db._storage
-- self.new_oid = db._storage.new_oid
-+ storage = db._storage
-+ m = getattr(storage, 'bind_connection', None)
-+ if m is not None:
-+ # Use a storage instance bound to this connection.
-+ storage = m(self)
-+
-+ self._normal_storage = self._storage = storage
-+ self.new_oid = storage.new_oid
- self._savepoint_storage = None
-
- self.transaction_manager = self._synch = self._mvcc = None
-@@ -170,6 +176,12 @@
- # Multi-database support
- self.connections = {self._db.database_name: self}
-
-+ # Allow the storage to decide whether invalidations should
-+ # propagate between connections. If the storage provides MVCC
-+ # semantics, it is better to not propagate invalidations between
-+ # connections.
-+ self._propagate_invalidations = getattr(
-+ self._storage, 'propagate_invalidations', True)
-
- def add(self, obj):
- """Add a new object 'obj' to the database and assign it an oid."""
-@@ -267,6 +279,11 @@
- self.transaction_manager.unregisterSynch(self)
- self._synch = None
-
-+ # If the storage wants to know, tell it this connection is closing.
-+ m = getattr(self._storage, 'connection_closing', None)
-+ if m is not None:
-+ m()
-+
- if primary:
- for connection in self.connections.values():
- if connection is not self:
-@@ -295,6 +312,10 @@
-
- def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids."""
-+ if not self._propagate_invalidations:
-+ # The storage disabled inter-connection invalidation.
-+ return
-+
- self._inv_lock.acquire()
- try:
- if self._txn_time is None:
-@@ -438,8 +459,23 @@
- self._registered_objects = []
- self._creating.clear()
-
-+ def _poll_invalidations(self):
-+ """Poll and process object invalidations provided by the storage.
-+ """
-+ m = getattr(self._storage, 'poll_invalidations', None)
-+ if m is not None:
-+ # Poll the storage for invalidations.
-+ invalidated = m()
-+ if invalidated is None:
-+ # special value: the transaction is so old that
-+ # we need to flush the whole cache.
-+ self._cache.invalidate(self._cache.cache_data.keys())
-+ elif invalidated:
-+ self._cache.invalidate(invalidated)
-+
- # Process pending invalidations.
- def _flush_invalidations(self):
-+ self._poll_invalidations()
- self._inv_lock.acquire()
- try:
- # Non-ghostifiable objects may need to read when they are
-Index: DB.py
-===================================================================
---- DB.py (revision 87280)
-+++ DB.py (working copy)
-@@ -260,6 +260,10 @@
- storage.store(z64, None, file.getvalue(), '', t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-+ if hasattr(storage, 'connection_closing'):
-+ # Let the storage release whatever resources it used for loading
-+ # the root object.
-+ storage.connection_closing()
-
- # Multi-database setup.
- if databases is None:
Deleted: relstorage/branches/1.1/poll-invalidation-1-zodb-3-8-0.patch
===================================================================
--- relstorage/branches/1.1/poll-invalidation-1-zodb-3-8-0.patch 2009-09-25 22:18:24 UTC (rev 104557)
+++ relstorage/branches/1.1/poll-invalidation-1-zodb-3-8-0.patch 2009-09-25 22:19:33 UTC (rev 104558)
@@ -1,97 +0,0 @@
-Index: Connection.py
-===================================================================
---- Connection.py (revision 87666)
-+++ Connection.py (working copy)
-@@ -90,8 +90,15 @@
- self.connections = {self._db.database_name: self}
-
- self._version = version
-- self._normal_storage = self._storage = db._storage
-- self.new_oid = db._storage.new_oid
-+
-+ storage = db._storage
-+ m = getattr(storage, 'bind_connection', None)
-+ if m is not None:
-+ # Use a storage instance bound to this connection.
-+ storage = m(self)
-+ self._normal_storage = self._storage = storage
-+ self.new_oid = storage.new_oid
-+
- self._savepoint_storage = None
-
- # Do we need to join a txn manager?
-@@ -151,6 +158,12 @@
- # in the cache on abort and in other connections on finish.
- self._modified = []
-
-+ # Allow the storage to decide whether invalidations should
-+ # propagate between connections. If the storage provides MVCC
-+ # semantics, it is better to not propagate invalidations between
-+ # connections.
-+ self._propagate_invalidations = getattr(
-+ self._storage, 'propagate_invalidations', True)
-
- # _invalidated queues invalidate messages delivered from the DB
- # _inv_lock prevents one thread from modifying the set while
-@@ -297,6 +310,11 @@
- if self._opened:
- self.transaction_manager.unregisterSynch(self)
-
-+ # If the storage wants to know, tell it this connection is closing.
-+ m = getattr(self._storage, 'connection_closing', None)
-+ if m is not None:
-+ m()
-+
- if primary:
- for connection in self.connections.values():
- if connection is not self:
-@@ -328,6 +346,10 @@
-
- def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids."""
-+ if not self._propagate_invalidations:
-+ # The storage disabled inter-connection invalidation.
-+ return
-+
- self._inv_lock.acquire()
- try:
- if self._txn_time is None:
-@@ -469,8 +491,23 @@
- self._registered_objects = []
- self._creating.clear()
-
-+ def _poll_invalidations(self):
-+ """Poll and process object invalidations provided by the storage.
-+ """
-+ m = getattr(self._storage, 'poll_invalidations', None)
-+ if m is not None:
-+ # Poll the storage for invalidations.
-+ invalidated = m()
-+ if invalidated is None:
-+ # special value: the transaction is so old that
-+ # we need to flush the whole cache.
-+ self._cache.invalidate(self._cache.cache_data.keys())
-+ elif invalidated:
-+ self._cache.invalidate(invalidated)
-+
- # Process pending invalidations.
- def _flush_invalidations(self):
-+ self._poll_invalidations()
- self._inv_lock.acquire()
- try:
- # Non-ghostifiable objects may need to read when they are
-Index: DB.py
-===================================================================
---- DB.py (revision 87666)
-+++ DB.py (working copy)
-@@ -284,6 +284,10 @@
- storage.store(z64, None, file.getvalue(), '', t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-+ if hasattr(storage, 'connection_closing'):
-+ # Let the storage release whatever resources it used for loading
-+ # the root object.
-+ storage.connection_closing()
-
- # Multi-database setup.
- if databases is None:
Copied: relstorage/branches/1.1/poll-invalidation-zodb-3-7.patch (from rev 104556, relstorage/branches/1.1/poll-invalidation-1-zodb-3-7-1.patch)
===================================================================
--- relstorage/branches/1.1/poll-invalidation-zodb-3-7.patch (rev 0)
+++ relstorage/branches/1.1/poll-invalidation-zodb-3-7.patch 2009-09-25 22:19:33 UTC (rev 104558)
@@ -0,0 +1,96 @@
+Index: Connection.py
+===================================================================
+--- Connection.py (revision 104552)
++++ Connection.py (working copy)
+@@ -75,8 +75,14 @@
+ """Create a new Connection."""
+
+ self._db = db
+- self._normal_storage = self._storage = db._storage
+- self.new_oid = db._storage.new_oid
++ storage = db._storage
++ m = getattr(storage, 'bind_connection', None)
++ if m is not None:
++ # Use a storage instance bound to this connection.
++ storage = m(self)
++
++ self._normal_storage = self._storage = storage
++ self.new_oid = storage.new_oid
+ self._savepoint_storage = None
+
+ self.transaction_manager = self._synch = self._mvcc = None
+@@ -170,6 +176,12 @@
+ # Multi-database support
+ self.connections = {self._db.database_name: self}
+
++ # Allow the storage to decide whether invalidations should
++ # propagate between connections. If the storage provides MVCC
++ # semantics, it is better to not propagate invalidations between
++ # connections.
++ self._propagate_invalidations = getattr(
++ self._storage, 'propagate_invalidations', True)
+
+ def add(self, obj):
+ """Add a new object 'obj' to the database and assign it an oid."""
+@@ -267,6 +279,11 @@
+ self.transaction_manager.unregisterSynch(self)
+ self._synch = None
+
++ # If the storage wants to know, tell it this connection is closing.
++ m = getattr(self._storage, 'connection_closing', None)
++ if m is not None:
++ m()
++
+ if primary:
+ for connection in self.connections.values():
+ if connection is not self:
+@@ -438,8 +455,23 @@
+ self._registered_objects = []
+ self._creating.clear()
+
++ def _poll_invalidations(self):
++ """Poll and process object invalidations provided by the storage.
++ """
++ m = getattr(self._storage, 'poll_invalidations', None)
++ if m is not None:
++ # Poll the storage for invalidations.
++ invalidated = m()
++ if invalidated is None:
++ # special value: the transaction is so old that
++ # we need to flush the whole cache.
++ self._cache.invalidate(self._cache.cache_data.keys())
++ elif invalidated:
++ self._cache.invalidate(invalidated)
++
+ # Process pending invalidations.
+ def _flush_invalidations(self):
++ self._poll_invalidations()
+ self._inv_lock.acquire()
+ try:
+ # Non-ghostifiable objects may need to read when they are
+@@ -698,6 +730,10 @@
+ """Indicate confirmation that the transaction is done."""
+
+ def callback(tid):
++ if not self._propagate_invalidations:
++ # The storage disabled inter-connection invalidation.
++ return
++
+ d = dict.fromkeys(self._modified)
+ self._db.invalidate(tid, d, self)
+ # It's important that the storage calls the passed function
+Index: DB.py
+===================================================================
+--- DB.py (revision 104552)
++++ DB.py (working copy)
+@@ -260,6 +260,10 @@
+ storage.store(z64, None, file.getvalue(), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
++ if hasattr(storage, 'connection_closing'):
++ # Let the storage release whatever resources it used for loading
++ # the root object.
++ storage.connection_closing()
+
+ # Multi-database setup.
+ if databases is None:
Property changes on: relstorage/branches/1.1/poll-invalidation-zodb-3-7.patch
___________________________________________________________________
Added: svn:mergeinfo
+
Copied: relstorage/branches/1.1/poll-invalidation-zodb-3-8.patch (from rev 104556, relstorage/branches/1.1/poll-invalidation-1-zodb-3-8-0.patch)
===================================================================
--- relstorage/branches/1.1/poll-invalidation-zodb-3-8.patch (rev 0)
+++ relstorage/branches/1.1/poll-invalidation-zodb-3-8.patch 2009-09-25 22:19:33 UTC (rev 104558)
@@ -0,0 +1,96 @@
+Index: Connection.py
+===================================================================
+--- Connection.py (revision 104546)
++++ Connection.py (working copy)
+@@ -90,8 +90,15 @@
+ self.connections = {self._db.database_name: self}
+
+ self._version = version
+- self._normal_storage = self._storage = db._storage
+- self.new_oid = db._storage.new_oid
++
++ storage = db._storage
++ m = getattr(storage, 'bind_connection', None)
++ if m is not None:
++ # Use a storage instance bound to this connection.
++ storage = m(self)
++ self._normal_storage = self._storage = storage
++ self.new_oid = storage.new_oid
++
+ self._savepoint_storage = None
+
+ # Do we need to join a txn manager?
+@@ -151,6 +158,12 @@
+ # in the cache on abort and in other connections on finish.
+ self._modified = []
+
++ # Allow the storage to decide whether invalidations should
++ # propagate between connections. If the storage provides MVCC
++ # semantics, it is better to not propagate invalidations between
++ # connections.
++ self._propagate_invalidations = getattr(
++ self._storage, 'propagate_invalidations', True)
+
+ # _invalidated queues invalidate messages delivered from the DB
+ # _inv_lock prevents one thread from modifying the set while
+@@ -297,6 +310,11 @@
+ if self._opened:
+ self.transaction_manager.unregisterSynch(self)
+
++ # If the storage wants to know, tell it this connection is closing.
++ m = getattr(self._storage, 'connection_closing', None)
++ if m is not None:
++ m()
++
+ if primary:
+ for connection in self.connections.values():
+ if connection is not self:
+@@ -469,8 +487,23 @@
+ self._registered_objects = []
+ self._creating.clear()
+
++ def _poll_invalidations(self):
++ """Poll and process object invalidations provided by the storage.
++ """
++ m = getattr(self._storage, 'poll_invalidations', None)
++ if m is not None:
++ # Poll the storage for invalidations.
++ invalidated = m()
++ if invalidated is None:
++ # special value: the transaction is so old that
++ # we need to flush the whole cache.
++ self._cache.invalidate(self._cache.cache_data.keys())
++ elif invalidated:
++ self._cache.invalidate(invalidated)
++
+ # Process pending invalidations.
+ def _flush_invalidations(self):
++ self._poll_invalidations()
+ self._inv_lock.acquire()
+ try:
+ # Non-ghostifiable objects may need to read when they are
+@@ -748,6 +781,9 @@
+ """Indicate confirmation that the transaction is done."""
+
+ def callback(tid):
++ if not self._propagate_invalidations:
++ # The storage disabled inter-connection invalidation.
++ return
+ d = dict.fromkeys(self._modified)
+ self._db.invalidate(tid, d, self)
+ # It's important that the storage calls the passed function
+Index: DB.py
+===================================================================
+--- DB.py (revision 104546)
++++ DB.py (working copy)
+@@ -284,6 +284,10 @@
+ storage.store(z64, None, file.getvalue(), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
++ if hasattr(storage, 'connection_closing'):
++ # Let the storage release whatever resources it used for loading
++ # the root object.
++ storage.connection_closing()
+
+ # Multi-database setup.
+ if databases is None:
Property changes on: relstorage/branches/1.1/poll-invalidation-zodb-3-8.patch
___________________________________________________________________
Added: svn:mergeinfo
+
Modified: relstorage/branches/1.2/CHANGES.txt
===================================================================
--- relstorage/branches/1.2/CHANGES.txt 2009-09-25 22:18:24 UTC (rev 104557)
+++ relstorage/branches/1.2/CHANGES.txt 2009-09-25 22:19:33 UTC (rev 104558)
@@ -4,6 +4,10 @@
- PostgreSQL: use the documented ALTER SEQUENCE RESTART WITH
statement instead of ALTER SEQUENCE START WITH.
+- Updated the ZODB 3.7 and 3.8 polling patch to fix issues with blobs
+ and subtransactions.
+
+
1.2.0 (2009-09-04)
------------------
Deleted: relstorage/branches/1.2/poll-invalidation-1-zodb-3-7-1.patch
===================================================================
--- relstorage/branches/1.2/poll-invalidation-1-zodb-3-7-1.patch 2009-09-25 22:18:24 UTC (rev 104557)
+++ relstorage/branches/1.2/poll-invalidation-1-zodb-3-7-1.patch 2009-09-25 22:19:33 UTC (rev 104558)
@@ -1,96 +0,0 @@
-Index: Connection.py
-===================================================================
---- Connection.py (revision 87280)
-+++ Connection.py (working copy)
-@@ -75,8 +75,14 @@
- """Create a new Connection."""
-
- self._db = db
-- self._normal_storage = self._storage = db._storage
-- self.new_oid = db._storage.new_oid
-+ storage = db._storage
-+ m = getattr(storage, 'bind_connection', None)
-+ if m is not None:
-+ # Use a storage instance bound to this connection.
-+ storage = m(self)
-+
-+ self._normal_storage = self._storage = storage
-+ self.new_oid = storage.new_oid
- self._savepoint_storage = None
-
- self.transaction_manager = self._synch = self._mvcc = None
-@@ -170,6 +176,12 @@
- # Multi-database support
- self.connections = {self._db.database_name: self}
-
-+ # Allow the storage to decide whether invalidations should
-+ # propagate between connections. If the storage provides MVCC
-+ # semantics, it is better to not propagate invalidations between
-+ # connections.
-+ self._propagate_invalidations = getattr(
-+ self._storage, 'propagate_invalidations', True)
-
- def add(self, obj):
- """Add a new object 'obj' to the database and assign it an oid."""
-@@ -267,6 +279,11 @@
- self.transaction_manager.unregisterSynch(self)
- self._synch = None
-
-+ # If the storage wants to know, tell it this connection is closing.
-+ m = getattr(self._storage, 'connection_closing', None)
-+ if m is not None:
-+ m()
-+
- if primary:
- for connection in self.connections.values():
- if connection is not self:
-@@ -295,6 +312,10 @@
-
- def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids."""
-+ if not self._propagate_invalidations:
-+ # The storage disabled inter-connection invalidation.
-+ return
-+
- self._inv_lock.acquire()
- try:
- if self._txn_time is None:
-@@ -438,8 +459,23 @@
- self._registered_objects = []
- self._creating.clear()
-
-+ def _poll_invalidations(self):
-+ """Poll and process object invalidations provided by the storage.
-+ """
-+ m = getattr(self._storage, 'poll_invalidations', None)
-+ if m is not None:
-+ # Poll the storage for invalidations.
-+ invalidated = m()
-+ if invalidated is None:
-+ # special value: the transaction is so old that
-+ # we need to flush the whole cache.
-+ self._cache.invalidate(self._cache.cache_data.keys())
-+ elif invalidated:
-+ self._cache.invalidate(invalidated)
-+
- # Process pending invalidations.
- def _flush_invalidations(self):
-+ self._poll_invalidations()
- self._inv_lock.acquire()
- try:
- # Non-ghostifiable objects may need to read when they are
-Index: DB.py
-===================================================================
---- DB.py (revision 87280)
-+++ DB.py (working copy)
-@@ -260,6 +260,10 @@
- storage.store(z64, None, file.getvalue(), '', t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-+ if hasattr(storage, 'connection_closing'):
-+ # Let the storage release whatever resources it used for loading
-+ # the root object.
-+ storage.connection_closing()
-
- # Multi-database setup.
- if databases is None:
Deleted: relstorage/branches/1.2/poll-invalidation-1-zodb-3-8-0.patch
===================================================================
--- relstorage/branches/1.2/poll-invalidation-1-zodb-3-8-0.patch 2009-09-25 22:18:24 UTC (rev 104557)
+++ relstorage/branches/1.2/poll-invalidation-1-zodb-3-8-0.patch 2009-09-25 22:19:33 UTC (rev 104558)
@@ -1,97 +0,0 @@
-Index: Connection.py
-===================================================================
---- Connection.py (revision 87666)
-+++ Connection.py (working copy)
-@@ -90,8 +90,15 @@
- self.connections = {self._db.database_name: self}
-
- self._version = version
-- self._normal_storage = self._storage = db._storage
-- self.new_oid = db._storage.new_oid
-+
-+ storage = db._storage
-+ m = getattr(storage, 'bind_connection', None)
-+ if m is not None:
-+ # Use a storage instance bound to this connection.
-+ storage = m(self)
-+ self._normal_storage = self._storage = storage
-+ self.new_oid = storage.new_oid
-+
- self._savepoint_storage = None
-
- # Do we need to join a txn manager?
-@@ -151,6 +158,12 @@
- # in the cache on abort and in other connections on finish.
- self._modified = []
-
-+ # Allow the storage to decide whether invalidations should
-+ # propagate between connections. If the storage provides MVCC
-+ # semantics, it is better to not propagate invalidations between
-+ # connections.
-+ self._propagate_invalidations = getattr(
-+ self._storage, 'propagate_invalidations', True)
-
- # _invalidated queues invalidate messages delivered from the DB
- # _inv_lock prevents one thread from modifying the set while
-@@ -297,6 +310,11 @@
- if self._opened:
- self.transaction_manager.unregisterSynch(self)
-
-+ # If the storage wants to know, tell it this connection is closing.
-+ m = getattr(self._storage, 'connection_closing', None)
-+ if m is not None:
-+ m()
-+
- if primary:
- for connection in self.connections.values():
- if connection is not self:
-@@ -328,6 +346,10 @@
-
- def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids."""
-+ if not self._propagate_invalidations:
-+ # The storage disabled inter-connection invalidation.
-+ return
-+
- self._inv_lock.acquire()
- try:
- if self._txn_time is None:
-@@ -469,8 +491,23 @@
- self._registered_objects = []
- self._creating.clear()
-
-+ def _poll_invalidations(self):
-+ """Poll and process object invalidations provided by the storage.
-+ """
-+ m = getattr(self._storage, 'poll_invalidations', None)
-+ if m is not None:
-+ # Poll the storage for invalidations.
-+ invalidated = m()
-+ if invalidated is None:
-+ # special value: the transaction is so old that
-+ # we need to flush the whole cache.
-+ self._cache.invalidate(self._cache.cache_data.keys())
-+ elif invalidated:
-+ self._cache.invalidate(invalidated)
-+
- # Process pending invalidations.
- def _flush_invalidations(self):
-+ self._poll_invalidations()
- self._inv_lock.acquire()
- try:
- # Non-ghostifiable objects may need to read when they are
-Index: DB.py
-===================================================================
---- DB.py (revision 87666)
-+++ DB.py (working copy)
-@@ -284,6 +284,10 @@
- storage.store(z64, None, file.getvalue(), '', t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-+ if hasattr(storage, 'connection_closing'):
-+ # Let the storage release whatever resources it used for loading
-+ # the root object.
-+ storage.connection_closing()
-
- # Multi-database setup.
- if databases is None:
Deleted: relstorage/branches/1.2/poll-invalidation-1-zodb-3-9-0a12.patch
===================================================================
--- relstorage/branches/1.2/poll-invalidation-1-zodb-3-9-0a12.patch 2009-09-25 22:18:24 UTC (rev 104557)
+++ relstorage/branches/1.2/poll-invalidation-1-zodb-3-9-0a12.patch 2009-09-25 22:19:33 UTC (rev 104558)
@@ -1,435 +0,0 @@
-Index: src/ZODB/PollableMappingStorage.py
-===================================================================
---- src/ZODB/PollableMappingStorage.py (revision 0)
-+++ src/ZODB/PollableMappingStorage.py (revision 97553)
-@@ -0,0 +1,93 @@
-+##############################################################################
-+#
-+# Copyright (c) Zope Corporation and Contributors.
-+# All Rights Reserved.
-+#
-+# This software is subject to the provisions of the Zope Public License,
-+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-+# FOR A PARTICULAR PURPOSE
-+#
-+##############################################################################
-+"""An extension of MappingStorage that depends on polling.
-+
-+Each Connection has its own view of the database. Polling updates each
-+connection's view.
-+"""
-+
-+import time
-+
-+import BTrees
-+from ZODB.interfaces import IStoragePollable
-+from ZODB.MappingStorage import MappingStorage
-+from ZODB.TimeStamp import TimeStamp
-+from zope.interface import implements
-+
-+
-+class PollableMappingStorage(MappingStorage):
-+ implements(IStoragePollable)
-+
-+ propagate_invalidations = False
-+
-+ def __init__(self, name="Pollable Mapping Storage"):
-+ MappingStorage.__init__(self, name=name)
-+ # _polled_tid contains the transaction ID at the last poll.
-+ self._polled_tid = ''
-+
-+ def bind_connection(self, connection):
-+ """Returns a storage instance to be used by the given Connection.
-+ """
-+ return BoundStorage(self)
-+
-+ def connection_closing(self):
-+ """Notifies the storage that a connection is closing.
-+ """
-+ pass
-+
-+ def poll_invalidations(self):
-+ """Poll the storage for changes by other connections.
-+ """
-+ new_tid = self._transactions.maxKey()
-+
-+ if self._polled_tid:
-+ if not self._transactions.has_key(self._polled_tid):
-+ # This connection is so old that we can no longer enumerate
-+ # all the changes.
-+ self._polled_tid = new_tid
-+ return None
-+
-+ changed_oids = set()
-+ for tid, txn in self._transactions.items(
-+ self._polled_tid, new_tid, excludemin=True, excludemax=False):
-+ if txn.status == 'p':
-+ # This transaction has been packed, so it is no longer
-+ # possible to enumerate all changed oids.
-+ self._polled_tid = new_tid
-+ return None
-+ if tid == self._ltid:
-+ # ignore the transaction committed by this connection
-+ continue
-+
-+ changes = txn.data
-+ # pull in changes from the transaction log
-+ for oid, value in changes.iteritems():
-+ tid_data = self._data.get(oid)
-+ if tid_data is None:
-+ tid_data = BTrees.OOBTree.OOBucket()
-+ self._data[oid] = tid_data
-+ tid_data[tid] = changes[oid]
-+ changed_oids.update(changes.keys())
-+
-+ self._polled_tid = new_tid
-+ return list(changed_oids)
-+
-+
-+class BoundStorage(PollableMappingStorage):
-+ """A PollableMappingStorage used for a specific Connection."""
-+
-+ def __init__(self, common):
-+ PollableMappingStorage.__init__(self, name=common.__name__)
-+ # bound storages use the same transaction log as the common storage.
-+ self._transactions = common._transactions
-Index: src/ZODB/Connection.py
-===================================================================
---- src/ZODB/Connection.py (revision 97553)
-+++ src/ZODB/Connection.py (working copy)
-@@ -94,8 +94,13 @@
- # Multi-database support
- self.connections = {self._db.database_name: self}
-
-- self._normal_storage = self._storage = db.storage
-- self.new_oid = db.storage.new_oid
-+ storage = db.storage
-+ m = getattr(storage, 'bind_connection', None)
-+ if m is not None:
-+ # Use a storage instance bound to this connection.
-+ storage = m(self)
-+ self._normal_storage = self._storage = storage
-+ self.new_oid = storage.new_oid
- self._savepoint_storage = None
-
- # Do we need to join a txn manager?
-@@ -148,6 +153,12 @@
- # in the cache on abort and in other connections on finish.
- self._modified = []
-
-+ # Allow the storage to decide whether invalidations should
-+ # propagate between connections. If the storage provides MVCC
-+ # semantics, it is better to not propagate invalidations between
-+ # connections.
-+ self._propagate_invalidations = getattr(
-+ self._storage, 'propagate_invalidations', True)
-
- # _invalidated queues invalidate messages delivered from the DB
- # _inv_lock prevents one thread from modifying the set while
-@@ -295,6 +306,11 @@
- if self._opened:
- self.transaction_manager.unregisterSynch(self)
-
-+ # If the storage wants to know, tell it this connection is closing.
-+ m = getattr(self._storage, 'connection_closing', None)
-+ if m is not None:
-+ m()
-+
- if primary:
- for connection in self.connections.values():
- if connection is not self:
-@@ -323,6 +339,9 @@
-
- def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids."""
-+ if not self._propagate_invalidations:
-+ # The storage disabled inter-connection invalidation.
-+ return
- if self.before is not None:
- # this is an historical connection. Invalidations are irrelevant.
- return
-@@ -460,8 +479,23 @@
- self._registered_objects = []
- self._creating.clear()
-
-+ def _poll_invalidations(self):
-+ """Poll and process object invalidations provided by the storage.
-+ """
-+ m = getattr(self._storage, 'poll_invalidations', None)
-+ if m is not None:
-+ # Poll the storage for invalidations.
-+ invalidated = m()
-+ if invalidated is None:
-+ # special value: the transaction is so old that
-+ # we need to flush the whole cache.
-+ self._cache.invalidate(self._cache.cache_data.keys())
-+ elif invalidated:
-+ self._cache.invalidate(invalidated)
-+
- # Process pending invalidations.
- def _flush_invalidations(self):
-+ self._poll_invalidations()
- self._inv_lock.acquire()
- try:
- # Non-ghostifiable objects may need to read when they are
-Index: src/ZODB/tests/testPollableMappingStorage.py
-===================================================================
---- src/ZODB/tests/testPollableMappingStorage.py (revision 0)
-+++ src/ZODB/tests/testPollableMappingStorage.py (revision 97553)
-@@ -0,0 +1,164 @@
-+##############################################################################
-+#
-+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-+# All Rights Reserved.
-+#
-+# This software is subject to the provisions of the Zope Public License,
-+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-+# FOR A PARTICULAR PURPOSE.
-+#
-+##############################################################################
-+
-+import unittest
-+
-+from persistent.mapping import PersistentMapping
-+import transaction
-+import ZODB.PollableMappingStorage
-+from ZODB.DB import DB
-+
-+
-+from ZODB.tests import (
-+ BasicStorage,
-+ HistoryStorage,
-+ IteratorStorage,
-+ MTStorage,
-+ PackableStorage,
-+ RevisionStorage,
-+ StorageTestBase,
-+ Synchronization,
-+ )
-+
-+class PollableTests:
-+
-+ def checkCrossConnectionInvalidation(self):
-+ # Verify connections see updated state at txn boundaries.
-+ # This will fail if Connection doesn't poll for changes.
-+ db = DB(self._storage)
-+ try:
-+ c1 = db.open()
-+ r1 = c1.root()
-+ r1['myobj'] = 'yes'
-+ c2 = db.open()
-+ r2 = c2.root()
-+ self.assert_('myobj' not in r2)
-+
-+ storage = c1._storage
-+ t = transaction.Transaction()
-+ t.description = 'invalidation test'
-+ storage.tpc_begin(t)
-+ c1.commit(t)
-+ storage.tpc_vote(t)
-+ storage.tpc_finish(t)
-+
-+ self.assert_('myobj' not in r2)
-+ c2.sync()
-+ self.assert_('myobj' in r2)
-+ self.assert_(r2['myobj'] == 'yes')
-+ finally:
-+ db.close()
-+
-+ def checkCrossConnectionIsolation(self):
-+ # Verify MVCC isolates connections.
-+ # This will fail if Connection doesn't poll for changes.
-+ db = DB(self._storage)
-+ try:
-+ c1 = db.open()
-+ r1 = c1.root()
-+ r1['alpha'] = PersistentMapping()
-+ r1['gamma'] = PersistentMapping()
-+ transaction.commit()
-+
-+ # Open a second connection but don't load root['alpha'] yet
-+ c2 = db.open()
-+ r2 = c2.root()
-+
-+ r1['alpha']['beta'] = 'yes'
-+
-+ storage = c1._storage
-+ t = transaction.Transaction()
-+ t.description = 'isolation test 1'
-+ storage.tpc_begin(t)
-+ c1.commit(t)
-+ storage.tpc_vote(t)
-+ storage.tpc_finish(t)
-+
-+ # The second connection will now load root['alpha'], but due to
-+ # MVCC, it should continue to see the old state.
-+ self.assert_(r2['alpha']._p_changed is None) # A ghost
-+ self.assert_(not r2['alpha'])
-+ self.assert_(r2['alpha']._p_changed == 0)
-+
-+ # make root['alpha'] visible to the second connection
-+ c2.sync()
-+
-+ # Now it should be in sync
-+ self.assert_(r2['alpha']._p_changed is None) # A ghost
-+ self.assert_(r2['alpha'])
-+ self.assert_(r2['alpha']._p_changed == 0)
-+ self.assert_(r2['alpha']['beta'] == 'yes')
-+
-+ # Repeat the test with root['gamma']
-+ r1['gamma']['delta'] = 'yes'
-+
-+ storage = c1._storage
-+ t = transaction.Transaction()
-+ t.description = 'isolation test 2'
-+ storage.tpc_begin(t)
-+ c1.commit(t)
-+ storage.tpc_vote(t)
-+ storage.tpc_finish(t)
-+
-+ # The second connection will now load root[3], but due to MVCC,
-+ # it should continue to see the old state.
-+ self.assert_(r2['gamma']._p_changed is None) # A ghost
-+ self.assert_(not r2['gamma'])
-+ self.assert_(r2['gamma']._p_changed == 0)
-+
-+ # make root[3] visible to the second connection
-+ c2.sync()
-+
-+ # Now it should be in sync
-+ self.assert_(r2['gamma']._p_changed is None) # A ghost
-+ self.assert_(r2['gamma'])
-+ self.assert_(r2['gamma']._p_changed == 0)
-+ self.assert_(r2['gamma']['delta'] == 'yes')
-+ finally:
-+ db.close()
-+
-+
-+class PollableMappingStorageTests(
-+ StorageTestBase.StorageTestBase,
-+ BasicStorage.BasicStorage,
-+
-+ HistoryStorage.HistoryStorage,
-+ IteratorStorage.ExtendedIteratorStorage,
-+ IteratorStorage.IteratorStorage,
-+ MTStorage.MTStorage,
-+ PackableStorage.PackableStorageWithOptionalGC,
-+ RevisionStorage.RevisionStorage,
-+ Synchronization.SynchronizedStorage,
-+ PollableTests
-+ ):
-+
-+ def setUp(self):
-+ self._storage = ZODB.PollableMappingStorage.PollableMappingStorage()
-+
-+ def tearDown(self):
-+ self._storage.close()
-+
-+ def checkLoadBeforeUndo(self):
-+ pass # we don't support undo yet
-+ checkUndoZombie = checkLoadBeforeUndo
-+
-+
-+def test_suite():
-+ suite = unittest.makeSuite(PollableMappingStorageTests, 'check')
-+ return suite
-+
-+if __name__ == "__main__":
-+ loader = unittest.TestLoader()
-+ loader.testMethodPrefix = "check"
-+ unittest.main(testLoader=loader)
-Index: src/ZODB/MappingStorage.py
-===================================================================
---- src/ZODB/MappingStorage.py (revision 97553)
-+++ src/ZODB/MappingStorage.py (working copy)
-@@ -37,7 +37,7 @@
- def __init__(self, name='MappingStorage'):
- self.__name__ = name
- self._data = {} # {oid->{tid->pickle}}
-- self._transactions = BTrees.OOBTree.OOBTree() # {tid->transaction}
-+ self._transactions = BTrees.OOBTree.OOBTree() # {tid->TransactionRecord}
- self._ltid = None
- self._last_pack = None
- _lock = threading.RLock()
-Index: src/ZODB/interfaces.py
-===================================================================
---- src/ZODB/interfaces.py (revision 97553)
-+++ src/ZODB/interfaces.py (working copy)
-@@ -953,6 +953,56 @@
- # DB pass-through
-
-
-+class IStoragePollable(Interface):
-+ """A storage that can be polled for changes."""
-+
-+ def bind_connection(connection):
-+ """Returns a storage instance to be used by the given Connection.
-+
-+ This method is optional. By implementing this method, a storage
-+ instance can maintain Connection-specific state.
-+
-+ If this method is not provided, all connections to the same database
-+ use the same storage instance (even across threads).
-+ """
-+
-+ propagate_invalidations = Attribute(
-+ """A boolean value indicating whether invalidations should propagate.
-+
-+ ZODB normally sends invalidation notifications between
-+ Connection objects within a Python process. If this
-+ attribute is false, no such invalidations will be sent.
-+ Cross-connection invalidation should normally be enabled, but
-+ it adds unnecessary complexity to storages that expect the connection
-+ to poll for invalidations instead.
-+
-+ If this attribute is not present, it is assumed to be true.
-+ """)
-+
-+ def connection_closing():
-+ """Notifies the storage that a connection is closing.
-+
-+ This method is optional. This method is useful when
-+ bind_connection() provides Connection-specific storage instances.
-+ It lets the storage release resources.
-+ """
-+
-+ def poll_invalidations():
-+ """Poll the storage for external changes.
-+
-+ This method is optional. This method is useful when
-+ bind_connection() provides Connection-specific storage instances.
-+
-+ Returns either a sequence of OIDs that have changed, or None. When a
-+ sequence is returned, the corresponding objects should be removed
-+ from the ZODB in-memory cache. When None is returned, the storage is
-+ indicating that so much time has elapsed since the last poll that it
-+ is no longer possible to enumerate all of the changed OIDs, since the
-+ previous transaction seen by the connection has already been packed.
-+ In that case, the ZODB in-memory cache should be cleared.
-+ """
-+
-+
- class IStorageCurrentRecordIteration(IStorage):
-
- def record_iternext(next=None):
-Index: src/ZODB/DB.py
-===================================================================
---- src/ZODB/DB.py (revision 97553)
-+++ src/ZODB/DB.py (working copy)
-@@ -456,6 +456,10 @@
- storage.store(z64, None, file.getvalue(), '', t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-+ if hasattr(storage, 'connection_closing'):
-+ # Let the storage release whatever resources it used for loading
-+ # the root object.
-+ storage.connection_closing()
-
- # Multi-database setup.
- if databases is None:
Copied: relstorage/branches/1.2/poll-invalidation-zodb-3-7.patch (from rev 104556, relstorage/branches/1.2/poll-invalidation-1-zodb-3-7-1.patch)
===================================================================
--- relstorage/branches/1.2/poll-invalidation-zodb-3-7.patch (rev 0)
+++ relstorage/branches/1.2/poll-invalidation-zodb-3-7.patch 2009-09-25 22:19:33 UTC (rev 104558)
@@ -0,0 +1,96 @@
+Index: Connection.py
+===================================================================
+--- Connection.py (revision 104552)
++++ Connection.py (working copy)
+@@ -75,8 +75,14 @@
+ """Create a new Connection."""
+
+ self._db = db
+- self._normal_storage = self._storage = db._storage
+- self.new_oid = db._storage.new_oid
++ storage = db._storage
++ m = getattr(storage, 'bind_connection', None)
++ if m is not None:
++ # Use a storage instance bound to this connection.
++ storage = m(self)
++
++ self._normal_storage = self._storage = storage
++ self.new_oid = storage.new_oid
+ self._savepoint_storage = None
+
+ self.transaction_manager = self._synch = self._mvcc = None
+@@ -170,6 +176,12 @@
+ # Multi-database support
+ self.connections = {self._db.database_name: self}
+
++ # Allow the storage to decide whether invalidations should
++ # propagate between connections. If the storage provides MVCC
++ # semantics, it is better to not propagate invalidations between
++ # connections.
++ self._propagate_invalidations = getattr(
++ self._storage, 'propagate_invalidations', True)
+
+ def add(self, obj):
+ """Add a new object 'obj' to the database and assign it an oid."""
+@@ -267,6 +279,11 @@
+ self.transaction_manager.unregisterSynch(self)
+ self._synch = None
+
++ # If the storage wants to know, tell it this connection is closing.
++ m = getattr(self._storage, 'connection_closing', None)
++ if m is not None:
++ m()
++
+ if primary:
+ for connection in self.connections.values():
+ if connection is not self:
+@@ -438,8 +455,23 @@
+ self._registered_objects = []
+ self._creating.clear()
+
++ def _poll_invalidations(self):
++ """Poll and process object invalidations provided by the storage.
++ """
++ m = getattr(self._storage, 'poll_invalidations', None)
++ if m is not None:
++ # Poll the storage for invalidations.
++ invalidated = m()
++ if invalidated is None:
++ # special value: the transaction is so old that
++ # we need to flush the whole cache.
++ self._cache.invalidate(self._cache.cache_data.keys())
++ elif invalidated:
++ self._cache.invalidate(invalidated)
++
+ # Process pending invalidations.
+ def _flush_invalidations(self):
++ self._poll_invalidations()
+ self._inv_lock.acquire()
+ try:
+ # Non-ghostifiable objects may need to read when they are
+@@ -698,6 +730,10 @@
+ """Indicate confirmation that the transaction is done."""
+
+ def callback(tid):
++ if not self._propagate_invalidations:
++ # The storage disabled inter-connection invalidation.
++ return
++
+ d = dict.fromkeys(self._modified)
+ self._db.invalidate(tid, d, self)
+ # It's important that the storage calls the passed function
+Index: DB.py
+===================================================================
+--- DB.py (revision 104552)
++++ DB.py (working copy)
+@@ -260,6 +260,10 @@
+ storage.store(z64, None, file.getvalue(), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
++ if hasattr(storage, 'connection_closing'):
++ # Let the storage release whatever resources it used for loading
++ # the root object.
++ storage.connection_closing()
+
+ # Multi-database setup.
+ if databases is None:
Property changes on: relstorage/branches/1.2/poll-invalidation-zodb-3-7.patch
___________________________________________________________________
Added: svn:mergeinfo
+
Copied: relstorage/branches/1.2/poll-invalidation-zodb-3-8.patch (from rev 104556, relstorage/branches/1.2/poll-invalidation-1-zodb-3-8-0.patch)
===================================================================
--- relstorage/branches/1.2/poll-invalidation-zodb-3-8.patch (rev 0)
+++ relstorage/branches/1.2/poll-invalidation-zodb-3-8.patch 2009-09-25 22:19:33 UTC (rev 104558)
@@ -0,0 +1,96 @@
+Index: Connection.py
+===================================================================
+--- Connection.py (revision 104546)
++++ Connection.py (working copy)
+@@ -90,8 +90,15 @@
+ self.connections = {self._db.database_name: self}
+
+ self._version = version
+- self._normal_storage = self._storage = db._storage
+- self.new_oid = db._storage.new_oid
++
++ storage = db._storage
++ m = getattr(storage, 'bind_connection', None)
++ if m is not None:
++ # Use a storage instance bound to this connection.
++ storage = m(self)
++ self._normal_storage = self._storage = storage
++ self.new_oid = storage.new_oid
++
+ self._savepoint_storage = None
+
+ # Do we need to join a txn manager?
+@@ -151,6 +158,12 @@
+ # in the cache on abort and in other connections on finish.
+ self._modified = []
+
++ # Allow the storage to decide whether invalidations should
++ # propagate between connections. If the storage provides MVCC
++ # semantics, it is better to not propagate invalidations between
++ # connections.
++ self._propagate_invalidations = getattr(
++ self._storage, 'propagate_invalidations', True)
+
+ # _invalidated queues invalidate messages delivered from the DB
+ # _inv_lock prevents one thread from modifying the set while
+@@ -297,6 +310,11 @@
+ if self._opened:
+ self.transaction_manager.unregisterSynch(self)
+
++ # If the storage wants to know, tell it this connection is closing.
++ m = getattr(self._storage, 'connection_closing', None)
++ if m is not None:
++ m()
++
+ if primary:
+ for connection in self.connections.values():
+ if connection is not self:
+@@ -469,8 +487,23 @@
+ self._registered_objects = []
+ self._creating.clear()
+
++ def _poll_invalidations(self):
++ """Poll and process object invalidations provided by the storage.
++ """
++ m = getattr(self._storage, 'poll_invalidations', None)
++ if m is not None:
++ # Poll the storage for invalidations.
++ invalidated = m()
++ if invalidated is None:
++ # special value: the transaction is so old that
++ # we need to flush the whole cache.
++ self._cache.invalidate(self._cache.cache_data.keys())
++ elif invalidated:
++ self._cache.invalidate(invalidated)
++
+ # Process pending invalidations.
+ def _flush_invalidations(self):
++ self._poll_invalidations()
+ self._inv_lock.acquire()
+ try:
+ # Non-ghostifiable objects may need to read when they are
+@@ -748,6 +781,9 @@
+ """Indicate confirmation that the transaction is done."""
+
+ def callback(tid):
++ if not self._propagate_invalidations:
++ # The storage disabled inter-connection invalidation.
++ return
+ d = dict.fromkeys(self._modified)
+ self._db.invalidate(tid, d, self)
+ # It's important that the storage calls the passed function
+Index: DB.py
+===================================================================
+--- DB.py (revision 104546)
++++ DB.py (working copy)
+@@ -284,6 +284,10 @@
+ storage.store(z64, None, file.getvalue(), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
++ if hasattr(storage, 'connection_closing'):
++ # Let the storage release whatever resources it used for loading
++ # the root object.
++ storage.connection_closing()
+
+ # Multi-database setup.
+ if databases is None:
Property changes on: relstorage/branches/1.2/poll-invalidation-zodb-3-8.patch
___________________________________________________________________
Added: svn:mergeinfo
+
More information about the checkins
mailing list