[Zope3-checkins] CVS: Zope3/src/zope/textindex/tests - __init__.py:1.1.2.1 hs-tool.py:1.1.2.1 indexhtml.py:1.1.2.1 mailtest.py:1.1.2.1 mhindex.py:1.1.2.1 queryhtml.py:1.1.2.1 test_index.py:1.1.2.1 test_lexicon.py:1.1.2.1 test_nbest.py:1.1.2.1 test_pipelinefactory.py:1.1.2.1 test_queryengine.py:1.1.2.1 test_queryparser.py:1.1.2.1 test_setops.py:1.1.2.1 test_textindexwrapper.py:1.1.2.1 wordstats.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:33:39 -0500


Update of /cvs-repository/Zope3/src/zope/textindex/tests
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/textindex/tests

Added Files:
      Tag: NameGeddon-branch
	__init__.py hs-tool.py indexhtml.py mailtest.py mhindex.py 
	queryhtml.py test_index.py test_lexicon.py test_nbest.py 
	test_pipelinefactory.py test_queryengine.py 
	test_queryparser.py test_setops.py test_textindexwrapper.py 
	wordstats.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zope/textindex/tests/__init__.py ===
#
# This file is necessary to make this directory a package.


=== Added File Zope3/src/zope/textindex/tests/hs-tool.py ===
#! /usr/bin/env python

import cPickle
import os.path
import sys

from hotshot.log import LogReader

def load_line_info(log):
    """Accumulate time and hit counts per location from a profile log.

    `log` yields (what, place, tdelta) triples.  Each positive tdelta is
    charged to the location remembered from the previous positive-tdelta
    event (None for the very first one).  Returns a dict mapping
    location -> (total tdelta, number of hits).
    """
    totals = {}
    previous = None
    for event in log:
        what, place, tdelta = event
        if not tdelta > 0:
            # Events without a positive delta neither add time nor move
            # the remembered location.
            continue
        elapsed, hits = totals.get(previous, (0, 0))
        totals[previous] = (tdelta + elapsed, hits + 1)
        previous = place
    return totals

def basename(path, cache={}):
    """Return the final component of `path`, memoizing the answer.

    The mutable default `cache` is intentional: it is the memo table
    shared by all calls that do not pass their own dictionary.
    """
    if path in cache:
        return cache[path]
    name = os.path.basename(path)
    cache[path] = name
    return name

def print_results(results):
    for info, place in results:
        if place is None:
            # This is the startup time for the profiler, and only
            # occurs at the very beginning.  Just ignore it, since it
            # corresponds to frame setup of the outermost call, not
            # anything that's actually interesting.
            continue
        filename, line, funcname = place
        print '%8d %8d' % info, basename(filename), line

def annotate_results(results):
    """Group the results by source file and annotate each existing file.

    `results` holds ((time, hits), place) pairs where place is either
    false or a (file, line, func) triple.  Files are handled in sorted
    order; files that no longer exist on disk are skipped.
    """
    per_file = {}
    for stats, place in results:
        if place:
            elapsed, count = stats
            path, lineno, funcname = place
            per_file.setdefault(path, []).append((lineno, count, elapsed))
    paths = per_file.keys()
    paths.sort()
    for path in paths:
        if not os.path.exists(path):
            continue
        entries = per_file[path]
        # Sorting orders the entries by line number, which annotate()
        # relies on when walking the file top to bottom.
        entries.sort()
        annotate(path, entries)

def annotate(file, lines):
    print "-" * 60
    print file
    print "-" * 60
    f = open(file)
    i = 1
    match = lines[0][0]
    for line in f:
        if match == i:
            print "%6d %8d " % lines[0][1:], line,
            del lines[0]
            if lines:
                match = lines[0][0]
            else:
                match = None
        else:
            print " " * 16, line,
        i += 1
    print

def get_cache_name(filename):
    """Return (cache directory, cache file path) for a profile file.

    The cache directory is a '.hs-tool' directory next to `filename`;
    the cache file has the same base name as `filename` inside it.
    """
    cache_dir = os.path.join(os.path.dirname(filename), '.hs-tool')
    cache_file = os.path.join(cache_dir, os.path.basename(filename))
    return cache_dir, cache_file

def cache_results(filename, results):
    """Pickle `results` into the cache file belonging to `filename`,
    creating the cache directory first if it does not exist."""
    directory, target = get_cache_name(filename)
    if not os.path.exists(directory):
        os.mkdir(directory)
    out = open(target, 'wb')
    try:
        # Protocol 1 is the binary pickle format.
        pickler = cPickle.Pickler(out, 1)
        pickler.dump(results)
    finally:
        out.close()

def main(filename, annotate):
    """Report the line profile stored in the hotshot log `filename`.

    The processed results are cached next to the log.  A cache file that
    is newer than the log is reused instead of re-reading the log.  If
    `annotate` is true the source files are printed with annotations,
    otherwise a plain per-line summary is printed.
    """
    cache_dir, cache_file = get_cache_name(filename)

    if (  os.path.isfile(cache_file)
          and os.path.getmtime(cache_file) > os.path.getmtime(filename)):
        # cached data is up-to-date:
        # NOTE: unpickling trusts the cache file; that is only reasonable
        # because this tool wrote it itself in cache_results().
        fp = open(cache_file, 'rb')
        try:
            results = cPickle.load(fp)
        finally:
            # Close the cache file even when unpickling fails.
            fp.close()
    else:
        log = LogReader(filename)
        byline = load_line_info(log)
        # Sort
        results = [(v, k) for k, v in byline.items()]
        results.sort()
        cache_results(filename, results)

    if annotate:
        annotate_results(results)
    else:
        print_results(results)


if __name__ == "__main__":
    # Command line: hs-tool.py [-A] [profile-file]
    #   -A  print annotated source instead of the per-line summary
    import getopt

    opts, args = getopt.getopt(sys.argv[1:], 'A')
    annotate_p = 0
    for o, v in opts:
        if o == '-A':
            annotate_p = 1
    if not args:
        filename = "profile.dat"
    else:
        # Exactly one positional argument is accepted; more raise here.
        filename, = args

    main(filename, annotate_p)


=== Added File Zope3/src/zope/textindex/tests/indexhtml.py ===
#! /usr/bin/env python
"""Index a collection of HTML files on the filesystem.

usage: indexhtml.py [options] dir

Will create an index of all files in dir or its subdirectories.

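options:
-f data.fs  -- the path to the filestorage datafile (default Data.fs)
-v          -- verbose; print progress and timing information
-t N        -- commit a transaction every N documents (default 100)
-p N        -- pack the database every N documents (default 500)
-n N        -- index at most N documents (default: no limit)
-T          -- build an old-style TextIndex instead of a ZCTextIndex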
"""
from __future__ import nested_scopes

import os
from time import clock

import zodb
from zodb.storage.file import FileStorage
from BTrees.IOBTree import IOBTree

from Zope.TextIndex.ZCTextIndex import ZCTextIndex
from zope.textindex.htmlsplitter import HTMLWordSplitter
from zope.textindex.lexicon import Lexicon, StopWordRemover

def make_zc_index():
    """Build a ZCTextIndex that indexes the "read" attribute of documents."""
    # ZCTextIndex takes two attribute bags: `extra` carries the
    # configuration and `caller` carries the lexicon under the name
    # given by extra.lexicon_id.
    class Struct:
        pass
    caller = Struct()
    caller.lexicon = Lexicon(HTMLWordSplitter(), StopWordRemover())
    extra = Struct()
    extra.lexicon_id = "lexicon"
    extra.doc_attr = "read"
    return ZCTextIndex("read", extra, caller)

# XXX make a splitter more like the HTMLSplitter for TextIndex
# signature is
# Splitter(string, stop_words, encoding,
#          singlechar, indexnumbers, casefolding)

class MySplitter:
    """Splitter callable for the old TextIndex lexicon, built on
    HTMLWordSplitter."""

    def __init__(self):
        self._v_splitter = HTMLWordSplitter()

    def __call__(self, text, stopdict, *args, **kwargs):
        """Split `text` into words and run each through `stopdict`.

        A word found in `stopdict` is replaced by its mapped value;
        words whose result is false are dropped.  Extra positional and
        keyword arguments are accepted and ignored.
        """
        kept = []
        for word in self._v_splitter._split(text):
            mapped = stopdict.get(word, word)
            if mapped:
                kept.append(mapped)
        return kept

def make_old_index():
    """Build an old-style TextIndex on the "read" attribute, using the
    stop word dictionary and MySplitter as its splitter."""
    # Imported locally so the old Products package is only needed when
    # this index type is requested.  This Lexicon deliberately shadows
    # the module-level one inside this function.
    from Products.PluginIndexes.TextIndex.TextIndex import TextIndex
    from Products.PluginIndexes.TextIndex.Lexicon  import Lexicon
    from zope.textindex.stopdict import get_stopdict

    old_lexicon = Lexicon(get_stopdict())
    old_lexicon.SplitterFunc = MySplitter()
    return TextIndex("read", lexicon=old_lexicon)

def main(db, root, dir):
    rt["index"] = index = INDEX()
    rt["files"] = paths = IOBTree()
    get_transaction().commit()

    zodb_time = 0.0
    pack_time = 0.0

    files = [os.path.join(dir, file) for file in os.listdir(dir)]
    docid = 0
    t0 = clock()
    for file in files:
        if os.path.isdir(file):
            files += [os.path.join(file, sub) for sub in os.listdir(file)]
        else:
            if not file.endswith(".html"):
                continue
            docid += 1
            if LIMIT is not None and docid > LIMIT:
                break
            if VERBOSE:
                print "%5d" % docid, file
            f = open(file, "rb")
            paths[docid] = file
            index.index_object(docid, f)
            f.close()
            if docid % TXN_INTERVAL == 0:
                z0 = clock()
                get_transaction().commit()
                z1 = clock()
                zodb_time += z1 - z0
                if VERBOSE:
                    print "commit took", z1 - z0, zodb_time
            if docid % PACK_INTERVAL == 0:
                p0 = clock()
                db.pack()
                p1 = clock()
                zodb_time += p1 - p0
                pack_time += p1 - p0
                if VERBOSE:
                    print "pack took", p1 - p0, pack_time
    z0 = clock()
    get_transaction().commit()
    z1 = t1 = clock()
    total_time = t1 - t0
    zodb_time += z1 - z0
    if VERBOSE:
        print "Total index time", total_time
        print "Non-pack time", total_time - pack_time
        print "Non-ZODB time", total_time - zodb_time

if __name__ == "__main__":
    # Command-line driver: parse the options into the module globals that
    # main() reads, open the FileStorage and index the given directory.
    import sys
    import getopt
    # Only `zodb` is imported at module level, so the old `ZODB.DB`
    # spelling was an undefined name.
    from zodb.db import DB

    VERBOSE = 0
    FSPATH = "Data.fs"
    TXN_INTERVAL = 100
    PACK_INTERVAL = 500
    LIMIT = None
    INDEX = make_zc_index
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vf:t:p:n:T')
    except getopt.error, msg:
        print msg
        print __doc__
        sys.exit(2)

    for o, v in opts:
        if o == '-v':
            VERBOSE += 1
        if o == '-f':
            FSPATH = v
        if o == '-t':
            TXN_INTERVAL = int(v)
        if o == '-p':
            PACK_INTERVAL = int(v)
        if o == '-n':
            LIMIT = int(v)
        if o == '-T':
            INDEX = make_old_index

    if len(args) != 1:
        print "Expected one argument"
        print __doc__
        sys.exit(2)
    dir = args[0]

    fs = FileStorage(FSPATH)
    db = DB(fs)
    cn = db.open()
    rt = cn.root()
    # Store absolute paths in the index regardless of the current directory.
    dir = os.path.join(os.getcwd(), dir)
    print dir
    main(db, rt, dir)
    cn.close()
    fs.close()


=== Added File Zope3/src/zope/textindex/tests/mailtest.py ===
"""Test an index with a Unix mailbox file.

usage: python mailtest.py [options] <data.fs>

options:
    -v     -- verbose

    Index Generation
    -i mailbox
    -n NNN -- max number of messages to read from mailbox
    -t NNN -- commit a transaction every NNN messages (default: 1)
    -p NNN -- pack <data.fs> every NNN messages (default: 500), and at end
    -p 0   -- don't pack at all
    -x     -- exclude the message text from the data.fs

    Queries
    -q query
    -b NNN -- return the NNN best matches (default: 10)
    -c NNN -- context; if -v, show the first NNN lines of results (default: 5)

The script either indexes or queries depending on whether -q or -i is
passed as an option.

For -i mailbox, the script reads mail messages from the mailbox and
indexes them.  It indexes one message at a time, then commits the
transaction.

For -q query, it performs a query on an existing index.

If both are specified, indexing is performed first.

You can also interact with the index after it is completed. Load the
index from the database:

    from zodb.db import DB
    from zodb.storage.file import FileStorage
    fs = FileStorage(<data.fs>)
    db = DB(fs)
    index = db.open().root()["index"]
    index.search("python AND unicode")
"""

import zodb
import zodb.storage.file
from zope.textindex.lexicon import \
     Lexicon, CaseNormalizer, Splitter, StopWordRemover
from Zope.TextIndex.ZCTextIndex import ZCTextIndex
from BTrees.IOBTree import IOBTree
from zope.textindex.queryparser import QueryParser

import sys
import mailbox
import time

def usage(msg):
    print msg
    print __doc__
    sys.exit(2)

class Message:
    """Indexable text of one mail message: a summary line followed by
    the rest of the message as read from its file object."""

    # Running total of the text length over all Message instances.
    total_bytes = 0

    def __init__(self, msg):
        """Build the text from `msg`, which must provide
        getheader(name, default) and a readable `fp` attribute."""
        subject = msg.getheader('subject', '')
        sender = msg.getheader('from', '')
        if not sender:
            header = "%s\n" % subject
        else:
            header = "%s (%s)\n" % (subject, sender)
        self.text = header + msg.fp.read()
        Message.total_bytes = Message.total_bytes + len(self.text)

class Extra:
    """Empty attribute holder; index() fills instances with the settings
    it passes to ZCTextIndex."""

def index(rt, mboxfile, db, profiler):
    global NUM
    idx_time = 0
    pack_time = 0
    start_time = time.time()

    lexicon = Lexicon(Splitter(), CaseNormalizer(), StopWordRemover())
    extra = Extra()
    extra.lexicon_id = 'lexicon'
    extra.doc_attr = 'text'
    extra.index_type = 'Okapi BM25 Rank'
    caller = Extra()
    caller.lexicon = lexicon
    rt["index"] = idx = ZCTextIndex("index", extra, caller)
    if not EXCLUDE_TEXT:
        rt["documents"] = docs = IOBTree()
    else:
        docs = None
    get_transaction().commit()

    mbox = mailbox.UnixMailbox(open(mboxfile, 'rb'))
    if VERBOSE:
        print "opened", mboxfile
    if not NUM:
        NUM = sys.maxint

    if profiler:
        itime, ptime, i = profiler.runcall(indexmbox, mbox, idx, docs, db)
    else:
        itime, ptime, i = indexmbox(mbox, idx, docs, db)
    idx_time += itime
    pack_time += ptime

    get_transaction().commit()

    if PACK_INTERVAL and i % PACK_INTERVAL != 0:
        if VERBOSE >= 2:
            print "packing one last time..."
        p0 = time.clock()
        db.pack(time.time())
        p1 = time.clock()
        if VERBOSE:
            print "pack took %s sec" % (p1 - p0)
        pack_time += p1 - p0

    if VERBOSE:
        finish_time = time.time()
        print
        print "Index time", round(idx_time / 60, 3), "minutes"
        print "Pack time", round(pack_time / 60, 3), "minutes"
        print "Index bytes", Message.total_bytes
        rate = (Message.total_bytes / idx_time) / 1024
        print "Index rate %.2f KB/sec" % rate
        print "Indexing began", time.ctime(start_time)
        print "Indexing ended", time.ctime(finish_time)
        print "Wall clock minutes", round((finish_time - start_time)/60, 3)

def indexmbox(mbox, idx, docs, db):
    idx_time = 0
    pack_time = 0
    i = 0
    while i < NUM:
        _msg = mbox.next()
        if _msg is None:
            break
        i += 1
        msg = Message(_msg)
        if VERBOSE >= 2:
            print "indexing msg", i
        i0 = time.clock()
        idx.index_object(i, msg)
        if not EXCLUDE_TEXT:
            docs[i] = msg
        if i % TXN_SIZE == 0:
            get_transaction().commit()
        i1 = time.clock()
        idx_time += i1 - i0
        if VERBOSE and i % 50 == 0:
            print i, "messages indexed"
            print "cache size", db.cacheSize()
        if PACK_INTERVAL and i % PACK_INTERVAL == 0:
            if VERBOSE >= 2:
                print "packing..."
            p0 = time.clock()
            db.pack(time.time())
            p1 = time.clock()
            if VERBOSE:
                print "pack took %s sec" % (p1 - p0)
            pack_time += p1 - p0
    return idx_time, pack_time, i


def query(rt, query_str, profiler):
    idx = rt["index"]
    docs = rt["documents"]

    start = time.clock()
    if profiler is None:
        results, num_results = idx.query(query_str, BEST)
    else:
        if WARM_CACHE:
            print "Warming the cache..."
            idx.query(query_str, BEST)
        start = time.clock()
        results, num_results = profiler.runcall(idx.query, query_str, BEST)
    elapsed = time.clock() - start

    print "query:", query_str
    print "# results:", len(results), "of", num_results, \
          "in %.2f ms" % (elapsed * 1000)

    tree = QueryParser(idx.lexicon).parseQuery(query_str)
    qw = idx.index.query_weight(tree.terms())

    for docid, score in results:
        scaled = 100.0 * score / qw
        print "docid %7d score %6d scaled %5.2f%%" % (docid, score, scaled)
        if VERBOSE:
            msg = docs[docid]
            ctx = msg.text.split("\n", CONTEXT)
            del ctx[-1]
            print "-" * 60
            print "message:"
            for l in ctx:
                print l
            print "-" * 60


def main(fs_path, mbox_path, query_str, profiler):
    """Open the FileStorage at `fs_path`, then index `mbox_path` and/or
    run `query_str`, whichever are not None (indexing first).

    The connection, database and storage are closed afterwards, even if
    indexing or querying raises.
    """
    # The module imports `zodb`, not `ZODB`; the old `ZODB.FileStorage`
    # and `ZODB.DB` spellings were undefined names.
    from zodb.db import DB

    f = zodb.storage.file.FileStorage(fs_path)
    db = DB(f, cache_size=CACHE_SIZE)
    cn = db.open()
    try:
        rt = cn.root()

        if mbox_path is not None:
            index(rt, mbox_path, db, profiler)
        if query_str is not None:
            query(rt, query_str, profiler)
    finally:
        cn.close()
        db.close()
        f.close()

if __name__ == "__main__":
    # Command-line driver: parse the options into the module globals the
    # functions above read, set up the requested profiler and run main().
    import getopt

    NUM = 0
    VERBOSE = 0
    PACK_INTERVAL = 500
    EXCLUDE_TEXT = 0
    CACHE_SIZE = 10000
    TXN_SIZE = 1
    BEST = 10
    CONTEXT = 5
    WARM_CACHE = 0
    query_str = None
    mbox_path = None
    profile = None
    old_profile = None
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vn:p:i:q:b:c:xt:w',
                                   ['profile=', 'old-profile='])
    except getopt.error, msg:
        usage(msg)
    if len(args) != 1:
        usage("exactly 1 filename argument required")
    for o, v in opts:
        if o == '-n':
            NUM = int(v)
        elif o == '-v':
            VERBOSE += 1
        elif o == '-p':
            PACK_INTERVAL = int(v)
        elif o == '-q':
            query_str = v
        elif o == '-i':
            mbox_path = v
        elif o == '-b':
            BEST = int(v)
        elif o == '-x':
            EXCLUDE_TEXT = 1
        elif o == '-t':
            TXN_SIZE = int(v)
        elif o == '-c':
            CONTEXT = int(v)
        elif o == '-w':
            WARM_CACHE = 1
        elif o == '--profile':
            profile = v
        elif o == '--old-profile':
            old_profile = v
    fs_path, = args

    if profile:
        import hotshot
        profiler = hotshot.Profile(profile, lineevents=1, linetimings=1)
    elif old_profile:
        # Import under an alias.  A plain "import profile" would rebind
        # the `profile` option variable to the module, making the
        # "if profile:" test below true and calling close() on a
        # profile.Profile.
        import profile as profile_module
        profiler = profile_module.Profile()
    else:
        profiler = None

    main(fs_path, mbox_path, query_str, profiler)

    if profile:
        profiler.close()
    elif old_profile:
        import pstats
        profiler.dump_stats(old_profile)
        stats = pstats.Stats(old_profile)
        stats.strip_dirs().sort_stats('time').print_stats(20)


=== Added File Zope3/src/zope/textindex/tests/mhindex.py === (475/575 lines abridged)
#! /usr/bin/env python2.2

"""MH mail indexer.

To index messages from a single folder (messages defaults to 'all'):
  mhindex.py [options] -u +folder [messages ...]

To bulk index all messages from several folders:
  mhindex.py [options] -b folder ...; the folder name ALL means all folders.

To execute a single query:
  mhindex.py [options] query

To enter interactive query mode:
  mhindex.py [options]

Common options:
  -d FILE -- specify the Data.fs to use (default ~/.mhindex.fs)
  -w -- dump the word list in alphabetical order and exit
  -W -- dump the word list ordered by word id and exit

Indexing options:
  -O -- do a prescan on the data to compute optimal word id assignments;
        this is only useful the first time the Data.fs is used
  -t N -- commit a transaction after every N messages (default 20000)
  -p N -- pack after every N commits (by default no packing is done)

Querying options:
  -m N -- show at most N matching lines from the message (default 3)
  -n N -- show the N best matching messages (default 3)
"""

import os
import re
import sys
import time
import mhlib
import getopt
import traceback
from StringIO import StringIO
from stat import ST_MTIME

# Path of the FileStorage data file.  Presumably the default for -d; the
# code that uses it is not shown here -- TODO confirm.
DATAFS = "~/.mhindex.fs"
# Location of a Zope 3 source tree, relative to the user's home directory.
ZOPECODE = "~/projects/Zope3/lib/python"

# Put that tree first on sys.path so the zodb imports that follow resolve
# to it.
zopecode = os.path.expanduser(ZOPECODE)
sys.path.insert(0, zopecode)

from zodb.db import DB
from zodb.storage.file import FileStorage

[-=- -=- -=- 475 lines omitted -=- -=- -=-]

            if value:
                H.append(value)
        if H:
            L.append("\n".join(H))

    def newdocid(self, path):
        """Return the document id for `path`, allocating one if needed.

        A known path keeps its id and only has its stored modification
        time refreshed.  A new path gets the next id after maxdocid and
        is entered in docpaths, doctimes and path2docid.
        """
        docid = self.path2docid.get(path)
        if docid is None:
            docid = self.maxdocid + 1
            self.maxdocid = docid
            self.docpaths[docid] = path
            self.doctimes[docid] = self.getmtime(path)
            self.path2docid[path] = docid
        else:
            self.doctimes[docid] = self.getmtime(path)
        return docid

    def getmtime(self, path):
        """Return the modification time of `path` as an integer, or 0 if
        the file cannot be stat'ed.

        `path` is taken relative to the directory returned by
        self.mh.getpath().
        """
        path = os.path.join(self.mh.getpath(), path)
        try:
            st = os.stat(path)
        except os.error:
            # A missing or unreadable file reports time 0; the exception
            # value itself is not needed.
            return 0
        return int(st[ST_MTIME])

    def maycommit(self):
        """Count one more change and commit once trans_limit changes have
        accumulated.  A trans_limit of 0 or less never triggers a commit
        from here."""
        self.trans_count = self.trans_count + 1
        limit = self.trans_limit
        if limit > 0 and self.trans_count >= limit:
            self.commit()

    def commit(self):
        if self.trans_count > 0:
            print "committing..."
            get_transaction().commit()
            self.trans_count = 0
            self.pack_count += 1
            if self.pack_count >= self.pack_limit > 0:
                self.pack()

    def pack(self):
        if self.pack_count > 0:
            print "packing..."
            self.database.pack()
            self.pack_count = 0

def reportexc():
    """Print the traceback of the exception currently being handled."""
    traceback.print_exc()

if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    status = main()
    sys.exit(status)


=== Added File Zope3/src/zope/textindex/tests/queryhtml.py ===
import os
from time import clock

import zodb
from zodb.storage.file import FileStorage

# Benchmark queries.  main() joins the words of each one with " OR " and
# with " AND ".
QUERIES = [
    "nested recursive functions",
    "explicit better than implicit",
    "build hpux",
    "cannot create 'method-wrapper' instances",
    "extension module C++",
    "class method",
    "instance variable",
    "articulate information",
    "import default files",
    "gopher ftp http",
    "documentation",
]

def path2url(p):
    """Convert an indexed file path to a python.org URL.

    Hack: only works for the way Jeremy indexed his copy of python.org.
    Everything after the first "www.python.org/." in the path is appended
    to "http://www.python.org".  Paths without that marker are returned
    unchanged.
    """
    marker = "www.python.org/."
    pos = p.find(marker)
    if pos != -1:
        return "http://www.python.org" + p[pos + len(marker):]
    return p

from Products.PluginIndexes.TextIndex.TextIndex import And, Or
from zope.textindex.tests.indexhtml import MySplitter
from zope.textindex.nbest import NBest

def main(rt):
    index = rt["index"]
    files = rt["files"]
    times = {}
    ITERS = range(50)
    for i in range(11):
        for q in QUERIES:
            terms = q.split()
            for c in " OR ", " AND ":
                query = c.join(terms)
                t0 = clock()
                if TEXTINDEX:
                    if c == " OR ":
                        op = Or
                    else:
                        op = And
                    _q = " ".join(terms)
                    for _ in ITERS:
                        b = index.query(_q, op).bucket()
                        num = len(b)
                        chooser = NBest(10)
                        chooser.addmany(b.items())
                        results = chooser.getbest()

                else:
                    try:
                        for _ in ITERS:
                            results, num = index.query(query)
                    except:
                        continue
                t1 = clock()
                print "<p>Query: \"%s\"" % query
                print "<br>Num results: %d" % num
                print "<br>time.clock(): %s" % (t1 - t0)
                key = query
                if i == 0:
                    print "<ol>"
                    for docid, score in results:
                        url = path2url(files[docid])
                        fmt = '<li><a href="%s">%s</A> score = %s'
                        print fmt % (url, url, score)
                    print "</ol>"
                    continue
                l = times.setdefault(key, [])
                l.append(t1 - t0)

    l = times.keys()
    l.sort()
    print "<hr>"
    for k in l:
        v = times[k]
        print "<p>Query: \"%s\"" % k
        print "<br>Min time: %s" % min(v)
        print "<br>All times: %s" % " ".join(map(str, v))

if __name__ == "__main__":
    # Command-line driver: parse the options, open the FileStorage
    # read-only and run the benchmark.
    import sys
    import getopt
    # Only `zodb` is imported at module level, so the old `ZODB.DB`
    # spelling was an undefined name.
    from zodb.db import DB

    VERBOSE = 0
    FSPATH = "Data.fs"
    TEXTINDEX = 0

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vf:T')
    except getopt.error, msg:
        print msg
        # This module has no docstring, so printing __doc__ would just
        # show "None"; give an explicit usage line instead.
        print "usage: queryhtml.py [-v] [-f data.fs] [-T]"
        sys.exit(2)

    for o, v in opts:
        if o == '-v':
            VERBOSE += 1
        if o == '-f':
            FSPATH = v
        if o == '-T':
            TEXTINDEX = 1

    fs = FileStorage(FSPATH, read_only=1)
    db = DB(fs, cache_size=10000)
    cn = db.open()
    rt = cn.root()
    main(rt)


=== Added File Zope3/src/zope/textindex/tests/test_index.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, TestSuite, main, makeSuite

from zope.textindex.lexicon import Lexicon, Splitter
from zope.textindex.cosineindex import CosineIndex
from zope.textindex.okapiindex import OkapiIndex

# Subclasses must set a class variable IndexFactory to the appropriate
# index object constructor.

class IndexTest(TestCase):
    """Test cases shared by the text index implementations.

    Subclasses set the class attribute IndexFactory to the index
    constructor under test; it is called with a Lexicon.
    """
    # Test methods are documented with comments rather than docstrings so
    # that the descriptions shown by the test runner do not change.

    def setUp(self):
        # Every test starts with a fresh lexicon and an empty index.
        self.lexicon = Lexicon(Splitter())
        self.index = self.IndexFactory(self.lexicon)

    def test_index_document(self, DOCID=1):
        # Index one five-word document and check all the bookkeeping.
        # Also used by other tests as a setup step.
        doc = "simple document contains five words"
        self.assert_(not self.index.has_doc(DOCID))
        self.index.index_doc(DOCID, doc)
        self.assertEqual(self.index.documentCount(), 1)
        self.assertEqual(self.index.wordCount(), 5)
        self.assertEqual(self.lexicon.wordCount(), 5)
        self.assert_(self.index.has_doc(DOCID))
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._docweight), 1)
        self.assertEqual(len(self.index._wordinfo), 5)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 5)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        for docmap in self.index._wordinfo.values():
            self.assertEqual(len(docmap), 1)
            self.assert_(docmap.has_key(DOCID))

    def test_unindex_document(self):
        # Unindexing the only document leaves the index empty.
        DOCID = 1
        self.test_index_document(DOCID)
        self.index.unindex_doc(DOCID)
        self.assertEqual(len(self.index._docweight), 0)
        self.assertEqual(len(self.index._wordinfo), 0)
        self.assertEqual(len(self.index._docwords), 0)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())

    def test_index_two_documents(self):
        # The two documents share only the word "document", so there are
        # 5 + 4 - 1 = 8 distinct words.
        self.test_index_document()
        doc = "another document just four"
        DOCID = 2
        self.index.index_doc(DOCID, doc)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._docweight), 2)
        self.assertEqual(len(self.index._wordinfo), 8)
        self.assertEqual(len(self.index._docwords), 2)
        self.assertEqual(len(self.index.get_words(DOCID)), 4)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        wids = self.lexicon.termToWordIds("document")
        self.assertEqual(len(wids), 1)
        document_wid = wids[0]
        for wid, docmap in self.index._wordinfo.items():
            if wid == document_wid:
                self.assertEqual(len(docmap), 2)
                self.assert_(docmap.has_key(1))
                self.assert_(docmap.has_key(DOCID))
            else:
                self.assertEqual(len(docmap), 1)

    def test_index_two_unindex_one(self):
        # index two documents, unindex one, and test the results
        self.test_index_two_documents()
        self.index.unindex_doc(1)
        DOCID = 2
        self.assertEqual(len(self.index._docweight), 1)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._wordinfo), 4)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 4)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        for docmap in self.index._wordinfo.values():
            self.assertEqual(len(docmap), 1)
            self.assert_(docmap.has_key(DOCID))

    def test_index_duplicated_words(self, DOCID=1):
        # Seven words, five of them distinct ("repeat" occurs 3 times).
        doc = "very simple repeat repeat repeat document test"
        self.index.index_doc(DOCID, doc)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._wordinfo), 5)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 7)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        wids = self.lexicon.termToWordIds("repeat")
        self.assertEqual(len(wids), 1)
        for docmap in self.index._wordinfo.values():
            self.assertEqual(len(docmap), 1)
            self.assert_(docmap.has_key(DOCID))

    def test_simple_query_oneresult(self):
        self.index.index_doc(1, 'not the same document')
        results = self.index.search("document")
        self.assertEqual(list(results.keys()), [1])

    def test_simple_query_noresults(self):
        self.index.index_doc(1, 'not the same document')
        results = self.index.search("frobnicate")
        self.assertEqual(list(results.keys()), [])

    def test_query_oneresult(self):
        self.index.index_doc(1, 'not the same document')
        self.index.index_doc(2, 'something about something else')
        results = self.index.search("document")
        self.assertEqual(list(results.keys()), [1])

    def test_search_phrase(self):
        # Both documents contain all three words, but only the first has
        # them adjacent and in order.
        self.index.index_doc(1, "the quick brown fox jumps over the lazy dog")
        self.index.index_doc(2, "the quick fox jumps lazy over the brown dog")
        results = self.index.search_phrase("quick brown fox")
        self.assertEqual(list(results.keys()), [1])

    def test_search_glob(self):
        self.index.index_doc(1, "how now brown cow")
        self.index.index_doc(2, "hough nough browne cough")
        self.index.index_doc(3, "bar brawl")
        results = self.index.search_glob("bro*")
        self.assertEqual(list(results.keys()), [1, 2])
        results = self.index.search_glob("b*")
        self.assertEqual(list(results.keys()), [1, 2, 3])

class CosineIndexTest(IndexTest):
    """Run the shared index tests against CosineIndex."""

    IndexFactory = CosineIndex

class OkapiIndexTest(IndexTest):
    """Run the shared index tests against OkapiIndex."""

    IndexFactory = OkapiIndex

def test_suite():
    """Return a suite holding the tests of both concrete index classes."""
    suite = TestSuite()
    for case in (CosineIndexTest, OkapiIndexTest):
        suite.addTest(makeSuite(case))
    return suite

if __name__ == '__main__':
    # Run the suite built by test_suite() when executed as a script.
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/textindex/tests/test_lexicon.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

import sys
from unittest import TestCase, TestSuite, main, makeSuite

from zope.textindex.lexicon import Lexicon
from zope.textindex.lexicon import Splitter, CaseNormalizer

class StupidPipelineElement:
    """Pipeline element that replaces one fixed word with another."""

    def __init__(self, fromword, toword):
        self.__fromword = fromword
        self.__toword = toword

    def process(self, seq):
        """Return a new list equal to `seq` with every term equal to
        fromword replaced by toword."""
        target = self.__fromword
        replacement = self.__toword
        out = list(seq)
        for pos in range(len(out)):
            if out[pos] == target:
                out[pos] = replacement
        return out

class WackyReversePipelineElement:
    """Pipeline element that spells one fixed word backwards."""

    def __init__(self, revword):
        self.__revword = revword

    def process(self, seq):
        """Return a new list equal to `seq` with every term equal to
        revword reversed character by character."""
        target = self.__revword
        out = []
        for term in seq:
            if term == target:
                chars = list(term)
                chars.reverse()
                term = ''.join(chars)
            out.append(term)
        return out

class StopWordPipelineElement:
    """Pipeline element that drops stop words."""

    def __init__(self, stopdict=None):
        """`stopdict` maps terms to a true value to mark them as stop
        words.  When omitted, each instance gets its own empty dict, so
        instances never share one mutable default."""
        if stopdict is None:
            stopdict = {}
        self.__stopdict = stopdict

    def process(self, seq):
        """Return the terms of `seq` that are not stop words, in order."""
        lookup = self.__stopdict.get
        return [term for term in seq if not lookup(term)]


class Test(TestCase):
    """Exercise Lexicon word-id assignment with various splitter pipelines.

    Comments are used instead of method docstrings so that unittest keeps
    reporting the tests by method name.
    """

    def testSourceToWordIds(self):
        # The three words of the source text get wids 1, 2 and 3 in order.
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('cats and dogs')
        self.assertEqual(wids, [1, 2, 3])

    def testTermToWordIds(self):
        # A word that was indexed maps back to the wid it was given.
        lexicon = Lexicon(Splitter())
        lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('dogs')
        self.assertEqual(wids, [3])

    def testMissingTermToWordIds(self):
        # A word that was never indexed is reported as wid 0.
        lexicon = Lexicon(Splitter())
        lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('boxes')
        self.assertEqual(wids, [0])

    def testOnePipelineElement(self):
        # 'dogs' is rewritten to 'fish' by the pipeline, so 'fish' is the
        # term that ends up holding the third wid.
        lexicon = Lexicon(Splitter(), StupidPipelineElement('dogs', 'fish'))
        lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('fish')
        self.assertEqual(wids, [3])

    def testSplitterAdaptorFold(self):
        # With CaseNormalizer in the pipeline 'CATS' and 'cats' share a wid.
        lexicon = Lexicon(Splitter(), CaseNormalizer())
        lexicon.sourceToWordIds('CATS and dogs')
        wids = lexicon.termToWordIds('cats and dogs')
        self.assertEqual(wids, [1, 2, 3])

    def testSplitterAdaptorNofold(self):
        # Without case folding, lower-case 'cats' is unknown (wid 0).
        lexicon = Lexicon(Splitter())
        lexicon.sourceToWordIds('CATS and dogs')
        wids = lexicon.termToWordIds('cats and dogs')
        self.assertEqual(wids, [0, 2, 3])

    def testTwoElementPipeline(self):
        # 'cats' -> 'fish' -> 'hsif': elements are applied in the order given.
        lexicon = Lexicon(Splitter(),
                          StupidPipelineElement('cats', 'fish'),
                          WackyReversePipelineElement('fish'))
        lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('hsif')
        self.assertEqual(wids, [1])

    def testThreeElementPipeline(self):
        # 'and' is dropped as a stop word, so the rewritten 'dogs'
        # ('dogs' -> 'fish' -> 'hsif') is the second word and gets wid 2.
        lexicon = Lexicon(Splitter(),
                          StopWordPipelineElement({'and':1}),
                          StupidPipelineElement('dogs', 'fish'),
                          WackyReversePipelineElement('fish'))
        lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('hsif')
        self.assertEqual(wids, [2])

    def testSplitterLocaleAwareness(self):
        # Words containing Latin-1 umlauts must survive both splitters
        # unchanged while a German locale is active.
        from zope.textindex.htmlsplitter import HTMLWordSplitter
        import locale
        loc = locale.setlocale(locale.LC_ALL) # get current locale
        # set German locale
        try:
            if sys.platform != 'win32':
                locale.setlocale(locale.LC_ALL, 'de_DE.ISO8859-1')
            else:
                locale.setlocale(locale.LC_ALL, 'German_Germany.1252')
        except locale.Error:
            return # This test doesn't work here :-(
        # The locale is process-wide state: restore it even when one of the
        # assertions fails, so later tests do not run under the German locale.
        try:
            expected = ['m\xfclltonne', 'waschb\xe4r',
                        'beh\xf6rde', '\xfcberflieger']
            words = [" ".join(expected)]
            words = Splitter().process(words)
            self.assertEqual(words, expected)
            words = HTMLWordSplitter().process(words)
            self.assertEqual(words, expected)
        finally:
            locale.setlocale(locale.LC_ALL, loc) # restore saved locale

def test_suite():
    """Return the unittest suite built from the Test case class."""
    suite = makeSuite(Test)
    return suite

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/textindex/tests/test_nbest.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, TestSuite, main, makeSuite

from zope.textindex.nbest import NBest

class NBestTest(TestCase):
    """Tests for NBest, a bounded collection of the best-scoring items."""

    def testConstructor(self):
        # Capacities below 1 are rejected.
        for bad_capacity in 0, -1:
            self.assertRaises(ValueError, NBest, bad_capacity)

        # A fresh NBest is empty and reports the capacity it was given.
        for capacity in range(1, 11):
            fresh = NBest(capacity)
            self.assertEqual(len(fresh), 0)
            self.assertEqual(fresh.capacity(), capacity)

    def testOne(self):
        # With room for one item, only the highest score seen is retained.
        nb = NBest(1)
        nb.add('a', 0)
        self.assertEqual(nb.getbest(), [('a', 0)])

        # 'b' outscores 'a' and replaces it; 'c' scores lower and is ignored.
        for item, score in ('b', 1), ('c', -1):
            nb.add(item, score)
            self.assertEqual(len(nb), 1)
            self.assertEqual(nb.capacity(), 1)
            self.assertEqual(nb.getbest(), [('b', 1)])

        nb.addmany([('d', 3), ('e', -6), ('f', 5), ('g', 4)])
        self.assertEqual(len(nb), 1)
        self.assertEqual(nb.capacity(), 1)
        self.assertEqual(nb.getbest(), [('f', 5)])

    def testMany(self):
        import random
        inputs = [(-i, i) for i in range(50)]

        reversed_inputs = inputs[:]
        reversed_inputs.reverse()

        # Test the N-best for a variety of n (1, 6, 11, ... 46).
        for n in range(1, len(inputs)+1, 5):
            # The n highest scores, best first.
            expected = inputs[-n:]
            expected.reverse()

            random_inputs = inputs[:]
            random.shuffle(random_inputs)

            for source in inputs, reversed_inputs, random_inputs:
                # Feed the items in individually.
                one_by_one = NBest(n)
                for item, score in source:
                    one_by_one.add(item, score)
                self.assertEqual(len(one_by_one), n)
                self.assertEqual(one_by_one.capacity(), n)
                self.assertEqual(one_by_one.getbest(), expected)

                # Feed the same items in with a single call.
                bulk = NBest(n)
                bulk.addmany(source)
                self.assertEqual(len(bulk), n)
                self.assertEqual(bulk.capacity(), n)
                self.assertEqual(bulk.getbest(), expected)

                # Popping yields the retained items worst first, and
                # raises IndexError once nothing is left.
                worst_first = expected[:]
                worst_first.reverse()
                for wanted in worst_first:
                    self.assertEqual(bulk.pop_smallest(), wanted)
                self.assertRaises(IndexError, bulk.pop_smallest)

    def testAllSameScore(self):
        # With every score equal, the reported items are a prefix of the
        # input, i.e. the earliest items are the ones kept.
        inputs = [(i, 0) for i in range(10)]
        for capacity in range(1, 12):
            nb = NBest(capacity)
            nb.addmany(inputs)
            best = nb.getbest()
            self.assertEqual(best, inputs[:len(best)])

def test_suite():
    """Return the unittest suite built from NBestTest."""
    suite = makeSuite(NBestTest)
    return suite

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/textindex/tests/test_pipelinefactory.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, TestSuite, main, makeSuite
from zope.textindex.ipipelineelement import IPipelineElement
from zope.textindex.pipelinefactory import PipelineElementFactory

class NullPipelineElement:
    """Placeholder pipeline element registered with the factory in the tests.

    It declares IPipelineElement and does no processing of its own.
    """

    __implements__ = IPipelineElement

    def process(self, source):
        # `self` is required for the method to be callable on an instance;
        # without it instance.process(x) fails with a TypeError.
        pass

class PipelineFactoryTest(TestCase):
    """Tests for registering and listing entries in PipelineElementFactory."""

    def setUp(self):
        # One distinct null element per name used by the test.
        for name in 'huey', 'dooey', 'louie', 'daffy':
            setattr(self, name, NullPipelineElement())

    def testPipeline(self):
        pf = PipelineElementFactory()
        registrations = [
            ('donald', 'huey', self.huey),
            ('donald', 'dooey', self.dooey),
            ('donald', 'louie', self.louie),
            ('looney', 'daffy', self.daffy),
        ]
        for group, name, element in registrations:
            pf.registerFactory(group, name, element)

        # Registering the same (group, name) pair a second time is an error.
        self.assertRaises(ValueError, pf.registerFactory,
                          'donald', 'huey', self.huey)

        # Groups and names come back in sorted order, not registration order.
        groups = pf.getFactoryGroups()
        self.assertEqual(groups, ['donald', 'looney'])
        names = pf.getFactoryNames('donald')
        self.assertEqual(names, ['dooey', 'huey', 'louie'])

def test_suite():
    """Return the unittest suite built from PipelineFactoryTest."""
    suite = makeSuite(PipelineFactoryTest)
    return suite

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/textindex/tests/test_queryengine.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, TestSuite, main, makeSuite

from zodb.btrees.IIBTree import IIBucket

from zope.textindex.queryparser import QueryParser
from zope.textindex.parsetree import ParseError, QueryError
from zope.textindex.lexicon import Lexicon, Splitter

class FauxIndex:
    """Stand-in index with canned results for 'foo', 'bar' and 'ham'."""

    def search(self, term):
        """Return an IIBucket mapping each matching docid to a score of 1.

        Any term other than the three known ones yields an empty bucket.
        """
        canned = (
            ("foo", (1, 3)),
            ("bar", (1, 2)),
            ("ham", (1, 2, 3, 4)),
        )
        hits = IIBucket()
        for known, docids in canned:
            if term == known:
                for docid in docids:
                    hits[docid] = 1
                break
        return hits

class TestQueryEngine(TestCase):
    """Run parsed queries against FauxIndex and check docid -> score results."""

    def setUp(self):
        lexicon = Lexicon(Splitter())
        self.lexicon = lexicon
        self.parser = QueryParser(lexicon)
        self.index = FauxIndex()

    def compareSet(self, set, dict):
        # Copy the result mapping into a plain dictionary so it can be
        # compared with the expected one.
        actual = {}
        for docid, score in set.items():
            actual[docid] = score
        self.assertEqual(actual, dict)

    def compareQuery(self, query, dict):
        # Parse `query`, execute it on the fake index and compare the scores.
        results = self.parser.parseQuery(query).executeQuery(self.index)
        self.compareSet(results, dict)

    def testExecuteQuery(self):
        cases = [
            ("foo AND bar", {1: 2}),
            ("foo OR bar", {1: 2, 2: 1, 3:1}),
            ("foo AND NOT bar", {3: 1}),
            ("foo AND foo AND foo", {1: 3, 3: 3}),
            ("foo OR foo OR foo", {1: 3, 3: 3}),
            ("ham AND NOT foo AND NOT bar", {4: 1}),
            ("ham OR foo OR bar", {1: 3, 2: 2, 3: 2, 4: 1}),
            ("ham AND foo AND bar", {1: 3}),
        ]
        for query, expected in cases:
            self.compareQuery(query, expected)

    def testInvalidQuery(self):
        # A NOT node on its own cannot be executed.
        from zope.textindex.parsetree import NotNode, AtomNode
        bare_not = NotNode(AtomNode("foo"))
        self.assertRaises(QueryError, bare_not.executeQuery, self.index)

def test_suite():
    """Return the unittest suite built from TestQueryEngine."""
    suite = makeSuite(TestQueryEngine)
    return suite

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/textindex/tests/test_queryparser.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, TestSuite, main, makeSuite

from zope.interface.verify import verifyClass

from zope.textindex.iqueryparser import IQueryParser
from zope.textindex.iqueryparsetree import IQueryParseTree

from zope.textindex.queryparser import QueryParser
from zope.textindex.parsetree import ParseError, ParseTreeNode
from zope.textindex.parsetree import OrNode, AndNode, NotNode
from zope.textindex.parsetree import AtomNode, PhraseNode, GlobNode
from zope.textindex.lexicon import Lexicon, Splitter


class TestInterfaces(TestCase):
    """Check that the parser and parse-tree classes satisfy their interfaces."""

    def testInterfaces(self):
        verifyClass(IQueryParser, QueryParser)
        # Every node class, the base class included, must provide
        # IQueryParseTree.
        node_classes = (ParseTreeNode, OrNode, AndNode, NotNode,
                        AtomNode, PhraseNode, GlobNode)
        for node_class in node_classes:
            verifyClass(IQueryParseTree, node_class)


class TestQueryParserBase(TestCase):
    """Shared fixture and assertion helpers for the query parser tests."""

    def setUp(self):
        self.lexicon = Lexicon(Splitter())
        self.parser = QueryParser(self.lexicon)

    def expect(self, input, output, expected_ignored=None):
        """Assert that `input` parses to the tree `output`.

        expected_ignored -- the words the parser should report as ignored;
                            omitted or None means no words
        """
        if expected_ignored is None:
            expected_ignored = []
        tree = self.parser.parseQuery(input)
        ignored = self.parser.getIgnored()
        self.compareParseTrees(tree, output)
        self.assertEqual(ignored, expected_ignored)
        # Check that parseQueryEx() == (parseQuery(), getIgnored())
        ex_tree, ex_ignored = self.parser.parseQueryEx(input)
        self.compareParseTrees(ex_tree, tree)
        self.assertEqual(ex_ignored, expected_ignored)

    def failure(self, input):
        """Assert that both parser entry points reject `input`."""
        self.assertRaises(ParseError, self.parser.parseQuery, input)
        self.assertRaises(ParseError, self.parser.parseQueryEx, input)

    def compareParseTrees(self, got, expected, msg=None):
        """Recursively assert that the tree `got` equals `expected`.

        msg -- failure message; defaults to repr() of the outermost `got`
               so a failure deep in the tree still shows the whole tree
        """
        if msg is None:
            msg = repr(got)
        self.assertEqual(isinstance(got, ParseTreeNode), 1, msg)
        self.assertEqual(got.__class__, expected.__class__, msg)
        # PhraseNode and GlobNode are tested before AtomNode; keep this
        # order so the more specific checks are tried first.
        if isinstance(got, PhraseNode):
            self.assertEqual(got.nodeType(), "PHRASE", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, GlobNode):
            self.assertEqual(got.nodeType(), "GLOB", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, AtomNode):
            self.assertEqual(got.nodeType(), "ATOM", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, NotNode):
            self.assertEqual(got.nodeType(), "NOT", msg)
            self.compareParseTrees(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, AndNode) or isinstance(got, OrNode):
            if isinstance(got, AndNode):
                node_type = "AND"
            else:
                node_type = "OR"
            self.assertEqual(got.nodeType(), node_type, msg)
            got_children = got.getValue()
            expected_children = expected.getValue()
            self.assertEqual(len(got_children), len(expected_children), msg)
            # The lengths were just checked, so zip() drops nothing.
            for got_child, expected_child in zip(got_children,
                                                 expected_children):
                self.compareParseTrees(got_child, expected_child, msg)


class TestQueryParser(TestQueryParserBase):
    """Parse-tree expectations for a parser whose lexicon has no stop words.

    test0xx methods give a query and the tree it must parse to; test1xx
    methods give queries that must raise ParseError.
    """

    # --- single terms and boolean operators -------------------------------

    def test001(self):
        self.expect("foo", AtomNode("foo"))

    def test002(self):
        # A word that merely starts with "not" is an ordinary term.
        self.expect("note", AtomNode("note"))

    def test003(self):
        # Operators are accepted in lower and upper case.
        self.expect("aa and bb AND cc",
                    AndNode([AtomNode("aa"), AtomNode("bb"), AtomNode("cc")]))

    def test004(self):
        self.expect("aa OR bb or cc",
                    OrNode([AtomNode("aa"), AtomNode("bb"), AtomNode("cc")]))

    def test005(self):
        # AND binds tighter than OR.
        self.expect("aa AND bb OR cc AnD dd",
                    OrNode([AndNode([AtomNode("aa"), AtomNode("bb")]),
                            AndNode([AtomNode("cc"), AtomNode("dd")])]))

    def test006(self):
        # Parentheses override the default precedence.
        self.expect("(aa OR bb) AND (cc OR dd)",
                    AndNode([OrNode([AtomNode("aa"), AtomNode("bb")]),
                             OrNode([AtomNode("cc"), AtomNode("dd")])]))

    def test007(self):
        self.expect("aa AND NOT bb",
                    AndNode([AtomNode("aa"), NotNode(AtomNode("bb"))]))

    # --- phrases, implicit AND, hyphens and quotes --------------------------

    def test010(self):
        self.expect('"foo bar"', PhraseNode(["foo", "bar"]))

    def test011(self):
        # Adjacent terms without an operator are ANDed.
        self.expect("foo bar", AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test012(self):
        self.expect('(("foo bar"))"', PhraseNode(["foo", "bar"]))

    def test013(self):
        self.expect("((foo bar))", AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test014(self):
        # A hyphen inside a word makes a phrase ...
        self.expect("foo-bar", PhraseNode(["foo", "bar"]))

    def test015(self):
        # ... while a leading hyphen negates the term that follows.
        self.expect("foo -bar", AndNode([AtomNode("foo"),
                                         NotNode(AtomNode("bar"))]))

    def test016(self):
        # The negated term is moved after the positive one in the tree.
        self.expect("-foo bar", AndNode([AtomNode("bar"),
                                         NotNode(AtomNode("foo"))]))

    def test017(self):
        self.expect("booh -foo-bar",
                    AndNode([AtomNode("booh"),
                             NotNode(PhraseNode(["foo", "bar"]))]))

    def test018(self):
        self.expect('booh -"foo bar"',
                    AndNode([AtomNode("booh"),
                             NotNode(PhraseNode(["foo", "bar"]))]))

    def test019(self):
        # Quotes also act as term separators.
        self.expect('foo"bar"',
                    AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test020(self):
        self.expect('"foo"bar',
                    AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test021(self):
        self.expect('foo"bar"blech',
                    AndNode([AtomNode("foo"), AtomNode("bar"),
                             AtomNode("blech")]))

    # --- globbing -----------------------------------------------------------

    def test022(self):
        self.expect("foo*", GlobNode("foo*"))

    def test023(self):
        self.expect("foo* bar", AndNode([GlobNode("foo*"),
                                         AtomNode("bar")]))

    # --- queries that must be rejected with ParseError ----------------------

    def test101(self):
        self.failure("")

    def test102(self):
        self.failure("not")

    def test103(self):
        self.failure("or")

    def test104(self):
        self.failure("and")

    def test105(self):
        self.failure("NOT")

    def test106(self):
        self.failure("OR")

    def test107(self):
        self.failure("AND")

    def test108(self):
        self.failure("NOT foo")

    def test109(self):
        self.failure(")")

    def test110(self):
        self.failure("(")

    def test111(self):
        self.failure("foo OR")

    def test112(self):
        self.failure("foo AND")

    def test113(self):
        self.failure("OR foo")

    def test114(self):
        self.failure("AND foo")

    def test115(self):
        self.failure("(foo) bar")

    def test116(self):
        self.failure("(foo OR)")

    def test117(self):
        self.failure("(foo AND)")

    def test118(self):
        self.failure("(NOT foo)")

    def test119(self):
        self.failure("-foo")

    def test120(self):
        self.failure("-foo -bar")

    def test121(self):
        self.failure("foo OR -bar")

    def test122(self):
        self.failure("foo AND -bar")


class StopWordTestQueryParser(TestQueryParserBase):
    """Parser expectations when the lexicon discards the word 'stop'.

    test2xx methods give a query, the tree it must parse to and the words
    reported as ignored; test3xx methods give queries that must raise
    ParseError.
    """

    def setUp(self):
        # Only 'stop' is a stopword (but 'and' is still an operator)
        splitter = Splitter()
        remover = FakeStopWordRemover()
        self.lexicon = Lexicon(splitter, remover)
        self.parser = QueryParser(self.lexicon)

    def test201(self):
        tree = AtomNode("and")
        self.expect('and/', tree)

    def test202(self):
        tree = AtomNode("foo")
        self.expect('foo AND stop', tree, ["stop"])

    def test203(self):
        tree = AtomNode("foo")
        self.expect('foo AND NOT stop', tree, ["stop"])

    def test204(self):
        tree = AtomNode("foo")
        self.expect('stop AND foo', tree, ["stop"])

    def test205(self):
        tree = AtomNode("foo")
        self.expect('foo OR stop', tree, ["stop"])

    def test206(self):
        tree = AtomNode("foo")
        self.expect('stop OR foo', tree, ["stop"])

    def test301(self):
        query = 'stop'
        self.failure(query)

    def test302(self):
        query = 'stop stop'
        self.failure(query)

    def test303(self):
        query = 'stop AND stop'
        self.failure(query)

    def test304(self):
        query = 'stop OR stop'
        self.failure(query)

    def test305(self):
        query = 'stop -foo'
        self.failure(query)

    def test306(self):
        query = 'stop AND NOT foo'
        self.failure(query)


class FakeStopWordRemover:
    """Pipeline element whose only stop word is the literal 'stop'."""

    def process(self, list):
        """Return a new list with every word equal to "stop" left out."""
        kept = []
        for word in list:
            if word != "stop":
                kept.append(word)
        return kept


def test_suite():
    """Return one suite combining the three query parser test classes."""
    suites = [
        makeSuite(TestQueryParser),
        makeSuite(StopWordTestQueryParser),
        makeSuite(TestInterfaces),
    ]
    return TestSuite(suites)


if __name__=="__main__":
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/textindex/tests/test_setops.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, TestSuite, main, makeSuite

from zodb.btrees.IIBTree import IIBTree, IIBucket

from zope.textindex.setops import mass_weightedIntersection
from zope.textindex.setops import mass_weightedUnion

class TestSetOps(TestCase):
    """Tests for mass_weightedUnion and mass_weightedIntersection."""

    def _checkBothOrders(self, func, first, second, expected):
        # Helper: func must produce `expected` whichever operand comes first.
        got = func([first, second])
        self.assertEqual(expected, list(got.items()))
        got = func([second, first])
        self.assertEqual(expected, list(got.items()))

    def testEmptyLists(self):
        # No operands at all gives an empty result.
        for func in mass_weightedIntersection, mass_weightedUnion:
            self.assertEqual(len(func([])), 0)

    def testIdentity(self):
        # A single operand with weight 1 comes back unchanged.
        for mapping in IIBTree([(1, 2)]), IIBucket([(1, 2)]):
            for func in mass_weightedUnion, mass_weightedIntersection:
                result = func([(mapping, 1)])
                self.assertEqual(len(result), 1)
                self.assertEqual(list(result.items()), list(mapping.items()))

    def testScalarMultiply(self):
        # A single operand has each of its values multiplied by the weight.
        tree = IIBTree([(1, 2), (2, 3), (3, 4)])
        allkeys = [1, 2, 3]
        bucket = IIBucket(tree)
        for mapping in tree, bucket:
            self.assertEqual(list(mapping.keys()), allkeys)
            for func in mass_weightedUnion, mass_weightedIntersection:
                for factor in 0, 1, 5, 10:
                    result = func([(mapping, factor)])
                    self.assertEqual(allkeys, list(result.keys()))
                    for key in mapping.keys():
                        self.assertEqual(mapping[key] * factor, result[key])

    def testPairs(self):
        t1 = IIBTree([(1, 10), (3, 30), (7, 70)])
        t2 = IIBTree([(3, 30), (5, 50), (7, 7), (9, 90)])
        allkeys = [1, 3, 5, 7, 9]
        b1 = IIBucket(t1)
        b2 = IIBucket(t2)
        operands = (t1, t2, b1, b2)
        weightings = ((0, 0), (1, 10), (10, 1), (2, 3))
        for x in operands:
            for key in x.keys():
                self.assertEqual(key in allkeys, 1)
            for y in operands:
                for w1, w2 in weightings:
                    # Union: keys present in either operand; a missing
                    # value counts as 0.
                    union = []
                    for key in allkeys:
                        if x.has_key(key) or y.has_key(key):
                            score = x.get(key, 0) * w1 + y.get(key, 0) * w2
                            union.append((key, score))
                    union.sort()
                    self._checkBothOrders(mass_weightedUnion,
                                          (x, w1), (y, w2), union)

                    # Intersection: only keys present in both operands.
                    common = []
                    for key in allkeys:
                        if x.has_key(key) and y.has_key(key):
                            score = x[key] * w1 + y[key] * w2
                            common.append((key, score))
                    common.sort()
                    self._checkBothOrders(mass_weightedIntersection,
                                          (x, w1), (y, w2), common)

    def testMany(self):
        import random
        N = 15  # number of IIBTrees to feed in
        weighted = []
        commonkey = N * 1000
        seen = {commonkey: 1}
        for i in range(N):
            tree = IIBTree()
            # commonkey is stored in every tree, so the intersection is
            # never empty.
            tree[commonkey] = i
            for j in range(N-i):
                key = i + j
                seen[key] = 1
                tree[key] = N*i + j
            weighted.append((tree, i+1))
        random.shuffle(weighted)
        allkeys = list(seen.keys())
        allkeys.sort()

        # Test the union.
        expected = []
        for key in allkeys:
            total = 0
            for tree, weight in weighted:
                if tree.has_key(key):
                    total += tree[key] * weight
            expected.append((key, total))
        got = mass_weightedUnion(weighted)
        self.assertEqual(expected, list(got.items()))

        # Test the intersection.
        expected = []
        for key in allkeys:
            total = 0
            for tree, weight in weighted:
                if not tree.has_key(key):
                    break
                total += tree[key] * weight
            else:
                # No tree lacked the key, so it is in the intersection.
                expected.append((key, total))
        got = mass_weightedIntersection(weighted)
        self.assertEqual(expected, list(got.items()))

def test_suite():
    """Return the unittest suite built from TestSetOps."""
    suite = makeSuite(TestSetOps)
    return suite

if __name__=="__main__":
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/textindex/tests/test_textindexwrapper.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Unit tests for TextIndexWrapper.

$Id: test_textindexwrapper.py,v 1.1.2.1 2002/12/23 19:33:35 jim Exp $
"""

from unittest import TestCase, TestSuite, main, makeSuite

from zope.textindex.textindexwrapper import TextIndexWrapper
from Zope.TextIndex import ParseTree

class Test(TestCase):
    """Tests for TextIndexWrapper: indexing, counting and batched queries.

    Comments are used instead of method docstrings so that unittest keeps
    reporting the tests by method name.
    """

    def setUp(self):
        # Every test starts with two indexed documents, ids 1000 and 1001.
        w = TextIndexWrapper()
        doc = u"the quick brown fox jumps over the lazy dog"
        w.index_doc(1000, [doc])
        doc = u"the brown fox and the yellow fox don't need the retriever"
        w.index_doc(1001, [doc])
        self.wrapper = w

    def testCounts(self):
        w = self.wrapper
        self.assertEqual(w.documentCount(), 2)
        self.assertEqual(w.wordCount(), 12)
        # A third document with two new words raises both counts.
        doc = u"foo bar"
        w.index_doc(1002, [doc])
        self.assertEqual(w.documentCount(), 3)
        self.assertEqual(w.wordCount(), 14)

    def testOne(self):
        # Only document 1000 contains both "quick" and "fox".
        matches, total = self.wrapper.query(u"quick fox", 0, 10)
        self.assertEqual(total, 1)
        [(docid, rank)] = matches # if this fails there's a problem
        self.assertEqual(docid, 1000)

    def testDefaultBatch(self):
        # The batch size, and the start position as well, may be omitted.
        matches, total = self.wrapper.query(u"fox", 0)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)
        matches, total = self.wrapper.query(u"fox")
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)
        # Starting at position 1 leaves one match, but total is still 2.
        matches, total = self.wrapper.query(u" fox", 1)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 1)

    def testGlobbing(self):
        matches, total = self.wrapper.query("fo*")
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)

    def testLatin1(self):
        w = self.wrapper
        doc = u"Fran\xe7ois"
        w.index_doc(1002, [doc])
        matches, total = self.wrapper.query(doc, 0, 10)
        self.assertEqual(total, 1)
        [(docid, rank)] = matches # if this fails there's a problem
        self.assertEqual(docid, 1002)

    def testUnicode(self):
        # ParseError is imported from the renamed package, as the other
        # test modules of this package do.
        # NOTE(review): the module-level ``from Zope.TextIndex import
        # ParseTree`` still names the pre-rename package and should be
        # dropped once nothing else uses it.
        from zope.textindex.parsetree import ParseError
        w = self.wrapper
        # Verbose, but easy to debug
        delta  = u"\N{GREEK SMALL LETTER DELTA}"
        delta += u"\N{GREEK SMALL LETTER EPSILON}"
        delta += u"\N{GREEK SMALL LETTER LAMDA}"
        delta += u"\N{GREEK SMALL LETTER TAU}"
        delta += u"\N{GREEK SMALL LETTER ALPHA}"
        assert delta.islower()
        emdash = u"\N{EM DASH}"
        assert not emdash.isalnum()
        alpha  = u"\N{GREEK SMALL LETTER ALPHA}"
        assert alpha.islower()
        lamda  = u"\N{GREEK SMALL LETTER LAMDA}"
        lamda += u"\N{GREEK SMALL LETTER ALPHA}"
        assert lamda.islower()
        # The document is two Greek words joined by an em dash; each word
        # must be findable on its own.
        doc = delta + emdash + alpha
        w.index_doc(1002, [doc])
        for word in delta, alpha:
            matches, total = self.wrapper.query(word, 0, 10)
            self.assertEqual(total, 1)
            [(docid, rank)] = matches # if this fails there's a problem
            self.assertEqual(docid, 1002)
        # A query consisting only of the em dash is rejected.
        self.assertRaises(ParseError,
                          self.wrapper.query, emdash, 0, 10)
        # A word that was not indexed matches nothing.
        matches, total = self.wrapper.query(lamda, 0, 10)
        self.assertEqual(total, 0)

    def testNone(self):
        matches, total = self.wrapper.query(u"dalmatian", 0, 10)
        self.assertEqual(total, 0)
        self.assertEqual(len(matches), 0)

    def testAll(self):
        matches, total = self.wrapper.query(u"brown fox", 0, 10)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)
        # Sort by docid so the check does not depend on ranking order.
        matches.sort()
        self.assertEqual(matches[0][0], 1000)
        self.assertEqual(matches[1][0], 1001)

    def testBatching(self):
        # Two batches of size one together cover both matching documents.
        matches1, total = self.wrapper.query(u"brown fox", 0, 1)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches1), 1)
        matches2, total = self.wrapper.query(u"brown fox", 1, 1)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches2), 1)
        matches = matches1 + matches2
        matches.sort()
        self.assertEqual(matches[0][0], 1000)
        self.assertEqual(matches[1][0], 1001)

def test_suite():
    """Return the unittest suite built from the Test case class."""
    suite = makeSuite(Test)
    return suite

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/textindex/tests/wordstats.py ===
#! /usr/bin/env python
"""Dump statistics about each word in the index.

usage: wordstats.py data.fs [index key]
"""

import zodb
from zodb.storage.file import FileStorage

def main(fspath, key):
    fs = FileStorage(fspath, read_only=1)
    db = ZODB.DB(fs)
    rt = db.open().root()
    index = rt[key]

    lex = index.lexicon
    idx = index.index
    print "Words", lex.length()
    print "Documents", idx.length()

    print "Word frequencies: count, word, wid"
    for word, wid in lex.items():
        docs = idx._wordinfo[wid]
        print len(docs), word, wid

    print "Per-doc scores: wid, (doc, score,)+"
    for wid in lex.wids():
        print wid,
        docs = idx._wordinfo[wid]
        for docid, score in docs.items():
            print docid, score,
        print

if __name__ == "__main__":
    import sys

    # Accept "data.fs" or "data.fs key"; the key defaults to "index".
    args = sys.argv[1:]
    index_key = "index"
    if len(args) == 1:
        fspath = args[0]
    elif len(args) == 2:
        fspath, index_key = args
    else:
        # Stop here: falling through would call main() with fspath unbound
        # and die with a NameError right after the message.  sys.exit()
        # with a string writes it to stderr and exits with status 1.
        sys.exit("Expected 1 or 2 args, got %d" % len(args))
    main(fspath, index_key)