[Zope3-checkins] CVS: ZODB4/src/zodb/zeo - schema.xml:1.1.2.1 runzeo.py:1.1.2.1 component.xml:1.1.2.1 stubs.py:1.7.6.1 server.py:1.12.2.1 interfaces.py:1.3.30.1 client.py:1.13.2.1

Jeremy Hylton jeremy@zope.com
Tue, 17 Jun 2003 17:59:55 -0400


Update of /cvs-repository/ZODB4/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv10995/src/zodb/zeo

Modified Files:
      Tag: ZODB3-2-merge
	stubs.py server.py interfaces.py client.py 
Added Files:
      Tag: ZODB3-2-merge
	schema.xml runzeo.py component.xml 
Log Message:
Checkpoint progress merging ZODB 3.2 features and fixes into ZODB4.


=== Added File ZODB4/src/zodb/zeo/schema.xml ===
<schema>

  <description>
    This schema describes the configuration of the ZEO storage server
    process.
  </description>

  <!-- Use the storage types defined by ZODB. -->
  <import package="zodb"/>

  <!-- Use the ZEO server information structure. -->
  <import package="zodb/zeo"/>

  <section type="zeo" name="*" required="yes" attribute="zeo" />

  <multisection name="+" type="ZODB.storage"
                attribute="storages"
                required="yes">
    <description>
      One or more storages that are provided by the ZEO server.  The
      section names are used as the storage names, and must be unique
      within each ZEO storage server.  Traditionally, these names
      represent small integers starting at '1'.
    </description>
  </multisection>

  <section name="*" type="eventlog" attribute="eventlog" required="no" />

</schema>


=== Added File ZODB4/src/zodb/zeo/runzeo.py ===
#!python
##############################################################################
#
# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Start the ZEO storage server.

Usage: %s [-C URL] [-a ADDRESS] [-f FILENAME] [-h]

Options:
-C/--configuration URL -- configuration file or URL
-a/--address ADDRESS -- server address of the form PORT, HOST:PORT, or PATH
                        (a PATH must contain at least one "/")
-f/--filename FILENAME -- filename for FileStorage
-t/--timeout TIMEOUT -- transaction timeout in secondes (default no timeout)
-h/--help -- print this usage message and exit
-m/--monitor ADDRESS -- address of monitor server ([HOST:]PORT or PATH)

Unless -C is specified, -a and -f are required.
"""

# The code here is designed to be reused by other, similar servers.
# For the forseeable future, it must work under Python 2.1 as well as
# 2.2 and above.

import os
import sys
import getopt
import signal
import socket
import logging

import ZConfig
from zdaemon.zdoptions import ZDOptions
from zodb import zeo

def parse_address(arg):
    # XXX Not part of the official ZConfig API
    obj = ZConfig.datatypes.SocketAddress(arg)
    return obj.family, obj.address

class ZEOOptionsMixin:

    storages = None

    def handle_address(self, arg):
        self.family, self.address = parse_address(arg)

    def handle_monitor_address(self, arg):
        self.monitor_family, self.monitor_address = parse_address(arg)

    def handle_filename(self, arg):
        from ZODB.config import FileStorage # That's a FileStorage *opener*!
        class FSConfig:
            def __init__(self, name, path):
                self._name = name
                self.path = path
                self.create = 0
                self.read_only = 0
                self.stop = None
                self.quota = None
            def getSectionName(self):
                return self._name
        if not self.storages:
            self.storages = []
        name = str(1 + len(self.storages))
        conf = FileStorage(FSConfig(name, arg))
        self.storages.append(conf)

    def add_zeo_options(self):
        self.add(None, None, "a:", "address=", self.handle_address)
        self.add(None, None, "f:", "filename=", self.handle_filename)
        self.add("family", "zeo.address.family")
        self.add("address", "zeo.address.address",
                 required="no server address specified; use -a or -C")
        self.add("read_only", "zeo.read_only", default=0)
        self.add("invalidation_queue_size", "zeo.invalidation_queue_size",
                 default=100)
        self.add("transaction_timeout", "zeo.transaction_timeout",
                 "t:", "timeout=", float)
        self.add("monitor_address", "zeo.monitor_address", "m:", "monitor=",
                 self.handle_monitor_address)
        self.add('auth_protocol', 'zeo.authentication_protocol',
                 None, 'auth-protocol=', default=None)
        self.add('auth_database', 'zeo.authentication_database',
                 None, 'auth-database=')
        self.add('auth_realm', 'zeo.authentication_realm',
                 None, 'auth-realm=')

class ZEOOptions(ZDOptions, ZEOOptionsMixin):

    logsectionname = "eventlog"

    def __init__(self):
        self.schemadir = os.path.dirname(zeo.__file__)
        ZDOptions.__init__(self)
        self.add_zeo_options()
        self.add("storages", "storages",
                 required="no storages specified; use -f or -C")


class ZEOServer:

    def __init__(self, options):
        self.options = options

    def main(self):
        self.setup_default_logging()
        self.check_socket()
        self.clear_socket()
        try:
            self.open_storages()
            self.setup_signals()
            self.create_server()
            self.loop_forever()
        finally:
            self.close_storages()
            self.clear_socket()

    def setup_default_logging(self):
        if self.options.config_logger is not None:
            return
        if os.getenv("EVENT_LOG_FILE") is not None:
            return
        if os.getenv("STUPID_LOG_FILE") is not None:
            return
        # No log file is configured; default to stderr.  The logging
        # level can still be controlled by {STUPID,EVENT}_LOG_SEVERITY.
        os.environ["EVENT_LOG_FILE"] = ""

    def check_socket(self):
        if self.can_connect(self.options.family, self.options.address):
            self.options.usage("address %s already in use" %
                               repr(self.options.address))

    def can_connect(self, family, address):
        s = socket.socket(family, socket.SOCK_STREAM)
        try:
            s.connect(address)
        except socket.error:
            return 0
        else:
            s.close()
            return 1

    def clear_socket(self):
        if isinstance(self.options.address, type("")):
            try:
                os.unlink(self.options.address)
            except os.error:
                pass

    def open_storages(self):
        self.storages = {}
        for opener in self.options.storages:
            _logger.info("opening storage %r using %s"
                 % (opener.name, opener.__class__.__name__))
            self.storages[opener.name] = opener.open()

    def setup_signals(self):
        """Set up signal handlers.

        The signal handler for SIGFOO is a method handle_sigfoo().
        If no handler method is defined for a signal, the signal
        action is not changed from its initial value.  The handler
        method is called without additional arguments.
        """
        if os.name != "posix":
            return
        if hasattr(signal, 'SIGXFSZ'):
            signal.signal(signal.SIGXFSZ, signal.SIG_IGN) # Special case
        init_signames()
        for sig, name in signames.items():
            method = getattr(self, "handle_" + name.lower(), None)
            if method is not None:
                def wrapper(sig_dummy, frame_dummy, method=method):
                    method()
                signal.signal(sig, wrapper)

    def create_server(self):
        from zodb.zeo.server import StorageServer
        self.server = StorageServer(
            self.options.address,
            self.storages,
            read_only=self.options.read_only,
            invalidation_queue_size=self.options.invalidation_queue_size,
            transaction_timeout=self.options.transaction_timeout,
            monitor_address=self.options.monitor_address,
            auth_protocol=self.options.auth_protocol,
            auth_database=self.options.auth_database,
            auth_realm=self.options.auth_realm)

    def loop_forever(self):
        import ThreadedAsync.LoopCallback
        ThreadedAsync.LoopCallback.loop()

    def handle_sigterm(self):
        _logger.info("terminated by SIGTERM")
        sys.exit(0)

    def handle_sigint(self):
        _logger.info("terminated by SIGINT")
        sys.exit(0)

    def handle_sighup(self):
        _logger.info("restarted by SIGHUP")
        sys.exit(1)

    def handle_sigusr2(self):
        # How should this work with new logging?
        
        # This requires a modern zLOG (from Zope 2.6 or later); older
        # zLOG packages don't have the initialize() method
        _logger.info("reinitializing zLOG")
        # XXX Shouldn't this be below with _log()?
        import zLOG
        zLOG.initialize()
        _logger.info("reinitialized zLOG")

    def close_storages(self):
        for name, storage in self.storages.items():
            _logger.info("closing storage %r" % name)
            try:
                storage.close()
            except: # Keep going
                _logging.exception("failed to close storage %r" % name)


# Signal names

signames = None

def signame(sig):
    """Return a symbolic name for a signal.

    Return "signal NNN" if there is no corresponding SIG name in the
    signal module.
    """

    if signames is None:
        init_signames()
    return signames.get(sig) or "signal %d" % sig

def init_signames():
    global signames
    signames = {}
    for name, sig in signal.__dict__.items():
        k_startswith = getattr(name, "startswith", None)
        if k_startswith is None:
            continue
        if k_startswith("SIG") and not k_startswith("SIG_"):
            signames[sig] = name


# Main program

def main(args=None):
    global _logger
    _logger = logging.getLogger("runzeo")

    options = ZEOOptions()
    options.realize(args)
    s = ZEOServer(options)
    s.main()

if __name__ == "__main__":
    main()


=== Added File ZODB4/src/zodb/zeo/component.xml ===
<component>

  <!-- stub out the type until we figure out how to zconfig logging -->
  <sectiontype name="eventlog" />

  <sectiontype name="zeo">

    <description>
      The content of a ZEO section describe operational parameters
      of a ZEO server except for the storage(s) to be served.
    </description>

    <key name="address" datatype="socket-address"
         required="yes">
      <description>
        The address at which the server should listen.  This can be in
        the form 'host:port' to signify a TCP/IP connection or a
        pathname string to signify a Unix domain socket connection (at
        least one '/' is required).  A hostname may be a DNS name or a
        dotted IP address.  If the hostname is omitted, the platform's
        default behavior is used when binding the listening socket (''
        is passed to socket.bind() as the hostname portion of the
        address).
      </description>
    </key>

    <key name="read-only" datatype="boolean"
         required="no"
         default="false">
      <description>
        Flag indicating whether the server should operate in read-only
        mode.  Defaults to false.  Note that even if the server is
        operating in writable mode, individual storages may still be
        read-only.  But if the server is in read-only mode, no write
        operations are allowed, even if the storages are writable.  Note
        that pack() is considered a read-only operation.
      </description>
    </key>

    <key name="invalidation-queue-size" datatype="integer"
         required="no"
         default="100">
      <description>
        The storage server keeps a queue of the objects modified by the
        last N transactions, where N == invalidation_queue_size.  This
        queue is used to speed client cache verification when a client
        disconnects for a short period of time.
      </description>
    </key>

    <key name="monitor-address" datatype="socket-address"
         required="no">
      <description>
        The address at which the monitor server should listen.  If
        specified, a monitor server is started.  The monitor server
        provides server statistics in a simple text format.  This can
        be in the form 'host:port' to signify a TCP/IP connection or a
        pathname string to signify a Unix domain socket connection (at
        least one '/' is required).  A hostname may be a DNS name or a
        dotted IP address.  If the hostname is omitted, the platform's
        default behavior is used when binding the listening socket (''
        is passed to socket.bind() as the hostname portion of the
        address).
      </description>
    </key>

    <key name="transaction-timeout" datatype="integer"
         required="no">
      <description>
        The maximum amount of time to wait for a transaction to commit
        after acquiring the storage lock, specified in seconds.  If the
        transaction takes too long, the client connection will be closed
        and the transaction aborted.
      </description>
    </key>

    <key name="authentication-protocol" required="no">
      <description>
        The name of the protocol used for authentication.  The
        only protocol provided with ZEO is "digest," but extensions
        may provide other protocols.
      </description>
    </key>

    <key name="authentication-database" required="no">
      <description>
        The path of the database containing authentication credentials.
      </description>
    </key>

    <key name="authentication-realm" required="no">
      <description>
        The authentication realm of the server.  Some authentication
        schemes use a realm to identify the logic set of usernames
        that are accepted by this server.
      </description>
    </key>

  </sectiontype>

</component>


=== ZODB4/src/zodb/zeo/stubs.py 1.7 => 1.7.6.1 ===
--- ZODB4/src/zodb/zeo/stubs.py:1.7	Mon May 19 11:02:51 2003
+++ ZODB4/src/zodb/zeo/stubs.py	Tue Jun 17 17:59:24 2003
@@ -52,7 +52,7 @@
         self.rpc.callAsync('endVerify')
 
     def invalidateTransaction(self, tid, invlist):
-        self.rpc.callAsync('invalidateTransaction', tid, invlist)
+        self.rpc.callAsyncNoPoll('invalidateTransaction', tid, invlist)
 
     def serialnos(self, arg):
         self.rpc.callAsync('serialnos', arg)
@@ -102,6 +102,12 @@
 
     def get_info(self):
         return self.rpc.call('get_info')
+
+    def getAuthProtocol(self):
+        return self.rpc.call('getAuthProtocol')
+    
+    def lastTransaction(self):
+        return self.rpc.call('lastTransaction')
 
     def getInvalidations(self, tid):
         return self.rpc.call('getInvalidations', tid)


=== ZODB4/src/zodb/zeo/server.py 1.12 => 1.12.2.1 ===
--- ZODB4/src/zodb/zeo/server.py:1.12	Sat Jun  7 02:54:23 2003
+++ ZODB4/src/zodb/zeo/server.py	Tue Jun 17 17:59:24 2003
@@ -58,7 +58,11 @@
 
     ClientStorageStubClass = ClientStorageStub
 
-    def __init__(self, server, read_only=0):
+    # A list of extension methods.  A subclass with extra methods
+    # should override.
+    extensions = []
+
+    def __init__(self, server, read_only=0, auth_realm=None):
         self.server = server
         # timeout and stats will be initialized in register()
         self.timeout = None
@@ -73,7 +77,22 @@
         self.verifying = 0
         self.logger = logging.getLogger("ZSS.%d.ZEO" % os.getpid())
         self.log_label = ""
+        self.authenticated = 0
+        self.auth_realm = auth_realm
+        # The authentication protocol may define extra methods.
+        self._extensions = {}
+        for func in self.extensions:
+            self._extensions[func.func_name] = None
+
+    def finish_auth(self, authenticated):
+        if not self.auth_realm:
+            return 1
+        self.authenticated = authenticated
+        return authenticated
 
+    def set_database(self, database):
+        self.database = database
+        
     def notifyConnected(self, conn):
         self.connection = conn # For restart_other() below
         self.client = self.ClientStorageStubClass(conn)
@@ -110,6 +129,7 @@
         """Delegate several methods to the storage"""
         self.versionEmpty = self.storage.versionEmpty
         self.versions = self.storage.versions
+        self.getSerial = self.storage.getSerial
         self.load = self.storage.load
         self.modifiedInVersion = self.storage.modifiedInVersion
         self.getVersion = self.storage.getVersion
@@ -125,9 +145,11 @@
             # can be removed
             pass
         else:
-            for name in fn().keys():
-                if not hasattr(self,name):
-                    setattr(self, name, getattr(self.storage, name))
+            d = fn()
+            self._extensions.update(d)
+            for name in d.keys():
+                assert not hasattr(self, name)
+                setattr(self, name, getattr(self.storage, name))
         self.lastTransaction = self.storage.lastTransaction
 
     def _check_tid(self, tid, exc=None):
@@ -149,6 +171,15 @@
                 return 0
         return 1
 
+    def getAuthProtocol(self):
+        """Return string specifying name of authentication module to use.
+
+        The module name should be auth_%s where %s is auth_protocol."""
+        protocol = self.server.auth_protocol
+        if not protocol or protocol == 'none':
+            return None
+        return protocol
+    
     def register(self, storage_id, read_only):
         """Select the storage that this client will use
 
@@ -173,19 +204,14 @@
                                                                    self)
 
     def get_info(self):
-        return {'name': self.storage.getName(),
-                'extensionMethods': self.getExtensionMethods(),
+        return {"name": self.storage.getName(),
+                "extensionMethods": self.getExtensionMethods(),
                  "implements": [iface.__name__
                                 for iface in providedBy(self.storage)],
                 }
 
     def getExtensionMethods(self):
-        try:
-            e = self.storage.getExtensionMethods
-        except AttributeError:
-            return {}
-        else:
-            return e()
+        return self._extensions
 
     def zeoLoad(self, oid):
         self.stats.loads += 1
@@ -564,7 +590,10 @@
     def __init__(self, addr, storages, read_only=0,
                  invalidation_queue_size=100,
                  transaction_timeout=None,
-                 monitor_address=None):
+                 monitor_address=None,
+                 auth_protocol=None,
+                 auth_filename=None,
+                 auth_realm=None):
         """StorageServer constructor.
 
         This is typically invoked from the start.py script.
@@ -606,6 +635,21 @@
             should listen.  If specified, a monitor server is started.
             The monitor server provides server statistics in a simple
             text format.
+
+        auth_protocol -- The name of the authentication protocol to use.
+            Examples are "digest" and "srp".
+            
+        auth_filename -- The name of the password database filename.
+            It should be in a format compatible with the authentication
+            protocol used; for instance, "sha" and "srp" require different
+            formats.
+            
+            Note that to implement an authentication protocol, a server
+            and client authentication mechanism must be implemented in a
+            auth_* module, which should be stored inside the "auth"
+            subdirectory. This module may also define a DatabaseClass
+            variable that should indicate what database should be used
+            by the authenticator.
         """
 
         self.addr = addr
@@ -621,6 +665,12 @@
         for s in storages.values():
             s._waiting = []
         self.read_only = read_only
+        self.auth_protocol = auth_protocol
+        self.auth_filename = auth_filename
+        self.auth_realm = auth_realm
+        self.database = None
+        if auth_protocol:
+            self._setup_auth(auth_protocol)
         # A list of at most invalidation_queue_size invalidations
         self.invq = []
         self.invq_bound = invalidation_queue_size
@@ -643,6 +693,39 @@
         else:
             self.monitor = None
 
+    def _setup_auth(self, protocol):
+        # Can't be done in global scope, because of cyclic references
+        from ZEO.auth import get_module
+
+        name = self.__class__.__name__
+
+        module = get_module(protocol)
+        if not module:
+            log("%s: no such an auth protocol: %s" % (name, protocol))
+            return
+        
+        storage_class, client, db_class = module
+        
+        if not storage_class or not issubclass(storage_class, ZEOStorage):
+            log(("%s: %s isn't a valid protocol, must have a StorageClass" %
+                 (name, protocol)))
+            self.auth_protocol = None
+            return
+        self.ZEOStorageClass = storage_class
+
+        log("%s: using auth protocol: %s" % (name, protocol))
+        
+        # We create a Database instance here for use with the authenticator
+        # modules. Having one instance allows it to be shared between multiple
+        # storages, avoiding the need to bloat each with a new authenticator
+        # Database that would contain the same info, and also avoiding any
+        # possibly synchronization issues between them.
+        self.database = db_class(self.auth_filename)
+        if self.database.realm != self.auth_realm:
+            raise ValueError("password database realm %r "
+                             "does not match storage realm %r"
+                             % (self.database.realm, self.auth_realm))
+
     def new_connection(self, sock, addr):
         """Internal: factory to create a new connection.
 
@@ -650,8 +733,13 @@
         whenever accept() returns a socket for a new incoming
         connection.
         """
-        z = self.ZEOStorageClass(self, self.read_only)
-        c = self.ManagedServerConnectionClass(sock, addr, z, self)
+        if self.auth_protocol and self.database:
+            zstorage = self.ZEOStorageClass(self, self.read_only,
+                                            auth_realm=self.auth_realm)
+            zstorage.set_database(self.database)
+        else:
+            zstorage = self.ZEOStorageClass(self, self.read_only)
+        c = self.ManagedServerConnectionClass(sock, addr, zstorage, self)
         self.logger.warn("new connection %s: %s", addr, `c`)
         return c
 


=== ZODB4/src/zodb/zeo/interfaces.py 1.3 => 1.3.30.1 ===
--- ZODB4/src/zodb/zeo/interfaces.py:1.3	Tue Feb 25 13:55:05 2003
+++ ZODB4/src/zodb/zeo/interfaces.py	Tue Jun 17 17:59:24 2003
@@ -27,3 +27,5 @@
 class ClientDisconnected(ClientStorageError):
     """The database storage is disconnected from the storage."""
 
+class AuthError(StorageError):
+    """The client provided invalid authentication credentials."""


=== ZODB4/src/zodb/zeo/client.py 1.13 => 1.13.2.1 ===
--- ZODB4/src/zodb/zeo/client.py:1.13	Fri Jun  6 11:24:21 2003
+++ ZODB4/src/zodb/zeo/client.py	Tue Jun 17 17:59:24 2003
@@ -106,7 +106,8 @@
     def __init__(self, addr, storage='1', cache_size=20 * MB,
                  name='', client=None, var=None,
                  min_disconnect_poll=5, max_disconnect_poll=300,
-                 wait=True, read_only=False, read_only_fallback=False):
+                 wait=True, read_only=False, read_only_fallback=False,
+                 username='', password='', realm=None):
 
         """ClientStorage constructor.
 
@@ -161,6 +162,17 @@
             writable storages are available.  Defaults to false.  At
             most one of read_only and read_only_fallback should be
             true.
+
+        username -- string with username to be used when authenticating.
+            These only need to be provided if you are connecting to an
+            authenticated server storage.
+
+        password -- string with plaintext password to be used
+            when authenticated.
+
+        Note that the authentication protocol is defined by the server
+        and is detected by the ClientStorage upon connecting (see
+        testConnection() and doAuth() for details).
         """
 
         self.logger = logging.getLogger("ZCS.%d" % os.getpid())
@@ -202,6 +214,9 @@
         self._conn_is_read_only = 0
         self._storage = storage
         self._read_only_fallback = read_only_fallback
+        self._username = username
+        self._password = password
+        self._realm = realm
         # _server_addr is used by sortKey()
         self._server_addr = None
         self._tfile = None
@@ -236,6 +251,21 @@
         self._oid_lock = threading.Lock()
         self._oids = [] # Object ids retrieved from newObjectIds()
 
+        # load() and tpc_finish() must be serialized to guarantee
+        # that cache modifications from each occur atomically.
+        # It also prevents multiple load calls occuring simultaneously,
+        # which simplifies the cache logic.
+        self._load_lock = threading.Lock()
+        # _load_oid and _load_status are protected by _lock
+        self._load_oid = None
+        self._load_status = None
+
+        # Can't read data in one thread while writing data
+        # (tpc_finish) in another thread.  In general, the lock
+        # must prevent access to the cache while _update_cache
+        # is executing.
+        self._lock = threading.Lock()
+
         t = self._ts = get_timestamp()
         self._serial = `t`
         self._oid = '\0\0\0\0\0\0\0\0'
@@ -330,6 +360,29 @@
         if cn is not None:
             cn.pending()
 
+    def doAuth(self, protocol, stub):
+        if not (self._username and self._password):
+            raise AuthError, "empty username or password"
+
+        module = get_module(protocol)
+        if not module:
+            log2(PROBLEM, "%s: no such an auth protocol: %s" %
+                 (self.__class__.__name__, protocol))
+            return
+
+        storage_class, client, db_class = module
+
+        if not client:
+            log2(PROBLEM,
+                 "%s: %s isn't a valid protocol, must have a Client class" %
+                 (self.__class__.__name__, protocol))
+            raise AuthError, "invalid protocol"
+
+        c = client(stub)
+
+        # Initiate authentication, returns boolean specifying whether OK
+        return c.start(self._username, self._realm, self._password)
+
     def testConnection(self, conn):
         """Internal: test the given connection.
 
@@ -355,6 +408,16 @@
         # XXX Check the protocol version here?
         self._conn_is_read_only = 0
         stub = self.StorageServerStubClass(conn)
+        
+        auth = stub.getAuthProtocol()
+        self.logger.info("Client authentication successful")
+        if auth:
+            if self.doAuth(auth, stub):
+                self.logger.info("Client authentication successful")
+            else:
+                self.logger.error("Authentication failed")
+                raise AuthError, "Authentication failed"
+
         try:
             stub.register(str(self._storage), self._is_read_only)
             return 1
@@ -406,6 +469,12 @@
         if not conn.is_async():
             self.logger.warn("Waiting for cache verification to finish")
             self._wait_sync()
+        self._handle_extensions()
+
+    def _handle_extensions(self):
+        for name in self.getExtensionMethods().keys():
+            if not hasattr(self, name):
+                setattr(self, name, self._server.extensionMethod(name))
 
     def update_interfaces(self):
         # Update what interfaces the instance provides based on the server.
@@ -600,12 +669,6 @@
         """
         return self._server.history(oid, version, length)
 
-    def __getattr__(self, name):
-        if self.getExtensionMethods().has_key(name):
-            return self._server.extensionMethod(name)
-        else:
-            raise AttributeError(name)
-
     def loadSerial(self, oid, serial):
         """Storage API: load a historical revision of an object."""
         return self._server.loadSerial(oid, serial)
@@ -621,14 +684,39 @@
         specified by the given object id and version, if they exist;
         otherwise a KeyError is raised.
         """
-        p = self._cache.load(oid, version)
-        if p:
-            return p
+        self._lock.acquire()    # for atomic processing of invalidations
+        try:
+            pair = self._cache.load(oid, version)
+            if pair:
+                return pair
+        finally:
+            self._lock.release()
+
         if self._server is None:
             raise ClientDisconnected()
-        p, s, v, pv, sv = self._server.zeoLoad(oid)
-        self._cache.checkSize(0)
-        self._cache.store(oid, p, s, v, pv, sv)
+
+        self._load_lock.acquire()
+        try:
+            self._lock.acquire()
+            try:
+                self._load_oid = oid
+                self._load_status = 1
+            finally:
+                self._lock.release()
+
+            p, s, v, pv, sv = self._server.zeoLoad(oid)
+
+            self._lock.acquire()    # for atomic processing of invalidations
+            try:
+                if self._load_status:
+                    self._cache.checkSize(0)
+                    self._cache.store(oid, p, s, v, pv, sv)
+                self._load_oid = None
+            finally:
+                self._lock.release()
+        finally:
+            self._load_lock.release()
+
         if v and version and v == version:
             return pv, sv
         else:
@@ -641,9 +729,13 @@
 
         If no version modified the object, return an empty string.
         """
-        v = self._cache.modifiedInVersion(oid)
-        if v is not None:
-            return v
+        self._lock.acquire()
+        try:
+            v = self._cache.modifiedInVersion(oid)
+            if v is not None:
+                return v
+        finally:
+            self._lock.release()
         return self._server.modifiedInVersion(oid)
 
     def newObjectId(self):
@@ -740,6 +832,7 @@
 
         self._serial = id
         self._seriald.clear()
+        self._tbuf.clear()
         del self._serials[:]
 
     def end_transaction(self):
@@ -779,18 +872,23 @@
         """Storage API: finish a transaction."""
         if transaction is not self._transaction:
             return
+        self._load_lock.acquire()
         try:
-            if f is not None:
-                f()
+            self._lock.acquire()  # for atomic processing of invalidations
+            try:
+                self._update_cache()
+                if f is not None:
+                    f()
+            finally:
+                self._lock.release()
 
             tid = self._server.tpcFinish(self._serial)
+            self._cache.setLastTid(tid)
 
             r = self._check_serials()
             assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
-
-            self._update_cache()
-            self._cache.setLastTid(tid)
         finally:
+            self._load_lock.release()
             self.end_transaction()
 
     def _update_cache(self):
@@ -799,6 +897,13 @@
         This iterates over the objects in the transaction buffer and
         update or invalidate the cache.
         """
+        # Must be called with _lock already acquired.
+
+        # XXX not sure why _update_cache() would be called on
+        # a closed storage.
+        if self._cache is None:
+            return
+
         self._cache.checkSize(self._tbuf.get_size())
         try:
             self._tbuf.begin_iterate()
@@ -892,15 +997,21 @@
         # oid, version pairs.  The DB's invalidate() method expects a
         # dictionary of oids.
 
-        # versions maps version names to dictionary of invalidations
-        versions = {}
-        for oid, version in invs:
-            d = versions.setdefault(version, {})
-            self._cache.invalidate(oid, version=version)
-            d[oid] = 1
-        if self._db is not None:
-            for v, d in versions.items():
-                self._db.invalidate(d, version=v)
+        self._lock.acquire()
+        try:
+            # versions maps version names to dictionary of invalidations
+            versions = {}
+            for oid, version in invs:
+                if oid == self._load_oid:
+                    self._load_status = 0
+                self._cache.invalidate(oid, version=version)
+                versions.setdefault(version, {})[oid] = 1
+
+            if self._db is not None:
+                for v, d in versions.items():
+                    self._db.invalidate(d, version=v)
+        finally:
+            self._lock.release()
 
     def endVerify(self):
         """Server callback to signal end of cache validation."""
@@ -928,7 +1039,7 @@
             self.logger.debug(
                 "Transactional invalidation during cache verification")
             for t in args:
-                self.self._pickler.dump(t)
+                self._pickler.dump(t)
             return
         self._process_invalidations(args)