[Zope-dev] Re: Event & Timer Service for Zope 2.8
Dylan Jay
gmane at dylanjay.com
Tue Jul 19 00:20:31 EDT 2005
Tres Seaver wrote:
> Dylan Jay wrote:
>>+1 for using http://cvs.zope.org/Products/Scheduler
>>I think the time of a non-core scheduler needs to come to an end. It's
>>crazy the number of implementations out there.
>>I'm using the zope cvs version and I've enhanced it to make it
>>serialised and non-reentrant. I'd like to check that in.
>
>
> I'd want to see the rationale and the patch, first. The product keeps
> the schedule itself in the ZODB, which means that the transactional
> semantics are going to fight with several types of "serialization".
The cvs.zope scheduler will run more than one task at a time, which isn't
always desirable if the tasks conflict with each other. I added a lock so
that when notify is called, it will not run if a task is still running. Of
course this only works on a single machine, but I figure that's a fair
enough compromise. I'm pretty sure the scheduler was also rerunning the same
event if it had not finished before the next notify occurred, so any
long-running process caused a conflict error.
I couldn't see that fixing these would cause any transactional
problems. I've included the changes if you want to have a look.
So is the cvs.zope scheduler the 'official' one? Will it be included in
any Zope releases?
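
As a rough illustration of the non-reentrant guard described above (a sketch
only, not the attached patch): the names guarded_notify and run_pending are
hypothetical, and releasing the lock in a try/finally is an assumption about
how best to cover every exit path.

```python
import threading

# One lock per process: this only serialises notify calls on a single machine.
_notify_lock = threading.Lock()


def guarded_notify(run_pending):
    """Run run_pending() unless another notify is still in progress.

    Returns True if the pending work was run, False if the call was skipped.
    """
    # Non-blocking acquire: returns False at once if the lock is already held.
    if not _notify_lock.acquire(False):
        return False
    try:
        run_pending()
        return True
    finally:
        # Released on every exit path, including an exception (such as a
        # conflict error) that has to propagate to the caller.
        _notify_lock.release()
```

A second call that arrives while the first is still running returns False
straight away instead of starting the same work again.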
>
>>In order to simplify installation I do think a Product version of a
>>clock would be required. Perhaps the clock can optionally be prompted from
>>outside to ensure its thread is still alive?
>>
>>I'm willing to have a crack at it as long as that effort isn't going to
>>yet another non-core scheduler/clock.
>
>
> -10 for including in the core any implementation which depends on the
> existence of a "wild" thread inside the appserver: coping with threads
> and async together is a *very* tricky dance, and when it goes south, you
> can't do anything to debug it, or fix it; all you can do is restart the
> appserver.
>
> Chris' ClockServer removes the need for such a thread, by hooking
> ZServer's mainloop to generate the "faux" request needed to kick off
> async processing. A "crontab" - like schedule can be driven equally
> well from ClockScheduler as from a separate thread.
I was suggesting making ClockServer easy to install. At the moment it's a
pain in the arse. Productise it as Chris suggested. Is that not trivial?
Would ClockServer be acceptable in the core?
Could the cvs scheduler be changed so that ClockServer could be selected
as a clock source?
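
On the clock-source question, the attached Scheduler already describes
notifyOne and notifyMax as helpers for an XML-RPC-driven clock, so an
external process can act as the clock. The following is a minimal sketch
under assumptions: run_clock and make_proxy are hypothetical names, the code
is written for current Python (the module was called xmlrpclib under the
Python 2 of this thread), and authentication is left out.

```python
import time
import xmlrpc.client  # named xmlrpclib under Python 2


def run_clock(scheduler, interval, ticks, max_tasks=1, sleep=time.sleep):
    """Call scheduler.notifyMax(max_tasks) once per tick, `ticks` times.

    `scheduler` is anything with a notifyMax method, for example an
    xmlrpc.client.ServerProxy pointing at the Scheduler object.
    Returns the number of pokes that completed without raising.
    """
    completed = 0
    for _ in range(ticks):
        try:
            scheduler.notifyMax(max_tasks)
            completed += 1
        except (OSError, xmlrpc.client.Error):
            # The appserver may be down or busy; the next tick simply retries.
            pass
        sleep(interval)
    return completed


def make_proxy(url):
    """Build a proxy; no network traffic happens until a method is called."""
    return xmlrpc.client.ServerProxy(url)
```

Usage would look like run_clock(make_proxy(url), interval=60, ticks=10),
where url is the address of the Scheduler object in your own site. The
attached code grants the notify permission to Manager by default, so a real
clock would also need to supply credentials.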
> For real scalability, you need to make the long-running async processing
> run in a separate process (and maybe a separate machine!): in this
> case, you can:
Not sure what scalability has to do with it. Do you mean stability?
Dylan.
-------------- next part --------------
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
Scheduler module.
$Id: Scheduler.py,v 1.24 2003/07/10 01:42:57 tseaver Exp $
"""
import os, time, sys, random
from types import FloatType, IntType, StringType
import threading
import Globals
from BTrees import IOBTree, OOBTree
from Persistence import Persistent
from ExtensionClass import Base
from Acquisition import aq_base
from AccessControl import ClassSecurityInfo
from ZODB.POSException import ConflictError
from OFS.SimpleItem import SimpleItem
from OFS.PropertyManager import PropertyManager
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.Event.ISubscriptionAware import ISubscriptionAware
from Products.Event.ISubscriber import ISubscriber
from Products.Scheduler import IScheduler
from Products.Scheduler import IScheduledEvent
from Products.Scheduler import IDescheduledEvent
from Products.Scheduler import ITimeEvent
from Products.Scheduler.Task import Task
from Products.Scheduler.Task import InconsistentSchedulerMarkerTask
from Products.Scheduler.SchedulerPermissions import *
from zLOG import LOG, PROBLEM, ERROR, BLATHER, INFO
_NotifyLock = threading.Lock()
__version__ = "$Revision: 1.24 $"[11:-2]
class Scheduler(SimpleItem, PropertyManager):

    meta_type = 'Scheduler'

    manage_options = (
        {'label': 'Current Tasks', 'action': 'manage_current_tasks'},
        {'label': 'Schedule A Task', 'action': 'manage_schedule_tasks'},
        {'label': 'Properties', 'action':'manage_propertiesForm'},
        {'label': 'Security', 'action':'manage_access'},
        {'label': 'Ownership', 'action':'manage_owner'},
        {'label': 'Undo', 'action':'manage_UndoForm'},
        )

    _properties = (
        {'id':'steps', 'type':'lines', 'mode':'w'},
        {'id':'event_service', 'type':'string', 'mode':'w'},
        {'id':'filter_data', 'type':'string', 'mode':'r'},
        {'id':'interval', 'type':'int', 'mode':'w'},
        )

    # offering to the security gods
    security = ClassSecurityInfo()
    ok = {'meta_type':1, 'id':1, 'icon':1, 'bobobase_modification_time':1 ,
          'title_or_id': 1, 'filter_data': 1, 'event_service': 1}
    security.setDefaultAccess(ok)
    security.setPermissionDefault(MGMT_SCREEN_PERM, ['Manager'])
    security.setPermissionDefault(CHANGE_SCHEDULE_PERM, ['Manager'])
    security.setPermissionDefault(VIEW_SCHEDULE_PERM, ['Manager'])
    security.setPermissionDefault(NOTIFY_SCHEDULE_PERM, ['Manager'])

    __implements__ = (IScheduler, ISubscriptionAware, ISubscriber)

    def __init__(self, id, title='', filter_data='',
                 event_service='portal_events'):
        self.id = id
        self.title = title
        self.subscribed = []
        self.tasks = OOBTree.OOBTree()
        self.times = IOBTree.IOBTree()
        if filter_data:
            self.filter_data = filter_data
        else:
            self.filter_data = ''
        self.event_service = event_service
        self.steps = []
        self.interval = 0

    def subscribedTo(self, subscribable, event_type, filter):
        """
        alerts the object that it has subscribed, via a call from
        itself or from another object, to the subscribable. The
        event_type and filter match the arguments provided to
        ISubscribable.subscribe.
        """
        subscribable = aq_base(subscribable)
        self.subscribed.append((subscribable, event_type, filter))
        self._p_changed = 1

    def unsubscribedFrom(self, subscribable, event_type, filter):
        """alerts the object that it has unsubscribed, via a call from
        itself or from another object, to the subscribable. The
        event_type and filter match the exact event_type and filter of
        the deleted subscription, rather than, necessarily, the
        arguments provided to ISubscribable.unsubscribe.
        """
        subscribable = aq_base(subscribable)
        self.subscribed.remove((subscribable, event_type, filter))
        self._p_changed = 1

    def getFilter(self):
        if self.filter_data:
            return Filter(self.filter_data)
        return None

    def manage_afterAdd(self, item, container):
        event_service = getattr(self, self.event_service, None)
        if event_service is not None:
            # receive all time events
            event_service.subscribe(self, ITimeEvent)
            # receive schedule events that match our id
            event_service.subscribe(self, IScheduledEvent,
                                    filter=self.getFilter())

    def manage_beforeDelete(self, item, container):
        event_service = getattr(self, self.event_service, None)
        if event_service is not None:
            # un-receive all time events
            event_service.unsubscribe(self, ITimeEvent)
            # un-receive schedule events that match our id
            event_service.unsubscribe(self, IScheduledEvent,
                                      filter=self.getFilter())

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'notify')
    def notify(self, event=None, max_tasks=None):
        """If it is specified, 'event' must be a "time event" or a
        "schedule event".

        A time event may be signified by any of the following objects:

        - a float or an int (although the ability for the event to be
          a float or int is not specified in the IScheduler interface)

        - None, which is translated into a time event represented by the
          current time.

        - an object that implements the ITimeEvent interface

        When we receive a time event, we process all events in the queue
        that have a scheduled time before the time event.

        A schedule event is an object that implements the IScheduledEvent
        interface. When we receive a schedule event, we schedule the
        event but we do not actually process the event queue.

        'max_tasks' limits the number of tasks to be dispatched; it must
        be a non-negative integer, or None (a string representation of the
        integer or "None" will be converted). If the value is None, dispatch
        all pending tasks; otherwise dispatch up to 'max_tasks'.
        """
        LOG('Scheduler (%s)' % self.getId(), BLATHER, 'notify called')
        if event is None:
            # If event is None, assume that we want to run tasks before
            # "now" by turning event into a time.time as of now.
            # This is the common case.
            t = int(time.time())
        elif (isinstance(event, IntType) or isinstance(event, FloatType)):
            # Check if event is an integer or a float.
            t = int(event)
        elif ITimeEvent.isImplementedBy(event):
            # Check if event is an object that implements the ITimeEvent
            # interface that we need to get a number time from
            t = int(event.getTime())
        elif IScheduledEvent.isImplementedBy(event):
            # this is a schedule event, we want to schedule this event
            # with ourselves, but we don't want to actually perform
            # any tasks.
            self.schedule(event.getTime(), event)
            return
        elif IDescheduledEvent.isImplementedBy(event):
            # this is a deschedule event, we want to deschedule this event,
            # but we don't want to actually perform any tasks.
            self.deschedule(event.getTaskId())
            return
        else:
            # We don't know what to do, so we punt.
            raise ValueError, (
                '"event" must be an object that supports the ITimeEvent '
                'interface, an object that supports the IScheduledEvent '
                'interface, an integer, or a float. %s is not a valid '
                'value.' % event)

        if not _NotifyLock.acquire(0):
            # another notify is still running in this process; skip this one
            LOG('Scheduler (%s)' % self.getId(), INFO, 'notify ended: reentrant')
            return

        try:
            if max_tasks is None or max_tasks == 'None':
                max_tasks = sys.maxint
            count, max_tasks = 0, int(max_tasks)

            for this_time, this_task, taskid in self.getPendingTasks(t):
                ok = 1
                # reset per task so a failed call never reuses a stale status
                status = None
                if count >= max_tasks:
                    LOG('Scheduler (%s)' % self.getId(),
                        INFO, 'Processed %d tasks; done.' % count )
                    break
                try:
                    this_task = this_task.__of__(self)
                    status = this_task() # perform the task
                except ConflictError:
                    # allow conflict errors to permeate
                    self.deschedule(taskid) # deschedule the task
                    raise
                except:
                    msg = ('The task %s call failed hard!' %
                           this_task.getDescription())
                    LOG('Scheduler (%s)' % self.getId(),
                        ERROR, msg, error=sys.exc_info())
                    ok = 0
                count += 1
                if ok and not status: # task may want to reschedule
                    try:
                        status = this_task.next() # does it want to reschedule?
                        msg = 'next returned %s' % str(status)
                        LOG('Scheduler (%s)' % self.getId() , BLATHER, msg)
                        if not status: # if not, that's ok.
                            ok = 0
                    except ConflictError:
                        # allow conflict errors to permeate
                        self.deschedule(taskid) # deschedule the task
                        raise
                    except:
                        msg = ('The task %s next failed hard!' %
                               this_task.getDescription())
                        LOG('Scheduler (%s)' % self.getId(), ERROR, msg,
                            error=sys.exc_info())
                        ok = 0
                try:
                    next_time, next_task = status
                    # make sure we don't try to store an acquisition wrapper
                    next_task = aq_base(next_task)
                except TypeError:
                    next_time, next_task = None, None
                if IScheduledEvent.isImplementedBy(next_task):
                    self.schedule(next_time, next_task)
                else:
                    msg = ('The task %s declined to reschedule itself '
                           'by returning %s from its __call__'
                           % (this_task.getDescription(), str(status)))
                    LOG('Scheduler (%s)' % self.getId(), BLATHER, msg)
                # Deschedule after, so others can tell it's not done yet
                self.deschedule(taskid) # deschedule the task
        finally:
            # release on every exit path, including ConflictError and any
            # unexpected exception, so the scheduler can never stay locked
            _NotifyLock.release()

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'notifyOne')
    def notifyOne(self):
        """ Dispatch a single task.

        Primarily a helper function for XMLRPC-driven clock.
        """
        self.notify(max_tasks=1)

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'notifyMax')
    def notifyMax(self, max_tasks=1):
        """ Dispatch up to 'max_tasks' tasks.

        Primarily a helper function for XMLRPC-driven clock.
        """
        self.notify(max_tasks=max_tasks)

    security.declareProtected(VIEW_SCHEDULE_PERM, 'getPendingTasks')
    def getPendingTasks(self, when=None):
        if when is None: when = time.time()
        when = int(when)
        # XXX: We should really be storing paths and dereferencing them
        # here; re-wrapping the task object in the schedule is icky!
        timelist = self.times.items(None, when) # min, max
        l = []
        for t, taskid_list in timelist:
            for taskid in taskid_list:
                task = self.tasks.get(taskid)
                if task is None:
                    # there is a consistency problem between the times and
                    # tasks btrees.
                    # this has happened in production on the bonzai
                    # project. we don't know what has caused it yet, so
                    # we work around it but don't fail.
                    task=InconsistentSchedulerMarkerTask(t,taskid).__of__(self)
                else:
                    task = aq_base(task).__of__(self)
                l.append((t, task, taskid))
        return l

    security.declareProtected(VIEW_SCHEDULE_PERM, 'getPendingTaskInfo')
    def getPendingTaskInfo(self, when=None):
        """
        Return a sequence of mappings for use by UI.
        """
        l = []
        for key, task, taskid in self.getPendingTasks(when):
            d = { 'when':task.getTime(), 'info':task.getInfo(),
                  'description':task.getDescription(),
                  'taskid':taskid }
            l.append((key, d))
        return l

    def new_task_id(self, time):
        return '%010i%010i' % (time, random.randint(0, sys.maxint - 1))

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'schedule')
    def schedule(self, time, task):
        task = aq_base(task)
        taskid = self.new_task_id(time)
        while not self.tasks.insert(taskid, task):
            taskid = self.new_task_id(time)
        task.taskid = taskid
        time = int(time)
        l = self.times.get(time, [])
        l.append(taskid)
        self.times[time] = l
        return taskid

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'deschedule')
    def deschedule(self, taskid):
        """ deschedule the task by removing from the tasks and times BTrees """
        time = int(taskid[:10])
        try:
            del self.tasks[taskid]
        except KeyError:
            # an inconsistency occurred between the tasks and times
            # btrees or the taskid is bogus
            LOG('Scheduler (%s)' % self.getId(), ERROR,
                'a task with taskid %s could not be removed from the tasks '
                'btree' % taskid)
        l = self.times.get(time)
        try:
            l.remove(taskid)
        except (ValueError, IndexError, KeyError, AttributeError):
            # an inconsistency occurred between the tasks and times
            # btrees or the times entry is bogus
            LOG('Scheduler (%s)' % self.getId(), ERROR,
                'taskid %s could not be removed from the time list for time '
                '%s' % (taskid, time))
        if l:
            # a nonempty list
            self.times[time] = l
        elif l is not None:
            # an empty list
            del self.times[time]

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'checkConsistency')
    def checkConsistency(self):
        """ Report inconsistencies in the tasks and times BTrees, or 'OK' """
        l = []
        # check task id consistency
        timelist = self.times.items(None, sys.maxint - 1)
        for t, taskid_list in timelist:
            for taskid in taskid_list:
                task = self.tasks.get(taskid)
                if hasattr(task, 'taskid'):
                    if task.taskid != taskid:
                        l.append(
                            ('Task registered under taskid "%s" has taskid '
                             'attribute "%s"' % (taskid, task.taskid)
                             ))
                else:
                    l.append('Task registered under taskid "%s" has no taskid'
                             % taskid)
        # check BTree consistency
        from cStringIO import StringIO
        io = StringIO()
        import traceback
        from BTrees.check import check
        for tree in (self.tasks, self.times):
            try:
                check(aq_base(tree))
            except:
                # the first positional argument of print_exc is 'limit',
                # so the output stream must be passed by keyword
                traceback.print_exc(file=io)
        io.seek(0)
        s = io.read()
        if s:
            l.append(s)
        if l:
            return '\n'.join(l)
        return 'OK'

    # the declared name must match the method name below to take effect
    security.declareProtected(CHANGE_SCHEDULE_PERM, 'fixupTimesBtree')
    def fixupTimesBtree(self):
        """ Make times btree consistent (recover from desync) """
        newtree = IOBTree.IOBTree()
        timelist = self.times.items(None, sys.maxint - 1)
        for t, taskid_list in timelist:
            for taskid in taskid_list:
                task = self.tasks.get(taskid)
                if task is None:
                    continue
                else:
                    l = newtree.get(t, [])
                    l.append(taskid)
                    newtree[t] = l
        self.oldtimes = self.times
        self.times = newtree
        return 'OK'

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'manage_scheduleTask')
    def manage_scheduleTask(self, description, when, path, interval,
                            max_age, max_retries, retry_backoff_time,
                            REQUEST=None):
        """ """
        task = Task(description, when, path, interval, max_age, max_retries,
                    retry_backoff_time)
        self.schedule(when, task)
        if REQUEST is not None:
            return self.manage_current_tasks(self, REQUEST)

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'manage_notifyTasks')
    def manage_notifyTasks(self, REQUEST=None):
        """ Run any pending tasks (allow poking via the ZMI)."""
        LOG('Scheduler (%s)' % self.getId(), BLATHER,
            'manage_notifyTasks was called via manual ZMI poke.')
        self.notify()
        if REQUEST is not None:
            return self.manage_current_tasks(self, REQUEST)

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'manage_descheduleTask')
    def manage_descheduleTask(self, taskids, REQUEST=None):
        """ """
        if isinstance(taskids, StringType):
            taskids = [taskids]
        for t in taskids:
            try:
                # this try-except is in case the form is submitted after
                # notify has kicked a task out of the queue.
                self.deschedule(t)
            except KeyError:
                pass
        if REQUEST is not None:
            return self.manage_current_tasks(self, REQUEST)

    security.declarePublic('format_time')
    def format_time(self, t):
        return time.ctime(t)

    security.declarePublic('now')
    def now(self):
        return int(time.time())

    security.declarePublic('maxTime')
    def maxTime(self):
        return sys.maxint

    security.declareProtected(MGMT_SCREEN_PERM, 'manage_current_tasks')
    manage_current_tasks = PageTemplateFile(
        'www/manage_current_tasks', globals(),__name__='manage_current_tasks'
        )

    security.declareProtected(MGMT_SCREEN_PERM, 'manage_schedule_tasks')
    manage_schedule_tasks = PageTemplateFile(
        'www/manage_schedule_tasks',globals(),__name__='manage_schedule_tasks'
        )

class Filter(Base, Persistent):

    def __init__(self, key):
        self.key = key

    def __call__(self, event):
        key = getattr(event, 'getFilterData', None)
        if key and key() == self.key:
            return 1
        return 0

manage_addSchedulerForm = PageTemplateFile(
    'www/addScheduler',
    globals(),
    __name__='manage_addSchedulerForm'
    )

def manage_addScheduler(self, id, title, filter_data=None,
                        event_service='portal_events', REQUEST=None):
    """ """
    ob = Scheduler(id, title, filter_data, event_service)
    event_service = getattr(self, ob.event_service)
    self._setObject(id, ob)
    if REQUEST is not None:
        return self.manage_main(self, REQUEST, update_menu=1)

Globals.InitializeClass(Scheduler)
Globals.InitializeClass(Task)
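
The schedule in the attachment is kept in two indexes: tasks maps a task id
to a task, and times maps an integer timestamp to a list of task ids. The
task id starts with the zero-padded timestamp, which is how deschedule finds
the right times entry. The following is a simplified sketch of that layout
using plain dicts instead of BTrees; MiniSchedule and MAX_SUFFIX are
hypothetical names, and MAX_SUFFIX stands in for sys.maxint, which current
Python no longer has.

```python
import random

MAX_SUFFIX = 2**31 - 1  # assumption: the value of sys.maxint on 32-bit Python 2


class MiniSchedule:
    """Plain-dict model of the tasks/times indexes (no ZODB, no security)."""

    def __init__(self):
        self.tasks = {}  # taskid -> task
        self.times = {}  # int timestamp -> list of taskids

    def new_task_id(self, when):
        # 10 digits of timestamp followed by 10 digits of random suffix
        return '%010i%010i' % (when, random.randint(0, MAX_SUFFIX - 1))

    def schedule(self, when, task):
        when = int(when)
        taskid = self.new_task_id(when)
        while taskid in self.tasks:  # retry on the rare id collision
            taskid = self.new_task_id(when)
        self.tasks[taskid] = task
        self.times.setdefault(when, []).append(taskid)
        return taskid

    def pending(self, when):
        # every task scheduled at or before `when`, oldest first
        result = []
        for t in sorted(self.times):
            if t <= when:
                for taskid in self.times[t]:
                    result.append((t, self.tasks.get(taskid), taskid))
        return result

    def deschedule(self, taskid):
        when = int(taskid[:10])  # recover the timestamp from the id prefix
        self.tasks.pop(taskid, None)
        ids = self.times.get(when, [])
        if taskid in ids:
            ids.remove(taskid)
        if not ids:
            self.times.pop(when, None)  # drop empty time slots
```

Because the two indexes are updated separately, they can drift apart, which
is the situation getPendingTasks and fixupTimesBtree in the attachment work
around.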