Tres Seaver wrote:
Dylan Jay wrote:
+1 for using http://cvs.zope.org/Products/Scheduler

I think the time of non-core schedulers needs to come to an end; the number of implementations out there is crazy. I'm using the Zope CVS version, and I've enhanced it to make it serialised and non-reentrant. I'd like to check that in.
I'd want to see the rationale and the patch, first. The product keeps the schedule itself in the ZODB, which means that the transactional semantics are going to fight with several types of "serialization".
The cvs.zope scheduler will run more than one task at a time, which isn't always desirable if the tasks cause conflicts. I added a lock so that when notify is called, it will not run if a task is still running. Of course this only works on a single machine, but I figure that's a fair enough compromise. I'm also pretty sure the scheduler was rerunning the same event if it had not finished before the next notify occurred, so any long-running process caused a conflict error anyway. I couldn't see that these changes would cause any transactional problems. I've included the changes if you want to have a look. So is the cvs.zope scheduler the 'official' one? Will it be included in any Zope releases?
To simplify installation, I do think a Product version of a clock would be required. Perhaps the clock could optionally be prompted from outside to ensure its thread is still alive?
I'm willing to have a crack at it, as long as that effort isn't going into yet another non-core scheduler/clock.
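A clock thread that can be prompted from outside to make sure it is still alive might look roughly like this minimal sketch. It is generic Python, not Zope code: `ensure_clock_running`, `_clock_loop` and the `tick` callback are hypothetical names, and a real Zope clock would also need its own ZODB connection and transaction handling inside `tick`, which the sketch leaves out. It is an example of the "wild" appserver thread that the reply below argues against.

```python
import threading

_clock_thread = None
_clock_guard = threading.Lock()


def _clock_loop(tick, interval, stop):
    # Call 'tick' every 'interval' seconds until 'stop' is set.
    while not stop.wait(interval):
        tick()


def ensure_clock_running(tick, interval, stop):
    """Start the clock thread if it is missing or has died.

    Safe to call repeatedly (e.g. from an external "poke"). If a live
    thread already exists, it keeps its original arguments and is
    returned unchanged.
    """
    global _clock_thread
    with _clock_guard:
        if _clock_thread is None or not _clock_thread.is_alive():
            _clock_thread = threading.Thread(
                target=_clock_loop, args=(tick, interval, stop))
            _clock_thread.daemon = True  # do not block interpreter exit
            _clock_thread.start()
        return _clock_thread
```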
-10 for including in the core any implementation which depends on the existence of a "wild" thread inside the appserver: coping with threads and async together is a *very* tricky dance, and when it goes south, you can't do anything to debug it or fix it; all you can do is restart the appserver.
Chris' ClockServer removes the need for such a thread by hooking ZServer's mainloop to generate the "faux" request needed to kick off async processing. A "crontab"-like schedule can be driven equally well from ClockScheduler as from a separate thread.
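The thread-free approach described above amounts to letting the server's existing event loop check the clock on each pass and fire when a period has elapsed. The sketch below shows only that timing logic in plain Python; `MainloopClock` and `poll()` are hypothetical names, and it does not reproduce ClockServer's actual code or how it constructs the faux request.

```python
import time


class MainloopClock(object):
    """Fire 'callback' at most once per 'period' seconds, driven by the
    server's own event loop rather than by a separate thread.

    The loop is expected to call poll() once per iteration (for example
    after each select() timeout); nothing happens between polls.
    """

    def __init__(self, period, callback, clock=time.time):
        self.period = period
        self.callback = callback
        self.clock = clock  # injectable so the logic can be tested
        self.last = clock()

    def poll(self):
        now = self.clock()
        if now - self.last >= self.period:
            self.last = now
            # This is the point where a ClockServer-like hook would hand
            # a request to the normal publishing machinery.
            self.callback(now)
            return True
        return False
```

Since nothing runs between polls, the effective resolution depends on how often the loop wakes up, for example on its select() timeout.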
I was suggesting making ClockServer easy to install; at the moment it's a pain in the arse. Productise it, as Chris suggested. Is that not trivial? Would ClockServer be acceptable in the core? Could the CVS scheduler be changed so that ClockServer could be selected as a clock source?
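The attached code already has a hook for an external clock source: notifyOne() and notifyMax() are documented as helpers for an XMLRPC-driven clock, so any periodic caller (cron, a separate process, or ClockServer) only has to invoke one of them. A minimal Python 3 sketch follows; the URL, the credentials and the helper names `make_scheduler_proxy` and `poke` are placeholders, not part of the product.

```python
import xmlrpc.client  # xmlrpclib in the Python 2 of the attached code


def make_scheduler_proxy(url):
    """Return an XML-RPC proxy for a Scheduler object.

    'url' is a placeholder, e.g.
    "http://manager:secret@localhost:8080/path/to/scheduler". The attached
    code protects notifyMax() with NOTIFY_SCHEDULE_PERM, granted to
    Manager by default, so the URL must carry suitable credentials.
    Creating the proxy does not open a connection.
    """
    return xmlrpc.client.ServerProxy(url)


def poke(proxy, max_tasks=1):
    """One external clock tick: ask the Scheduler to dispatch up to
    'max_tasks' due tasks through its notifyMax() method."""
    proxy.notifyMax(max_tasks)
```

A cron job running `poke(make_scheduler_proxy(url), 5)` every minute would, for instance, act as a clock that lives entirely outside the appserver.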
For real scalability, you need to make the long-running async processing run in a separate process (and maybe a separate machine!): in this case, you can:
Not sure what scalability has to do with it. Do you mean stability?

Dylan.

##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
""" Scheduler module.

$Id: Scheduler.py,v 1.24 2003/07/10 01:42:57 tseaver Exp $
"""
import os, time, sys, random
from types import FloatType, IntType, StringType
import threading

import Globals
from BTrees import IOBTree, OOBTree
from Persistence import Persistent
from ExtensionClass import Base
from Acquisition import aq_base
from AccessControl import ClassSecurityInfo
from ZODB.POSException import ConflictError
from OFS.SimpleItem import SimpleItem
from OFS.PropertyManager import PropertyManager
from Products.PageTemplates.PageTemplateFile import PageTemplateFile

from Products.Event.ISubscriptionAware import ISubscriptionAware
from Products.Event.ISubscriber import ISubscriber

from Products.Scheduler import IScheduler
from Products.Scheduler import IScheduledEvent
from Products.Scheduler import IDescheduledEvent
from Products.Scheduler import ITimeEvent
from Products.Scheduler.Task import Task
from Products.Scheduler.Task import InconsistentSchedulerMarkerTask
from Products.Scheduler.SchedulerPermissions import *

from zLOG import LOG, PROBLEM, ERROR, BLATHER, INFO

_NotifyLock = threading.Lock()

__version__ = "$Revision: 1.24 $"[11:-2]

class Scheduler(SimpleItem, PropertyManager):

    meta_type = 'Scheduler'

    manage_options = (
        {'label': 'Current Tasks', 'action': 'manage_current_tasks'},
        {'label': 'Schedule A Task', 'action': 'manage_schedule_tasks'},
        {'label': 'Properties', 'action':'manage_propertiesForm'},
        {'label': 'Security', 'action':'manage_access'},
        {'label': 'Ownership', 'action':'manage_owner'},
        {'label': 'Undo', 'action':'manage_UndoForm'},
        )

    _properties = (
        {'id':'steps', 'type':'lines', 'mode':'w'},
        {'id':'event_service', 'type':'string', 'mode':'w'},
        {'id':'filter_data', 'type':'string', 'mode':'r'},
        {'id':'interval', 'type':'int', 'mode':'w'},
        )

    # offering to the security gods
    security = ClassSecurityInfo()
    ok = {'meta_type':1, 'id':1, 'icon':1, 'bobobase_modification_time':1,
          'title_or_id': 1, 'filter_data': 1, 'event_service': 1}
    security.setDefaultAccess(ok)
    security.setPermissionDefault(MGMT_SCREEN_PERM, ['Manager'])
    security.setPermissionDefault(CHANGE_SCHEDULE_PERM, ['Manager'])
    security.setPermissionDefault(VIEW_SCHEDULE_PERM, ['Manager'])
    security.setPermissionDefault(NOTIFY_SCHEDULE_PERM, ['Manager'])

    __implements__ = (IScheduler, ISubscriptionAware, ISubscriber)

    def __init__(self, id, title='', filter_data='',
                 event_service='portal_events'):
        self.id = id
        self.title = title
        self.subscribed = []
        self.tasks = OOBTree.OOBTree()
        self.times = IOBTree.IOBTree()
        if filter_data:
            self.filter_data = filter_data
        else:
            self.filter_data = ''
        self.event_service = event_service
        self.steps = []
        self.interval = 0

    def subscribedTo(self, subscribable, event_type, filter):
        """ alerts the object that it has subscribed, via a call from
        itself or from another object, to the subscribable. The
        event_type and filter match the arguments provided to
        ISubscribable.subscribe. """
        subscribable = aq_base(subscribable)
        self.subscribed.append((subscribable, event_type, filter))
        self._p_changed = 1

    def unsubscribedFrom(self, subscribable, event_type, filter):
        """alerts the object that it has unsubscribed, via a call from
        itself or from another object, to the subscribable. The
        event_type and filter match the exact event_type and filter of
        the deleted subscription, rather than, necessarily, the
        arguments provided to ISubscribable.unsubscribe. """
        subscribable = aq_base(subscribable)
        self.subscribed.remove((subscribable, event_type, filter))
        self._p_changed = 1

    def getFilter(self):
        if self.filter_data:
            return Filter(self.filter_data)
        return None

    def manage_afterAdd(self, item, container):
        event_service = getattr(self, self.event_service, None)
        if event_service is not None:
            # receive all time events
            event_service.subscribe(self, ITimeEvent)
            # receive schedule events that match our id
            event_service.subscribe(self, IScheduledEvent,
                                    filter=self.getFilter())

    def manage_beforeDelete(self, item, container):
        event_service = getattr(self, self.event_service, None)
        if event_service is not None:
            # un-receive all time events
            event_service.unsubscribe(self, ITimeEvent)
            # un-receive schedule events that match our id
            event_service.unsubscribe(self, IScheduledEvent,
                                      filter=self.getFilter())

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'notify')
    def notify(self, event=None, max_tasks=None):
        """If it is specified, 'event' must be a "time event" or a
        "schedule event".

        A time event may be signified by any of the following objects:

          - a float or an int (although the ability for the event to be
            a float or int is not specified in the IScheduler interface)

          - None, which is translated into a time event represented by
            the current time.

          - an object that implements the ITimeEvent interface

        When we receive a time event, we process all events in the
        queue that have a scheduled time before the time event.

        A schedule event is an object that implements the
        IScheduledEvent interface. When we receive a schedule event, we
        schedule the event but we do not actually process the event
        queue.

        'max_tasks' limits the number of tasks to be dispatched; it
        must be a non-negative integer, or None (a string
        representation of the integer or "None" will be converted).
        If the value is None, dispatch all pending tasks; otherwise
        dispatch up to 'max_tasks'.
        """
        LOG('Scheduler (%s)' % self.getId(), BLATHER, 'notify called')

        if event is None:
            # If event is None, assume that we want to run tasks before
            # "now" by turning event into a time.time as of now.
            # This is the common case.
            t = int(time.time())

        elif (isinstance(event, IntType) or isinstance(event, FloatType)):
            # Check if event is an integer or a float.
            t = int(event)

        elif ITimeEvent.isImplementedBy(event):
            # Check if event is an object that implements the ITimeEvent
            # interface that we need to get a number time from
            t = int(event.getTime())

        elif IScheduledEvent.isImplementedBy(event):
            # this is a schedule event, we want to schedule this event
            # with ourselves, but we don't want to actually perform
            # any tasks.
            self.schedule(event.getTime(), event)
            return

        elif IDescheduledEvent.isImplementedBy(event):
            # this is a deschedule event, we want to deschedule this event,
            # but we don't want to actually perform any tasks.
            self.deschedule(event.getTaskId())
            return

        else:
            # We don't know what to do, so we punt.
            raise ValueError, (
                '"event" must be an object that supports the ITimeEvent '
                'interface, an object that supports the IScheduledEvent '
                'interface, an integer, or a float. %s is not a valid '
                'value.' % event)

        if not _NotifyLock.acquire(0):
            LOG('Scheduler (%s)' % self.getId(), INFO,
                'notify ended: reentrant')
            return

        # Release the lock on every exit path (normal completion, break,
        # ConflictError or any other exception); otherwise one unexpected
        # error would block all later notify calls until a restart.
        try:
            if max_tasks is None or max_tasks == 'None':
                max_tasks = sys.maxint
            count, max_tasks = 0, int(max_tasks)

            for this_time, this_task, taskid in self.getPendingTasks(t):
                ok = 1
                # reset per task so that a failed call never reuses the
                # previous task's reschedule request
                status = None
                if count >= max_tasks:
                    LOG('Scheduler (%s)' % self.getId(), INFO,
                        'Processed %d tasks; done.' % count)
                    break

                try:
                    this_task = this_task.__of__(self)
                    status = this_task() # perform the task
                except ConflictError:
                    # allow conflict errors to permeate
                    self.deschedule(taskid) # deschedule the task
                    raise
                except:
                    msg = ('The task %s call failed hard!'
                           % this_task.getDescription())
                    LOG('Scheduler (%s)' % self.getId(), ERROR, msg,
                        error=sys.exc_info())
                    ok = 0

                count += 1

                if ok and not status:
                    # task may want to reschedule
                    try:
                        # does it want to reschedule?
                        status = this_task.next()
                        msg = 'next returned %s' % str(status)
                        LOG('Scheduler (%s)' % self.getId(), BLATHER, msg)
                        if not status: # if not, that's ok.
                            ok = 0
                    except ConflictError:
                        # allow conflict errors to permeate
                        self.deschedule(taskid) # deschedule the task
                        raise
                    except:
                        msg = ('The task %s next failed hard!'
                               % this_task.getDescription())
                        LOG('Scheduler (%s)' % self.getId(), ERROR, msg,
                            error=sys.exc_info())
                        ok = 0

                try:
                    next_time, next_task = status
                    # make sure we don't try to store an acquisition wrapper
                    next_task = aq_base(next_task)
                except TypeError:
                    next_time, next_task = None, None

                if IScheduledEvent.isImplementedBy(next_task):
                    self.schedule(next_time, next_task)
                else:
                    msg = ('The task %s declined to reschedule itself '
                           'by returning %s from its __call__'
                           % (this_task.getDescription(), str(status)))
                    LOG('Scheduler (%s)' % self.getId(), BLATHER, msg)

                # Deschedule afterwards so others can tell it's not done yet
                self.deschedule(taskid) # deschedule the task
        finally:
            _NotifyLock.release()

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'notifyOne')
    def notifyOne(self):
        """ Dispatch a single task.

        Primarily a helper function for XMLRPC-driven clock.
        """
        self.notify(max_tasks=1)

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'notifyMax')
    def notifyMax(self, max_tasks=1):
        """ Dispatch up to 'max_tasks' tasks.

        Primarily a helper function for XMLRPC-driven clock.
        """
        self.notify(max_tasks=max_tasks)

    security.declareProtected(VIEW_SCHEDULE_PERM, 'getPendingTasks')
    def getPendingTasks(self, when=None):
        if when is None:
            when = time.time()
        when = int(when)
        # XXX: We should really be storing paths and dereferencing them
        #      here; re-wrapping the task object in the schedule is icky!
        timelist = self.times.items(None, when) # min, max
        l = []
        for t, taskid_list in timelist:
            for taskid in taskid_list:
                task = self.tasks.get(taskid)
                if task is None:
                    # there is a consistency problem between the times and
                    # tasks btrees.
                    # this has happened in production on the bonzai
                    # project. we don't know what has caused it yet, so
                    # we work around it but don't fail.
                    task = InconsistentSchedulerMarkerTask(t, taskid
                                                           ).__of__(self)
                else:
                    task = aq_base(task).__of__(self)
                l.append((t, task, taskid))
        return l

    security.declareProtected(VIEW_SCHEDULE_PERM, 'getPendingTaskInfo')
    def getPendingTaskInfo(self, when=None):
        """ Return a sequence of mappings for use by UI.
        """
        l = []
        for key, task, taskid in self.getPendingTasks(when):
            d = {
                'when':task.getTime(),
                'info':task.getInfo(),
                'description':task.getDescription(),
                'taskid':taskid
                }
            l.append((key, d))
        return l

    def new_task_id(self, time):
        return '%010i%010i' % (time, random.randint(0, sys.maxint - 1))

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'schedule')
    def schedule(self, time, task):
        task = aq_base(task)
        taskid = self.new_task_id(time)
        while not self.tasks.insert(taskid, task):
            taskid = self.new_task_id(time)
        task.taskid = taskid
        time = int(time)
        l = self.times.get(time, [])
        l.append(taskid)
        self.times[time] = l
        return taskid

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'deschedule')
    def deschedule(self, taskid):
        """ deschedule the task by removing from the tasks and times
        BTrees """
        time = int(taskid[:10])
        try:
            del self.tasks[taskid]
        except KeyError:
            # an inconsistency occurred between the tasks and times
            # btrees or the taskid is bogus
            LOG('Scheduler (%s)' % self.getId(), ERROR,
                'a task with taskid %s could not be removed from the tasks '
                'btree' % taskid)
        l = self.times.get(time)
        try:
            l.remove(taskid)
        except (ValueError, IndexError, KeyError, AttributeError):
            # an inconsistency occurred between the tasks and times
            # btrees or the times entry is bogus
            LOG('Scheduler (%s)' % self.getId(), ERROR,
                'taskid %s could not be removed from the time list for time '
                '%s' % (taskid, time))
        if l: # a nonempty list
            self.times[time] = l
        elif l is not None: # an empty list
            del self.times[time]

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'checkConsistency')
    def checkConsistency(self):
        """ """
        l = []
        # check task id consistency
        timelist = self.times.items(None, sys.maxint - 1)
        for t, taskid_list in timelist:
            for taskid in taskid_list:
                task = self.tasks.get(taskid)
                if hasattr(task, 'taskid'):
                    if task.taskid != taskid:
                        l.append(
                            ('Task registered under taskid "%s" has taskid '
                             'attribute "%s"' % (taskid, task.taskid)))
                else:
                    l.append('Task registered under taskid "%s" has no '
                             'taskid' % taskid)

        # check BTree consistency
        from cStringIO import StringIO
        io = StringIO()
        import traceback
        from BTrees.check import check
        for tree in (self.tasks, self.times):
            try:
                check(aq_base(tree))
            except:
                # the first positional argument of print_exc is 'limit',
                # so the stream must be passed by keyword
                traceback.print_exc(file=io)
        io.seek(0)
        s = io.read()
        if s:
            l.append(s)
        if l:
            return '\n'.join(l)
        return 'OK'

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'fixupTimesBtree')
    def fixupTimesBtree(self):
        """ Make times btree consistent (recover from desync) """
        newtree = IOBTree.IOBTree()
        timelist = self.times.items(None, sys.maxint - 1)
        for t, taskid_list in timelist:
            for taskid in taskid_list:
                task = self.tasks.get(taskid)
                if task is None:
                    continue
                else:
                    l = newtree.get(t, [])
                    l.append(taskid)
                    newtree[t] = l
        self.oldtimes = self.times
        self.times = newtree
        return 'OK'

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'manage_scheduleTask')
    def manage_scheduleTask(self, description, when, path, interval,
                            max_age, max_retries, retry_backoff_time,
                            REQUEST=None):
        """ """
        task = Task(description, when, path, interval, max_age,
                    max_retries, retry_backoff_time)
        self.schedule(when, task)
        if REQUEST is not None:
            return self.manage_current_tasks(self, REQUEST)

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'manage_notifyTasks')
    def manage_notifyTasks(self, REQUEST=None):
        """ Run any pending tasks (allow poking via the ZMI)."""
        LOG('Scheduler (%s)' % self.getId(), BLATHER,
            'manage_notifyTasks was called via manual ZMI poke.')
        self.notify()
        if REQUEST is not None:
            return self.manage_current_tasks(self, REQUEST)

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'manage_descheduleTask')
    def manage_descheduleTask(self, taskids, REQUEST=None):
        """ """
        if isinstance(taskids, StringType):
            taskids = [taskids]
        for t in taskids:
            try:
                # this try-except is in case the form is submitted after
                # notify has kicked a task out of the queue.
                self.deschedule(t)
            except KeyError:
                pass
        if REQUEST is not None:
            return self.manage_current_tasks(self, REQUEST)

    security.declarePublic('format_time')
    def format_time(self, t):
        return time.ctime(t)

    security.declarePublic('now')
    def now(self):
        return int(time.time())

    security.declarePublic('maxTime')
    def maxTime(self):
        return sys.maxint

    security.declareProtected(MGMT_SCREEN_PERM, 'manage_current_tasks')
    manage_current_tasks = PageTemplateFile(
        'www/manage_current_tasks', globals(), __name__='manage_current_tasks'
        )

    security.declareProtected(MGMT_SCREEN_PERM, 'manage_schedule_tasks')
    manage_schedule_tasks = PageTemplateFile(
        'www/manage_schedule_tasks', globals(),
        __name__='manage_schedule_tasks'
        )

class Filter(Base, Persistent):
    def __init__(self, key):
        self.key = key

    def __call__(self, event):
        key = getattr(event, 'getFilterData', None)
        if key and key() == self.key:
            return 1
        return 0

manage_addSchedulerForm = PageTemplateFile(
    'www/addScheduler', globals(), __name__='manage_addSchedulerForm'
    )

def manage_addScheduler(self, id, title, filter_data=None,
                        event_service='portal_events', REQUEST=None):
    """ """
    ob = Scheduler(id, title, filter_data, event_service)
    event_service = getattr(self, ob.event_service)
    self._setObject(id, ob)
    if REQUEST is not None:
        return self.manage_main(self, REQUEST, update_menu=1)

Globals.InitializeClass(Scheduler)
Globals.InitializeClass(Task)
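The bookkeeping in schedule(), getPendingTasks() and deschedule() is easier to follow in a stripped-down model. The sketch below assumes plain dicts stand in for the `tasks` OOBTree and `times` IOBTree, and leaves out persistence, transactions and acquisition; `ScheduleModel` is a hypothetical name, and the random suffix range is narrowed (an assumption) so every id is exactly 20 characters. As in the attached code, the first 10 digits of a taskid encode its scheduled time, which is how deschedule() locates the matching `times` entry.

```python
import random


class ScheduleModel(object):
    """Dict-based model of the Scheduler's two indexes:

    tasks: taskid -> task             (an OOBTree in the attached code)
    times: int time -> [taskid, ...]  (an IOBTree in the attached code)
    """

    def __init__(self):
        self.tasks = {}
        self.times = {}

    def new_task_id(self, when):
        # 10-digit time prefix + 10-digit random suffix
        return '%010i%010i' % (when, random.randint(0, 2**31 - 2))

    def schedule(self, when, task):
        when = int(when)
        taskid = self.new_task_id(when)
        while taskid in self.tasks:  # retry on an (unlikely) collision
            taskid = self.new_task_id(when)
        self.tasks[taskid] = task
        self.times.setdefault(when, []).append(taskid)
        return taskid

    def pending(self, now):
        # Like times.items(None, now): every time <= now, in time order.
        # A missing task shows up as None here; the attached code
        # substitutes an InconsistentSchedulerMarkerTask instead.
        due = []
        for when in sorted(t for t in self.times if t <= now):
            for taskid in self.times[when]:
                due.append((when, self.tasks.get(taskid), taskid))
        return due

    def deschedule(self, taskid):
        when = int(taskid[:10])  # recover the time from the id prefix
        self.tasks.pop(taskid, None)
        ids = self.times.get(when)
        if ids and taskid in ids:
            ids.remove(taskid)
            if not ids:
                del self.times[when]
```

Because every task lives in both indexes, any code path that updates one without the other produces the kind of desync that checkConsistency() reports and fixupTimesBtree() repairs.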