[Zope3-checkins] CVS: Zope3/src/zope/app/container -
contained.py:1.2
Jim Fulton
jim at zope.com
Sun Sep 21 13:32:01 EDT 2003
Update of /cvs-repository/Zope3/src/zope/app/container
In directory cvs.zope.org:/tmp/cvs-serv13214/src/zope/app/container
Added Files:
contained.py
Log Message:
Added module with several containment-related helper functions and classes.
=== Zope3/src/zope/app/container/contained.py 1.1 => 1.2 ===
--- /dev/null Sun Sep 21 13:32:01 2003
+++ Zope3/src/zope/app/container/contained.py Sun Sep 21 13:31:31 2003
@@ -0,0 +1,801 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Classes to support implenting IContained
+
+$Id$
+"""
+
+from zope.app import zapi
+from zope.app.event.objectevent import ObjectEvent, modified
+from zope.app.event import publish
+from zope.app.i18n import ZopeMessageIDFactory as _
+from zope.app.interfaces.container import IAddNotifiable, IMoveNotifiable
+from zope.app.interfaces.container import IContained
+from zope.app.interfaces.container import INameChooser
+from zope.app.interfaces.container import IObjectAddedEvent
+from zope.app.interfaces.container import IObjectMovedEvent
+from zope.app.interfaces.container import IObjectRemovedEvent
+from zope.app.interfaces.container import IRemoveNotifiable
+from zope.app.interfaces.location import ILocation
+from zope.exceptions import DuplicationError
+from zope.proxy import ProxyBase, getProxiedObject
+import zope.interface
+from zope.app.container._zope_app_container_contained import ContainedProxyBase
+from zope.app.container._zope_app_container_contained import getProxiedObject
+from zope.security.checker import selectChecker, CombinedChecker
+from zope.interface.declarations import ObjectSpecificationDescriptor
+from zope.interface.declarations import getObjectSpecification
+from zope.interface.declarations import ObjectSpecification
+from zope.interface import providedBy
+
+class Contained(object):
+ """Stupid mix-in that defines __parent__ and __name__ attributes
+ """
+
+ zope.interface.implements(IContained)
+
+ __parent__ = __name__ = None
+
+class ObjectMovedEvent(ObjectEvent):
+ """An object has been moved"""
+
+ zope.interface.implements(IObjectMovedEvent)
+
+ def __init__(self, object, oldParent, oldName, newParent, newName):
+ ObjectEvent.__init__(self, object)
+ self.oldParent = oldParent
+ self.oldName = oldName
+ self.newParent = newParent
+ self.newName = newName
+
+class ObjectAddedEvent(ObjectMovedEvent):
+ """An object has been added to a container"""
+
+ zope.interface.implements(IObjectAddedEvent)
+
+ def __init__(self, object, newParent=None, newName=None):
+ if newParent is None:
+ newParent = object.__parent__
+ if newName is None:
+ newName = object.__name__
+ ObjectMovedEvent.__init__(self, object, None, None, newParent, newName)
+
+class ObjectRemovedEvent(ObjectMovedEvent):
+ """An object has been removed from a container"""
+
+ zope.interface.implements(IObjectRemovedEvent)
+
+ def __init__(self, object, oldParent=None, oldName=None):
+ if oldParent is None:
+ oldParent = object.__parent__
+ if oldName is None:
+ oldName = object.__name__
+ ObjectMovedEvent.__init__(self, object, oldParent, oldName, None, None)
+
+
+def containedEvent(object, container, name=None):
+ """Establish the containment of the object in the container
+
+ The object and necessary event are returned. The object may be a
+ ContainedProxy around the original object. The event is an added
+ event, a moved event, or None.
+
+ If the object implements IContained, simply set it's __parent__
+ and __name attributes:
+
+ >>> container = {}
+ >>> item = Contained()
+ >>> x, event = containedEvent(item, container, u'foo')
+ >>> x is item
+ 1
+ >>> item.__parent__ is container
+ 1
+ >>> item.__name__
+ u'foo'
+
+ We have an added event:
+
+ >>> event.__class__.__name__
+ 'ObjectAddedEvent'
+ >>> event.object is item
+ 1
+ >>> event.newParent is container
+ 1
+ >>> event.newName
+ u'foo'
+ >>> event.oldParent
+ >>> event.oldName
+
+ Now if we call contained again:
+
+ >>> x2, event = containedEvent(item, container, u'foo')
+ >>> x2 is item
+ 1
+ >>> item.__parent__ is container
+ 1
+ >>> item.__name__
+ u'foo'
+
+ We don't get a new added event:
+
+ >>> event
+
+ If the object already had a parent but the parent or name was
+ different, we get a moved event:
+
+ >>> x, event = containedEvent(item, container, u'foo2')
+ >>> event.__class__.__name__
+ 'ObjectMovedEvent'
+ >>> event.object is item
+ 1
+ >>> event.newParent is container
+ 1
+ >>> event.newName
+ u'foo2'
+ >>> event.oldParent is container
+ 1
+ >>> event.oldName
+ u'foo'
+
+ If the object implements ILocation, but not IContained, set it's
+ __parent__ and __name__ attributes *and* declare that it
+ implements IContained:
+
+ >>> from zope.app.location import Location
+ >>> item = Location()
+ >>> IContained.isImplementedBy(item)
+ 0
+ >>> x, event = containedEvent(item, container, 'foo')
+ >>> x is item
+ 1
+ >>> item.__parent__ is container
+ 1
+ >>> item.__name__
+ 'foo'
+ >>> IContained.isImplementedBy(item)
+ 1
+
+ If the object doesn't even implement ILocation, put a
+ ContainedProxy around it:
+
+ >>> item = []
+ >>> x, event = containedEvent(item, container, 'foo')
+ >>> x is item
+ 0
+ >>> x.__parent__ is container
+ 1
+ >>> x.__name__
+ 'foo'
+
+ """
+
+ if not IContained.isImplementedBy(object):
+ if ILocation.isImplementedBy(object):
+ zope.interface.directlyProvides(object, IContained)
+ else:
+ object = ContainedProxy(object)
+
+ oldparent = object.__parent__
+ oldname = object.__name__
+
+ if oldparent is container and oldname == name:
+ # No events
+ return object, None
+
+ object.__parent__ = container
+ object.__name__ = name
+
+ if oldparent is None or oldname is None:
+ event = ObjectAddedEvent(object, container, name)
+ else:
+ event = ObjectMovedEvent(object, oldparent, oldname, container, name)
+
+ return object, event
+
+def contained(object, container, name=None):
+ """Establish the containment of the object in the container
+
+ Just return the contained object without an event. This is a convenience
+ "macro" for:
+
+ containedEvent(object, container, name)[0]
+
+ This function is only used for tests.
+ """
+ return containedEvent(object, container, name)[0]
+
+def setitem(container, setitemf, name, object):
+ """Helper function to set an item and generate needed events
+
+ This helper is needed, in part, because the events need to get
+ published after the object has been added to the container.
+
+ If the item implements IContained, simply set it's __parent__
+ and __name attributes:
+
+ >>> class Item(Contained):
+ ... zope.interface.implements(IAddNotifiable, IMoveNotifiable)
+ ... def addNotify(self, event):
+ ... self.added = event
+ ... def moveNotify(self, event):
+ ... self.moved = event
+
+ >>> item = Item()
+
+ >>> container = {}
+ >>> setitem(container, container.__setitem__, u'c', item)
+ >>> container[u'c'] is item
+ 1
+ >>> item.__parent__ is container
+ 1
+ >>> item.__name__
+ u'c'
+
+ If we run this using the testing framework, we'll use getEvents to
+ track the events generated:
+
+ >>> from zope.app.event.tests.placelesssetup import getEvents
+ >>> from zope.app.interfaces.container import IObjectAddedEvent
+ >>> from zope.app.interfaces.event import IObjectModifiedEvent
+
+ We have an added event:
+
+ >>> len(getEvents(IObjectAddedEvent))
+ 1
+ >>> event = getEvents(IObjectAddedEvent)[-1]
+ >>> event.object is item
+ 1
+ >>> event.newParent is container
+ 1
+ >>> event.newName
+ u'c'
+ >>> event.oldParent
+ >>> event.oldName
+
+ As well as a modification event for the container:
+
+ >>> len(getEvents(IObjectModifiedEvent))
+ 1
+ >>> getEvents(IObjectModifiedEvent)[-1].object is container
+ 1
+
+ The item's hooks have been called:
+
+ >>> item.added is event
+ 1
+ >>> item.moved is event
+ 1
+
+ We can suppress events and hooks by setting the __parent__ and
+ __name__ first:
+
+ >>> item = Item()
+ >>> item.__parent__, item.__name__ = container, 'c2'
+ >>> setitem(container, container.__setitem__, u'c2', item)
+ >>> len(container)
+ 2
+ >>> len(getEvents(IObjectAddedEvent))
+ 1
+ >>> len(getEvents(IObjectModifiedEvent))
+ 1
+
+ >>> getattr(item, 'added', None)
+ >>> getattr(item, 'moved', None)
+
+ If the item had a parent or name (as in a move or rename),
+ we generate a move event, rather than an add event:
+
+ >>> setitem(container, container.__setitem__, u'c3', item)
+ >>> len(container)
+ 3
+ >>> len(getEvents(IObjectAddedEvent))
+ 1
+ >>> len(getEvents(IObjectModifiedEvent))
+ 2
+ >>> from zope.app.interfaces.container import IObjectMovedEvent
+ >>> len(getEvents(IObjectMovedEvent))
+ 2
+
+ (Note that we have 2 move events because add are move events.)
+
+ We also get the move hook called, but not the add hook:
+
+ >>> event = getEvents(IObjectMovedEvent)[-1]
+ >>> getattr(item, 'added', None)
+ >>> item.moved is event
+ 1
+
+ If we try to replace an item without deleting it first, we'll get
+ an error:
+
+ >>> setitem(container, container.__setitem__, u'c', [])
+ Traceback (most recent call last):
+ ...
+ DuplicationError: c
+
+
+ >>> del container[u'c']
+ >>> setitem(container, container.__setitem__, u'c', [])
+ >>> len(getEvents(IObjectAddedEvent))
+ 2
+ >>> len(getEvents(IObjectModifiedEvent))
+ 3
+
+
+ If the object implements ILocation, but not IContained, set it's
+ __parent__ and __name__ attributes *and* declare that it
+ implements IContained:
+
+ >>> from zope.app.location import Location
+ >>> item = Location()
+ >>> IContained.isImplementedBy(item)
+ 0
+ >>> setitem(container, container.__setitem__, u'l', item)
+ >>> container[u'l'] is item
+ 1
+ >>> item.__parent__ is container
+ 1
+ >>> item.__name__
+ u'l'
+ >>> IContained.isImplementedBy(item)
+ 1
+
+ We get new added and modification events:
+
+ >>> len(getEvents(IObjectAddedEvent))
+ 3
+ >>> len(getEvents(IObjectModifiedEvent))
+ 4
+
+ If the object doesn't even implement ILocation, put a
+ ContainedProxy around it:
+
+ >>> item = []
+ >>> setitem(container, container.__setitem__, u'i', item)
+ >>> container[u'i']
+ []
+ >>> container[u'i'] is item
+ 0
+ >>> item = container[u'i']
+ >>> item.__parent__ is container
+ 1
+ >>> item.__name__
+ u'i'
+ >>> IContained.isImplementedBy(item)
+ 1
+
+ >>> len(getEvents(IObjectAddedEvent))
+ 4
+ >>> len(getEvents(IObjectModifiedEvent))
+ 5
+
+ We'll get type errors if we give keys that aren't unicode or ascii keys:
+
+ >>> setitem(container, container.__setitem__, 42, item)
+ Traceback (most recent call last):
+ ...
+ TypeError: name not unicode or ascii string
+
+ >>> setitem(container, container.__setitem__, None, item)
+ Traceback (most recent call last):
+ ...
+ TypeError: name not unicode or ascii string
+
+ >>> setitem(container, container.__setitem__, 'hello ' + chr(200), item)
+ Traceback (most recent call last):
+ ...
+ TypeError: name not unicode or ascii string
+
+ and we'll get a value error of we give an empty string or unicode:
+
+ >>> setitem(container, container.__setitem__, '', item)
+ Traceback (most recent call last):
+ ...
+ ValueError: empty names are not allowed
+
+ >>> setitem(container, container.__setitem__, u'', item)
+ Traceback (most recent call last):
+ ...
+ ValueError: empty names are not allowed
+
+ """
+
+ # Do basic name check:
+ if isinstance(name, str):
+ try:
+ name = unicode(name)
+ except UnicodeError:
+ raise TypeError("name not unicode or ascii string")
+ elif not isinstance(name, unicode):
+ raise TypeError("name not unicode or ascii string")
+
+ if not name:
+ raise ValueError("empty names are not allowed")
+
+ old = container.get(name)
+ if old is object:
+ return
+ if old is not None:
+ raise DuplicationError(name)
+
+ object, event = containedEvent(object, container, name)
+ setitemf(name, object)
+ if event:
+ if event.__class__ is ObjectAddedEvent:
+ a = zapi.queryAdapter(object, IAddNotifiable)
+ if a is not None:
+ a.addNotify(event)
+ a = zapi.queryAdapter(object, IMoveNotifiable)
+ if a is not None:
+ a.moveNotify(event)
+ publish(container, event)
+ modified(container)
+
+fixing_up = False
+def uncontained(object, container, name=None):
+ """Clear the containment relationship between the object amd the container
+
+ If we run this using the testing framework, we'll use getEvents to
+ track the events generated:
+
+ >>> from zope.app.event.tests.placelesssetup import getEvents
+ >>> from zope.app.interfaces.container import IObjectRemovedEvent
+ >>> from zope.app.interfaces.event import IObjectModifiedEvent
+
+ We'll start by creating a container with an item:
+
+ >>> class Item(Contained):
+ ... zope.interface.implements(IRemoveNotifiable)
+ ... def removeNotify(self, event):
+ ... self.removed = event
+
+ >>> item = Item()
+ >>> container = {u'foo': item}
+ >>> x, event = containedEvent(item, container, u'foo')
+ >>> item.__parent__ is container
+ 1
+ >>> item.__name__
+ u'foo'
+
+ Now we'll remove the item. It's parent and name are cleared:
+
+ >>> uncontained(item, container, u'foo')
+ >>> item.__parent__
+ >>> item.__name__
+
+ We now have a new removed event:
+
+ >>> len(getEvents(IObjectRemovedEvent))
+ 1
+ >>> event = getEvents(IObjectRemovedEvent)[-1]
+ >>> event.object is item
+ 1
+ >>> event.oldParent is container
+ 1
+ >>> event.oldName
+ u'foo'
+ >>> event.newParent
+ >>> event.newName
+
+ As well as a modification event for the container:
+
+ >>> len(getEvents(IObjectModifiedEvent))
+ 1
+ >>> getEvents(IObjectModifiedEvent)[-1].object is container
+ 1
+
+ The reoved hook was also called:
+
+ >>> item.removed is event
+ 1
+
+ Now if we call uncontained again:
+
+ >>> uncontained(item, container, u'foo')
+
+ We won't get any new events, because __parent__ and __name__ are None:
+
+ >>> len(getEvents(IObjectRemovedEvent))
+ 1
+ >>> len(getEvents(IObjectModifiedEvent))
+ 1
+
+ But, if either the name or parent are not None and they are not
+ the container and the old name, we'll get a modified event but not
+ a removed event.
+
+ >>> item.__parent__, item.__name__ = container, None
+ >>> uncontained(item, container, u'foo')
+ >>> len(getEvents(IObjectRemovedEvent))
+ 1
+ >>> len(getEvents(IObjectModifiedEvent))
+ 2
+
+ >>> item.__parent__, item.__name__ = None, u'bar'
+ >>> uncontained(item, container, u'foo')
+ >>> len(getEvents(IObjectRemovedEvent))
+ 1
+ >>> len(getEvents(IObjectModifiedEvent))
+ 3
+
+ """
+ try:
+ oldparent = object.__parent__
+ oldname = object.__name__
+ except AttributeError:
+ # The old object doesn't implements IContained
+ # Maybe we're converting old data:
+ if not fixing_up:
+ raise
+ oldparent = None
+ oldname = None
+
+ if oldparent is not container or oldname != name:
+ if oldparent is not None or oldname is not None:
+ modified(container)
+ return
+
+ event = ObjectRemovedEvent(object, oldparent, oldname)
+ a = zapi.queryAdapter(object, IRemoveNotifiable)
+ if a is not None:
+ a.removeNotify(event)
+ a = zapi.queryAdapter(object, IMoveNotifiable)
+ if a is not None:
+ a.moveNotify(event)
+ publish(container, event)
+
+ object.__parent__ = None
+ object.__name__ = None
+ modified(container)
+
+class NameChooser:
+
+ zope.interface.implements(INameChooser)
+
+ def __init__(self, context):
+ self.context = context
+
+ def checkName(self, name, object):
+ "See zope.app.interfaces.container.INameChooser"
+
+ if not name:
+ raise zapi.UserError(
+ _("An empty name was provided. Names cannot be empty.")
+ )
+
+ if isinstance(name, str):
+ name = unicode(name)
+ elif not isinstance(name, unicode):
+ raise TypeError("Invalid name type", type(name))
+
+ if name[:1] in '+@' or '/' in name:
+ raise zapi.UserError(
+ _("Names cannot begin with '+' or '@' or contain '/'")
+ )
+
+ if name in self.context:
+ raise zapi.UserError(
+ _("The given name is already being used")
+ )
+
+ return True
+
+
+ def chooseName(self, name, object):
+ "See zope.app.interfaces.container.INameChooser"
+
+ container = self.context
+
+ if not name:
+ name = object.__class__.__name__
+
+ dot = name.rfind('.')
+ if dot >= 0:
+ suffix = name[dot:]
+ name = name[:dot]
+ else:
+ suffix = ''
+
+
+ n = name + suffix
+ i = 1
+ while n in container:
+ i += 1
+ n = name + u'-' + unicode(i) + suffix
+
+ # Make sure tha name is valid. We may have started with something bad.
+ self.checkName(n, object)
+
+ return n
+
+
+class DecoratorSpecificationDescriptor(ObjectSpecificationDescriptor):
+ """Support for interface declarations on decorators
+
+ >>> from zope.interface import *
+ >>> class I1(Interface):
+ ... pass
+ >>> class I2(Interface):
+ ... pass
+ >>> class I3(Interface):
+ ... pass
+ >>> class I4(Interface):
+ ... pass
+
+ >>> class D1(ContainedProxy):
+ ... implements(I1)
+
+
+ >>> class D2(ContainedProxy):
+ ... implements(I2)
+
+ >>> class X:
+ ... implements(I3)
+
+ >>> x = X()
+ >>> directlyProvides(x, I4)
+
+ Interfaces of X are ordered with the directly-provided interfaces first
+
+ >>> [interface.getName() for interface in list(providedBy(x))]
+ ['I4', 'I3']
+
+ When we decorate objects, what order should the interfaces come
+ in? One could argue that decorators are less specific, so they
+ should come last.
+
+ >>> [interface.getName() for interface in list(providedBy(D1(x)))]
+ ['I4', 'I3', 'I1', 'IContained', 'IPersistent']
+
+ >>> [interface.getName() for interface in list(providedBy(D2(D1(x))))]
+ ['I4', 'I3', 'I1', 'IContained', 'IPersistent', 'I2']
+ """
+ def __get__(self, inst, cls=None):
+ if inst is None:
+ return getObjectSpecification(cls)
+ else:
+ provided = providedBy(getProxiedObject(inst))
+
+ # Use type rather than __class__ because inst is a proxy and
+ # will return the proxied object's class.
+ cls = type(inst)
+ return ObjectSpecification(provided, cls)
+
+
+class DecoratedSecurityCheckerDescriptor(object):
+ """Descriptor for a Decorator that provides a decorated security checker.
+ """
+ def __get__(self, inst, cls=None):
+ if inst is None:
+ return self
+ else:
+ proxied_object = getProxiedObject(inst)
+ checker = getattr(proxied_object, '__Security_checker__', None)
+ if checker is None:
+ checker = selectChecker(proxied_object)
+ wrapper_checker = selectChecker(inst)
+ if wrapper_checker is None:
+ return checker
+ elif checker is None:
+ return wrapper_checker
+ else:
+ return CombinedChecker(wrapper_checker, checker)
+
+
+class ContainedProxy(ContainedProxyBase):
+
+ __safe_for_unpickling__ = True
+
+ zope.interface.implements(IContained)
+
+ __providedBy__ = DecoratorSpecificationDescriptor()
+
+ __Security_checker__ = DecoratedSecurityCheckerDescriptor()
+
+
+##############################################################################
+# Parentgeddon fixup:
+
+from zope.app.event.function import Subscriber
+from zope.app.interfaces.content.folder import IRootFolder
+from zope.app.interfaces.container import IWriteContainer
+from zope.app.interfaces.services.service import IPossibleSite
+from zope.app.interfaces.container import IContainer
+from transaction import get_transaction
+from zope.component.exceptions import ComponentLookupError
+from zope.app.interfaces.services.registration import IRegistry, INameRegistry
+from zope.app.services.registration import RegistrationStack
+
+def parentgeddonFixup(event):
+ """Fixup databases to work with the result of parentgeddon.
+ """
+
+ database = event.database
+ connection = database.open()
+ app = connection.root().get('Application')
+ if app is None:
+ # No old data
+ return
+
+ if IRootFolder.isImplementedBy(app):
+ # already did fixup
+ return
+
+ print "Fixing up data for parentgeddon"
+
+ zope.interface.directlyProvides(
+ app, IRootFolder,
+ zope.interface.directlyProvidedBy(app))
+
+ global fixing_up
+ fixing_up = True
+
+ try:
+ fixcontainer(app)
+ get_transaction().commit()
+ finally:
+ get_transaction().abort()
+ fixing_up = False
+ connection.close()
+
+parentgeddonFixup = Subscriber(parentgeddonFixup)
+
+def fixcontainer(container):
+
+ # Step 1: fix items:
+ for name in container:
+ ob = container[name]
+
+ if not IContained.isImplementedBy(ob):
+ # remove the old item and reassign it
+ del container[name]
+ ob = contained(ob, container, name)
+ container[name] = ob
+ elif ob.__parent__ is not container:
+ ob.__parent__ = container
+ ob.__name__ = name
+
+ if IContainer.isImplementedBy(ob):
+ fixcontainer(ob)
+
+ if IRegistry.isImplementedBy(ob):
+ fixregistry(ob)
+
+ if IPossibleSite.isImplementedBy(container):
+ try:
+ sm = container.getSiteManager()
+ except ComponentLookupError:
+ pass # nothing to do
+ else:
+ fixupsitemanager(sm, container)
+
+def fixupsitemanager(sm, container):
+ sm.__parent__ = container
+ sm.__name__ = '++etc++site'
+ sm._SampleContainer__data = sm.Packages._SampleContainer__data
+ del sm.Packages
+ fixregistry(sm)
+ fixcontainer(sm)
+
+def fixregistry(registry):
+ if INameRegistry.isImplementedBy(registry):
+ for name in registry.listRegistrationNames():
+ stack = registry.queryRegistrations(name)
+ stack.__parent__ = registry
+ return
+ for data in registry.getRegisteredMatching():
+ for ob in data:
+ if isinstance(ob, RegistrationStack):
+ ob.__parent__ = registry
+
More information about the Zope3-Checkins
mailing list