[Zope3-checkins] CVS: Zope3/src/zodb/zeo - monitor.py:1.3.2.1 stubs.py:1.4.4.1 server.py:1.6.4.3 interfaces.py:1.2.12.1 client.py:1.4.4.3 cache.py:1.2.12.3

Jeremy Hylton jeremy@zope.com
Wed, 12 Mar 2003 16:41:34 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv29357/zeo

Modified Files:
      Tag: opaque-pickles-branch
	stubs.py server.py interfaces.py client.py cache.py 
Added Files:
      Tag: opaque-pickles-branch
	monitor.py 
Log Message:
Update from trunk.

Resolve a few import conflicts.


=== Added File Zope3/src/zodb/zeo/monitor.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Monitor behavior of ZEO server and record statistics.

$Id: monitor.py,v 1.3.2.1 2003/03/12 21:41:25 jeremy Exp $
"""

import asyncore
import socket
import time
import types

import zodb.zeo

class StorageStats:
    """Per-storage usage statistics.

    Instances are plain records of counters.  dump() writes them as
    "Label: value" lines and parse() reads that same format back.
    """

    # Maps the label used in the dump format to the integer attribute
    # that stores its value.
    _int_fields = {
        "Clients": "clients",
        "Clients verifying": "verifying_clients",
        "Active transactions": "active_txns",
        "Commits": "commits",
        "Aborts": "aborts",
        "Loads": "loads",
        "Stores": "stores",
        "Conflicts": "conflicts",
        "Conflicts resolved": "conflicts_resolved",
        }

    def __init__(self):
        self.loads = 0
        self.stores = 0
        self.commits = 0
        self.aborts = 0
        self.active_txns = 0
        self.clients = 0
        self.verifying_clients = 0
        # Time (seconds since the epoch) at which the commit lock was
        # taken, or None when no lock time is recorded.
        self.lock_time = None
        self.conflicts = 0
        self.conflicts_resolved = 0
        # Human-readable creation time of this record.
        self.start = time.ctime()

    def parse(self, s):
        """Update the statistics from text in the dump() format.

        s is a string of newline-separated "Label: value" lines.  Lines
        that contain no colon (for example blank lines, or the trailing
        empty string left by a final newline) and lines with an
        unrecognized label are ignored.  Raises ValueError if a numeric
        field does not hold an integer.
        """
        for line in s.split("\n"):
            if ":" not in line:
                # Not a "Label: value" line; nothing to record.
                continue
            field, value = line.split(":", 1)
            # dump() separates label and value with ": ", so drop the
            # surrounding whitespace before storing or converting.
            value = value.strip()
            if field == "Server started":
                self.start = value
            elif field == "Commit lock held for":
                # The dump only records how long the lock had been held.
                # This assumes the dump was produced just now, so the
                # time the lock was taken is approximated as the current
                # time minus that duration.
                self.lock_time = time.time() - int(value)
            elif field in self._int_fields:
                setattr(self, self._int_fields[field], int(value))

    def dump(self, f):
        """Write the statistics to the file-like object f.

        One "Label: value" line is written per statistic.  The
        "Commit lock held for" line is written only while lock_time
        is set, and reports whole seconds elapsed since lock_time.
        """
        print >> f, "Server started:", self.start
        print >> f, "Clients:", self.clients
        print >> f, "Clients verifying:", self.verifying_clients
        print >> f, "Active transactions:", self.active_txns
        if self.lock_time:
            howlong = time.time() - self.lock_time
            print >> f, "Commit lock held for:", int(howlong)
        print >> f, "Commits:", self.commits
        print >> f, "Aborts:", self.aborts
        print >> f, "Loads:", self.loads
        print >> f, "Stores:", self.stores
        print >> f, "Conflicts:", self.conflicts
        print >> f, "Conflicts resolved:", self.conflicts_resolved

class StatsClient(asyncore.dispatcher):
    """Write-only asyncore channel used to send a report to one peer.

    Text passed to write() is buffered and sent from handle_write().
    The object provides the write() method that "print >> obj" needs,
    so it can be used as the target of a print statement.
    """

    def __init__(self, sock, addr):
        """Wrap the connected socket sock; addr is accepted but unused."""
        asyncore.dispatcher.__init__(self, sock)
        # Pending output, as a list of string fragments.
        self.buf = []
        # Set once close() has been requested.
        self.closed = 0

    def close(self):
        """Request that the connection be closed.

        If output is still buffered, the socket is closed by
        handle_write() after all of it has been written.  If nothing
        is buffered, handle_write() would never be called (writable()
        is false), so the socket is closed right away.
        """
        self.closed = 1
        if not self.buf:
            asyncore.dispatcher.close(self)

    def write(self, s):
        """Queue the string s for sending."""
        self.buf.append(s)

    def writable(self):
        """Return a true value while there is buffered output."""
        return len(self.buf)

    def readable(self):
        """Return false: nothing is ever read from this connection."""
        # XXX what goes here?
        return 0

    def handle_write(self):
        """Send as much buffered output as the socket accepts.

        Whatever send() did not accept is put back in the buffer for
        the next call.  Once the buffer is empty and close() has been
        requested, the underlying socket is closed.
        """
        s = "".join(self.buf)
        self.buf = []
        n = self.socket.send(s)
        if n < len(s):
            # send() took only the first n characters; keep the unsent
            # remainder, not the part that already went out.
            self.buf.append(s[n:])

        if self.closed and not self.buf:
            asyncore.dispatcher.close(self)

class StatsServer(asyncore.dispatcher):

    StatsConnectionClass = StatsClient

    def __init__(self, addr, stats):
        asyncore.dispatcher.__init__(self)
        self.addr = addr
        self.stats = stats
        if type(self.addr) == types.TupleType:
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.set_reuse_addr()
##        zLOG.LOG("ZSM", zLOG.INFO, "monitor listening on %s" % repr(self.addr))
        self.bind(self.addr)
        self.listen(5)

    def writable(self):
        return 0

    def readable(self):
        return 1

    def handle_accept(self):
        try:
            sock, addr = self.accept()
        except socket.error:
            return
        f = self.StatsConnectionClass(sock, addr)
        self.dump(f)
        f.close()

    def dump(self, f):
        print >> f, "ZEO monitor server version %s" % zodb.zeo.version
        print >> f, time.ctime()
        print >> f

        L = self.stats.keys()
        L.sort()
        for k in L:
            stats = self.stats[k]
            print >> f, "Storage:", k
            stats.dump(f)
            print >> f


=== Zope3/src/zodb/zeo/stubs.py 1.4 => 1.4.4.1 ===
--- Zope3/src/zodb/zeo/stubs.py:1.4	Wed Feb  5 18:28:24 2003
+++ Zope3/src/zodb/zeo/stubs.py	Wed Mar 12 16:41:25 2003
@@ -13,7 +13,7 @@
 ##############################################################################
 """RPC stubs for client and server interfaces."""
 
-class ClientStorage:
+class ClientStorageStub:
     """An RPC stub class for the interface exported by ClientStorage.
 
     This is the interface presented by ClientStorage to the
@@ -43,16 +43,16 @@
         self.rpc = rpc
 
     def beginVerify(self):
-        self.rpc.callAsync('begin')
+        self.rpc.callAsync('beginVerify')
 
     def invalidateVerify(self, args):
-        self.rpc.callAsync('invalidate', args)
+        self.rpc.callAsync('invalidateVerify', args)
 
     def endVerify(self):
-        self.rpc.callAsync('end')
+        self.rpc.callAsync('endVerify')
 
-    def invalidateTrans(self, args):
-        self.rpc.callAsync('Invalidate', args)
+    def invalidateTransaction(self, tid, invlist):
+        self.rpc.callAsync('invalidateTransaction', tid, invlist)
 
     def serialnos(self, arg):
         self.rpc.callAsync('serialnos', arg)
@@ -60,7 +60,7 @@
     def info(self, arg):
         self.rpc.callAsync('info', arg)
 
-class StorageServer:
+class StorageServerStub:
     """An RPC stub class for the interface exported by ClientStorage.
 
     This is the interface presented by the StorageServer to the
@@ -103,6 +103,9 @@
     def get_info(self):
         return self.rpc.call('get_info')
 
+    def getInvalidations(self, tid):
+        return self.rpc.call('getInvalidations', tid)
+    
     def beginZeoVerify(self):
         self.rpc.callAsync('beginZeoVerify')
 
@@ -168,6 +171,9 @@
             return self.rpc.call('new_oid')
         else:
             return self.rpc.call('new_oid', last)
+
+    def lastTransaction(self):
+        return self.rpc.call('lastTransaction')
 
     def store(self, oid, serial, data, version, trans):
         return self.rpc.call('store', oid, serial, data, version, trans)


=== Zope3/src/zodb/zeo/server.py 1.6.4.2 => 1.6.4.3 === (1039/1139 lines abridged)
--- Zope3/src/zodb/zeo/server.py:1.6.4.2	Wed Feb 12 15:07:55 2003
+++ Zope3/src/zodb/zeo/server.py	Wed Mar 12 16:41:25 2003
@@ -20,191 +20,57 @@
 exported for invocation by the server.
 """
 
+from __future__ import nested_scopes
+
 import asyncore
 import cPickle
+import logging
 import os
 import sys
 import threading
 import time
-import logging
 
-from zodb.zeo import stubs
+from zodb.zeo.stubs import ClientStorageStub
 from zodb.zeo.commitlog import CommitLog
+from zodb.zeo.monitor import StorageStats, StatsServer
 from zodb.zeo.zrpc.server import Dispatcher
 from zodb.zeo.zrpc.connection import ManagedServerConnection, Delay, MTDelay
+from zodb.zeo.zrpc.trigger import trigger
 
 from zodb.serialize import findrefs
-from zodb.ztransaction import Transaction
-from zodb.interfaces import TransactionError, ZERO
+from zodb.interfaces import *
 from zodb.storage.interfaces import *
+from zodb.conflict import ResolvedSerial
+from zodb.utils import u64
+from zodb.ztransaction import Transaction
 
 from zope.interface.implements import objectImplements
 
+from transaction.interfaces import TransactionError
+
 class StorageServerError(StorageError):
     """Error reported when an unpickleable exception is raised."""
 
-class StorageServer:
-
-    """The server side implementation of ZEO.
-
-    The StorageServer is the 'manager' for incoming connections.  Each
-    connection is associated with its own ZEOStorage instance (defined
-    below).  The StorageServer may handle multiple storages; each
-    ZEOStorage instance only handles a single storage.
-    """

[-=- -=- -=- 1039 lines omitted -=- -=- -=-]

-        assert isinstance(new_strategy, ImmediateCommitStrategy)
-        new_strategy.tpcBegin(self.txn, self.tid, self.status)
-        loads, loader = self.log.get_loader()
-        for i in range(loads):
-            oid, serial, data, version = loader.load()
-            new_strategy.store(oid, serial, data, version)
-        meth = getattr(new_strategy, self.name)
-        return meth(*self.args)
-
-    def abort(self, zeo_storage):
-        # Delete (d, zeo_storage) from the _waiting list, if found.
-        waiting = self.storage._waiting
-        for i in range(len(waiting)):
-            d, z = waiting[i]
-            if z is zeo_storage:
-                del waiting[i]
-                break
+    def run(self):
+        # Code running in the thread.
+        while 1:
+            self._cond.acquire()
+            try:
+                while self._deadline is None:
+                    self._cond.wait()
+                howlong = self._deadline - time.time()
+                if howlong <= 0:
+                    # Prevent reporting timeout more than once
+                    self._deadline = None
+                client = self._client # For the howlong <= 0 branch below
+            finally:
+                self._cond.release()
+            if howlong <= 0:
+                client.logger.error("Transaction timeout after %s seconds",
+                                    self._timeout)
+                self._trigger.pull_trigger(lambda: client.connection.close())
+            else:
+                time.sleep(howlong)
+        self.trigger.close()
 
 def run_in_thread(method, *args):
     t = SlowMethodThread(method, args)
@@ -783,8 +870,3 @@
             self.delay.error(sys.exc_info())
         else:
             self.delay.reply(result)
-
-# Patch up class references
-StorageServer.ZEOStorageClass = ZEOStorage
-ZEOStorage.DelayedCommitStrategyClass = DelayedCommitStrategy
-ZEOStorage.ImmediateCommitStrategyClass = ImmediateCommitStrategy


=== Zope3/src/zodb/zeo/interfaces.py 1.2 => 1.2.12.1 ===
--- Zope3/src/zodb/zeo/interfaces.py:1.2	Wed Dec 25 09:12:22 2002
+++ Zope3/src/zodb/zeo/interfaces.py	Wed Mar 12 16:41:25 2003
@@ -16,78 +16,14 @@
 $Id$
 """
 
-from zope.interface import Interface
+from zodb.storage.interfaces import StorageError
 
-class Disconnected(Exception):
-    """A client is disconnected from a server."""
+class ClientStorageError(StorageError):
+    """An error occured in the ZEO Client Storage."""
 
-class ICache(Interface):
-    """ZEO client cache.
+class UnrecognizedResult(ClientStorageError):
+    """A server call returned an unrecognized result."""
 
-    __init__(storage, size, client, var)
+class ClientDisconnected(ClientStorageError):
+    """The database storage is disconnected from the storage."""
 
-    All arguments optional.
-
-    storage -- name of storage
-    size -- max size of cache in bytes
-    client -- a string; if specified, cache is persistent.
-    var -- var directory to store cache files in
-    """
-
-    def open():
-        """Returns a sequence of object info tuples.
-
-        An object info tuple is a pair containing an object id and a
-        pair of serialnos, a non-version serialno and a version serialno:
-        oid, (serial, ver_serial)
-
-        This method builds an index of the cache and returns a
-        sequence used for cache validation.
-        """
-
-    def close():
-        """Closes the cache."""
-
-    def verify(func):
-        """Call func on every object in cache.
-
-        func is called with three arguments
-        func(oid, serial, ver_serial)
-        """
-
-    def invalidate(oid, version):
-        """Remove object from cache."""
-
-    def load(oid, version):
-        """Load object from cache.
-
-        Return None if object not in cache.
-        Return data, serialno if object is in cache.
-        """
-
-    def store(oid, p, s, version, pv, sv):
-        """Store a new object in the cache."""
-
-    def update(oid, serial, version, data):
-        """Update an object already in the cache.
-
-        XXX This method is called to update objects that were modified by
-        a transaction.  It's likely that it is already in the cache,
-        and it may be possible for the implementation to operate more
-        efficiently.
-        """
-
-    def modifiedInVersion(oid):
-        """Return the version an object is modified in.
-
-        '' signifies the trunk.
-        Returns None if the object is not in the cache.
-        """
-
-    def checkSize(size):
-        """Check if adding size bytes would exceed cache limit.
-
-        This method is often called just before store or update.  The
-        size is a hint about the amount of data that is about to be
-        stored.  The cache may want to evict some data to make space.
-        """


=== Zope3/src/zodb/zeo/client.py 1.4.4.2 => 1.4.4.3 ===
--- Zope3/src/zodb/zeo/client.py:1.4.4.2	Wed Feb 12 15:08:16 2003
+++ Zope3/src/zodb/zeo/client.py	Wed Mar 12 16:41:25 2003
@@ -22,11 +22,6 @@
 ClientDisconnected -- exception raised by ClientStorage
 """
 
-# XXX TO DO
-# get rid of beginVerify, set up _tfile in verify_cache
-# set self._storage = stub later, in endVerify
-# if wait is given, wait until verify is complete
-
 import cPickle
 import os
 import socket
@@ -36,11 +31,12 @@
 import types
 import logging
 
-from zodb.zeo import cache, stubs
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo import cache
+from zodb.zeo.stubs import StorageServerStub
 from zodb.zeo.tbuf import TransactionBuffer
 from zodb.zeo.zrpc.client import ConnectionManager
 
+from zodb.zeo.interfaces import *
 from zodb.storage.interfaces import *
 from zodb.timestamp import TimeStamp
 
@@ -49,14 +45,8 @@
 except ImportError:
     ResolvedSerial = 'rs'
 
-class ClientStorageError(StorageError):
-    """An error occured in the ZEO Client Storage."""
-
-class UnrecognizedResult(ClientStorageError):
-    """A server call returned an unrecognized result."""
-
-class ClientDisconnected(ClientStorageError, Disconnected):
-    """The database storage is disconnected from the storage."""
+def tid2time(tid):
+    return str(TimeStamp(tid))
 
 def get_timestamp(prev_ts=None):
     """Internal helper to return a unique TimeStamp instance.
@@ -88,7 +78,7 @@
 
 MB = 1024**2
 
-class ClientStorage:
+class ClientStorage(object):
 
     """A Storage class that is a network client to a remote storage.
 
@@ -103,7 +93,7 @@
     TransactionBufferClass = TransactionBuffer
     ClientCacheClass = cache.ClientCache
     ConnectionManagerClass = ConnectionManager
-    StorageServerStubClass = stubs.StorageServer
+    StorageServerStubClass = StorageServerStub
 
     # The exact storage interfaces depend on the server that the client
     # connects to.  We know that every storage must implement IStorage,
@@ -113,11 +103,9 @@
     __implements__ = IStorage
 
     def __init__(self, addr, storage='1', cache_size=20 * MB,
-                 name='', client=None, debug=0, var=None,
+                 name='', client=None, var=None,
                  min_disconnect_poll=5, max_disconnect_poll=300,
-                 wait_for_server_on_startup=None, # deprecated alias for wait
-                 wait=None, # defaults to 1
-                 read_only=0, read_only_fallback=0):
+                 wait=True, read_only=False, read_only_fallback=False):
 
         """ClientStorage constructor.
 
@@ -149,9 +137,6 @@
             effective value is true, the client cache is persistent.
             See ClientCache for more info.
 
-        debug -- Ignored.  This is present only for backwards
-            compatibility with ZEO 1.
-
         var -- The 'var' directory, defaulting to None, in which
             the persistent cache files should be written.
 
@@ -163,9 +148,6 @@
             attempts to connect to the server, in seconds.  Defaults
             to 300 seconds.
 
-        wait_for_server_on_startup -- A backwards compatible alias for
-            the wait argument.
-
         wait -- A flag indicating whether to wait until a connection
             with a server is made, defaulting to true.
 
@@ -189,33 +171,37 @@
                          read_only_fallback and "fallback" or "normal",
                          storage)
 
-        if debug:
-            self.logger.warn(
-                "ClientStorage(): debug argument is no longer used")
-
-        # wait defaults to True, but wait_for_server_on_startup overrides
-        # if not None
-        if wait_for_server_on_startup is not None:
-            if wait is not None and wait != wait_for_server_on_startup:
-                self.logger.error(
-                    "ClientStorage(): conflicting values for wait and "
-                    "wait_for_server_on_startup; wait prevails")
-            else:
-                self.logger.warn(
-                    "ClientStorage(): wait_for_server_on_startup "
-                    "is deprecated; please use wait instead")
-                wait = wait_for_server_on_startup
-        elif wait is None:
-            wait = 1
-
         self._addr = addr # For tests
+        # A ZEO client can run in disconnected mode, using data from
+        # its cache, or in connected mode.  Several instance variables
+        # are related to whether the client is connected.
+
+        # _server: All method calls are invoked through the server
+        #    stub.  When not connect, set to disconnected_stub an
+        #    object that raises ClientDisconnected errors.
+
+        # _ready: A threading Event that is set only if _server
+        #    is set to a real stub.
+
+        # _connection: The current zrpc connection or None.
+
+        # _connection is set as soon as a connection is established,
+        # but _server is set only after cache verification has finished
+        # and clients can safely use the server.  _pending_server holds
+        # a server stub while it is being verified.
+        
         self._server = disconnected_stub
+        self._connection = None
+        self._pending_server = None
+        self._ready = threading.Event()
+        
         self._is_read_only = read_only
         self._storage = storage
         self._read_only_fallback = read_only_fallback
-        self._connection = None
         # _server_addr is used by sortKey()
         self._server_addr = None
+        self._tfile = None
+        self._pickler = None
 
         self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client'}
         
@@ -265,10 +251,46 @@
             if not self._rpc_mgr.attempt_connect():
                 self._rpc_mgr.connect()
 
-        # If we're connected at this point, the cache is opened as a
-        # side effect of verify_cache().  If not, open it now.
-        if not self.is_connected():
-            self._cache.open()
+        if wait:
+            self._wait()
+        else:
+            # attempt_connect() will make an attempt that doesn't block
+            # "too long," for a very vague notion of too long.  If that
+            # doesn't succeed, call connect() to start a thread.
+            if not self._rpc_mgr.attempt_connect():
+                self._rpc_mgr.connect()
+            # If the connect hasn't occurred, run with cached data.
+            if not self._ready.isSet():
+                self._cache.open()
+
+    def _wait(self):
+        # Wait for a connection to be established.
+        self._rpc_mgr.connect(sync=1)
+        # When a synchronous connect() call returns, there is
+        # a valid _connection object but cache validation may
+        # still be going on.  This code must wait until validation
+        # finishes, but if the connection isn't a zrpc async
+        # connection it also needs to poll for input.
+        if self._connection.is_async():
+            while 1:
+                self._ready.wait(30)
+                if self._ready.isSet():
+                    break
+                self.logger.warn("Wait for cache verification to finish")
+        else:
+            # If there is no mainloop running, this code needs
+            # to call poll() to cause asyncore to handle events.
+            while 1:
+                cn = self._connection
+                if cn is None:
+                    # If the connection was closed while we were
+                    # waiting for it to become ready, start over.
+                    return self._wait()
+                else:
+                    cn.pending(30)
+                if self._ready.isSet():
+                    break
+                self.logger.warn("Wait for cache verification to finish")
 
     def close(self):
         """Storage API: finalize the storage, releasing external resources."""
@@ -291,17 +313,22 @@
 
     def is_connected(self):
         """Return whether the storage is currently connected to a server."""
-        if self._server is disconnected_stub:
-            return 0
-        else:
-            return 1
+        # This function is used by clients, so we only report that a
+        # connection exists when the connection is ready to use.
+        return self._ready.isSet()
 
     def sync(self):
         """Handle any pending invalidation messages.
 
         This is called by the sync method in ZODB.Connection.
         """
-        self._server._update()
+        # If there is no connection, return immediately.  Technically,
+        # there are no pending invalidations so they are all handled.
+        # There doesn't seem to be much benefit to raising an exception.
+        
+        cn = self._connection
+        if cn is not None:
+            cn.pending()
 
     def testConnection(self, conn):
         """Internal: test the given connection.
@@ -344,23 +371,37 @@
         This is called by ConnectionManager after it has decided which
         connection should be used.
         """
+
+        if self._cache is None:
+            # the storage was closed, but the connect thread called
+            # this method before it was stopped.
+            return
+        
+        # XXX would like to report whether we get a read-only connection
         if self._connection is not None:
-            self.logger.warn("Reconnected to storage")
+            reconnect = 1
         else:
-            self.logger.warn("Connected to storage")
+            reconnect = 0
         self.set_server_addr(conn.get_addr())
+
+        # If we are upgrading from a read-only fallback connection,
+        # we must close the old connection to prevent it from being
+        # used while the cache is verified against the new connection.
+        if self._connection is not None:
+            self._connection.close()
+        self._connection = conn
+
+        if reconnect:
+            self.logger.warn("Reconnected to storage: %s", self._server_addr)
+        else:
+            self.logger.warn("Connected to storage: %s", self._server_addr)
+
         stub = self.StorageServerStubClass(conn)
         self._oids = []
         self._info.update(stub.get_info())
         self.update_interfaces()
         self.verify_cache(stub)
 
-        # XXX The stub should be saved here and set in endVerify() below.
-        if self._connection is not None:
-            self._connection.close()
-        self._connection = conn
-        self._server = stub
-
     def update_interfaces(self):
         # Update instance's __implements__ based on the server.
         L = [IStorage]
@@ -399,12 +440,52 @@
             return self._server_addr
 
     def verify_cache(self, server):
-        """Internal routine called to verify the cache."""
-        # XXX beginZeoVerify ends up calling back to beginVerify() below.
-        # That whole exchange is rather unnecessary.
-        server.beginZeoVerify()
+        """Internal routine called to verify the cache.
+
+        The return value (indicating which path we took) is used by
+        the test suite.
+        """
+
+        # If verify_cache() finishes the cache verification process,
+        # it should set self._server.  If it goes through full cache
+        # verification, then endVerify() should self._server.
+        
+        last_inval_tid = self._cache.getLastTid()
+        if last_inval_tid is not None:
+            ltid = server.lastTransaction()
+            if ltid == last_inval_tid:
+                self.logger.info("No verification necessary "
+                                 "(last_inval_tid up-to-date)")
+                self._cache.open()
+                self._server = server
+                self._ready.set()
+                return "no verification"
+
+            # log some hints about last transaction
+            self.logger.info("last inval tid: %r %s",
+                             last_inval_tid, tid2time(last_inval_tid))
+            self.logger.info("last transaction: %r %s",
+                             ltid, ltid and tid2time(ltid))
+
+            pair = server.getInvalidations(last_inval_tid)
+            if pair is not None:
+                self.logger.info("Recovering %d invalidations", len(pair[1]))
+                self._cache.open()
+                self.invalidateTransaction(*pair)
+                self._server = server
+                self._ready.set()
+                return "quick verification"
+
+        self.logger.info("Verifying cache")
+        # setup tempfile to hold zeoVerify results
+        self._tfile = tempfile.TemporaryFile(suffix=".inv")
+        self._pickler = cPickle.Pickler(self._tfile, 1)
+        self._pickler.fast = 1 # Don't use the memo
+
         self._cache.verify(server.zeoVerify)
+        self._pending_server = server
         server.endZeoVerify()
+        return "full verification"
 
     ### Is there a race condition between notifyConnected and
     ### notifyDisconnected? In Particular, what if we get
@@ -422,6 +503,7 @@
         """
         self.logger.error("Disconnected from storage")
         self._connection = None
+        self._ready.clear()
         self._server = disconnected_stub
 
     def getName(self):
@@ -664,14 +746,22 @@
 
     def tpcAbort(self, transaction):
         """Storage API: abort a transaction."""
+        """Storage API: abort a transaction."""
         if transaction is not self._transaction:
             return
         try:
-            self._server.tpcAbort(self._serial)
+            # XXX Are there any transactions that should prevent an
+            # abort from occurring?  It seems wrong to swallow them
+            # all, yet you want to be sure that other abort logic is
+            # executed regardless.
+            try:
+                self._server.tpcAbort(self._serial)
+            except ClientDisconnected:
+                self.logger.info("ClientDisconnected in tpcAbort() ignored")
+        finally:
             self._tbuf.clear()
             self._seriald.clear()
             del self._serials[:]
-        finally:
             self.end_transaction()
 
     def tpcFinish(self, transaction, f=None):
@@ -682,12 +772,13 @@
             if f is not None:
                 f()
 
-            self._server.tpcFinish(self._serial)
+            tid = self._server.tpcFinish(self._serial)
 
             r = self._check_serials()
             assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
 
             self._update_cache()
+            self._cache.setLastTid(tid)
         finally:
             self.end_transaction()
 
@@ -773,12 +864,6 @@
         """Server callback to update the info dictionary."""
         self._info.update(dict)
 
-    def beginVerify(self):
-        """Server callback to signal start of cache validation."""
-        self._tfile = tempfile.TemporaryFile(suffix=".inv")
-        self._pickler = cPickle.Pickler(self._tfile, 1)
-        self._pickler.fast = 1 # Don't use the memo
-
     def invalidateVerify(self, args):
         """Server callback to invalidate an (oid, version) pair.
 
@@ -796,6 +881,7 @@
         if self._pickler is None:
             return
         self._pickler.dump((0,0))
+        self._pickler = None
         self._tfile.seek(0)
         unpick = cPickle.Unpickler(self._tfile)
         f = self._tfile
@@ -803,35 +889,31 @@
 
         while 1:
             oid, version = unpick.load()
+            self.logger.debug("verify invalidate %r", oid)
             if not oid:
                 break
             self._cache.invalidate(oid, version=version)
-            self._db.invalidate(oid, version=version)
+            if self._db is not None:
+                self._db.invalidate(oid, version=version)
         f.close()
 
-    def invalidateTrans(self, args):
-        """Server callback to invalidate a list of (oid, version) pairs.
-
-        This is called as the result of a transaction.
-        """
+        self._server = self._pending_server
+        self._ready.set()
+        self._pending_conn = None
+        self.logger.debug("verification finished")
+
+    def invalidateTransaction(self, tid, args):
+        """Invalidate objects modified by tid."""
+        self._cache.setLastTid(tid)
+        if self._pickler is not None:
+            self.logger.info("Transactional invalidation "
+                             "during cache verification")
+            for t in args:
+                self.self._pickler.dump(t)
+            return
+        db = self._db
         for oid, version in args:
             self._cache.invalidate(oid, version=version)
-            try:
-                self._db.invalidate(oid, version=version)
-            except AttributeError, msg:
-                self.logger.error("Invalidate(%r, %r) failed for _db: %s",
-                                  oid, version, msg)
-
-    # XXX In Zope3, there's no reason to stick to the ZEO 2 protocol!
-
-    # Unfortunately, the ZEO 2 wire protocol uses different names for
-    # several of the callback methods invoked by the StorageServer.
-    # We can't change the wire protocol at this point because that
-    # would require synchronized updates of clients and servers and we
-    # don't want that.  So here we alias the old names to their new
-    # implementations.
-
-    begin = beginVerify
-    invalidate = invalidateVerify
-    end = endVerify
-    Invalidate = invalidateTrans
+            if db is not None:
+                db.invalidate(oid, version=version)
+


=== Zope3/src/zodb/zeo/cache.py 1.2.12.2 => 1.2.12.3 ===
--- Zope3/src/zodb/zeo/cache.py:1.2.12.2	Wed Feb 12 16:06:24 2003
+++ Zope3/src/zodb/zeo/cache.py	Wed Mar 12 16:41:25 2003
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2001. Zope Corporation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -31,11 +31,16 @@
 0 or 1.  Temporary cache files are unnamed files in the standard
 temporary directory as determined by the tempfile module.
 
-The ClientStorage overrides the client name default to the value of
-the environment variable ZEO_CLIENT, if it exists.
+Each cache file has a 12-byte header followed by a sequence of
+records.  The header format is as follows:
 
-Each cache file has a 4-byte magic number followed by a sequence of
-records of the form:
+  offset in header: name -- description
+
+  0: magic -- 4-byte magic number, identifying this as a ZEO cache file
+
+  4: lasttid -- 8-byte last transaction id
+
+Each record has the following form:
 
   offset in record: name -- description
 
@@ -100,6 +105,7 @@
 file 0 and file 1.
 """
 
+import logging
 import os
 import time
 import logging
@@ -109,23 +115,25 @@
 
 from zodb.utils import u64
 from zodb.interfaces import ZERO
-from zodb.zeo.interfaces import ICache
 
-magic='ZEC0'
+magic = 'ZEC1'
+headersize = 12
 
-class ClientCache:
+MB = 1024**2
 
-    __implements__ = ICache
+class ClientCache:
 
-    def __init__(self, storage='1', size=20000000, client=None, var=None):
+    def __init__(self, storage='1', size=20*MB, client=None, var=None):
         # Arguments:
         # storage -- storage name (used in filenames and log messages)
         # size -- size limit in bytes of both files together
         # client -- if not None, use a persistent cache file and use this name
-        # var -- directory where to create persistent cache files
+        # var -- directory where to create persistent cache files; default cwd
 
         self._storage = storage
         self._limit = size / 2
+        self._client = client
+        self._ltid = None # For getLastTid()
 
         # Create a logger specific for this client cache
         logger = logging.getLogger("ZEC.%s" % storage)
@@ -138,15 +146,8 @@
 
         if client is not None:
             # Create a persistent cache
-            # CLIENT_HOME and INSTANCE_HOME are builtins set by App.FindHomes
             if var is None:
-                try:
-                    var = CLIENT_HOME
-                except:
-                    try:
-                        var = os.path.join(INSTANCE_HOME, 'var')
-                    except:
-                        var = os.getcwd()
+                var = os.getcwd()
 
             fmt = os.path.join(var, "c%s-%s-%%s.zec" % (storage, client))
             # Initialize pairs of filenames, file objects, and serialnos.
@@ -158,9 +159,9 @@
                     fi = open(p[i],'r+b')
                     if fi.read(4) == magic: # Minimal sanity
                         fi.seek(0, 2)
-                        if fi.tell() > 30:
-                            # First serial is at offset 19 + 4 for magic
-                            fi.seek(23)
+                        if fi.tell() > headersize:
+                            # Read serial at offset 19 of first record
+                            fi.seek(headersize + 19)
                             s[i] = fi.read(8)
                     # If we found a non-zero serial, then use the file
                     if s[i] != ZERO:
@@ -176,17 +177,18 @@
                 if f[0] is None:
                     # We started, open the first cache file
                     f[0] = open(p[0], 'w+b')
-                    f[0].write(magic)
+                    f[0].write(magic + '\0' * (headersize - len(magic)))
                 current = 0
                 f[1] = None
         else:
             self._f = f = [tempfile.TemporaryFile(suffix='.zec'), None]
             # self._p file name 'None' signifies an unnamed temp file.
             self._p = p = [None, None]
-            f[0].write(magic)
+            f[0].write(magic + '\0' * (headersize - len(magic)))
             current = 0
 
-        self.log("size=%r; file[%r]=%r", size, current, p[current])
+        self.log("%s: storage=%r, size=%r; file[%r]=%r",
+                 self.__class__.__name__, storage, size, current, p[current])
 
         self._current = current
         self._setup_trace()
@@ -222,6 +224,57 @@
                 except OSError:
                     pass
 
+    def getLastTid(self):
+        """Get the last transaction id stored by setLastTid().
+
+        If the cache is persistent, it is read from the current
+        cache file; otherwise it's an instance variable.
+        """
+        if self._client is None:
+            return self._ltid
+        else:
+            self._acquire()
+            try:
+                return self._getLastTid()
+            finally:
+                self._release()
+
+    def _getLastTid(self):
+        f = self._f[self._current]
+        f.seek(4)
+        tid = f.read(8)
+        if len(tid) < 8 or tid == '\0\0\0\0\0\0\0\0':
+            return None
+        else:
+            return tid
+
+    def setLastTid(self, tid):
+        """Store the last transaction id.
+
+        If the cache is persistent, it is written to the current
+        cache file; otherwise it's an instance variable.
+        """
+        if self._client is None:
+            if tid == '\0\0\0\0\0\0\0\0':
+                tid = None
+            self._ltid = tid
+        else:
+            self._acquire()
+            try:
+                self._setLastTid(tid)
+            finally:
+                self._release()
+
+    def _setLastTid(self, tid):
+        if tid is None:
+            tid = '\0\0\0\0\0\0\0\0'
+        else:
+            tid = str(tid)
+            assert len(tid) == 8
+        f = self._f[self._current]
+        f.seek(4)
+        f.write(tid)
+
     def verify(self, verifyFunc):
         """Call the verifyFunc on every object in the cache.
 
@@ -244,13 +297,13 @@
             if len(h) != 27:
                 self.log("invalidate: short record for oid %16x "
                          "at position %d in cache file %d",
-                         u64(oid), ap, p < 0)
+                         u64(oid), ap, p < 0)
                 del self._index[oid]
                 return None
             if h[:8] != oid:
                 self.log("invalidate: oid mismatch: expected %16x read %16x "
                          "at position %d in cache file %d",
-                         u64(oid), u64(h[:8]), ap, p < 0)
+                         u64(oid), u64(h[:8]), ap, p < 0)
                 del self._index[oid]
                 return None
             f.seek(ap+8) # Switch from reading to writing
@@ -285,7 +338,7 @@
             if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
                 self.log("load: bad record for oid %16x "
                          "at position %d in cache file %d",
-                         u64(oid), ap, p < 0)
+                         u64(oid), ap, p < 0)
                 del self._index[oid]
                 return None
 
@@ -458,7 +511,7 @@
             if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
                 self.log("modifiedInVersion: bad record for oid %16x "
                          "at position %d in cache file %d",
-                         u64(oid), ap, p < 0)
+                         u64(oid), ap, p < 0)
                 del self._index[oid]
                 return None
 
@@ -482,6 +535,7 @@
         self._acquire()
         try:
             if self._pos + size > self._limit:
+                ltid = self._getLastTid()
                 current = not self._current
                 self._current = current
                 self._trace(0x70)
@@ -505,8 +559,12 @@
                 else:
                     # Temporary cache file:
                     self._f[current] = tempfile.TemporaryFile(suffix='.zec')
-                self._f[current].write(magic)
-                self._pos = 4
+                header = magic
+                if ltid:
+                    header += ltid
+                self._f[current].write(header +
+                                       '\0' * (headersize - len(header)))
+                self._pos = headersize
         finally:
             self._release()
 
@@ -598,7 +656,7 @@
         f = self._f[fileindex]
         seek = f.seek
         read = f.read
-        pos = 4
+        pos = headersize
         count = 0
 
         while 1:
@@ -657,7 +715,6 @@
                     del serial[oid]
                     del index[oid]
 
-
             pos = pos + tlen
             count += 1
 
@@ -674,6 +731,6 @@
         return pos
 
     def rilog(self, msg, pos, fileindex):
-        # Helper to log certain messages from read_index
+        # Helper to log messages from read_index
         self.log("read_index: %s at position %d in cache file %d",
                  msg, pos, fileindex)