[Zope-Checkins] CVS: Zope/lib/python/Products/ZCatalog/regressiontests - keywords.py:1.2.2.1 loadmail.py:1.2.2.1 regressionCatalog.py:1.2.2.1 regressionCatalogTiming.py:1.2.2.1 unittest_patched.py:1.2.2.1
Shane Hathaway
shane@digicool.com
Thu, 9 Aug 2001 13:33:41 -0400
Update of /cvs-repository/Zope/lib/python/Products/ZCatalog/regressiontests
In directory cvs.zope.org:/tmp/cvs-serv29115/lib/python/Products/ZCatalog/regressiontests
Added Files:
Tag: NR-branch
keywords.py loadmail.py regressionCatalog.py
regressionCatalogTiming.py unittest_patched.py
Log Message:
Sync NR-branch with trunk. Sorry about so many checkin messages...
=== Added File Zope/lib/python/Products/ZCatalog/regressiontests/keywords.py ===
import rfc822,mailbox,cPickle,string
class Keywords:
    """Collect purely alphabetic words from the subject headers of a mailbox.

    Helper for the ZCatalog regression tests only.  The word list lives in
    self.kw (unique words, in first-seen order) and is pickled to / loaded
    from the relative path 'data/keywords'.
    """

    def __init__(self):
        self.kw = []

    def _is_word(self, token):
        """Return 1 if every character of token is in string.letters, else 0.

        An empty token has no offending character and therefore counts as a
        word; this matches the historical behaviour of build().
        """
        for c in token:
            if c not in string.letters:
                return 0
        return 1

    def build(self,mbox,limit):
        """Read subjects from the Unix mailbox file 'mbox' and pickle the words.

        Subjects are split on single spaces.  Reading stops at the end of
        the mailbox or once at least 'limit' words have been collected; the
        limit is only checked between messages, so the final list may be
        somewhat longer than 'limit'.  The resulting list is written to
        'data/keywords'.
        """
        # Dictionary used as a set so the duplicate test does not have to
        # scan the whole keyword list for every word.
        seen = {}
        for word in self.kw:
            seen[word] = 1

        mbox_file = open(mbox)
        try:
            mb = mailbox.UnixMailbox(mbox_file)
            msg = mb.next()
            while msg and len(self.kw) < limit:
                for word in msg.dict.get("subject","").split(' '):
                    if not seen.has_key(word) and self._is_word(word):
                        seen[word] = 1
                        self.kw.append(word)
                msg = mb.next()
        finally:
            mbox_file.close()

        out = open('data/keywords','w')
        try:
            cPickle.Pickler(out).dump(self.kw)
        finally:
            # closing explicitly guarantees the pickle is flushed to disk
            out.close()

    def reload(self):
        """Replace self.kw with the list previously pickled by build()."""
        f = open('data/keywords','r')
        try:
            self.kw = cPickle.Unpickler(f).load()
        finally:
            f.close()

    def keywords(self):
        """Return the current keyword list (the list itself, not a copy)."""
        return self.kw
# When run as a script: build the keyword list from a developer-local
# mailbox (hard-coded path) with a limit of 1000; build() also writes the
# result to 'data/keywords'.
if __name__=="__main__":
    k = Keywords()
    k.build("/home/andreas/zope.mbox",1000)
=== Added File Zope/lib/python/Products/ZCatalog/regressiontests/loadmail.py === (578/678 lines abridged)
"""Test script for exercising various catalog features under load
Usage:
cd lib/python
python Products/ZCatalog/tests/loadmail.py command args
where each command has its own command-line arguments that it expects.
Note that all of the commands operate on the Zope database,
typically var/Data.fs.
Note that this script uses the proc file system to get memory size.
Many of the commands output performance statistics on lines that look like::
11.3585170507 0.06 2217781L 7212
where the numbers are:
- clock time in seconds
- cpu time used by the main thread, in seconds,
- Database size growth over the test
- Memory growth during the test (if the proc file system is available).
Commands:
base mbox max
Build a base database by:
- Deleting ../../Data.fs
- Starting Zope
- Adding a top-level folder named 'mail'
- Reading up to max messages from the Unix mailbox file, mbox
and adding them as documents to the mail folder.
index threshold
Index all of the DTML documents in the database, committing
sub-transactions after each threshold objects.
If the threshold is less than the number of messages, then the
size of the temporary sub-transaction commit file is output.
[-=- -=- -=- 578 lines omitted -=- -=- -=-]
else:
def returnf(t, c, size, mem, r, lock=lock):
print c, r
edits=[0]
while len(edits) <= nedit:
edit=whrandom.randint(0, number_of_messages)
if not alledits.has_key(edit):
alledits[edit]=1
edits.append(edit)
#print edits
argss.append((lock, (edits, wait, ndel, nins), returnf))
for lock, args, returnf in argss:
start_new_thread(do, (Zope.DB, incedit, args, returnf))
for lock, args, returnf in argss:
lock.acquire()
t=time.time() - t
c=time.clock() - c
size=db.getSize()-size
mem=VmSize()-mem
print t, c, size, mem
#hist("e%s" % (threads))
Zope.DB.close()
def VmSize():
    """Return the VmSize value of this process, or 0 if it cannot be read.

    The number is taken from the 'VmSize:' line of /proc/<pid>/status
    (the Linux kernel reports it in kB -- see proc(5)).  0 is returned when
    the proc file cannot be opened or contains no such line.
    """
    try:
        f = open('/proc/%s/status' % os.getpid())
    except IOError:
        # no proc file system on this platform, or the file is unreadable
        return 0
    try:
        lines = f.readlines()
    finally:
        f.close()
    for line in lines:
        if line[:7] == 'VmSize:':
            # e.g. "VmSize:    12345 kB": the first field after the label
            return int(line[7:].split()[0])
    return 0
def pdebug():
    """Run the command that follows 'pdebug' on the command line under pdb.

    The first argument is removed from sys.argv so that sys.argv[1] then
    names the function to run; that lookup-and-call statement is handed to
    pdb.run().
    """
    import pdb
    sys.argv.pop(1)
    statement = 'globals()[sys.argv[1]]()'
    pdb.run(statement)
if __name__=='__main__':
try: f=globals()[sys.argv[1]]
except:
print __doc__
sys.exit(1)
else: f()
=== Added File Zope/lib/python/Products/ZCatalog/regressiontests/regressionCatalog.py === (610/710 lines abridged)
#!/usr/bin/env python
# Regression test for ZCatalog
import os,sys
sys.path.insert(0,'.')
try:
import Testing
except ImportError:
sys.path[0] = "../../.."
import Testing
os.environ['STUPID_LOG_FILE']= "debug.log"
here = os.getcwd()
import Zope
import ZODB, ZODB.FileStorage
from Products.ZCatalog import ZCatalog,Vocabulary
from Products.ZCatalog.Catalog import CatalogError
import Persistence
import ExtensionClass
from Testing import dispatcher
import keywords
from zLOG import LOG
import getopt,whrandom,time,string,mailbox,rfc822
import unittest_patched as unittest
# maximum number of files to read for the test suite
maxFiles = 1000
# maximum number of threads for stress tests
numThreads = 4
# number of iterations for searches
searchIterations = 1000
# number of iterations for catalog/uncatalog operations
updateIterations = 100
# input mailbox file
mbox = os.environ.get("TESTCATALOG_MBOX","/usr/home/andreas/zope.mbox")
mbox2 = os.environ.get("TESTCATALOG_MBOX2", "/usr/home/andreas/python.mbox")
dataDir = ""
[-=- -=- -=- 610 lines omitted -=- -=- -=-]
testSearches("testFieldIndex",numThreads= 4),
testSearches("testFieldRangeIndex",numThreads=1),
testSearches("testFieldRangeIndex",numThreads= 4),
testSearches("testKeywordIndex",numThreads= 1),
testSearches("testKeywordIndex",numThreads= 4),
testSearches("testKeywordRangeIndex",numThreads= 1),
testSearches("testKeywordRangeIndex",numThreads=4)
)
bench2_tests = (
# testSearches("testReindexing",numThreads=1),
# testSearches("testIncrementalIndexing",numThreads=1),
testSearches("testUpdates",numThreads=2,numUpdates=200),
# testSearches("testUpdates",numThreads=4,numUpdates=200)
)
exp_tests = (
# testRS("testRangeSearch"),
# testSearches("testReindexing",numThreads=1),
testSearches("testReindexingAndModify",numThreads=1),
# testSearches("testUpdates",numThreads=10,numUpdates=100),
)
init_tests = (
BuildEnv("buildTestEnvironment",dataDir,maxFiles) ,
)
ts = unittest.TestSuite()
for x in eval('%s_tests' % what): ts.addTest(x)
return ts
return
def debug():
    """Call debug() on the suite returned by test_suite()."""
    test_suite().debug()

def pdebug():
    """Run debug() under the pdb debugger.

    An earlier definition of pdebug() that only called test_suite() used to
    precede debug(); it was always replaced by this one at import time and
    has been removed.
    """
    import pdb
    pdb.run('debug()')
# Script entry point: hand control to main() when executed directly.
if __name__ == '__main__':
    main()
=== Added File Zope/lib/python/Products/ZCatalog/regressiontests/regressionCatalogTiming.py ===
import os, sys
sys.path.insert(0, '.')
try:
import Testing
os.environ['SOFTWARE_HOME']=os.environ.get('SOFTWARE_HOME', '.')
except ImportError:
sys.path[0]='../../..'
import Testing
os.environ['SOFTWARE_HOME']='../../..'
os.environ['INSTANCE_HOME']=os.environ.get(
'INSTANCE_HOME',
os.path.join(os.environ['SOFTWARE_HOME'],'..','..')
)
os.environ['STUPID_LOG_FILE']=os.path.join(os.environ['INSTANCE_HOME'],'var',
'debug.log')
here = os.getcwd()
import Zope
import mailbox, time, httplib
from string import strip, find, split, lower, atoi, join
from urllib import quote
from Products.ZCatalog import ZCatalog
from unittest import TestCase, TestSuite, JUnitTextTestRunner,\
VerboseTextTestRunner, makeSuite
from Products.PluginIndexes.FieldIndex.FieldIndex import FieldIndex
from Products.PluginIndexes.TextIndex.TextIndex import TextIndex
from Products.PluginIndexes.TextIndex.Lexicon import Lexicon
from Products.PluginIndexes.KeywordIndex.KeywordIndex import KeywordIndex
from Testing.makerequest import makerequest
TextTestRunner = VerboseTextTestRunner
# Timing checks for loading mail messages into a Zope folder and indexing
# them with a ZCatalog.  The checks read 'zope.mbox' from the directory the
# script was started in ('here') and report elapsed times through out().
# NOTE(review): comments are used instead of docstrings on the check methods
# so the descriptions a test runner may print stay unchanged.
class TestTimeIndex(TestCase):
    def setUp(self):
        # Create the folder 'catalogtest' containing a ZCatalog called
        # 'catalog' with TextIndexes on title/to/from/raw and a FieldIndex
        # on date.
        self.app = makerequest(Zope.app())
        # Drop an existing 'catalogtest' object; AttributeError is ignored
        # (presumably raised when there is none -- confirm).
        try: self.app._delObject('catalogtest')
        except AttributeError: pass
        self.app.manage_addFolder('catalogtest')
        zcatalog = ZCatalog.ZCatalog('catalog', 'a catalog')
        self.app.catalogtest._setObject('catalog', zcatalog)
        c = self.app.catalogtest.catalog
        # Best effort: delete any index with one of these names before
        # adding it again; every failure is swallowed.
        for x in ('title', 'to', 'from', 'date', 'raw'):
            try: c.manage_delIndexes([x])
            except: pass
        c.manage_addIndex('title', 'TextIndex')
        c.manage_addIndex('to', 'TextIndex')
        c.manage_addIndex('from', 'TextIndex')
        c.manage_addIndex('date', 'FieldIndex')
        c.manage_addIndex('raw', 'TextIndex')
    def tearDown(self):
        # Remove the test folder, pack and close the database connection and
        # drop the 'app' attribute.  checkTimeSubcommit() calls tearDown()
        # itself, so this may run again when self.app no longer exists; the
        # resulting AttributeError is what both handlers below absorb.
        try: self.app._delObject('catalogtest')
        except AttributeError: pass
        try:
            self.app._p_jar._db.pack()
            self.app._p_jar.close()
        except AttributeError: pass
        # (re)create the attribute first so that the del cannot fail
        self.app = None
        del self.app
    def checkTimeBulkIndex(self):
        # Time two phases separately: loading up to 500 messages without a
        # catalog, then indexing them in one go through
        # manage_catalogFoundItems restricted to 'DTML Document' objects.
        print
        c = self.app.catalogtest.catalog
        t = time.time()
        loadmail(self.app.catalogtest, 'zopemail',
                 os.path.join(here, 'zope.mbox'), 500)
        get_transaction().commit()
        loadtime = time.time() - t
        out("loading data took %s seconds.. " % loadtime)
        t = time.time()
        req = self.app.REQUEST
        parents = [self.app.catalogtest.catalog,
                   self.app.catalogtest, self.app]
        req['PARENTS'] = parents
        rsp = self.app.REQUEST.RESPONSE
        url1 = ''
        c.manage_catalogFoundItems(req, rsp, url1, url1,
                                   obj_metatypes=['DTML Document'])
        indextime = time.time() - t
        out("bulk index took %s seconds.. " % indextime)
        out("total time for load and index was %s seconds.. "
            % (loadtime + indextime))
    def checkTimeIncrementalIndexAndQuery(self):
        # Load up to 'max' messages while passing the catalog to loadmail()
        # (which then catalogs each document as it is added), report the
        # total time and run a few sanity queries against the result.
        print
        c = self.app.catalogtest.catalog
        t = time.time()
        max = 500
        m = loadmail(self.app.catalogtest, 'zopemail',
                     os.path.join(here, 'zope.mbox'), max, c)
        get_transaction().commit()
        total = time.time() - t
        out("total time for load and index was %s seconds.. " % total)
        # NOTE(review): t is reset here but never read again in this method
        t = time.time()
        rs = c() # empty query should return all
        assert len(rs) == max, len(rs)
        # header values collected by loadmail(); only 'dates' is used below
        dates = m['date']
        froms = m['from']
        tos =m['to']
        titles = m['title']
        assert len(c({'date':'foobarfoo'})) == 0 # should return no results
        for x in dates:
            assert len(c({'date':x})) == 1 # each date should be fieldindexed
        assert len(c({'from':'a'})) == 0 # should be caught by splitter
        assert len(c({'raw':'chris'})) != 0
        assert len(c({'raw':'gghdjkasjdsda'})) == 0
        assert c({'PrincipiaSearchSource':'the*'})
    def checkTimeSubcommit(self):
        # Time load-and-index of up to 500 messages once per catalog
        # 'threshold' value.  The first pass (None) runs in the environment
        # built by the regular setUp(); every pass ends with tearDown(), so
        # each later pass has to call setUp() again first.
        print
        for x in (None,100,500,1000,10000):
            out("testing subcommit at theshhold of %s" % x)
            if x is not None:
                self.setUp()
            c = self.app.catalogtest.catalog
            c.threshold = x
            get_transaction().commit()
            t = time.time()
            loadmail(self.app.catalogtest, 'zopemail',
                     os.path.join(here, 'zope.mbox'), 500, c)
            get_transaction().commit()
            total = time.time() - t
            out("total time with subcommit thresh %s was %s seconds.. "
                % (x,total))
            self.tearDown()
# utility
def _record_headers(doc, headers, m):
    """Copy the subject/to/from/date headers of one message onto 'doc'.

    'headers' is a list of raw header lines.  The subject becomes the
    document title; to/from/date are added as string properties.  Every
    value handled is also appended to the matching list in 'm' (under
    'title' for the subject).  Lines without a colon after the first
    character and all other headers are ignored.
    """
    for h in headers:
        h = strip(h)
        colon = find(h, ':')
        if colon <= 0: continue
        key = lower(h[:colon])
        if key == 'subject': key = 'title'
        value = strip(h[colon+1:])
        if key == 'title':
            doc.manage_changeProperties(title=value)
            m[key].append(value)
        elif key in ('to', 'from', 'date'):
            # Best effort: a failure to add the property is ignored and the
            # value is still recorded.  Kept deliberately broad because the
            # failure modes of manage_addProperty are not visible here.
            try: doc.manage_addProperty(key, value, 'string')
            except: pass
            m[key].append(value)

def loadmail(folder, name, mbox, max=None, catalog=None):
    """
    creates a folder inside object 'folder' named 'name', opens
    filename 'mbox' and adds 'max' mail messages as DTML documents to
    the ZODB inside the folder named 'name'.  Documents are spread over
    sub-folders of 100 messages each (named '0', '100', '200', ...) and
    are named after their running message number.  If 'catalog' (which
    should be a ZCatalog object) is passed in, call catalog_object on it
    with the document while we're iterating.  If 'max' is not None,
    only do 'max' messages, else do all messages in the mbox archive.

    Returns a dictionary with the keys 'date', 'from', 'to' and 'title',
    each holding the list of header values seen, in message order.
    """
    m = {'date':[],'from':[],'to':[],'title':[]}
    folder.manage_addFolder(name)
    folder = getattr(folder, name)
    every = 100
    mbox_file = open(mbox)
    try:
        mb = mailbox.UnixMailbox(mbox_file)
        i = 0
        message = mb.next()
        while message:
            # name of the sub-folder this message belongs in
            part = repr(i/every * 100)
            try:
                dest = getattr(folder, part)
            except AttributeError:
                folder.manage_addFolder(part)
                dest = getattr(folder, part)
            dest.manage_addDTMLDocument(str(i), file=message.fp.read())
            doc = getattr(dest, str(i))
            i = i+1
            _record_headers(doc, message.headers, m)
            if catalog:
                path = join(doc.getPhysicalPath(), '/')
                catalog.catalog_object(doc, path)
            if max is not None and i >= max:
                break
            message = mb.next()
    finally:
        # the mailbox file used to be left open until garbage collection
        mbox_file.close()
    return m
def out(s):
print " %s" % s
def test_suite():
    """Return a TestSuite wrapping the 'check' methods of TestTimeIndex."""
    checks = makeSuite(TestTimeIndex, 'check')
    return TestSuite((checks,))
def main():
mb = os.path.join(here, 'zope.mbox')
if not os.path.isfile(mb):
print "do you want to get the zope.mbox file from lists.zope.org?"
print "it's required for testing (98MB, ~ 30mins on fast conn)"
print "it's also available at korak:/home/chrism/zope.mbox"
print "-- type 'Y' or 'N'"
a = raw_input()
if lower(a[:1]) == 'y':
server = 'lists.zope.org:80'
method = '/pipermail/zope.mbox/zope.mbox'
h = httplib.HTTP(server)
h.putrequest('GET', method)
h.putheader('User-Agent', 'silly')
h.putheader('Accept', 'text/html')
h.putheader('Accept', 'text/plain')
h.putheader('Host', server)
h.endheaders()
errcode, errmsg, headers = h.getreply()
if errcode != 200:
f = h.getfile()
data = f.read()
print data
raise "Error reading from host %s" % server
f = h.getfile()
out=open(mb,'w')
print "this is going to take a while..."
print "downloading mbox from %s" % server
while 1:
l = f.readline()
if not l: break
out.write(l)
alltests=test_suite()
runner = TextTestRunner()
runner.run(alltests)
def debug():
    """Call debug() on the suite built by test_suite()."""
    suite = test_suite()
    suite.debug()
# Script entry point.  With a command-line argument, the module-level
# callable of that name (e.g. 'debug') is looked up and called; without
# one, main() runs.
if __name__=='__main__':
    if len(sys.argv) > 1:
        globals()[sys.argv[1]]()
    else:
        main()
=== Added File Zope/lib/python/Products/ZCatalog/regressiontests/unittest_patched.py === (630/730 lines abridged)
#!/usr/bin/env python
"""
Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's
Smalltalk testing framework.
Further information is available in the bundled documentation, and from
http://pyunit.sourceforge.net/
This module contains the core framework classes that form the basis of
specific test cases and suites (TestCase, TestSuite etc.), and also a
text-based utility class for running the tests and reporting the results
(TextTestRunner).
Copyright (c) 1999, 2000, 2001 Steve Purcell
This module is free software, and you may redistribute it and/or modify
it under the same terms as Python itself, so long as this copyright message
and disclaimer are retained in their original form.
IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
DAMAGE.
THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
"""
# This is a patched version of unittest.py that allows additional
# parameters to be passed to the TestCase constructor.
# This special version is only needed to run the regression tests
# in testCatalog.py.
#
# ajung
__author__ = "Steve Purcell"
__email__ = "stephen_purcell@yahoo.com"
__version__ = "$Revision: 1.2.2.1 $"[11:-2]
import time
import sys
import traceback
import string
import os
##############################################################################
# A platform-specific concession to help the code work for JPython users
[-=- -=- -=- 630 lines omitted -=- -=- -=-]
self.parseArgs(argv)
self.runTests()
def usageExit(self, msg=None):
    """Print 'msg' (if given and non-empty) and the usage text, then exit.

    The usage text is self.USAGE %-formatted with the instance __dict__.
    Always terminates through sys.exit(2).
    """
    if msg: print msg
    print self.USAGE % self.__dict__
    sys.exit(2)
def parseArgs(self, argv):
import getopt
try:
options, args = getopt.getopt(argv[1:], 'hH', ['help'])
opts = {}
for opt, value in options:
if opt in ('-h','-H','--help'):
self.usageExit()
if len(args) == 0 and self.defaultTest is None:
self.test = findTestCases(self.module,
suiteClass=self.suiteClass)
return
if len(args) > 0:
self.testNames = args
else:
self.testNames = (self.defaultTest,)
self.createTests()
except getopt.error, msg:
self.usageExit(msg)
def createTests(self):
    """Set self.test to a suite with one test instance per name in testNames.

    Each name is passed to createTestInstance() together with self.module
    and self.suiteClass; the results are wrapped in self.suiteClass.
    """
    instances = [createTestInstance(name, self.module,
                                    suiteClass=self.suiteClass)
                 for name in self.testNames]
    self.test = self.suiteClass(instances)
def runTests(self):
    """Run self.test and exit; the exit value is false only on success.

    A TextTestRunner is created and stored on the instance when no
    testRunner has been configured.
    """
    runner = self.testRunner
    if runner is None:
        runner = self.testRunner = TextTestRunner()
    outcome = runner.run(self.test)
    sys.exit(not outcome.wasSuccessful())
main = TestProgram
##############################################################################
# Executing this module from the command line
##############################################################################
if __name__ == "__main__":
main(module=None)