[Zope-Checkins] CVS: StandaloneZODB/Tools - zeoreplay.py:1.1

Barry Warsaw barry@wooz.org
Thu, 9 May 2002 11:34:27 -0400


Update of /cvs-repository/StandaloneZODB/Tools
In directory cvs.zope.org:/tmp/cvs-serv17041

Added Files:
	zeoreplay.py 
Log Message:
A parse and replay script which gathers transaction timings from a zeo
blather log, and optionally replays the transactions into another
storage, timing the differences between the original and the new
storage speed.


=== Added File StandaloneZODB/Tools/zeoreplay.py ===
"""Parse the BLATHER logging generated by ZEO, and optionally replay it.

Usage: zeoreplay.py [options] < logfile

The ZEO blather log is read from standard input.

Options:

    --help / -h
        Print this message and exit.

    --replay=storage
    -r storage
        Replay the parsed transactions into the FileStorage file `storage'
        (wrapped in a Standby PrimaryStorage), then summarize how the replay
        timings compare with the original ones.

    --maxtxns=count
    -m count
        Stop parsing after count transactions have finished.

    --report / -p
        Print a one-line summary of each transaction as it is parsed.

Unlike parsezeolog.py, this script records timestamps for each transaction
and for each sub-command within the transaction.  We can use this to compare
timings with synthesized data.
"""

import re
import sys
import time
import getopt
import operator
# ZEO logs measure wall-clock time so for consistency we need to do the same
#from time import clock as now
from time import time as now

from ZODB.FileStorage import FileStorage
from bsddb3Storage.Full import Full
from Standby.primary import PrimaryStorage
from Standby.config import RS_PORT
from ZODB.Transaction import Transaction
from ZODB.utils import p64

# Leading zLOG timestamp "YYYY-MM-DDTHH:MM:SS"; used with match(), so it must
# start at column 0.  Groups: (date, time).
datecre = re.compile(r'(\d\d\d\d-\d\d-\d\d)T(\d\d:\d\d:\d\d)')
# "ZEO Server name(args) ('x', n" -- groups: method name, raw argument text,
# and the two client-identifying fields (presumably host and port).
methcre = re.compile(r"ZEO Server (\w+)\((.*)\) \('(.*)', (\d+)")

class StopParsing(Exception):
    """Raised by ZEOParser once the --maxtxns limit is reached; main() catches
    it to stop reading input."""



def usage(code, msg=''):
    """Show the module help (plus msg, when non-empty) and exit with code."""
    texts = [__doc__]
    if msg:
        texts.append(msg)
    for text in texts:
        print(text)
    raise SystemExit(code)



def parse_time(line):
    """Return the leading timestamp of a zLOG line as integer seconds, or None.

    The line must start with 'YYYY-MM-DDTHH:MM:SS' (datecre is applied with
    match()).  The timestamp is converted with time.mktime(), i.e. it is
    interpreted as local time.
    """
    mo = datecre.match(line)
    if mo is None:
        return None
    date, time_ = mo.group(1, 2)
    fields = [int(elt) for elt in date.split('-')]
    fields.extend([int(elt) for elt in time_.split(':')])
    # Weekday and yearday are ignored by mktime().  isdst=-1 lets mktime()
    # decide whether DST was in effect; forcing 0 would shift summer
    # timestamps by an hour (assumes the log records local wall-clock time).
    fields.extend([0, 0, -1])
    # mktime() requires a tuple (or struct_time); lists are rejected.
    return int(time.mktime(tuple(fields)))


def parse_line(line):
    """Split one ZEO log line into (time, method, client).

    time is what parse_time() returns; method is a (name, args) pair where
    args is a tuple of the comma-separated argument strings, each stripped;
    client is the pair of strings in methcre's third and fourth groups.
    When the line lacks a timestamp or a method call, all three are None.
    """
    nothing = (None, None, None)
    when = parse_time(line)
    if when is None:
        return nothing
    match = methcre.search(line)
    if match is None:
        return nothing
    name, raw_args, first, second = match.groups()
    args = tuple([piece.strip() for piece in raw_args.split(',')])
    return when, (name, args), (first, second)



class StoreStat:
    """One recorded storea() call: when it happened, the oid stored, and the
    size of the stored data in bytes."""

    def __init__(self, when, oid, size):
        self.when, self.oid, self.size = when, oid, size

    # Legacy sequence protocol (crufty): lets a StoreStat be unpacked as
    # (oid, size); any other index raises IndexError.
    def __getitem__(self, i):
        if i == 0:
            return self.oid
        elif i == 1:
            return self.size
        else:
            raise IndexError


class TxnStat:
    """Timings gathered for one transaction from the ZEO log.

    Records the begin, finish and abort times (None until seen), the
    transaction description, and one StoreStat per storea() call.
    """

    def __init__(self):
        self._begintime = self._finishtime = self._aborttime = None
        self._url = None
        self._objects = []

    def tpc_begin(self, when, args, client):
        self._begintime = when
        # args: txnid, user, description -- the description seems to always
        # be a URL.
        self._url = args[2]

    def storea(self, when, args, client):
        # args[0] is the oid; args[1] is the size written as "[numbytes]".
        oid, sizefield = int(args[0]), args[1]
        self._objects.append(StoreStat(when, oid, int(sizefield[1:-1])))

    def tpc_abort(self, when):
        self._aborttime = when

    def tpc_finish(self, when):
        self._finishtime = when



# Mapping oid -> revid: the value the replay storage's store() returned for
# the oid in the most recent *committed* replayed transaction that stored it.
# It is passed back as the serial on that oid's next store.
_revids = {}

class ReplayTxn(TxnStat):
    """A TxnStat that can re-execute its stores against another storage.

    After replay(), _replaydelta holds how many seconds slower (positive) or
    faster (negative) the replay was than the original transaction.
    """

    # Serial passed when storing an oid that has no recorded revid yet.
    ZERO = '\0' * 8

    def __init__(self, storage):
        self._storage = storage
        self._replaydelta = 0
        TxnStat.__init__(self)

    def replay(self):
        """Re-run this transaction's stores through self._storage and time it.

        Each recorded store is replayed with dummy data of the recorded size.
        If the original transaction was aborted, the replay is aborted too.
        Otherwise it is voted and finished.  New revids are published to
        _revids only after tpc_finish, so an aborted transaction cannot leave
        behind serials the storage never committed.  If anything fails while
        the storage transaction is open, it is aborted before the exception
        propagates.
        """
        storage = self._storage
        aborted = self._aborttime is not None
        # Revids handed out by this transaction's stores, keyed by oid.
        newrevids = {}
        t0 = now()
        t = Transaction()
        storage.tpc_begin(t)
        done = 0
        try:
            for obj in self._objects:
                oid = obj.oid
                # A repeated store of an oid within this transaction chains
                # off the revid from its earlier store.
                revid = newrevids.get(oid, _revids.get(oid, self.ZERO))
                # BAW: simulate a pickle of the given size
                data = 'x' * obj.size
                # BAW: ignore versions for now
                newrevids[oid] = storage.store(p64(oid), revid, data, '', t)
            if aborted:
                storage.tpc_abort(t)
            else:
                storage.tpc_vote(t)
                storage.tpc_finish(t)
            done = 1
        finally:
            if not done:
                # Don't leave the storage stuck inside an unfinished
                # two-phase commit.
                storage.tpc_abort(t)
        t1 = now()
        if aborted:
            origdelta = self._aborttime - self._begintime
        else:
            _revids.update(newrevids)
            origdelta = self._finishtime - self._begintime
        # Seconds behind (positive) or ahead of (negative) the original
        # transaction that our local replay took.
        self._replaydelta = t1 - t0 - origdelta



class ZEOParser:
    """Rebuild per-client transactions from a ZEO blather log.

    Feed log lines to parse() one at a time.  A line whose ZEO method name
    matches one of this class's handlers (tpc_begin, storea, tpc_finish,
    tpc_abort) is dispatched to it as (time, args, client).  Transactions
    that end are kept in the order they ended, for replay().
    """

    def __init__(self, maxtxns=-1, report=1, storage=None):
        # Ended (finished or aborted) ReplayTxns, in the order they ended.
        self.__txns = []
        # client -> that client's in-progress ReplayTxn
        self.__curtxn = {}
        # storea/tpc_finish/tpc_abort lines seen with no open transaction
        self.__skipped = 0
        # Raise StopParsing after this many tpc_finish calls (<= 0: no limit)
        self.__maxtxns = maxtxns
        self.__finishedtxns = 0
        self.__report = report
        # Storage handed to every ReplayTxn; only used by replay()
        self.__storage = storage

    def parse(self, line):
        """Dispatch one log line to the handler named by its ZEO method."""
        t, m, c = parse_line(line)
        if t is None:
            # Not a timestamped "ZEO Server method(...)" line; skip it.
            return
        name, args = m
        meth = getattr(self, name, None)
        if meth is not None:
            meth(t, args, c)

    def tpc_begin(self, when, args, client):
        """Open a new transaction for client, replacing any open one."""
        txn = ReplayTxn(self.__storage)
        self.__curtxn[client] = txn
        txn.tpc_begin(when, args, client)

    def storea(self, when, args, client):
        """Record a store in client's open transaction."""
        txn = self.__opentxn(client)
        if txn is not None:
            txn.storea(when, args, client)

    def tpc_finish(self, when, args, client):
        """Close client's open transaction as committed.

        Raises StopParsing once maxtxns transactions have finished.
        """
        txn = self.__opentxn(client)
        if txn is None:
            return
        txn.tpc_finish(when)
        self.__complete(txn, client)
        self.__finishedtxns += 1
        if self.__maxtxns > 0 and self.__finishedtxns >= self.__maxtxns:
            raise StopParsing

    def tpc_abort(self, when, args, client):
        """Close client's open transaction as aborted.

        It is kept so that replay() aborts it too.  Aborts do not count
        toward maxtxns.
        """
        txn = self.__opentxn(client)
        if txn is None:
            return
        txn.tpc_abort(when)
        self.__complete(txn, client)

    # Private helpers use name mangling so that a method name taken from a
    # log line can never dispatch to them via parse().
    def __opentxn(self, client):
        """Return client's in-progress transaction, counting a skip if none."""
        txn = self.__curtxn.get(client)
        if txn is None:
            self.__skipped += 1
        return txn

    def __complete(self, txn, client):
        """Report (if enabled) and archive txn; client then has no open txn."""
        if self.__report:
            self.report(txn)
        self.__txns.append(txn)
        del self.__curtxn[client]

    def report(self, txn):
        """Print a one-line summary of an ended transaction.

        Columns: begin time (seconds), duration, number of stores, total
        bytes stored, begin time as ctime() text, and the transaction URL.
        Aborted transactions get a trailing ' (aborted)'.
        """
        nbytes = reduce(operator.add, [obj.size for obj in txn._objects], 0)
        if txn._finishtime is not None:
            end, suffix = txn._finishtime, ''
        else:
            end, suffix = txn._aborttime, ' (aborted)'
        print('%s %s %4d %10d %s %s%s' % (
            txn._begintime, end - txn._begintime,
            len(txn._objects),
            nbytes,
            time.ctime(txn._begintime),
            txn._url, suffix))

    def replay(self):
        """Replay every parsed transaction and summarize the timing deltas."""
        slower = []
        faster = []
        for txn in self.__txns:
            txn.replay()
            # Positive delta: the replay fell behind the original.
            if txn._replaydelta > 0:
                slower.append(txn)
            else:
                faster.append(txn)
        print('%d laggards, %d on-time or faster' % (len(slower), len(faster)))
        # Find some averages
        if slower:
            total = reduce(operator.add,
                           [txn._replaydelta for txn in slower], 0)
            print('average slower txn was: %s' % (float(total) / len(slower)))
        if faster:
            total = reduce(operator.add,
                           [txn._replaydelta for txn in faster], 0)
            print('average faster txn was: %s' % (float(total) / len(faster)))



def main():
    """Parse a ZEO blather log from stdin, then optionally replay it.

    Bad command-line arguments end the program through usage().
    """
    try:
        opts, args = getopt.getopt(
            sys.argv[1:],
            'hr:pm:',
            ['help', 'replay=', 'report', 'maxtxns='])
    except getopt.error:
        # exc_info() keeps this valid syntax on every Python version.
        usage(1, sys.exc_info()[1])

    if args:
        usage(1)

    replay = 0
    maxtxns = -1
    report = 0
    storagefile = None
    # Only opened when --replay is given; ZEOParser accepts None otherwise.
    storage = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage(0)
        elif opt in ('-r', '--replay'):
            replay = 1
            storagefile = arg
        elif opt in ('-p', '--report'):
            report = 1
        elif opt in ('-m', '--maxtxns'):
            try:
                maxtxns = int(arg)
            except ValueError:
                usage(1, 'Bad -m argument: %s' % arg)

    if replay:
        storage = FileStorage(storagefile)
        # Alternative backend: storage = Full(storagefile)
        storage = PrimaryStorage('yyz', storage, RS_PORT)
    t0 = now()
    p = ZEOParser(maxtxns, report, storage)
    i = 0
    while 1:
        line = sys.stdin.readline()
        if not line:
            break
        i += 1
        try:
            p.parse(line)
        except StopParsing:
            break
        except:
            # Say which input line broke the parser, then re-raise.
            print('input file line: %s' % i)
            raise
    t1 = now()
    print('total parse time: %s' % (t1 - t0))
    t2 = now()
    if replay:
        p.replay()
    t3 = now()
    print('total replay time: %s' % (t3 - t2))
    print('total time: %s' % (t3 - t0))



# Run main() only when executed as a script, not when imported.
if __name__ == '__main__':
    main()