[Zodb-checkins] CVS: Zope3/src/zodb/zeo - stubs.py:1.4.2.1 server.py:1.6.2.2 interfaces.py:1.2.10.1 client.py:1.4.2.2

Jeremy Hylton jeremy@zope.com
Mon, 10 Feb 2003 17:08:35 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv19735/zeo

Modified Files:
      Tag: ZODB3-2-integration-branch
	stubs.py server.py interfaces.py client.py 
Log Message:
Merge more of ZODB3.2 into Zope3.

The big changes here are:
Move errors into interfaces and clean up inheritance relationships.
Add new cache verification code to the client, including an internal _ready event.


=== Zope3/src/zodb/zeo/stubs.py 1.4 => 1.4.2.1 ===
--- Zope3/src/zodb/zeo/stubs.py:1.4	Wed Feb  5 18:28:24 2003
+++ Zope3/src/zodb/zeo/stubs.py	Mon Feb 10 17:08:31 2003
@@ -43,16 +43,16 @@
         self.rpc = rpc
 
     def beginVerify(self):
-        self.rpc.callAsync('begin')
+        self.rpc.callAsync('beginVerify')
 
     def invalidateVerify(self, args):
-        self.rpc.callAsync('invalidate', args)
+        self.rpc.callAsync('invalidateVerify', args)
 
     def endVerify(self):
-        self.rpc.callAsync('end')
+        self.rpc.callAsync('endVerify')
 
     def invalidateTrans(self, args):
-        self.rpc.callAsync('Invalidate', args)
+        self.rpc.callAsync('invalidateTransaction', args)
 
     def serialnos(self, arg):
         self.rpc.callAsync('serialnos', arg)


=== Zope3/src/zodb/zeo/server.py 1.6.2.1 => 1.6.2.2 ===
--- Zope3/src/zodb/zeo/server.py:1.6.2.1	Fri Feb  7 13:28:03 2003
+++ Zope3/src/zodb/zeo/server.py	Mon Feb 10 17:08:31 2003
@@ -45,6 +45,8 @@
 
 from zope.interface.implements import objectImplements
 
+from transaction.interfaces import TransactionError
+
 class StorageServerError(StorageError):
     """Error reported when an unpickleable exception is raised."""
 
@@ -110,6 +112,8 @@
         self.load = self.storage.load
         self.loadSerial = self.storage.loadSerial
         self.modifiedInVersion = self.storage.modifiedInVersion
+        self.getVersion = self.storage.getVersion
+        self.setVersion = self.storage.setVersion
         try:
             fn = self.storage.getExtensionMethods
         except AttributeError:
@@ -321,7 +325,7 @@
         tid = self.storage.lastTransaction()
         if self.invalidated:
             self.server.invalidate(self, self.storage_id, tid,
-                                   self.invalidated, self.get_size_info())
+                                   self.invalidated)
         self._clear_transaction()
         # Return the tid, for cache invalidation optimization
         return tid
@@ -378,12 +382,12 @@
     # The following four methods return values, so they must acquire
     # the storage lock and begin the transaction before returning.
 
-    def vote(self, id):
+    def tpcVote(self, id):
         self._check_tid(id, exc=StorageTransactionError)
         if self.locked:
-            return self._vote()
+            return self._tpcVote()
         else:
-            return self._wait(lambda: self._vote())
+            return self._wait(lambda: self._tpcVote())
 
     def abortVersion(self, src, id):
         self._check_tid(id, exc=StorageTransactionError)
@@ -445,7 +449,7 @@
             self.stats.conflicts_resolved += 1
         self.serials.append((oid, newserial))
 
-    def _vote(self):
+    def _tpcVote(self):
         self.client.serialnos(self.serials)
         return self.storage.tpcVote(self.transaction)
 
@@ -666,7 +670,7 @@
         stats.clients += 1
         return self.timeouts[storage_id], stats
 
-    def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
+    def invalidate(self, conn, storage_id, tid, invalidated=()):
         """Internal: broadcast info and invalidations to clients.
 
         This is called from several ZEOStorage methods.
@@ -695,8 +699,6 @@
         for p in self.connections.get(storage_id, ()):
             if invalidated and p is not conn:
                 p.client.invalidateTransaction(tid, invalidated)
-            elif info is not None:
-                p.client.info(info)
 
     def get_invalidations(self, tid):
         """Return a tid and list of all objects invalidation since tid.


=== Zope3/src/zodb/zeo/interfaces.py 1.2 => 1.2.10.1 ===
--- Zope3/src/zodb/zeo/interfaces.py:1.2	Wed Dec 25 09:12:22 2002
+++ Zope3/src/zodb/zeo/interfaces.py	Mon Feb 10 17:08:31 2003
@@ -16,78 +16,14 @@
 $Id$
 """
 
-from zope.interface import Interface
+from zodb.storage.interfaces import StorageError
 
-class Disconnected(Exception):
-    """A client is disconnected from a server."""
+class ClientStorageError(StorageError):
+    """An error occured in the ZEO Client Storage."""
 
-class ICache(Interface):
-    """ZEO client cache.
+class UnrecognizedResult(ClientStorageError):
+    """A server call returned an unrecognized result."""
 
-    __init__(storage, size, client, var)
+class ClientDisconnected(ClientStorageError):
+    """The database storage is disconnected from the storage."""
 
-    All arguments optional.
-
-    storage -- name of storage
-    size -- max size of cache in bytes
-    client -- a string; if specified, cache is persistent.
-    var -- var directory to store cache files in
-    """
-
-    def open():
-        """Returns a sequence of object info tuples.
-
-        An object info tuple is a pair containing an object id and a
-        pair of serialnos, a non-version serialno and a version serialno:
-        oid, (serial, ver_serial)
-
-        This method builds an index of the cache and returns a
-        sequence used for cache validation.
-        """
-
-    def close():
-        """Closes the cache."""
-
-    def verify(func):
-        """Call func on every object in cache.
-
-        func is called with three arguments
-        func(oid, serial, ver_serial)
-        """
-
-    def invalidate(oid, version):
-        """Remove object from cache."""
-
-    def load(oid, version):
-        """Load object from cache.
-
-        Return None if object not in cache.
-        Return data, serialno if object is in cache.
-        """
-
-    def store(oid, p, s, version, pv, sv):
-        """Store a new object in the cache."""
-
-    def update(oid, serial, version, data):
-        """Update an object already in the cache.
-
-        XXX This method is called to update objects that were modified by
-        a transaction.  It's likely that it is already in the cache,
-        and it may be possible for the implementation to operate more
-        efficiently.
-        """
-
-    def modifiedInVersion(oid):
-        """Return the version an object is modified in.
-
-        '' signifies the trunk.
-        Returns None if the object is not in the cache.
-        """
-
-    def checkSize(size):
-        """Check if adding size bytes would exceed cache limit.
-
-        This method is often called just before store or update.  The
-        size is a hint about the amount of data that is about to be
-        stored.  The cache may want to evict some data to make space.
-        """


=== Zope3/src/zodb/zeo/client.py 1.4.2.1 => 1.4.2.2 ===
--- Zope3/src/zodb/zeo/client.py:1.4.2.1	Fri Feb  7 13:28:03 2003
+++ Zope3/src/zodb/zeo/client.py	Mon Feb 10 17:08:31 2003
@@ -37,10 +37,10 @@
 import logging
 
 from zodb.zeo import cache, stubs
-from zodb.zeo.interfaces import Disconnected
 from zodb.zeo.tbuf import TransactionBuffer
 from zodb.zeo.zrpc.client import ConnectionManager
 
+from zodb.zeo.interfaces import *
 from zodb.storage.interfaces import *
 from zodb.timestamp import TimeStamp
 
@@ -49,15 +49,6 @@
 except ImportError:
     ResolvedSerial = 'rs'
 
-class ClientStorageError(StorageError):
-    """An error occured in the ZEO Client Storage."""
-
-class UnrecognizedResult(ClientStorageError):
-    """A server call returned an unrecognized result."""
-
-class ClientDisconnected(ClientStorageError, Disconnected):
-    """The database storage is disconnected from the storage."""
-
 def get_timestamp(prev_ts=None):
     """Internal helper to return a unique TimeStamp instance.
 
@@ -113,11 +104,9 @@
     __implements__ = IStorage
 
     def __init__(self, addr, storage='1', cache_size=20 * MB,
-                 name='', client=None, debug=0, var=None,
+                 name='', client=None, var=None,
                  min_disconnect_poll=5, max_disconnect_poll=300,
-                 wait_for_server_on_startup=None, # deprecated alias for wait
-                 wait=None, # defaults to 1
-                 read_only=0, read_only_fallback=0):
+                 wait=True, read_only=False, read_only_fallback=False):
 
         """ClientStorage constructor.
 
@@ -149,9 +138,6 @@
             effective value is true, the client cache is persistent.
             See ClientCache for more info.
 
-        debug -- Ignored.  This is present only for backwards
-            compatibility with ZEO 1.
-
         var -- The 'var' directory, defaulting to None, in which
             the persistent cache files should be written.
 
@@ -163,9 +149,6 @@
             attempts to connect to the server, in seconds.  Defaults
             to 300 seconds.
 
-        wait_for_server_on_startup -- A backwards compatible alias for
-            the wait argument.
-
         wait -- A flag indicating whether to wait until a connection
             with a server is made, defaulting to true.
 
@@ -189,33 +172,37 @@
                          read_only_fallback and "fallback" or "normal",
                          storage)
 
-        if debug:
-            self.logger.warn(
-                "ClientStorage(): debug argument is no longer used")
-
-        # wait defaults to True, but wait_for_server_on_startup overrides
-        # if not None
-        if wait_for_server_on_startup is not None:
-            if wait is not None and wait != wait_for_server_on_startup:
-                self.logger.error(
-                    "ClientStorage(): conflicting values for wait and "
-                    "wait_for_server_on_startup; wait prevails")
-            else:
-                self.logger.warn(
-                    "ClientStorage(): wait_for_server_on_startup "
-                    "is deprecated; please use wait instead")
-                wait = wait_for_server_on_startup
-        elif wait is None:
-            wait = 1
-
         self._addr = addr # For tests
+        # A ZEO client can run in disconnected mode, using data from
+        # its cache, or in connected mode.  Several instance variables
+        # are related to whether the client is connected.
+
+        # _server: All method calls are invoked through the server
+        #    stub.  When not connected, set to disconnected_stub, an
+        #    object that raises ClientDisconnected errors.
+
+        # _ready: A threading Event that is set only if _server
+        #    is set to a real stub.
+
+        # _connection: The current zrpc connection or None.
+
+        # _connection is set as soon as a connection is established,
+        # but _server is set only after cache verification has finished
+        # and clients can safely use the server.  _pending_server holds
+        # a server stub while it is being verified.
+        
         self._server = disconnected_stub
+        self._connection = None
+        self._pending_server = None
+        self._ready = threading.Event()
+        
         self._is_read_only = read_only
         self._storage = storage
         self._read_only_fallback = read_only_fallback
-        self._connection = None
         # _server_addr is used by sortKey()
         self._server_addr = None
+        self._tfile = None
+        self._pickler = None
 
         self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client'}
         
@@ -265,10 +252,46 @@
             if not self._rpc_mgr.attempt_connect():
                 self._rpc_mgr.connect()
 
-        # If we're connected at this point, the cache is opened as a
-        # side effect of verify_cache().  If not, open it now.
-        if not self.is_connected():
-            self._cache.open()
+        if wait:
+            self._wait()
+        else:
+            # attempt_connect() will make an attempt that doesn't block
+            # "too long," for a very vague notion of too long.  If that
+            # doesn't succeed, call connect() to start a thread.
+            if not self._rpc_mgr.attempt_connect():
+                self._rpc_mgr.connect()
+            # If the connect hasn't occurred, run with cached data.
+            if not self._ready.isSet():
+                self._cache.open()
+
+    def _wait(self):
+        # Wait for a connection to be established.
+        self._rpc_mgr.connect(sync=1)
+        # When a synchronous connect() call returns, there is
+        # a valid _connection object but cache validation may
+        # still be going on.  This code must wait until validation
+        # finishes, but if the connection isn't a zrpc async
+        # connection it also needs to poll for input.
+        if self._connection.is_async():
+            while 1:
+                self._ready.wait(30)
+                if self._ready.isSet():
+                    break
+                self.logger.warn("Wait for cache verification to finish")
+        else:
+            # If there is no mainloop running, this code needs
+            # to call poll() to cause asyncore to handle events.
+            while 1:
+                cn = self._connection
+                if cn is None:
+                    # If the connection was closed while we were
+                    # waiting for it to become ready, start over.
+                    return self._wait()
+                else:
+                    cn.pending(30)
+                if self._ready.isSet():
+                    break
+                self.logger.warn("Wait for cache verification to finish")
 
     def close(self):
         """Storage API: finalize the storage, releasing external resources."""
@@ -291,10 +314,9 @@
 
     def is_connected(self):
         """Return whether the storage is currently connected to a server."""
-        if self._server is disconnected_stub:
-            return 0
-        else:
-            return 1
+        # This function is used by clients, so we only report that a
+        # connection exists when the connection is ready to use.
+        return self._ready.isSet()
 
     def sync(self):
         """Handle any pending invalidation messages.