[Zope3-checkins] CVS: Zope3/src/zodb/zeo - stubs.py:1.4.2.1 server.py:1.6.2.2 interfaces.py:1.2.10.1 client.py:1.4.2.2
Jeremy Hylton
jeremy@zope.com
Mon, 10 Feb 2003 17:08:34 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv19735/zeo
Modified Files:
Tag: ZODB3-2-integration-branch
stubs.py server.py interfaces.py client.py
Log Message:
Merge more of ZODB3.2 into Zope3.
The big changes here are:
Move errors into interfaces and clean up inheritance relationships.
Add new cache verification code to the client, including an internal _ready event.
=== Zope3/src/zodb/zeo/stubs.py 1.4 => 1.4.2.1 ===
--- Zope3/src/zodb/zeo/stubs.py:1.4 Wed Feb 5 18:28:24 2003
+++ Zope3/src/zodb/zeo/stubs.py Mon Feb 10 17:08:31 2003
@@ -43,16 +43,16 @@
self.rpc = rpc
def beginVerify(self):
- self.rpc.callAsync('begin')
+ self.rpc.callAsync('beginVerify')
def invalidateVerify(self, args):
- self.rpc.callAsync('invalidate', args)
+ self.rpc.callAsync('invalidateVerify', args)
def endVerify(self):
- self.rpc.callAsync('end')
+ self.rpc.callAsync('endVerify')
def invalidateTrans(self, args):
- self.rpc.callAsync('Invalidate', args)
+ self.rpc.callAsync('invalidateTransaction', args)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
=== Zope3/src/zodb/zeo/server.py 1.6.2.1 => 1.6.2.2 ===
--- Zope3/src/zodb/zeo/server.py:1.6.2.1 Fri Feb 7 13:28:03 2003
+++ Zope3/src/zodb/zeo/server.py Mon Feb 10 17:08:31 2003
@@ -45,6 +45,8 @@
from zope.interface.implements import objectImplements
+from transaction.interfaces import TransactionError
+
class StorageServerError(StorageError):
"""Error reported when an unpickleable exception is raised."""
@@ -110,6 +112,8 @@
self.load = self.storage.load
self.loadSerial = self.storage.loadSerial
self.modifiedInVersion = self.storage.modifiedInVersion
+ self.getVersion = self.storage.getVersion
+ self.setVersion = self.storage.setVersion
try:
fn = self.storage.getExtensionMethods
except AttributeError:
@@ -321,7 +325,7 @@
tid = self.storage.lastTransaction()
if self.invalidated:
self.server.invalidate(self, self.storage_id, tid,
- self.invalidated, self.get_size_info())
+ self.invalidated)
self._clear_transaction()
# Return the tid, for cache invalidation optimization
return tid
@@ -378,12 +382,12 @@
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
- def vote(self, id):
+ def tpcVote(self, id):
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
- return self._vote()
+ return self._tpcVote()
else:
- return self._wait(lambda: self._vote())
+ return self._wait(lambda: self._tpcVote())
def abortVersion(self, src, id):
self._check_tid(id, exc=StorageTransactionError)
@@ -445,7 +449,7 @@
self.stats.conflicts_resolved += 1
self.serials.append((oid, newserial))
- def _vote(self):
+ def _tpcVote(self):
self.client.serialnos(self.serials)
return self.storage.tpcVote(self.transaction)
@@ -666,7 +670,7 @@
stats.clients += 1
return self.timeouts[storage_id], stats
- def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
+ def invalidate(self, conn, storage_id, tid, invalidated=()):
"""Internal: broadcast info and invalidations to clients.
This is called from several ZEOStorage methods.
@@ -695,8 +699,6 @@
for p in self.connections.get(storage_id, ()):
if invalidated and p is not conn:
p.client.invalidateTransaction(tid, invalidated)
- elif info is not None:
- p.client.info(info)
def get_invalidations(self, tid):
"""Return a tid and list of all objects invalidation since tid.
=== Zope3/src/zodb/zeo/interfaces.py 1.2 => 1.2.10.1 ===
--- Zope3/src/zodb/zeo/interfaces.py:1.2 Wed Dec 25 09:12:22 2002
+++ Zope3/src/zodb/zeo/interfaces.py Mon Feb 10 17:08:31 2003
@@ -16,78 +16,14 @@
$Id$
"""
-from zope.interface import Interface
+from zodb.storage.interfaces import StorageError
-class Disconnected(Exception):
- """A client is disconnected from a server."""
+class ClientStorageError(StorageError):
+ """An error occured in the ZEO Client Storage."""
-class ICache(Interface):
- """ZEO client cache.
+class UnrecognizedResult(ClientStorageError):
+ """A server call returned an unrecognized result."""
- __init__(storage, size, client, var)
+class ClientDisconnected(ClientStorageError):
+ """The database storage is disconnected from the storage."""
- All arguments optional.
-
- storage -- name of storage
- size -- max size of cache in bytes
- client -- a string; if specified, cache is persistent.
- var -- var directory to store cache files in
- """
-
- def open():
- """Returns a sequence of object info tuples.
-
- An object info tuple is a pair containing an object id and a
- pair of serialnos, a non-version serialno and a version serialno:
- oid, (serial, ver_serial)
-
- This method builds an index of the cache and returns a
- sequence used for cache validation.
- """
-
- def close():
- """Closes the cache."""
-
- def verify(func):
- """Call func on every object in cache.
-
- func is called with three arguments
- func(oid, serial, ver_serial)
- """
-
- def invalidate(oid, version):
- """Remove object from cache."""
-
- def load(oid, version):
- """Load object from cache.
-
- Return None if object not in cache.
- Return data, serialno if object is in cache.
- """
-
- def store(oid, p, s, version, pv, sv):
- """Store a new object in the cache."""
-
- def update(oid, serial, version, data):
- """Update an object already in the cache.
-
- XXX This method is called to update objects that were modified by
- a transaction. It's likely that it is already in the cache,
- and it may be possible for the implementation to operate more
- efficiently.
- """
-
- def modifiedInVersion(oid):
- """Return the version an object is modified in.
-
- '' signifies the trunk.
- Returns None if the object is not in the cache.
- """
-
- def checkSize(size):
- """Check if adding size bytes would exceed cache limit.
-
- This method is often called just before store or update. The
- size is a hint about the amount of data that is about to be
- stored. The cache may want to evict some data to make space.
- """
=== Zope3/src/zodb/zeo/client.py 1.4.2.1 => 1.4.2.2 ===
--- Zope3/src/zodb/zeo/client.py:1.4.2.1 Fri Feb 7 13:28:03 2003
+++ Zope3/src/zodb/zeo/client.py Mon Feb 10 17:08:31 2003
@@ -37,10 +37,10 @@
import logging
from zodb.zeo import cache, stubs
-from zodb.zeo.interfaces import Disconnected
from zodb.zeo.tbuf import TransactionBuffer
from zodb.zeo.zrpc.client import ConnectionManager
+from zodb.zeo.interfaces import *
from zodb.storage.interfaces import *
from zodb.timestamp import TimeStamp
@@ -49,15 +49,6 @@
except ImportError:
ResolvedSerial = 'rs'
-class ClientStorageError(StorageError):
- """An error occured in the ZEO Client Storage."""
-
-class UnrecognizedResult(ClientStorageError):
- """A server call returned an unrecognized result."""
-
-class ClientDisconnected(ClientStorageError, Disconnected):
- """The database storage is disconnected from the storage."""
-
def get_timestamp(prev_ts=None):
"""Internal helper to return a unique TimeStamp instance.
@@ -113,11 +104,9 @@
__implements__ = IStorage
def __init__(self, addr, storage='1', cache_size=20 * MB,
- name='', client=None, debug=0, var=None,
+ name='', client=None, var=None,
min_disconnect_poll=5, max_disconnect_poll=300,
- wait_for_server_on_startup=None, # deprecated alias for wait
- wait=None, # defaults to 1
- read_only=0, read_only_fallback=0):
+ wait=True, read_only=False, read_only_fallback=False):
"""ClientStorage constructor.
@@ -149,9 +138,6 @@
effective value is true, the client cache is persistent.
See ClientCache for more info.
- debug -- Ignored. This is present only for backwards
- compatibility with ZEO 1.
-
var -- The 'var' directory, defaulting to None, in which
the persistent cache files should be written.
@@ -163,9 +149,6 @@
attempts to connect to the server, in seconds. Defaults
to 300 seconds.
- wait_for_server_on_startup -- A backwards compatible alias for
- the wait argument.
-
wait -- A flag indicating whether to wait until a connection
with a server is made, defaulting to true.
@@ -189,33 +172,37 @@
read_only_fallback and "fallback" or "normal",
storage)
- if debug:
- self.logger.warn(
- "ClientStorage(): debug argument is no longer used")
-
- # wait defaults to True, but wait_for_server_on_startup overrides
- # if not None
- if wait_for_server_on_startup is not None:
- if wait is not None and wait != wait_for_server_on_startup:
- self.logger.error(
- "ClientStorage(): conflicting values for wait and "
- "wait_for_server_on_startup; wait prevails")
- else:
- self.logger.warn(
- "ClientStorage(): wait_for_server_on_startup "
- "is deprecated; please use wait instead")
- wait = wait_for_server_on_startup
- elif wait is None:
- wait = 1
-
self._addr = addr # For tests
+ # A ZEO client can run in disconnected mode, using data from
+ # its cache, or in connected mode. Several instance variables
+ # are related to whether the client is connected.
+
+ # _server: All method calls are invoked through the server
+ # stub. When not connect, set to disconnected_stub an
+ # object that raises ClientDisconnected errors.
+
+ # _ready: A threading Event that is set only if _server
+ # is set to a real stub.
+
+ # _connection: The current zrpc connection or None.
+
+ # _connection is set as soon as a connection is established,
+ # but _server is set only after cache verification has finished
+ # and clients can safely use the server. _pending_server holds
+ # a server stub while it is being verified.
+
self._server = disconnected_stub
+ self._connection = None
+ self._pending_server = None
+ self._ready = threading.Event()
+
self._is_read_only = read_only
self._storage = storage
self._read_only_fallback = read_only_fallback
- self._connection = None
# _server_addr is used by sortKey()
self._server_addr = None
+ self._tfile = None
+ self._pickler = None
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client'}
@@ -265,10 +252,46 @@
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
- # If we're connected at this point, the cache is opened as a
- # side effect of verify_cache(). If not, open it now.
- if not self.is_connected():
- self._cache.open()
+ if wait:
+ self._wait()
+ else:
+ # attempt_connect() will make an attempt that doesn't block
+ # "too long," for a very vague notion of too long. If that
+ # doesn't succeed, call connect() to start a thread.
+ if not self._rpc_mgr.attempt_connect():
+ self._rpc_mgr.connect()
+ # If the connect hasn't occurred, run with cached data.
+ if not self._ready.isSet():
+ self._cache.open()
+
+ def _wait(self):
+ # Wait for a connection to be established.
+ self._rpc_mgr.connect(sync=1)
+ # When a synchronous connect() call returns, there is
+ # a valid _connection object but cache validation may
+ # still be going on. This code must wait until validation
+ # finishes, but if the connection isn't a zrpc async
+ # connection it also needs to poll for input.
+ if self._connection.is_async():
+ while 1:
+ self._ready.wait(30)
+ if self._ready.isSet():
+ break
+ self.logger.warn("Wait for cache verification to finish")
+ else:
+ # If there is no mainloop running, this code needs
+ # to call poll() to cause asyncore to handle events.
+ while 1:
+ cn = self._connection
+ if cn is None:
+ # If the connection was closed while we were
+ # waiting for it to become ready, start over.
+ return self._wait()
+ else:
+ cn.pending(30)
+ if self._ready.isSet():
+ break
+ self.logger.warn("Wait for cache verification to finish")
def close(self):
"""Storage API: finalize the storage, releasing external resources."""
@@ -291,10 +314,9 @@
def is_connected(self):
"""Return whether the storage is currently connected to a server."""
- if self._server is disconnected_stub:
- return 0
- else:
- return 1
+ # This function is used by clients, so we only report that a
+ # connection exists when the connection is ready to use.
+ return self._ready.isSet()
def sync(self):
"""Handle any pending invalidation messages.