[Zope3-checkins] CVS: ZODB4/src/zodb/zeo - client.py:1.12
Jeremy Hylton
jeremy@zope.com
Thu, 22 May 2003 11:28:18 -0400
Update of /cvs-repository/ZODB4/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv17016/zeo
Modified Files:
client.py
Log Message:
Port sundry improvements from ZODB 3.2; fix invalidation.
isReadOnly() returns True for a read-only fallback connection.
Use new atomic invalidation API.
=== ZODB4/src/zodb/zeo/client.py 1.11 => 1.12 ===
--- ZODB4/src/zodb/zeo/client.py:1.11 Mon May 19 11:02:51 2003
+++ ZODB4/src/zodb/zeo/client.py Thu May 22 11:28:17 2003
@@ -196,7 +196,10 @@
self._pending_server = None
self._ready = threading.Event()
+ # _is_read_only stores the constructor argument
self._is_read_only = read_only
+ # _conn_is_read_only stores the status of the current connection
+ self._conn_is_read_only = 0
self._storage = storage
self._read_only_fallback = read_only_fallback
# _server_addr is used by sortKey()
@@ -247,12 +250,6 @@
tmax=max_disconnect_poll)
if wait:
- self._rpc_mgr.connect(sync=1)
- else:
- if not self._rpc_mgr.attempt_connect():
- self._rpc_mgr.connect()
-
- if wait:
self._wait()
else:
# attempt_connect() will make an attempt that doesn't block
@@ -279,18 +276,21 @@
break
self.logger.warn("Wait for cache verification to finish")
else:
- # If there is no mainloop running, this code needs
- # to call poll() to cause asyncore to handle events.
- while 1:
- if self._ready.isSet():
- break
- self.logger.warn("Wait for cache verification to finish")
- if self._connection is None:
- # If the connection was closed while we were
- # waiting for it to become ready, start over.
- return self._wait()
- else:
- self._connection.pending(30)
+ self._wait_sync()
+
+ def _wait_sync(self):
+ # If there is no mainloop running, this code needs
+ # to call poll() to cause asyncore to handle events.
+ while 1:
+ if self._ready.isSet():
+ break
+ self.logger.warn("Wait for cache verification to finish")
+ if self._connection is None:
+ # If the connection was closed while we were
+ # waiting for it to become ready, start over.
+ return self._wait()
+ else:
+ self._connection.pending(30)
def close(self):
"""Storage API: finalize the storage, releasing external resources."""
@@ -353,6 +353,7 @@
"""
self.logger.warn("Testing connection %r", conn)
# XXX Check the protocol version here?
+ self._conn_is_read_only = 0
stub = self.StorageServerStubClass(conn)
try:
stub.register(str(self._storage), self._is_read_only)
@@ -363,6 +364,7 @@
self.logger.warn(
"Got ReadOnlyError; trying again with read_only=1")
stub.register(str(self._storage), read_only=1)
+ self._conn_is_read_only = 1
return 0
def notifyConnected(self, conn):
@@ -401,6 +403,9 @@
self._info.update(stub.get_info())
self.update_interfaces()
self.verify_cache(stub)
+ if not conn.is_async():
+ self.logger.warn("Waiting for cache verification to finish")
+ self._wait_sync()
def update_interfaces(self):
# Update instance's __implements__ based on the server.
@@ -532,13 +537,15 @@
return self._info['extensionMethods']
def isReadOnly(self):
- """Storage API: return whether we are in read-only mode.
-
- XXX In read-only fallback mode, this returns false, even if we
- are currently connected to a read-only server.
- """
- return self._is_read_only
-
+ """Storage API: return whether we are in read-only mode."""
+ if self._is_read_only:
+ return 1
+ else:
+ # If the client is configured for a read-write connection
+ # but has a read-only fallback connection, _conn_is_read_only
+ # will be True.
+ return self._conn_is_read_only
+
# XXX version should really be part of _info
def getVersion(self):
@@ -880,44 +887,63 @@
return
self._pickler.dump(args)
+ def _process_invalidations(self, invs):
+ # Invalidations are sent by the ZEO server as a sequence of
+ # oid, version pairs. The DB's invalidate() method expects a
+ # dictionary of oids.
+
+ # versions maps version names to dictionary of invalidations
+ versions = {}
+ for oid, version in invs:
+ d = versions.setdefault(version, {})
+ self._cache.invalidate(oid, version=version)
+ d[oid] = 1
+ if self._db is not None:
+ for v, d in versions.items():
+ self._db.invalidate(d, version=v)
+
def endVerify(self):
"""Server callback to signal end of cache validation."""
if self._pickler is None:
return
- self._pickler.dump((0,0))
+ # write end-of-data marker
+ self._pickler.dump((None, None))
self._pickler = None
self._tfile.seek(0)
- unpick = cPickle.Unpickler(self._tfile)
f = self._tfile
self._tfile = None
-
- while 1:
- oid, version = unpick.load()
- self.logger.debug("verify invalidate %r", oid)
- if not oid:
- break
- self._cache.invalidate(oid, version=version)
- if self._db is not None:
- self._db.invalidate(oid, version=version)
+ self._process_invalidations(InvalidationLogIterator(f))
f.close()
+ self.logger.info("endVerify finishing")
self._server = self._pending_server
self._ready.set()
self._pending_conn = None
- self.logger.debug("verification finished")
+ self.logger.info("endVerify finished")
def invalidateTransaction(self, tid, args):
"""Invalidate objects modified by tid."""
self._cache.setLastTid(tid)
if self._pickler is not None:
- self.logger.info("Transactional invalidation "
- "during cache verification")
+ self.logger.debug(
+ "Transactional invalidation during cache verification")
for t in args:
self.self._pickler.dump(t)
return
- db = self._db
- for oid, version in args:
- self._cache.invalidate(oid, version=version)
- if db is not None:
- db.invalidate(oid, version=version)
+ self._process_invalidations(args)
+
+class InvalidationLogIterator:
+ """Helper class for reading invalidations in endVerify."""
+
+ def __init__(self, fileobj):
+ self._unpickler = cPickle.Unpickler(fileobj)
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ oid, version = self._unpickler.load()
+ if oid is None:
+ raise StopIteration
+ return oid, version