[Zodb-checkins] CVS: Zope3/lib/python/ZODB - Connection.py:1.60.6.10 DB.py:1.34.4.6 TmpStore.py:1.4.86.2

Jeremy Hylton jeremy@zope.com
Mon, 11 Mar 2002 23:53:44 -0500


Update of /cvs-repository/Zope3/lib/python/ZODB
In directory cvs.zope.org:/tmp/cvs-serv23618

Modified Files:
      Tag: Zope-3x-branch
	Connection.py DB.py TmpStore.py 
Log Message:
Refactor invalidation logic.

The old code contained instance variables with similar names and uses:
_invalidated and _invalidating.  I've renamed _invalidating to _modified.

The _invalidated dictionary implements a set of oids.  It contains all
of the oids modified by other connections since the current
transaction began.  When the transaction finishes, it updates the
cache based on _invalidated and resets _invalidated.  (There may be a
race condition in the update-and-then-clear.)

The _modified instance variable is a list of oids modified by the
current transaction.  It is reset each time tpc_begin() is called.

Since _invalidated and _modified are past tense, rename _creating to
_created.  (This also affects TmpStore.)

There are many tests in the code using _invalid().  This is equivalent
to self._invalidated.has_key().  Many of these tests check the current
oid and also check None, e.g. self._invalid(None).  It appears,
however, that it is impossible for None to be stored in _invalidated.
Add an assertion to this effect in the code and delete tests for
_invalid(None). 

Refactor implementation of DB.invalidate() into several smaller
methods.

Fix typos in tpc_abort() reported by Steve Alexander and R. David Murray.

Rename Connection _setDB() method to registerDB().



=== Zope3/lib/python/ZODB/Connection.py 1.60.6.9 => 1.60.6.10 ===
         self._cache = cache = Cache(cache_size, cache_deactivate_after)
         self.cacheGC = cache.incrgc
-        
+
+        # _invalidated queues invalidate messages delivered from the DB
         self._invalidated = d = {}
         self._invalid = d.has_key
         self._committed = []
@@ -238,8 +239,7 @@
             # to avoid time-of-check to time-of-use race.
             p, serial = self._storage.load(oid, self._version)
 
-            # XXX What does _invalid(None) mean?
-            if self._invalid(oid) or self._invalid(None):
+            if self._invalid(oid):
                 if not (hasattr(object, '_p_independent')
                         and object._p_independent()):
                     get_transaction().register(self)
@@ -295,6 +295,7 @@
             # New code is in place.  Start a new cache.
             self._resetCache()
         else:
+            # XXX race condition?
             self._cache.invalidateMany(self._invalidated.iterkeys())
             self._invalidated.clear()
         self._opened = time.time()
@@ -361,6 +362,9 @@
     def cacheMinimize(self, dt=0):
         self._cache.minimize(dt)
 
+    def getVersion(self):
+        return self._version
+
     def invalidate(self, oid):
         """Invalidate a particular oid
 
@@ -368,7 +372,8 @@
         it.  The object data will be actually invalidated at certain
         transaction boundaries.
         """
-        # XXX can oid be None?
+        assert oid is not None
+        # XXX race condition?
         self._invalidated[oid] = 1
 
     ######################################################################
@@ -380,6 +385,7 @@
     def abort(self, object, transaction):
         """Invalidate the object (or all objects if None)."""
         if object is self:
+            # XXX race condition?
             self._cache.invalidateMany(self._invalidated.iterkeys())
             self._invalidated.clear()
         else:
@@ -400,14 +406,13 @@
             oid = self.new_oid()
             object._p_jar = self
             object._p_oid = oid
-            self._creating.append(oid)
+            self._created.append(oid)
         elif object._p_changed:
             # XXX Is it kosher to raise a ConflictError on commit?
-            if ((self._invalid(oid)
-                 and not hasattr(object, '_p_resolveConflict'))
-                or self._invalid(None)):
+            if (self._invalid(oid)
+                and not hasattr(object, '_p_resolveConflict')):
                 raise ConflictError(object=object)
-            self._invalidating.append(oid)
+            self._modified.append(oid)
         else:
             return # Nothing to do
 
@@ -428,7 +433,7 @@
         serial = getattr(pobject, '_p_serial', '\0\0\0\0\0\0\0\0')
         if serial == '\0\0\0\0\0\0\0\0':
             # new object
-            self._creating.append(oid)
+            self._created.append(oid)
         else:
             #XXX We should never get here
             #jer: Don't understand previous comment.
@@ -436,7 +441,7 @@
                  not hasattr(pobject, '_p_resolveConflict'))
                 or self._invalidated.has_key(None)):
                 raise ConflictError(oid=oid, serial=serial)
-            self._invalidating.append(oid)
+            self._modified.append(oid)
 
         klass = pobject.__class__
 
@@ -497,8 +502,8 @@
         
         oids = src._index.keys()
         # Copy invalidating and creating info from temporary storage:
-        self._invalidating.extend(oids)
-        self._creating.extend(src._creating)
+        self._modified.extend(oids)
+        self._created.extend(src._created)
         
         for oid in oids:
             data, serial = src.load(oid, src)
@@ -515,16 +520,16 @@
 
         self._cache.invalidateMany(src._index.iterkeys())
         src._index.clear()
-        self._invalidate_creating(src._creating)
+        self._invalidate_created(src._created)
 
-    def _invalidate_creating(self, creating=None):
+    def _invalidate_created(self, created=None):
         """Dissown any objects newly saved in an uncommitted transaction.
         """
-        if creating is None:
-            creating = self._creating
-            self._creating = []
+        if created is None:
+            created = self._created
+            self._created = []
 
-        for oid in creating:
+        for oid in created:
             o = self._cache.get(oid)
             if o is not None:
                 del o._p_jar
@@ -543,17 +548,18 @@
         if self.__onCommitActions is not None:
             del self.__onCommitActions
         self._storage.tpc_abort(transaction)
+        # XXX race condition?
         self._cache.invalidateMany(self._invalidated.iterkeys())
         self._invalidated.clear()
-        self._cache.invalidateMany(self._invalidating.iterkeys())
-        self._invalidatng.clear()
-        self._invalidate_creating()
+        self._cache.invalidateMany(self._modified)
+        del self._modified[:]
+        self._invalidate_created()
 
     def tpc_begin(self, transaction, sub=None):
-        if self._invalid(None): # Some nitwit invalidated everything!
-            raise ConflictError("transaction already invalidated")
-        self._invalidating = []
-        self._creating = []
+        # _modified is a list of the oids of the objects modified
+        # by this transaction.
+        self._modified = []
+        self._created = []
 
         if sub:
             # Sub-transaction!
@@ -621,7 +627,7 @@
 
     def tpc_finish(self, transaction):
         # It's important that the storage call the function we pass
-        # (self._invalidate_invalidating) while it still has it's
+        # (self._invalidate_modified) while it still has it's
         # lock.  We don't want another thread to be able to read any
         # updated data until we've had a chance to send an
         # invalidation message to all of the other connections!
@@ -630,19 +636,20 @@
             # Commiting a subtransaction!
             # There is no need to invalidate anything.
             self._storage.tpc_finish(transaction)
-            self._storage._creating[:0] = self._creating
-            del self._creating[:]
+            self._storage._created[:0] = self._created
+            del self._created[:]
         else:
             self._db.begin_invalidation()
             self._storage.tpc_finish(transaction,
-                                     self._invalidate_invalidating)
+                                     self._invalidate_modified)
 
+        # XXX race condition?
         self._cache.invalidateMany(self._invalidated.iterkeys())
         self._invalidated.clear()
         self.cacheGC() # This is a good time to do some GC
 
-    def _invalidate_invalidating(self):
-        for oid in self._invalidating:
+    def _invalidate_modified(self):
+        for oid in self._modified:
             self._db.invalidate(oid, self)
         self._db.finish_invalidation()
 
@@ -651,6 +658,7 @@
         sync = getattr(self._storage, 'sync', None)
         if sync is not None:
             sync()
+        # XXX race condition?
         self._cache.invalidateMany(self._invalidated.iterkeys())
         self._invalidated.clear()
         self.cacheGC() # This is a good time to do some GC


=== Zope3/lib/python/ZODB/DB.py 1.34.4.5 => 1.34.4.6 ===
         self._r()
 
-    def invalidate(self, oid, connection=None, version='',
-                   rc=sys.getrefcount):
+    def invalidate(self, oid, connection=None, version=''):
         """Invalidate references to a given oid.
 
         This is used to indicate that one of the connections has committed a
@@ -274,32 +273,58 @@
         connection.
         """
         if connection is not None:
-            version=connection._version
-        # Update modified in version cache
-        h=hash(oid)%131
-        o=self._miv_cache.get(h, None)
-        if o is not None and o[0]==oid: del self._miv_cache[h]
+            version = connection._version
+
+        assert oid is not None
+
+        self.updateMIVCache(oid)
 
         # Notify connections
         for pool, allocated in self._pools[1]:
             for cc in allocated:
-                if (cc is not connection and
-                    (not version or cc._version==version)):
-                    if rc(cc) <= 3:
-                        cc.close()
-                    cc.invalidate(oid)
-
-        temps=self._temps
-        if temps:
-            t=[]
-            for cc in temps:
-                if rc(cc) > 3:
-                    if (cc is not connection and
-                        (not version or cc._version==version)):
-                        cc.invalidate(oid)
-                    t.append(cc)
-                else: cc.close()
-            self._temps=t
+                if cc is not connection:
+                    self.invalidateConnection(cc, oid, version)
+
+        if self._temps:
+            # t accumulates all the connections that aren't closed.
+            # The committing connection is kept without being invalidated.
+            t = []
+            for cc in self._temps:
+                if cc is connection:
+                    t.append(cc)
+                else:
+                    self.invalidateConnection(cc, oid, version, t.append)
+            self._temps = t
+
+    def invalidateConnection(self, conn, oid, version, alive=None):
+        """Send invalidation message to conn for oid on version.
+
+        If the modification occurred on a version, an invalidation is
+        sent only if the version of the mod matches the version of the
+        connection.
+
+        This function also handles garbage collection of connections
+        that aren't used anymore.  If the optional argument alive is
+        defined, it is a function that is called for all connections
+        that aren't garbage collected.
+        """
+
+        # XXX The threshold 3 was calibrated for the inline loops this
+        # method replaces; this extra call frame adds references to conn,
+        # so the threshold probably needs re-checking.
+        if sys.getrefcount(conn) <= 3:
+            conn.close()
+        else:
+            if alive is not None:
+                alive(conn)
+        if not version or conn.getVersion() == version:
+            conn.invalidate(oid)
+
+    def updateMIVCache(self, oid):
+        h = hash(oid) % 131
+        o = self._miv_cache.get(h)
+        if o is not None and o[0]==oid:
+            del self._miv_cache[h]
 
     def modifiedInVersion(self, oid):
         h=hash(oid)%131


=== Zope3/lib/python/ZODB/TmpStore.py 1.4.86.1 => 1.4.86.2 ===
             file=tempfile.TemporaryFile()
 
-        self._file=file
-        self._index={}
-        self._pos=self._tpos=0
-        self._bver=base_version
-        self._tindex=[]
-        self._db=None
-        self._creating=[]
+        self._file = file
+        self._index = {}
+        self._pos = self._tpos = 0
+        self._bver = base_version
+        self._tindex = []
+        self._db = None
+        self._created = []
 
-    def __del__(self): self.close()
+    def __del__(self):
+        self.close()
 
     def close(self):
         self._file.close()
@@ -39,8 +40,11 @@
         del self._index
         del self._db
 
-    def getName(self): return self._db.getName()
-    def getSize(self): return self._pos
+    def getName(self):
+        return self._db.getName()
+    
+    def getSize(self):
+        return self._pos
 
     def load(self, oid, version):
         #if version is not self: raise KeyError, oid
@@ -55,14 +59,16 @@
         return file.read(u64(h[16:])), h[8:16]
         
     def modifiedInVersion(self, oid):
-        if self._index.has_key(oid): return 1
+        if self._index.has_key(oid):
+            return 1
         return self._db._storage.modifiedInVersion(oid)
 
-    def new_oid(self): return self._db._storage.new_oid()
+    def new_oid(self):
+        return self._db._storage.new_oid()
 
     def registerDB(self, db, limit):
-        self._db=db
-        self._storage=db._storage
+        self._db = db
+        self._storage = db._storage
 
     def store(self, oid, serial, data, version, transaction):
         if transaction is not self._transaction: