[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.73.2.23.2.5
Jeremy Hylton
jeremy@zope.com
Wed, 11 Jun 2003 14:42:54 -0400
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv824
Modified Files:
Tag: tim-loading_oids_status-branch
ClientStorage.py
Log Message:
Refactor handling of invalidation during load.
Keep count and set of invalidations in separate dicts keyed by oid.
=== ZODB3/ZEO/ClientStorage.py 1.73.2.23.2.4 => 1.73.2.23.2.5 ===
--- ZODB3/ZEO/ClientStorage.py:1.73.2.23.2.4 Wed Jun 11 12:34:39 2003
+++ ZODB3/ZEO/ClientStorage.py Wed Jun 11 14:42:54 2003
@@ -265,16 +265,16 @@
# There's a nasty race. The ZRPC layer can deliver invalidations
# out of order (i.e., the server sends the result of a load, then
# sends an invalidation for that object, but we see the invalidation
- # first). To worm around this, load() stores an (oid, version)
- # pair for the requested object in the _loading_oids_status dict,
- # mapping to a
- # (count, status)
- # pair. The count is the number of load() calls in progress
- # that have requested this (oid, version) pair. status is
- # initially True, and a helper method of invalidateTrans() sets
- # it False to record the invalidation. load() then uses the
- # status. Note: mutations are protected by self._lock.
- self._loading_oids_status = {}
+ # first). To worm around this, load() creates a dict in
+ # _loading_oid_invs and invalidateTrans() stores any arriving
+ # invalidations in that dict. When the zeoLoad() call returns
+ # from the server, load() handles the invalidations.
+ # It's possible for different threads to attempt to load the same
+ # oid at the same time. To account for this, we keep a counter
+ # in _loading_oid_count. Invalidations are only handled when
+ # the count reaches zero.
+ self._loading_oid_count = {}
+ self._loading_oid_invs = {}
# Can't read data in one thread while writing data
# (tpc_finish) in another thread. In general, the lock
@@ -628,29 +628,28 @@
if self._server is None:
raise ClientDisconnected()
- self._incLoadStatus(oid, version)
+ self._incLoadStatus(oid)
try:
p, s, v, pv, sv = self._server.zeoLoad(oid)
except:
self._lock.acquire()
try:
- self._decLoadStatus(oid, version)
+ self._decLoadStatus(oid)
finally:
self._lock.release()
raise
self._lock.acquire()
try:
- statv, statnv = self._decLoadStatus(oid, version)
- if statv and statnv:
- self._cache.checkSize(0)
- self._cache.store(oid, p, s, v, pv, sv)
- else:
- if not statv:
- self._cache.invalidate(oid, version)
- if not statnv:
- self._cache.invalidate(oid, '')
+ is_last, invs = self._decLoadStatus(oid)
+ if is_last:
+ if invs:
+ for v in invs:
+ self._cache.invalidate(oid, v)
+ else:
+ self._cache.checkSize(0)
+ self._cache.store(oid, p, s, v, pv, sv)
finally:
self._lock.release()
@@ -661,45 +660,41 @@
return p, s
raise KeyError, oid # no non-version data for this
- def _incLoadStatus(self, oid, version):
+ def _incLoadStatus(self, oid):
"""Increment the load count for oid, version pair.
Does its own locking.
"""
- pairs = (oid, version), (oid, "")
self._lock.acquire()
try:
- for pair in pairs:
- stuff = self._loading_oids_status.get(pair)
- if stuff is None:
- stuff = 1, 1 # count 1, status True
- else:
- count, status = stuff
- stuff = count + 1, status
- self._loading_oids_status[pair] = stuff
+ count = self._loading_oid_count.get(oid)
+ if count is None:
+ count = 0
+ self._loading_oid_invs[oid] = {}
+ count += 1
+ self._loading_oid_count[oid] = count
finally:
self._lock.release()
- def _decLoadStatus(self, oid, version):
- """Decrement load count and return statii.
-
- Statii is a 2-element list of booleans indicating whether
- the object, version was invalidated during the load. The
- first element is oid, version; the second element is oid, "".
+ def _decLoadStatus(self, oid):
+ """Decrement load count.
+ Return boolean indicating whether this was the last load and
+ a list of versions to invalidate. The list is empty unless
+ the boolean is True.
+
Caller must hold self._lock.
"""
- statii = []
- pairs = (oid, version), (oid, "")
- for pair in pairs:
- count, status = self._loading_oids_status[pair]
- statii.append(status)
- count -= 1
- if count:
- self._loading_oids_status[pair] = count, status
- else:
- del self._loading_oids_status[pair]
- return statii
+ count = self._loading_oid_count[oid]
+ count -= 1
+ if count:
+ self._loading_oid_count[oid] = count
+ return 0, []
+ else:
+ del self._loading_oid_count[oid]
+ d = self._loading_oid_invs[oid]
+ del self._loading_oid_invs[oid]
+ return 1, d.keys()
def modifiedInVersion(self, oid):
"""Storage API: return the version, if any, that modfied an object.
@@ -997,17 +992,14 @@
d = versions.setdefault(version, {})
d[oid] = 1
- # Set invalidation flag for this (oid, version) pair.
- stuff = self._loading_oids_status.get(pair)
- if stuff:
- # load() is waiting for this. Keep the load count
- # the same, but clear the "no invalidations seen"
- # flag. This trick is necessary because zrpc invokes
- # invalidateTrans() immediately when an invalidation
- # arrives, but may not deliver the result of a load
- # until later, even if the load request preceded
- # the invalidation msg.
- self._loading_oids_status[pair] = stuff[0], 0
+ # Update the _loading_oids_invs dict for this oid,
+ # if necessary.
+ d = self._loading_oids_invs.get(oid)
+ if d is not None:
+ # load() is waiting for this. Mark the version
+ # as invalidated, so that load can invalidate it
+ # later.
+ d[version] = 1
else:
# load() isn't waiting for this. Simply invalidate it.
self._cache.invalidate(oid, version=version)