[Zope3-checkins] CVS: Zope3/lib/python/Zope/App/Tasks - ISchedule.py:1.1.2.1 ITask.py:1.1.2.1 ITimeEvent.py:1.1.2.1 Schedule.py:1.1.2.1 Task.py:1.1.2.1 TimeEvent.py:1.1.2.1 TimeEventProducer.py:1.1.2.1 __init__.py:1.1.2.1
Ulrich Eck
ueck@net-labs.de
Mon, 9 Dec 2002 11:18:00 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/App/Tasks
In directory cvs.zope.org:/tmp/cvs-serv530
Added Files:
Tag: jack-e_scheduler_branch
ISchedule.py ITask.py ITimeEvent.py Schedule.py Task.py
TimeEvent.py TimeEventProducer.py __init__.py
Log Message:
Zope.App.Tasks:
Prototype Implementation of Jim's Schedule Design
=== Added File Zope3/lib/python/Zope/App/Tasks/ISchedule.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Interfaces for schedule.
$Id: ISchedule.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
from Interface import Interface
class ISchedule(Interface):
    """A task queue with a scheduling policy.

    This interface covers adding tasks to the queue and removing them
    from it again.
    """

    def schedule(task):
        """Add `task` to the queue."""

    def unschedule(task):
        """Remove `task` from the queue."""
class IScheduleControl(Interface):
    """Control interface for a task queue."""

    def process(time):
        """Process the tasks that are due as of `time`."""

    def getTaskTimes():
        """Return the datetimes of all scheduled tasks."""
=== Added File Zope3/lib/python/Zope/App/Tasks/ITask.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Interfaces for task.
$Id: ITask.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
from Interface import Interface
from Zope.Schema import Datetime, Line
class ITask(Interface):
    """A unit of work that is executed at a given time as a given principal."""

    # When the task is due.
    time = Datetime(description=u"The time at which the task should execute",
                    readonly=1,
                    required=1)

    # Id of the principal on whose behalf the task runs.
    principalId = Line(description=
                       u"The principalId that the task should be executed as.",
                       readonly=1,
                       required=1)

    def __call__():
        """Execute the task.

        Return None if execution was successful.  To retry the work after
        an error, return a task instead; the returned task is meant to be
        scheduled by the caller.

        Exceptions raised during execution are expected to be logged by
        the caller rather than propagated.
        """

    def schedule():
        """Return a follow-up task to schedule, or None.

        This is called after a successful execution (``__call__`` returned
        None) to give the task the chance to (re)schedule work.  Return
        None if no task needs to be scheduled.

        This method is used to implement repeating tasks.
        """
=== Added File Zope3/lib/python/Zope/App/Tasks/ITimeEvent.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Revision information:
$Id: ITimeEvent.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
from Interface.Attribute import Attribute
from Zope.Event.IEvent import IEvent
class ITimeEvent(IEvent):
    """Base interface for time events."""

    # The point in time that the event announces.
    time = Attribute("datetime object that stores the time for the Event")
=== Added File Zope3/lib/python/Zope/App/Tasks/Schedule.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Implementation for schedule.
$Id: Schedule.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
import time
from Persistence import Persistent
from Persistence.BTrees.OOBTree import OOBTree
from ZODB.POSException import ConflictError
from Zope.Security.SecurityManagement import newSecurityManager
from Zope.ComponentArchitecture import getService
from Zope.ContextWrapper import ContextMethod
from Zope.Exceptions import NotFoundError
from Zope.Event.ISubscriber import ISubscriber
from Zope.App.Tasks.ISchedule import ISchedule, IScheduleControl
from Zope.App.Tasks.ITask import ITask
from Zope.App.Tasks.ITimeEvent import ITimeEvent
# Implementation note:
#
# Internally, a BTree maps each time to a tuple of the tasks due at that time.
class Schedule(Persistent):
    """Persistent task queue that runs due tasks when a time event arrives.

    Tasks live in ``self.tasks``, an OOBTree that maps a task's ``time``
    to a tuple of all tasks scheduled for that time.
    """

    __implements__ = (ISchedule, IScheduleControl, ISubscriber)

    def __init__(self):
        self.tasks = OOBTree()

    ## Implementation for ISchedule

    def schedule(self, task):
        """Add `task` to the queue under its ``time``.

        Tasks sharing the same time are kept together in one tuple.

        Raises TypeError if `task` does not implement ITask.
        """
        if not ITask.isImplementedBy(task):
            raise TypeError("task does not implement ITask")
        when = task.time
        # Store a new tuple instead of mutating the stored value in place.
        self.tasks[when] = self.tasks.get(when, ()) + (task,)

    def unschedule(self, task):
        """Remove `task` (matched by identity) from the queue.

        Raises TypeError if `task` does not implement ITask, and
        ValueError if the task is not currently queued.
        """
        if not ITask.isImplementedBy(task):
            raise TypeError("task does not implement ITask")
        tasks = self.tasks
        when = task.time
        queued = tasks.get(when, ())
        remaining = tuple([t for t in queued if t is not task])
        if len(remaining) == len(queued):
            # Nothing was removed: either no task is queued for this time
            # at all, or the slot exists but does not hold this task.
            raise ValueError("task is not scheduled")
        if remaining:
            tasks[when] = remaining
        else:
            # Drop empty slots so getTaskTimes() only reports real work.
            del tasks[when]

    ## Implementation for IScheduleControl

    def process(self, time):
        """Run every task whose scheduled time is not later than `time`.

        Each due time slot is removed from the queue before its tasks are
        run.  A task that returns another task, or whose ``schedule()``
        returns one, gets that task queued via ``self.schedule``.
        """
        # get Authentication Service
        auth_svc = getService(self, "Authentication")
        # Snapshot the due keys up front: slots are deleted and tasks may
        # be re-scheduled while we work, and the BTree must not be changed
        # while a range view of it is still being iterated.
        due_times = [key for key, _ in self.tasks.items(None, time)]
        for time_key in due_times:
            tasklist = self.tasks.get(time_key)
            if tasklist is None:
                # The slot disappeared while earlier tasks were running.
                continue
            del self.tasks[time_key]
            for task in tasklist:
                self._runTask(auth_svc, task)
    process = ContextMethod(process)

    def _runTask(self, auth_svc, task):
        """Execute one task as its principal and queue any follow-up task."""
        print("processing task")
        # create SecurityContext
        try:
            principal = auth_svc.getPrincipal(task.principalId)
        except NotFoundError:
            # unknown principal -> don't execute the task
            # XXX Log this
            print("principal not found")
            return
        old_sec_manager = newSecurityManager(principal)
        try:
            try:
                errtask = task()
                if errtask is None:
                    # give the task a chance to reschedule
                    newtask = task.schedule()
                    if newtask is not None:
                        self.schedule(newtask)
                else:
                    # the task handed back a task to retry the work
                    self.schedule(errtask)
            except:
                # Deliberately broad: one bogus task must not stop the
                # scheduler.  The exception still needs to be logged.
                # NOTE(review): this also swallows ConflictError, which
                # this module imports but never handles -- confirm whether
                # it should be re-raised instead.
                print("Exception in Task")
        finally:
            # NOTE(review): the previous manager is handed back through
            # newSecurityManager(); confirm that this really restores it.
            newSecurityManager(old_sec_manager)

    def getTaskTimes(self):
        """Return the keys of the task mapping, i.e. all scheduled times."""
        return self.tasks.keys()

    ## Implementation for ISubscriber

    def notify(self, event):
        """Process all tasks that are due at the time carried by `event`.

        Raises TypeError if `event` does not implement ITimeEvent.
        """
        if not ITimeEvent.isImplementedBy(event):
            raise TypeError("event does not implement ITimeEvent")
        self.process(event.time)
    notify = ContextMethod(notify)

    # Class-level default; subscribe()/unsubscribe() set it per instance.
    currentlySubscribed = False

    def subscribe(self, channel=None):
        """Subscribe this schedule to ITimeEvent events on `channel`.

        When `channel` is None the "Events" service is looked up and used.

        Raises RuntimeError if the schedule is already subscribed.
        """
        if self.currentlySubscribed:
            raise RuntimeError("already subscribed; please unsubscribe first")
        channel = self._getChannel(channel)
        channel.subscribe(self, ITimeEvent)
        self.currentlySubscribed = True
    subscribe = ContextMethod(subscribe)

    def unsubscribe(self, channel=None):
        """Cancel the ITimeEvent subscription on `channel`.

        When `channel` is None the "Events" service is looked up and used.

        Raises RuntimeError if the schedule is not subscribed.
        """
        if not self.currentlySubscribed:
            raise RuntimeError("not subscribed; please subscribe first")
        channel = self._getChannel(channel)
        channel.unsubscribe(self, ITimeEvent)
        self.currentlySubscribed = False
    unsubscribe = ContextMethod(unsubscribe)

    def isSubscribed(self):
        """Return whether subscribe() is currently in effect."""
        return self.currentlySubscribed

    def _getChannel(self, channel):
        """Return `channel`, falling back to the "Events" service for None."""
        if channel is None:
            channel = getService(self, "Events")
        return channel
    _getChannel = ContextMethod(_getChannel)
=== Added File Zope3/lib/python/Zope/App/Tasks/Task.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Implementation of Task.
$Id: Task.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
from Zope.ContextWrapper import ContextMethod
from Zope.App.Tasks.ITask import ITask
class Task:
    """Generic task: a time, a principal id and an optional callable."""

    __implements__ = ITask

    time = None
    principalId = None

    def __init__(self, time, principalId, func=None, *args, **kw):
        self.time, self.principalId = time, principalId
        # The callable and the arguments it will be invoked with.
        self._func, self._args, self._kw = func, args, kw

    #######################################################
    # Basic Implementation for Interface ITask

    def __call__(self):
        """Call the wrapped function and return its result.

        Returns None when no callable was supplied.
        """
        func = self._func
        if not callable(func):
            return None
        return func(*self._args, **self._kw)
    __call__ = ContextMethod(__call__)

    def schedule(self):
        """Return None: a generic task has no follow-up task."""
        return None

    #
    #######################################################
=== Added File Zope3/lib/python/Zope/App/Tasks/TimeEvent.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Revision information:
$Id: TimeEvent.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
# Make the classes defined in this module new-style classes.
__metaclass__ = type

from Zope.App.Tasks.ITimeEvent import ITimeEvent

# Unique sentinel object; nothing in this module refers to it.
_marker = object()
class TimeEvent:
    """An event that signals that some point in time has come."""

    __implements__ = ITimeEvent

    # Class-level default; __init__ sets the real value per instance.
    time = None

    def __init__(self, time):
        # `time` is stored as given; ITimeEvent describes it as a
        # datetime object.
        self.time = time
=== Added File Zope3/lib/python/Zope/App/Tasks/TimeEventProducer.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
This module starts a new Thread for producing TimeEvents.
$Id: TimeEventProducer.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
from Transaction import get_transaction
from Zope.App.ZopePublication.ZopePublication import ZopePublication
from Zope.App.Traversing import traverse
from Zope.ComponentArchitecture import getService
from Zope.App.Tasks.TimeEvent import TimeEvent
from datetime import datetime, timedelta
import time
# Length of each supported time unit, expressed in seconds.
time_units = {
    'sec': 1,
    'min': 60,
    'hour': 60 * 60,
    'day': 24 * 60 * 60,
}
class TimeEventPublication:
    """Helper that publishes a TimeEvent inside its own ZODB transaction.

    Intended call order, as used by startTimeEventProducer():
    getApplication(), beforeCall(), getEventService(),
    publishTimeEvent(), afterCall().
    """

    root_name = ZopePublication.root_name

    def __init__(self, db):
        self.db = db
        # Connection opened by getApplication() and closed by afterCall().
        self._conn = None

    def beforeCall(self):
        """Begin a new transaction."""
        get_transaction().begin()

    def getApplication(self):
        """Open a database connection and return the application object.

        Raises SystemError if the root has no entry named `root_name`.
        """
        self._conn = conn = self.db.open()
        app = conn.root().get(self.root_name, None)
        if app is None:
            raise SystemError("Zope Application Not Found")
        return app

    def getEventService(self, app, path):
        """Return the "Events" service for the object at `path` below `app`."""
        place = traverse(app, path)
        return getService(place, "Events")

    def publishTimeEvent(self, event_service, event_time=None):
        """Publish a TimeEvent for `event_time` on `event_service`.

        `event_time` defaults to the current time *at call time*.  A
        default of ``datetime.now()`` in the signature would be evaluated
        only once, when the class is defined, and then go stale.
        """
        if event_time is None:
            event_time = datetime.now()
        event_service.publish(TimeEvent(event_time))

    def afterCall(self):
        """Commit the transaction and close the connection.

        The connection is closed even if the commit raises, and it is
        closed at most once.
        """
        try:
            get_transaction().commit()
        finally:
            conn, self._conn = self._conn, None
            if conn is not None:
                conn.close()
def _secondsUntilNextTick(unit, now):
    """Return the seconds (float) from `now` to the next whole `unit`."""
    if unit == 'sec':
        start = now.replace(microsecond=0)
        step = timedelta(seconds=1)
    elif unit == 'min':
        start = now.replace(second=0, microsecond=0)
        step = timedelta(minutes=1)
    elif unit == 'hour':
        start = now.replace(minute=0, second=0, microsecond=0)
        step = timedelta(hours=1)
    elif unit == 'day':
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        step = timedelta(days=1)
    else:
        return 0.0
    # timedelta arithmetic rolls over correctly at :59, 23h, month end and
    # year end; building datetime(..., now.second + 1) raised ValueError
    # in exactly those cases.
    delta = (start + step) - now
    # Float division keeps the sub-second part (integer division made it 0).
    return delta.days * 86400 + delta.seconds + delta.microseconds / 1000000.0


def startTimeEventProducer(_zodb, *args):
    """TimeEventProducer producing regular TimeEvents

    A TimeEventProducer takes a resolution and unit and produces
    TimeEvents according to the specified resolution.

    `args` are ``key=value`` strings.  Recognised keys:

    resolution -- number of units between events (default 60, minimum 1)
    unit       -- one of the keys of `time_units` (default 'sec')
    path       -- path of the place whose "Events" service is used
                  (default '/')

    Returns immediately for an unknown unit; otherwise this function
    loops forever and never returns.  Raises ValueError for an argument
    without '=' or a non-integer resolution.
    """
    config = {}
    for arg in args:
        # Split on the first '=' only, so values may contain '=' themselves.
        key, val = arg.split('=', 1)
        config[key] = val

    resolution = max(int(config.get('resolution', 60)), 1)
    unit = config.get('unit', 'sec')
    if unit not in time_units:
        # unknown unit
        return
    interval = resolution * time_units[unit]
    path = config.get('path', '/')

    # try to fire at first time feasible for unit
    time.sleep(_secondsUntilNextTick(unit, datetime.now()))

    while 1:
        # Reset per iteration so the cleanup below can never touch the
        # publication object of a previous iteration.
        pub = None
        try:
            pub = TimeEventPublication(_zodb)
            app = pub.getApplication()
            pub.beforeCall()
            evt_svc = pub.getEventService(app, path)
            pub.publishTimeEvent(evt_svc, datetime.now())
            pub.afterCall()
        except:
            # Deliberately broad: the producer must keep running whatever
            # a single publication attempt does.
            # NOTE(review): afterCall() commits; confirm whether a failed
            # attempt should abort the transaction instead.
            if pub is not None:
                try:
                    pub.afterCall()
                except:
                    pass
        time.sleep(interval)
    # cleanup ??
=== Added File Zope3/lib/python/Zope/App/Tasks/__init__.py ===