[Checkins] SVN: z3c.indexing.async/ Initial import.
Malthe Borch
mborch at gmail.com
Sat Mar 29 07:20:50 EDT 2008
Log message for revision 85010:
Initial import.
Changed:
A z3c.indexing.async/
A z3c.indexing.async/trunk/
A z3c.indexing.async/trunk/AUTHOR.txt
A z3c.indexing.async/trunk/README.txt
A z3c.indexing.async/trunk/bootstrap.py
A z3c.indexing.async/trunk/buildout.cfg
A z3c.indexing.async/trunk/setup.py
A z3c.indexing.async/trunk/src/
A z3c.indexing.async/trunk/src/z3c/
A z3c.indexing.async/trunk/src/z3c/__init__.py
A z3c.indexing.async/trunk/src/z3c/indexing/
A z3c.indexing.async/trunk/src/z3c/indexing/__init__.py
A z3c.indexing.async/trunk/src/z3c/indexing/async/
A z3c.indexing.async/trunk/src/z3c/indexing/async/README.txt
A z3c.indexing.async/trunk/src/z3c/indexing/async/__init__.py
A z3c.indexing.async/trunk/src/z3c/indexing/async/dispatcher.py
A z3c.indexing.async/trunk/src/z3c/indexing/async/queue.py
A z3c.indexing.async/trunk/src/z3c/indexing/async/tests.py
-=-
Added: z3c.indexing.async/trunk/AUTHOR.txt
===================================================================
--- z3c.indexing.async/trunk/AUTHOR.txt (rev 0)
+++ z3c.indexing.async/trunk/AUTHOR.txt 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,4 @@
+Authors
+=======
+
+Malthe Borch, Sylvain Viollon, Kapil Thangavelu
Added: z3c.indexing.async/trunk/README.txt
===================================================================
--- z3c.indexing.async/trunk/README.txt (rev 0)
+++ z3c.indexing.async/trunk/README.txt 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,4 @@
+Overview
+--------
+
+Provides an asynchronous indexing dispatcher.
Added: z3c.indexing.async/trunk/bootstrap.py
===================================================================
--- z3c.indexing.async/trunk/bootstrap.py (rev 0)
+++ z3c.indexing.async/trunk/bootstrap.py 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,52 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Bootstrap a buildout-based project
+
+Simply run this script in a directory containing a buildout.cfg.
+The script accepts buildout command-line options, so you can
+use the -c option to specify an alternate configuration file.
+
+$Id: bootstrap.py 71627 2006-12-20 16:46:11Z jim $
+"""
+
+import os, shutil, sys, tempfile, urllib2
+
+# Scratch directory for the eggs needed during bootstrapping; it is removed
+# again by the final shutil.rmtree() call below.
+tmpeggs = tempfile.mkdtemp()
+
+# NOTE(review): security -- ez_setup.py is fetched over plain HTTP and
+# exec'd unverified into the ``ez`` namespace.  A tampered download would
+# run arbitrary code here.
+ez = {}
+exec urllib2.urlopen('http://peak.telecommunity.com/dist/ez_setup.py'
+ ).read() in ez
+# Call the ``use_setuptools`` function defined by the downloaded script,
+# pointing it at the scratch directory.
+ez['use_setuptools'](to_dir=tmpeggs, download_delay=0)
+
+# Imported only after the call above (presumably because it is what makes
+# pkg_resources importable -- confirm).
+import pkg_resources
+
+cmd = 'from setuptools.command.easy_install import main; main()'
+if sys.platform == 'win32':
+ cmd = '"%s"' % cmd # work around spawn lamosity on windows
+
+ws = pkg_resources.working_set
+# Run easy_install's main() in a child interpreter with tmpeggs and
+# 'zc.buildout' as arguments; PYTHONPATH is set to the location of the
+# setuptools distribution found in the working set.  Exit status must be 0.
+# NOTE(review): the spawn is wrapped in ``assert``; under ``python -O``
+# assert statements are stripped, so the child process would never run.
+assert os.spawnle(
+ os.P_WAIT, sys.executable, sys.executable,
+ '-c', cmd, '-mqNxd', tmpeggs, 'zc.buildout',
+ dict(os.environ,
+ PYTHONPATH=
+ ws.find(pkg_resources.Requirement.parse('setuptools')).location
+ ),
+ ) == 0
+
+# Make the scratch directory visible to the working set, then hand over to
+# zc.buildout's own 'bootstrap' command with the script's arguments.
+ws.add_entry(tmpeggs)
+ws.require('zc.buildout')
+import zc.buildout.buildout
+zc.buildout.buildout.main(sys.argv[1:] + ['bootstrap'])
+shutil.rmtree(tmpeggs)
Added: z3c.indexing.async/trunk/buildout.cfg
===================================================================
--- z3c.indexing.async/trunk/buildout.cfg (rev 0)
+++ z3c.indexing.async/trunk/buildout.cfg 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,9 @@
+[buildout]
+develop = . ../z3c.indexing.dispatch
+parts = test
+
+find-links = http://download.zope.org/distribution/
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = z3c.indexing.async
\ No newline at end of file
Added: z3c.indexing.async/trunk/setup.py
===================================================================
--- z3c.indexing.async/trunk/setup.py (rev 0)
+++ z3c.indexing.async/trunk/setup.py 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,38 @@
+from setuptools import setup, find_packages
+import sys, os
+
+# Release version passed to setup() below.
+version = '0.1'
+
+# Distribution metadata for z3c.indexing.async.
+setup(name='z3c.indexing.async',
+ version=version,
+ description="Asynchronous operation dispatching support.",
+ # README.txt is opened with a relative path, so this script has to be run
+ # from the directory that contains it.  The file object is not closed
+ # explicitly.
+ long_description=open('README.txt').read(),
+ classifiers=[
+ "Framework :: Plone",
+ "Framework :: Zope2",
+ "Framework :: Zope3",
+ "Programming Language :: Python",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ ],
+ keywords='',
+ author='Zope Corporation and Contributors',
+ author_email='zope3-dev at zope.org',
+ url='',
+ license='ZPL',
+ # Package sources live under src/; z3c and z3c.indexing are declared as
+ # namespace packages.
+ packages=find_packages('src'),
+ package_dir={'': 'src'},
+ namespace_packages=['z3c', 'z3c.indexing'],
+ include_package_data=True,
+ zip_safe=False,
+ install_requires=[
+ 'setuptools',
+ 'zope.interface',
+ 'zope.component',
+ 'zope.testing',
+ 'z3c.indexing.dispatch',
+ # -*- Extra requirements: -*-
+ ],
+ entry_points="""
+ # -*- Entry points: -*-
+ """,
+ )
Added: z3c.indexing.async/trunk/src/z3c/__init__.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/__init__.py (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/__init__.py 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,6 @@
+# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
+# Declare this package as a namespace package: use pkg_resources when it
+# can be imported, otherwise fall back to pkgutil.extend_path.
+try:
+ __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+ from pkgutil import extend_path
+ __path__ = extend_path(__path__, __name__)
Added: z3c.indexing.async/trunk/src/z3c/indexing/__init__.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/__init__.py (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/__init__.py 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,6 @@
+# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
+# Declare this package as a namespace package: use pkg_resources when it
+# can be imported, otherwise fall back to pkgutil.extend_path.
+try:
+ __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+ from pkgutil import extend_path
+ __path__ = extend_path(__path__, __name__)
Added: z3c.indexing.async/trunk/src/z3c/indexing/async/README.txt
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/async/README.txt (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/async/README.txt 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,67 @@
+z3c.indexing.async
+==================
+
+The asynchronous dispatcher passes operations on to a worker thread
+which is initialized with a connection object.
+
+Let's start the queue.
+
+ >>> from z3c.indexing.async import queue
+ >>> queue.QueueProcessor.FLUSH_TIMEOUT = 0.5
+ >>> queue.QueueProcessor.start()
+ <z3c.indexing.async.queue.QueueProcessor object at ...>
+
+The asynchronous dispatcher leaves operations up to other
+dispatchers. We'll provide a mock implementation and register it as a
+component.
+
+ >>> class MockDispatcher(object):
+ ... def __init__(self):
+ ... self.queue = []
+ ...
+ ... def index(self, obj, attributes=None):
+ ... self.queue.append((obj, attributes))
+ ...
+ ... def flush(self):
+ ... print "Flushing queue: %s" % str(self.queue)
+ ... del self.queue[:]
+
+We'll provide this dispatcher for string items.
+
+ >>> from z3c.indexing.dispatch.interfaces import IDispatcher
+ >>> _dispatcher = MockDispatcher()
+ >>> component.provideAdapter(
+ ... lambda *args: _dispatcher, (IDispatcher, str), IDispatcher)
+
+ >>> from z3c.indexing.async.dispatcher import AsynchronousDispatcher
+ >>> dispatcher = AsynchronousDispatcher()
+
+Index some strings:
+
+ >>> dispatcher.index('rabbit')
+ >>> dispatcher.index('elephant')
+
+Wait for the timeout (set to 0.5 seconds in this test)...
+
+ >>> import time
+ >>> time.sleep(0.6)
+ Flushing queue: [('rabbit', None), ('elephant', None)]
+
+Let's try and index another item and flush the queue manually:
+
+ >>> dispatcher.index('snake')
+ >>> dispatcher.flush()
+
+Since the queue is running in its own thread, we'll want to sleep for
+just a short while.
+
+ >>> time.sleep(0.1)
+ Flushing queue: [('snake', None)]
+
+Cleanup
+-------
+
+To be a good testing citizen, we clean up our queue processing thread.
+
+ >>> queue.QueueProcessor.stop()
+
Added: z3c.indexing.async/trunk/src/z3c/indexing/async/__init__.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/async/__init__.py (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/async/__init__.py 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1 @@
+#
Added: z3c.indexing.async/trunk/src/z3c/indexing/async/dispatcher.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/async/dispatcher.py (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/async/dispatcher.py 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,39 @@
+from zope import interface
+from zope import component
+
+from z3c.indexing.dispatch.interfaces import IDispatcher
+from z3c.indexing.dispatch import operation
+
+from queue import index_queue as queue
+
+class AsynchronousProcess(object):
+ def __init__(self, operation, dispatcher):
+ self.operation = operation
+ self.dispatcher = dispatcher
+
+ def dispatch(self):
+ self.operation.process(self.dispatcher)
+
+class AsynchronousDispatcher(object):
+ """Asynchronous indexing dispatcher."""
+
+ interface.implements(IDispatcher)
+
+ def index(self, obj, attributes=None):
+ self._enqueue(operation.Add(obj, attributes))
+
+ def reindex(self, obj, attributes=None):
+ self._enqueue(operation.Modify(obj, attributes))
+
+ def unindex(self, obj):
+ self._enqueue(operation.Delete(obj, attributes))
+
+ def flush(self):
+ queue.put(None)
+
+ def _enqueue(self, op):
+ obj = op.obj
+
+ for name, dispatcher in component.getAdapters((self, obj), IDispatcher):
+ process = AsynchronousProcess(op, dispatcher)
+ queue.put(process)
Added: z3c.indexing.async/trunk/src/z3c/indexing/async/queue.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/async/queue.py (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/async/queue.py 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,81 @@
+import Queue, threading
+
+# we do async indexing with all indexing operations put into this queue
+# (module-level and shared; QueueProcessor below reads its items from it)
+index_queue = Queue.Queue()
+
+# async queue processor
+class QueueProcessor( object ):
+
+ # Flush every _n_ changes to the db
+ FLUSH_THRESHOLD = 20
+
+ # Flush every _n_ seconds since the last change
+ FLUSH_TIMEOUT = 60
+
+ indexer_running = False
+ indexer_thread = None
+
+ def __iter__(self):
+ # iterator never ends, just sleeps when no results to process
+ while self.indexer_running:
+ # get an operation in blocking fashion
+ try:
+ op = index_queue.get(True, self.FLUSH_TIMEOUT)
+ except Queue.Empty:
+ yield None
+ else:
+ yield op
+
+ def __call__(self):
+ # number of documents indexed since last flush
+ op_delta = 0
+
+ dispatchers = set()
+
+ def flush():
+ for dispatcher in dispatchers:
+ dispatcher.flush()
+
+ dispatchers.clear()
+
+ # loop through queue iteration
+ for process in self:
+
+ # on timeout the op is none
+ if process is None:
+ # if we indexed anything since the last flush, flush it now
+ if op_delta:
+ flush()
+ op_delta = 0
+ continue
+
+ # process the operation
+ process.dispatch()
+
+ # keep track of dispatcher
+ dispatchers.add(process.dispatcher)
+
+ op_delta += 1
+
+ if op_delta % self.FLUSH_THRESHOLD == 0:
+ flush()
+ op_delta = 0
+
+ @classmethod
+ def start(klass):
+ if klass.indexer_running:
+ raise SyntaxError("Indexer already running")
+
+ klass.indexer_running = True
+ indexer = klass()
+ klass.indexer_thread = threading.Thread(target=indexer)
+ klass.indexer_thread.setDaemon(True)
+ klass.indexer_thread.start()
+ return indexer
+
+ @classmethod
+ def stop(klass):
+ if not klass.indexer_running:
+ return
+ klass.indexer_running = False
+ klass.indexer_thread.join()
Added: z3c.indexing.async/trunk/src/z3c/indexing/async/tests.py
===================================================================
--- z3c.indexing.async/trunk/src/z3c/indexing/async/tests.py (rev 0)
+++ z3c.indexing.async/trunk/src/z3c/indexing/async/tests.py 2008-03-29 11:20:49 UTC (rev 85010)
@@ -0,0 +1,17 @@
+from zope import interface
+from zope import component
+
+import unittest
+from zope.testing import doctest
+
+def test_suite():
+ globs = dict(interface=interface, component=component)
+
+ return unittest.TestSuite((
+ doctest.DocFileSuite(
+ 'README.txt',
+ globs=globs,
+ optionflags=doctest.NORMALIZE_WHITESPACE|doctest.ELLIPSIS,
+ ),
+ ))
+
More information about the Checkins
mailing list