[Zope-Checkins] CVS: Products/DCOracle2/test - testDCOracle2.py:1.1
dispatcher.py:1.1
Chris Withers
cvs-admin at zope.org
Thu Oct 30 12:35:42 EST 2003
Update of /cvs-repository/Products/DCOracle2/test
In directory cvs.zope.org:/tmp/cvs-serv13922/test
Added Files:
testDCOracle2.py dispatcher.py
Log Message:
refreshed Andreas' test suite and put it in the right place.
=== Added File Products/DCOracle2/test/testDCOracle2.py ===
import common
import DCOracle2
import dispatcher
import string
import unittest
class test_DC(unittest.TestCase,dispatcher.Dispatcher):
"""unittests for DCOracle 2"""
def __init__(self,func,*args,**kw):
unittest.TestCase.__init__(self,func)
dispatcher.Dispatcher.__init__(self,func)
def setUp(self):
self.conn = DCOracle2.connect(common.getConnectionString())
self.c = self.conn.cursor()
def tearDown(self):
self.conn.close()
self.conn=self.c=None
def testConnect(self):
""" try to connect to Oracle """
pass
def testCreateTable(self):
""" try to create test table """
try: self.c.execute("drop table partlist")
except: pass
self.c.execute("create table partlist (part varchar2(62), \
partno integer, quantity integer, price number(10,2)) tablespace users")
file = open("namegen.out","r")
v = []
for l in file.readlines():
(part, partno, quantity, price) = string.split(string.strip(l),',')
v.append((part, partno, quantity, price))
file.close()
self.c.executemany("insert into partlist (part, partno, quantity, price) values (:1,:2,:3,:4)", v)
self.conn.commit()
self.c.execute("select part, partno, quantity, price from partlist order by partno")
results = self.c.fetchall()
def testConcurrentReads(self):
self.dispatcher("testConcurrentReads",("funcConcurrentReads",3,(),{}))
def funcConcurrentReads(self):
env = self.th_setup()
curs = self.conn.cursor()
for i in range(5):
curs.execute("select part, partno, quantity, price from partlist order by partno")
results = curs.fetchall()
self.th_teardown(env)
def testCLOBS(self,iterations=30):
""" testing CLOBs """
self.funcLOBS(iterations,"CLOB")
def testBLOBS(self,iterations=30):
""" testing BLOBs """
self.funcLOBS(iterations,"BLOB")
def funcLOBS(self,iterations=30,t="CLOB"):
""" test LOBS """
try: self.c.execute("drop table testtab")
except: pass
self.c.execute("create table testtab(num integer,content %s) tablespace users" % t)
d={}
for i in range(iterations):
d[i] = chr(ord('a'))*100000
for i in range(iterations):
self.c.execute("insert into testtab values(:1,EMPTY_%s())" % t , i)
self.c.execute("select content from testtab where num=%d for update" % i)
r = self.c.fetchone()
lob = r[0]
lob.write(d[i])
self.conn.commit()
self.c.execute("select num,content from testtab order by num")
for r in self.c.fetchall():
num = r[0]
lob = r[1]
data = lob.read()
assert data==d[num],"Invalid data in LOB found"
def testStoredProc1(self):
""" testing describe() for stored procedures """
db = self.conn
d = db.describe('emp_actions')
e = db.describe('emp')
p = db.mapproc('emp_actions')
def testStoredProc2(self):
""" testing stored procedures """
db = self.conn
find = db.procedure.emp_actions.find
findy = db.procedure.emp_actions.findy
self.c.execute('select empno, ename from emp')
for r in self.c.fetchall():
empid = r[0]
ename = r[1]
fe = find(empid)
assert ename==fe,"Wrong employee name found %d vs. %d" % (ename,fe)
if __name__=="__main__":
unittest.main()
=== Added File Products/DCOracle2/test/dispatcher.py ===
#!/usr/bin/env python1.5
# Dispatcher for usage inside Zope test environment
# Digital Creations
__version__ = '$Id: dispatcher.py,v 1.1 2003/10/30 17:35:41 chrisw Exp $'
import os,sys,re,string
import threading,time,commands,profile
from StringIO import StringIO
class Dispatcher:
"""
a multi-purpose thread dispatcher
"""
def __init__(self,func=''):
# rather than spewing to stderr, put it in a log attribute
# if people care, they can retrieve this
self.log = StringIO()
self.fp = self.log
self.f_startup = []
self.f_teardown = []
self.lastlog = ""
self.lock = threading.Lock()
self.func = func
self.profiling = 0
self.doc = getattr(self,self.func).__doc__
def setlog(self,fp):
self.fp = fp
def log(self,s):
if s==self.lastlog: return
self.fp.write(s)
self.fp.flush()
self.lastlog=s
def logn(self,s):
if s==self.lastlog: return
self.fp.write(s + '\n')
self.fp.flush()
self.lastlog=s
def profiling_on():
self.profiling = 1
def profiling_off():
self.profiling = 0
def dispatcher(self,name='', *params):
""" dispatcher for threads
The dispatcher expects one or several tupels:
(functionname, number of threads to start , args, keyword args)
"""
self.mem_usage = [-1]
mem_watcher = threading.Thread(None,self.mem_watcher,name='memwatcher')
mem_watcher.start()
self.start_test = time.time()
self.name = name
self.th_data = {}
self.runtime = {}
self._threads = []
s2s=self.s2s
for func,numthreads,args,kw in params:
f = getattr(self,func)
for i in range(0,numthreads):
kw['t_func'] = func
th = threading.Thread(None,self.worker,name="TH_%s_%03d" % (func,i) ,args=args,kwargs=kw)
self._threads.append(th)
for th in self._threads: th.start()
while threading.activeCount() > 1: time.sleep(1)
self.logn('ID: %s ' % self.name)
self.logn('FUNC: %s ' % self.func)
self.logn('DOC: %s ' % self.doc)
self.logn('Args: %s' % params)
for th in self._threads:
self.logn( '%-30s ........................ %9.3f sec' % (th.getName(), self.runtime[th.getName()]) )
for k,v in self.th_data[th.getName()].items():
self.logn ('%-30s %-15s = %s' % (' ',k,v) )
self.logn("")
self.logn('Complete running time: %9.3f sec' % (time.time()-self.start_test) )
if len(self.mem_usage)>1: self.mem_usage.remove(-1)
self.logn( "Memory: start: %s, end: %s, low: %s, high: %s" % \
(s2s(self.mem_usage[0]),s2s(self.mem_usage[-1]),s2s(min(self.mem_usage)), s2s(max(self.mem_usage))))
self.logn('')
def worker(self,*args,**kw):
for func in self.f_startup: f = getattr(self,func)()
t_func = getattr(self,kw['t_func'])
del kw['t_func']
ts = time.time()
apply(t_func,args,kw)
te = time.time()
for func in self.f_teardown: getattr(self,func)()
def th_setup(self):
""" initalize thread with some environment data """
env = {'start': time.time()
}
return env
def th_teardown(self,env,**kw):
""" famous last actions of thread """
self.lock.acquire()
self.th_data[ threading.currentThread().getName() ] = kw
self.runtime [ threading.currentThread().getName() ] = time.time() - env['start']
self.lock.release()
def getmem(self):
""" try to determine the current memory usage """
if not sys.platform in ['linux2']: return None
cmd = '/bin/ps --no-headers -o pid,vsize --pid %s' % os.getpid()
outp = commands.getoutput(cmd)
pid,vsize = filter(lambda x: x!="" , string.split(outp," ") )
data = open("/proc/%d/statm" % os.getpid()).read()
fields = re.split(" ",data)
mem = string.atoi(fields[0]) * 4096
return mem
def mem_watcher(self):
""" thread for watching memory usage """
running = 1
while running ==1:
self.mem_usage.append( self.getmem() )
time.sleep(1)
if threading.activeCount() == 2: running = 0
def register_startup(self,func):
self.f_startup.append(func)
def register_teardown(self,func):
self.f_teardown.append(func)
def s2s(self,n):
import math
if n <1024.0: return "%8.3lf Bytes" % n
if n <1024.0*1024.0: return "%8.3lf KB" % (1.0*n/1024.0)
if n <1024.0*1024.0*1024.0: return "%8.3lf MB" % (1.0*n/1024.0/1024.0)
else: return n
if __name__=="__main__":
d=Dispatcher()
# print d.getmem()
More information about the Zope-Checkins
mailing list