[Zope3-checkins] CVS: Zope3/lib/python/Zope/App/Tasks - IManagedThread.py:1.1.2.1 IThreadManager.py:1.1.2.1 ManagedThread.py:1.1.2.1 ThreadManager.py:1.1.2.1 configure.zcml:1.1.2.1 ISchedule.py:1.1.2.2 ITask.py:1.1.2.2 Schedule.py:1.1.2.2 Task.py:1.1.2.2
Ulrich Eck
ueck@net-labs.de
Wed, 11 Dec 2002 05:19:51 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/App/Tasks
In directory cvs.zope.org:/tmp/cvs-serv12134
Modified Files:
Tag: jack-e_scheduler_branch
ISchedule.py ITask.py Schedule.py Task.py
Added Files:
Tag: jack-e_scheduler_branch
IManagedThread.py IThreadManager.py ManagedThread.py
ThreadManager.py configure.zcml
Log Message:
started refactoring to use LifeCycle and ThreadManagement
not finished .. still proving concepts
=== Added File Zope3/lib/python/Zope/App/Tasks/IManagedThread.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
__doc__ = """ Interface for Managed Threads
$Id: IManagedThread.py,v 1.1.2.1 2002/12/11 10:19:51 jack-e Exp $"""
from Interface import Interface
class IManagedThread(Interface):
"""A ManagedThread.
"""
def start(*args, **kw):
"""Start Working.
"""
def stop(timeout):
"""Notify and wait for Shutdown.
"""
def isRunning():
"""Return True if Thread is running.
"""
=== Added File Zope3/lib/python/Zope/App/Tasks/IThreadManager.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
__doc__ = """ Interface for ThreadManager Utility
$Id: IThreadManager.py,v 1.1.2.1 2002/12/11 10:19:51 jack-e Exp $"""
from Interface import Interface
class IThreadManager(Interface):
"""ThreadManager Utility.
"""
def add(name, thread_object, *args, **kw):
"""add new named ThreadObject and start
"""
def remove(name, timeout):
"""Stop a named Thread with timeout and remove
"""
def get(name):
"""Return named ThreadObject"""
def names():
"""Return thread names."""
=== Added File Zope3/lib/python/Zope/App/Tasks/ManagedThread.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
__doc__ = """ Default Implementation for Managed Threads
$Id: ManagedThread.py,v 1.1.2.1 2002/12/11 10:19:51 jack-e Exp $"""
import sys
from threading import Thread, Event
from Zope.App.Tasks.IManagedThread import IManagedThread
THREAD_WORKING = 0
THREAD_FINISHED = 1
THREAD_SLEEPING = 2
THREAD_EXCEPTION = 3
class ManagedThread:
__implements__ = IManagedThread
def __init__(self):
self.args = ()
self.kw = {}
self.running = 0
self.shutdown_asap = 0
self.thread_wakeup = Event()
self._thread = None
def start(self, *args, **kw):
self.args = args
self.kw = kw
self._thread = t = Thread(target=self.run)
t.setDaemon(1)
t.start()
def stop(self, timeout=0):
if not self.isRunning():
return
self.shudown_asap = 1
self.thread_wakeup.set()
self._thread.join(timeout)
return self.isRunning()
def isRunning(self):
if self._thread is None:
return
return self.running and self._thread.isAlive()
def run(self):
self.running = 1
work = apply(self.doWork, self.args, self.kw)
result = THREAD_WORKING
while not self.shutdown_asap and not result == THREAD_FINISHED:
try:
result, value = work.next()
except StopIteration:
break
except:
zLOG.LOG("ThreadManager",zLOG.ERROR,"Exception in Thread: %s" % str(sys.exc_value))
break
if result == THREAD_EXCEPTION:
zLOG.LOG("ThreadManager",zLOG.ERROR,"Exception in Thread: %s" % str(value))
break
elif result == THREAD_SLEEPING:
self.thread_wakeup.wait(float(value))
self.running = 0
def doWork(self, *args, **kw):
## this is implemented a generator function
## that yields a Tuple (result, value) for ThreadManagement
## yield THREAD_WORKING, None -> Do more work
## yield THREAD_FINISHED, None -> No more work
## yield THREAD_EXCEPTION, exc_value -> There was an Exception
## yield THREAD_SLEEPING, sleeptime -> Thread wants to sleep
raise NotImplementedError, "Overwrite this Method to do some work."
=== Added File Zope3/lib/python/Zope/App/Tasks/ThreadManager.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
__doc__ = """ Implementation for ThreadManager Utility
$Id: ThreadManager.py,v 1.1.2.1 2002/12/11 10:19:51 jack-e Exp $"""
from thread import start_new_thread, allocate_lock
from Zope.App.Tasks.IThreadManager import IThreadManager
from Zope.App.Tasks.IManagedThread import IManagedThread
class ThreadManager:
__implements__ = IThreadManager
def __init__(self):
self.threads = {}
self.mgmt_lock = allocate_lock()
def add(self, name, thread_object, *args, **kw):
if self.threads.has_key(name):
raise ValueError, "Thread with name %s already exists." % name
if not IManagedThread.isImplementedBy(thread_object):
raise TypeError, "Only ManagedThreads can be added."
self.mgmt_lock.acquire()
self.threads[name] = thread_object
self.mgmt_lock.release()
thread_object.start(*args, **kw)
def remove(self, name, timeout=0):
if not self.threads.has_key(name):
raise ValueError, "Thread with name %s unknown." % name
self.mgmt_lock.acquire()
thread_object = self.threads[name]
del self.threads[name]
self.mgmt_lock.release()
thread_object.stop(timeout)
return not thread_object.isRunning()
def get(self, name):
return self.threads.get(name,None)
def names(self):
return self.threads.keys()
=== Added File Zope3/lib/python/Zope/App/Tasks/configure.zcml ===
<zopeConfigure
xmlns='http://namespaces.zope.org/zope'
xmlns:browser='http://namespaces.zope.org/browser'
xmlns:service='http://namespaces.zope.org/service'>
<content class=".Schedule.">
<factory
id="Schedule"
permission="Zope.ManageServices"
/>
<!-- a special permission for managing tasks is needed -->
<require
permission="Zope.View"
interface="Zope.App.Tasks.ISchedule." />
<!-- do we need an interface for managing subscription seperatly? -->
<require
permission="Zope.ManageServices"
attributes="subscribe unsubscribe isSubscribed" />
<require
permission="Zope.ManageServices"
interface="Zope.App.Tasks.ISchedule.IScheduleControl" />
<implements interface="Zope.App.OFS.Annotation.IAttributeAnnotatable." />
</content>
<!-- Temporary Solution till we have placefull utilities -->
<serviceType id='Schedules'
interface='.ISchedule.' />
<!-- Need a place for ThreadManager .. while there is no place i'll leave it in Tasks package -->
<utility factory=".ThreadManager.ThreadManager"
permission="Zope.ManageServices"
provides=".IThreadManager." />
<browser:menuItem
menu="add_component"
for="Zope.App.OFS.Container.IAdding."
action="Schedule"
title='Task Schedule'
description="A Task Schedule" />
<include package=".Browser" />
</zopeConfigure>
=== Zope3/lib/python/Zope/App/Tasks/ISchedule.py 1.1.2.1 => 1.1.2.2 ===
--- Zope3/lib/python/Zope/App/Tasks/ISchedule.py:1.1.2.1 Mon Dec 9 11:17:59 2002
+++ Zope3/lib/python/Zope/App/Tasks/ISchedule.py Wed Dec 11 05:19:51 2002
@@ -41,3 +41,7 @@
def getTaskTimes():
"""Return all datetimes for scheduled Tasks
"""
+
+ def getTaskInfo(time):
+ """Return Tasklist with (time,principalId,description).
+ """
=== Zope3/lib/python/Zope/App/Tasks/ITask.py 1.1.2.1 => 1.1.2.2 ===
--- Zope3/lib/python/Zope/App/Tasks/ITask.py:1.1.2.1 Mon Dec 9 11:17:59 2002
+++ Zope3/lib/python/Zope/App/Tasks/ITask.py Wed Dec 11 05:19:51 2002
@@ -17,7 +17,7 @@
"""
from Interface import Interface
-from Zope.Schema import Datetime, Line
+from Zope.Schema import Datetime, Line, Text
class ITask(Interface):
@@ -29,6 +29,10 @@
u"The principalId that the task should be executed as.",
readonly=1,
required=1)
+
+ description = Text(description=
+ u"The description of the task for task ")
+
def __call__():
"""Execute the task
=== Zope3/lib/python/Zope/App/Tasks/Schedule.py 1.1.2.1 => 1.1.2.2 ===
--- Zope3/lib/python/Zope/App/Tasks/Schedule.py:1.1.2.1 Mon Dec 9 11:17:59 2002
+++ Zope3/lib/python/Zope/App/Tasks/Schedule.py Wed Dec 11 05:19:51 2002
@@ -15,6 +15,7 @@
"""Implementation for schedule.
$Id$
"""
+from __future__ import generators
import time
from Persistence import Persistent
@@ -23,20 +24,34 @@
from Zope.Security.SecurityManagement import newSecurityManager
-from Zope.ComponentArchitecture import getService
+from Zope.ComponentArchitecture import getService, getUtility
from Zope.ContextWrapper import ContextMethod
from Zope.Exceptions import NotFoundError
from Zope.Event.ISubscriber import ISubscriber
+from Zope.App.Tasks.ManagedThread import ManagedThread
+from Zope.App.Tasks.ManagedThread import THREAD_WORKING, THREAD_SLEEPING, THREAD_EXCEPTION, THREAD_FINISHED
+from Zope.App.Tasks.IThreadManager import IThreadManager
from Zope.App.Tasks.ISchedule import ISchedule, IScheduleControl
from Zope.App.Tasks.ITask import ITask
-from Zope.App.Tasks.ITimeEvent import ITimeEvent
+# from Zope.App.Tasks.ITimeEvent import ITimeEvent
+
+from Zope.App.LifeCycle.ILifeCycle import IZopeStartupEvent, IZopeShutdownEvent
# Implementation note:
#
# Internally, there's a BTree mapping time -> [tasks]
+class SchedulerThread(ManagedThread):
+
+ def doWork(self, schedule_times, scheduler_path):
+ while 1:
+ print "Scheduler Thread running: %s, %s" % (schedule_times, scheduler_path)
+ yield THREAD_SLEEPING, 1
+ doWork = ContextMethod(doWork)
+
+
class Schedule(Persistent):
__implements__ = (ISchedule, IScheduleControl, ISubscriber)
@@ -130,16 +145,22 @@
def getTaskTimes(self):
return self.tasks.keys()
+ def getTaskInfo(self, time):
+ return ["%s, %s, %s" % (t.time, t.principalId, t.description) for t in self.tasks.get(time,())]
+
+
## Implementation for ISubscriber
def notify(self, event):
- if ITimeEvent.isImplementedBy(event):
- time = event.time
- self.process(time)
- else:
- raise TypeError
+ if IZopeStartupEvent.isImplementedBy(event):
+ print "Scheduler received Startup and should start its thread"
+ thread_manager = getUtility(self,IThreadManager)
+ schedule_thread = SchedulerThread()
+ thread_manager.add('aname', schedule_thread, [1,2,3,4], 'apath')
+ elif IZopeShutdownEvent.isImplementedBy(event):
+ print "Scheduler received Shutdown and should stop its thread"
notify = ContextMethod(notify)
@@ -149,7 +170,8 @@
if self.currentlySubscribed:
raise RuntimeError, "already subscribed; please unsubscribe first"
channel = self._getChannel(channel)
- channel.subscribe(self,ITimeEvent)
+ channel.subscribe(self,IZopeStartupEvent)
+ channel.subscribe(self,IZopeShutdownEvent)
self.currentlySubscribed = True
subscribe = ContextMethod(subscribe)
@@ -157,7 +179,8 @@
if not self.currentlySubscribed:
raise RuntimeError, "already subscribed; please unsubscribe first"
channel = self._getChannel(channel)
- channel.unsubscribe(self,ITimeEvent)
+ channel.unsubscribe(self,IZopeStartupEvent)
+ channel.unsubscribe(self,IZopeShutdownEvent)
self.currentlySubscribed = False
unsubscribe = ContextMethod(unsubscribe)
=== Zope3/lib/python/Zope/App/Tasks/Task.py 1.1.2.1 => 1.1.2.2 ===
--- Zope3/lib/python/Zope/App/Tasks/Task.py:1.1.2.1 Mon Dec 9 11:17:59 2002
+++ Zope3/lib/python/Zope/App/Tasks/Task.py Wed Dec 11 05:19:51 2002
@@ -30,6 +30,8 @@
time = None
principalId = None
+
+ description = ''
def __init__(self, time, principalId, func=None, *args, **kw):
self.time = time