[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.101
Jeremy Hylton
jeremy@zope.com
Thu, 5 Jun 2003 18:38:35 -0400
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv13384/ZEO
Modified Files:
ClientStorage.py
Log Message:
Merge atomic cache invalidation code from the 3.1 release branch.
=== ZODB3/ZEO/ClientStorage.py 1.100 => 1.101 ===
--- ZODB3/ZEO/ClientStorage.py:1.100 Fri May 30 15:20:57 2003
+++ ZODB3/ZEO/ClientStorage.py Thu Jun 5 18:38:35 2003
@@ -268,6 +268,12 @@
self._oid_lock = threading.Lock()
self._oids = [] # Object ids retrieved from new_oids()
+ # Can't read data in one thread while writing data
+ # (tpc_finish) in another thread. In general, the lock
+ # must prevent access to the cache while _update_cache
+ # is executing.
+ self._lock = threading.Lock()
+
t = self._ts = get_timestamp()
self._serial = `t`
self._oid = '\0\0\0\0\0\0\0\0'
@@ -688,11 +694,19 @@
specified by the given object id and version, if they exist;
otherwise a KeyError is raised.
"""
- p = self._cache.load(oid, version)
- if p:
- return p
+ self._lock.acquire() # for atomic processing of invalidations
+ try:
+ p = self._cache.load(oid, version)
+ if p:
+ return p
+ finally:
+ self._lock.release()
+
if self._server is None:
raise ClientDisconnected()
+
+ # If an invalidation for oid comes in during zeoLoad, that's OK
+ # because we'll get oid's new state.
p, s, v, pv, sv = self._server.zeoLoad(oid)
self._cache.checkSize(0)
self._cache.store(oid, p, s, v, pv, sv)
@@ -708,9 +722,13 @@
If no version modified the object, return an empty string.
"""
- v = self._cache.modifiedInVersion(oid)
- if v is not None:
- return v
+ self._lock.acquire()
+ try:
+ v = self._cache.modifiedInVersion(oid)
+ if v is not None:
+ return v
+ finally:
+ self._lock.release()
return self._server.modifiedInVersion(oid)
def new_oid(self):
@@ -847,16 +865,20 @@
if transaction is not self._transaction:
return
try:
+ self._lock.acquire() # for atomic processing of invalidations
+ try:
+ self._update_cache()
+ finally:
+ self._lock.release()
+
if f is not None:
f()
tid = self._server.tpc_finish(self._serial)
+ self._cache.setLastTid(tid)
r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
-
- self._update_cache()
- self._cache.setLastTid(tid)
finally:
self.end_transaction()
@@ -866,6 +888,8 @@
This iterates over the objects in the transaction buffer and
updates or invalidates the cache.
"""
+ # Must be called with _lock already acquired.
+
self._cache.checkSize(self._tbuf.get_size())
try:
self._tbuf.begin_iterate()
@@ -912,10 +936,13 @@
"""Storage API: undo a transaction, writing directly to the storage."""
if self._is_read_only:
raise POSException.ReadOnlyError()
- # XXX what are the sync issues here?
oids = self._server.undo(transaction_id)
- for oid in oids:
- self._cache.invalidate(oid, '')
+ self._lock.acquire()
+ try:
+ for oid in oids:
+ self._cache.invalidate(oid, '')
+ finally:
+ self._lock.release()
return oids
def undoInfo(self, first=0, last=-20, specification=None):
@@ -969,15 +996,19 @@
# oid, version pairs. The DB's invalidate() method expects a
# dictionary of oids.
- # versions maps version names to dictionary of invalidations
- versions = {}
- for oid, version in invs:
- d = versions.setdefault(version, {})
- self._cache.invalidate(oid, version=version)
- d[oid] = 1
- if self._db is not None:
- for v, d in versions.items():
- self._db.invalidate(d, version=v)
+ self._lock.acquire()
+ try:
+ # versions maps version names to dictionary of invalidations
+ versions = {}
+ for oid, version in invs:
+ d = versions.setdefault(version, {})
+ self._cache.invalidate(oid, version=version)
+ d[oid] = 1
+ if self._db is not None:
+ for v, d in versions.items():
+ self._db.invalidate(d, version=v)
+ finally:
+ self._lock.release()
def endVerify(self):
"""Server callback to signal end of cache validation."""