[Zope-Checkins] CVS: ZODB3/ZODB - Connection.py:1.88 DB.py:1.48 cPickleCache.c:1.81

Jeremy Hylton jeremy@zope.com
Tue, 8 Apr 2003 11:56:15 -0400


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv13071/ZODB

Modified Files:
	Connection.py DB.py cPickleCache.c 
Log Message:
Backport atomic invalidations code from Zope3.

The DB's invalidate() method now takes a set of oids, rather than a
single oid, corresponding to all the changes from a data manager for
one transaction.  All the objects are invalidated at once.

Add a few tests in testZODB for the new code.  The tests just cover
corner cases, because I can't think of a sensible way to test the
atomicity.  When atomicity has failed in the past, the cause has been
data races that are nearly impossible to reproduce.

This fix needs to be backported to Zope 2.6, but only after assessing
how significant an impact the API change will have.


=== ZODB3/ZODB/Connection.py 1.87 => 1.88 ===
--- ZODB3/ZODB/Connection.py:1.87	Tue Mar  4 16:00:23 2003
+++ ZODB3/ZODB/Connection.py	Tue Apr  8 11:55:44 2003
@@ -14,9 +14,8 @@
 """Database connection support
 
 $Id$"""
-
 from cPickleCache import PickleCache
-from POSException import ConflictError, ReadConflictError
+from POSException import ConflictError, ReadConflictError, TransactionError
 from ExtensionClass import Base
 import ExportImport, TmpStore
 from zLOG import LOG, ERROR, BLATHER, WARNING
@@ -27,6 +26,7 @@
 from cPickle import Unpickler, Pickler
 from cStringIO import StringIO
 import sys
+import threading
 from time import time
 from types import StringType, ClassType
 
@@ -73,14 +73,29 @@
             # XXX Why do we want version caches to behave this way?
 
             self._cache.cache_drain_resistance = 100
-        self._incrgc=self.cacheGC=cache.incrgc
-        self._invalidated=d={}
-        self._invalid=d.has_key
-        self._committed=[]
+        self._incrgc = self.cacheGC = cache.incrgc
+        self._committed = []
         self._code_timestamp = global_code_timestamp
         self._load_count = 0   # Number of objects unghosted
         self._store_count = 0  # Number of objects stored
 
+        # _invalidated queues invalidation messages delivered from the DB.
+        # _inv_lock prevents one thread from modifying the set while
+        # another is processing invalidations.  All the invalidations
+        # from a single transaction should be applied atomically, so
+        # the lock must be held when reading _invalidated.
+
+        # XXX It sucks that we have to hold the lock to read
+        # _invalidated.  Normally, _invalidated is written by calling
+        # dict.update, which will execute atomically by virtue of the
+        # GIL.  But some storage might generate oids where hash or
+        # compare invokes Python code.  In that case, the GIL can't
+        # save us.
+        self._inv_lock = threading.Lock()
+        self._invalidated = d = {}
+        self._invalid = d.has_key
+        self._conflicts = {}
+
     def getTransaction(self):
         t = self._transaction
         if t is None:
@@ -91,8 +106,6 @@
     def setLocalTransaction(self):
         """Use a transaction bound to the connection rather than the thread"""
         if self._transaction is None:
-            # XXX The connection may already be registered with a
-            # transaction.  I guess we should abort that transaction.
             self._transaction = Transaction()
         return self._transaction
 
@@ -150,7 +163,7 @@
             not args and not hasattr(klass,'__getinitargs__')):
             object=klass.__basicnew__()
         else:
-            object=apply(klass,args)
+            object = klass(*args)
             if klass is not ExtensionKlass:
                 object.__dict__.clear()
 
@@ -221,7 +234,7 @@
             # New code is in place.  Start a new cache.
             self._resetCache()
         else:
-            self._cache.invalidate(self._invalidated)
+            self._flush_invalidations()
         self._opened=time()
 
         return self
@@ -242,7 +255,7 @@
         This just deactivates the thing.
         """
         if object is self:
-            self._cache.invalidate(self._invalidated)
+            self._flush_invalidations()
         else:
             assert object._p_oid is not None
             self._cache.invalidate(object._p_oid)
@@ -263,7 +276,6 @@
 
     def close(self):
         self._incrgc() # This is a good time to do some GC
-        db=self._db
 
         # Call the close callbacks.
         if self.__onCloseCallbacks is not None:
@@ -274,10 +286,10 @@
                     LOG('ZODB',ERROR, 'Close callback failed for %s' % f,
                         error=sys.exc_info())
             self.__onCloseCallbacks = None
-        self._db=self._storage=self._tmp=self.new_oid=self._opened=None
-        self._debug_info=()
+        self._storage = self._tmp = self.new_oid = self._opened = None
+        self._debug_info = ()
         # Return the connection to the pool.
-        db._closeConnection(self)
+        self._db._closeConnection(self)
 
     __onCommitActions = None
 
@@ -292,10 +304,13 @@
             # We registered ourself.  Execute a commit action, if any.
             if self.__onCommitActions is not None:
                 method_name, args, kw = self.__onCommitActions.pop(0)
-                apply(getattr(self, method_name), (transaction,) + args, kw)
+                getattr(self, method_name)(transaction, *args, **kw)
             return
         
         oid = object._p_oid
+        if self._conflicts.has_key(oid):
+            raise ReadConflictError(oid)
+        
         invalid = self._invalid
         if oid is None or object._p_jar is not self:
             # new object
@@ -305,9 +320,11 @@
             self._creating.append(oid)
 
         elif object._p_changed:
-            if invalid(oid) and not hasattr(object, '_p_resolveConflict'):
-                raise ConflictError(object=object)
-            self._invalidating.append(oid)
+            if invalid(oid):
+                resolve = getattr(object, "_p_resolveConflict", None)
+                if resolve is None:
+                    raise ConflictError(object=object)
+            self._modified.append(oid)
 
         else:
             # Nothing to do
@@ -369,7 +386,7 @@
                 #XXX We should never get here
                 if invalid(oid) and not hasattr(object, '_p_resolveConflict'):
                     raise ConflictError(object=object)
-                self._invalidating.append(oid)
+                self._modified.append(oid)
 
             klass = object.__class__
 
@@ -433,9 +450,9 @@
         oids=src._index.keys()
 
         # Copy invalidating and creating info from temporary storage:
-        invalidating=self._invalidating
-        invalidating[len(invalidating):]=oids
-        creating=self._creating
+        modified = self._modified
+        modified[len(modified):] = oids
+        creating = self._creating
         creating[len(creating):]=src._creating
 
         for oid in oids:
@@ -479,15 +496,28 @@
     def isReadOnly(self):
         return self._storage.isReadOnly()
 
-    def invalidate(self, oid):
-        """Invalidate a particular oid
+    def invalidate(self, oids):
+        """Invalidate a set of oids.
 
         This marks the oid as invalid, but doesn't actually invalidate
         it.  The object data will be actually invalidated at certain
         transaction boundaries.
         """
-        assert oid is not None
-        self._invalidated[oid] = 1
+        self._inv_lock.acquire()
+        try:
+            self._invalidated.update(oids)
+        finally:
+            self._inv_lock.release()
+
+    def _flush_invalidations(self):
+        self._inv_lock.acquire()
+        try:
+            self._cache.invalidate(self._invalidated)
+            self._invalidated.clear()
+        finally:
+            self._inv_lock.release()
+        # Now is a good time to collect some garbage
+        self._cache.incrgc()
 
     def modifiedInVersion(self, oid):
         try: return self._db.modifiedInVersion(oid)
@@ -508,8 +538,8 @@
     def root(self):
         return self['\0\0\0\0\0\0\0\0']
 
-    def setstate(self, object):
-        oid = object._p_oid
+    def setstate(self, obj):
+        oid = obj._p_oid
 
         if self._storage is None:
             msg = ("Shouldn't load state for %s "
@@ -518,54 +548,20 @@
             raise RuntimeError(msg)
 
         try:
+            # Avoid reading data from a transaction that committed
+            # after the current transaction started, as that might
+            # lead to mixing of cached data from earlier transactions
+            # and new inconsistent data.
+            #
+            # Wait for check until after data is loaded from storage
+            # to avoid time-of-check to time-of-use race.
             p, serial = self._storage.load(oid, self._version)
             self._load_count = self._load_count + 1
-
-            # XXX this is quite conservative!
-            # We need, however, to avoid reading data from a transaction
-            # that committed after the current "session" started, as
-            # that might lead to mixing of cached data from earlier
-            # transactions and new inconsistent data.
-            #
-            # Note that we (carefully) wait until after we call the
-            # storage to make sure that we don't miss an invaildation
-            # notifications between the time we check and the time we
-            # read.
-
-            # XXX Need unit tests for _p_independent.
-            if self._invalid(oid):
-                if not hasattr(object.__class__, '_p_independent'):
-                    self.getTransaction().register(self)
-                    raise ReadConflictError(object=object)
-                invalid = 1
-            else:
-                invalid = 0
-
-            file = StringIO(p)
-            unpickler = Unpickler(file)
-            unpickler.persistent_load = self._persistent_load
-            unpickler.load()
-            state = unpickler.load()
-
-            if hasattr(object, '__setstate__'):
-                object.__setstate__(state)
-            else:
-                d = object.__dict__
-                for k, v in state.items():
-                    d[k] = v
-
-            object._p_serial = serial
-
+            invalid = self._is_invalidated(obj)
+            self._set_ghost_state(obj, p)
+            obj._p_serial = serial
             if invalid:
-                if object._p_independent():
-                    try:
-                        del self._invalidated[oid]
-                    except KeyError:
-                        pass
-                else:
-                    self.getTransaction().register(self)
-                    raise ConflictError(object=object)
-
+                self._handle_independent(obj)
         except ConflictError:
             raise
         except:
@@ -573,6 +569,59 @@
                 error=sys.exc_info())
             raise
 
+    def _is_invalidated(self, obj):
+        # Helper method for setstate() that covers three cases:
+        # returns false if obj is valid
+        # returns true if obj was invalidated, but is independent
+        # otherwise, raises ReadConflictError for invalidated objects
+        self._inv_lock.acquire()
+        try:
+            if self._invalidated.has_key(obj._p_oid):
+                # Defer _p_independent() call until state is loaded.
+                ind = getattr(obj, "_p_independent", None)
+                if ind is not None:
+                    # Defer _p_independent() call until state is loaded.
+                    return 1
+                else:
+                    self.getTransaction().register(self)
+                    self._conflicts[obj._p_oid] = 1
+                    raise ReadConflictError(object=obj)
+            else:
+                return 0
+        finally:
+            self._inv_lock.release()
+
+    def _set_ghost_state(self, obj, p):
+        file = StringIO(p)
+        unpickler = Unpickler(file)
+        unpickler.persistent_load = self._persistent_load
+        unpickler.load()
+        state = unpickler.load()
+
+        setstate = getattr(obj, "__setstate__", None)
+        if setstate is None:
+            obj.update(state)
+        else:
+            setstate(state)
+
+    def _handle_independent(self, obj):
+        # Helper method for setstate() that handles possibly independent
+        # objects.  Call _p_independent(); if it returns True, setstate()
+        # wins.  Otherwise, raise a ReadConflictError.
+
+        if obj._p_independent():
+            self._inv_lock.acquire()
+            try:
+                try:
+                    del self._invalidated[obj._p_oid]
+                except KeyError:
+                    pass
+            finally:
+                self._inv_lock.release()
+        else:
+            self.getTransaction().register(obj)
+            raise ReadConflictError(object=obj)
+        
     def oldstate(self, object, serial):
         oid=object._p_oid
         p = self._storage.loadSerial(oid, serial)
@@ -601,7 +650,7 @@
                     % getattr(object,'__name__','(?)'))
                 return
 
-            copy=apply(klass,args)
+            copy = klass(*args)
             object.__dict__.clear()
             object.__dict__.update(copy.__dict__)
 
@@ -617,12 +666,13 @@
         if self.__onCommitActions is not None:
             del self.__onCommitActions
         self._storage.tpc_abort(transaction)
-        self._cache.invalidate(self._invalidated)
-        self._cache.invalidate(self._invalidating)
+        self._cache.invalidate(self._modified)
+        self._flush_invalidations()
+        self._conflicts.clear()
         self._invalidate_creating()
 
     def tpc_begin(self, transaction, sub=None):
-        self._invalidating = []
+        self._modified = []
         self._creating = []
         if sub:
             # Sub-transaction!
@@ -688,10 +738,10 @@
 
     def tpc_finish(self, transaction):
         # It's important that the storage call the function we pass
-        # (self._invalidate_invalidating) while it still has it's
-        # lock.  We don't want another thread to be able to read any
-        # updated data until we've had a chance to send an
-        # invalidation message to all of the other connections!
+        # while it still has its lock.  We don't want another thread
+        # to be able to read any updated data until we've had a chance
+        # to send an invalidation message to all of the other
+        # connections!
 
         if self._tmp is not None:
             # Commiting a subtransaction!
@@ -700,25 +750,21 @@
             self._storage._creating[:0]=self._creating
             del self._creating[:]
         else:
-            self._db.begin_invalidation()
-            self._storage.tpc_finish(transaction,
-                                     self._invalidate_invalidating)
+            def callback():
+                d = {}
+                for oid in self._modified:
+                    d[oid] = 1 
+                self._db.invalidate(d, self)
+            self._storage.tpc_finish(transaction, callback)
 
-        self._cache.invalidate(self._invalidated)
-        self._incrgc() # This is a good time to do some GC
-
-    def _invalidate_invalidating(self):
-        for oid in self._invalidating:
-            assert oid is not None
-            self._db.invalidate(oid, self)
-        self._db.finish_invalidation()
+        self._conflicts.clear()
+        self._flush_invalidations()
 
     def sync(self):
         self.getTransaction().abort()
         sync=getattr(self._storage, 'sync', 0)
         if sync != 0: sync()
-        self._cache.invalidate(self._invalidated)
-        self._incrgc() # This is a good time to do some GC
+        self._flush_invalidations()
 
     def getDebugInfo(self):
         return self._debug_info


=== ZODB3/ZODB/DB.py 1.47 => 1.48 ===
--- ZODB3/ZODB/DB.py:1.47	Fri Jan 17 12:23:14 2003
+++ ZODB3/ZODB/DB.py	Tue Apr  8 11:55:44 2003
@@ -26,6 +26,12 @@
 
 from types import StringType
 
+def list2dict(L):
+    d = {}
+    for elt in L:
+        d[elt] = 1
+    return d
+
 class DB(UndoLogCompatible.UndoLogCompatible):
     """The Object Database
 
@@ -282,17 +288,7 @@
     def importFile(self, file):
         raise 'Not yet implemented'
 
-    def begin_invalidation(self):
-        # Must be called before first call to invalidate and before
-        # the storage lock is held.
-        self._a()
-
-    def finish_invalidation(self):
-        # Must be called after begin_invalidation() and after final
-        # invalidate() call.
-        self._r()
-
-    def invalidate(self, oid, connection=None, version='',
+    def invalidate(self, oids, connection=None, version='',
                    rc=sys.getrefcount):
         """Invalidate references to a given oid.
 
@@ -304,9 +300,11 @@
         if connection is not None:
             version=connection._version
         # Update modified in version cache
-        h=hash(oid)%131
-        o=self._miv_cache.get(h, None)
-        if o is not None and o[0]==oid: del self._miv_cache[h]
+        # XXX must make this work with list or dict to backport to 2.6
+        for oid in oids:
+            h=hash(oid)%131
+            o=self._miv_cache.get(h, None)
+            if o is not None and o[0]==oid: del self._miv_cache[h]
 
         # Notify connections
         for pool, allocated in self._pools[1]:
@@ -315,7 +313,7 @@
                     (not version or cc._version==version)):
                     if rc(cc) <= 3:
                         cc.close()
-                    cc.invalidate(oid)
+                    cc.invalidate(oids)
 
         temps=self._temps
         if temps:
@@ -324,7 +322,7 @@
                 if rc(cc) > 3:
                     if (cc is not connection and
                         (not version or cc._version==version)):
-                        cc.invalidate(oid)
+                        cc.invalidate(oids)
                     t.append(cc)
                 else: cc.close()
             self._temps=t
@@ -561,8 +559,10 @@
             transaction.register(TransactionalUndo(self, id))
         else:
             # fall back to old undo
+            d = {}
             for oid in storage.undo(id):
-                self.invalidate(oid)
+                d[oid] = 1
+            self.invalidate(d)
 
     def versionEmpty(self, version):
         return self._storage.versionEmpty(version)
@@ -589,14 +589,14 @@
     def abort(self, reallyme, t): pass
 
     def commit(self, reallyme, t):
-        db=self._db
         dest=self._dest
-        oids=db._storage.commitVersion(self._version, dest, t)
-        for oid in oids: db.invalidate(oid, version=dest)
+        oids = self._db._storage.commitVersion(self._version, dest, t)
+        oids = list2dict(oids)
+        self._db.invalidate(oids, version=dest)
         if dest:
             # the code above just invalidated the dest version.
             # now we need to invalidate the source!
-            for oid in oids: db.invalidate(oid, version=self._version)
+            self._db.invalidate(oids, version=self._version)
 
 class AbortVersion(CommitVersion):
     """An object that will see to version abortion
@@ -605,11 +605,9 @@
     """
 
     def commit(self, reallyme, t):
-        db=self._db
         version=self._version
-        oids = db._storage.abortVersion(version, t)
-        for oid in oids:
-            db.invalidate(oid, version=version)
+        oids = self._db._storage.abortVersion(version, t)
+        self._db.invalidate(list2dict(oids), version=version)
 
 
 class TransactionalUndo(CommitVersion):
@@ -623,7 +621,5 @@
     # similarity of rhythm that I think it's justified.
 
     def commit(self, reallyme, t):
-        db=self._db
-        oids=db._storage.transactionalUndo(self._version, t)
-        for oid in oids:
-            db.invalidate(oid)
+        oids = self._db._storage.transactionalUndo(self._version, t)
+        self._db.invalidate(list2dict(oids))


=== ZODB3/ZODB/cPickleCache.c 1.80 => 1.81 ===
--- ZODB3/ZODB/cPickleCache.c:1.80	Wed Apr  2 11:50:49 2003
+++ ZODB3/ZODB/cPickleCache.c	Tue Apr  8 11:55:44 2003
@@ -352,6 +352,7 @@
 	      _invalidate(self, key);
 	      Py_DECREF(key);
 	  }
+	  /* XXX Do we really want to modify the input? */
 	  PySequence_DelSlice(inv, 0, l);
       }
   }