[Zope-Checkins] CVS: Zope2 - dispatcher.py:1.1.2.1
Andreas Jung
andreas@yetix.digicool.com
Thu, 8 Mar 2001 06:04:14 -0500
Update of /mnt/cvs-repository/Zope2/lib/python/Testing
In directory yetix:/work/Zope2/Catalog-BTrees-Integration/lib/python/Testing
Added Files:
Tag: Catalog-BTrees-Integration
dispatcher.py
Log Message:
initial checkin in Testing dir.
--- Added File dispatcher.py in package Zope2 ---
#!/usr/bin/env python1.5
# Dispatcher for usage inside Zope test environment
# Digital Creations
__version__ = '$Id: dispatcher.py,v 1.1.2.1 2001/03/08 11:04:10 andreas Exp $'
import os,sys,re,string
import threading,time,commands,resource
class Dispatcher:
    """Multi-purpose thread dispatcher for use inside the Zope test environment.

    Subclasses add the job methods to run.  ``dispatcher()`` looks each job up
    by name on ``self``, runs it in the requested number of threads, samples
    the memory usage of the process meanwhile and finally writes a timing
    report to the configured log stream (``sys.stderr`` unless ``setlog()``
    was called).

    A job may call ``th_setup()`` / ``th_teardown()`` itself to publish
    per-thread result data; if it does not, ``worker()`` records the plain
    running time of the job so the report is always complete.
    """

    def __init__(self):
        self.fp = sys.stderr            # stream the report is written to
        self.f_startup = []             # names of methods run before each job
        self.f_teardown = []            # names of methods run after each job
        self.lastlog = ""               # last message written (duplicate filter)
        self.lock = threading.Lock()    # guards th_data / runtime
        self.th_data = {}               # thread name -> keyword data of th_teardown
        self.runtime = {}               # thread name -> running time in seconds
        self._watch_stop = threading.Event()   # tells mem_watcher to finish

    def setlog(self, fp):
        """Write all further log output to the file-like object *fp*."""
        self.fp = fp

    def _emit(self, s, tail):
        # Shared body of log()/logn(): a message equal to the previous one is
        # dropped; the comparison is done on *s* without the line terminator.
        if s == self.lastlog:
            return
        self.fp.write(s + tail)
        self.fp.flush()
        self.lastlog = s

    def log(self, s):
        """Write *s* without a newline unless it repeats the last message."""
        self._emit(s, '')

    def logn(self, s):
        """Write *s* plus a newline unless it repeats the last message."""
        self._emit(s, '\n')

    def dispatcher(self, name='', *params):
        """Run jobs in threads and log a timing/memory report.

        *name* is an identifier printed at the top of the report.  Every
        further argument is a tuple::

            (functionname, number of threads to start, args, keyword args)

        where *functionname* is the name of a method of this object.  Raises
        AttributeError (before any thread is started) when a function name
        does not exist.
        """
        self.start_test = time.time()
        self.name = name
        self.th_data = {}
        self.runtime = {}
        self._threads = []

        for func, numthreads, args, kw in params:
            getattr(self, func)         # fail early on an unknown job name
            for i in range(numthreads):
                # Work on a copy so the caller's dictionary is not modified.
                thread_kw = dict(kw)
                thread_kw['t_func'] = func
                th = threading.Thread(target=self.worker,
                                      name="TH_%s_%03d" % (func, i),
                                      args=args, kwargs=thread_kw)
                self._threads.append(th)

        self.mem_usage = [-1]
        self._watch_stop = threading.Event()
        watcher = threading.Thread(target=self.mem_watcher, name='memwatcher')
        watcher.daemon = True
        watcher.start()
        try:
            for th in self._threads:
                th.start()
            # Wait for exactly the threads started here; counting all active
            # threads of the process would hang if unrelated threads exist.
            for th in self._threads:
                th.join()
        finally:
            self._watch_stop.set()
            watcher.join()

        self.logn('ID: %s' % self.name)
        for th in self._threads:
            th_name = th.name
            if th_name in self.runtime:
                self.logn('%-30s ........................ %9.3f sec'
                          % (th_name, self.runtime[th_name]))
            else:
                # The thread died before any timing could be recorded.
                self.logn('%-30s ........................ %9s'
                          % (th_name, 'failed'))
            for k, v in self.th_data.get(th_name, {}).items():
                self.logn('%-30s  %-15s = %s' % (' ', k, v))
            self.logn("")

        self.logn('Complete running time:                  %9.3f sec'
                  % (time.time() - self.start_test))

        if len(self.mem_usage) > 1:
            self.mem_usage.remove(-1)
        # getmem() yields None where memory cannot be determined and decimal
        # strings otherwise, so skip the former and compare the latter by
        # numeric value rather than lexicographically.
        samples = [m for m in self.mem_usage if m is not None]
        if samples:
            first, last = samples[0], samples[-1]
            low, high = min(samples, key=int), max(samples, key=int)
        else:
            first = last = low = high = None
        self.logn("Memory: start: %s, end: %s, low: %s, high: %s"
                  % (first, last, low, high))
        self.logn('')

    def worker(self, *args, **kw):
        """Thread body: run the startup hooks, the job and the teardown hooks.

        The job is the method named by the ``t_func`` keyword argument; the
        remaining arguments are passed on to it.  The teardown hooks run even
        when the job raises, and the running time of the job is recorded
        unless the job already stored one through ``th_teardown()``.
        """
        for hook in self.f_startup:
            getattr(self, hook)()

        t_func = getattr(self, kw.pop('t_func'))

        started = time.time()
        try:
            t_func(*args, **kw)
        finally:
            elapsed = time.time() - started
            th_name = threading.current_thread().name
            with self.lock:
                self.runtime.setdefault(th_name, elapsed)
                self.th_data.setdefault(th_name, {})
            for hook in self.f_teardown:
                getattr(self, hook)()

    def th_setup(self):
        """Initialize a thread with some environment data.

        Returns a dictionary holding the current time under ``'start'``; pass
        it to ``th_teardown()`` when the job is done.
        """
        return {'start': time.time()}

    def th_teardown(self, env, **kw):
        """Famous last actions of a thread.

        Stores the keyword arguments as the result data of the calling thread
        and the time elapsed since ``env['start']`` as its running time.
        """
        th_name = threading.current_thread().name
        # Computed before taking the lock so a malformed *env* cannot leave
        # the lock held.
        elapsed = time.time() - env['start']
        with self.lock:
            self.th_data[th_name] = kw
            self.runtime[th_name] = elapsed

    def getmem(self):
        """Try to determine the current memory usage.

        Returns the ``vsize`` column that ``/bin/ps`` reports for this process
        as a string of digits, or None when the platform is not Linux or the
        output of ``ps`` cannot be interpreted.
        """
        # Python 2 reports 'linux2', Python 3 reports 'linux'.
        if not sys.platform.startswith('linux'):
            return None

        cmd = '/bin/ps --no-headers -o pid,vsize --pid %s 2>&1' % os.getpid()
        # os.popen exists on every Python version; the ``commands`` module
        # does not.
        pipe = os.popen(cmd)
        try:
            outp = pipe.read()
        finally:
            pipe.close()

        fields = outp.split()
        if len(fields) != 2 or not fields[1].isdigit():
            return None
        return fields[1]

    def mem_watcher(self):
        """Thread body sampling the memory usage about once per second.

        Appends the result of ``getmem()`` to ``self.mem_usage`` until
        ``dispatcher()`` signals that all of its worker threads are done.  At
        least one sample is always taken.
        """
        stop = self._watch_stop
        while 1:
            self.mem_usage.append(self.getmem())
            stop.wait(1)
            if stop.is_set():
                break

    def register_startup(self, func):
        """Register the method named *func* to run before every job."""
        self.f_startup.append(func)

    def register_teardown(self, func):
        """Register the method named *func* to run after every job."""
        self.f_teardown.append(func)
if __name__ == "__main__":
    # Smoke test: show the memory usage reported for this process.
    # The parenthesised form prints the same on Python 2 and also
    # compiles on Python 3.
    d = Dispatcher()
    print(d.getmem())