[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.110.2.2
Jeremy Hylton
cvs-admin at zope.org
Wed Nov 5 23:41:55 EST 2003
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv29646/ZEO
Modified Files:
Tag: ZODB3-mvcc-2-branch
ClientStorage.py
Log Message:
Use the new, partial cache implementation.
This change leaves several tests failing, because the cache isn't
persistent. It's just stored in a few dicts in memory. There are
four ZEO tests that depend on the cache being persistent, and they all fail:
checkBasicPersistence
checkRollover
checkQuickVerificationWith2Clients
checkVerificationWith2ClientsInvqOverflow
The new cache implementation requires that the cache verification
protocols pass tids in addition to oids. The new code does that,
except that it passes None for tid on the cache verification. I have
to make sure the implications of that are fully worked out.
=== ZODB3/ZEO/ClientStorage.py 1.110.2.1 => 1.110.2.2 ===
--- ZODB3/ZEO/ClientStorage.py:1.110.2.1 Thu Oct 9 16:27:25 2003
+++ ZODB3/ZEO/ClientStorage.py Wed Nov 5 23:41:54 2003
@@ -26,7 +26,8 @@
import time
import types
-from ZEO import ClientCache, ServerStub
+from ZEO import ServerStub
+from ZEO.cache import Cache
from ZEO.TransactionBuffer import TransactionBuffer
from ZEO.Exceptions import ClientStorageError, UnrecognizedResult, \
ClientDisconnected, AuthError
@@ -91,7 +92,7 @@
# Classes we instantiate. A subclass might override.
TransactionBufferClass = TransactionBuffer
- ClientCacheClass = ClientCache.ClientCache
+ ClientCacheClass = Cache
ConnectionManagerClass = ConnectionManager
StorageServerStubClass = ServerStub.StorageServer
@@ -297,8 +298,7 @@
self._oid = '\0\0\0\0\0\0\0\0'
# Decide whether to use non-temporary files
- self._cache = self.ClientCacheClass(storage, cache_size,
- client=client, var=var)
+ self._cache = self.ClientCacheClass()
self._rpc_mgr = self.ConnectionManagerClass(addr, self,
tmin=min_disconnect_poll,
@@ -312,9 +312,6 @@
# doesn't succeed, call connect() to start a thread.
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
- # If the connect hasn't occurred, run with cached data.
- if not self._ready.isSet():
- self._cache.open()
def _wait(self, timeout=None):
if timeout is not None:
@@ -549,7 +546,6 @@
if ltid == last_inval_tid:
log2(INFO, "No verification necessary "
"(last_inval_tid up-to-date)")
- self._cache.open()
self._server = server
self._ready.set()
return "no verification"
@@ -563,7 +559,6 @@
pair = server.getInvalidations(last_inval_tid)
if pair is not None:
log2(INFO, "Recovering %d invalidations" % len(pair[1]))
- self._cache.open()
self.invalidateTransaction(*pair)
self._server = server
self._ready.set()
@@ -575,7 +570,9 @@
self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo
- self._cache.verify(server.zeoVerify)
+ # XXX should batch these operations for efficiency
+ for oid, version, serial in self._cache.contents():
+ server.verify(oid, version, serial)
self._pending_server = server
server.endZeoVerify()
return "full verification"
@@ -739,25 +736,19 @@
finally:
self._lock.release()
- p, s, v, pv, sv = self._server.zeoLoad(oid)
+ data, serial, tid = self._server.loadEx(oid, version)
self._lock.acquire() # for atomic processing of invalidations
try:
if self._load_status:
- self._cache.checkSize(0)
- self._cache.store(oid, p, s, v, pv, sv)
+ self._cache.store(oid, version, serial, tid, None, data)
self._load_oid = None
finally:
self._lock.release()
finally:
self._load_lock.release()
- if v and version and v == version:
- return pv, sv
- else:
- if s:
- return p, s
- raise KeyError, oid # no non-version data for this
+ return data, serial
def modifiedInVersion(self, oid):
"""Storage API: return the version, if any, that modfied an object.
@@ -917,7 +908,7 @@
self._lock.acquire() # for atomic processing of invalidations
try:
- self._update_cache()
+ self._update_cache(tid)
if f is not None:
f(tid)
finally:
@@ -930,7 +921,7 @@
self._load_lock.release()
self.end_transaction()
- def _update_cache(self):
+ def _update_cache(self, tid):
"""Internal helper to handle objects modified by a transaction.
This iterates over the objects in the transaction buffer and
@@ -943,7 +934,6 @@
if self._cache is None:
return
- self._cache.checkSize(self._tbuf.get_size())
try:
self._tbuf.begin_iterate()
except ValueError, msg:
@@ -959,15 +949,13 @@
"client storage: %s" % msg)
if t is None:
break
- oid, v, p = t
- if p is None: # an invalidation
- s = None
- else:
+ oid, version, data = t
+ self._cache.invalidate(oid, version, tid)
+ # If data is None, we just invalidate.
+ if data is not None:
s = self._seriald[oid]
- if s == ResolvedSerial or s is None:
- self._cache.invalidate(oid, v)
- else:
- self._cache.update(oid, s, v, p)
+ if s != ResolvedSerial:
+ self._cache.store(oid, version, s, tid, None, data)
self._tbuf.clear()
def transactionalUndo(self, trans_id, trans):
@@ -985,19 +973,6 @@
self._tbuf.invalidate(oid, '')
return oids
- def undo(self, transaction_id):
- """Storage API: undo a transaction, writing directly to the storage."""
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- oids = self._server.undo(transaction_id)
- self._lock.acquire()
- try:
- for oid in oids:
- self._cache.invalidate(oid, '')
- finally:
- self._lock.release()
- return oids
-
def undoInfo(self, first=0, last=-20, specification=None):
"""Storage API: return undo information."""
return self._server.undoInfo(first, last, specification)
@@ -1044,7 +1019,7 @@
return
self._pickler.dump(args)
- def _process_invalidations(self, tid, invs):
+ def _process_invalidations(self, invs):
# Invalidations are sent by the ZEO server as a sequence of
# oid, version pairs. The DB's invalidate() method expects a
# dictionary of oids.
@@ -1053,15 +1028,15 @@
try:
# versions maps version names to dictionary of invalidations
versions = {}
- for oid, version in invs:
+ for oid, version, tid in invs:
if oid == self._load_oid:
self._load_status = 0
- self._cache.invalidate(oid, version=version)
- versions.setdefault(version, {})[oid] = 1
+ self._cache.invalidate(oid, version, tid)
+ versions.setdefault((version, tid), {})[oid] = tid
if self._db is not None:
- for v, d in versions.items():
- self._db.invalidate(tid, d, version=v)
+ for (version, tid), d in versions.items():
+ self._db.invalidate(tid, d, version=version)
finally:
self._lock.release()
@@ -1093,7 +1068,8 @@
for t in args:
self._pickler.dump(t)
return
- self._process_invalidations(tid, args)
+ self._process_invalidations([(oid, version, tid)
+ for oid, version in args])
# The following are for compatibility with protocol version 2.0.0
@@ -1104,36 +1080,10 @@
end = endVerify
Invalidate = invalidateTrans
-try:
- StopIteration
-except NameError:
- class StopIteration(Exception):
- pass
-
-class InvalidationLogIterator:
- """Helper class for reading invalidations in endVerify."""
-
- def __init__(self, fileobj):
- self._unpickler = cPickle.Unpickler(fileobj)
- self.getitem_i = 0
-
- def __iter__(self):
- return self
-
- def next(self):
- oid, version = self._unpickler.load()
+def InvalidationLogIterator(fileobj):
+ unpickler = cPickle.Unpickler(fileobj)
+ while 1:
+ oid, version = unpickler.load()
if oid is None:
- raise StopIteration
- return oid, version
-
- # The __getitem__() method is needed to support iteration
- # in Python 2.1.
-
- def __getitem__(self, i):
- assert i == self.getitem_i
- try:
- obj = self.next()
- except StopIteration:
- raise IndexError, i
- self.getitem_i += 1
- return obj
+ break
+ yield oid, version, None
More information about the Zope-Checkins
mailing list