[Zodb-checkins] CVS: ZODB3/Tools - zeoqueue.py:1.1
Jeremy Hylton
jeremy@zope.com
Mon, 16 Dec 2002 13:26:13 -0500
Update of /cvs-repository/ZODB3/Tools
In directory cvs.zope.org:/tmp/cvs-serv18789
Added Files:
zeoqueue.py
Log Message:
A preliminary replacement for the handy zeowait.py script.
This processes the whole file from the beginning, collecting info about
active / blocked transactions. This version is preliminary because it
needs to start at the end and back up until it has enough information.
=== Added File ZODB3/Tools/zeoqueue.py ===
#! /usr/bin/env python
"""Report on the number of currently waiting clients in the ZEO queue."""
import fileinput
import re
import sys
import time
# Matches the timestamp at the start of a log line, e.g. 2002-12-16T06:16:05.
rx_time = re.compile(r'(\d\d\d\d-\d\d-\d\d)T(\d\d:\d\d:\d\d)')


def parse_time(line):
    """Return the time portion of a zLOG line in seconds or None.

    The line must begin with a ``YYYY-MM-DDTHH:MM:SS`` timestamp; it is
    interpreted as local time (that is what ``time.mktime`` does) and
    returned as integer seconds since the epoch.  Lines that do not start
    with a timestamp yield None.
    """
    mo = rx_time.match(line)
    if mo is None:
        return None
    date, time_ = mo.group(1, 2)
    year, month, day = [int(elt) for elt in date.split('-')]
    hour, minute, second = [int(elt) for elt in time_.split(':')]
    # mktime wants a 9-item tuple (a list is rejected by newer Pythons).
    # tm_isdst is -1 so the C library decides whether DST applies to this
    # date, instead of assuming it never does.
    fields = (year, month, day, hour, minute, second, 0, 0, -1)
    return int(time.mktime(fields))
class Txn:
    """Record of one transaction as seen in the log.

    ``begin``, ``vote``, ``abort`` and ``finish`` hold the time at which
    the corresponding call was logged (None until it is seen); ``hint``
    holds the extra tpc_begin arguments, kept for display.
    """

    def __init__(self, tid):
        self.tid = tid
        self.hint = None
        # Event times; all unknown until the matching log record is seen.
        self.begin = self.vote = self.abort = self.finish = None

    def isactive(self):
        """Return 1 if the transaction has begun but not ended, else 0."""
        if not self.begin:
            return 0
        if self.abort or self.finish:
            return 0
        return 1
class Status:
    r"""Track status of ZEO server by replaying log records.

    We want to keep track of several events:

    - The last committed transaction.
    - The last committed or aborted transaction.
    - The last transaction that got the lock but didn't finish.
    - The number of currently active transactions.
    - The number of reported queued transactions.
    - Client restarts.
    - Number of current connections. XXX (This might not be useful.)

    We can observe these events by reading the following sorts of log
    entries:

    2002-12-16T06:16:05 BLATHER(-100) zrpc:12649 calling
        tpc_begin('\x03I\x90((\xdbp\xd5', '', 'QueueCatal...

    2002-12-16T06:16:06 BLATHER(-100) zrpc:12649 calling
        vote('\x03I\x90((\xdbp\xd5')

    2002-12-16T06:16:06 BLATHER(-100) zrpc:12649 calling
        tpc_finish('\x03I\x90((\xdbp\xd5')

    2002-12-16T10:46:10 INFO(0) ZSS:12649:1 Transaction blocked waiting
        for storage. Clients waiting: 1.

    2002-12-16T06:15:57 BLATHER(-100) zrpc:12649 connect from
        ('10.0.26.54', 48983): <ManagedServerConnection ('10.0.26.54', 48983)>

    2002-12-16T10:30:09 INFO(0) ZSS:12649:1 disconnected
    """

    # "calling <method>('<tid>'<rest of the argument list>"
    rx_call = re.compile(r"calling (\w+)\('(\S+)'(.*)")
    # "Clients waiting: <count>"
    rx_waiting = re.compile(r"Clients waiting: (\d+)")

    def __init__(self):
        self.reset()

    def reset(self):
        """Forget everything learned so far (used at server restart)."""
        self.commit = None            # Txn of the last tpc_finish
        self.commit_or_abort = None   # Txn of the last tpc_finish/tpc_abort
        self.n_active = 0
        self.n_blocked = 0
        self.n_conns = 0
        self.t_restart = None         # time of the last server start
        self.txns = {}                # tid -> Txn

    def report(self):
        """Print a summary of the state reconstructed from the log.

        Only the blocked-transaction count is printed unless the
        module-level VERBOSE flag is set.
        """
        print("Blocked transactions: %s" % (self.n_blocked,))
        # VERBOSE is only assigned when main() runs; treat it as off when
        # the class is used without going through main().
        if not globals().get("VERBOSE"):
            return
        if self.t_restart:
            print("Server started: %s" % time.ctime(self.t_restart))

        last = self.commit_or_abort
        if last is not None:
            # Prefer the commit time; fall back to the abort time.
            t = last.finish
            if t is None:
                t = last.abort
            # t stays None when the record carried no parsable timestamp;
            # ctime(None) would misleadingly print the current time.
            if t is not None:
                print("Last finished transaction: %s" % time.ctime(t))

        if not self.txns:
            # Nothing tracked, so there is no candidate to report.
            return
        # the blocked transaction should be the first one that calls vote;
        # this picks the tracked transaction with the earliest begin time.
        # Transactions without a begin time sort first and are never
        # active, so nothing is reported for them.
        first_txn = min(self.txns.values(),
                        key=lambda txn: (txn.begin is not None,
                                         txn.begin or 0))
        if first_txn.isactive():
            began = first_txn.begin
            print("Blocked transaction began at: %s" % time.ctime(began))
            print("Hint: %s" % (first_txn.hint,))
            print("Idle time: %d sec" % int(time.time() - began))

    def process(self, line):
        """Dispatch one log line to the handler for its record type."""
        if "calling" in line:
            self.process_call(line)
        elif "connect" in line:
            self.process_connect(line)
        elif "locked" in line:
            # test for "locked" because word may start with "B" or "b"
            self.process_block(line)
        elif "Starting" in line:
            self.process_start(line)

    def process_call(self, line):
        """Handle a "calling method('tid', ...)" record.

        The record is forwarded to ``call_<method>(t, tid, rest)`` if such
        a handler exists; all other calls are ignored.
        """
        mo = self.rx_call.search(line)
        if mo is None:
            return
        # Look the handler up before doing any further work: most calls
        # (zeoLoad in particular) have no handler and are dropped here.
        meth = getattr(self, "call_%s" % mo.group(1), None)
        if meth is None:
            return
        meth(parse_time(line), mo.group(2), mo.group(3))

    def process_connect(self, line):
        """Handle a connect/disconnect record (currently ignored)."""
        pass

    def process_block(self, line):
        """Update the number of blocked clients from a "...locked" record."""
        mo = self.rx_waiting.search(line)
        if mo is None:
            # assume that this was a restart message for the last blocked
            # transaction.
            self.n_blocked = 0
        else:
            self.n_blocked = int(mo.group(1))

    def process_start(self, line):
        """Reset all state when the record announces a server start."""
        if "Starting ZEO server" in line:
            self.reset()
            self.t_restart = parse_time(line)

    def _hint_from(self, rest):
        """Return the tpc_begin argument tail without its leading ", "."""
        if rest.startswith(','):
            return rest[1:].lstrip()
        return rest

    def _get_txn(self, tid):
        """Return the Txn for tid, creating a placeholder if it is unknown."""
        txn = self.txns.get(tid)
        if txn is None:
            # No tpc_begin was seen for this tid.
            print("Oops!")
            txn = self.txns[tid] = Txn(tid)
        return txn

    def _forget(self, old, current):
        """Stop tracking the Txn old, unless it shares current's tid."""
        if old is not None and old.tid != current.tid:
            self.txns.pop(old.tid, None)

    def call_tpc_begin(self, t, tid, rest):
        """Start tracking transaction tid, begun at time t."""
        txn = Txn(tid)
        txn.begin = t
        txn.hint = self._hint_from(rest)
        self.txns[tid] = txn
        self.n_active += 1

    def call_vote(self, t, tid, rest):
        """Record the time at which transaction tid voted."""
        self._get_txn(tid).vote = t

    def call_tpc_abort(self, t, tid, rest):
        """Record that transaction tid aborted at time t."""
        txn = self._get_txn(tid)
        txn.abort = t
        # NOTE: decremented even for a transaction whose tpc_begin was not
        # seen, so the counter can go negative.
        self.n_active -= 1
        # Only the most recent completed transaction is kept.
        self._forget(self.commit_or_abort, txn)
        self.commit_or_abort = txn

    def call_tpc_finish(self, t, tid, rest):
        """Record that transaction tid committed at time t."""
        txn = self._get_txn(tid)
        txn.finish = t
        # NOTE: decremented even for a transaction whose tpc_begin was not
        # seen, so the counter can go negative.
        self.n_active -= 1
        # Only the most recent completed transaction is kept.
        self._forget(self.commit, txn)
        self._forget(self.commit_or_abort, txn)
        self.commit = self.commit_or_abort = txn
def main():
    """Replay the log files named on the command line and print a report.

    A "-v" argument turns on the verbose report; it is removed from
    sys.argv before the remaining arguments are read as input files.
    """
    global VERBOSE
    if "-v" in sys.argv:
        VERBOSE = 1
        # fileinput treats everything in sys.argv[1:] as a file to read,
        # so the option has to be taken out first.
        sys.argv.remove("-v")
    else:
        VERBOSE = 0

    status = Status()
    for record in fileinput.input():
        status.process(record)
    status.report()


if __name__ == "__main__":
    main()