[Zope3-checkins] CVS: Zope3/lib/python/Zope/TextIndex/tests - __init__.py:1.1 hs-tool.py:1.1 indexhtml.py:1.1 mailtest.py:1.1 mhindex.py:1.1 queryhtml.py:1.1 testIndex.py:1.1 testLexicon.py:1.1 testNBest.py:1.1 testPipelineFactory.py:1.1 testQueryEngine.py:1.1 testQueryParser.py:1.1 testSetOps.py:1.1 wordstats.py:1.1
Guido van Rossum
guido@python.org
Tue, 3 Dec 2002 09:28:18 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/TextIndex/tests
In directory cvs.zope.org:/tmp/cvs-serv25939/tests
Added Files:
__init__.py hs-tool.py indexhtml.py mailtest.py mhindex.py
queryhtml.py testIndex.py testLexicon.py testNBest.py
testPipelineFactory.py testQueryEngine.py testQueryParser.py
testSetOps.py wordstats.py
Log Message:
Baseline for TextIndex, copied from Zope2's ZCTextIndex with changes
necessitated by different locations of BTrees in Zope3, and C
extensions (temporarily) omitted. The test suite passes!
[Rotterdam sprint]
=== Added File Zope3/lib/python/Zope/TextIndex/tests/__init__.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test package."""
=== Added File Zope3/lib/python/Zope/TextIndex/tests/hs-tool.py ===
#! /usr/bin/env python
import cPickle
import os.path
import sys
from hotshot.log import LogReader
def load_line_info(log):
    """Accumulate per-location timing stats from a hotshot log.

    Returns a dict mapping a location (as yielded by the log reader)
    to a (total_time, hit_count) pair.  Each positive time delta in the
    log is charged to the *previous* location, since it measures how
    long that location was executing before the next event fired.
    """
    totals = {}
    last_place = None
    for event, place, tdelta in log:
        if tdelta > 0:
            prev_time, prev_hits = totals.get(last_place, (0, 0))
            totals[last_place] = (prev_time + tdelta, prev_hits + 1)
        last_place = place
    return totals
def basename(path, cache={}):
    """Return the final path component of *path*, memoized.

    The mutable default argument is used deliberately as a shared
    per-process memo so repeated lookups of the same path are cheap.
    """
    if path in cache:
        return cache[path]
    name = os.path.split(path)[1]
    cache[path] = name
    return name
def print_results(results):
    # Print one summary line per entry: total time, hit count, the
    # source file's basename, and the line number.
    for info, place in results:
        if place is None:
            # This is the startup time for the profiler, and only
            # occurs at the very beginning.  Just ignore it, since it
            # corresponds to frame setup of the outermost call, not
            # anything that's actually interesting.
            continue
        filename, line, funcname = place
        print '%8d %8d' % info, basename(filename), line
def annotate_results(results):
    # Group the per-line stats by source file, then annotate each file
    # that still exists on disk with its timing data.
    files = {}
    for stats, place in results:
        if not place:
            # profiler-startup pseudo entry; see print_results()
            continue
        time, hits = stats
        file, line, func = place
        l = files.get(file)
        if l is None:
            l = files[file] = []
        l.append((line, hits, time))
    order = files.keys()
    order.sort()
    for k in order:
        if os.path.exists(k):
            v = files[k]
            v.sort()  # sort (line, hits, time) tuples by line number
            annotate(k, v)
def annotate(file, lines):
    # Print the source of *file* with hit counts and times in the left
    # margin.  *lines* is a line-number-sorted list of
    # (lineno, hits, time) tuples; NOTE it is consumed destructively.
    print "-" * 60
    print file
    print "-" * 60
    f = open(file)
    i = 1
    match = lines[0][0]  # next source line number that has stats
    for line in f:
        if match == i:
            print "%6d %8d " % lines[0][1:], line,
            del lines[0]
            if lines:
                match = lines[0][0]
            else:
                match = None  # no further annotated lines
        else:
            # pad to keep un-annotated lines aligned with annotated ones
            print " " * 16, line,
        i += 1
    print
def get_cache_name(filename):
    """Map a profile-log *filename* to its cache location.

    Returns a (cache_dir, cache_file) pair: a '.hs-tool' directory in
    the same directory as the log, and a file of the same name inside
    that directory.
    """
    directory, name = os.path.split(filename)
    cache_dir = os.path.join(directory, '.hs-tool')
    return cache_dir, os.path.join(cache_dir, name)
def cache_results(filename, results):
    # Persist *results* next to the profile log so later runs can skip
    # re-parsing it (see main()).  The cache directory is created lazily.
    cache_dir, cache_file = get_cache_name(filename)
    if not os.path.exists(cache_dir):
        os.mkdir(cache_dir)
    fp = open(cache_file, 'wb')
    try:
        cPickle.dump(results, fp, 1)  # binary pickle protocol 1
    finally:
        fp.close()
def main(filename, annotate):
    # Load (or compute and cache) the sorted per-line statistics for
    # the hotshot log *filename*, then either annotate the source files
    # (annotate true) or print a flat summary.
    cache_dir, cache_file = get_cache_name(filename)
    if ( os.path.isfile(cache_file)
         and os.path.getmtime(cache_file) > os.path.getmtime(filename)):
        # cached data is up-to-date:
        fp = open(cache_file, 'rb')
        results = cPickle.load(fp)
        fp.close()
    else:
        log = LogReader(filename)
        byline = load_line_info(log)
        # Sort by (time, hits) so the cheapest lines come first.
        results = [(v, k) for k, v in byline.items()]
        results.sort()
        cache_results(filename, results)
    if annotate:
        annotate_results(results)
    else:
        print_results(results)
if __name__ == "__main__":
    import getopt

    # -A enables per-source-line annotation; the optional positional
    # argument names the hotshot log file (default: profile.dat).
    annotate_p = 0
    opts, args = getopt.getopt(sys.argv[1:], 'A')
    for o, v in opts:
        if o == '-A':
            annotate_p = 1
    if args:
        filename, = args
    else:
        filename = "profile.dat"
    main(filename, annotate_p)
=== Added File Zope3/lib/python/Zope/TextIndex/tests/indexhtml.py ===
#! /usr/bin/env python
"""Index a collection of HTML files on the filesystem.
usage: indexhtml.py [options] dir
Will create an index of all files in dir or its subdirectories.
options:
-f data.fs -- the path to the filestorage datafile
"""
from __future__ import nested_scopes
import os
from time import clock
import ZODB
from ZODB.FileStorage import FileStorage
from BTrees.IOBTree import IOBTree
from Zope.TextIndex.ZCTextIndex import ZCTextIndex
from Zope.TextIndex.HTMLSplitter import HTMLWordSplitter
from Zope.TextIndex.Lexicon import Lexicon, StopWordRemover
def make_zc_index():
    """Construct a ZCTextIndex the roundabout way.

    ZCTextIndex's constructor pulls its configuration from an 'extra'
    record and finds its lexicon on the caller, so both are faked here
    with a trivial attribute-bag class.
    """
    class Record:
        pass
    config = Record()
    config.doc_attr = "read"
    config.lexicon_id = "lexicon"
    owner = Record()
    owner.lexicon = Lexicon(HTMLWordSplitter(), StopWordRemover())
    return ZCTextIndex("read", config, owner)
# XXX make a splitter more like the HTMLSplitter for TextIndex
# signature is
# Splitter(string, stop_words, encoding,
# singlechar, indexnumbers, casefolding)
class MySplitter:
    """Adapter giving HTMLWordSplitter the old TextIndex splitter API.

    The old TextIndex calls its splitter as
    Splitter(string, stop_words, encoding, singlechar, indexnumbers,
    casefolding); only the first two arguments are honoured here.
    Stop words map to None in *stopdict* and are filtered out.
    """

    def __init__(self):
        self._v_splitter = HTMLWordSplitter()

    def __call__(self, text, stopdict, *args, **kwargs):
        mapped = [stopdict.get(word, word)
                  for word in self._v_splitter._split(text)]
        return filter(None, mapped)
def make_old_index():
    """Construct a Zope2-style TextIndex over the standard stop words."""
    from Products.PluginIndexes.TextIndex.TextIndex import TextIndex
    from Products.PluginIndexes.TextIndex.Lexicon import Lexicon
    from Zope.TextIndex.StopDict import get_stopdict

    lexicon = Lexicon(get_stopdict())
    lexicon.SplitterFunc = MySplitter()
    return TextIndex("read", lexicon=lexicon)
def main(db, root, dir):
rt["index"] = index = INDEX()
rt["files"] = paths = IOBTree()
get_transaction().commit()
zodb_time = 0.0
pack_time = 0.0
files = [os.path.join(dir, file) for file in os.listdir(dir)]
docid = 0
t0 = clock()
for file in files:
if os.path.isdir(file):
files += [os.path.join(file, sub) for sub in os.listdir(file)]
else:
if not file.endswith(".html"):
continue
docid += 1
if LIMIT is not None and docid > LIMIT:
break
if VERBOSE:
print "%5d" % docid, file
f = open(file, "rb")
paths[docid] = file
index.index_object(docid, f)
f.close()
if docid % TXN_INTERVAL == 0:
z0 = clock()
get_transaction().commit()
z1 = clock()
zodb_time += z1 - z0
if VERBOSE:
print "commit took", z1 - z0, zodb_time
if docid % PACK_INTERVAL == 0:
p0 = clock()
db.pack()
p1 = clock()
zodb_time += p1 - p0
pack_time += p1 - p0
if VERBOSE:
print "pack took", p1 - p0, pack_time
z0 = clock()
get_transaction().commit()
z1 = t1 = clock()
total_time = t1 - t0
zodb_time += z1 - z0
if VERBOSE:
print "Total index time", total_time
print "Non-pack time", total_time - pack_time
print "Non-ZODB time", total_time - zodb_time
if __name__ == "__main__":
import sys
import getopt
VERBOSE = 0
FSPATH = "Data.fs"
TXN_INTERVAL = 100
PACK_INTERVAL = 500
LIMIT = None
INDEX = make_zc_index
try:
opts, args = getopt.getopt(sys.argv[1:], 'vf:t:p:n:T')
except getopt.error, msg:
print msg
print __doc__
sys.exit(2)
for o, v in opts:
if o == '-v':
VERBOSE += 1
if o == '-f':
FSPATH = v
if o == '-t':
TXN_INTERVAL = int(v)
if o == '-p':
PACK_INTERVAL = int(v)
if o == '-n':
LIMIT = int(v)
if o == '-T':
INDEX = make_old_index
if len(args) != 1:
print "Expected on argument"
print __doc__
sys.exit(2)
dir = args[0]
fs = FileStorage(FSPATH)
db = ZODB.DB(fs)
cn = db.open()
rt = cn.root()
dir = os.path.join(os.getcwd(), dir)
print dir
main(db, rt, dir)
cn.close()
fs.close()
=== Added File Zope3/lib/python/Zope/TextIndex/tests/mailtest.py ===
"""Test an index with a Unix mailbox file.
usage: python mailtest.py [options] <data.fs>
options:
-v -- verbose
Index Generation
-i mailbox
-n NNN -- max number of messages to read from mailbox
-t NNN -- commit a transaction every NNN messages (default: 1)
-p NNN -- pack <data.fs> every NNN messages (default: 500), and at end
-p 0 -- don't pack at all
-x -- exclude the message text from the data.fs
Queries
-q query
-b NNN -- return the NNN best matches (default: 10)
-c NNN -- context; if -v, show the first NNN lines of results (default: 5)
The script either indexes or queries depending on whether -q or -i is
passed as an option.
For -i mailbox, the script reads mail messages from the mailbox and
indexes them. It indexes one message at a time, then commits the
transaction.
For -q query, it performs a query on an existing index.
If both are specified, the index is performed first.
You can also interact with the index after it is completed. Load the
index from the database:
import ZODB
from ZODB.FileStorage import FileStorage
fs = FileStorage(<data.fs>
db = ZODB.DB(fs)
index = cn.open().root()["index"]
index.search("python AND unicode")
"""
import ZODB
import ZODB.FileStorage
from Zope.TextIndex.Lexicon import \
Lexicon, CaseNormalizer, Splitter, StopWordRemover
from Zope.TextIndex.ZCTextIndex import ZCTextIndex
from BTrees.IOBTree import IOBTree
from Zope.TextIndex.QueryParser import QueryParser
import sys
import mailbox
import time
def usage(msg):
    # Print the error followed by the module docstring, then exit with
    # the conventional command-line-error status.
    print msg
    print __doc__
    sys.exit(2)
class Message:
    """Wrap a mailbox message, exposing its indexable text.

    The text is a one-line summary -- subject, plus the author in
    parentheses when known -- followed by the raw message body.  A
    running byte total is accumulated on the class itself.
    """

    total_bytes = 0  # bytes of text across all Message instances

    def __init__(self, msg):
        subject = msg.getheader('subject', '')
        author = msg.getheader('from', '')
        if author:
            header_line = "%s (%s)\n" % (subject, author)
        else:
            header_line = "%s\n" % subject
        self.text = header_line + msg.fp.read()
        Message.total_bytes = Message.total_bytes + len(self.text)
class Extra:
    """Plain attribute bag used to fake ZCTextIndex constructor args."""
    pass
def index(rt, mboxfile, db, profiler):
    # Build the index over the messages in *mboxfile*, storing it (and,
    # unless EXCLUDE_TEXT, the documents) in the root mapping *rt*.
    # The actual per-message loop lives in indexmbox(); this function
    # sets up the index, runs the loop (optionally under a profiler),
    # does the final commit/pack, and prints a timing report.
    global NUM
    idx_time = 0
    pack_time = 0
    start_time = time.time()
    lexicon = Lexicon(Splitter(), CaseNormalizer(), StopWordRemover())
    extra = Extra()
    extra.lexicon_id = 'lexicon'
    extra.doc_attr = 'text'
    extra.index_type = 'Okapi BM25 Rank'
    caller = Extra()
    caller.lexicon = lexicon
    rt["index"] = idx = ZCTextIndex("index", extra, caller)
    if not EXCLUDE_TEXT:
        rt["documents"] = docs = IOBTree()
    else:
        docs = None
    get_transaction().commit()
    mbox = mailbox.UnixMailbox(open(mboxfile, 'rb'))
    if VERBOSE:
        print "opened", mboxfile
    if not NUM:
        NUM = sys.maxint  # 0 means "no limit"
    if profiler:
        itime, ptime, i = profiler.runcall(indexmbox, mbox, idx, docs, db)
    else:
        itime, ptime, i = indexmbox(mbox, idx, docs, db)
    idx_time += itime
    pack_time += ptime
    get_transaction().commit()
    if PACK_INTERVAL and i % PACK_INTERVAL != 0:
        # final pack, unless the loop happened to pack on its last doc
        if VERBOSE >= 2:
            print "packing one last time..."
        p0 = time.clock()
        db.pack(time.time())
        p1 = time.clock()
        if VERBOSE:
            print "pack took %s sec" % (p1 - p0)
        pack_time += p1 - p0
    if VERBOSE:
        finish_time = time.time()
        print
        print "Index time", round(idx_time / 60, 3), "minutes"
        print "Pack time", round(pack_time / 60, 3), "minutes"
        print "Index bytes", Message.total_bytes
        rate = (Message.total_bytes / idx_time) / 1024
        print "Index rate %.2f KB/sec" % rate
        print "Indexing began", time.ctime(start_time)
        print "Indexing ended", time.ctime(finish_time)
        print "Wall clock minutes", round((finish_time - start_time)/60, 3)
def indexmbox(mbox, idx, docs, db):
    # Index up to NUM messages from *mbox* into *idx* (and, when *docs*
    # is given, store the Message objects by docid).  Commits every
    # TXN_SIZE messages and packs every PACK_INTERVAL messages.
    # Returns (index_time, pack_time, message_count).
    idx_time = 0
    pack_time = 0
    i = 0
    while i < NUM:
        _msg = mbox.next()
        if _msg is None:
            break  # end of mailbox
        i += 1
        msg = Message(_msg)
        if VERBOSE >= 2:
            print "indexing msg", i
        i0 = time.clock()
        idx.index_object(i, msg)
        if not EXCLUDE_TEXT:
            docs[i] = msg
        if i % TXN_SIZE == 0:
            get_transaction().commit()
        i1 = time.clock()
        idx_time += i1 - i0
        if VERBOSE and i % 50 == 0:
            print i, "messages indexed"
            print "cache size", db.cacheSize()
        if PACK_INTERVAL and i % PACK_INTERVAL == 0:
            if VERBOSE >= 2:
                print "packing..."
            p0 = time.clock()
            db.pack(time.time())
            p1 = time.clock()
            if VERBOSE:
                print "pack took %s sec" % (p1 - p0)
            pack_time += p1 - p0
    return idx_time, pack_time, i
def query(rt, query_str, profiler):
    # Run *query_str* against the stored index, print the BEST matches
    # with scores scaled by the query weight, and -- when VERBOSE --
    # the first CONTEXT lines of each matching message.
    idx = rt["index"]
    docs = rt["documents"]
    start = time.clock()
    if profiler is None:
        results, num_results = idx.query(query_str, BEST)
    else:
        if WARM_CACHE:
            # run once un-profiled so the profiled run isn't dominated
            # by ZODB cache misses
            print "Warming the cache..."
            idx.query(query_str, BEST)
        start = time.clock()
        results, num_results = profiler.runcall(idx.query, query_str, BEST)
    elapsed = time.clock() - start
    print "query:", query_str
    print "# results:", len(results), "of", num_results, \
          "in %.2f ms" % (elapsed * 1000)
    tree = QueryParser(idx.lexicon).parseQuery(query_str)
    qw = idx.index.query_weight(tree.terms())
    for docid, score in results:
        scaled = 100.0 * score / qw
        print "docid %7d score %6d scaled %5.2f%%" % (docid, score, scaled)
        if VERBOSE:
            msg = docs[docid]
            # first CONTEXT lines of the message as context
            ctx = msg.text.split("\n", CONTEXT)
            del ctx[-1]  # drop the unsplit remainder
            print "-" * 60
            print "message:"
            for l in ctx:
                print l
            print "-" * 60
def main(fs_path, mbox_path, query_str, profiler):
    # Open the FileStorage, then index and/or query depending on which
    # of -i / -q were given; indexing runs first when both are.
    f = ZODB.FileStorage.FileStorage(fs_path)
    db = ZODB.DB(f, cache_size=CACHE_SIZE)
    cn = db.open()
    rt = cn.root()
    if mbox_path is not None:
        index(rt, mbox_path, db, profiler)
    if query_str is not None:
        query(rt, query_str, profiler)
    cn.close()
    db.close()
    f.close()
if __name__ == "__main__":
    import getopt

    # Tunables, overridable from the command line; see module docstring.
    NUM = 0
    VERBOSE = 0
    PACK_INTERVAL = 500
    EXCLUDE_TEXT = 0
    CACHE_SIZE = 10000
    TXN_SIZE = 1
    BEST = 10
    CONTEXT = 5
    WARM_CACHE = 0
    query_str = None
    mbox_path = None
    profile = None
    old_profile = None
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vn:p:i:q:b:c:xt:w',
                                   ['profile=', 'old-profile='])
    except getopt.error, msg:
        usage(msg)
    if len(args) != 1:
        usage("exactly 1 filename argument required")
    for o, v in opts:
        if o == '-n':
            NUM = int(v)
        elif o == '-v':
            VERBOSE += 1
        elif o == '-p':
            PACK_INTERVAL = int(v)
        elif o == '-q':
            query_str = v
        elif o == '-i':
            mbox_path = v
        elif o == '-b':
            BEST = int(v)
        elif o == '-x':
            EXCLUDE_TEXT = 1
        elif o == '-t':
            TXN_SIZE = int(v)
        elif o == '-c':
            CONTEXT = int(v)
        elif o == '-w':
            WARM_CACHE = 1
        elif o == '--profile':
            profile = v
        elif o == '--old-profile':
            old_profile = v
    fs_path, = args
    if profile:
        # line-level hotshot profiling, dumped to the named file
        import hotshot
        profiler = hotshot.Profile(profile, lineevents=1, linetimings=1)
    elif old_profile:
        # classic profile module
        import profile
        profiler = profile.Profile()
    else:
        profiler = None
    main(fs_path, mbox_path, query_str, profiler)
    if profile:
        profiler.close()
    elif old_profile:
        import pstats
        profiler.dump_stats(old_profile)
        stats = pstats.Stats(old_profile)
        stats.strip_dirs().sort_stats('time').print_stats(20)
=== Added File Zope3/lib/python/Zope/TextIndex/tests/mhindex.py === (504/604 lines abridged)
#! /usr/bin/env python2.1
"""MH mail indexer.
To index messages from a single folder (messages defaults to 'all'):
mhindex.py [options] -u +folder [messages ...]
To bulk index all messages from several folders:
mhindex.py [options] -b folder ...; the folder name ALL means all folders.
To execute a single query:
mhindex.py [options] query
To enter interactive query mode:
mhindex.py [options]
Common options:
-d FILE -- specify the Data.fs to use (default ~/.Data.fs)
-w -- dump the word list in alphabetical order and exit
-W -- dump the word list ordered by word id and exit
Indexing options:
-O -- do a prescan on the data to compute optimal word id assignments;
this is only useful the first time the Data.fs is used
-t N -- commit a transaction after every N messages (default 20000)
-p N -- pack after every N commits (by default no packing is done)
Querying options:
-m N -- show at most N matching lines from the message (default 3)
-n N -- show the N best matching messages (default 3)
"""
import os
import re
import sys
import time
import mhlib
import getopt
import traceback
from StringIO import StringIO
from stat import ST_MTIME
DATAFS = "~/.Data.fs"                    # default database location (-d)
ZOPECODE = "~/projects/Zope/lib/python"  # where the Zope libraries live
sys.path.append(os.path.expanduser(ZOPECODE))
from ZODB import DB
from ZODB.FileStorage import FileStorage
from Persistence import Persistent
[-=- -=- -=- 504 lines omitted -=- -=- -=-]
if self.trans_count > 0:
print "committing..."
get_transaction().commit()
self.trans_count = 0
self.pack_count += 1
if self.pack_count >= self.pack_limit > 0:
self.pack()
def pack(self):
    # Pack the underlying database, but only when at least one commit
    # has happened since the last pack; resets the commit-since-pack
    # counter afterwards.
    if self.pack_count > 0:
        print "packing..."
        self.database.pack()
        self.pack_count = 0
class TextIndex(Persistent):
    """Persistent full-text index: lexicon + Okapi index + query API."""

    def __init__(self):
        self.lexicon = Lexicon(Splitter(), CaseNormalizer(), StopWordRemover())
        self.index = OkapiIndex(self.lexicon)

    def index_text(self, docid, text):
        # Index (or re-index) *text* under *docid*.
        self.index.index_doc(docid, text)
        self._p_changed = 1 # XXX

    def unindex(self, docid):
        self.index.unindex_doc(docid)
        self._p_changed = 1 # XXX

    def query(self, query, nbest=10):
        # returns a total hit count and a mapping from docids to scores
        parser = QueryParser(self.lexicon)
        tree = parser.parseQuery(query)
        results = tree.executeQuery(self.index)
        if results is None:
            return [], 0
        chooser = NBest(nbest)
        chooser.addmany(results.items())
        return chooser.getbest(), len(results)

    def query_weight(self, query):
        # Weight of the query's own terms, used to scale hit scores.
        parser = QueryParser(self.lexicon)
        tree = parser.parseQuery(query)
        terms = tree.terms()
        return self.index.query_weight(terms)
def reportexc():
    """Print the current exception's traceback to stderr."""
    traceback.print_exc()
if __name__ == "__main__":
    # main() returns the process exit status.
    sys.exit(main())
=== Added File Zope3/lib/python/Zope/TextIndex/tests/queryhtml.py ===
import os
from time import clock
import ZODB
from ZODB.FileStorage import FileStorage
# Canned benchmark queries; each is run both OR-joined and AND-joined.
QUERIES = ["nested recursive functions",
           "explicit better than implicit",
           "build hpux",
           "cannot create 'method-wrapper' instances",
           "extension module C++",
           "class method",
           "instance variable",
           "articulate information",
           "import default files",
           "gopher ftp http",
           "documentation",
           ]
def path2url(p):
    """Convert a local mirror path to the python.org URL it mirrors.

    hack: only works for the way Jeremy indexed his copy of python.org.
    Paths that don't contain the mirror marker are returned unchanged.
    """
    marker = "www.python.org/."
    pos = p.find(marker)
    if pos < 0:
        return p
    return "http://www.python.org" + p[pos + len(marker):]
from Products.PluginIndexes.TextIndex.TextIndex import And, Or
from Zope.TextIndex.tests.indexhtml import MySplitter
from Zope.TextIndex.NBest import NBest
def main(rt):
    # Time each canned query, both OR-joined and AND-joined, over 11
    # passes of len(ITERS) repetitions each; pass 0 also prints the
    # result list as HTML.  Emits an HTML timing report on stdout.
    index = rt["index"]
    files = rt["files"]
    times = {}
    ITERS = range(50)
    for i in range(11):
        for q in QUERIES:
            terms = q.split()
            for c in " OR ", " AND ":
                query = c.join(terms)
                t0 = clock()
                if TEXTINDEX:
                    # old Zope2 TextIndex API takes an operator object
                    if c == " OR ":
                        op = Or
                    else:
                        op = And
                    _q = " ".join(terms)
                    for _ in ITERS:
                        b = index.query(_q, op).bucket()
                        num = len(b)
                        chooser = NBest(10)
                        chooser.addmany(b.items())
                        results = chooser.getbest()
                else:
                    try:
                        for _ in ITERS:
                            results, num = index.query(query)
                    except:
                        # XXX bare except: silently skips queries the
                        # parser/index rejects -- deliberate best-effort
                        continue
                t1 = clock()
                print "<p>Query: \"%s\"" % query
                print "<br>Num results: %d" % num
                print "<br>time.clock(): %s" % (t1 - t0)
                key = query
                if i == 0:
                    # first pass: emit the actual hits as an HTML list
                    print "<ol>"
                    for docid, score in results:
                        url = path2url(files[docid])
                        fmt = '<li><a href="%s">%s</A> score = %s'
                        print fmt % (url, url, score)
                    print "</ol>"
                    continue
                l = times.setdefault(key, [])
                l.append(t1 - t0)
    # summary of timings across passes 1..10
    l = times.keys()
    l.sort()
    print "<hr>"
    for k in l:
        v = times[k]
        print "<p>Query: \"%s\"" % k
        print "<br>Min time: %s" % min(v)
        print "<br>All times: %s" % " ".join(map(str, v))
if __name__ == "__main__":
    import sys
    import getopt

    VERBOSE = 0
    FSPATH = "Data.fs"
    TEXTINDEX = 0  # -T benchmarks the old Zope2 TextIndex instead
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vf:T')
    except getopt.error, msg:
        print msg
        print __doc__
        sys.exit(2)
    for o, v in opts:
        if o == '-v':
            VERBOSE += 1
        if o == '-f':
            FSPATH = v
        if o == '-T':
            TEXTINDEX = 1
    # read-only: the benchmark never modifies the database
    fs = FileStorage(FSPATH, read_only=1)
    db = ZODB.DB(fs, cache_size=10000)
    cn = db.open()
    rt = cn.root()
    main(rt)
=== Added File Zope3/lib/python/Zope/TextIndex/tests/testIndex.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, TestSuite, main, makeSuite
from Zope.TextIndex.Lexicon import Lexicon, Splitter
from Zope.TextIndex.CosineIndex import CosineIndex
from Zope.TextIndex.OkapiIndex import OkapiIndex
# Subclasses must set a class variable IndexFactory to the appropriate
# index object constructor.
class IndexTest(TestCase):
    # Shared tests exercised against both index implementations;
    # subclasses set IndexFactory to the index class under test.

    def setUp(self):
        self.lexicon = Lexicon(Splitter())
        self.index = self.IndexFactory(self.lexicon)

    def test_index_document(self, DOCID=1):
        doc = "simple document contains five words"
        self.assert_(not self.index.has_doc(DOCID))
        self.index.index_doc(DOCID, doc)
        self.assert_(self.index.has_doc(DOCID))
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._docweight), 1)
        self.assertEqual(len(self.index._wordinfo), 5)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 5)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.length())
        # each word's posting map should reference exactly this doc
        for map in self.index._wordinfo.values():
            self.assertEqual(len(map), 1)
            self.assert_(map.has_key(DOCID))

    def test_unindex_document(self):
        DOCID = 1
        self.test_index_document(DOCID)
        self.index.unindex_doc(DOCID)
        self.assertEqual(len(self.index._docweight), 0)
        self.assertEqual(len(self.index._wordinfo), 0)
        self.assertEqual(len(self.index._docwords), 0)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.length())

    def test_index_two_documents(self):
        self.test_index_document()
        doc = "another document just four"
        DOCID = 2
        self.index.index_doc(DOCID, doc)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._docweight), 2)
        self.assertEqual(len(self.index._wordinfo), 8)
        self.assertEqual(len(self.index._docwords), 2)
        self.assertEqual(len(self.index.get_words(DOCID)), 4)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.length())
        wids = self.lexicon.termToWordIds("document")
        self.assertEqual(len(wids), 1)
        document_wid = wids[0]
        # "document" occurs in both docs; every other word in one
        for wid, map in self.index._wordinfo.items():
            if wid == document_wid:
                self.assertEqual(len(map), 2)
                self.assert_(map.has_key(1))
                self.assert_(map.has_key(DOCID))
            else:
                self.assertEqual(len(map), 1)

    def test_index_two_unindex_one(self):
        # index two documents, unindex one, and test the results
        self.test_index_two_documents()
        self.index.unindex_doc(1)
        DOCID = 2
        self.assertEqual(len(self.index._docweight), 1)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._wordinfo), 4)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 4)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.length())
        for map in self.index._wordinfo.values():
            self.assertEqual(len(map), 1)
            self.assert_(map.has_key(DOCID))

    def test_index_duplicated_words(self, DOCID=1):
        doc = "very simple repeat repeat repeat document test"
        self.index.index_doc(DOCID, doc)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._wordinfo), 5)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 7)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.length())
        wids = self.lexicon.termToWordIds("repeat")
        self.assertEqual(len(wids), 1)
        # NOTE: repititive_wid is computed but never used below --
        # presumably intended for a per-wid assertion that was dropped.
        repititive_wid = wids[0]
        for wid, map in self.index._wordinfo.items():
            self.assertEqual(len(map), 1)
            self.assert_(map.has_key(DOCID))

    def test_simple_query_oneresult(self):
        self.index.index_doc(1, 'not the same document')
        results = self.index.search("document")
        self.assertEqual(list(results.keys()), [1])

    def test_simple_query_noresults(self):
        self.index.index_doc(1, 'not the same document')
        results = self.index.search("frobnicate")
        self.assertEqual(list(results.keys()), [])

    def test_query_oneresult(self):
        self.index.index_doc(1, 'not the same document')
        self.index.index_doc(2, 'something about something else')
        results = self.index.search("document")
        self.assertEqual(list(results.keys()), [1])

    def test_search_phrase(self):
        self.index.index_doc(1, "the quick brown fox jumps over the lazy dog")
        self.index.index_doc(2, "the quick fox jumps lazy over the brown dog")
        results = self.index.search_phrase("quick brown fox")
        self.assertEqual(list(results.keys()), [1])

    def test_search_glob(self):
        self.index.index_doc(1, "how now brown cow")
        self.index.index_doc(2, "hough nough browne cough")
        self.index.index_doc(3, "bar brawl")
        results = self.index.search_glob("bro*")
        self.assertEqual(list(results.keys()), [1, 2])
        results = self.index.search_glob("b*")
        self.assertEqual(list(results.keys()), [1, 2, 3])
class CosineIndexTest(IndexTest):
    # Run the shared IndexTest suite against CosineIndex.
    IndexFactory = CosineIndex
class OkapiIndexTest(IndexTest):
    # Run the shared IndexTest suite against OkapiIndex.
    IndexFactory = OkapiIndex
def test_suite():
    """Aggregate the index tests for both index implementations."""
    suites = [makeSuite(cls) for cls in (CosineIndexTest, OkapiIndexTest)]
    return TestSuite(suites)
if __name__=='__main__':
    # run via the standard unittest CLI
    main(defaultTest='test_suite')
=== Added File Zope3/lib/python/Zope/TextIndex/tests/testLexicon.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import sys
from unittest import TestCase, TestSuite, main, makeSuite
from Zope.TextIndex.Lexicon import Lexicon
from Zope.TextIndex.Lexicon import Splitter, CaseNormalizer
class StupidPipelineElement:
    """Test pipeline element that rewrites one fixed word into another."""

    def __init__(self, fromword, toword):
        self.__fromword = fromword
        self.__toword = toword

    def process(self, seq):
        result = []
        for term in seq:
            replacement = term
            if term == self.__fromword:
                replacement = self.__toword
            result.append(replacement)
        return result
class WackyReversePipelineElement:
    """Test pipeline element that reverses one specific word's spelling."""

    def __init__(self, revword):
        self.__revword = revword

    def process(self, seq):
        result = []
        for term in seq:
            if term != self.__revword:
                result.append(term)
                continue
            chars = list(term)
            chars.reverse()
            result.append(''.join(chars))
        return result
class StopWordPipelineElement:
    """Test pipeline element that drops terms listed in a stop dict.

    A term is dropped when stopdict.get(term) is truthy; the shared
    mutable default is never modified, so it is safe here.
    """

    def __init__(self, stopdict={}):
        self.__stopdict = stopdict

    def process(self, seq):
        result = []
        for term in seq:
            if not self.__stopdict.get(term):
                result.append(term)
        return result
class Test(TestCase):
    # Lexicon behaviour: word-id assignment, term lookup, and pipeline
    # element composition.

    def testSourceToWordIds(self):
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('cats and dogs')
        self.assertEqual(wids, [1, 2, 3])

    def testTermToWordIds(self):
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('dogs')
        self.assertEqual(wids, [3])

    def testMissingTermToWordIds(self):
        # unknown terms map to the reserved wid 0
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('boxes')
        self.assertEqual(wids, [0])

    def testOnePipelineElement(self):
        lexicon = Lexicon(Splitter(), StupidPipelineElement('dogs', 'fish'))
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('fish')
        self.assertEqual(wids, [3])

    def testSplitterAdaptorFold(self):
        # CaseNormalizer folds case, so 'CATS' and 'cats' share a wid
        lexicon = Lexicon(Splitter(), CaseNormalizer())
        wids = lexicon.sourceToWordIds('CATS and dogs')
        wids = lexicon.termToWordIds('cats and dogs')
        self.assertEqual(wids, [1, 2, 3])

    def testSplitterAdaptorNofold(self):
        # without folding, lowercase 'cats' is unknown (wid 0)
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('CATS and dogs')
        wids = lexicon.termToWordIds('cats and dogs')
        self.assertEqual(wids, [0, 2, 3])

    def testTwoElementPipeline(self):
        lexicon = Lexicon(Splitter(),
                          StupidPipelineElement('cats', 'fish'),
                          WackyReversePipelineElement('fish'))
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('hsif')
        self.assertEqual(wids, [1])

    def testThreeElementPipeline(self):
        lexicon = Lexicon(Splitter(),
                          StopWordPipelineElement({'and':1}),
                          StupidPipelineElement('dogs', 'fish'),
                          WackyReversePipelineElement('fish'))
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('hsif')
        self.assertEqual(wids, [2])

    def testSplitterLocaleAwareness(self):
        if sys.platform == 'darwin':
            return # This test doesn't work on Mac OSX :-(
        from Zope.TextIndex.HTMLSplitter import HTMLWordSplitter
        import locale
        loc = locale.setlocale(locale.LC_ALL) # get current locale
        # set German locale
        if sys.platform != 'win32':
            locale.setlocale(locale.LC_ALL, 'de_DE.ISO8859-1')
        else:
            locale.setlocale(locale.LC_ALL, 'German_Germany.1252')
        words = ['mülltonne waschbär behörde überflieger']
        words = Splitter().process(words)
        self.assertEqual(
            words, ['mülltonne', 'waschbär', 'behörde', 'überflieger'])
        words = HTMLWordSplitter().process(words)
        self.assertEqual(
            words, ['mülltonne', 'waschbär', 'behörde', 'überflieger'])
        locale.setlocale(locale.LC_ALL, loc) # restore saved locale
def test_suite():
    # Collect the lexicon tests for the Zope test runner.
    return makeSuite(Test)
if __name__=='__main__':
    # run via the standard unittest CLI
    main(defaultTest='test_suite')
=== Added File Zope3/lib/python/Zope/TextIndex/tests/testNBest.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, TestSuite, main, makeSuite
from Zope.TextIndex.NBest import NBest
class NBestTest(TestCase):
    # NBest keeps the N highest-scoring (item, score) pairs seen.

    def testConstructor(self):
        # capacity must be a positive integer
        self.assertRaises(ValueError, NBest, 0)
        self.assertRaises(ValueError, NBest, -1)
        for n in range(1, 11):
            nb = NBest(n)
            self.assertEqual(len(nb), 0)
            self.assertEqual(nb.capacity(), n)

    def testOne(self):
        # capacity-1 instance keeps only the single best score
        nb = NBest(1)
        nb.add('a', 0)
        self.assertEqual(nb.getbest(), [('a', 0)])
        nb.add('b', 1)
        self.assertEqual(len(nb), 1)
        self.assertEqual(nb.capacity(), 1)
        self.assertEqual(nb.getbest(), [('b', 1)])
        nb.add('c', -1)
        self.assertEqual(len(nb), 1)
        self.assertEqual(nb.capacity(), 1)
        self.assertEqual(nb.getbest(), [('b', 1)])
        nb.addmany([('d', 3), ('e', -6), ('f', 5), ('g', 4)])
        self.assertEqual(len(nb), 1)
        self.assertEqual(nb.capacity(), 1)
        self.assertEqual(nb.getbest(), [('f', 5)])

    def testMany(self):
        import random
        # items -i with score i: best-by-score order is reverse of list
        inputs = [(-i, i) for i in range(50)]
        reversed_inputs = inputs[:]
        reversed_inputs.reverse()
        # Test the N-best for a variety of n (1, 6, 11, ... 50).
        for n in range(1, len(inputs)+1, 5):
            expected = inputs[-n:]
            expected.reverse()
            random_inputs = inputs[:]
            random.shuffle(random_inputs)
            for source in inputs, reversed_inputs, random_inputs:
                # Try feeding them one at a time.
                nb = NBest(n)
                for item, score in source:
                    nb.add(item, score)
                self.assertEqual(len(nb), n)
                self.assertEqual(nb.capacity(), n)
                self.assertEqual(nb.getbest(), expected)
                # And again in one gulp.
                nb = NBest(n)
                nb.addmany(source)
                self.assertEqual(len(nb), n)
                self.assertEqual(nb.capacity(), n)
                self.assertEqual(nb.getbest(), expected)
                # pop_smallest drains worst-first, then raises
                for i in range(1, n+1):
                    self.assertEqual(nb.pop_smallest(), expected[-i])
                self.assertRaises(IndexError, nb.pop_smallest)
def test_suite():
    # Collect the NBest tests for the Zope test runner.
    return makeSuite(NBestTest)
if __name__=='__main__':
    # run via the standard unittest CLI
    main(defaultTest='test_suite')
=== Added File Zope3/lib/python/Zope/TextIndex/tests/testPipelineFactory.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from unittest import TestCase, TestSuite, main, makeSuite
from Zope.TextIndex.IPipelineElement import IPipelineElement
from Zope.TextIndex.PipelineFactory import PipelineElementFactory
class NullPipelineElement:
    """Do-nothing pipeline element used as a registry payload in tests."""

    __implements__ = IPipelineElement

    def process(self, source):
        # Bug fix: 'self' was missing from the signature, so calling
        # process() on an instance would raise a TypeError.  The
        # element deliberately ignores its input.
        pass
class PipelineFactoryTest(TestCase):
    """Tests for PipelineElementFactory registration and lookup."""

    def setUp(self):
        # Four distinct dummy elements to register.
        self.huey = NullPipelineElement()
        self.dooey = NullPipelineElement()
        self.louie = NullPipelineElement()
        self.daffy = NullPipelineElement()

    def testPipeline(self):
        pf = PipelineElementFactory()
        # Three elements in one group, one in another.
        for group, name, element in (('donald', 'huey', self.huey),
                                     ('donald', 'dooey', self.dooey),
                                     ('donald', 'louie', self.louie),
                                     ('looney', 'daffy', self.daffy)):
            pf.registerFactory(group, name, element)
        # Re-registering an existing (group, name) pair is rejected.
        self.assertRaises(ValueError, pf.registerFactory, 'donald', 'huey',
                          self.huey)
        # Groups and names are reported in sorted order.
        self.assertEqual(pf.getFactoryGroups(), ['donald', 'looney'])
        self.assertEqual(pf.getFactoryNames('donald'),
                         ['dooey', 'huey', 'louie'])
def test_suite():
    """Return the pipeline-factory test suite."""
    return makeSuite(PipelineFactoryTest)


if __name__ == '__main__':
    main(defaultTest='test_suite')
=== Added File Zope3/lib/python/Zope/TextIndex/tests/testQueryEngine.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, TestSuite, main, makeSuite
from Persistence.BTrees.IIBTree import IIBucket
from Zope.TextIndex.QueryParser import QueryParser
from Zope.TextIndex.ParseTree import ParseError, QueryError
from Zope.TextIndex.Lexicon import Lexicon, Splitter
class FauxIndex:
    """Canned index: maps a few fixed terms to unit-score doc buckets."""

    # term -> document ids it occurs in; every hit scores 1
    _docs_for_term = {
        "foo": (1, 3),
        "bar": (1, 2),
        "ham": (1, 2, 3, 4),
    }

    def search(self, term):
        # Unknown terms yield an empty bucket.
        b = IIBucket()
        for docid in self._docs_for_term.get(term, ()):
            b[docid] = 1
        return b
class TestQueryEngine(TestCase):
    """Execute parsed queries against FauxIndex and check the scores."""

    def setUp(self):
        self.lexicon = Lexicon(Splitter())
        self.parser = QueryParser(self.lexicon)
        self.index = FauxIndex()

    def compareSet(self, results, expected):
        # Materialize the result mapping as a plain dict for comparison.
        got = {}
        for docid, score in results.items():
            got[docid] = score
        self.assertEqual(got, expected)

    def compareQuery(self, query, expected):
        tree = self.parser.parseQuery(query)
        self.compareSet(tree.executeQuery(self.index), expected)

    def testExecuteQuery(self):
        # (query, expected {docid: score}); scores add up per matching term.
        for query, expected in (
            ("foo AND bar", {1: 2}),
            ("foo OR bar", {1: 2, 2: 1, 3: 1}),
            ("foo AND NOT bar", {3: 1}),
            ("foo AND foo AND foo", {1: 3, 3: 3}),
            ("foo OR foo OR foo", {1: 3, 3: 3}),
            ("ham AND NOT foo AND NOT bar", {4: 1}),
            ("ham OR foo OR bar", {1: 3, 2: 2, 3: 2, 4: 1}),
            ("ham AND foo AND bar", {1: 3}),
            ):
            self.compareQuery(query, expected)

    def testInvalidQuery(self):
        from Zope.TextIndex.ParseTree import NotNode, AtomNode
        # A bare NOT node cannot be executed on its own.
        tree = NotNode(AtomNode("foo"))
        self.assertRaises(QueryError, tree.executeQuery, self.index)
def test_suite():
    """Return the query-engine test suite."""
    return makeSuite(TestQueryEngine)


if __name__ == '__main__':
    main(defaultTest='test_suite')
=== Added File Zope3/lib/python/Zope/TextIndex/tests/testQueryParser.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, TestSuite, main, makeSuite
from Interface.Verify import verifyClass
from Zope.TextIndex.IQueryParser import IQueryParser
from Zope.TextIndex.IQueryParseTree import IQueryParseTree
from Zope.TextIndex.QueryParser import QueryParser
from Zope.TextIndex.ParseTree import ParseError, ParseTreeNode
from Zope.TextIndex.ParseTree import OrNode, AndNode, NotNode
from Zope.TextIndex.ParseTree import AtomNode, PhraseNode, GlobNode
from Zope.TextIndex.Lexicon import Lexicon, Splitter
class TestInterfaces(TestCase):
    """Verify that parser and parse-tree classes declare their interfaces."""

    def testInterfaces(self):
        verifyClass(IQueryParser, QueryParser)
        # Every node class must satisfy the parse-tree interface.
        for node_class in (ParseTreeNode, OrNode, AndNode, NotNode,
                           AtomNode, PhraseNode, GlobNode):
            verifyClass(IQueryParseTree, node_class)
class TestQueryParserBase(TestCase):
    """Shared machinery for query-parser tests.

    Subclasses call expect() for queries that must parse to a given
    tree and failure() for queries that must raise ParseError.
    """

    def setUp(self):
        self.lexicon = Lexicon(Splitter())
        self.parser = QueryParser(self.lexicon)

    def expect(self, input, output, expected_ignored=[]):
        # (The mutable default is safe: the list is never modified.)
        tree = self.parser.parseQuery(input)
        ignored = self.parser.getIgnored()
        self.compareParseTrees(tree, output)
        self.assertEqual(ignored, expected_ignored)
        # Check that parseQueryEx() == (parseQuery(), getIgnored())
        ex_tree, ex_ignored = self.parser.parseQueryEx(input)
        self.compareParseTrees(ex_tree, tree)
        self.assertEqual(ex_ignored, expected_ignored)

    def failure(self, input):
        # Both entry points must reject the input.
        self.assertRaises(ParseError, self.parser.parseQuery, input)
        self.assertRaises(ParseError, self.parser.parseQueryEx, input)

    def compareParseTrees(self, got, expected, msg=None):
        """Recursively assert that two parse trees are equivalent."""
        if msg is None:
            msg = repr(got)
        self.assertEqual(isinstance(got, ParseTreeNode), 1)
        self.assertEqual(got.__class__, expected.__class__, msg)
        if isinstance(got, PhraseNode):
            self.assertEqual(got.nodeType(), "PHRASE", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, GlobNode):
            self.assertEqual(got.nodeType(), "GLOB", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, AtomNode):
            self.assertEqual(got.nodeType(), "ATOM", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, NotNode):
            # Bug fix: pass msg here too, consistent with every other
            # branch (it was omitted, losing diagnostic context).
            self.assertEqual(got.nodeType(), "NOT", msg)
            self.compareParseTrees(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, AndNode) or isinstance(got, OrNode):
            self.assertEqual(got.nodeType(),
                             isinstance(got, AndNode) and "AND" or "OR", msg)
            list1 = got.getValue()
            list2 = expected.getValue()
            self.assertEqual(len(list1), len(list2), msg)
            for i in range(len(list1)):
                self.compareParseTrees(list1[i], list2[i], msg)
class TestQueryParser(TestQueryParserBase):
    """Parser tests with a plain lexicon (no stopwords).

    test0xx cases must parse to the given tree; test1xx cases must
    raise ParseError.
    """

    def test001(self):
        # A single word parses to a bare atom.
        self.expect("foo", AtomNode("foo"))

    def test002(self):
        # 'note' merely starts with 'not'; it is a word, not the operator.
        self.expect("note", AtomNode("note"))

    def test003(self):
        # Operator keywords are case-insensitive.
        self.expect("aa and bb AND cc",
                    AndNode([AtomNode("aa"), AtomNode("bb"), AtomNode("cc")]))

    def test004(self):
        self.expect("aa OR bb or cc",
                    OrNode([AtomNode("aa"), AtomNode("bb"), AtomNode("cc")]))

    def test005(self):
        # AND binds tighter than OR.
        self.expect("aa AND bb OR cc AnD dd",
                    OrNode([AndNode([AtomNode("aa"), AtomNode("bb")]),
                            AndNode([AtomNode("cc"), AtomNode("dd")])]))

    def test006(self):
        # Parentheses override the default precedence.
        self.expect("(aa OR bb) AND (cc OR dd)",
                    AndNode([OrNode([AtomNode("aa"), AtomNode("bb")]),
                             OrNode([AtomNode("cc"), AtomNode("dd")])]))

    def test007(self):
        self.expect("aa AND NOT bb",
                    AndNode([AtomNode("aa"), NotNode(AtomNode("bb"))]))

    def test010(self):
        # Double quotes produce a phrase node.
        self.expect('"foo bar"', PhraseNode(["foo", "bar"]))

    def test011(self):
        # Juxtaposed terms imply AND.
        self.expect("foo bar", AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test012(self):
        # NOTE(review): the trailing '"' looks stray, yet the parser
        # accepts the input -- confirm this leniency is intended.
        self.expect('(("foo bar"))"', PhraseNode(["foo", "bar"]))

    def test013(self):
        self.expect("((foo bar))", AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test014(self):
        # A hyphenated word is treated like a two-word phrase.
        self.expect("foo-bar", PhraseNode(["foo", "bar"]))

    def test015(self):
        # A leading '-' negates the term it prefixes.
        self.expect("foo -bar", AndNode([AtomNode("foo"),
                                         NotNode(AtomNode("bar"))]))

    def test016(self):
        # Positive terms are ordered before negated ones in the result.
        self.expect("-foo bar", AndNode([AtomNode("bar"),
                                         NotNode(AtomNode("foo"))]))

    def test017(self):
        self.expect("booh -foo-bar",
                    AndNode([AtomNode("booh"),
                             NotNode(PhraseNode(["foo", "bar"]))]))

    def test018(self):
        self.expect('booh -"foo bar"',
                    AndNode([AtomNode("booh"),
                             NotNode(PhraseNode(["foo", "bar"]))]))

    def test019(self):
        # A quote boundary splits adjacent words into separate terms.
        self.expect('foo"bar"',
                    AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test020(self):
        self.expect('"foo"bar',
                    AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test021(self):
        self.expect('foo"bar"blech',
                    AndNode([AtomNode("foo"), AtomNode("bar"),
                             AtomNode("blech")]))

    def test022(self):
        # A trailing '*' turns a word into a glob pattern.
        self.expect("foo*", GlobNode("foo*"))

    def test023(self):
        self.expect("foo* bar", AndNode([GlobNode("foo*"),
                                         AtomNode("bar")]))

    # -- inputs that must raise ParseError ---------------------------

    def test101(self):
        # The empty query is rejected.
        self.failure("")

    def test102(self):
        # Bare operators (in any case) are rejected...
        self.failure("not")

    def test103(self):
        self.failure("or")

    def test104(self):
        self.failure("and")

    def test105(self):
        self.failure("NOT")

    def test106(self):
        self.failure("OR")

    def test107(self):
        self.failure("AND")

    def test108(self):
        # ...as is a pure negation with no positive term.
        self.failure("NOT foo")

    def test109(self):
        # Unbalanced parentheses.
        self.failure(")")

    def test110(self):
        self.failure("(")

    def test111(self):
        # Operators with a missing operand.
        self.failure("foo OR")

    def test112(self):
        self.failure("foo AND")

    def test113(self):
        self.failure("OR foo")

    def test114(self):
        self.failure("AND foo")

    def test115(self):
        # NOTE(review): '(foo) bar' fails although 'foo bar' parses --
        # implicit AND is not allowed after a parenthesized group.
        self.failure("(foo) bar")

    def test116(self):
        self.failure("(foo OR)")

    def test117(self):
        self.failure("(foo AND)")

    def test118(self):
        # NOT cannot stand alone inside parentheses either.
        self.failure("(NOT foo)")

    def test119(self):
        # Negation-only queries are rejected.
        self.failure("-foo")

    def test120(self):
        self.failure("-foo -bar")

    def test121(self):
        # A negated term cannot be a direct operand of OR.
        self.failure("foo OR -bar")

    def test122(self):
        self.failure("foo AND -bar")
class StopWordTestQueryParser(TestQueryParserBase):
    """Parser tests with a lexicon whose pipeline drops the word 'stop'."""

    def setUp(self):
        # Only 'stop' is a stopword (but 'and' is still an operator)
        self.lexicon = Lexicon(Splitter(), FakeStopWordRemover())
        self.parser = QueryParser(self.lexicon)

    def _stopped(self, query):
        # Common check: the query reduces to the single atom 'foo',
        # with 'stop' reported in the ignored list.
        self.expect(query, AtomNode("foo"), ["stop"])

    def test201(self):
        # The trailing '/' keeps 'and' a plain word, not the operator.
        self.expect('and/', AtomNode("and"))

    def test202(self):
        self._stopped('foo AND stop')

    def test203(self):
        self._stopped('foo AND NOT stop')

    def test204(self):
        self._stopped('stop AND foo')

    def test205(self):
        self._stopped('foo OR stop')

    def test206(self):
        self._stopped('stop OR foo')

    # Queries that reduce to nothing but stopwords cannot parse.

    def test301(self):
        self.failure('stop')

    def test302(self):
        self.failure('stop stop')

    def test303(self):
        self.failure('stop AND stop')

    def test304(self):
        self.failure('stop OR stop')

    def test305(self):
        self.failure('stop -foo')

    def test306(self):
        self.failure('stop AND NOT foo')
class FakeStopWordRemover:
    """Stand-in stopword remover: drops the literal word 'stop'."""

    def process(self, list):
        # Keep every word except the stopword, preserving order.
        kept = []
        for word in list:
            if word != "stop":
                kept.append(word)
        return kept
def test_suite():
    """Bundle all three parser test classes into one suite."""
    suites = [makeSuite(TestQueryParser),
              makeSuite(StopWordTestQueryParser),
              makeSuite(TestInterfaces)]
    return TestSuite(suites)


if __name__ == "__main__":
    main(defaultTest='test_suite')
=== Added File Zope3/lib/python/Zope/TextIndex/tests/testSetOps.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, TestSuite, main, makeSuite
from Persistence.BTrees.IIBTree import IIBTree, IIBucket
from Zope.TextIndex.SetOps import mass_weightedIntersection
from Zope.TextIndex.SetOps import mass_weightedUnion
class TestSetOps(TestCase):
    """Tests for mass_weightedUnion / mass_weightedIntersection.

    Both functions take a list of (mapping, weight) pairs, where each
    mapping is an IIBTree or IIBucket of docid -> score, and combine
    scores as score * weight summed over the inputs.
    """

    def testEmptyLists(self):
        # An empty list of inputs yields an empty result mapping.
        self.assertEqual(len(mass_weightedIntersection([])), 0)
        self.assertEqual(len(mass_weightedUnion([])), 0)

    def testIdentity(self):
        # A single input with weight 1 comes back unchanged, for both
        # container types and both operations.
        t = IIBTree([(1, 2)])
        b = IIBucket([(1, 2)])
        for x in t, b:
            for func in mass_weightedUnion, mass_weightedIntersection:
                result = func([(x, 1)])
                self.assertEqual(len(result), 1)
                self.assertEqual(list(result.items()), list(x.items()))

    def testScalarMultiply(self):
        # A single input with weight w scales every score by w; the key
        # set is unchanged even when w == 0.
        t = IIBTree([(1, 2), (2, 3), (3, 4)])
        allkeys = [1, 2, 3]
        b = IIBucket(t)
        for x in t, b:
            self.assertEqual(list(x.keys()), allkeys)
            for func in mass_weightedUnion, mass_weightedIntersection:
                for factor in 0, 1, 5, 10:
                    result = func([(x, factor)])
                    self.assertEqual(allkeys, list(result.keys()))
                    for key in x.keys():
                        self.assertEqual(x[key] * factor, result[key])

    def testPairs(self):
        # Two inputs: compare against explicitly computed union and
        # intersection, over all container-type combinations, several
        # weight pairs, and both argument orders (symmetry).
        t1 = IIBTree([(1, 10), (3, 30), (7, 70)])
        t2 = IIBTree([(3, 30), (5, 50), (7, 7), (9, 90)])
        allkeys = [1, 3, 5, 7, 9]
        b1 = IIBucket(t1)
        b2 = IIBucket(t2)
        for x in t1, t2, b1, b2:
            for key in x.keys():
                self.assertEqual(key in allkeys, 1)
            for y in t1, t2, b1, b2:
                for w1, w2 in (0, 0), (1, 10), (10, 1), (2, 3):
                    # Test the union.
                    expected = []
                    for key in allkeys:
                        if x.has_key(key) or y.has_key(key):
                            result = x.get(key, 0) * w1 + y.get(key, 0) * w2
                            expected.append((key, result))
                    expected.sort()
                    got = mass_weightedUnion([(x, w1), (y, w2)])
                    self.assertEqual(expected, list(got.items()))
                    # The union must be symmetric in its arguments.
                    got = mass_weightedUnion([(y, w2), (x, w1)])
                    self.assertEqual(expected, list(got.items()))
                    # Test the intersection.
                    expected = []
                    for key in allkeys:
                        if x.has_key(key) and y.has_key(key):
                            result = x[key] * w1 + y[key] * w2
                            expected.append((key, result))
                    expected.sort()
                    got = mass_weightedIntersection([(x, w1), (y, w2)])
                    self.assertEqual(expected, list(got.items()))
                    # ... and so must the intersection.
                    got = mass_weightedIntersection([(y, w2), (x, w1)])
                    self.assertEqual(expected, list(got.items()))

    def testMany(self):
        # Feed N trees with overlapping key ranges, in random order,
        # and compare against brute-force expected results.
        import random
        N = 15 # number of IIBTrees to feed in
        L = []
        commonkey = N * 1000
        allkeys = {commonkey: 1}
        for i in range(N):
            t = IIBTree()
            # Every tree holds commonkey, so it survives intersection.
            t[commonkey] = i
            for j in range(N-i):
                key = i + j
                allkeys[key] = 1
                t[key] = N*i + j
            L.append((t, i+1))
        random.shuffle(L)
        allkeys = allkeys.keys()
        allkeys.sort()
        # Test the union.
        expected = []
        for key in allkeys:
            sum = 0
            for t, w in L:
                if t.has_key(key):
                    sum += t[key] * w
            expected.append((key, sum))
        # print 'union', expected
        got = mass_weightedUnion(L)
        self.assertEqual(expected, list(got.items()))
        # Test the intersection.
        expected = []
        for key in allkeys:
            sum = 0
            for t, w in L:
                if t.has_key(key):
                    sum += t[key] * w
                else:
                    break
            else:
                # We didn't break out of the loop so it's in the intersection.
                expected.append((key, sum))
        # print 'intersection', expected
        got = mass_weightedIntersection(L)
        self.assertEqual(expected, list(got.items()))
def test_suite():
    """Return the set-operations test suite."""
    return makeSuite(TestSetOps)


if __name__ == "__main__":
    main(defaultTest='test_suite')
=== Added File Zope3/lib/python/Zope/TextIndex/tests/wordstats.py ===
#! /usr/bin/env python
"""Dump statistics about each word in the index.
usage: wordstats.py data.fs [index key]
"""
import ZODB
from ZODB.FileStorage import FileStorage
def main(fspath, key):
    """Print word and per-document score statistics for a text index.

    fspath -- path of the FileStorage (Data.fs) holding the index
    key -- name of the root-object entry the index is stored under
    """
    # Open read-only so a live Data.fs can be inspected safely.
    fs = FileStorage(fspath, read_only=1)
    db = ZODB.DB(fs)
    rt = db.open().root()
    index = rt[key]
    # presumably a ZCTextIndex-style object with .lexicon and .index
    # attributes -- TODO confirm against the indexing code
    lex = index.lexicon
    idx = index.index
    print "Words", lex.length()
    print "Documents", idx.length()
    print "Word frequencies: count, word, wid"
    for word, wid in lex.items():
        # _wordinfo[wid] maps docid -> score (see the loop below)
        docs = idx._wordinfo[wid]
        print len(docs), word, wid
    print "Per-doc scores: wid, (doc, score,)+"
    for wid in lex.wids():
        # Trailing commas keep each wid's scores on a single line.
        print wid,
        docs = idx._wordinfo[wid]
        for docid, score in docs.items():
            print docid, score,
        print
if __name__ == "__main__":
import sys
args = sys.argv[1:]
index_key = "index"
if len(args) == 1:
fspath = args[0]
elif len(args) == 2:
fspath, index_key = args
else:
print "Expected 1 or 2 args, got", len(args)
main(fspath, index_key)