[Zope3-checkins] CVS: Zope3/lib/python/Zope/App/OFS/Services/tests - TestingConfigurationRegistry.py:1.2 TestingServiceManager.py:1.2 __init__.py:1.2 testConfigurationRegistry.py:1.2 testConfigurationStatusProperty.py:1.2

Jim Fulton jim@zope.com
Sat, 30 Nov 2002 13:35:56 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/App/OFS/Services/tests
In directory cvs.zope.org:/tmp/cvs-serv11713/lib/python/Zope/App/OFS/Services/tests

Added Files:
	TestingConfigurationRegistry.py TestingServiceManager.py 
	__init__.py testConfigurationRegistry.py 
	testConfigurationStatusProperty.py 
Log Message:
Added a framework for managing configuration registration. This should
make it much easier to implement configurable services.



=== Zope3/lib/python/Zope/App/OFS/Services/tests/TestingConfigurationRegistry.py 1.1 => 1.2 ===
--- /dev/null	Sat Nov 30 13:35:56 2002
+++ Zope3/lib/python/Zope/App/OFS/Services/tests/TestingConfigurationRegistry.py	Sat Nov 30 13:35:55 2002
@@ -0,0 +1,103 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+# 
+##############################################################################
+
+__metaclass__ = type
+
+class TestingConfiguration:
+    def __init__(self, id):
+        self.id = id
+
+    def __eq__(self, other):
+        return self.id == getattr(other, 'id', 0)
+    
+class TestingConfigurationRegistry:
+
+    class_ = TestingConfiguration
+
+    def __init__(self, *args):
+        self._data = args
+
+    def register(self, configuration):
+        cid = configuration.id
+
+        if self._data:
+            if cid in self._data:
+                return # already registered
+        else:
+            # Nothing registered. Need to stick None in front so that nothing
+            # is active.
+            self._data = (None, )
+
+        self._data += (cid, )
+
+    def unregister(self, configuration):
+        cid = configuration.id
+
+        data = self._data
+        if data:
+            if data[0] == cid:
+                # It's active, we need to switch in None
+                self._data = (None, ) + data[1:]
+            else:
+                self._data = tuple([item for item in data if item != cid])
+
+    def registered(self, configuration):
+        cid = configuration.id
+        return cid in self._data
+
+    def activate(self, configuration):
+        cid = configuration.id
+        if self._data[0] == cid:
+            return # already active
+
+        if self._data[0] is None:
+            # Remove leading None marker
+            self._data = self._data[1:]
+
+        self._data = (cid, ) + tuple(
+            [item for item in self._data if item != cid]
+            )
+
+    def deactivate(self, configuration):
+        cid = configuration.id
+        if self._data[0] != cid:
+            return # already inactive
+
+        # Just stick None on the front
+        self._data = (None, ) + self._data
+
+    def active(self):
+        if self._data:
+            return self.class_(self._data[0])
+                
+        return None
+
+    def __nonzero__(self):
+        return bool(self._data)
+
+    def info(self):
+        result = [{'id': path,
+                   'active': False,
+                   'configuration': self.class_(path),
+                   }
+                  for path in self._data
+                  ]
+
+        if result:
+            if result[0]['configuration'] is None:
+                del result[0]
+            else:
+                result[0]['active'] = True
+        
+        return result


=== Zope3/lib/python/Zope/App/OFS/Services/tests/TestingServiceManager.py 1.1 => 1.2 ===
--- /dev/null	Sat Nov 30 13:35:56 2002
+++ Zope3/lib/python/Zope/App/OFS/Services/tests/TestingServiceManager.py	Sat Nov 30 13:35:55 2002
@@ -0,0 +1,73 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+# 
+##############################################################################
+"""
+$Id$
+"""
+
+__metaclass__ = type
+
+from Zope.ComponentArchitecture.IServiceService import IServiceService
+from Zope.App.ComponentArchitecture.NextService \
+     import getNextService, getNextServiceManager
+from Zope.Proxy.ContextWrapper import ContextWrapper
+from Zope.App.OFS.Services.ServiceManager.IBindingAware import IBindingAware
+from Zope.ContextWrapper import ContextMethod
+
+class TestingServiceManager:
+    """Simple placeful service manager used for writing tests
+    """
+    __implements__ =  IServiceService
+
+    def getServiceDefinitions(self):
+        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
+        return getNextServiceManager.getServiceDefinitions()
+
+    def getInterfaceFor(self, name):
+        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
+        return getNextServiceManager.getServiceDefinitions()
+
+    def getService(self, name):
+        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
+        if hasattr(self, name):
+            return ContextWrapper(getattr(self, name), self, name=name)
+        return getNextServiceManager(self).getService(name)
+
+    getService = ContextMethod(getService)
+
+    def queryService(self, name, default=None):
+        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
+        if hasattr(self, name):
+            return ContextWrapper(getattr(self, name), self, name=name)
+        return getNextServiceManager(self).queryService(name, default)
+
+    queryService = ContextMethod(queryService)
+
+    def bindService(self, name, ob):
+        setattr(self, name, ob)
+        if IBindingAware.isImplementedBy(ob):
+            ob.bound(name)
+
+    bindService = ContextMethod(bindService)
+
+    def unbindService(self, name):
+        ob = getattr(self, name)
+        if IBindingAware.isImplementedBy(ob):
+            ob.unbound(name)
+        delattr(self, name, ob)
+
+    unbindService = ContextMethod(unbindService)
+
+
+# Prefix the module docstring (the $Id$ text above) with the class docstring.
+__doc__ = TestingServiceManager.__doc__ + __doc__
+


=== Zope3/lib/python/Zope/App/OFS/Services/tests/__init__.py 1.1 => 1.2 ===
--- /dev/null	Sat Nov 30 13:35:56 2002
+++ Zope3/lib/python/Zope/App/OFS/Services/tests/__init__.py	Sat Nov 30 13:35:55 2002
@@ -0,0 +1,14 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+# 
+##############################################################################
+


=== Zope3/lib/python/Zope/App/OFS/Services/tests/testConfigurationRegistry.py 1.1 => 1.2 ===
--- /dev/null	Sat Nov 30 13:35:56 2002
+++ Zope3/lib/python/Zope/App/OFS/Services/tests/testConfigurationRegistry.py	Sat Nov 30 13:35:55 2002
@@ -0,0 +1,217 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Unit tests for ConfigurationRegistry.
+
+Covers registering, unregistering, activating, deactivating and info().
+
+$Id$
+"""
+
+from unittest import TestCase, TestSuite, main, makeSuite
+from Zope.App.OFS.Services.ServiceManager.tests.PlacefulSetup \
+     import PlacefulSetup
+from Zope.Proxy.ContextWrapper import ContextWrapper, getItem
+from Zope.App.OFS.Services.Configuration import ConfigurationRegistry
+from Zope.App.OFS.Services.ServiceManager.ServiceManager import ServiceManager
+from Zope.App.Traversing import traverse
+
+class Configuration:
+
+    active = 0
+
+    def activated(self):
+        self.active += 1
+
+    def deactivated(self):
+        self.active -= 1
+
+
+class Test(PlacefulSetup, TestCase):
+    # Tests for ConfigurationRegistry using a placeful folder setup.
+    # Several tests call another test first to build their starting state,
+    # so the order of statements inside each test matters.
+
+    def setUp(self):
+        PlacefulSetup.setUp(self)
+        self.buildFolders()
+        root = self.rootFolder
+
+        # Put a service manager on the root folder, then look up its
+        # default package by traversal; test configurations are stored there.
+        root.setServiceManager(ServiceManager())
+        self.__default = traverse(root, "++etc++Services/Packages/default")
+        # The registry under test, wrapped in the context of the root folder.
+        self.__registry = ContextWrapper(ConfigurationRegistry(), root)
+
+    def __config(self, name):
+        # Store a new Configuration under `name` in the default package and
+        # return it as fetched back through getItem.
+        self.__default.setObject(name, Configuration())
+        return getItem(self.__default, name)
+        
+    def test_register_and_registered_and_nonzero_and_active(self):
+        registry = self.__registry
+
+        # A new registry has nothing active and is false.
+        self.assertEqual(registry.active(), None)
+
+        self.failIf(registry)
+        self.__c1 = c1 = self.__config("1")
+        registry.register(c1)
+        self.failUnless(registry)
+        self.failUnless(registry.registered(c1))
+        # Registering alone must not activate the configuration.
+        self.assertEqual(c1.active, 0)
+
+        self.assertEqual(registry.active(), None)
+
+        self.__c2 = c2 = self.__config("2")
+        self.failIf(registry.registered(c2))
+        registry.register(c2)
+        self.failUnless(registry)
+        self.failUnless(registry.registered(c2))
+        self.assertEqual(c2.active, 0)
+
+        
+    def test_unregister_and_registered_and_nonzero(self):
+        # reuse registration test to set things up (more)
+        self.test_register_and_registered_and_nonzero_and_active()
+
+        registry = self.__registry
+
+        c1 = self.__c1
+        registry.unregister(c1)
+        self.failIf(registry.registered(c1))
+        self.assertEqual(c1.active, 0)
+
+        c2 = self.__c2
+        registry.unregister(c2)
+        self.failIf(registry.registered(c2))
+        self.assertEqual(c2.active, 0)
+        
+        # With everything unregistered the registry is false again.
+        self.failIf(registry)
+
+    def test_activate_and_active(self):
+        # reuse registration test to set things up (more)        
+        self.test_register_and_registered_and_nonzero_and_active()
+
+        registry = self.__registry
+        self.assertEqual(registry.active(), None)
+
+        c1 = self.__c1
+        c2 = self.__c2
+        
+        registry.activate(c2)
+        self.assertEqual(c1.active, 0)
+        self.failUnless(registry.registered(c1))
+        self.assertEqual(c2.active, 1)
+        self.failUnless(registry.registered(c2))
+        self.assertEqual(registry.active(), c2)
+        
+        # Activating the already-active configuration again changes nothing;
+        # in particular c2 is not notified a second time.
+        registry.activate(c2)
+        self.assertEqual(c1.active, 0)
+        self.failUnless(registry.registered(c1))
+        self.assertEqual(c2.active, 1)
+        self.failUnless(registry.registered(c2))
+        self.assertEqual(registry.active(), c2)
+        
+        # Activating c1 is expected to deactivate c2 (its counter drops back
+        # to 0) while both stay registered.
+        registry.activate(c1)
+        self.assertEqual(c1.active, 1)
+        self.failUnless(registry.registered(c1))
+        self.assertEqual(c2.active, 0)
+        self.failUnless(registry.registered(c2))
+        self.assertEqual(registry.active(), c1)
+
+    def test_activate_unregistered(self):
+        # Activating a configuration that was never registered must raise
+        # ValueError, both on an empty registry and on a populated one.
+        registry = self.__registry
+        self.assertRaises(ValueError, registry.activate, self.__config('3'))
+        self.test_activate_and_active()
+        self.assertRaises(ValueError, registry.activate, self.__config('4'))
+        
+    def test_deactivate(self):
+        # Start from the state left by the activation test: c1 is active.
+        self.test_activate_and_active()
+
+        registry = self.__registry
+        c1 = self.__c1
+        c2 = self.__c2
+        self.assertEqual(registry.active(), c1)
+
+        # Deactivating a registered but inactive configuration is a no-op.
+        registry.deactivate(c2)
+        self.assertEqual(c2.active, 0)
+        self.assertEqual(registry.active(), c1)
+
+        registry.deactivate(c1)
+        self.assertEqual(c2.active, 0)
+        self.assertEqual(c1.active, 0)
+        self.assertEqual(registry.active(), None)
+
+        # Deactivation does not unregister anything.
+        self.failUnless(registry.registered(c1))
+        self.failUnless(registry.registered(c2))
+
+    def test_unregister_active(self):
+        # Start from the state left by the activation test: c1 is active.
+        self.test_activate_and_active()
+
+        registry = self.__registry
+        c1 = self.__c1
+        c2 = self.__c2
+        self.assertEqual(registry.active(), c1)
+
+        # Unregistering the active configuration is expected to deactivate
+        # it and leave nothing active.
+        registry.unregister(c1)
+        self.assertEqual(c2.active, 0)
+        self.assertEqual(c1.active, 0)
+        self.assertEqual(registry.active(), None)
+
+        self.failIf(registry.registered(c1))
+        self.failUnless(registry.registered(c2))
+
+    def test_deactivate_unregistered(self):
+        # Deactivating a configuration that was never registered must raise
+        # ValueError.
+        registry = self.__registry
+        self.assertRaises(ValueError, registry.deactivate, self.__config('3'))
+
+    def test_info(self):
+        # Start from the state left by the activation test: c1 is active.
+        self.test_activate_and_active()
+
+        registry = self.__registry
+        c1 = self.__c1
+        c2 = self.__c2
+
+        # Sort by id so the comparison does not depend on the order in which
+        # info() returns its entries.
+        info = registry.info()
+        info.sort(lambda a, b: cmp(a['id'], b['id']))
+        self.assertEqual(
+            info,
+            [
+              {'id': 'default/1',
+               'active': True,
+               'configuration': c1,
+               },
+              {'id': 'default/2',
+               'active': False,
+               'configuration': c2,
+               },
+              ])
+        
+        registry.deactivate(c1)
+
+        # After deactivation both entries are still listed, neither active.
+        info = registry.info()
+        info.sort(lambda a, b: cmp(a['id'], b['id']))
+        self.assertEqual(
+            info,
+            [
+              {'id': 'default/1',
+               'active': False,
+               'configuration': c1,
+               },
+              {'id': 'default/2',
+               'active': False,
+               'configuration': c2,
+               },
+              ])
+
+def test_suite():
+    return TestSuite((
+        makeSuite(Test),
+        ))
+
+# Run the suite when this module is executed directly as a script.
+if __name__=='__main__':
+    main(defaultTest='test_suite')


=== Zope3/lib/python/Zope/App/OFS/Services/tests/testConfigurationStatusProperty.py 1.1 => 1.2 ===
--- /dev/null	Sat Nov 30 13:35:56 2002
+++ Zope3/lib/python/Zope/App/OFS/Services/tests/testConfigurationStatusProperty.py	Sat Nov 30 13:35:55 2002
@@ -0,0 +1,116 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Unit test for ConfigurationStatusProperty.
+
+Sets status to Registered, Active and Unregistered on test configurations.
+
+$Id$
+"""
+
+from unittest import TestCase, TestSuite, main, makeSuite
+from Zope.ComponentArchitecture.IServiceService import IServiceService
+from Zope.App.OFS.Services.ServiceManager.tests.PlacefulSetup \
+     import PlacefulSetup
+from Zope.App.OFS.Services.RoleService.RoleService import RoleService
+from TestingConfigurationRegistry \
+     import TestingConfigurationRegistry, TestingConfiguration
+from Zope.App.OFS.Services.Configuration import ConfigurationStatusProperty
+from Zope.App.OFS.Services.ConfigurationInterfaces \
+     import Active, Unregistered, Registered
+from Zope.Proxy.ContextWrapper import ContextWrapper
+
+class TestingConfiguration(TestingConfiguration):
+    # Extends the imported TestingConfiguration (and reuses its name) with
+    # the status property under test.
+    # "Services" is the service name handed to ConfigurationStatusProperty;
+    # the TestingServiceManager in this module answers only to that name.
+    status = ConfigurationStatusProperty("Services")
+    # The TestingServiceManager in this module rejects any other value.
+    service_type = "Test"
+
+class TestingConfigurationRegistry(TestingConfigurationRegistry):
+    # Same registry as the imported one, but building the local
+    # TestingConfiguration subclass defined in this module.
+    class_ = TestingConfiguration
+
+class TestingServiceManager:
+
+    __implements__ = IServiceService # I lied
+
+    registry = None
+
+    def getService(self, name):
+        if name == "Services":
+            return self
+        raise ValueError("Wrong service name", name)
+
+    def queryConfigurationsFor(self, configuration, default=None):
+        if configuration.service_type != "Test":
+            raise ValueError("Bad service type", configuration.service_type)
+        return self.registry
+
+    def createConfigurationsFor(self, configuration):
+        if configuration.service_type != "Test":
+            raise ValueError("Bad service type", configuration.service_type)
+        self.registry = TestingConfigurationRegistry()
+        return self.registry
+
+
+class Test(PlacefulSetup, TestCase):
+    # Drives ConfigurationStatusProperty through its states and checks the
+    # resulting contents of the testing registry after every step.
+
+    def setUp(self):
+        PlacefulSetup.setUp(self)
+        self.buildFolders()
+        # Install the fake service manager on the root folder; the test
+        # inspects its registry directly.
+        self.__sm = TestingServiceManager()
+        self.rootFolder.setServiceManager(self.__sm)
+
+    def test(self):
+
+        # This is one scenario; every step builds on the registry state
+        # left by the previous one.
+        configa = ContextWrapper(TestingConfiguration('a'), self.rootFolder)
+        self.assertEqual(configa.status, Unregistered)
+
+        # Registering the first configuration leaves the None marker in
+        # front: registered, but nothing active.
+        configa.status = Registered
+        self.assertEqual(self.__sm.registry._data, (None, 'a'))
+        self.assertEqual(configa.status, Registered)
+
+        configa.status = Active
+        self.assertEqual(self.__sm.registry._data, ('a', ))
+        self.assertEqual(configa.status, Active)
+
+        # Creating a configuration must not touch the registry.
+        configb = ContextWrapper(TestingConfiguration('b'), self.rootFolder)
+        self.assertEqual(self.__sm.registry._data, ('a', ))
+        self.assertEqual(configb.status, Unregistered)
+
+        configb.status = Registered
+        self.assertEqual(self.__sm.registry._data, ('a', 'b'))
+        self.assertEqual(configb.status, Registered)
+
+        configc = ContextWrapper(TestingConfiguration('c'), self.rootFolder)
+        self.assertEqual(configc.status, Unregistered)
+        self.assertEqual(self.__sm.registry._data, ('a', 'b'))
+
+        configc.status = Registered
+        self.assertEqual(self.__sm.registry._data, ('a', 'b', 'c'))
+        self.assertEqual(configc.status, Registered)
+
+        # Activating c moves it to the front, displacing a.
+        configc.status = Active
+        self.assertEqual(self.__sm.registry._data, ('c', 'a', 'b'))
+        self.assertEqual(configc.status, Active)
+
+        # Unregistering the active configuration leaves nothing active;
+        # a and b stay registered.
+        configc.status = Unregistered
+        self.assertEqual(self.__sm.registry._data, (None, 'a', 'b'))
+        self.assertEqual(configc.status, Unregistered)
+        self.assertEqual(configb.status, Registered)
+        self.assertEqual(configa.status, Registered)
+
+def test_suite():
+    return TestSuite((
+        makeSuite(Test),
+        ))
+
+# Run the suite when this module is executed directly as a script.
+if __name__=='__main__':
+    main(defaultTest='test_suite')