[Zope-Checkins] SVN: Zope/branches/2.12/ Let OFS File/Image fire lifecycle events
Martin Aspeli
optilude at gmx.net
Mon Jan 11 19:26:44 EST 2010
Log message for revision 108041:
Let OFS File/Image fire lifecycle events
Changed:
U Zope/branches/2.12/doc/CHANGES.rst
U Zope/branches/2.12/src/OFS/Image.py
U Zope/branches/2.12/src/OFS/tests/testFileAndImage.py
-=-
Modified: Zope/branches/2.12/doc/CHANGES.rst
===================================================================
--- Zope/branches/2.12/doc/CHANGES.rst 2010-01-11 22:46:46 UTC (rev 108040)
+++ Zope/branches/2.12/doc/CHANGES.rst 2010-01-12 00:26:43 UTC (rev 108041)
@@ -27,6 +27,10 @@
Features Added
++++++++++++++
+- Made OFS.Image.File and OFS.Image.Image send IObjectCreatedEvent when
+ created through their factories and IObjectModifiedEvent when modified
+ through the ZMI forms (manage_edit() and manage_upload()).
+
- Moved zope.formlib / zope.app.form integration into a separate package
called five.formlib.
Modified: Zope/branches/2.12/src/OFS/Image.py
===================================================================
--- Zope/branches/2.12/src/OFS/Image.py 2010-01-11 22:46:46 UTC (rev 108040)
+++ Zope/branches/2.12/src/OFS/Image.py 2010-01-12 00:26:43 UTC (rev 108041)
@@ -46,6 +46,10 @@
from OFS.PropertyManager import PropertyManager
from OFS.SimpleItem import Item_w__name__
+from zope.event import notify
+from zope.lifecycleevent import ObjectModifiedEvent
+from zope.lifecycleevent import ObjectCreatedEvent
+
manage_addFileForm = DTMLFile('dtml/imageAdd',
globals(),
Kind='File',
@@ -63,19 +67,23 @@
precondition = str(precondition)
id, title = cookId(id, title, file)
-
+
self=self.this()
# First, we create the file without data:
self._setObject(id, File(id,title,'',content_type, precondition))
-
+
+ newFile = self._getOb(id)
+
# Now we "upload" the data. By doing this in two steps, we
# can use a database trick to make the upload more efficient.
if file:
- self._getOb(id).manage_upload(file)
+ newFile.manage_upload(file)
if content_type:
- self._getOb(id).content_type=content_type
-
+ newFile.content_type=content_type
+
+ notify(ObjectCreatedEvent(newFile))
+
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(self.absolute_url()+'/manage_main')
@@ -469,6 +477,9 @@
self.update_data(filedata, content_type, len(filedata))
else:
self.ZCacheable_invalidate()
+
+ notify(ObjectModifiedEvent(self))
+
if REQUEST:
message="Saved changes."
return self.manage_main(self,REQUEST,manage_tabs_message=message)
@@ -487,7 +498,9 @@
content_type=self._get_content_type(file, data, self.__name__,
'application/octet-stream')
self.update_data(data, content_type, size)
-
+
+ notify(ObjectModifiedEvent(self))
+
if REQUEST:
message="Saved changes."
return self.manage_main(self,REQUEST,manage_tabs_message=message)
@@ -665,14 +678,18 @@
# First, we create the image without data:
self._setObject(id, Image(id,title,'',content_type, precondition))
-
+
+ newFile = self._getOb(id)
+
# Now we "upload" the data. By doing this in two steps, we
# can use a database trick to make the upload more efficient.
if file:
- self._getOb(id).manage_upload(file)
+ newFile.manage_upload(file)
if content_type:
- self._getOb(id).content_type=content_type
-
+ newFile.content_type=content_type
+
+ notify(ObjectCreatedEvent(newFile))
+
if REQUEST is not None:
try: url=self.DestinationURL()
except: url=REQUEST['URL1']
Modified: Zope/branches/2.12/src/OFS/tests/testFileAndImage.py
===================================================================
--- Zope/branches/2.12/src/OFS/tests/testFileAndImage.py 2010-01-11 22:46:46 UTC (rev 108040)
+++ Zope/branches/2.12/src/OFS/tests/testFileAndImage.py 2010-01-12 00:26:43 UTC (rev 108041)
@@ -8,6 +8,8 @@
import time
from cStringIO import StringIO
+from Acquisition import aq_base
+
from OFS.Application import Application
from OFS.SimpleItem import SimpleItem
from OFS.Cache import ZCM_MANAGERS
@@ -19,6 +21,12 @@
from zExceptions import Redirect
import transaction
+import OFS.Image
+
+from zope.component import adapter
+from zope.lifecycleevent.interfaces import IObjectModifiedEvent
+from zope.lifecycleevent.interfaces import IObjectCreatedEvent
+
try:
here = os.path.dirname(os.path.abspath(__file__))
except:
@@ -57,7 +65,7 @@
mtime_func=None):
self.get = ob
if self.si:
- return si
+ return self.si
def ZCache_invalidate(self, ob):
self.invalidated = ob
@@ -78,6 +86,38 @@
def ZCacheManager_getCache(self):
return ADummyCache
+class EventCatcher(object):
+
+ def __init__(self):
+ self.created = []
+ self.modified = []
+
+ self.setUp()
+
+ def setUp(self):
+ from zope.component import provideHandler
+ provideHandler(self.handleCreated)
+ provideHandler(self.handleModified)
+
+ def tearDown(self):
+ from zope.component import getSiteManager
+ getSiteManager().unregisterHandler(self.handleCreated)
+ getSiteManager().unregisterHandler(self.handleModified)
+
+ def reset(self):
+ self.created = []
+ self.modified = []
+
+ @adapter(IObjectCreatedEvent)
+ def handleCreated(self, event):
+ if isinstance(event.object, OFS.Image.File):
+ self.created.append(event)
+
+ @adapter(IObjectModifiedEvent)
+ def handleModified(self, event):
+ if isinstance(event.object, OFS.Image.File):
+ self.modified.append(event)
+
class FileTests(unittest.TestCase):
data = open(filedata, 'rb').read()
content_type = 'application/octet-stream'
@@ -85,6 +125,8 @@
def setUp( self ):
self.connection = makeConnection()
+ self.eventCatcher = EventCatcher()
+
try:
r = self.connection.root()
a = Application()
@@ -107,7 +149,16 @@
raise
transaction.begin()
self.file = getattr( self.app, 'file' )
-
+
+ # Since we do the create here, let's test the events here too
+ self.assertEquals(1, len(self.eventCatcher.created))
+ self.failUnless(aq_base(self.eventCatcher.created[0].object) is aq_base(self.file))
+
+ self.assertEquals(1, len(self.eventCatcher.modified))
+ self.failUnless(aq_base(self.eventCatcher.created[0].object) is aq_base(self.file))
+
+ self.eventCatcher.reset()
+
def tearDown( self ):
del self.file
transaction.abort()
@@ -117,6 +168,8 @@
del self.root
del self.connection
ADummyCache.clear()
+
+ self.eventCatcher.tearDown()
def testViewImageOrFile(self):
self.assertRaises(Redirect, self.file.view_image_or_file, 'foo')
@@ -153,18 +206,24 @@
self.assertEqual(self.file.content_type, 'text/plain')
self.failUnless(ADummyCache.invalidated)
self.failUnless(ADummyCache.set)
+ self.assertEquals(1, len(self.eventCatcher.modified))
+ self.failUnless(self.eventCatcher.modified[0].object is self.file)
def testManageEditWithoutFileData(self):
self.file.manage_edit('foobar', 'text/plain')
self.assertEqual(self.file.title, 'foobar')
self.assertEqual(self.file.content_type, 'text/plain')
self.failUnless(ADummyCache.invalidated)
+ self.assertEquals(1, len(self.eventCatcher.modified))
+ self.failUnless(self.eventCatcher.modified[0].object is self.file)
def testManageUpload(self):
f = StringIO('jammyjohnson')
self.file.manage_upload(f)
self.assertEqual(self.file.data, 'jammyjohnson')
self.assertEqual(self.file.content_type, 'application/octet-stream')
+ self.assertEquals(1, len(self.eventCatcher.modified))
+ self.failUnless(self.eventCatcher.modified[0].object is self.file)
def testIfModSince(self):
now = time.time()
@@ -302,3 +361,4 @@
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
+
More information about the Zope-Checkins
mailing list