[Zope3-checkins] CVS: Zope3/src/zodb/zeo - monitor.py:1.2 cache.py:1.3 client.py:1.5 interfaces.py:1.3 server.py:1.7 stubs.py:1.5
Jeremy Hylton
jeremy@zope.com
Tue, 25 Feb 2003 13:55:36 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv23205/src/zodb/zeo
Modified Files:
cache.py client.py interfaces.py server.py stubs.py
Added Files:
monitor.py
Log Message:
Merge the ZODB3-2-integration branch to the trunk.
The primary changes here are to port the many new ZEO features from
ZODB 3.2. There are also many changes to the test suite.
python2.2 tells me: Ran 3755 tests in 560.277s
=== Zope3/src/zodb/zeo/monitor.py 1.1 => 1.2 ===
--- /dev/null Tue Feb 25 13:55:36 2003
+++ Zope3/src/zodb/zeo/monitor.py Tue Feb 25 13:55:05 2003
@@ -0,0 +1,162 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Monitor behavior of ZEO server and record statistics.
+
+$Id$
+"""
+
+import asyncore
+import socket
+import time
+import types
+
+import zodb.zeo
+import logging
+
+class StorageStats:
+ """Per-storage usage statistics."""
+
+ def __init__(self):
+ self.loads = 0
+ self.stores = 0
+ self.commits = 0
+ self.aborts = 0
+ self.active_txns = 0
+ self.clients = 0
+ self.verifying_clients = 0
+ self.lock_time = None
+ self.conflicts = 0
+ self.conflicts_resolved = 0
+ self.start = time.ctime()
+
+ def parse(self, s):
+ # parse the dump format
+ lines = s.split("\n")
+ for line in lines:
+ field, value = line.split(":", 1)
+ if field == "Server started":
+ self.start = value
+ elif field == "Clients":
+ self.clients = int(value)
+ elif field == "Clients verifying":
+ self.verifying_clients = int(value)
+ elif field == "Active transactions":
+ self.active_txns = int(value)
+ elif field == "Commit lock held for":
+ # This assumes the dump was written just now; only the elapsed time is recorded.
+ self.lock_time = time.time() - int(value)
+ elif field == "Commits":
+ self.commits = int(value)
+ elif field == "Aborts":
+ self.aborts = int(value)
+ elif field == "Loads":
+ self.loads = int(value)
+ elif field == "Stores":
+ self.stores = int(value)
+ elif field == "Conflicts":
+ self.conflicts = int(value)
+ elif field == "Conflicts resolved":
+ self.conflicts_resolved = int(value)
+
+ def dump(self, f):
+ print >> f, "Server started:", self.start
+ print >> f, "Clients:", self.clients
+ print >> f, "Clients verifying:", self.verifying_clients
+ print >> f, "Active transactions:", self.active_txns
+ if self.lock_time:
+ howlong = time.time() - self.lock_time
+ print >> f, "Commit lock held for:", int(howlong)
+ print >> f, "Commits:", self.commits
+ print >> f, "Aborts:", self.aborts
+ print >> f, "Loads:", self.loads
+ print >> f, "Stores:", self.stores
+ print >> f, "Conflicts:", self.conflicts
+ print >> f, "Conflicts resolved:", self.conflicts_resolved
+
+class StatsClient(asyncore.dispatcher):
+
+ def __init__(self, sock, addr):
+ asyncore.dispatcher.__init__(self, sock)
+ self.buf = []
+ self.closed = 0
+
+ def close(self):
+ self.closed = 1
+ # The socket is closed after all the data is written.
+ # See handle_write().
+
+ def write(self, s):
+ self.buf.append(s)
+
+ def writable(self):
+ return len(self.buf)
+
+ def readable(self):
+ # XXX what goes here?
+ return 0
+
+ def handle_write(self):
+ s = "".join(self.buf)
+ self.buf = []
+ n = self.socket.send(s)
+ if n < len(s):
+ self.buf.append(s[:n])
+
+ if self.closed and not self.buf:
+ asyncore.dispatcher.close(self)
+
+class StatsServer(asyncore.dispatcher):
+
+ StatsConnectionClass = StatsClient
+
+ def __init__(self, addr, stats):
+ asyncore.dispatcher.__init__(self)
+ self.addr = addr
+ self.stats = stats
+ if type(self.addr) == types.TupleType:
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ else:
+ self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+## zLOG.LOG("ZSM", zLOG.INFO, "monitor listening on %s" % repr(self.addr))
+ self.bind(self.addr)
+ self.listen(5)
+
+ def writable(self):
+ return 0
+
+ def readable(self):
+ return 1
+
+ def handle_accept(self):
+ try:
+ sock, addr = self.accept()
+ except socket.error:
+ return
+ f = self.StatsConnectionClass(sock, addr)
+ self.dump(f)
+ f.close()
+
+ def dump(self, f):
+ print >> f, "ZEO monitor server version %s" % zodb.zeo.version
+ print >> f, time.ctime()
+ print >> f
+
+ L = self.stats.keys()
+ L.sort()
+ for k in L:
+ stats = self.stats[k]
+ print >> f, "Storage:", k
+ stats.dump(f)
+ print >> f
=== Zope3/src/zodb/zeo/cache.py 1.2 => 1.3 ===
--- Zope3/src/zodb/zeo/cache.py:1.2 Wed Dec 25 09:12:22 2002
+++ Zope3/src/zodb/zeo/cache.py Tue Feb 25 13:55:05 2003
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2001. Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -31,11 +31,16 @@
0 or 1. Temporary cache files are unnamed files in the standard
temporary directory as determined by the tempfile module.
-The ClientStorage overrides the client name default to the value of
-the environment variable ZEO_CLIENT, if it exists.
+Each cache file has a 12-byte header followed by a sequence of
+records. The header format is as follows:
-Each cache file has a 4-byte magic number followed by a sequence of
-records of the form:
+ offset in header: name -- description
+
+ 0: magic -- 4-byte magic number, identifying this as a ZEO cache file
+
+ 4: lasttid -- 8-byte last transaction id
+
+Each record has the following form:
offset in record: name -- description
@@ -100,32 +105,33 @@
file 0 and file 1.
"""
+import logging
import os
import time
import tempfile
from struct import pack, unpack
from thread import allocate_lock
-import logging
from zodb.utils import u64
-from zodb.zeo.interfaces import ICache
+magic = 'ZEC1'
+headersize = 12
-magic='ZEC0'
+MB = 1024**2
class ClientCache:
- __implements__ = ICache
-
- def __init__(self, storage='1', size=20000000, client=None, var=None):
+ def __init__(self, storage='1', size=20*MB, client=None, var=None):
# Arguments:
# storage -- storage name (used in filenames and log messages)
# size -- size limit in bytes of both files together
# client -- if not None, use a persistent cache file and use this name
- # var -- directory where to create persistent cache files
+ # var -- directory where to create persistent cache files; default cwd
self._storage = storage
self._limit = size / 2
+ self._client = client
+ self._ltid = None # For getLastTid()
# Create a logger specific for this client cache
logger = logging.getLogger("ZEC.%s" % storage)
@@ -138,15 +144,8 @@
if client is not None:
# Create a persistent cache
- # CLIENT_HOME and INSTANCE_HOME are builtins set by App.FindHomes
if var is None:
- try:
- var = CLIENT_HOME
- except:
- try:
- var = os.path.join(INSTANCE_HOME, 'var')
- except:
- var = os.getcwd()
+ var = os.getcwd()
fmt = os.path.join(var, "c%s-%s-%%s.zec" % (storage, client))
# Initialize pairs of filenames, file objects, and serialnos.
@@ -158,9 +157,9 @@
fi = open(p[i],'r+b')
if fi.read(4) == magic: # Minimal sanity
fi.seek(0, 2)
- if fi.tell() > 30:
- # First serial is at offset 19 + 4 for magic
- fi.seek(23)
+ if fi.tell() > headersize:
+ # Read serial at offset 19 of first record
+ fi.seek(headersize + 19)
s[i] = fi.read(8)
# If we found a non-zero serial, then use the file
if s[i] != '\0\0\0\0\0\0\0\0':
@@ -176,17 +175,18 @@
if f[0] is None:
# We started, open the first cache file
f[0] = open(p[0], 'w+b')
- f[0].write(magic)
+ f[0].write(magic + '\0' * (headersize - len(magic)))
current = 0
f[1] = None
else:
self._f = f = [tempfile.TemporaryFile(suffix='.zec'), None]
# self._p file name 'None' signifies an unnamed temp file.
self._p = p = [None, None]
- f[0].write(magic)
+ f[0].write(magic + '\0' * (headersize - len(magic)))
current = 0
- self.log("size=%r; file[%r]=%r", size, current, p[current])
+ self.log("%s: storage=%r, size=%r; file[%r]=%r",
+ self.__class__.__name__, storage, size, current, p[current])
self._current = current
self._setup_trace()
@@ -222,6 +222,57 @@
except OSError:
pass
+ def getLastTid(self):
+ """Get the last transaction id stored by setLastTid().
+
+ If the cache is persistent, it is read from the current
+ cache file; otherwise it's an instance variable.
+ """
+ if self._client is None:
+ return self._ltid
+ else:
+ self._acquire()
+ try:
+ return self._getLastTid()
+ finally:
+ self._release()
+
+ def _getLastTid(self):
+ f = self._f[self._current]
+ f.seek(4)
+ tid = f.read(8)
+ if len(tid) < 8 or tid == '\0\0\0\0\0\0\0\0':
+ return None
+ else:
+ return tid
+
+ def setLastTid(self, tid):
+ """Store the last transaction id.
+
+ If the cache is persistent, it is written to the current
+ cache file; otherwise it's an instance variable.
+ """
+ if self._client is None:
+ if tid == '\0\0\0\0\0\0\0\0':
+ tid = None
+ self._ltid = tid
+ else:
+ self._acquire()
+ try:
+ self._setLastTid(tid)
+ finally:
+ self._release()
+
+ def _setLastTid(self, tid):
+ if tid is None:
+ tid = '\0\0\0\0\0\0\0\0'
+ else:
+ tid = str(tid)
+ assert len(tid) == 8
+ f = self._f[self._current]
+ f.seek(4)
+ f.write(tid)
+
def verify(self, verifyFunc):
"""Call the verifyFunc on every object in the cache.
@@ -244,13 +295,13 @@
if len(h) != 27:
self.log("invalidate: short record for oid %16x "
"at position %d in cache file %d",
- u64(oid), ap, p < 0)
+ U64(oid), ap, p < 0)
del self._index[oid]
return None
if h[:8] != oid:
self.log("invalidate: oid mismatch: expected %16x read %16x "
"at position %d in cache file %d",
- u64(oid), u64(h[:8]), ap, p < 0)
+ U64(oid), U64(h[:8]), ap, p < 0)
del self._index[oid]
return None
f.seek(ap+8) # Switch from reading to writing
@@ -285,7 +336,7 @@
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
self.log("load: bad record for oid %16x "
"at position %d in cache file %d",
- u64(oid), ap, p < 0)
+ U64(oid), ap, p < 0)
del self._index[oid]
return None
@@ -456,7 +507,7 @@
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
self.log("modifiedInVersion: bad record for oid %16x "
"at position %d in cache file %d",
- u64(oid), ap, p < 0)
+ U64(oid), ap, p < 0)
del self._index[oid]
return None
@@ -480,6 +531,7 @@
self._acquire()
try:
if self._pos + size > self._limit:
+ ltid = self._getLastTid()
current = not self._current
self._current = current
self._trace(0x70)
@@ -503,8 +555,12 @@
else:
# Temporary cache file:
self._f[current] = tempfile.TemporaryFile(suffix='.zec')
- self._f[current].write(magic)
- self._pos = 4
+ header = magic
+ if ltid:
+ header += ltid
+ self._f[current].write(header +
+ '\0' * (headersize - len(header)))
+ self._pos = headersize
finally:
self._release()
@@ -596,7 +652,7 @@
f = self._f[fileindex]
seek = f.seek
read = f.read
- pos = 4
+ pos = headersize
count = 0
while 1:
@@ -655,7 +711,6 @@
del serial[oid]
del index[oid]
-
pos = pos + tlen
count += 1
@@ -672,6 +727,6 @@
return pos
def rilog(self, msg, pos, fileindex):
- # Helper to log certain messages from read_index
+ # Helper to log messages from read_index
self.log("read_index: %s at position %d in cache file %d",
msg, pos, fileindex)
=== Zope3/src/zodb/zeo/client.py 1.4 => 1.5 ===
--- Zope3/src/zodb/zeo/client.py:1.4 Wed Feb 5 18:28:24 2003
+++ Zope3/src/zodb/zeo/client.py Tue Feb 25 13:55:05 2003
@@ -22,11 +22,6 @@
ClientDisconnected -- exception raised by ClientStorage
"""
-# XXX TO DO
-# get rid of beginVerify, set up _tfile in verify_cache
-# set self._storage = stub later, in endVerify
-# if wait is given, wait until verify is complete
-
import cPickle
import os
import socket
@@ -36,11 +31,12 @@
import types
import logging
-from zodb.zeo import cache, stubs
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo import cache
+from zodb.zeo.stubs import StorageServerStub
from zodb.zeo.tbuf import TransactionBuffer
from zodb.zeo.zrpc.client import ConnectionManager
+from zodb.zeo.interfaces import *
from zodb.storage.interfaces import *
from zodb.timestamp import TimeStamp
@@ -49,14 +45,8 @@
except ImportError:
ResolvedSerial = 'rs'
-class ClientStorageError(StorageError):
- """An error occured in the ZEO Client Storage."""
-
-class UnrecognizedResult(ClientStorageError):
- """A server call returned an unrecognized result."""
-
-class ClientDisconnected(ClientStorageError, Disconnected):
- """The database storage is disconnected from the storage."""
+def tid2time(tid):
+ return str(TimeStamp(tid))
def get_timestamp(prev_ts=None):
"""Internal helper to return a unique TimeStamp instance.
@@ -88,7 +78,7 @@
MB = 1024**2
-class ClientStorage:
+class ClientStorage(object):
"""A Storage class that is a network client to a remote storage.
@@ -103,7 +93,7 @@
TransactionBufferClass = TransactionBuffer
ClientCacheClass = cache.ClientCache
ConnectionManagerClass = ConnectionManager
- StorageServerStubClass = stubs.StorageServer
+ StorageServerStubClass = StorageServerStub
# The exact storage interfaces depend on the server that the client
# connects to. We know that every storage must implement IStorage,
@@ -113,11 +103,9 @@
__implements__ = IStorage
def __init__(self, addr, storage='1', cache_size=20 * MB,
- name='', client=None, debug=0, var=None,
+ name='', client=None, var=None,
min_disconnect_poll=5, max_disconnect_poll=300,
- wait_for_server_on_startup=None, # deprecated alias for wait
- wait=None, # defaults to 1
- read_only=0, read_only_fallback=0):
+ wait=True, read_only=False, read_only_fallback=False):
"""ClientStorage constructor.
@@ -149,9 +137,6 @@
effective value is true, the client cache is persistent.
See ClientCache for more info.
- debug -- Ignored. This is present only for backwards
- compatibility with ZEO 1.
-
var -- The 'var' directory, defaulting to None, in which
the persistent cache files should be written.
@@ -163,9 +148,6 @@
attempts to connect to the server, in seconds. Defaults
to 300 seconds.
- wait_for_server_on_startup -- A backwards compatible alias for
- the wait argument.
-
wait -- A flag indicating whether to wait until a connection
with a server is made, defaulting to true.
@@ -189,33 +171,37 @@
read_only_fallback and "fallback" or "normal",
storage)
- if debug:
- self.logger.warn(
- "ClientStorage(): debug argument is no longer used")
-
- # wait defaults to True, but wait_for_server_on_startup overrides
- # if not None
- if wait_for_server_on_startup is not None:
- if wait is not None and wait != wait_for_server_on_startup:
- self.logger.error(
- "ClientStorage(): conflicting values for wait and "
- "wait_for_server_on_startup; wait prevails")
- else:
- self.logger.warn(
- "ClientStorage(): wait_for_server_on_startup "
- "is deprecated; please use wait instead")
- wait = wait_for_server_on_startup
- elif wait is None:
- wait = 1
-
self._addr = addr # For tests
+ # A ZEO client can run in disconnected mode, using data from
+ # its cache, or in connected mode. Several instance variables
+ # are related to whether the client is connected.
+
+ # _server: All method calls are invoked through the server
+ # stub. When not connected, set to disconnected_stub, an
+ # object that raises ClientDisconnected errors.
+
+ # _ready: A threading Event that is set only if _server
+ # is set to a real stub.
+
+ # _connection: The current zrpc connection or None.
+
+ # _connection is set as soon as a connection is established,
+ # but _server is set only after cache verification has finished
+ # and clients can safely use the server. _pending_server holds
+ # a server stub while it is being verified.
+
self._server = disconnected_stub
+ self._connection = None
+ self._pending_server = None
+ self._ready = threading.Event()
+
self._is_read_only = read_only
self._storage = storage
self._read_only_fallback = read_only_fallback
- self._connection = None
# _server_addr is used by sortKey()
self._server_addr = None
+ self._tfile = None
+ self._pickler = None
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client'}
@@ -265,10 +251,46 @@
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
- # If we're connected at this point, the cache is opened as a
- # side effect of verify_cache(). If not, open it now.
- if not self.is_connected():
- self._cache.open()
+ if wait:
+ self._wait()
+ else:
+ # attempt_connect() will make an attempt that doesn't block
+ # "too long," for a very vague notion of too long. If that
+ # doesn't succeed, call connect() to start a thread.
+ if not self._rpc_mgr.attempt_connect():
+ self._rpc_mgr.connect()
+ # If the connect hasn't occurred, run with cached data.
+ if not self._ready.isSet():
+ self._cache.open()
+
+ def _wait(self):
+ # Wait for a connection to be established.
+ self._rpc_mgr.connect(sync=1)
+ # When a synchronous connect() call returns, there is
+ # a valid _connection object but cache validation may
+ # still be going on. This code must wait until validation
+ # finishes, but if the connection isn't a zrpc async
+ # connection it also needs to poll for input.
+ if self._connection.is_async():
+ while 1:
+ self._ready.wait(30)
+ if self._ready.isSet():
+ break
+ self.logger.warn("Wait for cache verification to finish")
+ else:
+ # If there is no mainloop running, this code needs
+ # to call poll() to cause asyncore to handle events.
+ while 1:
+ cn = self._connection
+ if cn is None:
+ # If the connection was closed while we were
+ # waiting for it to become ready, start over.
+ return self._wait()
+ else:
+ cn.pending(30)
+ if self._ready.isSet():
+ break
+ self.logger.warn("Wait for cache verification to finish")
def close(self):
"""Storage API: finalize the storage, releasing external resources."""
@@ -291,17 +313,22 @@
def is_connected(self):
"""Return whether the storage is currently connected to a server."""
- if self._server is disconnected_stub:
- return 0
- else:
- return 1
+ # This function is used by clients, so we only report that a
+ # connection exists when the connection is ready to use.
+ return self._ready.isSet()
def sync(self):
"""Handle any pending invalidation messages.
This is called by the sync method in ZODB.Connection.
"""
- self._server._update()
+ # If there is no connection, return immediately. Technically,
+ # there are no pending invalidations so they are all handled.
+ # There doesn't seem to be much benefit to raising an exception.
+
+ cn = self._connection
+ if cn is not None:
+ cn.pending()
def testConnection(self, conn):
"""Internal: test the given connection.
@@ -344,23 +371,37 @@
This is called by ConnectionManager after it has decided which
connection should be used.
"""
+
+ if self._cache is None:
+ # the storage was closed, but the connect thread called
+ # this method before it was stopped.
+ return
+
+ # XXX would like to report whether we get a read-only connection
if self._connection is not None:
- self.logger.warn("Reconnected to storage")
+ reconnect = 1
else:
- self.logger.warn("Connected to storage")
+ reconnect = 0
self.set_server_addr(conn.get_addr())
+
+ # If we are upgrading from a read-only fallback connection,
+ # we must close the old connection to prevent it from being
+ # used while the cache is verified against the new connection.
+ if self._connection is not None:
+ self._connection.close()
+ self._connection = conn
+
+ if reconnect:
+ self.logger.warn("Reconnected to storage: %s", self._server_addr)
+ else:
+ self.logger.warn("Connected to storage: %s", self._server_addr)
+
stub = self.StorageServerStubClass(conn)
self._oids = []
self._info.update(stub.get_info())
self.update_interfaces()
self.verify_cache(stub)
- # XXX The stub should be saved here and set in endVerify() below.
- if self._connection is not None:
- self._connection.close()
- self._connection = conn
- self._server = stub
-
def update_interfaces(self):
# Update instance's __implements__ based on the server.
L = [IStorage]
@@ -399,12 +440,52 @@
return self._server_addr
def verify_cache(self, server):
- """Internal routine called to verify the cache."""
- # XXX beginZeoVerify ends up calling back to beginVerify() below.
- # That whole exchange is rather unnecessary.
- server.beginZeoVerify()
+ """Internal routine called to verify the cache.
+
+ The return value (indicating which path we took) is used by
+ the test suite.
+ """
+
+ # If verify_cache() finishes the cache verification process,
+ # it should set self._server. If it goes through full cache
+ # verification, then endVerify() should set self._server.
+
+ last_inval_tid = self._cache.getLastTid()
+ if last_inval_tid is not None:
+ ltid = server.lastTransaction()
+ if ltid == last_inval_tid:
+ self.logger.info("No verification necessary "
+ "(last_inval_tid up-to-date)")
+ self._cache.open()
+ self._server = server
+ self._ready.set()
+ return "no verification"
+
+ # log some hints about last transaction
+ self.logger.info("last inval tid: %r %s",
+ last_inval_tid, tid2time(last_inval_tid))
+ self.logger.info("last transaction: %r %s",
+ ltid, ltid and tid2time(ltid))
+
+ pair = server.getInvalidations(last_inval_tid)
+ if pair is not None:
+ self.logger.info("Recovering %d invalidations", len(pair[1]))
+ self._cache.open()
+ self.invalidateTransaction(*pair)
+ self._server = server
+ self._ready.set()
+ return "quick verification"
+
+ self.logger.info("Verifying cache")
+ # setup tempfile to hold zeoVerify results
+ self._tfile = tempfile.TemporaryFile(suffix=".inv")
+ self._pickler = cPickle.Pickler(self._tfile, 1)
+ self._pickler.fast = 1 # Don't use the memo
+
self._cache.verify(server.zeoVerify)
+ self._pending_server = server
server.endZeoVerify()
+ return "full verification"
### Is there a race condition between notifyConnected and
### notifyDisconnected? In Particular, what if we get
@@ -422,6 +503,7 @@
"""
self.logger.error("Disconnected from storage")
self._connection = None
+ self._ready.clear()
self._server = disconnected_stub
def getName(self):
@@ -664,14 +746,22 @@
def tpcAbort(self, transaction):
"""Storage API: abort a transaction."""
+ """Storage API: abort a transaction."""
if transaction is not self._transaction:
return
try:
- self._server.tpcAbort(self._serial)
+ # XXX Are there any exceptions that should prevent an
+ # abort from occurring? It seems wrong to swallow them
+ # all, yet you want to be sure that other abort logic is
+ # executed regardless.
+ try:
+ self._server.tpcAbort(self._serial)
+ except ClientDisconnected:
+ self.logger.info("ClientDisconnected in tpcAbort() ignored")
+ finally:
self._tbuf.clear()
self._seriald.clear()
del self._serials[:]
- finally:
self.end_transaction()
def tpcFinish(self, transaction, f=None):
@@ -682,12 +772,13 @@
if f is not None:
f()
- self._server.tpcFinish(self._serial)
+ tid = self._server.tpcFinish(self._serial)
r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
self._update_cache()
+ self._cache.setLastTid(tid)
finally:
self.end_transaction()
@@ -773,12 +864,6 @@
"""Server callback to update the info dictionary."""
self._info.update(dict)
- def beginVerify(self):
- """Server callback to signal start of cache validation."""
- self._tfile = tempfile.TemporaryFile(suffix=".inv")
- self._pickler = cPickle.Pickler(self._tfile, 1)
- self._pickler.fast = 1 # Don't use the memo
-
def invalidateVerify(self, args):
"""Server callback to invalidate an (oid, version) pair.
@@ -796,6 +881,7 @@
if self._pickler is None:
return
self._pickler.dump((0,0))
+ self._pickler = None
self._tfile.seek(0)
unpick = cPickle.Unpickler(self._tfile)
f = self._tfile
@@ -803,35 +889,31 @@
while 1:
oid, version = unpick.load()
+ self.logger.debug("verify invalidate %r", oid)
if not oid:
break
self._cache.invalidate(oid, version=version)
- self._db.invalidate(oid, version=version)
+ if self._db is not None:
+ self._db.invalidate(oid, version=version)
f.close()
- def invalidateTrans(self, args):
- """Server callback to invalidate a list of (oid, version) pairs.
-
- This is called as the result of a transaction.
- """
+ self._server = self._pending_server
+ self._ready.set()
+ self._pending_conn = None
+ self.logger.debug("verification finished")
+
+ def invalidateTransaction(self, tid, args):
+ """Invalidate objects modified by tid."""
+ self._cache.setLastTid(tid)
+ if self._pickler is not None:
+ self.logger.info("Transactional invalidation "
+ "during cache verification")
+ for t in args:
+ self.self._pickler.dump(t)
+ return
+ db = self._db
for oid, version in args:
self._cache.invalidate(oid, version=version)
- try:
- self._db.invalidate(oid, version=version)
- except AttributeError, msg:
- self.logger.error("Invalidate(%r, %r) failed for _db: %s",
- oid, version, msg)
-
- # XXX In Zope3, there's no reason to stick to the ZEO 2 protocol!
-
- # Unfortunately, the ZEO 2 wire protocol uses different names for
- # several of the callback methods invoked by the StorageServer.
- # We can't change the wire protocol at this point because that
- # would require synchronized updates of clients and servers and we
- # don't want that. So here we alias the old names to their new
- # implementations.
-
- begin = beginVerify
- invalidate = invalidateVerify
- end = endVerify
- Invalidate = invalidateTrans
+ if db is not None:
+ db.invalidate(oid, version=version)
+
=== Zope3/src/zodb/zeo/interfaces.py 1.2 => 1.3 ===
--- Zope3/src/zodb/zeo/interfaces.py:1.2 Wed Dec 25 09:12:22 2002
+++ Zope3/src/zodb/zeo/interfaces.py Tue Feb 25 13:55:05 2003
@@ -16,78 +16,14 @@
$Id$
"""
-from zope.interface import Interface
+from zodb.storage.interfaces import StorageError
-class Disconnected(Exception):
- """A client is disconnected from a server."""
+class ClientStorageError(StorageError):
+ """An error occurred in the ZEO Client Storage."""
-class ICache(Interface):
- """ZEO client cache.
+class UnrecognizedResult(ClientStorageError):
+ """A server call returned an unrecognized result."""
- __init__(storage, size, client, var)
+class ClientDisconnected(ClientStorageError):
+ """The database storage is disconnected from the storage."""
- All arguments optional.
-
- storage -- name of storage
- size -- max size of cache in bytes
- client -- a string; if specified, cache is persistent.
- var -- var directory to store cache files in
- """
-
- def open():
- """Returns a sequence of object info tuples.
-
- An object info tuple is a pair containing an object id and a
- pair of serialnos, a non-version serialno and a version serialno:
- oid, (serial, ver_serial)
-
- This method builds an index of the cache and returns a
- sequence used for cache validation.
- """
-
- def close():
- """Closes the cache."""
-
- def verify(func):
- """Call func on every object in cache.
-
- func is called with three arguments
- func(oid, serial, ver_serial)
- """
-
- def invalidate(oid, version):
- """Remove object from cache."""
-
- def load(oid, version):
- """Load object from cache.
-
- Return None if object not in cache.
- Return data, serialno if object is in cache.
- """
-
- def store(oid, p, s, version, pv, sv):
- """Store a new object in the cache."""
-
- def update(oid, serial, version, data):
- """Update an object already in the cache.
-
- XXX This method is called to update objects that were modified by
- a transaction. It's likely that it is already in the cache,
- and it may be possible for the implementation to operate more
- efficiently.
- """
-
- def modifiedInVersion(oid):
- """Return the version an object is modified in.
-
- '' signifies the trunk.
- Returns None if the object is not in the cache.
- """
-
- def checkSize(size):
- """Check if adding size bytes would exceed cache limit.
-
- This method is often called just before store or update. The
- size is a hint about the amount of data that is about to be
- stored. The cache may want to evict some data to make space.
- """
=== Zope3/src/zodb/zeo/server.py 1.6 => 1.7 === (1037/1137 lines abridged)
--- Zope3/src/zodb/zeo/server.py:1.6 Wed Feb 5 18:28:24 2003
+++ Zope3/src/zodb/zeo/server.py Tue Feb 25 13:55:05 2003
@@ -20,190 +20,56 @@
exported for invocation by the server.
"""
+from __future__ import nested_scopes
+
import asyncore
import cPickle
+import logging
import os
import sys
import threading
import time
-import logging
-from zodb.zeo import stubs
+from zodb.zeo.stubs import ClientStorageStub
from zodb.zeo.commitlog import CommitLog
+from zodb.zeo.monitor import StorageStats, StatsServer
from zodb.zeo.zrpc.server import Dispatcher
from zodb.zeo.zrpc.connection import ManagedServerConnection, Delay, MTDelay
+from zodb.zeo.zrpc.trigger import trigger
-from zodb.ztransaction import Transaction
-from zodb.interfaces import TransactionError
+from zodb.interfaces import *
from zodb.storage.interfaces import *
+from zodb.conflict import ResolvedSerial
+from zodb.utils import u64
+from zodb.ztransaction import Transaction
from zope.interface.implements import objectImplements
+from transaction.interfaces import TransactionError
+
class StorageServerError(StorageError):
"""Error reported when an unpickleable exception is raised."""
-class StorageServer:
-
- """The server side implementation of ZEO.
-
- The StorageServer is the 'manager' for incoming connections. Each
- connection is associated with its own ZEOStorage instance (defined
- below). The StorageServer may handle multiple storages; each
- ZEOStorage instance only handles a single storage.
- """
-
[-=- -=- -=- 1037 lines omitted -=- -=- -=-]
+ self._client = client
+ self._deadline = time.time() + self._timeout
+ self._cond.notify()
+ finally:
+ self._cond.release()
+
+ def end(self, client):
+ # Called from the "main" thread whenever the storage lock is
+ # being released. (Serialized by asyncore.)
+ self._cond.acquire()
+ try:
+ assert self._client is not None
+ self._client = None
+ self._deadline = None
+ finally:
+ self._cond.release()
+
+ def run(self):
+ # Code running in the thread.
+ while 1:
+ self._cond.acquire()
+ try:
+ while self._deadline is None:
+ self._cond.wait()
+ howlong = self._deadline - time.time()
+ if howlong <= 0:
+ # Prevent reporting timeout more than once
+ self._deadline = None
+ client = self._client # For the howlong <= 0 branch below
+ finally:
+ self._cond.release()
+ if howlong <= 0:
+ client.logger.error("Transaction timeout after %s seconds",
+ self._timeout)
+ self._trigger.pull_trigger(lambda: client.connection.close())
+ else:
+ time.sleep(howlong)
+ self.trigger.close()
def run_in_thread(method, *args):
t = SlowMethodThread(method, args)
@@ -782,8 +857,3 @@
self.delay.error(sys.exc_info())
else:
self.delay.reply(result)
-
-# Patch up class references
-StorageServer.ZEOStorageClass = ZEOStorage
-ZEOStorage.DelayedCommitStrategyClass = DelayedCommitStrategy
-ZEOStorage.ImmediateCommitStrategyClass = ImmediateCommitStrategy
=== Zope3/src/zodb/zeo/stubs.py 1.4 => 1.5 ===
--- Zope3/src/zodb/zeo/stubs.py:1.4 Wed Feb 5 18:28:24 2003
+++ Zope3/src/zodb/zeo/stubs.py Tue Feb 25 13:55:05 2003
@@ -13,7 +13,7 @@
##############################################################################
"""RPC stubs for client and server interfaces."""
-class ClientStorage:
+class ClientStorageStub:
"""An RPC stub class for the interface exported by ClientStorage.
This is the interface presented by ClientStorage to the
@@ -43,16 +43,16 @@
self.rpc = rpc
def beginVerify(self):
- self.rpc.callAsync('begin')
+ self.rpc.callAsync('beginVerify')
def invalidateVerify(self, args):
- self.rpc.callAsync('invalidate', args)
+ self.rpc.callAsync('invalidateVerify', args)
def endVerify(self):
- self.rpc.callAsync('end')
+ self.rpc.callAsync('endVerify')
- def invalidateTrans(self, args):
- self.rpc.callAsync('Invalidate', args)
+ def invalidateTransaction(self, tid, invlist):
+ self.rpc.callAsync('invalidateTransaction', tid, invlist)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
@@ -60,7 +60,7 @@
def info(self, arg):
self.rpc.callAsync('info', arg)
-class StorageServer:
+class StorageServerStub:
"""An RPC stub class for the interface exported by ClientStorage.
This is the interface presented by the StorageServer to the
@@ -103,6 +103,9 @@
def get_info(self):
return self.rpc.call('get_info')
+ def getInvalidations(self, tid):
+ return self.rpc.call('getInvalidations', tid)
+
def beginZeoVerify(self):
self.rpc.callAsync('beginZeoVerify')
@@ -168,6 +171,9 @@
return self.rpc.call('new_oid')
else:
return self.rpc.call('new_oid', last)
+
+ def lastTransaction(self):
+ return self.rpc.call('lastTransaction')
def store(self, oid, serial, data, version, trans):
return self.rpc.call('store', oid, serial, data, version, trans)