[Zope3-checkins] CVS: Zope3/src/zodb/zeo - monitor.py:1.1.2.1 cache.py:1.2.10.1 client.py:1.4.2.1 server.py:1.6.2.1
Jeremy Hylton
jeremy@zope.com
Fri, 7 Feb 2003 13:28:04 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv29340/src/zodb/zeo
Modified Files:
Tag: ZODB3-2-integration-branch
cache.py client.py server.py
Added Files:
Tag: ZODB3-2-integration-branch
monitor.py
Log Message:
Snapshot of progress so Barry can take over.
client, server, and cache should be completely converted, although
it's wise to double-check. Some of the tests succeed, but many of
them fail. I believe the most significant outstanding problem is that
the exceptions were cleaned up in ZODB 3.2, but not yet in Zope3.
=== Added File Zope3/src/zodb/zeo/monitor.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Monitor behavior of ZEO server and record statistics.
$Id: monitor.py,v 1.1.2.1 2003/02/07 18:28:03 jeremy Exp $
"""
import asyncore
import socket
import time
import types
import zodb.zeo
import logging
class StorageStats:
    """Per-storage usage statistics.

    The server updates the counters in place; dump() writes them out as
    "Field name: value" lines and parse() rebuilds the counters from
    that same text format.
    """

    def __init__(self):
        self.loads = 0
        self.stores = 0
        self.commits = 0
        self.aborts = 0
        self.active_txns = 0
        self.clients = 0
        self.verifying_clients = 0
        self.lock_time = None
        self.conflicts = 0
        self.conflicts_resolved = 0
        self.start = time.ctime()

    def parse(self, s):
        """Initialize the counters from a string in dump() format."""
        lines = s.split("\n")
        for line in lines:
            # dump() terminates its output with a newline, so the last
            # element of the split is an empty string; skip it (and any
            # other line without a colon) instead of letting the unpack
            # below raise ValueError.
            if ":" not in line:
                continue
            field, value = line.split(":", 1)
            if field == "Server started":
                self.start = value
            elif field == "Clients":
                self.clients = int(value)
            elif field == "Clients verifying":
                self.verifying_clients = int(value)
            elif field == "Active transactions":
                self.active_txns = int(value)
            elif field == "Commit lock held for":
                # dump() writes how many seconds the lock has been held;
                # reconstruct an approximate acquisition time from that.
                self.lock_time = time.time() - int(value)
            elif field == "Commits":
                self.commits = int(value)
            elif field == "Aborts":
                self.aborts = int(value)
            elif field == "Loads":
                self.loads = int(value)
            elif field == "Stores":
                self.stores = int(value)
            elif field == "Conflicts":
                self.conflicts = int(value)
            elif field == "Conflicts resolved":
                self.conflicts_resolved = int(value)

    def dump(self, f):
        """Write the statistics to file-like object f in parse() format."""
        print >> f, "Server started:", self.start
        print >> f, "Clients:", self.clients
        print >> f, "Clients verifying:", self.verifying_clients
        print >> f, "Active transactions:", self.active_txns
        if self.lock_time:
            howlong = time.time() - self.lock_time
            print >> f, "Commit lock held for:", int(howlong)
        print >> f, "Commits:", self.commits
        print >> f, "Aborts:", self.aborts
        print >> f, "Loads:", self.loads
        print >> f, "Stores:", self.stores
        print >> f, "Conflicts:", self.conflicts
        print >> f, "Conflicts resolved:", self.conflicts_resolved
class StatsClient(asyncore.dispatcher):
    """One accepted monitor connection.

    The server writes the statistics report into the outgoing buffer
    via write() and then calls close(); the socket is actually closed
    only after handle_write() has flushed everything.
    """

    def __init__(self, sock, addr):
        asyncore.dispatcher.__init__(self, sock)
        self.buf = []
        self.closed = 0

    def close(self):
        # Only mark the connection as finished; the real close happens
        # in handle_write() once the output buffer has drained.
        self.closed = 1

    def write(self, s):
        self.buf.append(s)

    def writable(self):
        return len(self.buf)

    def readable(self):
        # The monitor protocol is write-only; never select for reads.
        return 0

    def handle_write(self):
        s = "".join(self.buf)
        self.buf = []
        n = self.socket.send(s)
        if n < len(s):
            # Short write: re-buffer the *unsent* tail for the next
            # round.  (Previously this appended s[:n] -- the bytes that
            # had already been sent -- which duplicated sent data and
            # silently dropped the remainder.)
            self.buf.append(s[n:])
        if self.closed and not self.buf:
            asyncore.dispatcher.close(self)
class StatsServer(asyncore.dispatcher):
    """Listen for monitor connections and dump the stats to each one."""

    StatsConnectionClass = StatsClient

    def __init__(self, addr, stats):
        # addr -- a (host, port) tuple for an AF_INET socket, or a
        #         path string for an AF_UNIX socket
        # stats -- mapping from storage name to StorageStats instance
        asyncore.dispatcher.__init__(self)
        self.addr = addr
        self.stats = stats
        if isinstance(self.addr, tuple):
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.set_reuse_addr()
        # Replaces the commented-out zLOG call: this module uses the
        # logging package, so report the listen address through it.
        logging.getLogger("ZSM").info("monitor listening on %s",
                                      repr(self.addr))
        self.bind(self.addr)
        self.listen(5)

    def writable(self):
        return 0

    def readable(self):
        return 1

    def handle_accept(self):
        try:
            sock, addr = self.accept()
        except socket.error:
            # Spurious wakeup or the client went away; nothing to do.
            return
        f = self.StatsConnectionClass(sock, addr)
        self.dump(f)
        f.close()

    def dump(self, f):
        """Write a full report for every storage to client f."""
        print >> f, "ZEO monitor server version %s" % zodb.zeo.version
        print >> f, time.ctime()
        print >> f
        L = self.stats.keys()
        L.sort()
        for k in L:
            stats = self.stats[k]
            print >> f, "Storage:", k
            stats.dump(f)
            print >> f
=== Zope3/src/zodb/zeo/cache.py 1.2 => 1.2.10.1 ===
--- Zope3/src/zodb/zeo/cache.py:1.2 Wed Dec 25 09:12:22 2002
+++ Zope3/src/zodb/zeo/cache.py Fri Feb 7 13:28:03 2003
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2001. Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -31,11 +31,16 @@
0 or 1. Temporary cache files are unnamed files in the standard
temporary directory as determined by the tempfile module.
-The ClientStorage overrides the client name default to the value of
-the environment variable ZEO_CLIENT, if it exists.
+Each cache file has a 12-byte header followed by a sequence of
+records. The header format is as follows:
-Each cache file has a 4-byte magic number followed by a sequence of
-records of the form:
+ offset in header: name -- description
+
+ 0: magic -- 4-byte magic number, identifying this as a ZEO cache file
+
+ 4: lasttid -- 8-byte last transaction id
+
+Each record has the following form:
offset in record: name -- description
@@ -100,32 +105,33 @@
file 0 and file 1.
"""
+import logging
import os
import time
import tempfile
from struct import pack, unpack
from thread import allocate_lock
-import logging
from zodb.utils import u64
-from zodb.zeo.interfaces import ICache
+magic = 'ZEC1'
+headersize = 12
-magic='ZEC0'
+MB = 1024**2
class ClientCache:
- __implements__ = ICache
-
- def __init__(self, storage='1', size=20000000, client=None, var=None):
+ def __init__(self, storage='1', size=20*MB, client=None, var=None):
# Arguments:
# storage -- storage name (used in filenames and log messages)
# size -- size limit in bytes of both files together
# client -- if not None, use a persistent cache file and use this name
- # var -- directory where to create persistent cache files
+ # var -- directory where to create persistent cache files; default cwd
self._storage = storage
self._limit = size / 2
+ self._client = client
+ self._ltid = None # For getLastTid()
# Create a logger specific for this client cache
logger = logging.getLogger("ZEC.%s" % storage)
@@ -138,15 +144,8 @@
if client is not None:
# Create a persistent cache
- # CLIENT_HOME and INSTANCE_HOME are builtins set by App.FindHomes
if var is None:
- try:
- var = CLIENT_HOME
- except:
- try:
- var = os.path.join(INSTANCE_HOME, 'var')
- except:
- var = os.getcwd()
+ var = os.getcwd()
fmt = os.path.join(var, "c%s-%s-%%s.zec" % (storage, client))
# Initialize pairs of filenames, file objects, and serialnos.
@@ -158,9 +157,9 @@
fi = open(p[i],'r+b')
if fi.read(4) == magic: # Minimal sanity
fi.seek(0, 2)
- if fi.tell() > 30:
- # First serial is at offset 19 + 4 for magic
- fi.seek(23)
+ if fi.tell() > headersize:
+ # Read serial at offset 19 of first record
+ fi.seek(headersize + 19)
s[i] = fi.read(8)
# If we found a non-zero serial, then use the file
if s[i] != '\0\0\0\0\0\0\0\0':
@@ -176,17 +175,18 @@
if f[0] is None:
# We started, open the first cache file
f[0] = open(p[0], 'w+b')
- f[0].write(magic)
+ f[0].write(magic + '\0' * (headersize - len(magic)))
current = 0
f[1] = None
else:
self._f = f = [tempfile.TemporaryFile(suffix='.zec'), None]
# self._p file name 'None' signifies an unnamed temp file.
self._p = p = [None, None]
- f[0].write(magic)
+ f[0].write(magic + '\0' * (headersize - len(magic)))
current = 0
- self.log("size=%r; file[%r]=%r", size, current, p[current])
+ self.log("%s: storage=%r, size=%r; file[%r]=%r",
+ self.__class__.__name__, storage, size, current, p[current])
self._current = current
self._setup_trace()
@@ -222,6 +222,57 @@
except OSError:
pass
+ def getLastTid(self):
+ """Get the last transaction id stored by setLastTid().
+
+ If the cache is persistent, it is read from the current
+ cache file; otherwise it's an instance variable.
+ """
+ if self._client is None:
+ return self._ltid
+ else:
+ self._acquire()
+ try:
+ return self._getLastTid()
+ finally:
+ self._release()
+
+ def _getLastTid(self):
+ f = self._f[self._current]
+ f.seek(4)
+ tid = f.read(8)
+ if len(tid) < 8 or tid == '\0\0\0\0\0\0\0\0':
+ return None
+ else:
+ return tid
+
+ def setLastTid(self, tid):
+ """Store the last transaction id.
+
+ If the cache is persistent, it is written to the current
+ cache file; otherwise it's an instance variable.
+ """
+ if self._client is None:
+ if tid == '\0\0\0\0\0\0\0\0':
+ tid = None
+ self._ltid = tid
+ else:
+ self._acquire()
+ try:
+ self._setLastTid(tid)
+ finally:
+ self._release()
+
+ def _setLastTid(self, tid):
+ if tid is None:
+ tid = '\0\0\0\0\0\0\0\0'
+ else:
+ tid = str(tid)
+ assert len(tid) == 8
+ f = self._f[self._current]
+ f.seek(4)
+ f.write(tid)
+
def verify(self, verifyFunc):
"""Call the verifyFunc on every object in the cache.
@@ -244,13 +295,13 @@
if len(h) != 27:
self.log("invalidate: short record for oid %16x "
"at position %d in cache file %d",
- u64(oid), ap, p < 0)
+ U64(oid), ap, p < 0)
del self._index[oid]
return None
if h[:8] != oid:
self.log("invalidate: oid mismatch: expected %16x read %16x "
"at position %d in cache file %d",
- u64(oid), u64(h[:8]), ap, p < 0)
+ U64(oid), U64(h[:8]), ap, p < 0)
del self._index[oid]
return None
f.seek(ap+8) # Switch from reading to writing
@@ -285,7 +336,7 @@
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
self.log("load: bad record for oid %16x "
"at position %d in cache file %d",
- u64(oid), ap, p < 0)
+ U64(oid), ap, p < 0)
del self._index[oid]
return None
@@ -456,7 +507,7 @@
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
self.log("modifiedInVersion: bad record for oid %16x "
"at position %d in cache file %d",
- u64(oid), ap, p < 0)
+ U64(oid), ap, p < 0)
del self._index[oid]
return None
@@ -480,6 +531,7 @@
self._acquire()
try:
if self._pos + size > self._limit:
+ ltid = self._getLastTid()
current = not self._current
self._current = current
self._trace(0x70)
@@ -503,8 +555,12 @@
else:
# Temporary cache file:
self._f[current] = tempfile.TemporaryFile(suffix='.zec')
- self._f[current].write(magic)
- self._pos = 4
+ header = magic
+ if ltid:
+ header += ltid
+ self._f[current].write(header +
+ '\0' * (headersize - len(header)))
+ self._pos = headersize
finally:
self._release()
@@ -596,7 +652,7 @@
f = self._f[fileindex]
seek = f.seek
read = f.read
- pos = 4
+ pos = headersize
count = 0
while 1:
@@ -655,7 +711,6 @@
del serial[oid]
del index[oid]
-
pos = pos + tlen
count += 1
@@ -672,6 +727,6 @@
return pos
def rilog(self, msg, pos, fileindex):
- # Helper to log certain messages from read_index
+ # Helper to log messages from read_index
self.log("read_index: %s at position %d in cache file %d",
msg, pos, fileindex)
=== Zope3/src/zodb/zeo/client.py 1.4 => 1.4.2.1 ===
--- Zope3/src/zodb/zeo/client.py:1.4 Wed Feb 5 18:28:24 2003
+++ Zope3/src/zodb/zeo/client.py Fri Feb 7 13:28:03 2003
@@ -344,23 +344,31 @@
This is called by ConnectionManager after it has decided which
connection should be used.
"""
+ # XXX would like to report whether we get a read-only connection
if self._connection is not None:
- self.logger.warn("Reconnected to storage")
+ reconnect = 1
else:
- self.logger.warn("Connected to storage")
+ reconnect = 0
self.set_server_addr(conn.get_addr())
+
+ # If we are upgrading from a read-only fallback connection,
+ # we must close the old connection to prevent it from being
+ # used while the cache is verified against the new connection.
+ if self._connection is not None:
+ self._connection.close()
+ self._connection = conn
+
+ if reconnect:
+ self.logger.warn("Reconnected to storage: %s", self._server_addr)
+ else:
+ self.logger.warn("Connected to storage: %s", self._server_addr)
+
stub = self.StorageServerStubClass(conn)
self._oids = []
self._info.update(stub.get_info())
self.update_interfaces()
self.verify_cache(stub)
- # XXX The stub should be saved here and set in endVerify() below.
- if self._connection is not None:
- self._connection.close()
- self._connection = conn
- self._server = stub
-
def update_interfaces(self):
# Update instance's __implements__ based on the server.
L = [IStorage]
@@ -399,12 +407,52 @@
return self._server_addr
def verify_cache(self, server):
- """Internal routine called to verify the cache."""
- # XXX beginZeoVerify ends up calling back to beginVerify() below.
- # That whole exchange is rather unnecessary.
- server.beginZeoVerify()
+ """Internal routine called to verify the cache.
+
+ The return value (indicating which path we took) is used by
+ the test suite.
+ """
+
+ # If verify_cache() finishes the cache verification process,
+ # it should set self._server. If it goes through full cache
+    # verification, then endVerify() should set self._server.
+
+ last_inval_tid = self._cache.getLastTid()
+ if last_inval_tid is not None:
+ ltid = server.lastTransaction()
+ if ltid == last_inval_tid:
+ self.logger.info("No verification necessary "
+ "(last_inval_tid up-to-date)")
+ self._cache.open()
+ self._server = server
+ self._ready.set()
+ return "no verification"
+
+ # log some hints about last transaction
+ self.logger.info("last inval tid: %r %s",
+ last_inval_tid, tid2time(last_inval_tid))
+ self.logger.info("last transaction: %r %s",
+ ltid, ltid and tid2time(ltid))
+
+ pair = server.getInvalidations(last_inval_tid)
+ if pair is not None:
+ self.logger.info("Recovering %d invalidations", len(pair[1]))
+ self._cache.open()
+ self.invalidateTransaction(*pair)
+ self._server = server
+ self._ready.set()
+ return "quick verification"
+
+ self.logger.info("Verifying cache")
+ # setup tempfile to hold zeoVerify results
+ self._tfile = tempfile.TemporaryFile(suffix=".inv")
+ self._pickler = cPickle.Pickler(self._tfile, 1)
+ self._pickler.fast = 1 # Don't use the memo
+
self._cache.verify(server.zeoVerify)
+ self._pending_server = server
server.endZeoVerify()
+ return "full verification"
### Is there a race condition between notifyConnected and
### notifyDisconnected? In Particular, what if we get
@@ -773,12 +821,6 @@
"""Server callback to update the info dictionary."""
self._info.update(dict)
- def beginVerify(self):
- """Server callback to signal start of cache validation."""
- self._tfile = tempfile.TemporaryFile(suffix=".inv")
- self._pickler = cPickle.Pickler(self._tfile, 1)
- self._pickler.fast = 1 # Don't use the memo
-
def invalidateVerify(self, args):
"""Server callback to invalidate an (oid, version) pair.
@@ -796,6 +838,7 @@
if self._pickler is None:
return
self._pickler.dump((0,0))
+ self._pickler = None
self._tfile.seek(0)
unpick = cPickle.Unpickler(self._tfile)
f = self._tfile
@@ -803,35 +846,31 @@
while 1:
oid, version = unpick.load()
+ self.logger.debug("verify invalidate %r", oid)
if not oid:
break
self._cache.invalidate(oid, version=version)
- self._db.invalidate(oid, version=version)
+ if self._db is not None:
+ self._db.invalidate(oid, version=version)
f.close()
- def invalidateTrans(self, args):
- """Server callback to invalidate a list of (oid, version) pairs.
-
- This is called as the result of a transaction.
- """
+ self._server = self._pending_server
+ self._ready.set()
+ self._pending_conn = None
+ self.logger.debug("verification finished")
+
+ def invalidateTransaction(self, tid, args):
+ """Invalidate objects modified by tid."""
+ self._cache.setLastTid(tid)
+ if self._pickler is not None:
+ self.logger.info("Transactional invalidation "
+ "during cache verification")
+ for t in args:
+ self._pickler.dump(t)
+ return
+ db = self._db
for oid, version in args:
self._cache.invalidate(oid, version=version)
- try:
- self._db.invalidate(oid, version=version)
- except AttributeError, msg:
- self.logger.error("Invalidate(%r, %r) failed for _db: %s",
- oid, version, msg)
-
- # XXX In Zope3, there's no reason to stick to the ZEO 2 protocol!
-
- # Unfortunately, the ZEO 2 wire protocol uses different names for
- # several of the callback methods invoked by the StorageServer.
- # We can't change the wire protocol at this point because that
- # would require synchronized updates of clients and servers and we
- # don't want that. So here we alias the old names to their new
- # implementations.
-
- begin = beginVerify
- invalidate = invalidateVerify
- end = endVerify
- Invalidate = invalidateTrans
+ if db is not None:
+ db.invalidate(oid, version=version)
+
=== Zope3/src/zodb/zeo/server.py 1.6 => 1.6.2.1 === (1030/1130 lines abridged)
--- Zope3/src/zodb/zeo/server.py:1.6 Wed Feb 5 18:28:24 2003
+++ Zope3/src/zodb/zeo/server.py Fri Feb 7 13:28:03 2003
@@ -20,190 +20,54 @@
exported for invocation by the server.
"""
+from __future__ import nested_scopes
+
import asyncore
import cPickle
+import logging
import os
import sys
import threading
import time
-import logging
-from zodb.zeo import stubs
+from zodb.zeo.stubs import ClientStorage
from zodb.zeo.commitlog import CommitLog
+from zodb.zeo.monitor import StorageStats, StatsServer
from zodb.zeo.zrpc.server import Dispatcher
from zodb.zeo.zrpc.connection import ManagedServerConnection, Delay, MTDelay
+from zodb.zeo.zrpc.trigger import trigger
-from zodb.ztransaction import Transaction
-from zodb.interfaces import TransactionError
+from zodb.interfaces import *
from zodb.storage.interfaces import *
+from zodb.conflict import ResolvedSerial
+from zodb.utils import u64
+from zodb.ztransaction import Transaction
from zope.interface.implements import objectImplements
class StorageServerError(StorageError):
"""Error reported when an unpickleable exception is raised."""
-class StorageServer:
-
- """The server side implementation of ZEO.
-
- The StorageServer is the 'manager' for incoming connections. Each
- connection is associated with its own ZEOStorage instance (defined
- below). The StorageServer may handle multiple storages; each
- ZEOStorage instance only handles a single storage.
- """
-
- # Classes we instantiate. A subclass might override.
-
[-=- -=- -=- 1030 lines omitted -=- -=- -=-]
+ self._client = client
+ self._deadline = time.time() + self._timeout
+ self._cond.notify()
+ finally:
+ self._cond.release()
+
+ def end(self, client):
+ # Called from the "main" thread whenever the storage lock is
+ # being released. (Serialized by asyncore.)
+ self._cond.acquire()
+ try:
+ assert self._client is not None
+ self._client = None
+ self._deadline = None
+ finally:
+ self._cond.release()
+
+ def run(self):
+ # Code running in the thread.
+ while 1:
+ self._cond.acquire()
+ try:
+ while self._deadline is None:
+ self._cond.wait()
+ howlong = self._deadline - time.time()
+ if howlong <= 0:
+ # Prevent reporting timeout more than once
+ self._deadline = None
+ client = self._client # For the howlong <= 0 branch below
+ finally:
+ self._cond.release()
+ if howlong <= 0:
+ client.logger.error("Transaction timeout after %s seconds",
+ self._timeout)
+ self._trigger.pull_trigger(lambda: client.connection.close())
+ else:
+ time.sleep(howlong)
+ self.trigger.close()
def run_in_thread(method, *args):
t = SlowMethodThread(method, args)
@@ -782,8 +854,3 @@
self.delay.error(sys.exc_info())
else:
self.delay.reply(result)
-
-# Patch up class references
-StorageServer.ZEOStorageClass = ZEOStorage
-ZEOStorage.DelayedCommitStrategyClass = DelayedCommitStrategy
-ZEOStorage.ImmediateCommitStrategyClass = ImmediateCommitStrategy