[Zope3-checkins] CVS: Zope3/lib/python/Zope/App/OFS/Services/tests - TestingConfigurationRegistry.py:1.1.2.1 TestingServiceManager.py:1.1.2.1 __init__.py:1.1.2.1 testConfigurationRegistry.py:1.1.2.1 testConfigurationStatusProperty.py:1.1.2.1

Jim Fulton jim@zope.com
Sat, 30 Nov 2002 07:44:32 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/App/OFS/Services/tests
In directory cvs.zope.org:/tmp/cvs-serv30047/lib/python/Zope/App/OFS/Services/tests

Added Files:
      Tag: Zope3-Bangalore-TTW-Branch
	TestingConfigurationRegistry.py TestingServiceManager.py 
	__init__.py testConfigurationRegistry.py 
	testConfigurationStatusProperty.py 
Log Message:
Refactored the way TTW component registration is done.  There are now
separate registry objects that abstract the machinery for registering
multiple conflicting configurations and deciding which, if any, are
active.  Also provided a new field and widget for the status
information.

Along the way, cleaned up and streamlined placeful testing
infrastructure a bit.

Now checking into branch. Will give file-by-file (or at least more
specific) logs when the changes are merged into the head.


=== Added File Zope3/lib/python/Zope/App/OFS/Services/tests/TestingConfigurationRegistry.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################

# Make every class defined in this module a new-style class (Python 2).
__metaclass__ = type

class TestingConfiguration:
    """Bare-bones configuration stand-in that is identified by its ``id``.

    Instances compare equal when their ``id`` values are equal.
    """

    def __init__(self, id):
        self.id = id

    def __eq__(self, other):
        # Objects without an ``id`` attribute are compared against 0.
        return self.id == getattr(other, 'id', 0)

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; without this,
        # two equal configurations would also compare as "not equal".
        return not self.__eq__(other)


class TestingConfigurationRegistry:
    """In-memory configuration registry used for writing tests.

    State lives in ``self._data``, a tuple of configuration ids.  The
    first slot describes the active configuration: it holds either the id
    of the active configuration or ``None`` when configurations are
    registered but none of them is active.  An empty tuple means that
    nothing is registered at all.
    """

    # Factory used to turn stored ids back into configuration objects.
    class_ = TestingConfiguration

    def __init__(self, *args):
        # ``args`` is the initial content of ``_data`` (see class docstring).
        self._data = args

    def register(self, configuration):
        """Register ``configuration`` without activating it.

        Does nothing when the configuration's id is already registered.
        """
        cid = configuration.id

        if self._data:
            if cid in self._data:
                return # already registered
        else:
            # Nothing registered. Need to stick None in front so that nothing
            # is active.
            self._data = (None, )

        self._data += (cid, )

    def unregister(self, configuration):
        """Remove ``configuration``; an active one is deactivated first."""
        cid = configuration.id

        data = self._data
        if not data:
            return

        if data[0] == cid:
            # It's active, we need to switch in None
            data = (None, ) + data[1:]
        else:
            data = tuple([item for item in data if item != cid])

        if data == (None, ):
            # Only the "nothing active" marker is left, so nothing is
            # registered any more: go back to the empty (false) state.
            data = ()

        self._data = data

    def registered(self, configuration):
        """Return whether the configuration's id is in the registry."""
        cid = configuration.id
        return cid in self._data

    def activate(self, configuration):
        """Make ``configuration`` the active one by moving it to the front.

        An id that was not registered before is simply added at the front;
        this also holds for an empty registry.
        """
        cid = configuration.id
        data = self._data

        if data and data[0] == cid:
            return # already active

        if data and data[0] is None:
            # Remove leading None marker
            data = data[1:]

        self._data = (cid, ) + tuple(
            [item for item in data if item != cid]
            )

    def deactivate(self, configuration):
        """Deactivate ``configuration`` if it is the active one.

        The configuration stays registered.  Does nothing when the
        registry is empty or another configuration (or none) is active.
        """
        cid = configuration.id
        if not self._data or self._data[0] != cid:
            return # already inactive

        # Just stick None on the front
        self._data = (None, ) + self._data

    def active(self):
        """Return the active configuration object, or None if none is."""
        if self._data and self._data[0] is not None:
            return self.class_(self._data[0])

        return None

    def __nonzero__(self):
        # True when at least one configuration is registered.
        return bool(self._data)

    # Python 3 spelling of the truth-value hook.
    __bool__ = __nonzero__

    def info(self):
        """Return one dict per registered configuration, active one first.

        Each dict has the keys ``'id'``, ``'active'`` and
        ``'configuration'``.  The leading ``None`` marker is not reported.
        """
        result = [{'id': path,
                   'active': False,
                   'configuration': self.class_(path),
                   }
                  for path in self._data
                  ]

        if result:
            # The first slot is either the "nothing active" marker, which
            # must not be reported, or the active configuration.
            if result[0]['id'] is None:
                del result[0]
            else:
                result[0]['active'] = True

        return result


=== Added File Zope3/lib/python/Zope/App/OFS/Services/tests/TestingServiceManager.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""
$Id: TestingServiceManager.py,v 1.1.2.1 2002/11/30 12:44:31 jim Exp $
"""

# Make every class defined in this module a new-style class (Python 2).
__metaclass__ = type

from Zope.ComponentArchitecture.IServiceService import IServiceService
from Zope.App.ComponentArchitecture.NextService \
     import getNextService, getNextServiceManager
from Zope.Proxy.ContextWrapper import ContextWrapper
from Zope.App.OFS.Services.ServiceManager.IBindingAware import IBindingAware
from Zope.ContextWrapper import ContextMethod

class TestingServiceManager:
    """Simple placeful service manager used for writing tests
    """
    # Services are kept as plain instance attributes named after the
    # service.  Anything that is not bound here is delegated to the next
    # service manager, as returned by getNextServiceManager(self).
    __implements__ =  IServiceService

    def getServiceDefinitions(self):
        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
        # getNextServiceManager is a function: it has to be called with
        # this manager before the result can be queried.
        return getNextServiceManager(self).getServiceDefinitions()

    # Wrapped as a ContextMethod like the other methods that hand ``self``
    # to getNextServiceManager.
    getServiceDefinitions = ContextMethod(getServiceDefinitions)

    def getInterfaceFor(self, name):
        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
        # Delegate the lookup for ``name`` to the next service manager.
        return getNextServiceManager(self).getInterfaceFor(name)

    getInterfaceFor = ContextMethod(getInterfaceFor)

    def getService(self, name):
        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
        # A locally bound service wins and is returned wrapped in the
        # context of this manager; otherwise ask the next manager.
        if hasattr(self, name):
            return ContextWrapper(getattr(self, name), self, name=name)
        return getNextServiceManager(self).getService(name)

    getService = ContextMethod(getService)

    def queryService(self, name, default=None):
        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
        # Same lookup as getService, but passes ``default`` on to the
        # next manager's queryService.
        if hasattr(self, name):
            return ContextWrapper(getattr(self, name), self, name=name)
        return getNextServiceManager(self).queryService(name, default)

    queryService = ContextMethod(queryService)

    def bindService(self, name, ob):
        # Store ``ob`` as the service called ``name`` and notify it when
        # it declares IBindingAware.
        setattr(self, name, ob)
        if IBindingAware.isImplementedBy(ob):
            ob.bound(name)

    bindService = ContextMethod(bindService)

    def unbindService(self, name):
        # Notify a binding-aware service first, then drop the attribute.
        # Raises AttributeError when no such service is bound.
        ob = getattr(self, name)
        if IBindingAware.isImplementedBy(ob):
            ob.unbound(name)
        # delattr takes exactly (object, name).
        delattr(self, name)

    unbindService = ContextMethod(unbindService)


# Module documentation: the class docstring followed by the module docstring.
__doc__ = ''.join((TestingServiceManager.__doc__, __doc__))



=== Added File Zope3/lib/python/Zope/App/OFS/Services/tests/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################



=== Added File Zope3/lib/python/Zope/App/OFS/Services/tests/testConfigurationRegistry.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""XXX short summary goes here.

XXX longer description goes here.

$Id: testConfigurationRegistry.py,v 1.1.2.1 2002/11/30 12:44:31 jim Exp $
"""

from unittest import TestCase, TestSuite, main, makeSuite
from Zope.App.OFS.Services.ServiceManager.tests.PlacefulSetup \
     import PlacefulSetup
from Zope.Proxy.ContextWrapper import ContextWrapper, getItem
from Zope.App.OFS.Services.Configuration import ConfigurationRegistry
from Zope.App.OFS.Services.ServiceManager.ServiceManager import ServiceManager
from Zope.App.Traversing import traverse

class Configuration:
    """Stub configuration that keeps a running activation count.

    ``activated()`` adds one to ``active`` and ``deactivated()`` subtracts
    one, so tests can check how often each was called.
    """

    # Starting value shared by all instances until they are first changed.
    active = 0

    def activated(self):
        self.active = self.active + 1

    def deactivated(self):
        self.active = self.active - 1


# Tests for ConfigurationRegistry.  Each test drives a context-wrapped
# registry with Configuration stubs stored under
# "++etc++Services/Packages/default" and checks both the registry's answers
# and the stubs' ``active`` counters.  Later tests call earlier ones to
# build up their starting state.
class Test(PlacefulSetup, TestCase):

    def setUp(self):
        # Build the placeful fixture, install a ServiceManager on the root
        # folder and create the registry under test in the root's context.
        PlacefulSetup.setUp(self)
        self.buildFolders()
        root = self.rootFolder

        root.setServiceManager(ServiceManager())
        self.__default = traverse(root, "++etc++Services/Packages/default")
        self.__registry = ContextWrapper(ConfigurationRegistry(), root)

    def __config(self, name):
        # Store a fresh Configuration stub under ``name`` in the default
        # package and return it as fetched back through getItem.
        self.__default.setObject(name, Configuration())
        return getItem(self.__default, name)

    def test_register_and_registered_and_nonzero_and_active(self):
        # Registering makes the registry true and the configuration
        # "registered", but activates nothing.  The two configurations are
        # kept on self so that other tests can reuse them.
        registry = self.__registry

        self.assertEqual(registry.active(), None)

        self.failIf(registry)
        self.__c1 = c1 = self.__config("1")
        registry.register(c1)
        self.failUnless(registry)
        self.failUnless(registry.registered(c1))
        self.assertEqual(c1.active, 0)

        self.assertEqual(registry.active(), None)

        self.__c2 = c2 = self.__config("2")
        self.failIf(registry.registered(c2))
        registry.register(c2)
        self.failUnless(registry)
        self.failUnless(registry.registered(c2))
        self.assertEqual(c2.active, 0)


    def test_unregister_and_registered_and_nonzero(self):
        # Unregistering both (inactive) configurations empties the
        # registry again.
        # reuse registration test to set things up (more)
        self.test_register_and_registered_and_nonzero_and_active()

        registry = self.__registry

        c1 = self.__c1
        registry.unregister(c1)
        self.failIf(registry.registered(c1))
        self.assertEqual(c1.active, 0)

        c2 = self.__c2
        registry.unregister(c2)
        self.failIf(registry.registered(c2))
        self.assertEqual(c2.active, 0)

        self.failIf(registry)

    def test_activate_and_active(self):
        # Activation notifies the configuration, re-activating the active
        # one changes nothing, and activating another one deactivates the
        # previous one.  Ends with c1 active and c2 registered.
        # reuse registration test to set things up (more)
        self.test_register_and_registered_and_nonzero_and_active()

        registry = self.__registry
        self.assertEqual(registry.active(), None)

        c1 = self.__c1
        c2 = self.__c2

        registry.activate(c2)
        self.assertEqual(c1.active, 0)
        self.failUnless(registry.registered(c1))
        self.assertEqual(c2.active, 1)
        self.failUnless(registry.registered(c2))
        self.assertEqual(registry.active(), c2)

        # Activating the already active configuration is a no-op.
        registry.activate(c2)
        self.assertEqual(c1.active, 0)
        self.failUnless(registry.registered(c1))
        self.assertEqual(c2.active, 1)
        self.failUnless(registry.registered(c2))
        self.assertEqual(registry.active(), c2)

        # Switching to c1 deactivates c2.
        registry.activate(c1)
        self.assertEqual(c1.active, 1)
        self.failUnless(registry.registered(c1))
        self.assertEqual(c2.active, 0)
        self.failUnless(registry.registered(c2))
        self.assertEqual(registry.active(), c1)

    def test_activate_unregistered(self):
        # Activating an unregistered configuration raises ValueError, both
        # on an empty registry and on one that already has entries.
        registry = self.__registry
        self.assertRaises(ValueError, registry.activate, self.__config('3'))
        self.test_activate_and_active()
        self.assertRaises(ValueError, registry.activate, self.__config('4'))

    def test_deactivate(self):
        # Deactivating an inactive configuration changes nothing;
        # deactivating the active one leaves nothing active while both
        # configurations stay registered.
        self.test_activate_and_active()

        registry = self.__registry
        c1 = self.__c1
        c2 = self.__c2
        self.assertEqual(registry.active(), c1)

        registry.deactivate(c2)
        self.assertEqual(c2.active, 0)
        self.assertEqual(registry.active(), c1)

        registry.deactivate(c1)
        self.assertEqual(c2.active, 0)
        self.assertEqual(c1.active, 0)
        self.assertEqual(registry.active(), None)

        self.failUnless(registry.registered(c1))
        self.failUnless(registry.registered(c2))

    def test_unregister_active(self):
        # Unregistering the active configuration deactivates it and does
        # not promote the remaining one.
        self.test_activate_and_active()

        registry = self.__registry
        c1 = self.__c1
        c2 = self.__c2
        self.assertEqual(registry.active(), c1)

        registry.unregister(c1)
        self.assertEqual(c2.active, 0)
        self.assertEqual(c1.active, 0)
        self.assertEqual(registry.active(), None)

        self.failIf(registry.registered(c1))
        self.failUnless(registry.registered(c2))

    def test_deactivate_unregistered(self):
        # Deactivating an unregistered configuration raises ValueError.
        registry = self.__registry
        self.assertRaises(ValueError, registry.deactivate, self.__config('3'))

    def test_info(self):
        # info() lists every registered configuration with its id and
        # active flag.  The result is sorted by id before comparing, so
        # the order returned by the registry is not checked here.
        self.test_activate_and_active()

        registry = self.__registry
        c1 = self.__c1
        c2 = self.__c2

        info = registry.info()
        info.sort(lambda a, b: cmp(a['id'], b['id']))
        self.assertEqual(
            info,
            [
              {'id': 'default/1',
               'active': True,
               'configuration': c1,
               },
              {'id': 'default/2',
               'active': False,
               'configuration': c2,
               },
              ])

        # After deactivation both entries are still listed, none active.
        registry.deactivate(c1)

        info = registry.info()
        info.sort(lambda a, b: cmp(a['id'], b['id']))
        self.assertEqual(
            info,
            [
              {'id': 'default/1',
               'active': False,
               'configuration': c1,
               },
              {'id': 'default/2',
               'active': False,
               'configuration': c2,
               },
              ])

def test_suite():
    """Return a suite holding the tests collected from ``Test``."""
    suite = TestSuite()
    suite.addTest(makeSuite(Test))
    return suite

# Run this module's suite when the file is executed directly as a script.
if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/lib/python/Zope/App/OFS/Services/tests/testConfigurationStatusProperty.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""XXX short summary goes here.

XXX longer description goes here.

$Id: testConfigurationStatusProperty.py,v 1.1.2.1 2002/11/30 12:44:31 jim Exp $
"""

from unittest import TestCase, TestSuite, main, makeSuite
from Zope.ComponentArchitecture.IServiceService import IServiceService
from Zope.App.OFS.Services.ServiceManager.tests.PlacefulSetup \
     import PlacefulSetup
from Zope.App.OFS.Services.RoleService.RoleService import RoleService
from TestingConfigurationRegistry \
     import TestingConfigurationRegistry, TestingConfiguration
from Zope.App.OFS.Services.Configuration import ConfigurationStatusProperty
from Zope.App.OFS.Services.ConfigurationInterfaces \
     import Active, Unregistered, Registered
from Zope.Proxy.ContextWrapper import ContextWrapper

# Extends the imported TestingConfiguration (shadowing its name in this
# module) with the status property under test.
class TestingConfiguration(TestingConfiguration):
    # The argument is presumably the name of the service the property looks
    # up; the stub service manager in this module only answers to "Services".
    status = ConfigurationStatusProperty("Services")
    # Checked by the stub service manager's *ConfigurationsFor methods.
    service_type = "Test"

# Same registry as the imported one (whose name it shadows), except that it
# builds configurations with the local TestingConfiguration subclass.
class TestingConfigurationRegistry(TestingConfigurationRegistry):
    class_ = TestingConfiguration

class TestingServiceManager:
    # Stub that acts as the service manager and, at the same time, as the
    # "Services" service it hands out.  It manages a single registry, for
    # configurations whose service_type is "Test".

    __implements__ = IServiceService # I lied

    # The one registry of this stub; None until createConfigurationsFor
    # has been called.
    registry = None

    def getService(self, name):
        # Only "Services" is known, and this object plays that role itself.
        if name == "Services":
            return self
        raise ValueError("Wrong service name", name)

    def queryConfigurationsFor(self, configuration, default=None):
        # Return the registry if one has been created, else ``default``.
        # Raises ValueError for any service type other than "Test".
        if configuration.service_type != "Test":
            raise ValueError("Bad service type", configuration.service_type)
        if self.registry is None:
            # Compare with None explicitly: an existing registry may be
            # empty and therefore false.
            return default
        return self.registry

    def createConfigurationsFor(self, configuration):
        # Create, remember and return a new, empty registry.
        # Raises ValueError for any service type other than "Test".
        if configuration.service_type != "Test":
            raise ValueError("Bad service type", configuration.service_type)
        self.registry = TestingConfigurationRegistry()
        return self.registry


# Test for ConfigurationStatusProperty: assigning to ``status`` on
# context-wrapped configurations must drive the stub registry, whose raw
# ``_data`` tuple is inspected after every step.
class Test(PlacefulSetup, TestCase):

    def setUp(self):
        # Install the stub service manager on the root folder so that the
        # wrapped configurations are created in its context.
        PlacefulSetup.setUp(self)
        self.buildFolders()
        self.__sm = TestingServiceManager()
        self.rootFolder.setServiceManager(self.__sm)

    def test(self):

        # A configuration starts out unregistered.
        configa = ContextWrapper(TestingConfiguration('a'), self.rootFolder)
        self.assertEqual(configa.status, Unregistered)

        # Registering the first configuration yields a registry with a
        # leading None, i.e. nothing active.
        configa.status = Registered
        self.assertEqual(self.__sm.registry._data, (None, 'a'))
        self.assertEqual(configa.status, Registered)

        # Activating replaces the None marker with the configuration.
        configa.status = Active
        self.assertEqual(self.__sm.registry._data, ('a', ))
        self.assertEqual(configa.status, Active)

        # Merely creating another configuration does not touch the registry.
        configb = ContextWrapper(TestingConfiguration('b'), self.rootFolder)
        self.assertEqual(self.__sm.registry._data, ('a', ))
        self.assertEqual(configb.status, Unregistered)

        # Further registrations are appended behind the active one.
        configb.status = Registered
        self.assertEqual(self.__sm.registry._data, ('a', 'b'))
        self.assertEqual(configb.status, Registered)

        configc = ContextWrapper(TestingConfiguration('c'), self.rootFolder)
        self.assertEqual(configc.status, Unregistered)
        self.assertEqual(self.__sm.registry._data, ('a', 'b'))

        configc.status = Registered
        self.assertEqual(self.__sm.registry._data, ('a', 'b', 'c'))
        self.assertEqual(configc.status, Registered)

        # Activating c moves it to the front; a is then merely registered.
        configc.status = Active
        self.assertEqual(self.__sm.registry._data, ('c', 'a', 'b'))
        self.assertEqual(configc.status, Active)

        # Unregistering the active configuration leaves nothing active.
        configc.status = Unregistered
        self.assertEqual(self.__sm.registry._data, (None, 'a', 'b'))
        self.assertEqual(configc.status, Unregistered)
        self.assertEqual(configb.status, Registered)
        self.assertEqual(configa.status, Registered)

def test_suite():
    """Return a suite holding the tests collected from ``Test``."""
    suite = TestSuite()
    suite.addTest(makeSuite(Test))
    return suite

# Run this module's suite when the file is executed directly as a script.
if __name__=='__main__':
    main(defaultTest='test_suite')