[Zope3-checkins] CVS: ZODB4/src/zodb/zeo - client.py:1.12

Jeremy Hylton jeremy@zope.com
Thu, 22 May 2003 11:28:18 -0400


Update of /cvs-repository/ZODB4/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv17016/zeo

Modified Files:
	client.py 
Log Message:
Port sundry improvements from ZODB 3.2; fix invalidation.

isReadOnly() returns True for a read-only fallback connection.
Use new atomic invalidation API.


=== ZODB4/src/zodb/zeo/client.py 1.11 => 1.12 ===
--- ZODB4/src/zodb/zeo/client.py:1.11	Mon May 19 11:02:51 2003
+++ ZODB4/src/zodb/zeo/client.py	Thu May 22 11:28:17 2003
@@ -196,7 +196,10 @@
         self._pending_server = None
         self._ready = threading.Event()
 
+        # _is_read_only stores the constructor argument
         self._is_read_only = read_only
+        # _conn_is_read_only stores the status of the current connection
+        self._conn_is_read_only = 0
         self._storage = storage
         self._read_only_fallback = read_only_fallback
         # _server_addr is used by sortKey()
@@ -247,12 +250,6 @@
                                                     tmax=max_disconnect_poll)
 
         if wait:
-            self._rpc_mgr.connect(sync=1)
-        else:
-            if not self._rpc_mgr.attempt_connect():
-                self._rpc_mgr.connect()
-
-        if wait:
             self._wait()
         else:
             # attempt_connect() will make an attempt that doesn't block
@@ -279,18 +276,21 @@
                     break
                 self.logger.warn("Wait for cache verification to finish")
         else:
-            # If there is no mainloop running, this code needs
-            # to call poll() to cause asyncore to handle events.
-            while 1:
-                if self._ready.isSet():
-                    break
-                self.logger.warn("Wait for cache verification to finish")
-                if self._connection is None:
-                    # If the connection was closed while we were
-                    # waiting for it to become ready, start over.
-                    return self._wait()
-                else:
-                    self._connection.pending(30)
+            self._wait_sync()
+
+    def _wait_sync(self):
+        # If there is no mainloop running, this code needs
+        # to call poll() to cause asyncore to handle events.
+        while 1:
+            if self._ready.isSet():
+                break
+            self.logger.warn("Wait for cache verification to finish")
+            if self._connection is None:
+                # If the connection was closed while we were
+                # waiting for it to become ready, start over.
+                return self._wait()
+            else:
+                self._connection.pending(30)
 
     def close(self):
         """Storage API: finalize the storage, releasing external resources."""
@@ -353,6 +353,7 @@
         """
         self.logger.warn("Testing connection %r", conn)
         # XXX Check the protocol version here?
+        self._conn_is_read_only = 0
         stub = self.StorageServerStubClass(conn)
         try:
             stub.register(str(self._storage), self._is_read_only)
@@ -363,6 +364,7 @@
             self.logger.warn(
                 "Got ReadOnlyError; trying again with read_only=1")
             stub.register(str(self._storage), read_only=1)
+            self._conn_is_read_only = 1
             return 0
 
     def notifyConnected(self, conn):
@@ -401,6 +403,9 @@
         self._info.update(stub.get_info())
         self.update_interfaces()
         self.verify_cache(stub)
+        if not conn.is_async():
+            self.logger.warn("Waiting for cache verification to finish")
+            self._wait_sync()
 
     def update_interfaces(self):
         # Update instance's __implements__ based on the server.
@@ -532,13 +537,15 @@
         return self._info['extensionMethods']
 
     def isReadOnly(self):
-        """Storage API: return whether we are in read-only mode.
-
-        XXX In read-only fallback mode, this returns false, even if we
-        are currently connected to a read-only server.
-        """
-        return self._is_read_only
-
+        """Storage API: return whether we are in read-only mode."""
+        if self._is_read_only:
+            return 1
+        else:
+            # If the client is configured for a read-write connection
+            # but has a read-only fallback connection, _conn_is_read_only
+            # will be True.
+            return self._conn_is_read_only
+        
     # XXX version should really be part of _info
 
     def getVersion(self):
@@ -880,44 +887,63 @@
             return
         self._pickler.dump(args)
 
+    def _process_invalidations(self, invs):
+        # Invalidations are sent by the ZEO server as a sequence of
+        # oid, version pairs.  The DB's invalidate() method expects a
+        # dictionary of oids.
+        
+        # versions maps version names to dictionary of invalidations
+        versions = {}
+        for oid, version in invs:
+            d = versions.setdefault(version, {})
+            self._cache.invalidate(oid, version=version)
+            d[oid] = 1
+        if self._db is not None:
+            for v, d in versions.items():
+                self._db.invalidate(d, version=v)
+                
     def endVerify(self):
         """Server callback to signal end of cache validation."""
         if self._pickler is None:
             return
-        self._pickler.dump((0,0))
+        # write end-of-data marker
+        self._pickler.dump((None, None))
         self._pickler = None
         self._tfile.seek(0)
-        unpick = cPickle.Unpickler(self._tfile)
         f = self._tfile
         self._tfile = None
-
-        while 1:
-            oid, version = unpick.load()
-            self.logger.debug("verify invalidate %r", oid)
-            if not oid:
-                break
-            self._cache.invalidate(oid, version=version)
-            if self._db is not None:
-                self._db.invalidate(oid, version=version)
+        self._process_invalidations(InvalidationLogIterator(f))
         f.close()
 
+        self.logger.info("endVerify finishing")
         self._server = self._pending_server
         self._ready.set()
         self._pending_conn = None
-        self.logger.debug("verification finished")
+        self.logger.info("endVerify finished")
 
     def invalidateTransaction(self, tid, args):
         """Invalidate objects modified by tid."""
         self._cache.setLastTid(tid)
         if self._pickler is not None:
-            self.logger.info("Transactional invalidation "
-                             "during cache verification")
+            self.logger.debug(
+                "Transactional invalidation during cache verification")
             for t in args:
                 self.self._pickler.dump(t)
             return
-        db = self._db
-        for oid, version in args:
-            self._cache.invalidate(oid, version=version)
-            if db is not None:
-                db.invalidate(oid, version=version)
+        self._process_invalidations(args)
+
+class InvalidationLogIterator:
+    """Helper class for reading invalidations in endVerify."""
+
+    def __init__(self, fileobj):
+        self._unpickler = cPickle.Unpickler(fileobj)
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        oid, version = self._unpickler.load()
+        if oid is None:
+            raise StopIteration
+        return oid, version