[Zope-Checkins] CVS: Zope/lib/python/Zope/Startup/tests - testStarter.py:1.1

Chris McDonough chrism@zope.com
Sat, 2 Aug 2003 01:45:53 -0400


Update of /cvs-repository/Zope/lib/python/Zope/Startup/tests
In directory cvs.zope.org:/tmp/cvs-serv16419/lib/python/Zope/Startup/tests

Added Files:
	testStarter.py 
Log Message:
Refactor start_zope function and add tests for its functionality.

Changes:

 - startup log handler now pays attention to the logging levels of
   the handlers defined within the config file and uses the "lowest"
   level to log messages to stdout during startup.

 - entirely removed warning when the starting user's umask is "too
   permissive".  it wasn't clear that it added any value under normal
   operations.

 - replaced ancient setuid code with code stolen from zdaemon that
   works the same but looks nicer.



=== Added File Zope/lib/python/Zope/Startup/tests/testStarter.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Tests of the ZopeStarter class """

import os
import cStringIO
import tempfile
import unittest

import ZConfig
import Zope.Startup
from Zope.Startup import ZopeStarter

from App.config import getConfiguration
import logging

TEMPNAME = tempfile.mktemp()
TEMPPRODUCTS = os.path.join(TEMPNAME, "Products")

def getSchema():
    startup = os.path.dirname(os.path.realpath(Zope.Startup.__file__))
    schemafile = os.path.join(startup, 'zopeschema.xml')
    return ZConfig.loadSchema(schemafile)

# try to preserve logging state so we don't screw up other unit tests
# that come later

logger_states = {}
for name in ('event', 'trace', 'access'):
    logger = logging.getLogger(name)
    logger_states[name] = {'level':logger.level,
                           'propagate':logger.propagate,
                           'handlers':logger.handlers,
                           'filters':logger.filters}
    
class ZopeStarterTestCase(unittest.TestCase):

    def setUp(self):
        self.schema = getSchema()
        self.original_event_logger = logging.getLogger

    def tearDown(self):
        try:
            os.rmdir(TEMPPRODUCTS)
            os.rmdir(TEMPNAME)
        except:
            pass
        # reset logger states
        for name in ('event', 'access', 'trace'):
            logger = logging.getLogger(name)
            logger.__dict__.update(logger_states[name])

    def load_config_text(self, text):
        # We have to create a directory of our own since the existence
        # of the directory is checked.  This handles this in a
        # platform-independent way.
        schema = self.schema
        sio = cStringIO.StringIO(
            text.replace("<<INSTANCE_HOME>>", TEMPNAME))
        try:
            os.mkdir(TEMPNAME)
            os.mkdir(TEMPPRODUCTS)
        except OSError, why:
            if why == 17:
                # already exists
                pass
        conf, handler = ZConfig.loadConfigFile(schema, sio)
        self.assertEqual(conf.instancehome, TEMPNAME)
        return conf

    def testSetupLocale(self):
        import locale
        try:
            conf = self.load_config_text("""
                instancehome <<INSTANCE_HOME>>
                locale fr_FR"""
                )
            starter = ZopeStarter(conf)
            starter.setupLocale()
            self.assertEqual(locale.getlocale(), ['fr_FR', 'ISO8859-1'])
        finally:
            # resest to system-defined locale
            locale.setlocale(locale.LC_ALL, '')

    def testSetupStartupHandler(self):
        import zLOG
        import sys
        conf = self.load_config_text("""
            instancehome <<INSTANCE_HOME>>
            debug-mode on
            <eventlog>
             level info
             <logfile>
               path <<INSTANCE_HOME>>/event.log
              level info
             </logfile>
           </eventlog>""")
        starter = ZopeStarter(conf)
        starter.setupStartupHandler()
        self.assert_(not zLOG._call_initialize)
        self.assertEqual(starter.startup_handler.formatter,
                         zLOG.EventLogger.formatters['file'])
        self.assertEqual(starter.startup_handler.level,
                         logging.DEBUG)
        self.assertEqual(starter.startup_handler,
                     zLOG.EventLogger.EventLogger.logger.handlers[0])
        self.assertEqual(len(zLOG.EventLogger.EventLogger.logger.handlers), 1)
        self.assertEqual(zLOG.EventLogger.EventLogger.logger.level,
                         logging.DEBUG)
        self.assertEqual(starter.startup_handler.level, logging.DEBUG)
        self.failUnlessEqual(starter.startup_handler.stream, sys.stderr)
        conf = self.load_config_text("""
            instancehome <<INSTANCE_HOME>>
            debug-mode off
            <eventlog>
             level info
             <logfile>
               path <<INSTANCE_HOME>>/event.log
              level info
             </logfile>
           </eventlog>""")
        starter = ZopeStarter(conf)
        starter.setupStartupHandler()
        self.failIfEqual(starter.startup_handler.stream, sys.stderr)

    def testSetupZServerThreads(self):
        conf = self.load_config_text("""
            instancehome <<INSTANCE_HOME>>
           zserver-threads 10""")
        starter = ZopeStarter(conf)
        starter.setupZServerThreads()
        from ZServer.PubCore import _n
        self.assertEqual(_n, 10)

    def testSetupServers(self):
        conf = self.load_config_text("""
            instancehome <<INSTANCE_HOME>>
            <http-server>
                address 18092
            </http-server>
            <ftp-server>
               address 18093
            </ftp-server>""")
        starter = ZopeStarter(conf)
        # do the job the 'handler' would have done (call prepare)
        for server in conf.servers:
            server.prepare('', None, 'Zope', {}, None)
        try:
            starter.setupServers()
            import ZServer
            self.assertEqual(conf.servers[0].__class__,
                             ZServer.HTTPServer.zhttp_server)
            self.assertEqual(conf.servers[1].__class__,
                             ZServer.FTPServer)
        finally:
            del conf.servers # should release servers
            pass
        conf = self.load_config_text("""
            instancehome <<INSTANCE_HOME>>
            <http-server>
                address 18092
            </http-server>
            <ftp-server>
               address 18092 # conflict
            </ftp-server>""")
        starter = ZopeStarter(conf)
        # do the job the 'handler' would have done (call prepare)
        for server in conf.servers:
            server.prepare('', None, 'Zope', {}, None)
        try:
            self.assertRaises(ZConfig.ConfigurationError, starter.setupServers)
        finally:
            del conf.servers

    def testDropPrivileges(self):
        # somewhat incomplete because we we're never running as root
        # when we test, but we test as much as we can
        if os.name != 'posix':
            return
        _old_getuid = os.getuid
        def _return0():
            return 0
        try:
            os.getuid = _return0
            # no effective user
            conf = self.load_config_text("""
                instancehome <<INSTANCE_HOME>>""")
            starter = ZopeStarter(conf)
            self.assertRaises(ZConfig.ConfigurationError,
                              starter.dropPrivileges)
            # cant find user in passwd database
            conf = self.load_config_text("""
                instancehome <<INSTANCE_HOME>>
                effective-user n0sucHuS3r""")
            starter = ZopeStarter(conf)
            self.assertRaises(ZConfig.ConfigurationError,
                              starter.dropPrivileges)
            # can't specify '0' as effective user
            conf = self.load_config_text("""
                instancehome <<INSTANCE_HOME>>
                effective-user 0""")
            starter = ZopeStarter(conf)
            self.assertRaises(ZConfig.ConfigurationError,
                              starter.dropPrivileges)
            # setuid to test runner's uid XXX will this work cross-platform?
            runnerid = _old_getuid()
            conf = self.load_config_text("""
                instancehome <<INSTANCE_HOME>>
                effective-user %s""" % runnerid)
            starter = ZopeStarter(conf)
            finished = starter.dropPrivileges()
            self.failUnless(finished)
        finally:
            os.getuid = _old_getuid

    def testSetupConfiguredLoggers(self):
        import zLOG
        import logging
        import sys
        conf = self.load_config_text("""
            instancehome <<INSTANCE_HOME>>
            debug-mode off 
            <eventlog>
             level info
             <logfile>
               path <<INSTANCE_HOME>>/event.log
              level info
             </logfile>
           </eventlog>
           <logger access>
             level info
             <logfile>
             path <<INSTANCE_HOME>>/Z2.log
             </logfile>
           </logger>
           <logger trace>
             level info
             <logfile>
             path <<INSTANCE_HOME>>/trace.log
             </logfile>
           </logger>
           """)
        try:
            starter = ZopeStarter(conf)
            starter.setupStartupHandler()
            starter.info('hello')
            starter.removeStartupHandler()
            starter.setupConfiguredLoggers()
            starter.flushStartupHandlerBuffer()
            l = open(os.path.join(TEMPNAME, 'event.log')).read()
            self.failUnless(l.find('hello') > -1)
            self.failUnless(os.path.exists(os.path.join(TEMPNAME, 'Z2.log')))
            self.failUnless(os.path.exists(os.path.join(TEMPNAME,'trace.log')))
        finally:
            for name in ('event.log', 'Z2.log', 'trace.log'):
                try:
                    os.unlink(os.path.join(TEMPNAME, name))
                except:
                    pass

    def testMakeLockFile(self):
        # put something in the way (it should be deleted)
        name = os.path.join(TEMPNAME, 'lock')
        conf = self.load_config_text("""
            instancehome <<INSTANCE_HOME>>
            lock-filename %s""" % name
                                     )
        f = open(name, 'a')
        f.write('hello')
        f.close()
        try:
            starter = ZopeStarter(conf)
            starter.makeLockFile()
            self.failIf(open(name).read().find('hello') > -1)
        finally:
            starter.unlinkLockFile()
            self.failIf(os.path.exists(name))

    def testMakePidFile(self):
        # put something in the way (it should be deleted)
        name = os.path.join(TEMPNAME, 'pid')
        conf = self.load_config_text("""
            instancehome <<INSTANCE_HOME>>
            pid-filename %s""" % name
                                     )
        f = open(name, 'a')
        f.write('hello')
        f.close()
        try:
            starter = ZopeStarter(conf)
            starter.makePidFile()
            self.failIf(open(name).read().find('hello') > -1)
        finally:
            starter.unlinkPidFile()
            self.failIf(os.path.exists(name))

def test_suite():
    return unittest.makeSuite(ZopeStarterTestCase)

if __name__ == "__main__":
    unittest.main(defaultTest="test_suite")