[Zodb-checkins] CVS: ZODB3/ZEO - monitor.py:1.1 StorageServer.py:1.89 runsvr.py:1.27 schema.xml:1.5
Jeremy Hylton
jeremy@zope.com
Thu, 9 Jan 2003 16:50:52 -0500
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv26741
Modified Files:
StorageServer.py runsvr.py schema.xml
Added Files:
monitor.py
Log Message:
Add provisional monitor server that reports server statistics
Also, remove unused reuse_addr arg to ZEO.zrpc.server. The server was
always calling set_reuse_addr().
No tests yet, that's the next step. Simple functional tests work.
=== Added File ZODB3/ZEO/monitor.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Monitor behavior of ZEO server and record statistics.
$Id$
"""
import asyncore
import socket
import time
import types
import ZEO
class StorageStats:
    """Counters describing how one storage is being used.

    Every counter starts at zero.  lock_time is None unless it has been
    set to a timestamp (seconds, as returned by time.time()); dump() uses
    it to report how long the commit lock has been held.
    """

    def __init__(self):
        # Connection counters.
        self.clients = 0
        self.verifying_clients = 0
        # Transaction counters.
        self.active_txns = 0
        self.commits = 0
        self.aborts = 0
        # Object traffic.
        self.loads = 0
        self.stores = 0
        # Conflict bookkeeping.
        self.conflicts = 0
        self.conflicts_resolved = 0
        # Timestamp recorded while the commit lock is held, else None.
        self.lock_time = None

    def dump(self, f):
        """Write a "Label: value" line for each statistic to f.

        f is any object with a write() method.  The "Commit lock held
        for" line is emitted only while lock_time is set, and reports
        whole elapsed seconds.
        """
        rows = [
            ("Clients", self.clients),
            ("Clients verifying", self.verifying_clients),
            ("Active transactions", self.active_txns),
        ]
        if self.lock_time:
            elapsed = time.time() - self.lock_time
            rows.append(("Commit lock held for", int(elapsed)))
        rows.extend([
            ("Commits", self.commits),
            ("Aborts", self.aborts),
            ("Loads", self.loads),
            ("Stores", self.stores),
            ("Conflicts", self.conflicts),
            ("Conflicts resolved", self.conflicts_resolved),
        ])
        for label, value in rows:
            f.write("%s: %s\n" % (label, value))
class StatsClient(asyncore.dispatcher):
    """Write-only channel that delivers buffered text to one socket.

    Text passed to write() is queued in self.buf and sent from
    handle_write().  close() only requests a close; the socket is
    actually closed once everything queued has been sent.
    """

    def __init__(self, sock, addr):
        # addr is accepted for the connection-factory signature but unused.
        asyncore.dispatcher.__init__(self, sock)
        self.buf = []       # pending output chunks, in order
        self.closed = 0     # set to 1 once close() has been requested

    def close(self):
        """Request a close; it takes effect when the buffer is empty."""
        self.closed = 1
        # The socket is closed after all the data is written.
        # See handle_write().
        if not self.buf:
            # Nothing is pending, so handle_write() will never be called
            # (writable() is false); close now instead of never closing.
            asyncore.dispatcher.close(self)

    def write(self, s):
        """Queue the string s for sending."""
        self.buf.append(s)

    def writable(self):
        # True (non-zero) only while there is queued output.
        return len(self.buf)

    def readable(self):
        # XXX what goes here?
        return 0

    def handle_write(self):
        """Send as much queued data as the socket accepts."""
        s = "".join(self.buf)
        self.buf = []
        n = self.socket.send(s)
        if n < len(s):
            # Keep the part that was NOT sent (s[n:]); re-queueing s[:n]
            # would resend the delivered prefix and drop the remainder.
            self.buf.append(s[n:])
        if self.closed and not self.buf:
            asyncore.dispatcher.close(self)
class StatsServer(asyncore.dispatcher):
    """Listening socket that sends a statistics report to each caller.

    addr is the address to bind: a tuple selects an AF_INET socket,
    anything else an AF_UNIX socket.  stats is a mapping from storage
    name to an object with a dump(f) method.  Each accepted connection
    is wrapped in StatsConnectionClass, receives one report and is then
    asked to close.
    """

    StatsConnectionClass = StatsClient

    def __init__(self, addr, stats):
        asyncore.dispatcher.__init__(self)
        self.addr = addr
        self.stats = stats
        if isinstance(self.addr, tuple):
            family = socket.AF_INET
        else:
            family = socket.AF_UNIX
        self.create_socket(family, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(self.addr)
        self.listen(5)

    def writable(self):
        # A listening socket never has anything to write.
        return 0

    def readable(self):
        # Always ready to accept a new connection.
        return 1

    def handle_accept(self):
        """Accept one connection and send it the current report."""
        try:
            pair = self.accept()
        except socket.error:
            return
        if pair is None:
            # accept() returns None when no connection actually took
            # place; unpacking it would raise TypeError.
            return
        sock, addr = pair
        f = self.StatsConnectionClass(sock, addr)
        self.dump(f)
        f.close()

    def dump(self, f):
        """Write a header, then each storage's statistics, to f.

        Storages are reported in sorted order of their names, each
        followed by a blank line.
        """
        print >> f, "ZEO monitor server version %s" % ZEO.version
        print >> f, time.ctime()
        print >> f

        names = list(self.stats.keys())
        names.sort()
        for name in names:
            print >> f, "Storage:", name
            self.stats[name].dump(f)
            print >> f
=== ZODB3/ZEO/StorageServer.py 1.88 => 1.89 ===
--- ZODB3/ZEO/StorageServer.py:1.88 Thu Jan 9 13:45:08 2003
+++ ZODB3/ZEO/StorageServer.py Thu Jan 9 16:50:18 2003
@@ -31,13 +31,15 @@
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
+from ZEO.monitor import StorageStats, StatsServer
from ZEO.zrpc.server import Dispatcher
from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
from ZEO.zrpc.trigger import trigger
import zLOG
+from ZODB.ConflictResolution import ResolvedSerial
from ZODB.POSException import StorageError, StorageTransactionError
-from ZODB.POSException import TransactionError, ReadOnlyError
+from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.referencesf import referencesf
from ZODB.Transaction import Transaction
from ZODB.utils import u64
@@ -65,7 +67,9 @@
def __init__(self, server, read_only=0):
self.server = server
+ # timeout and stats will be initialized in register()
self.timeout = None
+ self.stats = None
self.connection = None
self.client = None
self.storage = None
@@ -73,6 +77,7 @@
self.transaction = None
self.read_only = read_only
self.locked = 0
+ self.verifying = 0
self.log_label = _label
def notifyConnected(self, conn):
@@ -94,6 +99,8 @@
self._abort()
else:
self.log("disconnected")
+ if self.stats is not None:
+ self.stats.clients -= 1
def __repr__(self):
tid = self.transaction and repr(self.transaction.id)
@@ -130,7 +137,7 @@
setattr(self, name, getattr(self.storage, name))
self.lastTransaction = self.storage.lastTransaction
- def check_tid(self, tid, exc=None):
+ def _check_tid(self, tid, exc=None):
if self.read_only:
raise ReadOnlyError()
caller = sys._getframe().f_back.f_code.co_name
@@ -150,6 +157,18 @@
return 0
return 1
+ # _lock() and _unlock() control the locked flag
+
+ def _lock(self):
+ self.locked = 1
+ self.timeout.begin(self)
+ self.stats.lock_time = time.time()
+
+ def _unlock(self):
+ self.locked = 0
+ self.timeout.end(self)
+ self.stats.lock_time = None
+
def register(self, storage_id, read_only):
"""Select the storage that this client will use
@@ -170,7 +189,8 @@
self.storage_id = storage_id
self.storage = storage
self.setup_delegation()
- self.timeout = self.server.register_connection(storage_id, self)
+ self.timeout, self.stats = self.server.register_connection(storage_id,
+ self)
def get_info(self):
return {'length': len(self.storage),
@@ -197,6 +217,7 @@
return e()
def zeoLoad(self, oid):
+ self.stats.loads += 1
v = self.storage.modifiedInVersion(oid)
if v:
pv, sv = self.storage.load(oid, v)
@@ -221,6 +242,9 @@
return invtid, invlist
def zeoVerify(self, oid, s, sv):
+ if not self.verifying:
+ self.verifying = 1
+ self.stats.verifying_clients += 1
try:
os = self.storage.getSerial(oid)
except KeyError:
@@ -251,6 +275,9 @@
self.client.invalidateVerify((oid, ''))
def endZeoVerify(self):
+ if self.verifying:
+ self.stats.verifying_clients -= 1
+ self.verifying = 0
self.client.endVerify()
def pack(self, time, wait=1):
@@ -320,31 +347,34 @@
self.txnlog = CommitLog()
self.tid = tid
self.status = status
+ self.stats.active_txns += 1
def tpc_finish(self, id):
- if not self.check_tid(id):
+ if not self._check_tid(id):
return
assert self.locked
+ self.stats.active_txns -= 1
+ self.stats.commits += 1
self.storage.tpc_finish(self.transaction)
tid = self.storage.lastTransaction()
if self.invalidated:
self.server.invalidate(self, self.storage_id, tid,
self.invalidated, self.get_size_info())
self.transaction = None
- self.locked = 0
- self.timeout.end(self)
+ self._unlock()
# Return the tid, for cache invalidation optimization
self._handle_waiting()
return tid
def tpc_abort(self, id):
- if not self.check_tid(id):
+ if not self._check_tid(id):
return
+ self.stats.active_txns -= 1
+ self.stats.aborts += 1
if self.locked:
self.storage.tpc_abort(self.transaction)
self.transaction = None
- self.locked = 0
- self.timeout.end(self)
+ self._unlock()
self._handle_waiting()
def _abort(self):
@@ -361,6 +391,8 @@
break
if self.transaction:
+ self.stats.active_txns -= 1
+ self.stats.aborts += 1
self.tpc_abort(self.transaction.id)
# The public methods of the ZEO client API do not do the real work.
@@ -369,44 +401,44 @@
# an _.
def storea(self, oid, serial, data, version, id):
- self.check_tid(id, exc=StorageTransactionError)
+ self._check_tid(id, exc=StorageTransactionError)
+ self.stats.stores += 1
self.txnlog.store(oid, serial, data, version)
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
def vote(self, id):
- self.check_tid(id, exc=StorageTransactionError)
+ self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._vote()
else:
return self._wait(lambda: self._vote())
def abortVersion(self, src, id):
- self.check_tid(id, exc=StorageTransactionError)
+ self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._abortVersion(src)
else:
return self._wait(lambda: self._abortVersion(src))
def commitVersion(self, src, dest, id):
- self.check_tid(id, exc=StorageTransactionError)
+ self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._commitVersion(src, dest)
else:
return self._wait(lambda: self._commitVersion(src, dest))
def transactionalUndo(self, trans_id, id):
- self.check_tid(id, exc=StorageTransactionError)
+ self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._transactionalUndo(trans_id)
else:
return self._wait(lambda: self._transactionalUndo(trans_id))
def _tpc_begin(self, txn, tid, status):
- self.locked = 1
+ self._lock()
self.storage.tpc_begin(txn, tid, status)
- self.timeout.begin(self)
def _store(self, oid, serial, data, version):
try:
@@ -415,6 +447,8 @@
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
+ if isinstance(err, ConflictError):
+ self.stats.conflicts += 1
if not isinstance(err, TransactionError):
# Unexpected errors are logged and passed to the client
exc_info = sys.exc_info()
@@ -436,6 +470,8 @@
else:
if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append((oid, version))
+ if newserial == ResolvedSerial:
+ self.stats.conflicts_resolved += 1
self.serials.append((oid, newserial))
def _vote(self):
@@ -543,7 +579,8 @@
def __init__(self, addr, storages, read_only=0,
invalidation_queue_size=100,
- transaction_timeout=None):
+ transaction_timeout=None,
+ monitor_address=None):
"""StorageServer constructor.
This is typically invoked from the start.py script.
@@ -580,6 +617,11 @@
a transaction to commit after acquiring the storage lock.
If the transaction takes too long, the client connection
will be closed and the transaction aborted.
+
+ monitor_address -- The address at which the monitor server
+ should listen. If specified, a monitor server is started.
+ The monitor server provides server statistics in a simple
+ text format.
"""
self.addr = addr
@@ -599,10 +641,11 @@
self.invq_bound = invalidation_queue_size
self.connections = {}
self.dispatcher = self.DispatcherClass(addr,
- factory=self.new_connection,
- reuse_addr=1)
+ factory=self.new_connection)
+ self.stats = {}
self.timeouts = {}
for name in self.storages.keys():
+ self.stats[name] = StorageStats()
if transaction_timeout is None:
# An object with no-op methods
timeout = StubTimeoutThread()
@@ -610,6 +653,10 @@
timeout = TimeoutThread(transaction_timeout)
timeout.start()
self.timeouts[name] = timeout
+ if monitor_address:
+ self.monitor = StatsServer(monitor_address, self.stats)
+ else:
+ self.monitor = None
def new_connection(self, sock, addr):
"""Internal: factory to create a new connection.
@@ -633,13 +680,15 @@
is needed to handle invalidation. This function updates this
dictionary.
- Returns the timeout object for the appropriate storage.
+ Returns the timeout and stats objects for the appropriate storage.
"""
l = self.connections.get(storage_id)
if l is None:
l = self.connections[storage_id] = []
l.append(conn)
- return self.timeouts[storage_id]
+ stats = self.stats[storage_id]
+ stats.clients += 1
+ return self.timeouts[storage_id], stats
def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients.
@@ -707,6 +756,8 @@
for timeout in self.timeouts.values():
timeout.stop()
self.dispatcher.close()
+ if self.monitor is not None:
+ self.monitor.close()
for storage in self.storages.values():
storage.close()
# Force the asyncore mainloop to exit by hackery, i.e. close
=== ZODB3/ZEO/runsvr.py 1.26 => 1.27 ===
--- ZODB3/ZEO/runsvr.py:1.26 Thu Jan 9 15:40:22 2003
+++ ZODB3/ZEO/runsvr.py Thu Jan 9 16:50:18 2003
@@ -22,6 +22,7 @@
(a PATH must contain at least one "/")
-f/--filename FILENAME -- filename for FileStorage
-h/--help -- print this usage message and exit
+-m/--monitor ADDRESS -- address of monitor server
Unless -C is specified, -a and -f are required.
"""
@@ -147,43 +148,62 @@
sys.stderr.write("For help, use %s -h\n" % self.progname)
sys.exit(2)
+def parse_address(arg):
+ if "/" in arg:
+ family = socket.AF_UNIX
+ address = arg
+ else:
+ family = socket.AF_INET
+ if ":" in arg:
+ host, port = arg.split(":", 1)
+ else:
+ host = ""
+ port = arg
+ try:
+ port = int(port)
+ except: # int() can raise all sorts of errors
+ raise ValueError("invalid port number: %r" % port)
+ address = host, port
+ return family, address
class ZEOOptions(Options):
read_only = None
transaction_timeout = None
invalidation_queue_size = None
+ monitor_address = None
family = None # set by -a; AF_UNIX or AF_INET
address = None # set by -a; string or (host, port)
storages = None # set by -f
- _short_options = "a:C:f:h"
+ _short_options = "a:C:f:hm:"
_long_options = [
"address=",
"configuration=",
"filename=",
"help",
+ "monitor=",
]
def handle_option(self, opt, arg):
# Alphabetical order please!
if opt in ("-a", "--address"):
- if "/" in arg:
- self.family = socket.AF_UNIX
- self.address = arg
+ try:
+ f, a = parse_address(arg)
+ except ValueError, err:
+ self.usage(str(err))
+ else:
+ self.family = f
+ self.address = a
+ elif opt in ("-m", "--monitor"):
+ try:
+ f, a = parse_address(arg)
+ except ValueError, err:
+ self.usage(str(err))
else:
- self.family = socket.AF_INET
- if ":" in arg:
- host, port = arg.split(":", 1)
- else:
- host = ""
- port = arg
- try:
- port = int(port)
- except: # int() can raise all sorts of errors
- self.usage("invalid port number: %r" % port)
- self.address = (host, port)
+ self.monitor_family = f
+ self.monitor_address = a
elif opt in ("-f", "--filename"):
from ZODB.config import FileStorage
class FSConfig:
@@ -238,7 +258,7 @@
self.read_only = self.rootconf.read_only
self.transaction_timeout = self.rootconf.transaction_timeout
- self.invalidation_queue_size = self.rootconf.invalidation_queue_size
+ self.invalidation_queue_size = 100
def load_logconf(self):
# Get logging options from conf, unless overridden by environment
@@ -349,7 +369,8 @@
self.storages,
read_only=self.options.read_only,
invalidation_queue_size=self.options.invalidation_queue_size,
- transaction_timeout=self.options.transaction_timeout)
+ transaction_timeout=self.options.transaction_timeout,
+ monitor_address=self.options.monitor_address)
def loop_forever(self):
import ThreadedAsync.LoopCallback
=== ZODB3/ZEO/schema.xml 1.4 => 1.5 ===
--- ZODB3/ZEO/schema.xml:1.4 Thu Jan 9 13:25:29 2003
+++ ZODB3/ZEO/schema.xml Thu Jan 9 16:50:18 2003
@@ -58,6 +58,14 @@
</description>
</key>
+ <key name="monitor-address" datatype="socket-address" required="no">
+ <description>
+ The address at which the monitor server should listen. If
+ specified, a monitor server is started. The monitor server
+ provides server statistics in a simple text format.
+ </description>
+ </key>
+
<multisection name="+" type="storage"
attribute="storages"
required="yes">