[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.73.2.15
Jeremy Hylton
jeremy@zope.com
Thu, 5 Jun 2003 14:51:01 -0400
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv13137
Modified Files:
Tag: ZODB3-3_1-branch
ClientStorage.py
Log Message:
Add lock to make sure invalidations are handled atomically by the
cache.
=== ZODB3/ZEO/ClientStorage.py 1.73.2.14 => 1.73.2.15 ===
--- ZODB3/ZEO/ClientStorage.py:1.73.2.14 Tue May 13 12:34:18 2003
+++ ZODB3/ZEO/ClientStorage.py Thu Jun 5 14:51:00 2003
@@ -267,6 +267,12 @@
self._oid_lock = threading.Lock()
self._oids = [] # Object ids retrieved from new_oids()
+ # Can't read data in one thread while writing data
+ # (tpc_finish) in another thread. In general, the lock
+ # must prevent access to the cache while _update_cache
+ # is executing.
+ self._lock = threading.Lock()
+
t = self._ts = get_timestamp()
self._serial = `t`
self._oid = '\0\0\0\0\0\0\0\0'
@@ -602,30 +608,38 @@
specified by the given object id and version, if they exist;
otherwise a KeyError is raised.
"""
- p = self._cache.load(oid, version)
- if p:
- return p
- if self._server is None:
- raise ClientDisconnected()
- p, s, v, pv, sv = self._server.zeoLoad(oid)
- self._cache.checkSize(0)
- self._cache.store(oid, p, s, v, pv, sv)
- if v and version and v == version:
- return pv, sv
- else:
- if s:
- return p, s
- raise KeyError, oid # no non-version data for this
+ self._lock.acquire()
+ try:
+ p = self._cache.load(oid, version)
+ if p:
+ return p
+ if self._server is None:
+ raise ClientDisconnected()
+ p, s, v, pv, sv = self._server.zeoLoad(oid)
+ self._cache.checkSize(0)
+ self._cache.store(oid, p, s, v, pv, sv)
+ if v and version and v == version:
+ return pv, sv
+ else:
+ if s:
+ return p, s
+ raise KeyError, oid # no non-version data for this
+ finally:
+ self._lock.release()
def modifiedInVersion(self, oid):
"""Storage API: return the version, if any, that modfied an object.
If no version modified the object, return an empty string.
"""
- v = self._cache.modifiedInVersion(oid)
- if v is not None:
- return v
- return self._server.modifiedInVersion(oid)
+ self._lock.acquire()
+ try:
+ v = self._cache.modifiedInVersion(oid)
+ if v is not None:
+ return v
+ return self._server.modifiedInVersion(oid)
+ finally:
+ self._lock.release()
def new_oid(self):
"""Storage API: return a new object identifier."""
@@ -763,16 +777,20 @@
"""Storage API: finish a transaction."""
if transaction is not self._transaction:
return
+ self._lock.acquire()
try:
- if f is not None:
- f()
+ try:
+ if f is not None:
+ f()
- self._server.tpc_finish(self._serial)
+ self._server.tpc_finish(self._serial)
- r = self._check_serials()
- assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
+ r = self._check_serials()
+ assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
- self._update_cache()
+ self._update_cache()
+ finally:
+ self._lock.release()
finally:
self.end_transaction()
@@ -782,6 +800,8 @@
This iterates over the objects in the transaction buffer and
update or invalidate the cache.
"""
+ # Must be called with _lock already acquired.
+
self._cache.checkSize(self._tbuf.get_size())
try:
self._tbuf.begin_iterate()
@@ -828,10 +848,13 @@
"""Storage API: undo a transaction, writing directly to the storage."""
if self._is_read_only:
raise POSException.ReadOnlyError()
- # XXX what are the sync issues here?
- oids = self._server.undo(transaction_id)
- for oid in oids:
- self._cache.invalidate(oid, '')
+ self._lock.acquire()
+ try:
+ oids = self._server.undo(transaction_id)
+ for oid in oids:
+ self._cache.invalidate(oid, '')
+ finally:
+ self._lock.release()
return oids
def undoInfo(self, first=0, last=-20, specification=None):
@@ -884,16 +907,20 @@
# Invalidations are sent by the ZEO server as a sequence of
# oid, version pairs. The DB's invalidate() method expects a
# dictionary of oids.
-
- # versions maps version names to dictionary of invalidations
- versions = {}
- for oid, version in invs:
- d = versions.setdefault(version, {})
- self._cache.invalidate(oid, version=version)
- d[oid] = 1
- if self._db is not None:
- for v, d in versions.items():
- self._db.invalidate(d, version=v)
+
+ self._lock.acquire()
+ try:
+ # versions maps version names to dictionary of invalidations
+ versions = {}
+ for oid, version in invs:
+ d = versions.setdefault(version, {})
+ self._cache.invalidate(oid, version=version)
+ d[oid] = 1
+ if self._db is not None:
+ for v, d in versions.items():
+ self._db.invalidate(d, version=v)
+ finally:
+ self._lock.release()
def endVerify(self):
"""Server callback to signal end of cache validation."""