[Zope-Checkins] SVN: Zope/trunk/src/Products/ZCatalog/tests/test Start reorganizing catalog tests
Hanno Schlichting
hannosch at hannosch.eu
Sat Jul 31 16:45:52 EDT 2010
Log message for revision 115282:
Start reorganizing catalog tests
D Zope/trunk/src/Products/ZCatalog/tests/testBrains.py
D Zope/trunk/src/Products/ZCatalog/tests/testCatalog.py
D Zope/trunk/src/Products/ZCatalog/tests/testLazySequences.py
A Zope/trunk/src/Products/ZCatalog/tests/test_brains.py
A Zope/trunk/src/Products/ZCatalog/tests/test_catalog.py
A Zope/trunk/src/Products/ZCatalog/tests/test_lazy.py
Deleted: Zope/trunk/src/Products/ZCatalog/tests/testBrains.py
--- Zope/trunk/src/Products/ZCatalog/tests/testBrains.py 2010-07-31 20:44:07 UTC (rev 115281)
+++ Zope/trunk/src/Products/ZCatalog/tests/testBrains.py 2010-07-31 20:45:52 UTC (rev 115282)
@@ -1,190 +0,0 @@
-# Copyright (c) 2002 Zope Foundation and Contributors.
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-"""Unittests for Catalog brains
-import unittest
-import Acquisition
-from zExceptions import Unauthorized
-from ZODB.POSException import ConflictError
-class Happy(Acquisition.Implicit):
- """Happy content"""
- def __init__(self, id):
- self.id = id
- def check(self):
- pass
-class Secret(Happy):
- """Object that raises Unauthorized when accessed"""
- def check(self):
- raise Unauthorized
-class Conflicter(Happy):
- """Object that raises ConflictError when accessed"""
- def check(self):
- raise ConflictError
-class DummyRequest(Acquisition.Implicit):
- def physicalPathToURL(self, path, relative=False):
- if not relative:
- path = 'http://superbad.com' + path
- return path
-_marker = object()
-class DummyCatalog(Acquisition.Implicit):
- _objs = {'/happy':Happy('happy'),
- '/secret':Secret('secret'),
- '/conflicter':Conflicter('conflicter')}
- _paths = _objs.keys() + ['/zonked']
- _paths.sort()
- # This is sooooo ugly
- def unrestrictedTraverse(self, path, default=None):
- # for these tests...
- assert path == '' or path == ('') or path == [''], path
- return self
- def restrictedTraverse(self, path, default=_marker):
- if not path.startswith('/'):
- path = '/'+path
- try:
- ob = self._objs[path].__of__(self)
- ob.check()
- return ob
- except KeyError:
- if default is not _marker:
- return default
- raise
- def getpath(self, rid):
- return self._paths[rid]
- def getobject(self, rid):
- return self.restrictedTraverse(self._paths[rid])
- def resolve_url(self, path, REQUEST):
- path = path[path.find('/', path.find('//')+1):] # strip server part
- return self.restrictedTraverse(path)
-class ConflictingCatalog(DummyCatalog):
- def getpath(self, rid):
- raise ConflictError
-class BrainsTestBase:
- _old_flag = None
- def setUp(self):
- self.cat = DummyCatalog()
- self.cat.REQUEST = DummyRequest()
- self._init_getOb_flag()
- def tearDown(self):
- if self._old_flag is not None:
- self._restore_getOb_flag()
- def _init_getOb_flag(self):
- from Products.ZCatalog import CatalogBrains
- self._old_flag = CatalogBrains.GETOBJECT_RAISES
- CatalogBrains.GETOBJECT_RAISES = self._flag_value()
- def _restore_getOb_flag(self):
- from Products.ZCatalog import CatalogBrains
- CatalogBrains.GETOBJECT_RAISES = self._old_flag
- def _makeBrain(self, rid):
- from Products.ZCatalog.CatalogBrains import AbstractCatalogBrain
- class Brain(AbstractCatalogBrain):
- __record_schema__ = {'test_field': 0, 'data_record_id_':1}
- return Brain(('test', rid)).__of__(self.cat)
- def testHasKey(self):
- b = self._makeBrain(1)
- self.failUnless(b.has_key('test_field'))
- self.failUnless(b.has_key('data_record_id_'))
- self.failIf(b.has_key('godel'))
- def testGetPath(self):
- b = [self._makeBrain(rid) for rid in range(3)]
- self.assertEqual(b[0].getPath(), '/conflicter')
- self.assertEqual(b[1].getPath(), '/happy')
- self.assertEqual(b[2].getPath(), '/secret')
- def testGetPathPropagatesConflictErrors(self):
- self.cat = ConflictingCatalog()
- b = self._makeBrain(0)
- self.assertRaises(ConflictError, b.getPath)
- def testGetURL(self):
- b = self._makeBrain(0)
- self.assertEqual(b.getURL(), 'http://superbad.com/conflicter')
- def testGetRID(self):
- b = self._makeBrain(42)
- self.assertEqual(b.getRID(), 42)
- def testGetObjectHappy(self):
- b = self._makeBrain(1)
- self.assertEqual(b.getPath(), '/happy')
- self.failUnless(b.getObject().aq_base is self.cat.getobject(1).aq_base)
- def testGetObjectPropagatesConflictErrors(self):
- b = self._makeBrain(0)
- self.assertEqual(b.getPath(), '/conflicter')
- self.assertRaises(ConflictError, b.getObject)
-class TestBrains(BrainsTestBase, unittest.TestCase):
- def _flag_value(self):
- return True
- def testGetObjectRaisesUnauthorized(self):
- from zExceptions import Unauthorized
- b = self._makeBrain(2)
- self.assertEqual(b.getPath(), '/secret')
- self.assertRaises(Unauthorized, b.getObject)
- def testGetObjectRaisesNotFoundForMissing(self):
- from zExceptions import NotFound
- b = self._makeBrain(3)
- self.assertEqual(b.getPath(), '/zonked')
- self.assertRaises(KeyError, self.cat.getobject, 3)
- self.assertRaises((NotFound, AttributeError, KeyError), b.getObject)
-class TestBrainsOldBehavior(BrainsTestBase, unittest.TestCase):
- def _flag_value(self):
- return False
- def testGetObjectReturnsNoneForUnauthorized(self):
- b = self._makeBrain(2)
- self.assertEqual(b.getPath(), '/secret')
- self.assertEqual(b.getObject(), None)
- def testGetObjectReturnsNoneForMissing(self):
- b = self._makeBrain(3)
- self.assertEqual(b.getPath(), '/zonked')
- self.assertRaises(KeyError, self.cat.getobject, 3)
- self.assertEqual(b.getObject(), None)
-def test_suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(TestBrains))
- suite.addTest(unittest.makeSuite(TestBrainsOldBehavior))
- return suite
Deleted: Zope/trunk/src/Products/ZCatalog/tests/testCatalog.py
--- Zope/trunk/src/Products/ZCatalog/tests/testCatalog.py 2010-07-31 20:44:07 UTC (rev 115281)
+++ Zope/trunk/src/Products/ZCatalog/tests/testCatalog.py 2010-07-31 20:45:52 UTC (rev 115282)
@@ -1,875 +0,0 @@
-# Copyright (c) 2002 Zope Foundation and Contributors.
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-""" Unittests for Catalog.
-import unittest
-from Testing.ZopeTestCase.warnhook import WarningsHook
-import Zope2
-from itertools import chain
-import random
-import ExtensionClass
-import OFS.Application
-from AccessControl.SecurityManagement import setSecurityManager
-from AccessControl.SecurityManagement import noSecurityManager
-from AccessControl import Unauthorized
-from Acquisition import Implicit
-from Products.ZCatalog.Catalog import Catalog
-from Products.ZCatalog.Catalog import CatalogError
-from ZODB.DB import DB
-from ZODB.DemoStorage import DemoStorage
-import transaction
-from Products.PluginIndexes.FieldIndex.FieldIndex import FieldIndex
-from Products.PluginIndexes.KeywordIndex.KeywordIndex import KeywordIndex
-from Products.ZCTextIndex.OkapiIndex import OkapiIndex
-from Products.ZCTextIndex.ZCTextIndex import PLexicon
-from Products.ZCTextIndex.ZCTextIndex import ZCTextIndex
-def createDatabase():
- # XXX We have to import and init products in order for PluginIndexes to
- # be registered.
- OFS.Application.import_products()
- # Create a DemoStorage and put an Application in it
- db = DB(DemoStorage())
- conn = db.open()
- root = conn.root()
- app = OFS.Application.Application()
- root['Application'] = app
- transaction.commit()
- # Init products
- #OFS.Application.initialize(app)
- OFS.Application.install_products(app) # XXX: this is still icky
- return app
-app = createDatabase()
-def sort(iterable):
- L = list(iterable)
- L.sort()
- return L
-from OFS.Folder import Folder as OFS_Folder
-class Folder(OFS_Folder):
- def __init__(self, id):
- self._setId(id)
- OFS_Folder.__init__(self)
-class CatalogBase:
- def setUp(self):
- self._catalog = Catalog()
- def tearDown(self):
- self._catalog = None
-class TestAddDelColumn(CatalogBase,unittest.TestCase):
- def testAdd(self):
- self._catalog.addColumn('id')
- self.assertEqual(self._catalog.schema.has_key('id'), 1,
- 'add column failed')
- def testAddBad(self):
- self.assertRaises(CatalogError, self._catalog.addColumn, '_id')
- def testDel(self):
- self._catalog.addColumn('id')
- self._catalog.delColumn('id')
- self.assert_(self._catalog.schema.has_key('id') != 1,
- 'del column failed')
-class TestAddDelIndexes(CatalogBase, unittest.TestCase):
- def testAddFieldIndex(self):
- idx = FieldIndex('id')
- self._catalog.addIndex('id', idx)
- self.assert_(isinstance(self._catalog.indexes['id'],
- type(FieldIndex('id'))),
- 'add field index failed')
- def testAddTextIndex(self):
- self._catalog.lexicon = PLexicon('lexicon')
- idx = ZCTextIndex('id', caller=self._catalog,
- index_factory=OkapiIndex, lexicon_id='lexicon')
- self._catalog.addIndex('id', idx)
- i = self._catalog.indexes['id']
- self.assert_(isinstance(i, ZCTextIndex), 'add text index failed')
- def testAddKeywordIndex(self):
- idx = KeywordIndex('id')
- self._catalog.addIndex('id', idx)
- i = self._catalog.indexes['id']
- self.assert_(isinstance(i, type(KeywordIndex('id'))),
- 'add kw index failed')
- def testDelFieldIndex(self):
- idx = FieldIndex('id')
- self._catalog.addIndex('id', idx)
- self._catalog.delIndex('id')
- self.assert_(self._catalog.indexes.has_key('id') != 1,
- 'del index failed')
- def testDelTextIndex(self):
- self._catalog.lexicon = PLexicon('lexicon')
- idx = ZCTextIndex('id', caller=self._catalog,
- index_factory=OkapiIndex, lexicon_id='lexicon')
- self._catalog.addIndex('id', idx)
- self._catalog.delIndex('id')
- self.assert_(self._catalog.indexes.has_key('id') != 1,
- 'del index failed')
- def testDelKeywordIndex(self):
- idx = KeywordIndex('id')
- self._catalog.addIndex('id', idx)
- self._catalog.delIndex('id')
- self.assert_(self._catalog.indexes.has_key('id') != 1,
- 'del index failed')
-# Removed unittests dealing with catalog instantiation and vocabularies
-# since catalog no longer creates/manages vocabularies automatically (Casey)
-# Test of the ZCatalog object, as opposed to Catalog
-class zdummy(ExtensionClass.Base):
- def __init__(self, num):
- self.num = num
- def title(self):
- return '%d' % self.num
-class zdummyFalse(zdummy):
- def __nonzero__(self):
- return False
-# make objects with failing __len__ and __nonzero__
-class dummyLenFail(zdummy):
- def __init__(self, num, fail):
- zdummy.__init__(self, num)
- self.fail = fail
- def __len__(self):
- self.fail("__len__() was called")
-class dummyNonzeroFail(zdummy):
- def __init__(self, num, fail):
- zdummy.__init__(self, num)
- self.fail = fail
- def __nonzero__(self):
- self.fail("__nonzero__() was called")
-class FakeTraversalError(KeyError):
- """fake traversal exception for testing"""
-class fakeparent(Implicit):
- # fake parent mapping unrestrictedTraverse to
- # catalog.resolve_path as simulated by TestZCatalog
- def __init__(self, d):
- self.d = d
- marker = object()
- def unrestrictedTraverse(self, path, default=marker):
- result = self.d.get(path, default)
- if result is self.marker:
- raise FakeTraversalError(path)
- return result
-class TestZCatalog(unittest.TestCase):
- def setUp(self):
- from Products.ZCatalog.ZCatalog import ZCatalog
- self._catalog = ZCatalog('Catalog')
- self._catalog.resolve_path = self._resolve_num
- self._catalog.addIndex('title', 'KeywordIndex')
- self._catalog.addColumn('title')
- self.upper = 10
- self.d = {}
- for x in range(0, self.upper):
- # make uid a string of the number
- ob = zdummy(x)
- self.d[str(x)] = ob
- self._catalog.catalog_object(ob, str(x))
- def _resolve_num(self, num):
- return self.d[num]
- def test_z3interfaces(self):
- from Products.ZCatalog.interfaces import IZCatalog
- from Products.ZCatalog.ZCatalog import ZCatalog
- from zope.interface.verify import verifyClass
- verifyClass(IZCatalog, ZCatalog)
- def testGetMetadataForUID(self):
- testNum = str(self.upper - 3) # as good as any..
- data = self._catalog.getMetadataForUID(testNum)
- self.assertEqual(data['title'], testNum)
- def testGetIndexDataForUID(self):
- testNum = str(self.upper - 3)
- data = self._catalog.getIndexDataForUID(testNum)
- self.assertEqual(data['title'][0], testNum)
- def testSearch(self):
- query = {'title': ['5','6','7']}
- sr = self._catalog.searchResults(query)
- self.assertEqual(len(sr), 3)
- sr = self._catalog.search(query)
- self.assertEqual(len(sr), 3)
- def testUpdateMetadata(self):
- self._catalog.catalog_object(zdummy(1), '1')
- data = self._catalog.getMetadataForUID('1')
- self.assertEqual(data['title'], '1')
- self._catalog.catalog_object(zdummy(2), '1', update_metadata=0)
- data = self._catalog.getMetadataForUID('1')
- self.assertEqual(data['title'], '1')
- self._catalog.catalog_object(zdummy(2), '1', update_metadata=1)
- data = self._catalog.getMetadataForUID('1')
- self.assertEqual(data['title'], '2')
- # update_metadata defaults to true, test that here
- self._catalog.catalog_object(zdummy(1), '1')
- data = self._catalog.getMetadataForUID('1')
- self.assertEqual(data['title'], '1')
- def testReindexIndexDoesntDoMetadata(self):
- self.d['0'].num = 9999
- self._catalog.reindexIndex('title', {})
- data = self._catalog.getMetadataForUID('0')
- self.assertEqual(data['title'], '0')
- def testReindexIndexesFalse(self):
- # setup
- false_id = self.upper + 1
- ob = zdummyFalse(false_id)
- self.d[str(false_id)] = ob
- self._catalog.catalog_object(ob, str(false_id))
- # test, object evaluates to false; there was bug which caused the
- # object to be removed from index
- ob.num = 9999
- self._catalog.reindexIndex('title', {})
- result = self._catalog(title='9999')
- self.assertEquals(1, len(result))
- def testBooleanEvalOn_manage_catalogObject(self):
- self.d['11'] = dummyLenFail(11, self.fail)
- self.d['12'] = dummyNonzeroFail(12, self.fail)
- # create a fake response that doesn't bomb on manage_catalogObject()
- class myresponse:
- def redirect(self, url):
- pass
- # this next call should not fail
- self._catalog.manage_catalogObject(None, myresponse(), 'URL1', urls=('11', '12'))
- def testBooleanEvalOn_refreshCatalog_getobject(self):
- # wrap catalog under the fake parent providing unrestrictedTraverse()
- catalog = self._catalog.__of__(fakeparent(self.d))
- # replace entries to test refreshCatalog
- self.d['0'] = dummyLenFail(0, self.fail)
- self.d['1'] = dummyNonzeroFail(1, self.fail)
- # this next call should not fail
- catalog.refreshCatalog()
- for uid in ('0', '1'):
- rid = catalog.getrid(uid)
- # neither should these
- catalog.getobject(rid)
- def test_getobject_doesntMaskTraversalErrorsAndDoesntDelegateTo_resolve_url(self):
- # wrap catalog under the fake parent providing unrestrictedTraverse()
- catalog = self._catalog.__of__(fakeparent(self.d))
- # make resolve_url fail if ZCatalog falls back on it
- def resolve_url(path, REQUEST):
- self.fail(".resolve_url() should not be called by .getobject()")
- catalog.resolve_url = resolve_url
- # traversal should work at first
- rid0 = catalog.getrid('0')
- # lets set it up so the traversal fails
- del self.d['0']
- self.assertRaises(FakeTraversalError, catalog.getobject, rid0, REQUEST=object())
- # and if there is a None at the traversal point, that's where it should return
- self.d['0'] = None
- self.assertEquals(catalog.getobject(rid0), None)
-class dummy(ExtensionClass.Base):
- att1 = 'att1'
- att2 = 'att2'
- att3 = ['att3']
- def __init__(self, num):
- self.num = num
- def col1(self):
- return 'col1'
- def col2(self):
- return 'col2'
- def col3(self):
- return ['col3']
-class TestCatalogObject(unittest.TestCase):
- upper = 1000
- nums = range(upper)
- for i in range(upper):
- j = random.randrange(0, upper)
- tmp = nums[i]
- nums[i] = nums[j]
- nums[j] = tmp
- def setUp(self):
- self.warningshook = WarningsHook()
- self.warningshook.install()
- self._catalog = Catalog()
- self._catalog.lexicon = PLexicon('lexicon')
- col1 = FieldIndex('col1')
- col2 = ZCTextIndex('col2', caller=self._catalog,
- index_factory=OkapiIndex, lexicon_id='lexicon')
- col3 = KeywordIndex('col3')
- self._catalog.addIndex('col1', col1)
- self._catalog.addIndex('col2', col2)
- self._catalog.addIndex('col3', col3)
- self._catalog.addColumn('col1')
- self._catalog.addColumn('col2')
- self._catalog.addColumn('col3')
- att1 = FieldIndex('att1')
- att2 = ZCTextIndex('att2', caller=self._catalog,
- index_factory=OkapiIndex, lexicon_id='lexicon')
- att3 = KeywordIndex('att3')
- num = FieldIndex('num')
- self._catalog.addIndex('att1', att1)
- self._catalog.addIndex('att2', att2)
- self._catalog.addIndex('att3', att3)
- self._catalog.addIndex('num', num)
- self._catalog.addColumn('att1')
- self._catalog.addColumn('att2')
- self._catalog.addColumn('att3')
- self._catalog.addColumn('num')
- for x in range(0, self.upper):
- self._catalog.catalogObject(dummy(self.nums[x]), `x`)
- self._catalog.aq_parent = dummy('foo') # fake out acquisition
- def tearDown(self):
- self._catalog = None
- self.warningshook.uninstall()
- def testResultLength(self):
- a = self._catalog()
- self.assertEqual(len(a), self.upper,
- 'length should be %s, its %s' % (self.upper, len(a)))
- def testEmptyMappingReturnsAll(self):
- upper = self.upper
- a = self._catalog({})
- self.assertEqual(len(a), upper,
- 'length should be %s, its %s' % (upper, len(a)))
- # Queries used to do the same, because of a bug in the
- # parseIndexRequest function, mistaking a CatalogSearchArgumentsMap
- # for a Record class
- a = self._catalog({'col1':'', 'col2':'', 'col3':''})
- self.assertEqual(len(a), 0,
- 'length should be %s, its %s' % (upper, len(a)))
- def testFieldIndexLength(self):
- a = self._catalog(att1='att1')
- self.assertEqual(len(a), self.upper,
- 'should be %s, but is %s' % (self.upper, len(a)))
- def testTextIndexLength(self):
- a = self._catalog(att2='att2')
- self.assertEqual(len(a), self.upper,
- 'should be %s, but is %s' % (self.upper, len(a)))
- def testKeywordIndexLength(self):
- a = self._catalog(att3='att3')
- self.assertEqual(len(a), self.upper,
- 'should be %s, but is %s' % (self.upper, len(a)))
- def testUncatalogFieldIndex(self):
- self.uncatalog()
- a = self._catalog(att1='att1')
- self.assertEqual(len(a), 0, 'len: %s' % len(a))
- def testUncatalogTextIndex(self):
- self.uncatalog()
- a = self._catalog(att2='att2')
- self.assertEqual(len(a), 0, 'len: %s' % len(a))
- def testUncatalogKeywordIndex(self):
- self.uncatalog()
- a = self._catalog(att3='att3')
- self.assertEqual(len(a), 0, 'len: %s' % len(a))
- def testBadUncatalog(self):
- try:
- self._catalog.uncatalogObject('asdasdasd')
- except:
- self.fail('uncatalogObject raised exception on bad uid')
- def testUniqueValuesForLength(self):
- a = self._catalog.uniqueValuesFor('att1')
- self.assertEqual(len(a), 1, 'bad number of unique values %s' % a)
- def testUniqueValuesForContent(self):
- a = self._catalog.uniqueValuesFor('att1')
- self.assertEqual(a[0], 'att1', 'bad content %s' % a[0])
- def testUncatalogTwice(self):
- self._catalog.uncatalogObject(`0`)
- self.assertRaises(Exception, '_second')
- def testCatalogLength(self):
- for x in range(0, self.upper):
- self._catalog.uncatalogObject(`x`)
- self.assertEqual(len(self._catalog), 0)
- def _second(self):
- self._catalog.uncatalogObject(`0`)
- def uncatalog(self):
- for x in range(0, self.upper):
- self._catalog.uncatalogObject(`x`)
- def testGoodSortIndex(self):
- upper = self.upper
- a = self._catalog(sort_on='num')
- self.assertEqual(len(a), upper,
- 'length should be %s, its %s' % (upper, len(a)))
- for x in range(self.upper):
- self.assertEqual(a[x].num, x)
- def testBadSortIndex(self):
- self.assertRaises(CatalogError, self.badsortindex)
- def badsortindex(self):
- a = self._catalog(sort_on='foofaraw')
- def testWrongKindOfIndexForSort(self):
- self.assertRaises(CatalogError, self.wrongsortindex)
- def wrongsortindex(self):
- a = self._catalog(sort_on='att2')
- def testTextIndexQWithSortOn(self):
- upper = self.upper
- a = self._catalog(sort_on='num', att2='att2')
- self.assertEqual(len(a), upper,
- 'length should be %s, its %s' % (upper, len(a)))
- for x in range(self.upper):
- self.assertEqual(a[x].num, x)
- def testTextIndexQWithoutSortOn(self):
- upper = self.upper
- a = self._catalog(att2='att2')
- self.assertEqual(len(a), upper,
- 'length should be %s, its %s' % (upper, len(a)))
-# XXX: don't know how to adjust this test for ZCTextIndex
-# for x in range(self.upper):
-# self.assertEqual(a[x].data_record_score_, 1)
- def testKeywordIndexWithMinRange(self):
- a = self._catalog(att3={'query': 'att', 'range': 'min'})
- self.assertEqual(len(a), self.upper)
- def testKeywordIndexWithMaxRange(self):
- a = self._catalog(att3={'query': 'att35', 'range': ':max'})
- self.assertEqual(len(a), self.upper)
- def testKeywordIndexWithMinMaxRangeCorrectSyntax(self):
- a = self._catalog(att3={'query': ['att', 'att35'], 'range': 'min:max'})
- self.assertEqual(len(a), self.upper)
- def testKeywordIndexWithMinMaxRangeWrongSyntax(self):
- # checkKeywordIndex with min/max range wrong syntax.
- a = self._catalog(att3={'query': ['att'], 'range': 'min:max'})
- self.assert_(len(a) != self.upper)
- def testCombinedTextandKeywordQuery(self):
- a = self._catalog(att3='att3', att2='att2')
- self.assertEqual(len(a), self.upper)
- def testLargeSortedResultSetWithSmallIndex(self):
- # This exercises the optimization in the catalog that iterates
- # over the sort index rather than the result set when the result
- # set is much larger than the sort index.
- a = self._catalog(sort_on='att1')
- self.assertEqual(len(a), self.upper)
- def testBadSortLimits(self):
- self.assertRaises(
- AssertionError, self._catalog, sort_on='num', sort_limit=0)
- self.assertRaises(
- AssertionError, self._catalog, sort_on='num', sort_limit=-10)
- def testSortLimit(self):
- full = self._catalog(sort_on='num')
- a = self._catalog(sort_on='num', sort_limit=10)
- self.assertEqual([r.num for r in a], [r.num for r in full[:10]])
- self.assertEqual(a.actual_result_count, self.upper)
- a = self._catalog(sort_on='num', sort_limit=10, sort_order='reverse')
- rev = [r.num for r in full[-10:]]
- rev.reverse()
- self.assertEqual([r.num for r in a], rev)
- self.assertEqual(a.actual_result_count, self.upper)
- def testBigSortLimit(self):
- a = self._catalog(sort_on='num', sort_limit=self.upper*3)
- self.assertEqual(a.actual_result_count, self.upper)
- self.assertEqual(a[0].num, 0)
- a = self._catalog(
- sort_on='num', sort_limit=self.upper*3, sort_order='reverse')
- self.assertEqual(a.actual_result_count, self.upper)
- self.assertEqual(a[0].num, self.upper - 1)
- def testUpdateMetadataFalse(self):
- ob = dummy(9999)
- self._catalog.catalogObject(ob, `9999`)
- brain = self._catalog(num=9999)[0]
- self.assertEqual(brain.att1, 'att1')
- ob.att1 = 'foobar'
- self._catalog.catalogObject(ob, `9999`, update_metadata=0)
- brain = self._catalog(num=9999)[0]
- self.assertEqual(brain.att1, 'att1')
- self._catalog.catalogObject(ob, `9999`)
- brain = self._catalog(num=9999)[0]
- self.assertEqual(brain.att1, 'foobar')
-class objRS(ExtensionClass.Base):
- def __init__(self,num):
- self.number = num
-class TestRS(unittest.TestCase):
- def setUp(self):
- self._catalog = Catalog()
- index = FieldIndex('number')
- self._catalog.addIndex('number', index)
- self._catalog.addColumn('number')
- for i in range(5000):
- obj = objRS(random.randrange(0,20000))
- self._catalog.catalogObject(obj,i)
- self._catalog.aq_parent = objRS(200)
- def testRangeSearch(self):
- for i in range(1000):
- m = random.randrange(0,20000)
- n = m + 1000
- for r in self._catalog.searchResults(
- number= {'query': (m,n) , 'range' : 'min:max' } ):
- size = r.number
- self.assert_(m<=size and size<=n,
- "%d vs [%d,%d]" % (r.number,m,n))
-class TestMerge(unittest.TestCase):
- # Test merging results from multiple catalogs
- def setUp(self):
- self.catalogs = []
- for i in range(3):
- cat = Catalog()
- cat.lexicon = PLexicon('lexicon')
- cat.addIndex('num', FieldIndex('num'))
- cat.addIndex('big', FieldIndex('big'))
- cat.addIndex('number', FieldIndex('number'))
- i = ZCTextIndex('title', caller=cat, index_factory=OkapiIndex,
- lexicon_id='lexicon')
- cat.addIndex('title', i)
- cat.aq_parent = zdummy(16336)
- for i in range(10):
- obj = zdummy(i)
- obj.big = i > 5
- obj.number = True
- cat.catalogObject(obj, str(i))
- self.catalogs.append(cat)
- def testNoFilterOrSort(self):
- from Products.ZCatalog.Catalog import mergeResults
- results = [cat.searchResults(
- dict(number=True), _merge=0) for cat in self.catalogs]
- merged_rids = [r.getRID() for r in mergeResults(
- results, has_sort_keys=False, reverse=False)]
- expected = [r.getRID() for r in chain(*results)]
- self.assertEqual(sort(merged_rids), sort(expected))
- def testSortedOnly(self):
- from Products.ZCatalog.Catalog import mergeResults
- results = [cat.searchResults(
- dict(number=True, sort_on='num'), _merge=0)
- for cat in self.catalogs]
- merged_rids = [r.getRID() for r in mergeResults(
- results, has_sort_keys=True, reverse=False)]
- expected = sort(chain(*results))
- expected = [rid for sortkey, rid, getitem in expected]
- self.assertEqual(merged_rids, expected)
- def testSortReverse(self):
- from Products.ZCatalog.Catalog import mergeResults
- results = [cat.searchResults(
- dict(number=True, sort_on='num'), _merge=0)
- for cat in self.catalogs]
- merged_rids = [r.getRID() for r in mergeResults(
- results, has_sort_keys=True, reverse=True)]
- expected = sort(chain(*results))
- expected.reverse()
- expected = [rid for sortkey, rid, getitem in expected]
- self.assertEqual(merged_rids, expected)
- def testLimitSort(self):
- from Products.ZCatalog.Catalog import mergeResults
- results = [cat.searchResults(
- dict(number=True, sort_on='num'), sort_limit=2, _merge=0)
- for cat in self.catalogs]
- merged_rids = [r.getRID() for r in mergeResults(
- results, has_sort_keys=True, reverse=False)]
- expected = sort(chain(*results))
- expected = [rid for sortkey, rid, getitem in expected]
- self.assertEqual(merged_rids, expected)
- def testScored(self):
- from Products.ZCatalog.Catalog import mergeResults
- results = [cat.searchResults(title='4 or 5 or 6', _merge=0)
- for cat in self.catalogs]
- merged_rids = [r.getRID() for r in mergeResults(
- results, has_sort_keys=True, reverse=False)]
- expected = sort(chain(*results))
- expected = [rid for sortkey, (nscore, score, rid), getitem in expected]
- self.assertEqual(merged_rids, expected)
- def testSmallIndexSort(self):
- # Test that small index sort optimization is not used for merging
- from Products.ZCatalog.Catalog import mergeResults
- results = [cat.searchResults(
- dict(number=True, sort_on='big'), _merge=0)
- for cat in self.catalogs]
- merged_rids = [r.getRID() for r in mergeResults(
- results, has_sort_keys=True, reverse=False)]
- expected = sort(chain(*results))
- expected = [rid for sortkey, rid, getitem in expected]
- self.assertEqual(merged_rids, expected)
-class PickySecurityManager:
- def __init__(self, badnames=[]):
- self.badnames = badnames
- def validateValue(self, value):
- return 1
- def validate(self, accessed, container, name, value):
- if name not in self.badnames:
- return 1
- raise Unauthorized(name)
-class TestZCatalogGetObject(unittest.TestCase):
- # Test what objects are returned by brain.getObject()
- _old_flag = None
- def setUp(self):
- from Products.ZCatalog.ZCatalog import ZCatalog
- catalog = ZCatalog('catalog')
- catalog.addIndex('id', 'FieldIndex')
- root = Folder('')
- root.getPhysicalRoot = lambda: root
- self.root = root
- self.root.catalog = catalog
- def tearDown(self):
- noSecurityManager()
- if self._old_flag is not None:
- self._restore_getObject_flag()
- def _init_getObject_flag(self, flag):
- from Products.ZCatalog import CatalogBrains
- self._old_flag = CatalogBrains.GETOBJECT_RAISES
- CatalogBrains.GETOBJECT_RAISES = flag
- def _restore_getObject_flag(self):
- from Products.ZCatalog import CatalogBrains
- CatalogBrains.GETOBJECT_RAISES = self._old_flag
- def test_getObject_found(self):
- # Check normal traversal
- root = self.root
- catalog = root.catalog
- root.ob = Folder('ob')
- catalog.catalog_object(root.ob)
- brain = catalog.searchResults({'id': 'ob'})[0]
- self.assertEqual(brain.getPath(), '/ob')
- self.assertEqual(brain.getObject().getId(), 'ob')
- def test_getObject_missing_raises_NotFound(self):
- # Check that if the object is missing we raise
- from zExceptions import NotFound
- self._init_getObject_flag(True)
- root = self.root
- catalog = root.catalog
- root.ob = Folder('ob')
- catalog.catalog_object(root.ob)
- brain = catalog.searchResults({'id': 'ob'})[0]
- del root.ob
- self.assertRaises((NotFound, AttributeError, KeyError), brain.getObject)
- def test_getObject_restricted_raises_Unauthorized(self):
- # Check that if the object's security does not allow traversal,
- # None is returned
- self._init_getObject_flag(True)
- root = self.root
- catalog = root.catalog
- root.fold = Folder('fold')
- root.fold.ob = Folder('ob')
- catalog.catalog_object(root.fold.ob)
- brain = catalog.searchResults({'id': 'ob'})[0]
- # allow all accesses
- pickySecurityManager = PickySecurityManager()
- setSecurityManager(pickySecurityManager)
- self.assertEqual(brain.getObject().getId(), 'ob')
- # disallow just 'ob' access
- pickySecurityManager = PickySecurityManager(['ob'])
- setSecurityManager(pickySecurityManager)
- self.assertRaises(Unauthorized, brain.getObject)
- # disallow just 'fold' access
- pickySecurityManager = PickySecurityManager(['fold'])
- setSecurityManager(pickySecurityManager)
- ob = brain.getObject()
- self.failIf(ob is None)
- self.assertEqual(ob.getId(), 'ob')
- def test_getObject_missing_returns_None(self):
- # Check that if the object is missing None is returned
- self._init_getObject_flag(False)
- root = self.root
- catalog = root.catalog
- root.ob = Folder('ob')
- catalog.catalog_object(root.ob)
- brain = catalog.searchResults({'id': 'ob'})[0]
- del root.ob
- self.assertEqual(brain.getObject(), None)
- def test_getObject_restricted_returns_None(self):
- # Check that if the object's security does not allow traversal,
- # None is returned
- self._init_getObject_flag(False)
- root = self.root
- catalog = root.catalog
- root.fold = Folder('fold')
- root.fold.ob = Folder('ob')
- catalog.catalog_object(root.fold.ob)
- brain = catalog.searchResults({'id': 'ob'})[0]
- # allow all accesses
- pickySecurityManager = PickySecurityManager()
- setSecurityManager(pickySecurityManager)
- self.assertEqual(brain.getObject().getId(), 'ob')
- # disallow just 'ob' access
- pickySecurityManager = PickySecurityManager(['ob'])
- setSecurityManager(pickySecurityManager)
- self.assertEqual(brain.getObject(), None)
- # disallow just 'fold' access
- pickySecurityManager = PickySecurityManager(['fold'])
- setSecurityManager(pickySecurityManager)
- ob = brain.getObject()
- self.failIf(ob is None)
- self.assertEqual(ob.getId(), 'ob')
- # Now test _unrestrictedGetObject
- def test_unrestrictedGetObject_found(self):
- # Check normal traversal
- root = self.root
- catalog = root.catalog
- root.ob = Folder('ob')
- catalog.catalog_object(root.ob)
- brain = catalog.searchResults({'id': 'ob'})[0]
- self.assertEqual(brain.getPath(), '/ob')
- self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob')
- def test_unrestrictedGetObject_restricted(self):
- # Check that if the object's security does not allow traversal,
- # it's still is returned
- root = self.root
- catalog = root.catalog
- root.fold = Folder('fold')
- root.fold.ob = Folder('ob')
- catalog.catalog_object(root.fold.ob)
- brain = catalog.searchResults({'id': 'ob'})[0]
- # allow all accesses
- pickySecurityManager = PickySecurityManager()
- setSecurityManager(pickySecurityManager)
- self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob')
- # disallow just 'ob' access
- pickySecurityManager = PickySecurityManager(['ob'])
- setSecurityManager(pickySecurityManager)
- self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob')
- # disallow just 'fold' access
- pickySecurityManager = PickySecurityManager(['fold'])
- setSecurityManager(pickySecurityManager)
- self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob')
- def test_unrestrictedGetObject_missing_raises_NotFound(self):
- # Check that if the object is missing we raise
- from zExceptions import NotFound
- self._init_getObject_flag(True)
- root = self.root
- catalog = root.catalog
- root.ob = Folder('ob')
- catalog.catalog_object(root.ob)
- brain = catalog.searchResults({'id': 'ob'})[0]
- del root.ob
- self.assertRaises((NotFound, AttributeError, KeyError),
- brain._unrestrictedGetObject)
- def test_unrestrictedGetObject_missing_returns_None(self):
- # Check that if the object is missing None is returned
- self._init_getObject_flag(False)
- root = self.root
- catalog = root.catalog
- root.ob = Folder('ob')
- catalog.catalog_object(root.ob)
- brain = catalog.searchResults({'id': 'ob'})[0]
- del root.ob
- self.assertEqual(brain._unrestrictedGetObject(), None)
-def test_suite():
- suite = unittest.TestSuite()
- suite.addTest( unittest.makeSuite( TestAddDelColumn ) )
- suite.addTest( unittest.makeSuite( TestAddDelIndexes ) )
- suite.addTest( unittest.makeSuite( TestZCatalog ) )
- suite.addTest( unittest.makeSuite( TestCatalogObject ) )
- suite.addTest( unittest.makeSuite( TestRS ) )
- suite.addTest( unittest.makeSuite( TestMerge ) )
- suite.addTest( unittest.makeSuite( TestZCatalogGetObject ) )
- return suite
Deleted: Zope/trunk/src/Products/ZCatalog/tests/testLazySequences.py
--- Zope/trunk/src/Products/ZCatalog/tests/testLazySequences.py 2010-07-31 20:44:07 UTC (rev 115281)
+++ Zope/trunk/src/Products/ZCatalog/tests/testLazySequences.py 2010-07-31 20:45:52 UTC (rev 115282)
@@ -1,215 +0,0 @@
-# Copyright (c) 2002 Zope Foundation and Contributors.
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-"""Unittests for Lazy sequence classes
-import unittest
-class BaseSequenceTest(unittest.TestCase):
- def _compare(self, lseq, seq):
- self.assertEqual(len(lseq), len(seq))
- self.assertEqual(list(lseq), seq)
-class TestLazyCat(BaseSequenceTest):
- def _createLSeq(self, *sequences):
- from Products.ZCatalog.Lazy import LazyCat
- return LazyCat(sequences)
- def testEmpty(self):
- lcat = self._createLSeq([])
- self._compare(lcat, [])
- def testSingleSequence(self):
- seq = range(10)
- lcat = self._createLSeq(seq)
- self._compare(lcat, seq)
- def testMultipleSequences(self):
- from string import hexdigits, letters
- seq1 = range(10)
- seq2 = list(hexdigits)
- seq3 = list(letters)
- lcat = self._createLSeq(seq1, seq2, seq3)
- self._compare(lcat, seq1 + seq2 + seq3)
- def testNestedLazySequences(self):
- from string import hexdigits, letters
- seq1 = range(10)
- seq2 = list(hexdigits)
- seq3 = list(letters)
- lcat = apply(self._createLSeq,
- [self._createLSeq(seq) for seq in (seq1, seq2, seq3)])
- self._compare(lcat, seq1 + seq2 + seq3)
- def testSlicedSequences(self):
- from string import hexdigits, letters
- seq1 = range(10)
- seq2 = list(hexdigits)
- seq3 = list(letters)
- lcat = apply(self._createLSeq,
- [self._createLSeq(seq) for seq in (seq1, seq2, seq3)])
- self._compare(lcat[5:-5], seq1[5:] + seq2 + seq3[:-5])
- def testConsistentLength(self):
- # Unaccessed length
- lcat = self._createLSeq(range(10))
- self.assertEqual(len(lcat), 10)
- # Accessed in the middle
- lcat = self._createLSeq(range(10))
- lcat[4]
- self.assertEqual(len(lcat), 10)
- # Accessed after the lcat is accessed over the whole range
- lcat = self._createLSeq(range(10))
- lcat[:]
- self.assertEqual(len(lcat), 10)
-class TestLazyMap(TestLazyCat):
- def _createLSeq(self, *seq):
- return self._createLMap(lambda x: x, *seq)
- def _createLMap(self, mapfunc, *seq):
- from Products.ZCatalog.Lazy import LazyMap
- totalseq = []
- for s in seq:
- totalseq.extend(s)
- return LazyMap(mapfunc, totalseq)
- def testMap(self):
- from string import hexdigits, letters
- seq1 = range(10)
- seq2 = list(hexdigits)
- seq3 = list(letters)
- filter = lambda x: str(x).lower()
- lmap = self._createLMap(filter, seq1, seq2, seq3)
- self._compare(lmap, [str(x).lower() for x in (seq1 + seq2 + seq3)])
-class TestLazyFilter(TestLazyCat):
- def _createLSeq(self, *seq):
- return self._createLFilter(lambda x: True, *seq)
- def _createLFilter(self, filter, *seq):
- from Products.ZCatalog.Lazy import LazyFilter
- totalseq = []
- for s in seq:
- totalseq.extend(s)
- return LazyFilter(filter, totalseq)
- def testFilter(self):
- from string import hexdigits, letters
- seq1 = range(10)
- seq2 = list(hexdigits)
- seq3 = list(letters)
- filter = lambda x: str(x).isalpha()
- lmap = self._createLFilter(filter, seq1, seq2, seq3)
- self._compare(lmap, seq2[10:] + seq3)
- def testConsistentLengthWithFilter(self):
- from string import letters
- # Unaccessed length
- lfilter = self._createLFilter(lambda x: x.islower(), list(letters))
- self.assertEqual(len(lfilter), 26)
- # Accessed in the middle
- lfilter = self._createLFilter(lambda x: x.islower(), list(letters))
- lfilter[13]
- self.assertEqual(len(lfilter), 26)
- # Accessed after the lcat is accessed over the whole range
- lfilter = self._createLFilter(lambda x: x.islower(), list(letters))
- lfilter[:]
- self.assertEqual(len(lfilter), 26)
-class TestLazyMop(TestLazyCat):
- def _createLSeq(self, *seq):
- return self._createLMop(lambda x: x, *seq)
- def _createLMop(self, mapfunc, *seq):
- from Products.ZCatalog.Lazy import LazyMop
- totalseq = []
- for s in seq:
- totalseq.extend(s)
- return LazyMop(mapfunc, totalseq)
- def testMop(self):
- from string import hexdigits, letters
- seq1 = range(10)
- seq2 = list(hexdigits)
- seq3 = list(letters)
- def filter(x):
- if isinstance(x, int):
- raise ValueError
- return x.lower()
- lmop = self._createLMop(filter, seq1, seq2, seq3)
- self._compare(lmop, [str(x).lower() for x in (seq2 + seq3)])
- def testConsistentLengthWithMop(self):
- from string import letters
- seq = range(10) + list(letters)
- def filter(x):
- if isinstance(x, int):
- raise ValueError
- return x.lower()
- # Unaccessed length
- lmop = self._createLMop(filter, seq)
- self.assertEqual(len(lmop), 52)
- # Accessed in the middle
- lmop = self._createLMop(filter, seq)
- lmop[26]
- self.assertEqual(len(lmop), 52)
- # Accessed after the lcat is accessed over the whole range
- lmop = self._createLMop(filter, letters)
- lmop[:]
- self.assertEqual(len(lmop), 52)
-class TestLazyValues(BaseSequenceTest):
- def _createLValues(self, seq):
- from Products.ZCatalog.Lazy import LazyValues
- return LazyValues(seq)
- def testEmpty(self):
- lvals = self._createLValues([])
- self._compare(lvals, [])
- def testValues(self):
- from string import letters
- seq = zip(letters, range(10))
- lvals = self._createLValues(seq)
- self._compare(lvals, range(10))
- def testSlice(self):
- from string import letters
- seq = zip(letters, range(10))
- lvals = self._createLValues(seq)
- self._compare(lvals[2:-2], range(2, 8))
-def test_suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(TestLazyCat))
- suite.addTest(unittest.makeSuite(TestLazyMap))
- suite.addTest(unittest.makeSuite(TestLazyFilter))
- suite.addTest(unittest.makeSuite(TestLazyMop))
- suite.addTest(unittest.makeSuite(TestLazyValues))
- return suite
Copied: Zope/trunk/src/Products/ZCatalog/tests/test_brains.py (from rev 115275, Zope/trunk/src/Products/ZCatalog/tests/testBrains.py)
--- Zope/trunk/src/Products/ZCatalog/tests/test_brains.py (rev 0)
+++ Zope/trunk/src/Products/ZCatalog/tests/test_brains.py 2010-07-31 20:45:52 UTC (rev 115282)
@@ -0,0 +1,190 @@
+# Copyright (c) 2002 Zope Foundation and Contributors.
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+"""Unittests for Catalog brains
+import unittest
+import Acquisition
+from zExceptions import Unauthorized
+from ZODB.POSException import ConflictError
+class Happy(Acquisition.Implicit):
+ """Happy content"""
+ def __init__(self, id):
+ self.id = id
+ def check(self):
+ pass
+class Secret(Happy):
+ """Object that raises Unauthorized when accessed"""
+ def check(self):
+ raise Unauthorized
+class Conflicter(Happy):
+ """Object that raises ConflictError when accessed"""
+ def check(self):
+ raise ConflictError
+class DummyRequest(Acquisition.Implicit):
+ def physicalPathToURL(self, path, relative=False):
+ if not relative:
+ path = 'http://superbad.com' + path
+ return path
+_marker = object()
+class DummyCatalog(Acquisition.Implicit):
+ _objs = {'/happy':Happy('happy'),
+ '/secret':Secret('secret'),
+ '/conflicter':Conflicter('conflicter')}
+ _paths = _objs.keys() + ['/zonked']
+ _paths.sort()
+ # This is sooooo ugly
+ def unrestrictedTraverse(self, path, default=None):
+ # for these tests...
+ assert path == '' or path == ('') or path == [''], path
+ return self
+ def restrictedTraverse(self, path, default=_marker):
+ if not path.startswith('/'):
+ path = '/'+path
+ try:
+ ob = self._objs[path].__of__(self)
+ ob.check()
+ return ob
+ except KeyError:
+ if default is not _marker:
+ return default
+ raise
+ def getpath(self, rid):
+ return self._paths[rid]
+ def getobject(self, rid):
+ return self.restrictedTraverse(self._paths[rid])
+ def resolve_url(self, path, REQUEST):
+ path = path[path.find('/', path.find('//')+1):] # strip server part
+ return self.restrictedTraverse(path)
+class ConflictingCatalog(DummyCatalog):
+ def getpath(self, rid):
+ raise ConflictError
+class BrainsTestBase:
+ _old_flag = None
+ def setUp(self):
+ self.cat = DummyCatalog()
+ self.cat.REQUEST = DummyRequest()
+ self._init_getOb_flag()
+ def tearDown(self):
+ if self._old_flag is not None:
+ self._restore_getOb_flag()
+ def _init_getOb_flag(self):
+ from Products.ZCatalog import CatalogBrains
+ self._old_flag = CatalogBrains.GETOBJECT_RAISES
+ CatalogBrains.GETOBJECT_RAISES = self._flag_value()
+ def _restore_getOb_flag(self):
+ from Products.ZCatalog import CatalogBrains
+ CatalogBrains.GETOBJECT_RAISES = self._old_flag
+ def _makeBrain(self, rid):
+ from Products.ZCatalog.CatalogBrains import AbstractCatalogBrain
+ class Brain(AbstractCatalogBrain):
+ __record_schema__ = {'test_field': 0, 'data_record_id_':1}
+ return Brain(('test', rid)).__of__(self.cat)
+ def testHasKey(self):
+ b = self._makeBrain(1)
+ self.failUnless(b.has_key('test_field'))
+ self.failUnless(b.has_key('data_record_id_'))
+ self.failIf(b.has_key('godel'))
+ def testGetPath(self):
+ b = [self._makeBrain(rid) for rid in range(3)]
+ self.assertEqual(b[0].getPath(), '/conflicter')
+ self.assertEqual(b[1].getPath(), '/happy')
+ self.assertEqual(b[2].getPath(), '/secret')
+ def testGetPathPropagatesConflictErrors(self):
+ self.cat = ConflictingCatalog()
+ b = self._makeBrain(0)
+ self.assertRaises(ConflictError, b.getPath)
+ def testGetURL(self):
+ b = self._makeBrain(0)
+ self.assertEqual(b.getURL(), 'http://superbad.com/conflicter')
+ def testGetRID(self):
+ b = self._makeBrain(42)
+ self.assertEqual(b.getRID(), 42)
+ def testGetObjectHappy(self):
+ b = self._makeBrain(1)
+ self.assertEqual(b.getPath(), '/happy')
+ self.failUnless(b.getObject().aq_base is self.cat.getobject(1).aq_base)
+ def testGetObjectPropagatesConflictErrors(self):
+ b = self._makeBrain(0)
+ self.assertEqual(b.getPath(), '/conflicter')
+ self.assertRaises(ConflictError, b.getObject)
+class TestBrains(BrainsTestBase, unittest.TestCase):
+ def _flag_value(self):
+ return True
+ def testGetObjectRaisesUnauthorized(self):
+ from zExceptions import Unauthorized
+ b = self._makeBrain(2)
+ self.assertEqual(b.getPath(), '/secret')
+ self.assertRaises(Unauthorized, b.getObject)
+ def testGetObjectRaisesNotFoundForMissing(self):
+ from zExceptions import NotFound
+ b = self._makeBrain(3)
+ self.assertEqual(b.getPath(), '/zonked')
+ self.assertRaises(KeyError, self.cat.getobject, 3)
+ self.assertRaises((NotFound, AttributeError, KeyError), b.getObject)
+class TestBrainsOldBehavior(BrainsTestBase, unittest.TestCase):
+ def _flag_value(self):
+ return False
+ def testGetObjectReturnsNoneForUnauthorized(self):
+ b = self._makeBrain(2)
+ self.assertEqual(b.getPath(), '/secret')
+ self.assertEqual(b.getObject(), None)
+ def testGetObjectReturnsNoneForMissing(self):
+ b = self._makeBrain(3)
+ self.assertEqual(b.getPath(), '/zonked')
+ self.assertRaises(KeyError, self.cat.getobject, 3)
+ self.assertEqual(b.getObject(), None)
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestBrains))
+ suite.addTest(unittest.makeSuite(TestBrainsOldBehavior))
+ return suite
Copied: Zope/trunk/src/Products/ZCatalog/tests/test_catalog.py (from rev 115275, Zope/trunk/src/Products/ZCatalog/tests/testCatalog.py)
--- Zope/trunk/src/Products/ZCatalog/tests/test_catalog.py (rev 0)
+++ Zope/trunk/src/Products/ZCatalog/tests/test_catalog.py 2010-07-31 20:45:52 UTC (rev 115282)
@@ -0,0 +1,875 @@
+# Copyright (c) 2002 Zope Foundation and Contributors.
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+""" Unittests for Catalog.
+import unittest
+from Testing.ZopeTestCase.warnhook import WarningsHook
+import Zope2
+from itertools import chain
+import random
+import ExtensionClass
+import OFS.Application
+from AccessControl.SecurityManagement import setSecurityManager
+from AccessControl.SecurityManagement import noSecurityManager
+from AccessControl import Unauthorized
+from Acquisition import Implicit
+from Products.ZCatalog.Catalog import Catalog
+from Products.ZCatalog.Catalog import CatalogError
+from ZODB.DB import DB
+from ZODB.DemoStorage import DemoStorage
+import transaction
+from Products.PluginIndexes.FieldIndex.FieldIndex import FieldIndex
+from Products.PluginIndexes.KeywordIndex.KeywordIndex import KeywordIndex
+from Products.ZCTextIndex.OkapiIndex import OkapiIndex
+from Products.ZCTextIndex.ZCTextIndex import PLexicon
+from Products.ZCTextIndex.ZCTextIndex import ZCTextIndex
+def createDatabase():
+ # XXX We have to import and init products in order for PluginIndexes to
+ # be registered.
+ OFS.Application.import_products()
+ # Create a DemoStorage and put an Application in it
+ db = DB(DemoStorage())
+ conn = db.open()
+ root = conn.root()
+ app = OFS.Application.Application()
+ root['Application'] = app
+ transaction.commit()
+ # Init products
+ #OFS.Application.initialize(app)
+ OFS.Application.install_products(app) # XXX: this is still icky
+ return app
+app = createDatabase()
+def sort(iterable):
+ L = list(iterable)
+ L.sort()
+ return L
+from OFS.Folder import Folder as OFS_Folder
+class Folder(OFS_Folder):
+ def __init__(self, id):
+ self._setId(id)
+ OFS_Folder.__init__(self)
+class CatalogBase:
+ def setUp(self):
+ self._catalog = Catalog()
+ def tearDown(self):
+ self._catalog = None
+class TestAddDelColumn(CatalogBase,unittest.TestCase):
+ def testAdd(self):
+ self._catalog.addColumn('id')
+ self.assertEqual(self._catalog.schema.has_key('id'), 1,
+ 'add column failed')
+ def testAddBad(self):
+ self.assertRaises(CatalogError, self._catalog.addColumn, '_id')
+ def testDel(self):
+ self._catalog.addColumn('id')
+ self._catalog.delColumn('id')
+ self.assert_(self._catalog.schema.has_key('id') != 1,
+ 'del column failed')
+class TestAddDelIndexes(CatalogBase, unittest.TestCase):
+ def testAddFieldIndex(self):
+ idx = FieldIndex('id')
+ self._catalog.addIndex('id', idx)
+ self.assert_(isinstance(self._catalog.indexes['id'],
+ type(FieldIndex('id'))),
+ 'add field index failed')
+ def testAddTextIndex(self):
+ self._catalog.lexicon = PLexicon('lexicon')
+ idx = ZCTextIndex('id', caller=self._catalog,
+ index_factory=OkapiIndex, lexicon_id='lexicon')
+ self._catalog.addIndex('id', idx)
+ i = self._catalog.indexes['id']
+ self.assert_(isinstance(i, ZCTextIndex), 'add text index failed')
+ def testAddKeywordIndex(self):
+ idx = KeywordIndex('id')
+ self._catalog.addIndex('id', idx)
+ i = self._catalog.indexes['id']
+ self.assert_(isinstance(i, type(KeywordIndex('id'))),
+ 'add kw index failed')
+ def testDelFieldIndex(self):
+ idx = FieldIndex('id')
+ self._catalog.addIndex('id', idx)
+ self._catalog.delIndex('id')
+ self.assert_(self._catalog.indexes.has_key('id') != 1,
+ 'del index failed')
+ def testDelTextIndex(self):
+ self._catalog.lexicon = PLexicon('lexicon')
+ idx = ZCTextIndex('id', caller=self._catalog,
+ index_factory=OkapiIndex, lexicon_id='lexicon')
+ self._catalog.addIndex('id', idx)
+ self._catalog.delIndex('id')
+ self.assert_(self._catalog.indexes.has_key('id') != 1,
+ 'del index failed')
+ def testDelKeywordIndex(self):
+ idx = KeywordIndex('id')
+ self._catalog.addIndex('id', idx)
+ self._catalog.delIndex('id')
+ self.assert_(self._catalog.indexes.has_key('id') != 1,
+ 'del index failed')
+# Removed unittests dealing with catalog instantiation and vocabularies
+# since catalog no longer creates/manages vocabularies automatically (Casey)
+# Test of the ZCatalog object, as opposed to Catalog
+class zdummy(ExtensionClass.Base):
+ def __init__(self, num):
+ self.num = num
+ def title(self):
+ return '%d' % self.num
+class zdummyFalse(zdummy):
+ def __nonzero__(self):
+ return False
+# make objects with failing __len__ and __nonzero__
+class dummyLenFail(zdummy):
+ def __init__(self, num, fail):
+ zdummy.__init__(self, num)
+ self.fail = fail
+ def __len__(self):
+ self.fail("__len__() was called")
+class dummyNonzeroFail(zdummy):
+ def __init__(self, num, fail):
+ zdummy.__init__(self, num)
+ self.fail = fail
+ def __nonzero__(self):
+ self.fail("__nonzero__() was called")
+class FakeTraversalError(KeyError):
+ """fake traversal exception for testing"""
+class fakeparent(Implicit):
+ # fake parent mapping unrestrictedTraverse to
+ # catalog.resolve_path as simulated by TestZCatalog
+ def __init__(self, d):
+ self.d = d
+ marker = object()
+ def unrestrictedTraverse(self, path, default=marker):
+ result = self.d.get(path, default)
+ if result is self.marker:
+ raise FakeTraversalError(path)
+ return result
+class TestZCatalog(unittest.TestCase):
+ def setUp(self):
+ from Products.ZCatalog.ZCatalog import ZCatalog
+ self._catalog = ZCatalog('Catalog')
+ self._catalog.resolve_path = self._resolve_num
+ self._catalog.addIndex('title', 'KeywordIndex')
+ self._catalog.addColumn('title')
+ self.upper = 10
+ self.d = {}
+ for x in range(0, self.upper):
+ # make uid a string of the number
+ ob = zdummy(x)
+ self.d[str(x)] = ob
+ self._catalog.catalog_object(ob, str(x))
+ def _resolve_num(self, num):
+ return self.d[num]
+ def test_z3interfaces(self):
+ from Products.ZCatalog.interfaces import IZCatalog
+ from Products.ZCatalog.ZCatalog import ZCatalog
+ from zope.interface.verify import verifyClass
+ verifyClass(IZCatalog, ZCatalog)
+ def testGetMetadataForUID(self):
+ testNum = str(self.upper - 3) # as good as any..
+ data = self._catalog.getMetadataForUID(testNum)
+ self.assertEqual(data['title'], testNum)
+ def testGetIndexDataForUID(self):
+ testNum = str(self.upper - 3)
+ data = self._catalog.getIndexDataForUID(testNum)
+ self.assertEqual(data['title'][0], testNum)
+ def testSearch(self):
+ query = {'title': ['5','6','7']}
+ sr = self._catalog.searchResults(query)
+ self.assertEqual(len(sr), 3)
+ sr = self._catalog.search(query)
+ self.assertEqual(len(sr), 3)
+ def testUpdateMetadata(self):
+ self._catalog.catalog_object(zdummy(1), '1')
+ data = self._catalog.getMetadataForUID('1')
+ self.assertEqual(data['title'], '1')
+ self._catalog.catalog_object(zdummy(2), '1', update_metadata=0)
+ data = self._catalog.getMetadataForUID('1')
+ self.assertEqual(data['title'], '1')
+ self._catalog.catalog_object(zdummy(2), '1', update_metadata=1)
+ data = self._catalog.getMetadataForUID('1')
+ self.assertEqual(data['title'], '2')
+ # update_metadata defaults to true, test that here
+ self._catalog.catalog_object(zdummy(1), '1')
+ data = self._catalog.getMetadataForUID('1')
+ self.assertEqual(data['title'], '1')
+ def testReindexIndexDoesntDoMetadata(self):
+ self.d['0'].num = 9999
+ self._catalog.reindexIndex('title', {})
+ data = self._catalog.getMetadataForUID('0')
+ self.assertEqual(data['title'], '0')
+ def testReindexIndexesFalse(self):
+ # setup
+ false_id = self.upper + 1
+ ob = zdummyFalse(false_id)
+ self.d[str(false_id)] = ob
+ self._catalog.catalog_object(ob, str(false_id))
+ # test, object evaluates to false; there was bug which caused the
+ # object to be removed from index
+ ob.num = 9999
+ self._catalog.reindexIndex('title', {})
+ result = self._catalog(title='9999')
+ self.assertEquals(1, len(result))
+ def testBooleanEvalOn_manage_catalogObject(self):
+ self.d['11'] = dummyLenFail(11, self.fail)
+ self.d['12'] = dummyNonzeroFail(12, self.fail)
+ # create a fake response that doesn't bomb on manage_catalogObject()
+ class myresponse:
+ def redirect(self, url):
+ pass
+ # this next call should not fail
+ self._catalog.manage_catalogObject(None, myresponse(), 'URL1', urls=('11', '12'))
+ def testBooleanEvalOn_refreshCatalog_getobject(self):
+ # wrap catalog under the fake parent providing unrestrictedTraverse()
+ catalog = self._catalog.__of__(fakeparent(self.d))
+ # replace entries to test refreshCatalog
+ self.d['0'] = dummyLenFail(0, self.fail)
+ self.d['1'] = dummyNonzeroFail(1, self.fail)
+ # this next call should not fail
+ catalog.refreshCatalog()
+ for uid in ('0', '1'):
+ rid = catalog.getrid(uid)
+ # neither should these
+ catalog.getobject(rid)
+ def test_getobject_doesntMaskTraversalErrorsAndDoesntDelegateTo_resolve_url(self):
+ # wrap catalog under the fake parent providing unrestrictedTraverse()
+ catalog = self._catalog.__of__(fakeparent(self.d))
+ # make resolve_url fail if ZCatalog falls back on it
+ def resolve_url(path, REQUEST):
+ self.fail(".resolve_url() should not be called by .getobject()")
+ catalog.resolve_url = resolve_url
+ # traversal should work at first
+ rid0 = catalog.getrid('0')
+ # lets set it up so the traversal fails
+ del self.d['0']
+ self.assertRaises(FakeTraversalError, catalog.getobject, rid0, REQUEST=object())
+ # and if there is a None at the traversal point, that's where it should return
+ self.d['0'] = None
+ self.assertEquals(catalog.getobject(rid0), None)
+class dummy(ExtensionClass.Base):
+ att1 = 'att1'
+ att2 = 'att2'
+ att3 = ['att3']
+ def __init__(self, num):
+ self.num = num
+ def col1(self):
+ return 'col1'
+ def col2(self):
+ return 'col2'
+ def col3(self):
+ return ['col3']
+class TestCatalogObject(unittest.TestCase):
+ upper = 1000
+ nums = range(upper)
+ for i in range(upper):
+ j = random.randrange(0, upper)
+ tmp = nums[i]
+ nums[i] = nums[j]
+ nums[j] = tmp
+ def setUp(self):
+ self.warningshook = WarningsHook()
+ self.warningshook.install()
+ self._catalog = Catalog()
+ self._catalog.lexicon = PLexicon('lexicon')
+ col1 = FieldIndex('col1')
+ col2 = ZCTextIndex('col2', caller=self._catalog,
+ index_factory=OkapiIndex, lexicon_id='lexicon')
+ col3 = KeywordIndex('col3')
+ self._catalog.addIndex('col1', col1)
+ self._catalog.addIndex('col2', col2)
+ self._catalog.addIndex('col3', col3)
+ self._catalog.addColumn('col1')
+ self._catalog.addColumn('col2')
+ self._catalog.addColumn('col3')
+ att1 = FieldIndex('att1')
+ att2 = ZCTextIndex('att2', caller=self._catalog,
+ index_factory=OkapiIndex, lexicon_id='lexicon')
+ att3 = KeywordIndex('att3')
+ num = FieldIndex('num')
+ self._catalog.addIndex('att1', att1)
+ self._catalog.addIndex('att2', att2)
+ self._catalog.addIndex('att3', att3)
+ self._catalog.addIndex('num', num)
+ self._catalog.addColumn('att1')
+ self._catalog.addColumn('att2')
+ self._catalog.addColumn('att3')
+ self._catalog.addColumn('num')
+ for x in range(0, self.upper):
+ self._catalog.catalogObject(dummy(self.nums[x]), `x`)
+ self._catalog.aq_parent = dummy('foo') # fake out acquisition
+ def tearDown(self):
+ self._catalog = None
+ self.warningshook.uninstall()
+ def testResultLength(self):
+ a = self._catalog()
+ self.assertEqual(len(a), self.upper,
+ 'length should be %s, its %s' % (self.upper, len(a)))
+ def testEmptyMappingReturnsAll(self):
+ upper = self.upper
+ a = self._catalog({})
+ self.assertEqual(len(a), upper,
+ 'length should be %s, its %s' % (upper, len(a)))
+ # Queries used to do the same, because of a bug in the
+ # parseIndexRequest function, mistaking a CatalogSearchArgumentsMap
+ # for a Record class
+ a = self._catalog({'col1':'', 'col2':'', 'col3':''})
+ self.assertEqual(len(a), 0,
+ 'length should be %s, its %s' % (upper, len(a)))
+ def testFieldIndexLength(self):
+ a = self._catalog(att1='att1')
+ self.assertEqual(len(a), self.upper,
+ 'should be %s, but is %s' % (self.upper, len(a)))
+ def testTextIndexLength(self):
+ a = self._catalog(att2='att2')
+ self.assertEqual(len(a), self.upper,
+ 'should be %s, but is %s' % (self.upper, len(a)))
+ def testKeywordIndexLength(self):
+ a = self._catalog(att3='att3')
+ self.assertEqual(len(a), self.upper,
+ 'should be %s, but is %s' % (self.upper, len(a)))
+ def testUncatalogFieldIndex(self):
+ self.uncatalog()
+ a = self._catalog(att1='att1')
+ self.assertEqual(len(a), 0, 'len: %s' % len(a))
+ def testUncatalogTextIndex(self):
+ self.uncatalog()
+ a = self._catalog(att2='att2')
+ self.assertEqual(len(a), 0, 'len: %s' % len(a))
+ def testUncatalogKeywordIndex(self):
+ self.uncatalog()
+ a = self._catalog(att3='att3')
+ self.assertEqual(len(a), 0, 'len: %s' % len(a))
+ def testBadUncatalog(self):
+ try:
+ self._catalog.uncatalogObject('asdasdasd')
+ except:
+ self.fail('uncatalogObject raised exception on bad uid')
+ def testUniqueValuesForLength(self):
+ a = self._catalog.uniqueValuesFor('att1')
+ self.assertEqual(len(a), 1, 'bad number of unique values %s' % a)
+ def testUniqueValuesForContent(self):
+ a = self._catalog.uniqueValuesFor('att1')
+ self.assertEqual(a[0], 'att1', 'bad content %s' % a[0])
+ def testUncatalogTwice(self):
+ self._catalog.uncatalogObject(`0`)
+ self.assertRaises(Exception, '_second')
+ def testCatalogLength(self):
+ for x in range(0, self.upper):
+ self._catalog.uncatalogObject(`x`)
+ self.assertEqual(len(self._catalog), 0)
+ def _second(self):
+ self._catalog.uncatalogObject(`0`)
+ def uncatalog(self):
+ for x in range(0, self.upper):
+ self._catalog.uncatalogObject(`x`)
+ def testGoodSortIndex(self):
+ upper = self.upper
+ a = self._catalog(sort_on='num')
+ self.assertEqual(len(a), upper,
+ 'length should be %s, its %s' % (upper, len(a)))
+ for x in range(self.upper):
+ self.assertEqual(a[x].num, x)
+ def testBadSortIndex(self):
+ self.assertRaises(CatalogError, self.badsortindex)
+ def badsortindex(self):
+ a = self._catalog(sort_on='foofaraw')
+ def testWrongKindOfIndexForSort(self):
+ self.assertRaises(CatalogError, self.wrongsortindex)
+ def wrongsortindex(self):
+ a = self._catalog(sort_on='att2')
+ def testTextIndexQWithSortOn(self):
+ upper = self.upper
+ a = self._catalog(sort_on='num', att2='att2')
+ self.assertEqual(len(a), upper,
+ 'length should be %s, its %s' % (upper, len(a)))
+ for x in range(self.upper):
+ self.assertEqual(a[x].num, x)
+ def testTextIndexQWithoutSortOn(self):
+ upper = self.upper
+ a = self._catalog(att2='att2')
+ self.assertEqual(len(a), upper,
+ 'length should be %s, its %s' % (upper, len(a)))
+# XXX: don't know how to adjust this test for ZCTextIndex
+# for x in range(self.upper):
+# self.assertEqual(a[x].data_record_score_, 1)
+ def testKeywordIndexWithMinRange(self):
+ a = self._catalog(att3={'query': 'att', 'range': 'min'})
+ self.assertEqual(len(a), self.upper)
+ def testKeywordIndexWithMaxRange(self):
+ a = self._catalog(att3={'query': 'att35', 'range': ':max'})
+ self.assertEqual(len(a), self.upper)
+ def testKeywordIndexWithMinMaxRangeCorrectSyntax(self):
+ a = self._catalog(att3={'query': ['att', 'att35'], 'range': 'min:max'})
+ self.assertEqual(len(a), self.upper)
+ def testKeywordIndexWithMinMaxRangeWrongSyntax(self):
+ # checkKeywordIndex with min/max range wrong syntax.
+ a = self._catalog(att3={'query': ['att'], 'range': 'min:max'})
+ self.assert_(len(a) != self.upper)
+ def testCombinedTextandKeywordQuery(self):
+ a = self._catalog(att3='att3', att2='att2')
+ self.assertEqual(len(a), self.upper)
+ def testLargeSortedResultSetWithSmallIndex(self):
+ # This exercises the optimization in the catalog that iterates
+ # over the sort index rather than the result set when the result
+ # set is much larger than the sort index.
+ a = self._catalog(sort_on='att1')
+ self.assertEqual(len(a), self.upper)
+ def testBadSortLimits(self):
+ self.assertRaises(
+ AssertionError, self._catalog, sort_on='num', sort_limit=0)
+ self.assertRaises(
+ AssertionError, self._catalog, sort_on='num', sort_limit=-10)
+ def testSortLimit(self):
+ full = self._catalog(sort_on='num')
+ a = self._catalog(sort_on='num', sort_limit=10)
+ self.assertEqual([r.num for r in a], [r.num for r in full[:10]])
+ self.assertEqual(a.actual_result_count, self.upper)
+ a = self._catalog(sort_on='num', sort_limit=10, sort_order='reverse')
+ rev = [r.num for r in full[-10:]]
+ rev.reverse()
+ self.assertEqual([r.num for r in a], rev)
+ self.assertEqual(a.actual_result_count, self.upper)
+ def testBigSortLimit(self):
+ a = self._catalog(sort_on='num', sort_limit=self.upper*3)
+ self.assertEqual(a.actual_result_count, self.upper)
+ self.assertEqual(a[0].num, 0)
+ a = self._catalog(
+ sort_on='num', sort_limit=self.upper*3, sort_order='reverse')
+ self.assertEqual(a.actual_result_count, self.upper)
+ self.assertEqual(a[0].num, self.upper - 1)
+ def testUpdateMetadataFalse(self):
+ ob = dummy(9999)
+ self._catalog.catalogObject(ob, `9999`)
+ brain = self._catalog(num=9999)[0]
+ self.assertEqual(brain.att1, 'att1')
+ ob.att1 = 'foobar'
+ self._catalog.catalogObject(ob, `9999`, update_metadata=0)
+ brain = self._catalog(num=9999)[0]
+ self.assertEqual(brain.att1, 'att1')
+ self._catalog.catalogObject(ob, `9999`)
+ brain = self._catalog(num=9999)[0]
+ self.assertEqual(brain.att1, 'foobar')
+class objRS(ExtensionClass.Base):
+ def __init__(self,num):
+ self.number = num
+class TestRS(unittest.TestCase):
+ def setUp(self):
+ self._catalog = Catalog()
+ index = FieldIndex('number')
+ self._catalog.addIndex('number', index)
+ self._catalog.addColumn('number')
+ for i in range(5000):
+ obj = objRS(random.randrange(0,20000))
+ self._catalog.catalogObject(obj,i)
+ self._catalog.aq_parent = objRS(200)
+ def testRangeSearch(self):
+ for i in range(1000):
+ m = random.randrange(0,20000)
+ n = m + 1000
+ for r in self._catalog.searchResults(
+ number= {'query': (m,n) , 'range' : 'min:max' } ):
+ size = r.number
+ self.assert_(m<=size and size<=n,
+ "%d vs [%d,%d]" % (r.number,m,n))
+class TestMerge(unittest.TestCase):
+ # Test merging results from multiple catalogs
+ def setUp(self):
+ self.catalogs = []
+ for i in range(3):
+ cat = Catalog()
+ cat.lexicon = PLexicon('lexicon')
+ cat.addIndex('num', FieldIndex('num'))
+ cat.addIndex('big', FieldIndex('big'))
+ cat.addIndex('number', FieldIndex('number'))
+ i = ZCTextIndex('title', caller=cat, index_factory=OkapiIndex,
+ lexicon_id='lexicon')
+ cat.addIndex('title', i)
+ cat.aq_parent = zdummy(16336)
+ for i in range(10):
+ obj = zdummy(i)
+ obj.big = i > 5
+ obj.number = True
+ cat.catalogObject(obj, str(i))
+ self.catalogs.append(cat)
+ def testNoFilterOrSort(self):
+ from Products.ZCatalog.Catalog import mergeResults
+ results = [cat.searchResults(
+ dict(number=True), _merge=0) for cat in self.catalogs]
+ merged_rids = [r.getRID() for r in mergeResults(
+ results, has_sort_keys=False, reverse=False)]
+ expected = [r.getRID() for r in chain(*results)]
+ self.assertEqual(sort(merged_rids), sort(expected))
+ def testSortedOnly(self):
+ from Products.ZCatalog.Catalog import mergeResults
+ results = [cat.searchResults(
+ dict(number=True, sort_on='num'), _merge=0)
+ for cat in self.catalogs]
+ merged_rids = [r.getRID() for r in mergeResults(
+ results, has_sort_keys=True, reverse=False)]
+ expected = sort(chain(*results))
+ expected = [rid for sortkey, rid, getitem in expected]
+ self.assertEqual(merged_rids, expected)
+ def testSortReverse(self):
+ from Products.ZCatalog.Catalog import mergeResults
+ results = [cat.searchResults(
+ dict(number=True, sort_on='num'), _merge=0)
+ for cat in self.catalogs]
+ merged_rids = [r.getRID() for r in mergeResults(
+ results, has_sort_keys=True, reverse=True)]
+ expected = sort(chain(*results))
+ expected.reverse()
+ expected = [rid for sortkey, rid, getitem in expected]
+ self.assertEqual(merged_rids, expected)
+ def testLimitSort(self):
+ from Products.ZCatalog.Catalog import mergeResults
+ results = [cat.searchResults(
+ dict(number=True, sort_on='num'), sort_limit=2, _merge=0)
+ for cat in self.catalogs]
+ merged_rids = [r.getRID() for r in mergeResults(
+ results, has_sort_keys=True, reverse=False)]
+ expected = sort(chain(*results))
+ expected = [rid for sortkey, rid, getitem in expected]
+ self.assertEqual(merged_rids, expected)
+ def testScored(self):
+ from Products.ZCatalog.Catalog import mergeResults
+ results = [cat.searchResults(title='4 or 5 or 6', _merge=0)
+ for cat in self.catalogs]
+ merged_rids = [r.getRID() for r in mergeResults(
+ results, has_sort_keys=True, reverse=False)]
+ expected = sort(chain(*results))
+ expected = [rid for sortkey, (nscore, score, rid), getitem in expected]
+ self.assertEqual(merged_rids, expected)
+ def testSmallIndexSort(self):
+ # Test that small index sort optimization is not used for merging
+ from Products.ZCatalog.Catalog import mergeResults
+ results = [cat.searchResults(
+ dict(number=True, sort_on='big'), _merge=0)
+ for cat in self.catalogs]
+ merged_rids = [r.getRID() for r in mergeResults(
+ results, has_sort_keys=True, reverse=False)]
+ expected = sort(chain(*results))
+ expected = [rid for sortkey, rid, getitem in expected]
+ self.assertEqual(merged_rids, expected)
+class PickySecurityManager:
+ def __init__(self, badnames=[]):
+ self.badnames = badnames
+ def validateValue(self, value):
+ return 1
+ def validate(self, accessed, container, name, value):
+ if name not in self.badnames:
+ return 1
+ raise Unauthorized(name)
+class TestZCatalogGetObject(unittest.TestCase):
+ # Test what objects are returned by brain.getObject()
+ _old_flag = None
+ def setUp(self):
+ from Products.ZCatalog.ZCatalog import ZCatalog
+ catalog = ZCatalog('catalog')
+ catalog.addIndex('id', 'FieldIndex')
+ root = Folder('')
+ root.getPhysicalRoot = lambda: root
+ self.root = root
+ self.root.catalog = catalog
+ def tearDown(self):
+ noSecurityManager()
+ if self._old_flag is not None:
+ self._restore_getObject_flag()
+ def _init_getObject_flag(self, flag):
+ from Products.ZCatalog import CatalogBrains
+ self._old_flag = CatalogBrains.GETOBJECT_RAISES
+ CatalogBrains.GETOBJECT_RAISES = flag
+ def _restore_getObject_flag(self):
+ from Products.ZCatalog import CatalogBrains
+ CatalogBrains.GETOBJECT_RAISES = self._old_flag
+ def test_getObject_found(self):
+ # Check normal traversal
+ root = self.root
+ catalog = root.catalog
+ root.ob = Folder('ob')
+ catalog.catalog_object(root.ob)
+ brain = catalog.searchResults({'id': 'ob'})[0]
+ self.assertEqual(brain.getPath(), '/ob')
+ self.assertEqual(brain.getObject().getId(), 'ob')
+ def test_getObject_missing_raises_NotFound(self):
+ # Check that if the object is missing we raise
+ from zExceptions import NotFound
+ self._init_getObject_flag(True)
+ root = self.root
+ catalog = root.catalog
+ root.ob = Folder('ob')
+ catalog.catalog_object(root.ob)
+ brain = catalog.searchResults({'id': 'ob'})[0]
+ del root.ob
+ self.assertRaises((NotFound, AttributeError, KeyError), brain.getObject)
+ def test_getObject_restricted_raises_Unauthorized(self):
+ # Check that if the object's security does not allow traversal,
+ # None is returned
+ self._init_getObject_flag(True)
+ root = self.root
+ catalog = root.catalog
+ root.fold = Folder('fold')
+ root.fold.ob = Folder('ob')
+ catalog.catalog_object(root.fold.ob)
+ brain = catalog.searchResults({'id': 'ob'})[0]
+ # allow all accesses
+ pickySecurityManager = PickySecurityManager()
+ setSecurityManager(pickySecurityManager)
+ self.assertEqual(brain.getObject().getId(), 'ob')
+ # disallow just 'ob' access
+ pickySecurityManager = PickySecurityManager(['ob'])
+ setSecurityManager(pickySecurityManager)
+ self.assertRaises(Unauthorized, brain.getObject)
+ # disallow just 'fold' access
+ pickySecurityManager = PickySecurityManager(['fold'])
+ setSecurityManager(pickySecurityManager)
+ ob = brain.getObject()
+ self.failIf(ob is None)
+ self.assertEqual(ob.getId(), 'ob')
+ def test_getObject_missing_returns_None(self):
+ # Check that if the object is missing None is returned
+ self._init_getObject_flag(False)
+ root = self.root
+ catalog = root.catalog
+ root.ob = Folder('ob')
+ catalog.catalog_object(root.ob)
+ brain = catalog.searchResults({'id': 'ob'})[0]
+ del root.ob
+ self.assertEqual(brain.getObject(), None)
+ def test_getObject_restricted_returns_None(self):
+ # Check that if the object's security does not allow traversal,
+ # None is returned
+ self._init_getObject_flag(False)
+ root = self.root
+ catalog = root.catalog
+ root.fold = Folder('fold')
+ root.fold.ob = Folder('ob')
+ catalog.catalog_object(root.fold.ob)
+ brain = catalog.searchResults({'id': 'ob'})[0]
+ # allow all accesses
+ pickySecurityManager = PickySecurityManager()
+ setSecurityManager(pickySecurityManager)
+ self.assertEqual(brain.getObject().getId(), 'ob')
+ # disallow just 'ob' access
+ pickySecurityManager = PickySecurityManager(['ob'])
+ setSecurityManager(pickySecurityManager)
+ self.assertEqual(brain.getObject(), None)
+ # disallow just 'fold' access
+ pickySecurityManager = PickySecurityManager(['fold'])
+ setSecurityManager(pickySecurityManager)
+ ob = brain.getObject()
+ self.failIf(ob is None)
+ self.assertEqual(ob.getId(), 'ob')
+ # Now test _unrestrictedGetObject
+ def test_unrestrictedGetObject_found(self):
+ # Check normal traversal
+ root = self.root
+ catalog = root.catalog
+ root.ob = Folder('ob')
+ catalog.catalog_object(root.ob)
+ brain = catalog.searchResults({'id': 'ob'})[0]
+ self.assertEqual(brain.getPath(), '/ob')
+ self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob')
+ def test_unrestrictedGetObject_restricted(self):
+ # Check that if the object's security does not allow traversal,
+ # it's still is returned
+ root = self.root
+ catalog = root.catalog
+ root.fold = Folder('fold')
+ root.fold.ob = Folder('ob')
+ catalog.catalog_object(root.fold.ob)
+ brain = catalog.searchResults({'id': 'ob'})[0]
+ # allow all accesses
+ pickySecurityManager = PickySecurityManager()
+ setSecurityManager(pickySecurityManager)
+ self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob')
+ # disallow just 'ob' access
+ pickySecurityManager = PickySecurityManager(['ob'])
+ setSecurityManager(pickySecurityManager)
+ self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob')
+ # disallow just 'fold' access
+ pickySecurityManager = PickySecurityManager(['fold'])
+ setSecurityManager(pickySecurityManager)
+ self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob')
+ def test_unrestrictedGetObject_missing_raises_NotFound(self):
+ # Check that if the object is missing we raise
+ from zExceptions import NotFound
+ self._init_getObject_flag(True)
+ root = self.root
+ catalog = root.catalog
+ root.ob = Folder('ob')
+ catalog.catalog_object(root.ob)
+ brain = catalog.searchResults({'id': 'ob'})[0]
+ del root.ob
+ self.assertRaises((NotFound, AttributeError, KeyError),
+ brain._unrestrictedGetObject)
+ def test_unrestrictedGetObject_missing_returns_None(self):
+ # Check that if the object is missing None is returned
+ self._init_getObject_flag(False)
+ root = self.root
+ catalog = root.catalog
+ root.ob = Folder('ob')
+ catalog.catalog_object(root.ob)
+ brain = catalog.searchResults({'id': 'ob'})[0]
+ del root.ob
+ self.assertEqual(brain._unrestrictedGetObject(), None)
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest( unittest.makeSuite( TestAddDelColumn ) )
+ suite.addTest( unittest.makeSuite( TestAddDelIndexes ) )
+ suite.addTest( unittest.makeSuite( TestZCatalog ) )
+ suite.addTest( unittest.makeSuite( TestCatalogObject ) )
+ suite.addTest( unittest.makeSuite( TestRS ) )
+ suite.addTest( unittest.makeSuite( TestMerge ) )
+ suite.addTest( unittest.makeSuite( TestZCatalogGetObject ) )
+ return suite
Copied: Zope/trunk/src/Products/ZCatalog/tests/test_lazy.py (from rev 115275, Zope/trunk/src/Products/ZCatalog/tests/testLazySequences.py)
--- Zope/trunk/src/Products/ZCatalog/tests/test_lazy.py (rev 0)
+++ Zope/trunk/src/Products/ZCatalog/tests/test_lazy.py 2010-07-31 20:45:52 UTC (rev 115282)
@@ -0,0 +1,215 @@
+# Copyright (c) 2002 Zope Foundation and Contributors.
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+"""Unittests for Lazy sequence classes
+import unittest
+class BaseSequenceTest(unittest.TestCase):
+ def _compare(self, lseq, seq):
+ self.assertEqual(len(lseq), len(seq))
+ self.assertEqual(list(lseq), seq)
+class TestLazyCat(BaseSequenceTest):
+ def _createLSeq(self, *sequences):
+ from Products.ZCatalog.Lazy import LazyCat
+ return LazyCat(sequences)
+ def testEmpty(self):
+ lcat = self._createLSeq([])
+ self._compare(lcat, [])
+ def testSingleSequence(self):
+ seq = range(10)
+ lcat = self._createLSeq(seq)
+ self._compare(lcat, seq)
+ def testMultipleSequences(self):
+ from string import hexdigits, letters
+ seq1 = range(10)
+ seq2 = list(hexdigits)
+ seq3 = list(letters)
+ lcat = self._createLSeq(seq1, seq2, seq3)
+ self._compare(lcat, seq1 + seq2 + seq3)
+ def testNestedLazySequences(self):
+ from string import hexdigits, letters
+ seq1 = range(10)
+ seq2 = list(hexdigits)
+ seq3 = list(letters)
+ lcat = apply(self._createLSeq,
+ [self._createLSeq(seq) for seq in (seq1, seq2, seq3)])
+ self._compare(lcat, seq1 + seq2 + seq3)
+ def testSlicedSequences(self):
+ from string import hexdigits, letters
+ seq1 = range(10)
+ seq2 = list(hexdigits)
+ seq3 = list(letters)
+ lcat = apply(self._createLSeq,
+ [self._createLSeq(seq) for seq in (seq1, seq2, seq3)])
+ self._compare(lcat[5:-5], seq1[5:] + seq2 + seq3[:-5])
+ def testConsistentLength(self):
+ # Unaccessed length
+ lcat = self._createLSeq(range(10))
+ self.assertEqual(len(lcat), 10)
+ # Accessed in the middle
+ lcat = self._createLSeq(range(10))
+ lcat[4]
+ self.assertEqual(len(lcat), 10)
+ # Accessed after the lcat is accessed over the whole range
+ lcat = self._createLSeq(range(10))
+ lcat[:]
+ self.assertEqual(len(lcat), 10)
+class TestLazyMap(TestLazyCat):
+ def _createLSeq(self, *seq):
+ return self._createLMap(lambda x: x, *seq)
+ def _createLMap(self, mapfunc, *seq):
+ from Products.ZCatalog.Lazy import LazyMap
+ totalseq = []
+ for s in seq:
+ totalseq.extend(s)
+ return LazyMap(mapfunc, totalseq)
+ def testMap(self):
+ from string import hexdigits, letters
+ seq1 = range(10)
+ seq2 = list(hexdigits)
+ seq3 = list(letters)
+ filter = lambda x: str(x).lower()
+ lmap = self._createLMap(filter, seq1, seq2, seq3)
+ self._compare(lmap, [str(x).lower() for x in (seq1 + seq2 + seq3)])
+class TestLazyFilter(TestLazyCat):
+ def _createLSeq(self, *seq):
+ return self._createLFilter(lambda x: True, *seq)
+ def _createLFilter(self, filter, *seq):
+ from Products.ZCatalog.Lazy import LazyFilter
+ totalseq = []
+ for s in seq:
+ totalseq.extend(s)
+ return LazyFilter(filter, totalseq)
+ def testFilter(self):
+ from string import hexdigits, letters
+ seq1 = range(10)
+ seq2 = list(hexdigits)
+ seq3 = list(letters)
+ filter = lambda x: str(x).isalpha()
+ lmap = self._createLFilter(filter, seq1, seq2, seq3)
+ self._compare(lmap, seq2[10:] + seq3)
+ def testConsistentLengthWithFilter(self):
+ from string import letters
+ # Unaccessed length
+ lfilter = self._createLFilter(lambda x: x.islower(), list(letters))
+ self.assertEqual(len(lfilter), 26)
+ # Accessed in the middle
+ lfilter = self._createLFilter(lambda x: x.islower(), list(letters))
+ lfilter[13]
+ self.assertEqual(len(lfilter), 26)
+ # Accessed after the lcat is accessed over the whole range
+ lfilter = self._createLFilter(lambda x: x.islower(), list(letters))
+ lfilter[:]
+ self.assertEqual(len(lfilter), 26)
+class TestLazyMop(TestLazyCat):
+ def _createLSeq(self, *seq):
+ return self._createLMop(lambda x: x, *seq)
+ def _createLMop(self, mapfunc, *seq):
+ from Products.ZCatalog.Lazy import LazyMop
+ totalseq = []
+ for s in seq:
+ totalseq.extend(s)
+ return LazyMop(mapfunc, totalseq)
+ def testMop(self):
+ from string import hexdigits, letters
+ seq1 = range(10)
+ seq2 = list(hexdigits)
+ seq3 = list(letters)
+ def filter(x):
+ if isinstance(x, int):
+ raise ValueError
+ return x.lower()
+ lmop = self._createLMop(filter, seq1, seq2, seq3)
+ self._compare(lmop, [str(x).lower() for x in (seq2 + seq3)])
+ def testConsistentLengthWithMop(self):
+ from string import letters
+ seq = range(10) + list(letters)
+ def filter(x):
+ if isinstance(x, int):
+ raise ValueError
+ return x.lower()
+ # Unaccessed length
+ lmop = self._createLMop(filter, seq)
+ self.assertEqual(len(lmop), 52)
+ # Accessed in the middle
+ lmop = self._createLMop(filter, seq)
+ lmop[26]
+ self.assertEqual(len(lmop), 52)
+ # Accessed after the lcat is accessed over the whole range
+ lmop = self._createLMop(filter, letters)
+ lmop[:]
+ self.assertEqual(len(lmop), 52)
+class TestLazyValues(BaseSequenceTest):
+ def _createLValues(self, seq):
+ from Products.ZCatalog.Lazy import LazyValues
+ return LazyValues(seq)
+ def testEmpty(self):
+ lvals = self._createLValues([])
+ self._compare(lvals, [])
+ def testValues(self):
+ from string import letters
+ seq = zip(letters, range(10))
+ lvals = self._createLValues(seq)
+ self._compare(lvals, range(10))
+ def testSlice(self):
+ from string import letters
+ seq = zip(letters, range(10))
+ lvals = self._createLValues(seq)
+ self._compare(lvals[2:-2], range(2, 8))
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestLazyCat))
+ suite.addTest(unittest.makeSuite(TestLazyMap))
+ suite.addTest(unittest.makeSuite(TestLazyFilter))
+ suite.addTest(unittest.makeSuite(TestLazyMop))
+ suite.addTest(unittest.makeSuite(TestLazyValues))
+ return suite
More information about the Zope-Checkins
mailing list