[Zope3-checkins] CVS: Zope3/src/zope/index/text/tests - __init__.py:1.1 hs-tool.py:1.1 indexhtml.py:1.1 mailtest.py:1.1 mhindex.py:1.1 queryhtml.py:1.1 test_index.py:1.1 test_lexicon.py:1.1 test_nbest.py:1.1 test_pipelinefactory.py:1.1 test_queryengine.py:1.1 test_queryparser.py:1.1 test_setops.py:1.1 test_textindexwrapper.py:1.1 wordstats.py:1.1
Anthony Baxter
anthony@interlink.com.au
Sun, 13 Jul 2003 23:53:59 -0400
Update of /cvs-repository/Zope3/src/zope/index/text/tests
In directory cvs.zope.org:/tmp/cvs-serv12584/src/zope/index/text/tests
Added Files:
__init__.py hs-tool.py indexhtml.py mailtest.py mhindex.py
queryhtml.py test_index.py test_lexicon.py test_nbest.py
test_pipelinefactory.py test_queryengine.py
test_queryparser.py test_setops.py test_textindexwrapper.py
wordstats.py
Log Message:
index-geddon, part the second.
Moved zope.textindex and zope.fieldindex into zope.index.text and
zope.index.field, respectively. Shouldn't need any module aliases
for the old ones, as people shouldn't be instantiating the classes
in zope.textindex or zope.fieldindex directly - instead, they should
be going via zope.app.index.field.index and zope.app.index.text.index.
=== Added File Zope3/src/zope/index/text/tests/__init__.py ===
#
# This file is necessary to make this directory a package.
=== Added File Zope3/src/zope/index/text/tests/hs-tool.py ===
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import cPickle
import os.path
import sys
from hotshot.log import LogReader
def load_line_info(log):
    """Aggregate hotshot log events into per-location (time, hits) totals.

    Each positive timing delta is charged to the *previous* location seen,
    since hotshot reports time spent before reaching the current event.
    Returns a dict mapping place -> (total_time, hit_count).
    """
    totals = {}
    last_place = None
    for _event, place, tdelta in log:
        if tdelta > 0:
            prev_time, prev_hits = totals.get(last_place, (0, 0))
            totals[last_place] = (prev_time + tdelta), (prev_hits + 1)
        last_place = place
    return totals
def basename(path, cache={}):
    """Return the final path component of *path*, memoizing results.

    The mutable default ``cache`` is shared across calls on purpose:
    it serves as a per-process memo table for repeated lookups.
    """
    if path not in cache:
        cache[path] = os.path.split(path)[1]
    return cache[path]
def print_results(results):
    # Print one "time hits filename lineno" row per profiled location.
    # *results* is a list of ((time, hits), place) pairs as built in
    # main(); place is (filename, lineno, funcname), or None for the
    # profiler-startup entry.
    for info, place in results:
        if place is None:
            # This is the startup time for the profiler, and only
            # occurs at the very beginning.  Just ignore it, since it
            # corresponds to frame setup of the outermost call, not
            # anything that's actually interesting.
            continue
        filename, line, funcname = place
        print '%8d %8d' % info, basename(filename), line
def annotate_results(results):
    # Group profile results by source file, then annotate each file that
    # still exists on disk.  *results* is a list of ((time, hits), place)
    # pairs; place is (filename, lineno, funcname) or None for the
    # profiler-startup entry (skipped here).
    files = {}
    for stats, place in results:
        if not place:
            continue
        time, hits = stats
        file, line, func = place
        l = files.get(file)
        if l is None:
            l = files[file] = []
        l.append((line, hits, time))
    order = files.keys()
    order.sort()
    for k in order:
        if os.path.exists(k):
            v = files[k]
            # Sort by line number so annotate() can walk the file once.
            v.sort()
            annotate(k, v)
def annotate(file, lines):
    # Print the source of *file* with per-line profile data in the margin.
    # *lines* is a list of (lineno, hits, time) triples sorted by line
    # number; it is consumed (del lines[0]) as the file is walked.
    # NOTE(review): an empty *lines* would raise IndexError at
    # lines[0][0] below -- callers only pass non-empty lists.
    print "-" * 60
    print file
    print "-" * 60
    f = open(file)
    i = 1
    match = lines[0][0]
    for line in f:
        if match == i:
            # Profiled line: show hits and time before the source text.
            print "%6d %8d " % lines[0][1:], line,
            del lines[0]
            if lines:
                match = lines[0][0]
            else:
                match = None
        else:
            print " " * 16, line,
        i += 1
    print
def get_cache_name(filename):
    """Return (cache_dir, cache_file) for the profile data *filename*.

    The cache lives in a '.hs-tool' subdirectory next to the data file,
    under the same base name.
    """
    directory, base = os.path.split(filename)
    cache_dir = os.path.join(directory, '.hs-tool')
    return cache_dir, os.path.join(cache_dir, base)
def cache_results(filename, results):
    """Pickle *results* into the cache file associated with *filename*.

    Creates the cache directory on first use.  Binary pickle protocol 1
    is used, matching the cPickle.load() in main().
    """
    cache_dir, cache_file = get_cache_name(filename)
    if not os.path.exists(cache_dir):
        os.mkdir(cache_dir)
    outfile = open(cache_file, 'wb')
    try:
        cPickle.dump(results, outfile, 1)
    finally:
        outfile.close()
def main(filename, annotate):
    # Load (or rebuild) per-line profile results for *filename*, then
    # report them.  *annotate* selects annotated source output over the
    # flat per-location listing.  Parsed results are cached under a
    # .hs-tool directory and reused while newer than the profile data.
    cache_dir, cache_file = get_cache_name(filename)
    if ( os.path.isfile(cache_file)
         and os.path.getmtime(cache_file) > os.path.getmtime(filename)):
        # cached data is up-to-date:
        fp = open(cache_file, 'rb')
        results = cPickle.load(fp)
        fp.close()
    else:
        log = LogReader(filename)
        byline = load_line_info(log)
        # Sort
        results = [(v, k) for k, v in byline.items()]
        results.sort()
        cache_results(filename, results)
    if annotate:
        annotate_results(results)
    else:
        print_results(results)
if __name__ == "__main__":
    # Command line: hs-tool.py [-A] [profile-file]
    #   -A  annotate source files instead of printing the flat listing
    # Default profile file is "profile.dat".
    import getopt
    annotate_p = 0
    opts, args = getopt.getopt(sys.argv[1:], 'A')
    for o, v in opts:
        if o == '-A':
            annotate_p = 1
    if args:
        filename, = args
    else:
        filename = "profile.dat"
    main(filename, annotate_p)
=== Added File Zope3/src/zope/index/text/tests/indexhtml.py ===
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Index a collection of HTML files on the filesystem.
usage: indexhtml.py [options] dir
Will create an index of all files in dir or its subdirectories.
options:
-f data.fs -- the path to the filestorage datafile
"""
from __future__ import nested_scopes
import os
from time import clock
from zodb.storage.file import FileStorage
from zodb.btrees.IOBTree import IOBTree
from zope.index.text.textindexwrapper import TextIndexWrapper
from zope.index.text.htmlsplitter import HTMLWordSplitter
from zope.index.text.lexicon import Lexicon, StopWordRemover
def make_zc_index():
    """Build a ZCTextIndex over the 'read' attribute of indexed objects.

    There's an elaborate dance necessary to construct an index: the
    constructor wants an 'extra' record naming the document attribute
    and lexicon id, plus a 'caller' object carrying the lexicon itself.
    """
    # Bug fix: ZCTextIndex was referenced without being imported anywhere
    # in this module (only TextIndexWrapper is imported at the top), so
    # this function raised NameError.  Import it here, matching the
    # import used by mailtest.py.
    from zope.index.text.zctextindex import ZCTextIndex

    class Struct:
        pass
    extra = Struct()
    extra.doc_attr = "read"
    extra.lexicon_id = "lexicon"
    caller = Struct()
    caller.lexicon = Lexicon(HTMLWordSplitter(), StopWordRemover())
    return ZCTextIndex("read", extra, caller)
# XXX make a splitter more like the HTMLSplitter for TextIndex
# signature is
# Splitter(string, stop_words, encoding,
# singlechar, indexnumbers, casefolding)
class MySplitter:
    # Adapter giving HTMLWordSplitter the old TextIndex splitter calling
    # convention (see the signature comment above this class); extra
    # positional/keyword arguments are accepted and ignored.
    def __init__(self):
        self._v_splitter = HTMLWordSplitter()
    def __call__(self, text, stopdict, *args, **kwargs):
        words = self._v_splitter._split(text)
        def lookup(w):
            # Stop words map to their stopdict entry (presumably a falsy
            # value -- TODO confirm against get_stopdict), so they are
            # dropped by filter(None, ...) below.
            return stopdict.get(w, w)
        return filter(None, map(lookup, words))
def make_old_index():
    """Build an old Zope2-style TextIndex over the 'read' attribute."""
    from Products.PluginIndexes.TextIndex.TextIndex import TextIndex
    from Products.PluginIndexes.TextIndex.Lexicon import Lexicon
    from zope.index.text.stopdict import get_stopdict

    lexicon = Lexicon(get_stopdict())
    lexicon.SplitterFunc = MySplitter()
    return TextIndex("read", lexicon=lexicon)
def main(db, root, dir):
    # Index every .html file under *dir* into rt["index"], recording the
    # docid -> path mapping in rt["files"].  Commits every TXN_INTERVAL
    # documents and packs every PACK_INTERVAL documents; prints timing
    # statistics when VERBOSE.  Relies on module globals set in the
    # __main__ block (rt, INDEX, LIMIT, VERBOSE, TXN_INTERVAL,
    # PACK_INTERVAL).
    # NOTE(review): the *root* parameter is unused -- the code writes to
    # the global 'rt' instead; presumably these were meant to be the same.
    rt["index"] = index = INDEX()
    rt["files"] = paths = IOBTree()
    get_transaction().commit()
    zodb_time = 0.0
    pack_time = 0.0
    # Worklist scan: subdirectories discovered during iteration are
    # appended to 'files', so the loop also visits their contents.
    files = [os.path.join(dir, file) for file in os.listdir(dir)]
    docid = 0
    t0 = clock()
    for file in files:
        if os.path.isdir(file):
            files += [os.path.join(file, sub) for sub in os.listdir(file)]
        else:
            if not file.endswith(".html"):
                continue
            docid += 1
            if LIMIT is not None and docid > LIMIT:
                break
            if VERBOSE:
                print "%5d" % docid, file
            f = open(file, "rb")
            paths[docid] = file
            index.index_object(docid, f)
            f.close()
            if docid % TXN_INTERVAL == 0:
                z0 = clock()
                get_transaction().commit()
                z1 = clock()
                zodb_time += z1 - z0
                if VERBOSE:
                    print "commit took", z1 - z0, zodb_time
            if docid % PACK_INTERVAL == 0:
                p0 = clock()
                db.pack()
                p1 = clock()
                # NOTE(review): pack time is added to both counters, so
                # the "Non-ZODB time" figure below also excludes packing
                # -- confirm this double-counting is intended.
                zodb_time += p1 - p0
                pack_time += p1 - p0
                if VERBOSE:
                    print "pack took", p1 - p0, pack_time
    # Final commit for any uncommitted tail batch.
    z0 = clock()
    get_transaction().commit()
    z1 = t1 = clock()
    total_time = t1 - t0
    zodb_time += z1 - z0
    if VERBOSE:
        print "Total index time", total_time
        print "Non-pack time", total_time - pack_time
        print "Non-ZODB time", total_time - zodb_time
if __name__ == "__main__":
    # Parse command line options (see module docstring) and run main():
    #   -v verbose, -f filestorage path, -t txn interval, -p pack
    #   interval, -n document limit, -T use old TextIndex.
    import sys
    import getopt
    VERBOSE = 0
    FSPATH = "Data.fs"
    TXN_INTERVAL = 100
    PACK_INTERVAL = 500
    LIMIT = None          # max documents to index (None = unlimited)
    INDEX = make_zc_index
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vf:t:p:n:T')
    except getopt.error, msg:
        print msg
        print __doc__
        sys.exit(2)
    for o, v in opts:
        if o == '-v':
            VERBOSE += 1
        if o == '-f':
            FSPATH = v
        if o == '-t':
            TXN_INTERVAL = int(v)
        if o == '-p':
            PACK_INTERVAL = int(v)
        if o == '-n':
            LIMIT = int(v)
        if o == '-T':
            INDEX = make_old_index
    if len(args) != 1:
        # NOTE(review): message has a typo -- should read "one argument".
        print "Expected on argument"
        print __doc__
        sys.exit(2)
    dir = args[0]
    fs = FileStorage(FSPATH)
    # NOTE(review): ZODB is never imported in this module (only
    # zodb.storage.file is); this line would raise NameError as written
    # -- confirm the intended DB constructor.
    db = ZODB.DB(fs)
    cn = db.open()
    rt = cn.root()
    dir = os.path.join(os.getcwd(), dir)
    print dir
    main(db, rt, dir)
    cn.close()
    fs.close()
=== Added File Zope3/src/zope/index/text/tests/mailtest.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test an index with a Unix mailbox file.
usage: python mailtest.py [options] <data.fs>
options:
-v -- verbose
Index Generation
-i mailbox
-n NNN -- max number of messages to read from mailbox
-t NNN -- commit a transaction every NNN messages (default: 1)
-p NNN -- pack <data.fs> every NNN messages (default: 500), and at end
-p 0 -- don't pack at all
-x -- exclude the message text from the data.fs
Queries
-q query
-b NNN -- return the NNN best matches (default: 10)
-c NNN -- context; if -v, show the first NNN lines of results (default: 5)
The script either indexes or queries depending on whether -q or -i is
passed as an option.
For -i mailbox, the script reads mail messages from the mailbox and
indexes them. It indexes one message at a time, then commits the
transaction.
For -q query, it performs a query on an existing index.
If both are specified, the index is performed first.
You can also interact with the index after it is completed. Load the
index from the database:
import zodb
from zodb.storage.file import FileStorage
fs = FileStorage(<data.fs>)
db = ZODB.DB(fs)
cn = db.open()
index = cn.root()["index"]
index.search("python AND unicode")
"""
from zope.index.text.lexicon import \
Lexicon, CaseNormalizer, Splitter, StopWordRemover
# XXX This import is bad, and was so before the renaming
from zope.index.text.zctextindex import ZCTextIndex
from BTrees.IOBTree import IOBTree
from zope.index.text.queryparser import QueryParser
import sys
import mailbox
import time
def usage(msg):
    # Print an error message followed by the module usage text, then exit
    # with status 2 (command line error).
    print msg
    print __doc__
    sys.exit(2)
class Message:
    """Wrapper exposing a mail message's text for indexing.

    The indexed text is a one-line summary ("subject (author)") followed
    by the raw message body read from the message's file object.  A
    running total of indexed bytes is kept on the class itself.
    """

    # Cumulative size, in bytes, of all Message texts created so far.
    total_bytes = 0

    def __init__(self, msg):
        subj = msg.getheader('subject', '')
        sender = msg.getheader('from', '')
        if sender:
            header_line = "%s (%s)\n" % (subj, sender)
        else:
            header_line = "%s\n" % subj
        self.text = header_line + msg.fp.read()
        Message.total_bytes += len(self.text)
class Extra:
    # Plain attribute bag used to fake the 'extra' and 'caller' records
    # that the ZCTextIndex constructor expects.
    pass
def index(rt, mboxfile, db, profiler):
    # Index messages from *mboxfile* into a fresh ZCTextIndex stored at
    # rt["index"] (Okapi BM25 ranking).  Message texts are kept in
    # rt["documents"] unless EXCLUDE_TEXT is set.  Delegates the actual
    # loop to indexmbox() (optionally under *profiler*), does a final
    # commit/pack, and prints timing statistics.  Uses the module
    # globals configured in __main__.
    global NUM
    idx_time = 0
    pack_time = 0
    start_time = time.time()
    lexicon = Lexicon(Splitter(), CaseNormalizer(), StopWordRemover())
    extra = Extra()
    extra.lexicon_id = 'lexicon'
    extra.doc_attr = 'text'
    extra.index_type = 'Okapi BM25 Rank'
    caller = Extra()
    caller.lexicon = lexicon
    rt["index"] = idx = ZCTextIndex("index", extra, caller)
    if not EXCLUDE_TEXT:
        rt["documents"] = docs = IOBTree()
    else:
        docs = None
    get_transaction().commit()
    mbox = mailbox.UnixMailbox(open(mboxfile, 'rb'))
    if VERBOSE:
        print "opened", mboxfile
    if not NUM:
        # NUM == 0 means "no limit".
        NUM = sys.maxint
    if profiler:
        itime, ptime, i = profiler.runcall(indexmbox, mbox, idx, docs, db)
    else:
        itime, ptime, i = indexmbox(mbox, idx, docs, db)
    idx_time += itime
    pack_time += ptime
    get_transaction().commit()
    # Final pack, unless the last batch already triggered one.
    if PACK_INTERVAL and i % PACK_INTERVAL != 0:
        if VERBOSE >= 2:
            print "packing one last time..."
        p0 = time.clock()
        db.pack(time.time())
        p1 = time.clock()
        if VERBOSE:
            print "pack took %s sec" % (p1 - p0)
        pack_time += p1 - p0
    if VERBOSE:
        finish_time = time.time()
        print
        print "Index time", round(idx_time / 60, 3), "minutes"
        print "Pack time", round(pack_time / 60, 3), "minutes"
        print "Index bytes", Message.total_bytes
        rate = (Message.total_bytes / idx_time) / 1024
        print "Index rate %.2f KB/sec" % rate
        print "Indexing began", time.ctime(start_time)
        print "Indexing ended", time.ctime(finish_time)
        print "Wall clock minutes", round((finish_time - start_time)/60, 3)
def indexmbox(mbox, idx, docs, db):
    # Index up to NUM messages from *mbox* into *idx*, storing texts in
    # *docs* unless EXCLUDE_TEXT.  Commits every TXN_SIZE messages and
    # packs every PACK_INTERVAL messages.  Returns a triple
    # (indexing_seconds, packing_seconds, messages_indexed).
    idx_time = 0
    pack_time = 0
    i = 0
    while i < NUM:
        _msg = mbox.next()
        if _msg is None:
            # End of mailbox.
            break
        i += 1
        msg = Message(_msg)
        if VERBOSE >= 2:
            print "indexing msg", i
        i0 = time.clock()
        idx.index_object(i, msg)
        if not EXCLUDE_TEXT:
            docs[i] = msg
        if i % TXN_SIZE == 0:
            get_transaction().commit()
        i1 = time.clock()
        idx_time += i1 - i0
        if VERBOSE and i % 50 == 0:
            print i, "messages indexed"
            print "cache size", db.cacheSize()
        if PACK_INTERVAL and i % PACK_INTERVAL == 0:
            if VERBOSE >= 2:
                print "packing..."
            p0 = time.clock()
            db.pack(time.time())
            p1 = time.clock()
            if VERBOSE:
                print "pack took %s sec" % (p1 - p0)
            pack_time += p1 - p0
    return idx_time, pack_time, i
def query(rt, query_str, profiler):
    # Run *query_str* against rt["index"] and print the BEST matches with
    # raw and query-weight-scaled scores.  When VERBOSE, also print the
    # first CONTEXT lines of each matching message from rt["documents"].
    idx = rt["index"]
    docs = rt["documents"]
    start = time.clock()
    if profiler is None:
        results, num_results = idx.query(query_str, BEST)
    else:
        if WARM_CACHE:
            # Run the query once un-profiled so the ZODB cache is hot.
            print "Warming the cache..."
            idx.query(query_str, BEST)
        start = time.clock()
        results, num_results = profiler.runcall(idx.query, query_str, BEST)
    elapsed = time.clock() - start
    print "query:", query_str
    print "# results:", len(results), "of", num_results, \
          "in %.2f ms" % (elapsed * 1000)
    # Re-parse the query to get its weight, so raw scores can be shown
    # as percentages of the maximum possible score.
    tree = QueryParser(idx.lexicon).parseQuery(query_str)
    qw = idx.index.query_weight(tree.terms())
    for docid, score in results:
        scaled = 100.0 * score / qw
        print "docid %7d score %6d scaled %5.2f%%" % (docid, score, scaled)
        if VERBOSE:
            msg = docs[docid]
            ctx = msg.text.split("\n", CONTEXT)
            # Drop the unsplit remainder so only CONTEXT lines print.
            del ctx[-1]
            print "-" * 60
            print "message:"
            for l in ctx:
                print l
            print "-" * 60
def main(fs_path, mbox_path, query_str, profiler):
    # Open the FileStorage at *fs_path* and dispatch: index the mailbox
    # if -i was given, run the query if -q was given (index first when
    # both are present).
    # NOTE(review): ZODB is never imported in this module; the next two
    # lines would raise NameError as written -- confirm the intended
    # storage/DB constructors (cf. the zodb.* imports used elsewhere in
    # this package).
    f = ZODB.FileStorage.FileStorage(fs_path)
    db = ZODB.DB(f, cache_size=CACHE_SIZE)
    cn = db.open()
    rt = cn.root()
    if mbox_path is not None:
        index(rt, mbox_path, db, profiler)
    if query_str is not None:
        query(rt, query_str, profiler)
    cn.close()
    db.close()
    f.close()
if __name__ == "__main__":
    # Parse command line options (see module docstring), configure the
    # module-level knobs, optionally set up a profiler, and run main().
    import getopt
    NUM = 0                 # max messages to index (0 = no limit)
    VERBOSE = 0
    PACK_INTERVAL = 500
    EXCLUDE_TEXT = 0
    CACHE_SIZE = 10000
    TXN_SIZE = 1
    BEST = 10
    CONTEXT = 5
    WARM_CACHE = 0
    query_str = None
    mbox_path = None
    profile = None
    old_profile = None
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vn:p:i:q:b:c:xt:w',
                                   ['profile=', 'old-profile='])
    except getopt.error, msg:
        usage(msg)
    if len(args) != 1:
        usage("exactly 1 filename argument required")
    for o, v in opts:
        if o == '-n':
            NUM = int(v)
        elif o == '-v':
            VERBOSE += 1
        elif o == '-p':
            PACK_INTERVAL = int(v)
        elif o == '-q':
            query_str = v
        elif o == '-i':
            mbox_path = v
        elif o == '-b':
            BEST = int(v)
        elif o == '-x':
            EXCLUDE_TEXT = 1
        elif o == '-t':
            TXN_SIZE = int(v)
        elif o == '-c':
            CONTEXT = int(v)
        elif o == '-w':
            WARM_CACHE = 1
        elif o == '--profile':
            profile = v
        elif o == '--old-profile':
            old_profile = v
    fs_path, = args
    if profile:
        # hotshot profiler with line events, analyzable with hs-tool.py.
        import hotshot
        profiler = hotshot.Profile(profile, lineevents=1, linetimings=1)
    elif old_profile:
        # Classic profile module; stats are dumped and printed at exit.
        import profile
        profiler = profile.Profile()
    else:
        profiler = None
    main(fs_path, mbox_path, query_str, profiler)
    if profile:
        profiler.close()
    elif old_profile:
        import pstats
        profiler.dump_stats(old_profile)
        stats = pstats.Stats(old_profile)
        stats.strip_dirs().sort_stats('time').print_stats(20)
=== Added File Zope3/src/zope/index/text/tests/mhindex.py ===
#! /usr/bin/env python2.2
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""MH mail indexer.
To index messages from a single folder (messages defaults to 'all'):
mhindex.py [options] -u +folder [messages ...]
To bulk index all messages from several folders:
mhindex.py [options] -b folder ...; the folder name ALL means all folders.
To execute a single query:
mhindex.py [options] query
To enter interactive query mode:
mhindex.py [options]
Common options:
-d FILE -- specify the Data.fs to use (default ~/.Data.fs)
-w -- dump the word list in alphabetical order and exit
-W -- dump the word list ordered by word id and exit
Indexing options:
-O -- do a prescan on the data to compute optimal word id assignments;
this is only useful the first time the Data.fs is used
-t N -- commit a transaction after every N messages (default 20000)
-p N -- pack after every N commits (by default no packing is done)
Querying options:
-m N -- show at most N matching lines from the message (default 3)
-n N -- show the N best matching messages (default 3)
"""
import os
import re
import sys
import time
import mhlib
import getopt
import traceback
from StringIO import StringIO
from stat import ST_MTIME
# Location of the index database, and of the Zope3 source tree; the
# latter is prepended to sys.path so the zope/zodb imports below resolve.
DATAFS = "~/.mhindex.fs"
ZOPECODE = "~/projects/Zope3/lib/python"
zopecode = os.path.expanduser(ZOPECODE)
sys.path.insert(0, zopecode)
from zodb.db import DB
from zodb.storage.file import FileStorage
from transaction import get_transaction
from zodb.btrees.IOBTree import IOBTree
from zodb.btrees.OIBTree import OIBTree
from zodb.btrees.IIBTree import IIBTree
from zope.index.text.okapiindex import OkapiIndex
from zope.index.text.lexicon import Splitter
from zope.index.text.lexicon import CaseNormalizer, StopWordRemover
from zope.index.text.stopdict import get_stopdict
from zope.index.text.textindexwrapper import TextIndexWrapper
# Query-output defaults: show the 3 best messages, up to 3 matching
# lines each (overridable with -n and -m).
NBEST = 3
MAXLINES = 3
def main():
    """Command-line entry point; see the module docstring for usage.

    Returns 2 on a command line error, otherwise None.
    """
    try:
        opts, args = getopt.getopt(sys.argv[1:], "bd:fhm:n:Op:t:uwW")
    except getopt.error, msg:
        print msg
        print "use -h for help"
        return 2
    update = 0
    bulk = 0
    optimize = 0
    nbest = NBEST
    maxlines = MAXLINES
    datafs = os.path.expanduser(DATAFS)
    pack = 0
    trans = 20000
    dumpwords = dumpwids = dumpfreqs = 0
    for o, a in opts:
        if o == "-b":
            bulk = 1
        if o == "-d":
            datafs = a
        if o == "-f":
            dumpfreqs = 1
        if o == "-h":
            print __doc__
            return
        if o == "-m":
            maxlines = int(a)
        if o == "-n":
            nbest = int(a)
        if o == "-O":
            optimize = 1
        if o == "-p":
            pack = int(a)
        if o == "-t":
            trans = int(a)
        if o == "-u":
            update = 1
        if o == "-w":
            dumpwords = 1
        if o == "-W":
            dumpwids = 1
    # The storage is opened read-only unless we will be indexing.
    ix = Indexer(datafs, writable=update or bulk, trans=trans, pack=pack)
    if dumpfreqs:
        ix.dumpfreqs()
    if dumpwords:
        ix.dumpwords()
    if dumpwids:
        ix.dumpwids()
    if dumpwords or dumpwids or dumpfreqs:
        return
    if bulk:
        if optimize:
            ix.optimize(args)
        ix.bulkupdate(args)
    elif update:
        ix.update(args)
    elif args:
        # Treat the remaining arguments as one query; quote any argument
        # containing a space (preserving a leading "-" exclusion marker).
        for i in range(len(args)):
            a = args[i]
            if " " in a:
                if a[0] == "-":
                    args[i] = '-"' + a[1:] + '"'
                else:
                    args[i] = '"' + a + '"'
        ix.query(" ".join(args), nbest, maxlines)
    else:
        ix.interact(nbest)
    if pack:
        ix.pack()
class Indexer:
    """Maintain and query a full-text index of MH mail folders.

    State lives in a FileStorage: the text index itself ("index"), the
    docid -> path mapping ("docpaths"), the docid -> mtime mapping
    ("doctimes", used for incremental updates), and the set of folders
    being watched ("watchfolders").  A reverse path -> docid mapping is
    rebuilt in memory on startup.
    """

    # Defaulted to None so close() is safe even if __init__ fails midway.
    filestorage = database = connection = root = None

    def __init__(self, datafs, writable=0, trans=0, pack=0):
        self.trans_limit = trans    # commit after this many index ops
        self.pack_limit = pack      # pack after this many commits
        self.trans_count = 0
        self.pack_count = 0
        self.stopdict = get_stopdict()
        self.mh = mhlib.MH()
        self.filestorage = FileStorage(datafs, read_only=(not writable))
        self.database = DB(self.filestorage)
        self.connection = self.database.open()
        self.root = self.connection.root()
        # Load the persistent structures, creating them on first run.
        try:
            self.index = self.root["index"]
        except KeyError:
            self.index = self.root["index"] = TextIndexWrapper()
        try:
            self.docpaths = self.root["docpaths"]
        except KeyError:
            self.docpaths = self.root["docpaths"] = IOBTree()
        try:
            self.doctimes = self.root["doctimes"]
        except KeyError:
            self.doctimes = self.root["doctimes"] = IIBTree()
        try:
            self.watchfolders = self.root["watchfolders"]
        except KeyError:
            self.watchfolders = self.root["watchfolders"] = {}
        # Rebuild the in-memory reverse mapping (path -> docid).
        self.path2docid = OIBTree()
        for docid in self.docpaths.keys():
            path = self.docpaths[docid]
            self.path2docid[path] = docid
        try:
            self.maxdocid = max(self.docpaths.keys())
        except ValueError:
            # Empty index: max() of no ids.
            self.maxdocid = 0
        print len(self.docpaths), "Document ids"
        print len(self.path2docid), "Pathnames"
        print self.index.lexicon.length(), "Words"

    def dumpfreqs(self):
        """Dump all words with total frequencies, most frequent first."""
        lexicon = self.index.lexicon
        index = self.index.index
        assert isinstance(index, OkapiIndex)
        L = []
        for wid in lexicon.wids():
            freq = 0
            for f in index._wordinfo.get(wid, {}).values():
                freq += f
            L.append((freq, wid, lexicon.get_word(wid)))
        L.sort()
        L.reverse()
        for freq, wid, word in L:
            print "%10d %10d %s" % (wid, freq, word)

    def dumpwids(self):
        """Dump all words ordered by word id (-W option)."""
        lexicon = self.index.lexicon
        index = self.index.index
        assert isinstance(index, OkapiIndex)
        for wid in lexicon.wids():
            freq = 0
            for f in index._wordinfo.get(wid, {}).values():
                freq += f
            print "%10d %10d %s" % (wid, freq, lexicon.get_word(wid))

    def dumpwords(self):
        """Dump all words in alphabetical order (-w option)."""
        lexicon = self.index.lexicon
        index = self.index.index
        assert isinstance(index, OkapiIndex)
        for word in lexicon.words():
            wid = lexicon.get_wid(word)
            freq = 0
            for f in index._wordinfo.get(wid, {}).values():
                freq += f
            print "%10d %10d %s" % (wid, freq, word)

    def close(self):
        """Release the connection, database and storage (idempotent)."""
        self.root = None
        if self.connection is not None:
            self.connection.close()
            self.connection = None
        if self.database is not None:
            self.database.close()
            self.database = None
        if self.filestorage is not None:
            self.filestorage.close()
            self.filestorage = None

    def interact(self, nbest=NBEST, maxlines=MAXLINES):
        """Interactive query loop.

        An empty input line fetches the next batch of hits for the
        previous query; "/N" displays hit N with the MH 'show' command;
        "/" displays the first hit of the current batch; EOF exits.
        """
        try:
            import readline     # enables line editing in raw_input()
        except ImportError:
            pass
        text = ""
        top = 0
        results = []
        while 1:
            try:
                line = raw_input("Query: ")
            except EOFError:
                print "\nBye."
                break
            line = line.strip()
            if line.startswith("/"):
                self.specialcommand(line, results, top - nbest)
                continue
            if line:
                text = line
                top = 0
            else:
                if not text:
                    continue
            try:
                results, n = self.timequery(text, top + nbest)
            except KeyboardInterrupt:
                raise
            except:
                reportexc()
                text = ""
                continue
            if len(results) <= top:
                if not n:
                    print "No hits for %r." % text
                else:
                    print "No more hits for %r." % text
                text = ""
                continue
            print "[Results %d-%d from %d" % (top+1, min(n, top+nbest), n),
            print "for query %s]" % repr(text)
            self.formatresults(text, results, maxlines, top, top+nbest)
            top += nbest

    def specialcommand(self, line, results, first):
        """Handle a "/..." command: show one hit via the MH 'show' command."""
        assert line.startswith("/")
        line = line[1:]
        if not line:
            n = first
        else:
            try:
                n = int(line) - 1
            except:
                print "Huh?"
                return
        if n < 0 or n >= len(results):
            print "Out of range"
            return
        docid, score = results[n]
        path = self.docpaths[docid]
        i = path.rfind("/")
        assert i > 0
        folder = path[:i]
        n = path[i+1:]
        cmd = "show +%s %s" % (folder, n)
        if os.getenv("DISPLAY"):
            # Under X, pop the message up in its own xterm pager.
            os.system("xterm -e sh -c '%s | less' &" % cmd)
        else:
            os.system(cmd)

    def query(self, text, nbest=NBEST, maxlines=MAXLINES):
        """Run a single query and print the best matches."""
        results, n = self.timequery(text, nbest)
        if not n:
            print "No hits for %r." % text
            return
        print "[Results 1-%d from %d]" % (len(results), n)
        self.formatresults(text, results, maxlines)

    def timequery(self, text, nbest):
        """Run the query, reporting wall-clock and CPU time."""
        t0 = time.time()
        c0 = time.clock()
        results, n = self.index.query(text, 0, nbest)
        t1 = time.time()
        c1 = time.clock()
        print "[Query time: %.3f real, %.3f user]" % (t1-t0, c1-c0)
        return results, n

    def formatresults(self, text, results, maxlines=MAXLINES,
                      lo=0, hi=sys.maxint):
        """Print results[lo:hi]: rank/score/path, key headers, and up to
        *maxlines* message lines that match a query word."""
        stop = self.stopdict.has_key
        # Build a regexp matching any non-stopword query term; trailing
        # "*" globbing is translated to ".*" below.
        words = [w for w in re.findall(r"\w+\*?", text.lower()) if not stop(w)]
        pattern = r"\b(" + "|".join(words) + r")\b"
        pattern = pattern.replace("*", ".*") # glob -> re syntax
        prog = re.compile(pattern, re.IGNORECASE)
        print '='*70
        rank = lo
        for docid, score in results[lo:hi]:
            rank += 1
            path = self.docpaths[docid]
            score *= 100.0
            print "Rank: %d Score: %d%% File: %s" % (rank, score, path)
            path = os.path.join(self.mh.getpath(), path)
            try:
                fp = open(path)
            except (IOError, OSError), msg:
                print "Can't open:", msg
                continue
            msg = mhlib.Message("<folder>", 0, fp)
            for header in "From", "To", "Cc", "Bcc", "Subject", "Date":
                h = msg.getheader(header)
                if h:
                    print "%-8s %s" % (header+":", h)
            text = self.getmessagetext(msg)
            if text:
                print
                nleft = maxlines
                for part in text:
                    for line in part.splitlines():
                        if prog.search(line):
                            print line
                            nleft -= 1
                            if nleft <= 0:
                                break
                    if nleft <= 0:
                        break
            print '-'*70

    def update(self, args):
        """Update the index for one folder (mhindex.py -u +folder [msgs])."""
        folder = None
        seqs = []
        for arg in args:
            if arg.startswith("+"):
                if folder is None:
                    folder = arg[1:]
                else:
                    print "only one folder at a time"
                    return
            else:
                seqs.append(arg)
        if not folder:
            folder = self.mh.getcontext()
        if not seqs:
            seqs = ['all']
        try:
            f = self.mh.openfolder(folder)
        except mhlib.Error, msg:
            print msg
            return
        # Collect the union of all requested message numbers.
        dict = {}
        for seq in seqs:
            try:
                nums = f.parsesequence(seq)
            except mhlib.Error, msg:
                print msg or "unparsable message sequence: %s" % `seq`
                return
            for n in nums:
                dict[n] = n
        msgs = dict.keys()
        msgs.sort()
        self.updatefolder(f, msgs)
        self.commit()

    def optimize(self, args):
        """Prescan folders and assign word ids in descending frequency order
        (-O option); only useful on a fresh Data.fs."""
        uniqwords = {}
        for folder in args:
            if folder.startswith("+"):
                folder = folder[1:]
            print "\nOPTIMIZE FOLDER", folder
            try:
                f = self.mh.openfolder(folder)
            except mhlib.Error, msg:
                print msg
                continue
            self.prescan(f, f.listmessages(), uniqwords)
        L = [(uniqwords[word], word) for word in uniqwords.keys()]
        L.sort()
        L.reverse()
        for i in range(100):
            print "%3d. %6d %s" % ((i+1,) + L[i])
        # Feed the words to the lexicon most-frequent first so they get
        # the smallest word ids.
        self.index.lexicon.sourceToWordIds([word for (count, word) in L])

    def prescan(self, f, msgs, uniqwords):
        """Count word frequencies for *msgs* in folder *f* into *uniqwords*."""
        pipeline = [Splitter(), CaseNormalizer(), StopWordRemover()]
        for n in msgs:
            print "prescanning", n
            m = f.openmessage(n)
            text = self.getmessagetext(m, f.name)
            for p in pipeline:
                text = p.process(text)
            for word in text:
                uniqwords[word] = uniqwords.get(word, 0) + 1

    def bulkupdate(self, args):
        """Bulk-index several folders (mhindex.py -b folder ...; ALL = all)."""
        if not args:
            print "No folders specified; use ALL to bulk-index all folders"
            return
        if "ALL" in args:
            i = args.index("ALL")
            args[i:i+1] = self.mh.listfolders()
        for folder in args:
            if folder.startswith("+"):
                folder = folder[1:]
            print "\nFOLDER", folder
            try:
                f = self.mh.openfolder(folder)
            except mhlib.Error, msg:
                print msg
                continue
            self.updatefolder(f, f.listmessages())
            print "Total", len(self.docpaths)
        self.commit()
        print "Indexed", self.index.lexicon._nbytes, "bytes and",
        print self.index.lexicon._nwords, "words;",
        print len(self.index.lexicon._words), "unique words."

    def updatefolder(self, f, msgs):
        """Index new or changed messages in *f*; unindex vanished ones.

        A message is considered unchanged when its recorded mtime matches
        the file's current mtime.
        """
        self.watchfolders[f.name] = self.getmtime(f.name)
        for n in msgs:
            path = "%s/%s" % (f.name, n)
            docid = self.path2docid.get(path, 0)
            if docid and self.getmtime(path) == self.doctimes.get(docid, 0):
                print "unchanged", docid, path
                continue
            docid = self.newdocid(path)
            try:
                m = f.openmessage(n)
            except IOError:
                print "disappeared", docid, path
                self.unindexpath(path)
                continue
            text = self.getmessagetext(m, f.name)
            if not text:
                self.unindexpath(path)
                continue
            print "indexing", docid, path
            self.index.index_doc(docid, text)
            self.maycommit()
        # Remove messages from the folder that no longer exist
        for path in list(self.path2docid.keys(f.name)):
            if not path.startswith(f.name + "/"):
                break
            if self.getmtime(path) == 0:
                self.unindexpath(path)
        print "done."

    def unindexpath(self, path):
        """Remove *path* and its docid from all mappings and the index."""
        if self.path2docid.has_key(path):
            docid = self.path2docid[path]
            print "unindexing", docid, path
            del self.docpaths[docid]
            del self.doctimes[docid]
            del self.path2docid[path]
            try:
                self.index.unindex_doc(docid)
            except KeyError, msg:
                # Document was never indexed (e.g. empty text); ignore.
                print "KeyError", msg
            self.maycommit()

    def getmessagetext(self, m, name=None):
        """Return a list of indexable text chunks for message *m*.

        Includes a synthetic "_folder <name>" token (so searches can be
        restricted to one folder), the key headers, and all text parts.
        """
        L = []
        if name:
            L.append("_folder " + name) # To restrict search to a folder
        self.getheaders(m, L)
        try:
            self.getmsgparts(m, L, 0)
        except KeyboardInterrupt:
            raise
        except:
            # Keep indexing even when one message fails to parse.
            print "(getmsgparts failed:)"
            reportexc()
        return L

    def getmsgparts(self, m, L, level):
        """Append text/plain bodies of *m* to *L*, recursing into multipart
        and message/rfc822 containers."""
        ctype = m.gettype()
        if level or ctype != "text/plain":
            print ". "*level + str(ctype)
        if ctype == "text/plain":
            L.append(m.getbodytext())
        elif ctype in ("multipart/alternative", "multipart/mixed"):
            for part in m.getbodyparts():
                self.getmsgparts(part, L, level+1)
        elif ctype == "message/rfc822":
            f = StringIO(m.getbodytext())
            m = mhlib.Message("<folder>", 0, f)
            self.getheaders(m, L)
            self.getmsgparts(m, L, level+1)

    def getheaders(self, m, L):
        """Append the indexable headers of *m* to *L* as one chunk."""
        H = []
        for key in "from", "to", "cc", "bcc", "subject":
            value = m.get(key)
            if value:
                H.append(value)
        if H:
            L.append("\n".join(H))

    def newdocid(self, path):
        """Return the docid for *path*, allocating a new one if needed,
        and record the file's current mtime."""
        docid = self.path2docid.get(path)
        if docid is not None:
            self.doctimes[docid] = self.getmtime(path)
            return docid
        docid = self.maxdocid + 1
        self.maxdocid = docid
        self.docpaths[docid] = path
        self.doctimes[docid] = self.getmtime(path)
        self.path2docid[path] = docid
        return docid

    def getmtime(self, path):
        """Return the mtime of *path* (relative to the MH root); 0 if the
        file no longer exists."""
        path = os.path.join(self.mh.getpath(), path)
        try:
            st = os.stat(path)
        except os.error, msg:
            return 0
        return int(st[ST_MTIME])

    def maycommit(self):
        """Commit once trans_limit operations have accumulated."""
        self.trans_count += 1
        if self.trans_count >= self.trans_limit > 0:
            self.commit()

    def commit(self):
        """Commit pending work; pack once pack_limit commits have passed."""
        if self.trans_count > 0:
            print "committing..."
            get_transaction().commit()
            self.trans_count = 0
            self.pack_count += 1
            if self.pack_count >= self.pack_limit > 0:
                self.pack()

    def pack(self):
        """Pack the database if any commits happened since the last pack."""
        if self.pack_count > 0:
            print "packing..."
            self.database.pack()
            self.pack_count = 0
def reportexc():
    # Last-resort handler: print the current traceback and keep going.
    traceback.print_exc()
if __name__ == "__main__":
    # main() returns None on success or 2 on a command line error.
    sys.exit(main())
=== Added File Zope3/src/zope/index/text/tests/queryhtml.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from time import clock
from zodb.storage.file import FileStorage
# Benchmark queries: each phrase is run both as an OR-joined and as an
# AND-joined query by main() below.
QUERIES = ["nested recursive functions",
           "explicit better than implicit",
           "build hpux",
           "cannot create 'method-wrapper' instances",
           "extension module C++",
           "class method",
           "instance variable",
           "articulate information",
           "import default files",
           "gopher ftp http",
           "documentation",
           ]
def path2url(p):
    """Map an indexed filesystem path back to a python.org URL.

    Hack: only works for the way Jeremy indexed his copy of python.org;
    paths that do not contain the mirror marker are returned unchanged.
    """
    marker = "www.python.org/."
    pos = p.find(marker)
    if pos < 0:
        return p
    tail = p[pos + len(marker):]
    return "http://www.python.org" + tail
from Products.PluginIndexes.TextIndex.TextIndex import And, Or
from zope.index.text.tests.indexhtml import MySplitter
from zope.index.text.nbest import NBest
def main(rt):
    # Benchmark the QUERIES against rt["index"] and emit an HTML report
    # on stdout.  Iteration 0 prints the top-10 result lists; the other
    # 10 iterations collect timings (50 runs each, via ITERS) whose
    # minima are reported at the end.  TEXTINDEX selects the old
    # TextIndex API path (-T on the command line).
    index = rt["index"]
    files = rt["files"]
    times = {}
    ITERS = range(50)
    for i in range(11):
        for q in QUERIES:
            terms = q.split()
            for c in " OR ", " AND ":
                query = c.join(terms)
                t0 = clock()
                if TEXTINDEX:
                    if c == " OR ":
                        op = Or
                    else:
                        op = And
                    _q = " ".join(terms)
                    for _ in ITERS:
                        b = index.query(_q, op).bucket()
                        num = len(b)
                        # Old API returns an unranked bucket; pick the
                        # 10 best by score ourselves.
                        chooser = NBest(10)
                        chooser.addmany(b.items())
                        results = chooser.getbest()
                else:
                    try:
                        for _ in ITERS:
                            results, num = index.query(query)
                    except:
                        # Query failed (e.g. parse error): skip it.
                        continue
                t1 = clock()
                print "<p>Query: \"%s\"" % query
                print "<br>Num results: %d" % num
                print "<br>time.clock(): %s" % (t1 - t0)
                key = query
                if i == 0:
                    # Warm-up iteration: show the hits, record no timing.
                    print "<ol>"
                    for docid, score in results:
                        url = path2url(files[docid])
                        fmt = '<li><a href="%s">%s</A> score = %s'
                        print fmt % (url, url, score)
                    print "</ol>"
                    continue
                l = times.setdefault(key, [])
                l.append(t1 - t0)
    l = times.keys()
    l.sort()
    print "<hr>"
    for k in l:
        v = times[k]
        print "<p>Query: \"%s\"" % k
        print "<br>Min time: %s" % min(v)
        print "<br>All times: %s" % " ".join(map(str, v))
if __name__ == "__main__":
    # Command line: queryhtml.py [-v] [-f Data.fs] [-T]
    #   -T  benchmark the old TextIndex instead of the new index
    import sys
    import getopt
    VERBOSE = 0
    FSPATH = "Data.fs"
    TEXTINDEX = 0
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vf:T')
    except getopt.error, msg:
        print msg
        print __doc__
        sys.exit(2)
    for o, v in opts:
        if o == '-v':
            VERBOSE += 1
        if o == '-f':
            FSPATH = v
        if o == '-T':
            TEXTINDEX = 1
    fs = FileStorage(FSPATH, read_only=1)
    # NOTE(review): ZODB is never imported in this module (only
    # zodb.storage.file is); this line would raise NameError as written
    # -- confirm the intended DB constructor.
    db = ZODB.DB(fs, cache_size=10000)
    cn = db.open()
    rt = cn.root()
    main(rt)
=== Added File Zope3/src/zope/index/text/tests/test_index.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, TestSuite, main, makeSuite
from zope.index.text.lexicon import Lexicon, Splitter
from zope.index.text.cosineindex import CosineIndex
from zope.index.text.okapiindex import OkapiIndex
# Subclasses must set a class variable IndexFactory to the appropriate
# index object constructor.
class IndexTest(TestCase):
    """Shared tests for the cosine and okapi text indexes.

    Subclasses must set the class variable ``IndexFactory`` to the
    index constructor under test; it is called with a lexicon as its
    only argument (see CosineIndexTest / OkapiIndexTest below).
    """

    def setUp(self):
        # A fresh lexicon/index pair for every test.
        self.lexicon = Lexicon(Splitter())
        self.index = self.IndexFactory(self.lexicon)

    def test_index_document(self, DOCID=1):
        # Index one five-word document and check that all internal
        # bookkeeping structures agree on the counts.  DOCID is a
        # parameter so other tests can reuse this as a setup step.
        doc = "simple document contains five words"
        self.assert_(not self.index.has_doc(DOCID))
        self.index.index_doc(DOCID, doc)
        self.assertEqual(self.index.documentCount(), 1)
        self.assertEqual(self.index.wordCount(), 5)
        self.assertEqual(self.lexicon.wordCount(), 5)
        self.assert_(self.index.has_doc(DOCID))
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._docweight), 1)
        self.assertEqual(len(self.index._wordinfo), 5)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 5)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        # Every word should map to exactly one document: this one.
        for map in self.index._wordinfo.values():
            self.assertEqual(len(map), 1)
            self.assert_(map.has_key(DOCID))

    def test_unindex_document(self):
        DOCID = 1
        self.test_index_document(DOCID)
        self.index.unindex_doc(DOCID)
        # Unindexing the only document must empty every structure.
        self.assertEqual(len(self.index._docweight), 0)
        self.assertEqual(len(self.index._wordinfo), 0)
        self.assertEqual(len(self.index._docwords), 0)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())

    def test_index_two_documents(self):
        self.test_index_document()
        doc = "another document just four"
        DOCID = 2
        self.index.index_doc(DOCID, doc)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._docweight), 2)
        # 5 words from doc 1 plus 4 from doc 2, minus the one shared
        # word ("document") -> 8 distinct words.
        self.assertEqual(len(self.index._wordinfo), 8)
        self.assertEqual(len(self.index._docwords), 2)
        self.assertEqual(len(self.index.get_words(DOCID)), 4)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        wids = self.lexicon.termToWordIds("document")
        self.assertEqual(len(wids), 1)
        document_wid = wids[0]
        # Only the shared word should appear in both documents.
        for wid, map in self.index._wordinfo.items():
            if wid == document_wid:
                self.assertEqual(len(map), 2)
                self.assert_(map.has_key(1))
                self.assert_(map.has_key(DOCID))
            else:
                self.assertEqual(len(map), 1)

    def test_index_two_unindex_one(self):
        # index two documents, unindex one, and test the results
        self.test_index_two_documents()
        self.index.unindex_doc(1)
        DOCID = 2
        self.assertEqual(len(self.index._docweight), 1)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._wordinfo), 4)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 4)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        for map in self.index._wordinfo.values():
            self.assertEqual(len(map), 1)
            self.assert_(map.has_key(DOCID))

    def test_index_duplicated_words(self, DOCID=1):
        # "repeat" occurs three times: 7 word positions, but only 5
        # distinct words should be recorded.
        doc = "very simple repeat repeat repeat document test"
        self.index.index_doc(DOCID, doc)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._wordinfo), 5)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 7)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        wids = self.lexicon.termToWordIds("repeat")
        self.assertEqual(len(wids), 1)
        # NOTE(review): repititive_wid is assigned but never used in
        # the loop below (and is misspelled).
        repititive_wid = wids[0]
        for wid, map in self.index._wordinfo.items():
            self.assertEqual(len(map), 1)
            self.assert_(map.has_key(DOCID))

    def test_simple_query_oneresult(self):
        self.index.index_doc(1, 'not the same document')
        results = self.index.search("document")
        self.assertEqual(list(results.keys()), [1])

    def test_simple_query_noresults(self):
        self.index.index_doc(1, 'not the same document')
        results = self.index.search("frobnicate")
        self.assertEqual(list(results.keys()), [])

    def test_query_oneresult(self):
        self.index.index_doc(1, 'not the same document')
        self.index.index_doc(2, 'something about something else')
        results = self.index.search("document")
        self.assertEqual(list(results.keys()), [1])

    def test_search_phrase(self):
        # Both documents contain all three words, but only doc 1 has
        # them adjacent and in order.
        self.index.index_doc(1, "the quick brown fox jumps over the lazy dog")
        self.index.index_doc(2, "the quick fox jumps lazy over the brown dog")
        results = self.index.search_phrase("quick brown fox")
        self.assertEqual(list(results.keys()), [1])

    def test_search_glob(self):
        self.index.index_doc(1, "how now brown cow")
        self.index.index_doc(2, "hough nough browne cough")
        self.index.index_doc(3, "bar brawl")
        results = self.index.search_glob("bro*")
        self.assertEqual(list(results.keys()), [1, 2])
        results = self.index.search_glob("b*")
        self.assertEqual(list(results.keys()), [1, 2, 3])
class CosineIndexTest(IndexTest):
    # Run the shared IndexTest suite against the cosine-measure index.
    IndexFactory = CosineIndex
class OkapiIndexTest(IndexTest):
    # Run the shared IndexTest suite against the Okapi BM25 index.
    IndexFactory = OkapiIndex
def test_suite():
    # Combine both concrete index test classes into one suite.
    return TestSuite((makeSuite(CosineIndexTest),
                      makeSuite(OkapiIndexTest),
                    ))

if __name__=='__main__':
    main(defaultTest='test_suite')
=== Added File Zope3/src/zope/index/text/tests/test_lexicon.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import sys
from unittest import TestCase, main, makeSuite
from zope.index.text.lexicon import Lexicon
from zope.index.text.lexicon import Splitter, CaseNormalizer
class StupidPipelineElement:
    """Pipeline element that replaces every occurrence of one word
    with another, leaving all other terms untouched."""

    def __init__(self, fromword, toword):
        self.__source = fromword
        self.__replacement = toword

    def process(self, seq):
        result = []
        for term in seq:
            emitted = term
            if term == self.__source:
                emitted = self.__replacement
            result.append(emitted)
        return result
class WackyReversePipelineElement:
    """Pipeline element that reverses the characters of one specific
    word and passes every other term through unchanged."""

    def __init__(self, revword):
        self.__target = revword

    def process(self, seq):
        result = []
        for term in seq:
            if term != self.__target:
                result.append(term)
            else:
                chars = list(term)
                chars.reverse()
                result.append(''.join(chars))
        return result
class StopWordPipelineElement:
    """Pipeline element that drops every term found (with a true
    value) in the stop-word dictionary."""

    def __init__(self, stopdict=None):
        # BUG FIX: the original used the mutable default ``stopdict={}``,
        # which is shared across all instances created without an
        # argument.  Use None as the sentinel instead; behavior for
        # callers passing a dict is unchanged.
        if stopdict is None:
            stopdict = {}
        self.__stopdict = stopdict

    def process(self, seq):
        # Keep only terms whose stopdict entry is absent or false.
        res = []
        for term in seq:
            if not self.__stopdict.get(term):
                res.append(term)
        return res
class Test(TestCase):
    """Tests for Lexicon word-id assignment and pipeline composition."""

    def testSourceToWordIds(self):
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('cats and dogs')
        # Word ids are handed out sequentially starting at 1.
        self.assertEqual(wids, [1, 2, 3])

    def testTermToWordIds(self):
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('dogs')
        self.assertEqual(wids, [3])

    def testMissingTermToWordIds(self):
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('boxes')
        # Unknown terms map to the reserved wid 0.
        self.assertEqual(wids, [0])

    def testOnePipelineElement(self):
        # 'dogs' is rewritten to 'fish' at indexing time, so 'fish'
        # resolves to the wid 'dogs' would have received.
        lexicon = Lexicon(Splitter(), StupidPipelineElement('dogs', 'fish'))
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('fish')
        self.assertEqual(wids, [3])

    def testSplitterAdaptorFold(self):
        # With CaseNormalizer, 'CATS' and 'cats' share a word id.
        lexicon = Lexicon(Splitter(), CaseNormalizer())
        wids = lexicon.sourceToWordIds('CATS and dogs')
        wids = lexicon.termToWordIds('cats and dogs')
        self.assertEqual(wids, [1, 2, 3])

    def testSplitterAdaptorNofold(self):
        # Without case folding 'cats' was never indexed ('CATS' was),
        # so it resolves to the unknown-word wid 0.
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('CATS and dogs')
        wids = lexicon.termToWordIds('cats and dogs')
        self.assertEqual(wids, [0, 2, 3])

    def testTwoElementPipeline(self):
        # 'cats' -> 'fish' -> 'hsif'; it is the first word, so wid 1.
        lexicon = Lexicon(Splitter(),
                          StupidPipelineElement('cats', 'fish'),
                          WackyReversePipelineElement('fish'))
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('hsif')
        self.assertEqual(wids, [1])

    def testThreeElementPipeline(self):
        # 'and' is dropped by the stop-word stage, so 'dogs' -> 'fish'
        # -> 'hsif' ends up as the second surviving word (wid 2).
        lexicon = Lexicon(Splitter(),
                          StopWordPipelineElement({'and':1}),
                          StupidPipelineElement('dogs', 'fish'),
                          WackyReversePipelineElement('fish'))
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('hsif')
        self.assertEqual(wids, [2])

    def testSplitterLocaleAwareness(self):
        from zope.index.text.htmlsplitter import HTMLWordSplitter
        import locale
        loc = locale.setlocale(locale.LC_ALL) # get current locale
        # set German locale
        try:
            if sys.platform != 'win32':
                locale.setlocale(locale.LC_ALL, 'de_DE.ISO8859-1')
            else:
                locale.setlocale(locale.LC_ALL, 'German_Germany.1252')
        except locale.Error:
            return # This test doesn't work here :-(
        # Under a Latin-1 locale the splitters must treat accented
        # characters as word characters rather than split on them.
        expected = ['m\xfclltonne', 'waschb\xe4r',
                    'beh\xf6rde', '\xfcberflieger']
        words = [" ".join(expected)]
        words = Splitter().process(words)
        self.assertEqual(words, expected)
        words = HTMLWordSplitter().process(words)
        self.assertEqual(words, expected)
        locale.setlocale(locale.LC_ALL, loc) # restore saved locale
def test_suite():
    # Single test class; expose it in the standard Zope3 suite form.
    return makeSuite(Test)

if __name__=='__main__':
    main(defaultTest='test_suite')
=== Added File Zope3/src/zope/index/text/tests/test_nbest.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, main, makeSuite
from zope.index.text.nbest import NBest
class NBestTest(TestCase):
    """Tests for NBest, the bounded accumulator that keeps the N
    highest-scoring (item, score) pairs."""

    def testConstructor(self):
        # Capacity must be at least 1.
        self.assertRaises(ValueError, NBest, 0)
        self.assertRaises(ValueError, NBest, -1)
        for n in range(1, 11):
            nb = NBest(n)
            self.assertEqual(len(nb), 0)
            self.assertEqual(nb.capacity(), n)

    def testOne(self):
        # A capacity-1 NBest keeps only the single best score seen.
        nb = NBest(1)
        nb.add('a', 0)
        self.assertEqual(nb.getbest(), [('a', 0)])
        nb.add('b', 1)
        self.assertEqual(len(nb), 1)
        self.assertEqual(nb.capacity(), 1)
        self.assertEqual(nb.getbest(), [('b', 1)])
        nb.add('c', -1)
        self.assertEqual(len(nb), 1)
        self.assertEqual(nb.capacity(), 1)
        self.assertEqual(nb.getbest(), [('b', 1)])
        nb.addmany([('d', 3), ('e', -6), ('f', 5), ('g', 4)])
        self.assertEqual(len(nb), 1)
        self.assertEqual(nb.capacity(), 1)
        self.assertEqual(nb.getbest(), [('f', 5)])

    def testMany(self):
        import random
        # Scores increase with i, so the expected N best are the last
        # n input pairs in reverse (best-first) order.
        inputs = [(-i, i) for i in range(50)]
        reversed_inputs = inputs[:]
        reversed_inputs.reverse()
        # Test the N-best for a variety of n (1, 6, 11, ... 50).
        for n in range(1, len(inputs)+1, 5):
            expected = inputs[-n:]
            expected.reverse()
            random_inputs = inputs[:]
            random.shuffle(random_inputs)
            # Feed order must not matter: ascending, descending, random.
            for source in inputs, reversed_inputs, random_inputs:
                # Try feeding them one at a time.
                nb = NBest(n)
                for item, score in source:
                    nb.add(item, score)
                self.assertEqual(len(nb), n)
                self.assertEqual(nb.capacity(), n)
                self.assertEqual(nb.getbest(), expected)
                # And again in one gulp.
                nb = NBest(n)
                nb.addmany(source)
                self.assertEqual(len(nb), n)
                self.assertEqual(nb.capacity(), n)
                self.assertEqual(nb.getbest(), expected)
                # pop_smallest drains worst-to-best, then raises.
                for i in range(1, n+1):
                    self.assertEqual(nb.pop_smallest(), expected[-i])
                self.assertRaises(IndexError, nb.pop_smallest)

    def testAllSameScore(self):
        # With equal scores, getbest() returns items in insertion order.
        inputs = [(i, 0) for i in range(10)]
        for n in range(1, 12):
            nb = NBest(n)
            nb.addmany(inputs)
            outputs = nb.getbest()
            self.assertEqual(outputs, inputs[:len(outputs)])
def test_suite():
    # Single test class; expose it in the standard Zope3 suite form.
    return makeSuite(NBestTest)

if __name__=='__main__':
    main(defaultTest='test_suite')
=== Added File Zope3/src/zope/index/text/tests/test_pipelinefactory.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, main, makeSuite
from zope.index.interfaces.pipelineelement import IPipelineElement
from zope.index.text.pipelinefactory import PipelineElementFactory
from zope.interface import implements
class NullPipelineElement:
    """Minimal do-nothing pipeline element used only as a registry
    value in PipelineFactoryTest."""

    implements(IPipelineElement)

    def process(self, source):
        # BUG FIX: the original signature was ``process(source)`` with
        # no ``self``; invoked as a bound method it would have received
        # the instance as ``source`` (and rejected any real argument).
        pass
class PipelineFactoryTest(TestCase):
    """Tests for PipelineElementFactory group/name registration."""

    def setUp(self):
        self.huey = NullPipelineElement()
        self.dooey = NullPipelineElement()
        self.louie = NullPipelineElement()
        self.daffy = NullPipelineElement()

    def testPipeline(self):
        pf = PipelineElementFactory()
        pf.registerFactory('donald', 'huey', self.huey)
        pf.registerFactory('donald', 'dooey', self.dooey)
        pf.registerFactory('donald', 'louie', self.louie)
        pf.registerFactory('looney', 'daffy', self.daffy)
        # Registering the same (group, name) pair twice is an error.
        self.assertRaises(ValueError, pf.registerFactory,'donald', 'huey',
                          self.huey)
        # Group and name listings come back sorted alphabetically.
        self.assertEqual(pf.getFactoryGroups(), ['donald', 'looney'])
        self.assertEqual(pf.getFactoryNames('donald'),
                         ['dooey', 'huey', 'louie'])
def test_suite():
    # Single test class; expose it in the standard Zope3 suite form.
    return makeSuite(PipelineFactoryTest)

if __name__=='__main__':
    main(defaultTest='test_suite')
=== Added File Zope3/src/zope/index/text/tests/test_queryengine.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import unittest
from zodb.btrees.IIBTree import IIBucket
from zope.index.text.queryparser import QueryParser
from zope.index.text.parsetree import QueryError
from zope.index.text.lexicon import Lexicon, Splitter
class FauxIndex:
    """Canned index stub: maps three known terms to fixed docid sets
    so query-tree execution can be tested without a real index."""

    # term -> docids that "contain" it; every hit gets score 1.
    _FAKE_RESULTS = {
        "foo": (1, 3),
        "bar": (1, 2),
        "ham": (1, 2, 3, 4),
    }

    def search(self, term):
        bucket = IIBucket()
        for docid in self._FAKE_RESULTS.get(term, ()):
            bucket[docid] = 1
        return bucket
class TestQueryEngine(unittest.TestCase):
    """Execute parsed query trees against the canned FauxIndex."""

    def setUp(self):
        self.lexicon = Lexicon(Splitter())
        self.parser = QueryParser(self.lexicon)
        self.index = FauxIndex()

    def compareSet(self, set, dict):
        # Convert the result mapping into a plain dict for comparison.
        d = {}
        for k, v in set.items():
            d[k] = v
        self.assertEqual(d, dict)

    def compareQuery(self, query, dict):
        # Parse `query`, execute it, and compare docid -> score.
        tree = self.parser.parseQuery(query)
        set = tree.executeQuery(self.index)
        self.compareSet(set, dict)

    def testExecuteQuery(self):
        # Scores accumulate across terms: each matching term
        # contributes its weight (1 per FauxIndex hit).
        self.compareQuery("foo AND bar", {1: 2})
        self.compareQuery("foo OR bar", {1: 2, 2: 1, 3:1})
        self.compareQuery("foo AND NOT bar", {3: 1})
        self.compareQuery("foo AND foo AND foo", {1: 3, 3: 3})
        self.compareQuery("foo OR foo OR foo", {1: 3, 3: 3})
        self.compareQuery("ham AND NOT foo AND NOT bar", {4: 1})
        self.compareQuery("ham OR foo OR bar", {1: 3, 2: 2, 3: 2, 4: 1})
        self.compareQuery("ham AND foo AND bar", {1: 3})

    def testInvalidQuery(self):
        # A bare NOT node cannot be executed on its own.
        from zope.index.text.parsetree import NotNode, AtomNode
        tree = NotNode(AtomNode("foo"))
        self.assertRaises(QueryError, tree.executeQuery, self.index)
def test_suite():
    # Single test class; expose it in the standard Zope3 suite form.
    return unittest.makeSuite(TestQueryEngine)

if __name__=='__main__':
    unittest.main(defaultTest='test_suite')
=== Added File Zope3/src/zope/index/text/tests/test_queryparser.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, TestSuite, main, makeSuite
from zope.interface.verify import verifyClass
from zope.index.interfaces.queryparser import IQueryParser
from zope.index.interfaces.queryparsetree import IQueryParseTree
from zope.index.text.queryparser import QueryParser
from zope.index.text.parsetree import ParseError, ParseTreeNode
from zope.index.text.parsetree import OrNode, AndNode, NotNode
from zope.index.text.parsetree import AtomNode, PhraseNode, GlobNode
from zope.index.text.lexicon import Lexicon, Splitter
class TestInterfaces(TestCase):
    """Verify that the parser and every parse-tree node class provide
    the interfaces they claim to implement."""

    def testInterfaces(self):
        verifyClass(IQueryParser, QueryParser)
        verifyClass(IQueryParseTree, ParseTreeNode)
        verifyClass(IQueryParseTree, OrNode)
        verifyClass(IQueryParseTree, AndNode)
        verifyClass(IQueryParseTree, NotNode)
        verifyClass(IQueryParseTree, AtomNode)
        verifyClass(IQueryParseTree, PhraseNode)
        verifyClass(IQueryParseTree, GlobNode)
class TestQueryParserBase(TestCase):
    """Assertion helpers shared by the query-parser test classes."""

    def setUp(self):
        self.lexicon = Lexicon(Splitter())
        self.parser = QueryParser(self.lexicon)

    def expect(self, input, output, expected_ignored=[]):
        # Assert that `input` parses to the tree `output` and that the
        # parser reports `expected_ignored` stop words.  (The mutable
        # default list is safe here: it is only compared, never mutated.)
        tree = self.parser.parseQuery(input)
        ignored = self.parser.getIgnored()
        self.compareParseTrees(tree, output)
        self.assertEqual(ignored, expected_ignored)
        # Check that parseQueryEx() == (parseQuery(), getIgnored())
        ex_tree, ex_ignored = self.parser.parseQueryEx(input)
        self.compareParseTrees(ex_tree, tree)
        self.assertEqual(ex_ignored, expected_ignored)

    def failure(self, input):
        # Both parser entry points must reject `input`.
        self.assertRaises(ParseError, self.parser.parseQuery, input)
        self.assertRaises(ParseError, self.parser.parseQueryEx, input)

    def compareParseTrees(self, got, expected, msg=None):
        # Recursively assert that two parse trees have the same shape,
        # node types, and leaf values.
        if msg is None:
            msg = repr(got)
        self.assertEqual(isinstance(got, ParseTreeNode), 1)
        self.assertEqual(got.__class__, expected.__class__, msg)
        if isinstance(got, PhraseNode):
            self.assertEqual(got.nodeType(), "PHRASE", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, GlobNode):
            self.assertEqual(got.nodeType(), "GLOB", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, AtomNode):
            self.assertEqual(got.nodeType(), "ATOM", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, NotNode):
            self.assertEqual(got.nodeType(), "NOT")
            self.compareParseTrees(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, AndNode) or isinstance(got, OrNode):
            self.assertEqual(got.nodeType(),
                             isinstance(got, AndNode) and "AND" or "OR", msg)
            list1 = got.getValue()
            list2 = expected.getValue()
            self.assertEqual(len(list1), len(list2), msg)
            for i in range(len(list1)):
                self.compareParseTrees(list1[i], list2[i], msg)
class TestQueryParser(TestQueryParserBase):
    """Parses against the default (no stop word) lexicon: test0xx
    cases must produce the given tree, test1xx must raise ParseError."""

    # -- queries that must parse --

    def test001(self):
        self.expect("foo", AtomNode("foo"))

    def test002(self):
        # Starts with "not" but must parse as a plain atom.
        self.expect("note", AtomNode("note"))

    def test003(self):
        # Operator keywords are case-insensitive.
        self.expect("aa and bb AND cc",
                    AndNode([AtomNode("aa"), AtomNode("bb"), AtomNode("cc")]))

    def test004(self):
        self.expect("aa OR bb or cc",
                    OrNode([AtomNode("aa"), AtomNode("bb"), AtomNode("cc")]))

    def test005(self):
        # AND binds tighter than OR.
        self.expect("aa AND bb OR cc AnD dd",
                    OrNode([AndNode([AtomNode("aa"), AtomNode("bb")]),
                            AndNode([AtomNode("cc"), AtomNode("dd")])]))

    def test006(self):
        self.expect("(aa OR bb) AND (cc OR dd)",
                    AndNode([OrNode([AtomNode("aa"), AtomNode("bb")]),
                             OrNode([AtomNode("cc"), AtomNode("dd")])]))

    def test007(self):
        self.expect("aa AND NOT bb",
                    AndNode([AtomNode("aa"), NotNode(AtomNode("bb"))]))

    def test010(self):
        # Quoted text becomes a phrase of its words.
        self.expect('"foo bar"', PhraseNode(["foo", "bar"]))

    def test011(self):
        # Plain juxtaposition means AND.
        self.expect("foo bar", AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test012(self):
        # NOTE(review): the trailing double quote after the parens
        # looks accidental, but the parser evidently tolerates it.
        self.expect('(("foo bar"))"', PhraseNode(["foo", "bar"]))

    def test013(self):
        self.expect("((foo bar))", AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test014(self):
        # A hyphenated word is treated as a two-word phrase.
        self.expect("foo-bar", PhraseNode(["foo", "bar"]))

    def test015(self):
        # A leading "-" negates the term.
        self.expect("foo -bar", AndNode([AtomNode("foo"),
                                         NotNode(AtomNode("bar"))]))

    def test016(self):
        # The positive term is listed first even when written second.
        self.expect("-foo bar", AndNode([AtomNode("bar"),
                                         NotNode(AtomNode("foo"))]))

    def test017(self):
        self.expect("booh -foo-bar",
                    AndNode([AtomNode("booh"),
                             NotNode(PhraseNode(["foo", "bar"]))]))

    def test018(self):
        self.expect('booh -"foo bar"',
                    AndNode([AtomNode("booh"),
                             NotNode(PhraseNode(["foo", "bar"]))]))

    def test019(self):
        # Adjacent quoted/unquoted fragments split into separate atoms.
        self.expect('foo"bar"',
                    AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test020(self):
        self.expect('"foo"bar',
                    AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test021(self):
        self.expect('foo"bar"blech',
                    AndNode([AtomNode("foo"), AtomNode("bar"),
                             AtomNode("blech")]))

    def test022(self):
        # A trailing "*" makes a glob (prefix) node.
        self.expect("foo*", GlobNode("foo*"))

    def test023(self):
        self.expect("foo* bar", AndNode([GlobNode("foo*"),
                                         AtomNode("bar")]))

    # -- queries that must raise ParseError --

    def test101(self):
        self.failure("")

    def test102(self):
        self.failure("not")

    def test103(self):
        self.failure("or")

    def test104(self):
        self.failure("and")

    def test105(self):
        self.failure("NOT")

    def test106(self):
        self.failure("OR")

    def test107(self):
        self.failure("AND")

    def test108(self):
        # NOT without a positive term is not executable.
        self.failure("NOT foo")

    def test109(self):
        self.failure(")")

    def test110(self):
        self.failure("(")

    def test111(self):
        self.failure("foo OR")

    def test112(self):
        self.failure("foo AND")

    def test113(self):
        self.failure("OR foo")

    def test114(self):
        self.failure("AND foo")

    def test115(self):
        self.failure("(foo) bar")

    def test116(self):
        self.failure("(foo OR)")

    def test117(self):
        self.failure("(foo AND)")

    def test118(self):
        self.failure("(NOT foo)")

    def test119(self):
        self.failure("-foo")

    def test120(self):
        self.failure("-foo -bar")

    def test121(self):
        # Negated terms are only allowed inside an AND.
        self.failure("foo OR -bar")

    def test122(self):
        self.failure("foo AND -bar")
class StopWordTestQueryParser(TestQueryParserBase):
    """Parser tests with a lexicon whose pipeline removes 'stop'."""

    def setUp(self):
        # Only 'stop' is a stopword (but 'and' is still an operator)
        self.lexicon = Lexicon(Splitter(), FakeStopWordRemover())
        self.parser = QueryParser(self.lexicon)

    def test201(self):
        self.expect('and/', AtomNode("and"))

    def test202(self):
        # Dropped stop words are reported through getIgnored().
        self.expect('foo AND stop', AtomNode("foo"), ["stop"])

    def test203(self):
        self.expect('foo AND NOT stop', AtomNode("foo"), ["stop"])

    def test204(self):
        self.expect('stop AND foo', AtomNode("foo"), ["stop"])

    def test205(self):
        self.expect('foo OR stop', AtomNode("foo"), ["stop"])

    def test206(self):
        self.expect('stop OR foo', AtomNode("foo"), ["stop"])

    # Queries reduced to nothing by stop-word removal must not parse.

    def test301(self):
        self.failure('stop')

    def test302(self):
        self.failure('stop stop')

    def test303(self):
        self.failure('stop AND stop')

    def test304(self):
        self.failure('stop OR stop')

    def test305(self):
        self.failure('stop -foo')

    def test306(self):
        self.failure('stop AND NOT foo')
class FakeStopWordRemover:
    """Trivial pipeline element that filters out the word 'stop'."""

    def process(self, list):
        result = []
        for word in list:
            if word != "stop":
                result.append(word)
        return result
def test_suite():
    # All three parser test classes in one suite.
    return TestSuite((makeSuite(TestQueryParser),
                      makeSuite(StopWordTestQueryParser),
                      makeSuite(TestInterfaces),
                    ))

if __name__=="__main__":
    main(defaultTest='test_suite')
=== Added File Zope3/src/zope/index/text/tests/test_setops.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, main, makeSuite
from zodb.btrees.IIBTree import IIBTree, IIBucket
from zope.index.text.setops import mass_weightedIntersection
from zope.index.text.setops import mass_weightedUnion
class TestSetOps(TestCase):
    """Tests for mass_weightedUnion / mass_weightedIntersection over
    weighted IIBTree/IIBucket (docid -> score) mappings."""

    def testEmptyLists(self):
        # An empty list of inputs yields an empty result mapping.
        self.assertEqual(len(mass_weightedIntersection([])), 0)
        self.assertEqual(len(mass_weightedUnion([])), 0)

    def testIdentity(self):
        # A single input with weight 1 must come back unchanged,
        # whether it is a tree or a bucket.
        t = IIBTree([(1, 2)])
        b = IIBucket([(1, 2)])
        for x in t, b:
            for func in mass_weightedUnion, mass_weightedIntersection:
                result = func([(x, 1)])
                self.assertEqual(len(result), 1)
                self.assertEqual(list(result.items()), list(x.items()))

    def testScalarMultiply(self):
        # A single input with weight w scales every score by w.
        t = IIBTree([(1, 2), (2, 3), (3, 4)])
        allkeys = [1, 2, 3]
        b = IIBucket(t)
        for x in t, b:
            self.assertEqual(list(x.keys()), allkeys)
            for func in mass_weightedUnion, mass_weightedIntersection:
                for factor in 0, 1, 5, 10:
                    result = func([(x, factor)])
                    self.assertEqual(allkeys, list(result.keys()))
                    for key in x.keys():
                        self.assertEqual(x[key] * factor, result[key])

    def testPairs(self):
        # Exhaustively check two-input unions and intersections
        # against a brute-force computation, in both argument orders
        # and for every tree/bucket combination.
        t1 = IIBTree([(1, 10), (3, 30), (7, 70)])
        t2 = IIBTree([(3, 30), (5, 50), (7, 7), (9, 90)])
        allkeys = [1, 3, 5, 7, 9]
        b1 = IIBucket(t1)
        b2 = IIBucket(t2)
        for x in t1, t2, b1, b2:
            for key in x.keys():
                self.assertEqual(key in allkeys, 1)
            for y in t1, t2, b1, b2:
                for w1, w2 in (0, 0), (1, 10), (10, 1), (2, 3):
                    # Test the union.
                    expected = []
                    for key in allkeys:
                        if x.has_key(key) or y.has_key(key):
                            result = x.get(key, 0) * w1 + y.get(key, 0) * w2
                            expected.append((key, result))
                    expected.sort()
                    got = mass_weightedUnion([(x, w1), (y, w2)])
                    self.assertEqual(expected, list(got.items()))
                    got = mass_weightedUnion([(y, w2), (x, w1)])
                    self.assertEqual(expected, list(got.items()))
                    # Test the intersection.
                    expected = []
                    for key in allkeys:
                        if x.has_key(key) and y.has_key(key):
                            result = x[key] * w1 + y[key] * w2
                            expected.append((key, result))
                    expected.sort()
                    got = mass_weightedIntersection([(x, w1), (y, w2)])
                    self.assertEqual(expected, list(got.items()))
                    got = mass_weightedIntersection([(y, w2), (x, w1)])
                    self.assertEqual(expected, list(got.items()))

    def testMany(self):
        import random
        N = 15  # number of IIBTrees to feed in
        L = []
        commonkey = N * 1000
        allkeys = {commonkey: 1}
        # Build N partially-overlapping trees with assorted weights;
        # commonkey appears in all of them.
        for i in range(N):
            t = IIBTree()
            t[commonkey] = i
            for j in range(N-i):
                key = i + j
                allkeys[key] = 1
                t[key] = N*i + j
            L.append((t, i+1))
        random.shuffle(L)
        allkeys = allkeys.keys()
        allkeys.sort()
        # Test the union.
        expected = []
        for key in allkeys:
            sum = 0
            for t, w in L:
                if t.has_key(key):
                    sum += t[key] * w
            expected.append((key, sum))
        # print 'union', expected
        got = mass_weightedUnion(L)
        self.assertEqual(expected, list(got.items()))
        # Test the intersection.
        expected = []
        for key in allkeys:
            sum = 0
            for t, w in L:
                if t.has_key(key):
                    sum += t[key] * w
                else:
                    break
            else:
                # We didn't break out of the loop so it's in the intersection.
                expected.append((key, sum))
        # print 'intersection', expected
        got = mass_weightedIntersection(L)
        self.assertEqual(expected, list(got.items()))
def test_suite():
    # Single test class; expose it in the standard Zope3 suite form.
    return makeSuite(TestSetOps)

if __name__=="__main__":
    main(defaultTest='test_suite')
=== Added File Zope3/src/zope/index/text/tests/test_textindexwrapper.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Unit tests for TextIndexWrapper.
$Id: test_textindexwrapper.py,v 1.1 2003/07/14 03:53:48 anthony Exp $
"""
import unittest
from zope.index.text.textindexwrapper import TextIndexWrapper
from zope.index.text import parsetree
class TextIndexWrapperTest(unittest.TestCase):
    """Tests for the TextIndexWrapper query and batching API."""

    def setUp(self):
        # Two canned documents under docids 1000 and 1001.
        w = TextIndexWrapper()
        doc = u"the quick brown fox jumps over the lazy dog"
        w.index_doc(1000, [doc])
        doc = u"the brown fox and the yellow fox don't need the retriever"
        w.index_doc(1001, [doc])
        self.wrapper = w

    def testCounts(self):
        w = self.wrapper
        self.assertEqual(self.wrapper.documentCount(), 2)
        self.assertEqual(self.wrapper.wordCount(), 12)
        # Adding a third document grows both counts accordingly.
        doc = u"foo bar"
        w.index_doc(1002, [doc])
        self.assertEqual(self.wrapper.documentCount(), 3)
        self.assertEqual(self.wrapper.wordCount(), 14)

    def testOne(self):
        # query() returns (matches, total) for the requested batch.
        matches, total = self.wrapper.query(u"quick fox", 0, 10)
        self.assertEqual(total, 1)
        [(docid, rank)] = matches # if this fails there's a problem
        self.assertEqual(docid, 1000)

    def testDefaultBatch(self):
        # An omitted count returns everything; a nonzero start skips.
        matches, total = self.wrapper.query(u"fox", 0)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)
        matches, total = self.wrapper.query(u"fox")
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)
        matches, total = self.wrapper.query(u" fox", 1)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 1)

    def testGlobbing(self):
        matches, total = self.wrapper.query("fo*")
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)

    def testLatin1(self):
        # Accented Latin-1 text must be indexable and searchable.
        w = self.wrapper
        doc = u"Fran\xe7ois"
        w.index_doc(1002, [doc])
        matches, total = self.wrapper.query(doc, 0, 10)
        self.assertEqual(total, 1)
        [(docid, rank)] = matches # if this fails there's a problem
        self.assertEqual(docid, 1002)

    def testUnicode(self):
        w = self.wrapper
        # Verbose, but easy to debug
        delta = u"\N{GREEK SMALL LETTER DELTA}"
        delta += u"\N{GREEK SMALL LETTER EPSILON}"
        delta += u"\N{GREEK SMALL LETTER LAMDA}"
        delta += u"\N{GREEK SMALL LETTER TAU}"
        delta += u"\N{GREEK SMALL LETTER ALPHA}"
        assert delta.islower()
        emdash = u"\N{EM DASH}"
        assert not emdash.isalnum()
        alpha = u"\N{GREEK SMALL LETTER ALPHA}"
        assert alpha.islower()
        lamda = u"\N{GREEK SMALL LETTER LAMDA}"
        lamda += u"\N{GREEK SMALL LETTER ALPHA}"
        assert lamda.islower()
        # Index one document of two Greek words joined by an em dash;
        # each word must be individually searchable.
        doc = delta + emdash + alpha
        w.index_doc(1002, [doc])
        for word in delta, alpha:
            matches, total = self.wrapper.query(word, 0, 10)
            self.assertEqual(total, 1)
            [(docid, rank)] = matches # if this fails there's a problem
            self.assertEqual(docid, 1002)
        # A query consisting only of punctuation is a parse error.
        self.assertRaises(parsetree.ParseError,
                          self.wrapper.query, emdash, 0, 10)
        # The concatenation "lamda+alpha" was never indexed as a word.
        matches, total = self.wrapper.query(lamda, 0, 10)
        self.assertEqual(total, 0)

    def testNone(self):
        matches, total = self.wrapper.query(u"dalmatian", 0, 10)
        self.assertEqual(total, 0)
        self.assertEqual(len(matches), 0)

    def testAll(self):
        matches, total = self.wrapper.query(u"brown fox", 0, 10)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)
        matches.sort()
        self.assertEqual(matches[0][0], 1000)
        self.assertEqual(matches[1][0], 1001)

    def testBatching(self):
        # Two batches of one must jointly cover the full result set.
        matches1, total = self.wrapper.query(u"brown fox", 0, 1)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches1), 1)
        matches2, total = self.wrapper.query(u"brown fox", 1, 1)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches2), 1)
        matches = matches1 + matches2
        matches.sort()
        self.assertEqual(matches[0][0], 1000)
        self.assertEqual(matches[1][0], 1001)
def test_suite():
    # Single test class; expose it in the standard Zope3 suite form.
    return unittest.makeSuite(TextIndexWrapperTest)

if __name__=='__main__':
    unittest.main(defaultTest='test_suite')
=== Added File Zope3/src/zope/index/text/tests/wordstats.py ===
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Dump statistics about each word in the index.
usage: wordstats.py data.fs [index key]
"""
from zodb.storage.file import FileStorage
def main(fspath, key):
    # Dump per-word and per-document statistics for the text index
    # stored under `key` in the root of the FileStorage at `fspath`.
    fs = FileStorage(fspath, read_only=1)
    # NOTE(review): `ZODB` is never imported in this module (only
    # zodb.storage.file.FileStorage is); the next line raises
    # NameError.  Looks like a leftover from the Zope2 version --
    # confirm the intended zodb4 DB constructor.
    db = ZODB.DB(fs)
    rt = db.open().root()
    index = rt[key]
    lex = index.lexicon
    idx = index.index
    print "Words", lex.length()
    print "Documents", idx.length()
    print "Word frequencies: count, word, wid"
    for word, wid in lex.items():
        docs = idx._wordinfo[wid]
        print len(docs), word, wid
    print "Per-doc scores: wid, (doc, score,)+"
    for wid in lex.wids():
        print wid,
        docs = idx._wordinfo[wid]
        # Trailing commas keep all of a wid's scores on one line.
        for docid, score in docs.items():
            print docid, score,
        print
if __name__ == "__main__":
import sys
args = sys.argv[1:]
index_key = "index"
if len(args) == 1:
fspath = args[0]
elif len(args) == 2:
fspath, index_key = args
else:
print "Expected 1 or 2 args, got", len(args)
main(fspath, index_key)