[Checkins] SVN: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py remove code related to processor and threads
Godefroid Chapelle
gotcha at bubblenet.be
Mon Mar 8 05:39:26 EST 2010
Log message for revision 109810:
remove code related to processor and threads
Changed:
U Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py
-=-
Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py 2010-03-08 10:36:29 UTC (rev 109809)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py 2010-03-08 10:39:25 UTC (rev 109810)
@@ -16,7 +16,7 @@
"""
__docformat__ = 'restructuredtext'
-from z3c.taskqueue import interfaces, job, task, processor
+from z3c.taskqueue import interfaces, job, task
from zope import component
from zope.app.container import contained
from zope.component.interfaces import ComponentLookupError
@@ -26,7 +26,6 @@
import logging
import persistent
import random
-import threading
import time
import zc.queue
import zope.interface
@@ -35,9 +34,7 @@
log = logging.getLogger('z3c.taskqueue')
-storage = threading.local()
-
class TaskService(contained.Contained, persistent.Persistent):
"""A persistent task service.
@@ -46,8 +43,6 @@
zope.interface.implements(interfaces.ITaskService)
taskInterface = interfaces.ITask
- processorFactory = processor.SimpleProcessor
- processorArguments = {'waitTime': 1.0}
_scheduledJobs = None
_scheduledQueue = None
@@ -161,46 +156,16 @@
if parent.__name__]
servicePath.reverse()
servicePath.append(self.__name__)
- # Start the thread running the processor inside.
- processor = self.processorFactory(
- self._p_jar.db(), servicePath, **self.processorArguments)
- thread = threading.Thread(target=processor, name=self._threadName())
- thread.setDaemon(True)
- thread.running = True
- thread.start()
def stopProcessing(self):
"""See interfaces.ITaskService"""
if self.__name__ is None:
return
- name = self._threadName()
- for thread in threading.enumerate():
- if thread.getName() == name:
- thread.running = False
- break
def isProcessing(self):
"""See interfaces.ITaskService"""
- if self.__name__ is not None:
- name = self._threadName()
- for thread in threading.enumerate():
- if thread.getName() == name:
- if thread.running:
- return True
- break
return False
- def _threadName(self):
- """Return name of the processing thread."""
- # This name isn't unique based on the path to self, but this doesn't
- # change the name that's been used in past versions.
- path = [parent.__name__ for parent in getParents(self)
- if parent.__name__]
- path.append('remotetasks')
- path.reverse()
- path.append(self.__name__)
- return '.'.join(path)
-
def hasJobsWaiting(self, now=None):
# If there is are any simple jobs in the queue, we have work to do.
if self._queue:
@@ -247,9 +212,6 @@
job.status = interfaces.ERROR
return True
job.started = datetime.datetime.now()
- if not hasattr(storage, 'runCount'):
- storage.runCount = 0
- storage.runCount += 1
try:
job.output = jobtask(self, job.id, job.input)
if job.status != interfaces.CRONJOB:
@@ -259,17 +221,10 @@
if job.status != interfaces.CRONJOB:
job.status = interfaces.ERROR
except Exception, error:
- if storage.runCount <= 3:
- log.error('Caught a generic exception, preventing thread '
- 'from crashing')
- log.exception(error)
- raise
- else:
- job.error = error
- if job.status != interfaces.CRONJOB:
- job.status = interfaces.ERROR
+ job.error = error
+ if job.status != interfaces.CRONJOB:
+ job.status = interfaces.ERROR
job.completed = datetime.datetime.now()
- storage.runCount = 0
return True
def process(self, now=None):
More information about the checkins
mailing list