[Zope-CVS] CVS: Products/Scheduler - Task.py:1.1 Scheduler.py:1.12
Chris McDonough
chrism@zope.com
Sat, 17 May 2003 14:46:38 -0400
Update of /cvs-repository/Products/Scheduler
In directory cvs.zope.org:/tmp/cvs-serv11728
Modified Files:
Scheduler.py
Added Files:
Task.py
Log Message:
Changes which allow the descheduling of a task by its task id rather than by its time. Now, when a task is scheduled, it is assigned a task id (available as the taskid attribute of the task). The task must now be descheduled using its task id rather than its time. A tag was made before this change named "before_taskid" in CVS in case changes to the HEAD cause breakages in dependent code.
=== Added File Products/Scheduler/Task.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
Task module.
$Id: Task.py,v 1.1 2003/05/17 18:46:07 chrism Exp $
"""
import time
import sys
import Globals
from AccessControl import ClassSecurityInfo
from Acquisition import Implicit
from Persistence import Persistent
from zLOG import LOG, INFO, BLATHER, ERROR, WARNING, PROBLEM
from ZPublisher.mapply import mapply
from ZPublisher.Publish import call_object, missing_name, dont_publish_class
from Products.Scheduler import IScheduledEvent
class Task(Implicit, Persistent):
security = ClassSecurityInfo()
security.declareObjectPublic()
security.setDefaultAccess("allow")
__implements__ = IScheduledEvent
def __init__(self, description, when, path, interval=None, max_age=600,
max_retries=5, retry_backoff_time=60, filter_data=None):
self.description = description
self.when = when
self.path = path
self.interval = interval
self.max_age = max_age
self.max_retries = max_retries
self.retry_backoff_time = retry_backoff_time
self.retries = 0
self.filter_data = filter_data
def __call__(self):
"""Run the method on self.path with REQUEST['args'].
On failure we reschedule, iff max_age and max_retries have not been
exceeded."""
now = int(time.time())
# secondary max_age guard (in case of clock failure)
if self.max_age and (now > self.when + self.max_age):
msg = ('Task "%s" scheduled for %s failed due to exceeded maximum '
'age - not rescheduling'
% (self.getDescription(), pretty_time(self.when)))
LOG('Scheduler', ERROR, msg)
return 1
try:
REQUEST = getattr(self, 'REQUEST')
result = self._runMethod(REQUEST)
except:
exc_name, exc_msg, exc_tb = sys.exc_info()
synopsis = "'%s', %s" % (exc_name, exc_msg)
exc_name = exc_msg = exc_tb = None
# retry guard
if self.max_retries and ((self.retries + 1) > self.max_retries):
msg = ('Task "%s" scheduled for %s failed on exception'
'- not rescheduling due to exceeded maximum retries.'
' Exception: %s'
% (self.description, pretty_time(self.when), synopsis))
LOG('Scheduler', ERROR, msg, error=sys.exc_info())
return 1
then = now + self.retry_backoff_time
# primary max_age guard
if self.max_age and (then > self.when + self.max_age):
msg = ('Task "%s" scheduled for %s failed on exception'
' - not rescheduling due to exceeded maximum age'
' Exception: %s'
% (self.description, pretty_time(self.when), synopsis))
LOG('Scheduler', ERROR, msg, error=sys.exc_info())
return 1
# we schedule a retry if we get to here
msg = ('Task "%s" scheduled for %s was not completed due '
'to an exception - retrying at %s.'
' Exception: %s.'
% (self.description, pretty_time(self.when),
pretty_time(then), synopsis))
LOG('Scheduler', PROBLEM, msg, error=sys.exc_info())
self.retries = self.retries + 1
return then, self
# If we get here, we log success and return None
msg = ('Task %s was run successfully with result %s' %
(self.description, result))
LOG('Scheduler', BLATHER, msg)
return None
def next(self):
# having an interval means the task is recurring
if self.interval:
now = int(time.time())
next_time = self.when + self.interval
skipped = 0
while next_time < now:
# catch up!
skipped = skipped + 1
next_time = next_time + self.interval
if skipped:
# report that we've lost some task runs
msg = ('Task "%s" was skipped %s times due to a clock '
'failure or task server downtime' % (self.description,
skipped))
LOG('Scheduler', BLATHER, msg)
return next_time, self.clone(next_time)
return None # we do not have an interval
def info(self):
return ""
def getDescription(self):
return self.description
def getInfo(self):
return ''
def getTime(self):
return self.when
def getFilterData(self):
return self.filter_data
def _runMethod(self, request):
args = getattr(request, 'args', {})
method = self.restrictedTraverse(self.path)
result = mapply(method, args, request,
call_object, 1,
missing_name,
dont_publish_class,
request, bind=1)
return result
def clone(self, when):
LOG('Scheduler', BLATHER, 'clone called')
return self.__class__(description=self.description,
when=when,
path=self.path,
interval=self.interval,
max_age=self.max_age,
max_retries=self.max_retries,
retry_backoff_time=self.retry_backoff_time,
filter_data=self.filter_data)
def pretty_time(t):
return time.ctime(t)
Globals.InitializeClass(Task)
=== Products/Scheduler/Scheduler.py 1.11 => 1.12 ===
--- Products/Scheduler/Scheduler.py:1.11 Tue May 13 19:32:21 2003
+++ Products/Scheduler/Scheduler.py Sat May 17 14:46:07 2003
@@ -15,21 +15,18 @@
$Id$
"""
-import os, time, sys
-from types import FloatType, IntType
+import os, time, sys, random
+from types import FloatType, IntType, StringType
import Globals
-from BTrees import IOBTree
+from BTrees import IOBTree, OOBTree
from Persistence import Persistent
from ExtensionClass import Base
-from Acquisition import Implicit
from Acquisition import aq_base
from AccessControl import ClassSecurityInfo
from OFS.SimpleItem import SimpleItem
from OFS.PropertyManager import PropertyManager
-from ZPublisher.mapply import mapply
-from ZPublisher.Publish import call_object, missing_name, dont_publish_class
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.Event.ISubscriptionAware import ISubscriptionAware
@@ -39,6 +36,7 @@
from Products.Scheduler import IScheduledEvent
from Products.Scheduler import IDescheduledEvent
from Products.Scheduler import ITimeEvent
+from Products.Scheduler.Task import Task
from SchedulerPermissions import *
from zLOG import LOG, PROBLEM, ERROR, BLATHER
@@ -79,7 +77,8 @@
self.id = id
self.title = title
self.subscribed = []
- self.tasks = IOBTree.IOBTree()
+ self.tasks = OOBTree.OOBTree()
+ self.times = IOBTree.IOBTree()
if filter_data:
self.filter_data = filter_data
else:
@@ -95,7 +94,7 @@
event_type and filter match the arguments provided to
ISubscribable.subscribe.
"""
- subscribable = getattr(subscribable, 'aq_base', subscribable)
+ subscribable = aq_base(subscribable)
self.subscribed.append((subscribable, event_type, filter))
self._p_changed = 1
@@ -106,7 +105,7 @@
the deleted subscription, rather than, necessarily, the
arguments provided to ISubscribable.unsubscribe.
"""
- subscribable = getattr(subscribable, 'aq_base', subscribable)
+ subscribable = aq_base(subscribable)
self.subscribed.remove((subscribable, event_type, filter))
self._p_changed = 1
@@ -178,7 +177,7 @@
elif IDescheduledEvent.isImplementedBy(event):
# this is a deschedule event, we want to deschedule this event,
# but we don't want to actually perform any tasks.
- self.deschedule(event.getTime())
+ self.deschedule(event.getTaskId())
return
else:
# We don't know what to do, so we punt.
@@ -189,7 +188,8 @@
'value.' % event)
for this_time, this_task in self.getPendingTasks(t):
- self.deschedule(this_time) # deschedule the task
+ taskid = this_task.taskid
+ self.deschedule(taskid) # deschedule the task
try:
this_task = this_task.__of__(self)
status = this_task() # perform the task
@@ -214,7 +214,7 @@
try:
next_time, next_task = status
# make sure we don't try to store an acquisition wrapper
- next_task = getattr(next_task, 'aq_base', next_task)
+ next_task = aq_base(next_task)
except TypeError:
next_time, next_task = None, None
@@ -233,31 +233,50 @@
when = int(when)
# XXX: We should really be storing paths and dereferencing them
# here; re-wrapping the task object in the schedule is icky!
- tasks = self.tasks.items(None, when) #min, max
- return [ ( x[0], aq_base( x[1] ).__of__( self ) ) for x in tasks ]
+ timelist = self.times.items(None, when) # min, max
+ l = []
+ for t, taskid_list in timelist:
+ for taskid in taskid_list:
+ task = self.tasks.get(taskid)
+ task = aq_base(task).__of__(self)
+ l.append((t, task))
+ return l
security.declareProtected(VIEW_SCHEDULE_PERM, 'getPendingTaskInfo')
def getPendingTaskInfo(self, when=None):
"""
Return a sequence of mappings for use by UI.
"""
- return [(x[0], {'when': x[1].getTime(),
- 'info': x[1].getInfo(),
- 'description': x[1].getDescription()})
- for x in self.getPendingTasks(when)]
+ l = []
+ for key, task in self.getPendingTasks(when):
+ d = { 'when':task.getTime(), 'info':task.getInfo(),
+ 'description':task.getDescription(),
+ 'taskid':task.taskid }
+ l.append((key, d))
+ return l
+ def new_task_id(self, time):
+ return '%010i%010i' % (time, random.randint(0, sys.maxint - 1))
+
security.declareProtected(CHANGE_SCHEDULE_PERM, 'schedule')
def schedule(self, time, task):
+ task = aq_base(task)
+ taskid = self.new_task_id(time)
+ while not self.tasks.insert(taskid, task):
+ taskid = self.new_task_id(time)
+ task.taskid = taskid
time = int(time)
- while not self.tasks.insert(time, task):
- # give the task as close as possible time to its requested
- # runtime
- time = time + 1
+ l = self.times.get(time, [])
+ l.append(taskid)
+ self.times[time] = l
security.declareProtected(CHANGE_SCHEDULE_PERM, 'deschedule')
- def deschedule(self, time):
- """ deschedule the task by removing from the tasks BTree """
- del self.tasks[time]
+ def deschedule(self, taskid):
+ """ deschedule the task by removing from the tasks and times BTrees """
+ time = int(taskid[:10])
+ del self.tasks[taskid]
+ l = self.times.get(time)
+ l.remove(taskid)
security.declareProtected(CHANGE_SCHEDULE_PERM, 'manage_scheduleTask')
def manage_scheduleTask(self, description, when, path, interval,
@@ -280,11 +299,11 @@
return self.manage_current_tasks(self, REQUEST)
security.declareProtected(CHANGE_SCHEDULE_PERM, 'manage_descheduleTask')
- def manage_descheduleTask(self, when, REQUEST=None):
+ def manage_descheduleTask(self, taskids, REQUEST=None):
""" """
- if isinstance(when, IntType):
- when = [when]
- for t in when:
+ if isinstance(taskids, StringType):
+ taskids = [taskids]
+ for t in taskids:
try:
# this try-except is in case the form is submitted after
# notify has kicked a task out of the queue.
@@ -302,6 +321,10 @@
def now(self):
return int(time.time())
+ security.declarePublic('maxTime')
+ def maxTime(self):
+ return sys.maxint
+
security.declareProtected(MGMT_SCREEN_PERM, 'manage_current_tasks')
manage_current_tasks = PageTemplateFile(
'www/manage_current_tasks', globals(),__name__='manage_current_tasks'
@@ -312,143 +335,6 @@
'www/manage_schedule_tasks',globals(),__name__='manage_schedule_tasks'
)
-class Task(Implicit, Persistent):
-
- security = ClassSecurityInfo()
- security.declareObjectPublic()
- security.setDefaultAccess("allow")
-
- __implements__ = IScheduledEvent
-
- def __init__(self, description, when, path, interval=None, max_age=600,
- max_retries=5, retry_backoff_time=60, filter_data=None):
- self.description = description
- self.when = when
- self.path = path
- self.interval = interval
- self.max_age = max_age
- self.max_retries = max_retries
- self.retry_backoff_time = retry_backoff_time
- self.retries = 0
- self.filter_data = filter_data
-
- def __call__(self):
- """Run the method on self.path with REQUEST['args'].
-
- On failure we reschedule, iff max_age and max_retries have not been
- exceeded."""
-
- now = int(time.time())
- # secondary max_age guard (in case of clock failure)
- if self.max_age and (now > self.when + self.max_age):
- msg = ('Task "%s" scheduled for %s failed due to exceeded maximum '
- 'age - not rescheduling'
- % (self.getDescription(), pretty_time(self.when)))
- LOG('Scheduler', ERROR, msg)
- return 1
-
- try:
-
- REQUEST = getattr(self, 'REQUEST')
- result = self._runMethod(REQUEST)
-
- except:
-
- exc_name, exc_msg, exc_tb = sys.exc_info()
- synopsis = "'%s', %s" % (exc_name, exc_msg)
- exc_name = exc_msg = exc_tb = None
-
- # retry guard
- if self.max_retries and ((self.retries + 1) > self.max_retries):
- msg = ('Task "%s" scheduled for %s failed on exception'
- '- not rescheduling due to exceeded maximum retries.'
- ' Exception: %s'
- % (self.description, pretty_time(self.when), synopsis))
- LOG('Scheduler', ERROR, msg, error=sys.exc_info())
- return 1
-
- then = now + self.retry_backoff_time
-
- # primary max_age guard
- if self.max_age and (then > self.when + self.max_age):
- msg = ('Task "%s" scheduled for %s failed on exception'
- ' - not rescheduling due to exceeded maximum age'
- ' Exception: %s'
- % (self.description, pretty_time(self.when), synopsis))
- LOG('Scheduler', ERROR, msg, error=sys.exc_info())
- return 1
-
- # we schedule a retry if we get to here
- msg = ('Task "%s" scheduled for %s was not completed due '
- 'to an exception - retrying at %s.'
- ' Exception: %s.'
- % (self.description, pretty_time(self.when),
- pretty_time(then), synopsis))
- LOG('Scheduler', PROBLEM, msg, error=sys.exc_info())
- self.retries = self.retries + 1
- return then, self
-
- # If we get here, we log success and return None
- msg = ('Task %s was run successfully with result %s' %
- (self.description, result))
- LOG('Scheduler', BLATHER, msg)
- return None
-
- def next(self):
- # having an interval means the task is recurring
- if self.interval:
- now = int(time.time())
- next_time = self.when + self.interval
- skipped = 0
- while next_time < now:
- # catch up!
- skipped = skipped + 1
- next_time = next_time + self.interval
- if skipped:
- # report that we've lost some task runs
- msg = ('Task "%s" was skipped %s times due to a clock '
- 'failure or task server downtime' % (self.description,
- skipped))
- LOG('Scheduler', BLATHER, msg)
- return next_time, self.clone(next_time)
- return None # we do not have an interval
-
- def info(self):
- return ""
-
- def getDescription(self):
- return self.description
-
- def getInfo(self):
- return ''
-
- def getTime(self):
- return self.when
-
- def getFilterData(self):
- return self.filter_data
-
- def _runMethod(self, request):
- args = getattr(request, 'args', {})
- method = self.restrictedTraverse(self.path)
- result = mapply(method, args, request,
- call_object, 1,
- missing_name,
- dont_publish_class,
- request, bind=1)
- return result
-
- def clone(self, when):
- LOG('Scheduler', BLATHER, 'clone called')
- return self.__class__(description=self.description,
- when=when,
- path=self.path,
- interval=self.interval,
- max_age=self.max_age,
- max_retries=self.max_retries,
- retry_backoff_time=self.retry_backoff_time,
- filter_data=self.filter_data)
-
class Filter(Base, Persistent):
def __init__(self, key):
self.key = key
@@ -458,9 +344,6 @@
if key and key() == self.key:
return 1
return 0
-
-def pretty_time(t):
- return time.ctime(t)
manage_addSchedulerForm = PageTemplateFile(
'www/addScheduler',