[Zodb-checkins] CVS: ZODB3/ZEO - StorageServer.py:1.74.2.10
ClientStorage.py:1.73.2.13
Jeremy Hylton
jeremy at zope.com
Tue Apr 29 18:39:58 EDT 2003
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv5133/ZEO
Modified Files:
Tag: ZODB3-3_1-branch
StorageServer.py ClientStorage.py
Log Message:
Backport assorted ZEO fixes.
Make isReadOnly() report the right answer with a read-only fallback
connection.
Don't allow client to read cache data during verification.
=== ZODB3/ZEO/StorageServer.py 1.74.2.9 => 1.74.2.10 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.9 Tue Jan 28 13:11:22 2003
+++ ZODB3/ZEO/StorageServer.py Tue Apr 29 17:39:56 2003
@@ -24,8 +24,8 @@
import cPickle
import os
import sys
-import time
import threading
+import time
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
@@ -383,6 +383,7 @@
self.client.invalidateVerify((oid, ''))
def endZeoVerify(self):
+ log("received endZeoVerify")
self.client.endVerify()
def pack(self, time, wait=1):
=== ZODB3/ZEO/ClientStorage.py 1.73.2.12 => 1.73.2.13 ===
--- ZODB3/ZEO/ClientStorage.py:1.73.2.12 Wed Jan 29 14:39:24 2003
+++ ZODB3/ZEO/ClientStorage.py Tue Apr 29 17:39:56 2003
@@ -201,13 +201,40 @@
wait = 1
self._addr = addr # For tests
+
+ # A ZEO client can run in disconnected mode, using data from
+ # its cache, or in connected mode. Several instance variables
+ # are related to whether the client is connected.
+
+ # _server: All method calls are invoked through the server
+ # stub. When not connected, it is set to disconnected_stub, an
+ # object that raises ClientDisconnected errors.
+
+ # _ready: A threading Event that is set only if _server
+ # is set to a real stub.
+
+ # _connection: The current zrpc connection or None.
+
+ # _connection is set as soon as a connection is established,
+ # but _server is set only after cache verification has finished
+ # and clients can safely use the server. _pending_server holds
+ # a server stub while it is being verified.
+
self._server = disconnected_stub
+ self._connection = None
+ self._pending_server = None
+ self._ready = threading.Event()
+
+ # _is_read_only stores the constructor argument
self._is_read_only = read_only
+ # _conn_is_read_only stores the status of the current connection
+ self._conn_is_read_only = 0
self._storage = storage
self._read_only_fallback = read_only_fallback
- self._connection = None
# _server_addr is used by sortKey()
self._server_addr = None
+ self._tfile = None
+ self._pickler = None
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0,
@@ -254,15 +281,47 @@
tmax=max_disconnect_poll)
if wait:
- self._rpc_mgr.connect(sync=1)
+ self._wait()
else:
+ # attempt_connect() will make an attempt that doesn't block
+ # "too long," for a very vague notion of too long. If that
+ # doesn't succeed, call connect() to start a thread.
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
+ # If the connect hasn't occurred, run with cached data.
+ if not self._ready.isSet():
+ self._cache.open()
+
+ def _wait(self):
+ # Wait for a connection to be established.
+ self._rpc_mgr.connect(sync=1)
+ # When a synchronous connect() call returns, there is
+ # a valid _connection object but cache validation may
+ # still be going on. This code must wait until validation
+ # finishes, but if the connection isn't a zrpc async
+ # connection it also needs to poll for input.
+ if self._connection.is_async():
+ while 1:
+ self._ready.wait(30)
+ if self._ready.isSet():
+ break
+ log2(INFO, "Wait for cache verification to finish")
+ else:
+ self._wait_sync()
- # If we're connected at this point, the cache is opened as a
- # side effect of verify_cache(). If not, open it now.
- if not self.is_connected():
- self._cache.open()
+ def _wait_sync(self):
+ # If there is no mainloop running, this code needs
+ # to call poll() to cause asyncore to handle events.
+ while 1:
+ if self._ready.isSet():
+ break
+ log2(INFO, "Wait for cache verification to finish")
+ if self._connection is None:
+ # If the connection was closed while we were
+ # waiting for it to become ready, start over.
+ return self._wait()
+ else:
+ self._connection.pending(30)
def close(self):
"""Storage API: finalize the storage, releasing external resources."""
@@ -295,7 +354,13 @@
This is called by the sync method in ZODB.Connection.
"""
- self._server._update()
+ # If there is no connection, return immediately. Technically,
+ # there are no pending invalidations so they are all handled.
+ # There doesn't seem to be much benefit to raising an exception.
+
+ cn = self._connection
+ if cn is not None:
+ cn.pending()
def testConnection(self, conn):
"""Internal: test the given connection.
@@ -320,6 +385,7 @@
"""
log2(INFO, "Testing connection %r" % conn)
# XXX Check the protocol version here?
+ self._conn_is_read_only = 0
stub = self.StorageServerStubClass(conn)
try:
stub.register(str(self._storage), self._is_read_only)
@@ -329,6 +395,7 @@
raise
log2(INFO, "Got ReadOnlyError; trying again with read_only=1")
stub.register(str(self._storage), read_only=1)
+ self._conn_is_read_only = 1
return 0
def notifyConnected(self, conn):
@@ -337,28 +404,37 @@
This is called by ConnectionManager after it has decided which
connection should be used.
"""
+ if self._cache is None:
+ # the storage was closed, but the connect thread called
+ # this method before it was stopped.
+ return
+
# XXX would like to report whether we get a read-only connection
if self._connection is not None:
reconnect = 1
else:
reconnect = 0
- addr = conn.get_addr()
- self.set_server_addr(addr)
- stub = self.StorageServerStubClass(conn)
- self._oids = []
- self._info.update(stub.get_info())
- self.verify_cache(stub)
+ self.set_server_addr(conn.get_addr())
- # XXX The stub should be saved here and set in endVerify() below.
+ # If we are upgrading from a read-only fallback connection,
+ # we must close the old connection to prevent it from being
+ # used while the cache is verified against the new connection.
if self._connection is not None:
self._connection.close()
self._connection = conn
- self._server = stub
if reconnect:
- log2(INFO, "Reconnected to storage: %s" % repr(addr))
+ log2(INFO, "Reconnected to storage: %s" % self._server_addr)
else:
- log2(INFO, "Connected to storage: %s" % repr(addr))
+ log2(INFO, "Connected to storage: %s" % self._server_addr)
+
+ stub = self.StorageServerStubClass(conn)
+ self._oids = []
+ self._info.update(stub.get_info())
+ self.verify_cache(stub)
+ if not conn.is_async():
+ log2(INFO, "Waiting for cache verification to finish")
+ self._wait_sync()
def set_server_addr(self, addr):
# Normalize server address and convert to string
@@ -389,10 +465,15 @@
def verify_cache(self, server):
"""Internal routine called to verify the cache."""
- # XXX beginZeoVerify ends up calling back to beginVerify() below.
- # That whole exchange is rather unnecessary.
- server.beginZeoVerify()
+ log2(INFO, "Verifying cache")
+ # set up a tempfile to hold zeoVerify results
+ self._tfile = tempfile.TemporaryFile(suffix=".inv")
+ self._pickler = cPickle.Pickler(self._tfile, 1)
+ self._pickler.fast = 1 # Don't use the memo
+
self._cache.verify(server.zeoVerify)
+ self._pending_server = server
+ log2(INFO, "Calling endZeoVerify on server")
server.endZeoVerify()
### Is there a race condition between notifyConnected and
@@ -412,6 +493,7 @@
log2(PROBLEM, "Disconnected from storage: %s"
% repr(self._server_addr))
self._connection = None
+ self._ready.clear()
self._server = disconnected_stub
def __len__(self):
@@ -454,7 +536,13 @@
XXX In read-only fallback mode, this returns false, even if we
are currently connected to a read-only server.
"""
- return self._is_read_only
+ if self._is_read_only:
+ return 1
+ else:
+ # If the client is configured for a read-write connection
+ # but has a read-only fallback connection, _conn_is_read_only
+ # will be True.
+ return self._conn_is_read_only
def _check_trans(self, trans):
"""Internal helper to check a transaction argument for sanity."""
@@ -780,14 +868,6 @@
"""Server callback to update the info dictionary."""
self._info.update(dict)
- def beginVerify(self):
- """Server callback to signal start of cache validation."""
- log2(INFO, "begin cache verification")
- self._verify_start = time.time()
- self._tfile = tempfile.TemporaryFile(suffix=".inv")
- self._pickler = cPickle.Pickler(self._tfile, 1)
- self._pickler.fast = 1 # Don't use the memo
-
def invalidateVerify(self, args):
"""Server callback to invalidate an (oid, version) pair.
@@ -800,31 +880,33 @@
return
self._pickler.dump(args)
+ def _process_invalidations(self, invs):
+ # Invalidations are sent by the ZEO server as a sequence of
+ # oid, version pairs. The DB's invalidate() method expects a
+ # dictionary of oids.
+
+ for oid, version in invs:
+ self._cache.invalidate(oid, version=version)
+ self._db.invalidate(oid, version=version)
+
def endVerify(self):
"""Server callback to signal end of cache validation."""
if self._pickler is None:
- # XXX This should never happen
return
- self._pickler.dump((0,0))
+ # write end-of-data marker
+ self._pickler.dump((None, None))
+ self._pickler = None
self._tfile.seek(0)
- unpick = cPickle.Unpickler(self._tfile)
f = self._tfile
self._tfile = None
-
- ninval = 0
-
- while 1:
- oid, version = unpick.load()
- if not oid:
- break
- ninval += 1
- self._cache.invalidate(oid, version=version)
- self._db.invalidate(oid, version=version)
+ self._process_invalidations(InvalidationLogIterator(f))
f.close()
- elapsed = time.time() - self._verify_start
- log2(INFO, "end cache verification (%d invalidations, %.3g seconds)" %
- (ninval, elapsed))
+ log2(INFO, "endVerify finishing")
+ self._server = self._pending_server
+ self._ready.set()
+ self._pending_conn = None
+ log2(INFO, "endVerify finished")
def invalidateTrans(self, args):
"""Server callback to invalidate a list of (oid, version) pairs.
@@ -848,7 +930,40 @@
# don't want that. So here we alias the old names to their new
# implementations.
- begin = beginVerify
invalidate = invalidateVerify
end = endVerify
Invalidate = invalidateTrans
+
+try:
+ StopIteration
+except NameError:
+ class StopIteration(Exception):
+ pass
+
+class InvalidationLogIterator:
+ """Helper class for reading invalidations in endVerify."""
+
+ def __init__(self, fileobj):
+ self._unpickler = cPickle.Unpickler(fileobj)
+ self.getitem_i = 0
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ oid, version = self._unpickler.load()
+ if oid is None:
+ raise StopIteration
+ return oid, version
+
+ # The __getitem__() method is needed to support iteration
+ # in Python 2.1.
+
+ def __getitem__(self, i):
+ assert i == self.getitem_i
+ try:
+ obj = self.next()
+ except StopIteration:
+ raise IndexError, i
+ self.getitem_i += 1
+ return obj
More information about the Zodb-checkins
mailing list