[Checkins] SVN: z3c.indexing.dispatch/ Initial import.
Malthe Borch
mborch at gmail.com
Sat Mar 29 07:16:30 EDT 2008
Log message for revision 85009:
Initial import.
Changed:
A z3c.indexing.dispatch/
A z3c.indexing.dispatch/trunk/
A z3c.indexing.dispatch/trunk/AUTHOR.txt
A z3c.indexing.dispatch/trunk/README.txt
A z3c.indexing.dispatch/trunk/bootstrap.py
A z3c.indexing.dispatch/trunk/buildout.cfg
A z3c.indexing.dispatch/trunk/setup.py
A z3c.indexing.dispatch/trunk/src/
A z3c.indexing.dispatch/trunk/src/z3c/
A z3c.indexing.dispatch/trunk/src/z3c/__init__.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/
A z3c.indexing.dispatch/trunk/src/z3c/indexing/__init__.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/constants.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/operation.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/reducer.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/__init__.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_queue.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/utils.py
A z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py
-=-
Added: z3c.indexing.dispatch/trunk/AUTHOR.txt
===================================================================
--- z3c.indexing.dispatch/trunk/AUTHOR.txt (rev 0)
+++ z3c.indexing.dispatch/trunk/AUTHOR.txt 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,7 @@
+Authors
+=======
+
+Enfold Systems, Helge Tesdal, Andreas Zeidler, Malthe Borch
+
+Most of the code stems from other packages, and was only reshaped to
+fit into the ``z3c.indexing``-umbrella.
Added: z3c.indexing.dispatch/trunk/README.txt
===================================================================
--- z3c.indexing.dispatch/trunk/README.txt (rev 0)
+++ z3c.indexing.dispatch/trunk/README.txt 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,4 @@
+Overview
+========
+
+This package implements a transaction-safe indexing dispatcher.
Added: z3c.indexing.dispatch/trunk/bootstrap.py
===================================================================
--- z3c.indexing.dispatch/trunk/bootstrap.py (rev 0)
+++ z3c.indexing.dispatch/trunk/bootstrap.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,52 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Bootstrap a buildout-based project
+
+Simply run this script in a directory containing a buildout.cfg.
+The script accepts buildout command-line options, so you can
+use the -c option to specify an alternate configuration file.
+
+$Id: bootstrap.py 71627 2006-12-20 16:46:11Z jim $
+"""
+
+import os, shutil, sys, tempfile, urllib2
+
+tmpeggs = tempfile.mkdtemp()
+
+ez = {}
+exec urllib2.urlopen('http://peak.telecommunity.com/dist/ez_setup.py'
+ ).read() in ez
+ez['use_setuptools'](to_dir=tmpeggs, download_delay=0)
+
+import pkg_resources
+
+cmd = 'from setuptools.command.easy_install import main; main()'
+if sys.platform == 'win32':
+ cmd = '"%s"' % cmd # work around spawn lamosity on windows
+
+ws = pkg_resources.working_set
+assert os.spawnle(
+ os.P_WAIT, sys.executable, sys.executable,
+ '-c', cmd, '-mqNxd', tmpeggs, 'zc.buildout',
+ dict(os.environ,
+ PYTHONPATH=
+ ws.find(pkg_resources.Requirement.parse('setuptools')).location
+ ),
+ ) == 0
+
+ws.add_entry(tmpeggs)
+ws.require('zc.buildout')
+import zc.buildout.buildout
+zc.buildout.buildout.main(sys.argv[1:] + ['bootstrap'])
+shutil.rmtree(tmpeggs)
Added: z3c.indexing.dispatch/trunk/buildout.cfg
===================================================================
--- z3c.indexing.dispatch/trunk/buildout.cfg (rev 0)
+++ z3c.indexing.dispatch/trunk/buildout.cfg 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,9 @@
+[buildout]
+develop = .
+parts = test
+
+find-links = http://download.zope.org/distribution/
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = z3c.indexing.dispatch
\ No newline at end of file
Added: z3c.indexing.dispatch/trunk/setup.py
===================================================================
--- z3c.indexing.dispatch/trunk/setup.py (rev 0)
+++ z3c.indexing.dispatch/trunk/setup.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,38 @@
+from setuptools import setup, find_packages
+import sys, os
+
+version = '0.1'
+
+setup(name='z3c.indexing.dispatch',
+ version=version,
+ description="Transaction-safe indexing dispatcher.",
+ long_description=open('README.txt').read(),
+ classifiers=[
+ "Framework :: Plone",
+ "Framework :: Zope2",
+ "Framework :: Zope3",
+ "Programming Language :: Python",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ ],
+ keywords='',
+ author='Zope Corporation and Contributors',
+ author_email='zope3-dev at zope.org',
+ url='',
+ license='ZPL',
+ packages=find_packages('src'),
+ package_dir={'': 'src'},
+ namespace_packages=['z3c', 'z3c.indexing'],
+ include_package_data=True,
+ zip_safe=False,
+ install_requires=[
+ 'setuptools',
+ 'zope.interface',
+ 'zope.component',
+ 'zope.testing',
+ 'transaction',
+ # -*- Extra requirements: -*-
+ ],
+ entry_points="""
+ # -*- Entry points: -*-
+ """,
+ )
Added: z3c.indexing.dispatch/trunk/src/z3c/__init__.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/__init__.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/__init__.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,6 @@
+# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
+try:
+ __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+ from pkgutil import extend_path
+ __path__ = extend_path(__path__, __name__)
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/__init__.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/__init__.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/__init__.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,6 @@
+# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
+try:
+ __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+ from pkgutil import extend_path
+ __path__ = extend_path(__path__, __name__)
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/README.txt 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,27 @@
+z3c.indexing.dispatch
+=====================
+
+The indexing dispatcher is the main entry point for indexing content.
+
+A dispatcher must implement three basic operations (defined in the
+``IDispatcher`` interface): index, reindex and unindex.
+
+Dispatching flow
+----------------
+
+Dispatchers can perform indexing operations directly or defer work to
+other dispatchers using the following lookup pattern:
+
+ IDispatcher(self, obj) -> IDispatcher
+
+Example dispatching flows:
+
+ transactional dispatcher -> zcatalog
+ transactional dispatcher -> async -> xapian
+
+Transactional dispatching
+-------------------------
+
+The transactional dispatcher will queue indexing operations while
+waiting for the transaction boundary; then pass the operations on to
+the next set of dispatchers.
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/__init__.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1 @@
+#
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/constants.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/constants.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/constants.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,5 @@
+# constants for indexing operations
+UNINDEX = -1
+REINDEX = 0
+INDEX = 1
+
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/interfaces.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,57 @@
+from zope.interface import Interface
+
+class IDispatcher(Interface):
+ """Interface for dispatching indexing operations.
+
+ Defines basic indexing operations corresponding to content being
+ added, modified or deleted.
+ """
+
+ def index(obj, attributes=None):
+ """Queue an index operation for the given object and attributes."""
+
+ def reindex(obj, attributes=None):
+ """Queue a reindex operation for the given object and attributes."""
+
+ def unindex(obj):
+ """Queue an unindex operation for the given object."""
+
+ def flush(obj):
+ """Flush queue."""
+
+
+class ITransactionalDispatcher(IDispatcher):
+ """A transactional dispatcher will keep operations in a queue
+ until a transaction boundary."""
+
+ def commit():
+ """Commit transaction."""
+
+ def clear():
+ """Clear internal state and release transaction manager."""
+
+ def getState():
+ """Return copy of queue state."""
+
+ def setState(state):
+ """Set queue state."""
+
+
+class IQueueReducer(Interface):
+ """Operation queue optimization.
+
+ This component might be merged with the transactional dispatcher
+ at some point. The motivation for splitting this functionality out
+ seems to primarily be a matter of optional configuration.
+ """
+
+ def optimize(queue):
+ """Remove redundant entries from queue.
+
+ The provided ``queue`` should be a sequence of operations on
+ the form:
+
+ (operator, object, attributes)
+
+ An optimized sequence is returned.
+ """
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/operation.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/operation.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/operation.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,33 @@
+import constants
+
+class Operation(tuple):
+ """Represents an indexing operation."""
+
+ op = None
+
+ def __new__(cls, obj=None, attributes=None):
+ inst = tuple.__new__(cls, (cls.op, obj, attributes))
+ inst.obj = obj
+ inst.attributes = attributes
+ return inst
+
+ def process(self, dispatcher):
+ return NotImplemented("Should be implemented in subclass.")
+
+class Add(Operation):
+ op = constants.INDEX
+
+ def process(self, dispatcher):
+ dispatcher.index(self.obj, self.attributes)
+
+class Modify(Operation):
+ op = constants.REINDEX
+
+ def process(self, dispatcher):
+ dispatcher.reindex(self.obj, self.attributes)
+
+class Delete(Operation):
+ op = constants.UNINDEX
+
+ def process(self, dispatcher):
+ dispatcher.unindex(self.obj)
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/queue.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,112 @@
+from zope import interface
+from zope import component
+
+from threading import local
+
+from z3c.indexing.dispatch.interfaces import IDispatcher
+from z3c.indexing.dispatch.interfaces import ITransactionalDispatcher
+from z3c.indexing.dispatch.interfaces import IQueueReducer
+
+from z3c.indexing.dispatch.constants import INDEX, REINDEX, UNINDEX
+from z3c.indexing.dispatch.transactions import QueueTM
+
+from z3c.indexing.dispatch import operation
+
+import transaction
+
+from logging import getLogger
+debug = getLogger('z3c.indexing.dispatch.queue').debug
+
+localQueue = None
+
+def getDispatcher():
+ """Return a (thread-local) dispatcher, creating one if necessary."""
+
+ global localQueue
+ if localQueue is None:
+ localQueue = TransactionalDispatcher()
+ return localQueue
+
+
+class TransactionalDispatcher(local):
+ """An indexing queue."""
+
+ interface.implements(ITransactionalDispatcher)
+
+ tmhook = None
+
+ def __init__(self):
+ self.queue = []
+
+ def index(self, obj, attributes=None):
+ debug('adding index operation for %r', obj)
+ self.queue.append(operation.Add(obj, attributes))
+ self._hook()
+
+ def reindex(self, obj, attributes=None):
+ debug('adding reindex operation for %r', obj)
+ self.queue.append(operation.Modify(obj, attributes))
+ self._hook()
+
+ def unindex(self, obj):
+ debug('adding unindex operation for %r', obj)
+ self.queue.append(operation.Delete(obj))
+ self._hook()
+
+ def flush(self):
+ return self.commit()
+
+ def commit(self):
+ self._optimize()
+
+ dispatchers = set()
+
+ for op, obj, attributes in self.queue:
+ for name, dispatcher in component.getAdapters((self, obj), IDispatcher):
+ if op == INDEX:
+ dispatcher.index(obj, attributes)
+ elif op == REINDEX:
+ dispatcher.reindex(obj, attributes)
+ elif op == UNINDEX:
+ dispatcher.unindex(obj)
+ else:
+ raise ValueError('Invalid queue operation code: %d' % op)
+
+ dispatchers.add(dispatcher)
+
+ self.clear()
+
+ for dispatcher in dispatchers:
+ dispatcher.flush()
+
+ def clear(self):
+ debug('clearing %d queue item(s)', len(self.queue))
+ del self.queue[:]
+ self.tmhook = None
+
+ def setState(self, state):
+ self.queue = state
+
+ def getState(self):
+ return list(self.queue)
+
+ def __len__(self):
+ return len(self.queue)
+
+ def _hook(self):
+ """Register a hook into the transaction machinery.
+
+ This is to make sure the queue's processing method gets called
+ back just before the transaction is about to be committed.
+ """
+
+ if self.tmhook is None:
+ self.tmhook = QueueTM(self).register
+
+ self.tmhook()
+
+ def _optimize(self):
+ reducer = component.queryUtility(IQueueReducer)
+ if reducer is not None:
+ self.queue = reducer.optimize(self.queue)
+
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/reducer.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/reducer.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/reducer.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,38 @@
+from logging import getLogger
+from zope.interface import implements
+from z3c.indexing.dispatch.interfaces import IQueueReducer
+from z3c.indexing.dispatch.constants import INDEX, UNINDEX
+
+debug = getLogger('z3c.indexing.dispatch.reducer').debug
+
+class QueueReducer(object):
+ """Reduce a queue of index operations."""
+
+ implements(IQueueReducer)
+
+ def optimize(self, queue):
+ res = {}
+ debug('start reducing %d item(s): %r', len(queue), queue)
+
+ for iop, obj, iattr in queue:
+ oid = hash(obj)
+ op, dummy, attr = res.get(oid, (0, obj, iattr))
+ # If we are going to delete an item that was added in this transaction, ignore it
+ if op == INDEX and iop == UNINDEX:
+ del res[oid]
+ else:
+ # Operators are -1, 0 or 1 which makes it safe to add them
+ op += iop
+ op = min(max(op,UNINDEX), INDEX) # operator always between -1 and 1
+
+ # Handle attributes, None means all fields, and takes presedence
+ if isinstance(attr, (tuple,list)) and isinstance(iattr, (tuple,list)):
+ attr = tuple(set(attr + iattr))
+ else:
+ attr = None
+
+ res[oid] = (op, obj, attr)
+
+ debug('finished reducing; %d item(s) in queue...', len(res))
+
+ return res.values()
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/__init__.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/__init__.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/__init__.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1 @@
+#
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_queue.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_queue.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_queue.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,269 @@
+from zope import interface
+from zope import component
+
+from unittest import TestSuite, makeSuite, main, TestCase
+from threading import Thread, currentThread
+
+from zope.interface import implements
+from zope.component import provideUtility, provideAdapter
+from zope.testing.cleanup import CleanUp
+
+from z3c.indexing.dispatch.interfaces import IDispatcher
+from z3c.indexing.dispatch.interfaces import ITransactionalDispatcher
+from z3c.indexing.dispatch.interfaces import IQueueReducer
+
+from z3c.indexing.dispatch.reducer import QueueReducer
+from z3c.indexing.dispatch.queue import TransactionalDispatcher
+from z3c.indexing.dispatch.queue import getDispatcher
+from z3c.indexing.dispatch.constants import INDEX, REINDEX, UNINDEX
+from z3c.indexing.dispatch.tests import utils
+
+class QueueTests(CleanUp, TestCase):
+
+ def setUp(self):
+ self.dispatcher = TransactionalDispatcher()
+
+ def tearDown(self):
+ self.queues = {}
+ self.dispatcher.clear()
+
+ def _provide_dispatcher(self, name=""):
+ factory = utils.MockDispatcherFactory()
+
+ provideAdapter(
+ factory,
+ (IDispatcher, str),
+ IDispatcher,
+ name=name)
+
+ return factory.queue
+
+ def testInterface(self):
+ self.failUnless(ITransactionalDispatcher.providedBy(self.dispatcher))
+
+ def testQueueHook(self):
+ class CaptainHook:
+ def __init__(self):
+ self.hooked = 0
+ def __call__(self):
+ self.hooked += 1
+ hook = CaptainHook()
+ dispatcher = self.dispatcher
+ dispatcher.tmhook = hook
+ self.assertEqual(hook.hooked, 0)
+ dispatcher.index('foo')
+ dispatcher.reindex('foo')
+ dispatcher.reindex('bar')
+ self.assertEqual(len(dispatcher.getState()), 3)
+ self.assertEqual(hook.hooked, 3)
+ dispatcher.commit()
+ self.assertEqual(hook.hooked, 3)
+
+ def testQueueState(self):
+ dispatcher = self.dispatcher
+ dispatcher.index('foo')
+ self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None)])
+ state = dispatcher.getState()
+ dispatcher.reindex('bar')
+ self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None), (REINDEX, 'bar', None)])
+ dispatcher.setState(state)
+ self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None)])
+ dispatcher.commit()
+ self.assertEqual(len(dispatcher), 0)
+
+ def testDispatching(self):
+ self.dispatcher.index('foo')
+ queue = self._provide_dispatcher()
+ self.dispatcher.commit()
+ self.assertEqual(self.dispatcher.getState(), [])
+ self.assertEqual(queue, [(INDEX, 'foo', None), 'flush'])
+
+ def testMultipleDispatchers(self):
+ dispatcher = self.dispatcher
+
+ queue1 = self._provide_dispatcher(name='dispatcher1')
+ queue2 = self._provide_dispatcher(name='dispatcher2')
+
+ dispatcher.index('foo')
+ dispatcher.commit()
+
+ self.assertEqual(dispatcher.getState(), [])
+ self.assertEqual(queue1, [(INDEX, 'foo', None), 'flush'])
+ self.assertEqual(queue2, [(INDEX, 'foo', None), 'flush'])
+
+ def testQueueOperations(self):
+ dispatcher = self.dispatcher
+
+ queue = self._provide_dispatcher()
+
+ dispatcher.index('foo')
+ dispatcher.reindex('foo')
+ dispatcher.unindex('foo')
+
+ dispatcher.commit()
+
+ self.assertEqual(len(dispatcher), 0)
+ self.assertEqual(queue, [(INDEX, 'foo', None), (REINDEX, 'foo', None), (UNINDEX, 'foo', None), 'flush'])
+
+ def testFlush(self):
+ queue = self._provide_dispatcher()
+
+ self.dispatcher.index('foo')
+ self.dispatcher.commit()
+
+ self.failUnless('flush' in queue)
+
+ def testQueueReducer(self):
+ class MessyReducer(object):
+ implements(IQueueReducer)
+ def optimize(self, queue):
+ return [ op for op in queue if not op[0] == UNINDEX ]
+ dispatcher = self.dispatcher
+ dispatcher.index('foo')
+ dispatcher.reindex('foo')
+ dispatcher.unindex('foo')
+ dispatcher.index('foo', 'bar')
+ dispatcher._optimize()
+ self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None), (REINDEX, 'foo', None), (UNINDEX, 'foo', None), (INDEX, 'foo', 'bar')])
+ provideUtility(MessyReducer()) # hook up the reducer
+ dispatcher._optimize() # and try again...
+ self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None), (REINDEX, 'foo', None), (INDEX, 'foo', 'bar')])
+
+ def testRealQueueReducer(self):
+ provideUtility(QueueReducer())
+ dispatcher = self.dispatcher
+ dispatcher.index('foo')
+ dispatcher.reindex('foo')
+ dispatcher.unindex('foo')
+ dispatcher.index('foo', 'bar')
+ dispatcher._optimize()
+ self.assertEqual(dispatcher.getState(), [(INDEX, 'foo', None)])
+
+
+class QueueReducerTests(TestCase):
+
+ def testReduceQueue(self):
+ reducer = QueueReducer()
+
+ queue = [(REINDEX, 'A', None), (REINDEX, 'A', None)]
+ self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', None)])
+
+ queue = [(INDEX, 'A', None), (REINDEX, 'A', None)]
+ self.failUnlessEqual(reducer.optimize(queue), [(INDEX, 'A', None)])
+
+ queue = [(INDEX, 'A', None), (UNINDEX, 'A', None)]
+ self.failUnlessEqual(reducer.optimize(queue), [])
+
+ queue = [(UNINDEX, 'A', None), (INDEX, 'A', None)]
+ self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', None)])
+
+ def testReduceQueueWithAttributes(self):
+ reducer = QueueReducer()
+
+ queue = [(REINDEX, 'A', None), (REINDEX, 'A', ('a','b'))]
+ self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', None)])
+
+ queue = [(REINDEX, 'A', ('a','b')), (REINDEX, 'A', None)]
+ self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', None)])
+
+ queue = [(REINDEX, 'A', ('a','b')), (REINDEX, 'A', ('b','c'))]
+ self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', ('a', 'c', 'b'))])
+
+ queue = [(INDEX, 'A', None), (REINDEX, 'A', None)]
+ self.failUnlessEqual(reducer.optimize(queue), [(INDEX, 'A', None)])
+
+ queue = [(REINDEX, 'A', ('a','b')), (UNINDEX, 'A', None), (INDEX, 'A', None)]
+ self.failUnlessEqual(reducer.optimize(queue), [(REINDEX, 'A', None)])
+
+
+class QueueThreadTests(TestCase):
+ """ thread tests modeled after zope.thread doctests """
+
+ def setUp(self):
+ self.dispatcher = getDispatcher()
+
+ def tearDown(self):
+ self.dispatcher.clear()
+
+ def testLocalQueues(self):
+ me = self.dispatcher # get the queued indexer...
+ other = []
+ def runner(): # and a callable for the thread to run...
+ me.reindex('bar')
+ other[:] = me.getState()
+ thread = Thread(target=runner) # another thread is created...
+ thread.start() # and started...
+ while thread.isAlive(): '...' # wait until it's done...
+ self.assertEqual(other, [(REINDEX, 'bar', None)])
+ self.assertEqual(me.getState(), [])
+ me.index('foo') # something happening on our side...
+ self.assertEqual(other, [(REINDEX, 'bar', None)])
+ self.assertEqual(me.getState(), [(INDEX, 'foo', None)])
+ thread.join() # finally the threads are re-united...
+
+ def testQueuesOnTwoThreads(self):
+ me = self.dispatcher # get the queued indexer...
+ first = []
+ def runner1(): # and callables for the first...
+ me.index('foo')
+ first[:] = me.getState()
+ thread1 = Thread(target=runner1)
+ second = []
+ def runner2(): # and second thread
+ me.index('bar')
+ second[:] = me.getState()
+ thread2 = Thread(target=runner2)
+ self.assertEqual(first, []) # clean table before we start...
+ self.assertEqual(second, [])
+ self.assertEqual(me.getState(), [])
+ thread1.start() # do stuff here...
+ self.assertEqual(first, [(INDEX, 'foo', None)])
+ self.assertEqual(second, [])
+ self.assertEqual(me.getState(), [])
+ thread2.start() # and there...
+ self.assertEqual(first, [(INDEX, 'foo', None)])
+ self.assertEqual(second, [(INDEX, 'bar', None)])
+ self.assertEqual(me.getState(), [])
+ thread1.join() # re-unite with first thread and...
+ me.unindex('f00') # let something happening on our side
+ self.assertEqual(first, [(INDEX, 'foo', None)])
+ self.assertEqual(second, [(INDEX, 'bar', None)])
+ self.assertEqual(me.getState(), [(UNINDEX, 'f00', None)])
+ thread2.join() # also re-unite the second and...
+ me.unindex('f00') # let something happening again...
+ self.assertEqual(first, [(INDEX, 'foo', None)])
+ self.assertEqual(second, [(INDEX, 'bar', None)])
+ self.assertEqual(me.getState(), [(UNINDEX, 'f00', None), (UNINDEX, 'f00', None)])
+
+ def testManyThreads(self):
+ me = self.dispatcher # get the queued indexer...
+ queues = {} # container for local queues
+ def makeRunner(name, idx):
+ def runner():
+ for n in range(idx): # index idx times
+ me.index(name)
+ queues[currentThread()] = me.queue
+ return runner
+ threads = []
+ for idx in range(99):
+ threads.append(Thread(target=makeRunner('t%d' % idx, idx)))
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join()
+ for idx, thread in enumerate(threads):
+ tid = 't%d' % idx
+ queue = queues[thread]
+ names = [ name for op, name, attrs in queue ]
+ self.assertEquals(names, [tid] * idx)
+
+
+def test_suite():
+ return TestSuite([
+ makeSuite(QueueTests),
+ makeSuite(QueueReducerTests),
+ makeSuite(QueueThreadTests),
+ ])
+
+if __name__ == '__main__':
+ main(defaultTest='test_suite')
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/test_transaction.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,53 @@
+from unittest import TestCase, TestSuite, makeSuite, main
+from transaction import savepoint, commit, abort
+
+from z3c.indexing.dispatch.transactions import QueueTM
+from z3c.indexing.dispatch.constants import INDEX, REINDEX
+from z3c.indexing.dispatch.tests import utils
+
+class QueueTransactionManagerTests(TestCase):
+
+ def setUp(self):
+ self.queue = utils.MockTransactionalDispatcher()
+ self.tman = QueueTM(self.queue)
+ self.queue._hook = self.tman.register # set up the transaction manager hook
+
+ def testFlushQueueOnCommit(self):
+ self.queue.index('foo')
+ commit()
+ self.assertEqual(self.queue.getState(), [])
+ self.assertEqual(self.queue.processed, [(INDEX, 'foo', None)])
+
+ def testFlushQueueOnAbort(self):
+ self.queue.index('foo')
+ abort()
+ self.assertEqual(self.queue.getState(), [])
+ self.assertEqual(self.queue.processed, None)
+
+ def testUseSavePoint(self):
+ self.queue.index('foo')
+ savepoint()
+ self.queue.reindex('bar')
+ commit()
+ self.assertEqual(self.queue.getState(), [])
+ self.assertEqual(self.queue.processed, [(INDEX, 'foo', None), (REINDEX, 'bar', None)])
+
+ def testRollbackSavePoint(self):
+ self.queue.index('foo')
+ sp = savepoint()
+ self.queue.reindex('bar')
+ sp.rollback()
+ commit()
+ self.assertEqual(self.queue.getState(), [])
+ self.assertEqual(self.queue.processed, [(INDEX, 'foo', None)])
+
+
+def test_suite():
+ return TestSuite([
+ makeSuite(QueueTransactionManagerTests),
+ ])
+
+if __name__ == '__main__':
+ main(defaultTest='test_suite')
+
+
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/utils.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/utils.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/tests/utils.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,68 @@
+from zope import interface
+
+from z3c.indexing.dispatch.interfaces import IDispatcher, ITransactionalDispatcher
+from z3c.indexing.dispatch.constants import INDEX, REINDEX, UNINDEX
+
+class MockDispatcherFactory(object):
+ def __init__(self):
+ self.queue = []
+ self.dispatcher = MockDispatcher()
+
+ def __call__(self, *args):
+ self.dispatcher.queue = self.queue
+ return self.dispatcher
+
+class MockDispatcher(object):
+ interface.implements(IDispatcher)
+
+ def __init__(self):
+ self.queue = []
+
+ def index(self, obj, attributes=None):
+ self.queue.append((INDEX, obj, attributes))
+
+ def reindex(self, obj, attributes=None):
+ self.queue.append((REINDEX, obj, attributes))
+
+ def unindex(self, obj):
+ self.queue.append((UNINDEX, obj, None))
+
+ def flush(self):
+ self.queue.append('flush')
+
+class MockTransactionalDispatcher(MockDispatcher):
+ interface.implements(ITransactionalDispatcher)
+
+ processed = None
+ _hook = lambda self: 42
+
+ def index(self, obj, attributes=None):
+ super(MockTransactionalDispatcher, self).index(obj, attributes)
+ self._hook()
+
+ def reindex(self, obj, attributes=None):
+ super(MockTransactionalDispatcher, self).reindex(obj, attributes)
+ self._hook()
+
+ def unindex(self, obj):
+ super(MockTransactionalDispatcher, self).unindex(obj)
+ self._hook()
+
+ def getState(self):
+ return list(self.queue)
+
+ def setState(self, state):
+ self.queue = state
+
+ def optimize(self):
+ pass
+
+ def commit(self):
+ self.processed = self.queue
+ self.clear()
+
+ def clear(self):
+ self.queue = []
+
+ def __len__(self):
+ return len(self.queue)
Added: z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py
===================================================================
--- z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py (rev 0)
+++ z3c.indexing.dispatch/trunk/src/z3c/indexing/dispatch/transactions.py 2008-03-29 11:16:28 UTC (rev 85009)
@@ -0,0 +1,65 @@
+from zope import interface
+
+from transaction.interfaces import ISavepointDataManager
+from transaction import get as getTransaction
+
+from interfaces import ITransactionalDispatcher
+
+from threading import local
+
+import logging
+logger = logging.getLogger('z3c.indexing.dispatch.transactions')
+
+class QueueSavepoint:
+ """Transaction savepoints using the ITransactionalDispatcher interface."""
+
+ def __init__(self, queue):
+ self.queue = queue
+ self.state = queue.getState()
+
+ def rollback(self):
+ self.queue.setState(self.state)
+
+
+class QueueTM(local):
+ """Transaction manager hook for the transactional dispatcher."""
+
+ interface.implements(ISavepointDataManager)
+
+ def __init__(self, queue):
+ local.__init__(self)
+ self.registered = False
+ self.vote = False
+ assert ITransactionalDispatcher.providedBy(queue), queue
+ self.queue = queue
+
+ def register(self):
+ if not self.registered:
+ getTransaction().join(self)
+ self.registered = True
+
+ def savepoint(self):
+ return QueueSavepoint(self.queue)
+
+ def tpc_begin(self, transaction):
+ pass
+
+ def commit(self, transaction):
+ self.queue.commit()
+
+ def tpc_vote(self, transaction):
+ pass
+
+ def tpc_finish(self, transaction):
+ self.registered = False
+
+ def tpc_abort(self, transaction):
+ if len(self.queue):
+ logger.debug('emptying unprocessed queue due to abort()...')
+ self.queue.clear()
+ self.registered = False
+
+ abort = tpc_abort
+
+ def sortKey(self):
+ return id(self)
More information about the Checkins
mailing list