[Zodb-checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.83 ServerStub.py:1.12
Jeremy Hylton
jeremy@zope.com
Tue, 14 Jan 2003 14:09:06 -0500
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv17853
Modified Files:
ClientStorage.py ServerStub.py
Log Message:
Prevent client from using stale cache data while connecting.
XXX Maybe there should be an option to allow this.
A ZEO client can run in disconnected mode, using data from
its cache, or in connected mode. Several instance variables
are related to whether the client is connected.
_server: All method calls are invoked through the server
stub. When not connect, set to disconnected_stub an
object that raises ClientDisconnected errors.
_ready: A threading Event that is set only if _server
is set to a real stub.
_connection: The current zrpc connection or None.
_connection is set as soon as a connection is established,
but _server is set only after cache verification has finished
and clients can safely use the server. _pending_server holds
a server stub while it is being verified.
Before this change, a client could start using a connection before
verification finished. If verification took a long time, it could
even commit a new transaction using a mixing of old and new data.
=== ZODB3/ZEO/ClientStorage.py 1.82 => 1.83 ===
--- ZODB3/ZEO/ClientStorage.py:1.82 Tue Jan 7 17:34:06 2003
+++ ZODB3/ZEO/ClientStorage.py Tue Jan 14 14:08:33 2003
@@ -202,11 +202,33 @@
wait = 1
self._addr = addr # For tests
+
+ # A ZEO client can run in disconnected mode, using data from
+ # its cache, or in connected mode. Several instance variables
+ # are related to whether the client is connected.
+
+ # _server: All method calls are invoked through the server
+ # stub. When not connect, set to disconnected_stub an
+ # object that raises ClientDisconnected errors.
+
+ # _ready: A threading Event that is set only if _server
+ # is set to a real stub.
+
+ # _connection: The current zrpc connection or None.
+
+ # _connection is set as soon as a connection is established,
+ # but _server is set only after cache verification has finished
+ # and clients can safely use the server. _pending_server holds
+ # a server stub while it is being verified.
+
self._server = disconnected_stub
+ self._connection = None
+ self._pending_server = None
+ self._ready = threading.Event()
+
self._is_read_only = read_only
self._storage = storage
self._read_only_fallback = read_only_fallback
- self._connection = None
# _server_addr is used by sortKey()
self._server_addr = None
self._tfile = None
@@ -257,15 +279,45 @@
tmax=max_disconnect_poll)
if wait:
- self._rpc_mgr.connect(sync=1)
+ self._wait()
else:
+ # attempt_connect() will make an attempt that doesn't block
+ # "too long," for a very vague notion of too long. If that
+ # doesn't succeed, call connect() to start a thread.
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
+ # If the connect hasn't occurred, run with cached data.
+ if not self._ready.isSet():
+ self._cache.open()
- # If we're connected at this point, the cache is opened as a
- # side effect of verify_cache(). If not, open it now.
- if not self.is_connected():
- self._cache.open()
+ def _wait(self):
+ # Wait for a connection to be established.
+ self._rpc_mgr.connect(sync=1)
+ # When a synchronous connect() call returns, there is
+ # a valid _connection object but cache validation may
+ # still be going on. This code must wait until validation
+ # finishes, but if the connection isn't a zrpc async
+ # connection it also needs to poll for input.
+ if self._connection.is_async():
+ while 1:
+ self._ready.wait(30)
+ if self._ready.isSet():
+ break
+ log2(INFO, "Waiting to connect to server")
+ else:
+ # If there is no mainloop running, this code needs
+ # to call poll() to cause asyncore to handle events.
+ while 1:
+ cn = self._connection
+ if cn is None:
+ # If the connection was closed while we were
+ # waiting for it to become ready, start over.
+ return self._wait()
+ else:
+ cn.pending(30)
+ if self._ready.isSet():
+ break
+ log2(INFO, "Waiting to connect to server")
def close(self):
"""Storage API: finalize the storage, releasing external resources."""
@@ -288,17 +340,22 @@
def is_connected(self):
"""Return whether the storage is currently connected to a server."""
- if self._server is disconnected_stub:
- return 0
- else:
- return 1
+ # This function is used by clients, so we only report that a
+ # connection exists when the connection is ready to use.
+ return self._ready.isSet()
def sync(self):
"""Handle any pending invalidation messages.
This is called by the sync method in ZODB.Connection.
"""
- self._server._update()
+ # If there is no connection, return immediately. Technically,
+ # there are no pending invalidations so they are all handled.
+ # There doesn't seem to be much benefit to raising an exception.
+
+ cn = self._connection
+ if cn is not None:
+ cn.pending()
def testConnection(self, conn):
"""Internal: test the given connection.
@@ -346,23 +403,24 @@
else:
reconnect = 0
self.set_server_addr(conn.get_addr())
- stub = self.StorageServerStubClass(conn)
- stub = self.StorageServerStubClass(conn)
- self._oids = []
- self._info.update(stub.get_info())
- self.verify_cache(stub)
- # XXX The stub should be saved here and set in endVerify() below.
+ # If we are upgrading from a read-only fallback connection,
+ # we must close the old connection to prevent it from being
+ # used while the cache is verified against the new connection.
if self._connection is not None:
self._connection.close()
self._connection = conn
- self._server = stub
if reconnect:
log2(INFO, "Reconnected to storage: %s" % self._server_addr)
else:
log2(INFO, "Connected to storage: %s" % self._server_addr)
+ stub = self.StorageServerStubClass(conn)
+ self._oids = []
+ self._info.update(stub.get_info())
+ self.verify_cache(stub)
+
def set_server_addr(self, addr):
# Normalize server address and convert to string
if isinstance(addr, types.StringType):
@@ -396,6 +454,11 @@
The return value (indicating which path we took) is used by
the test suite.
"""
+
+ # If verify_cache() finishes the cache verification process,
+ # it should set self._server. If it goes through full cache
+ # verification, then endVerify() should self._server.
+
last_inval_tid = self._cache.getLastTid()
if last_inval_tid is not None:
ltid = server.lastTransaction()
@@ -403,10 +466,12 @@
log2(INFO, "No verification necessary "
"(last_inval_tid up-to-date)")
self._cache.open()
+ self._server = server
+ self._ready.set()
return "no verification"
# log some hints about last transaction
- log2(INFO, "last inval tid: %r %s"
+ log2(INFO, "last inval tid: %r %s\n"
% (last_inval_tid, tid2time(last_inval_tid)))
log2(INFO, "last transaction: %r %s" %
(ltid, ltid and tid2time(ltid)))
@@ -416,6 +481,8 @@
log2(INFO, "Recovering %d invalidations" % len(pair[1]))
self._cache.open()
self.invalidateTransaction(*pair)
+ self._server = server
+ self._ready.set()
return "quick verification"
log2(INFO, "Verifying cache")
@@ -425,6 +492,7 @@
self._pickler.fast = 1 # Don't use the memo
self._cache.verify(server.zeoVerify)
+ self._pending_server = server
server.endZeoVerify()
return "full verification"
@@ -445,6 +513,7 @@
log2(PROBLEM, "Disconnected from storage: %s"
% repr(self._server_addr))
self._connection = None
+ self._ready.clear()
self._server = disconnected_stub
def __len__(self):
@@ -847,11 +916,19 @@
while 1:
oid, version = unpick.load()
+ log2(INFO, "verify invalidate %r" % oid)
if not oid:
break
self._cache.invalidate(oid, version=version)
- self._db.invalidate(oid, version=version)
+ if self._db is not None:
+ self._db.invalidate(oid, version=version)
f.close()
+
+ log2(INFO, "endVerify finishing")
+ self._server = self._pending_server
+ self._ready.set()
+ self._pending_conn = None
+ log2(INFO, "endVerify finished")
def invalidateTransaction(self, tid, args):
"""Invalidate objects modified by tid."""
=== ZODB3/ZEO/ServerStub.py 1.11 => 1.12 ===
--- ZODB3/ZEO/ServerStub.py:1.11 Fri Jan 3 17:07:38 2003
+++ ZODB3/ZEO/ServerStub.py Tue Jan 14 14:08:33 2003
@@ -39,15 +39,6 @@
def extensionMethod(self, name):
return ExtensionMethodWrapper(self.rpc, name).call
- def _update(self):
- """Handle pending incoming messages.
-
- This method is typically only used when no asyncore mainloop
- is already active. It can cause arbitrary callbacks from the
- server to the client to be handled.
- """
- self.rpc.pending()
-
def register(self, storage_name, read_only):
self.rpc.call('register', storage_name, read_only)