[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.103 ClientStub.py:1.12

Jeremy Hylton jeremy@zope.com
Fri, 13 Jun 2003 17:57:08 -0400


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv3078/ZEO

Modified Files:
	ClientStorage.py ClientStub.py 
Log Message:
Backport various cache consistency bug fixes from the ZODB3-3_1-branch.


=== ZODB3/ZEO/ClientStorage.py 1.102 => 1.103 ===
--- ZODB3/ZEO/ClientStorage.py:1.102	Tue Jun 10 10:55:34 2003
+++ ZODB3/ZEO/ClientStorage.py	Fri Jun 13 17:56:37 2003
@@ -268,6 +268,15 @@
         self._oid_lock = threading.Lock()
         self._oids = [] # Object ids retrieved from new_oids()
 
+        # load() and tpc_finish() must be serialized to guarantee
+        # that cache modifications from each occur atomically.
+        # It also prevents multiple load calls occuring simultaneously,
+        # which simplifies the cache logic.
+        self._load_lock = threading.Lock()
+        # _load_oid and _load_status are protected by _lock
+        self._load_oid = None
+        self._load_status = None
+
         # Can't read data in one thread while writing data
         # (tpc_finish) in another thread.  In general, the lock
         # must prevent access to the cache while _update_cache
@@ -696,20 +705,37 @@
         """
         self._lock.acquire()    # for atomic processing of invalidations
         try:
-            p = self._cache.load(oid, version)
-            if p:
-                return p
+            pair = self._cache.load(oid, version)
+            if pair:
+                return pair
         finally:
             self._lock.release()
-            
+
         if self._server is None:
             raise ClientDisconnected()
-        
-        # If an invalidation for oid comes in during zeoLoad, that's OK
-        # because we'll get oid's new state.
-        p, s, v, pv, sv = self._server.zeoLoad(oid)
-        self._cache.checkSize(0)
-        self._cache.store(oid, p, s, v, pv, sv)
+
+        self._load_lock.acquire()
+        try:
+            self._lock.acquire()
+            try:
+                self._load_oid = oid
+                self._load_status = 1
+            finally:
+                self._lock.release()
+
+            p, s, v, pv, sv = self._server.zeoLoad(oid)
+
+            self._lock.acquire()    # for atomic processing of invalidations
+            try:
+                if self._load_status:
+                    self._cache.checkSize(0)
+                    self._cache.store(oid, p, s, v, pv, sv)
+                self._load_oid = None
+            finally:
+                self._lock.release()
+        finally:
+            self._load_lock.release()
+
         if v and version and v == version:
             return pv, sv
         else:
@@ -864,22 +890,22 @@
         """Storage API: finish a transaction."""
         if transaction is not self._transaction:
             return
+        self._load_lock.acquire()
         try:
             self._lock.acquire()  # for atomic processing of invalidations
             try:
                 self._update_cache()
+                if f is not None:
+                    f()
             finally:
                 self._lock.release()
-                
-            if f is not None:
-                f()
 
-            tid = self._server.tpc_finish(self._serial)
-            self._cache.setLastTid(tid)
+            self._server.tpc_finish(self._serial)
 
             r = self._check_serials()
             assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
         finally:
+            self._load_lock.release()
             self.end_transaction()
 
     def _update_cache(self):
@@ -1000,15 +1026,17 @@
         # Invalidations are sent by the ZEO server as a sequence of
         # oid, version pairs.  The DB's invalidate() method expects a
         # dictionary of oids.
-        
+
         self._lock.acquire()
         try:
             # versions maps version names to dictionary of invalidations
             versions = {}
             for oid, version in invs:
-                d = versions.setdefault(version, {})
+                if oid == self._load_oid:
+                    self._load_status = 0
                 self._cache.invalidate(oid, version=version)
-                d[oid] = 1
+                versions.setdefault(version, {})[oid] = 1
+
             if self._db is not None:
                 for v, d in versions.items():
                     self._db.invalidate(d, version=v)
@@ -1038,10 +1066,10 @@
         """Invalidate objects modified by tid."""
         self._cache.setLastTid(tid)
         if self._pickler is not None:
-            self.log("Transactional invalidation during cache verification",
-                     level=zLOG.BLATHER)
+            log2(BLATHER,
+                 "Transactional invalidation during cache verification")
             for t in args:
-                self.self._pickler.dump(t)
+                self._pickler.dump(t)
             return
         self._process_invalidations(args)
 


=== ZODB3/ZEO/ClientStub.py 1.11 => 1.12 ===
--- ZODB3/ZEO/ClientStub.py:1.11	Fri Jan  3 17:07:38 2003
+++ ZODB3/ZEO/ClientStub.py	Fri Jun 13 17:56:37 2003
@@ -53,7 +53,7 @@
         self.rpc.callAsync('endVerify')
 
     def invalidateTransaction(self, tid, args):
-        self.rpc.callAsync('invalidateTransaction', tid, args)
+        self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
 
     def serialnos(self, arg):
         self.rpc.callAsync('serialnos', arg)