[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.73.2.15

Jeremy Hylton jeremy@zope.com
Thu, 5 Jun 2003 14:51:01 -0400


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv13137

Modified Files:
      Tag: ZODB3-3_1-branch
	ClientStorage.py 
Log Message:
Add lock to make sure invalidations are handled atomically by the
cache.


=== ZODB3/ZEO/ClientStorage.py 1.73.2.14 => 1.73.2.15 ===
--- ZODB3/ZEO/ClientStorage.py:1.73.2.14	Tue May 13 12:34:18 2003
+++ ZODB3/ZEO/ClientStorage.py	Thu Jun  5 14:51:00 2003
@@ -267,6 +267,12 @@
         self._oid_lock = threading.Lock()
         self._oids = [] # Object ids retrieved from new_oids()
 
+        # Can't read data in one thread while writing data
+        # (tpc_finish) in another thread.  In general, the lock
+        # must prevent access to the cache while _update_cache
+        # is executing.
+        self._lock = threading.Lock()
+
         t = self._ts = get_timestamp()
         self._serial = `t`
         self._oid = '\0\0\0\0\0\0\0\0'
@@ -602,30 +608,38 @@
         specified by the given object id and version, if they exist;
         otherwise a KeyError is raised.
         """
-        p = self._cache.load(oid, version)
-        if p:
-            return p
-        if self._server is None:
-            raise ClientDisconnected()
-        p, s, v, pv, sv = self._server.zeoLoad(oid)
-        self._cache.checkSize(0)
-        self._cache.store(oid, p, s, v, pv, sv)
-        if v and version and v == version:
-            return pv, sv
-        else:
-            if s:
-                return p, s
-            raise KeyError, oid # no non-version data for this
+        self._lock.acquire()
+        try:
+            p = self._cache.load(oid, version)
+            if p:
+                return p
+            if self._server is None:
+                raise ClientDisconnected()
+            p, s, v, pv, sv = self._server.zeoLoad(oid)
+            self._cache.checkSize(0)
+            self._cache.store(oid, p, s, v, pv, sv)
+            if v and version and v == version:
+                return pv, sv
+            else:
+                if s:
+                    return p, s
+                raise KeyError, oid # no non-version data for this
+        finally:
+            self._lock.release()
 
     def modifiedInVersion(self, oid):
         """Storage API: return the version, if any, that modfied an object.
 
         If no version modified the object, return an empty string.
         """
-        v = self._cache.modifiedInVersion(oid)
-        if v is not None:
-            return v
-        return self._server.modifiedInVersion(oid)
+        self._lock.acquire()
+        try:
+            v = self._cache.modifiedInVersion(oid)
+            if v is not None:
+                return v
+            return self._server.modifiedInVersion(oid)
+        finally:
+            self._lock.release()
 
     def new_oid(self):
         """Storage API: return a new object identifier."""
@@ -763,16 +777,20 @@
         """Storage API: finish a transaction."""
         if transaction is not self._transaction:
             return
+        self._lock.acquire()
         try:
-            if f is not None:
-                f()
+            try:
+                if f is not None:
+                    f()
 
-            self._server.tpc_finish(self._serial)
+                self._server.tpc_finish(self._serial)
 
-            r = self._check_serials()
-            assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
+                r = self._check_serials()
+                assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
 
-            self._update_cache()
+                self._update_cache()
+            finally:
+                self._lock.release()
         finally:
             self.end_transaction()
 
@@ -782,6 +800,8 @@
         This iterates over the objects in the transaction buffer and
         update or invalidate the cache.
         """
+        # Must be called with _lock already acquired.
+        
         self._cache.checkSize(self._tbuf.get_size())
         try:
             self._tbuf.begin_iterate()
@@ -828,10 +848,13 @@
         """Storage API: undo a transaction, writing directly to the storage."""
         if self._is_read_only:
             raise POSException.ReadOnlyError()
-        # XXX what are the sync issues here?
-        oids = self._server.undo(transaction_id)
-        for oid in oids:
-            self._cache.invalidate(oid, '')
+        self._lock.acquire()
+        try:
+            oids = self._server.undo(transaction_id)
+            for oid in oids:
+                self._cache.invalidate(oid, '')
+        finally:
+            self._lock.release()
         return oids
 
     def undoInfo(self, first=0, last=-20, specification=None):
@@ -884,16 +907,20 @@
         # Invalidations are sent by the ZEO server as a sequence of
         # oid, version pairs.  The DB's invalidate() method expects a
         # dictionary of oids.
-        
-        # versions maps version names to dictionary of invalidations
-        versions = {}
-        for oid, version in invs:
-            d = versions.setdefault(version, {})
-            self._cache.invalidate(oid, version=version)
-            d[oid] = 1
-        if self._db is not None:
-            for v, d in versions.items():
-                self._db.invalidate(d, version=v)
+
+        self._lock.acquire()
+        try:
+            # versions maps version names to dictionary of invalidations
+            versions = {}
+            for oid, version in invs:
+                d = versions.setdefault(version, {})
+                self._cache.invalidate(oid, version=version)
+                d[oid] = 1
+            if self._db is not None:
+                for v, d in versions.items():
+                    self._db.invalidate(d, version=v)
+        finally:
+            self._lock.release()
 
     def endVerify(self):
         """Server callback to signal end of cache validation."""