[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.103 ClientStub.py:1.12
Jeremy Hylton
jeremy@zope.com
Fri, 13 Jun 2003 17:57:08 -0400
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv3078/ZEO
Modified Files:
ClientStorage.py ClientStub.py
Log Message:
Bacport various cache consistency bug fixes from the ZODB3-3_1-branch.
=== ZODB3/ZEO/ClientStorage.py 1.102 => 1.103 ===
--- ZODB3/ZEO/ClientStorage.py:1.102 Tue Jun 10 10:55:34 2003
+++ ZODB3/ZEO/ClientStorage.py Fri Jun 13 17:56:37 2003
@@ -268,6 +268,15 @@
self._oid_lock = threading.Lock()
self._oids = [] # Object ids retrieved from new_oids()
+ # load() and tpc_finish() must be serialized to guarantee
+ # that cache modifications from each occur atomically.
+ # It also prevents multiple load calls occuring simultaneously,
+ # which simplifies the cache logic.
+ self._load_lock = threading.Lock()
+ # _load_oid and _load_status are protected by _lock
+ self._load_oid = None
+ self._load_status = None
+
# Can't read data in one thread while writing data
# (tpc_finish) in another thread. In general, the lock
# must prevent access to the cache while _update_cache
@@ -696,20 +705,37 @@
"""
self._lock.acquire() # for atomic processing of invalidations
try:
- p = self._cache.load(oid, version)
- if p:
- return p
+ pair = self._cache.load(oid, version)
+ if pair:
+ return pair
finally:
self._lock.release()
-
+
if self._server is None:
raise ClientDisconnected()
-
- # If an invalidation for oid comes in during zeoLoad, that's OK
- # because we'll get oid's new state.
- p, s, v, pv, sv = self._server.zeoLoad(oid)
- self._cache.checkSize(0)
- self._cache.store(oid, p, s, v, pv, sv)
+
+ self._load_lock.acquire()
+ try:
+ self._lock.acquire()
+ try:
+ self._load_oid = oid
+ self._load_status = 1
+ finally:
+ self._lock.release()
+
+ p, s, v, pv, sv = self._server.zeoLoad(oid)
+
+ self._lock.acquire() # for atomic processing of invalidations
+ try:
+ if self._load_status:
+ self._cache.checkSize(0)
+ self._cache.store(oid, p, s, v, pv, sv)
+ self._load_oid = None
+ finally:
+ self._lock.release()
+ finally:
+ self._load_lock.release()
+
if v and version and v == version:
return pv, sv
else:
@@ -864,22 +890,22 @@
"""Storage API: finish a transaction."""
if transaction is not self._transaction:
return
+ self._load_lock.acquire()
try:
self._lock.acquire() # for atomic processing of invalidations
try:
self._update_cache()
+ if f is not None:
+ f()
finally:
self._lock.release()
-
- if f is not None:
- f()
- tid = self._server.tpc_finish(self._serial)
- self._cache.setLastTid(tid)
+ self._server.tpc_finish(self._serial)
r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
finally:
+ self._load_lock.release()
self.end_transaction()
def _update_cache(self):
@@ -1000,15 +1026,17 @@
# Invalidations are sent by the ZEO server as a sequence of
# oid, version pairs. The DB's invalidate() method expects a
# dictionary of oids.
-
+
self._lock.acquire()
try:
# versions maps version names to dictionary of invalidations
versions = {}
for oid, version in invs:
- d = versions.setdefault(version, {})
+ if oid == self._load_oid:
+ self._load_status = 0
self._cache.invalidate(oid, version=version)
- d[oid] = 1
+ versions.setdefault(version, {})[oid] = 1
+
if self._db is not None:
for v, d in versions.items():
self._db.invalidate(d, version=v)
@@ -1038,10 +1066,10 @@
"""Invalidate objects modified by tid."""
self._cache.setLastTid(tid)
if self._pickler is not None:
- self.log("Transactional invalidation during cache verification",
- level=zLOG.BLATHER)
+ log2(BLATHER,
+ "Transactional invalidation during cache verification")
for t in args:
- self.self._pickler.dump(t)
+ self._pickler.dump(t)
return
self._process_invalidations(args)
=== ZODB3/ZEO/ClientStub.py 1.11 => 1.12 ===
--- ZODB3/ZEO/ClientStub.py:1.11 Fri Jan 3 17:07:38 2003
+++ ZODB3/ZEO/ClientStub.py Fri Jun 13 17:56:37 2003
@@ -53,7 +53,7 @@
self.rpc.callAsync('endVerify')
def invalidateTransaction(self, tid, args):
- self.rpc.callAsync('invalidateTransaction', tid, args)
+ self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)