[Checkins] SVN: Sandbox/gotcha/five.taskqueue/ first steps to make processor.txt test to pass
Godefroid Chapelle
gotcha at bubblenet.be
Thu Apr 1 10:27:50 EDT 2010
Log message for revision 110399:
first steps to make processor.txt test to pass
Changed:
_U Sandbox/gotcha/five.taskqueue/
U Sandbox/gotcha/five.taskqueue/buildout.cfg
A Sandbox/gotcha/five.taskqueue/src/five/taskqueue/processor.py
A Sandbox/gotcha/five.taskqueue/src/five/taskqueue/service.py
U Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/processor.txt
A Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/test_doctests.py
D Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/test_service.py
-=-
Property changes on: Sandbox/gotcha/five.taskqueue
___________________________________________________________________
Modified: svn:ignore
- develop-eggs
bin
fake-eggs
parts
.installed.cfg
.mr.developer.cfg
+ devel
var
develop-eggs
bin
fake-eggs
parts
.installed.cfg
.mr.developer.cfg
Modified: Sandbox/gotcha/five.taskqueue/buildout.cfg
===================================================================
--- Sandbox/gotcha/five.taskqueue/buildout.cfg 2010-04-01 13:40:13 UTC (rev 110398)
+++ Sandbox/gotcha/five.taskqueue/buildout.cfg 2010-04-01 14:27:49 UTC (rev 110399)
@@ -1,16 +1,25 @@
[buildout]
extensions = mr.developer
develop = .
-parts = py zope2 test test-coverage z3c.coverage omelette
+parts = zope2 instance test test-coverage z3c.coverage omelette py
extends = http://dist.plone.org/release/3.3.5/versions.cfg
+versions = versions
+sources-dir = devel
[sources]
z3c.taskqueue = svn svn+ssh://gotcha@svn.zope.org/repos/main/Sandbox/gotcha/z3c.taskqueue
+[instance]
+recipe = plone.recipe.zope2instance
+zope2-location = ${zope2:location}
+user = admin:admin
+eggs = five.taskqueue
+
[test]
recipe = zc.recipe.testrunner
defaults = ['--tests-pattern', '^f?tests$']
-eggs = five.taskqueue
+extra-paths = ${zope2:location}/lib/python
+eggs = ${instance:eggs}
[zope2]
recipe = plone.recipe.zope2install
@@ -29,13 +38,13 @@
[py]
recipe = zc.recipe.egg
interpreter = python
-eggs = five.taskqueue
+eggs = ${test:eggs}
[test-coverage]
recipe = zc.recipe.testrunner
eggs = ${test:eggs}
defaults = ['--coverage', '${buildout:directory}/coverage', '--auto-progress']
-extra-paths =
+extra-paths = ${test:extra-paths}
[z3c.coverage]
recipe = zc.recipe.egg
Added: Sandbox/gotcha/five.taskqueue/src/five/taskqueue/processor.py
===================================================================
--- Sandbox/gotcha/five.taskqueue/src/five/taskqueue/processor.py (rev 0)
+++ Sandbox/gotcha/five.taskqueue/src/five/taskqueue/processor.py 2010-04-01 14:27:49 UTC (rev 110399)
@@ -0,0 +1,45 @@
+import logging
+import time
+
+from ZPublisher.HTTPRequest import HTTPRequest
+from ZPublisher.HTTPResponse import HTTPResponse
+import ZPublisher
+
+from z3c.taskqueue.processor import BaseSimpleProcessor
+from z3c.taskqueue.processor import ERROR_MARKER
+
+log = logging.getLogger('five.taskqueue')
+
+
+class SimpleProcessor(BaseSimpleProcessor):
+ """ SimpleProcessor for Zope2 """
+
+ def call(self, method, args=(), errorValue=ERROR_MARKER):
+ path = self.servicePath[:] + [method]
+ response = HTTPResponse()
+ env = {'SERVER_NAME': 'dummy',
+ 'SERVER_PORT': '8080',
+ 'PATH_INFO': '/' + '/'.join(path)}
+ log.info(env['PATH_INFO'])
+ request = HTTPRequest(None, env, response)
+ conn = self.db.open()
+ root = conn.root()
+ request['PARENTS'] = [root['Application']]
+ try:
+ try:
+ ZPublisher.Publish.publish(request, 'Zope2', [None])
+ except Exception, error:
+ # This thread should never crash, thus a blank except
+ log.error('Processor: ``%s()`` caused an error!' % method)
+ log.exception(error)
+ return errorValue is ERROR_MARKER and error or errorValue
+ finally:
+ request.close()
+ conn.close()
+ if not request.response.body:
+ time.sleep(1)
+ else:
+ return request.response.body
+
+ def processNext(self, jobid=None):
+ return self.call('processNext', args=(None, jobid)) == "True"
Added: Sandbox/gotcha/five.taskqueue/src/five/taskqueue/service.py
===================================================================
--- Sandbox/gotcha/five.taskqueue/src/five/taskqueue/service.py (rev 0)
+++ Sandbox/gotcha/five.taskqueue/src/five/taskqueue/service.py 2010-04-01 14:27:49 UTC (rev 110399)
@@ -0,0 +1,15 @@
+import sys
+
+from BTrees.IOBTree import IOBTree
+from OFS.SimpleItem import SimpleItem
+
+from z3c.taskqueue.baseservice import BaseTaskService
+
+
+class TaskService(BaseTaskService, SimpleItem):
+ containerClass = IOBTree
+ maxint = sys.maxint
+
+ def getServicePath(self):
+ path = [part for part in self.getPhysicalPath() if part]
+ return path
Modified: Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/processor.txt
===================================================================
--- Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/processor.txt 2010-04-01 13:40:13 UTC (rev 110398)
+++ Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/processor.txt 2010-04-01 14:27:49 UTC (rev 110399)
@@ -6,22 +6,25 @@
component, known as the job processor. This component usually runs in its own
thread and provides its own main loop.
- >>> from z3c.taskqueue import processor
+ >>> from five.taskqueue import processor
The ``processor`` module provides several implementations of the processor
API. Let's create the necessary components to test the processor:
1. Create the task service and add it to the root site:
- >>> from z3c.taskqueue import service
+ >>> from five.taskqueue import service
>>> tasks = service.TaskService()
- >>> sm = root['tasks'] = tasks
+ >>> root._setOb('tasks', tasks)
+ >>> tasks.__parent__ = root
+ >>> tasks.__name__ = 'tasks'
2. Register the service as a utility:
>>> from z3c.taskqueue import interfaces
- >>> sm = root.getSiteManager()
+ >>> from zope.component import getGlobalSiteManager
+ >>> sm = getGlobalSiteManager()
>>> sm.registerUtility(tasks, interfaces.ITaskService, name='tasks')
3. Register a task that simply sleeps and writes a message:
@@ -39,18 +42,7 @@
>>> import zope.component
>>> zope.component.provideUtility(sleepTask, name='sleep')
-4. Setup a database:
- >>> from ZODB.tests import util
- >>> import transaction
- >>> db = util.DB()
-
- >>> from zope.app.publication.zopepublication import ZopePublication
- >>> conn = db.open()
- >>> conn.root()[ZopePublication.root_name] = root
- >>> transaction.commit()
-
-
The Simple Processor
--------------------
@@ -60,10 +52,12 @@
Let's first register a few tasks:
+
>>> jobid = tasks.add(u'sleep', (0.04, 1))
>>> jobid = tasks.add(u'sleep', (0.1, 2))
>>> jobid = tasks.add(u'sleep', (0, 3))
>>> jobid = tasks.add(u'sleep', (0.08, 4))
+ >>> import transaction
>>> transaction.commit()
Let's start by executing a job directly. The first argument to the simple
@@ -81,7 +75,7 @@
>>> from zope.security import management
>>> management.endInteraction()
>>> proc.processNext()
- True
+ 'True'
>>> print log_info
z3c.taskqueue INFO
Added: Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/test_doctests.py
===================================================================
--- Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/test_doctests.py (rev 0)
+++ Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/test_doctests.py 2010-04-01 14:27:49 UTC (rev 110399)
@@ -0,0 +1,84 @@
+##############################################################################
+#
+# Copyright (c) 2006 Lovely Systems and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Remote Task test setup
+
+"""
+__docformat__ = "reStructuredText"
+
+from Testing import ZopeTestCase
+
+from zope.testing.doctest import INTERPRET_FOOTNOTES
+from zope.testing.doctestunit import DocFileSuite
+from zope.testing.loggingsupport import InstalledHandler
+import doctest
+import random
+import unittest
+
+from five.taskqueue import service
+
+ZopeTestCase.installProduct('Five')
+
+
+def setUp(test):
+ log_info = InstalledHandler('z3c.taskqueue')
+ test.globs['root'] = ZopeTestCase.base.app()
+ test.globs['log_info'] = log_info
+ test.origArgs = service.TaskService.processorArguments
+ service.TaskService.processorArguments = {'waitTime': 0.0}
+ # Make tests predictable
+ random.seed(27)
+
+
+def tearDown(test):
+ random.seed()
+ service.TaskService.processorArguments = test.origArgs
+
+
+class TestIdGenerator(unittest.TestCase):
+
+ def setUp(self):
+ random.seed(27)
+ self.service = service.TaskService()
+
+ def tearDown(self):
+ random.seed()
+
+ def test_sequence(self):
+ id = 8852258758999665121
+ self.assertEquals(id, self.service._generateId())
+ self.assertEquals(id + 1, self.service._generateId())
+ self.assertEquals(id + 2, self.service._generateId())
+ self.assertEquals(id + 3, self.service._generateId())
+
+ def test_in_use_randomises(self):
+ id = 8852258758999665121
+ self.assertEquals(id, self.service._generateId())
+ self.service.jobs[id + 1] = object()
+ id = 5113777911774377050
+ self.assertEquals(id, self.service._generateId())
+ self.assertEquals(id + 1, self.service._generateId())
+ self.service.jobs[id + 1] = object()
+ self.assertEquals(id + 2, self.service._generateId())
+
+
+def test_suite():
+ return unittest.TestSuite((
+ unittest.makeSuite(TestIdGenerator),
+ DocFileSuite('processor.txt', package='five.taskqueue.tests',
+ setUp=setUp,
+ tearDown=tearDown,
+ optionflags=doctest.NORMALIZE_WHITESPACE
+ | doctest.ELLIPSIS
+ | INTERPRET_FOOTNOTES),
+ ))
Deleted: Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/test_service.py
===================================================================
--- Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/test_service.py 2010-04-01 13:40:13 UTC (rev 110398)
+++ Sandbox/gotcha/five.taskqueue/src/five/taskqueue/tests/test_service.py 2010-04-01 14:27:49 UTC (rev 110399)
@@ -1,11 +0,0 @@
-import unittest
-
-
-class TestDummy(unittest.TestCase):
-
- def test_base(self):
- self.failIf(1 != 1)
-
-
-def test_suite():
- return unittest.TestLoader().loadTestsFromTestCase(TestDummy)
More information about the checkins
mailing list