[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.110.2.2

Jeremy Hylton cvs-admin at zope.org
Wed Nov 5 23:41:55 EST 2003


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv29646/ZEO

Modified Files:
      Tag: ZODB3-mvcc-2-branch
	ClientStorage.py 
Log Message:
Use the new, partial cache implementation.

This change leaves several failing tests, because the cache isn't
persistent.  It's just stored in a few dicts in memory.  There are
four ZEO tests that depend on the cache being persistent, and they all fail:
    checkBasicPersistence
    checkRollover
    checkQuickVerificationWith2Clients
    checkVerificationWith2ClientsInvqOverflow

The new cache implementation requires that the cache verification
protocols pass tids in addition to oids.  The new code does that,
except that it passes None for tid on the cache verification.  I have
to make sure the implications of that are fully worked out.


=== ZODB3/ZEO/ClientStorage.py 1.110.2.1 => 1.110.2.2 ===
--- ZODB3/ZEO/ClientStorage.py:1.110.2.1	Thu Oct  9 16:27:25 2003
+++ ZODB3/ZEO/ClientStorage.py	Wed Nov  5 23:41:54 2003
@@ -26,7 +26,8 @@
 import time
 import types
 
-from ZEO import ClientCache, ServerStub
+from ZEO import ServerStub
+from ZEO.cache import Cache
 from ZEO.TransactionBuffer import TransactionBuffer
 from ZEO.Exceptions import ClientStorageError, UnrecognizedResult, \
      ClientDisconnected, AuthError
@@ -91,7 +92,7 @@
     # Classes we instantiate.  A subclass might override.
 
     TransactionBufferClass = TransactionBuffer
-    ClientCacheClass = ClientCache.ClientCache
+    ClientCacheClass = Cache
     ConnectionManagerClass = ConnectionManager
     StorageServerStubClass = ServerStub.StorageServer
 
@@ -297,8 +298,7 @@
         self._oid = '\0\0\0\0\0\0\0\0'
 
         # Decide whether to use non-temporary files
-        self._cache = self.ClientCacheClass(storage, cache_size,
-                                            client=client, var=var)
+        self._cache = self.ClientCacheClass()
 
         self._rpc_mgr = self.ConnectionManagerClass(addr, self,
                                                     tmin=min_disconnect_poll,
@@ -312,9 +312,6 @@
             # doesn't succeed, call connect() to start a thread.
             if not self._rpc_mgr.attempt_connect():
                 self._rpc_mgr.connect()
-            # If the connect hasn't occurred, run with cached data.
-            if not self._ready.isSet():
-                self._cache.open()
 
     def _wait(self, timeout=None):
         if timeout is not None:
@@ -549,7 +546,6 @@
             if ltid == last_inval_tid:
                 log2(INFO, "No verification necessary "
                      "(last_inval_tid up-to-date)")
-                self._cache.open()
                 self._server = server
                 self._ready.set()
                 return "no verification"
@@ -563,7 +559,6 @@
             pair = server.getInvalidations(last_inval_tid)
             if pair is not None:
                 log2(INFO, "Recovering %d invalidations" % len(pair[1]))
-                self._cache.open()
                 self.invalidateTransaction(*pair)
                 self._server = server
                 self._ready.set()
@@ -575,7 +570,9 @@
         self._pickler = cPickle.Pickler(self._tfile, 1)
         self._pickler.fast = 1 # Don't use the memo
 
-        self._cache.verify(server.zeoVerify)
+        # XXX should batch these operations for efficiency
+        for oid, version, serial in self._cache.contents():
+            server.verify(oid, version, serial)
         self._pending_server = server
         server.endZeoVerify()
         return "full verification"
@@ -739,25 +736,19 @@
             finally:
                 self._lock.release()
 
-            p, s, v, pv, sv = self._server.zeoLoad(oid)
+            data, serial, tid = self._server.loadEx(oid, version)
 
             self._lock.acquire()    # for atomic processing of invalidations
             try:
                 if self._load_status:
-                    self._cache.checkSize(0)
-                    self._cache.store(oid, p, s, v, pv, sv)
+                    self._cache.store(oid, version, serial, tid, None, data)
                 self._load_oid = None
             finally:
                 self._lock.release()
         finally:
             self._load_lock.release()
 
-        if v and version and v == version:
-            return pv, sv
-        else:
-            if s:
-                return p, s
-            raise KeyError, oid # no non-version data for this
+        return data, serial
 
     def modifiedInVersion(self, oid):
         """Storage API: return the version, if any, that modfied an object.
@@ -917,7 +908,7 @@
 
             self._lock.acquire()  # for atomic processing of invalidations
             try:
-                self._update_cache()
+                self._update_cache(tid)
                 if f is not None:
                     f(tid)
             finally:
@@ -930,7 +921,7 @@
             self._load_lock.release()
             self.end_transaction()
 
-    def _update_cache(self):
+    def _update_cache(self, tid):
         """Internal helper to handle objects modified by a transaction.
 
         This iterates over the objects in the transaction buffer and
@@ -943,7 +934,6 @@
         if self._cache is None:
             return
 
-        self._cache.checkSize(self._tbuf.get_size())
         try:
             self._tbuf.begin_iterate()
         except ValueError, msg:
@@ -959,15 +949,13 @@
                     "client storage: %s" % msg)
             if t is None:
                 break
-            oid, v, p = t
-            if p is None: # an invalidation
-                s = None
-            else:
+            oid, version, data = t
+            self._cache.invalidate(oid, version, tid)
+            # If data is None, we just invalidate.
+            if data is not None:
                 s = self._seriald[oid]
-            if s == ResolvedSerial or s is None:
-                self._cache.invalidate(oid, v)
-            else:
-                self._cache.update(oid, s, v, p)
+                if s != ResolvedSerial:
+                    self._cache.store(oid, version, s, tid, None, data)
         self._tbuf.clear()
 
     def transactionalUndo(self, trans_id, trans):
@@ -985,19 +973,6 @@
             self._tbuf.invalidate(oid, '')
         return oids
 
-    def undo(self, transaction_id):
-        """Storage API: undo a transaction, writing directly to the storage."""
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-        oids = self._server.undo(transaction_id)
-        self._lock.acquire()
-        try:
-            for oid in oids:
-                self._cache.invalidate(oid, '')
-        finally:
-            self._lock.release()
-        return oids
-
     def undoInfo(self, first=0, last=-20, specification=None):
         """Storage API: return undo information."""
         return self._server.undoInfo(first, last, specification)
@@ -1044,7 +1019,7 @@
             return
         self._pickler.dump(args)
 
-    def _process_invalidations(self, tid, invs):
+    def _process_invalidations(self, invs):
         # Invalidations are sent by the ZEO server as a sequence of
         # oid, version pairs.  The DB's invalidate() method expects a
         # dictionary of oids.
@@ -1053,15 +1028,15 @@
         try:
             # versions maps version names to dictionary of invalidations
             versions = {}
-            for oid, version in invs:
+            for oid, version, tid in invs:
                 if oid == self._load_oid:
                     self._load_status = 0
-                self._cache.invalidate(oid, version=version)
-                versions.setdefault(version, {})[oid] = 1
+                self._cache.invalidate(oid, version, tid)
+                versions.setdefault((version, tid), {})[oid] = tid
 
             if self._db is not None:
-                for v, d in versions.items():
-                    self._db.invalidate(tid, d, version=v)
+                for (version, tid), d in versions.items():
+                    self._db.invalidate(tid, d, version=version)
         finally:
             self._lock.release()
 
@@ -1093,7 +1068,8 @@
             for t in args:
                 self._pickler.dump(t)
             return
-        self._process_invalidations(tid, args)
+        self._process_invalidations([(oid, version, tid)
+                                     for oid, version in args])
 
     # The following are for compatibility with protocol version 2.0.0
 
@@ -1104,36 +1080,10 @@
     end = endVerify
     Invalidate = invalidateTrans
 
-try:
-    StopIteration
-except NameError:
-    class StopIteration(Exception):
-        pass
-
-class InvalidationLogIterator:
-    """Helper class for reading invalidations in endVerify."""
-
-    def __init__(self, fileobj):
-        self._unpickler = cPickle.Unpickler(fileobj)
-        self.getitem_i = 0
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        oid, version = self._unpickler.load()
+def InvalidationLogIterator(fileobj):
+    unpickler = cPickle.Unpickler(fileobj)
+    while 1:
+        oid, version = unpickler.load()
         if oid is None:
-            raise StopIteration
-        return oid, version
-
-    # The __getitem__() method is needed to support iteration
-    # in Python 2.1.
-
-    def __getitem__(self, i):
-        assert i == self.getitem_i
-        try:
-            obj = self.next()
-        except StopIteration:
-            raise IndexError, i
-        self.getitem_i += 1
-        return obj
+            break
+        yield oid, version, None




More information about the Zope-Checkins mailing list