[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.73.2.23.2.5

Jeremy Hylton jeremy@zope.com
Wed, 11 Jun 2003 14:42:54 -0400


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv824

Modified Files:
      Tag: tim-loading_oids_status-branch
	ClientStorage.py 
Log Message:
Refactor handling of invalidation during load.

Keep count and set of invalidations in separate dicts keyed by oid.


=== ZODB3/ZEO/ClientStorage.py 1.73.2.23.2.4 => 1.73.2.23.2.5 ===
--- ZODB3/ZEO/ClientStorage.py:1.73.2.23.2.4	Wed Jun 11 12:34:39 2003
+++ ZODB3/ZEO/ClientStorage.py	Wed Jun 11 14:42:54 2003
@@ -265,16 +265,16 @@
         # There's a nasty race.  The ZRPC layer can deliver invalidations
         # out of order (i.e., the server sends the result of a load, then
         # sends an invalidation for that object, but we see the invalidation
-        # first).  To worm around this, load() stores an (oid, version)
-        # pair for the requested object in the _loading_oids_status dict,
-        # mapping to a
-        #     (count, status)
-        # pair.  The count is the number of load() calls in progress
-        # that have requested this (oid, version) pair.  status is
-        # initially True, and a helper method of invalidateTrans() sets
-        # it False to record the invalidation.  load() then uses the
-        # status.  Note:  mutations are protected by self._lock.
-        self._loading_oids_status = {}
+        # first).  To worm around this, load() creates a dict in
+        # _loading_oid_invs and invalidateTrans() stores any arriving
+        # invalidations in that dict.  When the zeoLoad() call returns
+        # from the server, load() handles the invalidations.  
+        # It's possible for different threads to attempt to load the same
+        # oid at the same time.  To account for this, we keep a counter
+        # in _loading_oid_count.  Invalidations are only handled when
+        # the count reaches zero.
+        self._loading_oid_count = {}
+        self._loading_oid_invs = {}
 
         # Can't read data in one thread while writing data
         # (tpc_finish) in another thread.  In general, the lock
@@ -628,29 +628,28 @@
         if self._server is None:
             raise ClientDisconnected()
 
-        self._incLoadStatus(oid, version)
+        self._incLoadStatus(oid)
 
         try:
             p, s, v, pv, sv = self._server.zeoLoad(oid)
         except:
             self._lock.acquire()
             try:
-                self._decLoadStatus(oid, version)
+                self._decLoadStatus(oid)
             finally:
                 self._lock.release()
             raise
 
         self._lock.acquire()
         try:
-            statv, statnv = self._decLoadStatus(oid, version)
-            if statv and statnv:
-                self._cache.checkSize(0)
-                self._cache.store(oid, p, s, v, pv, sv)
-            else:
-                if not statv:
-                    self._cache.invalidate(oid, version)
-                if not statnv:
-                    self._cache.invalidate(oid, '')
+            is_last, invs = self._decLoadStatus(oid)
+            if is_last:
+                if invs:
+                    for v in invs:
+                        self._cache.invalidate(oid, v)
+                else:
+                    self._cache.checkSize(0)
+                    self._cache.store(oid, p, s, v, pv, sv)
         finally:
             self._lock.release()
 
@@ -661,45 +660,41 @@
                 return p, s
             raise KeyError, oid # no non-version data for this
 
-    def _incLoadStatus(self, oid, version):
+    def _incLoadStatus(self, oid):
         """Increment the load count for oid, version pair.
 
         Does its own locking.
         """
-        pairs = (oid, version), (oid, "")
         self._lock.acquire()
         try:
-            for pair in pairs:
-                stuff = self._loading_oids_status.get(pair)
-                if stuff is None:
-                    stuff = 1, 1    # count 1, status True
-                else:
-                    count, status = stuff
-                    stuff = count + 1, status
-                self._loading_oids_status[pair] = stuff
+            count = self._loading_oid_count.get(oid)
+            if count is None:
+                count = 0
+                self._loading_oid_invs[oid] = {}
+            count += 1
+            self._loading_oid_count[oid] = count
         finally:
             self._lock.release()
 
-    def _decLoadStatus(self, oid, version):
-        """Decrement load count and return statii.
-
-        Statii is a 2-element list of booleans indicating whether
-        the object, version was invalidated during the load.  The
-        first element is oid, version; the second element is oid, "".
+    def _decLoadStatus(self, oid):
+        """Decrement load count.
 
+        Return boolean indicating whether this was the last load and
+        a list of versions to invalidate.  The list is empty unless
+        the boolean is True.
+        
         Caller must hold self._lock.
         """
-        statii = []
-        pairs = (oid, version), (oid, "")
-        for pair in pairs:
-            count, status = self._loading_oids_status[pair]
-            statii.append(status)
-            count -= 1
-            if count:
-                self._loading_oids_status[pair] = count, status
-            else:
-                del self._loading_oids_status[pair]
-        return statii
+        count = self._loading_oid_count[oid]
+        count -= 1
+        if count:
+            self._loading_oid_count[oid] = count
+            return 0, []
+        else:
+            del self._loading_oid_count[oid]
+            d = self._loading_oid_invs[oid]
+            del self._loading_oid_invs[oid]
+            return 1, d.keys()
 
     def modifiedInVersion(self, oid):
         """Storage API: return the version, if any, that modfied an object.
@@ -997,17 +992,14 @@
                 d = versions.setdefault(version, {})
                 d[oid] = 1
 
-                # Set invalidation flag for this (oid, version) pair.
-                stuff = self._loading_oids_status.get(pair)
-                if stuff:
-                    # load() is waiting for this.  Keep the load count
-                    # the same, but clear the "no invalidations seen"
-                    # flag.  This trick is necessary because zrpc invokes
-                    # invalidateTrans() immediately when an invalidation
-                    # arrives, but may not deliver the result of a load
-                    # until later, even if the load request preceded
-                    # the invalidation msg.
-                    self._loading_oids_status[pair] = stuff[0], 0
+                # Update the _loading_oids_invs dict for this oid,
+                # if necessary.
+                d = self._loading_oids_invs.get(oid)
+                if d is not None:
+                    # load() is waiting for this.  Mark the version
+                    # as invalidated, so that load can invalidate it
+                    # later.
+                    d[version] = 1
                 else:
                     # load() isn't waiting for this.  Simply invalidate it.
                     self._cache.invalidate(oid, version=version)