[Zope-dev] Re: Event & Timer Service for Zope 2.8

Dylan Jay gmane at dylanjay.com
Tue Jul 19 00:20:31 EDT 2005


Tres Seaver wrote:
> 
> Dylan Jay wrote:
>>+1 for using http://cvs.zope.org/Products/Scheduler
>>I think the time of non-core schedulers needs to come to an end. It's
>>crazy how many implementations there are out there.
>>I'm using the Zope CVS version, and I've enhanced it to make it
>>serialised and non-reentrant. I'd like to check that in.
> 
> 
> I'd want to see the rationale and the patch, first.  The product keeps
> the schedule itself in the ZODB, which means that the transactional
> semantics are going to fight with several types of "serialization".


The cvs.zope scheduler will run more than one task at a time, which isn't
always desirable if the tasks cause conflicts. I added a lock so that when
notify is called, it will not run if a task is still running. Of course
this only works within a single Zope process, but I figure that's a fair
enough compromise. I'm also pretty sure the scheduler was re-running the
same event if it had not finished before the next notify occurred, so any
long-running process caused a conflict error.
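A minimal, Zope-independent sketch of the guard described above, assuming a single process: a module-level `threading.Lock` acquired without blocking, so an overlapping call returns at once instead of running the same tasks twice. `notify_once` and `run_pending` are hypothetical names. The `try`/`finally` releases the lock even when a task raises.

```python
import threading

# Process-wide lock: it only serialises callers inside one Python
# process, not across ZEO clients or separate machines.
_notify_lock = threading.Lock()


def notify_once(run_pending):
    """Call the hypothetical ``run_pending`` unless another thread is
    already inside it. Return True if it ran, False if it was skipped."""
    # Non-blocking acquire: returns False immediately if the lock is held.
    if not _notify_lock.acquire(False):
        return False
    try:
        run_pending()
        return True
    finally:
        # Always release, even when run_pending raises (e.g. a conflict).
        _notify_lock.release()
```

While one thread is inside `run_pending`, a second `notify_once` call returns `False` without doing anything; once the first call finishes (normally or with an exception) the next call runs again.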

I don't see how these changes would cause any transactional
problems. I've attached the changes if you want to have a look.

So is the cvs.zope scheduler the 'official' one? Will it be included in 
any zope releases?

> 
>>To simplify installation, I do think a Product version of a
>>clock would be required. Perhaps the clock could optionally be prodded from
>>outside to ensure its thread is still alive?
>>
>>I'm willing to have a crack at it as long as that effort isn't going to
>>become yet another non-core scheduler/clock.
> 
> 
> -10 for including in the core any implementation which depends on the
> existence of a "wild" thread inside the appserver:  coping with threads
> and async together is a *very* tricky dance, and when it goes south, you
> can't do anything to debug it, or fix it;  all you can do is restart the
> appserver.
> 
> Chris' ClockServer removes the need for such a thread, by hooking
> ZServer's mainloop to generate the "faux" request needed to kick off
> async processing.  A "crontab"-like schedule can be driven equally
> well from ClockScheduler as from a separate thread.
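To make the point concrete: a schedule needs no thread of its own if something else supplies the ticks. A minimal sketch independent of Zope, assuming each ClockServer-style periodic request ends up calling `tick(now)`. `TickScheduler` is a hypothetical name; it borrows the attached module's convention that a task returning `(next_time, next_task)` is rescheduled, and, like `notify`, it collects the due tasks before running them so a rescheduled task waits for the next tick.

```python
import heapq
import itertools


class TickScheduler(object):
    """Runs due tasks whenever tick(now) is called; starts no threads."""

    def __init__(self):
        self._queue = []               # heap of (when, seq, task)
        self._seq = itertools.count()  # tie-breaker so tasks are never compared

    def schedule(self, when, task):
        heapq.heappush(self._queue, (int(when), next(self._seq), task))

    def tick(self, now, max_tasks=None):
        """Run tasks scheduled at or before ``now``; return how many ran."""
        due = []
        while self._queue and self._queue[0][0] <= now:
            if max_tasks is not None and len(due) >= max_tasks:
                break
            due.append(heapq.heappop(self._queue)[2])
        for task in due:
            try:
                result = task()
            except Exception:
                # A failing task is dropped rather than aborting the tick.
                continue
            # A (next_time, next_task) result asks to be rescheduled.
            if isinstance(result, tuple) and len(result) == 2:
                self.schedule(result[0], result[1])
        return len(due)
```

A cron-style job is then just a task that returns its own successor, e.g. `(when + 60, make_job(when + 60))` for a job that repeats every minute.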

I was suggesting making ClockServer easy to install. At the moment it's a
pain in the arse. Productise it, as Chris suggested. Is that not trivial?

Would ClockServer be acceptable in the core?

Would the cvs scheduler be able to be changed such that ClockServer 
could be selected as a clock source?


> For real scalability, you need to make the long-running async processing
> run in a separate process (and maybe a separate machine!):  in this
> case, you can:

Not sure what scalability has to do with it. Do you mean stability?

Dylan.
-------------- next part --------------
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
# 
##############################################################################
"""
Scheduler module.

$Id: Scheduler.py,v 1.24 2003/07/10 01:42:57 tseaver Exp $
"""
import os, time, sys, random
from types import FloatType, IntType, StringType
import threading

import Globals
from BTrees import IOBTree, OOBTree
from Persistence import Persistent
from ExtensionClass import Base
from Acquisition import aq_base
from AccessControl import ClassSecurityInfo
from ZODB.POSException import ConflictError

from OFS.SimpleItem import SimpleItem
from OFS.PropertyManager import PropertyManager

from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.Event.ISubscriptionAware import ISubscriptionAware
from Products.Event.ISubscriber import ISubscriber

from Products.Scheduler import IScheduler
from Products.Scheduler import IScheduledEvent
from Products.Scheduler import IDescheduledEvent
from Products.Scheduler import ITimeEvent
from Products.Scheduler.Task import Task
from Products.Scheduler.Task import InconsistentSchedulerMarkerTask
from Products.Scheduler.SchedulerPermissions import *

from zLOG import LOG, PROBLEM, ERROR, BLATHER, INFO

_NotifyLock = threading.Lock()

__version__ = "$Revision: 1.24 $"[11:-2]

class Scheduler(SimpleItem, PropertyManager):
    
    meta_type = 'Scheduler'
    manage_options = (
        {'label': 'Current Tasks', 'action': 'manage_current_tasks'},
        {'label': 'Schedule A Task', 'action': 'manage_schedule_tasks'},
        {'label': 'Properties', 'action':'manage_propertiesForm'},
        {'label': 'Security', 'action':'manage_access'},
        {'label': 'Ownership', 'action':'manage_owner'},
        {'label': 'Undo', 'action':'manage_UndoForm'},
        )
    _properties = (
        {'id':'steps', 'type':'lines', 'mode':'w'},
        {'id':'event_service', 'type':'string', 'mode':'w'},
        {'id':'filter_data', 'type':'string', 'mode':'r'},
        {'id':'interval', 'type':'int', 'mode':'w'},
        )
    
    # offering to the security gods
    security = ClassSecurityInfo()
    ok = {'meta_type':1, 'id':1, 'icon':1, 'bobobase_modification_time':1 ,
          'title_or_id': 1, 'filter_data': 1, 'event_service': 1}
    security.setDefaultAccess(ok)
    security.setPermissionDefault(MGMT_SCREEN_PERM, ['Manager'])
    security.setPermissionDefault(CHANGE_SCHEDULE_PERM, ['Manager'])
    security.setPermissionDefault(VIEW_SCHEDULE_PERM, ['Manager'])
    security.setPermissionDefault(NOTIFY_SCHEDULE_PERM, ['Manager'])
        
    __implements__ = (IScheduler, ISubscriptionAware, ISubscriber)

    def __init__(self, id, title='', filter_data='',
                 event_service='portal_events'):
        self.id = id
        self.title = title
        self.subscribed = []
        self.tasks = OOBTree.OOBTree()
        self.times = IOBTree.IOBTree()
        if filter_data:
            self.filter_data = filter_data
        else:
            self.filter_data = ''
        self.event_service = event_service
        self.steps = []
        self.interval = 0

    def subscribedTo(self, subscribable, event_type, filter):
        """
        alerts the object that it has subscribed, via a call from
        itself or from another object, to the subscribable.  The
        event_type and filter match the arguments provided to
        ISubscribable.subscribe.
        """
        subscribable = aq_base(subscribable)
        self.subscribed.append((subscribable, event_type, filter))
        self._p_changed = 1
    
    def unsubscribedFrom(self, subscribable, event_type, filter):
        """alerts the object that it has unsubscribed, via a call from
        itself or from another object, to the subscribable.  The
        event_type and filter match the exact event_type and filter of
        the deleted subscription, rather than, necessarily, the
        arguments provided to ISubscribable.unsubscribe.
        """
        subscribable = aq_base(subscribable)
        self.subscribed.remove((subscribable, event_type, filter))
        self._p_changed = 1

    def getFilter(self):
        if self.filter_data:
            return Filter(self.filter_data)
        return None

    def manage_afterAdd(self, item, container):
        event_service = getattr(self, self.event_service, None)
        if event_service is not None:
            # receive all time events
            event_service.subscribe(self, ITimeEvent)
            # receive schedule events that match our id
            event_service.subscribe(self, IScheduledEvent,
                                    filter=self.getFilter())

    def manage_beforeDelete(self, item, container):
        event_service = getattr(self, self.event_service, None)
        if event_service is not None:
            # un-receive all time events
            event_service.unsubscribe(self, ITimeEvent)
            # un-receive schedule events that match our id
            event_service.unsubscribe(self, IScheduledEvent,
                                      filter=self.getFilter())

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'notify')
    def notify(self, event=None, max_tasks=None):

        """If it is specified, 'event' must be a "time event" or a
        "schedule event".

        A time event may be signified by any of the following objects:
        
          - a float or an int (although the ability for the event to be
            a float or int is not specified in the IScheduler interface)

          - None, which is translated into a time event represented by the
            current time.

          - an object that implements the ITimeEvent interface

        When we receive a time event, we process all events in the queue
        that have a scheduled time before the time event.

        A schedule event is an object that implements the IScheduledEvent
        interface.  When we receive a schedule event, we schedule the
        event but we do not actually process the event queue.

        'max_tasks' limits the number of tasks to be dispatched;  it must
        be a non-negative integer, or None (a string representation of the
        integer or "None" will be converted).  If the value is None, dispatch
        all pending tasks;  otherwise dispatch up to 'max_tasks'.
        """
        
        LOG('Scheduler (%s)' % self.getId(), BLATHER, 'notify called')

        if event is None:
            # If event is None, assume that we want to run tasks before
            # "now" by turning event into a time.time as of now.
            # This is the common case.
            t = int(time.time())
        elif (isinstance(event, IntType) or isinstance(event, FloatType)):
            # Check if event is an integer or a float.
            t = int(event)
        elif ITimeEvent.isImplementedBy(event):
            # Check if event is an object that implements the ITimeEvent
            # interface that we need to get a number time from
            t = int(event.getTime())
        elif IScheduledEvent.isImplementedBy(event):
            # this is a schedule event, we want to schedule this event
            # with ourselves, but we don't want to actually perform
            # any tasks.
            self.schedule(event.getTime(), event)
            return
        elif IDescheduledEvent.isImplementedBy(event):
            # this is a deschedule event, we want to deschedule this event,
            # but we don't want to actually perform any tasks.
            self.deschedule(event.getTaskId())
            return
        else:
            # We don't know what to do, so we punt.
            raise ValueError, (
                '"event" must be None, an integer, a float, or an object '
                'that supports the ITimeEvent, IScheduledEvent or '
                'IDescheduledEvent interface.  %s is not a valid '
                'value.' % (event,))

        # Non-blocking acquire: if another thread in this process is already
        # dispatching tasks, return instead of running them a second time.
        if not _NotifyLock.acquire(0):
            LOG('Scheduler (%s)' % self.getId(), INFO,
                'notify ended: reentrant')
            return

        try:
            if max_tasks is None or max_tasks == 'None':
                max_tasks = sys.maxint
            count, max_tasks = 0, int(max_tasks)
            for this_time, this_task, taskid in self.getPendingTasks(t):
                ok = 1
                # Reset for every task, so a task whose call fails is never
                # handed the previous task's return value.
                status = None
                if count >= max_tasks:
                    LOG('Scheduler (%s)' % self.getId(),
                        INFO, 'Processed %d tasks;  done.' % count)
                    break
                try:
                    this_task = this_task.__of__(self)
                    status = this_task() # perform the task
                except ConflictError:
                    # allow conflict errors to permeate (the lock is
                    # released by the finally clause below)
                    self.deschedule(taskid) # deschedule the task
                    raise
                except:
                    msg = ('The task %s call failed hard!' %
                           this_task.getDescription())
                    LOG('Scheduler (%s)' % self.getId(),
                        ERROR, msg, error=sys.exc_info())
                    ok = 0

                count += 1

                if ok and not status: # task may want to reschedule
                    try:
                        status = this_task.next() # does it want to reschedule?
                        msg = 'next returned %s' % str(status)
                        LOG('Scheduler (%s)' % self.getId(), BLATHER, msg)
                        if not status: # if not, that's ok.
                            ok = 0
                    except ConflictError:
                        # allow conflict errors to permeate
                        self.deschedule(taskid) # deschedule the task
                        raise
                    except:
                        msg = ('The task %s next failed hard!' %
                               this_task.getDescription())
                        LOG('Scheduler (%s)' % self.getId(), ERROR, msg,
                            error=sys.exc_info())
                        ok = 0
                try:
                    next_time, next_task = status
                    # make sure we don't try to store an acquisition wrapper
                    next_task = aq_base(next_task)
                except (TypeError, ValueError):
                    # status is not a (time, task) pair
                    next_time, next_task = None, None

                if IScheduledEvent.isImplementedBy(next_task):
                    self.schedule(next_time, next_task)
                else:
                    msg = ('The task %s declined to reschedule itself '
                           'by returning %s from its __call__'
                           % (this_task.getDescription(), str(status)))
                    LOG('Scheduler (%s)' % self.getId(), BLATHER, msg)

                # Deschedule afterwards, so others can tell it's not done yet
                self.deschedule(taskid) # deschedule the task
        finally:
            _NotifyLock.release()

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'notifyOne')
    def notifyOne(self):

        """ Dispatch a single task.

        Primarily a helper function for XMLRPC-driven clock.
        """
        self.notify(max_tasks=1)

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'notifyMax')
    def notifyMax(self, max_tasks=1):

        """ Dispatch up to 'max_tasks' tasks.

        Primarily a helper function for XMLRPC-driven clock.
        """
        self.notify(max_tasks=max_tasks)
                
    security.declareProtected(VIEW_SCHEDULE_PERM, 'getPendingTasks')
    def getPendingTasks(self, when=None):
        if when is None: when = time.time()
        when = int(when)
        # XXX:  We should really be storing paths and dereferencing them
        #       here;  re-wrapping the task object in the schedule is icky!
        timelist = self.times.items(None, when) # min, max
        l = []
        for t, taskid_list in timelist:
            for taskid in taskid_list:
                task = self.tasks.get(taskid)
                if task is None:
                    # there is a consistency problem between the times and
                    # tasks btrees.
                    # this has happened in production on the bonzai
                    # project.  we don't know what has caused it yet, so
                    # we work around it but don't fail.
                    task=InconsistentSchedulerMarkerTask(t,taskid).__of__(self)
                else:
                    task = aq_base(task).__of__(self)
                l.append((t, task, taskid))
        return l
                
    security.declareProtected(VIEW_SCHEDULE_PERM, 'getPendingTaskInfo')
    def getPendingTaskInfo(self, when=None):
        """
            Return a sequence of mappings for use by UI.
        """
        l = []
        for key, task, taskid in self.getPendingTasks(when):
            d = { 'when':task.getTime(), 'info':task.getInfo(),
                  'description':task.getDescription(),
                  'taskid':taskid }
            l.append((key, d))
        return l

    def new_task_id(self, time):
        return '%010i%010i' % (time, random.randint(0, sys.maxint - 1))
    
    security.declareProtected(CHANGE_SCHEDULE_PERM, 'schedule')
    def schedule(self, time, task):
        task = aq_base(task)
        taskid = self.new_task_id(time)
        while not self.tasks.insert(taskid, task):
            taskid = self.new_task_id(time)
        task.taskid = taskid
        time = int(time)
        l = self.times.get(time, [])
        l.append(taskid)
        self.times[time] = l
        return taskid

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'deschedule')
    def deschedule(self, taskid):
        """ deschedule the task by removing from the tasks and times BTrees """
        time = int(taskid[:10])
        try:
            del self.tasks[taskid]
        except KeyError:
            # an inconsistency occurred between the tasks and times
            # btrees or the taskid is bogus
            LOG('Scheduler (%s)' % self.getId(), ERROR,
                'a task with taskid %s could not be removed from the tasks '
                'btree' % taskid)
        l = self.times.get(time)
        try:
            l.remove(taskid)
        except (ValueError, IndexError, KeyError, AttributeError):
            # an inconsistency occurred between the tasks and times
            # btrees or the times entry is bogus
            LOG('Scheduler (%s)' % self.getId(), ERROR,
                'taskid %s could not be removed from the time list for time '
                '%s' % (taskid, time))
        if l:
            # a nonempty list
            self.times[time] = l
        elif l is not None:
            # an empty list
            del self.times[time]

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'checkConsistency')
    def checkConsistency(self):
        """ """
        l = []
        
        # check task id consistency
        timelist = self.times.items(None, sys.maxint - 1)

        for t, taskid_list in timelist:
            for taskid in taskid_list:
                task = self.tasks.get(taskid)
                if hasattr(task, 'taskid'):
                    if task.taskid != taskid:
                        l.append(
                            ('Task registered under taskid "%s" has taskid '
                             'attribute "%s"' % (taskid, task.taskid)
                             ))
                else:
                    l.append('Task registered under taskid "%s" has no '
                             'taskid' % taskid)

        # check BTree consistency
        from cStringIO import StringIO
        io = StringIO()
        import traceback
        from BTrees.check import check
        for tree in (self.tasks, self.times):
            try:
                check(aq_base(tree))
            except:
                traceback.print_exc(io)
        io.seek(0)
        s = io.read()
        if s:
            l.append(s)

        if l:
            return '\n'.join(l)
        return 'OK'

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'fixupTimesBtree')
    def fixupTimesBtree(self):
        """ Make times btree consistent (recover from desync) """
        newtree = IOBTree.IOBTree()
        timelist = self.times.items(None, sys.maxint - 1)
        for t, taskid_list in timelist:
            for taskid in taskid_list:
                task = self.tasks.get(taskid)
                if task is None:
                    continue
                else:
                    l = newtree.get(t, [])
                    l.append(taskid)
                    newtree[t] = l
        self.oldtimes = self.times
        self.times = newtree
        return 'OK'

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'manage_scheduleTask')
    def manage_scheduleTask(self, description, when, path, interval,
                            max_age, max_retries, retry_backoff_time,
                            REQUEST=None):
        """ """
        task = Task(description, when, path, interval, max_age, max_retries,
                     retry_backoff_time)
        self.schedule(when, task)
        if REQUEST is not None:
            return self.manage_current_tasks(self, REQUEST)

    security.declareProtected(NOTIFY_SCHEDULE_PERM, 'manage_notifyTasks')
    def manage_notifyTasks(self, REQUEST=None):

        """ Run any pending tasks (allow poking via the ZMI)."""
        LOG('Scheduler (%s)' % self.getId(), BLATHER,
            'manage_notifyTasks was called via manual ZMI poke.')
        self.notify()

        if REQUEST is not None:
            return self.manage_current_tasks(self, REQUEST)

    security.declareProtected(CHANGE_SCHEDULE_PERM, 'manage_descheduleTask')
    def manage_descheduleTask(self, taskids, REQUEST=None):
        """ """
        if isinstance(taskids, StringType):
            taskids = [taskids]
        for t in taskids:
            try:
                # this try-except is in case the form is submitted after
                # notify has kicked a task out of the queue.
                self.deschedule(t)
            except KeyError:
                pass
        if REQUEST is not None:
            return self.manage_current_tasks(self, REQUEST)

    security.declarePublic('format_time')
    def format_time(self, t):
        return time.ctime(t)

    security.declarePublic('now')
    def now(self):
        return int(time.time())

    security.declarePublic('maxTime')
    def maxTime(self):
        return sys.maxint

    security.declareProtected(MGMT_SCREEN_PERM, 'manage_current_tasks')
    manage_current_tasks = PageTemplateFile(
        'www/manage_current_tasks', globals(),__name__='manage_current_tasks'
        )

    security.declareProtected(MGMT_SCREEN_PERM, 'manage_schedule_tasks')
    manage_schedule_tasks = PageTemplateFile(
       'www/manage_schedule_tasks',globals(),__name__='manage_schedule_tasks'
        )

class Filter(Base, Persistent):
    def __init__(self, key):
        self.key = key
        
    def __call__(self, event):
        key = getattr(event, 'getFilterData', None)
        if key and key() == self.key:
            return 1
        return 0

manage_addSchedulerForm = PageTemplateFile(
        'www/addScheduler',
        globals(),
        __name__='manage_addSchedulerForm'
        )

def manage_addScheduler(self, id, title, filter_data=None, 
                        event_service='portal_events', REQUEST=None):
    """ """
    ob = Scheduler(id, title, filter_data, event_service)
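    # Not used below: looking the service up makes the add fail early
    # (AttributeError) if the named event service does not exist.
    getattr(self, ob.event_service)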
    self._setObject(id, ob)
    if REQUEST is not None:
        return self.manage_main(self, REQUEST, update_menu=1)



Globals.InitializeClass(Scheduler)
Globals.InitializeClass(Task)
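
The attached module keeps two indexes that must stay in sync: `tasks` maps a taskid to its task, and `times` maps an integer time to the list of taskids due then. Because `new_task_id` starts every id with the zero-padded schedule time, `deschedule` can find the matching `times` entry from the id alone, and `getPendingTasks` is a range query over `times`. Below is a minimal sketch of the same bookkeeping using plain dicts and `bisect` instead of BTrees, so no ZODB or transactions are involved. `TimeIndex`, `time_from_task_id` and `pending` are hypothetical names, and limiting the random suffix to ten digits is a choice made for this sketch.

```python
import bisect
import random


def new_task_id(when):
    # 10-digit zero-padded schedule time + 10-digit random suffix.  The
    # time prefix only stays exactly 10 digits for 0 <= when < 10**10.
    return '%010i%010i' % (int(when), random.randint(0, 10 ** 10 - 1))


def time_from_task_id(taskid):
    # Recover the schedule time from the id, as deschedule() does.
    return int(taskid[:10])


class TimeIndex(object):
    """Plain-dict stand-in for the tasks OOBTree / times IOBTree pair."""

    def __init__(self):
        self.tasks = {}   # taskid -> task
        self.times = {}   # int time -> [taskid, ...] in scheduling order
        self._keys = []   # sorted keys of self.times, for range queries

    def schedule(self, when, task):
        taskid = new_task_id(when)
        while taskid in self.tasks:      # retry on a (rare) id collision
            taskid = new_task_id(when)
        self.tasks[taskid] = task
        when = int(when)
        if when not in self.times:
            bisect.insort(self._keys, when)
            self.times[when] = []
        self.times[when].append(taskid)
        return taskid

    def deschedule(self, taskid):
        when = time_from_task_id(taskid)
        self.tasks.pop(taskid, None)     # tolerate unknown ids
        ids = self.times.get(when)
        if ids and taskid in ids:
            ids.remove(taskid)
            if not ids:                  # drop empty time slots entirely
                del self.times[when]
                self._keys.remove(when)

    def pending(self, when):
        """(time, taskid) pairs due at or before ``when``, oldest first."""
        end = bisect.bisect_right(self._keys, int(when))
        return [(t, tid) for t in self._keys[:end] for tid in self.times[t]]
```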

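`fixupTimesBtree` recovers from the desync that `getPendingTasks` works around: it rebuilds `times`, keeping only taskids that still exist in `tasks`. A sketch of that step over plain dicts, with hypothetical names `rebuild_times` and `dangling_ids` (the latter loosely matches what `checkConsistency` reports as a taskid with no task):

```python
def rebuild_times(times, tasks):
    """Return a new {time: [taskid, ...]} mapping without dangling ids.

    Ids with no matching entry in ``tasks`` are skipped, and times whose
    list ends up empty are left out.  The input mappings are not modified.
    """
    rebuilt = {}
    for when in sorted(times):
        for taskid in times[when]:
            if taskid in tasks:
                rebuilt.setdefault(when, []).append(taskid)
    return rebuilt


def dangling_ids(times, tasks):
    """Taskids indexed in ``times`` that have no task in ``tasks``."""
    return [taskid for when in sorted(times) for taskid in times[when]
            if taskid not in tasks]
```

Like the original, this only drops dangling ids: a task that exists in `tasks` but has no entry in `times` is not re-indexed. Unlike the original, it returns a new mapping instead of keeping the old tree around as `oldtimes`.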
